Skip to content

Postprocessing

Post-processing and visualization of optimization results.

This module provides tools to extract, process, and visualize results from solved optimization models. The PostProcessor class handles denormalization of scaled results, data extraction into DataFrames, and creation of publication-quality plots.

Key Classes

  • PostProcessor: Extract and visualize optimization results

Available Methods

Data Extraction

  • get_impacts(): Extract impact results as DataFrame
  • get_installation(): Extract installation schedules
  • get_operation(): Extract operation profiles
  • get_production(): Extract production quantities
  • get_demand(): Extract demand values

Visualization

  • plot_impacts(): Stacked bar chart of impacts over time
  • plot_installation(): Installation schedule visualization
  • plot_operation(): Operation levels over time
  • plot_capacity_balance(): Production vs capacity comparison
  • plot_utilization_heatmap(): Capacity utilization heatmap by process

Module Reference

Post-processing and visualization of optimization results.

This module provides tools to extract, process, and visualize results from solved optimization models. The PostProcessor class handles denormalization of scaled results, data extraction into DataFrames, and creation of publication-quality plots for impacts, installation schedules, production, and operation profiles.

Key classes: - PostProcessor: Extract and visualize optimization results

Classes

PostProcessor(solved_model: pyo.ConcreteModel, plot_config: dict = None)

A class for post-processing and visualizing results from a solved Pyomo model.

This class provides plotting utilities with configurable styles for generating visualizations such as stacked bar charts, line plots, etc., from model outputs.

Parameters:

Name Type Description Default
solved_model ConcreteModel

A solved Pyomo model instance containing the data to be processed and visualized.

required
plot_config dict

A dictionary of plot styling options to override default settings. Recognized keys include: - "figsize" : tuple of (width, height) in inches - "fontsize" : int, font size for labels and titles - "grid_alpha" : float, transparency of grid lines - "grid_linestyle" : str, line style for grid (e.g., "--", ":", "-.") - "rotation" : int, angle of x-axis tick label rotation - "bar_width" : float, width of bars in bar charts - "colormap" : list of colors used for plotting - "line_color" : str, color of lines in line plots - "line_marker" : str, marker style for line plots - "line_width" : float, width of lines in line plots - "max_xticks" : int, maximum number of x-axis ticks to display

Unrecognized keys are ignored.

None

Attributes:

Name Type Description
m ConcreteModel

The solved Pyomo model.

_plot_config dict

The finalized configuration dictionary used for plotting.

Source code in src/optimex/postprocessing.py
def __init__(self, solved_model: pyo.ConcreteModel, plot_config: dict = None):
    self.m = solved_model

    # Default plot config
    default_config = {
        "figsize": (6, 3),
        "fontsize": 10,
        "label_fontsize": 11,
        "title_fontsize": 12,
        "legend_fontsize": 9,
        "grid_alpha": 0.3,
        "grid_linestyle": "-",
        "rotation": 45,
        "bar_width": 0.65,
        "colormap": [
            "#00549F",  # RWTH Blau
            "#F6A800",  # RWTH Gelb
            "#57AB27",  # RWTH Gruen
            "#CC071E",  # RWTH Rot
            "#612158",  # RWTH Violett
            "#A11035",  # RWTH Bordeaux
            "#7A6FAC",  # RWTH Lila
            "#006165",  # RWTH Petrol
            "#BDCD00",  # RWTH Maigruen
            "#0098A1",  # RWTH Tuerkis
        ],
        "color_map": None,
        "bar_edgecolor": "white",
        "bar_linewidth": 1,
        "line_color": "#000000",
        "line_marker": "o",
        "line_width": 1.5,
        "max_xticks": 10,
        "subplot_ncols": 1,
    }

    # If user provided config, update defaults with it
    if plot_config:
        default_config.update(
            {k: v for k, v in plot_config.items() if k in default_config}
        )

    self._plot_config = default_config

    # Create consistent color mapping for all processes and products
    self._color_map = self._create_color_map()

    # Pre-populate cache for code -> name lookups (batch load for performance)
    self._name_cache = self._build_name_cache()

Functions

get_impacts() -> pd.DataFrame

Extract environmental impacts by category, process, and time.

Returns denormalized impact values from the solved optimization model, organized as a pivoted DataFrame with time as rows and (category, process) as column MultiIndex.

Returns:

Type Description
DataFrame

Pivoted DataFrame with 'Time' as index and MultiIndex columns for (Category, Process) combinations. Values represent environmental impacts in the units of the characterization method.

Source code in src/optimex/postprocessing.py
def get_impacts(self) -> pd.DataFrame:
    """
    Extract environmental impacts by category, process, and time.

    Returns denormalized impact values from the solved optimization model,
    organized as a pivoted DataFrame with time as rows and (category, process)
    as column MultiIndex.

    Returns
    -------
    pd.DataFrame
        Pivoted DataFrame with 'Time' as index and MultiIndex columns for
        (Category, Process) combinations. Values represent environmental
        impacts in the units of the characterization method.
    """
    if hasattr(self, "df_impacts"):
        return self.df_impacts

    impacts = {}
    cat_scales = getattr(self.m, "scales", {}).get("characterization", 1.0)
    fg_scale = getattr(self.m, "scales", {}).get("foreground", 1.0)
    impacts = {
        (c, p, t): pyo.value(self.m.specific_impact[c, p, t])
        * cat_scales[c]
        * fg_scale  # Unscale foreground impacts
        for c in self.m.CATEGORY
        for p in self.m.PROCESS
        for t in self.m.SYSTEM_TIME
    }
    df = pd.DataFrame.from_dict(impacts, orient="index", columns=["Value"])
    df.index = pd.MultiIndex.from_tuples(
        df.index, names=["Category", "Process", "Time"]
    )
    df = df.reset_index()
    df_pivot = df.pivot(
        index="Time", columns=["Category", "Process"], values="Value"
    )
    self.df_impacts = df_pivot
    return self.df_impacts
get_dynamic_inventory(biosphere_database: str = 'ecoinvent-3.12-biosphere') -> pd.DataFrame

Extract the dynamic inventory from the solved model.

Returns a DataFrame with elementary flows over time, formatted for use with dynamic_characterization.

Parameters:

Name Type Description Default
biosphere_database str

Name of the biosphere database to look up flow IDs.

"ecoinvent-3.12-biosphere"

Returns:

Type Description
DataFrame

DataFrame with columns: activity, flow, date, amount. - activity: process code (str) - flow: biosphere flow ID (int) - date: datetime of emission - amount: flow amount (float)

