Skip to content

Search Algorithms

SearchAlgorithm

Bases: ABC

Base class for all selection search algorithms (Pillar A).

Defines the interface for algorithms that find optimal representative subsets. The algorithm's sole responsibility is to take a problem context and find the best selection of k items based on its internal logic and objective function.

Different workflow types implement this protocol differently: - Generate-and-Test: Generates candidates, evaluates with ObjectiveSet, selects best - Constructive: Builds solution iteratively (e.g., k-means clustering) - Direct Optimization: Formulates and solves as single optimization problem (e.g., MILP)

Examples:

>>> class SimpleExhaustiveSearch(SearchAlgorithm):
...     def __init__(self, objective_set: ObjectiveSet, selection_policy: SelectionPolicy, k: int):
...         self.objective_set = objective_set
...         self.selection_policy = selection_policy
...         self.k = k
...
...     def find_selection(self, context: ProblemContext) -> RepSetResult:
...         # Generate all k-combinations
...         from itertools import combinations
...         all_combis = list(combinations(context.slicer.slices, self.k))
...
...         # Score each combination
...         scored_combis = []
...         for combi in all_combis:
...             scores = self.objective_set.evaluate(combi, context)
...             scored_combis.append((combi, scores))
...
...         # Select best according to policy
...         best_combi, best_scores = self.selection_policy.select(scored_combis)
...
...         return RepSetResult(
...             selection=best_combi,
...             weights={s: 1/self.k for s in best_combi},
...             scores=best_scores
...         )
...
>>> algorithm = SimpleExhaustiveSearch(objective_set, policy, k=4)
>>> result = algorithm.find_selection(context)
>>> print(result.selection)  # e.g., (0, 3, 6, 9) - selected slice IDs
Source code in energy_repset/search_algorithms/search_algorithm.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class SearchAlgorithm(ABC):
    """Base class for all selection search algorithms (Pillar A).

    Defines the interface for algorithms that find optimal representative subsets.
    The algorithm's sole responsibility is to take a problem context and find the
    best selection of k items based on its internal logic and objective function.

    Different workflow types implement this protocol differently:
    - Generate-and-Test: Generates candidates, evaluates with ObjectiveSet, selects best
    - Constructive: Builds solution iteratively (e.g., k-means clustering)
    - Direct Optimization: Formulates and solves as single optimization problem (e.g., MILP)

    Examples:
        >>> class SimpleExhaustiveSearch(SearchAlgorithm):
        ...     def __init__(self, objective_set: ObjectiveSet, selection_policy: SelectionPolicy, k: int):
        ...         self.objective_set = objective_set
        ...         self.selection_policy = selection_policy
        ...         self.k = k
        ...
        ...     def find_selection(self, context: ProblemContext) -> RepSetResult:
        ...         # Generate all k-combinations
        ...         from itertools import combinations
        ...         all_combis = list(combinations(context.slicer.slices, self.k))
        ...
        ...         # Score each combination; ObjectiveSet.evaluate takes the
        ...         # candidate combination first, then the context
        ...         scored_combis = []
        ...         for combi in all_combis:
        ...             scores = self.objective_set.evaluate(combi, context)
        ...             scored_combis.append((combi, scores))
        ...
        ...         # Select best according to policy
        ...         best_combi, best_scores = self.selection_policy.select(scored_combis)
        ...
        ...         return RepSetResult(
        ...             selection=best_combi,
        ...             weights={s: 1/self.k for s in best_combi},
        ...             scores=best_scores
        ...         )
        ...
        >>> algorithm = SimpleExhaustiveSearch(objective_set, policy, k=4)
        >>> result = algorithm.find_selection(context)
        >>> print(result.selection)  # e.g., (0, 3, 6, 9) - selected slice IDs
    """

    @abstractmethod
    def find_selection(self, context: ProblemContext) -> RepSetResult:
        """Find the best subset of k representative periods.

        Args:
            context: The problem context with df_features populated (feature
                engineering must be run before calling this method).

        Returns:
            A RepSetResult containing the selected slice identifiers, their
            representation weights, and objective scores.
        """
        ...

find_selection abstractmethod

find_selection(context: ProblemContext) -> RepSetResult

Find the best subset of k representative periods.

Parameters:

Name Type Description Default
context ProblemContext

The problem context with df_features populated (feature engineering must be run before calling this method).

required

Returns:

Type Description
RepSetResult

A RepSetResult containing the selected slice identifiers, their

RepSetResult

representation weights, and objective scores.

Source code in energy_repset/search_algorithms/search_algorithm.py
53
54
55
56
57
58
59
60
61
62
63
64
65
@abstractmethod
def find_selection(self, context: ProblemContext) -> RepSetResult:
    """Search for the optimal set of k representative periods.

    Args:
        context: Problem context whose df_features has already been
            populated by a prior feature-engineering step.

    Returns:
        A RepSetResult holding the chosen slice identifiers together with
        their representation weights and objective scores.
    """
    ...

ObjectiveDrivenSearchAlgorithm

Bases: SearchAlgorithm, ABC

Base class for search algorithms guided by external objective functions.

Provides a common structure for algorithms that rely on a user-defined ObjectiveSet to score candidates and a SelectionPolicy to choose the best. This pattern separates the search strategy from the objective function, enabling flexible algorithm design.

Examples:

>>> from energy_repset.objectives import ObjectiveSet, ObjectiveSpec
>>> from energy_repset.score_components import WassersteinFidelity
>>> from energy_repset.selection_policies import WeightedSumPolicy
>>> objectives = ObjectiveSet({
...     'wasserstein': (1.0, WassersteinFidelity()),
... })
>>> policy = WeightedSumPolicy()
>>> # See ObjectiveDrivenCombinatorialSearchAlgorithm for concrete usage
Source code in energy_repset/search_algorithms/objective_driven.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class ObjectiveDrivenSearchAlgorithm(SearchAlgorithm, ABC):
    """Base class for search algorithms steered by external objectives.

    Gives a common skeleton to algorithms that score candidates with a
    user-supplied ObjectiveSet and pick the winner through a
    SelectionPolicy. Keeping the search strategy separate from the
    objective function lets the two vary independently, enabling
    flexible algorithm design.

    Examples:
        >>> from energy_repset.objectives import ObjectiveSet, ObjectiveSpec
        >>> from energy_repset.score_components import WassersteinFidelity
        >>> from energy_repset.selection_policies import WeightedSumPolicy
        >>> objectives = ObjectiveSet({
        ...     'wasserstein': (1.0, WassersteinFidelity()),
        ... })
        >>> policy = WeightedSumPolicy()
        >>> # See ObjectiveDrivenCombinatorialSearchAlgorithm for concrete usage
    """

    def __init__(
            self,
            objective_set: ObjectiveSet,
            selection_policy: SelectionPolicy,
    ):
        """Initialize the objective-driven search algorithm.

        Args:
            objective_set: Score components that define the quality metrics.
            selection_policy: Rule for picking the best combination among the
                scored candidates (e.g., weighted sum, Pareto dominance).
        """
        self.selection_policy = selection_policy
        self.objective_set = objective_set

__init__

__init__(objective_set: ObjectiveSet, selection_policy: SelectionPolicy)

Initialize objective-driven search algorithm.

Parameters:

Name Type Description Default
objective_set ObjectiveSet

Collection of score components defining quality metrics.

required
selection_policy SelectionPolicy

Strategy for selecting best combination from scored candidates (e.g., weighted sum, Pareto dominance).

