BaseDataAdapter - Core Adapter Interface

Module: rustybt.data.adapters.base

Overview

BaseDataAdapter is the abstract base class for all RustyBT data adapters. It provides a standardized interface for fetching, validating, and standardizing market data from any source, with built-in rate limiting, retry logic, and error handling.

Class Definition

from abc import ABC

class BaseDataAdapter(ABC):
    """Base class for data source adapters.

    Provides standardized interface for fetching, validating, and standardizing
    market data from various sources. Includes built-in rate limiting, retry logic,
    and error handling.
    """

Standard Schema

All adapters must return data in this standardized Polars DataFrame schema:

import polars as pl

# Standard OHLCV schema for all adapters
STANDARD_SCHEMA = {
    "timestamp": pl.Datetime("us"),                    # UTC timestamps (microsecond precision)
    "symbol": pl.Utf8,                                 # Asset symbol/ticker
    "open": pl.Decimal(precision=18, scale=8),         # Opening price
    "high": pl.Decimal(precision=18, scale=8),         # Highest price
    "low": pl.Decimal(precision=18, scale=8),          # Lowest price
    "close": pl.Decimal(precision=18, scale=8),        # Closing price
    "volume": pl.Decimal(precision=18, scale=8),       # Trading volume
}

Key Points:

  • Decimal Precision: All prices use the Decimal type (18 digits precision, 8 decimal places)
  • UTC Timestamps: All timestamps are in the UTC timezone
  • Sorted: Data must be sorted by timestamp within each symbol
  • No Duplicates: No duplicate symbol-timestamp pairs allowed
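
The sorted and no-duplicates invariants are easy to check directly in Polars. A minimal sketch, using only Polars itself (not part of the adapter API):

import pandas as pd
import polars as pl

df = pl.DataFrame({
    "timestamp": [pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03")],
    "symbol": ["AAPL", "AAPL"],
})

# No duplicate symbol-timestamp pairs
has_dupes = df.select(
    pl.struct(["symbol", "timestamp"]).is_duplicated().any()
).item()

# Sorted by timestamp within each symbol (compare against a sorted copy)
is_sorted = df.sort(["symbol", "timestamp"]).equals(df)

assert not has_dupes and is_sorted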

Constructor

from typing import Optional

# Constructor signature (called from subclass)
def __init__(
    self,
    name: str,
    rate_limit_per_second: int = 10,
    max_retries: int = 3,
    initial_retry_delay: float = 1.0,
    backoff_factor: float = 2.0,
    validator: Optional['DataValidator'] = None,
) -> None:
    """Initialize base data adapter."""
    pass

Parameters

  • name (str): Adapter name for logging and identification
  • rate_limit_per_second (int, default=10): Maximum requests per second
  • max_retries (int, default=3): Maximum retry attempts for transient errors
  • initial_retry_delay (float, default=1.0): Initial delay before first retry (seconds)
  • backoff_factor (float, default=2.0): Multiplier for exponential backoff (see the delay sketch after this list)
  • validator (Optional[DataValidator], default=None): Optional multi-layer data validator
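
The retry delays grow geometrically from initial_retry_delay and backoff_factor. A minimal sketch of the resulting schedule (plain arithmetic, not the adapter's internal code):

# With the defaults above: delays of 1s, 2s, 4s before retries 1-3
initial_retry_delay = 1.0
backoff_factor = 2.0
max_retries = 3

delays = [initial_retry_delay * backoff_factor**attempt for attempt in range(max_retries)]
print(delays)  # [1.0, 2.0, 4.0]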

Example

from rustybt.data.adapters.base import BaseDataAdapter

class MyAdapter(BaseDataAdapter):
    def __init__(self):
        super().__init__(
            name="MyAdapter",
            rate_limit_per_second=5,     # Conservative rate limit
            max_retries=3,                # Retry failed requests
            backoff_factor=2.0            # Exponential backoff
        )

Abstract Methods

Subclasses must implement these methods:

fetch()

from abc import abstractmethod
import pandas as pd
import polars as pl

# Abstract method signature (implement in subclass)
@abstractmethod
async def fetch(
    self,
    symbols: list[str],
    start_date: pd.Timestamp,
    end_date: pd.Timestamp,
    resolution: str,
) -> pl.DataFrame:
    """Fetch OHLCV data and return Polars DataFrame with Decimal columns."""
    pass

Fetch OHLCV data and return Polars DataFrame with Decimal columns.

Parameters:

  • symbols (list[str]): List of symbols to fetch (e.g., ["AAPL", "MSFT"])
  • start_date (pd.Timestamp): Start date for data range
  • end_date (pd.Timestamp): End date for data range
  • resolution (str): Time resolution (e.g., "1d", "1h", "1m")

Returns:

  • pl.DataFrame: Polars DataFrame with standardized OHLCV schema

Raises:

  • NetworkError: If API request fails
  • RateLimitError: If rate limit exceeded
  • InvalidDataError: If received data is invalid
  • ValidationError: If data validation fails
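
A typical call site distinguishes retryable from non-retryable failures. A minimal sketch, assuming the YFinanceAdapter used elsewhere on this page:

import asyncio
import pandas as pd
from rustybt.data.adapters.base import NetworkError, ValidationError
from rustybt.data.adapters.yfinance_adapter import YFinanceAdapter

async def fetch_or_report():
    adapter = YFinanceAdapter()
    try:
        return await adapter.fetch(
            ["AAPL"],
            pd.Timestamp("2024-01-02"),
            pd.Timestamp("2024-01-31"),
            "1d",
        )
    except NetworkError as e:
        # Raised only after the adapter's built-in retries are exhausted
        print(f"network failure: {e}")
    except ValidationError as e:
        # Data came back but failed quality checks; do not retry blindly
        print(f"validation failure: {e}")

# data = asyncio.run(fetch_or_report())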

standardize()

from abc import abstractmethod
import polars as pl

# Abstract method signature (implement in subclass)
@abstractmethod
def standardize(self, df: pl.DataFrame) -> pl.DataFrame:
    """Convert provider-specific format to RustyBT standard schema."""
    pass

Convert provider-specific format to RustyBT standard schema.

Parameters:

  • df (pl.DataFrame): DataFrame in provider-specific format

Returns:

  • pl.DataFrame: DataFrame with standardized schema and Decimal columns

Optional Methods

Subclasses can override these methods for custom behavior:

validate()

import polars as pl
from rustybt.data.adapters.base import validate_ohlcv_relationships

# Optional method signature (override in subclass if needed)
def validate(self, df: pl.DataFrame) -> bool:
    """Validate OHLCV data quality and relationships."""
    return validate_ohlcv_relationships(df)

Validate OHLCV data quality and relationships. Default implementation uses validate_ohlcv_relationships().

Parameters:

  • df (pl.DataFrame): DataFrame to validate

Returns:

  • bool: True if validation passes

Raises:

  • ValidationError: If data validation fails
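
Overriding validate() lets an adapter layer extra checks on top of the default. A minimal sketch (the positive-volume rule here is an illustrative addition, not a library requirement):

import polars as pl
from rustybt.data.adapters.base import (
    BaseDataAdapter,
    ValidationError,
    validate_ohlcv_relationships,
)

class StrictAdapter(BaseDataAdapter):
    # fetch() and standardize() omitted for brevity

    def validate(self, df: pl.DataFrame) -> bool:
        """Run the default relationship checks, then an adapter-specific one."""
        validate_ohlcv_relationships(df)
        # Cast to Float64 for a simple numeric comparison
        if df.select((pl.col("volume").cast(pl.Float64) <= 0).any()).item():
            raise ValidationError("volume must be positive")
        return True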

Built-in Features

Rate Limiting

Automatic rate limiting using token bucket algorithm:

import asyncio
import pandas as pd
from rustybt.data.adapters.yfinance_adapter import YFinanceAdapter

async def fetch_multiple_symbols():
    # Rate limiter automatically throttles requests
    adapter = YFinanceAdapter(request_delay=0.2)  # 5 requests per second

    start_date = pd.Timestamp("2024-01-01")
    end_date = pd.Timestamp("2024-01-31")

    # This will automatically space out requests
    for symbol in ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META"]:
        data = await adapter.fetch([symbol], start_date, end_date, "1d")
        print(f"Fetched {len(data)} rows for {symbol}")
        # Requests are automatically spaced to respect rate limit

# Run the async function
asyncio.run(fetch_multiple_symbols())
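
The token bucket idea itself is small. A standalone sketch of the algorithm (illustrative only, not the adapter's internal limiter):

import asyncio
import time

class TokenBucket:
    """Refill at `rate` tokens/second up to `capacity`; acquire() spends one token."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Sleep roughly until the next token is available
            await asyncio.sleep((1 - self.tokens) / self.rate)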

Retry Logic

Automatic retry with exponential backoff for transient errors:

import asyncio
import pandas as pd
from rustybt.data.adapters.yfinance_adapter import YFinanceAdapter

async def fetch_with_retry():
    adapter = YFinanceAdapter(
        request_delay=1.0,          # Base request delay
        # Note: max_retries is set in BaseDataAdapter (default 3)
    )

    start_date = pd.Timestamp("2024-01-01")
    end_date = pd.Timestamp("2024-01-31")

    # Transient errors (NetworkError, TimeoutError) are automatically retried
    # with exponential backoff: 1s, 2s, 4s
    data = await adapter.fetch(["AAPL"], start_date, end_date, "1d")

    return data

# Run the async function
data = asyncio.run(fetch_with_retry())

Data Validation

Automatic validation of OHLCV relationships:

import asyncio
import pandas as pd
from rustybt.data.adapters.yfinance_adapter import YFinanceAdapter

async def fetch_with_validation():
    adapter = YFinanceAdapter()

    start_date = pd.Timestamp("2024-01-01")
    end_date = pd.Timestamp("2024-01-31")

    # Validation happens automatically in fetch()
    data = await adapter.fetch(["AAPL"], start_date, end_date, "1d")

    # Validation checks:
    # ✅ Required columns exist (timestamp, symbol, open, high, low, close, volume)
    # ✅ OHLCV relationships valid (high >= low, high >= open/close, etc.)
    # ✅ No NULL values in required columns
    # ✅ Timestamps are sorted
    # ✅ No duplicate timestamps per symbol

    return data

# Run the async function
data = asyncio.run(fetch_with_validation())

Utility Functions

validate_ohlcv_relationships()

import pandas as pd
import polars as pl
from rustybt.data.adapters.base import validate_ohlcv_relationships

# Create sample data
df = pl.DataFrame({
    "timestamp": [pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03")],
    "symbol": ["AAPL", "AAPL"],
    "open": ["185.64", "186.84"],
    "high": ["186.95", "187.73"],
    "low": ["185.17", "186.06"],
    "close": ["186.89", "186.33"],
    "volume": ["45274200", "37628400"],
}).with_columns([
    pl.col("open").cast(str).str.to_decimal(scale=8),
    pl.col("high").cast(str).str.to_decimal(scale=8),
    pl.col("low").cast(str).str.to_decimal(scale=8),
    pl.col("close").cast(str).str.to_decimal(scale=8),
    pl.col("volume").cast(str).str.to_decimal(scale=8),
])

# Validate OHLCV data manually
is_valid = validate_ohlcv_relationships(df)
print(f"Data is valid: {is_valid}")

Validates:

  • Required columns exist
  • OHLCV relationships (high ≥ low, high ≥ open/close, etc.)
  • No NULL values
  • Timestamps sorted
  • No duplicate timestamps per symbol

detect_outliers()

import pandas as pd
import polars as pl
from rustybt.data.adapters.base import detect_outliers

# Create sample data with potential outlier
data = pl.DataFrame({
    "timestamp": [pd.Timestamp(f"2024-01-{i:02d}") for i in range(2, 7)],
    "symbol": ["AAPL"] * 5,
    "open": ["185", "186", "187", "188", "189"],
    "high": ["186", "187", "188", "189", "190"],
    "low": ["184", "185", "186", "187", "188"],
    "close": ["185.5", "186.5", "187.5", "188.5", "189.5"],
    "volume": ["1000000"] * 5,
}).with_columns([
    pl.col("open").cast(str).str.to_decimal(scale=8),
    pl.col("high").cast(str).str.to_decimal(scale=8),
    pl.col("low").cast(str).str.to_decimal(scale=8),
    pl.col("close").cast(str).str.to_decimal(scale=8),
    pl.col("volume").cast(str).str.to_decimal(scale=8),
])

# Detect price outliers using MAD
outliers = detect_outliers(data, threshold=3.0)

if len(outliers) > 0:
    print(f"Found {len(outliers)} potential data quality issues")
    print(outliers[["timestamp", "symbol", "close", "pct_change"]])
else:
    print("No outliers detected")

Uses Median Absolute Deviation (MAD) to detect anomalous price movements.

Parameters:

  • df (pl.DataFrame): DataFrame with OHLCV data
  • threshold (float, default=3.0): Number of MADs for outlier detection

Returns:

  • pl.DataFrame: DataFrame containing only outlier rows
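
The criterion itself is easy to state: a bar is flagged when its percent change sits more than threshold MADs from the median percent change. A standalone sketch of that rule (illustrative, not necessarily the library's exact implementation):

import polars as pl

def mad_outliers(df: pl.DataFrame, threshold: float = 3.0) -> pl.DataFrame:
    """Return rows whose pct_change deviates more than `threshold` MADs from the median."""
    pct = df.with_columns(
        pl.col("close").cast(pl.Float64).pct_change().over("symbol").alias("pct_change")
    ).drop_nulls("pct_change")
    median = pct["pct_change"].median()
    mad = (pct["pct_change"] - median).abs().median()
    return pct.filter((pl.col("pct_change") - median).abs() > threshold * mad)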

Exception Hierarchy

from rustybt.data.adapters.base import (
    NetworkError,        # Network connectivity error
    RateLimitError,     # API rate limit exceeded
    InvalidDataError,   # Invalid data received
    ValidationError,    # Data validation failed
)

# All inherit from DataAdapterError
from rustybt.exceptions import DataAdapterError
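
Because every adapter error inherits from DataAdapterError, callers can handle specific failures first and fall back to the base class. A minimal sketch:

import asyncio
import pandas as pd
from rustybt.exceptions import DataAdapterError
from rustybt.data.adapters.base import RateLimitError
from rustybt.data.adapters.yfinance_adapter import YFinanceAdapter

async def fetch_safely():
    adapter = YFinanceAdapter()
    try:
        return await adapter.fetch(
            ["AAPL"], pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-31"), "1d"
        )
    except RateLimitError:
        await asyncio.sleep(60)  # cool down before the caller tries again
        raise
    except DataAdapterError as e:
        print(f"adapter failed: {e}")  # catch-all for any adapter error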

Creating Custom Adapters

Minimal Example

import asyncio
import pandas as pd
import polars as pl
from rustybt.data.adapters.base import BaseDataAdapter

class MyCustomAdapter(BaseDataAdapter):
    """Custom adapter for my data source."""

    def __init__(self):
        super().__init__(name="MyCustomAdapter", rate_limit_per_second=10)

    async def fetch(
        self,
        symbols: list[str],
        start_date: pd.Timestamp,
        end_date: pd.Timestamp,
        resolution: str,
    ) -> pl.DataFrame:
        """Fetch data from my source."""
        # 1. Fetch raw data from source
        raw_data = await self._fetch_from_source(symbols, start_date, end_date, resolution)

        # 2. Convert to Polars DataFrame
        df = pl.DataFrame(raw_data)

        # 3. Standardize to RustyBT schema
        df = self.standardize(df)

        # 4. Validate (automatic)
        self.validate(df)

        # 5. Log success
        self._log_fetch_success(symbols, start_date, end_date, resolution, len(df))

        return df

    def standardize(self, df: pl.DataFrame) -> pl.DataFrame:
        """Convert my format to standard schema."""
        return df.select([
            pl.col("dt").alias("timestamp").cast(pl.Datetime("us")),
            pl.col("ticker").alias("symbol"),
            pl.col("o").cast(str).str.to_decimal(scale=8).alias("open"),
            pl.col("h").cast(str).str.to_decimal(scale=8).alias("high"),
            pl.col("l").cast(str).str.to_decimal(scale=8).alias("low"),
            pl.col("c").cast(str).str.to_decimal(scale=8).alias("close"),
            pl.col("v").cast(str).str.to_decimal(scale=8).alias("volume"),
        ])

    async def _fetch_from_source(self, symbols, start_date, end_date, resolution):
        """Fetch from actual data source (implement based on your source)."""
        # Example stub - replace with actual API/database calls
        return {
            "dt": [start_date],
            "ticker": [symbols[0]],
            "o": ["100.00"],
            "h": ["101.00"],
            "l": ["99.00"],
            "c": ["100.50"],
            "v": ["1000000"],
        }

# Example usage
async def main():
    adapter = MyCustomAdapter()
    data = await adapter.fetch(
        ["AAPL"],
        pd.Timestamp("2024-01-02"),
        pd.Timestamp("2024-01-03"),
        "1d"
    )
    print(f"Fetched {len(data)} rows")

# Uncomment to run: asyncio.run(main())

Full Example with Error Handling

import asyncio
import pandas as pd
import polars as pl
from rustybt.data.adapters.base import (
    BaseDataAdapter,
    NetworkError,
    InvalidDataError,
    with_retry,
)
import structlog

logger = structlog.get_logger()

class RobustAdapter(BaseDataAdapter):
    """Production-grade adapter with error handling."""

    def __init__(self):
        super().__init__(
            name="RobustAdapter",
            rate_limit_per_second=5,
            max_retries=3,
            backoff_factor=2.0,
        )

    @with_retry(max_retries=3, initial_delay=1.0, backoff_factor=2.0)
    async def fetch(
        self,
        symbols: list[str],
        start_date: pd.Timestamp,
        end_date: pd.Timestamp,
        resolution: str,
    ) -> pl.DataFrame:
        """Fetch with automatic retry on transient errors."""

        # Rate limiting (automatic via self.rate_limiter)
        await self.rate_limiter.acquire()

        try:
            # Fetch raw data
            raw_data = await self._api_call(symbols, start_date, end_date, resolution)

            # Convert and standardize
            df = pl.DataFrame(raw_data)
            df = self.standardize(df)

            # Validate
            self.validate(df)

            # Log success
            self._log_fetch_success(symbols, start_date, end_date, resolution, len(df))

            return df

        except ValueError as e:
            # Non-retryable error
            logger.error("invalid_data", error=str(e), symbols=symbols)
            raise InvalidDataError(f"Invalid data received: {e}") from e

        except Exception as e:
            # Unexpected error
            logger.error("fetch_failed", error=str(e), symbols=symbols)
            raise NetworkError(f"Fetch failed: {e}") from e

    def standardize(self, df: pl.DataFrame) -> pl.DataFrame:
        """Standardize schema with proper type conversions."""
        return df.select([
            pl.col("timestamp").cast(pl.Datetime("us")),
            pl.col("symbol"),
            pl.col("open").cast(str).str.to_decimal(scale=8),
            pl.col("high").cast(str).str.to_decimal(scale=8),
            pl.col("low").cast(str).str.to_decimal(scale=8),
            pl.col("close").cast(str).str.to_decimal(scale=8),
            pl.col("volume").cast(str).str.to_decimal(scale=8),
        ]).sort("timestamp", "symbol")

    async def _api_call(self, symbols, start_date, end_date, resolution):
        """Example API call - replace with actual implementation."""
        # Stub implementation
        return {
            "timestamp": [start_date],
            "symbol": [symbols[0]],
            "open": ["100.00"],
            "high": ["101.00"],
            "low": ["99.00"],
            "close": ["100.50"],
            "volume": ["1000000"],
        }

# Example usage
async def main():
    adapter = RobustAdapter()
    data = await adapter.fetch(
        ["AAPL"],
        pd.Timestamp("2024-01-02"),
        pd.Timestamp("2024-01-03"),
        "1d"
    )
    print(f"Fetched {len(data)} rows with error handling")

# Uncomment to run: asyncio.run(main())

Best Practices

1. Conservative Rate Limits

from rustybt.data.adapters.yfinance_adapter import YFinanceAdapter

# ❌ Don't be aggressive with rate limits
# adapter = YFinanceAdapter(request_delay=0.01)  # Too fast! (100 req/s)

# ✅ Start conservative, increase if needed
adapter = YFinanceAdapter(request_delay=0.2)   # Safe default (5 req/s)
print(f"✅ Created adapter with safe rate limit: {adapter.request_delay}s delay")

2. Proper Decimal Conversion

import polars as pl

# Create sample data
df = pl.DataFrame({"close": [100.123456789]})

# ❌ Don't convert floats directly to Decimal
# df_bad = df.with_columns(pl.col("close").cast(pl.Decimal))  # Loses precision!

# ✅ Convert through string to preserve precision
df_good = df.with_columns(pl.col("close").cast(str).str.to_decimal(scale=8))
print(f"✅ Proper decimal conversion: {df_good['close'][0]}")

3. Always Validate

# Educational pattern - showing best practices for validation

# ❌ DON'T: Skip validation
# async def fetch(self, symbols, start_date, end_date, resolution):
#     df = await self._get_data()
#     return df  # No validation - data quality issues may slip through!

# ✅ DO: Always validate before returning
# async def fetch(self, symbols, start_date, end_date, resolution):
#     df = await self._get_data()
#     df = self.standardize(df)
#     self.validate(df)  # Catches data quality issues early
#     return df

print("✅ Pattern demonstrated: Always validate data before returning")

4. Log Important Events

# Educational pattern - showing best practices for logging

# ❌ DON'T: Silent failures
# async def fetch(self, symbols, start_date, end_date, resolution):
#     try:
#         return await self._get_data()
#     except Exception:
#         return pl.DataFrame()  # Silent failure - no logs, no debugging info!

# ✅ DO: Log errors and successes
# async def fetch(self, symbols, start_date, end_date, resolution):
#     try:
#         df = await self._get_data()
#         self._log_fetch_success(symbols, start_date, end_date, resolution, len(df))
#         return df
#     except Exception as e:
#         logger.error("fetch_failed", error=str(e), symbols=symbols)
#         raise

print("✅ Pattern demonstrated: Always log important events for debugging")

See Also