Source code in src/optimex/postprocessing.py
def get_dynamic_inventory(self, biosphere_database: str = "ecoinvent-3.12-biosphere") -> pd.DataFrame:
    """
    Extract the dynamic inventory from the solved model.

    Returns a DataFrame with elementary flows over time, formatted for use
    with dynamic_characterization.

    Parameters
    ----------
    biosphere_database : str, default="ecoinvent-3.12-biosphere"
        Name of the biosphere database to look up flow IDs.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns: activity, flow, date, amount.
        - activity: process code (str)
        - flow: biosphere flow ID (int)
        - date: datetime of emission
        - amount: flow amount (float)
    """
    fg_scale = getattr(self.m, "scales", {}).get("foreground", 1.0)
    inventory = {
        (p, e, t): pyo.value(self.m.scaled_inventory[p, e, t]) * fg_scale
        for p in self.m.PROCESS
        for e in self.m.ELEMENTARY_FLOW
        for t in self.m.SYSTEM_TIME
    }

    df = pd.DataFrame.from_records(
        [(p, e, t, v) for (p, e, t), v in inventory.items()],
        columns=["activity", "flow", "date", "amount"]
    ).astype({
        "activity": "str",
        "flow": "str",
        "amount": "float64"
    })

    # Convert year integers to datetime
    df["date"] = pd.to_datetime(df["date"].astype(int), format="%Y")

    # Convert flow codes to database IDs
    biosphere_db = bd.Database(biosphere_database)
    df["flow"] = df["flow"].apply(
        lambda x: biosphere_db.get(code=x).id
    )

    self.df_dynamic_inventory = df
    return self.df_dynamic_inventory
get_characterized_dynamic_inventory(base_lcia_method: tuple, metric: str = 'radiative_forcing', time_horizon: int = 100, fixed_time_horizon: bool = True, biosphere_database: str = 'ecoinvent-3.12-biosphere', df_inventory: pd.DataFrame = None) -> pd.DataFrame

Characterize the dynamic inventory using dynamic_characterization.

Parameters:

Name Type Description Default
base_lcia_method tuple

The LCIA method tuple for characterization (e.g., ('IPCC', 'GWP100')).

required
metric str

Characterization metric. Options: "radiative_forcing", "GWP".

"radiative_forcing"
time_horizon int

Time horizon for characterization in years.

100
fixed_time_horizon bool

If True, use fixed time horizon; if False, use dynamic time horizon.

True
biosphere_database str

Name of the biosphere database (used if df_inventory not provided).

"ecoinvent-3.12-biosphere"
df_inventory DataFrame

Pre-computed inventory DataFrame. If not provided, calls get_dynamic_inventory().

None

Returns:

Type Description
DataFrame

Characterized inventory DataFrame with columns: date, amount.

Source code in src/optimex/postprocessing.py
def get_characterized_dynamic_inventory(
    self,
    base_lcia_method: tuple,
    metric: str = "radiative_forcing",
    time_horizon: int = 100,
    fixed_time_horizon: bool = True,
    biosphere_database: str = "ecoinvent-3.12-biosphere",
    df_inventory: pd.DataFrame = None,
) -> pd.DataFrame:
    """
    Characterize the dynamic inventory using dynamic_characterization.

    Parameters
    ----------
    base_lcia_method : tuple
        The LCIA method tuple for characterization (e.g., ('IPCC', 'GWP100')).
    metric : str, default="radiative_forcing"
        Characterization metric. Options: "radiative_forcing", "GWP".
    time_horizon : int, default=100
        Time horizon for characterization in years.
    fixed_time_horizon : bool, default=True
        If True, use fixed time horizon; if False, use dynamic time horizon.
    biosphere_database : str, default="ecoinvent-3.12-biosphere"
        Name of the biosphere database (used if df_inventory not provided).
    df_inventory : pd.DataFrame, optional
        Pre-computed inventory DataFrame. If not provided, calls get_dynamic_inventory().

    Returns
    -------
    pd.DataFrame
        Characterized inventory DataFrame with columns: date, amount.
    """
    from dynamic_characterization import characterize

    if df_inventory is None:
        df_inventory = self.get_dynamic_inventory(biosphere_database=biosphere_database)

    df_characterized = characterize(
        df_inventory,
        metric=metric,
        base_lcia_method=base_lcia_method,
        time_horizon=time_horizon,
        fixed_time_horizon=fixed_time_horizon,
    )

    self.df_characterized_inventory = df_characterized
    return self.df_characterized_inventory
plot_characterized_dynamic_inventory(base_lcia_method: tuple = None, metric: str = 'radiative_forcing', time_horizon: int = 100, fixed_time_horizon: bool = True, biosphere_database: str = 'ecoinvent-3.12-biosphere', df_characterized: pd.DataFrame = None)

Plot the characterized dynamic inventory aggregated by year.

Parameters:

Name Type Description Default
base_lcia_method tuple

The LCIA method tuple for characterization. Required if df_characterized is not provided.

None
metric str

Characterization metric (used if df_characterized not provided).

"radiative_forcing"
time_horizon int

Time horizon for characterization (used if df_characterized not provided).

100
fixed_time_horizon bool

If True, use fixed time horizon (used if df_characterized not provided).

True
biosphere_database str

Name of the biosphere database (used if df_characterized not provided).

"ecoinvent-3.12-biosphere"
df_characterized DataFrame

Pre-computed characterized inventory. If not provided, calls get_characterized_dynamic_inventory().

