Data Ingestion Guide¶
Last Updated: 2024-10-11
Quick Start¶
Ingest stock data from Yahoo Finance with a single command:
rustybt ingest-unified yfinance --symbols AAPL,MSFT,GOOGL --bundle my-stocks \
--start 2023-01-01 --end 2023-12-31 --frequency 1d
Or use the Python API:
from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source("yfinance")
    await source.ingest_to_bundle(
        bundle_name="my-stocks",
        symbols=["AAPL", "MSFT", "GOOGL"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )

asyncio.run(main())
Overview¶
The unified data ingestion system supports multiple data sources through a consistent API. All adapters implement the same DataSource interface, making it easy to switch between providers.
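To illustrate why a shared interface makes providers interchangeable, here is a minimal sketch. The `BarSource` protocol and the two toy adapters are hypothetical stand-ins, not rustybt's actual `DataSource` classes; only the idea of one method signature shared by all adapters comes from the text above.

```python
import asyncio
from typing import Protocol


class BarSource(Protocol):
    """Hypothetical stand-in for a shared adapter interface."""

    async def fetch(self, symbols: list[str], frequency: str) -> dict[str, int]:
        ...


class FakeEquitySource:
    """Toy adapter: pretends every symbol has 252 daily bars."""

    async def fetch(self, symbols: list[str], frequency: str) -> dict[str, int]:
        return {symbol: 252 for symbol in symbols}


class FakeCryptoSource:
    """Toy adapter: pretends every symbol has 365 daily bars."""

    async def fetch(self, symbols: list[str], frequency: str) -> dict[str, int]:
        return {symbol: 365 for symbol in symbols}


async def count_bars(source: BarSource, symbols: list[str]) -> int:
    # The caller depends only on the shared method, not on the provider.
    bars = await source.fetch(symbols, frequency="1d")
    return sum(bars.values())


equity_total = asyncio.run(count_bars(FakeEquitySource(), ["AAPL", "MSFT"]))
crypto_total = asyncio.run(count_bars(FakeCryptoSource(), ["BTC/USDT"]))
```

Because `count_bars` never names a concrete adapter, swapping providers changes one argument and nothing else.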
Supported Data Sources¶
| Source | Type | Live Support | Rate Limit | API Key Required |
|---|---|---|---|---|
| yfinance | Equities/ETFs | ❌ | 2000 req/hr | ❌ |
| ccxt | Crypto | ✅ | Varies by exchange | ⚠️ Depends on exchange |
| polygon | Equities/Options | ✅ | Plan-dependent | ✅ |
| alpaca | Equities | ✅ | 200 req/min | ✅ |
| alphavantage | Equities/Forex | ❌ | 5 req/min (free) | ✅ |
| csv | Any | ❌ | N/A | ❌ |
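The fixed rate limits in the table translate directly into a minimum spacing between requests. The sketch below only does that arithmetic; the `RATE_LIMITS` dictionary and `min_interval_seconds` helper are illustrative names, not part of rustybt, and the sources with plan- or exchange-dependent limits are left out.

```python
# Request budgets copied from the table above, as (requests, window in seconds).
RATE_LIMITS = {
    "yfinance": (2000, 3600),    # 2000 req/hr
    "alpaca": (200, 60),         # 200 req/min
    "alphavantage": (5, 60),     # 5 req/min (free tier)
}


def min_interval_seconds(source: str) -> float:
    """Smallest average gap between requests that stays within the budget."""
    requests, window = RATE_LIMITS[source]
    return window / requests


intervals = {name: min_interval_seconds(name) for name in RATE_LIMITS}
```

On these numbers, yfinance allows roughly one request every 1.8 seconds, Alpaca one every 0.3 seconds, and the AlphaVantage free tier one every 12 seconds.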
Per-Adapter Examples¶
YFinance (Free Equities/ETFs)¶
Best for: Historical backtesting, US equities, no API key needed
from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source("yfinance")
    await source.ingest_to_bundle(
        bundle_name="tech-stocks",
        symbols=["AAPL", "MSFT", "GOOGL", "AMZN"],
        start=pd.Timestamp("2020-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )

asyncio.run(main())
CLI equivalent:
rustybt ingest-unified yfinance \
--symbols AAPL,MSFT,GOOGL,AMZN \
--start 2020-01-01 \
--end 2023-12-31 \
--frequency 1d \
--bundle tech-stocks
CCXT (Crypto Exchanges)¶
Best for: Cryptocurrency data from 100+ exchanges
from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source("ccxt", exchange="binance")
    await source.ingest_to_bundle(
        bundle_name="crypto-hourly",
        symbols=["BTC/USDT", "ETH/USDT", "SOL/USDT"],
        start=pd.Timestamp("2024-01-01"),
        end=pd.Timestamp("2024-12-31"),
        frequency="1h"
    )

asyncio.run(main())
CLI equivalent:
rustybt ingest-unified ccxt \
--exchange binance \
--symbols BTC/USDT,ETH/USDT,SOL/USDT \
--start 2024-01-01 \
--end 2024-12-31 \
--frequency 1h \
--bundle crypto-hourly
Supported exchanges: Run rustybt ingest-unified ccxt --list-exchanges
Polygon (Premium Equities/Options)¶
Best for: High-quality data, minute bars, options chains
from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "polygon",
        api_key="YOUR_API_KEY"
    )
    await source.ingest_to_bundle(
        bundle_name="intraday-stocks",
        symbols=["AAPL", "TSLA"],
        start=pd.Timestamp("2024-01-01"),
        end=pd.Timestamp("2024-01-31"),
        frequency="1m"
    )

asyncio.run(main())
Note: Polygon API key required. Get one at polygon.io
Alpaca (US Equities with Live Support)¶
Best for: Live trading + backtesting with same API
from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "alpaca",
        api_key="YOUR_API_KEY",
        api_secret="YOUR_API_SECRET"
    )
    await source.ingest_to_bundle(
        bundle_name="alpaca-stocks",
        symbols=["SPY", "QQQ", "IWM"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )

asyncio.run(main())
Note: Supports both paper trading and live accounts
AlphaVantage (Equities + Forex)¶
Best for: Forex pairs, fundamental data
from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "alphavantage",
        api_key="YOUR_API_KEY"
    )
    await source.ingest_to_bundle(
        bundle_name="forex-pairs",
        symbols=["EUR/USD", "GBP/USD", "USD/JPY"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )

asyncio.run(main())
Note: Free tier limited to 5 requests/minute
CSV (Local Files)¶
Best for: Custom data, proprietary sources
from rustybt.data.sources import DataSourceRegistry
import pandas as pd
import asyncio

async def main():
    source = DataSourceRegistry.get_source(
        "csv",
        csv_dir="/path/to/csv/files"
    )
    # Ingest from CSV files
    # Expected format: {symbol}.csv with columns: date,open,high,low,close,volume
    await source.ingest_to_bundle(
        bundle_name="custom-data",
        symbols=["SYM1", "SYM2"],
        start=pd.Timestamp("2020-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )

asyncio.run(main())
CSV format requirements:
- Filename: {symbol}.csv (e.g., AAPL.csv)
- Required columns: date, open, high, low, close, volume
- Date format: ISO 8601 (2023-01-15)
- Decimal precision: Use string or Decimal for prices
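Before ingesting, it can help to check a file against the requirements above. The following is a minimal sketch using only the standard library; `parse_bars` is a hypothetical helper, not a rustybt function, and the sample row contains made-up prices chosen for illustration.

```python
import csv
import io
from datetime import date
from decimal import Decimal

REQUIRED_COLUMNS = ["date", "open", "high", "low", "close", "volume"]


def parse_bars(csv_text: str) -> list[dict]:
    """Parse CSV text into typed rows; raises ValueError if a column is missing."""
    reader = csv.DictReader(io.StringIO(csv_text))
    present = reader.fieldnames or []
    missing = [column for column in REQUIRED_COLUMNS if column not in present]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    rows = []
    for record in reader:
        # date.fromisoformat accepts ISO 8601 dates such as 2023-01-15.
        row = {"date": date.fromisoformat(record["date"])}
        for field in ("open", "high", "low", "close"):
            # Building Decimal from the string keeps the digits exactly as written.
            row[field] = Decimal(record[field])
        row["volume"] = int(record["volume"])
        rows.append(row)
    return rows


# Made-up sample content for a hypothetical SYM1.csv.
sample = "date,open,high,low,close,volume\n2023-01-17,100.10,101.50,99.75,100.90,1000\n"
bars = parse_bars(sample)
```

Parsing prices through `Decimal(str)` rather than `float` avoids binary rounding, which is one reading of the "Decimal precision" requirement.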
CLI Reference¶
General Syntax¶
rustybt ingest-unified <source> [options]
Common Options¶
| Option | Description | Example |
|---|---|---|
| --symbols | Comma-separated symbols | --symbols AAPL,MSFT |
| --start | Start date (ISO 8601) | --start 2023-01-01 |
| --end | End date (ISO 8601) | --end 2023-12-31 |
| --frequency | Data frequency | --frequency 1d |
| --bundle | Bundle name | --bundle my-data |
| --api-key | API key (if required) | --api-key YOUR_KEY |
| --no-cache | Disable caching | --no-cache |
| --validate | Validate after ingestion | --validate |
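When ingestion is driven from a script, the options above can be assembled into a command programmatically. This sketch only builds the argument list and does not run anything; `build_ingest_command` is a hypothetical helper, and it uses only flags listed in the table.

```python
import shlex


def build_ingest_command(source, bundle, symbols, start, end, frequency, validate=False):
    """Return the argv list for one `rustybt ingest-unified` invocation."""
    argv = [
        "rustybt", "ingest-unified", source,
        "--symbols", ",".join(symbols),   # the CLI expects a comma-separated list
        "--start", start,
        "--end", end,
        "--frequency", frequency,
        "--bundle", bundle,
    ]
    if validate:
        argv.append("--validate")
    return argv


argv = build_ingest_command(
    "yfinance", "my-data", ["AAPL", "MSFT"],
    start="2023-01-01", end="2023-12-31", frequency="1d", validate=True,
)
# shlex.join quotes arguments where needed, so the string is safe to paste into a shell.
command_line = shlex.join(argv)
```

The resulting list can be passed to `subprocess.run(argv, check=True)` on a machine where the `rustybt` CLI is installed.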
Frequency Options¶
| Value | Description | Example Use Case |
|---|---|---|
| 1d | Daily bars | Long-term backtests |
| 1h | Hourly bars | Intraday strategies |
| 5m | 5-minute bars | High-frequency strategies |
| 1m | 1-minute bars | Ultra high-frequency |
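The frequency choice largely determines how much data an ingestion produces. As a rough sketch, the helper below maps each frequency value to a `timedelta` and computes an upper bound on the bar count per symbol. It assumes a market that never closes (as with crypto), so equities will yield fewer bars; `max_bar_count` is an illustrative name, not a rustybt function.

```python
from datetime import datetime, timedelta

# Frequency values from the table above mapped to bar durations.
FREQUENCY_TO_DELTA = {
    "1d": timedelta(days=1),
    "1h": timedelta(hours=1),
    "5m": timedelta(minutes=5),
    "1m": timedelta(minutes=1),
}


def max_bar_count(start: datetime, end: datetime, frequency: str) -> int:
    """Upper bound on bars in [start, end), assuming a market that never closes."""
    return (end - start) // FREQUENCY_TO_DELTA[frequency]


one_day = {
    freq: max_bar_count(datetime(2024, 1, 1), datetime(2024, 1, 2), freq)
    for freq in FREQUENCY_TO_DELTA
}
```

For a single 24-hour day this gives 1, 24, 288, and 1440 bars respectively, so a switch from `1d` to `1m` multiplies the per-symbol volume by up to 1440.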
Advanced Usage¶
Batch Ingestion¶
Ingest multiple bundles in one script:
import asyncio
import pandas as pd
from rustybt.data.sources import DataSourceRegistry

async def main():
    configs = [
        {
            "source": "yfinance",
            "bundle": "us-equities",
            "symbols": ["AAPL", "MSFT", "GOOGL"],
        },
        {
            "source": "ccxt",
            "bundle": "crypto",
            "symbols": ["BTC/USDT", "ETH/USDT"],
            # Adapter-specific keyword arguments forwarded to get_source()
            "params": {"exchange": "binance"},
        },
    ]
    for config in configs:
        source = DataSourceRegistry.get_source(config["source"], **config.get("params", {}))
        await source.ingest_to_bundle(
            bundle_name=config["bundle"],
            symbols=config["symbols"],
            start=pd.Timestamp("2023-01-01"),
            end=pd.Timestamp("2023-12-31"),
            frequency="1d"
        )

asyncio.run(main())
Incremental Updates¶
Update existing bundle with new data:
import asyncio
import pandas as pd
from rustybt.data.sources import DataSourceRegistry
from rustybt.data.bundles.metadata import BundleMetadata

async def main():
    # Load existing bundle metadata
    metadata = BundleMetadata.load("my-stocks")
    last_date = metadata.end_date
    # Get data source
    source = DataSourceRegistry.get_source("yfinance")
    # Ingest only new data
    await source.ingest_to_bundle(
        bundle_name="my-stocks",
        symbols=metadata.symbols,
        start=last_date + pd.Timedelta(days=1),
        end=pd.Timestamp.now(),
        frequency="1d",
        mode="append"  # Append to existing bundle
    )

asyncio.run(main())
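The example above starts the new window one day after the bundle's last date. If the bundle is already current, that start date falls after the end date, so it may be worth guarding the call. A minimal sketch with standard-library dates; `next_update_window` is a hypothetical helper, and how rustybt itself handles an empty range is not covered here.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def next_update_window(last_end: date, today: date) -> Optional[Tuple[date, date]]:
    """Return (start, end) for the next append, or None if nothing is missing."""
    start = last_end + timedelta(days=1)
    if start > today:
        return None  # the bundle already covers today; skip the ingestion call
    return start, today


stale = next_update_window(date(2023, 12, 31), date(2024, 1, 5))
current = next_update_window(date(2024, 1, 5), date(2024, 1, 5))
```

A scheduled job can call the ingestion only when the result is not `None`, which avoids a wasted API request on days when the bundle is up to date.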
Validation After Ingestion¶
import asyncio
import pandas as pd
from rustybt.data.sources import DataSourceRegistry
from rustybt.data.bundles.metadata import BundleMetadata

async def main():
    source = DataSourceRegistry.get_source("yfinance")
    await source.ingest_to_bundle(
        bundle_name="my-stocks",
        symbols=["AAPL"],
        start=pd.Timestamp("2023-01-01"),
        end=pd.Timestamp("2023-12-31"),
        frequency="1d"
    )
    # Validate bundle
    metadata = BundleMetadata.load("my-stocks")
    assert metadata.quality_score > 0.95, "Low quality data detected"
    assert metadata.missing_data_pct < 0.05, "Too much missing data"

asyncio.run(main())
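To give a feel for what a missing-data threshold like the one above measures, here is one plausible way such a fraction could be computed for daily bars: compare the dates actually present against the weekdays in the requested range. This is an illustrative assumption, not necessarily how rustybt derives `missing_data_pct`, and it ignores exchange holidays.

```python
from datetime import date, timedelta


def weekdays_between(start: date, end: date) -> list[date]:
    """All Monday-to-Friday dates in the inclusive range [start, end]."""
    days = []
    current = start
    while current <= end:
        if current.weekday() < 5:  # 0 = Monday ... 4 = Friday
            days.append(current)
        current += timedelta(days=1)
    return days


def missing_fraction(observed: set[date], start: date, end: date) -> float:
    """Share of expected weekdays that have no bar in `observed`."""
    expected = weekdays_between(start, end)
    if not expected:
        return 0.0
    missing = [day for day in expected if day not in observed]
    return len(missing) / len(expected)


# One trading week (Mon 2023-01-02 to Fri 2023-01-06) with Wednesday absent.
observed = {date(2023, 1, 2), date(2023, 1, 3), date(2023, 1, 5), date(2023, 1, 6)}
fraction = missing_fraction(observed, date(2023, 1, 2), date(2023, 1, 6))
```

Under this definition one absent day in a five-day week already gives 0.2, well above the 0.05 threshold asserted in the example.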
Troubleshooting¶
Rate Limit Errors¶
Error: RateLimitExceeded: Too many requests to API
Solution: Use caching or slow down ingestion:
import asyncio
import pandas as pd
from rustybt.data.sources import DataSourceRegistry

async def main():
    source = DataSourceRegistry.get_source("yfinance")
    symbols = ["AAPL", "MSFT", "GOOGL"]
    for symbol in symbols:
        await source.ingest_to_bundle(
            bundle_name=f"bundle-{symbol}",
            symbols=[symbol],
            start=pd.Timestamp("2023-01-01"),
            end=pd.Timestamp("2023-12-31"),
            frequency="1d"
        )
        await asyncio.sleep(1)  # 1 second delay between symbols

asyncio.run(main())
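A fixed delay reduces the request rate but does not recover once a limit has been hit. A common complement is retrying with exponential backoff, sketched below. The `RateLimitExceeded` class here is a local stand-in named after the error above (its real import path in rustybt is not assumed), and `with_backoff` and `flaky_ingest` are hypothetical helpers.

```python
import asyncio


class RateLimitExceeded(Exception):
    """Local stand-in for the error shown above; not imported from rustybt."""


async def with_backoff(operation, max_attempts=5, base_delay=1.0):
    """Await operation(), retrying on rate-limit errors with doubling delays."""
    for attempt in range(max_attempts):
        try:
            return await operation()
        except RateLimitExceeded:
            if attempt == max_attempts - 1:
                raise  # out of attempts; surface the error to the caller
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)


calls = {"count": 0}


async def flaky_ingest():
    # Simulated ingestion call that is rate-limited on its first two attempts.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimitExceeded("Too many requests to API")
    return "ok"


# A short base delay keeps the demo fast; use seconds-scale delays in practice.
result = asyncio.run(with_backoff(flaky_ingest, base_delay=0.01))
```

In real use, `operation` would be a small coroutine function wrapping one `source.ingest_to_bundle(...)` call.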
Missing Data¶
Error: NoDataAvailableError: Symbol AAPL has no data for 2023-01-15
Possible causes:
- Market holiday (no trading)
- Symbol delisted or renamed
- API downtime
Solution: Check how much data is missing in the bundle metadata:
from rustybt.data.bundles.metadata import BundleMetadata

metadata = BundleMetadata.load("my-bundle")
print(f"Missing data: {metadata.missing_data_pct*100:.2f}%")
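A non-trading day is often the quickest cause to rule out. The date in the sample error, 2023-01-15, falls on a Sunday, which the standard library can confirm. This sketch checks weekends only and does not know about exchange holidays; `is_weekend` is an illustrative helper.

```python
from datetime import date


def is_weekend(day: date) -> bool:
    """True for Saturday (weekday 5) and Sunday (weekday 6)."""
    return day.weekday() >= 5


missing_day = date.fromisoformat("2023-01-15")
weekend_gap = is_weekend(missing_day)
```

If the check returns `True` for an equity symbol, the gap is expected and no re-ingestion is needed.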
API Authentication Errors¶
Error: AuthenticationError: Invalid API key
Solution: Set API key via environment variable:
export POLYGON_API_KEY="your_key_here"
export ALPACA_API_KEY="your_key_here"
export ALPACA_API_SECRET="your_secret_here"
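In a Python script, the exported variables can be read explicitly and passed to `get_source` as in the adapter examples, which also lets the script fail early with a clear message. A minimal sketch; `require_env` is a hypothetical helper, and whether the adapters pick up these variables on their own is not assumed here.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; export it before running ingestion")
    return value
```

For example, `api_key = require_env("POLYGON_API_KEY")` can be followed by `DataSourceRegistry.get_source("polygon", api_key=api_key)`, which keeps the key out of source control.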
Next Steps¶
- Caching Guide - Optimize performance with caching
- Migration Guide - Upgrade from old APIs
- API Reference - Full DataSource API documentation