diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f7ff8b5..c5fa07ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 exit-event dynamics, R-package parity (PR-C2), and survey-design support remain follow-ups. Pure-Python validation covers the absorbing reduction, the re-entry mechanism, pre-trend placebos, non-negative weighting, stabilized-control admission, and DGP recovery. +- **Weighted multiple absorbed fixed effects (`absorb=[a, b, ...]`) now supported in + `DifferenceInDifferences` / `MultiPeriodDiD`.** The prior `ValueError` rejecting multi-absorb + with survey weights is lifted: the absorb path now uses the method of alternating projections + (`diff_diff.utils.demean_by_groups`), the exact weighted Frisch-Waugh-Lovell residualization for + N > 1 dimensions. New `demean_by_groups()` N-way helper; the two-way `within_transform()` now + delegates to it. Single-absorb and balanced-panel results are byte-stable (weighted + `within_transform` output is bit-identical; balanced multi-way matches the prior closed-form + demean to machine precision). - **`LPDiD` R-parity validation (absorbing).** `tests/test_methodology_lpdid.py` pins the estimator against the method authors' own R recipes (`danielegirardi/lpdid` event-study / reweight / premean / pooled `fixest::feols` specifications) with an `alexCardazzi/lpdid` @@ -174,6 +182,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 numerical, or public-API change. ### Fixed +- **Unbalanced-panel correctness: N > 1 absorbed fixed effects now use iterative alternating + projections instead of single-pass sequential demeaning.** Affected `DifferenceInDifferences` / + `MultiPeriodDiD` with `absorb=[a, b, ...]` and the shared unweighted two-way `within_transform` + (used by `TwoWayFixedEffects`, `SunAbraham`, `BaconDecomposition`). A single sequential demean + sweep is the exact Frisch-Waugh-Lovell residualization only when the fixed-effect subspaces are + orthogonal (balanced fully-crossed panels); on unbalanced panels it was a biased approximation + (coefficients off by ~1e-2 in tested cases). The within transformation now iterates to + convergence (`diff_diff.utils.demean_by_groups`), matching R `fixest` / `reghdfe` / `lfe`. + Balanced-panel and single-absorb results are unchanged to machine precision; the unweighted + two-way path now also emits the non-convergence `UserWarning` (previously only the weighted path + could). - **Structural (non-covariate) matrix inverses are now rank-guarded.** The internal design-Gram bread inversions in `ContinuousDiD` (ACRT-variance `Psi'WPsi`), `TwoStageDiD` (Stage-2 `X_2'WX_2`, both the analytical and multiplier-bootstrap surfaces), `SpilloverDiD` (Wave D diff --git a/TODO.md b/TODO.md index 9be6b18c..e3f5e5de 100644 --- a/TODO.md +++ b/TODO.md @@ -36,7 +36,6 @@ The `Origin` column (Actionable tables) and the `PR` column (Deferred tables) bo | `EfficientDiD` survey-weighted Silverman bandwidth in conditional Omega* — `_silverman_bandwidth()` uses unweighted mean/std; survey-weighted statistics better reflect the population distribution (second-order refinement). | `efficient_did_covariates.py` | — | Quick | Low | | Survey sandwich SE is not exactly invariant to zero-weight (subpopulation / padded) rows: `_compute_stratified_psu_meat`'s finite-sample correction counts zero-weight units as PSUs, so padding shifts the SE ~2e-4 relative. Point estimate is exactly invariant. Fix: count only positive-weight PSUs in the correction (cross-cutting across all survey-enabled estimators). | `survey.py` (`_compute_stratified_psu_meat`) | PR-B | Mid | Low | | `ImputationDiD` LOO conservative-variance refinement (BJS 2024 Supp. Appendix A.9) — a finite-sample improvement to the auxiliary-model residuals reducing overfit of `tau_tilde_g` to `epsilon`. Asymptotic Theorem-3 variance is implemented and matches R `didimputation` (which also omits LOO by default). | `imputation.py` | imputation-validation | Mid | Low | -| Multi-absorb weighted demeaning needs iterative alternating projections for `N > 1` absorbed FE with survey weights; unweighted multi-absorb also uses single-pass (exact only for balanced panels). | `estimators.py` | #218 | Heavy | Medium | | `TwoWayFixedEffects(vcov_type in {hc2, hc2_bm})` with replicate-weight designs raises `NotImplementedError` (`twfe.py:~233`). The replicate path re-demeans per replicate, which doesn't compose with the full-dummy HC2/HC2-BM build — a correct impl needs per-replicate full-dummy refit. Workaround: `hc1` for replicate-weight CR1. | `twfe.py::fit` | follow-up | Heavy | Low | | TWFE's HC2/HC2-BM inline full-dummy build (`twfe.py:280-315`) duplicates the dummy-construction logic in `DifferenceInDifferences(fixed_effects=...)` (`estimators.py:478-486`). Extract a shared helper, or delegate TWFE's HC2/HC2-BM path to DiD's `fixed_effects=` branch (with TWFE-specific cluster-default threading), to reduce drift risk on FE naming / survey behavior / result-surface conventions. Substantive refactor — touches both estimators. | `twfe.py::fit`, `estimators.py::DifferenceInDifferences.fit` | follow-up | Heavy | Low | | Decide whether to formally deprecate `CallawaySantAnna.cluster=X` in favor of `survey_design=SurveyDesign(psu=X)` (the bare-cluster path already synthesizes a minimal SurveyDesign). Two equivalent paths = redundant surface. Mirrors the question for ImputationDiD / EfficientDiD / TwoStageDiD. | `staggered.py` | follow-up | Mid | Low | @@ -223,7 +222,7 @@ and `TwoWayFixedEffects` accept `vcov_type ∈ {classical, hc1, hc2, hc2_bm, con (the validated set in `linalg.py::_VALID_VCOV_TYPES`); cluster-robust variance comes from `cluster=` alongside the heteroscedasticity kind (`hc1+cluster` ⇒ CR1 Liang-Zeger; `hc2_bm+cluster` ⇒ CR2 Bell-McCaffrey, including the weighted WLS-CR2 port; the N>1 -absorbed-FE + weights composition remains gated by the open multi-absorb row in Actionable); +absorbed-FE + weights composition is supported via iterative alternating-projection demeaning, #586); wild cluster bootstrap is the separate `inference="wild_bootstrap"` path. Threading `vcov_type` through the 8 standalone estimators is **complete** (Phase 1b); four (`CallawaySantAnna`, `TripleDifference`, `ImputationDiD`, `EfficientDiD`) are permanently diff --git a/diff_diff/estimators.py b/diff_diff/estimators.py index c6ae3b49..b48ddf64 100644 --- a/diff_diff/estimators.py +++ b/diff_diff/estimators.py @@ -28,7 +28,7 @@ from diff_diff.results import DiDResults, MultiPeriodDiDResults, PeriodEffect from diff_diff.utils import ( WildBootstrapResults, - demean_by_group, + demean_by_groups, fe_dummy_names, safe_inference, validate_binary, @@ -414,17 +414,9 @@ def fit( absorbed_vars = [] n_absorbed_effects = 0 - # Reject multi-absorb with survey weights (single-pass demeaning is - # not the correct weighted FWL projection for N > 1 dimensions). Only - # fires when absorb is still set — i.e., the auto-route above didn't - # consume it. - if absorb and len(absorb) > 1 and survey_weights is not None: - raise ValueError( - f"Multiple absorbed fixed effects (absorb={absorb}) with survey " - "weights is not supported. Single-pass sequential demeaning is not " - "the correct weighted FWL projection for multiple absorbed dimensions. " - "Use absorb with a single variable, or use fixed_effects= instead." - ) + # Weighted multiple absorbed FE is supported: the absorb path below uses + # iterative alternating projections (demean_by_groups), the exact weighted + # FWL projection for N > 1 dimensions on both balanced and unbalanced panels. # Validate vcov_type="conley" wire-up. DiD.fit() accepts `unit` # as a fit-time arg (NOT on __init__) because cluster/unit @@ -462,16 +454,18 @@ def fit( float ) * working_data[time].values.astype(float) vars_to_demean = [outcome, treatment, time, "_treat_time"] + (covariates or []) - for ab_var in absorb: - working_data, n_fe = demean_by_group( - working_data, - vars_to_demean, - ab_var, - inplace=True, - weights=survey_weights, - ) - n_absorbed_effects += n_fe - absorbed_vars.append(ab_var) + # Method of alternating projections: for N > 1 absorbed dimensions a + # single sequential sweep is only exact on balanced (orthogonal-FE) + # panels; demean_by_groups iterates to the exact (W)LS-FWL residual. + working_data, n_fe = demean_by_groups( + working_data, + vars_to_demean, + list(absorb), + inplace=True, + weights=survey_weights, + ) + n_absorbed_effects += n_fe + absorbed_vars = list(absorb) # Extract variables (may be demeaned if absorb was used) y = working_data[outcome].values.astype(float) @@ -644,8 +638,7 @@ def _refit_did_absorb(w_r): float ) vars_dm = [outcome, treatment, time, "_treat_time"] + (covariates or []) - for ab_var in _absorb_list: - wd, _ = demean_by_group(wd, vars_dm, ab_var, inplace=True, weights=w_nz) + wd, _ = demean_by_groups(wd, vars_dm, _absorb_list, inplace=True, weights=w_nz) y_r = wd[outcome].values.astype(float) d_r = wd[treatment].values.astype(float) t_r = wd[time].values.astype(float) @@ -1572,17 +1565,9 @@ def fit( # type: ignore[override] absorb = None n_absorbed_effects = 0 - # Reject multi-absorb with survey weights (single-pass demeaning is - # not the correct weighted FWL projection for N > 1 dimensions). - # Only fires when absorb is still set — i.e., the auto-route above - # didn't consume it. - if absorb and len(absorb) > 1 and survey_weights is not None: - raise ValueError( - f"Multiple absorbed fixed effects (absorb={absorb}) with survey " - "weights is not supported. Single-pass sequential demeaning is not " - "the correct weighted FWL projection for multiple absorbed dimensions. " - "Use absorb with a single variable, or use fixed_effects= instead." - ) + # Weighted multiple absorbed FE is supported: the absorb path below uses + # iterative alternating projections (demean_by_groups), the exact weighted + # FWL projection for N > 1 dimensions on both balanced and unbalanced panels. # MultiPeriodDiD is intrinsically a multi-period panel estimator; # Phase 2 panel block-decomposed Conley (matches R conleyreg) needs @@ -1622,15 +1607,16 @@ def fit( # type: ignore[override] + [f"_did_interact_{p}" for p in non_ref_periods] + (covariates or []) ) - for ab_var in absorb: - working_data, n_fe = demean_by_group( - working_data, - vars_to_demean, - ab_var, - inplace=True, - weights=survey_weights, - ) - n_absorbed_effects += n_fe + # Method of alternating projections (exact for unbalanced panels; a + # single sequential sweep is exact only on balanced orthogonal-FE panels). + working_data, n_fe = demean_by_groups( + working_data, + vars_to_demean, + list(absorb), + inplace=True, + weights=survey_weights, + ) + n_absorbed_effects += n_fe # Extract outcome and treatment (may be demeaned if absorb was used) y = working_data[outcome].values.astype(float) @@ -1854,8 +1840,7 @@ def _refit_mp_absorb(w_r): + [f"_did_interact_{p}" for p in non_ref_periods] + (covariates or []) ) - for ab_var_ in _absorb_list_mp: - wd, _ = demean_by_group(wd, vars_dm_, ab_var_, inplace=True, weights=w_nz) + wd, _ = demean_by_groups(wd, vars_dm_, _absorb_list_mp, inplace=True, weights=w_nz) y_r = wd[outcome].values.astype(float) d_r = wd["_did_treatment"].values.astype(float) X_r = np.column_stack([np.ones(len(y_r)), d_r]) diff --git a/diff_diff/utils.py b/diff_diff/utils.py index a7c84e75..0afddb71 100644 --- a/diff_diff/utils.py +++ b/diff_diff/utils.py @@ -2618,13 +2618,18 @@ def demean_by_group( groups = data[group_var].values w = np.asarray(weights, dtype=np.float64) # Cache weight sums per group (invariant across variables) - w_sum = pd.Series(w).groupby(groups).transform("sum") + w_sum = pd.Series(w).groupby(groups).transform("sum").values for var in variables: col_name = var if not suffix else f"{var}{suffix}" x = data[var].values.astype(np.float64) - wx = pd.Series(w * x).groupby(groups).transform("sum") - weighted_means = wx / w_sum - data[col_name] = x - weighted_means.values + wx = pd.Series(w * x).groupby(groups).transform("sum").values + # Guard zero-total-weight groups (survey subpopulation / zero-weight + # domain padding) so those rows pass through unchanged and stay inert + # in WLS. For positive-weight groups this is bit-identical to wx/w_sum. + weighted_means = np.divide( + wx, w_sum, out=np.zeros_like(wx, dtype=np.float64), where=w_sum > 0 + ) + data[col_name] = x - weighted_means else: # Cache the groupby object for efficiency grouper = data.groupby(group_var, sort=False) @@ -2636,6 +2641,161 @@ def demean_by_group( return data, n_effects +def demean_by_groups( + data: pd.DataFrame, + variables: List[str], + group_vars: List[str], + inplace: bool = False, + suffix: str = "", + weights: Optional[np.ndarray] = None, + max_iter: int = 100, + tol: float = 1e-10, +) -> Tuple[pd.DataFrame, int]: + """N-way within transformation via the method of alternating projections (MAP). + + Removes ``len(group_vars)`` absorbed fixed-effect dimensions by repeatedly + demeaning each variable by each group in ``group_vars`` order until the + iterate stops changing (``max|x - x_old| < tol``) or ``max_iter`` is reached. + This converges to the exact (W)LS Frisch-Waugh-Lovell residualization onto the + combined column space of all ``group_vars`` dummies, for both balanced and + unbalanced panels and for both unweighted and survey-weighted designs (it is + the algorithm used by R ``fixest`` / ``reghdfe`` / ``lfe``). + + Single-pass sequential demeaning (one sweep) is only the one-iteration + approximation of this projection; it is exact only when the FE subspaces are + orthogonal (balanced fully-crossed panels). For ``len(group_vars) == 1`` the + projection is exact in one pass, so this delegates to :func:`demean_by_group` + (byte-identical to the prior one-way behavior). + + Parameters + ---------- + data : pd.DataFrame + DataFrame containing the variables to demean. + variables : list of str + Column names to demean. + group_vars : list of str + Grouping (fixed-effect) columns to absorb. Must be non-empty. + inplace : bool, default False + If True, writes the demeaned values onto ``data`` (the caller must own it) + and returns it. If False, attaches them as a consolidated block via + ``pd.concat`` and returns a new frame (the input is not mutated). + suffix : str, default "" + Demeaned column naming. Empty overwrites the source columns; a non-empty + suffix writes to ``f"{var}{suffix}"`` columns (originals preserved). + weights : np.ndarray, optional + Observation weights. When provided, uses weighted group means + ``sum(w*x)/sum(w)`` per group and converges to the WLS-FWL residual. + max_iter : int, default 100 + Maximum number of alternating-projection iterations per variable. Emits a + single ``UserWarning`` per call listing any variable that fails to converge. + tol : float, default 1e-10 + Convergence tolerance on the max absolute change across the iterate. + + Returns + ------- + data : pd.DataFrame + DataFrame with demeaned variables. + n_effects : int + Number of absorbed fixed effects, ``sum_d (nunique_d - 1)`` over + ``group_vars`` (the standard DOF-accounting convention). + + Examples + -------- + >>> df, n_fe = demean_by_groups(df, ['y', 'x'], ['unit', 'period']) + """ + if not group_vars: + raise ValueError("demean_by_groups requires at least one grouping variable.") + + # One dimension: the within projection is exact in a single pass. Delegate so + # the one-way callers stay byte-identical to demean_by_group. + if len(group_vars) == 1: + return demean_by_group( + data, + variables, + group_vars[0], + inplace=inplace, + suffix=suffix, + weights=weights, + ) + + # N >= 2: method of alternating projections. Per-variable independent + # convergence (outer loop = variable, inner = iterations) — this mirrors + # within_transform's weighted loop exactly so that the two-way weighted case + # (group_vars == [unit, time]) is bit-identical. + n_effects = sum(int(data[g].nunique()) - 1 for g in group_vars) + target_cols = [var if not suffix else f"{var}{suffix}" for var in variables] + group_arrays = [data[g].values for g in group_vars] + demeaned_values: List[np.ndarray] = [] + non_converged_vars: List[str] = [] + + if weights is not None: + w = np.asarray(weights, dtype=np.float64) + # Cache per-group weight sums once (invariant across variables/iterations). + w_sums = [pd.Series(w).groupby(g).transform("sum").values for g in group_arrays] + + def _weighted_group_demean(x, groups, w, w_sum): + wx_sum = pd.Series(w * x).groupby(groups).transform("sum").values + # Guard zero-total-weight groups (survey subpopulation / zero-weight + # domain padding): leave such rows unchanged (mean 0) so they remain + # inert in the downstream WLS instead of poisoning the design with + # NaN/Inf. For positive-weight groups this is bit-identical to wx/w_sum. + means = np.divide( + wx_sum, w_sum, out=np.zeros_like(wx_sum, dtype=np.float64), where=w_sum > 0 + ) + return x - means + + for var in variables: + x = data[var].values.astype(np.float64) + converged = False + for _iter in range(max_iter): + x_old = x.copy() + for groups, w_sum in zip(group_arrays, w_sums): + x = _weighted_group_demean(x, groups, w, w_sum) + if np.max(np.abs(x - x_old)) < tol: + converged = True + break + if not converged: + non_converged_vars.append(var) + demeaned_values.append(x) + else: + for var in variables: + x = data[var].values.astype(np.float64) + converged = False + for _iter in range(max_iter): + x_old = x.copy() + for groups in group_arrays: + x = x - pd.Series(x).groupby(groups).transform("mean").values + if np.max(np.abs(x - x_old)) < tol: + converged = True + break + if not converged: + non_converged_vars.append(var) + demeaned_values.append(x) + + if non_converged_vars: + warn_if_not_converged( + False, + f"demean_by_groups alternating projection (variables: {non_converged_vars})", + max_iter, + tol, + ) + + if inplace: + for col, vals in zip(target_cols, demeaned_values): + data[col] = vals + return data, n_effects + + # Non-inplace: attach the demeaned columns as a single consolidated block via + # pd.concat (no defensive copy — the demean above is read-only). Mirrors the + # within_transform attach contract: a colliding target name (suffix="" or a + # re-demean) is dropped first so concat replaces rather than duplicates. + new_block = pd.DataFrame(dict(zip(target_cols, demeaned_values)), index=data.index) + collisions = [c for c in target_cols if c in data.columns] + if collisions: + data = data.drop(columns=collisions) + return pd.concat([data, new_block], axis=1), n_effects + + def within_transform( data: pd.DataFrame, variables: List[str], @@ -2650,8 +2810,14 @@ def within_transform( """ Apply two-way within transformation to remove unit and time fixed effects. - Computes: y_it - y_i. - y_.t + y_.. for each variable. - When weights are provided, uses weighted group means at each step. + Computed via the method of alternating projections (a thin two-way wrapper + over :func:`demean_by_groups` with ``group_vars=[unit, time]``): each variable + is demeaned by unit, then by time, iterated until convergence. This is the + exact (weighted) Frisch-Waugh-Lovell residualization for both balanced and + unbalanced panels. The closed-form additive expression ``y_it - y_i. - y_.t + + y_..`` is the balanced fully-crossed special case (it equals the converged + iterate only when the unit and time FE subspaces are orthogonal). When weights + are provided, weighted group means are used at each step. This is the standard fixed effects transformation for panel data that removes both unit-specific and time-specific effects. @@ -2681,13 +2847,13 @@ def within_transform( weights : np.ndarray, optional Observation weights for weighted group means. max_iter : int, default 100 - Maximum number of alternating-projection iterations. Used only when - ``weights`` is not ``None``; the unweighted path is a single pass and - ignores this argument. Emits a ``UserWarning`` per call when any - variable fails to converge within this budget. + Maximum number of alternating-projection iterations. Applies to BOTH the + weighted and unweighted paths (both now iterate via the method of + alternating projections, exact for unbalanced panels). Emits a + ``UserWarning`` per call when any variable fails to converge within this + budget. Balanced panels converge in ~2 iterations. tol : float, default 1e-8 Convergence tolerance on the max absolute change across the iterate. - Used only when ``weights`` is not ``None``. Returns ------- @@ -2706,81 +2872,21 @@ def within_transform( >>> df = within_transform(df, ['y', 'x'], 'unit_id', 'year') >>> # df now has 'y_demeaned' and 'x_demeaned' columns """ - # Column naming (``suffix``) is independent of how the demeaned columns are - # attached (``inplace``): an empty suffix targets the source column (overwrite), - # a non-empty suffix a new ``f"{var}{suffix}"`` column. The demean below only - # reads ``data``, so no defensive copy is taken up front. - target_cols = [var if not suffix else f"{var}{suffix}" for var in variables] - demeaned_values: List[np.ndarray] = [] - - if weights is not None: - # Weighted within-transformation via iterative alternating projections - w = np.asarray(weights, dtype=np.float64) - unit_groups = data[unit].values - time_groups = data[time].values - - # Cache weight sums per group (invariant across variables) - unit_w_sum = pd.Series(w).groupby(unit_groups).transform("sum").values - time_w_sum = pd.Series(w).groupby(time_groups).transform("sum").values - - def _weighted_group_demean(x, groups, w, w_sum): - wx_sum = pd.Series(w * x).groupby(groups).transform("sum").values - return x - wx_sum / w_sum - - non_converged_vars: List[str] = [] - for var in variables: - x = data[var].values.astype(np.float64) - converged = False - for _iter in range(max_iter): - x_old = x.copy() - x = _weighted_group_demean(x, unit_groups, w, unit_w_sum) - x = _weighted_group_demean(x, time_groups, w, time_w_sum) - if np.max(np.abs(x - x_old)) < tol: - converged = True - break - if not converged: - non_converged_vars.append(var) - demeaned_values.append(x) - if non_converged_vars: - warn_if_not_converged( - False, - f"within_transform weighted demean (variables: {non_converged_vars})", - max_iter, - tol, - ) - else: - # Cache groupby objects for efficiency - unit_grouper = data.groupby(unit, sort=False) - time_grouper = data.groupby(time, sort=False) - - for var in variables: - unit_means = unit_grouper[var].transform("mean") - time_means = time_grouper[var].transform("mean") - grand_mean = data[var].mean() - demeaned_values.append((data[var] - unit_means - time_means + grand_mean).values) - - if inplace: - # Write onto the passed frame (the caller must own it); an existing - # same-named column is overwritten. Used for the in-place overwrite - # callers (small column sets); large suffixed sets take the concat path - # below to avoid per-column block fragmentation. - for col, vals in zip(target_cols, demeaned_values): - data[col] = vals - return data - - # Default (non-inplace): attach the demeaned columns as a single consolidated - # block via ``pd.concat``. No defensive copy of ``data`` is taken — the demean - # above is read-only and concat does not mutate its inputs, so the caller's - # frame is preserved (and shared rather than copied under copy-on-write). This - # both drops the redundant copy the old code took before the concat and avoids - # the ``DataFrame is highly fragmented`` warning of N per-column inserts. - new_block = pd.DataFrame(dict(zip(target_cols, demeaned_values)), index=data.index) - # Honor the overwrite contract: if a target name already exists (``suffix=""``, - # or re-demeaning a frame that already carries the suffix), drop it first so the - # concat replaces it instead of producing a duplicate label. ``drop`` returns a - # new frame (the input is not mutated). The common case — fresh suffixed targets - # — has no collision and skips straight to the concat. - collisions = [c for c in target_cols if c in data.columns] - if collisions: - data = data.drop(columns=collisions) - return pd.concat([data, new_block], axis=1) + # Two-way (unit + time) within transformation is the N-way method of + # alternating projections specialized to two FE dimensions. Delegate to the + # shared engine so there is one MAP implementation. The weighted path is + # bit-identical to the previous inline loop (same per-variable convergence, + # unit-then-time order, and forwarded tol=1e-8); the unweighted path now also + # iterates (MAP) instead of the closed-form additive demean, which was exact + # only for balanced panels. ``tol`` is forwarded explicitly (within_transform's + # default is 1e-8, not demean_by_groups' 1e-10) to preserve weighted goldens. + return demean_by_groups( + data, + variables, + [unit, time], + inplace=inplace, + suffix=suffix, + weights=weights, + max_iter=max_iter, + tol=tol, + )[0] diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 97245e5a..64b1b01b 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -347,6 +347,13 @@ This matches the behavior of R's `fixest::feols()` with absorbed FE. - **Note (scale invariance):** shared `diff_diff/linalg.py` behavior — rank detection re-checks on column-equilibrated columns and the solve equilibrates/unscales, so detection and fit are invariant to per-column scaling. For a well-scaled collinear design the dropped column is unchanged; a scale-induced under-count adopts the scale-corrected equilibrated selection (which may differ from the raw choice but retains an identified subset). See the CallawaySantAnna rank-deficiency Note. - **Note (covariate-name collision guard):** a covariate named `const`, `ATT`, `_treatment_post`, or a unit/time fixed-effect dummy name — or a duplicate covariate name — raises `ValueError` on both variance paths (would otherwise silently overwrite a structural coefficient on the full-dummy HC2/HC2-BM path). See the DifferenceInDifferences "covariate-name collision guard" Note. - Unbalanced panels handled via proper demeaning + - **Note (iterative demeaning):** the two-way within transformation + (`diff_diff.utils.within_transform`) uses the method of alternating projections + (iteratively demean by unit, then time, until convergence) for BOTH the weighted + and unweighted paths. This is exact on unbalanced panels. The unweighted path + previously used the closed-form additive demean `y - ȳ_i - ȳ_t + ȳ`, which is + exact only for balanced fully-crossed panels; on unbalanced panels it was a + biased approximation. Balanced-panel results are unchanged to machine precision. - Multi-period `time` parameter: only binary (0/1) post indicator is recommended; multi-period values produce `treated × period_number` rather than `treated × post_indicator`. A `UserWarning` is emitted when `time` has >2 unique values, advising users to create a binary post column. @@ -4260,23 +4267,30 @@ unequal selection probabilities). ### Absorbed Fixed Effects with Survey Weights -- **Note:** When `absorb` is used with a single variable in DiD/MultiPeriodDiD, - all regressors (treatment, time, interactions, covariates) are within-transformed - alongside the outcome per the FWL theorem. Regressors collinear with - the absorbed FE (e.g., treatment after absorbing unit FE) are dropped - via rank-deficiency handling. Multiple absorbed variables with survey weights - are rejected (single-pass sequential demeaning is not the correct weighted - FWL projection for N > 1 dimensions; iterative alternating projections are - needed but not yet implemented). -- **Note:** The shared weighted within-transformation path - (`diff_diff.utils.within_transform`, hit whenever `weights is not None`) emits - a `UserWarning` per call when any transformed variable exits the - alternating-projection loop without reaching `tol` within `max_iter`. - Defaults: `max_iter=100`, `tol=1e-8`. This signal applies uniformly across - TwoWayFixedEffects, SunAbraham, BaconDecomposition, and WooldridgeDiD whenever - they route through this helper (survey-weighted or otherwise). Silent return - of the current iterate was classified as a silent failure under the Phase 2 - audit and replaced with this explicit signal. +- **Note:** When `absorb` is used in DiD/MultiPeriodDiD, all regressors + (treatment, time, interactions, covariates) are within-transformed alongside + the outcome per the FWL theorem. Regressors collinear with the absorbed FE + (e.g., treatment after absorbing unit FE) are dropped via rank-deficiency + handling. Multiple absorbed variables (weighted or unweighted) are + within-transformed via the **method of alternating projections** + (`diff_diff.utils.demean_by_groups`): each variable is demeaned by each FE + dimension in turn until the iterate converges. This is the exact (weighted) + FWL residualization onto the combined column space of all absorbed dummies for + both balanced and unbalanced panels, matching R `fixest` / `reghdfe` / `lfe`. + A single sequential sweep (the prior behavior) is only the one-iteration + approximation and is exact only when the FE subspaces are orthogonal (balanced + fully-crossed panels); on unbalanced panels it was wrong. A single absorbed + variable converges in one pass (delegates to `demean_by_group`). +- **Note:** The shared within-transformation path (`diff_diff.utils.demean_by_groups`, + also reached via the two-way `within_transform`) emits a `UserWarning` per call + when any transformed variable exits the alternating-projection loop without + reaching `tol` within `max_iter`. This now covers the **unweighted** path as well + (previously the unweighted two-way transform used a closed-form additive demean + and could not warn). Defaults: `max_iter=100`; `tol=1e-8` via `within_transform` + (TwoWayFixedEffects, SunAbraham, BaconDecomposition, WooldridgeDiD) and `tol=1e-10` + for the DiD/MultiPeriodDiD `absorb=` path. Balanced panels converge in ~2 + iterations. Silent return of the current iterate was classified as a silent + failure under the Phase 2 audit and replaced with this explicit signal. ### Survey Degrees of Freedom diff --git a/tests/test_methodology_did.py b/tests/test_methodology_did.py index 7d91a6a0..21ae6098 100644 --- a/tests/test_methodology_did.py +++ b/tests/test_methodology_did.py @@ -21,8 +21,8 @@ import pandas as pd import pytest -from diff_diff import DifferenceInDifferences - +from diff_diff import DifferenceInDifferences, MultiPeriodDiD +from diff_diff.survey import SurveyDesign # ============================================================================= # Test Fixtures and Helpers @@ -1549,3 +1549,197 @@ def test_residuals_and_fitted_values(self): assert np.allclose(reconstructed, original), \ "Residuals + fitted should equal original outcome" + + +# ============================================================================= +# Multi-absorb (N>1 FE) iterative alternating-projection demeaning +# ============================================================================= + + +def _unbalanced_2fe_did_panel(seed=0): + """DiD panel with two cross-classified categorical FE (a, b), unbalanced so + the FE subspaces are non-orthogonal — single-pass demeaning is then wrong and + only iterative MAP matches the full-dummy fit. Cells have 0-3 obs each.""" + rng = np.random.default_rng(seed) + rows = [] + for a in range(6): + for b in range(5): + for _ in range(int(rng.integers(0, 4))): + rows.append((a, b)) + df = pd.DataFrame(rows, columns=["a", "b"]) + n = len(df) + df["treated"] = (df["a"] >= 3).astype(int) + df["post"] = rng.integers(0, 2, size=n) + df["outcome"] = ( + 2.0 * (df["treated"] * df["post"]) + + 0.4 * df["a"] + - 0.3 * df["b"] + + 1.1 * df["post"] + + rng.normal(scale=0.5, size=n) + ) + df["w"] = rng.uniform(0.5, 2.0, size=n) + return df + + +class TestMultiAbsorbIterativeDemean: + """N>1 absorbed FE must use iterative alternating projections (exact for + unbalanced panels), matching the full-dummy ``fixed_effects=`` fit. A single + sequential sweep (the old behavior) is off by ~1e-2 on these panels.""" + + @pytest.mark.parametrize("weighted", [False, True]) + def test_did_multi_absorb_matches_fixed_effects(self, weighted): + df = _unbalanced_2fe_did_panel(seed=3) + kw = {} + if weighted: + kw["survey_design"] = SurveyDesign(weights="w") + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + res_abs = DifferenceInDifferences().fit( + df, outcome="outcome", treatment="treated", time="post", + absorb=["a", "b"], **kw, + ) + res_fe = DifferenceInDifferences().fit( + df, outcome="outcome", treatment="treated", time="post", + fixed_effects=["a", "b"], **kw, + ) + # Iterative MAP matches the full-dummy ATT to solver precision; the old + # single-pass sweep would differ by ~1e-2 (well outside this tolerance). + assert np.isfinite(res_abs.att) + assert abs(res_abs.att - res_fe.att) < 1e-8 + + def test_did_collinear_regressors_dropped_to_nan(self): + """Regressors fully collinear with the absorbed FE (the treatment-group + indicator, constant within 'a') must report NaN coefficients (dropped), + not spurious near-zero values from a residual that survived demeaning.""" + df = _unbalanced_2fe_did_panel(seed=4) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + res = DifferenceInDifferences().fit( + df, outcome="outcome", treatment="treated", time="post", + absorb=["a", "b"], + ) + # 'treated' is constant within 'a' (treated = a>=3) -> absorbed -> NaN coef. + assert "treated" in res.coefficients + assert np.isnan(res.coefficients["treated"]) + # The interaction (the ATT) survives and is finite. + assert np.isfinite(res.att) + + @pytest.mark.parametrize("weighted", [False, True]) + def test_mpd_multi_absorb_matches_fixed_effects(self, weighted): + """MultiPeriodDiD with two non-structural cross-classified FE (a, b). + (absorb=['unit','period'] is intentionally NOT used: period is structural + in the MPD design, so absorbing it is a different specification.)""" + rng = np.random.default_rng(5) + rows = [] + for a in range(6): + for b in range(5): + for _ in range(int(rng.integers(0, 4))): + rows.append((a, b)) + df = pd.DataFrame(rows, columns=["a", "b"]) + n = len(df) + df["treated"] = ((df["a"] >= 3) & (rng.integers(0, 2, size=n) == 1)).astype(int) + df["time"] = rng.integers(0, 3, size=n) + df["outcome"] = ( + 1.5 * df["treated"] + 0.4 * df["a"] - 0.3 * df["b"] + + 0.5 * df["time"] + rng.normal(scale=0.5, size=n) + ) + df["w"] = rng.uniform(0.5, 2.0, size=n) + kw = {} + if weighted: + kw["survey_design"] = SurveyDesign(weights="w") + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + res_abs = MultiPeriodDiD().fit( + df, outcome="outcome", treatment="treated", time="time", + post_periods=[2], absorb=["a", "b"], **kw, + ) + res_fe = MultiPeriodDiD().fit( + df, outcome="outcome", treatment="treated", time="time", + post_periods=[2], fixed_effects=["a", "b"], **kw, + ) + a_eff = {p: pe.effect for p, pe in res_abs.period_effects.items()} + f_eff = {p: pe.effect for p, pe in res_fe.period_effects.items()} + compared = [ + (a_eff[p], f_eff[p]) + for p in a_eff + if p in f_eff and np.isfinite(a_eff[p]) and np.isfinite(f_eff[p]) + ] + assert compared, "no finite period effects to compare" + for ae, fe in compared: + assert abs(ae - fe) < 1e-8 + + def test_did_multi_absorb_zero_total_weight_group_is_inert(self): + """Survey subpopulation / zero-weight domain padding: an absorbed category + that is entirely zero-weight must not break the weighted MAP (no NaN/Inf + from 0/0 division). Those rows stay inert in WLS, so the ATT equals the + full-dummy fit on the positive-weight sample.""" + rng = np.random.default_rng(7) + rows = [] + for a in range(5): + for b in range(4): + for _ in range(2): + rows.append((a, b)) + df = pd.DataFrame(rows, columns=["a", "b"]) + n = len(df) + df["treated"] = (df["a"] >= 3).astype(int) + df["post"] = rng.integers(0, 2, size=n) + df["outcome"] = ( + 2.0 * df["treated"] * df["post"] + 0.3 * df["a"] - 0.2 * df["b"] + + rng.normal(size=n) + ) + df["w"] = rng.uniform(0.5, 2.0, size=n) + df.loc[df["a"] == 0, "w"] = 0.0 # entire absorbed category a==0 is zero-weight + sd = SurveyDesign(weights="w") + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + res_abs = DifferenceInDifferences().fit( + df, outcome="outcome", treatment="treated", time="post", + absorb=["a", "b"], survey_design=sd, + ) + res_fe = DifferenceInDifferences().fit( + df, outcome="outcome", treatment="treated", time="post", + fixed_effects=["a", "b"], survey_design=sd, + ) + assert np.isfinite(res_abs.att) # no NaN/Inf from the zero-weight group + assert abs(res_abs.att - res_fe.att) < 1e-8 + + def test_mpd_multi_absorb_zero_total_weight_group_is_inert(self): + """MultiPeriodDiD analogue of the zero-total-weight inertness guard.""" + rng = np.random.default_rng(8) + rows = [] + for a in range(5): + for b in range(4): + for _ in range(2): + rows.append((a, b)) + df = pd.DataFrame(rows, columns=["a", "b"]) + n = len(df) + df["treated"] = ((df["a"] >= 3) & (rng.integers(0, 2, size=n) == 1)).astype(int) + df["time"] = rng.integers(0, 3, size=n) + df["outcome"] = ( + 1.5 * df["treated"] + 0.3 * df["a"] - 0.2 * df["b"] + + 0.5 * df["time"] + rng.normal(size=n) + ) + df["w"] = rng.uniform(0.5, 2.0, size=n) + df.loc[df["a"] == 0, "w"] = 0.0 + sd = SurveyDesign(weights="w") + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + res_abs = MultiPeriodDiD().fit( + df, outcome="outcome", treatment="treated", time="time", + post_periods=[2], absorb=["a", "b"], survey_design=sd, + ) + res_fe = MultiPeriodDiD().fit( + df, outcome="outcome", treatment="treated", time="time", + post_periods=[2], fixed_effects=["a", "b"], survey_design=sd, + ) + a_eff = {p: pe.effect for p, pe in res_abs.period_effects.items()} + f_eff = {p: pe.effect for p, pe in res_fe.period_effects.items()} + compared = [ + (a_eff[p], f_eff[p]) + for p in a_eff + if p in f_eff and np.isfinite(f_eff[p]) + ] + assert compared, "no period effects to compare" + for ae, fe in compared: + assert np.isfinite(ae) # no NaN/Inf leaked from the zero-weight group + assert abs(ae - fe) < 1e-8 diff --git a/tests/test_survey.py b/tests/test_survey.py index b9a8b536..02b844d6 100644 --- a/tests/test_survey.py +++ b/tests/test_survey.py @@ -2644,8 +2644,16 @@ def test_repeated_fit_fresh_psu(self): # Original survey_design must be immutable assert lr.survey_design.psu is None - def test_multi_absorb_survey_rejected_did(self): - """DiD with multi-absorb + survey weights raises ValueError.""" + def test_multi_absorb_survey_now_supported_did(self): + """DiD with multi-absorb + survey weights is now supported (iterative MAP). + + Previously rejected. The absorb path uses the method of alternating + projections, the exact weighted FWL projection for N>1 dimensions, so the + ATT matches the equivalent ``fixed_effects=[a, b]`` full-dummy fit. (This + panel is balanced in a×b + uniform weights, so it verifies the lifted guard + and weighted equivalence; the unbalanced strong discriminator vs single-pass + lives in test_methodology_did.py::TestMultiAbsorbIterativeDemean.) + """ np.random.seed(42) n = 40 df = pd.DataFrame( @@ -2659,9 +2667,9 @@ def test_multi_absorb_survey_rejected_did(self): } ) sd = SurveyDesign(weights="w", weight_type="pweight") - did = DifferenceInDifferences() - with pytest.raises(ValueError, match="Multiple absorbed fixed effects"): - did.fit( + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + res_abs = DifferenceInDifferences().fit( df, outcome="outcome", treatment="treated", @@ -2669,9 +2677,19 @@ def test_multi_absorb_survey_rejected_did(self): absorb=["a", "b"], survey_design=sd, ) + res_fe = DifferenceInDifferences().fit( + df, + outcome="outcome", + treatment="treated", + time="post", + fixed_effects=["a", "b"], + survey_design=sd, + ) + assert np.isfinite(res_abs.att) + assert abs(res_abs.att - res_fe.att) < 1e-8 - def test_multi_absorb_survey_rejected_multiperiod(self): - """MultiPeriodDiD with multi-absorb + survey weights raises ValueError.""" + def test_multi_absorb_survey_now_supported_multiperiod(self): + """MultiPeriodDiD with multi-absorb + survey weights is now supported.""" np.random.seed(42) n = 60 df = pd.DataFrame( @@ -2685,9 +2703,9 @@ def test_multi_absorb_survey_rejected_multiperiod(self): } ) sd = SurveyDesign(weights="w", weight_type="pweight") - mpd = MultiPeriodDiD() - with pytest.raises(ValueError, match="Multiple absorbed fixed effects"): - mpd.fit( + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + res_abs = MultiPeriodDiD().fit( df, outcome="outcome", treatment="treated", @@ -2696,6 +2714,25 @@ def test_multi_absorb_survey_rejected_multiperiod(self): absorb=["a", "b"], survey_design=sd, ) + res_fe = MultiPeriodDiD().fit( + df, + outcome="outcome", + treatment="treated", + time="time", + post_periods=[2], + fixed_effects=["a", "b"], + survey_design=sd, + ) + a_eff = {p: pe.effect for p, pe in res_abs.period_effects.items()} + f_eff = {p: pe.effect for p, pe in res_fe.period_effects.items()} + compared = [ + (a_eff[p], f_eff[p]) + for p in a_eff + if p in f_eff and np.isfinite(a_eff[p]) and np.isfinite(f_eff[p]) + ] + assert compared, "no finite period effects to compare" + for ae, fe in compared: + assert abs(ae - fe) < 1e-8 def test_single_absorb_survey_allowed(self): """Single-absorb with survey weights should still work (regression guard).""" diff --git a/tests/test_utils.py b/tests/test_utils.py index 97f36d32..3edfa69d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -26,12 +26,15 @@ compute_robust_se, compute_sdid_estimator, compute_time_weights, + demean_by_group, + demean_by_groups, equivalence_test_trends, fe_dummy_names, safe_inference, validate_binary, validate_covariate_names, validate_design_term_names, + within_transform, ) # ============================================================================= @@ -1386,3 +1389,210 @@ def test_matches_get_dummies(self, col): assert fe_dummy_names(col, "fe") == list( pd.get_dummies(col, prefix="fe", drop_first=True).columns ) + + +# ============================================================================= +# demean_by_groups — N-way method of alternating projections (MAP) +# ============================================================================= + + +def _unbalanced_2way_panel(seed=0, drop=0.30): + """Unbalanced (non-orthogonal) 2-way panel: some unit-period cells dropped.""" + rng = np.random.default_rng(seed) + rows = [ + (u, t) + for u in range(8) + for t in range(6) + if rng.random() >= drop + ] + df = pd.DataFrame(rows, columns=["unit", "period"]) + n = len(df) + df["x1"] = rng.normal(size=n) + df["x2"] = rng.normal(size=n) + df["y"] = 2.0 * df["x1"] - 1.5 * df["x2"] + rng.normal(size=n) + df["w"] = rng.uniform(0.5, 2.0, size=n) + return df + + +def _full_dummy_slopes(df, group_cols, xcols, weights=None): + """Ground truth: (W)OLS of y on [1, xcols, dummies(group_cols)]; return x slopes.""" + cols = [np.ones(len(df))] + [df[c].values.astype(float) for c in xcols] + for g in group_cols: + d = pd.get_dummies(df[g], prefix=g, drop_first=True).values.astype(float) + cols.extend(d[:, j] for j in range(d.shape[1])) + X = np.column_stack(cols) + y = df["y"].values.astype(float) + if weights is None: + beta, *_ = np.linalg.lstsq(X, y, rcond=None) + else: + sw = np.sqrt(np.asarray(weights, dtype=float)) + beta, *_ = np.linalg.lstsq(X * sw[:, None], y * sw, rcond=None) + return beta[1 : 1 + len(xcols)] + + +def _fwl_slopes(demeaned, xcols, weights=None): + """OLS of demeaned y on demeaned xcols (FWL residualization).""" + X = np.column_stack([demeaned[c] for c in xcols]) + y = demeaned["y"] + if weights is None: + beta, *_ = np.linalg.lstsq(X, y, rcond=None) + else: + sw = np.sqrt(np.asarray(weights, dtype=float)) + beta, *_ = np.linalg.lstsq(X * sw[:, None], y * sw, rcond=None) + return beta + + +def _frozen_old_within_transform_weighted( + df, variables, unit, time, weights, max_iter=100, tol=1e-8 +): + """Byte-for-byte copy of the PRE-REFACTOR within_transform weighted loop. + + Used as the byte-identity guard: demean_by_groups([unit, time], weighted) and + the refactored within_transform must reproduce this exactly. + """ + w = np.asarray(weights, dtype=np.float64) + unit_groups = df[unit].values + time_groups = df[time].values + unit_w_sum = pd.Series(w).groupby(unit_groups).transform("sum").values + time_w_sum = pd.Series(w).groupby(time_groups).transform("sum").values + + def _wgd(x, groups, w, w_sum): + wx_sum = pd.Series(w * x).groupby(groups).transform("sum").values + return x - wx_sum / w_sum + + out = {} + for var in variables: + x = df[var].values.astype(np.float64) + for _ in range(max_iter): + x_old = x.copy() + x = _wgd(x, unit_groups, w, unit_w_sum) + x = _wgd(x, time_groups, w, time_w_sum) + if np.max(np.abs(x - x_old)) < tol: + break + out[var] = x + return out + + +class TestDemeanByGroups: + @pytest.mark.parametrize("weighted", [False, True]) + def test_len1_byte_identical_to_demean_by_group(self, weighted): + """One grouping var must delegate to demean_by_group (byte-identical).""" + df = _unbalanced_2way_panel(seed=1) + w = df["w"].values if weighted else None + out_groups, n_g = demean_by_groups( + df, ["y", "x1"], ["unit"], suffix="_dm", weights=w + ) + out_single, n_s = demean_by_group( + df, ["y", "x1"], "unit", suffix="_dm", weights=w + ) + assert n_g == n_s + np.testing.assert_array_equal( + out_groups["y_dm"].values, out_single["y_dm"].values + ) + np.testing.assert_array_equal( + out_groups["x1_dm"].values, out_single["x1_dm"].values + ) + + def test_n_effects_is_sum_nunique_minus_one(self): + df = _unbalanced_2way_panel(seed=2) + _, n_eff = demean_by_groups(df, ["y"], ["unit", "period"], suffix="_dm") + expected = (df["unit"].nunique() - 1) + (df["period"].nunique() - 1) + assert n_eff == expected + + @pytest.mark.parametrize("weighted", [False, True]) + def test_unbalanced_2way_matches_full_dummy_ols(self, weighted): + """The core correctness claim: MAP residualization == full-dummy (W)OLS.""" + df = _unbalanced_2way_panel(seed=3) + w = df["w"].values if weighted else None + out, _ = demean_by_groups( + df, ["y", "x1", "x2"], ["unit", "period"], suffix="_dm", weights=w + ) + demeaned = {c: out[f"{c}_dm"].values for c in ("y", "x1", "x2")} + map_slopes = _fwl_slopes(demeaned, ["x1", "x2"], weights=w) + gt = _full_dummy_slopes(df, ["unit", "period"], ["x1", "x2"], weights=w) + np.testing.assert_allclose(map_slopes, gt, atol=1e-9, rtol=0) + + def test_n3_absorb_matches_full_dummy_ols(self): + """Generalizes to 3 absorbed dimensions.""" + rng = np.random.default_rng(4) + rows = [ + (u, t, (u + t) % 4) + for u in range(10) + for t in range(6) + if rng.random() >= 0.4 + ] + df = pd.DataFrame(rows, columns=["unit", "period", "firm"]) + n = len(df) + df["x1"] = rng.normal(size=n) + df["y"] = 2.0 * df["x1"] + rng.normal(size=n) + out, _ = demean_by_groups( + df, ["y", "x1"], ["unit", "period", "firm"], suffix="_dm" + ) + demeaned = {c: out[f"{c}_dm"].values for c in ("y", "x1")} + map_slope = _fwl_slopes(demeaned, ["x1"])[0] + gt = _full_dummy_slopes(df, ["unit", "period", "firm"], ["x1"])[0] + np.testing.assert_allclose(map_slope, gt, atol=1e-9, rtol=0) + + @pytest.mark.parametrize("weighted", [False, True]) + def test_result_orthogonal_to_fe_spans(self, weighted): + """Demeaned variables must have ~0 (weighted) group means in every FE dim.""" + df = _unbalanced_2way_panel(seed=5) + w = df["w"].values if weighted else None + out, _ = demean_by_groups( + df, ["y"], ["unit", "period"], suffix="_dm", weights=w, tol=1e-12 + ) + ydm = out["y_dm"].values + for g in ("unit", "period"): + if weighted: + num = pd.Series(w * ydm).groupby(df[g].values).transform("sum").values + den = pd.Series(w).groupby(df[g].values).transform("sum").values + means = num / den + else: + means = pd.Series(ydm).groupby(df[g].values).transform("mean").values + assert np.max(np.abs(means)) < 1e-9 + + def test_weighted_byte_identity_vs_frozen_within_transform(self): + """demean_by_groups([unit, time], weighted) reproduces the old loop exactly.""" + df = _unbalanced_2way_panel(seed=6) + w = df["w"].values + frozen = _frozen_old_within_transform_weighted( + df, ["y", "x1", "x2"], "unit", "period", w + ) + out, _ = demean_by_groups( + df, ["y", "x1", "x2"], ["unit", "period"], suffix="_dm", weights=w, tol=1e-8 + ) + for var in ("y", "x1", "x2"): + np.testing.assert_array_equal(out[f"{var}_dm"].values, frozen[var]) + + def test_within_transform_weighted_unchanged(self): + """The refactored within_transform weighted path is byte-identical to before.""" + df = _unbalanced_2way_panel(seed=7) + w = df["w"].values + frozen = _frozen_old_within_transform_weighted( + df, ["y", "x1"], "unit", "period", w + ) + out = within_transform(df, ["y", "x1"], "unit", "period", weights=w) + np.testing.assert_array_equal(out["y_demeaned"].values, frozen["y"]) + np.testing.assert_array_equal(out["x1_demeaned"].values, frozen["x1"]) + + def test_within_transform_unweighted_now_matches_full_dummy(self): + """Unweighted within_transform now uses MAP -> exact on unbalanced panels.""" + df = _unbalanced_2way_panel(seed=8) + out = within_transform(df, ["y", "x1", "x2"], "unit", "period") + demeaned = {c: out[f"{c}_demeaned"].values for c in ("y", "x1", "x2")} + map_slopes = _fwl_slopes(demeaned, ["x1", "x2"]) + gt = _full_dummy_slopes(df, ["unit", "period"], ["x1", "x2"]) + np.testing.assert_allclose(map_slopes, gt, atol=1e-9, rtol=0) + + def test_nonconvergence_emits_warning(self): + """A starved iteration budget on an unbalanced panel warns (not silent).""" + df = _unbalanced_2way_panel(seed=9) + with pytest.warns(UserWarning, match="did not converge"): + demean_by_groups( + df, ["y"], ["unit", "period"], suffix="_dm", max_iter=1, tol=1e-15 + ) + + def test_empty_group_vars_raises(self): + df = _unbalanced_2way_panel(seed=10) + with pytest.raises(ValueError, match="at least one grouping variable"): + demean_by_groups(df, ["y"], []) diff --git a/tests/test_within_transform.py b/tests/test_within_transform.py index 9cc2949b..2b65d3c1 100644 --- a/tests/test_within_transform.py +++ b/tests/test_within_transform.py @@ -10,6 +10,7 @@ import numpy as np import pandas as pd +import pytest from diff_diff.utils import within_transform @@ -45,8 +46,14 @@ def test_inplace_false_leaves_input_untouched(self): pd.testing.assert_frame_equal(df, orig) # default inplace=False: input not mutated assert out is not df assert "v0_demeaned" in out.columns and "v0" in out.columns - np.testing.assert_array_equal(out["v0_demeaned"].values, _ref_demean(df, "v0")) - np.testing.assert_array_equal(out["v1_demeaned"].values, _ref_demean(df, "v1")) + # The unweighted path now uses iterative MAP (exact for unbalanced panels); + # on this balanced panel it equals the closed-form additive demean to ~1 ULP. + np.testing.assert_allclose( + out["v0_demeaned"].values, _ref_demean(df, "v0"), rtol=1e-12, atol=1e-12 + ) + np.testing.assert_allclose( + out["v1_demeaned"].values, _ref_demean(df, "v1"), rtol=1e-12, atol=1e-12 + ) def test_inplace_true_suffix_mutates_same_object_keeps_originals(self): df = _panel() @@ -54,14 +61,16 @@ def test_inplace_true_suffix_mutates_same_object_keeps_originals(self): out = within_transform(df, ["v0", "v1"], "unit", "time", inplace=True) assert out is df # same object, mutated in place (no copy) np.testing.assert_array_equal(df["v0"].values, v0_orig) # original preserved - np.testing.assert_array_equal(df["v0_demeaned"].values, _ref_demean(df, "v0")) + np.testing.assert_allclose( + df["v0_demeaned"].values, _ref_demean(df, "v0"), rtol=1e-12, atol=1e-12 + ) def test_inplace_true_empty_suffix_overwrites_source(self): df = _panel() ref = _ref_demean(df, "v0") within_transform(df, ["v0"], "unit", "time", inplace=True, suffix="") assert "v0_demeaned" not in df.columns - np.testing.assert_array_equal(df["v0"].values, ref) + np.testing.assert_allclose(df["v0"].values, ref, rtol=1e-12, atol=1e-12) def test_redemean_existing_suffix_overwrites_no_duplicate(self): # The TWFE replicate scenario: a frame that already carries the suffix is @@ -84,7 +93,7 @@ def test_non_inplace_empty_suffix_overwrites_no_duplicate(self): out = within_transform(df, ["v0"], "unit", "time", suffix="") assert list(out.columns).count("v0") == 1 assert out["v0"].values.ndim == 1 - np.testing.assert_array_equal(out["v0"].values, ref) + np.testing.assert_allclose(out["v0"].values, ref, rtol=1e-12, atol=1e-12) np.testing.assert_array_equal(df["v0"].values, _panel()["v0"].values) # input intact def test_non_inplace_redemean_existing_suffix_no_duplicate(self): @@ -120,3 +129,34 @@ def test_many_columns_no_fragmentation_warning(self): out = within_transform(df, cols, "unit", "time") # default inplace=False -> concat assert all(f"{c}_demeaned" in out.columns for c in cols) assert out is not df + + +def _unbalanced_panel(seed=0, drop=0.3): + """Unbalanced two-way panel (some unit-time cells dropped -> non-orthogonal).""" + rng = np.random.default_rng(seed) + rows = [(u, t) for u in range(8) for t in range(6) if rng.random() >= drop] + df = pd.DataFrame(rows, columns=["unit", "time"]) + df["v0"] = rng.standard_normal(len(df)) + return df + + +class TestWithinTransformConvergence: + def test_unweighted_unbalanced_matches_full_dummy(self): + # The unweighted path now iterates (MAP), so it is exact on unbalanced + # panels. Anchor: FWL slope of v0 on a constant after demeaning must equal + # the unit+time full-dummy residual mean (both ~0 here); more strongly, the + # demeaned column is orthogonal to both FE spans. + df = _unbalanced_panel(seed=2) + out = within_transform(df, ["v0"], "unit", "time") + v = out["v0_demeaned"].values + for g in ("unit", "time"): + means = pd.Series(v).groupby(df[g].values).transform("mean").values + assert np.max(np.abs(means)) < 1e-8 + + def test_unweighted_nonconvergence_warns(self): + # A starved iteration budget on an unbalanced panel must emit the shared + # "did not converge" UserWarning (the unweighted path could not warn before + # the MAP refactor; silent return of the iterate is a silent failure). + df = _unbalanced_panel(seed=3) + with pytest.warns(UserWarning, match="did not converge"): + within_transform(df, ["v0"], "unit", "time", max_iter=1, tol=1e-15)