Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/policyengine/outputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
ChangeAggregate,
ChangeAggregateType,
)
from policyengine.outputs.congressional_district_impact import (
CongressionalDistrictImpact,
compute_us_congressional_district_impacts,
)
from policyengine.outputs.constituency_impact import (
ConstituencyImpact,
compute_uk_constituency_impacts,
)
from policyengine.outputs.decile_impact import (
DecileImpact,
calculate_decile_impacts,
Expand All @@ -15,6 +23,14 @@
calculate_uk_inequality,
calculate_us_inequality,
)
from policyengine.outputs.intra_decile_impact import (
IntraDecileImpact,
compute_intra_decile_impacts,
)
from policyengine.outputs.local_authority_impact import (
LocalAuthorityImpact,
compute_uk_local_authority_impacts,
)
from policyengine.outputs.poverty import (
UK_POVERTY_VARIABLES,
US_POVERTY_VARIABLES,
Expand All @@ -34,6 +50,8 @@
"ChangeAggregateType",
"DecileImpact",
"calculate_decile_impacts",
"IntraDecileImpact",
"compute_intra_decile_impacts",
"Poverty",
"UKPovertyType",
"USPovertyType",
Expand All @@ -46,4 +64,10 @@
"US_INEQUALITY_INCOME_VARIABLE",
"calculate_uk_inequality",
"calculate_us_inequality",
"CongressionalDistrictImpact",
"compute_us_congressional_district_impacts",
"ConstituencyImpact",
"compute_uk_constituency_impacts",
"LocalAuthorityImpact",
"compute_uk_local_authority_impacts",
]
100 changes: 100 additions & 0 deletions src/policyengine/outputs/congressional_district_impact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Congressional district impact output class for US policy reforms."""

from typing import TYPE_CHECKING

import numpy as np
from pydantic import ConfigDict

from policyengine.core import Output

if TYPE_CHECKING:
from policyengine.core.simulation import Simulation


class CongressionalDistrictImpact(Output):
"""Per-congressional-district income change from a policy reform.

Groups households by congressional_district_geoid (integer SSDD format
where SS = state FIPS, DD = district number) and computes weighted
average and relative household income changes per district.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

baseline_simulation: "Simulation"
reform_simulation: "Simulation"

# Results populated by run()
district_results: list[dict] | None = None

def run(self) -> None:
"""Group households by geoid and compute per-district metrics."""
baseline_hh = self.baseline_simulation.output_dataset.data.household
reform_hh = self.reform_simulation.output_dataset.data.household

geoids = baseline_hh["congressional_district_geoid"].values
baseline_income = baseline_hh["household_net_income"].values
reform_income = reform_hh["household_net_income"].values
weights = baseline_hh["household_weight"].values

# Only include valid geoids (positive integers)
unique_geoids = np.unique(geoids[geoids > 0])

results: list[dict] = []
for geoid in unique_geoids:
mask = geoids == geoid
w = weights[mask]
total_weight = float(w.sum())
if total_weight == 0:
continue

b_inc = baseline_income[mask]
r_inc = reform_income[mask]

weighted_baseline = float((b_inc * w).sum())
weighted_reform = float((r_inc * w).sum())

avg_change = (weighted_reform - weighted_baseline) / total_weight
rel_change = (
(weighted_reform / weighted_baseline - 1.0)
if weighted_baseline != 0
else 0.0
)

geoid_int = int(geoid)
state_fips = geoid_int // 100
district_number = geoid_int % 100

results.append(
{
"district_geoid": geoid_int,
"state_fips": state_fips,
"district_number": district_number,
"average_household_income_change": float(avg_change),
"relative_household_income_change": float(rel_change),
"population": total_weight,
}
)

self.district_results = results


def compute_us_congressional_district_impacts(
baseline_simulation: "Simulation",
reform_simulation: "Simulation",
) -> CongressionalDistrictImpact:
"""Compute per-congressional-district income changes.

Args:
baseline_simulation: Completed baseline simulation.
reform_simulation: Completed reform simulation.

Returns:
CongressionalDistrictImpact with district_results populated.
"""
impact = CongressionalDistrictImpact.model_construct(
baseline_simulation=baseline_simulation,
reform_simulation=reform_simulation,
)
impact.run()
return impact
126 changes: 126 additions & 0 deletions src/policyengine/outputs/constituency_impact.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""UK parliamentary constituency impact output class.

