Pipeline API Deep Dive¶
Runtime: ~5 minutes
Level: Advanced
This notebook provides a comprehensive deep dive into RustyBT's Pipeline API, covering:
- Factors - Numerical cross-sectional computations
- Filters - Boolean asset selection
- Classifiers - Categorical groupings
- Custom Terms - Building your own pipeline components
- Advanced Techniques - Combining multiple concepts
What is the Pipeline API?¶
The Pipeline API is a declarative system for:
- Computing cross-sectional factors across many assets
- Selecting universes dynamically
- Building factor-based strategies
- Implementing quantitative research workflows
Key Advantage: You describe what to compute, and the engine loads the required data and evaluates every term across all assets in vectorized form.
📋 Notebook Information
- RustyBT Version: 0.1.2+
- Last Validated: 2025-11-07
- API Compatibility: Verified ✅
- Documentation: API Reference
# Setup
from rustybt.analytics import setup_notebook
setup_notebook()
import pandas as pd
import numpy as np
from datetime import datetime
# Pipeline imports
from rustybt.pipeline import Pipeline
from rustybt.pipeline.data import USEquityPricing
from rustybt.pipeline.factors import (
    SimpleMovingAverage,
    ExponentialWeightedMovingAverage,
    Returns,
    RSI,
    MACDSignal,
    BollingerBands,
    TrueRange,  # AverageTrueRange is not available; ATR is built from TrueRange + SMA
    VWAP,
)
from rustybt.pipeline.filters import StaticAssets
from rustybt.pipeline import CustomFactor, CustomFilter, CustomClassifier
from rustybt import TradingAlgorithm
print("✓ Imports successful")
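# Example: Common technical indicators as factors
# Moving averages
sma_20 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
sma_50 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=50)
# EWMA is built from a span; window_length sets how many days are loaded (~3x the span)
ema_12 = ExponentialWeightedMovingAverage.from_span(
    inputs=[USEquityPricing.close], window_length=36, span=12
)
# Returns: window_length = lookback + 1 (N daily returns need N + 1 closes)
returns_1d = Returns(window_length=2)
returns_5d = Returns(window_length=6)
returns_20d = Returns(window_length=21)
# Volatility: 20-day standard deviation of daily returns
# (assumes a windowed .stddev(); otherwise compute it in a CustomFactor)
volatility = returns_1d.stddev(window_length=20)
# RSI (Relative Strength Index)
rsi_14 = RSI(window_length=14)
# MACD signal line (12/26-day EMAs, 9-day signal)
macd = MACDSignal(fast_period=12, slow_period=26, signal_period=9)
# Bollinger Bands is a multi-output factor; access outputs by name to avoid ordering mistakes
bb = BollingerBands(window_length=20, k=2)
bb_upper, bb_middle, bb_lower = bb.upper, bb.middle, bb.lower
# Average True Range: AverageTrueRange is not available, so smooth TrueRange with an SMA.
# Zipline-derived engines only accept window-safe factors as inputs; if this raises,
# compute ATR in a CustomFactor instead.
tr = TrueRange()
atr = SimpleMovingAverage(inputs=[tr], window_length=14)
# Volume-Weighted Average Price
vwap_20 = VWAP(window_length=20)
print("✓ Defined technical factors")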
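Because the SMA-of-TrueRange approach depends on TrueRange being accepted as a windowed input, here is a minimal NumPy sketch of the ATR arithmetic itself. `average_true_range` is a hypothetical helper, not part of RustyBT; it assumes each input arrives as a `(window_length, n_assets)` array, oldest row first, as in a CustomFactor's `compute()`. It uses a simple average (matching the SMA approach above), not Wilder's smoothing. Inside a CustomFactor with `inputs=[USEquityPricing.high, USEquityPricing.low, USEquityPricing.close]` and `window_length=15` (15 bars give 14 true ranges), `compute()` could assign `out[:] = average_true_range(high, low, close)`.

```python
import numpy as np


def average_true_range(high, low, close):
    """Average true range per asset over a trailing window.

    Each argument is a (window_length, n_assets) array, oldest row first.
    A window of N bars yields N - 1 true ranges.
    """
    prev_close = close[:-1]  # Previous bar's close, aligned with rows 1..N-1
    cur_high = high[1:]
    cur_low = low[1:]
    # True range = max(high - low, |high - prev close|, |low - prev close|)
    true_range = np.maximum.reduce([
        cur_high - cur_low,
        np.abs(cur_high - prev_close),
        np.abs(cur_low - prev_close),
    ])
    # Simple average over the window, skipping bars with missing data
    return np.nanmean(true_range, axis=0)
```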
Factor Operations¶
Factors support mathematical operations and window methods:
# Mathematical operations (raw columns need .latest before arithmetic)
price_to_sma = USEquityPricing.close.latest / sma_20
sma_spread = (sma_20 - sma_50) / sma_50  # Fractional spread (0.05 = 5%)
# Windowed column methods (if unavailable in your build, use a CustomFactor)
high_20d = USEquityPricing.high.max(window_length=20)
low_20d = USEquityPricing.low.min(window_length=20)
mean_volume = USEquityPricing.volume.mean(window_length=20)
# Composite factors: z-score each component so different units are comparable
momentum_score = (
    returns_20d.zscore() * 0.4 +
    rsi_14.zscore() * 0.3 +
    (price_to_sma - 1).zscore() * 0.3
)
print("✓ Defined composite factors")
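The window methods and `.zscore()` used above can be checked in plain NumPy. This sketch assumes the same `(window_length, n_assets)` layout for trailing windows and a 1-D array of one day's values for z-scoring. `trailing_stats`, `cross_sectional_zscore` and `composite_score` are hypothetical helpers that approximate, rather than reproduce, the library's implementation.

```python
import numpy as np


def trailing_stats(window):
    """Per-asset statistics over a (window_length, n_assets) array.

    This is what a CustomFactor.compute() could write into `out`
    if windowed .max()/.min()/.mean()/.stddev() are unavailable.
    """
    return {
        "max": np.nanmax(window, axis=0),
        "min": np.nanmin(window, axis=0),
        "mean": np.nanmean(window, axis=0),
        "std": np.nanstd(window, axis=0),  # Population std (ddof=0)
    }


def cross_sectional_zscore(values):
    """Standardize one day's factor values across assets, ignoring NaNs."""
    return (values - np.nanmean(values)) / np.nanstd(values)


def composite_score(components, weights):
    """Weighted sum of z-scored components for a single day."""
    return sum(w * cross_sectional_zscore(c) for c, w in zip(components, weights))
```

Because each component is standardized first, returns measured as fractions and RSI measured on a 0 to 100 scale carry equal influence for equal weights.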
# Comparison filters (raw columns need .latest before comparing)
above_sma = USEquityPricing.close.latest > sma_20
golden_cross = sma_20 > sma_50
overbought = rsi_14 > 70
oversold = rsi_14 < 30
# Range filters
high_volume = USEquityPricing.volume.latest > mean_volume * 1.5
normal_volatility = volatility.percentile_between(25, 75)
# Top/bottom filters
top_momentum = momentum_score.top(50)  # 50 highest momentum scores
bottom_rsi = rsi_14.bottom(20)  # 20 lowest RSI values (most oversold)
# Combining filters: & is AND, | is OR, ~ is NOT
long_candidates = (
    golden_cross &
    above_sma &
    (rsi_14 < 70) &
    high_volume
)
short_candidates = (
    ~golden_cross &
    (USEquityPricing.close.latest < sma_20) &
    overbought
)
print("✓ Defined filters")
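On each day a filter is just a boolean value per asset, so its semantics can be approximated with NumPy masks. `top_n_mask` and `percentile_between_mask` are hypothetical stand-ins for `.top()` and `.percentile_between()`: they ignore NaNs, honor an optional `mask` like the `mask=` argument used later in this notebook, and break ties arbitrarily, which may differ from the library.

```python
import numpy as np


def top_n_mask(values, n, mask=None):
    """Boolean array marking the n largest values among eligible assets."""
    eligible = np.isfinite(values)
    if mask is not None:
        eligible &= mask
    result = np.zeros(values.shape, dtype=bool)
    candidates = np.flatnonzero(eligible)
    # Sort eligible assets by value, descending, and keep the first n
    order = candidates[np.argsort(values[candidates])[::-1]]
    result[order[:n]] = True
    return result


def percentile_between_mask(values, low, high, mask=None):
    """Boolean array marking values between two percentiles (inclusive)."""
    eligible = np.isfinite(values)
    if mask is not None:
        eligible &= mask
    # Percentiles are computed only over eligible assets
    lo, hi = np.percentile(values[eligible], [low, high])
    return eligible & (values >= lo) & (values <= hi)
```

The results combine with `&`, `|` and `~` exactly as pipeline filters do.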
# Example: Classify assets by trend strength
class TrendStrength(CustomClassifier):
    """
    Classify assets by how far the latest close sits from its 20-day SMA:
    - Strong Uptrend (2): more than 5% above
    - Weak Uptrend (1): 2% to 5% above
    - Neutral (0): within 2%
    - Weak Downtrend (-1): 2% to 5% below
    - Strong Downtrend (-2): more than 5% below
    """
    inputs = [USEquityPricing.close]
    window_length = 20
    dtype = np.int64  # Integer classifiers use int64 (int8 is not supported)
    missing_value = -9  # Sentinel that cannot be confused with a real label
    def compute(self, today, assets, out, close):
        sma = np.mean(close, axis=0)
        current = close[-1]
        # Fractional distance of the latest close from its SMA
        spread = (current - sma) / sma
        # Classify
        out[:] = 0  # Default to neutral
        out[spread > 0.05] = 2  # Strong uptrend
        out[(spread > 0.02) & (spread <= 0.05)] = 1  # Weak uptrend
        out[spread < -0.05] = -2  # Strong downtrend
        out[(spread < -0.02) & (spread >= -0.05)] = -1  # Weak downtrend
trend_classifier = TrendStrength()
# Classifiers support equality and membership tests, not ordering comparisons
strong_uptrends = trend_classifier.eq(2)
any_uptrend = trend_classifier.element_of([1, 2])
print("✓ Defined classifiers")
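`TrendStrength.compute()` assigns labels through a sequence of boolean masks; `np.select` expresses the same threshold mapping in one call. `classify_trend` is a hypothetical helper operating on an array of fractional spreads (for example 0.03 for 3% above the SMA), and `np.isin` plays the role that `element_of()` plays for a classifier.

```python
import numpy as np


def classify_trend(spread):
    """Map each asset's fractional spread to a trend label from -2 to 2."""
    conditions = [
        spread > 0.05,                          # Strong uptrend
        (spread > 0.02) & (spread <= 0.05),     # Weak uptrend
        spread < -0.05,                         # Strong downtrend
        (spread < -0.02) & (spread >= -0.05),   # Weak downtrend
    ]
    labels = [2, 1, -2, -1]
    # Anything matching no condition (including NaN) becomes neutral (0)
    return np.select(conditions, labels, default=0).astype(np.int64)
```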
class MeanReversionScore(CustomFactor):
    """
    Mean reversion score combining:
    - Distance of the latest close from its 30-day mean, in standard deviations
      (so volatility is accounted for)
    - Volume surge: latest volume vs. the average of the preceding days
    Higher score = stronger mean reversion candidate
    """
    inputs = [
        USEquityPricing.close,
        USEquityPricing.volume,
    ]
    window_length = 30
    def compute(self, today, assets, out, close, volume):
        # Price statistics over the window
        mean_price = np.mean(close, axis=0)
        std_price = np.std(close, axis=0)
        current_price = close[-1]
        # Absolute z-score: distance from the mean in standard deviations
        z_score = np.abs((current_price - mean_price) / std_price)
        # Volume surge: latest volume vs. average of the preceding days
        mean_volume = np.mean(volume[:-1], axis=0)
        current_volume = volume[-1]
        volume_surge = current_volume / mean_volume
        # Larger deviations on heavier volume score higher
        out[:] = z_score * np.log1p(volume_surge)
class BollingerPercentB(CustomFactor):
    """
    Bollinger %B: position of the latest close within the Bollinger Bands
    - 0.0 = at (or below) the lower band
    - 0.5 = at the middle band (SMA)
    - 1.0 = at (or above) the upper band
    """
    inputs = [USEquityPricing.close]
    window_length = 20  # window_length is a term attribute, not a compute() parameter
    params = {'k': 2}  # Band width in standard deviations
    def compute(self, today, assets, out, close, k):
        sma = np.mean(close, axis=0)
        std = np.std(close, axis=0)
        current = close[-1]
        upper = sma + k * std
        lower = sma - k * std
        # %B, clipped to [0, 1] so prices outside the bands pin to the edges
        out[:] = np.clip((current - lower) / (upper - lower), 0, 1)
class RelativeStrength(CustomFactor):
    """
    Return over the window relative to the cross-sectional median return.
    A true benchmark comparison (e.g. vs. SPY) would need benchmark data as an input.
    """
    inputs = [USEquityPricing.close]
    window_length = 20
    def compute(self, today, assets, out, close):
        # Total return over the window for each asset
        asset_returns = (close[-1] - close[0]) / close[0]
        # Cross-sectional median, ignoring assets with missing prices
        benchmark_return = np.nanmedian(asset_returns)
        # Relative strength
        out[:] = asset_returns - benchmark_return
# Instantiate custom factors
mean_reversion_score = MeanReversionScore()
pct_b = BollingerPercentB()
relative_strength = RelativeStrength(window_length=60)  # Override the default 20-day window
print("✓ Defined custom factors")
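A quick numeric check of the %B formula used by `BollingerPercentB`, written as a hypothetical standalone function over a `(window_length, n_assets)` close array. A last close equal to the SMA maps to 0.5, a close beyond the upper band is clipped to 1.0, and a zero-variance window yields NaN because the band width is zero.

```python
import numpy as np


def bollinger_percent_b(close, k=2.0):
    """%B of the latest close for each asset, clipped to [0, 1]."""
    sma = np.mean(close, axis=0)
    std = np.std(close, axis=0)
    upper = sma + k * std
    lower = sma - k * std
    # Zero band width gives 0/0 = NaN; silence the warning and keep the NaN
    with np.errstate(divide="ignore", invalid="ignore"):
        pct_b = (close[-1] - lower) / (upper - lower)
    return np.clip(pct_b, 0.0, 1.0)  # NaN passes through np.clip unchanged
```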
5. Building a Complete Pipeline¶
Let's combine everything into a single pipeline that computes the factors, defines a tradable universe, and flags long and short candidates.
def make_pipeline():
    """
    Create a comprehensive pipeline combining multiple factors.
    """
    # Base factors
    sma_20 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
    sma_50 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=50)
    returns_20d = Returns(window_length=21)
    rsi_14 = RSI(window_length=14)
    volatility = Returns(window_length=2).stddev(window_length=20)
    # Custom factors
    mean_rev = MeanReversionScore()
    rel_strength = RelativeStrength(window_length=60)
    pct_b = BollingerPercentB()
    # Composite scores
    momentum_score = (
        returns_20d.zscore() * 0.5 +
        rel_strength.zscore() * 0.5
    )
    # Filters
    liquid = USEquityPricing.volume.latest > 1_000_000
    not_too_volatile = volatility.percentile_between(5, 95)
    # Universe
    base_universe = liquid & not_too_volatile
    # Long/short selection
    longs = momentum_score.top(20, mask=base_universe)
    shorts = momentum_score.bottom(20, mask=base_universe)
    # Mean reversion opportunities
    mean_rev_longs = (
        (pct_b < 0.2) &  # Near lower band
        (rsi_14 < 30) &  # Oversold
        base_universe
    )
    mean_rev_shorts = (
        (pct_b > 0.8) &  # Near upper band
        (rsi_14 > 70) &  # Overbought
        base_universe
    )
    return Pipeline(
        columns={
            # Factors for analysis
            'close': USEquityPricing.close.latest,
            'sma_20': sma_20,
            'sma_50': sma_50,
            'rsi': rsi_14,
            'pct_b': pct_b,
            'volatility': volatility,
            'momentum_score': momentum_score,
            'mean_reversion_score': mean_rev,
            'relative_strength': rel_strength,
            # Selection filters
            'momentum_long': longs,
            'momentum_short': shorts,
            'mean_rev_long': mean_rev_longs,
            'mean_rev_short': mean_rev_shorts,
        },
        screen=base_universe
    )
pipeline = make_pipeline()
print("✓ Pipeline created with 13 columns")
6. Using Pipeline in a Strategy¶
Integrate the pipeline into a trading algorithm.
class PipelineStrategy(TradingAlgorithm):
    """
    Multi-strategy algorithm using pipeline outputs:
    - Momentum long/short (60% allocation)
    - Mean reversion long/short (40% allocation)
    """
    def initialize(self, context):
        # Attach pipeline
        self.attach_pipeline(make_pipeline(), 'my_pipeline')
        # Rebalance at the market open on the first trading day of each week
        # (rules passed positionally: date rule, then time rule)
        self.schedule_function(
            self.rebalance,
            self.date_rules.week_start(),
            self.time_rules.market_open()
        )
        # Strategy allocation
        context.momentum_weight = 0.6
        context.mean_rev_weight = 0.4
    def before_trading_start(self, context, data):
        # Get today's pipeline output (one row per asset passing the screen)
        context.output = self.pipeline_output('my_pipeline')
    def rebalance(self, context, data):
        output = context.output
        # Assets flagged by each boolean selection column
        momentum_longs = output[output['momentum_long']].index
        momentum_shorts = output[output['momentum_short']].index
        mean_rev_longs = output[output['mean_rev_long']].index
        mean_rev_shorts = output[output['mean_rev_short']].index
        # Target weights: each sleeve splits its allocation evenly between its
        # long and short sides; max(..., 1) avoids dividing by zero
        targets = {}
        # Momentum positions
        momentum_long_weight = context.momentum_weight / (2 * max(len(momentum_longs), 1))
        momentum_short_weight = -context.momentum_weight / (2 * max(len(momentum_shorts), 1))
        for asset in momentum_longs:
            targets[asset] = targets.get(asset, 0) + momentum_long_weight
        for asset in momentum_shorts:
            targets[asset] = targets.get(asset, 0) + momentum_short_weight
        # Mean reversion positions
        mean_rev_long_weight = context.mean_rev_weight / (2 * max(len(mean_rev_longs), 1))
        mean_rev_short_weight = -context.mean_rev_weight / (2 * max(len(mean_rev_shorts), 1))
        for asset in mean_rev_longs:
            targets[asset] = targets.get(asset, 0) + mean_rev_long_weight
        for asset in mean_rev_shorts:
            targets[asset] = targets.get(asset, 0) + mean_rev_short_weight
        # Execute trades
        for asset, target_weight in targets.items():
            if data.can_trade(asset):
                self.order_target_percent(asset, target_weight)
        # Close positions that are no longer selected
        for asset in context.portfolio.positions:
            if asset not in targets and data.can_trade(asset):
                self.order_target(asset, 0)
        # Record metrics
        self.record(
            num_positions=len(context.portfolio.positions),
            momentum_longs=len(momentum_longs),
            momentum_shorts=len(momentum_shorts),
            mean_rev_longs=len(mean_rev_longs),
            mean_rev_shorts=len(mean_rev_shorts),
        )
print("✓ Strategy class defined")
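The weighting in `rebalance()` gives each sleeve's allocation half to its long side and half to its short side, divides that evenly across the selected assets, and sums contributions when an asset appears in more than one list. With 20 names per momentum side, each receives 0.6 / 2 / 20 = 0.015 (1.5%). `compute_targets` is a hypothetical, broker-free copy of that arithmetic that can be checked on plain lists of tickers.

```python
def compute_targets(momentum_longs, momentum_shorts, mean_rev_longs,
                    mean_rev_shorts, momentum_weight=0.6, mean_rev_weight=0.4):
    """Net target weight per asset, mirroring the arithmetic in rebalance()."""
    targets = {}
    # Each sleeve gives half its allocation to longs and half to shorts
    sides = [
        (momentum_longs, momentum_weight / 2),
        (momentum_shorts, -momentum_weight / 2),
        (mean_rev_longs, mean_rev_weight / 2),
        (mean_rev_shorts, -mean_rev_weight / 2),
    ]
    for assets, side_weight in sides:
        # Equal weight within a side; max(..., 1) guards against empty lists
        per_asset = side_weight / max(len(assets), 1)
        for asset in assets:
            # Sum contributions when several sides select the same asset
            targets[asset] = targets.get(asset, 0.0) + per_asset
    return targets


targets = compute_targets(["A", "B"], ["C", "D"], ["E"], ["A"])
```

Here asset A is a momentum long (+15%) and a mean reversion short (-20%), so it nets to -5%. Overlapping selections therefore reduce gross exposure below 100%.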
# Define multiple universes
liquid_universe = USEquityPricing.volume.latest > 5_000_000
# Daily dollar volume (close * volume) as a rough size proxy; this is not market capitalization
dollar_volume = USEquityPricing.close.latest * USEquityPricing.volume.latest
large_cap = dollar_volume > 1_000_000_000
mid_cap = (dollar_volume > 100_000_000) & (dollar_volume < 1_000_000_000)
# Separate strategies per universe
large_cap_momentum = momentum_score.top(10, mask=large_cap)
# The 20 mid-size names trading furthest below their 20-day SMA
mid_cap_value = (USEquityPricing.close.latest / sma_20).bottom(20, mask=mid_cap)
print("✓ Multi-universe pipeline defined")
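To see which bucket an asset lands in, here is a pandas sketch over a hypothetical one-day snapshot (tickers and values are made up). It computes the same close * volume product and the same two filters; the `pd.cut` labels use half-open bins (`right=False`), whereas the filters above use strict inequalities, so values exactly at 100M or 1B are treated differently.

```python
import pandas as pd

# Hypothetical one-day snapshot: latest close and volume per ticker
snapshot = pd.DataFrame(
    {"close": [150.0, 40.0, 12.0, 3.0],
     "volume": [10_000_000, 5_000_000, 2_000_000, 1_000_000]},
    index=["AAA", "BBB", "CCC", "DDD"],
)
dollar_volume = snapshot["close"] * snapshot["volume"]

# Same thresholds as the large_cap / mid_cap filters
large_cap = dollar_volume > 1_000_000_000
mid_cap = (dollar_volume > 100_000_000) & (dollar_volume < 1_000_000_000)

# Bucket labels for inspection: [0, 100M) small, [100M, 1B) mid, [1B, inf) large
bucket = pd.cut(dollar_volume,
                bins=[0, 100_000_000, 1_000_000_000, float("inf")],
                labels=["small", "mid", "large"], right=False)
```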
B. Factor Ranking and Deciles¶
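# Base universe (same rules as in make_pipeline) so ranks are computed among tradable assets
base_universe = (USEquityPricing.volume.latest > 1_000_000) & volatility.percentile_between(5, 95)
# Rank factors into quantile buckets; labels are 0-based (deciles: 0-9, quintiles: 0-4)
momentum_decile = momentum_score.deciles(mask=base_universe)
volatility_quintile = volatility.quantiles(5, mask=base_universe)
# Classifiers don't support >= / <=, so select buckets by membership
high_momentum_low_vol = (
    momentum_decile.element_of([7, 8, 9]) &  # Top 30%
    volatility_quintile.element_of([0, 1]) &  # Bottom 40%
    base_universe
)
print("✓ Factor ranking defined")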
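Quantile labels are easy to get off by one. Assuming RustyBT follows Zipline's 0-based bucket labels, `pd.qcut(..., labels=False)` reproduces the same numbering on a hypothetical set of 100 evenly spaced scores, confirming that the top 30% of deciles is labels 7 to 9 and the bottom 40% of quintiles is labels 0 and 1.

```python
import numpy as np
import pandas as pd

scores = pd.Series(np.arange(100, dtype=float))  # 100 hypothetical assets
deciles = pd.qcut(scores, 10, labels=False)      # Integer labels 0..9
quintiles = pd.qcut(scores, 5, labels=False)     # Integer labels 0..4

top_30_pct = deciles.isin([7, 8, 9])
bottom_40_pct = quintiles.isin([0, 1])
```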
C. Window-Safe Computations¶
import warnings
class SafeMeanReversion(CustomFactor):
    """
    Mean reversion z-score with explicit NaN and zero-volatility handling.
    """
    inputs = [USEquityPricing.close]
    window_length = 30
    def compute(self, today, assets, out, close):
        with warnings.catch_warnings():
            # nanmean/nanstd warn on all-NaN columns; those results are zeroed below
            warnings.simplefilter('ignore', category=RuntimeWarning)
            # Skip missing prices instead of treating them as 0.0
            mean_price = np.nanmean(close, axis=0)
            std_price = np.nanstd(close, axis=0)
            # Suppress divide-by-zero warnings for zero-volatility assets
            with np.errstate(divide='ignore', invalid='ignore'):
                z_score = np.abs((close[-1] - mean_price) / std_price)
        # Replace undefined results (NaN, +/-inf) with a neutral 0.0
        out[:] = np.nan_to_num(z_score, nan=0.0, posinf=0.0, neginf=0.0)
print("✓ Window-safe factor defined")
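NaN-aware statistics can be exercised outside the engine. `safe_abs_zscore` is a hypothetical standalone function operating on a `(window_length, n_assets)` close array. In the test data the first column has a missing bar, the second has zero volatility, and the third is entirely missing; only the first yields a nonzero score. Filling the gap with 0.0 before computing would instead give about 1.34 for the first column, because the zero drags the mean down and inflates the deviation.

```python
import warnings

import numpy as np


def safe_abs_zscore(close):
    """|z| of the latest close vs. its trailing window, one value per asset."""
    with warnings.catch_warnings():
        # nanmean/nanstd warn on all-NaN columns; those results are zeroed below
        warnings.simplefilter("ignore", category=RuntimeWarning)
        mean_price = np.nanmean(close, axis=0)  # Missing bars are skipped
        std_price = np.nanstd(close, axis=0)
        with np.errstate(divide="ignore", invalid="ignore"):
            z_score = np.abs((close[-1] - mean_price) / std_price)
    # NaN (missing data or 0/0) and +/-inf become a neutral 0.0
    return np.nan_to_num(z_score, nan=0.0, posinf=0.0, neginf=0.0)
```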
Summary¶
Key Takeaways¶
- Factors are the core building blocks for quantitative strategies
- Filters provide dynamic universe selection
- Classifiers enable categorical analysis and grouping
- Custom terms let you implement any per-asset NumPy computation over a trailing window
- Pipelines efficiently compute cross-sectional factors at scale
Best Practices¶
✅ Use built-in factors when possible (they're optimized)
✅ Combine multiple factors for robust signals
✅ Always define a proper universe screen
✅ Handle NaNs and edge cases in custom factors
✅ Use .zscore() to normalize factors before combining
✅ Test pipeline outputs thoroughly before live trading
Next Steps¶
- Notebook 12: Advanced Order Management
- Notebook 13: Portfolio Optimization + Walk Forward
- Notebook 14: Multi-Timeframe Strategies