Analytics API Reference

The analytics package provides financial analytics tools, including return calculation and performance and risk metrics.

Overview

The analytics package contains:

  • Returns - Return calculation methods
  • Metrics - Performance and risk metrics
  • Indicators - Technical indicators

Analytics Package

portfolio_management.analytics

Analytics package for financial calculations.

This package provides tools for analyzing financial data:

  • Return calculation from price data
  • Technical indicator-based filtering (stub implementation)
  • Performance and risk metrics (future)

FilterHook

Hook for filtering assets based on technical indicator signals.

This class serves as a bridge between a chosen indicator provider and the asset selection process. It computes indicator signals for each asset's price series and filters the asset list based on the most recent signal value. Assets whose most recent signal is True (or at least 0.5 for numeric signals) are retained; all others are excluded.

Attributes:

Name Type Description
config IndicatorConfig

The configuration object for the indicators.

provider IndicatorProvider

The provider instance used to compute signals.

Example

>>> import pandas as pd
>>> from .config import IndicatorConfig
>>> from .providers import NoOpIndicatorProvider
>>>
>>> # Using a NoOp provider which always returns True
>>> config = IndicatorConfig(enabled=True, provider='noop')
>>> provider = NoOpIndicatorProvider()
>>> hook = FilterHook(config, provider)
>>>
>>> prices = pd.DataFrame({'TICKER': [10, 11, 12]})
>>> assets = ['TICKER']
>>> result = hook.filter_assets(prices, assets)
>>> print(result)
['TICKER']

Source code in src/portfolio_management/analytics/indicators/filter_hook.py
class FilterHook:
    """Hook for filtering assets based on technical indicator signals.

    This class serves as a bridge between a chosen indicator provider and the
    asset selection process. It computes indicator signals for each asset's
    price series and filters the asset list based on the most recent signal value.
    Assets with a 'True' or '1.0' signal are retained, while those with 'False'
    or '0.0' are excluded.

    Attributes:
        config (IndicatorConfig): The configuration object for the indicators.
        provider (IndicatorProvider): The provider instance used to compute signals.

    Example:
        >>> import pandas as pd
        >>> from .config import IndicatorConfig
        >>> from .providers import NoOpIndicatorProvider
        >>>
        >>> # Using a NoOp provider which always returns True
        >>> config = IndicatorConfig(enabled=True, provider='noop')
        >>> provider = NoOpIndicatorProvider()
        >>> hook = FilterHook(config, provider)
        >>>
        >>> prices = pd.DataFrame({'TICKER': [10, 11, 12]})
        >>> assets = ['TICKER']
        >>> result = hook.filter_assets(prices, assets)
        >>> print(result)
        ['TICKER']

    """

    def __init__(self, config: IndicatorConfig, provider: IndicatorProvider):
        """Initialize the FilterHook.

        Args:
            config (IndicatorConfig): Indicator configuration object.
            provider (IndicatorProvider): An indicator provider implementation
                (e.g., `NoOpIndicatorProvider`, `TALibIndicatorProvider`).

        """
        self.config = config
        self.provider = provider
        self._validate_config()

    def _validate_config(self) -> None:
        """Validate the configuration.

        Raises:
            ConfigurationError: If the configuration is found to be invalid.

        """
        try:
            self.config.validate()
        except ValueError as e:
            raise ConfigurationError(None, f"Invalid indicator config: {e}") from e

    def filter_assets(
        self,
        prices: pd.DataFrame,
        assets: list[str],
    ) -> list[str]:
        """Filter assets based on technical indicator signals.

        For each asset in the input list, this method computes its technical
        indicator signal using the configured provider. It then includes the asset
        in the output list only if the most recent signal is True (or >= 0.5 for
        floating-point signals).

        Args:
            prices (pd.DataFrame): A DataFrame of price data, with asset symbols
                as columns and dates as the index.
            assets (list[str]): The list of asset symbols to be filtered.

        Returns:
            list[str]: A new list containing only the asset symbols that passed
            the indicator filter. If indicators are disabled in the config, this
            method returns the original list of assets unmodified.

        """
        # If indicators disabled, return all assets
        if not self.config.enabled:
            logger.debug("Technical indicators disabled, returning all assets")
            return assets

        logger.info(
            "Filtering %d assets using %s provider",
            len(assets),
            self.config.provider,
        )

        filtered_assets = []
        for asset in assets:
            if asset not in prices.columns:
                logger.warning("Asset %s not found in price data, excluding", asset)
                continue

            # Get price series for this asset
            price_series = prices[asset].dropna()

            if len(price_series) == 0:
                logger.warning("Asset %s has no valid price data, excluding", asset)
                continue

            # Compute indicator signal
            try:
                signal = self.provider.compute(price_series, self.config.params)

                # Get most recent signal value
                latest_signal = signal.iloc[-1] if len(signal) > 0 else False

                # Include asset if signal is True (or >= 0.5 for float values)
                if isinstance(latest_signal, bool):
                    include = latest_signal
                else:
                    include = float(latest_signal) >= 0.5

                if include:
                    filtered_assets.append(asset)
                    logger.debug(
                        "Asset %s passed filter (signal: %s)",
                        asset,
                        latest_signal,
                    )
                else:
                    logger.debug(
                        "Asset %s excluded by filter (signal: %s)",
                        asset,
                        latest_signal,
                    )

            except Exception as e:
                logger.exception(
                    "Error computing indicator for asset %s: %s, excluding",
                    asset,
                    e,
                )
                continue

        logger.info(
            "Technical indicator filtering: %d -> %d assets",
            len(assets),
            len(filtered_assets),
        )
        return filtered_assets

filter_assets(prices, assets)

Filter assets based on technical indicator signals.

For each asset in the input list, this method computes its technical indicator signal using the configured provider. It then includes the asset in the output list only if the most recent signal is True (or >= 0.5 for floating-point signals).

Parameters:

Name Type Description Default
prices DataFrame

A DataFrame of price data, with asset symbols as columns and dates as the index.

required
assets list[str]

The list of asset symbols to be filtered.

required

Returns:

Type Description
list[str]

A new list containing only the asset symbols that passed the indicator filter. If indicators are disabled in the config, this method returns the original list of assets unmodified.

Source code in src/portfolio_management/analytics/indicators/filter_hook.py
def filter_assets(
    self,
    prices: pd.DataFrame,
    assets: list[str],
) -> list[str]:
    """Filter assets based on technical indicator signals.

    For each asset in the input list, this method computes its technical
    indicator signal using the configured provider. It then includes the asset
    in the output list only if the most recent signal is True (or >= 0.5 for
    floating-point signals).

    Args:
        prices (pd.DataFrame): A DataFrame of price data, with asset symbols
            as columns and dates as the index.
        assets (list[str]): The list of asset symbols to be filtered.

    Returns:
        list[str]: A new list containing only the asset symbols that passed
        the indicator filter. If indicators are disabled in the config, this
        method returns the original list of assets unmodified.

    """
    # If indicators disabled, return all assets
    if not self.config.enabled:
        logger.debug("Technical indicators disabled, returning all assets")
        return assets

    logger.info(
        "Filtering %d assets using %s provider",
        len(assets),
        self.config.provider,
    )

    filtered_assets = []
    for asset in assets:
        if asset not in prices.columns:
            logger.warning("Asset %s not found in price data, excluding", asset)
            continue

        # Get price series for this asset
        price_series = prices[asset].dropna()

        if len(price_series) == 0:
            logger.warning("Asset %s has no valid price data, excluding", asset)
            continue

        # Compute indicator signal
        try:
            signal = self.provider.compute(price_series, self.config.params)

            # Get most recent signal value
            latest_signal = signal.iloc[-1] if len(signal) > 0 else False

            # Include asset if signal is True (or >= 0.5 for float values)
            if isinstance(latest_signal, bool):
                include = latest_signal
            else:
                include = float(latest_signal) >= 0.5

            if include:
                filtered_assets.append(asset)
                logger.debug(
                    "Asset %s passed filter (signal: %s)",
                    asset,
                    latest_signal,
                )
            else:
                logger.debug(
                    "Asset %s excluded by filter (signal: %s)",
                    asset,
                    latest_signal,
                )

        except Exception as e:
            logger.exception(
                "Error computing indicator for asset %s: %s, excluding",
                asset,
                e,
            )
            continue

    logger.info(
        "Technical indicator filtering: %d -> %d assets",
        len(assets),
        len(filtered_assets),
    )
    return filtered_assets
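The inclusion rule used above (a boolean signal is taken as-is; a numeric signal passes when it is at least 0.5) can be isolated as a small standalone sketch. This is an illustrative mirror of the logic in `filter_assets`, not part of the package's API:

```python
def include_latest(latest_signal) -> bool:
    """Mirror of filter_assets' inclusion rule: booleans pass as-is,
    numeric signals pass when they are at least 0.5."""
    if isinstance(latest_signal, bool):
        return latest_signal
    return float(latest_signal) >= 0.5

# Booleans are taken directly; numeric values are thresholded at 0.5.
print(include_latest(True))   # True
print(include_latest(0.4))    # False
print(include_latest(0.75))   # True
```

Note that a numeric signal of exactly 0.5 is included, matching the `>= 0.5` comparison in the source.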

IndicatorConfig dataclass

Configuration for technical indicator-based filtering.

This dataclass defines the parameters for technical indicator computation and filtering. It specifies whether the feature is enabled, which provider to use for calculations (e.g., 'talib'), and any indicator-specific parameters like window sizes or thresholds.

Attributes:

Name Type Description
enabled bool

If True, technical indicator filtering is active. Defaults to False.

provider str

The provider to use for indicator calculations. Examples: 'noop', 'talib', 'ta'. Defaults to 'noop'.

params dict[str, Any]

A dictionary of indicator-specific parameters. Common keys include 'window', 'threshold', 'indicator_type'.

Example

>>> # Config for a 50-day RSI filter with a threshold of 0.5
>>> rsi_config = IndicatorConfig(
...     enabled=True,
...     provider='talib',  # Assuming 'talib' is a supported provider
...     params={'indicator_type': 'rsi', 'window': 50, 'threshold': 0.5}
... )
>>> rsi_config.validate()
>>>
>>> print(f"Provider: {rsi_config.provider}, Window: {rsi_config.params['window']}")
Provider: talib, Window: 50

Source code in src/portfolio_management/analytics/indicators/config.py
@dataclass
class IndicatorConfig:
    """Configuration for technical indicator-based filtering.

    This dataclass defines the parameters for technical indicator computation
    and filtering. It specifies whether the feature is enabled, which provider
    to use for calculations (e.g., 'talib'), and any indicator-specific
    parameters like window sizes or thresholds.

    Attributes:
        enabled (bool): If True, technical indicator filtering is active.
            Defaults to False.
        provider (str): The provider to use for indicator calculations.
            Examples: 'noop', 'talib', 'ta'. Defaults to 'noop'.
        params (dict[str, Any]): A dictionary of indicator-specific parameters.
            Common keys include 'window', 'threshold', 'indicator_type'.

    Example:
        >>> # Config for a 50-day RSI filter with a threshold of 0.5
        >>> rsi_config = IndicatorConfig(
        ...     enabled=True,
        ...     provider='talib', # Assuming 'talib' is a supported provider
        ...     params={'indicator_type': 'rsi', 'window': 50, 'threshold': 0.5}
        ... )
        >>> rsi_config.validate()
        >>>
        >>> print(f"Provider: {rsi_config.provider}, Window: {rsi_config.params['window']}")
        Provider: talib, Window: 50

    """

    enabled: bool = False
    provider: str = "noop"
    params: dict[str, Any] = field(default_factory=dict)

    def validate(self) -> None:
        """Validate indicator configuration parameters.

        Checks if the provider is supported and validates common parameters
        like 'window' and 'threshold' if they are present.

        Raises:
            ConfigurationError: If the configuration is invalid.

        """
        if self.enabled and self.provider not in ("noop", "talib", "ta"):
            raise ConfigurationError(
                None,
                f"Unsupported indicator provider: {self.provider}",
            )

        if "window" in self.params and self.params["window"] <= 0:
            raise ConfigurationError(
                None,
                f"Invalid window parameter: {self.params['window']} (must be positive)",
            )

        if "threshold" in self.params:
            threshold = self.params["threshold"]
            if not 0 <= threshold <= 1:
                raise ConfigurationError(
                    None,
                    f"Invalid threshold parameter: {threshold} (must be in [0, 1])",
                )

    @classmethod
    def disabled(cls) -> IndicatorConfig:
        """Create a disabled indicator configuration.

        This is a convenience factory method for creating a configuration
        that explicitly disables indicator filtering.

        Returns:
            IndicatorConfig: An instance with `enabled` set to False.

        """
        return cls(enabled=False, provider="noop", params={})

    @classmethod
    def noop(cls, params: dict[str, Any] | None = None) -> IndicatorConfig:
        """Create a no-op indicator configuration.

        This factory creates a configuration that is enabled but uses the 'noop'
        provider, which performs no actual filtering. Useful for testing the
        pipeline's structure without applying indicator logic.

        Args:
            params (dict[str, Any] | None): Optional parameters for the no-op
                provider, primarily for testing purposes.

        Returns:
            IndicatorConfig: An instance with `provider` set to 'noop' and `enabled`
                set to True.

        """
        return cls(enabled=True, provider="noop", params=params or {})

validate()

Validate indicator configuration parameters.

Checks if the provider is supported and validates common parameters like 'window' and 'threshold' if they are present.

Raises:

Type Description
ConfigurationError

If the configuration is invalid.

Source code in src/portfolio_management/analytics/indicators/config.py
def validate(self) -> None:
    """Validate indicator configuration parameters.

    Checks if the provider is supported and validates common parameters
    like 'window' and 'threshold' if they are present.

    Raises:
        ConfigurationError: If the configuration is invalid.

    """
    if self.enabled and self.provider not in ("noop", "talib", "ta"):
        raise ConfigurationError(
            None,
            f"Unsupported indicator provider: {self.provider}",
        )

    if "window" in self.params and self.params["window"] <= 0:
        raise ConfigurationError(
            None,
            f"Invalid window parameter: {self.params['window']} (must be positive)",
        )

    if "threshold" in self.params:
        threshold = self.params["threshold"]
        if not 0 <= threshold <= 1:
            raise ConfigurationError(
                None,
                f"Invalid threshold parameter: {threshold} (must be in [0, 1])",
            )
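The checks above can be summarized as a standalone sketch. `check_indicator_config` is a hypothetical helper written only for illustration; the real method lives on `IndicatorConfig` and raises `ConfigurationError` rather than `ValueError`:

```python
def check_indicator_config(enabled: bool, provider: str, params: dict) -> None:
    """Standalone mirror of IndicatorConfig.validate's checks
    (illustrative only; the real method raises ConfigurationError)."""
    # Provider is only checked when filtering is enabled.
    if enabled and provider not in ("noop", "talib", "ta"):
        raise ValueError(f"Unsupported indicator provider: {provider}")
    # 'window' must be positive when present, even if filtering is disabled.
    if "window" in params and params["window"] <= 0:
        raise ValueError(f"Invalid window parameter: {params['window']}")
    # 'threshold' must lie in [0, 1] when present.
    if "threshold" in params and not 0 <= params["threshold"] <= 1:
        raise ValueError(f"Invalid threshold parameter: {params['threshold']}")

check_indicator_config(False, "custom", {})  # OK: provider ignored when disabled
check_indicator_config(True, "talib", {"window": 50, "threshold": 0.5})  # OK
```

Note the asymmetry: the provider check is gated on `enabled`, while the `window` and `threshold` checks run whenever those keys are present.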

disabled() classmethod

Create a disabled indicator configuration.

This is a convenience factory method for creating a configuration that explicitly disables indicator filtering.

Returns:

Name Type Description
IndicatorConfig IndicatorConfig

An instance with enabled set to False.

Source code in src/portfolio_management/analytics/indicators/config.py
@classmethod
def disabled(cls) -> IndicatorConfig:
    """Create a disabled indicator configuration.

    This is a convenience factory method for creating a configuration
    that explicitly disables indicator filtering.

    Returns:
        IndicatorConfig: An instance with `enabled` set to False.

    """
    return cls(enabled=False, provider="noop", params={})

noop(params=None) classmethod

Create a no-op indicator configuration.

This factory creates a configuration that is enabled but uses the 'noop' provider, which performs no actual filtering. Useful for testing the pipeline's structure without applying indicator logic.

Parameters:

Name Type Description Default
params dict[str, Any] | None

Optional parameters for the no-op provider, primarily for testing purposes.

None

Returns:

Name Type Description
IndicatorConfig IndicatorConfig

An instance with provider set to 'noop' and enabled set to True.

Source code in src/portfolio_management/analytics/indicators/config.py
@classmethod
def noop(cls, params: dict[str, Any] | None = None) -> IndicatorConfig:
    """Create a no-op indicator configuration.

    This factory creates a configuration that is enabled but uses the 'noop'
    provider, which performs no actual filtering. Useful for testing the
    pipeline's structure without applying indicator logic.

    Args:
        params (dict[str, Any] | None): Optional parameters for the no-op
            provider, primarily for testing purposes.

    Returns:
        IndicatorConfig: An instance with `provider` set to 'noop' and `enabled`
            set to True.

    """
    return cls(enabled=True, provider="noop", params=params or {})

IndicatorProvider

Bases: ABC

Abstract interface for technical indicator computation.

This abstract class defines the contract for computing technical indicators from a time series. Concrete implementations should inherit from this class and implement the compute method, typically by wrapping a technical analysis library like TA-Lib or ta.

The provider pattern allows the system to remain agnostic to the specific backend used for indicator calculations.

Example (for creating a new provider):

>>> class MovingAverageCrossProvider(IndicatorProvider):
...     def compute(self, series: pd.Series, params: dict[str, Any]) -> pd.Series:
...         short_window = params.get("short", 20)
...         long_window = params.get("long", 50)
...         short_ma = series.rolling(window=short_window).mean()
...         long_ma = series.rolling(window=long_window).mean()
...         # Signal is True when short MA crosses above long MA
...         signal = (short_ma > long_ma)
...         return signal.fillna(False)

Source code in src/portfolio_management/analytics/indicators/providers.py
class IndicatorProvider(ABC):
    """Abstract interface for technical indicator computation.

    This abstract class defines the contract for computing technical indicators
    from a time series. Concrete implementations should inherit from this class
    and implement the `compute` method, typically by wrapping a technical
    analysis library like TA-Lib or `ta`.

    The provider pattern allows the system to remain agnostic to the specific
    backend used for indicator calculations.

    Example (for creating a new provider):
        >>> class MovingAverageCrossProvider(IndicatorProvider):
        ...     def compute(self, series: pd.Series, params: dict[str, Any]) -> pd.Series:
        ...         short_window = params.get("short", 20)
        ...         long_window = params.get("long", 50)
        ...         short_ma = series.rolling(window=short_window).mean()
        ...         long_ma = series.rolling(window=long_window).mean()
        ...         # Signal is True when short MA crosses above long MA
        ...         signal = (short_ma > long_ma)
        ...         return signal.fillna(False)
    """

    @abstractmethod
    def compute(self, series: pd.Series, params: dict[str, Any]) -> pd.Series:
        """Compute a technical indicator signal from a time series.

        Args:
            series (pd.Series): An input time series of data, typically prices,
                indexed by date.
            params (dict[str, Any]): A dictionary of indicator-specific parameters,
                such as window sizes or thresholds (e.g., `{"window": 20}`).

        Returns:
            pd.Series: A Series of indicator signals with the same index as the
            input. Values should be boolean (True/False) or float (0.0 to 1.0)
            to signify inclusion or exclusion.

        """

compute(series, params) abstractmethod

Compute a technical indicator signal from a time series.

Parameters:

Name Type Description Default
series Series

An input time series of data, typically prices, indexed by date.

required
params dict[str, Any]

A dictionary of indicator-specific parameters, such as window sizes or thresholds (e.g., {"window": 20}).

required

Returns:

Type Description
pd.Series

A Series of indicator signals with the same index as the input. Values should be boolean (True/False) or float (0.0 to 1.0) to signify inclusion or exclusion.

Source code in src/portfolio_management/analytics/indicators/providers.py
@abstractmethod
def compute(self, series: pd.Series, params: dict[str, Any]) -> pd.Series:
    """Compute a technical indicator signal from a time series.

    Args:
        series (pd.Series): An input time series of data, typically prices,
            indexed by date.
        params (dict[str, Any]): A dictionary of indicator-specific parameters,
            such as window sizes or thresholds (e.g., `{"window": 20}`).

    Returns:
        pd.Series: A Series of indicator signals with the same index as the
        input. Values should be boolean (True/False) or float (0.0 to 1.0)
        to signify inclusion or exclusion.

    """

NoOpIndicatorProvider

Bases: IndicatorProvider

No-op stub implementation that returns pass-through signals.

This implementation of IndicatorProvider always returns a signal of True, effectively including all assets without applying any filtering. It serves as a default placeholder, useful for testing the indicator framework's structure without requiring technical analysis dependencies or logic.

Use this provider for:

  • Testing the overall asset selection pipeline.
  • Disabling indicator filtering while keeping the configuration enabled.
  • Serving as a base for future indicator implementations.

Example

>>> import pandas as pd
>>> provider = NoOpIndicatorProvider()
>>> prices = pd.Series(
...     [100, 101, 102],
...     index=pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03'])
... )
>>> signal = provider.compute(prices, params={})
>>> print(signal)
2023-01-01    True
2023-01-02    True
2023-01-03    True
dtype: bool

Source code in src/portfolio_management/analytics/indicators/providers.py
class NoOpIndicatorProvider(IndicatorProvider):
    """No-op stub implementation that returns pass-through signals.

    This implementation of `IndicatorProvider` always returns a signal of `True`,
    effectively including all assets without applying any filtering. It serves as a
    default placeholder, useful for testing the indicator framework's structure
    without requiring technical analysis dependencies or logic.

    Use this provider for:
    - Testing the overall asset selection pipeline.
    - Disabling indicator filtering while keeping the configuration enabled.
    - Serving as a base for future indicator implementations.

    Example:
        >>> import pandas as pd
        >>> provider = NoOpIndicatorProvider()
        >>> prices = pd.Series(
        ...     [100, 101, 102],
        ...     index=pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03'])
        ... )
        >>> signal = provider.compute(prices, params={})
        >>> print(signal)
        2023-01-01    True
        2023-01-02    True
        2023-01-03    True
        dtype: bool

    """

    def compute(self, series: pd.Series, params: dict[str, Any]) -> pd.Series:
        """Return a pass-through signal (all True).

        This method ignores the input series and parameters and simply returns
        a boolean Series of `True` values with the same index.

        Args:
            series (pd.Series): The input time series (ignored).
            params (dict[str, Any]): Indicator-specific parameters (ignored).

        Returns:
            pd.Series: A boolean Series of `True` values, which results in no
            assets being filtered out.

        """
        # Return all True values - no filtering
        return pd.Series(True, index=series.index, dtype=bool)

compute(series, params)

Return a pass-through signal (all True).

This method ignores the input series and parameters and simply returns a boolean Series of True values with the same index.

Parameters:

Name Type Description Default
series Series

The input time series (ignored).

required
params dict[str, Any]

Indicator-specific parameters (ignored).

required

Returns:

Type Description
pd.Series

A boolean Series of True values, which results in no assets being filtered out.

Source code in src/portfolio_management/analytics/indicators/providers.py
def compute(self, series: pd.Series, params: dict[str, Any]) -> pd.Series:
    """Return a pass-through signal (all True).

    This method ignores the input series and parameters and simply returns
    a boolean Series of `True` values with the same index.

    Args:
        series (pd.Series): The input time series (ignored).
        params (dict[str, Any]): Indicator-specific parameters (ignored).

    Returns:
        pd.Series: A boolean Series of `True` values, which results in no
        assets being filtered out.

    """
    # Return all True values - no filtering
    return pd.Series(True, index=series.index, dtype=bool)

PriceLoader

Utilities for reading price files into pandas objects.

This loader is designed to efficiently read and process numerous price files. It includes a bounded LRU cache to prevent unbounded memory growth during long-running workflows and uses a thread pool for parallel I/O operations to accelerate data loading.

The loader also performs data validation and cleaning, such as removing duplicate timestamps and non-positive price values.

Attributes:

Name Type Description
max_workers int | None

Maximum number of concurrent threads for parallel loading. If None, a default is calculated based on CPU cores.

cache_size int

Maximum number of price series to hold in the LRU cache. Set to 0 to disable caching.

io_backend Backend

The backend to use for reading CSV files. Options include 'pandas', 'polars', and 'pyarrow'. 'auto' selects the fastest available option.

Example

>>> from pathlib import Path
>>> from portfolio_management.analytics.returns.loaders import PriceLoader
>>> from portfolio_management.assets.selection.models import SelectedAsset
>>>
>>> prices_dir = Path("tests/data/prices")  # Dummy path
>>> assets = [
...     SelectedAsset(symbol="AAPL"),
...     SelectedAsset(symbol="MSFT")
... ]
>>> loader = PriceLoader(max_workers=4, cache_size=500)
>>> # price_df = loader.load_multiple_prices(assets, prices_dir)
>>> # if price_df is not None:
... #     print(price_df.info())
>>>
>>> # Check cache status
>>> stats = loader.get_cache_stats()
>>> print(f"Cache entries: {stats['cache_entries']}, "
...       f"Cache size: {stats['cache_size']}")
Cache entries: 0, Cache size: 500
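The cleaning performed by `load_price_file` (see the source listing below) can be sketched directly in pandas. This is an illustrative reproduction of the listed steps on toy data, not a call into the package:

```python
import pandas as pd

# Sketch of load_price_file's cleaning: sort by date, keep the latest
# row for any duplicate date, then drop non-positive close prices.
df = pd.DataFrame(
    {"close": [11.0, 10.0, -1.0, 12.0]},
    index=pd.to_datetime(
        ["2023-01-02", "2023-01-02", "2023-01-03", "2023-01-04"]
    ),
)
df = df.sort_index()
df = df[~df.index.duplicated(keep="last")]  # duplicates: keep the last entry
df = df.loc[df["close"] > 0]                # drop non-positive closes
series = df["close"].astype(float)
print(series.tolist())  # [10.0, 12.0]
```

Here the duplicate 2023-01-02 rows collapse to the later value (10.0) and the negative close on 2023-01-03 is dropped, matching the "keeping latest entries" and "non-positive close" log messages in the source.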

Source code in src/portfolio_management/analytics/returns/loaders.py
class PriceLoader:
    """Utilities for reading price files into pandas objects.

    This loader is designed to efficiently read and process numerous price files.
    It includes a bounded LRU cache to prevent unbounded memory growth during
    long-running workflows and uses a thread pool for parallel I/O operations
    to accelerate data loading.

    The loader also performs data validation and cleaning, such as removing
    duplicate timestamps and non-positive price values.

    Attributes:
        max_workers (int | None): Maximum number of concurrent threads for
            parallel loading. If None, a default is calculated based on CPU cores.
        cache_size (int): Maximum number of price series to hold in the LRU cache.
            Set to 0 to disable caching.
        io_backend (Backend): The backend to use for reading CSV files. Options
            include 'pandas', 'polars', and 'pyarrow'. 'auto' selects the
            fastest available option.

    Example:
        >>> from pathlib import Path
        >>> from portfolio_management.analytics.returns.loaders import PriceLoader
        >>> from portfolio_management.assets.selection.models import SelectedAsset
        >>>
        >>> prices_dir = Path("tests/data/prices") # Dummy path
        >>> assets = [
        ...     SelectedAsset(symbol="AAPL"),
        ...     SelectedAsset(symbol="MSFT")
        ... ]
        >>> loader = PriceLoader(max_workers=4, cache_size=500)
        >>> # price_df = loader.load_multiple_prices(assets, prices_dir)
        >>> # if price_df is not None:
        ... #     print(price_df.info())
        >>>
        >>> # Check cache status
        >>> stats = loader.get_cache_stats()
        >>> print(f"Cache entries: {stats['cache_entries']}, "
        ...       f"Cache size: {stats['cache_size']}")
        Cache entries: 0, Cache size: 500

    """

    def __init__(
        self,
        max_workers: int | None = None,
        cache_size: int = 1000,
        io_backend: Backend = "pandas",
    ):
        """Initializes the PriceLoader.

        Args:
            max_workers: Maximum number of concurrent threads for parallel loading.
            cache_size: Maximum number of price series to cache. Default is 1000.
                Set to 0 to disable caching entirely.
            io_backend: IO backend for reading CSV files. Options:
                - 'pandas' (default): Standard pandas CSV reader
                - 'polars': Use polars for faster CSV parsing (requires polars)
                - 'pyarrow': Use pyarrow for faster CSV parsing (requires pyarrow)
                - 'auto': Automatically select fastest available backend

        """
        self.max_workers = max_workers
        self.cache_size = max(0, cache_size)  # Ensure non-negative
        self.io_backend = io_backend
        self._cache: OrderedDict[Path, pd.Series] = OrderedDict()
        self._cache_lock = Lock()

    def load_price_file(self, path: Path) -> pd.Series:
        """Load a single price file into a ``Series`` indexed by date.

        This method reads a CSV file, standardizes its columns, cleans the data
        (handles duplicates, non-positive values), and returns a sorted Series
        of close prices.

        Args:
            path (Path): The path to the price CSV file.

        Returns:
            pd.Series: A Series of close prices, indexed by date. The Series
                will be empty if the file is empty or contains no valid data.

        Raises:
            DataLoadError: If the file cannot be found or parsed.

        """
        if not path.exists():
            raise DataLoadError(path, "Price file not found")

        selected_backend = select_backend(self.io_backend)

        if path.suffix.lower() == ".csv" and selected_backend == "pandas":
            df = read_csv_fast(
                path,
                backend=self.io_backend,
                header=0,
                usecols=["date", "close"],
                parse_dates=["date"],
                index_col="date",
            )
        else:
            raw_kwargs = {}
            if selected_backend == "pandas":
                raw_kwargs["header"] = 0
            raw = read_csv_fast(path, backend=self.io_backend, **raw_kwargs)
            df = self._standardize_price_dataframe(raw, path)

        df = df.sort_index()

        if df.index.duplicated().any():
            duplicate_count = int(df.index.duplicated().sum())
            logger.debug(
                "%s contains %d duplicate dates; keeping latest entries",
                path,
                duplicate_count,
            )
            df = df[~df.index.duplicated(keep="last")]

        non_positive_mask = df["close"] <= 0
        if non_positive_mask.any():
            count = int(non_positive_mask.sum())
            logger.debug("Dropping %d non-positive close values from %s", count, path)
            df = df.loc[~non_positive_mask]

        if df.empty:
            logger.warning("Price file %s is empty after cleaning.", path)
            return pd.Series(dtype="float64")

        if len(df.index) > 1:
            gaps = df.index.to_series().diff().dt.days.dropna()
            max_gap = int(gaps.max()) if not gaps.empty else 0
            if max_gap > 10:
                logger.debug("Detected maximum gap of %d days in %s", max_gap, path)

        return df["close"].astype(float)

    def load_multiple_prices(
        self,
        assets: list[SelectedAsset],
        prices_dir: Path,
    ) -> pd.DataFrame:
        """Load price data for many assets and align on the union of dates.

        This method orchestrates the loading of price files for a list of assets
        in parallel. It resolves file paths, submits loading tasks to a thread
        pool, and assembles the resulting Series into a single DataFrame.

        Args:
            assets (list[SelectedAsset]): The list of assets to load prices for.
            prices_dir (Path): The base directory containing the price CSV files.

        Returns:
            pd.DataFrame: A DataFrame containing the close prices for all successfully
                loaded assets. The index is the union of all dates, and columns
                are the asset symbols. Returns an empty DataFrame if no price
                files can be loaded.

        """
        logger.info(
            "Loading price series for %d assets from %s",
            len(assets),
            prices_dir,
        )
        symbol_to_path: dict[str, Path] = {}
        path_to_symbols: dict[Path, list[str]] = defaultdict(list)

        for asset in assets:
            resolved_path = self._resolve_price_path(prices_dir, asset)
            if resolved_path is None:
                continue
            symbol_to_path[asset.symbol] = resolved_path
            path_to_symbols[resolved_path].append(asset.symbol)

        if not symbol_to_path:
            logger.warning("No price files were successfully resolved.")
            return pd.DataFrame()

        unique_paths = list(path_to_symbols.keys())
        logger.debug("Resolved %d unique price files.", len(unique_paths))

        price_series: dict[Path, pd.Series] = {}
        tasks = self._submit_load_tasks(unique_paths)
        for path, series in tasks:
            if series is None or series.empty:
                logger.warning("Skipping empty price series: %s", path)
                continue
            price_series[path] = series

        if not price_series:
            logger.warning("No price files were successfully loaded.")
            return pd.DataFrame()

        all_prices: dict[str, pd.Series] = {}
        for path, symbols in path_to_symbols.items():
            series = price_series.get(path)
            if series is None or series.empty:
                missing_symbols = ", ".join(symbols[:5])
                logger.warning("Price data missing for assets: %s", missing_symbols)
                continue
            for symbol in symbols:
                all_prices[symbol] = series

        if not all_prices:
            logger.warning("No price files were successfully loaded.")
            return pd.DataFrame()

        df = pd.DataFrame(all_prices).sort_index()
        logger.info("Loaded price matrix: shape=%s", df.shape)
        return df

    @staticmethod
    def _standardize_price_dataframe(raw: pd.DataFrame, path: Path) -> pd.DataFrame:
        """Normalize various price file column conventions to a standard format."""
        if "<DATE>" in raw.columns and "<CLOSE>" in raw.columns:
            df = (
                raw[["<DATE>", "<CLOSE>"]]
                .copy()
                .rename(columns={"<DATE>": "date", "<CLOSE>": "close"})
            )
        elif "DATE" in raw.columns and "CLOSE" in raw.columns:
            df = (
                raw[["DATE", "CLOSE"]]
                .copy()
                .rename(columns={"DATE": "date", "CLOSE": "close"})
            )
        elif "date" in raw.columns and "close" in raw.columns:
            df = raw[["date", "close"]].copy()
        else:
            raise DataValidationError(
                f"Unsupported price file structure for {path}: columns={list(raw.columns)}",
            )

        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df = df.set_index("date")
        return df

    def _load_price_with_cache(self, path: Path) -> pd.Series:
        """Load price file with LRU cache eviction when cache is full."""
        # Check cache first (and update LRU order if hit)
        with self._cache_lock:
            if path in self._cache:
                # Move to end (most recently used)
                self._cache.move_to_end(path)
                return self._cache[path]

        # Cache miss - load from disk
        series = self.load_price_file(path)

        # Store in cache if non-empty and caching is enabled
        if not series.empty and self.cache_size > 0:
            with self._cache_lock:
                # Remove oldest entry if at capacity
                if len(self._cache) >= self.cache_size:
                    # Remove from beginning (least recently used)
                    self._cache.popitem(last=False)
                # Add new entry at end (most recently used)
                self._cache[path] = series

        return series

    def clear_cache(self) -> None:
        """Clear all cached price series.

        This is useful after bulk operations where cached data is unlikely
        to be reused, helping to free memory immediately rather than waiting
        for LRU eviction.
        """
        with self._cache_lock:
            self._cache.clear()
            logger.debug("Cleared price loader cache")

    def get_cache_stats(self) -> dict[str, int]:
        """Get cache statistics.

        Returns:
            Dictionary with 'size' (current entries) and 'maxsize' (capacity).

        Useful for testing and monitoring.

        """
        with self._cache_lock:
            return {
                "size": len(self._cache),
                "maxsize": self.cache_size,
            }

    def cache_info(self) -> dict[str, int]:
        """Return cache statistics for monitoring.

        This method is an alias for get_cache_stats for backward compatibility.
        """
        return self.get_cache_stats()

    def _submit_load_tasks(
        self,
        paths: Iterable[Path],
    ) -> list[tuple[Path, pd.Series | None]]:
        paths = list(paths)
        if not paths:
            return []

        # If only one path or concurrency disabled, fall back to sequential loading.
        if len(paths) == 1 or (self.max_workers is not None and self.max_workers <= 1):
            return [(path, self._load_price_with_cache(path)) for path in paths]

        max_workers = self.max_workers
        if max_workers is None:
            cpu_count = os.cpu_count() or 1
            max_workers = min(32, cpu_count * 5, len(paths))
        else:
            max_workers = min(max_workers, len(paths))

        if max_workers <= 1:
            return [(path, self._load_price_with_cache(path)) for path in paths]

        results: list[tuple[Path, pd.Series | None]] = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_path = {
                executor.submit(self._load_price_with_cache, path): path
                for path in paths
            }
            for future in as_completed(future_to_path):
                path = future_to_path[future]
                try:
                    series = future.result()
                except Exception:  # pragma: no cover - defensive logging
                    logger.exception("Failed to load %s", path)
                    series = None
                results.append((path, series))

        return results

    def _resolve_price_path(
        self,
        prices_dir: Path,
        asset: SelectedAsset,
    ) -> Path | None:
        price_path = prices_dir / asset.stooq_path
        if price_path.exists():
            return price_path

        alt_name = Path(asset.stooq_path).stem.lower()
        alt_path = prices_dir / f"{alt_name}.csv"
        if alt_path.exists():
            return alt_path

        logger.warning(
            "Price file not found: symbol=%s, path=%s",
            asset.symbol,
            price_path,
        )
        return None

load_price_file(path)

Load a single price file into a Series indexed by date.

This method reads a CSV file, standardizes its columns, cleans the data (handles duplicates, non-positive values), and returns a sorted Series of close prices.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path` | The path to the price CSV file. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `pd.Series` | A Series of close prices, indexed by date. The Series will be empty if the file is empty or contains no valid data. |

Raises:

| Type | Description |
| --- | --- |
| `DataLoadError` | If the file cannot be found or parsed. |
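The cleaning steps described above (keep-latest deduplication, dropping non-positive closes) can be sketched with plain pandas; the dates and values here are illustrative, not taken from any real price file:

```python
import pandas as pd

# Illustrative raw data with a duplicate date and a non-positive close
raw = pd.DataFrame(
    {
        "date": ["2024-01-02", "2024-01-03", "2024-01-03", "2024-01-04"],
        "close": [100.0, 101.0, 102.0, 0.0],
    }
)
df = raw.assign(date=pd.to_datetime(raw["date"])).set_index("date").sort_index()

# Keep the latest entry for each duplicated date
df = df[~df.index.duplicated(keep="last")]

# Drop non-positive close values
df = df[df["close"] > 0]

series = df["close"].astype(float)
print(series.tolist())  # [100.0, 102.0]
```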

load_multiple_prices(assets, prices_dir)

Load price data for many assets and align on the union of dates.

This method orchestrates the loading of price files for a list of assets in parallel. It resolves file paths, submits loading tasks to a thread pool, and assembles the resulting Series into a single DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `assets` | `list[SelectedAsset]` | The list of assets to load prices for. | *required* |
| `prices_dir` | `Path` | The base directory containing the price CSV files. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `pd.DataFrame` | A DataFrame containing the close prices for all successfully loaded assets. The index is the union of all dates, and columns are the asset symbols. Returns an empty DataFrame if no price files can be loaded. |

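The union-of-dates alignment comes from building a `DataFrame` out of a dict of `Series`: pandas aligns on the union of all indices and fills missing dates with NaN. A small sketch with illustrative tickers and prices:

```python
import pandas as pd

# Two close-price series with partially overlapping dates
a = pd.Series([10.0, 11.0], index=pd.to_datetime(["2024-01-02", "2024-01-03"]))
b = pd.Series([20.0, 21.0], index=pd.to_datetime(["2024-01-03", "2024-01-04"]))

# Constructing a DataFrame from a dict of Series aligns on the union
# of all dates; dates missing from a series become NaN
prices = pd.DataFrame({"AAA": a, "BBB": b}).sort_index()
print(prices.shape)  # (3, 2)
```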

clear_cache()

Clear all cached price series.

This is useful after bulk operations where cached data is unlikely to be reused, helping to free memory immediately rather than waiting for LRU eviction.


get_cache_stats()

Get cache statistics.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, int]` | Dictionary with `'size'` (current entries) and `'maxsize'` (capacity). Useful for testing and monitoring. |


cache_info()

Return cache statistics for monitoring.

This method is an alias of `get_cache_stats` and is retained for backward compatibility.


ReturnCalculator

Prepare aligned return series ready for portfolio construction.

Source code in src/portfolio_management/analytics/returns/calculator.py
class ReturnCalculator:
    """Prepare aligned return series ready for portfolio construction."""

    def __init__(self, price_loader: PriceLoader | None = None) -> None:
        self.price_loader = price_loader or PriceLoader()
        self._latest_summary: ReturnSummary | None = None

    @property
    def latest_summary(self) -> ReturnSummary | None:
        """Return the summary produced by the most recent pipeline run."""
        return self._latest_summary

    def load_and_prepare(
        self,
        assets: list[SelectedAsset] | None,
        prices_dir: Path,
        config: ReturnConfig,
    ) -> pd.DataFrame:
        """Execute the full return-preparation pipeline for *assets*."""
        if assets is None:
            raise InsufficientDataError(required_periods=1, available_periods=0)
        if not assets:
            raise InsufficientDataError(required_periods=1, available_periods=0)
        if not prices_dir.exists():
            raise ReturnCalculationError(
                f"Prices directory does not exist: {prices_dir}. "
                "Ensure price files have been exported.",
            )

        try:
            config.validate()
        except (ConfigurationError, ValueError) as exc:
            raise ReturnCalculationError(
                f"Invalid return configuration: {exc}",
            ) from exc

        logger.info("Preparing returns for %d assets", len(assets))

        prices = self.price_loader.load_multiple_prices(assets, prices_dir)
        if prices.empty:
            self._latest_summary = None
            raise InsufficientDataError(required_periods=1, available_periods=0)

        prices = self.handle_missing_data(prices, config)
        if prices.empty:
            self._latest_summary = None
            raise InsufficientDataError(required_periods=1, available_periods=0)

        returns = self.calculate_returns(prices, config)
        if returns.empty:
            self._latest_summary = None
            raise InsufficientDataError(required_periods=1, available_periods=0)

        returns = self._align_dates(returns, config)
        returns = self._resample_to_frequency(returns, config.frequency, config.method)
        returns = self._apply_coverage_filter(returns, config.min_coverage)

        if returns.empty:
            self._latest_summary = None
            raise InsufficientDataError(required_periods=1, available_periods=0)

        self._latest_summary = self._summarize_returns(returns, config)
        if self._latest_summary:
            logger.info(
                "Prepared returns: assets=%d, periods=%d, mean_annual_return=%.2f%%",
                returns.shape[1],
                returns.shape[0],
                float(self._latest_summary.mean_returns.mean()) * 100,
            )

        return returns

    def _calculate_simple_returns(self, prices: PriceSeries) -> ReturnSeries:
        """Calculate simple percentage returns ``r_t = (P_t / P_{t-1}) - 1``."""
        return prices.pct_change().dropna()

    def _calculate_log_returns(self, prices: PriceSeries) -> ReturnSeries:
        r"""Calculate log returns ``r_t = \ln(P_t / P_{t-1})``."""
        return pd.Series(np.log(prices / prices.shift(1))).dropna()

    def _calculate_excess_returns(
        self,
        prices: PriceSeries,
        risk_free_rate: float,
    ) -> ReturnSeries:
        """Calculate excess returns ``r_t^{ex} = r_t - r_f`` over the risk-free leg."""
        simple_returns = self._calculate_simple_returns(prices)
        daily_rf = (1 + risk_free_rate) ** (1 / 252) - 1
        return simple_returns - daily_rf  # type: ignore[no-any-return]

    def calculate_returns(
        self,
        prices: PriceFrame,
        config: ReturnConfig,
    ) -> ReturnFrame:
        """Calculate returns for each column in *prices* according to *config*.

        This function applies the return calculation method specified in the
        configuration (simple, log, or excess) to the price data. It also
        filters out assets with insufficient historical data.

        Args:
            prices (pd.DataFrame): DataFrame of prices with dates as index and
                assets as columns.
            config (ReturnConfig): Configuration object specifying calculation
                parameters.

        Returns:
            pd.DataFrame: DataFrame of calculated returns.

        """
        if prices.empty:
            return pd.DataFrame()

        price_counts = prices.notna().sum()
        insufficient = price_counts[price_counts < config.min_periods]
        for symbol, count in insufficient.items():
            logger.debug(
                "Skipping %s: only %d observations available (min %d required)",
                symbol,
                count,
                config.min_periods,
            )

        eligible_columns = price_counts[price_counts >= config.min_periods].index
        if eligible_columns.empty:
            return pd.DataFrame()

        filtered_prices = prices[eligible_columns].sort_index()
        shifted = filtered_prices.shift(1)

        with np.errstate(divide="ignore", invalid="ignore"):
            if config.method == "log":
                returns = pd.DataFrame(np.log(filtered_prices / shifted))
            else:
                returns = pd.DataFrame((filtered_prices / shifted) - 1)
                if config.method == "excess":
                    daily_rf = (1 + config.risk_free_rate) ** (1 / 252) - 1
                    returns = returns - daily_rf

        returns = returns.replace([np.inf, -np.inf], np.nan)
        returns = returns.dropna(how="all")
        returns = returns.dropna(axis=1, how="all")

        if returns.empty:
            return pd.DataFrame()

        nonzero_counts = returns.count()
        empty_returns = nonzero_counts[nonzero_counts == 0].index.tolist()
        if empty_returns:
            logger.warning(
                "No returns generated for %d assets; skipping: %s",
                len(empty_returns),
                ", ".join(empty_returns[:5]),
            )
            returns = returns.drop(columns=empty_returns)

        if returns.empty:
            return pd.DataFrame()

        extreme_counts = (returns.abs() > 1).sum()
        extreme_assets = extreme_counts[extreme_counts > 0]
        if not extreme_assets.empty:
            for symbol, count in extreme_assets.items():
                logger.debug(
                    "Found %d extreme daily returns (>100%%) for %s",
                    int(count),
                    symbol,
                )
            logger.warning(
                "Found extreme daily returns (>100%%) for %d assets, e.g., %s",
                len(extreme_assets),
                extreme_assets.index[0],
            )

        returns = returns.sort_index()
        return returns

    def _handle_missing_forward_fill(
        self,
        prices: PriceFrame,
        max_days: int,
    ) -> PriceFrame:
        """Forward fill gaps up to ``max_days`` for each asset."""
        before = int(prices.isna().sum().sum())
        filled = prices.ffill(limit=max_days)
        after = int(filled.isna().sum().sum())
        logger.debug("Forward filled %d missing values", before - after)
        return filled

    def _handle_missing_drop(self, prices: PriceFrame) -> PriceFrame:
        """Drop any row containing missing values."""
        before = len(prices)
        dropped = prices.dropna()
        logger.debug("Dropped %d rows with missing values", before - len(dropped))
        return dropped

    def _handle_missing_interpolate(
        self,
        prices: PriceFrame,
        max_days: int,
    ) -> PriceFrame:
        """Linearly interpolate gaps up to ``max_days`` consecutive NaNs."""
        interpolated = prices.interpolate(
            method="linear",
            limit=max_days,
            limit_direction="both",
        )
        filled = int(interpolated.isna().sum().sum())
        logger.debug("Interpolated missing values; %d NaNs remain", filled)
        return interpolated

    def handle_missing_data(
        self,
        prices: PriceFrame,
        config: ReturnConfig,
    ) -> PriceFrame:
        """Apply the configured missing-data strategy to *prices*.

        Args:
            prices (pd.DataFrame): The input price data.
            config (ReturnConfig): Configuration specifying the handling method.

        Returns:
            pd.DataFrame: Price data with missing values handled.

        Raises:
            ConfigurationError: If an unknown missing data handling method is configured.

        """
        if prices.empty:
            return prices

        missing_mask = prices.isna()
        initial_missing = int(missing_mask.to_numpy().sum())
        if initial_missing == 0:
            logger.debug("No missing data detected")
            return prices

        if config.handle_missing == "forward_fill":
            handled = self._handle_missing_forward_fill(
                prices,
                config.max_forward_fill_days,
            )
        elif config.handle_missing == "drop":
            handled = self._handle_missing_drop(prices)
        elif config.handle_missing == "interpolate":
            handled = self._handle_missing_interpolate(
                prices,
                config.max_forward_fill_days,
            )
        else:  # pragma: no cover - validated earlier
            raise ConfigurationError(
                None,
                f"Unknown missing data handling method: {config.handle_missing}",
            )

        remaining = int(handled.isna().sum().sum())
        if remaining > 0:
            logger.debug(
                "%d NaNs remain after missing-data handling",
                remaining,
            )

        return handled.dropna(how="all")

    def _align_dates(self, returns: ReturnFrame, config: ReturnConfig) -> ReturnFrame:
        """Align return dates according to the configuration."""
        if returns.empty:
            return returns

        returns = returns.sort_index()
        if config.align_method == "inner":
            aligned = returns.dropna(how="any")
        else:  # outer
            aligned = returns

        if config.reindex_to_business_days and not aligned.empty:
            business_index = pd.bdate_range(
                aligned.index.min(),
                aligned.index.max(),
            )
            aligned = aligned.reindex(business_index)

        return aligned

    def _resample_to_frequency(
        self,
        returns: ReturnFrame,
        frequency: str,
        method: str,
    ) -> ReturnFrame:
        """Resample the returns DataFrame to the requested frequency.

        Args:
            returns (pd.DataFrame): DataFrame of returns, typically daily.
            frequency (str): Target frequency ('weekly', 'monthly').
            method (str): Return calculation method ('log' or 'simple') used to
                determine the resampling aggregation method.

        Returns:
            pd.DataFrame: Resampled returns DataFrame.

        Raises:
            ConfigurationError: If the frequency is not supported.

        """
        if returns.empty or frequency == "daily":
            return returns

        rule_map = {
            "weekly": "W-FRI",
            "monthly": "ME",
        }
        rule = rule_map.get(frequency)
        if rule is None:
            raise ConfigurationError(None, f"Unsupported frequency: {frequency}")

        if method == "log":
            resampled = returns.resample(rule).sum(min_count=1)
        else:
            resampled = (1 + returns).resample(rule).prod(min_count=1) - 1

        return resampled.dropna(how="all")

    def _apply_coverage_filter(
        self,
        returns: ReturnFrame,
        min_coverage: float,
    ) -> ReturnFrame:
        """Remove assets that do not meet the minimum data coverage threshold."""
        if returns.empty:
            return returns

        coverage = returns.notna().mean()
        kept = coverage[coverage >= min_coverage].index.tolist()
        dropped = sorted(set(returns.columns) - set(kept))
        if dropped:
            logger.info(
                "Dropping %d assets below %.0f%% coverage: %s",
                len(dropped),
                min_coverage * 100,
                ", ".join(dropped[:5]),
            )
        filtered = returns[kept]
        return filtered

    @staticmethod
    def export_returns(returns: pd.DataFrame, path: Path) -> None:
        """Persist prepared returns as a CSV file.

        Args:
            returns (pd.DataFrame): The returns data to save.
            path (Path): The file path to save the CSV to.

        """
        returns.to_csv(path)

    def _summarize_returns(
        self,
        returns: pd.DataFrame,
        config: ReturnConfig,
    ) -> ReturnSummary | None:
        """Create summary statistics for the prepared returns."""
        if returns.empty:
            return None

        annualisation = {"daily": 252, "weekly": 52, "monthly": 12}
        factor = annualisation.get(config.frequency, 252)
        mean_returns = returns.mean() * factor
        volatility = returns.std(ddof=0) * np.sqrt(factor)
        correlation = returns.corr()
        coverage = returns.notna().mean()

        logger.debug(
            "Summary stats calculated for %d assets (freq=%s)",
            returns.shape[1],
            config.frequency,
        )

        return ReturnSummary(
            mean_returns=mean_returns,
            volatility=volatility,
            correlation=correlation,
            coverage=coverage,
        )
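The three return methods implemented above follow standard formulas: simple returns `P_t / P_{t-1} - 1`, log returns `ln(P_t / P_{t-1})`, and excess returns with a daily risk-free leg de-annualised via `(1 + rf)**(1/252) - 1`. A minimal sketch on illustrative prices (the 5% annual risk-free rate is an assumed value):

```python
import numpy as np
import pandas as pd

prices = pd.Series([100.0, 110.0, 99.0])  # illustrative price series

# Simple returns: r_t = P_t / P_{t-1} - 1
simple = prices.pct_change().dropna()

# Log returns: r_t = ln(P_t / P_{t-1})
log_ret = np.log(prices / prices.shift(1)).dropna()

# Excess returns: subtract the daily risk-free leg derived from
# an annual rate (assumed 5% here)
annual_rf = 0.05
daily_rf = (1 + annual_rf) ** (1 / 252) - 1
excess = simple - daily_rf

# Log returns telescope: their sum equals ln(P_last / P_first)
total = float(log_ret.sum())  # == ln(99 / 100)
```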

latest_summary property

Return the summary produced by the most recent pipeline run.

load_and_prepare(assets, prices_dir, config)

Execute the full return-preparation pipeline for assets.
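The resampling and coverage-filter steps the pipeline applies after return calculation can be sketched in isolation; the tickers, return values, and 80% coverage threshold below are illustrative:

```python
import pandas as pd

# Illustrative daily simple returns for two assets over one business week
idx = pd.bdate_range("2024-01-01", periods=5)
returns = pd.DataFrame(
    {"AAA": [0.01, -0.02, 0.03, 0.0, 0.01], "BBB": [None, None, None, 0.02, 0.01]},
    index=idx,
)

# Weekly resample for simple returns compounds within each period:
# (1 + r).prod() - 1, anchored to Friday ('W-FRI') as in the pipeline
weekly = (1 + returns).resample("W-FRI").prod(min_count=1) - 1

# Coverage filter: keep assets observed on at least 80% of dates
coverage = returns.notna().mean()
kept = coverage[coverage >= 0.8].index.tolist()
print(kept)  # only 'AAA' has sufficient coverage
```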

            float(self._latest_summary.mean_returns.mean()) * 100,
        )

    return returns
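The `_resample_to_frequency` helper called in the pipeline is not shown on this page; the following is only a plausible sketch of what weekly resampling entails, assuming simple returns compound within each period while log returns sum (the actual implementation may differ):

```python
import numpy as np
import pandas as pd

daily = pd.Series(
    [0.01, -0.005, 0.02, 0.0, 0.01],
    index=pd.date_range("2024-01-01", periods=5, freq="B"),
)

# Simple returns compound within each week: prod(1 + r) - 1.
weekly_simple = daily.resample("W-FRI").apply(lambda r: (1 + r).prod() - 1)

# Log returns are additive, so they simply sum within each week.
weekly_log = np.log1p(daily).resample("W-FRI").sum()
```

Both paths agree by construction: `expm1(weekly_log)` recovers the compounded simple return for each period.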

calculate_returns(prices, config)

Calculate returns for each column in prices according to config.

This function applies the return calculation method specified in the configuration (simple, log, or excess) to the price data. It also filters out assets with insufficient historical data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prices` | `DataFrame` | DataFrame of prices with dates as index and assets as columns. | *required* |
| `config` | `ReturnConfig` | Configuration object specifying calculation parameters. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ReturnFrame` | `pd.DataFrame`: DataFrame of calculated returns. |

Source code in src/portfolio_management/analytics/returns/calculator.py
def calculate_returns(
    self,
    prices: PriceFrame,
    config: ReturnConfig,
) -> ReturnFrame:
    """Calculate returns for each column in *prices* according to *config*.

    This function applies the return calculation method specified in the
    configuration (simple, log, or excess) to the price data. It also
    filters out assets with insufficient historical data.

    Args:
        prices (pd.DataFrame): DataFrame of prices with dates as index and
            assets as columns.
        config (ReturnConfig): Configuration object specifying calculation
            parameters.

    Returns:
        pd.DataFrame: DataFrame of calculated returns.

    """
    if prices.empty:
        return pd.DataFrame()

    price_counts = prices.notna().sum()
    insufficient = price_counts[price_counts < config.min_periods]
    for symbol, count in insufficient.items():
        logger.debug(
            "Skipping %s: only %d observations available (min %d required)",
            symbol,
            count,
            config.min_periods,
        )

    eligible_columns = price_counts[price_counts >= config.min_periods].index
    if eligible_columns.empty:
        return pd.DataFrame()

    filtered_prices = prices[eligible_columns].sort_index()
    shifted = filtered_prices.shift(1)

    with np.errstate(divide="ignore", invalid="ignore"):
        if config.method == "log":
            returns = pd.DataFrame(np.log(filtered_prices / shifted))
        else:
            returns = pd.DataFrame((filtered_prices / shifted) - 1)
            if config.method == "excess":
                daily_rf = (1 + config.risk_free_rate) ** (1 / 252) - 1
                returns = returns - daily_rf

    returns = returns.replace([np.inf, -np.inf], np.nan)
    returns = returns.dropna(how="all")
    returns = returns.dropna(axis=1, how="all")

    if returns.empty:
        return pd.DataFrame()

    nonzero_counts = returns.count()
    empty_returns = nonzero_counts[nonzero_counts == 0].index.tolist()
    if empty_returns:
        logger.warning(
            "No returns generated for %d assets; skipping: %s",
            len(empty_returns),
            ", ".join(empty_returns[:5]),
        )
        returns = returns.drop(columns=empty_returns)

    if returns.empty:
        return pd.DataFrame()

    extreme_counts = (returns.abs() > 1).sum()
    extreme_assets = extreme_counts[extreme_counts > 0]
    if not extreme_assets.empty:
        for symbol, count in extreme_assets.items():
            logger.debug(
                "Found %d extreme daily returns (>100%%) for %s",
                int(count),
                symbol,
            )
        logger.warning(
            "Found extreme daily returns (>100%%) for %d assets, e.g., %s",
            len(extreme_assets),
            extreme_assets.index[0],
        )

    returns = returns.sort_index()
    return returns
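The three `method` options reduce to one-liners on a shifted price series. A minimal sketch of the same arithmetic, with a hypothetical 3% annual risk-free rate for the 'excess' case:

```python
import numpy as np
import pandas as pd

prices = pd.Series([100.0, 101.0, 99.0, 102.0])
shifted = prices.shift(1)

simple = prices / shifted - 1           # e.g. 101/100 - 1 = 0.01
log_ret = np.log(prices / shifted)      # log(101/100) ~= 0.00995

# De-annualize a 3% annual rate via 252-day compounding, as above.
daily_rf = (1 + 0.03) ** (1 / 252) - 1
excess = simple - daily_rf              # 'excess' = simple minus daily rf
```

For small moves log and simple returns nearly coincide; log returns add across time, while simple returns must be compounded.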

handle_missing_data(prices, config)

Apply the configured missing-data strategy to prices.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prices` | `DataFrame` | The input price data. | *required* |
| `config` | `ReturnConfig` | Configuration specifying the handling method. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `PriceFrame` | `pd.DataFrame`: Price data with missing values handled. |

Raises:

| Type | Description |
| --- | --- |
| `ConfigurationError` | If an unknown missing data handling method is configured. |

Source code in src/portfolio_management/analytics/returns/calculator.py
def handle_missing_data(
    self,
    prices: PriceFrame,
    config: ReturnConfig,
) -> PriceFrame:
    """Apply the configured missing-data strategy to *prices*.

    Args:
        prices (pd.DataFrame): The input price data.
        config (ReturnConfig): Configuration specifying the handling method.

    Returns:
        pd.DataFrame: Price data with missing values handled.

    Raises:
        ConfigurationError: If an unknown missing data handling method is configured.

    """
    if prices.empty:
        return prices

    missing_mask = prices.isna()
    initial_missing = int(missing_mask.to_numpy().sum())
    if initial_missing == 0:
        logger.debug("No missing data detected")
        return prices

    if config.handle_missing == "forward_fill":
        handled = self._handle_missing_forward_fill(
            prices,
            config.max_forward_fill_days,
        )
    elif config.handle_missing == "drop":
        handled = self._handle_missing_drop(prices)
    elif config.handle_missing == "interpolate":
        handled = self._handle_missing_interpolate(
            prices,
            config.max_forward_fill_days,
        )
    else:  # pragma: no cover - validated earlier
        raise ConfigurationError(
            None,
            f"Unknown missing data handling method: {config.handle_missing}",
        )

    remaining = int(handled.isna().sum().sum())
    if remaining > 0:
        logger.debug(
            "%d NaNs remain after missing-data handling",
            remaining,
        )

    return handled.dropna(how="all")
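Each strategy corresponds to a standard pandas operation. A rough sketch of plausible equivalents — the private `_handle_missing_*` helpers are not shown above, so the exact limit semantics here are an assumption:

```python
import numpy as np
import pandas as pd

prices = pd.DataFrame(
    {"A": [100.0, np.nan, np.nan, 103.0], "B": [50.0, 51.0, np.nan, 52.0]},
    index=pd.date_range("2024-01-01", periods=4, freq="B"),
)

ffilled = prices.ffill(limit=2)       # forward-fill, capped at 2 rows per gap
dropped = prices.dropna()             # keep only fully observed dates
interp = prices.interpolate(limit=2)  # linear interpolation between points
```

Forward-filling repeats the last price through a gap (implying a zero return), whereas interpolation spreads the move across the gap; 'drop' trades coverage for only fully observed cross-sections.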

export_returns(returns, path) staticmethod

Persist prepared returns as a CSV file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `returns` | `DataFrame` | The returns data to save. | *required* |
| `path` | `Path` | The file path to save the CSV to. | *required* |

Source code in src/portfolio_management/analytics/returns/calculator.py
@staticmethod
def export_returns(returns: pd.DataFrame, path: Path) -> None:
    """Persist prepared returns as a CSV file.

    Args:
        returns (pd.DataFrame): The returns data to save.
        path (Path): The file path to save the CSV to.

    """
    returns.to_csv(path)
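Because the dates live in the DataFrame index, reading an exported file back requires restoring that index explicitly. A minimal round-trip sketch:

```python
import tempfile
from pathlib import Path

import pandas as pd

returns = pd.DataFrame(
    {"A": [0.01, -0.02]},
    index=pd.to_datetime(["2024-01-02", "2024-01-03"]),
)

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "returns.csv"
    returns.to_csv(path)  # what export_returns does under the hood
    # Restore the date index when reading back.
    restored = pd.read_csv(path, index_col=0, parse_dates=True)
```

Without `index_col=0` and `parse_dates=True`, the dates come back as an ordinary string column.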

ReturnConfig dataclass

Configuration for return preparation.

This dataclass holds all settings related to the calculation and cleaning of asset returns. It provides a centralized place to define the behavior of the ReturnCalculator.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `method` | `str` | The method for calculating returns. Options: 'simple', 'log', 'excess'. Defaults to 'simple'. |
| `frequency` | `str` | The target frequency for the returns. Options: 'daily', 'weekly', 'monthly'. Defaults to 'daily'. |
| `risk_free_rate` | `float` | The annualized risk-free rate to use when `method` is 'excess'. Defaults to 0.0. |
| `handle_missing` | `str` | The strategy for handling missing price data. Options: 'forward_fill', 'drop', 'interpolate'. Defaults to 'forward_fill'. |
| `max_forward_fill_days` | `int` | The maximum number of consecutive days to forward-fill missing data. Defaults to 5. |
| `min_periods` | `int` | The minimum number of price observations required for an asset to be included. Defaults to 2. |
| `align_method` | `str` | The method for aligning dates across assets. Options: 'outer' (union of dates), 'inner' (intersection of dates). Defaults to 'outer'. |
| `reindex_to_business_days` | `bool` | Whether to reindex the final returns to a standard business day calendar. Defaults to False. |
| `min_coverage` | `float` | The minimum proportion of non-NaN returns an asset must have to be kept after processing. Defaults to 0.8. |

Example

>>> # Create a config for weekly log returns, requiring at least 1 year of data
>>> config = ReturnConfig(
...     method="log",
...     frequency="weekly",
...     min_periods=52,
...     handle_missing="interpolate",
...     max_forward_fill_days=3,
...     min_coverage=0.95
... )
>>> config.validate()  # Raises ConfigurationError on invalid settings

Source code in src/portfolio_management/analytics/returns/config.py
@dataclass
class ReturnConfig:
    """Configuration for return preparation.

    This dataclass holds all settings related to the calculation and cleaning
    of asset returns. It provides a centralized place to define the behavior
    of the `ReturnCalculator`.

    Attributes:
        method (str): The method for calculating returns.
            Options: 'simple', 'log', 'excess'. Defaults to 'simple'.
        frequency (str): The target frequency for the returns.
            Options: 'daily', 'weekly', 'monthly'. Defaults to 'daily'.
        risk_free_rate (float): The annualized risk-free rate to use when
            `method` is 'excess'. Defaults to 0.0.
        handle_missing (str): The strategy for handling missing price data.
            Options: 'forward_fill', 'drop', 'interpolate'. Defaults to 'forward_fill'.
        max_forward_fill_days (int): The maximum number of consecutive days to
            forward-fill missing data. Defaults to 5.
        min_periods (int): The minimum number of price observations required
            for an asset to be included. Defaults to 2.
        align_method (str): The method for aligning dates across assets.
            Options: 'outer' (union of dates), 'inner' (intersection of dates).
            Defaults to 'outer'.
        reindex_to_business_days (bool): Whether to reindex the final returns
            to a standard business day calendar. Defaults to False.
        min_coverage (float): The minimum proportion of non-NaN returns an
            asset must have to be kept after processing. Defaults to 0.8.

    Example:
        >>> # Create a config for weekly log returns, requiring at least 1 year of data
        >>> config = ReturnConfig(
        ...     method="log",
        ...     frequency="weekly",
        ...     min_periods=52,
        ...     handle_missing="interpolate",
        ...     max_forward_fill_days=3,
        ...     min_coverage=0.95
        ... )
        >>> config.validate() # Raises ConfigurationError on invalid settings

    """

    method: str = "simple"  # one of: simple, log, excess
    frequency: str = "daily"  # one of: daily, weekly, monthly
    risk_free_rate: float = 0.0  # annual rate used for excess returns
    handle_missing: str = "forward_fill"  # forward_fill, drop, interpolate
    max_forward_fill_days: int = 5
    min_periods: int = 2  # minimum price observations required per asset
    align_method: str = "outer"  # outer keeps full union, inner = intersection
    reindex_to_business_days: bool = False
    min_coverage: float = 0.8  # minimum proportion of non-NaN returns per asset

    def validate(self) -> None:
        """Validate the configuration values and raise ``ConfigurationError`` on issues."""
        if self.method not in {"simple", "log", "excess"}:
            raise ConfigurationError(None, f"Invalid return method: {self.method}")
        if self.frequency not in {"daily", "weekly", "monthly"}:
            raise ConfigurationError(
                None,
                f"Invalid return frequency: {self.frequency}",
            )
        if self.handle_missing not in {"forward_fill", "drop", "interpolate"}:
            raise ConfigurationError(
                None,
                f"Invalid missing data handling method: {self.handle_missing}",
            )
        if self.align_method not in {"outer", "inner"}:
            raise ConfigurationError(None, f"Invalid align_method: {self.align_method}")
        if self.max_forward_fill_days < 0:
            raise ConfigurationError(None, "max_forward_fill_days must be >= 0")
        if self.min_periods <= 1:
            raise ConfigurationError(None, "min_periods must be greater than 1")
        if not 0 < self.min_coverage <= 1:
            raise ConfigurationError(None, "min_coverage must be within (0, 1]")

    @classmethod
    def default(cls) -> ReturnConfig:
        """Factory for the default (daily, simple) configuration."""
        return cls()

    @classmethod
    def monthly_simple(cls) -> ReturnConfig:
        """Factory that annualises to monthly simple returns."""
        return cls(method="simple", frequency="monthly")

    @classmethod
    def weekly_log(cls) -> ReturnConfig:
        """Factory that prepares weekly log returns."""
        return cls(method="log", frequency="weekly")

validate()

Validate the configuration values and raise ConfigurationError on issues.

Source code in src/portfolio_management/analytics/returns/config.py
def validate(self) -> None:
    """Validate the configuration values and raise ``ConfigurationError`` on issues."""
    if self.method not in {"simple", "log", "excess"}:
        raise ConfigurationError(None, f"Invalid return method: {self.method}")
    if self.frequency not in {"daily", "weekly", "monthly"}:
        raise ConfigurationError(
            None,
            f"Invalid return frequency: {self.frequency}",
        )
    if self.handle_missing not in {"forward_fill", "drop", "interpolate"}:
        raise ConfigurationError(
            None,
            f"Invalid missing data handling method: {self.handle_missing}",
        )
    if self.align_method not in {"outer", "inner"}:
        raise ConfigurationError(None, f"Invalid align_method: {self.align_method}")
    if self.max_forward_fill_days < 0:
        raise ConfigurationError(None, "max_forward_fill_days must be >= 0")
    if self.min_periods <= 1:
        raise ConfigurationError(None, "min_periods must be greater than 1")
    if not 0 < self.min_coverage <= 1:
        raise ConfigurationError(None, "min_coverage must be within (0, 1]")

default() classmethod

Factory for the default (daily, simple) configuration.

Source code in src/portfolio_management/analytics/returns/config.py
@classmethod
def default(cls) -> ReturnConfig:
    """Factory for the default (daily, simple) configuration."""
    return cls()

monthly_simple() classmethod

Factory that prepares monthly simple returns.

Source code in src/portfolio_management/analytics/returns/config.py
@classmethod
def monthly_simple(cls) -> ReturnConfig:
    """Factory that annualises to monthly simple returns."""
    return cls(method="simple", frequency="monthly")

weekly_log() classmethod

Factory that prepares weekly log returns.

Source code in src/portfolio_management/analytics/returns/config.py
@classmethod
def weekly_log(cls) -> ReturnConfig:
    """Factory that prepares weekly log returns."""
    return cls(method="log", frequency="weekly")

ReturnSummary dataclass

Summary statistics produced alongside prepared returns.

This dataclass acts as a container for key statistics derived from a returns matrix, typically generated by the ReturnCalculator. It bundles related data together for convenient access.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `mean_returns` | `Series` | A Series of annualized mean returns for each asset. |
| `volatility` | `Series` | A Series of annualized volatility (standard deviation) for each asset. |
| `correlation` | `DataFrame` | A DataFrame representing the correlation matrix between all assets' returns. |
| `coverage` | `Series` | A Series indicating the proportion of non-missing return data for each asset over the calculation period. |

Example

>>> import pandas as pd
>>> summary = ReturnSummary(
...     mean_returns=pd.Series({"A": 0.1, "B": 0.12}),
...     volatility=pd.Series({"A": 0.2, "B": 0.25}),
...     correlation=pd.DataFrame({"A": [1.0, 0.5], "B": [0.5, 1.0]}, index=["A", "B"]),
...     coverage=pd.Series({"A": 1.0, "B": 0.98})
... )
>>> print(f"Mean return for A: {summary.mean_returns['A']:.2f}")
Mean return for A: 0.10

Source code in src/portfolio_management/analytics/returns/models.py
@dataclass
class ReturnSummary:
    """Summary statistics produced alongside prepared returns.

    This dataclass acts as a container for key statistics derived from a
    returns matrix, typically generated by the `ReturnCalculator`. It bundles
    related data together for convenient access.

    Attributes:
        mean_returns (pd.Series): A Series of annualized mean returns for each asset.
        volatility (pd.Series): A Series of annualized volatility (standard
            deviation) for each asset.
        correlation (pd.DataFrame): A DataFrame representing the correlation
            matrix between all assets' returns.
        coverage (pd.Series): A Series indicating the proportion of non-missing
            return data for each asset over the calculation period.

    Example:
        >>> import pandas as pd
        >>>
        >>> summary = ReturnSummary(
        ...     mean_returns=pd.Series({"A": 0.1, "B": 0.12}),
        ...     volatility=pd.Series({"A": 0.2, "B": 0.25}),
        ...     correlation=pd.DataFrame({"A": [1.0, 0.5], "B": [0.5, 1.0]}, index=["A", "B"]),
        ...     coverage=pd.Series({"A": 1.0, "B": 0.98})
        ... )
        >>> print(f"Mean return for A: {summary.mean_returns['A']:.2f}")
        Mean return for A: 0.10

    """

    mean_returns: pd.Series
    volatility: pd.Series
    correlation: pd.DataFrame
    coverage: pd.Series