None
Source code in src/optimex/postprocessing.py
def plot_characterized_dynamic_inventory(
    self,
    base_lcia_method: tuple = None,
    metric: str = "radiative_forcing",
    time_horizon: int = 100,
    fixed_time_horizon: bool = True,
    biosphere_database: str = "ecoinvent-3.12-biosphere",
    df_characterized: pd.DataFrame = None,
):
    """
    Plot the characterized dynamic inventory aggregated by year.

    Parameters
    ----------
    base_lcia_method : tuple, optional
        The LCIA method tuple for characterization. Required if df_characterized
        is not provided.
    metric : str, default="radiative_forcing"
        Characterization metric (used if df_characterized not provided).
    time_horizon : int, default=100
        Time horizon for characterization (used if df_characterized not provided).
    fixed_time_horizon : bool, default=True
        If True, use fixed time horizon (used if df_characterized not provided).
    biosphere_database : str, default="ecoinvent-3.12-biosphere"
        Name of the biosphere database (used if df_characterized not provided).
    df_characterized : pd.DataFrame, optional
        Pre-computed characterized inventory. If not provided, calls
        get_characterized_dynamic_inventory().
    """
    if df_characterized is None:
        if base_lcia_method is None:
            raise ValueError("base_lcia_method is required when df_characterized is not provided")
        df_characterized = self.get_characterized_dynamic_inventory(
            base_lcia_method=base_lcia_method,
            metric=metric,
            time_horizon=time_horizon,
            fixed_time_horizon=fixed_time_horizon,
            biosphere_database=biosphere_database,
        )

    # Ensure date column is datetime
    df_plot = df_characterized.copy()
    df_plot["date"] = pd.to_datetime(df_plot["date"])

    # Round to nearest year and aggregate
    df_grouped = (
        df_plot
        .assign(date_rounded=(df_plot["date"] + pd.offsets.MonthBegin(6)).dt.to_period("Y").dt.to_timestamp())
        .groupby("date_rounded")["amount"]
        .sum()
        .reset_index()
    )

    # Create plot
    fig, axes = self._create_clean_axes()
    ax = axes[0]

    ax.plot(
        df_grouped["date_rounded"],
        df_grouped["amount"],
        marker=self._plot_config["line_marker"],
        linewidth=self._plot_config["line_width"],
        color=self._plot_config["line_color"],
        label=metric.replace("_", " ").title(),
    )

    ax.set_ylabel(f"{metric.replace('_', ' ').title()}", fontsize=self._plot_config["label_fontsize"])
    ax.set_title(f"Dynamic {metric.replace('_', ' ').title()}", fontsize=self._plot_config["title_fontsize"])
    ax.set_axisbelow(True)
    ax.grid(
        axis="both",
        linestyle=self._plot_config["grid_linestyle"],
        alpha=self._plot_config["grid_alpha"],
    )

    self._add_legend(ax, position="bottom", bbox_to_anchor=(0.5, -0.35))

    fig.tight_layout()
    plt.show()
get_installation() -> pd.DataFrame

Extracts the installation data from the model and returns it as a DataFrame. The DataFrame will have a MultiIndex with 'Time' and 'Process'. The values are the installed capacities for each process at each time step.

Source code in src/optimex/postprocessing.py
def get_installation(self) -> pd.DataFrame:
    """
    Extracts the installation data from the model and returns it as a DataFrame.
    The DataFrame will have a MultiIndex with 'Time' and 'Process'.
    The values are the installed capacities for each process at each time step.
    """
    # var_installation is already in real units, no scaling needed
    installation_matrix = {
        (t, p): pyo.value(self.m.var_installation[p, t])
        for p in self.m.PROCESS
        for t in self.m.SYSTEM_TIME
    }
    df = pd.DataFrame.from_dict(
        installation_matrix, orient="index", columns=["Value"]
    )
    df.index = pd.MultiIndex.from_tuples(df.index, names=["Time", "Process"])
    df = df.reset_index()
    df_pivot = df.pivot(index="Time", columns="Process", values="Value")
    self.df_installation = df_pivot
    return self.df_installation
get_operation(aggregate_vintages: bool = True) -> pd.DataFrame

Extracts the operation data from the model and returns it as a DataFrame.

With 3D var_operation[p, v, t], this method can either aggregate across vintages (backward compatible) or return per-vintage data.

Parameters:

Name Type Description Default
aggregate_vintages bool

If True (default), sum operation across vintages for each (process, time) to provide backward-compatible 2D output. If False, return full 3D data with (Process, Vintage) as MultiIndex columns.

True

Returns:

Name Type Description
DataFrame

If aggregate_vintages=True: DataFrame with Time as index, Process as columns. If aggregate_vintages=False: DataFrame with Time as index, (Process, Vintage) MultiIndex columns.

Note var_operation is not scaled because when both demand and
foreground_production are scaled by the same factor, the scaling
cancels out in the constraint: demand = production * operation.
Source code in src/optimex/postprocessing.py
def get_operation(self, aggregate_vintages: bool = True) -> pd.DataFrame:
    """
    Extracts the operation data from the model and returns it as a DataFrame.

    With 3D var_operation[p, v, t], this method can either aggregate across
    vintages (backward compatible) or return per-vintage data.

    Parameters
    ----------
    aggregate_vintages : bool, default=True
        If True (default), sum operation across vintages for each (process, time)
        to provide backward-compatible 2D output.
        If False, return full 3D data with (Process, Vintage) as MultiIndex columns.

    Returns
    -------
    pd.DataFrame
        If aggregate_vintages=True: DataFrame with Time as index, Process as columns.
        If aggregate_vintages=False: DataFrame with Time as index,
            (Process, Vintage) MultiIndex columns.

    Note: var_operation is not scaled because when both demand and
    foreground_production are scaled by the same factor, the scaling
    cancels out in the constraint: demand = production * operation.
    """
    if aggregate_vintages:
        # Aggregate across vintages (backward compatible)
        operation_matrix = {}
        for p in self.m.PROCESS:
            for t in self.m.SYSTEM_TIME:
                total_op = sum(
                    pyo.value(self.m.var_operation[proc, v, time])
                    for (proc, v, time) in self.m.ACTIVE_VINTAGE_TIME
                    if proc == p and time == t
                )
                operation_matrix[(t, p)] = total_op
        df = pd.DataFrame.from_dict(operation_matrix, orient="index", columns=["Value"])
        df.index = pd.MultiIndex.from_tuples(df.index, names=["Time", "Process"])
        df = df.reset_index()
        df_pivot = df.pivot(index="Time", columns="Process", values="Value")
        self.df_operation = df_pivot
        return self.df_operation
    else:
        # Return full 3D data with (Process, Vintage) columns
        operation_matrix = {
            (t, p, v): pyo.value(self.m.var_operation[p, v, t])
            for (p, v, t) in self.m.ACTIVE_VINTAGE_TIME
        }
        df = pd.DataFrame.from_dict(operation_matrix, orient="index", columns=["Value"])
        df.index = pd.MultiIndex.from_tuples(df.index, names=["Time", "Process", "Vintage"])
        df = df.reset_index()
        df_pivot = df.pivot(index="Time", columns=["Process", "Vintage"], values="Value")
        return df_pivot
get_production() -> pd.DataFrame

Extracts the production data from the model and returns it as a DataFrame. The DataFrame will have a MultiIndex with 'Process', 'Product', and 'Time'. The values are the total production for each process and product at each time step.

With 3D var_operation[p, v, t], production is summed across all active vintages at each time step.

