logo
RustyBT Documentation
Pipeline API
Initializing search
    jerryinyang/rustybt
    • Home
    • Getting Started
    • User Guides
    • Migration Guides
    • Examples & Tutorials
    • API Reference
    • About
    jerryinyang/rustybt
    • Home
      • Installation
      • Quick Start
      • Configuration
      • Decimal Precision
        • Backtest Output Organization
        • Strategy Code Capture
        • Cash Validation
        • CSV Data Import
        • Data Ingestion
        • Databento Data Import
        • Data Validation
        • Creating Data Adapters
        • Migrating to Unified Data
        • Caching System
        • Caching Guide
        • Broker Setup
        • Testnet Setup
        • Live vs Backtest Data
        • WebSocket Streaming
        • Type Hinting
        • Exception Handling
        • Execution Methods
        • Pipeline API
        • Advanced Pipeline Techniques
        • Multi-Strategy Portfolio
        • Deployment Guide
        • Production Checklist
        • Audit Logging
        • Troubleshooting
      • Cash Validation Migration
      • Overview
        • Overview
          • Crypto Backtesting with CCXT Data Adapter
          • Equity Backtesting with YFinance Data Adapter
          • Getting Started with RustyBT
          • Data Ingestion with RustyBT
          • Strategy Development with RustyBT
          • Performance Analysis
          • Strategy Optimization
          • Walk-Forward Optimization
          • Risk Analytics
          • Portfolio Construction (Single-Strategy Multi-Asset)
          • 09. Multi-Strategy Portfolio
          • Live Paper Trading
          • Complete Workflow: Data → Backtest → Analysis → Optimization
          • CCXT Data Ingestion
          • YFinance Data Ingestion
          • Custom Data Adapter
          • Backtest with Cache
          • Full Validation (Backtest & Paper)
          • Cache Warming
          • Generate Backtest Report
          • Live Trading (Simple)
          • Live Trading (Advanced)
          • Paper Trading (Simple)
          • Paper Trading Validation
          • Shadow Trading (Simple)
          • Shadow Trading Dashboard
          • Portfolio Allocator Tutorial
          • Allocation Algorithms
          • Attribution Analysis
          • Slippage Models
          • Borrow Costs
          • Overnight Financing
          • High-Frequency Custom Triggers
          • Latency Simulation
          • Pipeline API
          • WebSocket Streaming
          • Custom Broker Adapter
          • Grid Search MA Crossover
          • Random Search vs Grid
          • Bayesian Optimization (5 Params)
          • Parallel Optimization
          • Walk-Forward Analysis
      • Overview & Interactive Docs
        • Overview
        • Asset Finder
          • Overview
          • Selection Guide
          • Base Adapter
          • CCXT
          • YFinance
          • CSV
          • Polygon
          • Alpaca
          • AlphaVantage
          • Overview
          • Architecture
          • Catalog API
          • Bundle System
          • Metadata Tracking
          • Migration Guide
          • Overview
          • Data Portal
          • Polars Data Portal
          • Bar Reader
          • Daily Bars
          • Overview
          • Providers
          • Storage
          • Converters
          • FX & Caching
          • Caching
          • Optimization
          • Troubleshooting
          • Overview
          • Computation API
        • Overview
          • Overview
          • Types Reference
          • Blotter
          • Blotter System
          • Decimal Blotter
          • Execution Pipeline
          • Latency Models
          • Partial Fills
          • Order Status Tracking
          • Slippage
          • Slippage Models
          • Commissions
          • Commission Models
          • Borrow Costs & Financing
          • Order Lifecycle
          • Examples
        • Overview
          • Allocation Algorithms
          • Multi-Strategy Allocation
          • Portfolio Allocator
          • Allocators
          • Risk Management
          • Risk Metrics
          • Position Limits
          • Performance Tracking
          • Metrics
          • Order Aggregation
          • Analytics Suite
        • Overview
          • Parameter Spaces
          • Objective Functions
          • Grid Search
          • Random Search
          • Bayesian
          • Genetic
          • Overview
          • Monte Carlo
          • Noise Infusion
          • Sensitivity Analysis
        • Overview
        • Artifact Manager
        • Code Capture
        • Overview
        • Reports
        • Visualization
          • Overview
          • Overview
          • Metrics
          • VaR & CVaR
          • Drawdown
          • Overview
        • Overview
        • Production Deployment
          • Circuit Breakers
        • Overview
        • Datasource API
        • Optimization API
        • Analytics API
      • License
      • Contributing
      • Changelog
    In [ ]:
    Copied!
    #
    # Copyright 2025 RustyBT Contributors
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    """
    Example: Pipeline API Tutorial
    
    This example demonstrates how to use the Pipeline API for factor-based
    trading strategies and quantitative research.
    
    Key Concepts Demonstrated:
    - Creating custom factors
    - Building pipelines with filters
    - Screening assets based on factors
    - Rebalancing based on rankings
    - Statistical arbitrage strategies
    
    Usage:
        python examples/pipeline_tutorial.py
    
    Note: Pipeline API is an advanced feature primarily for factor-based strategies.
          For simple strategies, use the standard TradingAlgorithm API.
    """
    
    # # Copyright 2025 RustyBT Contributors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Example: Pipeline API Tutorial This example demonstrates how to use the Pipeline API for factor-based trading strategies and quantitative research. Key Concepts Demonstrated: - Creating custom factors - Building pipelines with filters - Screening assets based on factors - Rebalancing based on rankings - Statistical arbitrage strategies Usage: python examples/pipeline_tutorial.py Note: Pipeline API is an advanced feature primarily for factor-based strategies. For simple strategies, use the standard TradingAlgorithm API. """
    In [ ]:
    Copied!
    from rustybt import TradingAlgorithm
    from rustybt.pipeline import CustomFactor, Pipeline
    from rustybt.pipeline.data import USEquityPricing
    from rustybt.pipeline.factors import RSI, Returns, SimpleMovingAverage
    
    from rustybt import TradingAlgorithm from rustybt.pipeline import CustomFactor, Pipeline from rustybt.pipeline.data import USEquityPricing from rustybt.pipeline.factors import RSI, Returns, SimpleMovingAverage
    In [ ]:
    Copied!
    print("=" * 70)
    print("Pipeline API Tutorial")
    print("=" * 70)
    print("\nThe Pipeline API allows you to define factor-based strategies")
    print("that screen and rank assets based on computed factors.")
    
    print("=" * 70) print("Pipeline API Tutorial") print("=" * 70) print("\nThe Pipeline API allows you to define factor-based strategies") print("that screen and rank assets based on computed factors.")

    ============================================================================ Example 1: Basic Pipeline with Built-in Factors¶

    In [ ]:
    Copied!
    def example_1_basic_pipeline():
        """Example 1: Basic pipeline with moving averages and RSI."""
        print("\n" + "=" * 70)
        print("Example 1: Basic Pipeline - Moving Averages & RSI")
        print("=" * 70)
    
        # Define a pipeline
        def make_pipeline():
            # Get closing prices
            close = USEquityPricing.close.latest
    
            # Compute factors
            sma_50 = SimpleMovingAverage(inputs=[close], window_length=50)
            sma_200 = SimpleMovingAverage(inputs=[close], window_length=200)
            rsi_14 = RSI(inputs=[close], window_length=14)
    
            # Create filters
            price_above_sma50 = close > sma_50
            uptrend = sma_50 > sma_200
            oversold = rsi_14 < 30
    
            # Combine filters (bullish stocks that are oversold)
            screen = price_above_sma50 & uptrend & oversold
    
            # Build pipeline
            return Pipeline(
                columns={
                    "close": close,
                    "sma_50": sma_50,
                    "sma_200": sma_200,
                    "rsi_14": rsi_14,
                },
                screen=screen,
            )
    
        print("\n✓ Pipeline defined")
        print("\nFactors:")
        print("  - SMA(50): 50-day simple moving average")
        print("  - SMA(200): 200-day simple moving average")
        print("  - RSI(14): 14-day Relative Strength Index")
        print("\nScreen:")
        print("  - Price > SMA(50)")
        print("  - SMA(50) > SMA(200) (golden cross)")
        print("  - RSI < 30 (oversold)")
    
    def example_1_basic_pipeline(): """Example 1: Basic pipeline with moving averages and RSI.""" print("\n" + "=" * 70) print("Example 1: Basic Pipeline - Moving Averages & RSI") print("=" * 70) # Define a pipeline def make_pipeline(): # Get closing prices close = USEquityPricing.close.latest # Compute factors sma_50 = SimpleMovingAverage(inputs=[close], window_length=50) sma_200 = SimpleMovingAverage(inputs=[close], window_length=200) rsi_14 = RSI(inputs=[close], window_length=14) # Create filters price_above_sma50 = close > sma_50 uptrend = sma_50 > sma_200 oversold = rsi_14 < 30 # Combine filters (bullish stocks that are oversold) screen = price_above_sma50 & uptrend & oversold # Build pipeline return Pipeline( columns={ "close": close, "sma_50": sma_50, "sma_200": sma_200, "rsi_14": rsi_14, }, screen=screen, ) print("\n✓ Pipeline defined") print("\nFactors:") print(" - SMA(50): 50-day simple moving average") print(" - SMA(200): 200-day simple moving average") print(" - RSI(14): 14-day Relative Strength Index") print("\nScreen:") print(" - Price > SMA(50)") print(" - SMA(50) > SMA(200) (golden cross)") print(" - RSI < 30 (oversold)")

    ============================================================================ Example 2: Custom Factor¶

    In [ ]:
    Copied!
    def example_2_custom_factor():
        """Example 2: Creating a custom factor."""
        print("\n" + "=" * 70)
        print("Example 2: Custom Factor - Mean Reversion Score")
        print("=" * 70)
    
        class MeanReversionScore(CustomFactor):
            """Custom factor: Mean reversion score.
    
            Measures how far price is from its N-day average,
            normalized by volatility (z-score).
            """
    
            inputs = [USEquityPricing.close]
            window_length = 20
    
            def compute(self, today, assets, out, close):
                """Compute mean reversion score.
    
                Args:
                    today: Current date
                    assets: Assets being computed
                    out: Output array to fill
                    close: Close price array (window_length x num_assets)
                """
                # Calculate rolling mean and std
                mean = close.mean(axis=0)
                std = close.std(axis=0)
    
                # Calculate z-score (current price vs mean)
                current_price = close[-1]
                z_score = (current_price - mean) / std
    
                # Return negative z-score (more negative = more oversold = higher score)
                out[:] = -z_score
    
        # Use custom factor in pipeline
        def make_pipeline():
            close = USEquityPricing.close.latest
            mean_reversion = MeanReversionScore()
    
            # Buy top decile (most oversold)
            top_oversold = mean_reversion.top(10, mask=close > 10)
    
            return Pipeline(
                columns={
                    "close": close,
                    "mean_reversion_score": mean_reversion,
                },
                screen=top_oversold,
            )
    
        print("\n✓ Custom factor created")
        print("\nMean Reversion Score:")
        print("  - Measures price deviation from 20-day average")
        print("  - Normalized by volatility (z-score)")
        print("  - Higher score = more oversold = better buy opportunity")
    
    def example_2_custom_factor(): """Example 2: Creating a custom factor.""" print("\n" + "=" * 70) print("Example 2: Custom Factor - Mean Reversion Score") print("=" * 70) class MeanReversionScore(CustomFactor): """Custom factor: Mean reversion score. Measures how far price is from its N-day average, normalized by volatility (z-score). """ inputs = [USEquityPricing.close] window_length = 20 def compute(self, today, assets, out, close): """Compute mean reversion score. Args: today: Current date assets: Assets being computed out: Output array to fill close: Close price array (window_length x num_assets) """ # Calculate rolling mean and std mean = close.mean(axis=0) std = close.std(axis=0) # Calculate z-score (current price vs mean) current_price = close[-1] z_score = (current_price - mean) / std # Return negative z-score (more negative = more oversold = higher score) out[:] = -z_score # Use custom factor in pipeline def make_pipeline(): close = USEquityPricing.close.latest mean_reversion = MeanReversionScore() # Buy top decile (most oversold) top_oversold = mean_reversion.top(10, mask=close > 10) return Pipeline( columns={ "close": close, "mean_reversion_score": mean_reversion, }, screen=top_oversold, ) print("\n✓ Custom factor created") print("\nMean Reversion Score:") print(" - Measures price deviation from 20-day average") print(" - Normalized by volatility (z-score)") print(" - Higher score = more oversold = better buy opportunity")

    ============================================================================ Example 3: Pipeline in Trading Algorithm¶

    In [ ]:
    Copied!
    def example_3_pipeline_in_algorithm():
        """Example 3: Using pipeline in a trading algorithm."""
        print("\n" + "=" * 70)
        print("Example 3: Pipeline-Based Trading Strategy")
        print("=" * 70)
    
        class MomentumPipelineStrategy(TradingAlgorithm):
            """Strategy that uses pipeline for stock selection."""
    
            def initialize(self):
                """Initialize strategy and attach pipeline."""
                # Create pipeline
                pipe = self.make_pipeline()
    
                # Attach pipeline to algorithm
                self.attach_pipeline(pipe, "momentum_screen")
    
                # Schedule rebalance
                self.schedule_function(
                    self.rebalance,
                    date_rule=self.date_rules.month_start(),
                    time_rule=self.time_rules.market_open(),
                )
    
            def make_pipeline(self):
                """Define screening pipeline."""
                close = USEquityPricing.close.latest
    
                # Momentum factors
                returns_1m = Returns(window_length=21)
                returns_3m = Returns(window_length=63)
                returns_6m = Returns(window_length=126)
    
                # Combined momentum score (equal weight)
                momentum_score = (returns_1m + returns_3m + returns_6m) / 3
    
                # Universe filter (liquid stocks)
                volume = USEquityPricing.volume.latest
                dollar_volume = close * volume
                liquid = dollar_volume.top(500)
    
                # Select top 50 momentum stocks
                top_momentum = momentum_score.top(50, mask=liquid)
    
                return Pipeline(
                    columns={
                        "close": close,
                        "momentum_score": momentum_score,
                        "returns_1m": returns_1m,
                        "returns_3m": returns_3m,
                        "returns_6m": returns_6m,
                    },
                    screen=top_momentum,
                )
    
            def rebalance(self, context, data):
                """Rebalance portfolio based on pipeline output."""
                # Get pipeline output
                pipeline_output = self.pipeline_output("momentum_screen")
    
                # Get current positions
                current_positions = set(context.portfolio.positions.keys())
    
                # Get target positions from pipeline
                target_positions = set(pipeline_output.index)
    
                # Close positions no longer in screen
                for asset in current_positions - target_positions:
                    self.order_target_percent(asset, 0)
    
                # Equal weight new positions
                if len(target_positions) > 0:
                    target_weight = 1.0 / len(target_positions)
    
                    for asset in target_positions:
                        self.order_target_percent(asset, target_weight)
    
        print("\n✓ Pipeline-based strategy defined")
        print("\nStrategy Logic:")
        print("  1. Screen universe to top 500 liquid stocks")
        print("  2. Calculate momentum score (1m, 3m, 6m returns)")
        print("  3. Select top 50 momentum stocks")
        print("  4. Rebalance monthly to equal-weight portfolio")
        print("\nThis is a typical quantitative momentum strategy!")
    
    def example_3_pipeline_in_algorithm(): """Example 3: Using pipeline in a trading algorithm.""" print("\n" + "=" * 70) print("Example 3: Pipeline-Based Trading Strategy") print("=" * 70) class MomentumPipelineStrategy(TradingAlgorithm): """Strategy that uses pipeline for stock selection.""" def initialize(self): """Initialize strategy and attach pipeline.""" # Create pipeline pipe = self.make_pipeline() # Attach pipeline to algorithm self.attach_pipeline(pipe, "momentum_screen") # Schedule rebalance self.schedule_function( self.rebalance, date_rule=self.date_rules.month_start(), time_rule=self.time_rules.market_open(), ) def make_pipeline(self): """Define screening pipeline.""" close = USEquityPricing.close.latest # Momentum factors returns_1m = Returns(window_length=21) returns_3m = Returns(window_length=63) returns_6m = Returns(window_length=126) # Combined momentum score (equal weight) momentum_score = (returns_1m + returns_3m + returns_6m) / 3 # Universe filter (liquid stocks) volume = USEquityPricing.volume.latest dollar_volume = close * volume liquid = dollar_volume.top(500) # Select top 50 momentum stocks top_momentum = momentum_score.top(50, mask=liquid) return Pipeline( columns={ "close": close, "momentum_score": momentum_score, "returns_1m": returns_1m, "returns_3m": returns_3m, "returns_6m": returns_6m, }, screen=top_momentum, ) def rebalance(self, context, data): """Rebalance portfolio based on pipeline output.""" # Get pipeline output pipeline_output = self.pipeline_output("momentum_screen") # Get current positions current_positions = set(context.portfolio.positions.keys()) # Get target positions from pipeline target_positions = set(pipeline_output.index) # Close positions no longer in screen for asset in current_positions - target_positions: self.order_target_percent(asset, 0) # Equal weight new positions if len(target_positions) > 0: target_weight = 1.0 / len(target_positions) for asset in target_positions: self.order_target_percent(asset, target_weight) print("\n✓ Pipeline-based strategy defined") print("\nStrategy Logic:") print(" 1. Screen universe to top 500 liquid stocks") print(" 2. Calculate momentum score (1m, 3m, 6m returns)") print(" 3. Select top 50 momentum stocks") print(" 4. Rebalance monthly to equal-weight portfolio") print("\nThis is a typical quantitative momentum strategy!")

    ============================================================================ Example 4: Advanced Filters and Classifiers¶

    In [ ]:
    Copied!
    def example_4_advanced_filters():
        """Example 4: Advanced filters and classifiers."""
        print("\n" + "=" * 70)
        print("Example 4: Advanced Filters - Sector Neutral Strategy")
        print("=" * 70)
    
        def make_pipeline():
            close = USEquityPricing.close.latest
            volume = USEquityPricing.volume.latest
    
            # Momentum factor
            returns_3m = Returns(window_length=63)
    
            # Universe: liquid stocks
            dollar_volume = close * volume
            liquid = dollar_volume.percentile_between(80, 100)
    
            # Sector classifier (would need sector data)
            # sector = Sector()
    
            # Select top 3 momentum stocks per sector
            # This creates a sector-neutral portfolio
            # top_per_sector = returns_3m.top(3, groupby=sector, mask=liquid)
    
            # For this example, just use top momentum overall
            top_momentum = returns_3m.top(20, mask=liquid)
    
            return Pipeline(
                columns={
                    "close": close,
                    "returns_3m": returns_3m,
                    # 'sector': sector,
                },
                screen=top_momentum,
            )
    
        print("\n✓ Advanced pipeline defined")
        print("\nAdvanced Features:")
        print("  - Percentile filters (top 20% by dollar volume)")
        print("  - Sector classification (requires sector data)")
        print("  - Groupby operations (sector-neutral selection)")
        print("  - Multiple filter combinations")
    
    def example_4_advanced_filters(): """Example 4: Advanced filters and classifiers.""" print("\n" + "=" * 70) print("Example 4: Advanced Filters - Sector Neutral Strategy") print("=" * 70) def make_pipeline(): close = USEquityPricing.close.latest volume = USEquityPricing.volume.latest # Momentum factor returns_3m = Returns(window_length=63) # Universe: liquid stocks dollar_volume = close * volume liquid = dollar_volume.percentile_between(80, 100) # Sector classifier (would need sector data) # sector = Sector() # Select top 3 momentum stocks per sector # This creates a sector-neutral portfolio # top_per_sector = returns_3m.top(3, groupby=sector, mask=liquid) # For this example, just use top momentum overall top_momentum = returns_3m.top(20, mask=liquid) return Pipeline( columns={ "close": close, "returns_3m": returns_3m, # 'sector': sector, }, screen=top_momentum, ) print("\n✓ Advanced pipeline defined") print("\nAdvanced Features:") print(" - Percentile filters (top 20% by dollar volume)") print(" - Sector classification (requires sector data)") print(" - Groupby operations (sector-neutral selection)") print(" - Multiple filter combinations")

    ============================================================================ Example 5: Pipeline Performance Tips¶

    In [ ]:
    Copied!
    def example_5_performance_tips():
        """Example 5: Pipeline performance optimization."""
        print("\n" + "=" * 70)
        print("Example 5: Pipeline Performance Tips")
        print("=" * 70)
    
        print("\n💡 Performance Optimization Tips:")
        print("\n1. Universe Reduction:")
        print("   - Filter to tradable universe early (e.g., top 1000 by volume)")
        print("   - Reduces computation for subsequent factors")
    
        print("\n2. Factor Caching:")
        print("   - Factors are computed once per day and cached")
        print("   - Reuse factors across multiple pipelines")
    
        print("\n3. Window Length:")
        print("   - Longer windows = more memory usage")
        print("   - Use minimum required window length")
    
        print("\n4. Custom Factors:")
        print("   - Implement compute() efficiently")
        print("   - Use NumPy vectorized operations")
        print("   - Avoid Python loops over assets")
    
        print("\n5. Polars Integration:")
        print("   - RustyBT uses Polars for data engine")
        print("   - Factors automatically benefit from Polars performance")
        print("   - 5-10x faster than pandas-based implementations")
    
    def example_5_performance_tips(): """Example 5: Pipeline performance optimization.""" print("\n" + "=" * 70) print("Example 5: Pipeline Performance Tips") print("=" * 70) print("\n💡 Performance Optimization Tips:") print("\n1. Universe Reduction:") print(" - Filter to tradable universe early (e.g., top 1000 by volume)") print(" - Reduces computation for subsequent factors") print("\n2. Factor Caching:") print(" - Factors are computed once per day and cached") print(" - Reuse factors across multiple pipelines") print("\n3. Window Length:") print(" - Longer windows = more memory usage") print(" - Use minimum required window length") print("\n4. Custom Factors:") print(" - Implement compute() efficiently") print(" - Use NumPy vectorized operations") print(" - Avoid Python loops over assets") print("\n5. Polars Integration:") print(" - RustyBT uses Polars for data engine") print(" - Factors automatically benefit from Polars performance") print(" - 5-10x faster than pandas-based implementations")

    ============================================================================ Example 6: Pipeline Debugging¶

    In [ ]:
    Copied!
    def example_6_debugging():
        """Example 6: Pipeline debugging techniques."""
        print("\n" + "=" * 70)
        print("Example 6: Pipeline Debugging")
        print("=" * 70)
    
        print("\n🔧 Debugging Techniques:")
    
        print("\n1. Inspect Pipeline Output:")
        print("   pipeline_output = algo.pipeline_output('my_pipeline')")
        print("   print(pipeline_output.head())")
    
        print("\n2. Check Factor Values:")
        print("   print(pipeline_output['momentum_score'].describe())")
        print("   print(pipeline_output['momentum_score'].hist())")
    
        print("\n3. Validate Screen:")
        print("   screen_count = len(pipeline_output)")
        print("   print(f'Screen passed: {screen_count} assets')")
    
        print("\n4. Test Factors Independently:")
        print("   # Run pipeline without screen to see all factor values")
        print("   pipe = Pipeline(columns={'factor': my_factor})")
    
        print("\n5. Compare to Benchmark:")
        print("   # Compare your factor to known factors (e.g., momentum)")
        print("   correlation = pipe_output['my_factor'].corr(pipe_output['momentum'])")
    
    def example_6_debugging(): """Example 6: Pipeline debugging techniques.""" print("\n" + "=" * 70) print("Example 6: Pipeline Debugging") print("=" * 70) print("\n🔧 Debugging Techniques:") print("\n1. Inspect Pipeline Output:") print(" pipeline_output = algo.pipeline_output('my_pipeline')") print(" print(pipeline_output.head())") print("\n2. Check Factor Values:") print(" print(pipeline_output['momentum_score'].describe())") print(" print(pipeline_output['momentum_score'].hist())") print("\n3. Validate Screen:") print(" screen_count = len(pipeline_output)") print(" print(f'Screen passed: {screen_count} assets')") print("\n4. Test Factors Independently:") print(" # Run pipeline without screen to see all factor values") print(" pipe = Pipeline(columns={'factor': my_factor})") print("\n5. Compare to Benchmark:") print(" # Compare your factor to known factors (e.g., momentum)") print(" correlation = pipe_output['my_factor'].corr(pipe_output['momentum'])")

    ============================================================================ Run All Examples¶

    In [ ]:
    Copied!
    def main():
        """Run all pipeline tutorial examples."""
        try:
            example_1_basic_pipeline()
            example_2_custom_factor()
            example_3_pipeline_in_algorithm()
            example_4_advanced_filters()
            example_5_performance_tips()
            example_6_debugging()
    
            print("\n" + "=" * 70)
            print("✨ Pipeline Tutorial Complete!")
            print("=" * 70)
    
            print("\n📚 Key Takeaways:")
            print("  1. Pipelines enable factor-based strategies")
            print("  2. Built-in factors: SMA, RSI, Returns, etc.")
            print("  3. Custom factors: Inherit from CustomFactor")
            print("  4. Filters and screens: Select tradable universe")
            print("  5. Integration: Use with TradingAlgorithm")
    
            print("\n🎯 When to Use Pipeline:")
            print("  ✓ Factor-based strategies (momentum, value, quality)")
            print("  ✓ Statistical arbitrage")
            print("  ✓ Quantitative research and backtesting")
            print("  ✓ Multi-asset screening and ranking")
            print("  ✗ Simple technical indicator strategies (use regular API)")
    
            print("\n📖 Next Steps:")
            print("  1. Read: docs/guides/pipeline-api-guide.md")
            print("  2. Try: Implement your own custom factor")
            print("  3. Backtest: Run a momentum strategy with pipeline")
            print("  4. Research: Use pipeline for factor research")
    
        except Exception as e:
            print(f"\n❌ Error: {e}")
            import traceback
    
            traceback.print_exc()
    
    def main(): """Run all pipeline tutorial examples.""" try: example_1_basic_pipeline() example_2_custom_factor() example_3_pipeline_in_algorithm() example_4_advanced_filters() example_5_performance_tips() example_6_debugging() print("\n" + "=" * 70) print("✨ Pipeline Tutorial Complete!") print("=" * 70) print("\n📚 Key Takeaways:") print(" 1. Pipelines enable factor-based strategies") print(" 2. Built-in factors: SMA, RSI, Returns, etc.") print(" 3. Custom factors: Inherit from CustomFactor") print(" 4. Filters and screens: Select tradable universe") print(" 5. Integration: Use with TradingAlgorithm") print("\n🎯 When to Use Pipeline:") print(" ✓ Factor-based strategies (momentum, value, quality)") print(" ✓ Statistical arbitrage") print(" ✓ Quantitative research and backtesting") print(" ✓ Multi-asset screening and ranking") print(" ✗ Simple technical indicator strategies (use regular API)") print("\n📖 Next Steps:") print(" 1. Read: docs/guides/pipeline-api-guide.md") print(" 2. Try: Implement your own custom factor") print(" 3. Backtest: Run a momentum strategy with pipeline") print(" 4. Research: Use pipeline for factor research") except Exception as e: print(f"\n❌ Error: {e}") import traceback traceback.print_exc()
    In [ ]:
    Copied!
    if __name__ == "__main__":
        main()
    
    if __name__ == "__main__": main()
    Previous
    Latency Simulation
    Next
    WebSocket Streaming
    Made with Material for MkDocs