required
Source code in energy_repset/search_algorithms/objective_driven.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(
        self,
        objective_set: ObjectiveSet,
        selection_policy: SelectionPolicy,
):
    """Set up an objective-driven search.

    Args:
        objective_set: Score components defining the quality metrics.
        selection_policy: Strategy used to pick the best combination from
            the scored candidates (e.g., weighted sum, Pareto dominance).
    """
    self.selection_policy = selection_policy
    self.objective_set = objective_set

ObjectiveDrivenCombinatorialSearchAlgorithm

Bases: ObjectiveDrivenSearchAlgorithm

Generate-and-test search using a combination generator (Workflow Type 1).

Generates candidate combinations using a CombinationGenerator, scores each with the ObjectiveSet, and selects the best according to the SelectionPolicy. This is the canonical implementation of the Generate-and-Test workflow.

Supports exhaustive search (all k-combinations) and constrained generation (e.g., seasonal quotas). Displays progress with tqdm and stores all evaluations in diagnostics for analysis.

Examples:

>>> from energy_repset.objectives import ObjectiveSet, ObjectiveSpec
>>> from energy_repset.combi_gens import ExhaustiveCombiGen
>>> from energy_repset.selection_policies import WeightedSumPolicy
>>> from energy_repset.score_components import WassersteinFidelity, CorrelationFidelity
>>> objectives = ObjectiveSet({
...     'wasserstein': (1.0, WassersteinFidelity()),
...     'correlation': (0.5, CorrelationFidelity())
... })
>>> policy = WeightedSumPolicy()
>>> generator = ExhaustiveCombiGen(k=4)
>>> algorithm = ObjectiveDrivenCombinatorialSearchAlgorithm(
...     objective_set=objectives,
...     selection_policy=policy,
...     combination_generator=generator
... )
>>> result = algorithm.find_selection(context)
>>> print(result.selection)  # Best 4-month selection
>>> print(result.diagnostics['evaluations_df'])  # All scored combinations
Source code in energy_repset/search_algorithms/objective_driven.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class ObjectiveDrivenCombinatorialSearchAlgorithm(ObjectiveDrivenSearchAlgorithm):
    """Generate-and-test search using a combination generator (Workflow Type 1).

    Generates candidate combinations using a CombinationGenerator, scores each
    with the ObjectiveSet, and selects the best according to the SelectionPolicy.
    This is the canonical implementation of the Generate-and-Test workflow.

    Supports exhaustive search (all k-combinations) and constrained generation
    (e.g., seasonal quotas). Displays progress with tqdm and stores all
    evaluations in diagnostics for analysis.

    Examples:
        >>> from energy_repset.objectives import ObjectiveSet, ObjectiveSpec
        >>> from energy_repset.combi_gens import ExhaustiveCombiGen
        >>> from energy_repset.selection_policies import WeightedSumPolicy
        >>> from energy_repset.score_components import WassersteinFidelity, CorrelationFidelity
        >>> objectives = ObjectiveSet({
        ...     'wasserstein': (1.0, WassersteinFidelity()),
        ...     'correlation': (0.5, CorrelationFidelity())
        ... })
        >>> policy = WeightedSumPolicy()
        >>> generator = ExhaustiveCombiGen(k=4)
        >>> algorithm = ObjectiveDrivenCombinatorialSearchAlgorithm(
        ...     objective_set=objectives,
        ...     selection_policy=policy,
        ...     combination_generator=generator
        ... )
        >>> result = algorithm.find_selection(context)
        >>> print(result.selection)  # Best 4-month selection
        >>> print(result.diagnostics['evaluations_df'])  # All scored combinations
    """

    def __init__(
            self,
            objective_set: ObjectiveSet,
            selection_policy: SelectionPolicy,
            combination_generator: CombinationGenerator,
    ):
        """Initialize combinatorial search algorithm.

        Args:
            objective_set: Collection of score components defining quality metrics.
            selection_policy: Strategy for selecting the best combination.
            combination_generator: Defines which combinations to evaluate
                (e.g., all combinations, seasonal constraints).
        """
        super().__init__(objective_set, selection_policy)
        self.combination_generator = combination_generator
        # Populated by find_selection(); None until the first search has run.
        self._all_scores_df: pd.DataFrame | None = None

    def find_selection(self, context: ProblemContext) -> RepSetResult:
        """Find optimal selection by exhaustively scoring generated combinations.

        Args:
            context: Problem context with df_features populated.

        Returns:
            RepSetResult with the winning selection, scores, representatives,
            and diagnostics containing evaluations_df with all scored combinations.
        """
        # Imported lazily so merely importing this module stays cheap.
        from tqdm import tqdm
        import pandas as pd

        self.objective_set.prepare(context)

        unique_slices = context.get_unique_slices()
        iterator = tqdm(
            self.combination_generator.generate(unique_slices),
            desc='Iterating over combinations',
            total=self.combination_generator.count(unique_slices)
        )

        rows = []
        for combi in iterator:
            metrics = self.objective_set.evaluate(combi, context)
            rec = {
                "slices": combi,
                "label": ", ".join(str(s) for s in combi)
            }
            rec.update(metrics)
            rows.append(rec)

        evaluations_df = pd.DataFrame(rows)
        # Private copy so get_all_scores() stays valid even if the caller
        # mutates the DataFrame handed out via diagnostics.
        self._all_scores_df = evaluations_df.copy()

        winning_combination = self.selection_policy.select_best(evaluations_df, self.objective_set)
        slice_labels = context.slicer.labels_for_index(context.df_raw.index)
        result = RepSetResult(
            context=context,
            selection_space='subset',
            selection=winning_combination,
            scores=self.objective_set.evaluate(winning_combination, context),
            representatives={s: context.df_raw.iloc[slice_labels == s] for s in winning_combination},
            diagnostics={'evaluations_df': evaluations_df}
        )
        return result

    def get_all_scores(self) -> pd.DataFrame:
        """Return DataFrame of all evaluated combinations with scores.

        Returns:
            DataFrame with columns: slices, label, score_comp_1, score_comp_2, ...
            A defensive copy is returned so callers cannot mutate the cache.

        Raises:
            ValueError: If find_selection() has not been called yet.
        """
        if self._all_scores_df is None:
            raise ValueError("No scores available. Call find_selection() first.")
        return self._all_scores_df.copy()

__init__

__init__(objective_set: ObjectiveSet, selection_policy: SelectionPolicy, combination_generator: CombinationGenerator)

Initialize combinatorial search algorithm.

Parameters:

Name Type Description Default
objective_set ObjectiveSet

Collection of score components defining quality metrics.

required
selection_policy SelectionPolicy

Strategy for selecting the best combination.

required
combination_generator CombinationGenerator

Defines which combinations to evaluate (e.g., all combinations, seasonal constraints).

required
Source code in energy_repset/search_algorithms/objective_driven.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init__(
        self,
        objective_set: ObjectiveSet,
        selection_policy: SelectionPolicy,
        combination_generator: CombinationGenerator,
):
    """Set up the generate-and-test combinatorial search.

    Args:
        objective_set: Score components defining the quality metrics.
        selection_policy: Strategy used to pick the winning combination.
        combination_generator: Produces the candidate combinations to score
            (e.g., all k-combinations, seasonal constraints).
    """
    super().__init__(objective_set, selection_policy)
    self.combination_generator = combination_generator
    # Cache of all scored combinations; filled in by find_selection().
    self._all_scores_df: pd.DataFrame | None = None

find_selection

find_selection(context: ProblemContext) -> RepSetResult

Find optimal selection by exhaustively scoring generated combinations.

Parameters:

Name Type Description Default
context ProblemContext

Problem context with df_features populated.

required

Returns:

Type Description
RepSetResult