Source code in src/optimex/postprocessing.py
def get_production(self) -> pd.DataFrame:
    """
    Extracts the production data from the model and returns it as a DataFrame.
    The DataFrame will have a MultiIndex with 'Process', 'Product', and
    'Time'. The values are the total production for each process and product
    at each time step.

    With 3D var_operation[p, v, t], production is summed across all active
    vintages at each time step.
    """
    production_tensor = {}
    fg_scale = getattr(self.m, "scales", {}).get("foreground", 1.0)

    # Get production overrides data from model (if exists)
    production_overrides = getattr(self.m, "_production_vintage_overrides", {})
    production_overrides_index = getattr(self.m, "_production_overrides_index", frozenset())

    def get_production_value(p, r, tau, vintage):
        """Get production value, checking sparse overrides first."""
        key = (p, r, tau, vintage)
        if key in production_overrides:
            return production_overrides[key]
        return pyo.value(self.m.foreground_production[p, r, tau])

    def has_production_overrides(p, r):
        """Check if any vintage overrides exist for this process/product."""
        return (p, r) in production_overrides_index

    for p in self.m.PROCESS:
        for f in self.m.PRODUCT:
            op_start = pyo.value(self.m.process_operation_start[p])
            op_end = pyo.value(self.m.process_operation_end[p])

            for t in self.m.SYSTEM_TIME:
                # Sum production across all active vintages at time t
                total_production = 0
                for (proc, v, time) in self.m.ACTIVE_VINTAGE_TIME:
                    if proc != p or time != t:
                        continue

                    # Get production rate for this vintage
                    if has_production_overrides(p, f):
                        production_rate = sum(
                            pyo.value(get_production_value(p, f, tau_op, v))
                            for tau_op in self.m.PROCESS_TIME
                            if op_start <= tau_op <= op_end
                        )
                    else:
                        production_rate = sum(
                            pyo.value(self.m.foreground_production[p, f, tau_op])
                            for tau_op in self.m.PROCESS_TIME
                            if op_start <= tau_op <= op_end
                        )

                    # Production from this vintage
                    total_production += production_rate * pyo.value(self.m.var_operation[p, v, t])

                production_tensor[(p, f, t)] = total_production * fg_scale

    df = pd.DataFrame.from_dict(
        production_tensor, orient="index", columns=["Value"]
    )
    df.index = pd.MultiIndex.from_tuples(
        df.index, names=["Process", "Product", "Time"]
    )
    df = df.reset_index()
    df_pivot = df.pivot(
        index="Time", columns=["Process", "Product"], values="Value"
    )
    self.df_production = df_pivot
    return self.df_production
get_demand() -> pd.DataFrame

Extracts the demand data from the model and returns it as a DataFrame. The DataFrame will have a MultiIndex with 'Product' and 'Time'. The values are the demand for each Product at each time step.

Source code in src/optimex/postprocessing.py
def get_demand(self) -> pd.DataFrame:
    """
    Extracts the demand data from the model and returns it as a DataFrame.
    The DataFrame will have a MultiIndex with 'Product' and 'Time'.
    The values are the demand for each Product at each time step.
    """
    fg_scale = getattr(self.m, "scales", {}).get("foreground", 1.0)
    demand_matrix = {
        (f, t): self.m.demand[f, t] * fg_scale
        for f in self.m.PRODUCT
        for t in self.m.SYSTEM_TIME
    }
    df = pd.DataFrame.from_dict(demand_matrix, orient="index", columns=["Value"])
    df.index = pd.MultiIndex.from_tuples(
        df.index, names=["Product", "Time"]
    )
    df = df.reset_index()
    df_pivot = df.pivot(index="Time", columns="Product", values="Value")
    self.df_demand = df_pivot
    return self.df_demand
plot_impacts(df_impacts=None, annotated=True)

Plot a stacked bar chart for impacts by category and process over time.

Creates a figure with one subplot per impact category, showing process contributions as stacked bars. Automatically denormalizes scaled values and optionally displays human-readable process names.

Parameters:

Name Type Description Default
df_impacts DataFrame

DataFrame with Time as index, Categories and Processes as columns. Columns must be a MultiIndex: (Category, Process). If not provided, automatically extracted via get_impacts().

None
annotated bool

If True, show human-readable names from Brightway database instead of process codes.

True
Source code in src/optimex/postprocessing.py
def plot_impacts(self, df_impacts=None, annotated=True):
    """
    Plot a stacked bar chart for impacts by category and process over time.

    Creates a figure with one subplot per impact category, showing process
    contributions as stacked bars. Automatically denormalizes scaled values
    and optionally displays human-readable process names.

    Parameters
    ----------
    df_impacts : DataFrame, optional
        DataFrame with Time as index, Categories and Processes as columns.
        Columns must be a MultiIndex: (Category, Process). If not provided,
        automatically extracted via get_impacts().
    annotated : bool, default=True
        If True, show human-readable names from Brightway database instead
        of process codes.
    """
    if df_impacts is None:
        if getattr(self, "df_impacts", None) is not None:
            df_impacts = self.df_impacts
        else:
            df_impacts = self.get_impacts()

    categories = df_impacts.columns.get_level_values(0).unique()
    n_categories = len(categories)
    ncols = min(self._plot_config["subplot_ncols"], n_categories)
    nrows = math.ceil(n_categories / ncols)

    base_w, base_h = self._plot_config["figsize"]
    fig_w = base_w * ncols
    fig_h = base_h * nrows

    fig, axes = self._create_clean_axes(nrows=nrows, ncols=ncols, figsize=(fig_w, fig_h))

    all_handles = []
    all_labels = []
    for i, category in enumerate(categories):
        ax = axes[i]
        sub_df = df_impacts[category]
        # Filter out processes with all zero values in this category
        sub_df = sub_df.loc[:, (sub_df != 0).any(axis=0)]
        # Get colors BEFORE annotation (using codes)
        colors = self._get_colors_for_dataframe(sub_df)
        # Annotate if requested
        sub_df = self._annotate_dataframe(sub_df, annotated)
        self._apply_bar_styles(sub_df, ax, colors, title=category)
        ax.set_xlabel("")
        ax.set_ylabel("Impact", fontsize=self._plot_config["label_fontsize"])

        # Collect handles/labels for shared legend
        h, l = ax.get_legend_handles_labels()
        for handle, label in zip(h, l):
            if label not in all_labels:
                all_handles.append(handle)
                all_labels.append(label)

    # Hide unused axes
    for j in range(i + 1, len(axes)):
        fig.delaxes(axes[j])

    # Shared legend to the right of figure
    if all_handles:
        all_handles, all_labels = self._reorder_legend_row_first(all_handles, all_labels, 2)
        fig.legend(
            handles=all_handles,
            labels=all_labels,
            loc="upper center",
            bbox_to_anchor=(0.5, -0.02),
            ncol=2,
            fontsize=self._plot_config["legend_fontsize"],
            frameon=False,
        )

    fig.tight_layout()
    plt.show()
