Skip to content

Feature Engineering

FeatureEngineer

Bases: ABC

Base class for feature engineering transformations (Pillar F).

Transforms raw sliced time-series data into a feature matrix that can be used for comparing and selecting representative periods. Implementations define how raw data is converted into a comparable feature space.

The run() method creates a new ProblemContext with df_features populated, while subclasses implement calc_and_get_features_df() to define the specific feature engineering logic.

Examples:

>>> class SimpleStatsFeatureEngineer(FeatureEngineer):
...     def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
...         features = []
...         for slice_id in context.slicer.slices:
...             slice_data = context.df_raw.loc[slice_id]
...             features.append({
...                 'mean': slice_data.mean().mean(),
...                 'std': slice_data.std().mean(),
...                 'max': slice_data.max().max()
...             })
...         return pd.DataFrame(features, index=context.slicer.slices)
...
>>> engineer = SimpleStatsFeatureEngineer()
>>> context_with_features = engineer.run(context)
>>> print(context_with_features.df_features.head())
Source code in energy_repset/feature_engineering/base_feature_engineer.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class FeatureEngineer(ABC):
    """Base class for feature engineering transformations (Pillar F).

    Transforms raw sliced time-series data into a feature matrix that can be used
    for comparing and selecting representative periods. Implementations define how
    raw data is converted into a comparable feature space.

    The run() method creates a new ProblemContext with df_features populated,
    while subclasses implement calc_and_get_features_df() to define the specific
    feature engineering logic.

    Examples:
        >>> class SimpleStatsFeatureEngineer(FeatureEngineer):
        ...     def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
        ...         features = []
        ...         for slice_id in context.slicer.slices:
        ...             slice_data = context.df_raw.loc[slice_id]
        ...             features.append({
        ...                 'mean': slice_data.mean().mean(),
        ...                 'std': slice_data.std().mean(),
        ...                 'max': slice_data.max().max()
        ...             })
        ...         return pd.DataFrame(features, index=context.slicer.slices)
        ...
        >>> engineer = SimpleStatsFeatureEngineer()
        >>> context_with_features = engineer.run(context)
        >>> print(context_with_features.df_features.head())
    """
    def run(self, context: ProblemContext) -> ProblemContext:
        """Calculate features and return a new context with df_features populated.

        Args:
            context: The problem context containing raw time-series data and slicing
                information.

        Returns:
            A new ProblemContext instance with df_features set to the computed
            feature matrix. The original context is not modified.
        """
        # Copy first so the caller's context stays untouched.
        context_with_features = context.copy()
        context_with_features.df_features = self.calc_and_get_features_df(context)
        return context_with_features

    @abstractmethod
    def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
        """Calculate and return the feature matrix.

        Args:
            context: The problem context containing raw data and slicing information.

        Returns:
            A DataFrame where each row represents one slice (candidate period) and
            each column represents a feature. The index should match the slice
            identifiers from context.slicer.slices.
        """
        ...

run

run(context: ProblemContext) -> ProblemContext

Calculate features and return a new context with df_features populated.

Parameters:

Name Type Description Default
context ProblemContext

The problem context containing raw time-series data and slicing information.

required

Returns:

Type Description
ProblemContext

A new ProblemContext instance with df_features set to the computed

ProblemContext

feature matrix. The original context is not modified.

Source code in energy_repset/feature_engineering/base_feature_engineer.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def run(self, context: ProblemContext) -> ProblemContext:
    """Return a copy of *context* with df_features filled in.

    Args:
        context: Problem context holding the raw time-series data and
            slicing information.

    Returns:
        A fresh ProblemContext whose df_features is the computed feature
        matrix; the input context is left unmodified.
    """
    enriched = context.copy()
    enriched.df_features = self.calc_and_get_features_df(context)
    return enriched

calc_and_get_features_df abstractmethod

calc_and_get_features_df(context: ProblemContext) -> DataFrame

Calculate and return the feature matrix.

Parameters:

Name Type Description Default
context ProblemContext

The problem context containing raw data and slicing information.

required

Returns:

Type Description
DataFrame

A DataFrame where each row represents one slice (candidate period) and

DataFrame

each column represents a feature. The index should match the slice

DataFrame

identifiers from context.slicer.slices.

Source code in energy_repset/feature_engineering/base_feature_engineer.py
54
55
56
57
58
59
60
61
62
63
64
65
66
@abstractmethod
def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
    """Compute the feature matrix for every slice in *context*.

    Args:
        context: Problem context carrying the raw data and slicing
            information.

    Returns:
        DataFrame with one row per slice (candidate period) and one column
        per feature; its index should match context.slicer.slices.
    """
    ...

FeaturePipeline

Bases: FeatureEngineer

Chains multiple feature engineers to create a combined feature space.

Runs multiple feature engineering transformations sequentially and concatenates their outputs into a single feature matrix. Useful for combining different feature types (e.g., statistical summaries + PCA components).

Examples:

