Skip to content

DataSource API Reference

Overview

The DataSource interface provides a unified abstraction for fetching market data from various sources (brokers, data vendors, files). All adapters implement this interface for consistent usage.

Base Interface

DataSource (Abstract Base Class)

from rustybt.data.sources.base import DataSource, DataSourceMetadata
import pandas as pd
import polars as pl

class DataSource(ABC):
    """Abstract base class for data sources."""

    @abstractmethod
    async def fetch(
        self,
        symbols: list[str],
        start: pd.Timestamp,
        end: pd.Timestamp,
        frequency: str
    ) -> pl.DataFrame:
        """Fetch OHLCV data for symbols.

        Args:
            symbols: List of ticker symbols
            start: Start timestamp (inclusive)
            end: End timestamp (inclusive)
            frequency: Data frequency ("daily", "hourly", "minute")

        Returns:
            Polars DataFrame with columns:
            - symbol: str
            - date: date (for daily) or timestamp: datetime (for intraday)
            - open: Decimal
            - high: Decimal
            - low: Decimal
            - close: Decimal
            - volume: Decimal
        """
        pass

    @abstractmethod
    def ingest_to_bundle(
        self,
        bundle_name: str,
        symbols: list[str],
        start: pd.Timestamp,
        end: pd.Timestamp,
        frequency: str,
        asset_type: str | None = None,
        **kwargs
    ) -> Path:
        """Ingest data and create bundle.

        Args:
            bundle_name: Name for the bundle
            symbols: Symbols to ingest
            start: Start date
            end: End date
            frequency: Data frequency
            asset_type: Optional asset type ('forex', 'crypto', 'equity', 'future').
                       If None, will be inferred from symbol patterns.
                       Used to determine appropriate trading calendar:
                       - 'forex': 24/5 calendar (Sunday evening - Friday evening)
                       - 'crypto': 24/7 calendar (continuous, no holidays)
                       - 'equity': XNYS calendar (NYSE business hours)
                       - 'future': XNYS calendar (NYSE business hours)
            **kwargs: Adapter-specific options

        Returns:
            Path to created bundle directory
        """
        pass

    @abstractmethod
    def get_metadata(self) -> DataSourceMetadata:
        """Get data source metadata."""
        pass

    @abstractmethod
    def supports_live(self) -> bool:
        """Whether this source supports live streaming."""
        pass

DataSourceMetadata

@dataclass
class DataSourceMetadata:
    """Metadata about a data source."""
    source_type: str  # "yfinance", "alpaca", "ccxt", etc.
    source_url: str  # API endpoint
    api_version: str  # API version
    supports_live: bool  # Real-time streaming support
    supported_frequencies: list[str]  # ["daily", "hourly", "minute"]
    rate_limit: Optional[int] = None  # Requests per minute
    requires_auth: bool = False

Built-in Adapters

YFinance (via DataSourceRegistry)

Free historical data from Yahoo Finance (15-minute delayed).

from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source("yfinance")

    # Fetch data
    df = await source.fetch(
        symbols=["AAPL", "MSFT"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )

    # Ingest to bundle
    source.ingest_to_bundle(
        bundle_name="stocks-2023",
        symbols=["AAPL", "MSFT", "GOOGL"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d",
        asset_type="equity"  # Assigns XNYS calendar for US equities
    )

asyncio.run(main())

Limitations: - 15-minute delayed quotes - Rate limits: ~2000 requests/hour - No real-time streaming - Historical data only

Alpaca (via DataSourceRegistry)

Real-time and historical stock data via Alpaca Markets API.

from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "alpaca",
        api_key="YOUR_API_KEY",
        api_secret="YOUR_API_SECRET",
        paper_trading=True,
    )
    df = await source.fetch(
        symbols=["AAPL"],
        start=pd.Timestamp.now() - pd.Timedelta(hours=1),
        end=pd.Timestamp.now(),
        frequency="1m"
    )

    # Ingest to bundle
    source.ingest_to_bundle(
        bundle_name="alpaca-stocks",
        symbols=["AAPL", "MSFT"],
        start=pd.Timestamp("2024-01-01"),
        end=pd.Timestamp("2024-12-31"),
        frequency="1d",
        asset_type="equity"  # Assigns XNYS calendar for US equities
    )

asyncio.run(main())

Features: - Real-time quotes (IEX feed) - WebSocket streaming - Paper trading mode - Free tier available

CCXT (via DataSourceRegistry)

Cryptocurrency data via CCXT library (100+ exchanges).

from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "ccxt",
        exchange="binance",
        # api_key / api_secret optional depending on endpoint
    )
    df = await source.fetch(
        symbols=["BTC/USDT", "ETH/USDT"],
        start=pd.Timestamp("2024-01-01"),
        end=pd.Timestamp("2024-12-31"),
        frequency="1h"
    )

    # Ingest to bundle
    source.ingest_to_bundle(
        bundle_name="crypto-hourly",
        symbols=["BTC/USDT", "ETH/USDT"],
        start=pd.Timestamp("2024-01-01"),
        end=pd.Timestamp("2024-12-31"),
        frequency="1h",
        asset_type="crypto"  # Assigns 24/7 calendar for cryptocurrencies
    )

asyncio.run(main())

Supported Exchanges: binance, coinbase, kraken, bybit, okx, and 100+ more.

Polygon (via DataSourceRegistry)

High-quality financial data from Polygon.io.

from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "polygon",
        api_key="YOUR_API_KEY"
    )
    df = await source.fetch(
        symbols=["AAPL"],
        start=pd.Timestamp("2024-01-01"),
        end=pd.Timestamp("2024-01-31"),
        frequency="1m"
    )

    # Ingest to bundle
    source.ingest_to_bundle(
        bundle_name="polygon-stocks",
        symbols=["AAPL", "TSLA"],
        start=pd.Timestamp("2024-01-01"),
        end=pd.Timestamp("2024-01-31"),
        frequency="1m",
        asset_type="equity"  # Assigns XNYS calendar for US equities
    )

asyncio.run(main())

Features: - Real-time and historical - Stocks, options, forex, crypto - Tick-level data available - Premium tiers for more data

CSV (via DataSourceRegistry)

Load data from CSV files.

from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "csv",
        data_dir="/path/to/csv/files",
    )
    df = await source.fetch(
        symbols=["AAPL", "MSFT"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )

    # Ingest to bundle
    source.ingest_to_bundle(
        bundle_name="custom-data",
        symbols=["AAPL", "MSFT"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d",
        asset_type="equity"  # Specify asset type for appropriate calendar
    )

asyncio.run(main())

CSV Format:

date,open,high,low,close,volume
2023-01-01,100.0,105.0,99.0,103.0,1000000
2023-01-02,103.0,106.0,102.0,105.0,1200000

Registry Pattern

DataSourceRegistry

Centralized registry for managing data sources.

from rustybt.data.sources import DataSourceRegistry

# Get source by name
source = DataSourceRegistry.get_source("yfinance")

# Get source with config
source = DataSourceRegistry.get_source(
    "alpaca",
    api_key="...",
    api_secret="..."
)

# List available sources
sources = DataSourceRegistry.list_sources()
print(sources)  # ["alpaca", "alphavantage", "ccxt", "csv", "polygon", "yfinance"]

Creating Custom Adapters

Example: Custom REST API Adapter (advanced)


See Also: - Data Management Performance - Data Ingestion Guide - Live vs Backtest Data