plot_installation(df_installation=None, annotated=True)

Plot a stacked bar chart for installation data.

Parameters:

Name Type Description Default
df_installation DataFrame

DataFrame with Time as index, Processes as columns

None
annotated bool

If True, show human-readable names instead of codes

True
Source code in src/optimex/postprocessing.py
def plot_installation(self, df_installation=None, annotated=True):
    """
    Plot a stacked bar chart for installation data.

    Parameters
    ----------
    df_installation : DataFrame, optional
        DataFrame with Time as index, Processes as columns
    annotated : bool, default=True
        If True, show human-readable names instead of codes
    """
    if df_installation is None:
        df_installation = self.get_installation()

    # Filter out columns with all zero values
    df_installation = df_installation.loc[:, (df_installation != 0).any(axis=0)]

    # Get colors BEFORE annotation (using codes)
    colors = self._get_colors_for_dataframe(df_installation)

    # Annotate if requested
    df_installation = self._annotate_dataframe(df_installation, annotated)

    fig, axes = self._create_clean_axes()
    ax = axes[0]
    self._apply_bar_styles(
        df_installation, ax, colors, title="Installed Capacity"
    )
    ax.set_ylabel("Installed Capacity", fontsize=self._plot_config["label_fontsize"])


    # Legend at bottom
    h, l = ax.get_legend_handles_labels()
    if h:
        h, l = self._reorder_legend_row_first(h, l, 2)
        fig.legend(
            handles=h,
            labels=l,
            loc="upper center",
            bbox_to_anchor=(0.5, -0.02),
            ncol=2,
            fontsize=self._plot_config["legend_fontsize"],
            frameon=False,
        )

    fig.tight_layout()
    plt.show()
plot_operation(df_operation=None, annotated=True)

Plot a stacked bar chart for operation data.

Parameters:

Name Type Description Default
df_operation DataFrame

DataFrame with Time as index, Processes as columns

None
annotated bool

If True, show human-readable names instead of codes

True
Source code in src/optimex/postprocessing.py
def plot_operation(self, df_operation=None, annotated=True):
    """
    Plot a stacked bar chart for operation data.

    Parameters
    ----------
    df_operation : DataFrame, optional
        DataFrame with Time as index, Processes as columns
    annotated : bool, default=True
        If True, show human-readable names instead of codes
    """
    if df_operation is None:
        df_operation = self.get_operation()

    # Filter out columns with all zero values
    df_operation = df_operation.loc[:, (df_operation != 0).any(axis=0)]

    # Get colors BEFORE annotation (using codes)
    colors = self._get_colors_for_dataframe(df_operation)

    # Annotate if requested
    df_operation = self._annotate_dataframe(df_operation, annotated)

    fig, axes = self._create_clean_axes()
    ax = axes[0]
    self._apply_bar_styles(
        df_operation, ax, colors, title="Operational Level"
    )
    ax.set_ylabel("Operation Level", fontsize=self._plot_config["label_fontsize"])


    # Legend at bottom
    h, l = ax.get_legend_handles_labels()
    if h:
        h, l = self._reorder_legend_row_first(h, l, 2)
        fig.legend(
            handles=h,
            labels=l,
            loc="upper center",
            bbox_to_anchor=(0.5, -0.02),
            ncol=2,
            fontsize=self._plot_config["legend_fontsize"],
            frameon=False,
        )

    fig.tight_layout()
    plt.show()
get_existing_capacity() -> pd.DataFrame

Extract existing (brownfield) capacity data from the model.

Returns a DataFrame showing which processes have existing capacity, when they were installed, and their operational status at each time step.

Returns:

Type Description
DataFrame

DataFrame with Time as index and (Process, Type) as MultiIndex columns. Type can be 'existing_capacity' (total existing) or 'existing_operating' (existing capacity in operation phase at that time).

Source code in src/optimex/postprocessing.py
def get_existing_capacity(self) -> pd.DataFrame:
    """
    Extract existing (brownfield) capacity data from the model.

    Returns a DataFrame showing which processes have existing capacity,
    when they were installed, and their operational status at each time step.

    Returns
    -------
    pd.DataFrame
        DataFrame with Time as index and (Process, Type) as MultiIndex columns.
        Type can be 'existing_capacity' (total existing) or 'existing_operating'
        (existing capacity in operation phase at that time).
    """
    existing_cap_dict = getattr(self.m, "_existing_capacity_dict", {})

    if not existing_cap_dict:
        # Return empty DataFrame if no existing capacity
        return pd.DataFrame()

    data = {}
    for t in self.m.SYSTEM_TIME:
        for p in self.m.PROCESS:
            # Calculate existing capacity in operation at time t
            op_start = pyo.value(self.m.process_operation_start[p])
            op_end = pyo.value(self.m.process_operation_end[p])

            existing_operating = 0
            existing_total = 0

            for (proc, inst_year), capacity in existing_cap_dict.items():
                if proc == p:
                    existing_total += capacity
                    tau_existing = t - inst_year
                    if op_start <= tau_existing <= op_end:
                        existing_operating += capacity

            if existing_total > 0:
                data[(t, p, "existing_capacity")] = existing_total
                data[(t, p, "existing_operating")] = existing_operating

    if not data:
        return pd.DataFrame()

    df = pd.DataFrame.from_dict(data, orient="index", columns=["Value"])
    df.index = pd.MultiIndex.from_tuples(df.index, names=["Time", "Process", "Type"])
    df = df.reset_index()
    df_pivot = df.pivot(index="Time", columns=["Process", "Type"], values="Value")

    return df_pivot
get_production_capacity() -> pd.DataFrame

Calculate maximum available production capacity for each product at each time step.

Capacity is determined by counting installations in their operation phase and multiplying by their production coefficients. This includes both new installations (from var_installation) and existing (brownfield) capacity.

Note: Uses vintage-aware 4D calculation when production overrides exist, matching the optimizer's capacity constraint calculation.

Returns:

Type Description
DataFrame

DataFrame with Time as index and Products as columns. Values represent maximum production capacity (not actual production).