RepSetResult with the winning selection, scores, representatives,

RepSetResult

and diagnostics containing evaluations_df with all scored combinations.

Source code in energy_repset/search_algorithms/objective_driven.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def find_selection(self, context: ProblemContext) -> RepSetResult:
    """Score every generated combination and return the best one.

    Args:
        context: Problem context with df_features populated.

    Returns:
        RepSetResult with the winning selection, its scores and
        representatives, plus an 'evaluations_df' diagnostic holding every
        scored combination.
    """
    import pandas as pd
    from tqdm import tqdm

    self.objective_set.prepare(context)

    candidates = context.get_unique_slices()
    progress = tqdm(
        self.combination_generator.generate(candidates),
        desc='Iterating over combinations',
        total=self.combination_generator.count(candidates),
    )

    records = []
    for combination in progress:
        scores = self.objective_set.evaluate(combination, context)
        record = {
            "slices": combination,
            "label": ", ".join(str(s) for s in combination),
        }
        record.update(scores)
        records.append(record)

    evaluations_df = pd.DataFrame(records)
    self._all_scores_df = evaluations_df.copy()

    winner = self.selection_policy.select_best(evaluations_df, self.objective_set)
    slice_labels = context.slicer.labels_for_index(context.df_raw.index)
    return RepSetResult(
        context=context,
        selection_space='subset',
        selection=winner,
        scores=self.objective_set.evaluate(winner, context),
        representatives={s: context.df_raw.iloc[slice_labels == s] for s in winner},
        diagnostics={'evaluations_df': evaluations_df},
    )

get_all_scores

get_all_scores() -> DataFrame

Return DataFrame of all evaluated combinations with scores.

Returns:

Type Description
DataFrame

DataFrame with columns: slices, label, score_comp_1, score_comp_2, ...

Raises:

Type Description
ValueError

If find_selection() has not been called yet.

Source code in energy_repset/search_algorithms/objective_driven.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def get_all_scores(self) -> pd.DataFrame:
    """Return DataFrame of all evaluated combinations with scores.

    Returns:
        DataFrame with columns: slices, label, score_comp_1, score_comp_2, ...
        A defensive copy is returned so callers cannot mutate the cache.

    Raises:
        ValueError: If find_selection() has not been called yet.
    """
    # No local pandas import needed: this method only reads the cache
    # populated by find_selection() and never constructs frames itself.
    if self._all_scores_df is None:
        raise ValueError("No scores available. Call find_selection() first.")
    return self._all_scores_df.copy()

HullClusteringSearch

Bases: SearchAlgorithm

Farthest-point greedy hull clustering (Neustroev et al., 2025).

Implements the greedy convex/conic hull clustering algorithm from Neustroev et al. (2025). At each iteration the algorithm selects the data point furthest from the current hull, i.e. the point with maximum projection error onto the hull spanned by the already- selected representatives. The first representative is the point furthest from the dataset mean.

This farthest-point strategy naturally selects extreme/boundary periods first, producing a hull that spans the data well.

The algorithm leaves weights=None in the result so that an external RepresentationModel (typically BlendedRepresentationModel) can compute the final soft-assignment weights.

Parameters:

Name Type Description Default
k int

Number of representative periods to select.

required
hull_type Literal['convex', 'conic']

Type of projection constraint. 'convex' enforces non-negative weights that sum to 1. 'conic' enforces only non-negativity.

'convex'
References

G. Neustroev, D. A. Tejada-Arango, G. Morales-Espana, M. M. de Weerdt. "Hull Clustering with Blended Representative Periods for Energy System Optimization Models." arXiv:2508.21641, 2025.

Examples:

Basic usage with blended representation:

>>> from energy_repset.search_algorithms import HullClusteringSearch
>>> from energy_repset.representation import BlendedRepresentationModel
>>> search = HullClusteringSearch(k=4, hull_type='convex')
>>> repr_model = BlendedRepresentationModel(blend_type='convex')
Source code in energy_repset/search_algorithms/hull_clustering.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class HullClusteringSearch(SearchAlgorithm):
    """Farthest-point greedy hull clustering (Neustroev et al., 2025).

    Implements the greedy convex/conic hull clustering algorithm from
    Neustroev et al. (2025).  At each iteration the algorithm selects
    the data point **furthest from the current hull**, i.e. the point
    with maximum projection error onto the hull spanned by the already-
    selected representatives.  The first representative is the point
    furthest from the dataset mean.

    This farthest-point strategy naturally selects extreme/boundary
    periods first, producing a hull that spans the data well.

    The algorithm leaves ``weights=None`` in the result so that an external
    ``RepresentationModel`` (typically ``BlendedRepresentationModel``) can
    compute the final soft-assignment weights.

    Args:
        k: Number of representative periods to select.
        hull_type: Type of projection constraint. ``'convex'`` enforces
            non-negative weights that sum to 1. ``'conic'`` enforces only
            non-negativity.

    References:
        G. Neustroev, D. A. Tejada-Arango, G. Morales-Espana,
        M. M. de Weerdt. "Hull Clustering with Blended Representative
        Periods for Energy System Optimization Models."
        arXiv:2508.21641, 2025.

    Examples:
        Basic usage with blended representation:

        >>> from energy_repset.search_algorithms import HullClusteringSearch
        >>> from energy_repset.representation import BlendedRepresentationModel
        >>> search = HullClusteringSearch(k=4, hull_type='convex')
        >>> repr_model = BlendedRepresentationModel(blend_type='convex')
    """

    def __init__(self, k: int, hull_type: Literal['convex', 'conic'] = 'convex'):
        """Initialize Hull Clustering search.

        Args:
            k: Number of hull vertices (representative periods) to select.
            hull_type: Projection type. ``'convex'`` requires weights >= 0 and
                sum(weights) == 1. ``'conic'`` requires only weights >= 0.
        """
        self.k = k
        self.hull_type = hull_type

    def find_selection(self, context: ProblemContext) -> RepSetResult:
        """Find k hull vertices via farthest-point greedy selection.

        The algorithm (Algorithm 2 in Neustroev et al.):

        1. Select the point furthest from the dataset mean.
        2. For iterations 2..k, compute the projection error (hull
           distance) for every remaining point and select the one with
           the **maximum** error.

        Args:
            context: Problem context with ``df_features`` populated.

        Returns:
            RepSetResult with the selected hull vertices, ``weights=None``
            (to be filled by an external representation model), and the
            final projection error in ``scores``.

        Raises:
            ValueError: If k is not in the range 1..N where N is the number
                of periods in ``df_features``.
        """
        Z = context.df_features.values
        labels = list(context.df_features.index)

        selected_idx, final_error = self._greedy_farthest_point(Z)

        selection = tuple(labels[i] for i in selected_idx)

        slice_labels = context.slicer.labels_for_index(context.df_raw.index)
        representatives = {
            s: context.df_raw.loc[slice_labels == s] for s in selection
        }

        return RepSetResult(
            context=context,
            selection_space='subset',
            selection=selection,
            scores={'projection_error': final_error},
            representatives=representatives,
            weights=None,
        )

    def _greedy_farthest_point(
        self, Z: np.ndarray
    ) -> tuple[list[int], float]:
        """Run the farthest-point greedy hull clustering.

        Args:
            Z: Feature matrix (N x p).

        Returns:
            Tuple of (selected indices, total projection error).

        Raises:
            ValueError: If ``self.k`` is outside the valid range 1..N.
        """
        N = Z.shape[0]

        # Validate up front: k > N would previously fail deep in the loop
        # with an opaque "max() arg is an empty sequence", and k < 1 would
        # silently return one representative anyway.
        if self.k < 1 or self.k > N:
            raise ValueError(
                f"k must be between 1 and the number of periods ({N}); got k={self.k}"
            )

        first_idx = self._init_furthest_from_mean(Z)
        selected_idx = [first_idx]
        remaining = set(range(N)) - {first_idx}

        hull_dists = np.full(N, np.inf)
        self._update_hull_distances(Z, selected_idx, remaining, hull_dists)

        for _ in range(self.k - 1):
            best = max(remaining, key=lambda i: hull_dists[i])
            selected_idx.append(best)
            remaining.discard(best)

            if remaining:
                self._update_hull_distances(
                    Z, selected_idx, remaining, hull_dists
                )

        final_error = sum(hull_dists[i] for i in range(N) if i not in selected_idx)
        return selected_idx, float(final_error)

    @staticmethod
    def _init_furthest_from_mean(Z: np.ndarray) -> int:
        """Select the point furthest from the dataset mean.

        Args:
            Z: Feature matrix (N x p).

        Returns:
            Index of the point with maximum squared distance to the mean.
        """
        mean_z = Z.mean(axis=0)
        dists = np.sum((Z - mean_z) ** 2, axis=1)
        return int(np.argmax(dists))

    def _update_hull_distances(
        self,
        Z: np.ndarray,
        selected_idx: list[int],
        remaining: set[int],
        hull_dists: np.ndarray,
    ) -> None:
        """Recompute hull distances for remaining points.

        Args:
            Z: Feature matrix (N x p).
            selected_idx: Currently selected hull vertex indices.
            remaining: Set of indices still available for selection.
            hull_dists: Array to update in-place with new hull distances.
        """
        Z_sel = Z[selected_idx]
        for i in remaining:
            hull_dists[i] = self._projection_error(Z[i], Z_sel)

    def _projection_error(self, z: np.ndarray, Z_sel: np.ndarray) -> float:
        """Compute projection error for a single point onto the hull.

        Args:
            z: Feature vector for one period (length p).
            Z_sel: Feature matrix of selected hull vertices (k_current x p).

        Returns:
            Squared L2 projection error.
        """
        if self.hull_type == 'conic':
            return self._projection_error_conic(z, Z_sel)
        return self._projection_error_convex(z, Z_sel)

    def _projection_error_conic(
        self, z: np.ndarray, Z_sel: np.ndarray
    ) -> float:
        """Conic projection: min_w ||z - Z_sel^T @ w||^2, w >= 0."""
        w, _ = nnls(Z_sel.T, z)
        reconstruction = Z_sel.T @ w
        return float(np.sum((z - reconstruction) ** 2))

    def _projection_error_convex(
        self, z: np.ndarray, Z_sel: np.ndarray
    ) -> float:
        """Convex projection: min_w ||z - w @ Z_sel||^2, w >= 0, sum(w) = 1."""
        k_sel = Z_sel.shape[0]
        # With a single vertex the convex hull is that point itself, so the
        # projection error is just the squared distance to it.
        if k_sel == 1:
            return float(np.sum((z - Z_sel[0]) ** 2))

        def objective(w):
            return np.sum((z - w @ Z_sel) ** 2)

        w0 = np.ones(k_sel) / k_sel
        bounds = [(0.0, 1.0)] * k_sel
        constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}

        result = minimize(
            objective, w0, method='SLSQP',
            bounds=bounds, constraints=constraints,
            options={'ftol': 1e-10, 'maxiter': 200},
        )
        return float(result.fun)