>>> from energy_repset.feature_engineering import StandardStatsFeatureEngineer, PCAFeatureEngineer
>>> stats_engineer = StandardStatsFeatureEngineer()
>>> pca_engineer = PCAFeatureEngineer(n_components=3)
>>> pipeline = FeaturePipeline({'stats': stats_engineer, 'pca': pca_engineer})
>>> context_with_features = pipeline.run(context)
>>> print(context_with_features.df_features.columns)
    # Shows columns from both engineers: ['mean', 'std', 'max', 'min', 'pc1', 'pc2', 'pc3']
Source code in energy_repset/feature_engineering/base_feature_engineer.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class FeaturePipeline(FeatureEngineer):
    """Chains multiple feature engineers to create a combined feature space.

    Runs multiple feature engineering transformations sequentially and concatenates
    their outputs into a single feature matrix. Useful for combining different
    feature types (e.g., statistical summaries + PCA components).

    Examples:

        >>> from energy_repset.feature_engineering import StandardStatsFeatureEngineer, PCAFeatureEngineer
        >>> stats_engineer = StandardStatsFeatureEngineer()
        >>> pca_engineer = PCAFeatureEngineer(n_components=3)
        >>> pipeline = FeaturePipeline({'stats': stats_engineer, 'pca': pca_engineer})
        >>> context_with_features = pipeline.run(context)
        >>> print(context_with_features.df_features.columns)
            # Shows columns from both engineers: ['mean', 'std', 'max', 'min', 'pc1', 'pc2', 'pc3']
    """
    def __init__(self, engineers: Dict[str, FeatureEngineer]):
        """Initialize the feature pipeline.

        Args:
            engineers: Dict of FeatureEngineer instances to run sequentially,
                in insertion order. Features from all engineers will be
                concatenated column-wise.
        """
        self.engineers = engineers

    def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
        """Calculate features from all engineers sequentially, accumulating results.

        Each engineer in the pipeline sees the accumulated features from all
        previous engineers via the working context. New features from each stage
        are concatenated to the existing feature set. This allows:
        - Early engineers to create base features (e.g., StandardStatsFeatureEngineer)
        - Later engineers to transform or add to those features (e.g., PCAFeatureEngineer)

        Args:
            context: The problem context containing raw data.

        Returns:
            A DataFrame with columns from all engineers concatenated horizontally.

        Raises:
            ValueError: If the pipeline was constructed with no engineers.
        """
        # Fail fast with a clear message instead of pd.concat([]) raising an
        # opaque "No objects to concatenate" error.
        if not self.engineers:
            raise ValueError("FeaturePipeline requires at least one engineer.")

        # Work on a copy so the caller's context is never mutated.
        working_context = context.copy()

        all_features = []
        for engineer in self.engineers.values():
            all_features.append(engineer.calc_and_get_features_df(working_context))
            # Expose the accumulated features to the next engineer.
            # NOTE(review): assigns the private attribute, mirroring prior
            # behavior — presumably to bypass df_features accessor logic;
            # confirm against ProblemContext.
            working_context._df_features = pd.concat(all_features, axis=1)

        return pd.concat(all_features, axis=1)

__init__

__init__(engineers: dict[str, FeatureEngineer])

Initialize the feature pipeline.

Parameters:

Name Type Description Default
engineers dict[str, FeatureEngineer]

Dict of FeatureEngineer instances to run sequentially. Features from all engineers will be concatenated column-wise.

required
Source code in energy_repset/feature_engineering/base_feature_engineer.py
86
87
88
89
90
91
92
93
def __init__(self, engineers: Dict[str, FeatureEngineer]):
    """Initialize the feature pipeline.

    Args:
        engineers: Dict of FeatureEngineer instances to run sequentially,
            in insertion order. Features from all engineers will be
            concatenated column-wise.
    """
    self.engineers = engineers

calc_and_get_features_df

calc_and_get_features_df(context: ProblemContext) -> DataFrame

Calculate features from all engineers sequentially, accumulating results.

Each engineer in the pipeline sees the accumulated features from all previous engineers via context.df_features. New features from each stage are concatenated to the existing feature set. This allows: - Early engineers to create base features (e.g., StandardStatsFeatureEngineer) - Later engineers to transform or add to those features (e.g., PCAFeatureEngineer)

Parameters:

Name Type Description Default
context ProblemContext

The problem context containing raw data.

required

Returns:

Type Description
DataFrame

A DataFrame with columns from all engineers concatenated horizontally.

DataFrame

Each engineer's features are added to the cumulative feature set.

