Data Ingestion with RustyBT¶
This notebook demonstrates how to fetch and prepare data from multiple sources for backtesting.
Data Sources Covered:
- yfinance (Yahoo Finance) - free data for stocks, ETFs, and forex
- CCXT - unified access to 100+ cryptocurrency exchanges
- CSV files - custom data imports
What you'll learn:
- Fetching data from different providers
- Data validation and quality checks
- Creating custom data bundles
- Caching for performance
Estimated runtime: 5-10 minutes (depending on data downloads)
📋 Notebook Information
- RustyBT Version: 0.1.2+
- Last Validated: 2025-11-07
- API Compatibility: Verified ✅
- Documentation: API Reference
In [ ]:
# Setup: imports and notebook configuration
import contextlib

import numpy as np
import pandas as pd
import polars as pl

from rustybt.analytics import create_progress_iterator, setup_notebook
from rustybt.data.adapters import CCXTAdapter, CSVAdapter, YFinanceAdapter

setup_notebook()
1. Yahoo Finance Data (Stocks & ETFs)¶
Yahoo Finance provides free historical data for stocks, ETFs, indices, and forex.
In [ ]:
# Initialize yfinance adapter
yf_adapter = YFinanceAdapter()

# Fetch data for multiple stocks
symbols = ["AAPL", "GOOGL", "MSFT", "TSLA"]
start_date = pd.Timestamp("2023-01-01")
end_date = pd.Timestamp("2023-12-31")

# Fetch with progress bar (fetch is async, so each call is awaited)
all_data = []
for symbol in create_progress_iterator(symbols, desc="Downloading"):
    data = await yf_adapter.fetch(
        symbols=[symbol], start_date=start_date, end_date=end_date, resolution="1d"
    )
    all_data.append(data)

# Combine all data
combined = pl.concat(all_data)
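As a quick cross-check outside the adapter, you can pull the same bars with the upstream yfinance library directly. This is a minimal sketch against yfinance's own yf.download call, not part of the RustyBT API:
In [ ]:
import yfinance as yf

# Upstream call: one ticker, same window, daily bars
raw = yf.download("AAPL", start="2023-01-01", end="2023-12-31", interval="1d")
print(raw.tail())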
In [ ]:
# Validate data quality
with contextlib.suppress(Exception):
    yf_adapter.validate(combined)
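contextlib.suppress keeps the notebook running if validation raises, but it also hides the reason for the failure. If you would rather see what went wrong, here is a sketch of a more explicit pattern:
In [ ]:
try:
    yf_adapter.validate(combined)
    print("✅ Validation passed")
except Exception as exc:  # surface the failure instead of hiding it
    print(f"❌ Validation failed: {exc}")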
2. Cryptocurrency Data (CCXT)¶
CCXT provides unified access to 100+ cryptocurrency exchanges.
In [ ]:
# Initialize CCXT adapter for Binance
binance = CCXTAdapter(exchange_id="binance")

# Fetch BTC and ETH data
crypto_symbols = ["BTC/USDT", "ETH/USDT"]
crypto_data = []
for symbol in crypto_symbols:
    data = await binance.fetch(
        symbols=[symbol],
        start_date=pd.Timestamp("2024-01-01"),
        end_date=pd.Timestamp("2024-01-31"),
        resolution="1h",  # Hourly data
    )
    crypto_data.append(data)

crypto_combined = pl.concat(crypto_data)
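The exchange_id passed to the adapter corresponds to an exchange id in the underlying ccxt library, so you can ask ccxt itself what is available. A minimal sketch using the upstream ccxt package directly (not a RustyBT call):
In [ ]:
import ccxt

# ccxt.exchanges is a plain list of supported exchange ids
print(f"{len(ccxt.exchanges)} exchanges available")
print([ex for ex in ccxt.exchanges if "binance" in ex])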
3. CSV Data Import¶
Import custom data from CSV files.
In [ ]:
# Build an example CSV with internally consistent OHLCV rows.
# (Generating open/high/low/close as independent random walks can
# produce bars where high < low, which fails the quality checks below.)
rng = np.random.default_rng(42)
close = 100 + rng.standard_normal(100).cumsum()
open_ = close + rng.standard_normal(100) * 0.5
csv_example = pd.DataFrame(
    {
        "timestamp": pd.date_range("2024-01-01", periods=100, freq="D"),
        "symbol": "CUSTOM",
        "open": open_,
        "high": np.maximum(open_, close) + np.abs(rng.standard_normal(100)),
        "low": np.minimum(open_, close) - np.abs(rng.standard_normal(100)),
        "close": close,
        "volume": rng.integers(1_000_000, 10_000_000, 100),
    }
)

# Save example CSV
csv_example.to_csv("example_data.csv", index=False)

# Load using CSV adapter
csv_adapter = CSVAdapter()
csv_data = csv_adapter.load("example_data.csv")
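If you want to eyeball the file before handing it to the adapter, Polars can read it directly. A quick sanity check with plain pl.read_csv, independent of the adapter:
In [ ]:
# Parse the timestamp column while reading, then inspect schema and head
preview = pl.read_csv("example_data.csv", try_parse_dates=True)
print(preview.schema)
print(preview.head())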
4. Data Quality Checks¶
Always validate data before using it in a backtest.
In [ ]:
def check_data_quality(df: pl.DataFrame, name: str = "Data") -> None:
    """Comprehensive data quality check."""
    print(f"\n{'=' * 60}")
    print(f"Data Quality Check: {name}")
    print(f"{'=' * 60}")

    # Check for nulls
    null_counts = df.null_count()
    total_nulls = sum(null_counts.to_dicts()[0].values())
    if total_nulls > 0:
        print(f"⚠️ Found {total_nulls} null values")
        print(null_counts)
    else:
        print("✅ No null values found")

    # Check OHLC relationships
    invalid = df.filter(
        (pl.col("high") < pl.col("low"))
        | (pl.col("high") < pl.col("open"))
        | (pl.col("high") < pl.col("close"))
        | (pl.col("low") > pl.col("open"))
        | (pl.col("low") > pl.col("close"))
    )
    if len(invalid) > 0:
        print(f"❌ Found {len(invalid)} rows with invalid OHLC relationships")
        print(invalid.head())
    else:
        print("✅ OHLC relationships are valid")

    # Check for duplicates. With multiple symbols in one frame, the same
    # timestamp legitimately appears once per symbol, so check the
    # (symbol, timestamp) pair rather than the timestamp alone.
    duplicates = df.filter(pl.struct(["symbol", "timestamp"]).is_duplicated())
    if len(duplicates) > 0:
        print(f"⚠️ Found {len(duplicates)} duplicate (symbol, timestamp) rows")
    else:
        print("✅ No duplicate timestamps per symbol")

    # Date range
    min_date = df.select(pl.col("timestamp").min()).item()
    max_date = df.select(pl.col("timestamp").max()).item()
    row_count = len(df)
    print("\n📊 Data Summary:")
    print(f"  Rows: {row_count:,}")
    print(f"  Date Range: {min_date} to {max_date}")
    print(f"  Symbols: {df.select(pl.col('symbol').n_unique()).item()}")
    print(f"{'=' * 60}\n")


# Check quality
check_data_quality(combined, "Stock Data")
check_data_quality(crypto_combined, "Crypto Data")
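One thing check_data_quality does not catch is gaps: bars missing inside the date range. A minimal sketch that reports the largest jump between consecutive timestamps per symbol (expect multi-day jumps in daily equity data over weekends and holidays):
In [ ]:
# Largest timestamp jump per symbol; anything far beyond the bar
# resolution suggests missing bars
gaps = (
    combined.sort(["symbol", "timestamp"])
    .group_by("symbol")
    .agg(pl.col("timestamp").diff().max().alias("max_gap"))
)
print(gaps)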
5. Save Data for Backtesting¶
Save data in efficient formats for fast backtesting.
In [ ]:
# Save to Parquet (recommended - fast and compact)
combined.write_parquet("stocks_2023.parquet")
crypto_combined.write_parquet("crypto_2024_01.parquet")

# Can also save to CSV for compatibility
# combined.write_csv("stocks_2023.csv")
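Parquet also pays off at read time: Polars can scan the file lazily so filters are pushed down before data is loaded. A small sketch using standard Polars:
In [ ]:
# Lazily scan the file and materialize only the AAPL rows
aapl = pl.scan_parquet("stocks_2023.parquet").filter(pl.col("symbol") == "AAPL").collect()
print(len(aapl))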
6. Data Caching¶
RustyBT supports caching to avoid re-downloading data.
In [ ]:
from rustybt.data.catalog import DataCatalog

# Initialize catalog with caching
catalog = DataCatalog(cache_dir="./data_cache")

# Register data source
catalog.register(
    name="stocks_2023",
    adapter=yf_adapter,
    symbols=["AAPL", "GOOGL", "MSFT"],
    start_date=pd.Timestamp("2023-01-01"),
    end_date=pd.Timestamp("2023-12-31"),
)

# First call downloads data
data1 = catalog.load("stocks_2023")

# Second call uses the cache
data2 = catalog.load("stocks_2023")
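To confirm the cache is doing its job, time the repeat load; it should be dramatically faster than the first call. A sketch using only the calls shown above:
In [ ]:
import time

t0 = time.perf_counter()
catalog.load("stocks_2023")  # served from ./data_cache on repeat calls
print(f"Cached load took {time.perf_counter() - t0:.3f}s")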
Next Steps¶
Now that you have data:
- 03_strategy_development.ipynb - Build trading strategies with this data
- 10_full_workflow.ipynb - See complete workflow from data to results
Key Takeaways¶
- ✅ Multiple data sources supported (stocks, crypto, custom)
- ✅ Built-in data validation catches errors early
- ✅ Efficient Parquet storage for fast backtests
- ✅ Caching prevents redundant downloads
- ✅ Progress bars for long downloads