
Assets API Reference

The assets package manages asset selection, classification, and universe definitions.

Overview

The assets package contains:

  • Selection - Asset filtering and selection logic
  • Classification - Asset type classification
  • Universes - Universe management and validation

Assets Package

portfolio_management.assets

Handles the definition, selection, and classification of financial assets.

This package forms the core of the asset management layer, responsible for transforming raw instrument data into well-defined, filtered, and classified investment universes. It acts as the bridge between raw data sources and the portfolio construction engine.

Pipeline Position

Data Layer -> Assets Layer -> Portfolio Layer

  1. Input: Raw asset metadata (e.g., from tradeable_matches.csv).
  2. Process:
    • selection: Filters assets based on data quality, history, and market criteria.
    • classification: Assigns assets to categories like asset class, geography, and sub-class.
    • universes: Combines selection and classification rules defined in YAML to build complete, investable universes.
  3. Output: A structured collection of assets, their classifications, and associated returns, ready for analysis and optimization.
Key Classes

  • AssetSelector: Filters assets using a multi-stage pipeline.
  • AssetClassifier: Classifies assets using a rule-based engine.
  • UniverseManager: The main entry point for loading and managing universes defined in a configuration file.
  • FilterCriteria: Defines the rules for asset selection.
  • UniverseDefinition: Defines the complete configuration for a universe.

Usage Example

This example demonstrates the end-to-end workflow of loading a universe.

In a real application, the config file and data would already exist.

>>> from pathlib import Path
>>> import pandas as pd
>>> from portfolio_management.assets import UniverseManager

Assume the following setup:

1. A universe configuration file 'config/universes.yaml' with a
   'global_equity' universe defined.
2. A DataFrame 'matches_df' containing metadata for all tradeable assets.
3. A directory 'prices/' containing historical price data for the assets.

Conceptual initialization (replace with actual paths and data):

>>> manager = UniverseManager(
...     config_path=Path("config/universes.yaml"),
...     matches_df=matches_df,
...     prices_dir=Path("prices/")
... )

Load the 'global_equity' universe:

>>> universe_data = manager.load_universe("global_equity")

The resulting 'universe_data' is a dictionary containing:

- universe_data['assets']: DataFrame of selected asset metadata.
- universe_data['classifications']: DataFrame of asset classifications.
- universe_data['returns']: DataFrame of historical asset returns.
- universe_data['metadata']: Series containing the universe definition.

>>> if universe_data:
...     print(f"Loaded {len(universe_data['assets'])} assets for 'global_equity'.")
...     print("Asset Classifications:")
...     print(universe_data['classifications'][['symbol', 'asset_class']].head())

AssetClass

Bases: str, Enum

Broad asset classes.

Source code in src/portfolio_management/assets/classification/classification.py
class AssetClass(str, Enum):
    """Broad asset classes."""

    EQUITY = "equity"
    FIXED_INCOME = "fixed_income"
    ALTERNATIVE = "alternative"
    CASH = "cash"
    COMMODITY = "commodity"
    REAL_ESTATE = "real_estate"
    UNKNOWN = "unknown"
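Because AssetClass inherits from str, its members compare equal to plain strings, which is why classification results can be stored as bare strings in DataFrame columns and still round-trip through the enum. A minimal standalone sketch (using only a subset of the members above):

```python
from enum import Enum


class AssetClass(str, Enum):
    """Subset of the enum above, for illustration."""

    EQUITY = "equity"
    UNKNOWN = "unknown"


# str-Enum members compare equal to their plain-string values...
assert AssetClass.EQUITY == "equity"
# ...and a string can be converted back to the member it came from.
assert AssetClass("equity") is AssetClass.EQUITY
assert AssetClass.EQUITY.value == "equity"
```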

AssetClassification dataclass

Represents the classification of a single asset.

This data structure holds the complete classification profile for an asset after it has been processed by the AssetClassifier.

Attributes:

  • symbol (str): The unique ticker symbol for the asset.
  • isin (str): The International Securities Identification Number.
  • name (str): The human-readable name of the asset.
  • asset_class (str): The broad asset class (e.g., 'equity', 'fixed_income').
  • sub_class (str): The more granular sub-class (e.g., 'large_cap', 'government').
  • geography (Geography): The geographical region of the asset.
  • sector (str | None): The industry sector (optional, often populated by external data).
  • confidence (float): A score from 0.0 to 1.0 indicating the classifier's confidence in the result. 1.0 indicates a manual override.

Source code in src/portfolio_management/assets/classification/classification.py
@dataclass
class AssetClassification:
    """Represents the classification of a single asset.

    This data structure holds the complete classification profile for an asset
    after it has been processed by the `AssetClassifier`.

    Attributes:
        symbol: The unique ticker symbol for the asset.
        isin: The International Securities Identification Number.
        name: The human-readable name of the asset.
        asset_class: The broad asset class (e.g., 'equity', 'fixed_income').
        sub_class: The more granular sub-class (e.g., 'large_cap', 'government').
        geography: The geographical region of the asset.
        sector: The industry sector (optional, often populated by external data).
        confidence: A score from 0.0 to 1.0 indicating the classifier's
            confidence in the result. 1.0 indicates a manual override.

    """

    symbol: str
    isin: str
    name: str
    asset_class: str
    sub_class: str
    geography: Geography
    sector: str | None = None
    confidence: float = 1.0

AssetClassifier

Applies a rule-based engine to classify assets.

This classifier determines an asset's class, sub-class, and geography by applying a series of rules based on keywords found in the asset's metadata (e.g., name, category). It is designed to provide a baseline classification that can be augmented with manual overrides for improved accuracy.

The classification logic is primarily handled by the _classify_dataframe method, which uses vectorized pandas operations for efficiency.

Attributes:

  • overrides (ClassificationOverrides): A collection of manual overrides that will take precedence over the rule-based engine.

Methods:

  • classify_universe: Classifies a list of assets and returns a DataFrame.
  • classify_asset: Classifies a single asset.

Example

>>> from portfolio_management.assets.selection import SelectedAsset
>>>
>>> assets = [
...     SelectedAsset(
...         symbol="AAPL.US", isin="US0378331005", name="Apple Inc. Equity",
...         market="US", region="North America", currency="USD", category="stock",
...         price_start="2010-01-01", price_end="2023-01-01", price_rows=3276,
...         data_status="ok", data_flags="", stooq_path="", resolved_currency="USD",
...         currency_status="matched"
...     )
... ]
>>> classifier = AssetClassifier()
>>> results = classifier.classify_universe(assets)
>>> result_series = results.iloc[0]
>>> result_series['symbol']
'AAPL.US'
>>> result_series['asset_class']
'equity'
>>> result_series['geography']
'north_america'

Source code in src/portfolio_management/assets/classification/classification.py
class AssetClassifier:
    """Applies a rule-based engine to classify assets.

    This classifier determines an asset's class, sub-class, and geography by
    applying a series of rules based on keywords found in the asset's metadata
    (e.g., name, category). It is designed to provide a baseline classification
    that can be augmented with manual overrides for improved accuracy.

    The classification logic is primarily handled by the `_classify_dataframe`
    method, which uses vectorized pandas operations for efficiency.

    Attributes:
        overrides (ClassificationOverrides): A collection of manual overrides
            that will take precedence over the rule-based engine.

    Methods:
        - `classify_universe`: Classifies a list of assets and returns a DataFrame.
        - `classify_asset`: Classifies a single asset.

    Example:
        >>> from portfolio_management.assets.selection import SelectedAsset
        >>>
        >>> assets = [
        ...     SelectedAsset(
        ...         symbol="AAPL.US", isin="US0378331005", name="Apple Inc. Equity",
        ...         market="US", region="North America", currency="USD", category="stock",
        ...         price_start="2010-01-01", price_end="2023-01-01", price_rows=3276,
        ...         data_status="ok", data_flags="", stooq_path="", resolved_currency="USD",
        ...         currency_status="matched"
        ...     )
        ... ]
        >>> classifier = AssetClassifier()
        >>> results = classifier.classify_universe(assets)
        >>> result_series = results.iloc[0]
        >>> result_series['symbol']
        'AAPL.US'
        >>> result_series['asset_class']
        'equity'
        >>> result_series['geography']
        'north_america'

    """

    EQUITY_KEYWORDS: ClassVar[set[str]] = {"stock", "equity", "shares", "fund", "etf"}
    BOND_KEYWORDS: ClassVar[set[str]] = {"bond", "gilt", "treasury", "credit"}
    COMMODITY_KEYWORDS: ClassVar[set[str]] = {"gold", "silver", "oil", "commodity"}
    REAL_ESTATE_KEYWORDS: ClassVar[set[str]] = {"reit", "real estate"}
    LOW_CONFIDENCE_THRESHOLD: ClassVar[float] = 0.6

    GEOGRAPHY_PATTERNS: ClassVar[dict[Geography, list[str]]] = {
        Geography.NORTH_AMERICA: ["us", "usa", "america", "usd", "north america"],
        Geography.UNITED_KINGDOM: ["uk", "gbr", "gbp", "british", "united kingdom"],
        Geography.EUROPE: ["de", "fr", "eur", "europe"],
        Geography.ASIA_PACIFIC: ["jp", "jpy", "asia"],
    }

    def __init__(self, overrides: ClassificationOverrides | None = None):
        """Initialise the classifier with optional manual overrides."""
        self.overrides = overrides or ClassificationOverrides()

    def classify_asset(self, asset: SelectedAsset) -> AssetClassification:
        """Classifies a single asset using keyword-based rules.

        This method first checks for a manual override for the asset. If none
        is found, it applies rules based on the asset's name and category to
        determine its classification. This method is suitable for classifying
        individual assets but is less efficient than `classify_universe` for
        large batches.

        Args:
            asset: The `SelectedAsset` instance to classify.

        Returns:
            An `AssetClassification` instance containing the classification results.

        """
        override = self.overrides.overrides.get(
            asset.isin,
        ) or self.overrides.overrides.get(asset.symbol)
        if override:
            return AssetClassification(
                symbol=asset.symbol,
                isin=asset.isin,
                name=asset.name,
                asset_class=str(override.get("asset_class", AssetClass.UNKNOWN.value)),
                sub_class=str(override.get("sub_class", SubClass.UNKNOWN.value)),
                geography=Geography(override.get("geography", Geography.UNKNOWN)),
                sector=override.get("sector"),
                confidence=1.0,
            )

        asset_class_from_name = self._classify_by_name(asset)
        asset_class_from_cat = self._classify_by_category(asset)

        if {
            asset_class_from_name,
            asset_class_from_cat,
        } == {AssetClass.UNKNOWN}:
            asset_class = AssetClass.UNKNOWN
            confidence = 0.5
        elif AssetClass.UNKNOWN not in {
            asset_class_from_name,
            asset_class_from_cat,
        }:
            asset_class = asset_class_from_name
            confidence = 0.9
        elif asset_class_from_name != AssetClass.UNKNOWN:
            asset_class = asset_class_from_name
            confidence = 0.7
        else:
            asset_class = asset_class_from_cat
            confidence = 0.7

        geography = self._classify_geography(asset)
        sub_class = self._classify_sub_class(asset, asset_class)

        return AssetClassification(
            symbol=asset.symbol,
            isin=asset.isin,
            name=asset.name,
            asset_class=(
                asset_class.value
                if isinstance(asset_class, AssetClass)
                else str(asset_class)
            ),
            sub_class=(
                sub_class.value if isinstance(sub_class, SubClass) else str(sub_class)
            ),
            geography=geography,
            confidence=confidence,
        )

    def classify_universe(self, assets: list[SelectedAsset]) -> pd.DataFrame:
        """Classifies a list of assets and returns a DataFrame of results.

        This is the primary method for bulk classification. It converts the list
        of assets into a pandas DataFrame and uses efficient, vectorized
        operations to apply the classification rules.

        Args:
            assets: A list of `SelectedAsset` objects to be classified.

        Returns:
            A pandas DataFrame where each row represents an asset and columns
            contain the classification results (e.g., 'asset_class', 'geography').

        Raises:
            DataValidationError: If the input is None or not a list.
            ClassificationError: If assets cannot be serialized for processing.

        """
        if assets is None:
            raise DataValidationError(
                "Assets to classify cannot be None.",
            )
        if not isinstance(assets, list):
            raise DataValidationError(
                "Assets must be provided as a list.",
            )
        if not assets:
            logging.getLogger(__name__).info("No assets supplied for classification.")
            return pd.DataFrame(
                columns=[
                    "symbol",
                    "isin",
                    "name",
                    "asset_class",
                    "sub_class",
                    "geography",
                    "sector",
                    "confidence",
                ],
            )

        try:
            asset_dicts = [asdict(asset) for asset in assets]
        except TypeError as exc:  # pragma: no cover - defensive
            raise ClassificationError(
                "Failed to serialise assets for classification."
            ) from exc

        assets_df = pd.DataFrame(asset_dicts)
        df = self._classify_dataframe(assets_df)

        logger = logging.getLogger(__name__)
        logger.info("Classified %d assets.", len(df))
        logger.info("Asset class breakdown:\n%s", df["asset_class"].value_counts())
        logger.info("Geography breakdown:\n%s", df["geography"].value_counts())
        low_confidence = df[df["confidence"] < self.LOW_CONFIDENCE_THRESHOLD]
        if not low_confidence.empty:
            logger.warning(
                "%d assets with low classification confidence.",
                len(low_confidence),
            )
            logger.warning(
                "\n%s",
                low_confidence[["symbol", "name", "asset_class", "confidence"]],
            )

        return df

    def _contains_keywords(self, series: pd.Series, keywords: set[str]) -> pd.Series:
        if series.empty or not keywords:
            return pd.Series(False, index=series.index)
        pattern = "|".join(re.escape(keyword) for keyword in keywords if keyword)
        if not pattern:
            return pd.Series(False, index=series.index)
        return series.str.contains(pattern, na=False)

    def _classify_dataframe(self, assets_df: pd.DataFrame) -> pd.DataFrame:
        def column_or_empty(column: str) -> pd.Series:
            if column in assets_df:
                return assets_df[column]
            return pd.Series([""] * len(assets_df), index=assets_df.index, dtype=object)

        name_lower = column_or_empty("name").fillna("").astype(str).str.lower()
        category_lower = column_or_empty("category").fillna("").astype(str).str.lower()
        region_lower = column_or_empty("region").fillna("").astype(str).str.lower()
        currency_lower = column_or_empty("currency").fillna("").astype(str).str.lower()

        result_df = pd.DataFrame(
            {
                "symbol": column_or_empty("symbol"),
                "isin": column_or_empty("isin"),
                "name": column_or_empty("name"),
            },
            index=assets_df.index,
        )

        unknown_class = AssetClass.UNKNOWN.value
        asset_class_name = pd.Series(unknown_class, index=result_df.index, dtype=object)

        equity_mask = self._contains_keywords(name_lower, self.EQUITY_KEYWORDS)
        asset_class_name[equity_mask] = AssetClass.EQUITY.value

        remaining = asset_class_name == unknown_class
        bond_mask = remaining & self._contains_keywords(name_lower, self.BOND_KEYWORDS)
        asset_class_name[bond_mask] = AssetClass.FIXED_INCOME.value

        remaining = asset_class_name == unknown_class
        commodity_mask = remaining & self._contains_keywords(
            name_lower, self.COMMODITY_KEYWORDS
        )
        asset_class_name[commodity_mask] = AssetClass.COMMODITY.value

        remaining = asset_class_name == unknown_class
        real_estate_mask = remaining & self._contains_keywords(
            name_lower, self.REAL_ESTATE_KEYWORDS
        )
        asset_class_name[real_estate_mask] = AssetClass.REAL_ESTATE.value

        class_from_category = pd.Series(
            unknown_class, index=result_df.index, dtype=object
        )
        stock_mask = category_lower.str.contains("stock", na=False)
        class_from_category[stock_mask] = AssetClass.EQUITY.value

        remaining_cat = class_from_category == unknown_class
        etf_mask = remaining_cat & category_lower.str.contains("etf", na=False)
        class_from_category[etf_mask] = AssetClass.EQUITY.value

        remaining_cat = class_from_category == unknown_class
        bond_cat_mask = remaining_cat & category_lower.str.contains("bond", na=False)
        class_from_category[bond_cat_mask] = AssetClass.FIXED_INCOME.value

        unknown_name_mask = asset_class_name == unknown_class
        unknown_cat_mask = class_from_category == unknown_class

        asset_class = asset_class_name.copy()
        confidence = pd.Series(0.7, index=result_df.index, dtype=float)

        both_unknown = unknown_name_mask & unknown_cat_mask
        asset_class[both_unknown] = unknown_class
        confidence[both_unknown] = 0.5

        both_known = (~unknown_name_mask) & (~unknown_cat_mask)
        confidence[both_known] = 0.9

        name_only_known = (~unknown_name_mask) & unknown_cat_mask
        asset_class[name_only_known] = asset_class_name[name_only_known]
        confidence[name_only_known] = 0.7

        cat_only_known = unknown_name_mask & (~unknown_cat_mask)
        asset_class[cat_only_known] = class_from_category[cat_only_known]
        confidence[cat_only_known] = 0.7

        geography = pd.Series(Geography.UNKNOWN, index=result_df.index, dtype=object)
        assigned_geo = geography != Geography.UNKNOWN

        for geo_enum, patterns in self.GEOGRAPHY_PATTERNS.items():
            patterns_lower = [pattern.lower() for pattern in patterns]
            mask_region = region_lower.isin(patterns_lower)
            mask_currency = currency_lower.isin(patterns_lower)
            pattern_regex = "|".join(
                re.escape(pattern) for pattern in patterns_lower if pattern
            )
            mask_name = (
                name_lower.str.contains(pattern_regex, na=False)
                if pattern_regex
                else pd.Series(False, index=result_df.index)
            )
            combined = (~assigned_geo) & (mask_region | mask_currency | mask_name)
            geography[combined] = geo_enum
            assigned_geo = geography != Geography.UNKNOWN

        sub_class = pd.Series(
            SubClass.UNKNOWN.value, index=result_df.index, dtype=object
        )
        equity_asset_mask = asset_class == AssetClass.EQUITY.value
        sub_class[
            equity_asset_mask & name_lower.str.contains("large cap", na=False)
        ] = SubClass.LARGE_CAP.value
        sub_class[
            equity_asset_mask & name_lower.str.contains("small cap", na=False)
        ] = SubClass.SMALL_CAP.value
        sub_class[equity_asset_mask & name_lower.str.contains("value", na=False)] = (
            SubClass.VALUE.value
        )
        sub_class[equity_asset_mask & name_lower.str.contains("growth", na=False)] = (
            SubClass.GROWTH.value
        )
        sub_class[equity_asset_mask & name_lower.str.contains("dividend", na=False)] = (
            SubClass.DIVIDEND.value
        )

        fixed_income_mask = asset_class == AssetClass.FIXED_INCOME.value
        sub_class[
            fixed_income_mask
            & name_lower.str.contains("government|gilt|treasury", na=False)
        ] = SubClass.GOVERNMENT.value
        sub_class[
            fixed_income_mask & name_lower.str.contains("corporate", na=False)
        ] = SubClass.CORPORATE.value
        sub_class[
            fixed_income_mask & name_lower.str.contains("high yield", na=False)
        ] = SubClass.HIGH_YIELD.value

        commodity_asset_mask = asset_class == AssetClass.COMMODITY.value
        sub_class[commodity_asset_mask & name_lower.str.contains("gold", na=False)] = (
            SubClass.GOLD.value
        )

        real_estate_asset_mask = asset_class == AssetClass.REAL_ESTATE.value
        sub_class[
            real_estate_asset_mask & name_lower.str.contains("reit", na=False)
        ] = SubClass.REIT.value

        result_df["asset_class"] = asset_class.astype(str)
        result_df["sub_class"] = sub_class.astype(str)
        result_df["geography"] = geography.apply(lambda x: x.value)
        result_df["sector"] = None
        result_df["confidence"] = confidence

        if self.overrides.overrides:
            isin_series = assets_df.get("isin", pd.Series([], dtype=str)).fillna("")
            symbol_series = assets_df.get("symbol", pd.Series([], dtype=str)).fillna("")
            override_keys = isin_series.where(isin_series != "", symbol_series)
            for idx, key in enumerate(override_keys):
                override = self.overrides.overrides.get(key)
                if not override:
                    continue
                asset_class_override = override.get(
                    "asset_class", AssetClass.UNKNOWN.value
                )
                if isinstance(asset_class_override, AssetClass):
                    asset_class_override = asset_class_override.value
                result_df.at[idx, "asset_class"] = str(asset_class_override)

                sub_class_override = override.get("sub_class", SubClass.UNKNOWN.value)
                if isinstance(sub_class_override, SubClass):
                    sub_class_override = sub_class_override.value
                result_df.at[idx, "sub_class"] = str(sub_class_override)

                geography_override = override.get("geography", Geography.UNKNOWN)
                if not isinstance(geography_override, Geography):
                    try:
                        geography_override = Geography(geography_override)
                    except ValueError:
                        geography_override = Geography.UNKNOWN
                result_df.at[idx, "geography"] = geography_override

                result_df.at[idx, "sector"] = override.get("sector")
                confidence_override = override.get("confidence", 1.0)
                try:
                    result_df.at[idx, "confidence"] = float(confidence_override)
                except (TypeError, ValueError):
                    result_df.at[idx, "confidence"] = 1.0

        return result_df

    @staticmethod
    def _normalize_text(value: object) -> str | None:
        """Normalize potentially missing values to lower-case strings."""
        if value is None:
            return None
        if isinstance(value, str):
            stripped = value.strip()
            return stripped.lower() if stripped else None
        if value is pd.NA or value is pd.NaT:
            return None
        if isinstance(value, float):
            if math.isnan(value):
                return None
            normalized_float = str(value).strip().lower()
            return normalized_float or None
        normalized = str(value).strip().lower()
        return normalized or None

    def _classify_by_name(self, asset: SelectedAsset) -> AssetClass:
        name = self._normalize_text(asset.name)
        if name is None:
            return AssetClass.UNKNOWN
        if any(keyword in name for keyword in self.EQUITY_KEYWORDS):
            return AssetClass.EQUITY
        if any(keyword in name for keyword in self.BOND_KEYWORDS):
            return AssetClass.FIXED_INCOME
        if any(keyword in name for keyword in self.COMMODITY_KEYWORDS):
            return AssetClass.COMMODITY
        if any(keyword in name for keyword in self.REAL_ESTATE_KEYWORDS):
            return AssetClass.REAL_ESTATE
        return AssetClass.UNKNOWN

    def _classify_by_category(self, asset: SelectedAsset) -> AssetClass:
        category = self._normalize_text(asset.category)
        if category is None:
            return AssetClass.UNKNOWN
        if "stock" in category:
            return AssetClass.EQUITY
        if "etf" in category:
            return AssetClass.EQUITY
        if "bond" in category:
            return AssetClass.FIXED_INCOME
        return AssetClass.UNKNOWN

    def _classify_geography(self, asset: SelectedAsset) -> Geography:
        region = self._normalize_text(asset.region)
        currency = self._normalize_text(asset.currency)
        name = self._normalize_text(asset.name)
        for geo, patterns in self.GEOGRAPHY_PATTERNS.items():
            if region and region in patterns:
                return geo
            if currency and currency in patterns:
                return geo
            if name and any(pattern.lower() in name for pattern in patterns):
                return geo
        return Geography.UNKNOWN

    def _classify_sub_class(  # noqa: C901, PLR0911, PLR0912
        self,
        asset: SelectedAsset,
        asset_class: AssetClass,
    ) -> str:
        name = self._normalize_text(asset.name)
        if name is None:
            return SubClass.UNKNOWN
        if asset_class == AssetClass.EQUITY:
            if "large cap" in name:
                return SubClass.LARGE_CAP
            if "small cap" in name:
                return SubClass.SMALL_CAP
            if "value" in name:
                return SubClass.VALUE
            if "growth" in name:
                return SubClass.GROWTH
            if "dividend" in name:
                return SubClass.DIVIDEND
        if asset_class == AssetClass.FIXED_INCOME:
            if "government" in name or "gilt" in name or "treasury" in name:
                return SubClass.GOVERNMENT
            if "corporate" in name:
                return SubClass.CORPORATE
            if "high yield" in name:
                return SubClass.HIGH_YIELD
        if asset_class == AssetClass.COMMODITY and "gold" in name:
            return SubClass.GOLD
        if asset_class == AssetClass.REAL_ESTATE:
            if "reit" in name:
                return SubClass.REIT
        return SubClass.UNKNOWN

    @staticmethod
    def export_for_review(
        classifications: Sequence[
            AssetClassification | Mapping[str, Any] | _SupportsDict
        ],
        path: Path,
    ) -> None:
        records: list[dict[str, Any]] = []
        for classification in classifications:
            if is_dataclass(classification) and not isinstance(classification, type):
                records.append(asdict(classification))
            elif isinstance(classification, Mapping):
                records.append(dict(classification))
            elif isinstance(classification, _SupportsDict):
                records.append(dict(vars(classification)))
            else:
                raise TypeError("Unsupported classification record type for export.")

        df = pd.DataFrame(records)
        df.to_csv(path, index=False)

classify_asset(asset)

Classifies a single asset using keyword-based rules.

This method first checks for a manual override for the asset. If none is found, it applies rules based on the asset's name and category to determine its classification. This method is suitable for classifying individual assets but is less efficient than classify_universe for large batches.

Parameters:

  • asset (SelectedAsset, required): The SelectedAsset instance to classify.

Returns:

  • AssetClassification: An AssetClassification instance containing the classification results.
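When no override matches, the method combines the name-based and category-based signals to set both the asset class and the confidence score (0.5, 0.7, or 0.9). A standalone sketch of that branching, mirroring the if/elif chain in the source:

```python
from enum import Enum


class AssetClass(str, Enum):
    """Subset of the package's enum, for illustration."""

    EQUITY = "equity"
    FIXED_INCOME = "fixed_income"
    UNKNOWN = "unknown"


def combine(from_name: AssetClass, from_cat: AssetClass) -> tuple[AssetClass, float]:
    """Mirror of the confidence rules in classify_asset."""
    if {from_name, from_cat} == {AssetClass.UNKNOWN}:
        return AssetClass.UNKNOWN, 0.5  # neither signal matched
    if AssetClass.UNKNOWN not in {from_name, from_cat}:
        return from_name, 0.9  # both matched; the name-based class wins
    if from_name is not AssetClass.UNKNOWN:
        return from_name, 0.7  # only the name matched
    return from_cat, 0.7  # only the category matched


# Agreement of both signals yields the highest confidence.
assert combine(AssetClass.EQUITY, AssetClass.EQUITY) == (AssetClass.EQUITY, 0.9)
# A single matching signal is trusted, but less strongly.
assert combine(AssetClass.UNKNOWN, AssetClass.FIXED_INCOME) == (AssetClass.FIXED_INCOME, 0.7)
# No match at all falls back to UNKNOWN at 0.5.
assert combine(AssetClass.UNKNOWN, AssetClass.UNKNOWN)[1] == 0.5
```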

Source code in src/portfolio_management/assets/classification/classification.py
def classify_asset(self, asset: SelectedAsset) -> AssetClassification:
    """Classifies a single asset using keyword-based rules.

    This method first checks for a manual override for the asset. If none
    is found, it applies rules based on the asset's name and category to
    determine its classification. This method is suitable for classifying
    individual assets but is less efficient than `classify_universe` for
    large batches.

    Args:
        asset: The `SelectedAsset` instance to classify.

    Returns:
        An `AssetClassification` instance containing the classification results.

    """
    override = self.overrides.overrides.get(
        asset.isin,
    ) or self.overrides.overrides.get(asset.symbol)
    if override:
        return AssetClassification(
            symbol=asset.symbol,
            isin=asset.isin,
            name=asset.name,
            asset_class=str(override.get("asset_class", AssetClass.UNKNOWN.value)),
            sub_class=str(override.get("sub_class", SubClass.UNKNOWN.value)),
            geography=Geography(override.get("geography", Geography.UNKNOWN)),
            sector=override.get("sector"),
            confidence=1.0,
        )

    asset_class_from_name = self._classify_by_name(asset)
    asset_class_from_cat = self._classify_by_category(asset)

    if {
        asset_class_from_name,
        asset_class_from_cat,
    } == {AssetClass.UNKNOWN}:
        asset_class = AssetClass.UNKNOWN
        confidence = 0.5
    elif AssetClass.UNKNOWN not in {
        asset_class_from_name,
        asset_class_from_cat,
    }:
        asset_class = asset_class_from_name
        confidence = 0.9
    elif asset_class_from_name != AssetClass.UNKNOWN:
        asset_class = asset_class_from_name
        confidence = 0.7
    else:
        asset_class = asset_class_from_cat
        confidence = 0.7

    geography = self._classify_geography(asset)
    sub_class = self._classify_sub_class(asset, asset_class)

    return AssetClassification(
        symbol=asset.symbol,
        isin=asset.isin,
        name=asset.name,
        asset_class=(
            asset_class.value
            if isinstance(asset_class, AssetClass)
            else str(asset_class)
        ),
        sub_class=(
            sub_class.value if isinstance(sub_class, SubClass) else str(sub_class)
        ),
        geography=geography,
        confidence=confidence,
    )
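The confidence scores assigned in the source above follow a simple precedence scheme: agreement between the name-based and category-based signals yields high confidence, a single signal yields moderate confidence, and no signal yields low confidence. A self-contained sketch of just that logic (using a stub `AssetClass` enum, not the package's own):

```python
from enum import Enum


class AssetClass(str, Enum):
    """Stub enum mirroring the package's AssetClass pattern."""

    EQUITY = "equity"
    BOND = "bond"
    UNKNOWN = "unknown"


def combine_signals(
    from_name: AssetClass, from_cat: AssetClass
) -> tuple[AssetClass, float]:
    """Mirror the confidence rules used by classify_asset."""
    # Neither rule matched: unknown class, low confidence.
    if {from_name, from_cat} == {AssetClass.UNKNOWN}:
        return AssetClass.UNKNOWN, 0.5
    # Both rules matched: the name-based result wins, high confidence.
    if AssetClass.UNKNOWN not in {from_name, from_cat}:
        return from_name, 0.9
    # Exactly one rule matched: use it, moderate confidence.
    if from_name != AssetClass.UNKNOWN:
        return from_name, 0.7
    return from_cat, 0.7
```

Note that when both signals are known but disagree, the name-based classification still takes precedence.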

classify_universe(assets)

Classifies a list of assets and returns a DataFrame of results.

This is the primary method for bulk classification. It converts the list of assets into a pandas DataFrame and uses efficient, vectorized operations to apply the classification rules.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `assets` | `list[SelectedAsset]` | A list of `SelectedAsset` objects to be classified. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | A pandas DataFrame where each row represents an asset and columns contain the classification results (e.g., 'asset_class', 'geography'). |

Raises:

| Type | Description |
| --- | --- |
| `DataValidationError` | If the input is None or not a list. |
| `ClassificationError` | If assets cannot be serialized for processing. |

Source code in src/portfolio_management/assets/classification/classification.py
def classify_universe(self, assets: list[SelectedAsset]) -> pd.DataFrame:
    """Classifies a list of assets and returns a DataFrame of results.

    This is the primary method for bulk classification. It converts the list
    of assets into a pandas DataFrame and uses efficient, vectorized
    operations to apply the classification rules.

    Args:
        assets: A list of `SelectedAsset` objects to be classified.

    Returns:
        A pandas DataFrame where each row represents an asset and columns
        contain the classification results (e.g., 'asset_class', 'geography').

    Raises:
        DataValidationError: If the input is None or not a list.
        ClassificationError: If assets cannot be serialized for processing.

    """
    if assets is None:
        raise DataValidationError(
            "Assets to classify cannot be None.",
        )
    if not isinstance(assets, list):
        raise DataValidationError(
            "Assets must be provided as a list.",
        )
    if not assets:
        logging.getLogger(__name__).info("No assets supplied for classification.")
        return pd.DataFrame(
            columns=[
                "symbol",
                "isin",
                "name",
                "asset_class",
                "sub_class",
                "geography",
                "sector",
                "confidence",
            ],
        )

    try:
        asset_dicts = [asdict(asset) for asset in assets]
    except TypeError as exc:  # pragma: no cover - defensive
        raise ClassificationError(
            "Failed to serialise assets for classification."
        ) from exc

    assets_df = pd.DataFrame(asset_dicts)
    df = self._classify_dataframe(assets_df)

    logger = logging.getLogger(__name__)
    logger.info("Classified %d assets.", len(df))
    logger.info("Asset class breakdown:\n%s", df["asset_class"].value_counts())
    logger.info("Geography breakdown:\n%s", df["geography"].value_counts())
    low_confidence = df[df["confidence"] < self.LOW_CONFIDENCE_THRESHOLD]
    if not low_confidence.empty:
        logger.warning(
            "%d assets with low classification confidence.",
            len(low_confidence),
        )
        logger.warning(
            "\n%s",
            low_confidence[["symbol", "name", "asset_class", "confidence"]],
        )

    return df
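The conversion step at the heart of `classify_universe` (turning a list of dataclass instances into a DataFrame via `dataclasses.asdict`) can be seen in isolation. This sketch uses a hypothetical `StubAsset` stand-in, since the real `SelectedAsset` has more fields:

```python
from dataclasses import dataclass, asdict

import pandas as pd


@dataclass
class StubAsset:
    """Minimal stand-in for SelectedAsset (hypothetical fields)."""

    symbol: str
    isin: str
    name: str


assets = [
    StubAsset("AAPL.US", "US0378331005", "Apple Inc"),
    StubAsset("MSFT.US", "US5949181045", "Microsoft Corp"),
]

# asdict() flattens each dataclass into a plain dict, so the list of
# dicts maps directly onto DataFrame rows with one column per field.
assets_df = pd.DataFrame([asdict(a) for a in assets])
```

Once in DataFrame form, the classification rules can be applied with vectorized column operations instead of per-asset Python loops, which is why this path is preferred for large batches.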

ClassificationOverrides dataclass

Manages manual classification overrides loaded from a CSV file.

This class provides a mechanism to manually set the classification for specific assets, bypassing the rule-based engine. Overrides are indexed by ISIN or symbol, with ISIN taking precedence.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `overrides` | `dict[str, dict[str, str]]` | A dictionary where keys are asset identifiers (ISIN or symbol) and values are dictionaries of classification fields to override. |

Configuration (CSV Format):

The CSV file should contain columns that match the `AssetClassification` attributes. The 'symbol' or 'isin' column is required for matching.

Example `overrides.csv`:
```csv
symbol,isin,asset_class,sub_class,geography
AMZN.US,US0231351067,equity,large_cap,north_america
BRK.A,US0846701086,equity,value,north_america
```
Example:

```python
>>> csv_lines = [
...     "symbol,isin,asset_class,sub_class,geography",
...     "AMZN.US,US0231351067,equity,large_cap,north_america",
...     "BRK.A,US0846701086,equity,value,north_america"
... ]
>>> csv_content = "\n".join(csv_lines)
>>>
>>> # In a real scenario, the overrides file would already exist;
>>> # here we write a small temporary CSV for the example.
>>> with open("overrides.csv", "w") as f:
...     _ = f.write(csv_content)
>>>
>>> overrides = ClassificationOverrides.from_csv("overrides.csv")
>>> amzn_override = overrides.overrides.get("US0231351067")
>>> print(amzn_override['asset_class'])
equity
>>> import os
>>> os.remove("overrides.csv")
```

Source code in src/portfolio_management/assets/classification/classification.py
@dataclass
class ClassificationOverrides:
    r"""Manages manual classification overrides loaded from a CSV file.

    This class provides a mechanism to manually set the classification for
    specific assets, bypassing the rule-based engine. Overrides are indexed by
    ISIN or symbol, with ISIN taking precedence.

    Attributes:
        overrides: A dictionary where keys are asset identifiers (ISIN or symbol)
            and values are dictionaries of classification fields to override.

    Configuration (CSV Format):
        The CSV file should contain columns that match the `AssetClassification`
        attributes. The 'symbol' or 'isin' column is required for matching.

        Example `overrides.csv`:
        ```csv
        symbol,isin,asset_class,sub_class,geography
        AMZN.US,US0231351067,equity,large_cap,north_america
        BRK.A,US0846701086,equity,value,north_america
        ```

    Example:
        >>> from pathlib import Path
        >>> import io
        >>>
        >>> csv_lines = [
        ...     "symbol,isin,asset_class,sub_class,geography",
        ...     "AMZN.US,US0231351067,equity,large_cap,north_america",
        ...     "BRK.A,US0846701086,equity,value,north_america"
        ... ]
        >>> csv_content = "\\n".join(csv_lines)
        >>>
        >>> # In a real scenario, the overrides file would already exist;
        >>> # here we write a small temporary CSV for the example.
        >>> with open("overrides.csv", "w") as f:
        ...     _ = f.write(csv_content)
        >>>
        >>> overrides = ClassificationOverrides.from_csv("overrides.csv")
        >>> amzn_override = overrides.overrides.get("US0231351067")
        >>> print(amzn_override['asset_class'])
        equity
        >>> import os
        >>> os.remove("overrides.csv")

    """

    overrides: dict[str, dict[str, str]] = field(default_factory=dict)

    @classmethod
    def from_csv(cls, path: pathlib.Path | str) -> ClassificationOverrides:
        """Load classification overrides from a CSV file.

        The CSV file must contain a 'symbol' or 'isin' column to identify the
        asset. Other columns should correspond to `AssetClassification` fields
        (e.g., 'asset_class', 'sub_class', 'geography').

        Args:
            path: The file path to the CSV containing the overrides.

        Returns:
            A `ClassificationOverrides` instance populated with the data from
            the CSV file. Returns an empty instance if the path does not exist.

        """
        csv_path = pathlib.Path(path)
        if not csv_path.exists():
            return cls()
        overrides_df = pd.read_csv(csv_path)
        overrides: dict[str, dict[str, object]] = {}
        for _, row in overrides_df.iterrows():
            key = row["isin"] if pd.notna(row["isin"]) else row["symbol"]
            overrides[key] = row.to_dict()
        # Defensive: CSV data may contain non-string values, type system assumes strings
        return cls(overrides=overrides)  # type: ignore[arg-type]

from_csv(path) classmethod

Load classification overrides from a CSV file.

The CSV file must contain a 'symbol' or 'isin' column to identify the asset. Other columns should correspond to AssetClassification fields (e.g., 'asset_class', 'sub_class', 'geography').

Parameters:

Name Type Description Default
path Path | str

The file path to the CSV containing the overrides.

required

Returns:

Type Description
ClassificationOverrides

A ClassificationOverrides instance populated with the data from

ClassificationOverrides

the CSV file. Returns an empty instance if the path does not exist.
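The keying logic used when loading the CSV (ISIN takes precedence, with symbol as the fallback identifier) can be seen in isolation with a small in-memory DataFrame mirroring the loop in the source below:

```python
import pandas as pd

# Two override rows: one with an ISIN, one without.
overrides_df = pd.DataFrame(
    {
        "symbol": ["AMZN.US", "XYZ.US"],
        "isin": ["US0231351067", None],
        "asset_class": ["equity", "bond"],
    }
)

overrides: dict[str, dict] = {}
for _, row in overrides_df.iterrows():
    # ISIN takes precedence; fall back to symbol when ISIN is missing.
    key = row["isin"] if pd.notna(row["isin"]) else row["symbol"]
    overrides[key] = row.to_dict()
```

Each row's full contents are stored under its chosen key, so a lookup by ISIN (or symbol, for rows without one) returns every override field for that asset.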

Source code in src/portfolio_management/assets/classification/classification.py
@classmethod
def from_csv(cls, path: pathlib.Path | str) -> ClassificationOverrides:
    """Load classification overrides from a CSV file.

    The CSV file must contain a 'symbol' or 'isin' column to identify the
    asset. Other columns should correspond to `AssetClassification` fields
    (e.g., 'asset_class', 'sub_class', 'geography').

    Args:
        path: The file path to the CSV containing the overrides.

    Returns:
        A `ClassificationOverrides` instance populated with the data from
        the CSV file. Returns an empty instance if the path does not exist.

    """
    csv_path = pathlib.Path(path)
    if not csv_path.exists():
        return cls()
    overrides_df = pd.read_csv(csv_path)
    overrides: dict[str, dict[str, object]] = {}
    for _, row in overrides_df.iterrows():
        key = row["isin"] if pd.notna(row["isin"]) else row["symbol"]
        overrides[key] = row.to_dict()
    # Defensive: CSV data may contain non-string values, type system assumes strings
    return cls(overrides=overrides)  # type: ignore[arg-type]

Geography

Bases: str, Enum

Geographical classifications for assets.

Source code in src/portfolio_management/assets/classification/classification.py
class Geography(str, Enum):
    """Geographical classifications for assets."""

    DEVELOPED_MARKETS = "developed_markets"
    EMERGING_MARKETS = "emerging_markets"
    GLOBAL = "global"
    NORTH_AMERICA = "north_america"
    EUROPE = "europe"
    ASIA_PACIFIC = "asia_pacific"
    UNITED_KINGDOM = "united_kingdom"
    UNKNOWN = "unknown"

SubClass

Bases: str, Enum

Granular asset sub-classes.

Source code in src/portfolio_management/assets/classification/classification.py
class SubClass(str, Enum):
    """Granular asset sub-classes."""

    LARGE_CAP = "large_cap"
    SMALL_CAP = "small_cap"
    VALUE = "value"
    GROWTH = "growth"
    DIVIDEND = "dividend"
    GOVERNMENT = "government"
    CORPORATE = "corporate"
    HIGH_YIELD = "high_yield"
    INFLATION_LINKED = "inflation_linked"
    GOLD = "gold"
    COMMODITIES = "commodities"
    REIT = "reit"
    HEDGE_FUND = "hedge_fund"
    UNKNOWN = "unknown"
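Because `Geography` and `SubClass` mix in `str`, their members compare equal to raw string values and can be reconstructed directly from strings read out of CSV overrides or DataFrames. A minimal sketch using a stub enum that mirrors this pattern (not the package's own class):

```python
from enum import Enum


class Geo(str, Enum):
    """Stub mirroring Geography's str-Enum pattern (subset of members)."""

    NORTH_AMERICA = "north_america"
    UNKNOWN = "unknown"


# str-backed members compare equal to their raw string values,
# so they can be matched against plain strings from CSV data.
matches_raw = Geo.NORTH_AMERICA == "north_america"

# The constructor round-trips raw strings back to enum members.
member = Geo("north_america")
```

This is why the override path can pass a plain string such as `"north_america"` into the `Geography(...)` constructor without any explicit mapping step.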

AssetSelector

Filters a universe of assets based on a set of criteria.

This class acts as a preselection engine, applying a multi-stage filtering pipeline to a DataFrame of asset metadata. It is stateless and its primary entry point is the select_assets method.

The filtering pipeline is executed in a specific order to ensure that the most efficient filters are applied first.

Filtering Stages
  1. Data Quality: Removes assets with unacceptable data_status or zero_volume_severity.
  2. History: Enforces minimum data history (min_history_days) and row count (min_price_rows).
  3. Characteristics: Filters by market, region, currency, and category.
  4. Allow/Block Lists: Applies manual overrides to include or exclude specific assets.
Example:

```python
>>> import pandas as pd
>>> from portfolio_management.assets.selection import AssetSelector, FilterCriteria
>>>
>>> # Assume 'matches_df' is a DataFrame with asset metadata.
>>> matches_df = pd.DataFrame({
...     'symbol': ['AAPL.US', 'BAD.UK'], 'isin': ['US0378331005', 'GB00B1XFGM60'],
...     'name': ['Apple Inc', 'Bad Data PLC'], 'market': ['US', 'UK'],
...     'region': ['North America', 'Europe'], 'currency': ['USD', 'GBP'],
...     'category': ['Stock', 'Stock'], 'price_start': ['2010-01-01', '2023-01-01'],
...     'price_end': ['2023-12-31', '2023-12-31'], 'price_rows': [3522, 252],
...     'data_status': ['ok', 'error'], 'data_flags': ['', ''],
...     'stooq_path': ['', ''], 'resolved_currency': ['USD', 'GBP'],
...     'currency_status': ['matched', 'matched']
... })
>>>
>>> criteria = FilterCriteria(data_status=['ok'], markets=['US'])
>>> selector = AssetSelector()
>>> selected_assets = selector.select_assets(matches_df, criteria)
>>> print(selected_assets[0].symbol)
AAPL.US
```

Source code in src/portfolio_management/assets/selection/selection.py
class AssetSelector:
    """Filters a universe of assets based on a set of criteria.

    This class acts as a preselection engine, applying a multi-stage filtering
    pipeline to a DataFrame of asset metadata. It is stateless and its primary
    entry point is the `select_assets` method.

    The filtering pipeline is executed in a specific order to ensure that the
    most efficient filters are applied first.

    Filtering Stages:
        1.  **Data Quality**: Removes assets with unacceptable `data_status` or
            `zero_volume_severity`.
        2.  **History**: Enforces minimum data history (`min_history_days`) and
            row count (`min_price_rows`).
        3.  **Characteristics**: Filters by market, region, currency, and category.
        4.  **Allow/Block Lists**: Applies manual overrides to include or exclude
            specific assets.

    Example:
        >>> import pandas as pd
        >>> from portfolio_management.assets.selection import AssetSelector, FilterCriteria
        >>>
        >>> # Assume 'matches_df' is a DataFrame with asset metadata.
        >>> matches_df = pd.DataFrame({
        ...     'symbol': ['AAPL.US', 'BAD.UK'], 'isin': ['US0378331005', 'GB00B1XFGM60'],
        ...     'name': ['Apple Inc', 'Bad Data PLC'], 'market': ['US', 'UK'],
        ...     'region': ['North America', 'Europe'], 'currency': ['USD', 'GBP'],
        ...     'category': ['Stock', 'Stock'], 'price_start': ['2010-01-01', '2023-01-01'],
        ...     'price_end': ['2023-12-31', '2023-12-31'], 'price_rows': [3522, 252],
        ...     'data_status': ['ok', 'error'], 'data_flags': ['' , ''],
        ...     'stooq_path': ['' , ''], 'resolved_currency': ['USD', 'GBP'],
        ...     'currency_status': ['matched', 'matched']
        ... })
        >>>
        >>> criteria = FilterCriteria(data_status=['ok'], markets=['US'])
        >>> selector = AssetSelector()
        >>> selected_assets = selector.select_assets(matches_df, criteria)
        >>> print(selected_assets[0].symbol)
        AAPL.US

    """

    def __init__(self) -> None:
        """Initialize the AssetSelector."""

    @staticmethod
    def _parse_severity(data_flags: str | float | None) -> str | None:
        """Extract zero_volume_severity value from data_flags string.

        Parses semicolon-separated flags to find the zero_volume_severity
        value. Flags are formatted as "key=value;key=value".

        Args:
            data_flags: Flags string, potentially containing zero_volume_severity.
                Example: "zero_volume=10;zero_volume_ratio=0.05;zero_volume_severity=low"
                Can also be None or NaN (float).

        Returns:
            The severity level string (e.g., "low", "moderate", "high") if found,
            None otherwise.

        Example:
            >>> AssetSelector._parse_severity("zero_volume=10;zero_volume_severity=high")
            'high'
            >>> AssetSelector._parse_severity("other_flag=value")
            >>> AssetSelector._parse_severity("")
            >>> AssetSelector._parse_severity(None)

        """
        if not data_flags or (isinstance(data_flags, float)):
            # Handle NaN and None values
            return None

        data_flags_str = str(data_flags).strip()
        if not data_flags_str:
            return None

        # Split by semicolon and look for zero_volume_severity
        flags = data_flags_str.split(";")
        for flag in flags:
            if "zero_volume_severity=" in flag:
                # Extract the value after the equals sign
                parts = flag.split("=")
                if len(parts) == 2:
                    return parts[1].strip()

        return None

    @staticmethod
    def _parse_severity_vectorized(data_flags_series: pd.Series) -> pd.Series:
        """Vectorized version of _parse_severity for entire Series.

        Args:
            data_flags_series: Series of data_flags strings.

        Returns:
            Series of severity levels (str or None).

        """
        # Replace NaN and empty strings with None
        flags = data_flags_series.fillna("").astype(str)

        # Extract severity using string operations
        # Look for pattern "zero_volume_severity=X" where X is the severity
        severity = flags.str.extract(r"zero_volume_severity=([^;]+)", expand=False)

        # Strip whitespace from extracted values
        severity = severity.str.strip()

        # Replace empty strings with None
        severity = severity.replace("", None)

        return severity

    def _filter_by_data_quality(
        self,
        df: pd.DataFrame,
        criteria: FilterCriteria,
    ) -> pd.DataFrame:
        """Filter assets by data quality metrics.

        Applies two-stage quality filtering:
        1. Filter by data_status (e.g., "ok", "warning")
        2. Filter by zero_volume_severity if specified in criteria

        Args:
            df: DataFrame with columns 'data_status' and 'data_flags'.
            criteria: FilterCriteria containing data_status and zero_volume_severity.

        Returns:
            Filtered DataFrame with only assets meeting quality criteria.

        Example:
            >>> import pandas as pd
            >>> selector = AssetSelector()
            >>> df = pd.DataFrame({
            ...     'data_status': ['ok', 'ok', 'error', 'ok'],
            ...     'data_flags': ['', 'zero_volume_severity=high', '', 'zero_volume_severity=low']
            ... })
            >>> criteria = FilterCriteria(data_status=['ok'], zero_volume_severity=['low'])
            >>> filtered = selector._filter_by_data_quality(df, criteria)
            >>> len(filtered)
            1

        """
        import logging

        logger = logging.getLogger(__name__)

        if df.empty:
            logger.warning("Empty input DataFrame to _filter_by_data_quality")
            return df

        # Check required columns
        required_cols = {"data_status", "data_flags"}
        if not required_cols.issubset(df.columns):
            missing = required_cols - set(df.columns)
            logger.error("Missing columns for data quality filter: %s", missing)
            raise DataValidationError(
                f"Match report missing required columns for data quality filter: {missing}",
            )

        initial_count = len(df)
        logger.debug("Starting with %d assets", initial_count)

        # Stage 1: Filter by data_status
        status_mask = df["data_status"].isin(criteria.data_status)
        df_status = df[status_mask].copy()
        status_count = len(df_status)
        logger.debug(
            "After data_status filter: %d assets (removed %d)",
            status_count,
            initial_count - status_count,
        )

        # Stage 2: Filter by zero_volume_severity if specified (vectorized)
        if criteria.zero_volume_severity is not None:
            severity_list = criteria.zero_volume_severity

            # Use vectorized version to extract severity from all rows at once
            severity_series = self._parse_severity_vectorized(df_status["data_flags"])
            severity_mask = severity_series.isin(severity_list)

            df_result = df_status[severity_mask].copy()
            severity_count = len(df_result)
            logger.debug(
                "After zero_volume_severity filter: %d assets (removed %d)",
                severity_count,
                status_count - severity_count,
            )
        else:
            df_result = df_status.copy()
            logger.debug("Skipping zero_volume_severity filter (not specified)")

        return df_result

    @staticmethod
    def _calculate_history_days(price_start: str | None, price_end: str | None) -> int:
        """Calculate the number of days between price_start and price_end.

        Handles invalid dates gracefully by returning 0.

        Args:
            price_start: Start date as ISO string (YYYY-MM-DD) or None.
            price_end: End date as ISO string (YYYY-MM-DD) or None.

        Returns:
            Number of days between dates if both are valid, 0 otherwise.

        Example:
            >>> AssetSelector._calculate_history_days("2020-01-01", "2025-10-15")
            2114
            >>> AssetSelector._calculate_history_days("invalid", "2025-10-15")
            0
            >>> AssetSelector._calculate_history_days(None, "2025-10-15")
            0

        """
        if not price_start or not price_end:
            return 0

        try:
            start = pd.to_datetime(price_start)
            end = pd.to_datetime(price_end)

            # Check for invalid dates (e.g., future dates, reversed order)
            if start > end:
                return 0

            delta = end - start
            return int(delta.days)
        except (ValueError, TypeError):
            return 0

    @staticmethod
    def _calculate_history_days_vectorized(
        price_start_series: pd.Series,
        price_end_series: pd.Series,
    ) -> pd.Series:
        """Vectorized version of _calculate_history_days for entire Series.

        Args:
            price_start_series: Series of start dates.
            price_end_series: Series of end dates.

        Returns:
            Series of history days (int), with 0 for invalid dates.

        """
        # Convert to datetime with explicit format to avoid inference warning
        # Most dates are in YYYY-MM-DD format from CSV files
        start_dates = pd.to_datetime(
            price_start_series,
            errors="coerce",
            format="ISO8601",
        )
        end_dates = pd.to_datetime(price_end_series, errors="coerce", format="ISO8601")

        # Calculate timedelta
        deltas = end_dates - start_dates

        # Convert to days, handling NaT by replacing with 0
        days = deltas.dt.days.fillna(0).astype(int)

        # Handle reversed dates (start > end) by setting to 0
        days = days.where(days >= 0, 0)

        return days

    def _filter_by_history(
        self,
        df: pd.DataFrame,
        criteria: FilterCriteria,
    ) -> pd.DataFrame:
        """Filter assets by price history requirements.

        Applies two-stage history filtering:
        1. Filter by minimum history length in days (price_end - price_start)
        2. Filter by minimum price row count

        Args:
            df: DataFrame with columns 'price_start', 'price_end', 'price_rows'.
            criteria: FilterCriteria with min_history_days and min_price_rows.

        Returns:
            Filtered DataFrame with only assets meeting history criteria.

        Example:
            >>> import pandas as pd
            >>> selector = AssetSelector()
            >>> df = pd.DataFrame({
            ...     'price_start': ['2020-01-01', '2022-01-01', '2023-01-01'],
            ...     'price_end': ['2023-01-01', '2023-01-01', '2023-06-01'],
            ...     'price_rows': [756, 252, 126]
            ... })
            >>> criteria = FilterCriteria(min_history_days=365, min_price_rows=200)
            >>> filtered = selector._filter_by_history(df, criteria)
            >>> len(filtered)
            2

        """
        logger = logging.getLogger(__name__)

        if df.empty:
            logger.warning("Empty input DataFrame to _filter_by_history")
            return df

        # Check required columns
        required_cols = {"price_start", "price_end", "price_rows"}
        if not required_cols.issubset(df.columns):
            missing = required_cols - set(df.columns)
            logger.error("Missing columns for history filter: %s", missing)
            raise DataValidationError(
                f"Match report missing required columns for history filter: {missing}",
            )

        initial_count = len(df)
        logger.debug("Starting with %d assets", initial_count)

        # Stage 1: Calculate and filter by history days (vectorized)
        df_copy = df.copy()

        # Use vectorized calculation
        df_copy["_history_days"] = self._calculate_history_days_vectorized(
            df_copy["price_start"],
            df_copy["price_end"],
        )

        history_mask = df_copy["_history_days"] >= criteria.min_history_days
        df_history = df_copy[history_mask].copy()
        history_count = len(df_history)
        logger.debug(
            "After min_history_days filter (%d days): %d assets (removed %d)",
            criteria.min_history_days,
            history_count,
            initial_count - history_count,
        )

        # Stage 2: Filter by minimum price rows
        rows_mask = df_history["price_rows"] >= criteria.min_price_rows
        df_result = df_history[rows_mask].copy()
        rows_count = len(df_result)
        logger.debug(
            "After min_price_rows filter (%d rows): %d assets (removed %d)",
            criteria.min_price_rows,
            rows_count,
            history_count - rows_count,
        )

        # Drop the temporary column
        if "_history_days" in df_result.columns:
            df_result = df_result.drop(columns=["_history_days"])

        return df_result

    def _filter_by_characteristics(
        self,
        df: pd.DataFrame,
        criteria: FilterCriteria,
    ) -> pd.DataFrame:
        """Filter assets by market, region, currency, and category characteristics.

        Applies four optional filtering stages (each applied only if specified):
        1. Filter by market (if criteria.markets is not None)
        2. Filter by region (if criteria.regions is not None)
        3. Filter by currency (if criteria.currencies is not None)
        4. Filter by category (if criteria.categories is not None)

        All specified filters are combined with AND logic.

        Args:
            df: DataFrame with columns 'market', 'region', 'resolved_currency', 'category'.
            criteria: FilterCriteria with optional market/region/currency/category filters.

        Returns:
            Filtered DataFrame with only assets matching all specified characteristics.

        Example:
            >>> import pandas as pd
            >>> selector = AssetSelector()
            >>> df = pd.DataFrame({
            ...     'market': ['US', 'US', 'UK', 'DE'],
            ...     'region': ['North America', 'North America', 'Europe', 'Europe'],
            ...     'resolved_currency': ['USD', 'USD', 'GBP', 'EUR'],
            ...     'category': ['Stock', 'ETF', 'Stock', 'ETF']
            ... })
            >>> criteria = FilterCriteria(markets=['UK', 'US'], currencies=['GBP', 'USD'])
            >>> filtered = selector._filter_by_characteristics(df, criteria)
            >>> len(filtered)
            3

        """
        logger = logging.getLogger(__name__)

        if df.empty:
            logger.warning("Empty input DataFrame to _filter_by_characteristics")
            return df

        # Check required columns
        required_cols = {"market", "region", "resolved_currency", "category"}
        if not required_cols.issubset(df.columns):
            missing = required_cols - set(df.columns)
            logger.error("Missing columns for characteristic filter: %s", missing)
            raise DataValidationError(
                f"Match report missing required columns for characteristic filter: {missing}",
            )

        initial_count = len(df)
        logger.debug("Starting with %d assets", initial_count)

        df_result = df.copy()

        # Filter by market if specified
        if criteria.markets is not None:
            market_mask = df_result["market"].isin(criteria.markets)
            df_result = df_result[market_mask].copy()
            market_count = len(df_result)
            logger.debug(
                "After market filter (%s): %d assets (removed %d)",
                criteria.markets,
                market_count,
                initial_count - market_count,
            )
            initial_count = market_count
        else:
            logger.debug("Skipping market filter (not specified)")

        # Filter by region if specified
        if criteria.regions is not None:
            region_mask = df_result["region"].isin(criteria.regions)
            df_result = df_result[region_mask].copy()
            region_count = len(df_result)
            logger.debug(
                "After region filter (%s): %d assets (removed %d)",
                criteria.regions,
                region_count,
                initial_count - region_count,
            )
            initial_count = region_count
        else:
            logger.debug("Skipping region filter (not specified)")

        # Filter by currency if specified
        if criteria.currencies is not None:
            currency_mask = df_result["resolved_currency"].isin(criteria.currencies)
            df_result = df_result[currency_mask].copy()
            currency_count = len(df_result)
            logger.debug(
                "After currency filter (%s): %d assets (removed %d)",
                criteria.currencies,
                currency_count,
                initial_count - currency_count,
            )
            initial_count = currency_count
        else:
            logger.debug("Skipping currency filter (not specified)")

        # Filter by category if specified
        if criteria.categories is not None:
            category_mask = df_result["category"].isin(criteria.categories)
            df_result = df_result[category_mask].copy()
            category_count = len(df_result)
            logger.debug(
                "After category filter (%s): %d assets (removed %d)",
                criteria.categories,
                category_count,
                initial_count - category_count,
            )
        else:
            logger.debug("Skipping category filter (not specified)")

        return df_result

    @staticmethod
    def _is_in_list(symbol: str, isin: str, asset_list: set[str]) -> bool:
        """Check if asset is in list by symbol or ISIN.

        Args:
            symbol: Asset symbol.
            isin: Asset ISIN.
            asset_list: Set of symbols/ISINs to check against.

        Returns:
            True if symbol or isin is in asset_list, False otherwise.

        Example:
            >>> AssetSelector._is_in_list("AAPL.US", "US0378331005", {"AAPL.US"})
            True
            >>> AssetSelector._is_in_list("AAPL.US", "US0378331005", {"US0378331005"})
            True
            >>> AssetSelector._is_in_list("MSFT.US", "US0378331005", {"AAPL.US"})
            False

        """
        return symbol in asset_list or isin in asset_list

    def _apply_lists(
        self,
        df: pd.DataFrame,
        criteria: FilterCriteria,
    ) -> pd.DataFrame:
        """Apply allowlist and blocklist filtering.

        Applies two-stage list-based filtering:
        1. Remove rows where symbol/isin is in blocklist (if specified)
        2. Keep only rows where symbol/isin is in allowlist (if specified)

        If both lists are specified:
        - Blocklist is applied first, so it takes precedence on overlaps
        - Allowlist is applied second
        - The effective filter is: NOT in blocklist AND in allowlist

        Args:
            df: DataFrame with columns 'symbol' and 'isin'.
            criteria: FilterCriteria with optional allowlist/blocklist.

        Returns:
            Filtered DataFrame after applying list-based filters.

        Example:
            >>> import pandas as pd
            >>> selector = AssetSelector()
            >>> df = pd.DataFrame({
            ...     'symbol': ['AAPL.US', 'MSFT.US', 'GOOG.US'],
            ...     'isin': ['US0378331005', 'US5949181045', 'US02079K3059']
            ... })
            >>> criteria = FilterCriteria(allowlist={'AAPL.US', 'MSFT.US'})
            >>> filtered = selector._apply_lists(df, criteria)
            >>> sorted(filtered['symbol'].tolist())
            ['AAPL.US', 'MSFT.US']

        """
        logger = logging.getLogger(__name__)

        if df.empty:
            logger.warning("Empty input DataFrame to _apply_lists")
            return df

        # Check required columns
        required_cols = {"symbol", "isin"}
        if not required_cols.issubset(df.columns):
            missing = required_cols - set(df.columns)
            logger.error("Missing columns for allow/block list filter: %s", missing)
            raise DataValidationError(
                f"Match report missing required columns for allow/block list filter: {missing}",
            )

        initial_count = len(df)
        logger.debug("Starting with %d assets", initial_count)

        df_result = df.copy()

        # Stage 1: Apply blocklist if specified (vectorized)
        if criteria.blocklist is not None:
            blocklist = criteria.blocklist

            # Vectorized check: row is NOT in blocklist if both symbol AND isin are not in blocklist
            symbol_blocked = df_result["symbol"].isin(blocklist)
            isin_blocked = df_result["isin"].isin(blocklist)
            in_blocklist = symbol_blocked | isin_blocked

            blocklist_mask = ~in_blocklist
            df_result = df_result[blocklist_mask].copy()
            blocklist_count = len(df_result)
            logger.debug(
                "After blocklist filter (%d items): %d assets (removed %d)",
                len(blocklist),
                blocklist_count,
                initial_count - blocklist_count,
            )
            initial_count = blocklist_count
        else:
            logger.debug("Skipping blocklist filter (not specified)")

        # Stage 2: Apply allowlist if specified (vectorized)
        if criteria.allowlist is not None:
            allowlist = criteria.allowlist

            # Vectorized check: row is in allowlist if symbol OR isin is in allowlist
            symbol_allowed = df_result["symbol"].isin(allowlist)
            isin_allowed = df_result["isin"].isin(allowlist)
            in_allowlist = symbol_allowed | isin_allowed

            allowlist_mask = in_allowlist
            df_result = df_result[allowlist_mask].copy()
            allowlist_count = len(df_result)
            logger.debug(
                "After allowlist filter (%d items): %d assets (removed %d)",
                len(allowlist),
                allowlist_count,
                initial_count - allowlist_count,
            )
        else:
            logger.debug("Skipping allowlist filter (not specified)")

        # Warn if both lists overlap
        if criteria.blocklist is not None and criteria.allowlist is not None:
            overlap = criteria.blocklist & criteria.allowlist
            if overlap:
                logger.warning(
                    "Allowlist and blocklist overlap (%d items): %s. "
                    "These items will be excluded (blocklist takes precedence).",
                    len(overlap),
                    overlap,
                )

        return df_result

    @staticmethod
    def _df_to_selected_assets(df: pd.DataFrame) -> list[SelectedAsset]:
        """Convert a DataFrame to a list of SelectedAsset objects.

        Uses to_dict("records") for efficient conversion instead of iterrows.
        """
        logger = logging.getLogger(__name__)

        # Convert DataFrame to list of dicts for faster iteration
        records = df.to_dict("records")

        assets = []
        for record in records:
            try:
                asset = SelectedAsset(
                    symbol=record["symbol"],
                    isin=record["isin"],
                    name=record["name"],
                    market=record["market"],
                    region=record["region"],
                    currency=record["currency"],
                    category=record["category"],
                    price_start=record["price_start"],
                    price_end=record["price_end"],
                    price_rows=int(record["price_rows"]),
                    data_status=record["data_status"],
                    data_flags=record.get("data_flags", ""),
                    stooq_path=record["stooq_path"],
                    resolved_currency=record["resolved_currency"],
                    currency_status=record["currency_status"],
                )
                assets.append(asset)
            except (KeyError, TypeError, ValueError) as e:
                logger.warning(
                    "Skipping asset due to conversion error: %s in record %s",
                    e,
                    record,
                )
        return assets

    def select_assets(
        self,
        matches_df: pd.DataFrame,
        criteria: FilterCriteria,
    ) -> list[SelectedAsset]:
        """Runs the full asset selection pipeline on a DataFrame of assets.

        This is the main entry point for the `AssetSelector`. It takes a DataFrame
        of asset metadata and a `FilterCriteria` object, then applies the
        entire filtering pipeline in sequence.

        Args:
            matches_df: A DataFrame containing the raw metadata for all assets
                to be considered for selection. Must include columns specified
                in `FilterCriteria` and `SelectedAsset`.
            criteria: A `FilterCriteria` object that defines the rules for the
                selection process.

        Returns:
            A list of `SelectedAsset` objects, each representing an asset that
            passed all stages of the filtering pipeline. Returns an empty list
            if no assets pass the filters.

        Raises:
            DataValidationError: If `matches_df` is None or is missing required
                columns, or if the `criteria` object is invalid.
            AssetSelectionError: If an allowlist is provided but no assets are
                selected, indicating a potential configuration issue.

        """
        logger = logging.getLogger(__name__)

        if matches_df is None:
            raise DataValidationError(
                "Asset selection requires a non-null matches DataFrame.",
            )

        try:
            criteria.validate()
        except ValueError as exc:
            raise DataValidationError(f"Invalid filter criteria: {exc}") from exc

        required_cols = {
            "symbol",
            "isin",
            "name",
            "market",
            "region",
            "currency",
            "category",
            "price_start",
            "price_end",
            "price_rows",
            "data_status",
            "data_flags",
            "stooq_path",
            "resolved_currency",
            "currency_status",
        }
        if not required_cols.issubset(matches_df.columns):
            missing = required_cols - set(matches_df.columns)
            raise DataValidationError(
                f"Input DataFrame is missing required columns: {missing}",
            )

        initial_count = len(matches_df)
        logger.info("Starting asset selection for %d assets.", initial_count)

        if matches_df.empty:
            logger.warning("Input DataFrame is empty. No assets to select.")
            return []

        df = matches_df.copy()

        # Apply filters in sequence
        df = self._filter_by_data_quality(df, criteria)
        df = self._filter_by_history(df, criteria)
        df = self._filter_by_characteristics(df, criteria)
        df = self._apply_lists(df, criteria)

        final_count = len(df)
        logger.info(
            "Finished asset selection. Selected %d of %d assets.",
            final_count,
            initial_count,
        )

        if final_count == 0:
            logger.warning("No assets were selected after filtering.")
            if criteria.allowlist:
                raise AssetSelectionError(
                    "No assets matched the provided allowlist and filter criteria.",
                )
            return []

        # Add summary logging
        percentage_selected = (
            (final_count / initial_count) * 100 if initial_count > 0 else 0
        )
        logger.info("Selected %.2f%% of the initial universe.", percentage_selected)

        market_breakdown = df["market"].value_counts().to_dict()
        region_breakdown = df["region"].value_counts().to_dict()
        logger.info("Breakdown by market: %s", market_breakdown)
        logger.info("Breakdown by region: %s", region_breakdown)

        return self._df_to_selected_assets(df)

select_assets(matches_df, criteria)

Runs the full asset selection pipeline on a DataFrame of assets.

This is the main entry point for the AssetSelector. It takes a DataFrame of asset metadata and a FilterCriteria object, then applies the entire filtering pipeline in sequence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `matches_df` | `DataFrame` | A DataFrame containing the raw metadata for all assets to be considered for selection. Must include columns specified in `FilterCriteria` and `SelectedAsset`. | *required* |
| `criteria` | `FilterCriteria` | A `FilterCriteria` object that defines the rules for the selection process. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[SelectedAsset]` | A list of `SelectedAsset` objects, each representing an asset that passed all stages of the filtering pipeline. Returns an empty list if no assets pass the filters. |

Raises:

| Type | Description |
| --- | --- |
| `DataValidationError` | If `matches_df` is None or is missing required columns, or if the `criteria` object is invalid. |
| `AssetSelectionError` | If an allowlist is provided but no assets are selected, indicating a potential configuration issue. |

Source code in src/portfolio_management/assets/selection/selection.py
def select_assets(
    self,
    matches_df: pd.DataFrame,
    criteria: FilterCriteria,
) -> list[SelectedAsset]:
    """Runs the full asset selection pipeline on a DataFrame of assets.

    This is the main entry point for the `AssetSelector`. It takes a DataFrame
    of asset metadata and a `FilterCriteria` object, then applies the
    entire filtering pipeline in sequence.

    Args:
        matches_df: A DataFrame containing the raw metadata for all assets
            to be considered for selection. Must include columns specified
            in `FilterCriteria` and `SelectedAsset`.
        criteria: A `FilterCriteria` object that defines the rules for the
            selection process.

    Returns:
        A list of `SelectedAsset` objects, each representing an asset that
        passed all stages of the filtering pipeline. Returns an empty list
        if no assets pass the filters.

    Raises:
        DataValidationError: If `matches_df` is None or is missing required
            columns, or if the `criteria` object is invalid.
        AssetSelectionError: If an allowlist is provided but no assets are
            selected, indicating a potential configuration issue.

    """
    logger = logging.getLogger(__name__)

    if matches_df is None:
        raise DataValidationError(
            "Asset selection requires a non-null matches DataFrame.",
        )

    try:
        criteria.validate()
    except ValueError as exc:
        raise DataValidationError(f"Invalid filter criteria: {exc}") from exc

    required_cols = {
        "symbol",
        "isin",
        "name",
        "market",
        "region",
        "currency",
        "category",
        "price_start",
        "price_end",
        "price_rows",
        "data_status",
        "data_flags",
        "stooq_path",
        "resolved_currency",
        "currency_status",
    }
    if not required_cols.issubset(matches_df.columns):
        missing = required_cols - set(matches_df.columns)
        raise DataValidationError(
            f"Input DataFrame is missing required columns: {missing}",
        )

    initial_count = len(matches_df)
    logger.info("Starting asset selection for %d assets.", initial_count)

    if matches_df.empty:
        logger.warning("Input DataFrame is empty. No assets to select.")
        return []

    df = matches_df.copy()

    # Apply filters in sequence
    df = self._filter_by_data_quality(df, criteria)
    df = self._filter_by_history(df, criteria)
    df = self._filter_by_characteristics(df, criteria)
    df = self._apply_lists(df, criteria)

    final_count = len(df)
    logger.info(
        "Finished asset selection. Selected %d of %d assets.",
        final_count,
        initial_count,
    )

    if final_count == 0:
        logger.warning("No assets were selected after filtering.")
        if criteria.allowlist:
            raise AssetSelectionError(
                "No assets matched the provided allowlist and filter criteria.",
            )
        return []

    # Add summary logging
    percentage_selected = (
        (final_count / initial_count) * 100 if initial_count > 0 else 0
    )
    logger.info("Selected %.2f%% of the initial universe.", percentage_selected)

    market_breakdown = df["market"].value_counts().to_dict()
    region_breakdown = df["region"].value_counts().to_dict()
    logger.info("Breakdown by market: %s", market_breakdown)
    logger.info("Breakdown by region: %s", region_breakdown)

    return self._df_to_selected_assets(df)
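The allow/block-list stage at the end of the pipeline has one subtlety worth internalizing: when both lists are given, the blocklist is applied first, so an item appearing on both lists is excluded. The following is a minimal, self-contained pandas sketch of that semantics on toy data (it mirrors the vectorized `isin` logic above, but is not the package's own code):

```python
import pandas as pd

# Toy universe with the two identifier columns the list filter expects.
df = pd.DataFrame({
    "symbol": ["AAPL.US", "MSFT.US", "GOOG.US"],
    "isin": ["US0378331005", "US5949181045", "US02079K3059"],
})

blocklist = {"AAPL.US"}
allowlist = {"AAPL.US", "US5949181045"}  # overlaps the blocklist on AAPL.US

# Stage 1: drop rows whose symbol OR isin is blocklisted.
blocked = df["symbol"].isin(blocklist) | df["isin"].isin(blocklist)
df = df[~blocked]

# Stage 2: keep only rows whose symbol OR isin is allowlisted.
allowed = df["symbol"].isin(allowlist) | df["isin"].isin(allowlist)
df = df[allowed]

print(df["symbol"].tolist())  # -> ['MSFT.US']; blocklist takes precedence
```

Note that `MSFT.US` survives via its ISIN: a row matches a list if either identifier is present, as in `_is_in_list`.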

FilterCriteria dataclass

Defines the parameters for filtering assets.

This dataclass holds all configurable parameters used by the AssetSelector to filter the tradeable universe. It allows for detailed control over data quality, history requirements, market characteristics, and inclusion/exclusion lists.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `data_status` | `list[str]` | List of acceptable data quality status values (e.g., `["ok"]`). |
| `min_history_days` | `int` | The minimum number of calendar days of price history required. |
| `max_gap_days` | `int` | Maximum allowed gap in days between consecutive price points. |
| `min_price_rows` | `int` | The minimum number of data rows (e.g., trading days) required. |
| `zero_volume_severity` | `list[str] \| None` | Filters assets based on the severity of zero-volume trading days (e.g., `["low", "medium"]`). If None, this filter is disabled. |
| `markets` | `list[str] \| None` | A list of market codes to include (e.g., `["US", "UK"]`). If None, assets from all markets are considered. |
| `regions` | `list[str] \| None` | A list of geographic regions to include (e.g., `["North America"]`). If None, assets from all regions are considered. |
| `currencies` | `list[str] \| None` | A list of currency codes to include (e.g., `["USD", "EUR"]`). If None, assets in all currencies are considered. |
| `categories` | `list[str] \| None` | A list of asset categories to include (e.g., `["Stock", "ETF"]`). If None, assets of all categories are considered. |
| `allowlist` | `set[str] \| None` | A set of symbols or ISINs to keep. Applied after the other filters, so listed assets must still pass them to be selected. |
| `blocklist` | `set[str] \| None` | A set of symbols or ISINs to explicitly exclude from the output. Blocklisted assets are removed regardless of whether they pass other filters. |
| `regime_config` | `RegimeConfig \| None` | Configuration for macroeconomic regime-based filtering. If None, no regime-based gating is applied. |

Example:

```python
>>> # Create a strict filter for US large-cap stocks
>>> criteria = FilterCriteria(
...     min_history_days=365 * 5,
...     data_status=['ok'],
...     markets=['US'],
...     categories=['Stock'],
...     blocklist={'DO-NOT-TRADE.US'}
... )
>>> criteria.validate()  # No error raised
```

Source code in src/portfolio_management/assets/selection/selection.py
@dataclass
class FilterCriteria:
    """Defines the parameters for filtering assets.

    This dataclass holds all configurable parameters used by the `AssetSelector`
    to filter the tradeable universe. It allows for detailed control over data
    quality, history requirements, market characteristics, and inclusion/exclusion lists.

    Attributes:
        data_status: List of acceptable data quality status values (e.g., ["ok"]).
        min_history_days: The minimum number of calendar days of price history required.
        max_gap_days: Maximum allowed gap in days between consecutive price points.
        min_price_rows: The minimum number of data rows (e.g., trading days) required.
        zero_volume_severity: Filters assets based on the severity of zero-volume
            trading days (e.g., ["low", "medium"]). If None, this filter is disabled.
        markets: A list of market codes to include (e.g., ["US", "UK"]). If None,
            assets from all markets are considered.
        regions: A list of geographic regions to include (e.g., ["North America"]).
            If None, assets from all regions are considered.
        currencies: A list of currency codes to include (e.g., ["USD", "EUR"]).
            If None, assets in all currencies are considered.
        categories: A list of asset categories to include (e.g., ["Stock", "ETF"]).
            If None, assets of all categories are considered.
        allowlist: A set of symbols or ISINs to keep. Applied after the other
            filters, so listed assets must still pass them to be selected.
        blocklist: A set of symbols or ISINs to explicitly exclude from the output.
            Blocklisted assets are removed regardless of whether they pass other filters.
        regime_config: Configuration for macroeconomic regime-based filtering.
            If None, no regime-based gating is applied.

    Example:
        >>> # Create a strict filter for US large-cap stocks
        >>> criteria = FilterCriteria(
        ...     min_history_days=365 * 5,
        ...     data_status=['ok'],
        ...     markets=['US'],
        ...     categories=['Stock'],
        ...     blocklist={'DO-NOT-TRADE.US'}
        ... )
        >>> criteria.validate()  # No error raised

    """

    data_status: list[str] = field(default_factory=lambda: ["ok"])
    min_history_days: int = 252
    max_gap_days: int = 10
    min_price_rows: int = 252
    zero_volume_severity: list[str] | None = None
    markets: list[str] | None = None
    regions: list[str] | None = None
    currencies: list[str] | None = None
    categories: list[str] | None = None
    allowlist: set[str] | None = None
    blocklist: set[str] | None = None
    regime_config: RegimeConfig | None = None

    def validate(self) -> None:
        """Validate filter criteria parameters.

        Raises:
            ValueError: If any parameter is invalid (e.g., negative values,
                empty required lists).

        Example:
            >>> # This will raise a ValueError because min_history_days is negative.
            >>> # criteria = FilterCriteria(min_history_days=-1)
            >>> # criteria.validate()

        """
        if self.min_history_days <= 0:
            raise ValueError(
                f"min_history_days must be positive, got {self.min_history_days}",
            )

        if self.min_price_rows <= 0:
            raise ValueError(
                f"min_price_rows must be positive, got {self.min_price_rows}",
            )

        if self.max_gap_days < 0:
            raise ValueError(
                f"max_gap_days must be non-negative, got {self.max_gap_days}",
            )

        if not self.data_status:
            raise ValueError("data_status must not be empty")

    @classmethod
    def default(cls) -> FilterCriteria:
        """Create default filter criteria suitable for most portfolios.

        Returns:
            FilterCriteria with conservative defaults:
            - Require "ok" data status
            - Minimum 1 year of history (252 trading days)
            - Maximum 10-day gaps
            - No filtering by market, region, currency, or category
            - No allow/block lists
            - No regime gating

        Example:
            >>> criteria = FilterCriteria.default()
            >>> criteria.min_history_days
            252

        """
        return cls(
            data_status=["ok"],
            min_history_days=252,
            max_gap_days=10,
            min_price_rows=252,
            zero_volume_severity=None,
            markets=None,
            regions=None,
            currencies=None,
            categories=None,
            allowlist=None,
            blocklist=None,
            regime_config=None,
        )

validate()

Validate filter criteria parameters.

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If any parameter is invalid (e.g., negative values, empty required lists). |

Example:

```python
>>> # This will raise a ValueError because min_history_days is negative.
>>> # criteria = FilterCriteria(min_history_days=-1)
>>> # criteria.validate()
```
Source code in src/portfolio_management/assets/selection/selection.py
def validate(self) -> None:
    """Validate filter criteria parameters.

    Raises:
        ValueError: If any parameter is invalid (e.g., negative values,
            empty required lists).

    Example:
        >>> # This will raise a ValueError because min_history_days is negative.
        >>> # criteria = FilterCriteria(min_history_days=-1)
        >>> # criteria.validate()

    """
    if self.min_history_days <= 0:
        raise ValueError(
            f"min_history_days must be positive, got {self.min_history_days}",
        )

    if self.min_price_rows <= 0:
        raise ValueError(
            f"min_price_rows must be positive, got {self.min_price_rows}",
        )

    if self.max_gap_days < 0:
        raise ValueError(
            f"max_gap_days must be non-negative, got {self.max_gap_days}",
        )

    if not self.data_status:
        raise ValueError("data_status must not be empty")

default() classmethod

Create default filter criteria suitable for most portfolios.

Returns:

| Type | Description |
| --- | --- |
| `FilterCriteria` | FilterCriteria with conservative defaults: require "ok" data status; minimum 1 year of history (252 trading days); maximum 10-day gaps; no filtering by market, region, currency, or category; no allow/block lists; no regime gating. |

Example:

```python
>>> criteria = FilterCriteria.default()
>>> criteria.min_history_days
252
```

Source code in src/portfolio_management/assets/selection/selection.py
@classmethod
def default(cls) -> FilterCriteria:
    """Create default filter criteria suitable for most portfolios.

    Returns:
        FilterCriteria with conservative defaults:
        - Require "ok" data status
        - Minimum 1 year of history (252 trading days)
        - Maximum 10-day gaps
        - No filtering by market, region, currency, or category
        - No allow/block lists
        - No regime gating

    Example:
        >>> criteria = FilterCriteria.default()
        >>> criteria.min_history_days
        252

    """
    return cls(
        data_status=["ok"],
        min_history_days=252,
        max_gap_days=10,
        min_price_rows=252,
        zero_volume_severity=None,
        markets=None,
        regions=None,
        currencies=None,
        categories=None,
        allowlist=None,
        blocklist=None,
        regime_config=None,
    )

SelectedAsset dataclass

Represents a selected asset with metadata from the match report.

This dataclass captures all relevant information about an asset that has passed filtering criteria. It combines instrument metadata (symbol, ISIN, name) with market information (market, region, currency, category) and data quality metrics (date ranges, row counts, status flags).

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `symbol` | `str` | Stooq ticker symbol (e.g., "1pas.uk", "aapl.us"). |
| `isin` | `str` | International Securities Identification Number. |
| `name` | `str` | Human-readable asset name. |
| `market` | `str` | Market code (e.g., "UK", "US", "DE"). |
| `region` | `str` | Geographic region (e.g., "Europe", "North America"). |
| `currency` | `str` | Trading currency code (e.g., "GBP", "USD", "EUR"). |
| `category` | `str` | Asset category (e.g., "ETF", "Stock", "Bond"). |
| `price_start` | `str` | First available price date as ISO string (YYYY-MM-DD). |
| `price_end` | `str` | Last available price date as ISO string (YYYY-MM-DD). |
| `price_rows` | `int` | Total number of price observations available. |
| `data_status` | `str` | Overall data quality status ("ok", "warning", "error"). |
| `data_flags` | `str` | Pipe-separated flags with additional quality information, e.g. `zero_volume_severity=low\|other_flag=value`. |
| `stooq_path` | `str` | Relative path to price file in Stooq data directory. |
| `resolved_currency` | `str` | Currency after harmonization/resolution logic. |
| `currency_status` | `str` | Status of currency resolution ("matched", "resolved", etc.). |

Example:

```python
>>> asset = SelectedAsset(
...     symbol="1pas.uk",
...     isin="GB00BD3RYZ16",
...     name="iShares Core MSCI Asia ex Japan UCITS ETF",
...     market="UK",
...     region="Europe",
...     currency="GBP",
...     category="ETF",
...     price_start="2020-01-02",
...     price_end="2025-10-15",
...     price_rows=1500,
...     data_status="ok",
...     data_flags="zero_volume_severity=low",
...     stooq_path="d_uk_txt/data/daily/uk/1pas.txt",
...     resolved_currency="GBP",
...     currency_status="matched"
... )
```

Source code in src/portfolio_management/assets/selection/selection.py
@dataclass
class SelectedAsset:
    """Represents a selected asset with metadata from the match report.

    This dataclass captures all relevant information about an asset that has
    passed filtering criteria. It combines instrument metadata (symbol, ISIN,
    name) with market information (market, region, currency, category) and
    data quality metrics (date ranges, row counts, status flags).

    Attributes:
        symbol: Stooq ticker symbol (e.g., "1pas.uk", "aapl.us").
        isin: International Securities Identification Number.
        name: Human-readable asset name.
        market: Market code (e.g., "UK", "US", "DE").
        region: Geographic region (e.g., "Europe", "North America").
        currency: Trading currency code (e.g., "GBP", "USD", "EUR").
        category: Asset category (e.g., "ETF", "Stock", "Bond").
        price_start: First available price date as ISO string (YYYY-MM-DD).
        price_end: Last available price date as ISO string (YYYY-MM-DD).
        price_rows: Total number of price observations available.
        data_status: Overall data quality status ("ok", "warning", "error").
        data_flags: Pipe-separated flags with additional quality information.
            Example: "zero_volume_severity=low|other_flag=value"
        stooq_path: Relative path to price file in Stooq data directory.
        resolved_currency: Currency after harmonization/resolution logic.
        currency_status: Status of currency resolution ("matched", "resolved", etc.).

    Example:
        >>> asset = SelectedAsset(
        ...     symbol="1pas.uk",
        ...     isin="GB00BD3RYZ16",
        ...     name="iShares Core MSCI Asia ex Japan UCITS ETF",
        ...     market="UK",
        ...     region="Europe",
        ...     currency="GBP",
        ...     category="ETF",
        ...     price_start="2020-01-02",
        ...     price_end="2025-10-15",
        ...     price_rows=1500,
        ...     data_status="ok",
        ...     data_flags="zero_volume_severity=low",
        ...     stooq_path="d_uk_txt/data/daily/uk/1pas.txt",
        ...     resolved_currency="GBP",
        ...     currency_status="matched"
        ... )

    """

    symbol: str
    isin: str
    name: str
    market: str
    region: str
    currency: str
    category: str
    price_start: str
    price_end: str
    price_rows: int
    data_status: str
    data_flags: str
    stooq_path: str
    resolved_currency: str
    currency_status: str

UniverseConfigLoader

Loads and parses universe definitions from a YAML configuration file.

This is a static utility class that provides a single method, load_config, to read a YAML file and convert it into a dictionary of UniverseDefinition objects.

Configuration (YAML Format): The YAML file must have a top-level key universes, which contains a mapping of universe names to their definitions.

Example `universes.yaml`:
```yaml
universes:
  us_equity_large_cap:
    description: "US Large Cap Equities"
    filter_criteria:
      min_history_days: 1825 # 5 years
      markets: ["US"]
      categories: ["Stock"]
    classification_requirements:
      asset_class: ["equity"]
      sub_class: ["large_cap"]
    return_config:
      window: 252
      min_periods: 200
```
Source code in src/portfolio_management/assets/universes/loader.py
class UniverseConfigLoader:
    """Loads and parses universe definitions from a YAML configuration file.

    This is a static utility class that provides a single method, `load_config`,
    to read a YAML file and convert it into a dictionary of `UniverseDefinition`
    objects.

    Configuration (YAML Format):
        The YAML file must have a top-level key `universes`, which contains a
        mapping of universe names to their definitions.

        Example `universes.yaml`:
        ```yaml
        universes:
          us_equity_large_cap:
            description: "US Large Cap Equities"
            filter_criteria:
              min_history_days: 1825 # 5 years
              markets: ["US"]
              categories: ["Stock"]
            classification_requirements:
              asset_class: ["equity"]
              sub_class: ["large_cap"]
            return_config:
              window: 252
              min_periods: 200
        ```
    """

    @staticmethod
    def load_config(path: Path) -> dict[str, UniverseDefinition]:
        """Loads and parses the universe configuration file.

        Args:
            path: The file path to the universe YAML configuration.

        Returns:
            A dictionary mapping universe names to `UniverseDefinition` instances.

        Raises:
            ConfigurationError: If the file is not found, cannot be parsed,
                is badly structured, or contains invalid parameter values.

        """
        if not path.exists():
            raise ConfigurationError(path, f"Universe config file not found: {path}")

        try:
            with open(path, encoding="utf-8") as stream:
                config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            raise ConfigurationError(
                path,
                f"Failed to parse universe config: {exc}",
            ) from exc

        if not isinstance(config, dict) or "universes" not in config:
            raise ConfigurationError(
                path,
                "'universes' key not found in the config file",
            )

        universe_defs: dict[str, UniverseDefinition] = {}
        for name, u_def in config["universes"].items():
            try:
                filter_criteria = FilterCriteria(**u_def.get("filter_criteria", {}))
                return_config = ReturnConfig(**u_def.get("return_config", {}))
                # Parse technical_indicators configuration if present
                indicators_def = u_def.get("technical_indicators", {})
                if indicators_def:
                    technical_indicators = IndicatorConfig(**indicators_def)
                else:
                    technical_indicators = IndicatorConfig.disabled()
            except (TypeError, ValueError) as exc:
                raise ConfigurationError(
                    path,
                    f"Invalid configuration for universe '{name}': {exc}",
                ) from exc

            definition = UniverseDefinition(
                description=u_def.get("description", ""),
                filter_criteria=filter_criteria,
                classification_requirements=u_def.get(
                    "classification_requirements",
                    {},
                ),
                return_config=return_config,
                constraints=u_def.get("constraints", {}),
                technical_indicators=technical_indicators,
            )

            try:
                definition.validate()
            except ValueError as exc:
                raise ConfigurationError(
                    path,
                    f"Universe '{name}' failed validation: {exc}",
                ) from exc

            universe_defs[name] = definition

        return universe_defs

load_config(path) staticmethod

Loads and parses the universe configuration file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `Path` | The file path to the universe YAML configuration. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `dict[str, UniverseDefinition]` | A dictionary mapping universe names to `UniverseDefinition` instances. |

Raises:

| Type | Description |
|------|-------------|
| `ConfigurationError` | If the file is not found, cannot be parsed, is badly structured, or contains invalid parameter values. |

Source code in src/portfolio_management/assets/universes/loader.py
@staticmethod
def load_config(path: Path) -> dict[str, UniverseDefinition]:
    """Loads and parses the universe configuration file.

    Args:
        path: The file path to the universe YAML configuration.

    Returns:
        A dictionary mapping universe names to `UniverseDefinition` instances.

    Raises:
        ConfigurationError: If the file is not found, cannot be parsed,
            is badly structured, or contains invalid parameter values.

    """
    if not path.exists():
        raise ConfigurationError(path, f"Universe config file not found: {path}")

    try:
        with open(path, encoding="utf-8") as stream:
            config = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        raise ConfigurationError(
            path,
            f"Failed to parse universe config: {exc}",
        ) from exc

    if not isinstance(config, dict) or "universes" not in config:
        raise ConfigurationError(
            path,
            "'universes' key not found in the config file",
        )

    universe_defs: dict[str, UniverseDefinition] = {}
    for name, u_def in config["universes"].items():
        try:
            filter_criteria = FilterCriteria(**u_def.get("filter_criteria", {}))
            return_config = ReturnConfig(**u_def.get("return_config", {}))
            # Parse technical_indicators configuration if present
            indicators_def = u_def.get("technical_indicators", {})
            if indicators_def:
                technical_indicators = IndicatorConfig(**indicators_def)
            else:
                technical_indicators = IndicatorConfig.disabled()
        except (TypeError, ValueError) as exc:
            raise ConfigurationError(
                path,
                f"Invalid configuration for universe '{name}': {exc}",
            ) from exc

        definition = UniverseDefinition(
            description=u_def.get("description", ""),
            filter_criteria=filter_criteria,
            classification_requirements=u_def.get(
                "classification_requirements",
                {},
            ),
            return_config=return_config,
            constraints=u_def.get("constraints", {}),
            technical_indicators=technical_indicators,
        )

        try:
            definition.validate()
        except ValueError as exc:
            raise ConfigurationError(
                path,
                f"Universe '{name}' failed validation: {exc}",
            ) from exc

        universe_defs[name] = definition

    return universe_defs
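A pattern worth noting in `load_config` is how optional YAML sections are handled: each is read with `dict.get(..., {})` and unpacked into a dataclass, so a missing section silently falls back to the dataclass defaults. A stdlib-only sketch of that pattern (`ReturnConfigSketch` is a stand-in, not the package's real `ReturnConfig`):

```python
from dataclasses import dataclass


# Stand-in for ReturnConfig: defaults mirror the YAML example above.
@dataclass
class ReturnConfigSketch:
    window: int = 252
    min_periods: int = 200


u_def = {"description": "US Large Cap Equities"}  # no 'return_config' section

# Missing section -> empty dict -> all dataclass defaults apply.
rc = ReturnConfigSketch(**u_def.get("return_config", {}))
print(rc.window, rc.min_periods)  # 252 200

# Present section -> only the given keys override the defaults.
rc2 = ReturnConfigSketch(**{"window": 60})
print(rc2.window, rc2.min_periods)  # 60 200
```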

UniverseDefinition dataclass

Represents the complete configuration for a single investment universe.

This dataclass holds all the parameters needed to construct a universe, from initial filtering to final return calculation. It is typically instantiated by UniverseConfigLoader from a YAML file.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `description` | `str` | A human-readable description of the universe. |
| `filter_criteria` | `FilterCriteria` | An instance of `FilterCriteria` defining the rules for the initial asset selection. |
| `classification_requirements` | `dict[str, list[str]]` | A dictionary specifying required classification values. Assets not matching these values will be filtered out after classification. Example: `{'asset_class': ['equity']}`. |
| `return_config` | `ReturnConfig` | A `ReturnConfig` object defining how historical returns should be calculated for the assets in the universe. |
| `constraints` | `dict[str, int \| float]` | A dictionary of hard constraints for the universe, such as `{'max_assets': 100}`. |
| `technical_indicators` | `IndicatorConfig` | An `IndicatorConfig` object for configuring the calculation of technical indicators like SMA or RSI. |

Source code in src/portfolio_management/assets/universes/universe.py
@dataclass
class UniverseDefinition:
    """Represents the complete configuration for a single investment universe.

    This dataclass holds all the parameters needed to construct a universe,
    from initial filtering to final return calculation. It is typically
    instantiated by `UniverseConfigLoader` from a YAML file.

    Attributes:
        description: A human-readable description of the universe.
        filter_criteria: An instance of `FilterCriteria` defining the rules for
            the initial asset selection.
        classification_requirements: A dictionary specifying required classification
            values. Assets not matching these values will be filtered out after
            classification. Example: `{'asset_class': ['equity']}`.
        return_config: A `ReturnConfig` object defining how historical returns
            should be calculated for the assets in the universe.
        constraints: A dictionary of hard constraints for the universe, such as
            `{'max_assets': 100}`.
        technical_indicators: An `IndicatorConfig` object for configuring
            the calculation of technical indicators like SMA or RSI.

    """

    description: str
    filter_criteria: FilterCriteria
    classification_requirements: dict[str, list[str]] = field(default_factory=dict)
    return_config: ReturnConfig = field(default_factory=ReturnConfig)
    constraints: dict[str, int | float] = field(default_factory=dict)
    technical_indicators: IndicatorConfig = field(
        default_factory=IndicatorConfig.disabled,
    )

    def validate(self) -> None:
        """Validate the universe definition."""
        self.filter_criteria.validate()
        self.return_config.validate()
        self.technical_indicators.validate()

validate()

Validate the universe definition.

Source code in src/portfolio_management/assets/universes/universe.py
def validate(self) -> None:
    """Validate the universe definition."""
    self.filter_criteria.validate()
    self.return_config.validate()
    self.technical_indicators.validate()
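`validate` is a pure delegation method: the container forwards validation to each nested config object rather than duplicating their rules. A hedged stdlib sketch of that shape, using stand-in classes rather than the package's real `FilterCriteria`/`UniverseDefinition`:

```python
from dataclasses import dataclass, field


# Stand-in for FilterCriteria: owns its own validation rule.
@dataclass
class CriteriaSketch:
    min_history_days: int = 252

    def validate(self) -> None:
        if self.min_history_days <= 0:
            raise ValueError("min_history_days must be positive")


# Stand-in for UniverseDefinition: validation is just delegation.
@dataclass
class DefinitionSketch:
    filter_criteria: CriteriaSketch = field(default_factory=CriteriaSketch)

    def validate(self) -> None:
        self.filter_criteria.validate()


DefinitionSketch().validate()  # defaults pass

try:
    DefinitionSketch(CriteriaSketch(min_history_days=0)).validate()
except ValueError as exc:
    print(exc)  # min_history_days must be positive
```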

UniverseManager

Orchestrates the loading and construction of investment universes.

Source code in src/portfolio_management/assets/universes/manager.py
class UniverseManager:
    """Orchestrates the loading and construction of investment universes."""

    def __init__(self, config_path: Path, matches_df: pd.DataFrame, prices_dir: Path):
        """Initializes the UniverseManager."""
        self.config_path = config_path
        self.matches_df = matches_df
        self.prices_dir = prices_dir
        self.universes = UniverseConfigLoader.load_config(config_path)
        self.asset_selector = AssetSelector()
        self.asset_classifier = AssetClassifier()
        self.return_calculator = ReturnCalculator()
        self._cache: dict[str, dict[str, pd.DataFrame | pd.Series]] = {}

    def list_universes(self) -> list[str]:
        """List the names of all available universes."""
        return list(self.universes.keys())

    def get_definition(self, name: str) -> UniverseDefinition:
        """Get the definition for a named universe."""
        if name not in self.universes:
            raise ConfigurationError(self.config_path, f"Universe '{name}' not found.")
        return self.universes[name]

    def load_universe(
        self,
        name: str,
        use_cache: bool = True,
        strict: bool = True,
    ) -> dict[str, pd.DataFrame | pd.Series] | None:
        """Loads and constructs a universe by its configured name."""
        logger = logging.getLogger(__name__)
        if use_cache and strict and name in self._cache:
            logger.info("Loading universe '%s' from cache.", name)
            return self._cache[name]

        logger.info("Loading universe '%s' from scratch.", name)
        try:
            definition = self.get_definition(name)

            selected_assets = self._select_assets(definition)
            if not selected_assets:
                raise InsufficientDataError(
                    required_periods=1,
                    available_periods=0,
                )

            classified_df = self._classify_assets(selected_assets)
            final_classified_df = self._filter_by_classification(
                classified_df,
                definition,
            )
            if final_classified_df.empty:
                raise InsufficientDataError(
                    required_periods=1,
                    available_periods=0,
                )

            final_assets = self._get_final_assets(selected_assets, final_classified_df)

            returns_df = self._calculate_returns(final_assets, definition)

            if "max_assets" in definition.constraints and not returns_df.empty:
                returns_df = returns_df.iloc[
                    :,
                    : int(definition.constraints["max_assets"]),
                ]

            universe_data = self._build_universe_data(
                final_assets,
                final_classified_df,
                returns_df,
                definition,
            )

            if use_cache and strict:
                self._cache[name] = universe_data

            return universe_data

        except (
            ConfigurationError,
            AssetSelectionError,
            DataValidationError,
            ClassificationError,
            InsufficientDataError,
            ReturnCalculationError,
            KeyError,
        ) as exc:
            if strict:
                raise UniverseLoadError(
                    f"Failed to load universe '{name}': {exc}",
                ) from exc
            logger.warning("Universe loading failed for '%s': %s", name, exc)
            return None

    def _select_assets(self, definition: UniverseDefinition) -> list[SelectedAsset]:
        """Select assets based on the universe definition."""
        return self.asset_selector.select_assets(
            self.matches_df,
            definition.filter_criteria,
        )

    def _classify_assets(self, assets: list[SelectedAsset]) -> pd.DataFrame:
        """Classify a list of assets."""
        return self.asset_classifier.classify_universe(assets)

    def _filter_by_classification(
        self,
        classified_df: pd.DataFrame,
        definition: UniverseDefinition,
    ) -> pd.DataFrame:
        """Filter classified assets based on requirements."""
        df = classified_df.copy()
        for key, values in definition.classification_requirements.items():
            if key not in df.columns:
                raise KeyError(
                    f"Classification requirement '{key}' not found in columns.",
                )
            df = df[df[key].isin(values)]
        return df

    def _get_final_assets(
        self,
        selected_assets: list[SelectedAsset],
        classified_df: pd.DataFrame,
    ) -> list[SelectedAsset]:
        """Get the final list of assets after classification filtering."""
        final_symbols = set(classified_df["symbol"])
        return [asset for asset in selected_assets if asset.symbol in final_symbols]

    def _calculate_returns(
        self,
        assets: list[SelectedAsset],
        definition: UniverseDefinition,
    ) -> pd.DataFrame:
        """Calculate returns for a list of assets."""
        return self.return_calculator.load_and_prepare(
            assets,
            self.prices_dir,
            definition.return_config,
        )

    def _build_universe_data(
        self,
        final_assets: list[SelectedAsset],
        classified_df: pd.DataFrame,
        returns_df: pd.DataFrame,
        definition: UniverseDefinition,
    ) -> dict[str, Any]:
        """Build the final dictionary of universe data."""
        return {
            "assets": pd.DataFrame([asset.__dict__ for asset in final_assets]),
            "classifications": classified_df,
            "returns": returns_df,
            "metadata": pd.Series(definition.__dict__),
        }
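The classification filter in `_filter_by_classification` applies each requirement as a pandas `isin` mask, raising `KeyError` for unknown columns. A small self-contained sketch of that loop on toy data (the DataFrame contents here are illustrative, not real package output):

```python
import pandas as pd

# Toy classified universe: two equities and one bond.
classified = pd.DataFrame(
    {
        "symbol": ["aapl.us", "1pas.uk", "bund.de"],
        "asset_class": ["equity", "equity", "bond"],
    }
)
requirements = {"asset_class": ["equity"]}

df = classified.copy()
for key, values in requirements.items():
    if key not in df.columns:
        raise KeyError(f"Classification requirement '{key}' not found in columns.")
    df = df[df[key].isin(values)]  # keep rows whose value is allowed

print(df["symbol"].tolist())  # ['aapl.us', '1pas.uk']
```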

list_universes()

List the names of all available universes.

Source code in src/portfolio_management/assets/universes/manager.py
def list_universes(self) -> list[str]:
    """List the names of all available universes."""
    return list(self.universes.keys())

get_definition(name)

Get the definition for a named universe.

Source code in src/portfolio_management/assets/universes/manager.py
def get_definition(self, name: str) -> UniverseDefinition:
    """Get the definition for a named universe."""
    if name not in self.universes:
        raise ConfigurationError(self.config_path, f"Universe '{name}' not found.")
    return self.universes[name]

load_universe(name, use_cache=True, strict=True)

Loads and constructs a universe by its configured name.

Source code in src/portfolio_management/assets/universes/manager.py
def load_universe(
    self,
    name: str,
    use_cache: bool = True,
    strict: bool = True,
) -> dict[str, pd.DataFrame | pd.Series] | None:
    """Loads and constructs a universe by its configured name."""
    logger = logging.getLogger(__name__)
    if use_cache and strict and name in self._cache:
        logger.info("Loading universe '%s' from cache.", name)
        return self._cache[name]

    logger.info("Loading universe '%s' from scratch.", name)
    try:
        definition = self.get_definition(name)

        selected_assets = self._select_assets(definition)
        if not selected_assets:
            raise InsufficientDataError(
                required_periods=1,
                available_periods=0,
            )

        classified_df = self._classify_assets(selected_assets)
        final_classified_df = self._filter_by_classification(
            classified_df,
            definition,
        )
        if final_classified_df.empty:
            raise InsufficientDataError(
                required_periods=1,
                available_periods=0,
            )

        final_assets = self._get_final_assets(selected_assets, final_classified_df)

        returns_df = self._calculate_returns(final_assets, definition)

        if "max_assets" in definition.constraints and not returns_df.empty:
            returns_df = returns_df.iloc[
                :,
                : int(definition.constraints["max_assets"]),
            ]

        universe_data = self._build_universe_data(
            final_assets,
            final_classified_df,
            returns_df,
            definition,
        )

        if use_cache and strict:
            self._cache[name] = universe_data

        return universe_data

    except (
        ConfigurationError,
        AssetSelectionError,
        DataValidationError,
        ClassificationError,
        InsufficientDataError,
        ReturnCalculationError,
        KeyError,
    ) as exc:
        if strict:
            raise UniverseLoadError(
                f"Failed to load universe '{name}': {exc}",
            ) from exc
        logger.warning("Universe loading failed for '%s': %s", name, exc)
        return None
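Two behaviors of `load_universe` are easy to miss: results are cached only when `use_cache` and `strict` are both true, and `strict=False` converts any pipeline failure into a logged warning plus a `None` return instead of a raised `UniverseLoadError`. A stdlib-only sketch of that control flow, with a placeholder build step standing in for the real select/classify/returns pipeline:

```python
import logging


class ManagerSketch:
    def __init__(self) -> None:
        self._cache: dict[str, dict] = {}

    def load(self, name: str, use_cache: bool = True, strict: bool = True):
        if use_cache and strict and name in self._cache:
            return self._cache[name]
        try:
            if name == "bad":  # simulate a failing pipeline stage
                raise KeyError("missing classification column")
            data = {"assets": name.upper()}  # placeholder build step
        except KeyError as exc:
            if strict:
                raise RuntimeError(f"Failed to load universe '{name}': {exc}") from exc
            logging.getLogger(__name__).warning("load failed for '%s': %s", name, exc)
            return None
        if use_cache and strict:
            self._cache[name] = data
        return data


m = ManagerSketch()
print(m.load("eu"))                    # built and cached
print(m.load("bad", strict=False))     # None: failure swallowed with a warning
```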