Source code in energy_repset/feature_engineering/base_feature_engineer.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
    """Calculate features from all engineers sequentially, accumulating results.

    Each engineer in the pipeline sees the accumulated features from all
    previous engineers via the working context. New features from each stage
    are concatenated to the existing feature set. This allows:
    - Early engineers to create base features (e.g., StandardStatsFeatureEngineer)
    - Later engineers to transform or add to those features (e.g., PCAFeatureEngineer)

    Args:
        context: The problem context containing raw data.

    Returns:
        A DataFrame with columns from all engineers concatenated horizontally.

    Raises:
        ValueError: If the pipeline was constructed with no engineers.
    """
    # Fail fast with a clear message instead of pd.concat([]) raising an
    # opaque "No objects to concatenate" error.
    if not self.engineers:
        raise ValueError("FeaturePipeline requires at least one engineer.")

    # Work on a copy so the caller's context is never mutated.
    working_context = context.copy()

    all_features = []
    for engineer in self.engineers.values():
        all_features.append(engineer.calc_and_get_features_df(working_context))
        # Expose the accumulated features to the next engineer.
        # NOTE(review): assigns the private attribute, mirroring prior
        # behavior — presumably to bypass df_features accessor logic.
        working_context._df_features = pd.concat(all_features, axis=1)

    return pd.concat(all_features, axis=1)

StandardStatsFeatureEngineer

Bases: FeatureEngineer

Extracts statistical features from time-series slices with robust scaling.

For each original variable and slice, computes: - Central tendency: mean, median (q50) - Dispersion: std, IQR (q90 - q10), q10, q90 - Distribution shape: neg_share (proportion of negative values) - Temporal dynamics: ramp_std (std of first differences)

Optionally includes cross-variable correlations within each slice (upper triangle only, Fisher-z transformed). Features are z-score normalized across slices to ensure comparability.

Examples:

