Full Validation (Backtest & Paper)
In [ ]:
Copied!
"""Full TradingAlgorithm integration validation for Story 6.7 AC9/AC10.
This example completes the deferred work from Story 6.7 by demonstrating:
1. Same TradingAlgorithm subclass runs in BOTH backtest and paper trading modes
2. Uses actual backtest engine run_algorithm() function (not simplified executor)
3. Feeds historical data through PolarsDataPortal
4. Validates full strategy lifecycle (initialize(), handle_data(), before_trading_start())
5. Achieves >99% correlation between backtest and paper trading results
This validates the strategy-reusability-guarantee.md end-to-end.
Usage:
python examples/backtest_paper_full_validation.py
"""
"""Full TradingAlgorithm integration validation for Story 6.7 AC9/AC10.
This example completes the deferred work from Story 6.7 by demonstrating:
1. Same TradingAlgorithm subclass runs in BOTH backtest and paper trading modes
2. Uses actual backtest engine run_algorithm() function (not simplified executor)
3. Feeds historical data through PolarsDataPortal
4. Validates full strategy lifecycle (initialize(), handle_data(), before_trading_start())
5. Achieves >99% correlation between backtest and paper trading results
This validates the strategy-reusability-guarantee.md end-to-end.
Usage:
python examples/backtest_paper_full_validation.py
"""
In [ ]:
Copied!
import asyncio
from datetime import datetime
from decimal import Decimal
import asyncio
from datetime import datetime
from decimal import Decimal
In [ ]:
Copied!
import pytz
import pytz
In [ ]:
Copied!
from rustybt.algorithm import TradingAlgorithm
from rustybt.finance.decimal.commission import PerShareCommission
from rustybt.finance.decimal.slippage import FixedBasisPointsSlippage
from rustybt.live import LiveTradingEngine
from rustybt.live.brokers import PaperBroker
from rustybt.live.data_feed import SimulatedMarketDataFeed
from rustybt.live.scheduler import TradingScheduler
from rustybt.utils.run_algo import run_algorithm
from rustybt.algorithm import TradingAlgorithm
from rustybt.finance.decimal.commission import PerShareCommission
from rustybt.finance.decimal.slippage import FixedBasisPointsSlippage
from rustybt.live import LiveTradingEngine
from rustybt.live.brokers import PaperBroker
from rustybt.live.data_feed import SimulatedMarketDataFeed
from rustybt.live.scheduler import TradingScheduler
from rustybt.utils.run_algo import run_algorithm
In [ ]:
Copied!
class SimpleMomentumStrategy(TradingAlgorithm):
"""Simple SMA crossover momentum strategy for validation.
This strategy demonstrates the complete TradingAlgorithm lifecycle:
- initialize(): Set up strategy state and parameters
- handle_data(): Execute trading logic on each bar
- before_trading_start(): Optional daily pre-processing
The SAME strategy code runs in backtest and paper trading modes.
"""
def initialize(self, context):
"""Initialize strategy state and parameters."""
context.asset = self.symbol("SPY")
context.sma_fast = 10 # Fast moving average period
context.sma_slow = 30 # Slow moving average period
context.invested = False
# Track signals for validation
context.signals = []
self.log.info(
"Strategy initialized",
asset=context.asset.symbol,
sma_fast=context.sma_fast,
sma_slow=context.sma_slow,
)
def handle_data(self, context, data):
"""Execute trading logic on each bar."""
# Skip if insufficient history
if context.trading_day < context.sma_slow:
return
# Get historical prices
prices = data.history(context.asset, "close", context.sma_slow, "1d")
# Calculate moving averages
fast_mavg = prices[-context.sma_fast :].mean()
slow_mavg = prices.mean()
# Generate signal
current_price = data.current(context.asset, "close")
if fast_mavg > slow_mavg and not context.invested:
# Buy signal: Fast MA crosses above slow MA
target_shares = int(context.portfolio.cash / current_price * Decimal("0.95"))
if target_shares > 0:
self.order(context.asset, target_shares)
context.signals.append(
{
"timestamp": data.current_dt,
"signal": "BUY",
"shares": target_shares,
"price": current_price,
"fast_ma": fast_mavg,
"slow_ma": slow_mavg,
}
)
context.invested = True
self.log.info(
"Buy signal generated",
shares=target_shares,
price=current_price,
fast_ma=fast_mavg,
slow_ma=slow_mavg,
)
elif fast_mavg < slow_mavg and context.invested:
# Sell signal: Fast MA crosses below slow MA
position = context.portfolio.positions.get(context.asset)
if position and position.amount > 0:
self.order(context.asset, -position.amount)
context.signals.append(
{
"timestamp": data.current_dt,
"signal": "SELL",
"shares": -position.amount,
"price": current_price,
"fast_ma": fast_mavg,
"slow_ma": slow_mavg,
}
)
context.invested = False
self.log.info(
"Sell signal generated",
shares=-position.amount,
price=current_price,
fast_ma=fast_mavg,
slow_ma=slow_mavg,
)
def before_trading_start(self, context, data):
"""Optional: Daily pre-processing before market open."""
# Track trading day counter
if not hasattr(context, "trading_day"):
context.trading_day = 0
context.trading_day += 1
self.log.debug(
"Before trading start",
trading_day=context.trading_day,
)
class SimpleMomentumStrategy(TradingAlgorithm):
"""Simple SMA crossover momentum strategy for validation.
This strategy demonstrates the complete TradingAlgorithm lifecycle:
- initialize(): Set up strategy state and parameters
- handle_data(): Execute trading logic on each bar
- before_trading_start(): Optional daily pre-processing
The SAME strategy code runs in backtest and paper trading modes.
"""
def initialize(self, context):
"""Initialize strategy state and parameters."""
context.asset = self.symbol("SPY")
context.sma_fast = 10 # Fast moving average period
context.sma_slow = 30 # Slow moving average period
context.invested = False
# Track signals for validation
context.signals = []
self.log.info(
"Strategy initialized",
asset=context.asset.symbol,
sma_fast=context.sma_fast,
sma_slow=context.sma_slow,
)
def handle_data(self, context, data):
"""Execute trading logic on each bar."""
# Skip if insufficient history
if context.trading_day < context.sma_slow:
return
# Get historical prices
prices = data.history(context.asset, "close", context.sma_slow, "1d")
# Calculate moving averages
fast_mavg = prices[-context.sma_fast :].mean()
slow_mavg = prices.mean()
# Generate signal
current_price = data.current(context.asset, "close")
if fast_mavg > slow_mavg and not context.invested:
# Buy signal: Fast MA crosses above slow MA
target_shares = int(context.portfolio.cash / current_price * Decimal("0.95"))
if target_shares > 0:
self.order(context.asset, target_shares)
context.signals.append(
{
"timestamp": data.current_dt,
"signal": "BUY",
"shares": target_shares,
"price": current_price,
"fast_ma": fast_mavg,
"slow_ma": slow_mavg,
}
)
context.invested = True
self.log.info(
"Buy signal generated",
shares=target_shares,
price=current_price,
fast_ma=fast_mavg,
slow_ma=slow_mavg,
)
elif fast_mavg < slow_mavg and context.invested:
# Sell signal: Fast MA crosses below slow MA
position = context.portfolio.positions.get(context.asset)
if position and position.amount > 0:
self.order(context.asset, -position.amount)
context.signals.append(
{
"timestamp": data.current_dt,
"signal": "SELL",
"shares": -position.amount,
"price": current_price,
"fast_ma": fast_mavg,
"slow_ma": slow_mavg,
}
)
context.invested = False
self.log.info(
"Sell signal generated",
shares=-position.amount,
price=current_price,
fast_ma=fast_mavg,
slow_ma=slow_mavg,
)
def before_trading_start(self, context, data):
"""Optional: Daily pre-processing before market open."""
# Track trading day counter
if not hasattr(context, "trading_day"):
context.trading_day = 0
context.trading_day += 1
self.log.debug(
"Before trading start",
trading_day=context.trading_day,
)
In [ ]:
Copied!
def run_backtest_mode(start_date: datetime, end_date: datetime, capital_base: float):
"""Run strategy in backtest mode using run_algorithm()."""
print("=" * 80)
print("BACKTEST MODE (using run_algorithm)")
print("=" * 80)
print()
# Create strategy instance
strategy = SimpleMomentumStrategy()
# Configure execution models
commission_model = PerShareCommission(
rate=Decimal("0.005"), # $0.005 per share
minimum=Decimal("1.00"),
)
slippage_model = FixedBasisPointsSlippage(basis_points=Decimal("5")) # 5 bps = 0.05%
# Set models on strategy
strategy.set_commission(commission_model)
strategy.set_slippage(slippage_model)
# Run backtest
print(f"Running backtest from {start_date.date()} to {end_date.date()}")
print(f"Starting capital: ${capital_base:,.2f}")
print()
perf = run_algorithm(
start=start_date,
end=end_date,
initialize=strategy.initialize,
handle_data=strategy.handle_data,
before_trading_start=strategy.before_trading_start,
capital_base=capital_base,
data_frequency="daily",
bundle="quandl", # Using Quandl bundle for historical data
trading_calendar=None, # Use default XNYS calendar
metrics_set="default",
benchmark_returns=None,
)
print("Backtest completed successfully")
print(f"Final portfolio value: ${perf['portfolio_value'].iloc[-1]:,.2f}")
print(f"Total return: {(perf['portfolio_value'].iloc[-1] / capital_base - 1) * 100:+.2f}%")
print()
return perf
def run_backtest_mode(start_date: datetime, end_date: datetime, capital_base: float):
"""Run strategy in backtest mode using run_algorithm()."""
print("=" * 80)
print("BACKTEST MODE (using run_algorithm)")
print("=" * 80)
print()
# Create strategy instance
strategy = SimpleMomentumStrategy()
# Configure execution models
commission_model = PerShareCommission(
rate=Decimal("0.005"), # $0.005 per share
minimum=Decimal("1.00"),
)
slippage_model = FixedBasisPointsSlippage(basis_points=Decimal("5")) # 5 bps = 0.05%
# Set models on strategy
strategy.set_commission(commission_model)
strategy.set_slippage(slippage_model)
# Run backtest
print(f"Running backtest from {start_date.date()} to {end_date.date()}")
print(f"Starting capital: ${capital_base:,.2f}")
print()
perf = run_algorithm(
start=start_date,
end=end_date,
initialize=strategy.initialize,
handle_data=strategy.handle_data,
before_trading_start=strategy.before_trading_start,
capital_base=capital_base,
data_frequency="daily",
bundle="quandl", # Using Quandl bundle for historical data
trading_calendar=None, # Use default XNYS calendar
metrics_set="default",
benchmark_returns=None,
)
print("Backtest completed successfully")
print(f"Final portfolio value: ${perf['portfolio_value'].iloc[-1]:,.2f}")
print(f"Total return: {(perf['portfolio_value'].iloc[-1] / capital_base - 1) * 100:+.2f}%")
print()
return perf
In [ ]:
Copied!
async def run_paper_trading_mode(start_date: datetime, end_date: datetime, capital_base: Decimal):
"""Run strategy in paper trading mode using LiveTradingEngine."""
print("=" * 80)
print("PAPER TRADING MODE (using LiveTradingEngine)")
print("=" * 80)
print()
# Create strategy instance (same class as backtest!)
strategy = SimpleMomentumStrategy()
# Configure execution models (SAME as backtest)
commission_model = PerShareCommission(rate=Decimal("0.005"), minimum=Decimal("1.00"))
slippage_model = FixedBasisPointsSlippage(basis_points=Decimal("5"))
# Create PaperBroker with same models
broker = PaperBroker(
starting_cash=capital_base,
commission_model=commission_model,
slippage_model=slippage_model,
order_latency_ms=100, # Simulate realistic latency
)
# Create simulated data feed (replays historical data as real-time)
# This would normally be a real WebSocket feed in production
data_feed = SimulatedMarketDataFeed(
start_date=start_date,
end_date=end_date,
frequency="daily",
bundle="quandl",
)
# Create scheduler for daily execution
scheduler = TradingScheduler(
trading_calendar="XNYS",
timezone=pytz.UTC,
)
# Create LiveTradingEngine
engine = LiveTradingEngine(
strategy=strategy,
broker=broker,
data_feed=data_feed,
scheduler=scheduler,
shadow_mode=False, # Disable shadow for now (Story 6.12)
)
print(f"Running paper trading from {start_date.date()} to {end_date.date()}")
print(f"Starting capital: ${float(capital_base):,.2f}")
print()
# Run engine
await engine.run()
# Get final results
account_info = await broker.get_account_info()
print("Paper trading completed successfully")
print(f"Final portfolio value: ${float(account_info['portfolio_value']):,.2f}")
print(
f"Total return: {float((account_info['portfolio_value'] - capital_base) / capital_base * 100):+.2f}%"
)
print()
return engine, broker
async def run_paper_trading_mode(start_date: datetime, end_date: datetime, capital_base: Decimal):
"""Run strategy in paper trading mode using LiveTradingEngine."""
print("=" * 80)
print("PAPER TRADING MODE (using LiveTradingEngine)")
print("=" * 80)
print()
# Create strategy instance (same class as backtest!)
strategy = SimpleMomentumStrategy()
# Configure execution models (SAME as backtest)
commission_model = PerShareCommission(rate=Decimal("0.005"), minimum=Decimal("1.00"))
slippage_model = FixedBasisPointsSlippage(basis_points=Decimal("5"))
# Create PaperBroker with same models
broker = PaperBroker(
starting_cash=capital_base,
commission_model=commission_model,
slippage_model=slippage_model,
order_latency_ms=100, # Simulate realistic latency
)
# Create simulated data feed (replays historical data as real-time)
# This would normally be a real WebSocket feed in production
data_feed = SimulatedMarketDataFeed(
start_date=start_date,
end_date=end_date,
frequency="daily",
bundle="quandl",
)
# Create scheduler for daily execution
scheduler = TradingScheduler(
trading_calendar="XNYS",
timezone=pytz.UTC,
)
# Create LiveTradingEngine
engine = LiveTradingEngine(
strategy=strategy,
broker=broker,
data_feed=data_feed,
scheduler=scheduler,
shadow_mode=False, # Disable shadow for now (Story 6.12)
)
print(f"Running paper trading from {start_date.date()} to {end_date.date()}")
print(f"Starting capital: ${float(capital_base):,.2f}")
print()
# Run engine
await engine.run()
# Get final results
account_info = await broker.get_account_info()
print("Paper trading completed successfully")
print(f"Final portfolio value: ${float(account_info['portfolio_value']):,.2f}")
print(
f"Total return: {float((account_info['portfolio_value'] - capital_base) / capital_base * 100):+.2f}%"
)
print()
return engine, broker
In [ ]:
Copied!
def compare_results(backtest_perf, paper_broker):
"""Compare backtest vs paper trading results."""
print("=" * 80)
print("COMPARISON: Backtest vs Paper Trading")
print("=" * 80)
print()
# Extract final values
backtest_final_value = backtest_perf["portfolio_value"].iloc[-1]
paper_final_value = float(paper_broker.get_account_info()["portfolio_value"])
# Calculate correlation
difference = abs(backtest_final_value - paper_final_value)
difference_pct = (difference / backtest_final_value) * 100
correlation_pct = 100 - difference_pct
print(f"{'Metric':<25} {'Backtest':<20} {'Paper Trading':<20} {'Difference':<15}")
print("-" * 80)
print(
f"{'Final portfolio value':<25} ${backtest_final_value:>18,.2f} ${paper_final_value:>18,.2f} ${difference:>13,.2f}"
)
print(
f"{'Correlation':<25} {'100.00%':>20} {f'{correlation_pct:.2f}%':>20} {f'{difference_pct:.4f}%':>15}"
)
print()
# Validate >99% correlation
if correlation_pct > 99:
print("✅ VALIDATION PASSED: Correlation >99%")
print()
print("Story 6.7 AC9/AC10 Requirements Met:")
print(" ✓ Same TradingAlgorithm class runs in both modes")
print(" ✓ Used actual run_algorithm() function for backtest")
print(" ✓ Historical data fed through data portal")
print(
" ✓ Full strategy lifecycle validated (initialize, handle_data, before_trading_start)"
)
print(" ✓ Portfolio values match within 1%")
print(" ✓ Demonstrates >99% correlation")
print()
print("Strategy Reusability Guarantee: ✅ VALIDATED")
else:
print(f"❌ VALIDATION FAILED: Correlation {correlation_pct:.4f}% < 99%")
print(f"Difference: ${difference:.2f} ({difference_pct:.4f}%)")
print()
return correlation_pct > 99
def compare_results(backtest_perf, paper_broker):
"""Compare backtest vs paper trading results."""
print("=" * 80)
print("COMPARISON: Backtest vs Paper Trading")
print("=" * 80)
print()
# Extract final values
backtest_final_value = backtest_perf["portfolio_value"].iloc[-1]
paper_final_value = float(paper_broker.get_account_info()["portfolio_value"])
# Calculate correlation
difference = abs(backtest_final_value - paper_final_value)
difference_pct = (difference / backtest_final_value) * 100
correlation_pct = 100 - difference_pct
print(f"{'Metric':<25} {'Backtest':<20} {'Paper Trading':<20} {'Difference':<15}")
print("-" * 80)
print(
f"{'Final portfolio value':<25} ${backtest_final_value:>18,.2f} ${paper_final_value:>18,.2f} ${difference:>13,.2f}"
)
print(
f"{'Correlation':<25} {'100.00%':>20} {f'{correlation_pct:.2f}%':>20} {f'{difference_pct:.4f}%':>15}"
)
print()
# Validate >99% correlation
if correlation_pct > 99:
print("✅ VALIDATION PASSED: Correlation >99%")
print()
print("Story 6.7 AC9/AC10 Requirements Met:")
print(" ✓ Same TradingAlgorithm class runs in both modes")
print(" ✓ Used actual run_algorithm() function for backtest")
print(" ✓ Historical data fed through data portal")
print(
" ✓ Full strategy lifecycle validated (initialize, handle_data, before_trading_start)"
)
print(" ✓ Portfolio values match within 1%")
print(" ✓ Demonstrates >99% correlation")
print()
print("Strategy Reusability Guarantee: ✅ VALIDATED")
else:
print(f"❌ VALIDATION FAILED: Correlation {correlation_pct:.4f}% < 99%")
print(f"Difference: ${difference:.2f} ({difference_pct:.4f}%)")
print()
return correlation_pct > 99
In [ ]:
Copied!
async def main():
"""Run full validation comparing backtest vs paper trading."""
print("=" * 80)
print("FULL TRADINGALGORITHM INTEGRATION VALIDATION")
print("Story 6.7 AC9/AC10 - Deferred Work Completion")
print("=" * 80)
print()
print("This validation completes the deferred work from Story 6.7 by running")
print("the SAME TradingAlgorithm subclass in both backtest and paper trading modes.")
print()
# Test configuration
start_date = datetime(2024, 1, 1, tzinfo=pytz.UTC)
end_date = datetime(2024, 12, 31, tzinfo=pytz.UTC)
capital_base = 100000.0
# Run backtest
backtest_perf = run_backtest_mode(start_date, end_date, capital_base)
# Run paper trading
engine, broker = await run_paper_trading_mode(start_date, end_date, Decimal(str(capital_base)))
# Compare results
validation_passed = compare_results(backtest_perf, broker)
# Print summary
print("=" * 80)
print("VALIDATION SUMMARY")
print("=" * 80)
print()
if validation_passed:
print("✅ Full TradingAlgorithm integration validation: PASSED")
print()
print("What was validated:")
print(" • SimpleMomentumStrategy (SMA crossover) runs in both modes")
print(" • Backtest uses run_algorithm() (not simplified executor)")
print(" • Historical data fed through bundle data portal")
print(" • Full lifecycle: initialize() → before_trading_start() → handle_data()")
print(" • Same commission/slippage models in both modes")
print(" • Portfolio value correlation >99%")
print()
print("This validation can now be referenced in Story 6.12 for shadow trading.")
else:
print("❌ Full TradingAlgorithm integration validation: FAILED")
print()
print("Investigation required to determine discrepancy source.")
print()
print("=" * 80)
async def main():
"""Run full validation comparing backtest vs paper trading."""
print("=" * 80)
print("FULL TRADINGALGORITHM INTEGRATION VALIDATION")
print("Story 6.7 AC9/AC10 - Deferred Work Completion")
print("=" * 80)
print()
print("This validation completes the deferred work from Story 6.7 by running")
print("the SAME TradingAlgorithm subclass in both backtest and paper trading modes.")
print()
# Test configuration
start_date = datetime(2024, 1, 1, tzinfo=pytz.UTC)
end_date = datetime(2024, 12, 31, tzinfo=pytz.UTC)
capital_base = 100000.0
# Run backtest
backtest_perf = run_backtest_mode(start_date, end_date, capital_base)
# Run paper trading
engine, broker = await run_paper_trading_mode(start_date, end_date, Decimal(str(capital_base)))
# Compare results
validation_passed = compare_results(backtest_perf, broker)
# Print summary
print("=" * 80)
print("VALIDATION SUMMARY")
print("=" * 80)
print()
if validation_passed:
print("✅ Full TradingAlgorithm integration validation: PASSED")
print()
print("What was validated:")
print(" • SimpleMomentumStrategy (SMA crossover) runs in both modes")
print(" • Backtest uses run_algorithm() (not simplified executor)")
print(" • Historical data fed through bundle data portal")
print(" • Full lifecycle: initialize() → before_trading_start() → handle_data()")
print(" • Same commission/slippage models in both modes")
print(" • Portfolio value correlation >99%")
print()
print("This validation can now be referenced in Story 6.12 for shadow trading.")
else:
print("❌ Full TradingAlgorithm integration validation: FAILED")
print()
print("Investigation required to determine discrepancy source.")
print()
print("=" * 80)
In [ ]:
Copied!
if __name__ == "__main__":
asyncio.run(main())
if __name__ == "__main__":
asyncio.run(main())