Source code in src/optimex/postprocessing.py
def get_production_capacity(self) -> pd.DataFrame:
    """
    Calculate maximum available production capacity for each product at each time step.

    Capacity is determined by counting installations in their operation phase and
    multiplying by their production coefficients. This includes both new installations
    (from var_installation) and existing (brownfield) capacity.

    Note: Uses vintage-aware 4D calculation when production overrides exist,
    matching the optimizer's capacity constraint calculation.

    Returns
    -------
    pd.DataFrame
        DataFrame with Time as index and Products as columns.
        Values represent maximum production capacity (not actual production).
    """
    capacity_tensor = {}
    fg_scale = getattr(self.m, "scales", {}).get("foreground", 1.0)
    existing_cap_dict = getattr(self.m, "_existing_capacity_dict", {})

    # Get production overrides data from model (if exists)
    production_overrides = getattr(self.m, "_production_vintage_overrides", {})
    production_overrides_index = getattr(self.m, "_production_overrides_index", frozenset())

    def get_production_value(p, r, tau, vintage):
        """Get production value, checking sparse overrides first."""
        key = (p, r, tau, vintage)
        if key in production_overrides:
            return production_overrides[key]
        return pyo.value(self.m.foreground_production[p, r, tau])

    def has_production_overrides(p, r):
        """Check if any vintage overrides exist for this process/product."""
        return (p, r) in production_overrides_index

    for f in self.m.PRODUCT:
        for t in self.m.SYSTEM_TIME:
            # Calculate total capacity across all processes
            total_capacity = 0

            for p in self.m.PROCESS:
                op_start = pyo.value(self.m.process_operation_start[p])
                op_end = pyo.value(self.m.process_operation_end[p])

                if has_production_overrides(p, f):
                    # 4D vintage-aware capacity calculation
                    # Each vintage may have different production rates
                    process_capacity = 0

                    # New installations: sum capacity by vintage
                    for tau in self.m.PROCESS_TIME:
                        vintage = t - tau
                        if vintage in self.m.SYSTEM_TIME and op_start <= tau <= op_end:
                            # Production rate for this vintage (sum over all operating taus)
                            production_per_unit = sum(
                                get_production_value(p, f, tau_op, vintage)
                                for tau_op in self.m.PROCESS_TIME
                                if op_start <= tau_op <= op_end
                            )
                            installation = pyo.value(self.m.var_installation[p, vintage])
                            process_capacity += production_per_unit * installation

                    # Existing (brownfield) capacity
                    for (proc, inst_year), capacity in existing_cap_dict.items():
                        if proc == p:
                            tau_existing = t - inst_year
                            if op_start <= tau_existing <= op_end:
                                nearest_vintage = min(self.m.SYSTEM_TIME)
                                production_per_unit = sum(
                                    get_production_value(p, f, tau_op, nearest_vintage)
                                    for tau_op in self.m.PROCESS_TIME
                                    if op_start <= tau_op <= op_end
                                )
                                process_capacity += production_per_unit * capacity

                    total_capacity += process_capacity
                else:
                    # 3D calculation: no overrides, all vintages have same production rate
                    # Count new installations in operation phase at time t
                    installations_operating = sum(
                        pyo.value(self.m.var_installation[p, t - tau])
                        for tau in self.m.PROCESS_TIME
                        if (t - tau in self.m.SYSTEM_TIME)
                        and op_start <= tau <= op_end
                    )

                    # Add existing (brownfield) capacity in operation phase
                    for (proc, inst_year), capacity in existing_cap_dict.items():
                        if proc == p:
                            tau_existing = t - inst_year
                            if op_start <= tau_existing <= op_end:
                                installations_operating += capacity

                    # Production capacity per installation (sum over operation phase)
                    production_per_installation = sum(
                        pyo.value(self.m.foreground_production[p, f, tau])
                        for tau in self.m.PROCESS_TIME
                        if op_start <= tau <= op_end
                    )

                    # Total capacity for this process
                    total_capacity += installations_operating * production_per_installation

            # Store denormalized capacity
            capacity_tensor[(f, t)] = total_capacity * fg_scale

    # Convert to DataFrame
    df = pd.DataFrame.from_dict(capacity_tensor, orient="index", columns=["Value"])
    df.index = pd.MultiIndex.from_tuples(df.index, names=["Product", "Time"])
    df = df.reset_index()
    df_pivot = df.pivot(index="Time", columns="Product", values="Value")

    return df_pivot
plot_capacity_balance(product=None, prod_df=None, capacity_df=None, demand_df=None, annotated=True, detailed=False)

Plot actual production vs maximum available capacity.

When a specific product is given, plots a single chart. When product is None, auto-detects all products with non-zero demand or production and plots a grid of subplots.

Shows two lines per product: - Production (demand is assumed equal and overlaid) - Maximum available capacity (dashed line)

When detailed=True, also shows grouped bars per time step: - Left bar: Capacity changes (additions/removals stacked by process) - Right bar: Operation level (stacked by process)

Parameters:

Name Type Description Default
product str

Product to plot. If None, plots all products with non-zero demand or production in a grid layout.

None
prod_df DataFrame

Production DataFrame from get_production()

None
capacity_df DataFrame

Capacity DataFrame from get_production_capacity()

None
demand_df DataFrame

Demand DataFrame from get_demand()

None
annotated bool

If True, show human-readable names instead of codes

True
detailed bool

If True, show grouped bars for capacity changes and operation by process