__init__

__init__(k: int, hull_type: Literal['convex', 'conic'] = 'convex')

Initialize Hull Clustering search.

Parameters:

Name Type Description Default
k int

Number of hull vertices (representative periods) to select.

required
hull_type Literal['convex', 'conic']

Projection type. 'convex' requires weights >= 0 and sum(weights) == 1. 'conic' requires only weights >= 0.

'convex'
Source code in energy_repset/search_algorithms/hull_clustering.py
53
54
55
56
57
58
59
60
61
62
def __init__(self, k: int, hull_type: Literal['convex', 'conic'] = 'convex'):
    """Create a Hull Clustering search.

    Args:
        k: How many hull vertices (representative periods) to pick.
        hull_type: Projection variant. ``'convex'`` constrains weights to be
            non-negative and sum to one; ``'conic'`` only requires
            non-negativity.
    """
    self.hull_type = hull_type
    self.k = k

find_selection

find_selection(context: ProblemContext) -> RepSetResult

Find k hull vertices via farthest-point greedy selection.

The algorithm (Algorithm 2 in Neustroev et al.):

  1. Select the point furthest from the dataset mean.
  2. For iterations 2..k, compute the projection error (hull distance) for every remaining point and select the one with the maximum error.

Parameters:

Name Type Description Default
context ProblemContext

Problem context with df_features populated.

required

Returns:

Type Description
RepSetResult

RepSetResult with the selected hull vertices, weights=None

RepSetResult

(to be filled by an external representation model), and the

RepSetResult

final projection error in scores.

Source code in energy_repset/search_algorithms/hull_clustering.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def find_selection(self, context: ProblemContext) -> RepSetResult:
    """Find k hull vertices via farthest-point greedy selection.

    The algorithm (Algorithm 2 in Neustroev et al.):

    1. Select the point furthest from the dataset mean.
    2. For iterations 2..k, compute the projection error (hull
       distance) for every remaining point and select the one with
       the **maximum** error.

    Args:
        context: Problem context with ``df_features`` populated.

    Returns:
        RepSetResult with the selected hull vertices, ``weights=None``
        (to be filled by an external representation model), and the
        final projection error in ``scores``.
    """
    Z = context.df_features.values
    labels = list(context.df_features.index)

    # Greedy search returns row indices into Z plus the projection
    # error of the final k-vertex hull.
    selected_idx, final_error = self._greedy_farthest_point(Z)

    selection = tuple(labels[i] for i in selected_idx)

    # Map each selected slice label back to its raw time-series rows.
    slice_labels = context.slicer.labels_for_index(context.df_raw.index)
    representatives = {
        s: context.df_raw.loc[slice_labels == s] for s in selection
    }

    return RepSetResult(
        context=context,
        selection_space='subset',
        selection=selection,
        scores={'projection_error': final_error},
        representatives=representatives,
        weights=None,
    )

CTPCSearch

Bases: SearchAlgorithm

Chronological Time-Period Clustering with contiguity constraint.

Implements hierarchical agglomerative clustering where only temporally adjacent periods may merge, producing k contiguous time segments. Based on Pineda & Morales (2018).

The algorithm computes weights as the fraction of time covered by each segment, so the external representation model is skipped when the result is used in RepSetExperiment.run().

Parameters:

Name Type Description Default
k int

Number of contiguous time segments to produce.

required
linkage Literal['ward', 'complete', 'average', 'single']

Linkage criterion for agglomerative clustering. One of 'ward', 'complete', 'average', or 'single'.

'ward'
References

S. Pineda, J. M. Morales. "Chronological Time-Period Clustering for Optimal Capacity Expansion Planning With Storage." IEEE Trans. Power Syst., 33(6), 7162--7170, 2018.

Examples:

Basic usage:

