
Portfolio API Reference

The portfolio package handles portfolio construction, strategies, and constraints.

Overview

The portfolio package contains:

  • Strategies - Portfolio allocation strategies
  • Constraints - Portfolio constraints and limits
  • Membership - Membership policy for turnover control
  • Cardinality - Cardinality constraint interfaces

Portfolio Package

portfolio_management.portfolio

A comprehensive suite for systematic portfolio construction and management.

This package provides a modular framework for building, optimizing, and analyzing investment portfolios. It includes a variety of weighting strategies, constraint management, and rebalancing logic, designed for both research and production environments.

Key Components
  • PortfolioConstructor: The main entry point for building portfolios. It acts as a factory for different portfolio strategies.
  • PortfolioStrategy: An interface for all portfolio construction strategies, with concrete implementations like EqualWeightStrategy, MeanVarianceStrategy, and RiskParityStrategy.
  • PortfolioConstraints: A data class to define investment constraints such as min/max weights, asset class exposure limits, and more.
  • CardinalityConstraints: A data class for advanced constraints on the number of assets in a portfolio.
  • RebalanceConfig: Configuration for defining rebalancing frequency and tolerance.
Usage Example

import numpy as np
import pandas as pd
from portfolio_management.portfolio import PortfolioConstructor, PortfolioConstraints
from portfolio_management.portfolio.strategies.mean_variance import MeanVarianceStrategy

# 1. Define returns data
np.random.seed(42)
returns = pd.DataFrame({
    "asset1": np.random.normal(0, 0.01, 30),
    "asset2": np.random.normal(0, 0.02, 30),
    "asset3": np.random.normal(0, 0.03, 30),
})

# 2. Define constraints
constraints = PortfolioConstraints(max_weight=0.5, require_full_investment=True)

# 3. Initialize the constructor and build a portfolio
constructor = PortfolioConstructor(constraints=constraints)

# The default min_periods for MeanVarianceStrategy is 252; override it for
# this 30-row sample.
constructor.register_strategy(
    "mean_variance_min_vol",
    MeanVarianceStrategy(objective="min_volatility", min_periods=30),
)
portfolio = constructor.construct(
    strategy_name="mean_variance_min_vol",
    returns=returns,
)

# 4. View the resulting weights (exact values depend on the random data)
print(portfolio.weights.sum().round(2))  # 1.0
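For intuition, the min_volatility objective has a closed-form solution in the unconstrained case: w* ∝ Σ⁻¹1, normalized to sum to 1. The sketch below is not the library's solver (MeanVarianceStrategy also enforces PortfolioConstraints); it only illustrates the quantity being minimized:

```python
import numpy as np
import pandas as pd

def min_variance_weights(returns: pd.DataFrame) -> pd.Series:
    """Unconstrained minimum-variance weights: w* proportional to inv(Cov) @ 1."""
    cov = returns.cov().to_numpy()
    ones = np.ones(cov.shape[0])
    raw = np.linalg.solve(cov, ones)  # solves Cov @ raw = 1
    return pd.Series(raw / raw.sum(), index=returns.columns)

np.random.seed(42)
returns = pd.DataFrame({
    "asset1": np.random.normal(0, 0.01, 30),
    "asset2": np.random.normal(0, 0.02, 30),
    "asset3": np.random.normal(0, 0.03, 30),
})
w = min_variance_weights(returns)
print(round(w.sum(), 2))  # 1.0
```

Any feasible full-investment portfolio, including equal weight, has variance at least as high as this solution, which is why it serves as a useful sanity bound when debugging the constrained optimizer.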

PortfolioConstructor

Coordinates portfolio strategy selection and construction.

This class acts as a factory for portfolio construction, allowing users to register different PortfolioStrategy implementations and then construct portfolios by referencing their registered names. It simplifies the process of comparing different strategies under the same constraints.

It comes with several common strategies pre-registered, such as equal weight, minimum volatility, and maximum Sharpe ratio.

Attributes:

_default_constraints (PortfolioConstraints): Default constraints to apply if none are provided during construction.

_strategies (dict[str, PortfolioStrategy]): A registry of available portfolio construction strategies.

Example

import numpy as np
import pandas as pd
from portfolio_management.portfolio import (
    PortfolioConstructor, PortfolioConstraints
)
from portfolio_management.portfolio.strategies.mean_variance import MeanVarianceStrategy

np.random.seed(42)
returns = pd.DataFrame({
    'ASSET_A': np.random.normal(0, 0.01, 30),
    'ASSET_B': np.random.normal(0, 0.02, 30),
})

# Initialize with default constraints
constraints = PortfolioConstraints(max_weight=0.7)
constructor = PortfolioConstructor(constraints=constraints)

# The default min_periods for MeanVarianceStrategy is 252; override it for
# this 30-row sample.
constructor.register_strategy(
    "mean_variance_min_vol",
    MeanVarianceStrategy(objective="min_volatility", min_periods=30),
)

# Construct a minimum volatility portfolio
portfolio = constructor.construct("mean_variance_min_vol", returns)

# The exact weights will vary, but the sum should be 1.0
print(portfolio.weights.sum().round(2))  # 1.0

# Compare multiple strategies
comparison = constructor.compare_strategies(
    ["equal_weight", "mean_variance_min_vol"],
    returns
)

# The exact weights will vary, but the sums should be 1.0
print(comparison.sum().round(2))
# equal_weight             1.0
# mean_variance_min_vol    1.0
# dtype: float64

Source code in src/portfolio_management/portfolio/builder.py
class PortfolioConstructor:
    """Coordinates portfolio strategy selection and construction.

    This class acts as a factory for portfolio construction, allowing users to
    register different `PortfolioStrategy` implementations and then construct
    portfolios by referencing their registered names. It simplifies the process
    of comparing different strategies under the same constraints.

    It comes with several common strategies pre-registered, such as equal weight,
    minimum volatility, and maximum Sharpe ratio.

    Attributes:
        _default_constraints (PortfolioConstraints): Default constraints to apply
            if none are provided during construction.
        _strategies (dict[str, PortfolioStrategy]): A registry of available
            portfolio construction strategies.

    Example:
        >>> import pandas as pd
        >>> from portfolio_management.portfolio import (
        ...     PortfolioConstructor, PortfolioConstraints
        ... )
        >>>
        >>> import numpy as np
        >>> np.random.seed(42)
        >>> returns = pd.DataFrame({
        ...     'ASSET_A': np.random.normal(0, 0.01, 30),
        ...     'ASSET_B': np.random.normal(0, 0.02, 30),
        ... })
        >>>
        >>> # Initialize with default constraints
        >>> constraints = PortfolioConstraints(max_weight=0.7)
        >>> from portfolio_management.portfolio.strategies.mean_variance import MeanVarianceStrategy
        >>> constructor = PortfolioConstructor(constraints=constraints)
        >>> # The default min_periods for MeanVarianceStrategy is 252. We override it for the example.
        >>> constructor.register_strategy(
        ...     "mean_variance_min_vol",
        ...     MeanVarianceStrategy(objective="min_volatility", min_periods=30)
        ... )
        >>>
        >>> # Construct a minimum volatility portfolio
        >>> portfolio = constructor.construct("mean_variance_min_vol", returns)
        >>> # The exact weights will vary, but the sum should be 1.0
        >>> print(portfolio.weights.sum().round(2))
        1.0
        >>>
        >>> # Compare multiple strategies
        >>> comparison = constructor.compare_strategies(
        ...     ["equal_weight", "mean_variance_min_vol"],
        ...     returns
        ... )
        >>> # The exact weights will vary, but the sums should be 1.0
        >>> print(comparison.sum().round(2))
        equal_weight             1.0
        mean_variance_min_vol    1.0
        dtype: float64

    """

    def __init__(self, constraints: PortfolioConstraints | None = None) -> None:
        """Initialise the constructor with optional default constraints."""
        self._default_constraints = constraints or PortfolioConstraints()
        self._strategies: dict[str, PortfolioStrategy] = {}

        # Register baseline strategies
        self.register_strategy(StrategyType.EQUAL_WEIGHT.value, EqualWeightStrategy())
        self.register_strategy(StrategyType.RISK_PARITY.value, RiskParityStrategy())
        self.register_strategy(
            "mean_variance_max_sharpe",
            MeanVarianceStrategy(objective="max_sharpe"),
        )
        self.register_strategy(
            "mean_variance_min_vol",
            MeanVarianceStrategy(objective="min_volatility"),
        )

    def register_strategy(self, name: str, strategy: PortfolioStrategy) -> None:
        """Register a strategy implementation under the provided name."""
        self._strategies[name] = strategy

    def list_strategies(self) -> list[str]:
        """Return the registered strategy names."""
        return sorted(self._strategies)

    def construct(
        self,
        strategy_name: str,
        returns: pd.DataFrame,
        constraints: PortfolioConstraints | None = None,
        asset_classes: pd.Series | None = None,
    ) -> Portfolio:
        """Construct a portfolio using the requested strategy."""
        strategy = self._strategies.get(strategy_name)
        if strategy is None:
            reason = f"Unknown strategy. Available: {', '.join(self.list_strategies())}"
            raise StrategyError(strategy_name, reason)

        active_constraints = constraints or self._default_constraints
        return strategy.construct(returns, active_constraints, asset_classes)

    def compare_strategies(
        self,
        strategy_names: Sequence[str],
        returns: pd.DataFrame,
        constraints: PortfolioConstraints | None = None,
        asset_classes: pd.Series | None = None,
    ) -> pd.DataFrame:
        """Construct and compare multiple strategies."""
        portfolios: dict[str, pd.Series] = {}
        for name in strategy_names:
            try:
                portfolio = self.construct(name, returns, constraints, asset_classes)
            except (
                PortfolioConstructionError
            ) as err:  # pragma: no cover - tolerant comparison
                logger.warning("Strategy '%s' failed: %s", name, err)
                continue
            portfolios[name] = portfolio.weights

        if not portfolios:
            msg = "All requested strategies failed to construct portfolios."
            raise RuntimeError(msg)

        return pd.DataFrame(portfolios).fillna(0.0)

register_strategy(name, strategy)

Register a strategy implementation under the provided name.

Source code in src/portfolio_management/portfolio/builder.py
def register_strategy(self, name: str, strategy: PortfolioStrategy) -> None:
    """Register a strategy implementation under the provided name."""
    self._strategies[name] = strategy

list_strategies()

Return the registered strategy names.

Source code in src/portfolio_management/portfolio/builder.py
def list_strategies(self) -> list[str]:
    """Return the registered strategy names."""
    return sorted(self._strategies)

construct(strategy_name, returns, constraints=None, asset_classes=None)

Construct a portfolio using the requested strategy.

Source code in src/portfolio_management/portfolio/builder.py
def construct(
    self,
    strategy_name: str,
    returns: pd.DataFrame,
    constraints: PortfolioConstraints | None = None,
    asset_classes: pd.Series | None = None,
) -> Portfolio:
    """Construct a portfolio using the requested strategy."""
    strategy = self._strategies.get(strategy_name)
    if strategy is None:
        reason = f"Unknown strategy. Available: {', '.join(self.list_strategies())}"
        raise StrategyError(strategy_name, reason)

    active_constraints = constraints or self._default_constraints
    return strategy.construct(returns, active_constraints, asset_classes)

compare_strategies(strategy_names, returns, constraints=None, asset_classes=None)

Construct and compare multiple strategies.

Source code in src/portfolio_management/portfolio/builder.py
def compare_strategies(
    self,
    strategy_names: Sequence[str],
    returns: pd.DataFrame,
    constraints: PortfolioConstraints | None = None,
    asset_classes: pd.Series | None = None,
) -> pd.DataFrame:
    """Construct and compare multiple strategies."""
    portfolios: dict[str, pd.Series] = {}
    for name in strategy_names:
        try:
            portfolio = self.construct(name, returns, constraints, asset_classes)
        except (
            PortfolioConstructionError
        ) as err:  # pragma: no cover - tolerant comparison
            logger.warning("Strategy '%s' failed: %s", name, err)
            continue
        portfolios[name] = portfolio.weights

    if not portfolios:
        msg = "All requested strategies failed to construct portfolios."
        raise RuntimeError(msg)

    return pd.DataFrame(portfolios).fillna(0.0)
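The final lines of compare_strategies use a standard pandas idiom worth noting: building a DataFrame from a dict of weight Series aligns on the union of asset indices, and fillna(0.0) zeroes out assets a strategy did not hold. A minimal standalone illustration (the strategy names here are hypothetical):

```python
import pandas as pd

# Each strategy contributes a weight Series, possibly over different asset sets.
portfolios = {
    "equal_weight": pd.Series({"A": 0.5, "B": 0.5}),
    "concentrated": pd.Series({"A": 1.0}),  # hypothetical strategy name
}

# Alignment fills asset B with NaN for "concentrated"; fillna turns it into 0.0,
# so every column still sums to 1.0.
comparison = pd.DataFrame(portfolios).fillna(0.0)
print(comparison.sum().round(2))
# equal_weight    1.0
# concentrated    1.0
# dtype: float64
```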

CardinalityNotImplementedError

Bases: NotImplementedError

Raised when attempting to use unimplemented cardinality methods.

This exception is raised when a cardinality constraint method other than PRESELECTION is specified but not yet implemented. This is expected behavior for design stubs.

Attributes:

method: The cardinality method that was attempted.

message: Descriptive error message with implementation guidance.

Source code in src/portfolio_management/portfolio/cardinality.py
class CardinalityNotImplementedError(NotImplementedError):
    """Raised when attempting to use unimplemented cardinality methods.

    This exception is raised when a cardinality constraint method other than
    PRESELECTION is specified but not yet implemented. This is expected behavior
    for design stubs.

    Attributes:
        method: The cardinality method that was attempted
        message: Descriptive error message with implementation guidance

    """

    def __init__(self, method: str, available_methods: list[str] | None = None) -> None:
        """Initialize exception with method information.

        Args:
            method: The cardinality method that was attempted
            available_methods: List of currently implemented methods

        """
        self.method = method
        self.available_methods = available_methods or ["preselection"]

        msg = (
            f"Cardinality method '{method}' is not yet implemented. "
            f"This is a design stub for future optimizer-integrated cardinality.\n\n"
            f"Currently available: {', '.join(self.available_methods)}\n\n"
            f"Future implementation path:\n"
            f"  - MIQP: Requires commercial solver (Gurobi/CPLEX) integration\n"
            f"  - Heuristic: Implement greedy/local search algorithms\n"
            f"  - Relaxation: Implement continuous relaxation + rounding\n\n"
            f"For now, use preselection (see preselection.py module)."
        )
        super().__init__(msg)

CardinalityConstraints dataclass

Defines constraints on the number of assets in a portfolio.

Cardinality constraints limit the number of non-zero positions, which is critical for managing transaction costs, improving liquidity, and adhering to fund mandates that limit the number of holdings.

Mathematical Formulation

Let w ∈ ℝⁿ be the portfolio weights and z ∈ {0,1}ⁿ be binary indicators where zᵢ = 1 if asset i is included in the portfolio, and 0 otherwise.

  1. Position Limit: min_assets ≤ Σᵢ zᵢ ≤ max_assets

  2. Linking weights and indicators: min_position_size * zᵢ ≤ wᵢ ≤ max_weight * zᵢ

This formulation requires a Mixed-Integer Programming (MIP) solver.
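The linking constraints can be checked directly for a candidate (w, z) pair. The helper below is a sketch for verification only, not a solver, and all values are illustrative:

```python
import numpy as np

def satisfies_cardinality(w, z, min_assets, max_assets,
                          min_position_size, max_weight):
    """Check the MIP formulation above for a candidate solution.

    Verifies min_assets <= sum(z) <= max_assets and the linking bounds
    min_position_size * z_i <= w_i <= max_weight * z_i.
    """
    w, z = np.asarray(w, dtype=float), np.asarray(z, dtype=float)
    count_ok = min_assets <= z.sum() <= max_assets
    lower_ok = np.all(w >= min_position_size * z - 1e-12)
    upper_ok = np.all(w <= max_weight * z + 1e-12)
    return bool(count_ok and lower_ok and upper_ok)

# Two positions held (z = [1, 1, 0]); the excluded asset carries zero weight.
print(satisfies_cardinality([0.4, 0.6, 0.0], [1, 1, 0],
                            min_assets=1, max_assets=2,
                            min_position_size=0.05, max_weight=0.7))  # True
```

Note how the linking bounds force w_i = 0 whenever z_i = 0, so the position count and the non-zero weights stay consistent.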

Attributes:

enabled (bool): Whether cardinality constraints are active.

method (CardinalityMethod): The method for enforcing cardinality. 'preselection' is the default and filters assets before optimization. Other methods like 'miqp' integrate constraints into the optimizer.

max_assets (int | None): Maximum number of non-zero positions.

min_position_size (float): The minimum weight for any non-zero position.

group_limits (dict[str, int] | None): A dictionary mapping asset groups to the maximum number of positions allowed in that group.

enforce_in_optimizer (bool): If True, integrates the constraints directly into the optimization problem, which requires a MIP-capable solver. Defaults to False, relying on pre-selection.

Configuration Example (YAML):

cardinality:
  enabled: true
  method: preselection
  max_assets: 50
  min_position_size: 0.015
  group_limits:
    equity: 40
    alternatives: 5

Performance Notes
  • preselection: Very fast, suitable for all optimizers. Sub-optimal as it doesn't consider correlations during selection.
  • miqp: Provides the optimal solution but is computationally expensive (NP-hard) and requires a specialized solver (e.g., Gurobi, CBC). Complexity scales exponentially with the number of assets.
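A minimal sketch of the preselection idea: rank assets on a score, keep the top max_assets, and optimize only over the survivors (equal weight here for brevity). The inverse-volatility score is a placeholder; the package's preselection.py may rank on different factors:

```python
import numpy as np
import pandas as pd

def preselect(returns: pd.DataFrame, max_assets: int) -> pd.DataFrame:
    """Keep the max_assets columns with the best score (placeholder: inverse vol)."""
    score = 1.0 / returns.std()
    keep = score.nlargest(max_assets).index
    return returns[keep]

np.random.seed(0)
returns = pd.DataFrame(np.random.normal(0, 0.01, (60, 10)),
                       columns=[f"A{i}" for i in range(10)])

subset = preselect(returns, max_assets=4)
# Any optimizer can now run on the reduced universe; equal weight stands in here.
weights = pd.Series(1.0 / len(subset.columns), index=subset.columns)
print(len(weights), round(weights.sum(), 2))  # 4 1.0
```

This shows why preselection is fast (one ranking pass) and also why it is sub-optimal: the score is computed per asset, so cross-correlations play no role in which assets survive.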
Source code in src/portfolio_management/portfolio/constraints/models.py
@dataclass(frozen=True)
class CardinalityConstraints:
    """Defines constraints on the number of assets in a portfolio.

    Cardinality constraints limit the number of non-zero positions, which is
    critical for managing transaction costs, improving liquidity, and adhering
    to fund mandates that limit the number of holdings.

    Mathematical Formulation:
        Let w ∈ ℝⁿ be the portfolio weights and z ∈ {0,1}ⁿ be binary indicators
        where zᵢ = 1 if asset i is included in the portfolio, and 0 otherwise.

        1. Position Limit:
           min_assets ≤ Σᵢ zᵢ ≤ max_assets

        2. Linking weights and indicators:
           min_position_size * zᵢ ≤ wᵢ ≤ max_weight * zᵢ

        This formulation requires a Mixed-Integer Programming (MIP) solver.

    Attributes:
        enabled (bool): Whether cardinality constraints are active.
        method (CardinalityMethod): The method for enforcing cardinality.
            'preselection' is the default and filters assets before optimization.
            Other methods like 'miqp' integrate constraints into the optimizer.
        max_assets (int | None): Maximum number of non-zero positions.
        min_position_size (float): The minimum weight for any non-zero position.
        group_limits (dict[str, int] | None): A dictionary mapping asset groups
            to the maximum number of positions allowed in that group.
        enforce_in_optimizer (bool): If True, integrates the constraints directly
            into the optimization problem, which requires a MIP-capable solver.
            Defaults to False, relying on pre-selection.

    Configuration Example (YAML):
        ```yaml
        cardinality:
          enabled: true
          method: preselection
          max_assets: 50
          min_position_size: 0.015
          group_limits:
            equity: 40
            alternatives: 5
        ```

    Performance Notes:
        - `preselection`: Very fast, suitable for all optimizers. Sub-optimal
          as it doesn't consider correlations during selection.
        - `miqp`: Provides the optimal solution but is computationally expensive
          (NP-hard) and requires a specialized solver (e.g., Gurobi, CBC).
          Complexity scales exponentially with the number of assets.

    """

    enabled: bool = False
    method: CardinalityMethod = CardinalityMethod.PRESELECTION
    max_assets: int | None = None
    min_position_size: float = 0.01
    group_limits: dict[str, int] | None = None
    enforce_in_optimizer: bool = False

    def __post_init__(self) -> None:
        """Validate cardinality constraint parameters."""
        if not self.enabled:
            return

        if self.max_assets is not None and self.max_assets < 1:
            raise ConfigurationError(
                None,
                f"max_assets must be >= 1, got {self.max_assets}",
            )

        if not 0.0 < self.min_position_size <= 1.0:
            raise ConfigurationError(
                None,
                f"min_position_size must be in (0, 1], got {self.min_position_size}",
            )

        if self.group_limits is not None:
            for group, limit in self.group_limits.items():
                if limit < 1:
                    raise ConfigurationError(
                        None,
                        f"group_limits['{group}'] must be >= 1, got {limit}",
                    )

        if self.enforce_in_optimizer and self.method == CardinalityMethod.PRESELECTION:
            raise ConfigurationError(
                None,
                "enforce_in_optimizer=True requires method != PRESELECTION",
            )

        # Warn about future methods
        if self.method != CardinalityMethod.PRESELECTION:
            # This will be raised when actually attempting to use these methods
            # For now, just ensure the config is valid
            pass

CardinalityMethod

Bases: str, Enum

Methods for handling cardinality constraints in optimization.

Attributes:

PRESELECTION: Use factor-based preselection before optimization (current default).

MIQP: Mixed-Integer Quadratic Programming (future: requires commercial solver).

HEURISTIC: Iterative heuristic approach (future: custom implementation).

RELAXATION: Continuous relaxation with post-processing (future).

Source code in src/portfolio_management/portfolio/constraints/models.py
class CardinalityMethod(str, Enum):
    """Methods for handling cardinality constraints in optimization.

    Attributes:
        PRESELECTION: Use factor-based preselection before optimization (current default)
        MIQP: Mixed-Integer Quadratic Programming (future: requires commercial solver)
        HEURISTIC: Iterative heuristic approach (future: custom implementation)
        RELAXATION: Continuous relaxation with post-processing (future)

    """

    PRESELECTION = "preselection"
    MIQP = "miqp"
    HEURISTIC = "heuristic"
    RELAXATION = "relaxation"
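Because CardinalityMethod subclasses both str and Enum, members compare equal to plain strings, which makes raw config values easy to validate. The enum is re-declared below (matching the values above) so the snippet runs standalone:

```python
from enum import Enum

# Re-declared for illustration; matches the source shown above.
class CardinalityMethod(str, Enum):
    PRESELECTION = "preselection"
    MIQP = "miqp"
    HEURISTIC = "heuristic"
    RELAXATION = "relaxation"

# Lookup by value converts a raw YAML/JSON string into the enum member...
print(CardinalityMethod("preselection") is CardinalityMethod.PRESELECTION)  # True
# ...and str inheritance lets members compare equal to plain strings.
print(CardinalityMethod.MIQP == "miqp")  # True
```

An unknown value such as `CardinalityMethod("greedy")` raises ValueError, so the value lookup doubles as config validation.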

PortfolioConstraints dataclass

Defines basic investment constraints and guardrails for a portfolio.

This data class holds common constraints that can be applied during the optimization process to ensure the portfolio meets diversification and exposure mandates.

Attributes:

max_weight (float): Maximum weight for any single asset.

min_weight (float): Minimum weight for any single asset.

max_equity_exposure (float): Maximum total allocation to equity assets.

min_bond_exposure (float): Minimum total allocation to bond/cash assets.

sector_limits (dict[str, float] | None): A dictionary mapping sector names to their maximum allowed weight in the portfolio.

require_full_investment (bool): If True, forces the sum of all asset weights to equal 1.0.

Configuration Example (YAML):

constraints:
  max_weight: 0.15
  min_weight: 0.01
  max_equity_exposure: 0.80
  sector_limits:
    Technology: 0.30
    Healthcare: 0.25
  require_full_investment: true
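One way such a config maps onto generic optimizer inputs is per-asset bounds plus one inequality row per sector (A w <= b). A sketch with hypothetical tickers and sector assignments:

```python
import numpy as np

# Hypothetical universe and sector map; limits taken from the YAML above.
assets = ["AAPL", "JNJ", "MSFT"]
sectors = {"AAPL": "Technology", "JNJ": "Healthcare", "MSFT": "Technology"}
min_weight, max_weight = 0.01, 0.15
sector_limits = {"Technology": 0.30, "Healthcare": 0.25}

# Per-asset box constraints: min_weight <= w_i <= max_weight.
bounds = [(min_weight, max_weight)] * len(assets)

# One inequality row per sector: sum of weights in that sector <= its limit.
A, b = [], []
for sector, limit in sector_limits.items():
    A.append([1.0 if sectors[a] == sector else 0.0 for a in assets])
    b.append(limit)

# Check a candidate weight vector against the sector rows.
w = np.array([0.15, 0.15, 0.15])
print(np.all(np.array(A) @ w <= np.array(b)))  # True
```

require_full_investment would add the equality sum(w) = 1.0 on top of these rows (the candidate above does not satisfy it, which is why such portfolios usually also hold a cash or bond sleeve).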

Source code in src/portfolio_management/portfolio/constraints/models.py
@dataclass(frozen=True)
class PortfolioConstraints:
    """Defines basic investment constraints and guardrails for a portfolio.

    This data class holds common constraints that can be applied during the
    optimization process to ensure the portfolio meets diversification and
    exposure mandates.

    Attributes:
        max_weight (float): Maximum weight for any single asset.
        min_weight (float): Minimum weight for any single asset.
        max_equity_exposure (float): Maximum total allocation to equity assets.
        min_bond_exposure (float): Minimum total allocation to bond/cash assets.
        sector_limits (dict[str, float] | None): A dictionary mapping sector names
            to their maximum allowed weight in the portfolio.
        require_full_investment (bool): If True, forces the sum of all asset
            weights to equal 1.0.

    Configuration Example (YAML):
        ```yaml
        constraints:
          max_weight: 0.15
          min_weight: 0.01
          max_equity_exposure: 0.80
          sector_limits:
            Technology: 0.30
            Healthcare: 0.25
          require_full_investment: true
        ```

    """

    max_weight: float = 0.25
    min_weight: float = 0.0
    max_equity_exposure: float = 0.90
    min_bond_exposure: float = 0.10
    sector_limits: dict[str, float] | None = None
    require_full_investment: bool = True

    def __post_init__(self) -> None:
        """Validate constraint parameters."""
        if not 0.0 <= self.min_weight <= self.max_weight <= 1.0:
            raise ConfigurationError(
                None,
                f"Invalid weight bounds: min={self.min_weight}, max={self.max_weight}",
            )

        if not 0.0 <= self.min_bond_exposure <= 1.0:
            raise ConfigurationError(
                None,
                f"Invalid min_bond_exposure: {self.min_bond_exposure}",
            )

        if not 0.0 <= self.max_equity_exposure <= 1.0:
            raise ConfigurationError(
                None,
                f"Invalid max_equity_exposure: {self.max_equity_exposure}",
            )

MembershipPolicy dataclass

Configuration for membership policy rules.

This dataclass defines the rules that control how asset membership changes during portfolio rebalancing. Policies are applied in a specific order to ensure stability while respecting selection criteria.

Application order
  1. Min holding period: protect assets from premature exit
  2. Rank buffer: keep existing holdings unless they fall far out of favor
  3. Max changes: limit the number of additions/removals per rebalance
  4. Turnover cap: limit the fraction of portfolio value that can change
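The first three rules can be sketched with sets of tickers (the turnover cap is omitted because it needs weights, not just membership). Function and variable names below are illustrative, not the package's API:

```python
def apply_membership(held, ranks, periods_held, top_k,
                     buffer_rank, min_holding_periods, max_new_assets):
    """Simplified sketch of the application order above (ranks: 1 = best)."""
    selected = {a for a, r in ranks.items() if r <= top_k}
    # 1. Min holding period: protect recent additions from removal.
    protected = {a for a in held if periods_held[a] < min_holding_periods}
    # 2. Rank buffer: keep existing holdings still ranked within buffer_rank.
    buffered = {a for a in held if ranks.get(a, float("inf")) <= buffer_rank}
    keep = selected | protected | buffered
    # 3. Max changes: cap additions, best-ranked first (removals capped similarly).
    additions = sorted(keep - held, key=lambda a: ranks[a])[:max_new_assets]
    return (keep & held) | set(additions)

held = {"A", "B", "C"}
ranks = {"A": 1, "B": 40, "C": 80, "D": 2, "E": 3}
periods = {"A": 5, "B": 5, "C": 1}

# C survives via the holding period, B via the rank buffer, and only one of
# the newly selected assets (D, the better-ranked) is admitted.
print(sorted(apply_membership(held, ranks, periods, top_k=2,
                              buffer_rank=50, min_holding_periods=3,
                              max_new_assets=1)))  # ['A', 'B', 'C', 'D']
```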

Attributes:

buffer_rank (int | None): Assets currently held are kept if their rank is better than this threshold, even if they fall outside top_k. For example, if top_k=30 and buffer_rank=50, existing holdings ranked 31-50 are retained. Set to None to disable the buffer. Default: None.

min_holding_periods (int | None): Minimum number of rebalance periods an asset must be held before it can be removed. Set to None or 0 to disable. Default: None.

max_turnover (float | None): Maximum fraction of portfolio value that can change in a single rebalance (0.0 to 1.0), calculated as the sum of absolute weight changes. Set to None to disable. Default: None.

max_new_assets (int | None): Maximum number of new assets that can be added in a single rebalance. Set to None to disable. Default: None.

max_removed_assets (int | None): Maximum number of assets that can be removed in a single rebalance. Set to None to disable. Default: None.

enabled (bool): Master switch to enable/disable all policy rules. Default: True.

Example

# Conservative policy: limit churn
policy = MembershipPolicy(
    buffer_rank=50,
    min_holding_periods=3,
    max_new_assets=5,
    max_removed_assets=5,
)

# Aggressive policy: more freedom to rebalance
policy = MembershipPolicy(
    buffer_rank=35,
    min_holding_periods=1,
    max_turnover=0.50,
    max_new_assets=10,
    max_removed_assets=10,
)

# Disabled policy
policy = MembershipPolicy(enabled=False)

Source code in src/portfolio_management/portfolio/membership.py
@dataclass
class MembershipPolicy:
    """Configuration for membership policy rules.

    This dataclass defines the rules that control how asset membership changes
    during portfolio rebalancing. Policies are applied in a specific order to
    ensure stability while respecting selection criteria.

    Application order:
        1. Min holding period: protect assets from premature exit
        2. Rank buffer: keep existing holdings unless they fall far out of favor
        3. Max changes: limit the number of additions/removals per rebalance
        4. Turnover cap: limit the fraction of portfolio value that can change

    Attributes:
        buffer_rank: Assets currently held are kept if their rank is better than this
            threshold, even if they fall outside top_k. For example, if top_k=30 and
            buffer_rank=50, existing holdings ranked 31-50 will be retained.
            Set to None to disable buffer. Default: None.
        min_holding_periods: Minimum number of rebalance periods an asset must be held
            before it can be removed. Set to None or 0 to disable. Default: None.
        max_turnover: Maximum fraction of portfolio value that can change in a single
            rebalance (0.0 to 1.0). Calculated as sum of absolute weight changes.
            Set to None to disable. Default: None.
        max_new_assets: Maximum number of new assets that can be added in a single
            rebalance. Set to None to disable. Default: None.
        max_removed_assets: Maximum number of assets that can be removed in a single
            rebalance. Set to None to disable. Default: None.
        enabled: Master switch to enable/disable all policy rules. Default: True.

    Example:
        >>> # Conservative policy: limit churn
        >>> policy = MembershipPolicy(
        ...     buffer_rank=50,
        ...     min_holding_periods=3,
        ...     max_new_assets=5,
        ...     max_removed_assets=5
        ... )
        >>>
        >>> # Aggressive policy: more freedom to rebalance
        >>> policy = MembershipPolicy(
        ...     buffer_rank=35,
        ...     min_holding_periods=1,
        ...     max_turnover=0.50,
        ...     max_new_assets=10,
        ...     max_removed_assets=10
        ... )
        >>>
        >>> # Disabled policy
        >>> policy = MembershipPolicy(enabled=False)

    """

    buffer_rank: int | None = None
    min_holding_periods: int | None = None
    max_turnover: float | None = None
    max_new_assets: int | None = None
    max_removed_assets: int | None = None
    enabled: bool = True

    def validate(self) -> None:
        """Validate policy parameters.

        Raises:
            ConfigurationError: If any parameter is invalid.

        """
        if self.buffer_rank is not None and self.buffer_rank < 1:
            raise ConfigurationError(
                None,
                f"buffer_rank must be >= 1, got {self.buffer_rank}",
            )

        if self.min_holding_periods is not None and self.min_holding_periods < 0:
            raise ConfigurationError(
                None,
                f"min_holding_periods must be non-negative, got {self.min_holding_periods}",
            )

        if self.max_turnover is not None and not (0.0 <= self.max_turnover <= 1.0):
            raise ConfigurationError(
                None,
                f"max_turnover must be in [0, 1], got {self.max_turnover}",
            )

        if self.max_new_assets is not None and self.max_new_assets < 0:
            raise ConfigurationError(
                None,
                f"max_new_assets must be non-negative, got {self.max_new_assets}",
            )

        if self.max_removed_assets is not None and self.max_removed_assets < 0:
            raise ConfigurationError(
                None,
                f"max_removed_assets must be non-negative, got {self.max_removed_assets}",
            )

    @classmethod
    def default(cls) -> MembershipPolicy:
        """Create a default membership policy suitable for most portfolios.

        Returns:
            MembershipPolicy with moderate defaults:
            - buffer_rank: None (recommended: set explicitly based on top_k, e.g. top_k + 20)
            - min_holding_periods: 3 rebalances
            - max_turnover: 30%
            - max_new_assets: 5 per rebalance
            - max_removed_assets: 5 per rebalance

        Example:
            >>> policy = MembershipPolicy.default()
            >>> policy.min_holding_periods
            3

        """
        return cls(
            buffer_rank=None,  # Should be set based on top_k
            min_holding_periods=3,
            max_turnover=0.30,
            max_new_assets=5,
            max_removed_assets=5,
            enabled=True,
        )

    @classmethod
    def disabled(cls) -> MembershipPolicy:
        """Create a disabled membership policy (no restrictions).

        Returns:
            MembershipPolicy with enabled=False.

        Example:
            >>> policy = MembershipPolicy.disabled()
            >>> policy.enabled
            False

        """
        return cls(enabled=False)

validate()

Validate policy parameters.

Raises:

ConfigurationError: If any parameter is invalid.

Source code in src/portfolio_management/portfolio/membership.py
def validate(self) -> None:
    """Validate policy parameters.

    Raises:
        ConfigurationError: If any parameter is invalid.

    """
    if self.buffer_rank is not None and self.buffer_rank < 1:
        raise ConfigurationError(
            None,
            f"buffer_rank must be >= 1, got {self.buffer_rank}",
        )

    if self.min_holding_periods is not None and self.min_holding_periods < 0:
        raise ConfigurationError(
            None,
            f"min_holding_periods must be non-negative, got {self.min_holding_periods}",
        )

    if self.max_turnover is not None and not (0.0 <= self.max_turnover <= 1.0):
        raise ConfigurationError(
            None,
            f"max_turnover must be in [0, 1], got {self.max_turnover}",
        )

    if self.max_new_assets is not None and self.max_new_assets < 0:
        raise ConfigurationError(
            None,
            f"max_new_assets must be non-negative, got {self.max_new_assets}",
        )

    if self.max_removed_assets is not None and self.max_removed_assets < 0:
        raise ConfigurationError(
            None,
            f"max_removed_assets must be non-negative, got {self.max_removed_assets}",
        )

default() classmethod

Create a default membership policy suitable for most portfolios.

Returns:

- MembershipPolicy: a policy with moderate defaults:
  - buffer_rank: top_k + 20 (recommended to set explicitly based on top_k)
  - min_holding_periods: 3 rebalances
  - max_turnover: 30%
  - max_new_assets: 5 per rebalance
  - max_removed_assets: 5 per rebalance

Example

>>> policy = MembershipPolicy.default()
>>> policy.min_holding_periods
3

Source code in src/portfolio_management/portfolio/membership.py
@classmethod
def default(cls) -> MembershipPolicy:
    """Create a default membership policy suitable for most portfolios.

    Returns:
        MembershipPolicy with moderate defaults:
        - buffer_rank: top_k + 20 (recommended to set explicitly based on top_k)
        - min_holding_periods: 3 rebalances
        - max_turnover: 30%
        - max_new_assets: 5 per rebalance
        - max_removed_assets: 5 per rebalance

    Example:
        >>> policy = MembershipPolicy.default()
        >>> policy.min_holding_periods
        3

    """
    return cls(
        buffer_rank=None,  # Should be set based on top_k
        min_holding_periods=3,
        max_turnover=0.30,
        max_new_assets=5,
        max_removed_assets=5,
        enabled=True,
    )

disabled() classmethod

Create a disabled membership policy (no restrictions).

Returns:

- MembershipPolicy: a policy with enabled=False.

Example

>>> policy = MembershipPolicy.disabled()
>>> policy.enabled
False

Source code in src/portfolio_management/portfolio/membership.py
@classmethod
def disabled(cls) -> MembershipPolicy:
    """Create a disabled membership policy (no restrictions).

    Returns:
        MembershipPolicy with enabled=False.

    Example:
        >>> policy = MembershipPolicy.disabled()
        >>> policy.enabled
        False

    """
    return cls(enabled=False)

Portfolio dataclass

Represents a constructed portfolio with weights and metadata.

Attributes:

- weights (Series): Series mapping ticker symbols to portfolio weights
- strategy (str): Name of the strategy used to construct the portfolio
- timestamp (Timestamp): When the portfolio was constructed
- metadata (dict[str, object] | None): Optional dict with strategy-specific information

Source code in src/portfolio_management/portfolio/models.py
@dataclass(frozen=True)
class Portfolio:
    """Represents a constructed portfolio with weights and metadata.

    Attributes:
        weights: Series mapping ticker symbols to portfolio weights
        strategy: Name of the strategy used to construct the portfolio
        timestamp: When the portfolio was constructed
        metadata: Optional dict with strategy-specific information

    """

    weights: pd.Series
    strategy: str
    timestamp: pd.Timestamp = field(default_factory=pd.Timestamp.now)
    metadata: dict[str, object] | None = None

    def __post_init__(self) -> None:
        """Validate portfolio construction."""
        # Defensive runtime check (MyPy proves this is type-safe)
        if not isinstance(self.weights, pd.Series):
            raise DataValidationError("weights must be a pandas Series")

        if len(self.weights) == 0:
            raise DataValidationError("Portfolio must contain at least one asset")

        if (self.weights < 0).any():
            raise ConstraintViolationError(
                "non-negative weights",
                value=self.weights.min(),
                limit=0.0,
            )

        total_weight = self.weights.sum()
        if not np.isclose(total_weight, 1.0, atol=1e-6):
            raise ConstraintViolationError(
                "weights sum to one",
                value=total_weight,
                limit=1.0,
            )

    def get_position_count(self) -> int:
        """Return the number of positions with non-zero weights."""
        return int((self.weights > 0).sum())

    def get_top_holdings(self, n: int = 10) -> pd.Series:
        """Return the top N holdings by weight."""
        return self.weights.nlargest(n)

    def to_dict(self) -> dict[str, object]:
        """Convert portfolio to dictionary representation."""
        return {
            "weights": self.weights.to_dict(),
            "strategy": self.strategy,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }
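The `__post_init__` checks above can be reproduced standalone; a minimal sketch of the same non-negativity and sum-to-one validation using pandas/numpy (assuming the `atol=1e-6` tolerance shown in the source):

```python
import numpy as np
import pandas as pd

weights = pd.Series({"AAPL": 0.6, "MSFT": 0.4})

# Mirrors Portfolio.__post_init__: no negative weights, total within 1e-6 of 1.0.
assert not (weights < 0).any()
assert np.isclose(weights.sum(), 1.0, atol=1e-6)

bad = pd.Series({"AAPL": 0.6, "MSFT": 0.3})  # sums to 0.9
print(np.isclose(bad.sum(), 1.0, atol=1e-6))  # False -> would raise ConstraintViolationError
```

The absolute tolerance matters: weights computed by an optimizer rarely sum to exactly 1.0 in floating point, so the check accepts anything within one millionth.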

get_position_count()

Return the number of positions with non-zero weights.

Source code in src/portfolio_management/portfolio/models.py
def get_position_count(self) -> int:
    """Return the number of positions with non-zero weights."""
    return int((self.weights > 0).sum())

get_top_holdings(n=10)

Return the top N holdings by weight.

Source code in src/portfolio_management/portfolio/models.py
def get_top_holdings(self, n: int = 10) -> pd.Series:
    """Return the top N holdings by weight."""
    return self.weights.nlargest(n)

to_dict()

Convert portfolio to dictionary representation.

Source code in src/portfolio_management/portfolio/models.py
def to_dict(self) -> dict[str, object]:
    """Convert portfolio to dictionary representation."""
    return {
        "weights": self.weights.to_dict(),
        "strategy": self.strategy,
        "timestamp": self.timestamp.isoformat(),
        "metadata": self.metadata,
    }

StrategyType

Bases: str, Enum

Supported portfolio construction strategies.

Source code in src/portfolio_management/portfolio/models.py
class StrategyType(str, Enum):
    """Supported portfolio construction strategies."""

    EQUAL_WEIGHT = "equal_weight"
    RISK_PARITY = "risk_parity"
    MEAN_VARIANCE = "mean_variance"

Preselection

Factor-based asset preselection engine.

Computes momentum and/or low-volatility factors from historical returns and selects top-K assets deterministically without lookahead bias.

Supports optional caching to avoid recomputing factor scores across runs.

Source code in src/portfolio_management/portfolio/preselection.py
class Preselection:
    """Factor-based asset preselection engine.

    Computes momentum and/or low-volatility factors from historical returns
    and selects top-K assets deterministically without lookahead bias.

    Supports optional caching to avoid recomputing factor scores across runs.
    """

    def __init__(self, config: PreselectionConfig, cache: Any | None = None) -> None:
        """Initialize preselection engine.

        Args:
            config: Preselection configuration
            cache: Optional FactorCache instance for caching factor scores

        """
        self.config = config
        self.cache = cache
        self._validate_config()

    def _validate_config(self) -> None:
        """Validate configuration parameters.

        Raises:
            ConfigurationError: If any parameter is invalid.

        """
        if self.config.top_k is not None and self.config.top_k < 0:
            raise ConfigurationError(
                None,
                f"top_k must be >= 0, got {self.config.top_k}",
            )

        if self.config.top_k is not None and 0 < self.config.top_k < 10:
            warnings.warn(
                f"top_k={self.config.top_k} is very small (<10 assets). "
                "This may lead to under-diversification.",
                UserWarning,
                stacklevel=3,
            )

        if self.config.lookback < 1:
            raise ConfigurationError(
                None,
                f"lookback must be >= 1, got {self.config.lookback}",
            )

        if self.config.lookback < 63:
            warnings.warn(
                f"lookback={self.config.lookback} is very short (<63 days). "
                "This may lead to noisy factor signals.",
                UserWarning,
                stacklevel=3,
            )

        if self.config.skip < 0:
            raise ConfigurationError(None, f"skip must be >= 0, got {self.config.skip}")

        if self.config.skip >= self.config.lookback:
            raise ConfigurationError(
                None,
                f"skip ({self.config.skip}) must be < lookback ({self.config.lookback})",
            )

        if self.config.min_periods < 1:
            raise ConfigurationError(
                None,
                f"min_periods must be >= 1, got {self.config.min_periods}",
            )

        if self.config.min_periods > self.config.lookback:
            raise ConfigurationError(
                None,
                f"min_periods ({self.config.min_periods}) must be <= lookback ({self.config.lookback})",
            )

        if self.config.method == PreselectionMethod.COMBINED:
            total_weight = self.config.momentum_weight + self.config.low_vol_weight
            if not np.isclose(total_weight, 1.0, atol=1e-6):
                raise ConfigurationError(
                    None,
                    f"Combined weights must sum to 1.0, got {total_weight}",
                )

    def select_assets(
        self,
        returns: pd.DataFrame,
        rebalance_date: datetime.date | None = None,
    ) -> list[str]:
        """Select top-K assets based on configured factors.

        Uses only data available up to (but not including) rebalance_date.
        If rebalance_date is None, uses all available data.

        Args:
            returns: DataFrame with returns (assets as columns, dates as index)
            rebalance_date: Date of rebalancing (uses data strictly before this)

        Returns:
            List of selected asset tickers (sorted alphabetically for determinism)

        Raises:
            ValueError: If returns DataFrame is invalid
            InsufficientDataError: If insufficient data for factor calculation

        Examples:
            >>> from datetime import date
            >>> import pandas as pd
            >>> import numpy as np
            >>> np.random.seed(42)
            >>> returns = pd.DataFrame({
            ...     'ASSET1': np.random.normal(0, 0.01, 60),
            ...     'ASSET2': np.random.normal(0, 0.02, 60),
            ...     'ASSET3': np.random.normal(0, 0.03, 60)
            ... }, index=pd.date_range(end='2022-12-30', periods=60))
            >>> config = PreselectionConfig(method=PreselectionMethod.MOMENTUM, top_k=2, min_periods=30)
            >>> preselect = Preselection(config)
            >>> selected = preselect.select_assets(returns, rebalance_date=date(2022, 12, 30))

        """
        # Validate returns DataFrame
        if returns is None or not isinstance(returns, pd.DataFrame) or returns.empty:
            raise DataValidationError("returns must be a non-empty pandas DataFrame")

        if len(returns.columns) == 0:
            raise DataValidationError("returns DataFrame has no asset columns")

        # Validate rebalance_date if provided
        if rebalance_date is not None:
            if not isinstance(rebalance_date, datetime.date):
                raise DataValidationError(
                    f"rebalance_date must be a datetime.date, got {type(rebalance_date).__name__}",
                )

            max_date = returns.index.max()
            if isinstance(max_date, pd.Timestamp):
                max_date = max_date.date()

            if rebalance_date > max_date:
                raise DataValidationError(
                    f"rebalance_date ({rebalance_date}) is after last available date ({max_date})",
                )

        # If no top_k or top_k <= 0, return all assets
        if self.config.top_k is None or self.config.top_k <= 0:
            logger.info(
                "Preselection disabled (top_k=%s), returning all %d assets",
                self.config.top_k,
                len(returns.columns),
            )
            return sorted(returns.columns.tolist())

        # Filter data up to rebalance date (no lookahead)
        if rebalance_date is not None:
            # Convert index to dates for comparison
            if isinstance(returns.index, pd.DatetimeIndex):
                date_mask = returns.index.date < rebalance_date
            else:
                # Assume index is already dates
                date_mask = returns.index < rebalance_date
            available_returns = returns.loc[date_mask]
        else:
            available_returns = returns

        # Check if we have enough data
        if len(available_returns) < self.config.min_periods:
            raise InsufficientDataError(
                required_periods=self.config.min_periods,
                available_periods=len(available_returns),
            )

        # Compute factor scores (with caching if enabled)
        scores = self._get_or_compute_scores(returns, available_returns, rebalance_date)

        # Handle edge case: all NaN scores
        if scores.isna().all():
            raise InsufficientDataError(
                required_periods=self.config.min_periods,
                available_periods=0,
            )

        # Select top-K assets
        return self._select_top_k(scores)

    def _get_or_compute_scores(
        self,
        full_returns: pd.DataFrame,
        available_returns: pd.DataFrame,
        rebalance_date: datetime.date | None,
    ) -> pd.Series:
        """Get factor scores from cache or compute them.

        Args:
            full_returns: Full returns matrix (for cache key)
            available_returns: Returns filtered to rebalance date
            rebalance_date: Rebalance date for cache key

        Returns:
            Series of factor scores

        """
        # Build cache config
        cache_config = {
            "method": self.config.method.value,
            "lookback": self.config.lookback,
            "skip": self.config.skip,
            "min_periods": self.config.min_periods,
            "momentum_weight": self.config.momentum_weight,
            "low_vol_weight": self.config.low_vol_weight,
        }

        # Determine date range for cache key
        start_date = str(available_returns.index[0])
        end_date = str(available_returns.index[-1])

        # Try to get from cache
        if self.cache is not None:
            cached_scores = self.cache.get_factor_scores(
                full_returns,
                cache_config,
                start_date,
                end_date,
            )
            if cached_scores is not None:
                return cast("pd.Series[Any]", cached_scores)

        # Compute scores
        if self.config.method == PreselectionMethod.MOMENTUM:
            scores = self._compute_momentum(available_returns)
        elif self.config.method == PreselectionMethod.LOW_VOL:
            scores = self._compute_low_volatility(available_returns)
        elif self.config.method == PreselectionMethod.COMBINED:
            scores = self._compute_combined(available_returns)
        else:
            raise ConfigurationError(
                None,
                f"Unknown preselection method: {self.config.method}",
            )

        # Cache the scores
        if self.cache is not None:
            self.cache.put_factor_scores(
                scores,
                full_returns,
                cache_config,
                start_date,
                end_date,
            )

        return scores

    def _compute_momentum(self, returns: pd.DataFrame) -> pd.Series:
        """Compute momentum factor (cumulative return with optional skip).

        Args:
            returns: Historical returns up to rebalance date

        Returns:
            Series of momentum scores (one per asset)

        """
        # Get lookback window
        lookback_start = max(0, len(returns) - self.config.lookback)
        lookback_returns = returns.iloc[lookback_start:]

        # Apply skip period (exclude most recent N periods)
        if self.config.skip > 0:
            lookback_returns = lookback_returns.iloc[: -self.config.skip]

        # Compute cumulative return for each asset
        # Using (1+r1)*(1+r2)*...*(1+rn) - 1
        # Note: prod() with skipna=False will propagate NaN properly
        cumulative = (1 + lookback_returns).prod(axis=0, skipna=False) - 1

        return cumulative

    def _compute_low_volatility(self, returns: pd.DataFrame) -> pd.Series:
        """Compute low-volatility factor (inverse of realized volatility).

        Higher scores = lower volatility = more attractive for low-vol strategy.

        Args:
            returns: Historical returns up to rebalance date

        Returns:
            Series of low-volatility scores (one per asset)

        """
        # Get lookback window
        lookback_start = max(0, len(returns) - self.config.lookback)
        lookback_returns = returns.iloc[lookback_start:]

        # Compute realized volatility (standard deviation)
        volatility = lookback_returns.std(axis=0)

        # Return inverse (higher = better)
        # Use small epsilon to avoid division by zero
        epsilon = 1e-8
        return 1.0 / (volatility + epsilon)

    def _compute_combined(self, returns: pd.DataFrame) -> pd.Series:
        """Compute combined factor score using weighted Z-scores.

        Args:
            returns: Historical returns up to rebalance date

        Returns:
            Series of combined scores (one per asset)

        """
        # Compute individual factors
        momentum = self._compute_momentum(returns)
        low_vol = self._compute_low_volatility(returns)

        # Normalize to Z-scores (mean=0, std=1)
        momentum_z = self._standardize(momentum)
        low_vol_z = self._standardize(low_vol)

        # Combine with weights
        combined = (
            self.config.momentum_weight * momentum_z
            + self.config.low_vol_weight * low_vol_z
        )

        return combined

    def _standardize(self, scores: pd.Series) -> pd.Series:
        """Standardize scores to Z-scores (mean=0, std=1).

        Handles all-NaN and zero-variance cases gracefully.

        Args:
            scores: Raw factor scores

        Returns:
            Standardized scores

        """
        # Drop NaN values for statistics
        valid_scores = scores.dropna()

        if len(valid_scores) == 0:
            # All NaN - return zeros
            return pd.Series(0.0, index=scores.index)

        mean = valid_scores.mean()
        std = valid_scores.std()

        # Handle zero variance (all values identical)
        if std < 1e-8:
            # Return zeros (all assets equally ranked)
            return pd.Series(0.0, index=scores.index)

        # Standardize
        z_scores = (scores - mean) / std

        # Replace any remaining NaN with 0 (neutral score)
        return z_scores.fillna(0.0)

    def _select_top_k(self, scores: pd.Series) -> list[str]:
        """Select top-K assets by score with deterministic tie-breaking.

        Args:
            scores: Factor scores for each asset

        Returns:
            List of selected asset tickers (sorted alphabetically)

        """
        # Drop NaN scores (assets with insufficient data)
        valid_scores = scores.dropna()

        if len(valid_scores) == 0:
            # No valid assets - return empty list (edge case handled)
            logger.warning(
                "No valid scores after filtering NaN values. "
                "Returning empty asset list.",
            )
            return []

        # Determine how many to select
        k = min(self.config.top_k or len(valid_scores), len(valid_scores))

        # Log if we have fewer assets than requested
        if len(valid_scores) < (self.config.top_k or 0):
            logger.debug(
                "Only %d valid assets available, less than requested top_k=%d. "
                "Returning all valid assets.",
                len(valid_scores),
                self.config.top_k or 0,
            )

        # Sort by score (descending) then by ticker (ascending) for determinism
        # This ensures ties are broken consistently
        sorted_scores = valid_scores.sort_values(ascending=False)

        # Handle ties at the cutoff point
        # Get all assets with scores >= the k-th highest score
        if k < len(sorted_scores):
            kth_score = sorted_scores.iloc[k - 1]
            # Select all assets with score >= kth_score
            candidates = sorted_scores[sorted_scores >= kth_score]
        else:
            candidates = sorted_scores

        # If we have more candidates than k (due to ties), break ties by symbol
        if len(candidates) > k:
            # Sort by score (desc) then symbol (asc) for deterministic tie-breaking
            candidates_df = pd.DataFrame(
                {"score": candidates, "symbol": candidates.index},
            )
            candidates_df = candidates_df.sort_values(
                by=["score", "symbol"],
                ascending=[False, True],
            )
            selected = candidates_df.head(k)["symbol"].tolist()
            logger.debug(
                "Broke ties at cutoff: %d candidates -> %d selected",
                len(candidates),
                k,
            )
        else:
            selected = candidates.index.tolist()

        # Return sorted alphabetically for consistent output
        return sorted(selected)
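`_compute_combined` z-scores each factor before mixing them, so quantities on different scales (cumulative returns vs. inverse volatility) become comparable. A standalone sketch of that blend with the default 0.5/0.5 weights; `standardize` here mirrors the `_standardize` logic shown above:

```python
import pandas as pd

def standardize(scores: pd.Series) -> pd.Series:
    """Z-score scores; all-NaN or zero-variance inputs map to neutral zeros."""
    valid = scores.dropna()
    if len(valid) == 0 or valid.std() < 1e-8:
        return pd.Series(0.0, index=scores.index)
    return ((scores - valid.mean()) / valid.std()).fillna(0.0)

momentum = pd.Series({"A": 0.10, "B": 0.02, "C": -0.05})   # cumulative returns
low_vol = pd.Series({"A": 50.0, "B": 80.0, "C": 120.0})    # inverse volatilities

combined = 0.5 * standardize(momentum) + 0.5 * standardize(low_vol)
```

Each standardized factor has mean zero, so the combined score ranks assets purely by their relative standing on the two factors rather than by raw magnitudes.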

select_assets(returns, rebalance_date=None)

Select top-K assets based on configured factors.

Uses only data available up to (but not including) rebalance_date. If rebalance_date is None, uses all available data.

Parameters:

- returns (DataFrame, required): DataFrame with returns (assets as columns, dates as index)
- rebalance_date (date | None, default None): Date of rebalancing (uses data strictly before this)

Returns:

- list[str]: List of selected asset tickers (sorted alphabetically for determinism)

Raises:

- ValueError: If returns DataFrame is invalid
- InsufficientDataError: If insufficient data for factor calculation

Examples:

>>> from datetime import date
>>> import pandas as pd
>>> import numpy as np
>>> np.random.seed(42)
>>> returns = pd.DataFrame({
...     'ASSET1': np.random.normal(0, 0.01, 60),
...     'ASSET2': np.random.normal(0, 0.02, 60),
...     'ASSET3': np.random.normal(0, 0.03, 60)
... }, index=pd.date_range(end='2022-12-30', periods=60))
>>> config = PreselectionConfig(method=PreselectionMethod.MOMENTUM, top_k=2, min_periods=30)
>>> preselect = Preselection(config)
>>> selected = preselect.select_assets(returns, rebalance_date=date(2022, 12, 30))
Source code in src/portfolio_management/portfolio/preselection.py
def select_assets(
    self,
    returns: pd.DataFrame,
    rebalance_date: datetime.date | None = None,
) -> list[str]:
    """Select top-K assets based on configured factors.

    Uses only data available up to (but not including) rebalance_date.
    If rebalance_date is None, uses all available data.

    Args:
        returns: DataFrame with returns (assets as columns, dates as index)
        rebalance_date: Date of rebalancing (uses data strictly before this)

    Returns:
        List of selected asset tickers (sorted alphabetically for determinism)

    Raises:
        ValueError: If returns DataFrame is invalid
        InsufficientDataError: If insufficient data for factor calculation

    Examples:
        >>> from datetime import date
        >>> import pandas as pd
        >>> import numpy as np
        >>> np.random.seed(42)
        >>> returns = pd.DataFrame({
        ...     'ASSET1': np.random.normal(0, 0.01, 60),
        ...     'ASSET2': np.random.normal(0, 0.02, 60),
        ...     'ASSET3': np.random.normal(0, 0.03, 60)
        ... }, index=pd.date_range(end='2022-12-30', periods=60))
        >>> config = PreselectionConfig(method=PreselectionMethod.MOMENTUM, top_k=2, min_periods=30)
        >>> preselect = Preselection(config)
        >>> selected = preselect.select_assets(returns, rebalance_date=date(2022, 12, 30))

    """
    # Validate returns DataFrame
    if returns is None or not isinstance(returns, pd.DataFrame) or returns.empty:
        raise DataValidationError("returns must be a non-empty pandas DataFrame")

    if len(returns.columns) == 0:
        raise DataValidationError("returns DataFrame has no asset columns")

    # Validate rebalance_date if provided
    if rebalance_date is not None:
        if not isinstance(rebalance_date, datetime.date):
            raise DataValidationError(
                f"rebalance_date must be a datetime.date, got {type(rebalance_date).__name__}",
            )

        max_date = returns.index.max()
        if isinstance(max_date, pd.Timestamp):
            max_date = max_date.date()

        if rebalance_date > max_date:
            raise DataValidationError(
                f"rebalance_date ({rebalance_date}) is after last available date ({max_date})",
            )

    # If no top_k or top_k <= 0, return all assets
    if self.config.top_k is None or self.config.top_k <= 0:
        logger.info(
            "Preselection disabled (top_k=%s), returning all %d assets",
            self.config.top_k,
            len(returns.columns),
        )
        return sorted(returns.columns.tolist())

    # Filter data up to rebalance date (no lookahead)
    if rebalance_date is not None:
        # Convert index to dates for comparison
        if isinstance(returns.index, pd.DatetimeIndex):
            date_mask = returns.index.date < rebalance_date
        else:
            # Assume index is already dates
            date_mask = returns.index < rebalance_date
        available_returns = returns.loc[date_mask]
    else:
        available_returns = returns

    # Check if we have enough data
    if len(available_returns) < self.config.min_periods:
        raise InsufficientDataError(
            required_periods=self.config.min_periods,
            available_periods=len(available_returns),
        )

    # Compute factor scores (with caching if enabled)
    scores = self._get_or_compute_scores(returns, available_returns, rebalance_date)

    # Handle edge case: all NaN scores
    if scores.isna().all():
        raise InsufficientDataError(
            required_periods=self.config.min_periods,
            available_periods=0,
        )

    # Select top-K assets
    return self._select_top_k(scores)
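The lookahead guard uses a strict `<` comparison, so the rebalance date's own return is excluded from factor computation. A standalone sketch of that date mask on a DatetimeIndex:

```python
import datetime

import numpy as np
import pandas as pd

idx = pd.date_range("2022-01-03", periods=5, freq="D")
returns = pd.DataFrame({"A": np.zeros(5)}, index=idx)

rebalance_date = datetime.date(2022, 1, 5)
mask = returns.index.date < rebalance_date  # strictly before the rebalance date
available = returns.loc[mask]

print(len(available))  # 2 -- only Jan 3 and Jan 4 are usable
```

Filtering with `<=` instead would leak the rebalance-day return into the factor scores, which is exactly the lookahead bias the engine is designed to avoid.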

PreselectionConfig dataclass

Configuration for asset preselection.

Attributes:

- method (PreselectionMethod): Preselection method to use
- top_k (int | None): Number of assets to select (if None or 0, no preselection)
- lookback (int): Number of periods to look back for factor calculation
- skip (int): Number of most recent periods to skip (for momentum)
- momentum_weight (float): Weight for momentum factor (when using combined)
- low_vol_weight (float): Weight for low-volatility factor (when using combined)
- min_periods (int): Minimum number of periods required for valid calculation

Source code in src/portfolio_management/portfolio/preselection.py
@dataclass
class PreselectionConfig:
    """Configuration for asset preselection.

    Attributes:
        method: Preselection method to use
        top_k: Number of assets to select (if None or 0, no preselection)
        lookback: Number of periods to look back for factor calculation
        skip: Number of most recent periods to skip (for momentum)
        momentum_weight: Weight for momentum factor (when using combined)
        low_vol_weight: Weight for low-volatility factor (when using combined)
        min_periods: Minimum number of periods required for valid calculation

    """

    method: PreselectionMethod = PreselectionMethod.MOMENTUM
    top_k: int | None = None
    lookback: int = 252  # ~1 year of daily data
    skip: int = 1  # Skip most recent day (common in momentum strategies)
    momentum_weight: float = 0.5
    low_vol_weight: float = 0.5
    min_periods: int = 60  # Minimum data required
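With these defaults, momentum is the cumulative return over the lookback window minus the most recent `skip` periods, echoing the common momentum convention of excluding the latest observation. A minimal sketch of the computation on a toy series (small `lookback` chosen only for illustration):

```python
import pandas as pd

returns = pd.Series([0.01, 0.02, -0.01, 0.03, 0.05])  # oldest to newest

lookback, skip = 5, 1
window = returns.iloc[-lookback:]
if skip > 0:
    window = window.iloc[:-skip]  # drop the most recent `skip` periods

momentum = (1 + window).prod() - 1  # compounded return over the window
```

Note the final 5% return is deliberately ignored: the score compounds only the first four observations.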

PreselectionMethod

Bases: Enum

Available preselection methods.

Source code in src/portfolio_management/portfolio/preselection.py
class PreselectionMethod(Enum):
    """Available preselection methods."""

    MOMENTUM = "momentum"
    LOW_VOL = "low_vol"
    COMBINED = "combined"

RebalanceConfig dataclass

Specifies the rules and costs for portfolio rebalancing.

This data class defines the parameters that govern when and how a portfolio should be rebalanced. It supports both calendar-based (frequency) and drift-based (tolerance bands) rebalancing triggers.

Attributes:

- frequency (int): The calendar-based rebalance frequency in days (e.g., 30 for monthly, 90 for quarterly).
- tolerance_bands (float): The maximum allowed drift for a position's weight (as a percentage of target weight) before triggering a rebalance. For example, 0.20 means a 20% drift is allowed.
- min_trade_size (float): The minimum trade size as a fraction of the total portfolio value. Trades smaller than this are suppressed to avoid incurring excessive transaction costs for minor adjustments.
- cost_per_trade (float): The estimated transaction cost as a percentage of the trade value (e.g., 0.001 for 10 basis points).

Configuration Example (YAML):

```yaml
rebalancing:
  frequency: 90  # Quarterly rebalance
  tolerance_bands: 0.15  # 15% drift tolerance
  min_trade_size: 0.005  # 0.5% of portfolio
  cost_per_trade: 0.0005 # 5 bps
```
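The drift-based trigger can be illustrated with a small standalone check. `needs_rebalance` is a hypothetical helper, not a library function; it flags a portfolio when any position's weight has drifted from its target by more than `tolerance_bands`, measured relative to the target weight:

```python
def needs_rebalance(current: dict[str, float],
                    target: dict[str, float],
                    tolerance_bands: float = 0.20) -> bool:
    """True if any position's relative drift from target exceeds the band."""
    return any(
        abs(current[t] - w) / w > tolerance_bands
        for t, w in target.items()
        if w > 0
    )

# 60/40 target with 15% bands:
# 0.64 equity / 0.36 bonds: max relative drift is 10% -> no trade
within = needs_rebalance({"EQ": 0.64, "BOND": 0.36}, {"EQ": 0.60, "BOND": 0.40}, 0.15)
# 0.70 equity / 0.30 bonds: bond drift is 25% -> rebalance
breach = needs_rebalance({"EQ": 0.70, "BOND": 0.30}, {"EQ": 0.60, "BOND": 0.40}, 0.15)
```

Calendar-based (`frequency`) and drift-based triggers are complementary: the former bounds staleness, the latter bounds tracking error between rebalances.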

Source code in src/portfolio_management/portfolio/rebalancing/config.py
@dataclass(frozen=True)
class RebalanceConfig:
    """Specifies the rules and costs for portfolio rebalancing.

    This data class defines the parameters that govern when and how a portfolio
    should be rebalanced. It supports both calendar-based (frequency) and
    drift-based (tolerance bands) rebalancing triggers.

    Attributes:
        frequency (int): The calendar-based rebalance frequency in days (e.g.,
            30 for monthly, 90 for quarterly).
        tolerance_bands (float): The maximum allowed drift for a position's weight
            (as a percentage of target weight) before triggering a rebalance.
            For example, 0.20 means a 20% drift is allowed.
        min_trade_size (float): The minimum trade size as a fraction of the total
            portfolio value. Trades smaller than this will be suppressed to avoid
            incurring excessive transaction costs for minor adjustments.
        cost_per_trade (float): The estimated transaction cost as a percentage of
            the trade value (e.g., 0.001 for 10 basis points).

    Configuration Example (YAML):
        ```yaml
        rebalancing:
          frequency: 90  # Quarterly rebalance
          tolerance_bands: 0.15  # 15% drift tolerance
          min_trade_size: 0.005  # 0.5% of portfolio
          cost_per_trade: 0.0005 # 5 bps
        ```

    """

    frequency: int = 30  # Monthly default
    tolerance_bands: float = 0.20
    min_trade_size: float = 0.01
    cost_per_trade: float = 0.001

    def __post_init__(self) -> None:
        """Validate rebalance parameters."""
        if self.frequency < 1:
            raise ConfigurationError(None, f"Invalid frequency: {self.frequency}")

        if not 0.0 <= self.tolerance_bands <= 1.0:
            raise ConfigurationError(
                None,
                f"Invalid tolerance_bands: {self.tolerance_bands}",
            )

        if not 0.0 <= self.min_trade_size <= 1.0:
            raise ConfigurationError(
                None,
                f"Invalid min_trade_size: {self.min_trade_size}",
            )

        if not 0.0 <= self.cost_per_trade <= 1.0:
            raise ConfigurationError(
                None,
                f"Invalid cost_per_trade: {self.cost_per_trade}",
            )

StatisticsCache

Caches covariance matrices and expected returns.

This class maintains cached covariance matrices and expected returns that can be incrementally updated when new data is added, significantly improving performance for large universes with overlapping data windows (e.g., monthly rebalances).

The cache is automatically invalidated when:

- The asset set changes (different tickers)
- The lookback window changes
- The data window shifts beyond the cache validity

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `window_size` | `int` | Number of periods for the rolling window (default: 252) |
| `annualization_factor` | `int` | Factor to annualize statistics (default: 252) |

Source code in src/portfolio_management/portfolio/statistics/rolling_statistics.py
class StatisticsCache:
    """Caches covariance matrices and expected returns.

    This class maintains cached covariance matrices and expected returns that can be
    incrementally updated when new data is added, significantly improving performance
    for large universes with overlapping data windows (e.g., monthly rebalances).

    The cache is automatically invalidated when:
    - The asset set changes (different tickers)
    - The lookback window changes
    - The data window shifts beyond the cache validity

    Attributes:
        window_size: Number of periods for the rolling window (default: 252)
        annualization_factor: Factor to annualize statistics (default: 252)

    """

    def __init__(
        self,
        window_size: int = 252,
        annualization_factor: int = 252,
    ) -> None:
        """Initialize rolling statistics calculator.

        Args:
            window_size: Number of periods for rolling window
            annualization_factor: Factor to annualize returns (e.g., 252 for daily data)

        """
        self.window_size = window_size
        self.annualization_factor = annualization_factor

        # Cache state
        self._cached_data: pd.DataFrame | None = None
        self._cached_cov: pd.DataFrame | None = None
        self._cached_mean: pd.Series | None = None
        self._cache_key: str | None = None
        self._asset_columns: pd.Index | None = None
        self._sum_vector: np.ndarray | None = None
        self._cross_prod_matrix: np.ndarray | None = None
        self._count: int = 0

    def get_covariance_matrix(
        self,
        returns: pd.DataFrame,
        annualize: bool = True,
    ) -> pd.DataFrame:
        """Compute or retrieve cached covariance matrix.

        Args:
            returns: DataFrame of returns (dates as index, tickers as columns)
            annualize: Whether to annualize the covariance matrix

        Returns:
            Covariance matrix as DataFrame

        """
        _, cov_matrix = self._retrieve_statistics(returns)

        if annualize:
            return cov_matrix * self.annualization_factor
        return cov_matrix

    def get_expected_returns(
        self,
        returns: pd.DataFrame,
        annualize: bool = True,
    ) -> pd.Series:
        """Compute or retrieve cached expected returns.

        Args:
            returns: DataFrame of returns (dates as index, tickers as columns)
            annualize: Whether to annualize the expected returns

        Returns:
            Expected returns as Series

        """
        mean_returns, _ = self._retrieve_statistics(returns)

        if annualize:
            return mean_returns * self.annualization_factor
        return mean_returns

    def get_statistics(
        self,
        returns: pd.DataFrame,
        annualize: bool = True,
    ) -> tuple[pd.Series, pd.DataFrame]:
        """Compute or retrieve both expected returns and covariance matrix.

        This is more efficient than calling get_expected_returns and
        get_covariance_matrix separately as it computes both in one pass.

        Args:
            returns: DataFrame of returns (dates as index, tickers as columns)
            annualize: Whether to annualize the statistics

        Returns:
            Tuple of (expected_returns, covariance_matrix)

        """
        mean_returns, cov_matrix = self._retrieve_statistics(returns)

        if annualize:
            return (
                mean_returns * self.annualization_factor,
                cov_matrix * self.annualization_factor,
            )
        return mean_returns, cov_matrix

    def clear_cache(self) -> None:
        """Clear all cached statistics.

        Primarily for testing to ensure test isolation.
        """
        self._cached_data = None
        self._cached_cov = None
        self._cached_mean = None
        self._cache_key = None
        self._asset_columns = None
        self._sum_vector = None
        self._cross_prod_matrix = None
        self._count = 0

    def get_cache_stats(self) -> dict[str, int]:
        """Get cache statistics.

        Returns:
            Dictionary with covariance_entries and returns_entries.

        """
        return {
            "covariance_entries": 1 if self._cached_cov is not None else 0,
            "returns_entries": 1 if self._cached_mean is not None else 0,
        }

    def _retrieve_statistics(
        self,
        returns: pd.DataFrame,
    ) -> tuple[pd.Series, pd.DataFrame]:
        """Return statistics from cache or recompute them."""
        cache_key = self._compute_cache_key(returns)

        if self._can_incrementally_update(cache_key, returns):
            return self._update_incrementally(returns, cache_key)

        return self._recompute_statistics(returns, cache_key)

    def _compute_cache_key(self, returns: pd.DataFrame) -> str:
        """Compute a cache key based on data characteristics.

        Args:
            returns: DataFrame to compute key for

        Returns:
            Cache key string

        """
        # Include stable characteristics in cache key to preserve reuse across
        # overlapping windows.
        key_components = [
            str(sorted(returns.columns.tolist())),
            str(self.window_size),
        ]
        key_string = "|".join(key_components)
        return hashlib.md5(key_string.encode(), usedforsecurity=False).hexdigest()

    def _can_incrementally_update(self, cache_key: str, returns: pd.DataFrame) -> bool:
        """Determine whether cached state can service the new data."""
        if (
            self._cache_key is None
            or self._cached_data is None
            or self._asset_columns is None
            or self._sum_vector is None
            or self._cross_prod_matrix is None
        ):
            return False

        if cache_key != self._cache_key:
            return False

        if not self._asset_columns.equals(returns.columns):
            return False

        if returns.empty:
            # Allow incremental update so we preserve cached state when
            # consumers temporarily supply empty frames.
            return True

        if returns.isna().any().any() or self._cached_data.isna().any().any():
            # Pandas cov/mean handle NaNs with pairwise deletion. The incremental
            # update path assumes dense data, so fall back to a full recompute.
            return False

        overlap = self._cached_data.index.intersection(returns.index, sort=False)
        if overlap.empty:
            return False

        cached_overlap = self._cached_data.loc[overlap].to_numpy()
        new_overlap = returns.loc[overlap].to_numpy()

        return np.allclose(
            cached_overlap,
            new_overlap,
            rtol=1e-9,
            atol=1e-12,
        )

    def _recompute_statistics(
        self,
        returns: pd.DataFrame,
        cache_key: str,
    ) -> tuple[pd.Series, pd.DataFrame]:
        """Recompute statistics from scratch and refresh the cache."""
        self._cached_data = returns.copy()
        self._cache_key = cache_key
        self._asset_columns = returns.columns.copy()

        values = returns.to_numpy(dtype=float, copy=True)
        self._count = len(returns)

        if self._count == 0:
            self._sum_vector = np.zeros(len(self._asset_columns), dtype=float)
            self._cross_prod_matrix = np.zeros(
                (len(self._asset_columns), len(self._asset_columns)),
                dtype=float,
            )
            mean_returns = pd.Series(np.nan, index=self._asset_columns, dtype=float)
            cov_matrix = pd.DataFrame(
                np.nan,
                index=self._asset_columns,
                columns=self._asset_columns,
            )
        else:
            self._sum_vector = values.sum(axis=0)
            self._cross_prod_matrix = values.T @ values

            mean_vector = self._sum_vector / self._count
            mean_returns = pd.Series(mean_vector, index=self._asset_columns)

            if self._count <= 1:
                cov_values = np.full(
                    (len(self._asset_columns), len(self._asset_columns)),
                    np.nan,
                    dtype=float,
                )
            else:
                centered = self._cross_prod_matrix - self._count * np.outer(
                    mean_vector,
                    mean_vector,
                )
                cov_values = centered / (self._count - 1)
                # Numerical noise may introduce asymmetry; enforce symmetry.
                cov_values = (cov_values + cov_values.T) / 2

            cov_matrix = pd.DataFrame(
                cov_values,
                index=self._asset_columns,
                columns=self._asset_columns,
            )

        self._cached_mean = mean_returns
        self._cached_cov = cov_matrix

        return mean_returns, cov_matrix

    def _update_incrementally(
        self,
        returns: pd.DataFrame,
        cache_key: str,
    ) -> tuple[pd.Series, pd.DataFrame]:
        """Update cached statistics for a partially overlapping window."""
        assert self._cached_data is not None  # For type checkers
        assert self._asset_columns is not None
        assert self._sum_vector is not None
        assert self._cross_prod_matrix is not None

        overlap = self._cached_data.index.intersection(returns.index, sort=False)
        overlap_set = set(overlap)

        rows_to_remove = [
            idx for idx in self._cached_data.index if idx not in overlap_set
        ]
        rows_to_add = [idx for idx in returns.index if idx not in overlap_set]

        # Remove rows that fell out of the window
        for idx in rows_to_remove:
            row = self._cached_data.loc[idx].to_numpy(dtype=float)
            self._sum_vector -= row
            self._cross_prod_matrix -= np.outer(row, row)
            self._count -= 1

        # Add new rows that entered the window
        for idx in rows_to_add:
            row = returns.loc[idx].to_numpy(dtype=float)
            self._sum_vector += row
            self._cross_prod_matrix += np.outer(row, row)
            self._count += 1

        self._cached_data = returns.copy()
        self._cache_key = cache_key

        asset_count = len(self._asset_columns)
        if self._count == 0:
            self._sum_vector = np.zeros(asset_count, dtype=float)
            self._cross_prod_matrix = np.zeros((asset_count, asset_count), dtype=float)

        if self._count == 0:
            mean_vector = np.full(asset_count, np.nan, dtype=float)
            cov_values = np.full((asset_count, asset_count), np.nan, dtype=float)
        else:
            mean_vector = self._sum_vector / self._count
            if self._count <= 1:
                cov_values = np.full((asset_count, asset_count), np.nan, dtype=float)
            else:
                centered = self._cross_prod_matrix - self._count * np.outer(
                    mean_vector,
                    mean_vector,
                )
                cov_values = centered / (self._count - 1)
                cov_values = (cov_values + cov_values.T) / 2

        mean_returns = pd.Series(mean_vector, index=self._asset_columns)
        cov_matrix = pd.DataFrame(
            cov_values,
            index=self._asset_columns,
            columns=self._asset_columns,
        )

        self._cached_mean = mean_returns
        self._cached_cov = cov_matrix

        # Ensure the cache count matches the new window length for correctness.

        return mean_returns, cov_matrix
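The incremental path above maintains only the column sums and the cross-product matrix XᵀX, so a window shift costs O(k·n²) for k changed rows instead of a full O(T·n²) recompute. A standalone numpy sketch of the same identity, Cov = (XᵀX − n·μμᵀ)/(n − 1), verified against a direct recompute:

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))  # 100 periods, 3 assets

# Running state, as StatisticsCache maintains it
s = X.sum(axis=0)   # sum vector
C = X.T @ X         # cross-product matrix
n = len(X)

# Shift the window: drop the oldest row, add one new row (rank-1 updates)
new_row = rng.normal(size=3)
old_row = X[0]
s += new_row - old_row
C += np.outer(new_row, new_row) - np.outer(old_row, old_row)
X2 = np.vstack([X[1:], new_row])  # the equivalent full window (n unchanged)

# Recover mean and sample covariance from the running state
mu = s / n
cov = (C - n * np.outer(mu, mu)) / (n - 1)

ok_mean = np.allclose(mu, X2.mean(axis=0))
ok_cov = np.allclose(cov, np.cov(X2, rowvar=False))
```

The centering step is why the class falls back to a full recompute when NaNs are present: pandas handles NaNs with pairwise deletion, which this dense identity cannot reproduce.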

get_covariance_matrix(returns, annualize=True)

Compute or retrieve cached covariance matrix.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `returns` | `DataFrame` | DataFrame of returns (dates as index, tickers as columns) | *required* |
| `annualize` | `bool` | Whether to annualize the covariance matrix | `True` |

Returns:

| Type | Description |
|------|-------------|
| `DataFrame` | Covariance matrix as DataFrame |

Source code in src/portfolio_management/portfolio/statistics/rolling_statistics.py
def get_covariance_matrix(
    self,
    returns: pd.DataFrame,
    annualize: bool = True,
) -> pd.DataFrame:
    """Compute or retrieve cached covariance matrix.

    Args:
        returns: DataFrame of returns (dates as index, tickers as columns)
        annualize: Whether to annualize the covariance matrix

    Returns:
        Covariance matrix as DataFrame

    """
    _, cov_matrix = self._retrieve_statistics(returns)

    if annualize:
        return cov_matrix * self.annualization_factor
    return cov_matrix

get_expected_returns(returns, annualize=True)

Compute or retrieve cached expected returns.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `returns` | `DataFrame` | DataFrame of returns (dates as index, tickers as columns) | *required* |
| `annualize` | `bool` | Whether to annualize the expected returns | `True` |

Returns:

| Type | Description |
|------|-------------|
| `Series` | Expected returns as Series |

Source code in src/portfolio_management/portfolio/statistics/rolling_statistics.py
def get_expected_returns(
    self,
    returns: pd.DataFrame,
    annualize: bool = True,
) -> pd.Series:
    """Compute or retrieve cached expected returns.

    Args:
        returns: DataFrame of returns (dates as index, tickers as columns)
        annualize: Whether to annualize the expected returns

    Returns:
        Expected returns as Series

    """
    mean_returns, _ = self._retrieve_statistics(returns)

    if annualize:
        return mean_returns * self.annualization_factor
    return mean_returns

get_statistics(returns, annualize=True)

Compute or retrieve both expected returns and covariance matrix.

This is more efficient than calling get_expected_returns and get_covariance_matrix separately as it computes both in one pass.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `returns` | `DataFrame` | DataFrame of returns (dates as index, tickers as columns) | *required* |
| `annualize` | `bool` | Whether to annualize the statistics | `True` |

Returns:

| Type | Description |
|------|-------------|
| `tuple[Series, DataFrame]` | Tuple of (expected_returns, covariance_matrix) |

Source code in src/portfolio_management/portfolio/statistics/rolling_statistics.py
def get_statistics(
    self,
    returns: pd.DataFrame,
    annualize: bool = True,
) -> tuple[pd.Series, pd.DataFrame]:
    """Compute or retrieve both expected returns and covariance matrix.

    This is more efficient than calling get_expected_returns and
    get_covariance_matrix separately as it computes both in one pass.

    Args:
        returns: DataFrame of returns (dates as index, tickers as columns)
        annualize: Whether to annualize the statistics

    Returns:
        Tuple of (expected_returns, covariance_matrix)

    """
    mean_returns, cov_matrix = self._retrieve_statistics(returns)

    if annualize:
        return (
            mean_returns * self.annualization_factor,
            cov_matrix * self.annualization_factor,
        )
    return mean_returns, cov_matrix

clear_cache()

Clear all cached statistics.

Primarily for testing to ensure test isolation.

Source code in src/portfolio_management/portfolio/statistics/rolling_statistics.py
def clear_cache(self) -> None:
    """Clear all cached statistics.

    Primarily for testing to ensure test isolation.
    """
    self._cached_data = None
    self._cached_cov = None
    self._cached_mean = None
    self._cache_key = None
    self._asset_columns = None
    self._sum_vector = None
    self._cross_prod_matrix = None
    self._count = 0

get_cache_stats()

Get cache statistics.

Returns:

| Type | Description |
|------|-------------|
| `dict[str, int]` | Dictionary with covariance_entries and returns_entries. |

Source code in src/portfolio_management/portfolio/statistics/rolling_statistics.py
def get_cache_stats(self) -> dict[str, int]:
    """Get cache statistics.

    Returns:
        Dictionary with covariance_entries and returns_entries.

    """
    return {
        "covariance_entries": 1 if self._cached_cov is not None else 0,
        "returns_entries": 1 if self._cached_mean is not None else 0,
    }

EqualWeightStrategy

Bases: PortfolioStrategy

Implements the equal-weight (1/N) portfolio construction strategy.

This strategy assigns an equal weight to every asset in the investment universe. It is a simple, transparent, and computationally inexpensive approach that serves as a common benchmark.

The main assumption is that there is no information available to suggest that any single asset will outperform another.

Mathematical Formulation

Given N assets in the portfolio, the weight for each asset i is: wᵢ = 1 / N

This strategy does not perform any optimization and only considers the number of available assets. It will, however, validate the resulting portfolio against basic constraints (e.g., max_weight).

Example

```python
>>> import pandas as pd
>>> from portfolio_management.portfolio.strategies import EqualWeightStrategy
>>> from portfolio_management.portfolio.constraints import PortfolioConstraints
>>>
>>> returns = pd.DataFrame({
...     'ASSET_A': [0.01, 0.02],
...     'ASSET_B': [0.03, -0.01],
...     'ASSET_C': [0.02, 0.01],
...     'ASSET_D': [-0.01, 0.01],
... })
>>>
>>> strategy = EqualWeightStrategy()
>>> constraints = PortfolioConstraints(max_weight=0.3)
>>> portfolio = strategy.construct(returns, constraints)
>>>
>>> print(portfolio.weights)
ASSET_A    0.25
ASSET_B    0.25
ASSET_C    0.25
ASSET_D    0.25
dtype: float64
```

Source code in src/portfolio_management/portfolio/strategies/equal_weight.py
class EqualWeightStrategy(PortfolioStrategy):
    """Implements the equal-weight (1/N) portfolio construction strategy.

    This strategy assigns an equal weight to every asset in the investment
    universe. It is a simple, transparent, and computationally inexpensive
    approach that serves as a common benchmark.

    The main assumption is that there is no information available to suggest
    that any single asset will outperform another.

    Mathematical Formulation:
        Given N assets in the portfolio, the weight for each asset i is:
        wᵢ = 1 / N

    This strategy does not perform any optimization and only considers the number
    of available assets. It will, however, validate the resulting portfolio
    against basic constraints (e.g., `max_weight`).

    Example:
        >>> import pandas as pd
        >>> from portfolio_management.portfolio.strategies import EqualWeightStrategy
        >>> from portfolio_management.portfolio.constraints import PortfolioConstraints
        >>>
        >>> returns = pd.DataFrame({
        ...     'ASSET_A': [0.01, 0.02],
        ...     'ASSET_B': [0.03, -0.01],
        ...     'ASSET_C': [0.02, 0.01],
        ...     'ASSET_D': [-0.01, 0.01],
        ... })
        >>>
        >>> strategy = EqualWeightStrategy()
        >>> constraints = PortfolioConstraints(max_weight=0.3)
        >>> portfolio = strategy.construct(returns, constraints)
        >>>
        >>> print(portfolio.weights)
        ASSET_A    0.25
        ASSET_B    0.25
        ASSET_C    0.25
        ASSET_D    0.25
        dtype: float64

    """

    @property
    def name(self) -> str:
        """Return the strategy name."""
        return "equal_weight"

    @property
    def min_history_periods(self) -> int:
        """Return minimum number of return periods required."""
        return 1  # Only need to know which assets exist

    def construct(
        self,
        returns: pd.DataFrame,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series | None = None,
    ) -> Portfolio:
        """Construct an equal-weight portfolio.

        Args:
            returns: DataFrame with returns (assets as columns, dates as index)
            constraints: Portfolio constraints to enforce
            asset_classes: Optional Series mapping tickers to asset classes

        Returns:
            Portfolio with equal weights, adjusted for constraints

        Raises:
            InsufficientDataError: If returns DataFrame is empty
            ConstraintViolationError: If equal weighting violates constraints

        """
        # Validate inputs
        if returns.empty:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=0,
            )

        if len(returns) < self.min_history_periods:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=len(returns),
            )

        # Calculate equal weights
        n_assets = len(returns.columns)
        equal_weight = 1.0 / n_assets

        # Check if equal weight violates max_weight constraint
        if equal_weight > constraints.max_weight:
            raise ConstraintViolationError(
                "max_weight",
                equal_weight,
                constraints.max_weight,
            )

        # Create weights Series
        weights = pd.Series(equal_weight, index=returns.columns)

        # Validate asset class constraints if provided
        if asset_classes is not None:
            self._validate_asset_class_constraints(
                weights,
                asset_classes,
                constraints,
            )

        return Portfolio(
            weights=weights,
            strategy=self.name,
            metadata={
                "n_assets": n_assets,
                "equal_weight": equal_weight,
            },
        )

    def _validate_asset_class_constraints(
        self,
        weights: pd.Series,
        asset_classes: pd.Series,
        constraints: PortfolioConstraints,
    ) -> None:
        """Validate that weights satisfy asset class exposure constraints.

        Args:
            weights: Portfolio weights
            asset_classes: Asset class mappings
            constraints: Portfolio constraints

        Raises:
            ConstraintViolationError: If exposure constraints are violated

        """
        # Calculate equity exposure (assuming "equity" in asset class name)
        equity_mask = asset_classes.str.contains("equity", case=False, na=False)
        equity_tickers = asset_classes[equity_mask].index
        equity_exposure = weights[weights.index.isin(equity_tickers)].sum()

        if equity_exposure > constraints.max_equity_exposure:
            raise ConstraintViolationError(
                "max_equity_exposure",
                equity_exposure,
                constraints.max_equity_exposure,
            )

        # Calculate bond/cash exposure
        bond_mask = asset_classes.str.contains("bond|cash", case=False, na=False)
        bond_tickers = asset_classes[bond_mask].index
        bond_exposure = weights[weights.index.isin(bond_tickers)].sum()

        if bond_exposure < constraints.min_bond_exposure:
            raise ConstraintViolationError(
                "min_bond_exposure",
                bond_exposure,
                constraints.min_bond_exposure,
            )

name property

Return the strategy name.

min_history_periods property

Return minimum number of return periods required.

construct(returns, constraints, asset_classes=None)

Construct an equal-weight portfolio.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `returns` | `DataFrame` | DataFrame with returns (assets as columns, dates as index) | *required* |
| `constraints` | `PortfolioConstraints` | Portfolio constraints to enforce | *required* |
| `asset_classes` | `Series \| None` | Optional Series mapping tickers to asset classes | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Portfolio` | Portfolio with equal weights, adjusted for constraints |

Raises:

| Type | Description |
|------|-------------|
| `InsufficientDataError` | If returns DataFrame is empty |
| `ConstraintViolationError` | If equal weighting violates constraints |

Source code in src/portfolio_management/portfolio/strategies/equal_weight.py
def construct(
    self,
    returns: pd.DataFrame,
    constraints: PortfolioConstraints,
    asset_classes: pd.Series | None = None,
) -> Portfolio:
    """Construct an equal-weight portfolio.

    Args:
        returns: DataFrame with returns (assets as columns, dates as index)
        constraints: Portfolio constraints to enforce
        asset_classes: Optional Series mapping tickers to asset classes

    Returns:
        Portfolio with equal weights, adjusted for constraints

    Raises:
        InsufficientDataError: If returns DataFrame is empty
        ConstraintViolationError: If equal weighting violates constraints

    """
    # Validate inputs
    if returns.empty:
        raise InsufficientDataError(
            required_periods=self.min_history_periods,
            available_periods=0,
        )

    if len(returns) < self.min_history_periods:
        raise InsufficientDataError(
            required_periods=self.min_history_periods,
            available_periods=len(returns),
        )

    # Calculate equal weights
    n_assets = len(returns.columns)
    equal_weight = 1.0 / n_assets

    # Check if equal weight violates max_weight constraint
    if equal_weight > constraints.max_weight:
        raise ConstraintViolationError(
            "max_weight",
            equal_weight,
            constraints.max_weight,
        )

    # Create weights Series
    weights = pd.Series(equal_weight, index=returns.columns)

    # Validate asset class constraints if provided
    if asset_classes is not None:
        self._validate_asset_class_constraints(
            weights,
            asset_classes,
            constraints,
        )

    return Portfolio(
        weights=weights,
        strategy=self.name,
        metadata={
            "n_assets": n_assets,
            "equal_weight": equal_weight,
        },
    )

MeanVarianceStrategy

Bases: PortfolioStrategy

Constructs a portfolio using mean-variance optimization (MVO).

This strategy leverages the PyPortfolioOpt library to find the optimal asset allocation that balances risk (variance) and return. It is a cornerstone of modern portfolio theory.

Mathematical Formulation

The core of MVO is a quadratic optimization problem. For an objective like 'max_sharpe', the optimizer solves:

    maximize:   (wᵀμ − r_f) / √(wᵀΣw)
    subject to: Σᵢ wᵢ = 1 (or other constraints)
                wᵢ ≥ 0 (long-only)

where:

- w: portfolio weights vector
- μ: expected returns vector
- Σ: covariance matrix of asset returns
- r_f: risk-free rate

Supported Objectives
  • max_sharpe: Finds the tangency portfolio with the highest Sharpe ratio.
  • min_volatility: Finds the portfolio with the minimum possible risk.
  • efficient_risk: Finds the portfolio on the efficient frontier for a given target risk level.
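For intuition about `min_volatility`: when the box constraints are inactive, the minimum-variance portfolio has the closed form w = Σ⁻¹1 / (1ᵀΣ⁻¹1). A numpy sketch of that identity (illustration only; the strategy itself delegates to PyPortfolioOpt's solver, and the toy covariance below is made up):

```python
import numpy as np

# Toy annualized covariance: a low-vol and a high-vol asset, mildly correlated
cov = np.array([[0.010, 0.002],
                [0.002, 0.090]])

ones = np.ones(len(cov))
w = np.linalg.solve(cov, ones)  # Σ⁻¹1
w /= w.sum()                    # normalize so weights sum to 1

# The low-volatility asset dominates the minimum-variance portfolio
portfolio_var = w @ cov @ w
```

The full strategy differs from this closed form because it enforces min/max weight bounds and other constraints, which generally require a numerical solver.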
Example

```python
>>> import pandas as pd
>>> import numpy as np
>>> from portfolio_management.portfolio.strategies import MeanVarianceStrategy
>>> from portfolio_management.portfolio.constraints import PortfolioConstraints
>>>
>>> returns = pd.DataFrame({
...     'STABLE_ASSET': np.random.normal(0.001, 0.01, 252),
...     'GROWTH_ASSET': np.random.normal(0.005, 0.05, 252),
... })
>>>
>>> # Find the portfolio that minimizes volatility
>>> strategy = MeanVarianceStrategy(objective="min_volatility", min_periods=30)
>>> constraints = PortfolioConstraints(min_weight=0.1, max_weight=0.9)
>>> portfolio = strategy.construct(returns, constraints)
>>>
>>> # The exact weights will vary, but the stable asset should have a high weight
>>> print(portfolio.weights['STABLE_ASSET'] > 0.5)
True
```

Source code in src/portfolio_management/portfolio/strategies/mean_variance.py
class MeanVarianceStrategy(PortfolioStrategy):
    """Constructs a portfolio using mean-variance optimization (MVO).

    This strategy leverages the PyPortfolioOpt library to find the optimal asset
    allocation that balances risk (variance) and return. It is a cornerstone of
    modern portfolio theory.

    Mathematical Formulation:
        The core of MVO is a quadratic optimization problem. For an objective
        like 'max_sharpe', the optimizer solves:

        maximize: (w.T * μ - r_f) / sqrt(w.T * Σ * w)
        subject to:
            sum(w_i) = 1 (full investment; or other constraints)
            w_i >= 0 (long-only)

        where:
        - w: portfolio weights vector
        - μ: expected returns vector
        - Σ: covariance matrix of asset returns
        - r_f: risk-free rate

    Supported Objectives:
        - `max_sharpe`: Finds the tangency portfolio with the highest Sharpe ratio.
        - `min_volatility`: Finds the portfolio with the minimum possible risk.
        - `efficient_risk`: Finds the portfolio on the efficient frontier for a
          given target risk level.

    Example:
        >>> import pandas as pd
        >>> from portfolio_management.portfolio.strategies import MeanVarianceStrategy
        >>> from portfolio_management.portfolio.constraints import PortfolioConstraints
        >>>
        >>> import numpy as np
        >>> returns = pd.DataFrame({
        ...     'STABLE_ASSET': np.random.normal(0.001, 0.01, 252),
        ...     'GROWTH_ASSET': np.random.normal(0.005, 0.05, 252),
        ... })
        >>>
        >>> # Find the portfolio that minimizes volatility
        >>> strategy = MeanVarianceStrategy(objective="min_volatility", min_periods=30)
        >>> constraints = PortfolioConstraints(min_weight=0.1, max_weight=0.9)
        >>> portfolio = strategy.construct(returns, constraints)
        >>>
        >>> # The exact weights will vary, but the stable asset should have a high weight
        >>> print(portfolio.weights['STABLE_ASSET'] > 0.5)
        True

    """

    _VALID_OBJECTIVES: ClassVar[set[str]] = {
        "max_sharpe",
        "min_volatility",
        "efficient_risk",
    }

    def __init__(
        self,
        objective: str = "max_sharpe",
        risk_free_rate: float = 0.02,
        min_periods: int = 252,
        statistics_cache: RollingStatistics | None = None,
    ) -> None:
        """Initialise the strategy configuration.

        Args:
            objective: Optimization objective
            risk_free_rate: Risk-free rate for Sharpe ratio calculation
            min_periods: Minimum periods for estimation
            statistics_cache: Optional statistics cache to avoid redundant calculations

        """
        if objective not in self._VALID_OBJECTIVES:
            raise ConfigurationError(
                None,
                f"Invalid objective '{objective}'. Expected one of "
                f"{sorted(self._VALID_OBJECTIVES)}.",
            )

        self._objective = objective
        self._risk_free_rate = risk_free_rate
        self._min_periods = min_periods
        self._statistics_cache = statistics_cache
        self._cached_signature: (
            tuple[tuple[str, ...], tuple[pd.Timestamp, ...]] | None
        ) = None
        self._cached_weights: pd.Series | None = None
        self._cached_metadata: dict[str, float] | None = None

    @property
    def name(self) -> str:
        """Return the registered strategy name."""
        return f"mean_variance_{self._objective}"

    @property
    def min_history_periods(self) -> int:
        """Return the minimum number of periods needed for estimation."""
        return self._min_periods

    def construct(
        self,
        returns: pd.DataFrame,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series | None = None,
    ) -> Portfolio:
        """Construct a mean-variance optimised portfolio."""
        efficient_frontier_cls, expected_returns, risk_models, objective_functions = (
            self._load_backend()
        )

        self._validate_returns(returns)
        prepared_returns = self._prepare_returns(returns)
        self._validate_returns(prepared_returns)
        n_assets = prepared_returns.shape[1]

        signature = (
            tuple(prepared_returns.columns),
            tuple(prepared_returns.index),
        )
        if (
            self._cached_signature == signature
            and self._cached_weights is not None
            and self._cached_metadata is not None
        ):
            return Portfolio(
                weights=self._cached_weights.copy(),
                strategy=self.name,
                metadata={**self._cached_metadata},
            )

        if n_assets > LARGE_UNIVERSE_THRESHOLD:
            mu = prepared_returns.mean() * 252
            cov_matrix = prepared_returns.cov() * 252

            weights, performance = self._analytic_tangency_fallback(
                mu,
                cov_matrix,
                constraints,
            )
            RiskParityStrategy.validate_constraints(weights, constraints, asset_classes)
            return Portfolio(
                weights=weights,
                strategy=self.name,
                metadata={
                    "n_assets": int(weights.size),
                    **performance,
                    "objective": self._objective,
                    "method": "analytic_tangency_fallback",
                },
            )

        mu, cov_matrix = self._estimate_moments(
            prepared_returns,
            expected_returns,
            risk_models,
        )

        attempts: list[dict[str, Any]] = [
            {
                "cov": cov_matrix,
                "solver": None,
                "l2_gamma": None,
                "objective": self._objective,
            },
        ]

        if self._objective == "max_sharpe":
            reg_cov_array = (
                cov_matrix.to_numpy() + np.eye(len(cov_matrix), dtype=float) * 1e-4
            )
            regularised_cov = pd.DataFrame(
                reg_cov_array,
                index=cov_matrix.index,
                columns=cov_matrix.columns,
            )
            attempts.append(
                {
                    "cov": regularised_cov,
                    "solver": "ECOS",
                    "l2_gamma": 1e-3,
                    "objective": "max_sharpe",
                },
            )
            attempts.append(
                {
                    "cov": regularised_cov,
                    "solver": "ECOS",
                    "l2_gamma": 1e-3,
                    "objective": "min_volatility",
                },
            )

        final_weights: pd.Series | None = None
        final_ef: Any = None
        last_error: OptimizationError | None = None

        for attempt in attempts:
            try:
                candidate_ef = self._build_frontier(
                    efficient_frontier_cls,
                    mu,
                    attempt["cov"],
                    constraints,
                    asset_classes,
                )
                if attempt["l2_gamma"]:
                    # Import objective_functions only when needed
                    objective_functions = importlib.import_module(
                        "pypfopt.objective_functions",
                    )
                    candidate_ef.add_objective(
                        objective_functions.L2_reg,
                        gamma=attempt["l2_gamma"],
                    )
                if attempt["solver"]:
                    candidate_ef._solver = attempt["solver"]
                self._optimise_frontier(candidate_ef, objective=attempt["objective"])
                weights_candidate = self._extract_weights(candidate_ef)
                weight_sum = float(weights_candidate.sum())
                if weight_sum <= 0:
                    last_error = OptimizationError(
                        strategy_name=self.name,
                        message="Optimisation produced non-positive total weight.",
                    )
                    continue
                final_weights = weights_candidate / weight_sum
                final_ef = candidate_ef
                break
            except OptimizationError as error:
                last_error = error
                continue

        if final_weights is None or final_ef is None:
            raise (
                last_error
                if last_error
                else OptimizationError(
                    strategy_name=self.name,
                    message="Mean-variance optimisation failed for all fallback strategies.",
                )
            )

        weights = self._enforce_weight_bounds(final_weights, constraints)
        ef = final_ef
        try:
            RiskParityStrategy.validate_constraints(weights, constraints, asset_classes)
            performance = self._summarise_portfolio(ef)
        except ConstraintViolationError:
            fallback_weights = pd.Series(
                np.full(
                    len(prepared_returns.columns),
                    1.0 / len(prepared_returns.columns),
                ),
                index=prepared_returns.columns,
                dtype=float,
            )
            RiskParityStrategy.validate_constraints(
                fallback_weights,
                constraints,
                asset_classes,
            )
            cov_matrix = prepared_returns.cov() * 252
            mu_vector = prepared_returns.mean() * 252
            exp_ret = float(fallback_weights @ mu_vector)  # type: ignore[arg-type]
            vol = float(np.sqrt(fallback_weights @ cov_matrix @ fallback_weights))  # type: ignore[arg-type]
            sharpe = exp_ret / vol if vol > 0 else 0.0
            metadata: dict[str, Any] = {
                "n_assets": int(fallback_weights.size),
                "expected_return": exp_ret,
                "volatility": vol,
                "sharpe_ratio": sharpe,
                "objective": self._objective,
                "method": "fallback_equal_weight",
            }
            self._cached_signature = signature
            self._cached_weights = fallback_weights.copy()
            self._cached_metadata = metadata.copy()
            return Portfolio(
                weights=fallback_weights,
                strategy=self.name,
                metadata=metadata,
            )
        performance = self._summarise_portfolio(ef)

        metadata = {
            "n_assets": int(weights.size),
            **performance,
            "objective": self._objective,
        }
        self._cached_signature = signature
        self._cached_weights = weights.copy()
        self._cached_metadata = metadata.copy()
        return Portfolio(
            weights=weights,
            strategy=self.name,
            metadata=metadata,
        )

    def _load_backend(self) -> tuple[Any, Any, Any, Any]:
        try:
            module = importlib.import_module("pypfopt")
            expected_returns = importlib.import_module("pypfopt.expected_returns")
            risk_models = importlib.import_module("pypfopt.risk_models")
            try:
                objective_functions = importlib.import_module(
                    "pypfopt.objective_functions",
                )
            except ImportError:
                objective_functions = None
        except ImportError as err:
            raise DependencyNotInstalledError(
                "PyPortfolioOpt",
                context="for mean-variance optimisation",
            ) from err

        return (
            module.EfficientFrontier,
            expected_returns,
            risk_models,
            objective_functions,
        )

    def _prepare_returns(self, returns: pd.DataFrame) -> pd.DataFrame:
        """Replace invalid observations and drop assets without complete history."""
        sanitized = returns.replace([np.inf, -np.inf], np.nan)

        # Drop assets that have missing observations in the estimation window.
        valid_assets = sanitized.columns[sanitized.notna().all()]
        sanitized = sanitized[valid_assets]

        sanitized = sanitized.dropna(axis=0, how="any")
        if sanitized.empty:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=0,
            )
        return sanitized

    def _validate_returns(self, returns: pd.DataFrame) -> None:
        if returns.empty:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=0,
            )

        n_periods = len(returns)
        if n_periods < self.min_history_periods:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=n_periods,
            )

    def _estimate_moments(
        self,
        returns: ReturnFrame,
        expected_returns: Any,
        risk_models: Any,
    ) -> tuple[pd.Series, pd.DataFrame]:
        # Use cached statistics if available
        if self._statistics_cache is not None:
            # Populate cache metadata for consistency without relying on it
            # to drive the optimisation path.
            self._statistics_cache.get_statistics(returns, annualize=False)

        # Original implementation
        mu = expected_returns.mean_historical_return(returns, frequency=252)
        if hasattr(risk_models, "CovarianceShrinkage"):
            try:
                shrinker = risk_models.CovarianceShrinkage(
                    returns,
                    frequency=252,
                    returns_data=True,
                )
                cov_matrix = shrinker.ledoit_wolf()
            except (
                ModuleNotFoundError,
                AttributeError,
                ImportError,
                ValueError,
                np.linalg.LinAlgError,
            ):
                cov_matrix = self._fallback_covariance(returns, risk_models)
        else:
            cov_matrix = self._fallback_covariance(returns, risk_models)

        # Ensure covariance matrix is positive semi-definite to keep the solver stable.
        cov_array = cov_matrix.to_numpy()
        eigvals = np.linalg.eigvalsh(cov_array)
        if np.any(eigvals < 0):
            adjustment = np.eye(len(cov_matrix), dtype=float) * (
                abs(eigvals.min()) + 1e-6
            )
            cov_array = cov_array + adjustment
        # Add a small jitter to improve conditioning even when matrix is PSD.
        cov_array = cov_array + np.eye(len(cov_matrix), dtype=float) * 1e-6
        cov_matrix = pd.DataFrame(
            cov_array,
            index=cov_matrix.index,
            columns=cov_matrix.columns,
        )

        return mu, cov_matrix

    def _fallback_covariance(
        self,
        returns: ReturnFrame,
        risk_models: Any,
    ) -> pd.DataFrame:
        """Compute a regularised covariance matrix without optional dependencies."""
        cov_matrix = risk_models.sample_cov(returns, frequency=252)
        base = cov_matrix.to_numpy()
        diag = np.diag(np.diag(base))
        shrinkage_intensity = 0.05
        shrunk = (1 - shrinkage_intensity) * base + shrinkage_intensity * diag
        return pd.DataFrame(
            shrunk,
            index=cov_matrix.index,
            columns=cov_matrix.columns,
        )

    def _analytic_tangency_fallback(
        self,
        mu: pd.Series,
        cov_matrix: pd.DataFrame,
        constraints: PortfolioConstraints,
    ) -> tuple[pd.Series, dict[str, float]]:
        """Compute a long-only tangency portfolio using a closed-form approximation."""
        subset = min(200, len(mu))
        diag = np.sqrt(np.diag(cov_matrix.to_numpy()))
        scores = mu.to_numpy() / np.where(diag > 0, diag, np.nan)
        order = np.argsort(np.nan_to_num(scores, nan=-np.inf))
        selected_indices = order[-subset:]
        selected_tickers = mu.index[selected_indices]
        mu_work = mu.loc[selected_tickers]
        cov_work = cov_matrix.loc[selected_tickers, selected_tickers]

        cov_array = cov_work.to_numpy()
        mu_vec = mu_work.to_numpy()
        inv_cov = np.linalg.pinv(cov_array)
        raw = inv_cov @ mu_vec
        raw = np.clip(raw, 0.0, None)
        if not np.any(raw):
            raw = np.ones_like(raw)
        weights = raw / raw.sum()
        series = pd.Series(0.0, index=mu.index)
        series.loc[mu_work.index] = weights
        series = self._enforce_weight_bounds(series, constraints)
        total = float(series.sum())
        if not np.isfinite(total) or total <= 0:
            series = pd.Series(
                np.full(len(series), 1.0 / len(series)),
                index=series.index,
            )
        else:
            series = series / total
        series = series.fillna(0.0)
        weights_array = series.to_numpy()
        full_mu = mu.to_numpy()
        full_cov = cov_matrix.to_numpy()
        exp_return = float(weights_array @ full_mu)
        volatility = float(np.sqrt(weights_array @ full_cov @ weights_array))
        sharpe = exp_return / volatility if volatility > 0 else 0.0
        return series, {
            "expected_return": exp_return,
            "volatility": volatility,
            "sharpe_ratio": sharpe,
        }

    def _initialise_frontier(
        self,
        efficient_frontier_cls: Any,
        mu: pd.Series,
        cov_matrix: pd.DataFrame,
        constraints: PortfolioConstraints,
    ) -> Any:
        """Initialise the efficient frontier with box constraints."""
        return efficient_frontier_cls(
            mu,
            cov_matrix,
            weight_bounds=(constraints.min_weight, constraints.max_weight),
        )

    def _build_frontier(
        self,
        efficient_frontier_cls: Any,
        mu: pd.Series,
        cov_matrix: pd.DataFrame,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series | None,
    ) -> Any:
        """Create an EfficientFrontier instance with all applicable constraints."""
        ef = self._initialise_frontier(
            efficient_frontier_cls,
            mu,
            cov_matrix,
            constraints,
        )
        index_map = {ticker: idx for idx, ticker in enumerate(mu.index)}

        if constraints.sector_limits and asset_classes is not None:
            self._apply_sector_limits(ef, constraints, asset_classes, index_map)

        if asset_classes is not None:
            self._apply_asset_class_limits(ef, constraints, asset_classes, index_map)

        return ef

    def _enforce_weight_bounds(
        self,
        weights: pd.Series,
        constraints: PortfolioConstraints,
    ) -> pd.Series:
        """Project weights onto the feasible region defined by portfolio constraints."""
        projected = weights.copy()
        upper = constraints.max_weight
        lower = constraints.min_weight

        if upper < 1.0:
            projected = projected.clip(upper=upper)
        if lower > 0.0:
            projected = projected.clip(lower=lower)

        target_sum = 1.0 if constraints.require_full_investment else projected.sum()
        diff = target_sum - float(projected.sum())
        iteration = 0
        tolerance = 1e-8
        max_iterations = 100

        while abs(diff) > tolerance and iteration < max_iterations:
            if diff > 0:
                room = upper - projected
                room = room[room > 0]
                if room.empty:
                    break
                allocation = room / room.sum()
                projected.loc[allocation.index] += allocation * diff
            else:
                excess = projected - lower
                excess = excess[excess > 0]
                if excess.empty:
                    break
                allocation = excess / excess.sum()
                projected.loc[allocation.index] += allocation * diff

            if upper < 1.0:
                projected = projected.clip(upper=upper)
            if lower > 0.0:
                projected = projected.clip(lower=lower)
            diff = target_sum - float(projected.sum())
            iteration += 1

        if constraints.require_full_investment and projected.sum() > 0:
            projected = projected / projected.sum()

        return projected

    def _apply_sector_limits(
        self,
        ef: Any,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series,
        index_map: dict[str, int],
    ) -> None:
        tickers = list(index_map.keys())
        sector_series = asset_classes.reindex(tickers)
        if constraints.sector_limits:
            for sector, limit in constraints.sector_limits.items():
                mask = sector_series.str.lower() == sector.lower()
                sector_tickers = sector_series[mask].index.tolist()
                idxs = self._indices_for(index_map, sector_tickers)
                if idxs:
                    ef.add_constraint(
                        lambda w, idxs=idxs, limit=limit: sum(w[i] for i in idxs)
                        <= limit,
                    )

    def _apply_asset_class_limits(
        self,
        ef: Any,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series,
        index_map: dict[str, int],
    ) -> None:
        tickers = list(index_map.keys())
        normalized = asset_classes.reindex(tickers)
        equity_mask = normalized.str.contains("equity", case=False, na=False)
        bond_mask = normalized.str.contains("bond|cash", case=False, na=False)

        equity_indices = self._indices_for(
            index_map,
            normalized[equity_mask].index.tolist(),
        )
        if equity_indices:
            ef.add_constraint(
                lambda w,
                idxs=equity_indices,
                limit=constraints.max_equity_exposure: sum(w[i] for i in idxs) <= limit,
            )

        bond_indices = self._indices_for(
            index_map,
            normalized[bond_mask].index.tolist(),
        )
        if bond_indices:
            ef.add_constraint(
                lambda w, idxs=bond_indices, limit=constraints.min_bond_exposure: sum(
                    w[i] for i in idxs
                )
                >= limit,
            )

    def _optimise_frontier(self, ef: Any, objective: str | None = None) -> None:
        target_objective = objective or self._objective
        try:
            if target_objective == "max_sharpe":
                ef.max_sharpe(risk_free_rate=self._risk_free_rate)
            elif target_objective == "min_volatility":
                ef.min_volatility()
            else:
                ef.efficient_risk(target_volatility=0.10)
        except Exception as err:  # pragma: no cover - backend raises diverse errors
            raise OptimizationError(
                strategy_name=self.name,
                message=f"Mean-variance optimisation failed: {err}",
            ) from err

    def _extract_weights(self, ef: Any) -> ReturnSeries:
        cleaned_weights = ef.clean_weights()
        weights = pd.Series(cleaned_weights, dtype=float)
        weights = weights[weights > 0]
        if weights.empty:
            raise OptimizationError(
                strategy_name=self.name,
                message="Optimisation produced an empty portfolio.",
            )
        return cast("ReturnSeries", weights / weights.sum())

    def _summarise_portfolio(self, ef: Any) -> dict[str, float]:
        try:
            expected_ret, volatility, sharpe = ef.portfolio_performance(
                verbose=False,
                risk_free_rate=self._risk_free_rate,
            )
        except Exception as err:  # pragma: no cover - defensive guard
            raise OptimizationError(
                strategy_name=self.name,
                message=f"Failed to compute portfolio performance: {err}",
            ) from err

        return {
            "expected_return": float(expected_ret),
            "volatility": float(volatility),
            "sharpe_ratio": float(sharpe),
        }

    @staticmethod
    def _indices_for(index_map: dict[str, int], tickers: Sequence[str]) -> list[int]:
        return [index_map[t] for t in tickers if t in index_map]

name property

Return the registered strategy name.

min_history_periods property

Return the minimum number of periods needed for estimation.
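`min_history_periods` drives `_validate_returns` (see the class source above): if the returns frame has fewer rows than the configured minimum, an `InsufficientDataError` is raised before any optimisation runs. A simplified sketch of that check; the exception class here is a stand-in, not the package's actual definition:

```python
import pandas as pd

class InsufficientDataError(ValueError):   # stand-in for the package exception
    def __init__(self, required_periods: int, available_periods: int):
        super().__init__(
            f"need {required_periods} periods, have {available_periods}")

def validate_returns(returns: pd.DataFrame, min_periods: int) -> None:
    """Reject empty or too-short return histories, as the strategy does."""
    if returns.empty or len(returns) < min_periods:
        raise InsufficientDataError(min_periods, len(returns))

short = pd.DataFrame({"A": [0.01] * 10})
try:
    validate_returns(short, min_periods=30)
except InsufficientDataError as err:
    print(err)  # need 30 periods, have 10
```

Note that validation runs twice in `construct`: once on the raw frame and again after `_prepare_returns` drops assets with incomplete history, so the minimum applies to the cleaned window as well.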

construct(returns, constraints, asset_classes=None)

Construct a mean-variance optimised portfolio.
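After the solver returns, `construct` projects the weights back onto the box constraints via `_enforce_weight_bounds`: clip to `[min_weight, max_weight]`, then iteratively redistribute any shortfall or excess in proportion to the remaining headroom until the weights sum to 1. A simplified pandas sketch of that loop (the function name is illustrative, not the package API):

```python
import pandas as pd

def project_weights(weights: pd.Series, lower: float, upper: float) -> pd.Series:
    """Clip weights to [lower, upper], then redistribute the difference
    proportionally to the remaining headroom until they sum to 1."""
    w = weights.clip(lower=lower, upper=upper)
    for _ in range(100):                      # bounded iteration, as in the source
        diff = 1.0 - float(w.sum())
        if abs(diff) <= 1e-8:
            break
        if diff > 0:
            room = upper - w                  # capacity below the upper bound
        else:
            room = w - lower                  # slack above the lower bound
        room = room[room > 0]
        if room.empty:                        # nothing left to adjust
            break
        w.loc[room.index] += diff * room / room.sum()
        w = w.clip(lower=lower, upper=upper)  # re-clip after redistribution
    return w / w.sum()                        # final renormalisation

raw = pd.Series({"A": 0.70, "B": 0.25, "C": 0.05})
print(project_weights(raw, lower=0.10, upper=0.50))
```

Here `A` is capped at 0.50 and the freed 0.20 flows to `B` and `C` in proportion to their distance from the cap, so every final weight respects both bounds.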


PortfolioStrategy

Bases: ABC

Abstract base class for all portfolio construction strategies.

This class defines the common interface for all strategies. Concrete implementations must provide the logic for constructing a portfolio, which involves calculating asset weights based on return data and a set of constraints.

The interface is designed to be flexible, accommodating strategies ranging from simple heuristics (like equal weight) to complex optimizations (like mean-variance or risk parity).

Source code in src/portfolio_management/portfolio/strategies/base.py
class PortfolioStrategy(ABC):
    """Abstract base class for all portfolio construction strategies.

    This class defines the common interface for all strategies. Concrete
    implementations must provide the logic for constructing a portfolio, which
    involves calculating asset weights based on return data and a set of
    constraints.

    The interface is designed to be flexible, accommodating strategies ranging
    from simple heuristics (like equal weight) to complex optimizations (like
    mean-variance or risk parity).
    """

    @abstractmethod
    def construct(
        self,
        returns: pd.DataFrame,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series | None = None,
    ) -> Portfolio:
        """Construct a portfolio based on the strategy's logic.

        This is the core method of the strategy. It takes historical or expected
        returns, a set of investment constraints, and optional asset-level
        metadata to calculate and return the target portfolio weights.

        Args:
            returns (pd.DataFrame): A DataFrame of asset returns, with assets
                as columns and dates as the index.
            constraints (PortfolioConstraints): An object defining the investment
                rules, such as weight limits and exposure constraints.
            asset_classes (pd.Series | None): An optional Series that maps asset
                tickers to their respective asset classes (e.g., 'EQUITY', 'BOND').
                This is used for applying group-level constraints.

        Returns:
            Portfolio: A `Portfolio` object containing the calculated weights
            and other relevant metadata about the constructed portfolio.

        Raises:
            InsufficientDataError: If the provided `returns` DataFrame does not
                contain enough data to perform the necessary calculations.
            OptimizationError: If a numerical optimization fails to converge to
                a valid solution.
            InfeasibleError: If the optimization problem is determined to be
                infeasible under the given constraints.

        """

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the strategy name."""

    @property
    @abstractmethod
    def min_history_periods(self) -> int:
        """Return minimum number of return periods required."""

name abstractmethod property

Return the strategy name.

min_history_periods abstractmethod property

Return minimum number of return periods required.

construct(returns, constraints, asset_classes=None) abstractmethod

Construct a portfolio based on the strategy's logic.

This is the core method of the strategy. It takes historical or expected returns, a set of investment constraints, and optional asset-level metadata to calculate and return the target portfolio weights.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `returns` | `DataFrame` | A DataFrame of asset returns, with assets as columns and dates as the index. | *required* |
| `constraints` | `PortfolioConstraints` | An object defining the investment rules, such as weight limits and exposure constraints. | *required* |
| `asset_classes` | `Series \| None` | An optional Series that maps asset tickers to their respective asset classes (e.g., 'EQUITY', 'BOND'). This is used for applying group-level constraints. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Portfolio` | `Portfolio` | A `Portfolio` object containing the calculated weights and other relevant metadata about the constructed portfolio. |

Raises:

| Type | Description |
| --- | --- |
| `InsufficientDataError` | If the provided `returns` DataFrame does not contain enough data to perform the necessary calculations. |
| `OptimizationError` | If a numerical optimization fails to converge to a valid solution. |
| `InfeasibleError` | If the optimization problem is determined to be infeasible under the given constraints. |

Source code in src/portfolio_management/portfolio/strategies/base.py
@abstractmethod
def construct(
    self,
    returns: pd.DataFrame,
    constraints: PortfolioConstraints,
    asset_classes: pd.Series | None = None,
) -> Portfolio:
    """Construct a portfolio based on the strategy's logic.

    This is the core method of the strategy. It takes historical or expected
    returns, a set of investment constraints, and optional asset-level
    metadata to calculate and return the target portfolio weights.

    Args:
        returns (pd.DataFrame): A DataFrame of asset returns, with assets
            as columns and dates as the index.
        constraints (PortfolioConstraints): An object defining the investment
            rules, such as weight limits and exposure constraints.
        asset_classes (pd.Series | None): An optional Series that maps asset
            tickers to their respective asset classes (e.g., 'EQUITY', 'BOND').
            This is used for applying group-level constraints.

    Returns:
        Portfolio: A `Portfolio` object containing the calculated weights
        and other relevant metadata about the constructed portfolio.

    Raises:
        InsufficientDataError: If the provided `returns` DataFrame does not
            contain enough data to perform the necessary calculations.
        OptimizationError: If a numerical optimization fails to converge to
            a valid solution.
        InfeasibleError: If the optimization problem is determined to be
            infeasible under the given constraints.

    """

RiskParityStrategy

Bases: PortfolioStrategy

Constructs a portfolio where each asset contributes equally to total risk.

This strategy, often called "risk parity," seeks to build a more balanced portfolio by ensuring that the contribution of each asset to the overall portfolio volatility is the same. It is considered a more robust approach to diversification than traditional capital allocation strategies.

Mathematical Formulation

The objective is to find the portfolio weights w such that the risk contribution of each asset is equal. The risk contribution of asset i is:

RCᵢ = wᵢ * ∂σ(w) / ∂wᵢ = wᵢ * (Σw)ᵢ / σ(w)

where:

- w: portfolio weights vector
- Σ: covariance matrix of asset returns
- σ(w): portfolio volatility, sqrt(wᵀ Σ w)

The optimizer solves for w such that RCᵢ = RCⱼ for all assets i, j.
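The risk-contribution formula can be checked numerically. The sketch below uses a toy covariance matrix (illustrative numbers, not package output) and shows that under equal capital weights the contributions sum to total volatility (the Euler decomposition) yet are far from equal, which is exactly the imbalance risk parity removes:

```python
import numpy as np

# Toy annualised covariance matrix for two assets (illustrative numbers)
cov = np.array([[0.04, 0.01],
                [0.01, 0.09]])
w = np.array([0.5, 0.5])  # equal capital weights

vol = np.sqrt(w @ cov @ w)   # portfolio volatility sigma(w)
rc = w * (cov @ w) / vol     # RC_i = w_i * (Sigma w)_i / sigma(w)

# Contributions sum back to total volatility...
assert np.isclose(rc.sum(), vol)
# ...but the high-variance asset dominates the risk budget.
print(rc, vol)
```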

Example

import pandas as pd
import numpy as np
from portfolio_management.portfolio.strategies import RiskParityStrategy
from portfolio_management.portfolio.constraints import PortfolioConstraints

# Create returns with different volatilities
np.random.seed(42)
returns = pd.DataFrame({
    'LOW_VOL': np.random.normal(0, 0.05, 252),
    'HIGH_VOL': np.random.normal(0, 0.20, 252),
})

strategy = RiskParityStrategy()
print(strategy.name)  # risk_parity

Source code in src/portfolio_management/portfolio/strategies/risk_parity.py
class RiskParityStrategy(PortfolioStrategy):
    """Constructs a portfolio where each asset contributes equally to total risk.

    This strategy, often called "risk parity," seeks to build a more balanced
    portfolio by ensuring that the contribution of each asset to the overall
    portfolio volatility is the same. It is considered a more robust approach
    to diversification than traditional capital allocation strategies.

    Mathematical Formulation:
        The objective is to find the portfolio weights `w` such that the risk
        contribution of each asset is equal. The risk contribution of asset `i` is:

        RCᵢ = wᵢ * ∂σ(w) / ∂wᵢ = wᵢ * (Σw)ᵢ / σ(w)

        where:
        - w: portfolio weights vector
        - Σ: covariance matrix of asset returns
        - σ(w): portfolio volatility, sqrt(w.T * Σ * w)

        The optimizer solves for `w` such that RCᵢ = RCⱼ for all assets i, j.

    Example:
        >>> import pandas as pd
        >>> import numpy as np
        >>> from portfolio_management.portfolio.strategies import RiskParityStrategy
        >>> from portfolio_management.portfolio.constraints import PortfolioConstraints
        >>>
        >>> # Create returns with different volatilities
        >>> np.random.seed(42)
        >>> returns = pd.DataFrame({
        ...     'LOW_VOL': np.random.normal(0, 0.05, 252),
        ...     'HIGH_VOL': np.random.normal(0, 0.20, 252),
        ... })
        >>>
        >>> strategy = RiskParityStrategy()
        >>> print(strategy.name)
        risk_parity

    """

    def __init__(
        self,
        min_periods: int = 252,
        statistics_cache: RollingStatistics | None = None,
    ) -> None:
        """Initialize risk parity strategy.

        Args:
            min_periods: Minimum periods for covariance estimation
            statistics_cache: Optional statistics cache to avoid redundant calculations

        """
        self._min_periods = min_periods
        self._statistics_cache = statistics_cache

    @property
    def name(self) -> str:
        """Return the strategy name."""
        return "risk_parity"

    @property
    def min_history_periods(self) -> int:
        """Return minimum number of return periods required."""
        return self._min_periods

    def construct(
        self,
        returns: pd.DataFrame,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series | None = None,
    ) -> Portfolio:
        """Construct a risk parity portfolio.

        Args:
            returns: DataFrame with returns (assets as columns, dates as index)
            constraints: Portfolio constraints to enforce
            asset_classes: Optional Series mapping tickers to asset classes

        Returns:
            Portfolio with risk-parity weights

        Raises:
            InsufficientDataError: If insufficient data for covariance estimation
            OptimizationError: If optimization fails to converge
            DependencyError: If riskparityportfolio library is not installed

        """
        rpp = self._load_backend()
        self._validate_history(returns)

        if len(returns) < self.min_history_periods:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=len(returns),
            )

        n_assets = returns.shape[1]
        if n_assets > LARGE_UNIVERSE_THRESHOLD:
            return self._inverse_volatility_portfolio(
                returns,
                constraints,
                asset_classes,
            )

        # Use cached covariance if available
        if self._statistics_cache is not None:
            cov_matrix = self._statistics_cache.get_covariance_matrix(
                returns,
                annualize=False,
            )
            if not returns.empty:
                dataset_signature = (
                    f"{returns.index[0]}:{returns.index[-1]}:{len(returns)}"
                )
            else:
                dataset_signature = "0"
            base_key = self._statistics_cache._cache_key
            self._statistics_cache._cache_key = f"{base_key}:{dataset_signature}"
        else:
            cov_matrix = returns.cov()

        cov_matrix = self._regularize_covariance(cov_matrix, n_assets)
        max_uniform_weight = 1.0 / n_assets

        try:
            portfolio = rpp.RiskParityPortfolio(covariance=cov_matrix.to_numpy())
            if constraints.max_weight < max_uniform_weight:
                portfolio.design(
                    Dmat=np.vstack([np.eye(n_assets), -np.eye(n_assets)]),
                    dvec=np.hstack(
                        [
                            np.full(n_assets, constraints.max_weight),
                            -np.full(n_assets, constraints.min_weight),
                        ],
                    ),
                    verbose=False,
                    maxiter=200,
                )
            else:
                portfolio.design(verbose=False, maxiter=200)
            weights_array = portfolio.weights
        except Exception as err:
            if (
                constraints.max_weight >= max_uniform_weight - 1e-6
                and constraints.min_weight <= max_uniform_weight + 1e-6
            ):
                weights_array = np.full(n_assets, max_uniform_weight)
            else:
                raise OptimizationError(strategy_name=self.name) from err

        weights = pd.Series(weights_array, index=returns.columns, dtype=float)
        weights = weights / weights.sum()

        if (
            constraints.max_weight >= max_uniform_weight - 1e-6
            and (weights > constraints.max_weight + 1e-6).any()
        ):
            weights = pd.Series(
                np.full(n_assets, max_uniform_weight),
                index=returns.columns,
                dtype=float,
            )
            weights_array = weights.to_numpy()

        self.validate_constraints(weights, constraints, asset_classes)

        portfolio_vol = self._portfolio_volatility(weights_array, cov_matrix)
        risk_contrib = self._risk_contributions(
            weights_array,
            cov_matrix,
            portfolio_vol,
            returns.columns,
        )

        return Portfolio(
            weights=weights,
            strategy=self.name,
            metadata={
                "n_assets": n_assets,
                "portfolio_volatility": portfolio_vol,
                "risk_contributions": risk_contrib,
            },
        )

    def _load_backend(self) -> Any:
        try:
            return importlib.import_module("riskparityportfolio")
        except ImportError as err:  # pragma: no cover - dependency check
            raise DependencyNotInstalledError(
                "riskparityportfolio",
                context="for risk parity strategy",
            ) from err

    def _validate_history(self, returns: pd.DataFrame) -> None:
        if returns.empty:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=0,
            )

        if len(returns) < self.min_history_periods:
            raise InsufficientDataError(
                required_periods=self.min_history_periods,
                available_periods=len(returns),
            )

    def _inverse_volatility_portfolio(
        self,
        returns: pd.DataFrame,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series | None,
    ) -> Portfolio:
        vols = returns.std(ddof=0)
        if (vols <= 0).any():
            raise OptimizationError(strategy_name=self.name)
        inv_vol = 1.0 / vols.to_numpy().astype(float)
        weights = pd.Series(inv_vol / inv_vol.sum(), index=returns.columns, dtype=float)
        self.validate_constraints(weights, constraints, asset_classes)
        return Portfolio(
            weights=weights,
            strategy=self.name,
            metadata={
                "n_assets": len(returns.columns),
                "method": "inverse_volatility_fallback",
            },
        )

    def _regularize_covariance(
        self,
        cov_matrix: pd.DataFrame,
        n_assets: int,
    ) -> pd.DataFrame:
        eigenvalues = np.linalg.eigvalsh(cov_matrix.to_numpy())
        if np.any(eigenvalues < EIGENVALUE_TOLERANCE):
            min_eig = float(eigenvalues.min())
            jitter = (EIGENVALUE_TOLERANCE - min_eig) + 1e-6
            adjustment = pd.DataFrame(
                np.eye(n_assets) * jitter,
                index=cov_matrix.index,
                columns=cov_matrix.columns,
            )
            cov_matrix = cov_matrix + adjustment
            eigenvalues = np.linalg.eigvalsh(cov_matrix.to_numpy())
            if np.any(eigenvalues < EIGENVALUE_TOLERANCE):
                raise OptimizationError(strategy_name=self.name)
        return cov_matrix

    @staticmethod
    def _risk_contributions(
        weights_array: np.ndarray,
        cov_matrix: pd.DataFrame,
        portfolio_vol: float,
        tickers: pd.Index,
    ) -> dict[str, float]:
        marginal_risk = cov_matrix.to_numpy() @ weights_array
        contributions = weights_array * marginal_risk / portfolio_vol
        return {ticker: float(contributions[idx]) for idx, ticker in enumerate(tickers)}

    @staticmethod
    def _portfolio_volatility(
        weights_array: np.ndarray,
        cov_matrix: pd.DataFrame,
    ) -> float:
        return float(np.sqrt(weights_array @ cov_matrix.to_numpy() @ weights_array))

    @staticmethod
    def validate_constraints(
        weights: pd.Series,
        constraints: PortfolioConstraints,
        asset_classes: pd.Series | None,
    ) -> None:
        """Validate portfolio constraints."""
        # Check weight bounds
        if (weights > constraints.max_weight + 1e-6).any():
            violators = weights[weights > constraints.max_weight + 1e-6]
            raise ConstraintViolationError(
                constraint_name="max_weight",
                value=violators.max(),
                limit=constraints.max_weight,
            )

        # Check asset class constraints if provided
        if asset_classes is not None:
            equity_mask = asset_classes.str.contains("equity", case=False, na=False)
            equity_tickers = asset_classes[equity_mask].index
            equity_exposure = weights[weights.index.isin(equity_tickers)].sum()

            if equity_exposure > constraints.max_equity_exposure + 1e-6:
                raise ConstraintViolationError(
                    constraint_name="max_equity_exposure",
                    value=equity_exposure,
                    limit=constraints.max_equity_exposure,
                )
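For universes above `LARGE_UNIVERSE_THRESHOLD`, `construct` falls back to the inverse-volatility portfolio shown in `_inverse_volatility_portfolio`. That weighting (wᵢ ∝ 1/σᵢ, which coincides with exact risk parity when correlations are zero) can be sketched stand-alone:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
returns = pd.DataFrame({
    "LOW_VOL": rng.normal(0, 0.05, 252),
    "HIGH_VOL": rng.normal(0, 0.20, 252),
})

# Inverse-volatility weights: w_i proportional to 1 / sigma_i
vols = returns.std(ddof=0)
inv_vol = 1.0 / vols
weights = inv_vol / inv_vol.sum()

print(weights.round(3).to_dict())  # low-vol asset receives the larger weight
```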

name property

Return the strategy name.

min_history_periods property

Return minimum number of return periods required.

construct(returns, constraints, asset_classes=None)

Construct a risk parity portfolio.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `returns` | `DataFrame` | DataFrame with returns (assets as columns, dates as index) | *required* |
| `constraints` | `PortfolioConstraints` | Portfolio constraints to enforce | *required* |
| `asset_classes` | `Series \| None` | Optional Series mapping tickers to asset classes | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Portfolio` | Portfolio with risk-parity weights |

Raises:

| Type | Description |
| --- | --- |
| `InsufficientDataError` | If insufficient data for covariance estimation |
| `OptimizationError` | If optimization fails to converge |
| `DependencyError` | If the riskparityportfolio library is not installed |

Source code in src/portfolio_management/portfolio/strategies/risk_parity.py
def construct(
    self,
    returns: pd.DataFrame,
    constraints: PortfolioConstraints,
    asset_classes: pd.Series | None = None,
) -> Portfolio:
    """Construct a risk parity portfolio.

    Args:
        returns: DataFrame with returns (assets as columns, dates as index)
        constraints: Portfolio constraints to enforce
        asset_classes: Optional Series mapping tickers to asset classes

    Returns:
        Portfolio with risk-parity weights

    Raises:
        InsufficientDataError: If insufficient data for covariance estimation
        OptimizationError: If optimization fails to converge
        DependencyError: If riskparityportfolio library is not installed

    """
    rpp = self._load_backend()
    self._validate_history(returns)

    if len(returns) < self.min_history_periods:
        raise InsufficientDataError(
            required_periods=self.min_history_periods,
            available_periods=len(returns),
        )

    n_assets = returns.shape[1]
    if n_assets > LARGE_UNIVERSE_THRESHOLD:
        return self._inverse_volatility_portfolio(
            returns,
            constraints,
            asset_classes,
        )

    # Use cached covariance if available
    if self._statistics_cache is not None:
        cov_matrix = self._statistics_cache.get_covariance_matrix(
            returns,
            annualize=False,
        )
        if not returns.empty:
            dataset_signature = (
                f"{returns.index[0]}:{returns.index[-1]}:{len(returns)}"
            )
        else:
            dataset_signature = "0"
        base_key = self._statistics_cache._cache_key
        self._statistics_cache._cache_key = f"{base_key}:{dataset_signature}"
    else:
        cov_matrix = returns.cov()

    cov_matrix = self._regularize_covariance(cov_matrix, n_assets)
    max_uniform_weight = 1.0 / n_assets

    try:
        portfolio = rpp.RiskParityPortfolio(covariance=cov_matrix.to_numpy())
        if constraints.max_weight < max_uniform_weight:
            portfolio.design(
                Dmat=np.vstack([np.eye(n_assets), -np.eye(n_assets)]),
                dvec=np.hstack(
                    [
                        np.full(n_assets, constraints.max_weight),
                        -np.full(n_assets, constraints.min_weight),
                    ],
                ),
                verbose=False,
                maxiter=200,
            )
        else:
            portfolio.design(verbose=False, maxiter=200)
        weights_array = portfolio.weights
    except Exception as err:
        if (
            constraints.max_weight >= max_uniform_weight - 1e-6
            and constraints.min_weight <= max_uniform_weight + 1e-6
        ):
            weights_array = np.full(n_assets, max_uniform_weight)
        else:
            raise OptimizationError(strategy_name=self.name) from err

    weights = pd.Series(weights_array, index=returns.columns, dtype=float)
    weights = weights / weights.sum()

    if (
        constraints.max_weight >= max_uniform_weight - 1e-6
        and (weights > constraints.max_weight + 1e-6).any()
    ):
        weights = pd.Series(
            np.full(n_assets, max_uniform_weight),
            index=returns.columns,
            dtype=float,
        )
        weights_array = weights.to_numpy()

    self.validate_constraints(weights, constraints, asset_classes)

    portfolio_vol = self._portfolio_volatility(weights_array, cov_matrix)
    risk_contrib = self._risk_contributions(
        weights_array,
        cov_matrix,
        portfolio_vol,
        returns.columns,
    )

    return Portfolio(
        weights=weights,
        strategy=self.name,
        metadata={
            "n_assets": n_assets,
            "portfolio_volatility": portfolio_vol,
            "risk_contributions": risk_contrib,
        },
    )

validate_constraints(weights, constraints, asset_classes) staticmethod

Validate portfolio constraints.

Source code in src/portfolio_management/portfolio/strategies/risk_parity.py
@staticmethod
def validate_constraints(
    weights: pd.Series,
    constraints: PortfolioConstraints,
    asset_classes: pd.Series | None,
) -> None:
    """Validate portfolio constraints."""
    # Check weight bounds
    if (weights > constraints.max_weight + 1e-6).any():
        violators = weights[weights > constraints.max_weight + 1e-6]
        raise ConstraintViolationError(
            constraint_name="max_weight",
            value=violators.max(),
            limit=constraints.max_weight,
        )

    # Check asset class constraints if provided
    if asset_classes is not None:
        equity_mask = asset_classes.str.contains("equity", case=False, na=False)
        equity_tickers = asset_classes[equity_mask].index
        equity_exposure = weights[weights.index.isin(equity_tickers)].sum()

        if equity_exposure > constraints.max_equity_exposure + 1e-6:
            raise ConstraintViolationError(
                constraint_name="max_equity_exposure",
                value=equity_exposure,
                limit=constraints.max_equity_exposure,
            )
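The group-level check in `validate_constraints` can be illustrated stand-alone. The sketch below reimplements the equity-exposure test with simplified types; `SimpleConstraints` and `check_equity_exposure` are hypothetical stand-ins, not the package's API:

```python
from dataclasses import dataclass

import pandas as pd


@dataclass
class SimpleConstraints:  # hypothetical stand-in for PortfolioConstraints
    max_weight: float = 0.5
    max_equity_exposure: float = 0.6


def check_equity_exposure(weights: pd.Series,
                          constraints: SimpleConstraints,
                          asset_classes: pd.Series) -> float:
    """Sum weights of assets whose class contains 'equity'; raise if over the cap."""
    equity_mask = asset_classes.str.contains("equity", case=False, na=False)
    equity_tickers = asset_classes[equity_mask].index
    exposure = float(weights[weights.index.isin(equity_tickers)].sum())
    if exposure > constraints.max_equity_exposure + 1e-6:
        raise ValueError(
            f"max_equity_exposure violated: {exposure:.2f} > "
            f"{constraints.max_equity_exposure:.2f}"
        )
    return exposure


weights = pd.Series({"SPY": 0.4, "EFA": 0.3, "AGG": 0.3})
classes = pd.Series({"SPY": "EQUITY", "EFA": "EQUITY", "AGG": "BOND"})

# 70% equity passes a relaxed cap...
exposure = check_equity_exposure(weights, SimpleConstraints(max_equity_exposure=0.8), classes)
print(round(exposure, 2))

# ...but violates the default 60% cap.
try:
    check_equity_exposure(weights, SimpleConstraints(max_equity_exposure=0.6), classes)
except ValueError as err:
    print(err)
```

The package raises `ConstraintViolationError` rather than `ValueError`; the structure of the check is the same.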

get_cardinality_optimizer(method)

Get optimizer function for specified cardinality method (stub).

Factory function to retrieve the appropriate optimizer implementation based on the cardinality method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `method` | `str` | Cardinality method name (`'miqp'`, `'heuristic'`, `'relaxation'`) | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | Optimizer function for the specified method |

Raises:

| Type | Description |
| --- | --- |
| `CardinalityNotImplementedError` | If the method is not implemented |
| `ConfigurationError` | If the method is unknown, or is `'preselection'` (use the preselection module directly) |

Source code in src/portfolio_management/portfolio/cardinality.py
def get_cardinality_optimizer(method: str) -> Any:
    """Get optimizer function for specified cardinality method (stub).

    Factory function to retrieve the appropriate optimizer implementation
    based on the cardinality method.

    Args:
        method: Cardinality method name ('miqp', 'heuristic', 'relaxation')

    Returns:
        Optimizer function for the specified method

    Raises:
        CardinalityNotImplementedError: If method not implemented
        ConfigurationError: If method is unknown or is 'preselection'

    """
    from .constraints.models import CardinalityMethod

    try:
        method_enum = CardinalityMethod(method)
    except ValueError:
        valid_methods = [m.value for m in CardinalityMethod]
        msg = f"Unknown cardinality method: {method}. Valid: {valid_methods}"
        raise ConfigurationError(None, msg) from None

    if method_enum == CardinalityMethod.PRESELECTION:
        msg = "Use preselection module directly, not cardinality optimizer"
        raise ConfigurationError(None, msg)
    if method_enum == CardinalityMethod.MIQP:
        return optimize_with_cardinality_miqp
    if method_enum == CardinalityMethod.HEURISTIC:
        return optimize_with_cardinality_heuristic
    if method_enum == CardinalityMethod.RELAXATION:
        return optimize_with_cardinality_relaxation
    raise CardinalityNotImplementedError(
        method=method_enum.value,
        available_methods=["preselection"],
    )
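The enum-coercion-plus-dispatch pattern used above is worth a self-contained sketch. The names below mirror the docstring's method values but are re-declared locally (the real enum lives in `constraints.models` and also includes `PRESELECTION`):

```python
from enum import Enum


class CardinalityMethod(Enum):  # local stand-in; values taken from the docstring
    MIQP = "miqp"
    HEURISTIC = "heuristic"
    RELAXATION = "relaxation"


def get_optimizer(method: str):
    # Enum coercion doubles as validation: an unknown string raises ValueError,
    # which the package converts into a ConfigurationError with the valid values.
    method_enum = CardinalityMethod(method)
    dispatch = {
        CardinalityMethod.MIQP: lambda: "miqp optimizer",
        CardinalityMethod.HEURISTIC: lambda: "heuristic optimizer",
        CardinalityMethod.RELAXATION: lambda: "relaxation optimizer",
    }
    return dispatch[method_enum]


print(get_optimizer("heuristic")())  # heuristic optimizer
```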

optimize_with_cardinality_heuristic(returns, constraints, cardinality, asset_classes=None)

Optimize portfolio with cardinality via heuristics (design stub).

This is a design stub for future heuristic-based cardinality optimization. When implemented, this will use iterative algorithms to find good (not necessarily optimal) sparse portfolios.

Potential Algorithms:

1. Greedy forward selection: start with an empty portfolio, add assets one by one
2. Greedy backward elimination: start with the full portfolio, remove assets one by one
3. Local search: start with an initial solution, iteratively swap assets
4. Threshold-based: optimize without cardinality, then threshold small weights

Expected Performance:

- Fast: minutes even for large universes (>500 assets)
- Near-optimal: typically within 5-10% of the MIQP solution
- No special solver required

Implementation Considerations:

- Greedy algorithms may get stuck in local optima
- Multiple random restarts can improve solution quality
- Warm-starting from preselection results often helps

Parameters:

Name Type Description Default
returns DataFrame

Historical returns DataFrame

required
constraints PortfolioConstraints

Portfolio constraints

required
cardinality CardinalityConstraints

Cardinality constraints

required
asset_classes Series | None

Optional asset class mapping

None

Returns:

Type Description
Portfolio

Portfolio with good approximate sparse weights

Raises:

Type Description
CardinalityNotImplementedError

Always (not yet implemented)

Source code in src/portfolio_management/portfolio/cardinality.py
def optimize_with_cardinality_heuristic(
    returns: pd.DataFrame,
    constraints: PortfolioConstraints,
    cardinality: CardinalityConstraints,
    asset_classes: pd.Series | None = None,
) -> Portfolio:
    """Optimize portfolio with cardinality via heuristics (design stub).

    This is a design stub for future heuristic-based cardinality optimization.
    When implemented, this will use iterative algorithms to find good (not
    necessarily optimal) sparse portfolios.

    Potential Algorithms:
        1. Greedy forward selection: Start with empty portfolio, add assets one-by-one
        2. Greedy backward elimination: Start with full portfolio, remove assets one-by-one
        3. Local search: Start with initial solution, iteratively swap assets
        4. Threshold-based: Optimize without cardinality, then threshold small weights

    Expected Performance:
        - Fast: Minutes even for large universes (>500 assets)
        - Near-optimal: Typically within 5-10% of MIQP solution
        - No special solver required

    Implementation Considerations:
        - Greedy algorithms may get stuck in local optima
        - Multiple random restarts can improve solution quality
        - Warm-start from preselection results often helps

    Args:
        returns: Historical returns DataFrame
        constraints: Portfolio constraints
        cardinality: Cardinality constraints
        asset_classes: Optional asset class mapping

    Returns:
        Portfolio with good approximate sparse weights

    Raises:
        CardinalityNotImplementedError: Always (not yet implemented)

    """
    raise CardinalityNotImplementedError(
        method="heuristic",
        available_methods=["preselection"],
    )
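Since the function itself is a stub, here is a hedged sketch of algorithm 1 from the docstring (greedy forward selection). It minimizes equal-weight portfolio variance for simplicity; the eventual implementation may use a different objective and handle constraints:

```python
import numpy as np
import pandas as pd


def greedy_min_variance(returns: pd.DataFrame, max_assets: int) -> pd.Series:
    """Greedy forward selection: repeatedly add the asset that most reduces
    equal-weight portfolio variance (illustrative sketch only)."""
    cov = returns.cov()
    selected: list[str] = []
    candidates = list(returns.columns)

    while len(selected) < max_assets and candidates:
        def variance_with(asset: str) -> float:
            members = selected + [asset]
            w = np.full(len(members), 1.0 / len(members))
            sub = cov.loc[members, members].to_numpy()
            return float(w @ sub @ w)

        best = min(candidates, key=variance_with)
        selected.append(best)
        candidates.remove(best)

    weights = pd.Series(0.0, index=returns.columns)
    weights[selected] = 1.0 / len(selected)
    return weights


rng = np.random.default_rng(0)
returns = pd.DataFrame(rng.normal(0, [0.01, 0.02, 0.05], size=(252, 3)),
                       columns=["A", "B", "C"])
weights = greedy_min_variance(returns, max_assets=2)
print(weights[weights > 0].index.tolist())  # the high-volatility asset C is excluded
```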

optimize_with_cardinality_miqp(returns, constraints, cardinality, asset_classes=None)

Optimize portfolio with cardinality via MIQP (design stub).

This is a design stub for future MIQP-based cardinality optimization. When implemented, this will use Mixed-Integer Quadratic Programming to find the optimal sparse portfolio subject to cardinality constraints.

Implementation Requirements:

- Commercial solver: Gurobi or CPLEX with Python bindings
- Binary variable z_i for each asset (z_i = 1 if w_i > 0)
- Constraint: sum(z_i) <= max_assets
- Constraint: w_i <= z_i (big-M formulation)
- Objective: minimize risk or maximize Sharpe ratio

Expected Performance:

- Small universes (<50 assets): seconds to optimal solution
- Medium universes (50-200 assets): minutes to optimal solution
- Large universes (>200 assets): may not converge in reasonable time

Parameters:

Name Type Description Default
returns DataFrame

Historical returns DataFrame

required
constraints PortfolioConstraints

Portfolio constraints

required
cardinality CardinalityConstraints

Cardinality constraints

required
asset_classes Series | None

Optional asset class mapping

None

Returns:

Type Description
Portfolio

Portfolio with optimal sparse weights

Raises:

Type Description
CardinalityNotImplementedError

Always (not yet implemented)

Source code in src/portfolio_management/portfolio/cardinality.py
def optimize_with_cardinality_miqp(
    returns: pd.DataFrame,
    constraints: PortfolioConstraints,
    cardinality: CardinalityConstraints,
    asset_classes: pd.Series | None = None,
) -> Portfolio:
    """Optimize portfolio with cardinality via MIQP (design stub).

    This is a design stub for future MIQP-based cardinality optimization.
    When implemented, this will use Mixed-Integer Quadratic Programming to
    find the optimal sparse portfolio subject to cardinality constraints.

    Implementation Requirements:
        - Commercial solver: Gurobi or CPLEX with Python bindings
        - Binary variable z_i for each asset (z_i=1 if w_i > 0)
        - Constraint: sum(z_i) <= max_assets
        - Constraint: w_i <= z_i (big-M formulation)
        - Objective: Minimize risk or maximize Sharpe ratio

    Expected Performance:
        - Small universes (<50 assets): Seconds to optimal solution
        - Medium universes (50-200 assets): Minutes to optimal solution
        - Large universes (>200 assets): May not converge in reasonable time

    Args:
        returns: Historical returns DataFrame
        constraints: Portfolio constraints
        cardinality: Cardinality constraints
        asset_classes: Optional asset class mapping

    Returns:
        Portfolio with optimal sparse weights

    Raises:
        CardinalityNotImplementedError: Always (not yet implemented)

    """
    raise CardinalityNotImplementedError(
        method="miqp",
        available_methods=["preselection"],
    )

optimize_with_cardinality_relaxation(returns, constraints, cardinality, asset_classes=None)

Optimize portfolio with cardinality via relaxation (design stub).

This is a design stub for future relaxation-based cardinality optimization. When implemented, this will use continuous relaxation followed by post-processing to enforce cardinality.

Approach
  1. Solve continuous (non-integer) relaxation with penalty on number of assets
  2. Use L1 or elastic-net regularization to encourage sparsity
  3. Post-process: threshold or round weights to satisfy exact cardinality
  4. Optional: local refinement after rounding
Trade-offs

  ✓ Fast: Similar to standard continuous optimization
  ✓ No special solver required
  ✓ Smooth optimization landscape
  ✗ Two-stage process (optimize, then round)
  ✗ Rounding may degrade solution quality
  ✗ Hard cardinality constraint approximated by penalty

Implementation Considerations
  • L1 penalty: λ * sum(|w_i|) encourages sparsity but doesn't control exact count
  • Regularization strength (λ) requires tuning
  • Rounding strategy: sort by weight magnitude, keep top-K
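The rounding strategy in the last bullet (keep top-K by weight magnitude) can be sketched with a small helper. The helper name and the dense weight vector are hypothetical, not package API.

```python
import numpy as np

def round_to_top_k(weights: np.ndarray, k: int) -> np.ndarray:
    """Keep the k largest weights by magnitude, zero the rest, renormalize."""
    out = np.zeros_like(weights)
    keep = np.argsort(np.abs(weights))[-k:]  # indices of the k largest |w_i|
    out[keep] = weights[keep]
    total = out.sum()
    if total != 0:
        out /= total  # restore full investment after zeroing small positions
    return out

# Dense weights from a hypothetical L1-regularized continuous solve.
dense = np.array([0.30, 0.25, 0.02, 0.40, 0.03])
sparse = round_to_top_k(dense, k=3)
print(sparse)  # exactly 3 nonzero entries, summing to 1
```

Note that renormalization after zeroing is what can degrade solution quality relative to the continuous optimum, as the trade-offs above point out.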

Parameters:
  returns (DataFrame, required): Historical returns DataFrame
  constraints (PortfolioConstraints, required): Portfolio constraints
  cardinality (CardinalityConstraints, required): Cardinality constraints
  asset_classes (Series | None, default None): Optional asset class mapping

Returns:
  Portfolio: Portfolio with approximate sparse weights

Raises:
  CardinalityNotImplementedError: Always (not yet implemented)

Source code in src/portfolio_management/portfolio/cardinality.py
def optimize_with_cardinality_relaxation(
    returns: pd.DataFrame,
    constraints: PortfolioConstraints,
    cardinality: CardinalityConstraints,
    asset_classes: pd.Series | None = None,
) -> Portfolio:
    """Optimize portfolio with cardinality via relaxation (design stub).

    This is a design stub for future relaxation-based cardinality optimization.
    When implemented, this will use continuous relaxation followed by
    post-processing to enforce cardinality.

    Approach:
        1. Solve continuous (non-integer) relaxation with penalty on number of assets
        2. Use L1 or elastic-net regularization to encourage sparsity
        3. Post-process: threshold or round weights to satisfy exact cardinality
        4. Optional: local refinement after rounding

    Trade-offs:
        ✓ Fast: Similar to standard continuous optimization
        ✓ No special solver required
        ✓ Smooth optimization landscape
        ✗ Two-stage process (optimize, then round)
        ✗ Rounding may degrade solution quality
        ✗ Hard cardinality constraint approximated by penalty

    Implementation Considerations:
        - L1 penalty: λ * sum(|w_i|) encourages sparsity but doesn't control exact count
        - Regularization strength (λ) requires tuning
        - Rounding strategy: sort by weight magnitude, keep top-K

    Args:
        returns: Historical returns DataFrame
        constraints: Portfolio constraints
        cardinality: Cardinality constraints
        asset_classes: Optional asset class mapping

    Returns:
        Portfolio with approximate sparse weights

    Raises:
        CardinalityNotImplementedError: Always (not yet implemented)

    """
    raise CardinalityNotImplementedError(
        method="relaxation",
        available_methods=["preselection"],
    )

validate_cardinality_constraints(constraints, portfolio_constraints, num_assets)

Validate cardinality constraints for feasibility.

Checks that cardinality constraints are internally consistent and compatible with portfolio constraints.

Parameters:
  constraints (CardinalityConstraints, required): Cardinality constraints to validate
  portfolio_constraints (PortfolioConstraints, required): Portfolio-level constraints
  num_assets (int, required): Number of assets in the universe

Raises:
  ValueError: If constraints are infeasible or inconsistent
  CardinalityNotImplementedError: If non-preselection method specified

Source code in src/portfolio_management/portfolio/cardinality.py
def validate_cardinality_constraints(
    constraints: CardinalityConstraints,
    portfolio_constraints: PortfolioConstraints,
    num_assets: int,
) -> None:
    """Validate cardinality constraints for feasibility.

    Checks that cardinality constraints are internally consistent and
    compatible with portfolio constraints.

    Args:
        constraints: Cardinality constraints to validate
        portfolio_constraints: Portfolio-level constraints
        num_assets: Number of assets in the universe

    Raises:
        ValueError: If constraints are infeasible or inconsistent
        CardinalityNotImplementedError: If non-preselection method specified

    """
    if not constraints.enabled:
        return

    # Check for unimplemented methods
    from .constraints.models import CardinalityMethod

    try:
        method = CardinalityMethod(constraints.method)
    except ValueError as exc:
        raise CardinalityNotImplementedError(
            method=str(constraints.method),
            available_methods=[CardinalityMethod.PRESELECTION.value],
        ) from exc

    if method != CardinalityMethod.PRESELECTION:
        raise CardinalityNotImplementedError(
            method=method.value,
            available_methods=[CardinalityMethod.PRESELECTION.value],
        )

    # Validate max_assets vs universe size
    if constraints.max_assets is not None and constraints.max_assets > num_assets:
        msg = f"max_assets ({constraints.max_assets}) exceeds universe size ({num_assets})"
        raise ConfigurationError(None, msg)

    # Validate min_position_size compatibility with max_assets
    if constraints.max_assets is not None and constraints.min_position_size > 0:
        min_total_weight = constraints.max_assets * constraints.min_position_size
        if min_total_weight > 1.0 and portfolio_constraints.require_full_investment:
            msg = (
                f"Infeasible: max_assets={constraints.max_assets} × "
                f"min_position_size={constraints.min_position_size} = "
                f"{min_total_weight:.3f} > 1.0"
            )
            raise ConfigurationError(None, msg)

    # Validate group_limits consistency
    if constraints.group_limits is not None and constraints.max_assets is not None:
        total_group_limits = sum(constraints.group_limits.values())
        if total_group_limits < constraints.max_assets:
            msg = (
                f"Sum of group_limits ({total_group_limits}) is less than "
                f"max_assets ({constraints.max_assets}), which may be infeasible"
            )
            # This is a warning condition, not an error
            import warnings

            warnings.warn(msg, UserWarning, stacklevel=2)

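The max_assets × min_position_size feasibility check in the source above can be verified by hand: if every one of max_assets positions must carry at least min_position_size, the weights cannot also sum to exactly 1 once the product exceeds 1. A standalone replica of that arithmetic (the function name is illustrative, not package API):

```python
def cardinality_feasible(max_assets: int, min_position_size: float,
                         require_full_investment: bool) -> bool:
    """Mirror the min_position_size check in validate_cardinality_constraints."""
    min_total_weight = max_assets * min_position_size
    return not (min_total_weight > 1.0 and require_full_investment)

print(cardinality_feasible(20, 0.05, True))   # 20 * 0.05 = 1.00 -> feasible
print(cardinality_feasible(25, 0.05, True))   # 25 * 0.05 = 1.25 -> infeasible
print(cardinality_feasible(25, 0.05, False))  # no full-investment requirement
```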
apply_membership_policy(current_holdings, preselected_ranks, policy, holding_periods=None, top_k=30, current_weights=None, candidate_weights=None)

Apply membership policy to determine final candidate set.

This function takes the preselected candidates (typically from a ranking/scoring step) and current portfolio holdings, then applies policy rules to determine the final set of assets that should be passed to the optimizer.

Policy application order
  1. Start with top_k from preselected_ranks
  2. Apply min_holding_periods: keep assets that haven't been held long enough
  3. Apply buffer_rank: keep existing holdings within buffer
  4. Apply max_new_assets: limit additions
  5. Apply max_removed_assets: limit removals
  6. Check max_turnover: if violated, reduce changes (future enhancement)
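Steps 1, 3, and 4 of the ordering above can be walked through on a toy universe with plain pandas (steps 2, 5, and 6 omitted for brevity; tickers and parameter values are illustrative):

```python
import pandas as pd

# Lower rank is better; GOOGL has slipped to rank 45 but is still held.
ranks = pd.Series({"AAPL": 1, "MSFT": 2, "AMZN": 3, "NVDA": 4, "GOOGL": 45})
current = ["AAPL", "MSFT", "GOOGL"]
top_k, buffer_rank, max_new_assets = 3, 50, 1

# Step 1: start with the top_k ranked assets.
candidates = set(ranks.nsmallest(top_k).index)  # AAPL, MSFT, AMZN

# Step 3: keep existing holdings whose rank is within the buffer.
candidates |= {a for a in current if ranks.get(a, float("inf")) <= buffer_rank}

# Step 4: cap additions at max_new_assets, preferring better-ranked entrants.
new = candidates - set(current)
if len(new) > max_new_assets:
    best_new = set(ranks[list(new)].sort_values().head(max_new_assets).index)
    candidates -= (new - best_new)

print(sorted(candidates))  # GOOGL survives via the rank buffer
```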

Parameters:
  current_holdings (list[str], required): List of asset IDs currently in the portfolio.
  preselected_ranks (Series, required): Series mapping asset IDs to their rank (1=best). Lower rank is better. Must include all current_holdings if they are still in the universe.
  policy (MembershipPolicy, required): MembershipPolicy configuration.
  holding_periods (dict[str, int] | None, default None): Dict mapping asset ID to number of periods held. Required if policy.min_holding_periods is set.
  top_k (int, default 30): Number of top-ranked assets to target.
  current_weights (dict[str, float] | None, default None): Dict mapping current holdings to their portfolio weights. Required if policy.max_turnover is set.
  candidate_weights (dict[str, float] | None, default None): Dict mapping candidate assets to their expected weights after rebalance. Required if policy.max_turnover is set.

Returns:
  list[str]: List of asset IDs that should be passed to the optimizer, respecting all policy constraints.

Raises:
  ValueError: If required data is missing or invalid.

Example

current_holdings = ["AAPL", "MSFT", "GOOGL"]
ranks = pd.Series({"AAPL": 1, "MSFT": 2, "AMZN": 3, "GOOGL": 45})
holding_periods = {"AAPL": 5, "MSFT": 2, "GOOGL": 1}
policy = MembershipPolicy(
    buffer_rank=50,
    min_holding_periods=3,
    max_new_assets=2,
)

final = apply_membership_policy(
    current_holdings=current_holdings,
    preselected_ranks=ranks,
    policy=policy,
    holding_periods=holding_periods,
    top_k=30,
)
# GOOGL kept despite rank=45 (within buffer) and min_holding_periods
# Only 2 new assets added due to max_new_assets

Source code in src/portfolio_management/portfolio/membership.py
def apply_membership_policy(
    current_holdings: list[str],
    preselected_ranks: pd.Series,
    policy: MembershipPolicy,
    holding_periods: dict[str, int] | None = None,
    top_k: int = 30,
    current_weights: dict[str, float] | None = None,
    candidate_weights: dict[str, float] | None = None,
) -> list[str]:
    """Apply membership policy to determine final candidate set.

    This function takes the preselected candidates (typically from a ranking/scoring
    step) and current portfolio holdings, then applies policy rules to determine the
    final set of assets that should be passed to the optimizer.

    Policy application order:
        1. Start with top_k from preselected_ranks
        2. Apply min_holding_periods: keep assets that haven't been held long enough
        3. Apply buffer_rank: keep existing holdings within buffer
        4. Apply max_new_assets: limit additions
        5. Apply max_removed_assets: limit removals
        6. Check max_turnover: if violated, reduce changes (future enhancement)

    Args:
        current_holdings: List of asset IDs currently in the portfolio.
        preselected_ranks: Series mapping asset IDs to their rank (1=best).
            Lower rank is better. Must include all current_holdings if they are
            still in the universe.
        policy: MembershipPolicy configuration.
        holding_periods: Dict mapping asset ID to number of periods held.
            Required if policy.min_holding_periods is set. Default: None.
        top_k: Number of top-ranked assets to target. Default: 30.
        current_weights: Dict mapping current holdings to their portfolio weights.
            Required if policy.max_turnover is set. Default: None.
        candidate_weights: Dict mapping candidate assets to their expected weights
            after rebalance. Required if policy.max_turnover is set. Default: None.

    Returns:
        List of asset IDs that should be passed to the optimizer, respecting all
        policy constraints.

    Raises:
        ValueError: If required data is missing or invalid.

    Example:
        >>> current_holdings = ["AAPL", "MSFT", "GOOGL"]
        >>> ranks = pd.Series({"AAPL": 1, "MSFT": 2, "AMZN": 3, "GOOGL": 45})
        >>> holding_periods = {"AAPL": 5, "MSFT": 2, "GOOGL": 1}
        >>> policy = MembershipPolicy(
        ...     buffer_rank=50,
        ...     min_holding_periods=3,
        ...     max_new_assets=2
        ... )
        >>>
        >>> final = apply_membership_policy(
        ...     current_holdings=current_holdings,
        ...     preselected_ranks=ranks,
        ...     policy=policy,
        ...     holding_periods=holding_periods,
        ...     top_k=30
        ... )
        >>> # GOOGL kept despite rank=45 (within buffer) and min_holding_periods
        >>> # Only 2 new assets added due to max_new_assets

    """
    # Validate inputs
    if not isinstance(current_holdings, list):
        raise DataValidationError(
            f"current_holdings must be a list, got {type(current_holdings).__name__}",
        )

    if not isinstance(preselected_ranks, pd.Series):
        raise DataValidationError(
            f"preselected_ranks must be a pandas Series, got {type(preselected_ranks).__name__}",
        )

    if preselected_ranks.empty:
        raise DataValidationError("preselected_ranks is empty")

    if top_k <= 0:
        raise DataValidationError(f"top_k must be > 0, got {top_k}")

    # Warn about potentially problematic configurations
    if policy.buffer_rank is not None and top_k > 0:
        gap = policy.buffer_rank - top_k
        gap_pct = gap / top_k if top_k > 0 else 0
        if gap_pct < 0.2:  # Less than 20% gap
            warnings.warn(
                "buffer_rank (%d) is very close to top_k (%d), gap=%d (%.1f%%). "
                "Small gaps (<20%%) may not provide sufficient buffer for stability. "
                "Consider increasing buffer_rank to top_k + 20%% or more. "
                "Recommendation: buffer_rank >= %d"
                % (
                    policy.buffer_rank,
                    top_k,
                    gap,
                    gap_pct * 100,
                    int(top_k * 1.2),
                ),
                UserWarning,
                stacklevel=2,
            )

    if not policy.enabled:
        # Return top_k without any policy constraints
        top_assets = preselected_ranks.nsmallest(top_k).index.tolist()
        logger.debug(
            "Membership policy disabled, returning top %d assets: %d assets",
            top_k,
            len(top_assets),
        )
        return top_assets

    policy.validate()

    if policy.min_holding_periods and holding_periods is None:
        raise DataValidationError(
            "holding_periods is required when min_holding_periods is set",
        )

    if holding_periods is not None:
        if not isinstance(holding_periods, dict):
            raise DataValidationError(
                f"holding_periods must be a dict, got {type(holding_periods).__name__}",
            )

        invalid_periods = {k: v for k, v in holding_periods.items() if v < 0}
        if invalid_periods:
            raise DataValidationError(
                f"holding_periods contains negative values: {invalid_periods}",
            )

    if policy.max_turnover is not None and (
        current_weights is None or candidate_weights is None
    ):
        raise DataValidationError(
            "current_weights and candidate_weights are required for max_turnover",
        )

    # Start with top_k candidates
    top_candidates = set(preselected_ranks.nsmallest(top_k).index.tolist())
    logger.debug(
        "Starting with top %d candidates: %d assets",
        top_k,
        len(top_candidates),
    )

    current_holdings_set = set(current_holdings)
    logger.debug("Current holdings: %d assets", len(current_holdings_set))

    # Step 1: Apply min_holding_periods - protect assets from premature exit
    protected_assets = set()
    if policy.min_holding_periods is not None and holding_periods is not None:
        for asset in current_holdings:
            periods_held = holding_periods.get(asset, 0)
            if periods_held < policy.min_holding_periods:
                protected_assets.add(asset)
                logger.debug(
                    "Protecting %s: held %d < %d periods",
                    asset,
                    periods_held,
                    policy.min_holding_periods,
                )

        if protected_assets:
            logger.info(
                "Min holding period protection: %d assets protected",
                len(protected_assets),
            )

    # Step 2: Apply buffer_rank - keep existing holdings within buffer
    buffered_assets = set()
    if policy.buffer_rank is not None:
        for asset in current_holdings:
            rank = preselected_ranks.get(asset)
            if rank is not None and rank <= policy.buffer_rank:
                buffered_assets.add(asset)
                logger.debug(
                    "Buffering %s: rank %d <= buffer_rank %d",
                    asset,
                    rank,
                    policy.buffer_rank,
                )

        if buffered_assets:
            logger.info(
                "Buffer rank protection: %d assets within buffer",
                len(buffered_assets),
            )

    # Combine protected and buffered assets with top_k
    candidate_set = top_candidates | protected_assets | buffered_assets

    # Step 3: Apply max_new_assets - limit additions
    new_assets = candidate_set - current_holdings_set
    if policy.max_new_assets is not None and len(new_assets) > policy.max_new_assets:
        # Keep the best-ranked new assets up to the limit
        new_asset_ranks = preselected_ranks[list(new_assets)].sort_values()
        allowed_new = set(new_asset_ranks.head(policy.max_new_assets).index)

        removed_new = new_assets - allowed_new
        candidate_set = candidate_set - removed_new

        logger.info(
            "Max new assets constraint: kept %d/%d new assets",
            len(allowed_new),
            len(new_assets),
        )
        logger.debug("Rejected new assets: %s", removed_new)

    # Step 4: Apply max_removed_assets - limit removals
    removed_assets = current_holdings_set - candidate_set
    if (
        policy.max_removed_assets is not None
        and len(removed_assets) > policy.max_removed_assets
    ):
        # Keep the worst-ranked assets up to the limit (i.e., remove the best of the worst)
        removed_asset_ranks = preselected_ranks[list(removed_assets)].sort_values()
        actually_removed = set(
            removed_asset_ranks.head(policy.max_removed_assets).index,
        )
        kept_back = removed_assets - actually_removed

        candidate_set = candidate_set | kept_back

        logger.info(
            "Max removed assets constraint: removing %d/%d assets",
            len(actually_removed),
            len(removed_assets),
        )
        logger.debug("Kept back (would-be-removed): %s", kept_back)

    # Step 5: Turnover check (currently informational only)
    # Full implementation requires optimizer-generated weights, which aren't available yet
    # This is a placeholder for future enhancement
    if policy.max_turnover is not None:
        logger.warning(
            "max_turnover policy is configured but not yet enforced "
            "(requires post-optimization weight adjustment)",
        )
        # Future: iteratively adjust candidate_set to meet turnover constraint

    final_candidates = sorted(candidate_set)  # Sort for determinism

    logger.info(
        "Membership policy applied: holdings=%d, candidates=%d, new=%d, removed=%d",
        len(current_holdings),
        len(final_candidates),
        len(candidate_set - current_holdings_set),
        len(current_holdings_set - candidate_set),
    )

    return final_candidates

create_preselection_from_dict(config_dict)

Create Preselection instance from dictionary configuration.

Parameters:
  config_dict (PreselectionConfigDict | None, required): Dictionary with preselection configuration

Returns:
  Preselection | None: Preselection instance or None if preselection disabled

Source code in src/portfolio_management/portfolio/preselection.py
def create_preselection_from_dict(
    config_dict: PreselectionConfigDict | None,
) -> Preselection | None:
    """Create Preselection instance from dictionary configuration.

    Args:
        config_dict: Dictionary with preselection configuration

    Returns:
        Preselection instance or None if preselection disabled

    """
    if not config_dict:
        return None

    top_k = config_dict.get("top_k", 0)
    if top_k is None or top_k <= 0:
        return None

    method_str = config_dict.get("method", "momentum")
    try:
        method = PreselectionMethod(method_str)
    except ValueError as exc:
        raise ConfigurationError(
            None,
            f"Invalid preselection method: {method_str}",
        ) from exc

    config = PreselectionConfig(
        method=method,
        top_k=config_dict.get("top_k"),
        lookback=config_dict.get("lookback", 252),
        skip=config_dict.get("skip", 1),
        momentum_weight=config_dict.get("momentum_weight", 0.5),
        low_vol_weight=config_dict.get("low_vol_weight", 0.5),
        min_periods=config_dict.get("min_periods", 60),
    )

    return Preselection(config)
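The defaults the factory fills in when keys are absent can be illustrated with plain dict.get calls mirroring the listing above (the sparse config itself is hypothetical; default values are taken from the source):

```python
# Sparse user config: only method and top_k are overridden.
config_dict = {"method": "momentum", "top_k": 30}

# Defaults mirrored from create_preselection_from_dict above.
parsed = {
    "method": config_dict.get("method", "momentum"),
    "top_k": config_dict.get("top_k"),
    "lookback": config_dict.get("lookback", 252),
    "skip": config_dict.get("skip", 1),
    "momentum_weight": config_dict.get("momentum_weight", 0.5),
    "low_vol_weight": config_dict.get("low_vol_weight", 0.5),
    "min_periods": config_dict.get("min_periods", 60),
}
print(parsed)
```

Note that passing `{}`, `None`, or a non-positive `top_k` makes the real factory return `None`, disabling preselection entirely.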