>>> engineer = StandardStatsFeatureEngineer()
>>> context_with_features = engineer.run(context)
>>> print(context_with_features.df_features.columns)
# ['mean__demand', 'mean__solar', 'std__demand', 'std__solar', ...]
>>> engineer_no_corr = StandardStatsFeatureEngineer(
...     include_correlations=False,
...     scale='zscore'
... )
>>> context_with_features = engineer_no_corr.run(context)
>>> print(context_with_features.df_features.shape)
# (12, 16) for 12 months, 2 variables, 8 stats each
Source code in energy_repset/feature_engineering/standard_stats.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class StandardStatsFeatureEngineer(FeatureEngineer):
    """Extracts statistical features from time-series slices with robust scaling.

    For each original variable and slice, computes:
    - Central tendency: mean, median (q50)
    - Dispersion: std, IQR (q90 - q10), q10, q90
    - Distribution shape: neg_share (proportion of negative values)
    - Temporal dynamics: ramp_std (std of first differences)

    Optionally includes cross-variable correlations within each slice (upper
    triangle only, Fisher-z transformed). Features are z-score normalized
    across slices to ensure comparability.

    Examples:
        >>> engineer = StandardStatsFeatureEngineer()
        >>> context_with_features = engineer.run(context)
        >>> print(context_with_features.df_features.columns)
        # ['mean__demand', 'mean__solar', 'std__demand', 'std__solar', ...]

        >>> engineer_no_corr = StandardStatsFeatureEngineer(
        ...     include_correlations=False,
        ...     scale='zscore'
        ... )
        >>> context_with_features = engineer_no_corr.run(context)
        >>> print(context_with_features.df_features.shape)
        # (12, 16) for 12 months, 2 variables, 8 stats each
    """

    def __init__(
            self,
            include_correlations: bool = True,
            scale: Literal["zscore", "none"] = "zscore",
            min_rows_for_corr: int = 8,
    ):
        """Initialize the statistical feature engineer.

        Args:
            include_correlations: If True, include cross-variable correlations
                per slice (Fisher-z transformed).
            scale: Scaling method: "zscore" (z-score normalize each feature
                across slices) or "none" (leave raw features unscaled).
            min_rows_for_corr: Minimum number of rows per slice required to
                compute correlations. Slices with fewer rows get correlation
                features set to 0.
        """
        self.include_correlations = include_correlations
        self.scale = scale
        self.min_rows_for_corr = min_rows_for_corr

        # Fitted state; populated by _fit() and None until then.
        self._raw_feats_: pd.DataFrame | None = None
        self._means_: pd.Series | None = None
        self._stds_: pd.Series | None = None
        self._feature_names_: List[str] | None = None

    def calc_and_get_features_df(self, context: "ProblemContext") -> pd.DataFrame:
        """Calculate statistical features and return scaled feature matrix.

        Args:
            context: Problem context with raw time-series data.

        Returns:
            DataFrame where each row is a slice and columns are scaled statistical
            features. Column names follow pattern '{stat}__{variable}'.
        """
        self._fit(context)
        return self._transform(context)

    def _fit(self, context: "ProblemContext") -> None:
        """Compute raw features and fit scaling parameters."""
        df_raw = context.df_raw
        slicer = context.slicer

        self._raw_feats_ = self._compute_raw_features(df_raw, slicer)
        if self.scale == "zscore":
            self._means_ = self._raw_feats_.mean(axis=0)
            # Zero std (constant feature) would divide by zero in _transform;
            # mapping it to 1.0 makes constant features scale to 0 instead.
            self._stds_ = self._raw_feats_.std(axis=0).replace(0, 1.0)
        self._feature_names_ = list(self._raw_feats_.columns)

    def _transform(self, context: "ProblemContext") -> pd.DataFrame:
        """Apply scaling to raw features (context unused; kept for symmetry with _fit)."""
        feats = self._raw_feats_
        if self.scale == "zscore":
            feats = (feats - self._means_) / self._stds_
        elif self.scale == "none":
            pass
        else:
            raise NotImplementedError(f"Scaling {self.scale} not recognized.")
        # Neutralize any residual non-finite values so downstream consumers
        # always receive a fully numeric matrix.
        feats = feats.replace([np.inf, -np.inf], 0.0).fillna(0.0)
        return feats

    def feature_names(self) -> List[str]:
        """Get list of feature column names.

        Returns:
            List of feature names in the format '{stat}__{variable}' or
            'corr__{var1}__{var2}' for correlations; empty before fitting.
        """
        if self._feature_names_ is None:
            return []
        return list(self._feature_names_)

    def _compute_raw_features(self, df: pd.DataFrame, slicer: TimeSlicer) -> pd.DataFrame:
        """Compute raw (unscaled) statistical features for each slice."""
        # Only numeric columns are featurized; copy to avoid mutating df.
        X = df.select_dtypes(include=[np.number]).copy()
        labels = pd.Index(slicer.labels_for_index(X.index), name="slice")
        grp = X.groupby(labels)

        def neg_share(a: pd.Series) -> float:
            # Share of negative observations among the non-NaN values.
            n = a.notna().sum()
            return float((a < 0).sum() / n) if n > 0 else 0.0

        def ramp_std(a: pd.Series) -> float:
            # Std of first differences: a proxy for temporal volatility.
            d = a.diff().dropna()
            return float(d.std()) if len(d) else 0.0

        stats: Dict[str, pd.DataFrame] = {}
        stats["mean"] = grp.mean(numeric_only=True)
        stats["std"] = grp.std(numeric_only=True).fillna(0.0)
        stats["q10"] = grp.quantile(0.10)
        stats["q50"] = grp.quantile(0.50)
        stats["q90"] = grp.quantile(0.90)
        stats["iqr"] = stats["q90"] - stats["q10"]
        # The inner apply(..., axis=0) runs the helper once per column of each
        # slice's sub-frame.
        stats["neg_share"] = grp.apply(lambda g: g.apply(neg_share, axis=0))
        stats["ramp_std"] = grp.apply(lambda g: g.apply(ramp_std, axis=0))

        frames = []
        for key, dfk in stats.items():
            # Prefix columns so every feature is named '{stat}__{variable}'.
            dfk = dfk.add_prefix(f"{key}__")
            frames.append(dfk)

        if self.include_correlations and X.shape[1] >= 2:
            cols = list(X.columns)
            # Upper-triangle variable pairs only: corr matrix is symmetric.
            pairs = [(i, j) for i in range(len(cols)) for j in range(i + 1, len(cols))]
            names = [f"corr__{cols[i]}__{cols[j]}" for i, j in pairs]
            corr_rows = []
            idx_rows = []
            for s, g in grp:
                if len(g) >= self.min_rows_for_corr:
                    C = g.corr().to_numpy()
                    vals = [C[i, j] for i, j in pairs]
                else:
                    # Too few rows for a meaningful correlation estimate.
                    vals = [0.0] * len(pairs)
                # Fisher z-transform (atanh); clamped near |v|=1 where atanh
                # diverges (3.8 ~= atanh(0.999)).
                zvals = [0.5 * np.log((1 + v) / (1 - v)) if abs(v) < 0.999 else np.sign(v) * 3.8 for v in vals]
                corr_rows.append(zvals)
                idx_rows.append(s)
            corr_df = pd.DataFrame(corr_rows, index=idx_rows, columns=names)
            frames.append(corr_df)

        df_features = pd.concat(frames, axis=1).sort_index()
        return df_features

__init__

__init__(include_correlations: bool = True, scale: Literal['zscore', 'none'] = 'zscore', min_rows_for_corr: int = 8)

Initialize the statistical feature engineer.

Parameters:

Name Type Description Default
include_correlations bool

If True, include cross-variable correlations per slice (Fisher-z transformed).

True
scale Literal['zscore', 'none']

Scaling method: "zscore" (z-score normalize each feature across slices) or "none" (leave raw features unscaled).

'zscore'
min_rows_for_corr int

Minimum number of rows per slice required to compute correlations. Slices with fewer rows get correlation features set to 0.