>>> from energy_repset.search_algorithms import CTPCSearch
>>> search = CTPCSearch(k=4, linkage='ward')
>>> result = search.find_selection(feature_context)
>>> result.selection  # Tuple of medoid labels
>>> result.weights    # Dict mapping labels to time fractions
Source code in energy_repset/search_algorithms/ctpc.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class CTPCSearch(SearchAlgorithm):
    """Chronological Time-Period Clustering with contiguity constraint.

    Implements hierarchical agglomerative clustering where only temporally
    adjacent periods may merge, producing k contiguous time segments.
    Based on Pineda & Morales (2018).

    The algorithm computes weights as the fraction of time covered by each
    segment, so the external representation model is skipped when the result
    is used in ``RepSetExperiment.run()``.

    Args:
        k: Number of contiguous time segments to produce.
        linkage: Linkage criterion for agglomerative clustering. One of
            ``'ward'``, ``'complete'``, ``'average'``, or ``'single'``.

    References:
        S. Pineda, J. M. Morales. "Chronological Time-Period Clustering
        for Optimal Capacity Expansion Planning With Storage."
        IEEE Trans. Power Syst., 33(6), 7162--7170, 2018.

    Examples:
        Basic usage:

        >>> from energy_repset.search_algorithms import CTPCSearch
        >>> search = CTPCSearch(k=4, linkage='ward')
        >>> result = search.find_selection(feature_context)
        >>> result.selection  # Tuple of medoid labels
        >>> result.weights    # Dict mapping labels to time fractions
    """

    def __init__(
        self,
        k: int,
        linkage: Literal['ward', 'complete', 'average', 'single'] = 'ward',
    ):
        """Initialize CTPC search.

        Args:
            k: Number of contiguous clusters to produce.
            linkage: Agglomerative linkage criterion.
        """
        self.k = k
        self.linkage = linkage

    def find_selection(self, context: ProblemContext) -> RepSetResult:
        """Run contiguity-constrained hierarchical clustering.

        Args:
            context: Problem context with ``df_features`` populated. Slices
                must be naturally ordered by time (which they are when coming
                from ``TimeSlicer``).

        Returns:
            RepSetResult with medoid (or centroid) labels as the selection,
            pre-computed weights (segment size fractions), and within-cluster
            sum of squares in ``scores``.
        """
        Z = context.df_features.values
        labels = list(context.df_features.index)
        N = Z.shape[0]

        connectivity = self._build_connectivity(N)

        clustering = AgglomerativeClustering(
            n_clusters=self.k,
            connectivity=connectivity,
            linkage=self.linkage,
        )
        cluster_labels = clustering.fit_predict(Z)

        selection, weights, wcss, diagnostics = self._extract_results(
            Z, labels, cluster_labels
        )

        # Map each representative slice label back to its raw rows.
        slice_labels = context.slicer.labels_for_index(context.df_raw.index)
        representatives = {
            s: context.df_raw.loc[slice_labels == s] for s in selection
        }

        return RepSetResult(
            context=context,
            selection_space='chronological',
            selection=selection,
            scores={'wcss': wcss},
            representatives=representatives,
            weights=weights,
            diagnostics=diagnostics,
        )

    def _build_connectivity(self, n: int):
        """Build tridiagonal connectivity matrix for n slices.

        Args:
            n: Number of time slices.

        Returns:
            Sparse (n x n) binary adjacency matrix connecting only
            temporally adjacent slices. Kept sparse (no ``.toarray()``) —
            ``AgglomerativeClustering`` accepts sparse connectivity, and a
            dense matrix would cost O(n^2) memory for long horizons.
        """
        off_diag = np.ones(n - 1)
        return diags([off_diag, np.ones(n), off_diag], [-1, 0, 1])

    def _extract_results(
        self,
        Z: np.ndarray,
        labels: list,
        cluster_labels: np.ndarray,
    ) -> tuple[tuple, dict, float, dict]:
        """Extract selection, weights, WCSS, and diagnostics from clustering.

        Args:
            Z: Feature matrix (N x p).
            labels: Slice labels aligned with rows of Z.
            cluster_labels: Cluster assignment for each slice (length N).

        Returns:
            Tuple of (selection, weights, wcss, diagnostics).
        """
        unique_clusters = sorted(set(cluster_labels))
        N = len(labels)
        wcss = 0.0

        selected_labels = []
        weight_dict: dict = {}
        segment_info: list[dict[str, Any]] = []

        for c in unique_clusters:
            mask = cluster_labels == c
            indices = np.where(mask)[0]
            cluster_Z = Z[indices]
            centroid = cluster_Z.mean(axis=0)

            # Within-cluster sum of squares contribution of this segment.
            wcss += np.sum((cluster_Z - centroid) ** 2)

            # Medoid: cluster member closest to the centroid.
            dists = np.sum((cluster_Z - centroid) ** 2, axis=1)
            rep_idx = indices[int(np.argmin(dists))]
            rep_label = labels[rep_idx]

            selected_labels.append(rep_label)
            # Weight = fraction of the horizon covered by this segment.
            weight_dict[rep_label] = len(indices) / N

            segment_info.append({
                'cluster': c,
                'start': labels[indices[0]],
                'end': labels[indices[-1]],
                'size': len(indices),
                'representative': rep_label,
            })

        # Report segments in chronological order for readability.
        segment_info.sort(key=lambda seg: seg['start'])

        diagnostics = {
            'cluster_labels': cluster_labels.tolist(),
            'segments': segment_info,
        }

        return tuple(selected_labels), weight_dict, float(wcss), diagnostics

__init__

__init__(k: int, linkage: Literal['ward', 'complete', 'average', 'single'] = 'ward')

Initialize CTPC search.

Parameters:

Name Type Description Default
k int

Number of contiguous clusters to produce.

required
linkage Literal['ward', 'complete', 'average', 'single']

Agglomerative linkage criterion.

'ward'
Source code in energy_repset/search_algorithms/ctpc.py
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    k: int,
    linkage: Literal['ward', 'complete', 'average', 'single'] = 'ward',
):
    """Create a CTPC search instance.

    Args:
        k: Number of contiguous clusters to produce.
        linkage: Agglomerative linkage criterion.
    """
    self.k, self.linkage = k, linkage

find_selection

find_selection(context: ProblemContext) -> RepSetResult

Run contiguity-constrained hierarchical clustering.

Parameters:

Name Type Description Default
context ProblemContext

Problem context with df_features populated. Slices must be naturally ordered by time (which they are when coming from TimeSlicer).

required

Returns:

Type Description
RepSetResult

RepSetResult with medoid (or centroid) labels as the selection,

RepSetResult

pre-computed weights (segment size fractions), and within-cluster

RepSetResult

sum of squares in scores.

Source code in energy_repset/search_algorithms/ctpc.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def find_selection(self, context: ProblemContext) -> RepSetResult:
    """Run contiguity-constrained hierarchical clustering.

    Args:
        context: Problem context with ``df_features`` populated. Slices
            must be naturally ordered by time (which they are when coming
            from ``TimeSlicer``).

    Returns:
        RepSetResult with medoid (or centroid) labels as the selection,
        pre-computed weights (segment size fractions), and within-cluster
        sum of squares in ``scores``.
    """
    features = context.df_features.values
    feature_labels = list(context.df_features.index)
    n_slices = features.shape[0]

    # Contiguity: only temporally adjacent slices may merge.
    model = AgglomerativeClustering(
        n_clusters=self.k,
        connectivity=self._build_connectivity(n_slices),
        linkage=self.linkage,
    )
    assignments = model.fit_predict(features)

    selection, weights, wcss, diagnostics = self._extract_results(
        features, feature_labels, assignments
    )

    # Collect the raw rows belonging to each selected representative.
    per_row_labels = context.slicer.labels_for_index(context.df_raw.index)
    representatives = {
        label: context.df_raw.loc[per_row_labels == label]
        for label in selection
    }

    return RepSetResult(
        context=context,
        selection_space='chronological',
        selection=selection,
        scores={'wcss': wcss},
        representatives=representatives,
        weights=weights,
        diagnostics=diagnostics,
    )