False
Source code in src/optimex/postprocessing.py
def plot_capacity_balance(self, product=None, prod_df=None, capacity_df=None, demand_df=None, annotated=True, detailed=False):
    """
    Plot actual production vs maximum available capacity.

    When a specific product is given, plots a single chart. When product is
    None, auto-detects all products with non-zero demand or production and
    plots a grid of subplots.

    Shows two lines per product:
    - Production (demand is assumed equal and overlaid)
    - Maximum available capacity (dashed line)

    When detailed=True, also shows grouped bars per time step:
    - Left bar: Capacity changes (additions/removals stacked by process)
    - Right bar: Operation level (stacked by process)

    Parameters
    ----------
    product : str, optional
        Product to plot. If None, plots all products with non-zero
        demand or production in a grid layout.
    prod_df : pd.DataFrame, optional
        Production DataFrame from get_production()
    capacity_df : pd.DataFrame, optional
        Capacity DataFrame from get_production_capacity()
    demand_df : pd.DataFrame, optional
        Demand DataFrame from get_demand()
    annotated : bool, default=True
        If True, show human-readable names instead of codes
    detailed : bool, default=False
        If True, show grouped bars for capacity changes and operation by process
    """
    # Get data if not provided
    if prod_df is None:
        prod_df = self.get_production()
    if capacity_df is None:
        capacity_df = self.get_production_capacity()
    if demand_df is None:
        demand_df = self.get_demand()

    if product is not None:
        # Single-product plot — legend to the right
        fig, axes = self._create_clean_axes(nrows=1, ncols=1)
        ax = axes[0]

        if detailed:
            self._plot_capacity_balance_detailed_on_ax(
                ax, product, prod_df, capacity_df,
                annotated=annotated, show_legend=True, show_title=True,
                legend_position="bottom",
            )
        else:
            self._plot_capacity_balance_on_ax(
                ax, product, prod_df, capacity_df,
                annotated=annotated, show_legend=True, show_fill=True, show_title=True,
                legend_position="bottom",
            )


        fig.tight_layout()
        plt.show()
        return

    # Multi-product: collect all products with production or capacity
    all_products = set()

    if isinstance(prod_df.columns, pd.MultiIndex):
        prod_products = set(col[1] for col in prod_df.columns)
        for p in prod_products:
            production_cols = [col for col in prod_df.columns if col[1] == p]
            if (prod_df[production_cols].sum(axis=1) != 0).any():
                all_products.add(p)
    else:
        for p in prod_df.columns:
            if (prod_df[p] != 0).any():
                all_products.add(p)

    for p in capacity_df.columns:
        if (capacity_df[p] != 0).any():
            all_products.add(p)

    if not all_products:
        raise ValueError("No products with production or capacity found")

    products = sorted(all_products)
    n_products = len(products)
    ncols = min(self._plot_config["subplot_ncols"], n_products)
    nrows = math.ceil(n_products / ncols)

    base_w, base_h = self._plot_config["figsize"]
    fig_w = base_w * ncols
    fig_h = base_h * nrows

    fig, axes = plt.subplots(
        nrows=nrows, ncols=ncols, figsize=(fig_w, fig_h)
    )

    if n_products == 1:
        axes = np.array([axes])
    axes = axes.flatten() if isinstance(axes, np.ndarray) else [axes]

    for ax in axes:
        ax.set_axisbelow(True)
        ax.grid(
            axis="both",
            linestyle=self._plot_config["grid_linestyle"],
            alpha=self._plot_config["grid_alpha"],
        )
        ax.tick_params(axis="x", labelsize=self._plot_config["fontsize"])
        ax.tick_params(axis="y", labelsize=self._plot_config["fontsize"])
        ax.yaxis.set_major_formatter(_EngineeringFormatter())

    all_handles = []
    all_labels = []
    type_handles = []
    line_handles = []
    for i, p in enumerate(products):
        ax = axes[i]
        if detailed:
            proc_legend, type_legend, line_legend = self._plot_capacity_balance_detailed_on_ax(
                ax, p, prod_df, capacity_df,
                annotated=annotated, show_legend=False, show_title=True
            )
            # Collect process handles from all subplots (deduplicate by label)
            for handle in proc_legend:
                if handle.get_label() not in all_labels:
                    all_handles.append(handle)
                    all_labels.append(handle.get_label())
            # Keep type/line handles from first subplot only
            if not type_handles and type_legend:
                type_handles = type_legend
                line_handles = line_legend
        else:
            self._plot_capacity_balance_on_ax(
                ax, p, prod_df, capacity_df,
                annotated=annotated, show_legend=False, show_fill=True, show_title=True
            )
            # Collect handles from all subplots (deduplicate by label)
            h, l = ax.get_legend_handles_labels()
            for handle, label in zip(h, l):
                if label not in all_labels:
                    all_handles.append(handle)
                    all_labels.append(label)

    # For detailed mode, append type and line handles after all process handles
    if detailed and type_handles:
        all_handles = all_handles + type_handles + line_handles
        all_labels = [h.get_label() for h in all_handles]

    for j in range(len(products), len(axes)):
        fig.delaxes(axes[j])

    # Shared legend at bottom of figure
    if all_handles:
        all_handles, all_labels = self._reorder_legend_row_first(all_handles, all_labels, 2)
        fig.legend(
            handles=all_handles,
            labels=all_labels,
            loc="upper center",
            bbox_to_anchor=(0.5, 0.0),
            ncol=2,
            fontsize=self._plot_config["legend_fontsize"],
            frameon=False,
        )

    fig.tight_layout()
    plt.show()
plot_utilization_heatmap(product=None, annotated=True, show_values=True)

Plot a heatmap showing capacity utilization by process over time.

This provides a clean, dedicated view of which processes are being operated vs sitting idle at each time step.

Parameters:

Name Type Description Default
product str

Product to analyze. If None, uses the first product with non-zero demand.

None
annotated bool

If True, show human-readable process names instead of codes.

True
show_values bool

If True, show utilization percentages in cells.