8
Source code in energy_repset/feature_engineering/standard_stats.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __init__(
        self,
        include_correlations: bool = True,
        scale: Literal["zscore", "none"] = "zscore",
        min_rows_for_corr: int = 8,
):
    """Initialize the statistical feature engineer.

    Args:
        include_correlations: If True, include cross-variable correlations
            per slice (Fisher-z transformed).
        scale: Scaling method: "zscore" (z-score normalize each feature
            across slices) or "none" (leave raw features unscaled).
        min_rows_for_corr: Minimum number of rows per slice required to
            compute correlations. Slices with fewer rows get correlation
            features set to 0.
    """
    self.include_correlations = include_correlations
    self.scale = scale
    self.min_rows_for_corr = min_rows_for_corr

    # Fitted state; populated by _fit() and None until then.
    self._raw_feats_: pd.DataFrame | None = None
    self._means_: pd.Series | None = None
    self._stds_: pd.Series | None = None
    self._feature_names_: List[str] | None = None

calc_and_get_features_df

calc_and_get_features_df(context: 'ProblemContext') -> DataFrame

Calculate statistical features and return scaled feature matrix.

Parameters:

Name Type Description Default
context 'ProblemContext'

Problem context with raw time-series data.

required

Returns:

Type Description
DataFrame

DataFrame where each row is a slice and columns are scaled statistical

DataFrame

features. Column names follow pattern '{stat}__{variable}'.

Source code in energy_repset/feature_engineering/standard_stats.py
67
68
69
70
71
72
73
74
75
76
77
78
def calc_and_get_features_df(self, context: "ProblemContext") -> pd.DataFrame:
    """Fit scaling parameters on *context*, then return the scaled features.

    Args:
        context: Problem context carrying raw time-series data.

    Returns:
        DataFrame with one row per slice and columns of scaled statistical
        features named '{stat}__{variable}'.
    """
    self._fit(context)
    return self._transform(context)

feature_names

feature_names() -> list[str]

Get list of feature column names.

Returns:

Type Description
list[str]

List of feature names in the format '{stat}__{variable}' or

list[str]

'corr__{var1}__{var2}' for correlations.

Source code in energy_repset/feature_engineering/standard_stats.py
103
104
105
106
107
108
109
110
111
112
def feature_names(self) -> List[str]:
    """Return a copy of the learned feature column names.

    Returns:
        Names in the format '{stat}__{variable}' or 'corr__{var1}__{var2}'
        for correlations; an empty list before features are computed.
    """
    names = self._feature_names_
    return [] if names is None else list(names)

PCAFeatureEngineer

Bases: FeatureEngineer

Performs PCA dimensionality reduction on existing features.

Reduces the feature space using Principal Component Analysis, typically applied after statistical feature engineering. This is useful for: - Reducing dimensionality when you have many correlated features - Creating orthogonal feature representations - Focusing on the main axes of variation

Commonly used in a FeaturePipeline after StandardStatsFeatureEngineer to compress statistical features into a smaller number of principal components.

Parameters:

Name Type Description Default
n_components int | float | None

Number of principal components to retain. Can be: - int: Exact number of components - float (0.0-1.0): Retain enough components to explain this fraction of variance - None: Retain all components (no reduction)

None
whiten bool

If True, scale components to unit variance. This can improve results when PCA features are used with distance-based algorithms.

False

Examples:

>>> from energy_repset.feature_engineering import PCAFeatureEngineer
>>> # Use PCA alone (requires context to already have df_features)
>>> pca_engineer = PCAFeatureEngineer(n_components=5)
>>> context_with_pca = pca_engineer.run(context_with_features)
>>> print(context_with_pca.df_features.columns)
    ['pc_0', 'pc_1', 'pc_2', 'pc_3', 'pc_4']

>>> # More common: chain with StandardStats in a pipeline
>>> from energy_repset.feature_engineering import (
...     StandardStatsFeatureEngineer,
...     FeaturePipeline
... )
>>> pipeline = FeaturePipeline({
...     'stats': StandardStatsFeatureEngineer(),
...     'pca': PCAFeatureEngineer(n_components=0.95)  # Keep 95% variance
... })
>>> context_with_both = pipeline.run(context)

>>> # Check explained variance
>>> pca_engineer = PCAFeatureEngineer(n_components=10)
>>> context_out = pca_engineer.run(context_with_features)
>>> print(pca_engineer.explained_variance_ratio_)
    [0.45, 0.22, 0.11, ...]