SnippetSearch

Bases: SearchAlgorithm

Greedy p-median selection of multi-day representative subsequences.

Implements a greedy approximation of the Snippet algorithm from Anderson et al. (2024). Selects k sliding-window subsequences of period_length_days days each, minimizing the total day-level distance across the full time horizon.

Each candidate subsequence contains period_length_days daily profile snippets. The distance from any day to a candidate is the minimum squared Euclidean distance to any of its constituent daily snippets. The greedy selection picks the candidate with the greatest total cost reduction at each iteration.

The original paper solves the selection as a MILP; this implementation uses a greedy p-median heuristic which provides a (1 - 1/e) approximation guarantee.

Requires context.slicer.unit == 'day'.

Parameters:

Name Type Description Default
k int

Number of representative subsequences to select.

required
period_length_days int

Length of each candidate subsequence in days.

7
step_days int

Stride between consecutive sliding-window candidates.

1
References

O. Anderson, N. Yu, K. Oikonomou, D. Wu. "On the Selection of Intermediate Length Representative Periods for Capacity Expansion." arXiv:2401.02888, 2024.

Examples:

Basic usage with daily slicing:

>>> from energy_repset.search_algorithms import SnippetSearch
>>> from energy_repset.time_slicer import TimeSlicer
>>> slicer = TimeSlicer(unit='day')
>>> search = SnippetSearch(k=8, period_length_days=7, step_days=1)
Source code in energy_repset/search_algorithms/snippet.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
class SnippetSearch(SearchAlgorithm):
    """Greedy p-median selection of multi-day representative subsequences.

    Implements a greedy approximation of the Snippet algorithm from
    Anderson et al. (2024).  Selects k sliding-window subsequences of
    ``period_length_days`` days each, minimizing the total day-level
    distance across the full time horizon.

    Each candidate subsequence contains ``period_length_days`` daily profile
    snippets. The distance from any day to a candidate is the minimum
    squared Euclidean distance to any of its constituent daily snippets.
    The greedy selection picks the candidate with the greatest total cost
    reduction at each iteration.

    The original paper solves the selection as a MILP; this implementation
    uses a greedy p-median heuristic which provides a (1 - 1/e)
    approximation guarantee.

    Requires ``context.slicer.unit == 'day'``.

    Args:
        k: Number of representative subsequences to select.
        period_length_days: Length of each candidate subsequence in days.
        step_days: Stride between consecutive sliding-window candidates.

    References:
        O. Anderson, N. Yu, K. Oikonomou, D. Wu. "On the Selection of
        Intermediate Length Representative Periods for Capacity Expansion."
        arXiv:2401.02888, 2024.

    Examples:
        Basic usage with daily slicing:

        >>> from energy_repset.search_algorithms import SnippetSearch
        >>> from energy_repset.time_slicer import TimeSlicer
        >>> slicer = TimeSlicer(unit='day')
        >>> search = SnippetSearch(k=8, period_length_days=7, step_days=1)
    """

    def __init__(
        self, k: int, period_length_days: int = 7, step_days: int = 1,
    ):
        """Initialize Snippet search.

        Args:
            k: Number of representative subsequences to select.
            period_length_days: Number of days in each candidate subsequence.
            step_days: Stride between consecutive candidate start positions.
        """
        self.k = k
        self.period_length_days = period_length_days
        self.step_days = step_days

    def find_selection(self, context: ProblemContext) -> RepSetResult:
        """Find k representative subsequences via greedy p-median selection.

        Args:
            context: Problem context. Must have ``slicer.unit == 'day'``.
                Feature engineering should provide daily profile vectors in
                ``df_features``, but the algorithm can also build profiles
                from ``df_raw`` directly.

        Returns:
            RepSetResult with selected starting-day labels, pre-computed
            weights (fraction of days assigned), and total distance score.

        Raises:
            ValueError: If ``context.slicer.unit`` is not ``'day'``.
        """
        if context.slicer.unit != 'day':
            raise ValueError(
                f"SnippetSearch requires daily slicing (unit='day'), "
                f"got unit='{context.slicer.unit}'."
            )

        daily_profiles = self._build_daily_profiles(context)
        N = daily_profiles.shape[0]
        day_labels = list(context.df_features.index)
        L = self.period_length_days

        candidates = self._generate_candidates(N, L, self.step_days)
        if len(candidates) < self.k:
            raise ValueError(
                f"Only {len(candidates)} candidate subsequences available, "
                f"but k={self.k} requested. Reduce k or period_length_days."
            )

        dist_matrix = self._compute_distance_matrix(daily_profiles, candidates)

        selected_candidates, per_day_min = self._greedy_select(
            dist_matrix, self.k
        )

        # Attribute every day to the nearest selected candidate; the
        # resulting cluster sizes become the representation weights.
        assignments = np.argmin(
            dist_matrix[:, selected_candidates], axis=1
        )

        selection_labels = []
        weight_dict = {}
        for local_idx, cand_idx in enumerate(selected_candidates):
            start_day = candidates[cand_idx][0]
            label = day_labels[start_day]
            selection_labels.append(label)
            n_assigned = int(np.sum(assignments == local_idx))
            weight_dict[label] = n_assigned / N

        selection = tuple(selection_labels)
        total_distance = float(np.sum(per_day_min))

        # Slice the raw data covering each selected multi-day window.
        slice_labels = context.slicer.labels_for_index(context.df_raw.index)
        representatives = {}
        for label in selection:
            start_idx = day_labels.index(label)
            end_idx = min(start_idx + L, N)
            period_labels = day_labels[start_idx:end_idx]
            mask = slice_labels.isin(set(period_labels))
            representatives[label] = context.df_raw.loc[mask]

        return RepSetResult(
            context=context,
            selection_space='subset',
            selection=selection,
            scores={'total_distance': total_distance},
            representatives=representatives,
            weights=weight_dict,
            diagnostics={
                'assignments': assignments.tolist(),
                'candidate_starts': [
                    day_labels[candidates[c][0]] for c in selected_candidates
                ],
            },
        )

    def _build_daily_profiles(self, context: ProblemContext) -> np.ndarray:
        """Build daily profile vectors from context features.

        Args:
            context: Problem context with ``df_features`` populated.

        Returns:
            Array of shape (N_days, n_features) with one row per day.
        """
        return context.df_features.values

    def _generate_candidates(
        self, n_days: int, length: int, step: int
    ) -> list[list[int]]:
        """Generate sliding-window candidate subsequences.

        Args:
            n_days: Total number of days.
            length: Number of days per subsequence.
            step: Stride between consecutive candidates.

        Returns:
            List of candidates, each a list of day indices.
        """
        candidates = []
        start = 0
        while start + length <= n_days:
            candidates.append(list(range(start, start + length)))
            start += step
        return candidates

    def _compute_distance_matrix(
        self, profiles: np.ndarray, candidates: list[list[int]]
    ) -> np.ndarray:
        """Compute distance from each day to each candidate.

        For each (day, candidate) pair, the distance is the minimum
        **squared** Euclidean distance between the day's profile and any
        of the candidate's daily snippet profiles.

        Args:
            profiles: Daily profile matrix (N x p).
            candidates: List of candidate subsequences (each a list of day indices).

        Returns:
            Distance matrix of shape (N, C) where C is the number of candidates.
        """
        N = profiles.shape[0]
        C = len(candidates)
        dist_matrix = np.empty((N, C))

        for j, cand_days in enumerate(candidates):
            # Broadcast to (N, L, p) pairwise differences, then take the
            # minimum squared distance over the candidate's L snippets.
            diffs = profiles[:, None, :] - profiles[cand_days][None, :, :]
            dist_matrix[:, j] = np.min(np.sum(diffs ** 2, axis=2), axis=1)

        return dist_matrix

    def _greedy_select(
        self, dist_matrix: np.ndarray, k: int
    ) -> tuple[list[int], np.ndarray]:
        """Greedy p-median selection of k candidates.

        At each iteration, selects the candidate that yields the lowest
        total per-day minimum distance once added. Minimizing the resulting
        total is equivalent to maximizing the cost reduction, but — unlike
        computing reductions against the initial all-``inf`` baseline —
        it is also well-defined on the first iteration. (With an ``inf``
        baseline every candidate's reduction is ``inf``, so a strict
        greater-than comparison would silently always pick candidate 0.)

        Args:
            dist_matrix: Distance matrix (N x C).
            k: Number of candidates to select.

        Returns:
            Tuple of (selected candidate indices, per-day minimum distances).
        """
        N = dist_matrix.shape[0]
        per_day_min = np.full(N, np.inf)
        selected: list[int] = []

        for _ in range(k):
            best_candidate = -1
            best_total = np.inf

            for j in range(dist_matrix.shape[1]):
                if j in selected:
                    continue
                new_min = np.minimum(per_day_min, dist_matrix[:, j])
                total = float(np.sum(new_min))
                if total < best_total:
                    best_total = total
                    best_candidate = j

            selected.append(best_candidate)
            per_day_min = np.minimum(per_day_min, dist_matrix[:, best_candidate])

        return selected, per_day_min

