Bundle Migration Guide¶
Overview¶
This guide covers migrating legacy data bundles from HDF5 or bcolz formats to the modern Parquet-based storage with Decimal precision. The migration process preserves data integrity while providing improved performance, compression, and financial precision.
Why Migrate?¶
Benefits of Parquet Format¶
Financial Precision:
- Decimal(18, 8) precision for all price data
- No floating-point arithmetic errors
- Guaranteed accuracy for financial calculations
Performance:
- Columnar storage enables faster queries
- Better compression ratios (typically 5-10x smaller)
- Lazy loading with predicate pushdown
- Efficient memory usage with memory-mapped files
Compatibility:
- Standard format supported by the Apache Arrow ecosystem
- Cross-language support (Python, Rust, R, etc.)
- Cloud-native storage integration
Maintenance:
- Self-describing schema with embedded metadata
- No index rebuilding required
- Atomic writes prevent corruption
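The precision point is easy to see in plain Python: binary floats cannot represent most decimal prices exactly, while Decimal arithmetic is exact. A quick illustration using only the standard library:
from decimal import Decimal
# Binary floats drift; Decimal stays exact
print(0.1 + 0.2)                        # 0.30000000000000004
print(Decimal("0.1") + Decimal("0.2"))  # 0.3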
Legacy Format Limitations¶
HDF5 Issues:
- Float64 precision leads to rounding errors
- Requires full file locks for writes
- Limited compression options
- Python-centric tooling
bcolz Issues:
- Deprecated library with limited maintenance
- Fixed chunk sizes limit flexibility
- No native Decimal support
- Requires index rebuilding
Migration Architecture¶
Migration Process Flow¶
┌─────────────────┐
│ Legacy Bundle │
│ (HDF5/bcolz) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Read Legacy │
│ (Pandas) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Convert to │
│ Polars+Decimal │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Write Parquet │
│ (compressed) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Validate Data │
│ Integrity │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Register in │
│ Metadata │
└─────────────────┘
Components¶
BundleMigrator: Main migration class handling the workflow
- Reads legacy formats (HDF5, bcolz)
- Converts to Polars DataFrames with Decimal columns
- Writes Parquet with compression
- Validates data integrity
- Registers in the metadata catalog
ParquetWriter: Handles Parquet writes with partitioning
- Year/month partitioning for daily data
- Compression (zstd, snappy, lz4)
- Schema validation
- Atomic writes
BundleMetadata: Tracks provenance and quality
- Source information
- Quality metrics
- Checksums for validation
- Symbol tracking
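For orientation, a year/month-partitioned daily dataset would look roughly like this on disk. This is an assumed layout for illustration; the exact directory names are determined by ParquetWriter:
# daily_bars/
# ├── year=2023/
# │   ├── month=01/part-0.parquet
# │   └── month=02/part-0.parquet
# └── year=2024/
#     └── month=01/part-0.parquet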
Migration Workflow¶
Step 1: Prepare Environment¶
First, ensure you have the required dependencies:
# Required packages
import rustybt
from rustybt.data.bundles.migration import BundleMigrator, migrate_bundle
from pathlib import Path
# Optional: Install bcolz if migrating from bcolz
# pip install bcolz
Step 2: Locate Legacy Bundle¶
Identify the bundle directory containing your legacy data:
# Example bundle structure (HDF5)
# data/bundles/quandl/
# ├── daily_bars.h5
# ├── metadata.json
# └── adjustments.h5
# Example bundle structure (bcolz)
# data/bundles/quantopian-quandl/
# ├── daily_equities.bcolz/
# ├── minute_equities.bcolz/
# ├── adjustments.sqlite
# └── metadata.json
bundle_path = "data/bundles/quandl"
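If you are unsure which format a bundle uses, a quick directory scan can distinguish them. A minimal sketch, assuming the file layouts shown above:
from pathlib import Path

bundle_path = Path("data/bundles/quandl")

# Heuristic format detection based on the layouts above (illustrative)
if any(bundle_path.glob("*.h5")):
    source_format = "hdf5"
elif any(bundle_path.glob("*.bcolz")):
    source_format = "bcolz"
else:
    raise ValueError(f"No legacy bundle data found in {bundle_path}")
print(f"Detected source format: {source_format}")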
Step 3: Initialize Migrator¶
Create a BundleMigrator instance:
from rustybt.data.bundles.migration import BundleMigrator
# Initialize migrator
migrator = BundleMigrator(bundle_path)
Step 4: Migrate Daily Bars¶
Migrate daily bar data with validation:
# Migrate daily bars from HDF5
daily_stats = migrator.migrate_daily_bars(
    source_format="hdf5",
    compression="zstd",  # Best compression ratio
    validate=True,       # Validate data integrity
    batch_size=100,      # Process 100 assets at a time
)
print(f"Migrated {daily_stats['row_count']} rows")
print(f"Duration: {daily_stats['duration_seconds']:.2f} seconds")
Compression Options:
- "zstd": Best compression ratio, moderate speed (recommended)
- "snappy": Fastest, moderate compression
- "lz4": Fast, good compression
- None: No compression (not recommended)
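If you want to measure the trade-off on your own data, you can write the same DataFrame with each codec and compare on-disk sizes. A sketch, assuming the migrated daily bars are readable with Polars:
import os

import polars as pl

df = pl.read_parquet("data/bundles/quandl/daily_bars")

# Write with each codec and compare resulting file sizes
for codec in ["zstd", "snappy", "lz4"]:
    path = f"/tmp/daily_bars_{codec}.parquet"
    df.write_parquet(path, compression=codec)
    size_mb = os.path.getsize(path) / (1024 ** 2)
    print(f"{codec}: {size_mb:.1f} MB")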
Step 5: Migrate Minute Bars (Optional)¶
If you have minute bar data:
# Migrate minute bars from bcolz
minute_stats = migrator.migrate_minute_bars(
    source_format="bcolz",
    compression="zstd",
    validate=True,
    start_date=None,  # Optional: filter by date range
    end_date=None,
)
print(f"Migrated {minute_stats['row_count']} rows")
Date Range Filtering:
from datetime import date
# Migrate only recent data
minute_stats = migrator.migrate_minute_bars(
    source_format="bcolz",
    start_date=date(2023, 1, 1),
    end_date=date(2023, 12, 31),
    validate=True,
)
Step 6: Verify Migration¶
After migration, verify the results:
from rustybt.data.bundles.metadata import BundleMetadata
# Query metadata for migrated bundle
metadata = BundleMetadata.get(f"migrated-{bundle_path}")
if metadata:
    print(f"Row count: {metadata.row_count}")
    print(f"Date range: {metadata.start_date} to {metadata.end_date}")
    print(f"Validation passed: {metadata.validation_passed}")
    print(f"Checksum: {metadata.checksum}")
Convenience Function¶
For simple migrations, use the migrate_bundle() convenience function:
from rustybt.data.bundles.migration import migrate_bundle
# Migrate entire bundle in one call
stats = migrate_bundle(
    bundle_path="data/bundles/quandl",
    source_format="hdf5",
    compression="zstd",
    migrate_daily=True,    # Migrate daily bars
    migrate_minute=False,  # Skip minute bars
    validate=True,
)
# Access results
print(f"Daily bars: {stats['daily']['row_count']} rows")
if "minute" in stats:
    print(f"Minute bars: {stats['minute']['row_count']} rows")
Best Practices¶
Before Migration¶
1. Backup Legacy Data
Always create a backup before migration:
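A minimal sketch using the standard library, assuming the bundle path from Step 2:
import shutil
from pathlib import Path

# Copy the entire bundle directory before migrating (illustrative paths)
src = Path("data/bundles/quandl")
backup = Path("data/bundles/quandl.bak")
shutil.copytree(src, backup)
print(f"Backed up {src} -> {backup}")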
2. Check Available Disk Space
Parquet files with compression are typically smaller, but you need space for both formats during migration:
import shutil
# Check available space
stats = shutil.disk_usage("/")
available_gb = stats.free / (1024 ** 3)
print(f"Available space: {available_gb:.2f} GB")
3. Verify Source Data Integrity
Ensure your legacy data is not corrupted:
import pandas as pd
# Test read HDF5
try:
    df = pd.read_hdf("data/bundles/quandl/daily_bars.h5")
    print(f"Source data OK: {len(df)} rows")
except Exception as e:
    print(f"Source data error: {e}")
During Migration¶
1. Use Validation
Always enable validation for production migrations:
stats = migrator.migrate_daily_bars(
    source_format="hdf5",
    validate=True,  # IMPORTANT: Always validate
)
2. Monitor Progress
For large bundles, monitor progress with logging:
import logging

import structlog

# Enable INFO-level progress logging
structlog.configure(
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO)
)
# Now migration will log progress
migrator.migrate_daily_bars(source_format="hdf5")
3. Handle Errors Gracefully
Wrap migration in try-except for error handling:
from rustybt.data.bundles.migration import MigrationError
try:
    stats = migrator.migrate_daily_bars("hdf5")
    print("Migration successful")
except MigrationError as e:
    print(f"Migration failed: {e}")
    # Restore from backup if needed
After Migration¶
1. Verify Data Integrity
Spot-check migrated data:
import polars as pl
# Read migrated Parquet
df = pl.read_parquet("data/bundles/quandl/daily_bars")
# Verify schema
assert df.schema["open"] == pl.Decimal(18, 8), "Incorrect precision"
assert df.schema["date"] == pl.Date, "Incorrect date type"
# Verify data
print(f"Total rows: {len(df)}")
print(f"Date range: {df['date'].min()} to {df['date'].max()}")
print(df.head())
2. Update Bundle Registration
Re-register the bundle if needed:
from rustybt.data.bundles import register, unregister
# Unregister old bundle
try:
    unregister("quandl")
except Exception:
    # Ignore if the bundle was never registered
    pass

# Register new Parquet bundle
from rustybt.data.bundles.csvdir import csvdir_equities

register(
    "quandl-parquet",
    csvdir_equities(
        ["daily"],
        "data/bundles/quandl",
    ),
)
3. Clean Up Legacy Data
After verifying the migration, optionally remove legacy files:
from pathlib import Path

# ONLY after thorough verification
legacy_file = Path("data/bundles/quandl/daily_bars.h5")
if legacy_file.exists():
    # Move to archive instead of deleting
    archive_dir = Path("data/bundles/archive")
    archive_dir.mkdir(parents=True, exist_ok=True)
    legacy_file.rename(archive_dir / legacy_file.name)
Common Issues & Troubleshooting¶
Issue: HDF5 File Not Found¶
Error: MigrationError: HDF5 daily bars file not found
Solution: Verify the file path and structure:
from pathlib import Path
bundle_path = Path("data/bundles/quandl")
daily_bars = bundle_path / "daily_bars.h5"
if not daily_bars.exists():
    print(f"File not found: {daily_bars}")
    print("Available files:")
    for f in bundle_path.glob("*.h5"):
        print(f" - {f.name}")
Issue: Bcolz Not Installed¶
Error: MigrationError: bcolz package not installed
Solution: Install bcolz (note: requires Python 3.9 or earlier):
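pip install bcolz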
If bcolz is not compatible with your Python version, convert the data manually:
# Alternative: Use legacy Python environment to export CSV
# Then use CSV adapter to ingest
import pandas as pd
import bcolz
# In legacy environment
ctable = bcolz.open("data/bundles/quandl/daily_equities.bcolz")
df = ctable.todataframe()
df.to_csv("exported_data.csv", index=False)
# Then use CSV adapter in new environment
from rustybt.data.adapters.csv_adapter import CSVAdapter
adapter = CSVAdapter(file_path="exported_data.csv")
Issue: Schema Validation Failure¶
Error: ValueError: Schema validation failed
Solution: Check column names and types:
import polars as pl
# Inspect DataFrame schema
df = pl.read_parquet("output.parquet")
print(df.schema)
# Expected schema for daily bars
from rustybt.data.polars.parquet_schema import DAILY_BARS_SCHEMA
print(DAILY_BARS_SCHEMA)
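If the mismatch is only a column type (for example, prices stored as Float64), casting to the expected types before rewriting may resolve it. A sketch, assuming a recent Polars version with Decimal support:
import polars as pl

df = pl.read_parquet("output.parquet")

# Cast price columns to the expected Decimal(18, 8) type (illustrative)
df = df.with_columns(
    [pl.col(c).cast(pl.Decimal(18, 8)) for c in ["open", "high", "low", "close"]]
)
df.write_parquet("output.parquet")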
Issue: Memory Issues with Large Bundles¶
Error: MemoryError: Unable to allocate array
Solution: Use batch processing:
# Migrate in smaller batches
migrator.migrate_daily_bars(
    source_format="hdf5",
    batch_size=50,  # Reduce batch size
    validate=True,
)
Or process date ranges separately:
from datetime import date
# Migrate year by year
for year in range(2020, 2024):
    stats = migrator.migrate_minute_bars(
        source_format="bcolz",
        start_date=date(year, 1, 1),
        end_date=date(year, 12, 31),
        validate=True,
    )
    print(f"Migrated {year}: {stats['row_count']} rows")
Issue: Validation Fails¶
Error: MigrationError: Value mismatch at index X
Solution: Check for known data quality issues:
# Some legacy data may have precision issues
# Review the validation tolerance
# Default tolerance: 1e-6
# If legacy data has known issues, disable validation
stats = migrator.migrate_daily_bars(
    source_format="hdf5",
    validate=False,  # Skip validation
)
# Then manually verify critical data points
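A simple spot check after an unvalidated migration is to compare a few values between the legacy file and the Parquet output. A minimal sketch, assuming an HDF5 source:
import pandas as pd
import polars as pl

# Load both sides and compare a handful of overlapping rows
legacy = pd.read_hdf("data/bundles/quandl/daily_bars.h5")
migrated = pl.read_parquet("data/bundles/quandl/daily_bars")

print(legacy.head())
print(migrated.head())
print(f"Legacy rows: {len(legacy)}, migrated rows: {len(migrated)}")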
Complete Migration Example¶
Here's a complete example migrating a Quandl bundle:
from pathlib import Path
import logging
import shutil
import sys

import structlog

from rustybt.data.bundles.metadata import BundleMetadata
from rustybt.data.bundles.migration import BundleMigrator, MigrationError

# Configure logging
structlog.configure(
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO)
)

# Migration configuration
BUNDLE_PATH = "data/bundles/quandl"
SOURCE_FORMAT = "hdf5"
COMPRESSION = "zstd"

def migrate_quandl_bundle():
    """Migrate Quandl bundle from HDF5 to Parquet."""
    # Validate bundle exists
    bundle_path = Path(BUNDLE_PATH)
    if not bundle_path.exists():
        raise ValueError(f"Bundle not found: {BUNDLE_PATH}")

    # Check disk space
    stats = shutil.disk_usage(bundle_path)
    if stats.free < 10 * (1024 ** 3):  # 10 GB minimum
        print("WARNING: Low disk space")

    # Initialize migrator
    migrator = BundleMigrator(BUNDLE_PATH)

    # Migrate daily bars
    print("Migrating daily bars...")
    try:
        daily_stats = migrator.migrate_daily_bars(
            source_format=SOURCE_FORMAT,
            compression=COMPRESSION,
            validate=True,
            batch_size=100,
        )
        print(f"✓ Daily bars migrated: {daily_stats['row_count']} rows")
        print(f"  Output: {daily_stats['output_path']}")
        print(f"  Duration: {daily_stats['duration_seconds']:.2f}s")
        print(f"  Compression: {COMPRESSION}")
    except MigrationError as e:
        print(f"✗ Daily bars migration failed: {e}")
        return False

    # Migrate minute bars (if present)
    minute_path = bundle_path / "minute_bars.h5"
    if minute_path.exists():
        print("\nMigrating minute bars...")
        try:
            minute_stats = migrator.migrate_minute_bars(
                source_format=SOURCE_FORMAT,
                compression=COMPRESSION,
                validate=True,
            )
            print(f"✓ Minute bars migrated: {minute_stats['row_count']} rows")
            print(f"  Duration: {minute_stats['duration_seconds']:.2f}s")
        except MigrationError as e:
            print(f"✗ Minute bars migration failed: {e}")

    # Verify results
    print("\nVerifying migration...")
    bundle_name = f"migrated-{BUNDLE_PATH}"
    metadata = BundleMetadata.get(bundle_name)
    if metadata:
        print("✓ Metadata registered")
        print(f"  Row count: {metadata.row_count}")
        print(f"  Validation: {'PASSED' if metadata.validation_passed else 'FAILED'}")

    print("\n✓ Migration complete!")
    return True

if __name__ == "__main__":
    success = migrate_quandl_bundle()
    sys.exit(0 if success else 1)
Migration Checklist¶
Use this checklist when performing migrations:
Pre-Migration:
- [ ] Backup source bundle directory
- [ ] Verify source data integrity
- [ ] Check available disk space (at least 2x source size)
- [ ] Install required dependencies (bcolz if needed)
- [ ] Review bundle structure and file locations
During Migration:
- [ ] Initialize BundleMigrator
- [ ] Migrate daily bars with validation enabled
- [ ] Migrate minute bars if present
- [ ] Monitor logs for errors or warnings
- [ ] Verify migration statistics (row counts, duration)
Post-Migration:
- [ ] Verify Parquet files created successfully
- [ ] Check BundleMetadata registration
- [ ] Spot-check migrated data for accuracy
- [ ] Compare file sizes (Parquet should be smaller)
- [ ] Test reading migrated data in a backtest
- [ ] Update bundle registration if needed
- [ ] Archive or remove legacy files (after verification)
- [ ] Document migration in project changelog
API Reference Summary¶
BundleMigrator¶
Main migration class for converting bundles.
from rustybt.data.bundles.migration import BundleMigrator
migrator = BundleMigrator(bundle_path: str)
Methods:
- migrate_daily_bars(source_format, compression, validate, batch_size) → dict
- migrate_minute_bars(source_format, compression, validate, start_date, end_date) → dict
migrate_bundle()¶
Convenience function for simple migrations.
from rustybt.data.bundles.migration import migrate_bundle
stats = migrate_bundle(
    bundle_path: str,
    source_format: Literal["hdf5", "bcolz"],
    compression: Literal["snappy", "zstd", "lz4"] | None = "zstd",
    migrate_daily: bool = True,
    migrate_minute: bool = False,
    validate: bool = True,
) → dict
MigrationError¶
Exception raised when migration fails.
from rustybt.data.bundles.migration import MigrationError
try:
    migrator.migrate_daily_bars("hdf5")
except MigrationError as e:
    print(f"Migration failed: {e}")
Next Steps¶
After migration, consider:
- Performance Testing: Benchmark query performance with new Parquet format
- Monitoring: Set up monitoring for bundle freshness and quality
- Automation: Create automated migration scripts for new bundles
- Documentation: Document your migration process for team reference