Source code in energy_repset/feature_engineering/pca.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class PCAFeatureEngineer(FeatureEngineer):
    """Performs PCA dimensionality reduction on existing features.

    Reduces the feature space using Principal Component Analysis, typically
    applied after statistical feature engineering. This is useful for:
    - Reducing dimensionality when you have many correlated features
    - Creating orthogonal feature representations
    - Focusing on the main axes of variation

    Commonly used in a FeaturePipeline after StandardStatsFeatureEngineer
    to compress statistical features into a smaller number of principal
    components.

    Args:
        n_components: Number of principal components to retain. Can be:
            - int: Exact number of components
            - float (0.0-1.0): Retain enough components to explain this
              fraction of variance
            - None: Retain all components (no reduction)
        whiten: If True, scale components to unit variance. This can improve
            results when PCA features are used with distance-based algorithms.

    Examples:

        >>> from energy_repset.feature_engineering import PCAFeatureEngineer
        >>> # Use PCA alone (requires context to already have df_features)
        >>> pca_engineer = PCAFeatureEngineer(n_components=5)
        >>> context_with_pca = pca_engineer.run(context_with_features)
        >>> print(context_with_pca.df_features.columns)
            ['pc_0', 'pc_1', 'pc_2', 'pc_3', 'pc_4']

        >>> # More common: chain with StandardStats in a pipeline
        >>> from energy_repset.feature_engineering import (
        ...     StandardStatsFeatureEngineer,
        ...     FeaturePipeline
        ... )
        >>> pipeline = FeaturePipeline({
        ...     'stats': StandardStatsFeatureEngineer(),
        ...     'pca': PCAFeatureEngineer(n_components=0.95)  # Keep 95% variance
        ... })
        >>> context_with_both = pipeline.run(context)

        >>> # Check explained variance
        >>> pca_engineer = PCAFeatureEngineer(n_components=10)
        >>> context_out = pca_engineer.run(context_with_features)
        >>> print(pca_engineer.explained_variance_ratio_)
            [0.45, 0.22, 0.11, ...]
    """

    def __init__(
        self,
        n_components: int | float | None = None,
        whiten: bool = False
    ) -> None:
        """Initialize PCA feature engineer.

        Args:
            n_components: Number of components to keep, or fraction of
                variance to preserve (if float). None keeps all components.
            whiten: Whether to whiten (scale) the principal components.
        """
        self.n_components = n_components
        self.whiten = whiten
        # Fitted PCA instance; set by calc_and_get_features_df().
        self._pca: PCA | None = None
        self._feature_names: List[str] = []

    def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
        """Apply PCA to existing features in context.

        Args:
            context: Problem context with df_features already populated
                (typically by StandardStatsFeatureEngineer or similar).

        Returns:
            DataFrame with principal component features. Columns are named
            'pc_0', 'pc_1', etc.

        Raises:
            ValueError: If context.df_features is None or empty.
        """
        # NOTE(review): reads the private attribute to detect "unset" without
        # triggering df_features accessor logic — confirm against ProblemContext.
        if context._df_features is None or context._df_features.empty:
            raise ValueError(
                "PCAFeatureEngineer requires context.df_features to be populated. "
                "Run StandardStatsFeatureEngineer or similar first, or use "
                "FeaturePipeline({'stats': StandardStatsFeatureEngineer(), "
                "'pca': PCAFeatureEngineer()})."
            )

        X = context.df_features.values
        index = context.df_features.index

        # Neutralize NaN/inf values so PCA receives a finite matrix
        X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

        # Fit PCA on the full feature matrix
        self._pca = PCA(n_components=self.n_components, whiten=self.whiten)
        X_transformed = self._pca.fit_transform(X)

        # Name components pc_0..pc_{k-1} based on how many PCA actually kept
        n_components_actual = X_transformed.shape[1]
        self._feature_names = [f"pc_{i}" for i in range(n_components_actual)]

        # Wrap back into a DataFrame aligned with the original slice index
        df_pca = pd.DataFrame(
            X_transformed,
            index=index,
            columns=self._feature_names
        )

        return df_pca

    def feature_names(self) -> List[str]:
        """Get list of principal component feature names.

        Returns:
            List of feature names: ['pc_0', 'pc_1', ...].
        """
        return list(self._feature_names)

    @property
    def explained_variance_ratio_(self) -> np.ndarray | None:
        """Get the proportion of variance explained by each component.

        Returns:
            Array of explained variance ratios, or None if PCA not fitted yet.

        Examples:

            >>> pca_eng = PCAFeatureEngineer(n_components=5)
            >>> context_out = pca_eng.run(context_with_features)
            >>> print(pca_eng.explained_variance_ratio_)
            # [0.45, 0.22, 0.15, 0.09, 0.05]
            >>> print(f"Total variance explained: {pca_eng.explained_variance_ratio_.sum():.2%}")
            # Total variance explained: 96.00%
        """
        if self._pca is None:
            return None
        return self._pca.explained_variance_ratio_

    @property
    def components_(self) -> np.ndarray | None:
        """Get the principal component loadings.

        Returns:
            Array of shape (n_components, n_features) containing the
            principal axes in feature space, or None if PCA not fitted yet.
        """
        if self._pca is None:
            return None
        return self._pca.components_

explained_variance_ratio_ property

explained_variance_ratio_: ndarray | None

Get the proportion of variance explained by each component.

Returns:

Type Description
ndarray | None

Array of explained variance ratios, or None if PCA not fitted yet.

Examples:

>>> pca_eng = PCAFeatureEngineer(n_components=5)
>>> context_out = pca_eng.run(context_with_features)
>>> print(pca_eng.explained_variance_ratio_)
# [0.45, 0.22, 0.15, 0.09, 0.05]
>>> print(f"Total variance explained: {pca_eng.explained_variance_ratio_.sum():.2%}")
# Total variance explained: 96.00%

components_ property

components_: ndarray | None