__init__

__init__(k: int, period_length_days: int = 7, step_days: int = 1)

Initialize Snippet search.

Parameters:

Name Type Description Default
k int

Number of representative subsequences to select.

required
period_length_days int

Number of days in each candidate subsequence.

7
step_days int

Stride between consecutive candidate start positions.

1
Source code in energy_repset/search_algorithms/snippet.py
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self, k: int, period_length_days: int = 7, step_days: int = 1,
):
    """Create a Snippet search instance.

    Args:
        k: Number of representative subsequences to select.
        period_length_days: Number of days in each candidate subsequence.
        step_days: Stride between consecutive candidate start positions.
    """
    self.k = k
    self.period_length_days = period_length_days
    self.step_days = step_days

find_selection

find_selection(context: ProblemContext) -> RepSetResult

Find k representative subsequences via greedy p-median selection.

Parameters:

Name Type Description Default
context ProblemContext

Problem context. Must have slicer.unit == 'day'. Feature engineering should provide daily profile vectors in df_features, but the algorithm can also build profiles from df_raw directly.

required

Returns:

Type Description
RepSetResult

RepSetResult with selected starting-day labels, pre-computed

RepSetResult

weights (fraction of days assigned), and total distance score.

Raises:

Type Description
ValueError

If context.slicer.unit is not 'day'.

Source code in energy_repset/search_algorithms/snippet.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def find_selection(self, context: ProblemContext) -> RepSetResult:
    """Select k representative subsequences with a greedy p-median pass.

    Args:
        context: Problem context. Must have ``slicer.unit == 'day'``.
            Feature engineering should provide daily profile vectors in
            ``df_features``, but the algorithm can also build profiles
            from ``df_raw`` directly.

    Returns:
        RepSetResult with selected starting-day labels, pre-computed
        weights (fraction of days assigned), and total distance score.

    Raises:
        ValueError: If ``context.slicer.unit`` is not ``'day'``.
    """
    if context.slicer.unit != 'day':
        raise ValueError(
            f"SnippetSearch requires daily slicing (unit='day'), "
            f"got unit='{context.slicer.unit}'."
        )

    profiles = self._build_daily_profiles(context)
    n_days = profiles.shape[0]
    day_labels = list(context.df_features.index)
    window = self.period_length_days

    candidates = self._generate_candidates(n_days, window, self.step_days)
    if len(candidates) < self.k:
        raise ValueError(
            f"Only {len(candidates)} candidate subsequences available, "
            f"but k={self.k} requested. Reduce k or period_length_days."
        )

    distances = self._compute_distance_matrix(profiles, candidates)
    chosen, per_day_min = self._greedy_select(distances, self.k)

    # Attribute each day to its nearest chosen candidate; cluster sizes
    # become the representation weights.
    assignments = np.argmin(distances[:, chosen], axis=1)

    selection_labels = []
    weight_dict = {}
    for local_idx, cand_idx in enumerate(chosen):
        label = day_labels[candidates[cand_idx][0]]
        selection_labels.append(label)
        weight_dict[label] = int(np.sum(assignments == local_idx)) / n_days

    selection = tuple(selection_labels)

    # Pull the raw rows covered by each selected multi-day window.
    slice_labels = context.slicer.labels_for_index(context.df_raw.index)
    representatives = {}
    for label in selection:
        first = day_labels.index(label)
        span = day_labels[first:min(first + window, n_days)]
        representatives[label] = context.df_raw.loc[
            slice_labels.isin(set(span))
        ]

    return RepSetResult(
        context=context,
        selection_space='subset',
        selection=selection,
        scores={'total_distance': float(np.sum(per_day_min))},
        representatives=representatives,
        weights=weight_dict,
        diagnostics={
            'assignments': assignments.tolist(),
            'candidate_starts': [
                day_labels[candidates[c][0]] for c in chosen
            ],
        },
    )

KMedoidsSearch

Bases: SearchAlgorithm

K-medoids clustering for representative subset selection.

Wraps sklearn_extra.cluster.KMedoids to partition feature-space slices into k clusters and select the medoid of each cluster as a representative period. Weights are computed as the fraction of slices assigned to each cluster.

This is a constructive (Workflow Type 2) algorithm: it has its own internal objective and does not require an external ObjectiveSet. The RepresentationModel is skipped by RepSetExperiment.run() because weights are pre-computed.

Parameters:

Name Type Description Default
k int

Number of clusters / representative periods.

required
metric str

Distance metric for k-medoids (default 'euclidean').

'euclidean'
method str

K-medoids algorithm variant. 'alternate' (default) or 'pam' (Partitioning Around Medoids, slower but optimal).

'alternate'
init str

Initialization method (default 'k-medoids++').

'k-medoids++'
random_state int | None

Seed for reproducibility.

None
max_iter int

Maximum number of iterations.

300

Examples:

Basic usage:

