Zipline Pipeline API¶
The Zipline Pipeline API provides a declarative framework for defining cross-sectional computations across assets. It enables strategy logic to be expressed as composable Factors, Filters, and Classifiers.
Note: This document covers the Computation Pipeline (strategy calculations). For data ingestion pipelines, see Data Pipeline System.
Overview¶
The Pipeline API separates what to compute (factor definitions) from how to compute it (execution engine). This enables:
- Declarative strategy logic - Express computations as factor graphs
- Automatic optimization - Engine optimizes execution plans
- Backtesting integration - Seamlessly integrates with TradingAlgorithm
- Reusable components - Build libraries of factors
Key Concepts¶
| Concept | Purpose | Example |
|---|---|---|
| Factor | Numerical computation | Moving average, RSI, returns |
| Filter | Boolean screening | Top 500 by volume, Price > $10 |
| Classifier | Categorical grouping | Sector, exchange, market cap tier |
| Pipeline | Collection of computations | Daily alpha signal generator |
Architecture¶
Execution Flow¶
Strategy Definition (Python)
↓
Factor Graph Construction
↓
Execution Plan Optimization
↓
Data Loading (via DataPortal)
↓
Computation (vectorized)
↓
Results DataFrame
Component Hierarchy¶
Pipeline
├── Columns (named Factors/Filters)
├── Screen (optional Filter)
└── Domain (universe definition)
Factor (Numerical)
├── Built-in (SimpleMovingAverage, Returns)
├── Technical (RSI, Bollinger Bands)
├── Statistical (Zscore, Correlation)
└── Custom (user-defined)
Filter (Boolean)
├── Comparison (>, <, ==)
├── Logical (AND, OR, NOT)
└── Statistical (Top/Bottom N)
Factors¶
Factors compute numerical values for each asset. They are the building blocks of quantitative strategies.
Built-in Factors¶
Latest Price¶
from rustybt.pipeline.factors import Latest
from rustybt.pipeline.data import USEquityPricing
# Get latest closing price
close_price = USEquityPricing.close.latest
Returns¶
from rustybt.pipeline.factors import Returns
# 1-day returns
returns_1d = Returns(window_length=2)
# 20-day returns
returns_20d = Returns(window_length=21)
Simple Moving Average¶
from rustybt.pipeline.factors import SimpleMovingAverage
# 50-day SMA
sma_50 = SimpleMovingAverage(
inputs=[USEquityPricing.close],
window_length=50
)
# 200-day SMA
sma_200 = SimpleMovingAverage(
inputs=[USEquityPricing.close],
window_length=200
)
Technical Indicators¶
RSI (Relative Strength Index)¶
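The built-in factor is constructed later in this document as `RSI(window_length=14)` (see Filters). As a sketch of the underlying arithmetic in plain NumPy — the simple-average variant; Wilder's exponential smoothing differs slightly:

```python
import numpy as np

def rsi(closes, period=14):
    """RSI over the trailing `period` price changes (simple-average variant)."""
    deltas = np.diff(np.asarray(closes, dtype=float)[-(period + 1):])
    avg_gain = np.clip(deltas, 0, None).mean()
    avg_loss = np.clip(-deltas, 0, None).mean()
    if avg_loss == 0:
        return 100.0  # no down moves in the window
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)
```

Steadily rising prices push the value toward 100, steadily falling prices toward 0; a pipeline RSI factor applies the same reduction per asset over its price window.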
Bollinger Bands¶
from rustybt.pipeline.factors import BollingerBands
# 20-day Bollinger Bands
bb = BollingerBands(window_length=20)
bb_upper = bb.upper
bb_lower = bb.lower
bb_pct = bb.percent # Position within bands (0-1)
VWAP (Volume-Weighted Average Price)¶
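As a sketch of the standard definition (sum of price × volume over the window, divided by total volume) in plain NumPy:

```python
import numpy as np

def vwap(prices, volumes):
    """Volume-weighted average price over a trailing window."""
    prices = np.asarray(prices, dtype=float)
    volumes = np.asarray(volumes, dtype=float)
    return (prices * volumes).sum() / volumes.sum()
```

A pipeline VWAP factor would apply this same reduction per asset over its (window_length × assets) close and volume inputs.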
Statistical Factors¶
Z-Score¶
# Note: Advanced statistical factors like Zscore are planned for future releases
# For now, use custom factors with zscore calculations
Linear Regression¶
# Note: LinearRegression factor is planned for future releases
# For now, use custom factors for regression analysis
Decimal-Aware Factors¶
RustyBT provides Decimal-precision factors for financial-grade calculations:
from rustybt.pipeline.factors.decimal_factors import (
DecimalLatestPrice,
DecimalSimpleMovingAverage,
)
# Latest price (Decimal)
latest_price = DecimalLatestPrice()
# SMA with Decimal precision
sma_decimal = DecimalSimpleMovingAverage(window_length=20)
Custom Factors¶
Create custom factors by subclassing CustomFactor:
from rustybt.pipeline import CustomFactor
import numpy as np
class MeanReversionScore(CustomFactor):
    """Calculate mean reversion signal."""
    inputs = [USEquityPricing.close]
    window_length = 20

    def compute(self, today, assets, out, close):
        # Z-score of the latest close against the trailing window
        mean = np.mean(close, axis=0)
        std = np.std(close, axis=0)
        current = close[-1]
        out[:] = -(current - mean) / std  # Negative for mean reversion
Usage: add the factor as a named pipeline column, e.g. `Pipeline(columns={'mean_reversion': MeanReversionScore()})`.
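To make the `compute` contract concrete, the same z-score arithmetic can be run on a hypothetical (window_length × n_assets) block of closes in plain NumPy; asset 0 trades above its mean (negative score), asset 1 below it (positive score):

```python
import numpy as np

# Hypothetical close-price block as Pipeline would pass it to compute():
# one row per bar (oldest first), one column per asset.
close = np.array([[100.0, 50.0],
                  [102.0, 48.0],
                  [101.0, 49.0],
                  [110.0, 40.0]])

out = np.empty(close.shape[1])
mean = np.mean(close, axis=0)
std = np.std(close, axis=0)
out[:] = -(close[-1] - mean) / std  # negative for mean reversion
```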
Filters¶
Filters produce boolean values for screening assets.
Comparison Filters¶
# Price filters
cheap = USEquityPricing.close.latest < 50
expensive = USEquityPricing.close.latest > 100
# Volume filters
liquid = USEquityPricing.volume.latest > 1_000_000
illiquid = USEquityPricing.volume.latest < 100_000
# Factor filters
rsi = RSI(window_length=14)
oversold = rsi < 30
overbought = rsi > 70
Logical Combinators¶
# AND
tradeable = liquid & (USEquityPricing.close.latest > 5)
# OR
extreme = oversold | overbought
# NOT
not_penny = ~(USEquityPricing.close.latest < 5)
Statistical Filters¶
Top/Bottom N¶
# Top 100 by dollar volume
dollar_volume = USEquityPricing.close.latest * USEquityPricing.volume.latest
top_100 = dollar_volume.top(100)
# Bottom 50 by market cap (market_cap: a Factor assumed to be defined elsewhere)
bottom_50_mcap = market_cap.bottom(50)
Percentile Filters¶
# Top 10% by returns
returns_1d = Returns(window_length=2)
top_decile = returns_1d.percentile_between(90, 100)
# Middle 50% by volatility
volatility = Returns(window_length=2).stddev(window_length=252)
middle_half = volatility.percentile_between(25, 75)
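A sketch of what `percentile_between(25, 75)` computes on one day's cross-section, assuming inclusive bounds and NumPy's default linear-interpolation percentiles:

```python
import numpy as np

values = np.array([1.0, 5.0, 3.0, 9.0, 7.0])  # one factor value per asset
low, high = np.percentile(values, [25, 75])
middle_half_mask = (values >= low) & (values <= high)
```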
Custom Filters¶
from rustybt.pipeline import CustomFilter
import numpy as np

class VolatilityFilter(CustomFilter):
    """Select stocks with annualized volatility in a specified range."""
    inputs = [Returns(window_length=2)]
    window_length = 252
    # Declared params are passed to compute() as keyword arguments;
    # pipeline terms should not override __init__.
    params = ('min_vol', 'max_vol')

    def compute(self, today, assets, out, returns, min_vol, max_vol):
        volatility = np.std(returns, axis=0) * np.sqrt(252)
        out[:] = (volatility >= min_vol) & (volatility <= max_vol)
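The annualization inside `compute` (daily standard deviation scaled by √252) can be checked numerically; a small sketch with simulated ~1%-a-day returns (the 10%-25% bounds are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(42)
daily_returns = rng.normal(0.0, 0.01, size=252)  # one year of daily returns

# Same annualization as VolatilityFilter.compute
ann_vol = np.std(daily_returns) * np.sqrt(252)

# The boolean the filter would emit for this asset
in_range = (ann_vol >= 0.10) & (ann_vol <= 0.25)
```

With 1% daily volatility the annualized figure lands near 16%, so the mask comes out True.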
Pipeline Construction¶
Basic Pipeline¶
from rustybt.pipeline import Pipeline
# Define computations
close = USEquityPricing.close.latest
volume = USEquityPricing.volume.latest
sma_20 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
sma_50 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=50)
# Create pipeline
pipeline = Pipeline(
columns={
'close': close,
'volume': volume,
'sma_20': sma_20,
'sma_50': sma_50,
}
)
Adding a Screen¶
# Define screen
universe = (
(USEquityPricing.close.latest > 5) &
(USEquityPricing.volume.latest > 1_000_000)
)
# Create pipeline with screen
pipeline = Pipeline(
columns={
'close': close,
'sma_20': sma_20,
},
screen=universe
)
Integration with TradingAlgorithm¶
from rustybt.algorithm import TradingAlgorithm
class MyStrategy(TradingAlgorithm):
    def initialize(self, context):
        # Define pipeline (sentiment_score and liquid_universe are a factor
        # and filter assumed to be defined above)
        pipe = Pipeline(
            columns={
                'sentiment': sentiment_score,
                'returns_20d': Returns(window_length=21),
            },
            screen=liquid_universe,
        )
        # Attach pipeline
        self.attach_pipeline(pipe, 'my_pipeline')

    def before_trading_start(self, context, data):
        # Get pipeline output
        output = self.pipeline_output('my_pipeline')
        # Go long the ten highest-sentiment names
        top_sentiment = output.nlargest(10, 'sentiment')
        context.longs = top_sentiment.index
Expressions and Operators¶
Factors and Filters support mathematical and logical operations:
Arithmetic Operations¶
close = USEquityPricing.close.latest
volume = USEquityPricing.volume.latest
# Addition
total_price = close + 10
# Subtraction
price_delta = close - sma_20
# Multiplication
dollar_volume = close * volume
# Division
price_ratio = close / sma_50
# Power
price_squared = close ** 2
Comparison Operations¶
# Greater than
above_sma = close > sma_20
# Less than
below_sma = close < sma_20
# Equal / Not equal
at_target = close == 100
not_at_target = close != 100
# Greater/Less than or equal
above_or_at = close >= sma_20
below_or_at = close <= sma_20
Window Methods¶
# Rolling mean
returns = Returns(window_length=2)
avg_returns = returns.mean(window_length=20)
# Rolling standard deviation
volatility = returns.stddev(window_length=252)
# Rolling max/min
high_20d = USEquityPricing.high.max(window_length=20)
low_20d = USEquityPricing.low.min(window_length=20)
# Rolling sum
volume_20d = USEquityPricing.volume.sum(window_length=20)
Rank and Percentile¶
# Rank (1 = lowest)
volume_rank = USEquityPricing.volume.latest.rank()
# Percentile rank (0-100)
volume_pctile = USEquityPricing.volume.latest.percentile_rank()
# Demean (subtract cross-sectional mean)
demeaned_returns = returns.demean()
# Z-score (normalize to mean=0, std=1)
normalized_returns = returns.zscore()
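`demean()` and `zscore()` act across assets on each date; on a single day's cross-section the arithmetic reduces to:

```python
import numpy as np

day_returns = np.array([0.02, -0.01, 0.03, 0.00])  # one value per asset
demeaned = day_returns - day_returns.mean()         # demean()
zscored = demeaned / day_returns.std()              # zscore()
```

After demeaning, the cross-sectional mean is zero; after z-scoring, the cross-sectional standard deviation is one.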
Common Patterns¶
Pattern 1: Mean Reversion Strategy¶
# Calculate z-score of returns
returns = Returns(window_length=2)
returns_zscore = returns.zscore(window_length=252)
# Select extreme values
oversold = returns_zscore < -2
overbought = returns_zscore > 2
# Create pipeline
pipeline = Pipeline(
columns={
'returns_zscore': returns_zscore,
'signal': -returns_zscore, # Negative for mean reversion
},
screen=liquid_universe & (oversold | overbought)
)
Pattern 2: Momentum Strategy¶
# Multi-period momentum
returns_1m = Returns(window_length=21)
returns_3m = Returns(window_length=63)
returns_6m = Returns(window_length=126)
# Combined momentum score
momentum = (
returns_1m.rank() +
returns_3m.rank() +
returns_6m.rank()
)
# Screen for winners
winners = momentum.top(50)
pipeline = Pipeline(
columns={
'momentum': momentum,
'returns_1m': returns_1m,
},
screen=winners
)
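The rank-sum score in Pattern 2 can be reproduced in plain NumPy (the return values are invented for illustration; ranks are 1-based ascending, matching `rank()` where 1 = lowest):

```python
import numpy as np

def cross_sectional_rank(values):
    """1-based ascending rank (1 = lowest), ignoring ties for simplicity."""
    order = np.argsort(values)
    ranks = np.empty(len(values))
    ranks[order] = np.arange(1, len(values) + 1)
    return ranks

# Hypothetical 1m/3m/6m returns for four assets
returns_1m = np.array([0.05, -0.02, 0.10, 0.01])
returns_3m = np.array([0.12, -0.05, 0.20, 0.03])
returns_6m = np.array([0.30, -0.10, 0.25, 0.08])

momentum = (cross_sectional_rank(returns_1m)
            + cross_sectional_rank(returns_3m)
            + cross_sectional_rank(returns_6m))
```

Asset 2 wins two of the three horizons and takes the top combined score; `momentum.top(50)` keeps the 50 highest such scores.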
Pattern 3: Technical Breakout¶
# Define technical indicators
sma_20 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
sma_50 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=50)
close = USEquityPricing.close.latest
# Breakout conditions
golden_cross = sma_20 > sma_50
above_sma = close > sma_20
avg_volume_20 = SimpleMovingAverage(inputs=[USEquityPricing.volume], window_length=20)
high_volume = USEquityPricing.volume.latest > avg_volume_20 * 1.5
# Combined signal
breakout = golden_cross & above_sma & high_volume
pipeline = Pipeline(
columns={
'close': close,
'sma_20': sma_20,
'sma_50': sma_50,
},
screen=breakout
)
Pattern 4: Statistical Arbitrage¶
# Calculate beta to market (spy_close: a benchmark close column defined elsewhere)
spy_returns = Returns(window_length=2, inputs=[spy_close])
stock_returns = Returns(window_length=2)
# Regression to calculate beta
# Note: RollingLinearRegression is planned for a future release;
# the call below sketches the intended API
beta = RollingLinearRegression(
dependent=stock_returns,
independent=spy_returns,
window_length=252
).beta
# Alpha (excess returns)
alpha = stock_returns - (beta * spy_returns)
# Select high alpha stocks
high_alpha = alpha.zscore(window_length=20) > 1.5
pipeline = Pipeline(
columns={
'alpha': alpha,
'beta': beta,
},
screen=high_alpha
)
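Since `RollingLinearRegression` is flagged above as planned, the beta/alpha arithmetic in Pattern 4 can be sketched offline with `np.polyfit` on simulated daily returns (true beta set to 1.5; names and numbers are illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
market_returns = rng.normal(0.0, 0.01, size=252)                         # benchmark
stock_returns = 1.5 * market_returns + rng.normal(0.0, 0.002, size=252)  # beta ~ 1.5

# OLS slope is beta; polyfit returns the highest-degree coefficient first
beta, intercept = np.polyfit(market_returns, stock_returns, deg=1)

# Residual (excess) return stream, as in the pattern above
alpha = stock_returns - beta * market_returns
```

The estimated slope recovers the true beta to within sampling noise, and the residual stream is centered near zero.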
Performance Optimization¶
1. Minimize Window Lengths¶
# GOOD: Use only what you need
sma_short = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
# AVOID: Excessive window length
sma_long = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=5000)  # Too long
2. Reuse Computations¶
# GOOD: Compute once, use multiple times
returns = Returns(window_length=2)
returns_mean = returns.mean(window_length=20)
returns_std = returns.stddev(window_length=20)
returns_zscore = (returns - returns_mean) / returns_std
# AVOID: Redundant computations
returns_mean = Returns(window_length=2).mean(window_length=20)
returns_std = Returns(window_length=2).stddev(window_length=20)
3. Screen Early¶
# GOOD: Screen reduces computation universe
expensive_universe = USEquityPricing.close.latest > 100
pipeline = Pipeline(
columns={
'complex_factor': expensive_computation,
},
screen=expensive_universe # Computes only for screened assets
)
# AVOID: No screen = computes for all assets
pipeline = Pipeline(
columns={
'complex_factor': expensive_computation,
}
)
4. Use Built-in Factors¶
# GOOD: Use optimized built-in
sma = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
# AVOID: Custom implementation (slower)
class SlowSMA(CustomFactor):
    inputs = [USEquityPricing.close]
    window_length = 20

    def compute(self, today, assets, out, close):
        out[:] = np.mean(close, axis=0)  # Slower than the optimized built-in
Data Loaders¶
Pipeline loaders are responsible for loading data into the pipeline system. They bridge the gap between raw data storage (bar readers) and pipeline computations by providing AdjustedArrays that automatically handle corporate actions (splits, dividends).
Loader Architecture¶
Pipeline Engine
↓
Loaders
↓
┌─────────────┬──────────────┬────────────┐
│ Equity │ DataFrame │ Custom │
│ Pricing │ Loader │ Loaders │
│ Loader │ │ │
└─────────────┴──────────────┴────────────┘
↓ ↓ ↓
Bar Readers DataFrames Custom Sources
Built-in Loaders¶
EquityPricingLoader¶
Loads OHLCV data with support for price/volume adjustments and currency conversion.
from rustybt.pipeline.loaders import EquityPricingLoader
# With FX support
loader = EquityPricingLoader(
raw_price_reader=bar_reader,
adjustments_reader=adjustments_reader,
fx_reader=fx_reader
)
# Without FX (simpler)
loader = EquityPricingLoader.without_fx(
raw_price_reader=bar_reader,
adjustments_reader=adjustments_reader
)
Features:
- Automatic price/volume adjustments (splits, dividends)
- Currency conversion support
- Handles corporate actions
- Works with any BarReader

Use Cases:
- Standard equity backtesting
- Multi-currency portfolios
- Historical price analysis
DataFrameLoader¶
Loads custom data from pandas DataFrames.
# Note: DataFrameLoader is planned for future releases; the adjustments
# example below illustrates the intended API. For now, use
# EquityPricingLoader or custom loaders.
With Adjustments:
# Define adjustments (e.g., 2:1 split)
adjustments = pd.DataFrame({
'sid': [1],
'value': [2.0],
'kind': [0], # Multiply adjustment
'start_date': [pd.NaT],
'end_date': [pd.Timestamp('2024-01-15')],
'apply_date': [pd.Timestamp('2024-01-15')]
})
loader = DataFrameLoader(
    column=MyDataset.custom_field,
    baseline=baseline,  # baseline DataFrame (dates x sids), defined elsewhere
    adjustments=adjustments
)
Features:
- In-memory data loading
- Support for adjustments
- Fast for small datasets
- Great for testing

Use Cases:
- Testing custom factors
- Alternative data integration
- Small datasets that fit in memory
- Rapid prototyping
Custom Loaders¶
Create custom loaders for special data sources:
from rustybt.pipeline.loaders.base import PipelineLoader
from rustybt.lib.adjusted_array import AdjustedArray
class APIDataLoader(PipelineLoader):
    """Load data from a REST API."""

    def __init__(self, api_url, api_key):
        self.api_url = api_url
        self.api_key = api_key

    def load_adjusted_array(self, domain, columns, dates, sids, mask):
        """Load data as AdjustedArrays.

        Parameters
        ----------
        domain : Domain
            Pipeline domain
        columns : list[BoundColumn]
            Columns to load
        dates : pd.DatetimeIndex
            Dates to load
        sids : pd.Int64Index
            Asset IDs to load
        mask : np.array[bool]
            Asset tradeable mask

        Returns
        -------
        dict[BoundColumn -> AdjustedArray]
        """
        # Fetch data from API
        raw_data = self._fetch_from_api(columns, dates, sids)
        # Convert to AdjustedArrays
        out = {}
        for column, data_array in raw_data.items():
            out[column] = AdjustedArray(
                data=data_array.astype(column.dtype),
                adjustments={},  # No adjustments
                missing_value=column.missing_value,
            )
        return out

    def _fetch_from_api(self, columns, dates, sids):
        # API fetching logic
        import requests
        response = requests.get(
            f"{self.api_url}/data",
            params={
                'columns': [c.name for c in columns],
                'start': dates[0].isoformat(),
                'end': dates[-1].isoformat(),
                'sids': list(sids),
            },
            headers={'Authorization': f'Bearer {self.api_key}'},
        )
        # Convert response to arrays
        # ... implementation ...
        return data_dict

    @property
    def currency_aware(self):
        """Whether this loader supports currency conversion."""
        return False  # This loader doesn't support FX
Usage:
# Register custom loader in pipeline
from rustybt.pipeline import Pipeline
from rustybt.pipeline.engine import SimplePipelineEngine
api_loader = APIDataLoader(
api_url="https://api.example.com",
api_key="your-api-key"
)
# Create engine with custom loader
engine = SimplePipelineEngine(
get_loader=lambda column: api_loader,
asset_finder=finder,
default_domain=domain
)
# Run pipeline
output = engine.run_pipeline(pipeline, start_date, end_date)
Loader Best Practices¶
- Use Built-in Loaders When Possible
- Implement Currency Awareness Correctly
- Handle Missing Data
- Cache Expensive Operations:

  class CachedLoader(PipelineLoader):
      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)
          self._cache = {}

      def load_adjusted_array(self, domain, columns, dates, sids, mask):
          cache_key = (tuple(dates), tuple(sids))
          if cache_key in self._cache:
              return self._cache[cache_key]
          result = self._load_impl(domain, columns, dates, sids, mask)
          self._cache[cache_key] = result
          return result

- Test Loaders Independently:

  import pytest  # Note: Testing utilities are being refactored

  def test_custom_loader():
      """Test custom loader loads data correctly."""
      loader = APIDataLoader(api_url="...", api_key="...")
      dates = pd.date_range('2024-01-01', periods=10)
      sids = pd.Int64Index([1, 2, 3])
      result = loader.load_adjusted_array(
          domain=test_domain,
          columns=[MyDataset.field],
          dates=dates,
          sids=sids,
          mask=np.ones((len(dates), len(sids)), dtype=bool),
      )
      assert MyDataset.field in result
      assert result[MyDataset.field].data.shape == (len(dates), len(sids))
Testing Strategies¶
Unit Testing Factors¶
import pytest
from rustybt.pipeline import Pipeline
# Note: Testing utilities are being refactored
def test_mean_reversion_factor():
    """Test custom mean reversion factor."""
    # Create test data (create_test_data is a helper assumed to exist)
    dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
    sids = [1, 2, 3]
    data = create_test_data(
        dates=dates,
        sids=sids,
        close_prices={1: 100, 2: 50, 3: 200},
    )
    # Create factor
    factor = MeanReversionScore()
    # Compute
    result = factor.compute(data)
    # Assert expected behavior
    assert len(result) == len(sids)
    assert result.notna().all()
Backtesting Pipelines¶
from rustybt.utils.run_algo import run_algorithm
# Test strategy with pipeline
result = run_algorithm(
start=pd.Timestamp('2023-01-01'),
end=pd.Timestamp('2023-12-31'),
initialize=initialize,
capital_base=100_000,
data_frequency='daily',
bundle='quandl'
)
# Analyze results
print(f"Total return: {result.portfolio_value.iloc[-1] / 100_000 - 1:.2%}")
print(f"Sharpe ratio: {result.sharpe.iloc[-1]:.2f}")
Best Practices¶
- Name your columns clearly - Use descriptive names
- Document factor logic - Add docstrings to custom factors
- Test in isolation - Unit test factors before integration
- Monitor performance - Track pipeline execution time
- Version factor definitions - Track changes to factor logic
- Validate assumptions - Check factor distributions and correlations
See Also¶
- Data Pipeline System - Data ingestion pipelines
- PolarsDataPortal - Modern Decimal-precision data access
- DataPortal (Legacy) - Legacy pandas-based data access
- Bar Readers - Bar reader interface
- Data Sources - Available data sources