
Advanced Pipeline Techniques

Last Updated: 2025-10-31

This guide covers advanced Pipeline techniques for building sophisticated quantitative trading strategies.

Table of Contents

  1. Combining Factors
  2. Combining Filters
  3. Creating Custom Factors
  4. Creating Custom Filters
  5. Advanced Patterns
  6. Best Practices

Combining Factors

One of Pipeline's most powerful features is the ability to combine factors to create sophisticated signals.

Arithmetic Operations

Factors support standard arithmetic operations:

from rustybt.pipeline.factors import Returns, AnnualizedVolatility

# Simple arithmetic
returns_1m = Returns(window_length=21)
returns_3m = Returns(window_length=63)

# Addition
combined_momentum = returns_1m + returns_3m

# Weighted average
momentum_score = (0.3 * returns_1m) + (0.7 * returns_3m)

# Division (e.g., risk-adjusted returns)
volatility = AnnualizedVolatility(window_length=252)
risk_adjusted = returns_1m / volatility

# Multiple operations
composite = (returns_1m + returns_3m) / 2
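On any given day these expressions are evaluated element-wise, so each asset's output depends only on that asset's inputs. The sketch below mimics one day's computation with plain NumPy arrays. The values are made up for three hypothetical assets, and the zero volatility in the last position shows how a ratio like risk_adjusted can produce inf.

```python
import numpy as np

# One day's factor values for three hypothetical assets (illustrative numbers)
returns_1m = np.array([0.02, -0.01, 0.05])
returns_3m = np.array([0.10, 0.04, -0.02])
volatility = np.array([0.20, 0.40, 0.0])  # last asset: zero volatility

# The same expressions as above, applied asset by asset
momentum_score = 0.3 * returns_1m + 0.7 * returns_3m
composite = (returns_1m + returns_3m) / 2

# Dividing a nonzero value by zero yields inf; errstate silences the warning
with np.errstate(divide="ignore"):
    risk_adjusted = returns_1m / volatility
```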

Ranking and Normalization

Raw factors live on different scales (a one-month return versus an annualized volatility), so multi-factor models usually rank or z-score each factor before combining it:

# Rank factors before combining
returns_1m_rank = returns_1m.rank()
returns_3m_rank = returns_3m.rank()
vol_rank = volatility.rank()

# Equal-weight composite (lower volatility rank is better)
quality_momentum = (returns_1m_rank + returns_3m_rank - vol_rank) / 3

# Z-score normalization
returns_1m_z = returns_1m.zscore()
returns_3m_z = returns_3m.zscore()
normalized_composite = (returns_1m_z + returns_3m_z) / 2
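The sketch below shows what the two normalizations do to one day's values for four hypothetical assets (made-up numbers). It assumes rank() gives 1 to the smallest value with ties broken by position, and that zscore() divides by the population standard deviation, as in Zipline's Pipeline; the library's own methods may handle ties and NaNs differently.

```python
import numpy as np

def rank_ascending(values):
    """Ordinal ranks starting at 1 for the smallest value (ties broken by position)."""
    ranks = np.empty(len(values))
    ranks[np.argsort(values, kind="stable")] = np.arange(1, len(values) + 1)
    return ranks

def zscore(values):
    """Cross-sectional z-score using the population standard deviation."""
    return (values - values.mean()) / values.std()

returns_1m = np.array([0.02, -0.01, 0.05, 0.00])
volatility = np.array([0.15, 0.60, 0.25, 0.30])

# Both transforms put the factors on comparable scales before combining
rank_mix = rank_ascending(returns_1m) - rank_ascending(volatility)
z_mix = zscore(returns_1m) - zscore(volatility)
```

Higher returns raise the composite and higher volatility lowers it, matching the "lower volatility rank is better" convention above.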

Working with Multi-Output Factors

Some factors return multiple outputs (e.g., Bollinger Bands, Ichimoku):

from rustybt.pipeline.data import USEquityPricing
from rustybt.pipeline.factors import BollingerBands, IchimokuKinkoHyo

# Access individual outputs
bb = BollingerBands(window_length=20, k=2.0)
upper = bb.upper
middle = bb.middle
lower = bb.lower

# Combine outputs
bb_width = (upper - lower) / middle  # Band width relative to the middle band (0.10 = 10%)
bb_position = (USEquityPricing.close.latest - lower) / (upper - lower)  # 0 = lower band, 1 = upper band

# Ichimoku components
ichimoku = IchimokuKinkoHyo(window_length=52)
cloud_thickness = ichimoku.senkou_span_a - ichimoku.senkou_span_b

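# Use in filters (the cloud lies between both Senkou spans, which can cross)
price = USEquityPricing.close.latest
above_cloud = (price > ichimoku.senkou_span_a) & (price > ichimoku.senkou_span_b)
below_cloud = (price < ichimoku.senkou_span_a) & (price < ichimoku.senkou_span_b)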
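The arithmetic behind these combinations can be checked on a single made-up price window. This sketch assumes the standard definitions (middle band = simple moving average, bands = middle plus or minus k population standard deviations) and treats "above the cloud" as above both Senkou spans; the library's BollingerBands may differ in details such as NaN handling.

```python
import numpy as np

def bollinger(close, k=2.0):
    """Return (lower, middle, upper) for one window of closes."""
    middle = close.mean()
    band = k * close.std()  # population std (ddof=0)
    return middle - band, middle, middle + band

close = np.array([10.0, 11.0, 12.0, 11.0, 10.0, 11.0, 12.0, 11.0])
lower, middle, upper = bollinger(close, k=2.0)

bb_width = (upper - lower) / middle                  # width relative to the middle band
bb_position = (close[-1] - lower) / (upper - lower)  # 0 = lower band, 1 = upper band

def cloud_position(price, span_a, span_b):
    """+1 above the cloud, -1 below it, 0 inside (spans may be in either order)."""
    top, bottom = max(span_a, span_b), min(span_a, span_b)
    if price > top:
        return 1
    if price < bottom:
        return -1
    return 0
```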

Combining Filters

Filters can be combined using logical operations to create complex selection criteria.

Logical Operations

from rustybt.pipeline.factors import Returns, AverageDollarVolume, AnnualizedVolatility
from rustybt.pipeline.data import USEquityPricing

# Individual filters
liquid = AverageDollarVolume(window_length=20).top(500)
price_filter = USEquityPricing.close.latest > 5
momentum_filter = Returns(window_length=21) > 0.05
volatility = AnnualizedVolatility(window_length=252)  # factor used in advanced_filter below

# AND (&) - Must meet ALL conditions
conservative_universe = liquid & price_filter & momentum_filter

# OR (|) - Must meet ANY condition
returns_1m = Returns(window_length=21)
returns_3m = Returns(window_length=63)
high_momentum = (returns_1m > 0.10) | (returns_3m > 0.20)

# NOT (~) - Exclude matching assets
not_penny_stocks = ~(USEquityPricing.close.latest < 5)

# Complex combinations with parentheses
advanced_filter = (
    liquid &
    not_penny_stocks &
    (high_momentum | (volatility < 0.20))
)
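One subtlety when negating filters: assuming Pipeline comparisons follow NumPy semantics, where any comparison involving NaN is False, ~(price < 5) is True for an asset with a missing price, while price >= 5 is False. The NumPy sketch below, with made-up prices, shows the difference; the positive comparison is the safer choice when missing data should be excluded.

```python
import numpy as np

# Latest close for four hypothetical assets; the last one has no price
close = np.array([3.0, 8.0, 12.0, np.nan])

not_penny_negated = ~(close < 5)   # NaN < 5 is False, so ~False is True
not_penny_positive = close >= 5    # NaN >= 5 is False

liquid = np.array([True, True, False, True])
universe = liquid & not_penny_negated  # the NaN-priced asset slips through
```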

Top/Bottom with Masking

Pass a filter as mask to top() or bottom() so that ranking considers only the assets where that filter is True:

# Top N after applying filter
liquid_filter = AverageDollarVolume(window_length=20).top(1000)
returns = Returns(window_length=21)

# Top 50 momentum stocks from liquid universe only
top_momentum = returns.top(50, mask=liquid_filter)

# 50 lowest-volatility stocks among the top 200 by momentum
high_momentum_filter = returns.top(200)
low_vol = volatility.bottom(50, mask=high_momentum_filter)
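The difference between ranking inside a mask and intersecting afterwards is easy to see with a toy implementation. The top helper below is a hypothetical NumPy stand-in for the Pipeline method, assuming it selects the N largest non-NaN values among assets where the mask is True.

```python
import numpy as np

def top(values, n, mask=None):
    """Boolean array marking the n largest values, ranked only where mask is True."""
    eligible = ~np.isnan(values)
    if mask is not None:
        eligible &= mask
    candidates = np.flatnonzero(eligible)
    # Sort eligible positions by value, largest first, and keep n of them
    chosen = candidates[np.argsort(values[candidates])[::-1][:n]]
    result = np.zeros(len(values), dtype=bool)
    result[chosen] = True
    return result

returns = np.array([0.09, 0.07, 0.05, 0.03, 0.01])
liquid = np.array([False, False, True, True, True])

ranked_within = top(returns, 2, mask=liquid)   # best 2 among liquid assets
intersected = top(returns, 2) & liquid         # best 2 overall, then liquid only
```

Here the intersection keeps nothing, because the two strongest performers are both illiquid, while the masked ranking still returns two liquid assets.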

Creating Custom Factors

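When the built-in factors don't cover your signal, subclass CustomFactor: declare its inputs and window_length, and implement compute(), which receives one (window_length x num_assets) array per input and writes one value per asset into out.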

Basic Custom Factor

from rustybt.pipeline import CustomFactor
from rustybt.pipeline.data import USEquityPricing
import numpy as np

class MeanReversion(CustomFactor):
    """Z-score of price vs N-day mean."""

    inputs = [USEquityPricing.close]
    window_length = 20

    def compute(self, today, assets, out, close):
        """Compute z-score.

        Parameters
        ----------
        today : pd.Timestamp
            Current date
        assets : np.ndarray
            Array of SIDs (asset identifiers)
        out : np.ndarray
            Output array to fill
        close : np.ndarray
            Close price array (window_length x num_assets)
        """
        # Calculate mean and std for each asset (axis=0 is time)
        mean = close.mean(axis=0)
        std = close.std(axis=0)

        # Calculate z-score
        current_price = close[-1]
        z_score = (current_price - mean) / (std + 1e-9)  # Avoid division by zero

        out[:] = -z_score  # Negative: oversold is positive signal

# Use in pipeline
mean_reversion = MeanReversion()
oversold = mean_reversion > 1.5  # Price more than 1.5 std below its 20-day mean
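Because compute() only sees NumPy arrays, its logic can be exercised outside a pipeline. The sketch below copies the z-score arithmetic into a standalone function and feeds it a made-up 5-day window for two hypothetical assets, with rows as days and columns as assets, matching the layout described in the docstring.

```python
import numpy as np

def mean_reversion_signal(close):
    """Negative z-score of the latest close vs. the window mean, per column."""
    mean = close.mean(axis=0)
    std = close.std(axis=0)
    z_score = (close[-1] - mean) / (std + 1e-9)
    return -z_score

# 5 days x 2 assets: asset 0 drops sharply on the last day, asset 1 is flat
close = np.array([
    [10.0, 20.0],
    [10.0, 20.0],
    [10.0, 20.0],
    [10.0, 20.0],
    [8.0, 20.0],
])

out = np.empty(close.shape[1])
out[:] = mean_reversion_signal(close)
```

Asset 0 ends two standard deviations below its window mean, so it would pass the mean_reversion > 1.5 filter; the flat asset scores 0 thanks to the epsilon in the denominator.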

Custom Factor with Parameters

class CustomRSI(CustomFactor):
    """RSI signal with configurable overbought/oversold levels.

    Outputs +1 when RSI < oversold, -1 when RSI > overbought, 0 otherwise.
    """

    inputs = [USEquityPricing.close]
    window_length = 15  # 15 closes -> 14 price changes for a 14-period RSI
    params = {
        'overbought': 70,
        'oversold': 30,
    }

    def compute(self, today, assets, out, close, overbought, oversold):
        """Compute RSI and classify it against the thresholds."""
        # Calculate price changes (window_length - 1 rows)
        delta = np.diff(close, axis=0)

        # Separate gains and losses
        gains = np.where(delta > 0, delta, 0)
        losses = np.where(delta < 0, -delta, 0)

        # Average gains and losses (simple means)
        avg_gain = gains.mean(axis=0)
        avg_loss = losses.mean(axis=0)

        # Calculate RS and RSI
        rs = avg_gain / (avg_loss + 1e-9)
        rsi = 100 - (100 / (1 + rs))

        # Classify: +1 oversold (buy signal), -1 overbought, 0 neutral
        out[:] = np.where(rsi < oversold, 1.0, np.where(rsi > overbought, -1.0, 0.0))

# Usage with custom parameters
rsi_signal = CustomRSI(overbought=80, oversold=20)
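The compute() above averages gains and losses with simple means. J. Welles Wilder's original RSI instead seeds both averages with a simple mean and then smooths them recursively. A minimal NumPy sketch of that variant follows, written as a hypothetical standalone function over a (days x assets) array with no NaN handling; it is one way to get the classic definition, not the library's RSI implementation.

```python
import numpy as np

def wilder_rsi(close, period=14):
    """Wilder-smoothed RSI for each column of a (days x assets) array; needs days > period."""
    delta = np.diff(close, axis=0)
    gains = np.clip(delta, 0, None)
    losses = np.clip(-delta, 0, None)

    # Seed with the simple average of the first `period` changes
    avg_gain = gains[:period].mean(axis=0)
    avg_loss = losses[:period].mean(axis=0)

    # Recursive smoothing over the remaining changes
    for gain, loss in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period

    rs = avg_gain / (avg_loss + 1e-9)
    return 100 - 100 / (1 + rs)

# 30 days x 2 hypothetical assets: one rising steadily, one falling steadily
days = np.arange(30, dtype=float)
close = np.column_stack([100 + days, 100 - days])
rsi = wilder_rsi(close, period=14)
```

The recursion makes each value depend on the whole window, so a longer window_length gives results closer to a full-history Wilder RSI.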

Advanced Custom Factor

class TrendStrength(CustomFactor):
    """Measure trend strength using linear regression R-squared."""

    inputs = [USEquityPricing.close]
    window_length = 50

    def compute(self, today, assets, out, close):
        from scipy import stats

        # For each asset
        for i in range(len(assets)):
            prices = close[:, i]

            # Skip if too many NaNs
            if np.isnan(prices).sum() > self.window_length * 0.2:
                out[i] = np.nan
                continue

            # Linear regression
            x = np.arange(len(prices))
            # Remove NaNs
            mask = ~np.isnan(prices)
            if mask.sum() < 2:
                out[i] = np.nan
                continue

            slope, intercept, r_value, p_value, std_err = stats.linregress(
                x[mask], prices[mask]
            )

            # R-squared measures how linear the trend is, not its direction
            out[i] = r_value ** 2

# Use it
trend = TrendStrength()
strong_trend = trend > 0.7  # R² > 0.7: strong trend, up or down
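The per-asset loop with scipy.stats.linregress is easy to read but slow across thousands of assets. For windows without missing values, the R² of a regression on time equals the squared Pearson correlation between time and price, which vectorizes across columns. The sketch below assumes NaN-free input and also returns the slope, since R² alone cannot tell a clean uptrend from a clean downtrend; trend_r2_and_slope is a hypothetical helper.

```python
import numpy as np

def trend_r2_and_slope(close):
    """R-squared and slope of price vs. time for each column of a NaN-free (days x assets) array."""
    n_days = close.shape[0]
    t = np.arange(n_days, dtype=float)
    t_centered = t - t.mean()
    p_centered = close - close.mean(axis=0)

    # Centered sums of products, one value per asset
    sxy = t_centered @ p_centered
    sxx = t_centered @ t_centered
    syy = (p_centered ** 2).sum(axis=0)

    slope = sxy / sxx
    r2 = sxy ** 2 / (sxx * syy + 1e-12)  # epsilon guards flat prices
    return r2, slope

rng = np.random.default_rng(0)
days = np.arange(50, dtype=float)
close = np.column_stack([
    100 + 0.5 * days,             # clean uptrend
    100 - 0.5 * days,             # clean downtrend
    100 + rng.normal(0, 1, 50),   # noise around a flat level
])
r2, slope = trend_r2_and_slope(close)
```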

Creating Custom Filters

CustomFilter works like CustomFactor, except that compute() writes booleans into out: True keeps an asset, False drops it.

Basic Custom Filter

from rustybt.pipeline import CustomFilter
from rustybt.pipeline.factors import Returns
import numpy as np

class VolatilityFilter(CustomFilter):
    """Select stocks with volatility in specified range."""

    inputs = [Returns(window_length=2)]
    window_length = 252
    params = ('min_vol', 'max_vol')

    def compute(self, today, assets, out, returns, min_vol, max_vol):
        """Filter by volatility range.

        Parameters
        ----------
        today : pd.Timestamp
        assets : np.ndarray of int (SIDs)
        out : np.ndarray[bool]
            Output array (True = include, False = exclude)
        returns : np.ndarray
            Daily returns array
        min_vol, max_vol : float
            Volatility range
        """
        volatility = np.std(returns, axis=0) * np.sqrt(252)
        out[:] = (volatility >= min_vol) & (volatility <= max_vol)

# Usage
moderate_vol = VolatilityFilter(min_vol=0.15, max_vol=0.35)
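The filter's logic can be checked the same way with a simulated return window. This sketch assumes the returns input arrives as a (days x assets) array of daily simple returns, mirroring the factor case. Unlike the np.std call above, it uses np.nanstd so a few missing days don't exclude an asset outright; periods_per_year is a hypothetical parameter whose default of 252 matches the trading-day convention above.

```python
import numpy as np

def volatility_in_range(returns, min_vol, max_vol, periods_per_year=252):
    """True where annualized volatility of each column lies in [min_vol, max_vol]."""
    volatility = np.nanstd(returns, axis=0) * np.sqrt(periods_per_year)
    return (volatility >= min_vol) & (volatility <= max_vol)

rng = np.random.default_rng(42)
n_days = 252
# Three hypothetical assets with daily volatility of 0.5%, 1.5%, and 4%
daily_vols = np.array([0.005, 0.015, 0.04])
returns = rng.normal(0.0, daily_vols, size=(n_days, 3))

keep = volatility_in_range(returns, min_vol=0.15, max_vol=0.35)
```

Annualized, those daily volatilities are roughly 8%, 24%, and 63%, so only the middle asset falls in the 15%–35% band.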

Custom Filter with Asset Finder (Advanced)

For filtering based on asset metadata (names, types, etc.), use a factory pattern with closures:

from rustybt.pipeline import CustomFilter
from rustybt.data.bundles.core import load

# Fiat currencies plus fiat-pegged stablecoins (USDT, USDC, BUSD, DAI)
FIAT_CURRENCIES = {
    'USD', 'USDT', 'USDC', 'BUSD', 'DAI',
    'EUR', 'GBP', 'AUD', 'CAD', 'CHF',
    'HKD', 'JPY', 'NZD', 'SGD',
    'UAH', 'TRY', 'RUB', 'ZAR', 'NGN',
    'BRL', 'ARS', 'MXN', 'CLP',
    'CZK', 'DKK', 'NOK', 'SEK', 'PLN',
    'HUF', 'RON', 'PHP', 'IDR', 'THB',
    'VND', 'SAR', 'AED', 'INR', 'CNY',
    'KRW', 'KZT',
}

def make_fiat_filter(asset_finder):
    """Factory function to create a CustomFilter with access to asset_finder.

    This uses a closure to give the filter access to the asset finder
    without passing it through the constructor (CustomFilter doesn't support
    custom constructor parameters).

    Parameters
    ----------
    asset_finder : AssetFinder
        Asset finder instance from bundle

    Returns
    -------
    ExcludeFiatPairs
        Filter instance that excludes fiat pairs
    """
    class ExcludeFiatPairs(CustomFilter):
        """Exclude cryptocurrency pairs involving fiat currencies.

        This demonstrates accessing asset metadata in CustomFilter by using
        the asset finder (from closure) to map SIDs to Asset objects.
        """

        inputs = ()
        window_length = 1

        def compute(self, today, assets, out):
            """Filter based on asset names.

            Parameters
            ----------
            today : pd.Timestamp
            assets : np.ndarray of int
                Array of SIDs (security identifiers)
            out : np.ndarray[bool]
                Output array to fill
            """
            for i, sid in enumerate(assets):
                # Look up asset by SID using asset_finder from closure
                asset = asset_finder.retrieve_asset(sid)
                asset_name = asset.asset_name or ''  # asset_name may be None

                # Parse and check for fiat
                parts = asset_name.replace(':', '/').split('/')
                is_fiat = any(
                    part.upper().strip() in FIAT_CURRENCIES
                    for part in parts
                )

                out[i] = not is_fiat  # True = include, False = exclude

    return ExcludeFiatPairs()


def make_pipeline():
    """Example usage with asset finder."""
    from rustybt.pipeline import Pipeline
    from rustybt.pipeline.factors import AverageDollarVolume
    from rustybt.pipeline.domain import EquityCalendarDomain

    # Get asset finder from bundle
    bundle_data = load('binance-spot-1d')
    asset_finder = bundle_data.asset_finder

    # Create domain and filters
    CRYPTO = EquityCalendarDomain(country_code="US", calendar_name="24/7")
    avg_volume = AverageDollarVolume(window_length=5)
    exclude_fiat = make_fiat_filter(asset_finder)  # Use factory function

    # Combine: filter non-fiat FIRST, then take top 10, so ranking only
    # considers non-fiat assets (exactly 10 whenever at least 10 have data)
    non_fiat_top_10 = avg_volume.top(10, mask=exclude_fiat)

    return Pipeline(
        columns={'avg_volume': avg_volume},
        screen=non_fiat_top_10,
        domain=CRYPTO,
    )
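The name-parsing rule inside compute() can be tested on its own, without loading a bundle. The sketch below copies it into a plain function and checks a few hypothetical pair names against a trimmed-down currency set. Because the full set above includes stablecoins such as USDT, a name like BTC/USDT is excluded too.

```python
FIAT_SUBSET = {"USD", "USDT", "EUR", "GBP"}  # trimmed copy for illustration

def is_fiat_pair(asset_name, currencies=FIAT_SUBSET):
    """True if any ':'- or '/'-separated part of the name is in `currencies`."""
    parts = asset_name.replace(":", "/").split("/")
    return any(part.upper().strip() in currencies for part in parts)

examples = {
    "ETH/BTC": is_fiat_pair("ETH/BTC"),
    "BTC/USDT": is_fiat_pair("BTC/USDT"),
    "eur/usd": is_fiat_pair("eur/usd"),
    "SOL:BTC": is_fiat_pair("SOL:BTC"),
}
```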

Alternative: Post-Pipeline Filtering

For simple cases, or when you don't need the asset finder, filter the pipeline output instead:

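from rustybt.pipeline import Pipeline
from rustybt.pipeline.domain import EquityCalendarDomain
from rustybt.pipeline.factors import AverageDollarVolume

CRYPTO = EquityCalendarDomain(country_code="US", calendar_name="24/7")

def make_pipeline():
    """Get top 30 by volume; before_trading_start narrows them to 10 non-fiat."""
    avg_volume = AverageDollarVolume(window_length=5)
    return Pipeline(
        columns={'avg_volume': avg_volume},
        screen=avg_volume.top(30),  # Oversample so that 10 non-fiat usually survive
        domain=CRYPTO,
    )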

def before_trading_start(context, data):
    """Filter to top 10 non-fiat pairs."""
    # Assumes make_pipeline() was attached under the name 'universe' in initialize()
    pipeline_output = context.pipeline_output('universe')

    # Filter using custom logic
    def is_fiat_pair(symbol):
        parts = symbol.replace(':', '/').split('/')
        return any(p.upper().strip() in FIAT_CURRENCIES for p in parts)

    # One boolean per row: True for assets to keep
    keep = [not is_fiat_pair(asset.asset_name or '') for asset in pipeline_output.index]
    non_fiat = pipeline_output.loc[keep]
    top_10 = non_fiat.nlargest(10, 'avg_volume')
    context.universe = list(top_10.index)

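When to use each approach:
- CustomFilter with asset finder: ranks only eligible assets, so top(10) returns exactly 10 whenever at least 10 non-fiat assets have data. Recommended for production.
- Post-pipeline filtering: simpler and good for prototyping, but returns fewer than 10 assets if the oversampled top 30 contains too few non-fiat pairs.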
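The shortfall risk of oversampling is easy to reproduce. The sketch below uses plain Python with made-up pair names and volumes; oversample_then_filter is a hypothetical helper, not part of the library. When only 5 of the 30 oversampled names are non-fiat, the post-pipeline step can return only 5.

```python
FIAT = {"USD", "USDT", "EUR"}

def is_fiat_pair(name):
    return any(p.upper().strip() in FIAT for p in name.replace(":", "/").split("/"))

def oversample_then_filter(volumes, oversample=30, keep=10):
    """Take the `oversample` highest-volume names, drop fiat pairs, keep up to `keep`."""
    top = sorted(volumes, key=volumes.get, reverse=True)[:oversample]
    return [name for name in top if not is_fiat_pair(name)][:keep]

# 25 USDT pairs out-trade 10 BTC pairs, so the top 30 holds only 5 non-fiat names
volumes = {f"COIN{i}/USDT": 1_000 - i for i in range(25)}
volumes.update({f"COIN{i}/BTC": 500 - i for i in range(10)})

selected = oversample_then_filter(volumes)
```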


Advanced Patterns

Multi-Stage Filtering

Build complex universes through progressive filtering:

# Stage 1: Broad universe
liquid = AverageDollarVolume(window_length=20).top(1000)
price_filter = USEquityPricing.close.latest > 5
stage1 = liquid & price_filter

# Stage 2: Momentum screen
returns = Returns(window_length=21)
momentum_filter = returns.top(200, mask=stage1)

# Stage 3: Quality screen
volatility = AnnualizedVolatility(window_length=252)
quality_filter = volatility.bottom(100, mask=momentum_filter)

# Final universe
final_universe = quality_filter

Factor-Driven Filters

Create filters from factor values:

# Define factors
returns_1m = Returns(window_length=21)
returns_3m = Returns(window_length=63)
volatility = AnnualizedVolatility(window_length=252)

# Multiple conditions
positive_momentum = returns_1m > 0
sustained_momentum = returns_3m > 0.10
low_risk = volatility < 0.30

# Combine into final filter
quality_growth = positive_momentum & sustained_momentum & low_risk

# Top N from filtered universe
top_quality = returns_1m.top(50, mask=quality_growth)

Conditional Strategy Logic

Apply different selection logic depending on each asset's RSI regime:

from rustybt.pipeline.factors import RSI, Returns

# RSI-based conditions
rsi = RSI(window_length=14)
oversold = rsi < 30
overbought = rsi > 70

# Different logic for different conditions
returns = Returns(window_length=21)

# Mean reversion for extreme RSI
mean_reversion_universe = oversold | overbought

# Momentum for neutral RSI
neutral_rsi = ~(oversold | overbought)
momentum_universe = neutral_rsi & (returns > 0.05)

# Combined strategy
either_strategy = mean_reversion_universe | momentum_universe

Sector-Neutral Selection

Select top N assets per group (e.g., sector):

from rustybt.pipeline import Pipeline
from rustybt.pipeline.classifiers import Sector
from rustybt.pipeline.factors import Returns

# Define momentum and sector
momentum = Returns(window_length=63)
sector = Sector()

# Top 3 momentum stocks per sector
top_per_sector = momentum.top(3, groupby=sector)

pipeline = Pipeline(
    columns={
        'momentum': momentum,
        'sector': sector,
    },
    screen=top_per_sector,
)
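Conceptually, top(N, groupby=sector) ranks within each sector separately. The sketch below reproduces that with NumPy on made-up values; top_per_group is a hypothetical helper, and it assumes NaN values are never selected.

```python
import numpy as np

def top_per_group(values, groups, n):
    """Boolean mask of the n largest non-NaN values within each group label."""
    result = np.zeros(len(values), dtype=bool)
    for label in np.unique(groups):
        idx = np.flatnonzero((groups == label) & ~np.isnan(values))
        best = idx[np.argsort(values[idx])[::-1][:n]]
        result[best] = True
    return result

momentum = np.array([0.30, 0.10, 0.20, 0.05, 0.40, np.nan, 0.15])
sector = np.array([1, 1, 1, 1, 2, 2, 2])

selected = top_per_group(momentum, sector, n=2)
```

Every sector contributes up to N names regardless of how its momentum compares with other sectors, which is what keeps the selection sector-neutral.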

Best Practices

1. Use Parentheses for Clarity

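Parenthesize mixed & / | expressions. Python gives & higher precedence than |, and both bind more tightly than comparisons, so price > 5 & liquid parses as price > (5 & liquid):

# Clear
universe = (liquid & price_filter) & (momentum | quality)

# Different filter: & binds tighter than |, so this parses as
# (liquid & price_filter & momentum) | quality and admits every quality asset
universe = liquid & price_filter & momentum | quality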
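Python's precedence rules apply to any objects that overload & and |, including NumPy boolean arrays. The sketch below uses made-up masks for four hypothetical assets to show that the two expressions select different assets.

```python
import numpy as np

liquid = np.array([True, True, False, False])
price_filter = np.array([True, True, True, False])
momentum = np.array([True, False, False, False])
quality = np.array([False, True, True, True])

intended = (liquid & price_filter) & (momentum | quality)
unparenthesized = liquid & price_filter & momentum | quality

# The unparenthesized form is grouped like this
assert (unparenthesized == ((liquid & price_filter & momentum) | quality)).all()
```

The last two assets are illiquid, yet they pass the unparenthesized version because quality alone admits them.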

2. Name Intermediate Steps

Improve readability by naming intermediate computations:

# Good - clear intent
liquid = AverageDollarVolume(window_length=20).top(1000)
price_ok = USEquityPricing.close.latest > 5
base_universe = liquid & price_ok

# Less clear - hard to understand
universe = AverageDollarVolume(window_length=20).top(1000) & (USEquityPricing.close.latest > 5)

3. Apply Broad Filters First

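A cheap, broad filter passed as mask limits which assets later, more expensive factors compute and rank. It also changes the result: ranking inside a mask returns N eligible assets, while intersecting afterwards can return fewer.

# Efficient: broad filter first
liquid = AverageDollarVolume(window_length=20).top(1000)  # Cheap built-in factor
trend = TrendStrength(mask=liquid)  # compute() receives only the liquid assets
top_trend = trend.top(50, mask=liquid)  # 50 assets, all liquid

# Inefficient: expensive calc on all assets
trend_all = TrendStrength()  # compute() runs on every asset
top_trend_liquid = trend_all.top(50) & liquid  # May return fewer than 50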

4. Test Filters Separately

Debug incrementally by testing each filter:

# Assumes engine, start, end, filter1 and filter2 are already defined
def assets_per_day(screen):
    """Average number of assets passing screen on each day with output."""
    pipe = Pipeline(columns={}, screen=screen)
    output = engine.run_pipeline(pipe, start, end)
    # Rows are (date, asset) pairs, so len(output) would count asset-days
    return output.groupby(level=0).size().mean()

# Test each filter individually
print(f"Filter1: {assets_per_day(filter1):.1f} assets/day")
print(f"Filter2: {assets_per_day(filter2):.1f} assets/day")

# Then combine
print(f"Combined: {assets_per_day(filter1 & filter2):.1f} assets/day")

5. Document Factor Logic

Add clear docstrings to custom factors:

class MyCustomFactor(CustomFactor):
    """Short description of what this factor measures.

    Detailed explanation:
    - What the factor measures
    - Why it's predictive
    - Expected range of values
    - Any academic paper references

    Parameters
    ----------
    inputs : list
        Description of required inputs
    window_length : int
        Description of window parameter

    Notes
    -----
    Any important implementation details or caveats.
    """
    pass

6. Handle Edge Cases

Protect against NaN, inf, and division by zero:

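import numpy as np

def compute(self, today, assets, out, prices):
    # Count missing observations per asset (axis=0 is time)
    nan_count = np.isnan(prices).sum(axis=0)
    insufficient = nan_count > self.window_length * 0.5

    # nanmean/nanstd skip missing values instead of propagating NaN
    mean = np.nanmean(prices, axis=0)
    denominator = np.nanstd(prices, axis=0)

    # Protect against division by zero
    out[:] = mean / (denominator + 1e-9)

    # Blank out assets with too little data, then any remaining infinities
    out[insufficient] = np.nan
    out[~np.isfinite(out)] = np.nan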