>>> from energy_repset.search_algorithms import KMedoidsSearch
>>> search = KMedoidsSearch(k=4, random_state=42)
>>> result = search.find_selection(feature_context)
>>> result.selection  # Tuple of medoid labels
>>> result.weights    # Dict mapping labels to cluster-size fractions
Source code in energy_repset/search_algorithms/clustering.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class KMedoidsSearch(SearchAlgorithm):
    """Select representative periods by k-medoids clustering.

    Thin wrapper around ``sklearn_extra.cluster.KMedoids``: the feature-space
    slices are partitioned into k clusters, each cluster's medoid becomes a
    representative period, and each representative's weight is the fraction
    of all slices that fall in its cluster.

    As a constructive (Workflow Type 2) algorithm it carries its own
    internal objective, so no external ``ObjectiveSet`` is needed, and
    ``RepSetExperiment.run()`` skips the ``RepresentationModel`` because
    the weights are already computed here.

    Args:
        k: Number of clusters / representative periods.
        metric: Distance metric for k-medoids (default ``'euclidean'``).
        method: K-medoids algorithm variant. ``'alternate'`` (default) or
            ``'pam'`` (Partitioning Around Medoids, slower but typically
            more accurate).
        init: Initialization method (default ``'k-medoids++'``).
        random_state: Seed for reproducibility.
        max_iter: Maximum number of iterations.

    Examples:
        Basic usage:

        >>> from energy_repset.search_algorithms import KMedoidsSearch
        >>> search = KMedoidsSearch(k=4, random_state=42)
        >>> result = search.find_selection(feature_context)
        >>> result.selection  # Tuple of medoid labels
        >>> result.weights    # Dict mapping labels to cluster-size fractions
    """

    def __init__(
        self,
        k: int,
        metric: str = 'euclidean',
        method: str = 'alternate',
        init: str = 'k-medoids++',
        random_state: int | None = None,
        max_iter: int = 300,
    ):
        """Configure the k-medoids search.

        Args:
            k: Number of clusters to produce.
            metric: Distance metric passed to ``KMedoids``.
            method: Algorithm variant (``'alternate'`` or ``'pam'``).
            init: Medoid initialization strategy.
            random_state: Random seed for reproducibility.
            max_iter: Maximum iterations for convergence.
        """
        self.k = k
        self.metric = metric
        self.method = method
        self.init = init
        self.random_state = random_state
        self.max_iter = max_iter

    def find_selection(self, context: ProblemContext) -> RepSetResult:
        """Cluster the feature space and return the medoid-based selection.

        Args:
            context: Problem context with ``df_features`` populated.

        Returns:
            RepSetResult whose selection is the tuple of medoid labels,
            whose weights are cluster-size fractions, and whose ``scores``
            holds the WCSS (Within-Cluster Sum of Squares).
        """
        feature_matrix = context.df_features.values
        slice_ids = list(context.df_features.index)

        clusterer = KMedoids(
            n_clusters=self.k,
            metric=self.metric,
            method=self.method,
            init=self.init,
            random_state=self.random_state,
            max_iter=self.max_iter,
        )
        clusterer.fit(feature_matrix)

        selection, weights, wcss, diagnostics = self._extract_results(
            feature_matrix, slice_ids, clusterer.labels_, clusterer.medoid_indices_
        )
        # Record the fitted model's own convergence info alongside our diagnostics.
        diagnostics['inertia'] = float(clusterer.inertia_)
        diagnostics['n_iter'] = int(clusterer.n_iter_)

        # Map each selected slice label back to its raw-data rows.
        per_row_labels = context.slicer.labels_for_index(context.df_raw.index)
        representatives = {
            sid: context.df_raw.loc[per_row_labels == sid] for sid in selection
        }

        return RepSetResult(
            context=context,
            selection_space='subset',
            selection=selection,
            scores={'wcss': wcss},
            representatives=representatives,
            weights=weights,
            diagnostics=diagnostics,
        )

    def _extract_results(
        self,
        Z: np.ndarray,
        labels: list,
        cluster_labels: np.ndarray,
        medoid_indices: np.ndarray,
    ) -> tuple:
        """Derive selection, weights, WCSS, and diagnostics from a fit.

        Args:
            Z: Feature matrix (N x p).
            labels: Slice labels aligned with rows of Z.
            cluster_labels: Cluster assignment for each slice (length N).
            medoid_indices: Row indices of medoids in Z.

        Returns:
            Tuple of (selection, weights, wcss, diagnostics).
        """
        n_total = len(labels)
        total_wcss = 0.0
        chosen_labels: list = []
        weights: Dict[Any, float] = {}
        cluster_info: list[Dict[str, Any]] = []

        for cluster_id in sorted(set(cluster_labels)):
            member_idx = np.flatnonzero(cluster_labels == cluster_id)
            members = Z[member_idx]
            # WCSS is measured against the cluster mean (centroid), not the medoid.
            total_wcss += float(((members - members.mean(axis=0)) ** 2).sum())

            medoid_label = labels[medoid_indices[cluster_id]]
            chosen_labels.append(medoid_label)
            weights[medoid_label] = len(member_idx) / n_total

            cluster_info.append({
                'cluster': int(cluster_id),
                'medoid': medoid_label,
                'size': len(member_idx),
                'members': [labels[i] for i in member_idx],
            })

        diagnostics = {
            'cluster_labels': cluster_labels.tolist(),
            'cluster_info': cluster_info,
        }

        return tuple(chosen_labels), weights, float(total_wcss), diagnostics

__init__

__init__(k: int, metric: str = 'euclidean', method: str = 'alternate', init: str = 'k-medoids++', random_state: int | None = None, max_iter: int = 300)

Initialize k-medoids clustering search.

Parameters:

Name Type Description Default
k int

Number of clusters to produce.

required
metric str

Distance metric passed to KMedoids.

'euclidean'
method str

Algorithm variant ('alternate' or 'pam').

'alternate'
init str

Medoid initialization strategy.

'k-medoids++'
random_state int | None

Random seed for reproducibility.

None
max_iter int

Maximum iterations for convergence.

300
Source code in energy_repset/search_algorithms/clustering.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(
    self,
    k: int,
    metric: str = 'euclidean',
    method: str = 'alternate',
    init: str = 'k-medoids++',
    random_state: int | None = None,
    max_iter: int = 300,
):
    """Configure the k-medoids search.

    Args:
        k: Number of clusters to produce.
        metric: Distance metric passed to ``KMedoids``.
        method: Algorithm variant (``'alternate'`` or ``'pam'``).
        init: Medoid initialization strategy.
        random_state: Random seed for reproducibility.
        max_iter: Maximum iterations for convergence.
    """
    # Plain attribute storage; no validation is performed here — KMedoids
    # itself rejects invalid parameter values at fit time.
    self.k = k
    self.max_iter = max_iter
    self.random_state = random_state
    self.metric = metric
    self.method = method
    self.init = init

find_selection

find_selection(context: ProblemContext) -> RepSetResult

Run k-medoids clustering on the feature space.

Parameters:

Name Type Description Default
context ProblemContext

Problem context with df_features populated.

required

Returns:

Type Description
RepSetResult

RepSetResult with medoid labels as the selection, pre-computed cluster-size-proportional weights, and WCSS (Within-Cluster Sum of Squares) in scores.

Source code in energy_repset/search_algorithms/clustering.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def find_selection(self, context: ProblemContext) -> RepSetResult:
    """Cluster the feature space and return the medoid-based selection.

    Args:
        context: Problem context with ``df_features`` populated.

    Returns:
        RepSetResult whose selection is the tuple of medoid labels,
        whose weights are cluster-size fractions, and whose ``scores``
        holds the WCSS (Within-Cluster Sum of Squares).
    """
    feature_matrix = context.df_features.values
    slice_ids = list(context.df_features.index)

    clusterer = KMedoids(
        n_clusters=self.k,
        metric=self.metric,
        method=self.method,
        init=self.init,
        random_state=self.random_state,
        max_iter=self.max_iter,
    )
    clusterer.fit(feature_matrix)

    selection, weights, wcss, diagnostics = self._extract_results(
        feature_matrix, slice_ids, clusterer.labels_, clusterer.medoid_indices_
    )
    # Record the fitted model's own convergence info alongside our diagnostics.
    diagnostics['inertia'] = float(clusterer.inertia_)
    diagnostics['n_iter'] = int(clusterer.n_iter_)

    # Map each selected slice label back to its raw-data rows.
    per_row_labels = context.slicer.labels_for_index(context.df_raw.index)
    representatives = {
        sid: context.df_raw.loc[per_row_labels == sid] for sid in selection
    }

    return RepSetResult(
        context=context,
        selection_space='subset',
        selection=selection,
        scores={'wcss': wcss},
        representatives=representatives,
        weights=weights,
        diagnostics=diagnostics,
    )