Get the principal component loadings.

Returns:

Type Description
ndarray | None

Array of shape (n_components, n_features) containing the

ndarray | None

principal axes in feature space, or None if PCA not fitted yet.

__init__

__init__(n_components: int | float | None = None, whiten: bool = False) -> None

Initialize PCA feature engineer.

Parameters:

Name Type Description Default
n_components int | float | None

Number of components to keep, or fraction of variance to preserve (if float). None keeps all components.

None
whiten bool

Whether to whiten (scale) the principal components.

False
Source code in energy_repset/feature_engineering/pca.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def __init__(
    self,
    n_components: int | float | None = None,
    whiten: bool = False
) -> None:
    """Initialize PCA feature engineer.

    Args:
        n_components: Number of components to keep, or fraction of
            variance to preserve (if float). None keeps all components.
        whiten: Whether to whiten (scale) the principal components.
    """
    self.n_components = n_components
    self.whiten = whiten
    self._pca: PCA | None = None
    self._feature_names: List[str] = []

calc_and_get_features_df

calc_and_get_features_df(context: ProblemContext) -> DataFrame

Apply PCA to existing features in context.

Parameters:

Name Type Description Default
context ProblemContext

Problem context with df_features already populated (typically by StandardStatsFeatureEngineer or similar).

required

Returns:

Type Description
DataFrame

DataFrame with principal component features. Columns are named 'pc_0', 'pc_1', etc.

Raises:

Type Description
ValueError

If context.df_features is None or empty.

Source code in energy_repset/feature_engineering/pca.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
    """Apply PCA to the features already present in the context.

    Args:
        context: Problem context with df_features already populated
            (typically by StandardStatsFeatureEngineer or similar).

    Returns:
        DataFrame of principal-component features, with columns named
        'pc_0', 'pc_1', etc.

    Raises:
        ValueError: If context.df_features is None or empty.
    """
    # NOTE(review): guards on the private _df_features but reads the public
    # df_features below -- presumably equivalent; confirm in ProblemContext.
    if context._df_features is None or context._df_features.empty:
        raise ValueError(
            "PCAFeatureEngineer requires context.df_features to be populated. "
            "Run StandardStatsFeatureEngineer or similar first, or use "
            "FeaturePipeline([StandardStatsFeatureEngineer(), PCAFeatureEngineer()])."
        )

    feature_index = context.df_features.index
    # Sanitize non-finite entries so the PCA fit cannot blow up on NaN/inf.
    matrix = np.nan_to_num(
        context.df_features.values, nan=0.0, posinf=0.0, neginf=0.0
    )

    # Fit a fresh PCA on every call and project the features onto its axes.
    self._pca = PCA(n_components=self.n_components, whiten=self.whiten)
    projected = self._pca.fit_transform(matrix)

    # One 'pc_i' column per component actually retained by the fit.
    self._feature_names = [f"pc_{i}" for i in range(projected.shape[1])]

    return pd.DataFrame(
        projected,
        index=feature_index,
        columns=self._feature_names,
    )

feature_names

feature_names() -> list[str]

Get list of principal component feature names.

Returns:

Type Description
list[str]

List of feature names: ['pc_0', 'pc_1', ...].

Source code in energy_repset/feature_engineering/pca.py
125
126
127
128
129
130
131
def feature_names(self) -> List[str]:
    """Return the names of the generated principal-component features.

    Returns:
        List of feature names: ['pc_0', 'pc_1', ...]. The returned list is
        a copy, so callers may mutate it freely.
    """
    return [name for name in self._feature_names]

DirectProfileFeatureEngineer

Bases: FeatureEngineer

Feature engineer that uses raw profile vectors directly (F_direct).

For each slice, concatenates the raw hourly values across all variables into a single flat feature vector. This preserves the full temporal shape of each period, making it suitable for algorithms that compare time-series profiles directly (e.g., Snippet Algorithm, DTW-based methods).

Parameters:

Name Type Description Default
variable_weights dict[str, float] | None

Optional dict mapping column names to scalar weights. Weighted columns are multiplied by their weight before flattening. Columns not in the dict are included with weight 1.0.

None

Examples:

Basic usage with daily slicing:

>>> from energy_repset.feature_engineering import DirectProfileFeatureEngineer
>>> engineer = DirectProfileFeatureEngineer()
>>> context_with_features = engineer.run(context)
>>> context_with_features.df_features.shape
(365, 72)  # 365 days x (24 hours * 3 variables)
Source code in energy_repset/feature_engineering/direct_profile.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class DirectProfileFeatureEngineer(FeatureEngineer):
    """Feature engineer that uses raw profile vectors directly (F_direct).

    For each slice, concatenates the raw hourly values across all variables
    into a single flat feature vector. This preserves the full temporal shape
    of each period, making it suitable for algorithms that compare time-series
    profiles directly (e.g., Snippet Algorithm, DTW-based methods).

    Args:
        variable_weights: Optional dict mapping column names to scalar weights.
            Weighted columns are multiplied by their weight before flattening.
            Columns not in the dict are included with weight 1.0.

    Examples:
        Basic usage with daily slicing:

        >>> from energy_repset.feature_engineering import DirectProfileFeatureEngineer
        >>> engineer = DirectProfileFeatureEngineer()
        >>> context_with_features = engineer.run(context)
        >>> context_with_features.df_features.shape
        (365, 72)  # 365 days x (24 hours * 3 variables)
    """

    def __init__(self, variable_weights: Optional[Dict[str, float]] = None):
        """Initialize direct profile feature engineer.

        Args:
            variable_weights: Optional mapping of variable names to weights.
                Variables not in the dict receive weight 1.0.
        """
        self.variable_weights = variable_weights or {}

    def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
        """Flatten each slice's raw values into a single feature row.

        Args:
            context: Problem context with raw time-series data.

        Returns:
            DataFrame where each row is one slice and columns are the
            flattened hourly values (hours x variables). Slices shorter
            than the longest slice are right-padded with NaN. Empty input
            yields an empty DataFrame.
        """
        # Numeric columns only; copy so weighting never mutates the raw data.
        df = context.df_raw.select_dtypes(include=[np.number]).copy()

        # Scale weighted columns; keys not present in df are silently ignored.
        for col, weight in self.variable_weights.items():
            if col in df.columns:
                df[col] = df[col] * weight

        slice_labels = context.slicer.labels_for_index(df.index)
        unique_slices = context.slicer.unique_slices(df.index)

        # Flatten each slice row-major: hour 0 vars, hour 1 vars, ...
        rows = [
            df.loc[slice_labels == s].values.flatten(order='C')
            for s in unique_slices
        ]

        # Fix: guard the no-slices case -- max() over an empty sequence
        # previously raised ValueError.
        if not rows:
            return pd.DataFrame(index=unique_slices)

        # Pad shorter slices (e.g. partial periods) with NaN so every row
        # has the same width.
        max_len = max(len(r) for r in rows)
        padded = [
            np.pad(r, (0, max_len - len(r)), constant_values=np.nan)
            if len(r) < max_len else r
            for r in rows
        ]

        col_names = [f"t{i}" for i in range(max_len)]
        return pd.DataFrame(padded, index=unique_slices, columns=col_names)

__init__

__init__(variable_weights: dict[str, float] | None = None)

Initialize direct profile feature engineer.

Parameters:

Name Type Description Default
variable_weights dict[str, float] | None

Optional mapping of variable names to weights. Variables not in the dict receive weight 1.0.

None
Source code in energy_repset/feature_engineering/direct_profile.py
37
38
39
40
41
42
43
44
def __init__(self, variable_weights: Optional[Dict[str, float]] = None):
    """Initialize direct profile feature engineer.

    Args:
        variable_weights: Optional mapping of variable names to weights.
            Variables not in the dict receive weight 1.0.
    """
    self.variable_weights = variable_weights or {}

calc_and_get_features_df

calc_and_get_features_df(context: ProblemContext) -> DataFrame

Flatten each slice's raw values into a single feature row.

Parameters:

Name Type Description Default
context ProblemContext

Problem context with raw time-series data.

required

Returns:

Type Description
DataFrame

DataFrame where each row is one slice and columns are the flattened hourly values (hours x variables).

Source code in energy_repset/feature_engineering/direct_profile.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def calc_and_get_features_df(self, context: ProblemContext) -> pd.DataFrame:
    """Flatten each slice's raw values into a single feature row.

    Args:
        context: Problem context with raw time-series data.

    Returns:
        DataFrame where each row is one slice and columns are the
        flattened hourly values (hours x variables). Slices shorter than
        the longest slice are right-padded with NaN. Empty input yields
        an empty DataFrame.
    """
    # Numeric columns only; copy so weighting never mutates the raw data.
    df = context.df_raw.select_dtypes(include=[np.number]).copy()

    # Scale weighted columns; keys not present in df are silently ignored.
    for col, weight in self.variable_weights.items():
        if col in df.columns:
            df[col] = df[col] * weight

    slice_labels = context.slicer.labels_for_index(df.index)
    unique_slices = context.slicer.unique_slices(df.index)

    # Flatten each slice row-major: hour 0 vars, hour 1 vars, ...
    rows = [
        df.loc[slice_labels == s].values.flatten(order='C')
        for s in unique_slices
    ]

    # Fix: guard the no-slices case -- max() over an empty sequence
    # previously raised ValueError.
    if not rows:
        return pd.DataFrame(index=unique_slices)

    # Pad shorter slices (e.g. partial periods) with NaN so every row has
    # the same width.
    max_len = max(len(r) for r in rows)
    padded = [
        np.pad(r, (0, max_len - len(r)), constant_values=np.nan)
        if len(r) < max_len else r
        for r in rows
    ]

    col_names = [f"t{i}" for i in range(max_len)]
    return pd.DataFrame(padded, index=unique_slices, columns=col_names)