
Backtesting API Reference

The backtesting package provides the backtesting engine, transaction modeling, and performance analysis.

Overview

The backtesting package contains:

  • Engine - Core backtesting simulation engine
  • Transactions - Transaction cost modeling
  • Performance - Performance analytics and metrics
  • Models - Data models for backtesting

Backtesting Package

portfolio_management.backtesting

Backtesting framework for portfolio strategies.

This package provides historical simulation capabilities, including:

  • Transaction cost modeling (commissions, slippage, bid-ask spread)
  • Rebalancing logic (scheduled, opportunistic, forced)
  • Performance metrics calculation (Sharpe, Sortino, drawdown, etc.)
  • Portfolio evolution tracking with cash management
  • Point-in-time eligibility filtering to avoid look-ahead bias
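The cost components listed above are typically combined per trade. A minimal sketch, assuming a percentage commission with a per-trade floor plus slippage charged in basis points; `estimate_trade_cost` and its default values are illustrative, and the package's `TransactionCostModel` may combine these terms differently:

```python
from decimal import Decimal


def estimate_trade_cost(
    shares: int,
    price: float,
    commission_pct: Decimal = Decimal("0.001"),
    commission_min: Decimal = Decimal("1.00"),
    slippage_bps: Decimal = Decimal("5"),
) -> Decimal:
    """Estimate the total cost of one trade.

    Commission is a percentage of trade value subject to a per-trade
    minimum; slippage is charged in basis points of trade value.
    """
    trade_value = Decimal(shares) * Decimal(str(price))
    commission = max(trade_value * commission_pct, commission_min)
    slippage = trade_value * slippage_bps / Decimal(10000)
    return commission + slippage


# 100 shares at $50: value 5000, commission max(5.00, 1.00) = 5.00,
# slippage 5000 * 5 / 10000 = 2.50, total 7.50.
cost = estimate_trade_cost(100, 50.0)
```

Using `Decimal` throughout, as the engine does for `cash`, avoids accumulating float rounding error over many simulated trades.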

BacktestEngine

Historical portfolio backtesting engine.

Simulates the performance of a portfolio strategy over a historical period, incorporating realistic constraints like transaction costs, rebalancing schedules, and point-in-time data eligibility.
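Point-in-time eligibility can be illustrated with a simplified mask. This is a sketch only: the package's `compute_pit_eligibility` also takes a `min_price_rows` parameter and may treat the date boundary differently.

```python
import datetime

import pandas as pd


def pit_eligibility_mask(
    returns: pd.DataFrame,
    as_of: datetime.date,
    min_history_days: int = 252,
) -> pd.Series:
    """Flag assets with sufficient observed history as of a rebalance date.

    Only rows dated strictly before `as_of` are inspected, so the mask
    cannot leak information from the future into the backtest.
    """
    observable = returns.loc[pd.to_datetime(returns.index).date < as_of]
    # An asset is eligible once it has enough non-missing return observations.
    return observable.notna().sum() >= min_history_days
```

An asset that listed recently (mostly NaN history) is excluded until it accumulates the required observations, even though its full series exists in the DataFrame.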

The engine iterates day by day through the historical price data, tracks the portfolio's value, and triggers rebalancing events based on the configured frequency. At each rebalance, it uses the provided strategy to determine a new target portfolio and executes the necessary trades.
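The frequency trigger follows the calendar rules implemented in `_should_rebalance_scheduled` in the source listing below; this sketch substitutes plain strings for the `RebalanceFrequency` enum:

```python
import datetime


def is_rebalance_due(
    current: datetime.date,
    last_rebalance: datetime.date,
    frequency: str,
) -> bool:
    """Return True when a scheduled rebalance is due under calendar rules."""
    if frequency == "daily":
        return (current - last_rebalance).days >= 1
    if frequency == "weekly":
        return (current - last_rebalance).days >= 7
    if frequency == "monthly":
        # Due as soon as the calendar month changes.
        return (current.year, current.month) != (last_rebalance.year, last_rebalance.month)
    if frequency == "quarterly":
        months = (current.year - last_rebalance.year) * 12 + (
            current.month - last_rebalance.month
        )
        return months >= 3
    if frequency == "annual":
        return current.year != last_rebalance.year
    raise ValueError(f"Unsupported rebalance frequency: {frequency!r}")
```

Note the monthly and annual rules fire on the first trading day of the new period rather than after a fixed number of days, which keeps rebalances aligned to calendar boundaries.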

Workflow
  1. Initialize with configuration, strategy, and historical data.
  2. Iterate through each day in the backtest period.
  3. On each day, update the total portfolio equity value.
  4. Check if a scheduled rebalancing is due.
  5. On a rebalancing day:
     a. Determine the universe of eligible assets (PIT eligibility).
     b. Apply preselection and membership policies to get candidate assets.
     c. Call the portfolio strategy to get target weights.
     d. Calculate required trades (buys/sells).
     e. Compute and deduct transaction costs.
     f. Update cash and holdings.
  6. After the simulation, calculate final performance metrics.
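The numbered workflow can be condensed into a simplified loop. This is a self-contained sketch: `run_daily_loop` and its callback parameters are illustrative, not part of the package API.

```python
import datetime
from typing import Callable

import pandas as pd


def run_daily_loop(
    prices: pd.DataFrame,
    is_rebalance_due: Callable[[datetime.date], bool],
    rebalance: Callable[[datetime.date, pd.DataFrame], None],
    value_portfolio: Callable[[pd.Series], float],
    lookback_periods: int = 60,
) -> list[tuple[datetime.date, float]]:
    """Mark portfolio equity every day and rebalance on scheduled dates."""
    equity_curve: list[tuple[datetime.date, float]] = []
    for i, (ts, row) in enumerate(prices.iterrows()):
        date = ts.date()
        # Step 3: record daily equity.
        equity_curve.append((date, value_portfolio(row)))
        # Steps 4-5: rebalance using only the trailing lookback window,
        # so parameter estimation never sees future data.
        if is_rebalance_due(date):
            start = max(0, i + 1 - lookback_periods)
            rebalance(date, prices.iloc[start : i + 1])
    return equity_curve
```

The trailing-window slice mirrors the engine's rolling-window parameter estimation: only rows up to and including the current day are ever handed to the strategy.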

Attributes:

  • config (BacktestConfig): The configuration settings for the backtest.
  • strategy (PortfolioStrategy): The portfolio construction strategy to be tested.
  • prices (DataFrame): DataFrame of historical prices.
  • returns (DataFrame): DataFrame of historical returns.
  • classifications (dict[str, str] | None): Asset class mappings for constraints.
  • preselection: Optional preselection filter for asset screening.
  • membership_policy: Optional policy to control portfolio turnover.
  • cache: Optional cache for factors and eligibility data to improve performance.
  • cost_model (TransactionCostModel): The model for calculating trade costs.
  • holdings (dict[str, int]): The current number of shares held for each asset.
  • cash (Decimal): The current cash balance in the portfolio.
  • rebalance_events (list[RebalanceEvent]): A log of all rebalancing events.
  • equity_curve (list[tuple[date, float]]): A daily log of portfolio equity.

Example

import datetime
from decimal import Decimal

from portfolio_management.backtesting.models import BacktestConfig, RebalanceFrequency
from portfolio_management.portfolio.strategy import EqualWeightStrategy
from portfolio_management.utils.testing import create_dummy_data

start_date = datetime.date(2022, 1, 1)
end_date = datetime.date(2023, 12, 31)
prices, returns = create_dummy_data(['AAPL', 'MSFT'], start_date, end_date)

config = BacktestConfig(
    start_date=start_date,
    end_date=end_date,
    initial_capital=Decimal("100000.00"),
    rebalance_frequency=RebalanceFrequency.QUARTERLY,
    commission_pct=Decimal("0.001")
)
strategy = EqualWeightStrategy(min_history_periods=60)

engine = BacktestEngine(config, strategy, prices, returns)
equity_curve, metrics, events = engine.run()

print(f"Backtest finished with {len(events)} rebalances.")
print(f"Final portfolio value: ${metrics.final_value:,.2f}")
print(f"Annualized Return: {metrics.annualized_return:.2%}")
print(f"Sharpe Ratio: {metrics.sharpe_ratio:.2f}")
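The metrics printed above follow standard definitions. A self-contained sketch of how such figures can be derived from a daily equity series; `summarize_equity_curve` is illustrative, and the package's `calculate_metrics` may use different conventions (e.g. a non-zero risk-free rate):

```python
import math

import pandas as pd


def summarize_equity_curve(
    equity: pd.Series,
    periods_per_year: int = 252,
) -> dict[str, float]:
    """Derive standard performance metrics from a daily equity series."""
    returns = equity.pct_change().dropna()
    years = len(returns) / periods_per_year
    # Geometric annualization of the total growth factor.
    annualized_return = (equity.iloc[-1] / equity.iloc[0]) ** (1 / years) - 1
    # Sharpe ratio with a zero risk-free rate, annualized.
    sharpe = 0.0
    if returns.std() > 0:
        sharpe = returns.mean() / returns.std() * math.sqrt(periods_per_year)
    # Maximum peak-to-trough decline relative to the running peak.
    running_peak = equity.cummax()
    max_drawdown = ((equity - running_peak) / running_peak).min()
    return {
        "annualized_return": float(annualized_return),
        "sharpe_ratio": float(sharpe),
        "max_drawdown": float(max_drawdown),
    }
```

Max drawdown is reported as a negative fraction (e.g. -0.10 for a 10% decline from the running peak), which is a common convention but worth checking against the package's `PerformanceMetrics`.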

Source code in src/portfolio_management/backtesting/engine/backtest.py
class BacktestEngine:
    """Historical portfolio backtesting engine.

    Simulates the performance of a portfolio strategy over a historical period,
    incorporating realistic constraints like transaction costs, rebalancing
    schedules, and point-in-time data eligibility.

    The engine iterates day by day through the historical price data, tracks the
    portfolio's value, and triggers rebalancing events based on the configured
    frequency. At each rebalance, it uses the provided strategy to determine a
    new target portfolio and executes the necessary trades.

    Workflow:
        1. Initialize with configuration, strategy, and historical data.
        2. Iterate through each day in the backtest period.
        3. On each day, update the total portfolio equity value.
        4. Check if a scheduled rebalancing is due.
        5. On a rebalancing day:
           a. Determine the universe of eligible assets (PIT eligibility).
           b. Apply preselection and membership policies to get candidate assets.
           c. Call the portfolio strategy to get target weights.
           d. Calculate required trades (buys/sells).
           e. Compute and deduct transaction costs.
           f. Update cash and holdings.
        6. After the simulation, calculate final performance metrics.

    Attributes:
        config (BacktestConfig): The configuration settings for the backtest.
        strategy (PortfolioStrategy): The portfolio construction strategy to be tested.
        prices (pd.DataFrame): DataFrame of historical prices.
        returns (pd.DataFrame): DataFrame of historical returns.
        classifications (dict[str, str] | None): Asset class mappings for constraints.
        preselection: Optional preselection filter for asset screening.
        membership_policy: Optional policy to control portfolio turnover.
        cache: Optional cache for factors and eligibility data to improve performance.
        cost_model (TransactionCostModel): The model for calculating trade costs.
        holdings (dict[str, int]): The current number of shares held for each asset.
        cash (Decimal): The current cash balance in the portfolio.
        rebalance_events (list[RebalanceEvent]): A log of all rebalancing events.
        equity_curve (list[tuple[datetime.date, float]]): A daily log of portfolio equity.

    Example:
        >>> from portfolio_management.backtesting.models import BacktestConfig
        >>> from portfolio_management.portfolio.strategy import EqualWeightStrategy
        >>> from portfolio_management.utils.testing import create_dummy_data
        >>>
        >>> start_date = datetime.date(2022, 1, 1)
        >>> end_date = datetime.date(2023, 12, 31)
        >>> prices, returns = create_dummy_data(['AAPL', 'MSFT'], start_date, end_date)
        >>>
        >>> config = BacktestConfig(
        ...     start_date=start_date,
        ...     end_date=end_date,
        ...     initial_capital=Decimal("100000.00"),
        ...     rebalance_frequency=RebalanceFrequency.QUARTERLY,
        ...     commission_pct=Decimal("0.001")
        ... )
        >>> strategy = EqualWeightStrategy(min_history_periods=60)
        >>>
        >>> engine = BacktestEngine(config, strategy, prices, returns)
        >>> equity_curve, metrics, events = engine.run()
        >>>
        >>> print(f"Backtest finished with {len(events)} rebalances.")
        >>> print(f"Final portfolio value: ${metrics.final_value:,.2f}")
        >>> print(f"Annualized Return: {metrics.annualized_return:.2%}")
        >>> print(f"Sharpe Ratio: {metrics.sharpe_ratio:.2f}")

    """

    def __init__(
        self,
        config: BacktestConfig,
        strategy: PortfolioStrategy,
        prices: pd.DataFrame,
        returns: pd.DataFrame,
        classifications: dict[str, str] | None = None,
        preselection: Preselection | None = None,
        membership_policy: MembershipPolicy | None = None,
        cache: FactorCache | None = None,
    ) -> None:
        """Initialize the backtesting engine.

        Args:
            config: Backtest configuration.
            strategy: Portfolio construction strategy to use.
            prices: Historical prices (index=dates, columns=tickers).
            returns: Historical returns (index=dates, columns=tickers).
            classifications: Optional asset class mappings for constraints.
            preselection: Optional Preselection instance for asset filtering.
            membership_policy: Optional MembershipPolicy for controlling portfolio churn.
            cache: Optional FactorCache instance for caching factor scores and PIT eligibility.

        Raises:
            InsufficientDataError: If data doesn't cover the backtest period.

        """
        self.config = config
        self.strategy = strategy
        self.prices = prices.copy()
        self.returns = returns.copy()
        self.classifications = classifications or {}
        self.preselection = preselection
        self.membership_policy = membership_policy
        self.cache = cache
        self.holding_periods: dict[
            str,
            int,
        ] = {}  # Track holding periods for membership policy

        # Validate date coverage
        data_start = self.prices.index.min()
        data_end = self.prices.index.max()

        if pd.isna(data_start) or pd.isna(data_end):
            raise InsufficientDataError(
                required_periods=0,
                available_periods=0,
            )

        data_start_date = data_start.date()
        data_end_date = data_end.date()

        if data_start_date > config.start_date or data_end_date < config.end_date:
            raise InsufficientDataError(
                required_periods=0,
                available_periods=0,
            )

        # Initialize transaction cost model
        self.cost_model = TransactionCostModel(
            commission_pct=config.commission_pct,
            commission_min=config.commission_min,
            slippage_bps=config.slippage_bps,
        )

        # Tracking state
        self.holdings: dict[str, int] = {}  # Current share counts
        self.cash: Decimal = config.initial_capital
        self.rebalance_events: list[RebalanceEvent] = []
        self.equity_curve: list[tuple[datetime.date, float]] = []
        # Track delisted assets
        self.delisted_assets: dict[str, datetime.date] = {}

    def run(self) -> tuple[pd.DataFrame, PerformanceMetrics, list[RebalanceEvent]]:
        """Execute the backtest simulation.

        This is the main entry point to start the backtest. It iterates through
        the specified time period, manages the portfolio, and calculates results.

        Returns:
            A tuple containing:
            - pd.DataFrame: The daily equity curve of the portfolio.
            - PerformanceMetrics: A summary of key performance indicators.
            - list[RebalanceEvent]: A detailed log of all rebalancing events.

        Raises:
            InsufficientDataError: If the provided data does not cover the
                configured backtest period.
            RebalanceError: If a fatal error occurs during a rebalancing attempt.

        """
        # Filter data to backtest period
        # Convert index to dates for comparison
        price_dates = pd.to_datetime(self.prices.index).date
        mask = (price_dates >= self.config.start_date) & (
            price_dates <= self.config.end_date
        )
        period_prices = self.prices.loc[mask].copy()
        period_returns = self.returns.loc[mask].copy()

        if len(period_prices) == 0:
            raise InsufficientDataError(
                required_periods=0,
                available_periods=0,
            )

        # Simulate each trading day
        for i in range(len(period_prices)):
            date_idx = period_prices.index[i]
            date = date_idx.date()
            prices_row = period_prices.iloc[i]

            # Calculate current portfolio value
            portfolio_value = self._calculate_portfolio_value(prices_row)
            self.equity_curve.append((date, float(portfolio_value)))

            has_min_history = (i + 1) >= self.strategy.min_history_periods

            # Only create lookback slices when actually rebalancing
            should_rebalance_forced = not self.rebalance_events and has_min_history
            should_rebalance_scheduled = (
                has_min_history and self._should_rebalance_scheduled(date)
            )

            if should_rebalance_forced or should_rebalance_scheduled:
                # Create lookback window only when needed
                # Use rolling window for parameter estimation (standard practice in quant finance)
                lookback_window = min(self.config.lookback_periods, i + 1)
                start_idx = max(0, i + 1 - lookback_window)
                lookback_returns = period_returns.iloc[start_idx : i + 1]
                lookback_prices = period_prices.iloc[start_idx : i + 1]

                trigger = (
                    RebalanceTrigger.FORCED
                    if should_rebalance_forced
                    else RebalanceTrigger.SCHEDULED
                )
                self._rebalance(
                    date,
                    lookback_returns,
                    lookback_prices,
                    trigger,
                )
                if should_rebalance_forced:
                    continue

        # Calculate performance metrics
        equity_df = pd.DataFrame(
            self.equity_curve,
            columns=["date", "equity"],
        ).set_index("date")
        metrics = calculate_metrics(equity_df, self.rebalance_events)

        return equity_df, metrics, self.rebalance_events

    def _calculate_portfolio_value(self, prices: pd.Series) -> Decimal:
        """Calculate total portfolio value at current prices."""
        holdings_value = Decimal(0)
        for ticker, shares in self.holdings.items():
            if ticker in prices.index and not pd.isna(prices[ticker]):
                price = Decimal(str(float(prices[ticker])))
                holdings_value += Decimal(str(shares)) * price
        return holdings_value + self.cash

    def _should_rebalance_scheduled(self, date: datetime.date) -> bool:
        """Check if scheduled rebalancing is due."""
        if not self.rebalance_events:
            return False

        last_rebalance = self.rebalance_events[-1].date
        freq = self.config.rebalance_frequency

        if freq == RebalanceFrequency.DAILY:
            return (date - last_rebalance).days >= 1
        if freq == RebalanceFrequency.WEEKLY:
            return (date - last_rebalance).days >= 7
        if freq == RebalanceFrequency.MONTHLY:
            return (
                date.month != last_rebalance.month or date.year != last_rebalance.year
            )
        if freq == RebalanceFrequency.QUARTERLY:
            months_diff = (date.year - last_rebalance.year) * 12 + (
                date.month - last_rebalance.month
            )
            return months_diff >= 3
        if freq == RebalanceFrequency.ANNUAL:
            return date.year != last_rebalance.year
        raise ValueError(f"Unsupported rebalance frequency: {freq!r}")

    def _rebalance(
        self,
        date: datetime.date,
        historical_returns: pd.DataFrame,
        historical_prices: pd.DataFrame,
        trigger: RebalanceTrigger,
    ) -> None:
        """Execute a portfolio rebalancing.

        Args:
            date: Rebalancing date.
            historical_returns: Historical returns up to this point.
            historical_prices: Historical prices up to this point.
            trigger: What triggered this rebalance.

        Raises:
            RebalanceError: If rebalancing fails.

        """
        try:
            # Get current prices for this date (last row of prices)
            date_prices = (
                historical_prices.iloc[-1]
                if len(historical_prices) > 0
                else pd.Series()
            )

            # Calculate current portfolio value
            pre_value = self._calculate_portfolio_value(date_prices)

            # Import here to avoid circular dependency
            from portfolio_management.portfolio import PortfolioConstraints

            # Get target weights from strategy using historical returns
            if len(historical_returns) < self.strategy.min_history_periods:
                # Not enough history yet, skip rebalance
                return

            # Apply point-in-time eligibility mask if enabled
            eligible_returns = historical_returns
            if self.config.use_pit_eligibility:
                # Compute eligibility based only on data up to this date
                # Use cached version if cache is available
                if self.cache is not None:
                    eligibility_mask = compute_pit_eligibility_cached(
                        returns=historical_returns,
                        date=date,
                        min_history_days=self.config.min_history_days,
                        min_price_rows=self.config.min_price_rows,
                        cache=self.cache,
                    )
                else:
                    eligibility_mask = compute_pit_eligibility(
                        returns=historical_returns,
                        date=date,
                        min_history_days=self.config.min_history_days,
                        min_price_rows=self.config.min_price_rows,
                    )

                # Filter to only eligible assets
                eligible_tickers = historical_returns.columns[eligibility_mask]
                if len(eligible_tickers) == 0:
                    # No eligible assets yet, skip rebalance
                    return

                eligible_returns = historical_returns[eligible_tickers]

                # Detect delistings - assets that have stopped trading
                delistings = detect_delistings(
                    returns=self.returns,
                    current_date=date,
                    lookforward_days=30,
                )

                # Liquidate holdings in delisted assets
                for ticker, last_date in delistings.items():
                    if ticker in self.holdings and self.holdings[ticker] > 0:
                        # Mark as delisted
                        self.delisted_assets[ticker] = last_date

                        # Liquidate at last available price if we have it
                        if ticker in date_prices.index and not pd.isna(
                            date_prices[ticker],
                        ):
                            shares = self.holdings[ticker]
                            price = float(date_prices[ticker])

                            if price > 0:
                                # Calculate cost for selling
                                cost = self.cost_model.calculate_cost(
                                    ticker,
                                    shares,
                                    price,
                                    is_buy=False,
                                )

                                # Sell and add proceeds to cash
                                sale_value = Decimal(str(shares * price))
                                self.cash += sale_value
                                self.cash -= cost

                                # Remove from holdings
                                del self.holdings[ticker]

            constraints = PortfolioConstraints(
                max_weight=1.0,  # Allow any weight for single asset
                min_weight=0.0,
                max_equity_exposure=1.0,  # Allow full equity exposure
                min_bond_exposure=0.0,  # No minimum bond requirement
            )

            # Build asset class series if we have classifications
            asset_classes = None
            if self.classifications:
                # Filter to only eligible assets
                asset_classes = pd.Series(self.classifications)
                if self.config.use_pit_eligibility:
                    asset_classes = asset_classes[
                        asset_classes.index.isin(eligible_returns.columns)
                    ]

            # Apply preselection if configured
            candidate_assets = list(eligible_returns.columns)
            preselected_ranks: pd.Series | None = None
            membership_top_k = len(candidate_assets)
            if self.preselection is not None:
                # Preselect assets based on factors (momentum, low_vol, etc.)
                # Pass full self.returns dataset; preselection will filter by rebalance_date
                # Then intersect selected assets with eligible assets
                selected_assets = self.preselection.select_assets(
                    returns=self.returns,
                    rebalance_date=date,
                )
                # Only keep selected assets that are also eligible
                selected_assets = [
                    a for a in selected_assets if a in eligible_returns.columns
                ]
                candidate_assets = selected_assets
                membership_top_k = len(selected_assets)

                # Get ranks for membership policy (if needed)
                if (
                    self.membership_policy is not None
                    and self.membership_policy.enabled
                ):
                    if isinstance(eligible_returns.index, pd.DatetimeIndex):
                        date_mask = eligible_returns.index.date < date
                    else:
                        date_mask = eligible_returns.index < date
                    available_returns = eligible_returns.loc[date_mask]

                    if self.preselection.config.method.value == "momentum":
                        scores = self.preselection._compute_momentum(available_returns)
                    elif self.preselection.config.method.value == "low_vol":
                        scores = self.preselection._compute_low_volatility(
                            available_returns,
                        )
                    elif self.preselection.config.method.value == "combined":
                        scores = self.preselection._compute_combined(available_returns)
                    else:
                        scores = pd.Series(
                            range(len(available_returns.columns)),
                            index=available_returns.columns,
                        )

                    valid_scores = scores.dropna()
                    sorted_scores = valid_scores.sort_values(ascending=False)
                    preselected_ranks = pd.Series(
                        range(1, len(sorted_scores) + 1),
                        index=sorted_scores.index,
                    )

            # Apply membership policy if configured
            if self.membership_policy is not None and self.membership_policy.enabled:
                from portfolio_management.portfolio import apply_membership_policy

                current_holdings = list(self.holdings.keys())

                # If preselection not used, create simple rank by name for determinism
                if preselected_ranks is None:
                    # Simple ranking: alphabetical order
                    preselected_ranks = pd.Series(
                        range(1, len(candidate_assets) + 1),
                        index=sorted(candidate_assets),
                    )
                else:
                    # Ensure current holdings have ranks even if not in preselection results
                    missing_holdings = sorted(
                        set(current_holdings) - set(preselected_ranks.index),
                    )
                    if missing_holdings:
                        worst_rank = (
                            int(preselected_ranks.max())
                            if not preselected_ranks.empty
                            else 0
                        )
                        additional_ranks = pd.Series(
                            range(
                                worst_rank + 1,
                                worst_rank + len(missing_holdings) + 1,
                            ),
                            index=missing_holdings,
                        )
                        preselected_ranks = pd.concat(
                            [preselected_ranks, additional_ranks],
                        )

                current_weight_map = dict.fromkeys(current_holdings, 0.0)
                candidate_weight_map = dict.fromkeys(
                    set(candidate_assets) | set(current_holdings),
                    0.0,
                )

                # Apply membership policy
                assert preselected_ranks is not None
                final_candidates = apply_membership_policy(
                    current_holdings=current_holdings,
                    preselected_ranks=cast("pd.Series", preselected_ranks),
                    policy=self.membership_policy,
                    holding_periods=self.holding_periods,
                    top_k=membership_top_k,
                    current_weights=current_weight_map,
                    candidate_weights=candidate_weight_map,
                )

                candidate_assets = [
                    c for c in final_candidates if c in eligible_returns.columns
                ]

            # Filter to final candidates determined by preselection/membership
            eligible_returns = eligible_returns[candidate_assets]

            if asset_classes is not None:
                asset_classes = asset_classes[
                    asset_classes.index.isin(candidate_assets)
                ]

            # Construct target portfolio (on selected subset)
            portfolio = self.strategy.construct(
                returns=eligible_returns,
                constraints=constraints,
                asset_classes=asset_classes,
            )
            target_weights = portfolio.weights

            # Calculate investable cash (keeping reserve)
            total_target_value = float(pre_value) * (1 - self.config.cash_reserve_pct)

            # Calculate target shares for each asset
            target_shares: dict[str, int] = {}
            for ticker in target_weights.index:
                if ticker not in date_prices.index or pd.isna(date_prices[ticker]):
                    continue
                # Access series element properly
                ticker_price = date_prices.loc[ticker]
                price = float(ticker_price)
                if price <= 0:
                    continue
                ticker_weight = target_weights.loc[ticker]
                target_value = total_target_value * float(ticker_weight)
                target_shares[ticker] = int(target_value / price)

            # Calculate trades needed
            trades: dict[str, int] = {}
            all_tickers = set(target_shares.keys()) | set(self.holdings.keys())

            for ticker in all_tickers:
                current = self.holdings.get(ticker, 0)
                target = target_shares.get(ticker, 0)
                if current != target:
                    trades[ticker] = target - current

            # Calculate transaction costs
            total_cost = Decimal(0)
            for ticker, share_change in trades.items():
                if share_change == 0:
                    continue
                if ticker not in date_prices.index or pd.isna(date_prices[ticker]):
                    continue
                price = float(date_prices[ticker])
                if price <= 0:
                    continue

                cost = self.cost_model.calculate_cost(
                    ticker,
                    abs(share_change),
                    price,
                    share_change > 0,
                )
                total_cost += cost

            # Check if we have enough cash for buys + costs
            total_buys = Decimal(0)
            for ticker, share_change in trades.items():
                if share_change > 0:  # Buy
                    if ticker not in date_prices.index or pd.isna(date_prices[ticker]):
                        continue
                    price_decimal = Decimal(str(float(date_prices[ticker])))
                    total_buys += Decimal(share_change) * price_decimal

            if total_buys + total_cost > self.cash:
                # Scale back trades to fit cash constraints
                scale_factor = float(self.cash * Decimal("0.95")) / float(
                    total_buys + total_cost,
                )
                trades = {
                    ticker: int(shares * scale_factor)
                    for ticker, shares in trades.items()
                }

            # Execute trades
            for ticker, share_change in trades.items():
                if share_change == 0:
                    continue
                if ticker not in date_prices.index or pd.isna(date_prices[ticker]):
                    continue
                price = float(date_prices[ticker])
                if price <= 0:
                    continue

                # Calculate cost for this trade
                cost = self.cost_model.calculate_cost(
                    ticker,
                    abs(share_change),
                    price,
                    share_change > 0,
                )

                trade_value = Decimal(str(abs(share_change) * price))

                if share_change > 0:  # Buy
                    self.cash -= trade_value
                    self.cash -= cost
                else:  # Sell
                    self.cash += trade_value
                    self.cash -= cost

                # Update holdings
                self.holdings[ticker] = self.holdings.get(ticker, 0) + share_change

            # Remove zero positions
            removed_tickers = [t for t, s in self.holdings.items() if s == 0]
            self.holdings = {t: s for t, s in self.holdings.items() if s != 0}

            # Update holding periods for membership policy
            if self.membership_policy is not None and self.membership_policy.enabled:
                # Increment holding periods for all current holdings
                for ticker in self.holdings:
                    self.holding_periods[ticker] = (
                        self.holding_periods.get(ticker, 0) + 1
                    )

                # Reset holding periods for removed positions
                for ticker in removed_tickers:
                    if ticker in self.holding_periods:
                        del self.holding_periods[ticker]

            # Calculate post-rebalance value
            post_value = self._calculate_portfolio_value(date_prices)

            # Record event
            event = RebalanceEvent(
                date=date,
                trigger=trigger,
                trades=trades,
                costs=total_cost,
                pre_rebalance_value=pre_value,
                post_rebalance_value=post_value,
                cash_before=pre_value - self._calculate_holdings_value(date_prices),
                cash_after=self.cash,
            )
            self.rebalance_events.append(event)

        except Exception as e:
            raise RebalanceError(date, f"Rebalancing failed: {e}") from e

    def _calculate_holdings_value(self, prices: pd.Series) -> Decimal:
        """Calculate value of current holdings only (excluding cash)."""
        holdings_value = Decimal(0)
        for ticker, shares in self.holdings.items():
            if ticker in prices.index and not pd.isna(prices[ticker]):
                price = Decimal(str(float(prices[ticker])))
                holdings_value += Decimal(str(shares)) * price
        return holdings_value

run()

Execute the backtest simulation.

This is the main entry point to start the backtest. It iterates through the specified time period, manages the portfolio, and calculates results.

Returns:

- tuple[DataFrame, PerformanceMetrics, list[RebalanceEvent]]: A tuple containing:
    - pd.DataFrame: The daily equity curve of the portfolio.
    - PerformanceMetrics: A summary of key performance indicators.
    - list[RebalanceEvent]: A detailed log of all rebalancing events.

Raises:

- InsufficientHistoryError: If the provided data does not cover the configured backtest period.
- RebalanceError: If a fatal error occurs during a rebalancing attempt.

Source code in src/portfolio_management/backtesting/engine/backtest.py
def run(self) -> tuple[pd.DataFrame, PerformanceMetrics, list[RebalanceEvent]]:
    """Execute the backtest simulation.

    This is the main entry point to start the backtest. It iterates through
    the specified time period, manages the portfolio, and calculates results.

    Returns:
        A tuple containing:
        - pd.DataFrame: The daily equity curve of the portfolio.
        - PerformanceMetrics: A summary of key performance indicators.
        - list[RebalanceEvent]: A detailed log of all rebalancing events.

    Raises:
        InsufficientHistoryError: If the provided data does not cover the
            configured backtest period.
        RebalanceError: If a fatal error occurs during a rebalancing attempt.

    """
    # Filter data to backtest period
    # Convert index to dates for comparison
    price_dates = pd.to_datetime(self.prices.index).date
    mask = (price_dates >= self.config.start_date) & (
        price_dates <= self.config.end_date
    )
    period_prices = self.prices.loc[mask].copy()
    period_returns = self.returns.loc[mask].copy()

    if len(period_prices) == 0:
        raise InsufficientDataError(
            required_periods=0,
            available_periods=0,
        )

    # Simulate each trading day
    for i in range(len(period_prices)):
        date_idx = period_prices.index[i]
        date = date_idx.date()
        prices_row = period_prices.iloc[i]

        # Calculate current portfolio value
        portfolio_value = self._calculate_portfolio_value(prices_row)
        self.equity_curve.append((date, float(portfolio_value)))

        has_min_history = (i + 1) >= self.strategy.min_history_periods

        # Only create lookback slices when actually rebalancing
        should_rebalance_forced = not self.rebalance_events and has_min_history
        should_rebalance_scheduled = (
            has_min_history and self._should_rebalance_scheduled(date)
        )

        if should_rebalance_forced or should_rebalance_scheduled:
            # Create lookback window only when needed
            # Use rolling window for parameter estimation (standard practice in quant finance)
            lookback_window = min(self.config.lookback_periods, i + 1)
            start_idx = max(0, i + 1 - lookback_window)
            lookback_returns = period_returns.iloc[start_idx : i + 1]
            lookback_prices = period_prices.iloc[start_idx : i + 1]

            trigger = (
                RebalanceTrigger.FORCED
                if should_rebalance_forced
                else RebalanceTrigger.SCHEDULED
            )
            self._rebalance(
                date,
                lookback_returns,
                lookback_prices,
                trigger,
            )
            if should_rebalance_forced:
                continue

    # Calculate performance metrics
    equity_df = pd.DataFrame(
        self.equity_curve,
        columns=["date", "equity"],
    ).set_index("date")
    metrics = calculate_metrics(equity_df, self.rebalance_events)

    return equity_df, metrics, self.rebalance_events
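The rolling lookback arithmetic used above (`lookback_window`, `start_idx`) can be isolated and checked in a standalone sketch. `lookback_slice` is a hypothetical helper written for this example, not part of the package:

```python
import pandas as pd

def lookback_slice(returns: pd.DataFrame, i: int, lookback_periods: int) -> pd.DataFrame:
    # Same arithmetic as run(): cap the window at the rows seen so far (i + 1),
    # then take the trailing window ending at row i inclusive.
    window = min(lookback_periods, i + 1)
    start = max(0, i + 1 - window)
    return returns.iloc[start : i + 1]

rets = pd.DataFrame({"A": range(10)})
assert len(lookback_slice(rets, 3, 252)) == 4   # only 4 rows available yet
assert lookback_slice(rets, 9, 5)["A"].tolist() == [5, 6, 7, 8, 9]  # capped at 5
```

Early in the backtest the window grows with the data; once `i + 1` exceeds `lookback_periods`, the slice becomes a fixed-size rolling window.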

BacktestConfig dataclass

Configuration for a backtest run.

This dataclass holds all the parameters needed to define a backtest simulation. It is immutable to ensure that the configuration cannot be changed during a run.

Attributes:

- start_date (date): The first date of the backtest period.
- end_date (date): The last date of the backtest period.
- initial_capital (Decimal): The starting portfolio value.
- rebalance_frequency (RebalanceFrequency): How often to rebalance.
- rebalance_threshold (float): The weight drift threshold for opportunistic rebalancing.
- commission_pct (float): Commission as a percentage of trade value.
- commission_min (float): Minimum commission fee per trade.
- slippage_bps (float): Slippage cost in basis points.
- cash_reserve_pct (float): The minimum percentage of the portfolio to hold as cash.
- lookback_periods (int): The rolling window size for parameter estimation (e.g., returns).
- use_pit_eligibility (bool): If True, enables point-in-time eligibility filtering.
- min_history_days (int): The minimum calendar days of history for PIT eligibility.
- min_price_rows (int): The minimum number of price observations for PIT eligibility.

Source code in src/portfolio_management/backtesting/models.py
@dataclass(frozen=True)
class BacktestConfig:
    """Configuration for a backtest run.

    This dataclass holds all the parameters needed to define a backtest simulation.
    It is immutable to ensure that the configuration cannot be changed during a run.

    Attributes:
        start_date (datetime.date): The first date of the backtest period.
        end_date (datetime.date): The last date of the backtest period.
        initial_capital (Decimal): The starting portfolio value.
        rebalance_frequency (RebalanceFrequency): How often to rebalance.
        rebalance_threshold (float): The weight drift threshold for opportunistic rebalancing.
        commission_pct (float): Commission as a percentage of trade value.
        commission_min (float): Minimum commission fee per trade.
        slippage_bps (float): Slippage cost in basis points.
        cash_reserve_pct (float): The minimum percentage of the portfolio to hold as cash.
        lookback_periods (int): The rolling window size for parameter estimation (e.g., returns).
        use_pit_eligibility (bool): If True, enables point-in-time eligibility filtering.
        min_history_days (int): The minimum calendar days of history for PIT eligibility.
        min_price_rows (int): The minimum number of price observations for PIT eligibility.

    """

    start_date: datetime.date
    end_date: datetime.date
    initial_capital: Decimal = Decimal("100000.00")
    rebalance_frequency: RebalanceFrequency = RebalanceFrequency.MONTHLY
    rebalance_threshold: float = 0.20  # ±20% drift
    commission_pct: float = 0.001  # 0.1%
    commission_min: float = 0.0
    slippage_bps: float = 5.0  # 5 bps
    cash_reserve_pct: float = 0.01  # 1%
    lookback_periods: int = (
        252  # Rolling window for parameter estimation (252 = 1 year)
    )
    use_pit_eligibility: bool = False  # Enable point-in-time eligibility filtering
    min_history_days: int = 252  # Minimum days for eligibility (1 year)
    min_price_rows: int = 252  # Minimum price rows for eligibility

    def __post_init__(self) -> None:
        """Validate configuration values after initialization."""
        if self.start_date >= self.end_date:
            raise ConfigurationError(
                None,
                f"start_date must be before end_date ({self.end_date})",
            )
        if self.initial_capital <= 0:
            raise ConfigurationError(None, "initial_capital must be positive")

        if not 0 <= self.rebalance_threshold <= 1:
            raise ConfigurationError(
                None,
                "rebalance_threshold must be between 0 and 1",
            )
        if self.commission_pct < 0:
            raise ConfigurationError(None, "commission_pct cannot be negative")

        if self.slippage_bps < 0:
            raise ConfigurationError(None, "slippage_bps cannot be negative")

        if not 0 <= self.cash_reserve_pct < 1:
            raise ConfigurationError(None, "cash_reserve_pct must be between 0 and 1")

        if self.min_history_days <= 0:
            raise ConfigurationError(None, "min_history_days must be positive")

        if self.min_price_rows <= 0:
            raise ConfigurationError(None, "min_price_rows must be positive")
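The frozen-dataclass validation pattern shown in `__post_init__` can be sketched standalone. `MiniConfig` below is a hypothetical stand-in that raises plain `ValueError` instead of the package's `ConfigurationError`, so the example runs without any imports from the package:

```python
import datetime
from dataclasses import dataclass
from decimal import Decimal

# Minimal sketch of the BacktestConfig validation pattern; not the real class.
@dataclass(frozen=True)
class MiniConfig:
    start_date: datetime.date
    end_date: datetime.date
    initial_capital: Decimal = Decimal("100000.00")
    rebalance_threshold: float = 0.20

    def __post_init__(self) -> None:
        # Validation runs once at construction; frozen=True then prevents mutation.
        if self.start_date >= self.end_date:
            raise ValueError("start_date must be before end_date")
        if self.initial_capital <= 0:
            raise ValueError("initial_capital must be positive")
        if not 0 <= self.rebalance_threshold <= 1:
            raise ValueError("rebalance_threshold must be between 0 and 1")

cfg = MiniConfig(datetime.date(2020, 1, 1), datetime.date(2023, 12, 31))

try:
    MiniConfig(datetime.date(2023, 1, 1), datetime.date(2020, 1, 1))
except ValueError as e:
    print(e)  # start_date must be before end_date
```

Because the dataclass is frozen, an invalid configuration can never exist after construction, which is what makes it safe to share one config across a whole run.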

PerformanceMetrics dataclass

A container for the performance metrics of a backtest run.

This dataclass holds all the key statistics calculated from a backtest's equity curve, providing a comprehensive summary of the strategy's performance and risk characteristics.

Attributes:

- total_return (float): The cumulative return over the entire backtest period.
- annualized_return (float): The annualized geometric mean return (CAGR).
- annualized_volatility (float): The annualized standard deviation of daily returns.
- sharpe_ratio (float): The risk-adjusted return (assumes a 0% risk-free rate).
- sortino_ratio (float): The downside risk-adjusted return.
- max_drawdown (float): The largest peak-to-trough decline in portfolio value.
- calmar_ratio (float): The annualized return divided by the max drawdown.
- expected_shortfall_95 (float): The average loss on the worst 5% of days (CVaR).
- win_rate (float): The percentage of days with positive returns.
- avg_win (float): The average return on days with positive returns.
- avg_loss (float): The average return on days with negative returns.
- turnover (float): The average portfolio turnover per rebalancing period.
- total_costs (Decimal): The sum of all transaction costs incurred.
- num_rebalances (int): The total number of rebalancing events.
- final_value (Decimal): The final portfolio value; defaults to Decimal("0.0").

Source code in src/portfolio_management/backtesting/models.py
@dataclass
class PerformanceMetrics:
    """A container for the performance metrics of a backtest run.

    This dataclass holds all the key statistics calculated from a backtest's
    equity curve, providing a comprehensive summary of the strategy's performance
    and risk characteristics.

    Attributes:
        total_return (float): The cumulative return over the entire backtest period.
        annualized_return (float): The annualized geometric mean return (CAGR).
        annualized_volatility (float): The annualized standard deviation of daily returns.
        sharpe_ratio (float): The risk-adjusted return (assumes a 0% risk-free rate).
        sortino_ratio (float): The downside risk-adjusted return.
        max_drawdown (float): The largest peak-to-trough decline in portfolio value.
        calmar_ratio (float): The annualized return divided by the max drawdown.
        expected_shortfall_95 (float): The average loss on the worst 5% of days (CVaR).
        win_rate (float): The percentage of days with positive returns.
        avg_win (float): The average return on days with positive returns.
        avg_loss (float): The average return on days with negative returns.
        turnover (float): The average portfolio turnover per rebalancing period.
        total_costs (Decimal): The sum of all transaction costs incurred.
        num_rebalances (int): The total number of rebalancing events.

    """

    total_return: float
    annualized_return: float
    annualized_volatility: float
    sharpe_ratio: float
    sortino_ratio: float
    max_drawdown: float
    calmar_ratio: float
    expected_shortfall_95: float
    win_rate: float
    avg_win: float
    avg_loss: float
    turnover: float
    total_costs: Decimal
    num_rebalances: int
    final_value: Decimal = Decimal("0.0")
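Two of the metrics above, total_return and max_drawdown, can be reproduced from a toy equity curve. This is an illustrative sketch, not the package's `calculate_metrics` implementation:

```python
import pandas as pd

# A made-up five-day equity curve for illustration.
equity = pd.Series([100.0, 110.0, 99.0, 121.0, 108.9])

# Cumulative return: final value over initial value, minus one.
total_return = equity.iloc[-1] / equity.iloc[0] - 1

# Max drawdown: worst percentage decline from a running peak.
running_peak = equity.cummax()
max_drawdown = ((equity - running_peak) / running_peak).min()

print(round(total_return, 4))   # 0.089
print(round(max_drawdown, 4))   # -0.1
```

Here the curve falls 10% twice (110 to 99, then 121 to 108.9), so the max drawdown is -0.1 even though the overall return is positive.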

RebalanceEvent dataclass

A detailed record of a single portfolio rebalancing event.

This dataclass captures the state of the portfolio immediately before and after a rebalance, along with details of the trades executed and costs incurred.

Attributes:

- date (date): The date on which the rebalance occurred.
- trigger (RebalanceTrigger): The reason for the rebalance (e.g., scheduled, forced).
- trades (dict[str, int]): A mapping of asset tickers to the number of shares traded. Positive values are buys, negative values are sells.
- costs (Decimal): The total transaction costs (commission + slippage) for the event.
- pre_rebalance_value (Decimal): The total portfolio value before rebalancing.
- post_rebalance_value (Decimal): The total portfolio value after rebalancing.
- cash_before (Decimal): The cash balance before the rebalance.
- cash_after (Decimal): The cash balance after executing trades and paying costs.

Source code in src/portfolio_management/backtesting/models.py
@dataclass
class RebalanceEvent:
    """A detailed record of a single portfolio rebalancing event.

    This dataclass captures the state of the portfolio immediately before and
    after a rebalance, along with details of the trades executed and costs incurred.

    Attributes:
        date (datetime.date): The date on which the rebalance occurred.
        trigger (RebalanceTrigger): The reason for the rebalance (e.g., scheduled, forced).
        trades (dict[str, int]): A mapping of asset tickers to the number of shares
            traded. Positive values are buys, negative values are sells.
        costs (Decimal): The total transaction costs (commission + slippage) for the event.
        pre_rebalance_value (Decimal): The total portfolio value before rebalancing.
        post_rebalance_value (Decimal): The total portfolio value after rebalancing.
        cash_before (Decimal): The cash balance before the rebalance.
        cash_after (Decimal): The cash balance after executing trades and paying costs.

    """

    date: datetime.date
    trigger: RebalanceTrigger
    trades: dict[str, int]
    costs: Decimal
    pre_rebalance_value: Decimal
    post_rebalance_value: Decimal
    cash_before: Decimal
    cash_after: Decimal

RebalanceFrequency

Bases: Enum

Enumeration for supported rebalancing frequencies.

Source code in src/portfolio_management/backtesting/models.py
class RebalanceFrequency(Enum):
    """Enumeration for supported rebalancing frequencies."""

    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    QUARTERLY = "quarterly"
    ANNUAL = "annual"

RebalanceTrigger

Bases: Enum

Enumeration for the cause of a rebalance event.

Source code in src/portfolio_management/backtesting/models.py
class RebalanceTrigger(Enum):
    """Enumeration for the cause of a rebalance event."""

    SCHEDULED = "scheduled"  # Calendar-based (e.g., monthly)
    OPPORTUNISTIC = "opportunistic"  # Threshold-based (e.g., weight drift)
    FORCED = "forced"  # Manual override or initial portfolio setup

TransactionCostModel dataclass

Model for calculating realistic transaction costs.

This class combines multiple cost components (commission, slippage) to provide a total cost for a given trade.

Attributes:

- commission_pct (float): The commission charged as a percentage of the total trade value, e.g., 0.001 for 0.1%.
- commission_min (float): The minimum flat fee for a commission. The actual commission will be max(trade_value * commission_pct, commission_min).
- slippage_bps (float): The estimated slippage cost in basis points (1/100th of a percent), e.g., 5.0 bps means a cost of 0.05% of the trade value.

Example:

>>> model = TransactionCostModel(commission_pct=0.001, slippage_bps=10)
>>> cost = model.calculate_cost("MSFT", shares=50, price=300.0, is_buy=True)
>>> # Commission = 50 * 300 * 0.001 = 15.0
>>> # Slippage = 50 * 300 * (10 / 10000) = 15.0
>>> # Total = 15.0 + 15.0 = 30.0
>>> print(cost)
30.00

Source code in src/portfolio_management/backtesting/transactions/costs.py
@dataclass
class TransactionCostModel:
    """Model for calculating realistic transaction costs.

    This class combines multiple cost components (commission, slippage) to
    provide a total cost for a given trade.

    Attributes:
        commission_pct (float): The commission charged as a percentage of the
            total trade value. E.g., 0.001 for 0.1%.
        commission_min (float): The minimum flat fee for a commission. The actual
            commission will be `max(trade_value * commission_pct, commission_min)`.
        slippage_bps (float): The estimated slippage cost in basis points (1/100th
            of a percent). E.g., 5.0 bps means a cost of 0.05% of the trade value.

    Example:
        >>> model = TransactionCostModel(commission_pct=0.001, slippage_bps=10)
        >>> cost = model.calculate_cost("MSFT", shares=50, price=300.0, is_buy=True)
        >>> # Commission = 50 * 300 * 0.001 = 15.0
        >>> # Slippage = 50 * 300 * (10 / 10000) = 15.0
        >>> # Total = 15.0 + 15.0 = 30.0
        >>> print(cost)
        30.00

    """

    commission_pct: float = 0.001  # 0.1%
    commission_min: float = 0.0
    slippage_bps: float = 5.0  # 5 bps

    def calculate_cost(
        self,
        ticker: str,
        shares: int,
        price: float,
        is_buy: bool,
    ) -> Decimal:
        """Calculate the total transaction cost for a single trade.

        The total cost is the sum of the commission and slippage.

        Args:
            ticker (str): The symbol of the asset being traded.
            shares (int): The absolute number of shares being traded.
            price (float): The execution price per share.
            is_buy (bool): True if the trade is a buy, False for a sell.

        Returns:
            Decimal: The total calculated cost for the trade, always positive.

        Raises:
            DataValidationError: If input `shares` is negative or `price` is
                non-positive.

        """
        if shares < 0:
            raise DataValidationError(f"Shares must be non-negative, got {shares}")
        if price <= 0:
            raise DataValidationError(f"Price must be positive, got {price}")

        # Calculate base trade value
        trade_value = abs(shares) * price

        # Commission (max of percentage or minimum)
        commission = max(
            trade_value * self.commission_pct,
            self.commission_min if shares > 0 else 0.0,
        )

        # Slippage (always a cost, regardless of direction)
        slippage = trade_value * (self.slippage_bps / 10000.0)

        total_cost = commission + slippage
        return Decimal(str(round(total_cost, 2)))

    def calculate_batch_cost(
        self,
        trades: dict[str, tuple[int, float]],
    ) -> dict[str, Decimal]:
        """Calculate costs for a batch of multiple trades.

        Args:
            trades (dict[str, tuple[int, float]]): A dictionary mapping a ticker
                to a tuple of (shares, price). A positive number of shares
                indicates a buy, and a negative number indicates a sell.

        Returns:
            dict[str, Decimal]: A dictionary mapping each ticker to its
            calculated transaction cost.

        """
        costs = {}
        for ticker, (shares, price) in trades.items():
            if shares == 0:
                costs[ticker] = Decimal("0.00")
                continue
            is_buy = shares > 0
            costs[ticker] = self.calculate_cost(ticker, abs(shares), price, is_buy)
        return costs

calculate_cost(ticker, shares, price, is_buy)

Calculate the total transaction cost for a single trade.

The total cost is the sum of the commission and slippage.

Parameters:

- ticker (str): The symbol of the asset being traded. Required.
- shares (int): The absolute number of shares being traded. Required.
- price (float): The execution price per share. Required.
- is_buy (bool): True if the trade is a buy, False for a sell. Required.

Returns:

- Decimal: The total calculated cost for the trade, always positive.

Raises:

- DataValidationError: If input shares is negative or price is non-positive.

Source code in src/portfolio_management/backtesting/transactions/costs.py
def calculate_cost(
    self,
    ticker: str,
    shares: int,
    price: float,
    is_buy: bool,
) -> Decimal:
    """Calculate the total transaction cost for a single trade.

    The total cost is the sum of the commission and slippage.

    Args:
        ticker (str): The symbol of the asset being traded.
        shares (int): The absolute number of shares being traded.
        price (float): The execution price per share.
        is_buy (bool): True if the trade is a buy, False for a sell.

    Returns:
        Decimal: The total calculated cost for the trade, always positive.

    Raises:
        DataValidationError: If input `shares` is negative or `price` is
            non-positive.

    """
    if shares < 0:
        raise DataValidationError(f"Shares must be non-negative, got {shares}")
    if price <= 0:
        raise DataValidationError(f"Price must be positive, got {price}")

    # Calculate base trade value
    trade_value = abs(shares) * price

    # Commission (max of percentage or minimum)
    commission = max(
        trade_value * self.commission_pct,
        self.commission_min if shares > 0 else 0.0,
    )

    # Slippage (always a cost, regardless of direction)
    slippage = trade_value * (self.slippage_bps / 10000.0)

    total_cost = commission + slippage
    return Decimal(str(round(total_cost, 2)))
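The commission and slippage arithmetic can be replicated as a standalone function to sanity-check the docstring example. `trade_cost` is a hypothetical mirror of `calculate_cost`'s formula written for this sketch, not the package API:

```python
from decimal import Decimal

def trade_cost(shares: int, price: float,
               commission_pct: float = 0.001,
               commission_min: float = 0.0,
               slippage_bps: float = 5.0) -> Decimal:
    # Same arithmetic as TransactionCostModel.calculate_cost: commission is
    # the greater of a percentage of trade value and a flat minimum, and
    # slippage converts basis points to a fraction of trade value.
    trade_value = abs(shares) * price
    commission = max(trade_value * commission_pct,
                     commission_min if shares > 0 else 0.0)
    slippage = trade_value * (slippage_bps / 10000.0)
    return Decimal(str(round(commission + slippage, 2)))

# 50 shares at $300: commission = 15.0, slippage at 10 bps = 15.0
print(trade_cost(50, 300.0, slippage_bps=10.0))  # 30.0
```

At the default 5 bps the same trade costs 15.0 + 7.5 = 22.5, which shows why slippage assumptions matter as much as the commission rate for high-turnover strategies.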

calculate_batch_cost(trades)

Calculate costs for a batch of multiple trades.

Parameters:

- trades (dict[str, tuple[int, float]]): A dictionary mapping a ticker to a tuple of (shares, price). A positive number of shares indicates a buy, and a negative number indicates a sell. Required.

Returns:

- dict[str, Decimal]: A dictionary mapping each ticker to its calculated transaction cost.

Source code in src/portfolio_management/backtesting/transactions/costs.py
def calculate_batch_cost(
    self,
    trades: dict[str, tuple[int, float]],
) -> dict[str, Decimal]:
    """Calculate costs for a batch of multiple trades.

    Args:
        trades (dict[str, tuple[int, float]]): A dictionary mapping a ticker
            to a tuple of (shares, price). A positive number of shares
            indicates a buy, and a negative number indicates a sell.

    Returns:
        dict[str, Decimal]: A dictionary mapping each ticker to its
        calculated transaction cost.

    """
    costs = {}
    for ticker, (shares, price) in trades.items():
        if shares == 0:
            costs[ticker] = Decimal("0.00")
            continue
        is_buy = shares > 0
        costs[ticker] = self.calculate_cost(ticker, abs(shares), price, is_buy)
    return costs

compute_pit_eligibility(returns, date, min_history_days=252, min_price_rows=252)

Compute a point-in-time eligibility mask for assets at a given date.

This function prevents lookahead bias by ensuring that only assets with a sufficiently long and dense history of data are considered for inclusion in the portfolio on a given rebalancing date.

An asset is considered eligible if it meets two criteria:

1. The time since its first valid data point is at least min_history_days.
2. The number of non-missing data points up to the given date is at least min_price_rows.

Parameters:

- returns (DataFrame): A DataFrame of historical returns, with dates as the index and asset tickers as columns. Required.
- date (date): The rebalancing date for which to compute eligibility. Required.
- min_history_days (int): The minimum number of calendar days of history required for an asset to be eligible. Defaults to 252.
- min_price_rows (int): The minimum number of non-missing return data points required. Defaults to 252.

Returns:

- pd.Series: A boolean Series where the index is the asset tickers and the values indicate eligibility (True if eligible, False otherwise).

Raises:

- ValueError: If the input returns DataFrame is invalid or the date is outside the data range.

Source code in src/portfolio_management/backtesting/eligibility.py
def compute_pit_eligibility(
    returns: pd.DataFrame,
    date: datetime.date,
    min_history_days: int = 252,
    min_price_rows: int = 252,
) -> pd.Series:
    """Compute a point-in-time eligibility mask for assets at a given date.

    This function prevents lookahead bias by ensuring that only assets with a
    sufficiently long and dense history of data are considered for inclusion in
    the portfolio on a given rebalancing date.

    An asset is considered eligible if it meets two criteria:
    1.  The time since its first valid data point is at least `min_history_days`.
    2.  The number of non-missing data points up to the given `date` is at
        least `min_price_rows`.

    Args:
        returns (pd.DataFrame): A DataFrame of historical returns, with dates as
            the index and asset tickers as columns.
        date (datetime.date): The rebalancing date for which to compute eligibility.
        min_history_days (int): The minimum number of calendar days of history
            required for an asset to be eligible. Defaults to 252.
        min_price_rows (int): The minimum number of non-missing return data points
            required. Defaults to 252.

    Returns:
        pd.Series: A boolean Series where the index is the asset tickers and the
        values indicate eligibility (True if eligible, False otherwise).

    Raises:
        ValueError: If the input `returns` DataFrame is invalid or the `date` is
            outside the data range.

    """
    # Validate inputs
    if returns is None or not isinstance(returns, pd.DataFrame) or returns.empty:
        raise DataValidationError("returns must be a non-empty pandas DataFrame")

    if not isinstance(date, datetime.date):
        raise DataValidationError(
            f"date must be a datetime.date, got {type(date).__name__}",
        )

    if min_history_days <= 0:
        raise ConfigurationError(
            None,
            f"min_history_days must be > 0, got {min_history_days}",
        )

    if min_price_rows <= 0:
        raise ConfigurationError(
            None,
            f"min_price_rows must be > 0, got {min_price_rows}",
        )

    # Check if date is within data range
    max_date = returns.index.max()
    if isinstance(max_date, pd.Timestamp):
        max_date = max_date.date()

    if date > max_date:
        # If date is beyond available data, use the last available date
        # This prevents future data leakage - we can only use data up to max_date
        logger.debug(
            "Date %s is after last available date %s. Using last date for eligibility.",
            date,
            max_date,
        )
        date = max_date

    # Filter returns to only include data up to the given date
    # Convert date to datetime for comparison
    cutoff_datetime = pd.Timestamp(date)
    historical_data = returns[returns.index <= cutoff_datetime]

    if len(historical_data) == 0:
        # No data available yet - nothing is eligible
        logger.debug(
            "No historical data available up to %s. All assets ineligible.",
            date,
        )
        return pd.Series(False, index=returns.columns, name=0)

    # For each asset, find first non-NaN observation
    first_valid_idx = historical_data.apply(lambda x: x.first_valid_index())

    # Calculate days since first valid observation
    days_since_first = pd.Series(index=returns.columns, dtype=float)
    for ticker in returns.columns:
        first_date = first_valid_idx[ticker]
        if pd.isna(first_date):
            # No valid observations at all
            days_since_first[ticker] = 0
            continue
        # Calculate days between first valid and current date
        days_diff = (cutoff_datetime - pd.Timestamp(first_date)).days
        days_since_first[ticker] = days_diff

    # Count non-NaN observations up to the date
    rows_count = historical_data.notna().sum()

    # Apply eligibility criteria
    eligible = (days_since_first >= min_history_days) & (rows_count >= min_price_rows)

    return eligible
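The two criteria can be demonstrated on toy data. The tickers and thresholds below are made up for the example (lowered from the 252-day defaults so the sample stays small):

```python
import pandas as pd

idx = pd.date_range("2023-01-01", periods=10, freq="D")
returns = pd.DataFrame({
    "OLD": [0.01] * 10,                # listed for the whole sample
    "NEW": [None] * 8 + [0.01, 0.01],  # first valid data 2023-01-09
}, index=idx)

cutoff = pd.Timestamp("2023-01-10")
hist = returns[returns.index <= cutoff]

# Criterion 1: calendar days since each asset's first valid observation.
first_valid = pd.to_datetime(hist.apply(lambda col: col.first_valid_index()))
days_since_first = (cutoff - first_valid).dt.days

# Criterion 2: count of non-missing observations up to the cutoff.
rows_count = hist.notna().sum()

eligible = (days_since_first >= 5) & (rows_count >= 5)
print(eligible.to_dict())  # {'OLD': True, 'NEW': False}
```

"NEW" fails both checks (1 day of history, 2 observations), so it is excluded on this rebalance date even though it has valid recent returns, which is exactly the look-ahead protection the function provides.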

detect_delistings(returns, current_date, lookforward_days=30)

Detect assets that have been or will soon be delisted.

This utility identifies assets whose last available data point occurs at or before the current_date, and for which no new data appears within the lookforward_days window. It is used to gracefully liquidate positions in assets that are no longer trading.

Note

This function involves a small degree of lookahead, which is a pragmatic choice for handling delistings in a backtest. In a live trading environment, delisting information would be received from a data provider.

Parameters:

- returns (DataFrame): The entire historical returns DataFrame. Required.
- current_date (date): The current date in the backtest simulation. Required.
- lookforward_days (int): The number of days to look ahead to confirm that an asset has truly been delisted. Defaults to 30.

Returns:

- dict[str, datetime.date]: A dictionary mapping the ticker of each delisted asset to its last known date with valid data.

Source code in src/portfolio_management/backtesting/eligibility.py
def detect_delistings(
    returns: pd.DataFrame,
    current_date: datetime.date,
    lookforward_days: int = 30,
) -> dict[str, datetime.date]:
    """Detect assets that have been or will soon be delisted.

    This utility identifies assets whose last available data point occurs at or
    before the `current_date`, and for which no new data appears within the
    `lookforward_days` window. It is used to gracefully liquidate positions
    in assets that are no longer trading.

    Note:
        This function involves a small degree of lookahead, which is a
        pragmatic choice for handling delistings in a backtest. In a live
        trading environment, delisting information would be received from a
        data provider.

    Args:
        returns (pd.DataFrame): The entire historical returns DataFrame.
        current_date (datetime.date): The current date in the backtest simulation.
        lookforward_days (int): The number of days to look ahead to confirm that
            an asset has truly been delisted. Defaults to 30.

    Returns:
        dict[str, datetime.date]: A dictionary mapping the ticker of each
        delisted asset to its last known date with valid data.

    """
    cutoff_datetime = pd.Timestamp(current_date)
    lookforward_datetime = cutoff_datetime + pd.Timedelta(days=lookforward_days)

    delistings: dict[str, datetime.date] = {}

    for ticker in returns.columns:
        # Get all data for this ticker
        ticker_data = returns[ticker]

        # Find last valid observation
        last_valid_idx = ticker_data.last_valid_index()

        if last_valid_idx is None:
            # No valid data at all
            continue

        last_valid_value: Any = last_valid_idx
        last_valid_date = pd.Timestamp(last_valid_value)

        # Check if last valid date is at or before current date
        if last_valid_date <= cutoff_datetime:
            # Check if there's any valid data in the lookforward period
            future_data = ticker_data[
                (ticker_data.index > cutoff_datetime)
                & (ticker_data.index <= lookforward_datetime)
            ]

            if future_data.notna().sum() == 0:
                # No future data - asset is delisted
                delistings[ticker] = last_valid_date.date()

    return delistings

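The delisting check above can be demonstrated on a small synthetic returns frame: one asset trades throughout, the other stops producing data before the current date and shows nothing in the lookforward window. This is a hedged sketch that inlines the same logic rather than importing the package; ticker names and dates are illustrative.

```python
import datetime

import numpy as np
import pandas as pd

# 60 calendar days of returns; BBB stops trading after day 20.
dates = pd.date_range("2023-01-02", periods=60, freq="D")
returns = pd.DataFrame(
    {
        "AAA": np.full(60, 0.001),            # trades the whole period
        "BBB": [0.001] * 20 + [np.nan] * 40,  # last valid row is 2023-01-21
    },
    index=dates,
)

current_date = datetime.date(2023, 2, 1)
cutoff = pd.Timestamp(current_date)
lookforward = cutoff + pd.Timedelta(days=30)

delistings: dict[str, datetime.date] = {}
for ticker in returns.columns:
    last_valid = returns[ticker].last_valid_index()
    if last_valid is None:
        continue
    # Delisted if the last valid row is at or before the current date
    # and no data reappears inside the lookforward window.
    if last_valid <= cutoff:
        future = returns[ticker][
            (returns.index > cutoff) & (returns.index <= lookforward)
        ]
        if future.notna().sum() == 0:
            delistings[ticker] = pd.Timestamp(last_valid).date()
```

`BBB` is flagged with its last valid date of 2023-01-21, while `AAA`, whose data extends past the cutoff, is not.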
get_asset_history_stats(returns, date)

Get detailed history statistics for each asset up to a given date.

This function computes comprehensive statistics about data availability for each asset, which is useful for debugging eligibility filters and understanding the data quality of the universe.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `returns` | `DataFrame` | The historical returns DataFrame. | *required* |
| `date` | `date` | The date up to which statistics should be computed. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | A DataFrame where each row corresponds to an asset and columns include 'ticker', 'first_valid_date', 'last_valid_date', 'days_since_first', 'total_rows', and 'coverage_pct'. |

Source code in src/portfolio_management/backtesting/eligibility.py
def get_asset_history_stats(
    returns: pd.DataFrame,
    date: datetime.date,
) -> pd.DataFrame:
    """Get detailed history statistics for each asset up to a given date.

    This function computes comprehensive statistics about data availability for
    each asset, which is useful for debugging eligibility filters and
    understanding the data quality of the universe.

    Args:
        returns (pd.DataFrame): The historical returns DataFrame.
        date (datetime.date): The date up to which statistics should be computed.

    Returns:
        pd.DataFrame: A DataFrame where each row corresponds to an asset and
        columns include 'ticker', 'first_valid_date', 'last_valid_date',
        'days_since_first', 'total_rows', and 'coverage_pct'.

    """
    cutoff_datetime = pd.Timestamp(date)
    historical_data = returns[returns.index <= cutoff_datetime]

    if len(historical_data) == 0:
        return pd.DataFrame(
            columns=[
                "ticker",
                "first_valid_date",
                "last_valid_date",
                "days_since_first",
                "total_rows",
                "coverage_pct",
            ],
        )

    stats = []

    for ticker in returns.columns:
        ticker_data = historical_data[ticker]

        first_valid = ticker_data.first_valid_index()
        last_valid = ticker_data.last_valid_index()

        if first_valid is None:
            stats.append(
                {
                    "ticker": ticker,
                    "first_valid_date": None,
                    "last_valid_date": None,
                    "days_since_first": 0,
                    "total_rows": 0,
                    "coverage_pct": 0.0,
                },
            )
        else:
            first_valid_value: Any = first_valid
            first_valid_ts = pd.Timestamp(first_valid_value)
            if last_valid is not None:
                last_valid_value: Any = last_valid
                last_valid_ts = pd.Timestamp(last_valid_value)
            else:
                last_valid_ts = first_valid_ts

            days_since = (cutoff_datetime - first_valid_ts).days
            total_rows = ticker_data.notna().sum()

            # Calculate coverage percentage
            total_days = (cutoff_datetime - first_valid_ts).days + 1
            coverage_pct = (total_rows / total_days * 100) if total_days > 0 else 0.0

            stats.append(
                {
                    "ticker": ticker,
                    "first_valid_date": str(first_valid_ts.date()),
                    "last_valid_date": str(last_valid_ts.date()),
                    "days_since_first": days_since,
                    "total_rows": total_rows,
                    "coverage_pct": coverage_pct,
                },
            )

    return pd.DataFrame(stats)

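One subtlety in the statistics above is `coverage_pct`: it divides non-NaN rows by *calendar* days since the first observation, so an asset with perfect business-day data still shows roughly 5/7 ≈ 71% coverage. A minimal sketch of that calculation, with illustrative dates:

```python
import pandas as pd

# 40 consecutive business days of data, no gaps.
dates = pd.bdate_range("2023-01-02", periods=40)
ticker_data = pd.Series(0.001, index=dates)

cutoff = pd.Timestamp("2023-02-24")  # the last business day in the range
hist = ticker_data[ticker_data.index <= cutoff]
first_valid = hist.first_valid_index()

days_since = (cutoff - first_valid).days       # calendar days: 53
total_rows = int(hist.notna().sum())           # observations: 40
total_days = days_since + 1                    # inclusive span: 54
coverage_pct = total_rows / total_days * 100   # ~74%, despite no missing rows
```

When interpreting the output of `get_asset_history_stats`, coverage well below 100% is therefore expected for daily equity data; values far below ~70% are the ones that suggest genuine gaps.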
calculate_metrics(equity_df, rebalance_events)

Calculate performance metrics from an equity curve and rebalance events.

This function takes the results of a backtest and computes a wide range of standard performance and risk metrics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `equity_df` | `DataFrame` | A DataFrame with an 'equity' column containing the portfolio's total value, indexed by date. | *required* |
| `rebalance_events` | `list[RebalanceEvent]` | A list of all rebalancing events that occurred during the backtest, used for cost and turnover calculations. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `PerformanceMetrics` | `PerformanceMetrics` | A dataclass containing all calculated statistics. Returns a zeroed-out metrics object if the equity curve has insufficient data (< 2 periods). |

Source code in src/portfolio_management/backtesting/performance/metrics.py
def calculate_metrics(
    equity_df: pd.DataFrame,
    rebalance_events: list[RebalanceEvent],
) -> PerformanceMetrics:
    """Calculate performance metrics from an equity curve and rebalance events.

    This function takes the results of a backtest and computes a wide range of
    standard performance and risk metrics.

    Args:
        equity_df (pd.DataFrame): A DataFrame with an 'equity' column containing
            the portfolio's total value, indexed by date.
        rebalance_events (list[RebalanceEvent]): A list of all rebalancing
            events that occurred during the backtest, used for cost and
            turnover calculations.

    Returns:
        PerformanceMetrics: A dataclass containing all calculated statistics.
            Returns a zeroed-out metrics object if the equity curve has
            insufficient data (< 2 periods).

    """
    # Import here to avoid circular dependency
    from portfolio_management.backtesting.models import PerformanceMetrics

    if len(equity_df) < 2:
        # Not enough data for meaningful metrics
        return PerformanceMetrics(
            total_return=0.0,
            annualized_return=0.0,
            annualized_volatility=0.0,
            sharpe_ratio=0.0,
            sortino_ratio=0.0,
            max_drawdown=0.0,
            calmar_ratio=0.0,
            expected_shortfall_95=0.0,
            win_rate=0.0,
            avg_win=0.0,
            avg_loss=0.0,
            turnover=0.0,
            total_costs=sum((e.costs for e in rebalance_events), Decimal(0)),
            num_rebalances=len(rebalance_events),
            final_value=Decimal(0),
        )

    # Calculate returns
    returns = equity_df["equity"].pct_change().dropna()

    if len(returns) == 0:
        return PerformanceMetrics(
            total_return=0.0,
            annualized_return=0.0,
            annualized_volatility=0.0,
            sharpe_ratio=0.0,
            sortino_ratio=0.0,
            max_drawdown=0.0,
            calmar_ratio=0.0,
            expected_shortfall_95=0.0,
            win_rate=0.0,
            avg_win=0.0,
            avg_loss=0.0,
            turnover=0.0,
            total_costs=sum((e.costs for e in rebalance_events), Decimal(0)),
            num_rebalances=len(rebalance_events),
            final_value=(
                Decimal(equity_df["equity"].iloc[-1])
                if not equity_df.empty
                else Decimal(0)
            ),
        )

    # Total and annualized returns
    total_return = float(
        (equity_df["equity"].iloc[-1] / equity_df["equity"].iloc[0]) - 1,
    )
    days = len(equity_df)
    years = days / 252  # Approximate trading days per year
    annualized_return = (
        float((1 + total_return) ** (1 / years) - 1) if years > 0 else 0.0
    )

    from portfolio_management.analytics.risk_metrics import calculate_volatility

    # Volatility
    annualized_vol = calculate_volatility(returns)

    # Sharpe ratio (assuming 0% risk-free rate)
    sharpe = annualized_return / annualized_vol if annualized_vol > 0 else 0.0

    from portfolio_management.analytics.risk_metrics import calculate_downside_deviation

    # Sortino ratio (downside deviation)
    downside_dev = calculate_downside_deviation(returns)
    sortino = annualized_return / downside_dev if downside_dev > 0 else 0.0

    # Maximum drawdown
    cumulative = (1 + returns).cumprod()
    running_max = cumulative.expanding().max()
    drawdown = (cumulative - running_max) / running_max
    max_drawdown = float(drawdown.min())

    # Calmar ratio
    calmar = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0.0

    # Expected Shortfall (95%)
    es_95 = float(returns.quantile(0.05)) if len(returns) > 0 else 0.0

    # Win rate and avg win/loss
    positive_returns = returns[returns > 0]
    negative_returns = returns[returns < 0]
    win_rate = float(len(positive_returns) / len(returns)) if len(returns) > 0 else 0.0
    avg_win = float(positive_returns.mean()) if len(positive_returns) > 0 else 0.0
    avg_loss = float(negative_returns.mean()) if len(negative_returns) > 0 else 0.0

    # Turnover and costs
    total_costs = sum((event.costs for event in rebalance_events), Decimal(0))
    num_rebalances = len(rebalance_events)

    # Simple turnover calculation: sum of absolute trades / avg portfolio value
    if rebalance_events and not equity_df["equity"].empty:
        total_trade_volume = sum(
            sum(abs(qty) for qty in event.trades.values()) for event in rebalance_events
        )
        avg_portfolio_value = float(equity_df["equity"].mean())
        avg_turnover = (
            total_trade_volume / (num_rebalances * avg_portfolio_value)
            if num_rebalances > 0 and avg_portfolio_value > 0
            else 0.0
        )
    else:
        avg_turnover = 0.0

    return PerformanceMetrics(
        total_return=total_return,
        annualized_return=annualized_return,
        annualized_volatility=annualized_vol,
        sharpe_ratio=sharpe,
        sortino_ratio=sortino,
        max_drawdown=max_drawdown,
        calmar_ratio=calmar,
        expected_shortfall_95=es_95,
        win_rate=win_rate,
        avg_win=avg_win,
        avg_loss=avg_loss,
        turnover=avg_turnover,
        total_costs=total_costs,
        num_rebalances=num_rebalances,
        final_value=Decimal(equity_df["equity"].iloc[-1]),
    )

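The return and drawdown arithmetic inside `calculate_metrics` can be checked by hand on a tiny equity curve. This sketch reproduces only the total-return, annualization, and maximum-drawdown steps shown in the source above, on illustrative numbers:

```python
import pandas as pd

# Four-day equity curve: 100 -> 110 -> 99 -> 120.
equity = pd.Series(
    [100.0, 110.0, 99.0, 120.0],
    index=pd.date_range("2023-01-02", periods=4),
)
returns = equity.pct_change().dropna()

# Total return over the whole period: 120/100 - 1 = 0.20.
total_return = float(equity.iloc[-1] / equity.iloc[0] - 1)

# Annualized with the same 252-trading-day convention as the engine.
years = len(equity) / 252
annualized_return = (1 + total_return) ** (1 / years) - 1

# Maximum drawdown: cumulative curve vs. its running peak.
cumulative = (1 + returns).cumprod()
running_max = cumulative.expanding().max()
drawdown = (cumulative - running_max) / running_max
max_drawdown = float(drawdown.min())  # -0.10: the dip from 110 to 99
```

Note that annualizing a four-day return produces an enormous (and meaningless) figure; the formula is only informative over horizons approaching a year or more.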