RustyBT Documentation
Custom Broker Adapter
    #
    # Copyright 2025 RustyBT Contributors
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    """
    Example: Creating a Custom Broker Adapter
    
    This example demonstrates how to create a custom broker adapter for RustyBT
    that interfaces with a proprietary or custom trading system.
    
    Key Concepts Demonstrated:
    - Inheriting from BrokerAdapter base class
    - Implementing required abstract methods
    - Order management and tracking
    - Position reconciliation
    - Market data integration
    - Error handling and retry logic
    
    Usage:
        python examples/custom_broker_adapter.py
    """
    
    import asyncio
    import random
    import uuid
    from decimal import Decimal, getcontext
    
    import pandas as pd
    import structlog
    
    from rustybt.assets import Asset, Equity
    from rustybt.exceptions import (
        BrokerError,
        DataNotFoundError,
        InsufficientFundsError,
        OrderNotFoundError,
    )
    from rustybt.live.brokers.base import BrokerAdapter
    
    # Set decimal precision
    getcontext().prec = 28
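The precision setting matters because the adapter keeps every monetary value as a `Decimal`, and the commented API calls in this example convert broker responses with `Decimal(str(...))`. A short standalone sketch of why the conversion goes through `str`, using only the standard library:

```python
from decimal import Decimal, getcontext

getcontext().prec = 28  # Same precision as the adapter example

# Constructing from a float captures the float's binary rounding error
from_float = Decimal(0.1)
# Going through str() keeps the value as it was written
from_str = Decimal(str(0.1))

print(from_float == Decimal("0.1"))  # False
print(from_str == Decimal("0.1"))    # True

# Arithmetic results are rounded to 28 significant digits
third = Decimal(1) / Decimal(3)
print(len(third.as_tuple().digits))  # 28
```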
    
    logger = structlog.get_logger()
    
    class CustomBrokerAdapter(BrokerAdapter):
        """Custom broker adapter template.
    
        This adapter demonstrates a complete implementation of the BrokerAdapter
        interface for connecting to a custom trading system or broker API.
    
        Adapt this template for your broker by:
        1. Replacing mock API calls with a real API client
        2. Implementing proper authentication
        3. Adding broker-specific order types
        4. Handling broker-specific error codes
        5. Implementing WebSocket streaming (optional)
    
        Example:
            >>> broker = CustomBrokerAdapter(
            ...     api_url="https://api.mybroker.com",
            ...     api_key="your_api_key",
            ...     api_secret="your_api_secret"
            ... )
            >>> await broker.connect()
            >>> order_id = await broker.submit_order(
            ...     asset=asset,
            ...     amount=Decimal("100"),
            ...     order_type="market"
            ... )
        """
    
        def __init__(
            self, api_url: str, api_key: str, api_secret: str, testnet: bool = True, timeout: int = 30
        ):
            """Initialize custom broker adapter.
    
            Args:
                api_url: Broker API base URL
                api_key: API authentication key
                api_secret: API secret key
                testnet: Use testnet/sandbox mode
                timeout: Request timeout in seconds
            """
            self.api_url = api_url.rstrip("/")
            self.api_key = api_key
            self.api_secret = api_secret
            self.testnet = testnet
            self.timeout = timeout
    
            # Connection state
            self._connected = False
    
            # Order tracking
            self._orders: dict[str, dict] = {}
            self._order_id_map: dict[str, str] = {}  # internal_id -> broker_id
    
            # Position tracking
            self._positions: dict[str, dict] = {}
    
            # Account state
            self._account = {
                "cash": Decimal("100000"),
                "equity": Decimal("100000"),
                "buying_power": Decimal("100000"),
                "margin_used": Decimal("0"),
            }
    
            # Market data queue (for mock data)
            self._market_data_queue = asyncio.Queue()
    
            logger.info("custom_broker_initialized", api_url=self.api_url, testnet=self.testnet)
    
        async def connect(self) -> None:
            """Establish connection to broker.
    
            This method should:
            1. Authenticate with broker API
            2. Verify API credentials
            3. Initialize WebSocket connections (if applicable)
            4. Load initial account state
    
            Raises:
                BrokerError: If connection fails
            """
            logger.info("connecting_to_broker")
    
            try:
                # TODO: Replace with real API authentication
                # Example:
                # response = await self._api_client.authenticate(
                #     api_key=self.api_key,
                #     api_secret=self.api_secret
                # )
                # self._session_token = response['token']
    
                # Simulate connection delay
                await asyncio.sleep(0.5)
    
                # Load initial account state
                await self._load_account_state()
    
                # Load existing positions
                await self._load_positions()
    
                self._connected = True
                logger.info("broker_connected", account_cash=float(self._account["cash"]))
    
            except Exception as e:
                logger.error("connection_failed", error=str(e))
                raise BrokerError(f"Failed to connect: {e}", adapter="custom_broker") from e
    
        async def disconnect(self) -> None:
            """Disconnect from broker.
    
            This method should:
            1. Close WebSocket connections
            2. Cancel any pending subscriptions
            3. Clean up resources
            """
            logger.info("disconnecting_from_broker")
    
            # TODO: Close API connections
            # await self._api_client.close()
            # await self._websocket.close()
    
            self._connected = False
            logger.info("broker_disconnected")
    
        async def submit_order(
            self,
            asset: Asset,
            amount: Decimal,
            order_type: str,
            limit_price: Decimal | None = None,
            stop_price: Decimal | None = None,
        ) -> str:
            """Submit order to broker.
    
            Args:
                asset: Asset to trade
                amount: Order quantity (positive=buy, negative=sell)
                order_type: 'market', 'limit', 'stop', 'stop-limit'
                limit_price: Limit price for limit/stop-limit orders
                stop_price: Stop price for stop/stop-limit orders
    
            Returns:
                Broker order ID
    
            Raises:
                InsufficientFundsError: If insufficient funds
                BrokerError: If order submission fails
            """
            if not self._connected:
                raise BrokerError("Not connected to broker", adapter="custom_broker")
    
            logger.info(
                "submitting_order", asset=asset.symbol, amount=float(amount), order_type=order_type
            )
    
            # Validate order
            await self._validate_order(asset, amount, order_type)
    
            # Generate internal order ID
            internal_id = str(uuid.uuid4())
    
            try:
                # TODO: Replace with real API call
                # Example:
                # response = await self._api_client.submit_order(
                #     symbol=asset.symbol,
                #     side='buy' if amount > 0 else 'sell',
                #     quantity=abs(float(amount)),
                #     order_type=order_type,
                #     limit_price=float(limit_price) if limit_price else None,
                #     stop_price=float(stop_price) if stop_price else None
                # )
                # broker_order_id = response['order_id']
    
                # Simulate API call
                await asyncio.sleep(0.1)
                broker_order_id = f"ORD_{random.randint(100000, 999999)}"  # noqa: S311
    
                # Store order
                self._orders[internal_id] = {
                    "broker_order_id": broker_order_id,
                    "asset": asset,
                    "amount": amount,
                    "order_type": order_type,
                    "status": "open",
                    "filled": Decimal("0"),
                    "limit_price": limit_price,
                    "stop_price": stop_price,
                    "created_at": pd.Timestamp.now(),
                }
    
                self._order_id_map[internal_id] = broker_order_id
    
                logger.info("order_submitted", internal_id=internal_id, broker_order_id=broker_order_id)
    
                return broker_order_id
    
            except Exception as e:
                logger.error("order_submission_failed", error=str(e))
                raise BrokerError(f"Order submission failed: {e}", adapter="custom_broker") from e
    
        async def cancel_order(self, broker_order_id: str) -> None:
            """Cancel order.
    
            Args:
                broker_order_id: Broker's order ID
    
            Raises:
                OrderNotFoundError: If order doesn't exist
                BrokerError: If cancellation fails
            """
            logger.info("cancelling_order", broker_order_id=broker_order_id)
    
            # Find internal order ID
            internal_id = None
            for iid, bid in self._order_id_map.items():
                if bid == broker_order_id:
                    internal_id = iid
                    break
    
            if not internal_id or internal_id not in self._orders:
                raise OrderNotFoundError(f"Order {broker_order_id} not found")
    
            try:
                # TODO: Replace with real API call
                # await self._api_client.cancel_order(broker_order_id)
    
                # Simulate cancellation
                await asyncio.sleep(0.05)
    
                # Update order status
                self._orders[internal_id]["status"] = "cancelled"
    
                logger.info("order_cancelled", broker_order_id=broker_order_id)
    
            except Exception as e:
                logger.error("cancellation_failed", error=str(e))
                raise BrokerError(f"Order cancellation failed: {e}", adapter="custom_broker") from e
    
        async def get_account_info(self) -> dict[str, Decimal]:
            """Get account information.
    
            Returns:
                Dict with keys: 'cash', 'equity', 'buying_power', 'margin_used'
    
            Raises:
                BrokerError: If request fails
            """
            if not self._connected:
                raise BrokerError("Not connected to broker", adapter="custom_broker")
    
            try:
                # TODO: Replace with real API call
                # response = await self._api_client.get_account()
                # return {
                #     'cash': Decimal(str(response['cash'])),
                #     'equity': Decimal(str(response['equity'])),
                #     'buying_power': Decimal(str(response['buying_power'])),
                #     'margin_used': Decimal(str(response['margin_used'])),
                # }
    
                # Return cached account state
                return self._account.copy()
    
            except Exception as e:
                logger.error("get_account_failed", error=str(e))
                raise BrokerError(f"Failed to get account info: {e}", adapter="custom_broker") from e
    
        async def get_positions(self) -> list[dict]:
            """Get current positions.
    
            Returns:
                List of position dicts with keys: 'asset', 'amount', 'cost_basis',
                'market_value', 'unrealized_pnl'
    
            Raises:
                BrokerError: If request fails
            """
            if not self._connected:
                raise BrokerError("Not connected to broker", adapter="custom_broker")
    
            try:
                # TODO: Replace with real API call
                # response = await self._api_client.get_positions()
    
                # Convert positions to standard format
                positions = []
                for symbol, pos in self._positions.items():
                    positions.append(
                        {
                            "asset": symbol,
                            "amount": pos["amount"],
                            "cost_basis": pos["cost_basis"],
                            "market_value": pos["market_value"],
                            "unrealized_pnl": pos["unrealized_pnl"],
                        }
                    )
    
                return positions
    
            except Exception as e:
                logger.error("get_positions_failed", error=str(e))
                raise BrokerError(f"Failed to get positions: {e}", adapter="custom_broker") from e
    
        async def get_open_orders(self) -> list[dict]:
            """Get open/pending orders from broker.
    
            Returns:
                List of order dicts with keys: 'order_id', 'asset', 'amount',
                'status', 'order_type', 'limit_price', 'stop_price'
    
            Raises:
                BrokerError: If request fails
            """
            if not self._connected:
                raise BrokerError("Not connected to broker", adapter="custom_broker")
    
            try:
                # TODO: Replace with real API call
                # response = await self._api_client.get_open_orders()
    
                # Filter open orders
                open_orders = []
                for _internal_id, order in self._orders.items():
                    if order["status"] in ["open", "partially_filled"]:
                        open_orders.append(
                            {
                                "order_id": order["broker_order_id"],
                                "asset": order["asset"].symbol,
                                "amount": order["amount"],
                                "status": order["status"],
                                "order_type": order["order_type"],
                                "limit_price": order.get("limit_price"),
                                "stop_price": order.get("stop_price"),
                            }
                        )
    
                return open_orders
    
            except Exception as e:  # noqa: BLE001
                logger.error("get_open_orders_failed", error=str(e))
                raise BrokerError(f"Failed to get open orders: {e}", adapter="custom_broker") from e
    
        async def subscribe_market_data(self, assets: list[Asset]) -> None:
            """Subscribe to real-time market data.
    
            Args:
                assets: List of assets to subscribe to
    
            Raises:
                BrokerError: If subscription fails
            """
            logger.info("subscribing_to_market_data", symbols=[a.symbol for a in assets])
    
            # TODO: Implement WebSocket subscription
            # await self._websocket.subscribe(symbols=[a.symbol for a in assets])
    
            logger.info("market_data_subscribed")
    
        async def unsubscribe_market_data(self, assets: list[Asset]) -> None:
            """Unsubscribe from market data.
    
            Args:
                assets: List of assets to unsubscribe from
    
            Raises:
                BrokerError: If unsubscribe fails
            """
            logger.info("unsubscribing_from_market_data", symbols=[a.symbol for a in assets])
    
            # TODO: Implement WebSocket unsubscribe
            # await self._websocket.unsubscribe(symbols=[a.symbol for a in assets])
    
            logger.info("market_data_unsubscribed")
    
        async def get_next_market_data(self) -> dict | None:
            """Get next market data update (blocking).
    
            Returns:
                Dict with keys: 'asset', 'open', 'high', 'low', 'close', 'volume',
                'timestamp'. Returns None if no data available.
    
            Raises:
                BrokerError: If data fetch fails
            """
            try:
                # TODO: Get data from WebSocket queue
                # data = await self._websocket_queue.get()
    
                # For demo, wait one second, then report that no data arrived
                await asyncio.sleep(1)  # Simulate a 1-second bar interval
                return None  # No data in this example
    
            except Exception as e:  # noqa: BLE001
                logger.error("get_market_data_failed", error=str(e))
                return None
    
        async def get_current_price(self, asset: Asset) -> Decimal:
            """Get current price for asset.
    
            Args:
                asset: Asset to get price for
    
            Returns:
                Current price
    
            Raises:
                DataNotFoundError: If price not available
                BrokerError: If request fails
            """
            try:
                # TODO: Replace with real API call
                # response = await self._api_client.get_quote(asset.symbol)
                # return Decimal(str(response['last_price']))
    
                # Generate mock price
                base_price = Decimal("150.00")
                noise = Decimal(str(random.uniform(-2, 2)))
                return base_price + noise
    
            except Exception as e:  # noqa: BLE001
                logger.error("get_price_failed", asset=asset.symbol, error=str(e))
                raise DataNotFoundError(f"Price not available for {asset.symbol}") from e
    
        def is_connected(self) -> bool:
            """Check if broker connection is active.
    
            Returns:
                True if connected, False otherwise
            """
            return self._connected
    
        # ========================================================================
        # Helper Methods
        # ========================================================================
    
        async def _load_account_state(self) -> None:
            """Load initial account state from broker."""
            # TODO: Fetch real account state
            # response = await self._api_client.get_account()
            # self._account = {
            #     'cash': Decimal(str(response['cash'])),
            #     ...
            # }
            pass
    
        async def _load_positions(self) -> None:
            """Load existing positions from broker."""
            # TODO: Fetch real positions
            # response = await self._api_client.get_positions()
            # for pos in response:
            #     self._positions[pos['symbol']] = {...}
            pass
    
        async def _validate_order(self, asset: Asset, amount: Decimal, order_type: str) -> None:
            """Validate order before submission.
    
            Raises:
                InsufficientFundsError: If insufficient funds
                BrokerError: If order is invalid
            """
            # Check buying power
            if amount > 0:  # Buy order
                estimated_cost = abs(amount) * Decimal("150")  # Rough estimate
                if estimated_cost > self._account["buying_power"]:
                    raise InsufficientFundsError(
                        f"Insufficient funds: need ${estimated_cost}, have ${self._account['buying_power']}"
                    )
    
            # Validate order type
            valid_order_types = ["market", "limit", "stop", "stop-limit"]
            if order_type not in valid_order_types:
                raise BrokerError(
                    f"Invalid order type: {order_type}. Valid types: {valid_order_types}",
                    adapter="custom_broker",
                )
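`_validate_order` checks buying power and the order type, but it does not check that limit and stop orders carry the prices they need. One way to add that check is sketched below. It is a standalone illustration: `REQUIRED_PRICES` and `check_order_prices` are hypothetical names, and `ValueError` stands in for `BrokerError` so the snippet runs without RustyBT installed.

```python
from decimal import Decimal

# Which prices each order type needs; mirrors the order types in the template
REQUIRED_PRICES = {
    "market": (),
    "limit": ("limit_price",),
    "stop": ("stop_price",),
    "stop-limit": ("limit_price", "stop_price"),
}


def check_order_prices(order_type, limit_price=None, stop_price=None):
    """Raise ValueError if a required price is missing or not positive."""
    if order_type not in REQUIRED_PRICES:
        raise ValueError(f"Invalid order type: {order_type}")
    prices = {"limit_price": limit_price, "stop_price": stop_price}
    for name in REQUIRED_PRICES[order_type]:
        value = prices[name]
        if value is None:
            raise ValueError(f"{order_type} order requires {name}")
        if value <= 0:
            raise ValueError(f"{name} must be positive, got {value}")


if __name__ == "__main__":
    check_order_prices("market")
    check_order_prices("limit", limit_price=Decimal("101.50"))
    try:
        check_order_prices("stop-limit", limit_price=Decimal("100"))
    except ValueError as exc:
        print(exc)
```

In the adapter, a check like this would sit inside `_validate_order`, which would then also need `limit_price` and `stop_price` passed in from `submit_order`.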
    
Args: asset: Asset to get price for Returns: Current price Raises: DataNotFoundError: If price not available BrokerError: If request fails """ try: # TODO: Replace with real API call # response = await self._api_client.get_quote(asset.symbol) # return Decimal(str(response['last_price'])) # Generate mock price base_price = Decimal("150.00") noise = Decimal(str(random.uniform(-2, 2))) return base_price + noise except Exception as e: # noqa: BLE001 logger.error("get_price_failed", asset=asset.symbol, error=str(e)) raise DataNotFoundError(f"Price not available for {asset.symbol}") def is_connected(self) -> bool: """Check if broker connection is active. Returns: True if connected, False otherwise """ return self._connected # ======================================================================== # Helper Methods # ======================================================================== async def _load_account_state(self) -> None: """Load initial account state from broker.""" # TODO: Fetch real account state # response = await self._api_client.get_account() # self._account = { # 'cash': Decimal(str(response['cash'])), # ... # } pass async def _load_positions(self) -> None: """Load existing positions from broker.""" # TODO: Fetch real positions # response = await self._api_client.get_positions() # for pos in response: # self._positions[pos['symbol']] = {...} pass async def _validate_order(self, asset: Asset, amount: Decimal, order_type: str) -> None: """Validate order before submission. 
Raises: InsufficientFundsError: If insufficient funds BrokerError: If order is invalid """ # Check buying power if amount > 0: # Buy order estimated_cost = abs(amount) * Decimal("150") # Rough estimate if estimated_cost > self._account["buying_power"]: raise InsufficientFundsError( f"Insufficient funds: need ${estimated_cost}, have ${self._account['buying_power']}" ) # Validate order type valid_order_types = ["market", "limit", "stop", "stop-limit"] if order_type not in valid_order_types: raise BrokerError( f"Invalid order type: {order_type}. Valid types: {valid_order_types}", adapter="custom_broker", )
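The `_validate_order` helper in the adapter above checks buying power against a fixed price of 150, which is only a rough estimate. A minimal sketch of a price-aware estimate follows. The function name `estimate_order_cost`, its parameters, and the 1% slippage buffer are assumptions made for illustration; they are not part of RustyBT.

```python
from decimal import Decimal
from typing import Optional


def estimate_order_cost(
    amount: Decimal,
    order_type: str,
    last_price: Decimal,
    limit_price: Optional[Decimal] = None,
    slippage_buffer: Decimal = Decimal("0.01"),
) -> Decimal:
    """Estimate the cash needed for an order (hypothetical helper)."""
    # Sell orders (non-positive amounts) need no buying power in this sketch.
    if amount <= 0:
        return Decimal("0")

    # A limit price caps what a buy can cost, so use it when it is available.
    if order_type in ("limit", "stop-limit") and limit_price is not None:
        return amount * limit_price

    # Otherwise pad the last traded price to allow for slippage.
    return amount * last_price * (Decimal("1") + slippage_buffer)


if __name__ == "__main__":
    print(estimate_order_cost(Decimal("100"), "market", Decimal("150.00")))
    print(estimate_order_cost(Decimal("100"), "limit", Decimal("150.00"), Decimal("149.50")))
```

Inside `_validate_order`, the `last_price` argument could come from `await self.get_current_price(asset)`, with the result compared against `self._account["buying_power"]` as before.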

    Example Usage

    async def main():
        """Demonstrate custom broker adapter usage."""
        print("=" * 70)
        print("Custom Broker Adapter Example")
        print("=" * 70)
    
        # Create custom broker adapter
        print("\n[1/5] Initializing custom broker...")
        broker = CustomBrokerAdapter(
            api_url="https://api.example.com",
            api_key="demo_api_key",
            api_secret="demo_api_secret",
            testnet=True,
        )
        print("✓ Broker initialized")
    
        # Connect to broker
        print("\n[2/5] Connecting to broker...")
        await broker.connect()
        print("✓ Connected to broker")
    
        # Get account info
        print("\n[3/5] Getting account information...")
        account = await broker.get_account_info()
        print(f"✓ Account cash: ${account['cash']}")
        print(f"  Equity: ${account['equity']}")
        print(f"  Buying power: ${account['buying_power']}")
    
        # Create mock asset
        asset = Equity(
            1,  # Asset ID
            exchange="NYSE",
            symbol="AAPL",
        )
    
        # Submit order
        print("\n[4/5] Submitting market order...")
        order_id = await broker.submit_order(
            asset=asset,
            amount=Decimal("100"),  # Buy 100 shares
            order_type="market",
        )
        print(f"✓ Order submitted: {order_id}")
    
        # Get open orders
        print("\n[5/5] Checking open orders...")
        open_orders = await broker.get_open_orders()
        print(f"✓ Open orders: {len(open_orders)}")
        for order in open_orders:
            print(f"  - {order['asset']}: {order['amount']} shares @ {order['order_type']}")
    
        # Disconnect
        await broker.disconnect()
    
        print("\n" + "=" * 70)
        print("✓ Example complete!")
        print("=" * 70)
        print("\nNext steps:")
        print("  1. Replace mock API calls with real broker API")
        print("  2. Implement proper authentication")
        print("  3. Add WebSocket market data streaming")
        print("  4. Implement position reconciliation")
        print("  5. Add comprehensive error handling")
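Step 4 in the list printed above, position reconciliation, could be sketched as follows. The code compares locally tracked amounts with the list returned by `get_positions()`, whose entries carry `'asset'` and `'amount'` keys. The function name `reconcile_positions` and the `tolerance` parameter are assumptions made for illustration.

```python
from decimal import Decimal


def reconcile_positions(
    local: dict[str, Decimal],
    broker_positions: list[dict],
    tolerance: Decimal = Decimal("0"),
) -> dict[str, tuple[Decimal, Decimal]]:
    """Return {symbol: (local_amount, broker_amount)} for every mismatch."""
    # Index the broker's view by symbol for direct lookup.
    broker = {pos["asset"]: pos["amount"] for pos in broker_positions}

    mismatches: dict[str, tuple[Decimal, Decimal]] = {}
    # Walk the union of symbols so a position missing on either side is caught.
    for symbol in sorted(set(local) | set(broker)):
        local_amount = local.get(symbol, Decimal("0"))
        broker_amount = broker.get(symbol, Decimal("0"))
        if abs(local_amount - broker_amount) > tolerance:
            mismatches[symbol] = (local_amount, broker_amount)
    return mismatches


if __name__ == "__main__":
    local_state = {"AAPL": Decimal("100"), "MSFT": Decimal("50")}
    broker_state = [
        {"asset": "AAPL", "amount": Decimal("100")},
        {"asset": "TSLA", "amount": Decimal("10")},
    ]
    for symbol, (mine, theirs) in reconcile_positions(local_state, broker_state).items():
        print(f"{symbol}: local={mine} broker={theirs}")
```

How a mismatch is resolved is a policy decision. A common choice is to treat the broker as the source of truth and log the difference.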
    
    if __name__ == "__main__":
        asyncio.run(main())
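For the error-handling step listed in the example, one common pattern is to retry transient failures with exponential backoff. The sketch below is only an illustration: `call_with_retry`, its parameters, and the choice of retryable exception types are assumptions, not RustyBT APIs.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple = (ConnectionError, asyncio.TimeoutError),
) -> T:
    """Await operation(), retrying the listed exceptions with backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except retry_on:
            # Out of attempts: let the last error propagate to the caller.
            if attempt == max_attempts:
                raise
            # The delay doubles each time: base_delay, then 2x, then 4x.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))

    raise AssertionError("unreachable")


if __name__ == "__main__":
    attempts = {"count": 0}

    async def flaky_request() -> str:
        # Fails twice, then succeeds, to exercise the retry path.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ConnectionError("temporary network failure")
        return "ok"

    print(asyncio.run(call_with_retry(flaky_request, base_delay=0.01)))
```

A wrapper like this is safest around read-only calls such as `get_account_info()`, for example `await call_with_retry(broker.get_account_info)`. Retrying `submit_order()` blindly could place duplicate orders unless the broker API supports idempotency keys.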
    