True
Note
required
Source code in src/optimex/postprocessing.py
def plot_utilization_heatmap(self, product=None, annotated=True, show_values=True):
    """
    Plot a heatmap showing capacity utilization by process over time.

    This provides a clean, dedicated view of which processes are being
    operated vs sitting idle at each time step.

    Parameters
    ----------
    product : str, optional
        Product to analyze. If None, uses the first product with non-zero demand.
    annotated : bool, default=True
        If True, show human-readable process names instead of codes.
    show_values : bool, default=True
        If True, show utilization percentages in cells.

    Note: Uses vintage-aware 4D calculation when production overrides exist.
    """
    # Get demand to determine product
    demand_df = self.get_demand()

    if product is None:
        products_with_demand = demand_df.columns[(demand_df != 0).any(axis=0)]
        if len(products_with_demand) == 0:
            raise ValueError("No products with non-zero demand found")
        product = products_with_demand[0]

    fg_scale = getattr(self.m, "scales", {}).get("foreground", 1.0)
    existing_cap_dict = getattr(self.m, "_existing_capacity_dict", {})

    # Get production overrides data from model (if exists)
    production_overrides = getattr(self.m, "_production_vintage_overrides", {})
    production_overrides_index = getattr(self.m, "_production_overrides_index", frozenset())

    def get_production_value(p, r, tau, vintage):
        """Get production value, checking sparse overrides first."""
        key = (p, r, tau, vintage)
        if key in production_overrides:
            return production_overrides[key]
        return pyo.value(self.m.foreground_production[p, r, tau])

    def has_production_overrides(p, r):
        """Check if any vintage overrides exist for this process/product."""
        return (p, r) in production_overrides_index

    # Calculate utilization for each process at each time
    utilization_data = {}
    capacity_data = {}
    operation_data = {}

    for p in self.m.PROCESS:
        op_start = pyo.value(self.m.process_operation_start[p])
        op_end = pyo.value(self.m.process_operation_end[p])

        # Check if process produces this product (3D base rate)
        prod_per_inst_3d = sum(
            pyo.value(self.m.foreground_production[p, product, tau])
            for tau in self.m.PROCESS_TIME
            if op_start <= tau <= op_end
        )

        if prod_per_inst_3d == 0:
            continue  # Skip processes that don't produce this product

        utilization_data[p] = {}
        capacity_data[p] = {}
        operation_data[p] = {}

        for t in self.m.SYSTEM_TIME:
            if has_production_overrides(p, product):
                # 4D vintage-aware calculations
                # Capacity from new installations (sum by vintage)
                capacity = 0
                for tau in self.m.PROCESS_TIME:
                    vintage = t - tau
                    if vintage in self.m.SYSTEM_TIME and op_start <= tau <= op_end:
                        production_per_unit = sum(
                            get_production_value(p, product, tau_op, vintage)
                            for tau_op in self.m.PROCESS_TIME
                            if op_start <= tau_op <= op_end
                        )
                        installation = pyo.value(self.m.var_installation[p, vintage])
                        capacity += production_per_unit * installation

                # Add existing (brownfield) capacity
                nearest_vintage = min(self.m.SYSTEM_TIME)
                prod_per_inst_existing = sum(
                    get_production_value(p, product, tau_op, nearest_vintage)
                    for tau_op in self.m.PROCESS_TIME
                    if op_start <= tau_op <= op_end
                )
                for (proc, inst_year), cap in existing_cap_dict.items():
                    if proc == p:
                        tau_existing = t - inst_year
                        if op_start <= tau_existing <= op_end:
                            capacity += prod_per_inst_existing * cap

                capacity *= fg_scale

                # Operation - sum production across all active vintages
                operation = 0
                for (proc, v, time) in self.m.ACTIVE_VINTAGE_TIME:
                    if proc != p or time != t:
                        continue
                    # Get production rate for this vintage
                    production_rate = sum(
                        pyo.value(get_production_value(p, product, tau_op, v))
                        for tau_op in self.m.PROCESS_TIME
                        if op_start <= tau_op <= op_end
                    )
                    operation += production_rate * pyo.value(self.m.var_operation[p, v, t])
                operation *= fg_scale
            else:
                # 3D calculation: no overrides
                prod_per_inst = prod_per_inst_3d

                # Calculate capacity from new installations
                installations_operating = sum(
                    pyo.value(self.m.var_installation[p, t - tau])
                    for tau in self.m.PROCESS_TIME
                    if (t - tau in self.m.SYSTEM_TIME)
                    and op_start <= tau <= op_end
                )

                # Add existing (brownfield) capacity in operation phase
                for (proc, inst_year), cap in existing_cap_dict.items():
                    if proc == p:
                        tau_existing = t - inst_year
                        if op_start <= tau_existing <= op_end:
                            installations_operating += cap

                capacity = installations_operating * prod_per_inst * fg_scale

                # Calculate operation - sum across all active vintages
                operation = 0
                for (proc, v, time) in self.m.ACTIVE_VINTAGE_TIME:
                    if proc != p or time != t:
                        continue
                    operation += prod_per_inst * pyo.value(self.m.var_operation[p, v, t])
                operation *= fg_scale

            capacity_data[p][t] = capacity
            operation_data[p][t] = operation

            # Calculate utilization %
            if capacity > 0.001:
                utilization_data[p][t] = (operation / capacity) * 100
            else:
                utilization_data[p][t] = np.nan  # No capacity = no utilization possible

    if not utilization_data:
        raise ValueError(f"No processes produce {product}")

    # Convert to DataFrame
    util_df = pd.DataFrame(utilization_data).T
    cap_df = pd.DataFrame(capacity_data).T
    op_df = pd.DataFrame(operation_data).T

    # Filter to only times with some capacity
    has_capacity = (cap_df.sum(axis=0) > 0.001)
    util_df = util_df.loc[:, has_capacity]
    cap_df = cap_df.loc[:, has_capacity]
    op_df = op_df.loc[:, has_capacity]

    if util_df.empty:
        raise ValueError(f"No capacity found for {product}")

    # Annotate process names
    if annotated:
        util_df.index = [self._get_name(p) for p in util_df.index]
        cap_df.index = [self._get_name(p) for p in cap_df.index]
        op_df.index = [self._get_name(p) for p in op_df.index]

    product_name = self._get_name(product) if annotated else product

    # Create figure
    fig, ax = plt.subplots(figsize=(max(10, len(util_df.columns) * 0.4), max(4, len(util_df) * 0.8)))

    # Create heatmap
    # Use a diverging colormap: red (0%) -> yellow (50%) -> green (100%)
    cmap = plt.cm.RdYlGn
    im = ax.imshow(util_df.values, aspect='auto', cmap=cmap, vmin=0, vmax=100)

    # Set ticks
    ax.set_xticks(np.arange(len(util_df.columns)))
    ax.set_yticks(np.arange(len(util_df.index)))
    ax.set_xticklabels(util_df.columns, fontsize=self._plot_config["fontsize"] - 2)
    ax.set_yticklabels(util_df.index, fontsize=self._plot_config["fontsize"] - 1)

    # Rotate x labels if many years
    if len(util_df.columns) > 15:
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right', rotation_mode='anchor')

    # Add value annotations
    if show_values:
        for i in range(len(util_df.index)):
            for j in range(len(util_df.columns)):
                val = util_df.iloc[i, j]
                if not np.isnan(val):
                    # Choose text color based on background
                    text_color = 'white' if val < 30 or val > 70 else 'black'
                    ax.text(j, i, f'{val:.0f}%', ha='center', va='center',
                           fontsize=self._plot_config["fontsize"] - 3, color=text_color)
                else:
                    ax.text(j, i, '-', ha='center', va='center',
                           fontsize=self._plot_config["fontsize"] - 3, color='gray')

    # Add colorbar
    cbar = fig.colorbar(im, ax=ax, shrink=0.8)
    cbar.set_label('Utilization %', fontsize=self._plot_config["fontsize"])
    cbar.ax.tick_params(labelsize=self._plot_config["fontsize"] - 1)

    # Labels and title
    ax.set_xlabel('Year', fontsize=self._plot_config["label_fontsize"])
    ax.set_ylabel('Process', fontsize=self._plot_config["label_fontsize"])
    ax.set_title(f'Capacity Utilization: {product_name}', fontsize=self._plot_config["title_fontsize"])

    fig.tight_layout()
    plt.show()