Computes per-constituency income changes using pre-computed weight matrices.
Each constituency has a row in the weight matrix (shape: 650 x N_households)
that reweights all households to represent that constituency's demographics.
"""

from typing import TYPE_CHECKING

import h5py
import numpy as np
import pandas as pd
from pydantic import ConfigDict

from policyengine.core import Output

if TYPE_CHECKING:
from policyengine.core.simulation import Simulation


class ConstituencyImpact(Output):
"""Per-parliamentary-constituency income change from a UK policy reform.

Uses pre-computed weight matrices from GCS to reweight households
for each of 650 constituencies, then computes weighted average and
relative household income changes.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

baseline_simulation: "Simulation"
reform_simulation: "Simulation"
weight_matrix_path: str
constituency_csv_path: str
year: str = "2025"

# Results populated by run()
constituency_results: list[dict] | None = None

def run(self) -> None:
"""Load weight matrix and compute per-constituency metrics."""
# Load constituency metadata (code, name, x, y)
constituency_df = pd.read_csv(self.constituency_csv_path)

# Load weight matrix: shape (N_constituencies, N_households)
with h5py.File(self.weight_matrix_path, "r") as f:
weight_matrix = f[self.year][...]

# Get household income arrays from output datasets
baseline_hh = self.baseline_simulation.output_dataset.data.household
reform_hh = self.reform_simulation.output_dataset.data.household

baseline_income = baseline_hh["household_net_income"].values
reform_income = reform_hh["household_net_income"].values

results: list[dict] = []
for i in range(len(constituency_df)):
row = constituency_df.iloc[i]
code = str(row["code"])
name = str(row["name"])
x = int(row["x"])
y = int(row["y"])
w = weight_matrix[i]

total_weight = float(np.sum(w))
if total_weight == 0:
continue

weighted_baseline = float(np.sum(baseline_income * w))
weighted_reform = float(np.sum(reform_income * w))

# Count of weighted households
count = float(np.sum(w > 0))
if count == 0:
continue

avg_change = (weighted_reform - weighted_baseline) / total_weight
rel_change = (
(weighted_reform / weighted_baseline - 1.0)
if weighted_baseline != 0
else 0.0
)

results.append(
{
"constituency_code": code,
"constituency_name": name,
"x": x,
"y": y,
"average_household_income_change": float(avg_change),
"relative_household_income_change": float(rel_change),
"population": total_weight,
}
)

self.constituency_results = results


def compute_uk_constituency_impacts(
baseline_simulation: "Simulation",
reform_simulation: "Simulation",
weight_matrix_path: str,
constituency_csv_path: str,
year: str = "2025",
) -> ConstituencyImpact:
"""Compute per-constituency income changes for UK.

Args:
baseline_simulation: Completed baseline simulation.
reform_simulation: Completed reform simulation.
weight_matrix_path: Path to parliamentary_constituency_weights.h5.
constituency_csv_path: Path to constituencies_2024.csv.
year: Year key in the H5 file (default "2025").

Returns:
ConstituencyImpact with constituency_results populated.
"""
impact = ConstituencyImpact.model_construct(
baseline_simulation=baseline_simulation,
reform_simulation=reform_simulation,
weight_matrix_path=weight_matrix_path,
constituency_csv_path=constituency_csv_path,
year=year,
)
impact.run()
return impact
22 changes: 13 additions & 9 deletions src/policyengine/outputs/decile_impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class DecileImpact(Output):
baseline_simulation: Simulation
reform_simulation: Simulation
income_variable: str = "equiv_hbai_household_net_income"
decile_variable: str | None = None # If set, use pre-computed grouping variable
entity: str | None = None
decile: int
quantiles: int = 10
Expand Down Expand Up @@ -68,16 +69,19 @@ def run(self):
baseline_income = baseline_data[self.income_variable]
reform_income = reform_data[self.income_variable]

# Calculate deciles based on baseline income
decile_series = (
pd.qcut(
baseline_income,
self.quantiles,
labels=False,
duplicates="drop",
# Calculate deciles: use pre-computed variable or qcut
if self.decile_variable:
decile_series = baseline_data[self.decile_variable]
else:
decile_series = (
pd.qcut(
baseline_income,
self.quantiles,
labels=False,
duplicates="drop",
)
+ 1
)
+ 1
)

# Calculate changes
absolute_change = reform_income - baseline_income
Expand Down
Loading