Source code for causalpy.pymc_experiments

#   Copyright 2024 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Quasi-Experiment classes for Bayesian causal inference"""

import warnings  # noqa: I001
from typing import Union

import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import xarray as xr
from patsy import build_design_matrices, dmatrices
from sklearn.linear_model import LinearRegression as sk_lin_reg
from matplotlib.lines import Line2D


from causalpy.data_validation import (
    PrePostFitDataValidator,
    DiDDataValidator,
    RDDataValidator,
    RegressionKinkDataValidator,
    PrePostNEGDDataValidator,
    IVDataValidator,
    PropensityDataValidator,
)
from causalpy.plot_utils import plot_xY
from causalpy.utils import round_num

LEGEND_FONT_SIZE = 12
az.style.use("arviz-darkgrid")


[docs] class ExperimentalDesign: """ Base class for other experiment types See subclasses for examples of most methods """ model = None expt_type = None
[docs] def __init__(self, model=None, **kwargs): if model is not None: self.model = model if self.model is None: raise ValueError("fitting_model not set or passed.")
@property def idata(self): """ Access to the models InferenceData object """ return self.model.idata
[docs] def print_coefficients(self, round_to=None) -> None: """ Prints the model coefficients :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. Example -------- >>> import causalpy as cp >>> df = cp.load_data("did") >>> seed = 42 >>> result = cp.pymc_experiments.DifferenceInDifferences( ... df, ... formula="y ~ 1 + group*post_treatment", ... time_variable_name="t", ... group_variable_name="group", ... model=cp.pymc_models.LinearRegression( ... sample_kwargs={ ... "draws": 2000, ... "random_seed": seed, ... "progressbar": False ... }), ... ) >>> result.print_coefficients(round_to=1) Model coefficients: Intercept 1, 94% HDI [1, 1] post_treatment[T.True] 1, 94% HDI [0.9, 1] group 0.2, 94% HDI [0.09, 0.2] group:post_treatment[T.True] 0.5, 94% HDI [0.4, 0.6] sigma 0.08, 94% HDI [0.07, 0.1] """ def print_row( max_label_length: int, name: str, coeff_samples: xr.DataArray, round_to: int ) -> None: """Print one row of the coefficient table""" formatted_name = f" {name: <{max_label_length}}" formatted_val = f"{round_num(coeff_samples.mean().data, round_to)}, 94% HDI [{round_num(coeff_samples.quantile(0.03).data, round_to)}, {round_num(coeff_samples.quantile(1-0.03).data, round_to)}]" # noqa: E501 print(f" {formatted_name} {formatted_val}") print("Model coefficients:") coeffs = az.extract(self.idata.posterior, var_names="beta") # Determine the width of the longest label max_label_length = max(len(name) for name in self.labels + ["sigma"]) for name in self.labels: coeff_samples = coeffs.sel(coeffs=name) print_row(max_label_length, name, coeff_samples, round_to) # Add coefficient for measurement std coeff_samples = az.extract(self.model.idata.posterior, var_names="sigma") name = "sigma" print_row(max_label_length, name, coeff_samples, round_to)
[docs] class PrePostFit(ExperimentalDesign, PrePostFitDataValidator): """ A class to analyse quasi-experiments where parameter estimation is based on just the pre-intervention data. :param data: A pandas dataframe :param treatment_time: The time when treatment occured, should be in reference to the data index :param validation_time: Optional time to split the data into training and validation data sets :param formula: A statistical model formula :param model: A PyMC model Example -------- >>> import causalpy as cp >>> sc = cp.load_data("sc") >>> treatment_time = 70 >>> seed = 42 >>> result = cp.pymc_experiments.PrePostFit( ... sc, ... treatment_time, ... formula="actual ~ 0 + a + g", ... model=cp.pymc_models.WeightedSumFitter( ... sample_kwargs={ ... "draws": 400, ... "target_accept": 0.95, ... "random_seed": seed, ... "progressbar": False ... } ... ), ... ) >>> result.summary(round_to=1) ==================================Pre-Post Fit================================== Formula: actual ~ 0 + a + g Pre-intervention Bayesian $R^2$: 0.9 (std = 0.01) Model coefficients: a 0.6, 94% HDI [0.6, 0.6] g 0.4, 94% HDI [0.4, 0.4] sigma 0.8, 94% HDI [0.6, 0.9] """
[docs] def __init__( self, data: pd.DataFrame, treatment_time: Union[int, float, pd.Timestamp], formula: str, validation_time=None, model=None, **kwargs, ) -> None: super().__init__(model=model, **kwargs) self._input_validation(data, treatment_time) self.treatment_time = treatment_time self.validation_time = validation_time # validate arguments if self.validation_time is not None: # check that validation time is less than treatment time if self.validation_time >= self.treatment_time: raise ValueError( "Validation time must be less than the treatment time." ) # set experiment type - usually done in subclasses self.expt_type = "Pre-Post Fit" # split data in to pre and post intervention if self.validation_time is None: self.datapre = data[data.index < self.treatment_time] self.datapost = data[data.index >= self.treatment_time] else: self.datapre = data[data.index < self.validation_time] self.datapost = data[data.index >= self.validation_time] self.formula = formula # set things up with pre-intervention data y, X = dmatrices(formula, self.datapre) self.outcome_variable_name = y.design_info.column_names[0] self._y_design_info = y.design_info self._x_design_info = X.design_info self.labels = X.design_info.column_names self.pre_y, self.pre_X = np.asarray(y), np.asarray(X) # process post-intervention data (new_y, new_x) = build_design_matrices( [self._y_design_info, self._x_design_info], self.datapost ) self.post_X = np.asarray(new_x) self.post_y = np.asarray(new_y) # fit the model to the observed (pre-intervention) data COORDS = {"coeffs": self.labels, "obs_indx": np.arange(self.pre_X.shape[0])} self.model.fit(X=self.pre_X, y=self.pre_y, coords=COORDS) if self.validation_time is None: # We just have pre and post data, no validation data. So we can score the pre intervention data self.score = self.model.score(X=self.pre_X, y=self.pre_y) else: # Score on the training data - before the validation time self.datatrain = data[data.index < self.validation_time] y, X = dmatrices(formula, self.datatrain) self.score = self.model.score(X=X, y=y) # Score on the validation data - after the validation time but # before the treatment time self.datavalidate = data[ (data.index >= self.validation_time) & (data.index < self.treatment_time) ] y, X = dmatrices(formula, self.datavalidate) self.score_validation = self.model.score(X=X, y=y) # get the model predictions of the observed (pre-intervention) data self.pre_pred = self.model.predict(X=self.pre_X) # calculate the counterfactual self.post_pred = self.model.predict(X=self.post_X) # causal impact pre (ie the residuals of the model fit to observed) pre_data = xr.DataArray(self.pre_y[:, 0], dims=["obs_ind"]) self.pre_impact = ( pre_data - self.pre_pred["posterior_predictive"]["y_hat"] ).transpose(..., "obs_ind") # causal impact post (ie the residuals of the model fit to observed) post_data = xr.DataArray(self.post_y[:, 0], dims=["obs_ind"]) self.post_impact = ( post_data - self.post_pred["posterior_predictive"]["y_hat"] ).transpose(..., "obs_ind") # cumulative impact post self.post_impact_cumulative = self.post_impact.cumsum(dim="obs_ind")
[docs] def plot(self, counterfactual_label="Counterfactual", round_to=None, **kwargs): """ Plot the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ fig, ax = plt.subplots(3, 1, sharex=True, figsize=(7, 8)) # TOP PLOT -------------------------------------------------- # pre-intervention period h_line, h_patch = plot_xY( self.datapre.index, self.pre_pred["posterior_predictive"].mu, ax=ax[0], plot_hdi_kwargs={"color": "C0"}, ) handles = [(h_line, h_patch)] labels = ["Pre-intervention period"] (h,) = ax[0].plot(self.datapre.index, self.pre_y, "k.", label="Observations") handles.append(h) labels.append("Observations") # post intervention period h_line, h_patch = plot_xY( self.datapost.index, self.post_pred["posterior_predictive"].mu, ax=ax[0], plot_hdi_kwargs={"color": "C1"}, ) handles.append((h_line, h_patch)) labels.append(counterfactual_label) ax[0].plot(self.datapost.index, self.post_y, "k.") # Shaded causal effect h = ax[0].fill_between( self.datapost.index, y1=az.extract( self.post_pred, group="posterior_predictive", var_names="mu" ).mean("sample"), y2=np.squeeze(self.post_y), color="C0", alpha=0.25, ) handles.append(h) labels.append("Causal impact") # MIDDLE PLOT ----------------------------------------------- plot_xY( self.datapre.index, self.pre_impact, ax=ax[1], plot_hdi_kwargs={"color": "C0"}, ) plot_xY( self.datapost.index, self.post_impact, ax=ax[1], plot_hdi_kwargs={"color": "C1"}, ) ax[1].axhline(y=0, c="k") ax[1].fill_between( self.datapost.index, y1=self.post_impact.mean(["chain", "draw"]), color="C0", alpha=0.25, label="Causal impact", ) ax[1].set(ylabel="Causal Impact") # BOTTOM PLOT ----------------------------------------------- ax[2].set(ylabel="Cumulative Causal Impact") plot_xY( self.datapost.index, self.post_impact_cumulative, ax=ax[2], plot_hdi_kwargs={"color": "C1"}, ) ax[2].axhline(y=0, c="k") # Intervention line for i in [0, 1, 2]: ax[i].axvline( x=self.treatment_time, ls="--", # lw=3, color="k", ) if self.validation_time is not None: ax[i].axvline( x=self.validation_time, ls="--", # lw=3, color="k", ) ax[0].legend( handles=(h_tuple for h_tuple in handles), labels=labels, fontsize=LEGEND_FONT_SIZE, ) return fig, ax
[docs] def summary(self, round_to=None) -> None: """ Print text output summarising the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ print(f"{self.expt_type:=^80}") print(f"Formula: {self.formula}") # print goodness of fit scores if self.validation_time is None: print( f"Pre-intervention Bayesian $R^2$: {round_num(self.score.r2, round_to)} (std = {round_num(self.score.r2_std, round_to)})" ) else: print( f"Pre-intervention Bayesian $R^2$: {round_num(self.score.r2, round_to)} (std = {round_num(self.score.r2_std, round_to)})\n" f"Validation Bayesian $R^2$: {round_num(self.score_validation.r2, round_to)} (std = {round_num(self.score_validation.r2_std, round_to)})" ) # print coefficients self.print_coefficients(round_to)
[docs] class InterruptedTimeSeries(PrePostFit): """ A wrapper around PrePostFit class :param data: A pandas dataframe :param treatment_time: The time when treatment occured, should be in reference to the data index :param formula: A statistical model formula :param validation_time: Optional time to split the data into training and validation data sets :param model: A PyMC model Example -------- >>> import causalpy as cp >>> df = ( ... cp.load_data("its") ... .assign(date=lambda x: pd.to_datetime(x["date"])) ... .set_index("date") ... ) >>> treatment_time = pd.to_datetime("2017-01-01") >>> seed = 42 >>> result = cp.pymc_experiments.InterruptedTimeSeries( ... df, ... treatment_time, ... formula="y ~ 1 + t + C(month)", ... model=cp.pymc_models.LinearRegression( ... sample_kwargs={ ... "target_accept": 0.95, ... "random_seed": seed, ... "progressbar": False, ... } ... ) ... ) """ expt_type = "Interrupted Time Series"
[docs] class SyntheticControl(PrePostFit): """A wrapper around the PrePostFit class :param data: A pandas dataframe :param treatment_time: The time when treatment occured, should be in reference to the data index :param formula: A statistical model formula :param validation_time: Optional time to split the data into training and validation data sets :param model: A PyMC model Example -------- >>> import causalpy as cp >>> df = cp.load_data("sc") >>> treatment_time = 70 >>> seed = 42 >>> result = cp.pymc_experiments.SyntheticControl( ... df, ... treatment_time, ... formula="actual ~ 0 + a + b + c + d + e + f + g", ... model=cp.pymc_models.WeightedSumFitter( ... sample_kwargs={ ... "target_accept": 0.95, ... "random_seed": seed, ... "progressbar": False, ... } ... ), ... ) """ expt_type = "Synthetic Control"
[docs] def plot(self, plot_predictors=False, **kwargs): """Plot the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ fig, ax = super().plot(counterfactual_label="Synthetic control", **kwargs) if plot_predictors: # plot control units as well ax[0].plot(self.datapre.index, self.pre_X, "-", c=[0.8, 0.8, 0.8], zorder=1) ax[0].plot( self.datapost.index, self.post_X, "-", c=[0.8, 0.8, 0.8], zorder=1 ) return fig, ax
[docs] class DifferenceInDifferences(ExperimentalDesign, DiDDataValidator): """A class to analyse data from Difference in Difference settings. .. note:: There is no pre/post intervention data distinction for DiD, we fit all the data available. :param data: A pandas dataframe :param formula: A statistical model formula :param time_variable_name: Name of the data column for the time variable :param group_variable_name: Name of the data column for the group variable :param model: A PyMC model for difference in differences Example -------- >>> import causalpy as cp >>> df = cp.load_data("did") >>> seed = 42 >>> result = cp.pymc_experiments.DifferenceInDifferences( ... df, ... formula="y ~ 1 + group*post_treatment", ... time_variable_name="t", ... group_variable_name="group", ... model=cp.pymc_models.LinearRegression( ... sample_kwargs={ ... "target_accept": 0.95, ... "random_seed": seed, ... "progressbar": False, ... } ... ) ... ) """
[docs] def __init__( self, data: pd.DataFrame, formula: str, time_variable_name: str, group_variable_name: str, model=None, **kwargs, ): super().__init__(model=model, **kwargs) self.data = data self.expt_type = "Difference in Differences" self.formula = formula self.time_variable_name = time_variable_name self.group_variable_name = group_variable_name self._input_validation() y, X = dmatrices(formula, self.data) self._y_design_info = y.design_info self._x_design_info = X.design_info self.labels = X.design_info.column_names self.y, self.X = np.asarray(y), np.asarray(X) self.outcome_variable_name = y.design_info.column_names[0] COORDS = {"coeffs": self.labels, "obs_indx": np.arange(self.X.shape[0])} self.model.fit(X=self.X, y=self.y, coords=COORDS) # predicted outcome for control group self.x_pred_control = ( self.data # just the untreated group .query(f"{self.group_variable_name} == 0") # drop the outcome variable .drop(self.outcome_variable_name, axis=1) # We may have multiple units per time point, we only want one time point .groupby(self.time_variable_name) .first() .reset_index() ) assert not self.x_pred_control.empty (new_x,) = build_design_matrices([self._x_design_info], self.x_pred_control) self.y_pred_control = self.model.predict(np.asarray(new_x)) # predicted outcome for treatment group self.x_pred_treatment = ( self.data # just the treated group .query(f"{self.group_variable_name} == 1") # drop the outcome variable .drop(self.outcome_variable_name, axis=1) # We may have multiple units per time point, we only want one time point .groupby(self.time_variable_name) .first() .reset_index() ) assert not self.x_pred_treatment.empty (new_x,) = build_design_matrices([self._x_design_info], self.x_pred_treatment) self.y_pred_treatment = self.model.predict(np.asarray(new_x)) # predicted outcome for counterfactual. This is given by removing the influence # of the interaction term between the group and the post_treatment variable self.x_pred_counterfactual = ( self.data # just the treated group .query(f"{self.group_variable_name} == 1") # just the treatment period(s) .query("post_treatment == True") # drop the outcome variable .drop(self.outcome_variable_name, axis=1) # We may have multiple units per time point, we only want one time point .groupby(self.time_variable_name) .first() .reset_index() ) assert not self.x_pred_counterfactual.empty (new_x,) = build_design_matrices( [self._x_design_info], self.x_pred_counterfactual, return_type="dataframe" ) # INTERVENTION: set the interaction term between the group and the # post_treatment variable to zero. This is the counterfactual. for i, label in enumerate(self.labels): if "post_treatment" in label and self.group_variable_name in label: new_x.iloc[:, i] = 0 self.y_pred_counterfactual = self.model.predict(np.asarray(new_x)) # calculate causal impact. # This is the coefficient on the interaction term coeff_names = self.idata.posterior.coords["coeffs"].data for i, label in enumerate(coeff_names): if "post_treatment" in label and self.group_variable_name in label: self.causal_impact = self.idata.posterior["beta"].isel({"coeffs": i})
[docs] def plot(self, round_to=None): """Plot the results. :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ fig, ax = plt.subplots() # Plot raw data sns.scatterplot( self.data, x=self.time_variable_name, y=self.outcome_variable_name, hue=self.group_variable_name, alpha=1, legend=False, markers=True, ax=ax, ) # Plot model fit to control group time_points = self.x_pred_control[self.time_variable_name].values h_line, h_patch = plot_xY( time_points, self.y_pred_control.posterior_predictive.mu, ax=ax, plot_hdi_kwargs={"color": "C0"}, label="Control group", ) handles = [(h_line, h_patch)] labels = ["Control group"] # Plot model fit to treatment group time_points = self.x_pred_control[self.time_variable_name].values h_line, h_patch = plot_xY( time_points, self.y_pred_treatment.posterior_predictive.mu, ax=ax, plot_hdi_kwargs={"color": "C1"}, label="Treatment group", ) handles.append((h_line, h_patch)) labels.append("Treatment group") # Plot counterfactual - post-test for treatment group IF no treatment # had occurred. time_points = self.x_pred_counterfactual[self.time_variable_name].values if len(time_points) == 1: parts = ax.violinplot( az.extract( self.y_pred_counterfactual, group="posterior_predictive", var_names="mu", ).values.T, positions=self.x_pred_counterfactual[self.time_variable_name].values, showmeans=False, showmedians=False, widths=0.2, ) for pc in parts["bodies"]: pc.set_facecolor("C0") pc.set_edgecolor("None") pc.set_alpha(0.5) else: h_line, h_patch = plot_xY( time_points, self.y_pred_counterfactual.posterior_predictive.mu, ax=ax, plot_hdi_kwargs={"color": "C2"}, label="Counterfactual", ) handles.append((h_line, h_patch)) labels.append("Counterfactual") # arrow to label the causal impact self._plot_causal_impact_arrow(ax) # formatting ax.set( xticks=self.x_pred_treatment[self.time_variable_name].values, title=self._causal_impact_summary_stat(round_to), ) ax.legend( handles=(h_tuple for h_tuple in handles), labels=labels, fontsize=LEGEND_FONT_SIZE, ) return fig, ax
def _plot_causal_impact_arrow(self, ax): """ draw a vertical arrow between `y_pred_counterfactual` and `y_pred_counterfactual` """ # Calculate y values to plot the arrow between y_pred_treatment = ( self.y_pred_treatment["posterior_predictive"] .mu.isel({"obs_ind": 1}) .mean() .data ) y_pred_counterfactual = ( self.y_pred_counterfactual["posterior_predictive"].mu.mean().data ) # Calculate the x position to plot at # Note that we force to be float to avoid a type error using np.ptp with boolean # values diff = np.ptp( np.array(self.x_pred_treatment[self.time_variable_name].values).astype( float ) ) x = np.max(self.x_pred_treatment[self.time_variable_name].values) + 0.1 * diff # Plot the arrow ax.annotate( "", xy=(x, y_pred_counterfactual), xycoords="data", xytext=(x, y_pred_treatment), textcoords="data", arrowprops={"arrowstyle": "<-", "color": "green", "lw": 3}, ) # Plot text annotation next to arrow ax.annotate( "causal\nimpact", xy=(x, np.mean([y_pred_counterfactual, y_pred_treatment])), xycoords="data", xytext=(5, 0), textcoords="offset points", color="green", va="center", ) def _causal_impact_summary_stat(self, round_to=None) -> str: """Computes the mean and 94% credible interval bounds for the causal impact.""" percentiles = self.causal_impact.quantile([0.03, 1 - 0.03]).values ci = ( "$CI_{94\\%}$" + f"[{round_num(percentiles[0], round_to)}, {round_num(percentiles[1], round_to)}]" ) causal_impact = f"{round_num(self.causal_impact.mean(), round_to)}, " return f"Causal impact = {causal_impact + ci}"
[docs] def summary(self, round_to=None) -> None: """ Print text output summarising the results. :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ print(f"{self.expt_type:=^80}") print(f"Formula: {self.formula}") print("\nResults:") print(self._causal_impact_summary_stat(round_to)) self.print_coefficients(round_to)
[docs] class RegressionDiscontinuity(ExperimentalDesign, RDDataValidator): """ A class to analyse sharp regression discontinuity experiments. :param data: A pandas dataframe :param formula: A statistical model formula :param treatment_threshold: A scalar threshold value at which the treatment is applied :param model: A PyMC model :param running_variable_name: The name of the predictor variable that the treatment threshold is based upon :param epsilon: A small scalar value which determines how far above and below the treatment threshold to evaluate the causal impact. :param bandwidth: Data outside of the bandwidth (relative to the discontinuity) is not used to fit the model. Example -------- >>> import causalpy as cp >>> df = cp.load_data("rd") >>> seed = 42 >>> result = cp.pymc_experiments.RegressionDiscontinuity( ... df, ... formula="y ~ 1 + x + treated + x:treated", ... model=cp.pymc_models.LinearRegression( ... sample_kwargs={ ... "draws": 100, ... "target_accept": 0.95, ... "random_seed": seed, ... "progressbar": False, ... }, ... ), ... treatment_threshold=0.5, ... ) """
[docs] def __init__( self, data: pd.DataFrame, formula: str, treatment_threshold: float, model=None, running_variable_name: str = "x", epsilon: float = 0.001, bandwidth: float = np.inf, **kwargs, ): super().__init__(model=model, **kwargs) self.expt_type = "Regression Discontinuity" self.data = data self.formula = formula self.running_variable_name = running_variable_name self.treatment_threshold = treatment_threshold self.epsilon = epsilon self.bandwidth = bandwidth self._input_validation() if self.bandwidth is not np.inf: fmin = self.treatment_threshold - self.bandwidth fmax = self.treatment_threshold + self.bandwidth filtered_data = self.data.query(f"{fmin} <= x <= {fmax}") if len(filtered_data) <= 10: warnings.warn( f"Choice of bandwidth parameter has lead to only {len(filtered_data)} remaining datapoints. Consider increasing the bandwidth parameter.", # noqa: E501 UserWarning, ) y, X = dmatrices(formula, filtered_data) else: y, X = dmatrices(formula, self.data) self._y_design_info = y.design_info self._x_design_info = X.design_info self.labels = X.design_info.column_names self.y, self.X = np.asarray(y), np.asarray(X) self.outcome_variable_name = y.design_info.column_names[0] # DEVIATION FROM SKL EXPERIMENT CODE ============================= # fit the model to the observed (pre-intervention) data COORDS = {"coeffs": self.labels, "obs_indx": np.arange(self.X.shape[0])} self.model.fit(X=self.X, y=self.y, coords=COORDS) # ================================================================ # score the goodness of fit to all data self.score = self.model.score(X=self.X, y=self.y) # get the model predictions of the observed data if self.bandwidth is not np.inf: xi = np.linspace(fmin, fmax, 200) else: xi = np.linspace( np.min(self.data[self.running_variable_name]), np.max(self.data[self.running_variable_name]), 200, ) self.x_pred = pd.DataFrame( {self.running_variable_name: xi, "treated": self._is_treated(xi)} ) (new_x,) = build_design_matrices([self._x_design_info], self.x_pred) self.pred = self.model.predict(X=np.asarray(new_x)) # calculate discontinuity by evaluating the difference in model expectation on # either side of the discontinuity # NOTE: `"treated": np.array([0, 1])`` assumes treatment is applied above # (not below) the threshold self.x_discon = pd.DataFrame( { self.running_variable_name: np.array( [ self.treatment_threshold - self.epsilon, self.treatment_threshold + self.epsilon, ] ), "treated": np.array([0, 1]), } ) (new_x,) = build_design_matrices([self._x_design_info], self.x_discon) self.pred_discon = self.model.predict(X=np.asarray(new_x)) self.discontinuity_at_threshold = ( self.pred_discon["posterior_predictive"].sel(obs_ind=1)["mu"] - self.pred_discon["posterior_predictive"].sel(obs_ind=0)["mu"] )
def _is_treated(self, x): """Returns ``True`` if `x` is greater than or equal to the treatment threshold. .. warning:: Assumes treatment is given to those ABOVE the treatment threshold. """ return np.greater_equal(x, self.treatment_threshold)
[docs] def plot(self, round_to=None): """ Plot the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ fig, ax = plt.subplots() # Plot raw data sns.scatterplot( self.data, x=self.running_variable_name, y=self.outcome_variable_name, c="k", ax=ax, ) # Plot model fit to data h_line, h_patch = plot_xY( self.x_pred[self.running_variable_name], self.pred["posterior_predictive"].mu, ax=ax, plot_hdi_kwargs={"color": "C1"}, ) handles = [(h_line, h_patch)] labels = ["Posterior mean"] # create strings to compose title title_info = f"{round_num(self.score.r2, round_to)} (std = {round_num(self.score.r2_std, round_to)})" r2 = f"Bayesian $R^2$ on all data = {title_info}" percentiles = self.discontinuity_at_threshold.quantile([0.03, 1 - 0.03]).values ci = ( r"$CI_{94\%}$" + f"[{round_num(percentiles[0], round_to)}, {round_num(percentiles[1], round_to)}]" ) discon = f""" Discontinuity at threshold = {round_num(self.discontinuity_at_threshold.mean(), round_to)}, """ ax.set(title=r2 + "\n" + discon + ci) # Intervention line ax.axvline( x=self.treatment_threshold, ls="-", lw=3, color="r", label="treatment threshold", ) ax.legend( handles=(h_tuple for h_tuple in handles), labels=labels, fontsize=LEGEND_FONT_SIZE, ) return fig, ax
[docs] def summary(self, round_to=None) -> None: """ Print text output summarising the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ print(f"{self.expt_type:=^80}") print(f"Formula: {self.formula}") print(f"Running variable: {self.running_variable_name}") print(f"Threshold on running variable: {self.treatment_threshold}") print("\nResults:") print( f"Discontinuity at threshold = {round_num(self.discontinuity_at_threshold.mean(), round_to)}" ) self.print_coefficients(round_to)
[docs] class RegressionKink(ExperimentalDesign, RegressionKinkDataValidator): """ A class to analyse sharp regression kink experiments. :param data: A pandas dataframe :param formula: A statistical model formula :param kink_point: A scalar threshold value at which there is a change in the first derivative of the assignment function :param model: A PyMC model :param running_variable_name: The name of the predictor variable that the kink_point is based upon :param epsilon: A small scalar value which determines how far above and below the kink point to evaluate the causal impact. :param bandwidth: Data outside of the bandwidth (relative to the discontinuity) is not used to fit the model. """
[docs] def __init__( self, data: pd.DataFrame, formula: str, kink_point: float, model=None, running_variable_name: str = "x", epsilon: float = 0.001, bandwidth: float = np.inf, **kwargs, ): super().__init__(model=model, **kwargs) self.expt_type = "Regression Kink" self.data = data self.formula = formula self.running_variable_name = running_variable_name self.kink_point = kink_point self.epsilon = epsilon self.bandwidth = bandwidth self._input_validation() if self.bandwidth is not np.inf: fmin = self.kink_point - self.bandwidth fmax = self.kink_point + self.bandwidth filtered_data = self.data.query(f"{fmin} <= x <= {fmax}") if len(filtered_data) <= 10: warnings.warn( f"Choice of bandwidth parameter has lead to only {len(filtered_data)} remaining datapoints. Consider increasing the bandwidth parameter.", # noqa: E501 UserWarning, ) y, X = dmatrices(formula, filtered_data) else: y, X = dmatrices(formula, self.data) self._y_design_info = y.design_info self._x_design_info = X.design_info self.labels = X.design_info.column_names self.y, self.X = np.asarray(y), np.asarray(X) self.outcome_variable_name = y.design_info.column_names[0] COORDS = {"coeffs": self.labels, "obs_indx": np.arange(self.X.shape[0])} self.model.fit(X=self.X, y=self.y, coords=COORDS) # score the goodness of fit to all data self.score = self.model.score(X=self.X, y=self.y) # get the model predictions of the observed data if self.bandwidth is not np.inf: xi = np.linspace(fmin, fmax, 200) else: xi = np.linspace( np.min(self.data[self.running_variable_name]), np.max(self.data[self.running_variable_name]), 200, ) self.x_pred = pd.DataFrame( {self.running_variable_name: xi, "treated": self._is_treated(xi)} ) (new_x,) = build_design_matrices([self._x_design_info], self.x_pred) self.pred = self.model.predict(X=np.asarray(new_x)) # evaluate gradient change around kink point mu_kink_left, mu_kink, mu_kink_right = self._probe_kink_point() self.gradient_change = self._eval_gradient_change( mu_kink_left, mu_kink, mu_kink_right, epsilon )
@staticmethod def _eval_gradient_change(mu_kink_left, mu_kink, mu_kink_right, epsilon): """Evaluate the gradient change at the kink point. It works by evaluating the model below the kink point, at the kink point, and above the kink point. This is a static method for ease of testing. """ gradient_left = (mu_kink - mu_kink_left) / epsilon gradient_right = (mu_kink_right - mu_kink) / epsilon gradient_change = gradient_right - gradient_left return gradient_change def _probe_kink_point(self): # Create a dataframe to evaluate predicted outcome at the kink point and either # side x_predict = pd.DataFrame( { self.running_variable_name: np.array( [ self.kink_point - self.epsilon, self.kink_point, self.kink_point + self.epsilon, ] ), "treated": np.array([0, 1, 1]), } ) (new_x,) = build_design_matrices([self._x_design_info], x_predict) predicted = self.model.predict(X=np.asarray(new_x)) # extract predicted mu values mu_kink_left = predicted["posterior_predictive"].sel(obs_ind=0)["mu"] mu_kink = predicted["posterior_predictive"].sel(obs_ind=1)["mu"] mu_kink_right = predicted["posterior_predictive"].sel(obs_ind=2)["mu"] return mu_kink_left, mu_kink, mu_kink_right def _is_treated(self, x): """Returns ``True`` if `x` is greater than or equal to the treatment threshold.""" # noqa: E501 return np.greater_equal(x, self.kink_point)
[docs] def plot(self, round_to=None): """ Plot the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ fig, ax = plt.subplots() # Plot raw data sns.scatterplot( self.data, x=self.running_variable_name, y=self.outcome_variable_name, c="k", # hue="treated", ax=ax, ) # Plot model fit to data h_line, h_patch = plot_xY( self.x_pred[self.running_variable_name], self.pred["posterior_predictive"].mu, ax=ax, plot_hdi_kwargs={"color": "C1"}, ) handles = [(h_line, h_patch)] labels = ["Posterior mean"] # create strings to compose title title_info = f"{round_num(self.score.r2, round_to)} (std = {round_num(self.score.r2_std, round_to)})" r2 = f"Bayesian $R^2$ on all data = {title_info}" percentiles = self.gradient_change.quantile([0.03, 1 - 0.03]).values ci = ( r"$CI_{94\%}$" + f"[{round_num(percentiles[0], round_to)}, {round_num(percentiles[1], round_to)}]" ) grad_change = f""" Change in gradient = {round_num(self.gradient_change.mean(), round_to)}, """ ax.set(title=r2 + "\n" + grad_change + ci) # Intervention line ax.axvline( x=self.kink_point, ls="-", lw=3, color="r", label="treatment threshold", ) ax.legend( handles=(h_tuple for h_tuple in handles), labels=labels, fontsize=LEGEND_FONT_SIZE, ) return fig, ax
[docs] def summary(self, round_to=None) -> None: """ Print text output summarising the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ print( f""" {self.expt_type:=^80} Formula: {self.formula} Running variable: {self.running_variable_name} Kink point on running variable: {self.kink_point} Results: Change in slope at kink point = {round_num(self.gradient_change.mean(), round_to)} """ ) self.print_coefficients(round_to)
[docs] class PrePostNEGD(ExperimentalDesign, PrePostNEGDDataValidator): """ A class to analyse data from pretest/posttest designs :param data: A pandas dataframe :param formula: A statistical model formula :param group_variable_name: Name of the column in data for the group variable, should be either binary or boolean :param pretreatment_variable_name: Name of the column in data for the pretreatment variable :param model: A PyMC model Example -------- >>> import causalpy as cp >>> df = cp.load_data("anova1") >>> seed = 42 >>> result = cp.pymc_experiments.PrePostNEGD( ... df, ... formula="post ~ 1 + C(group) + pre", ... group_variable_name="group", ... pretreatment_variable_name="pre", ... model=cp.pymc_models.LinearRegression( ... sample_kwargs={ ... "target_accept": 0.95, ... "random_seed": seed, ... "progressbar": False, ... } ... ) ... ) >>> result.summary(round_to=1) # doctest: +NUMBER ==================Pretest/posttest Nonequivalent Group Design=================== Formula: post ~ 1 + C(group) + pre <BLANKLINE> Results: Causal impact = 2, $CI_{94%}$[2, 2] Model coefficients: Intercept -0.5, 94% HDI [-1, 0.2] C(group)[T.1] 2, 94% HDI [2, 2] pre 1, 94% HDI [1, 1] sigma 0.5, 94% HDI [0.5, 0.6] """
[docs] def __init__( self, data: pd.DataFrame, formula: str, group_variable_name: str, pretreatment_variable_name: str, model=None, **kwargs, ): super().__init__(model=model, **kwargs) self.data = data self.expt_type = "Pretest/posttest Nonequivalent Group Design" self.formula = formula self.group_variable_name = group_variable_name self.pretreatment_variable_name = pretreatment_variable_name self._input_validation() y, X = dmatrices(formula, self.data) self._y_design_info = y.design_info self._x_design_info = X.design_info self.labels = X.design_info.column_names self.y, self.X = np.asarray(y), np.asarray(X) self.outcome_variable_name = y.design_info.column_names[0] # fit the model to the observed (pre-intervention) data COORDS = {"coeffs": self.labels, "obs_indx": np.arange(self.X.shape[0])} self.model.fit(X=self.X, y=self.y, coords=COORDS) # Calculate the posterior predictive for the treatment and control for an # interpolated set of pretest values # get the model predictions of the observed data self.pred_xi = np.linspace( np.min(self.data[self.pretreatment_variable_name]), np.max(self.data[self.pretreatment_variable_name]), 200, ) # untreated x_pred_untreated = pd.DataFrame( { self.pretreatment_variable_name: self.pred_xi, self.group_variable_name: np.zeros(self.pred_xi.shape), } ) (new_x_untreated,) = build_design_matrices( [self._x_design_info], x_pred_untreated ) self.pred_untreated = self.model.predict(X=np.asarray(new_x_untreated)) # treated x_pred_treated = pd.DataFrame( { self.pretreatment_variable_name: self.pred_xi, self.group_variable_name: np.ones(self.pred_xi.shape), } ) (new_x_treated,) = build_design_matrices([self._x_design_info], x_pred_treated) self.pred_treated = self.model.predict(X=np.asarray(new_x_treated)) # Evaluate causal impact as equal to the trestment effect self.causal_impact = self.idata.posterior["beta"].sel( {"coeffs": self._get_treatment_effect_coeff()} )
[docs] def plot(self, round_to=None): """Plot the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ fig, ax = plt.subplots( 2, 1, figsize=(7, 9), gridspec_kw={"height_ratios": [3, 1]} ) # Plot raw data sns.scatterplot( x="pre", y="post", hue="group", alpha=0.5, data=self.data, legend=True, ax=ax[0], ) ax[0].set(xlabel="Pretest", ylabel="Posttest") # plot posterior predictive of untreated h_line, h_patch = plot_xY( self.pred_xi, self.pred_untreated["posterior_predictive"].mu, ax=ax[0], plot_hdi_kwargs={"color": "C0"}, label="Control group", ) handles = [(h_line, h_patch)] labels = ["Control group"] # plot posterior predictive of treated h_line, h_patch = plot_xY( self.pred_xi, self.pred_treated["posterior_predictive"].mu, ax=ax[0], plot_hdi_kwargs={"color": "C1"}, label="Treatment group", ) handles.append((h_line, h_patch)) labels.append("Treatment group") ax[0].legend( handles=(h_tuple for h_tuple in handles), labels=labels, fontsize=LEGEND_FONT_SIZE, ) # Plot estimated caual impact / treatment effect az.plot_posterior(self.causal_impact, ref_val=0, ax=ax[1], round_to=round_to) ax[1].set(title="Estimated treatment effect") return fig, ax
def _causal_impact_summary_stat(self, round_to) -> str: """Computes the mean and 94% credible interval bounds for the causal impact.""" percentiles = self.causal_impact.quantile([0.03, 1 - 0.03]).values ci = ( r"$CI_{94%}$" + f"[{round_num(percentiles[0], round_to)}, {round_num(percentiles[1], round_to)}]" ) causal_impact = f"{round_num(self.causal_impact.mean(), round_to)}, " return f"Causal impact = {causal_impact + ci}"
[docs] def summary(self, round_to=None) -> None: """ Print text output summarising the results :param round_to: Number of decimals used to round results. Defaults to 2. Use "None" to return raw numbers. """ print(f"{self.expt_type:=^80}") print(f"Formula: {self.formula}") print("\nResults:") # TODO: extra experiment specific outputs here print(self._causal_impact_summary_stat(round_to)) self.print_coefficients(round_to)
def _get_treatment_effect_coeff(self) -> str: """Find the beta regression coefficient corresponding to the group (i.e. treatment) effect. For example if self.group_variable_name is 'group' and the labels are `['Intercept', 'C(group)[T.1]', 'pre']` then we want `C(group)[T.1]`. """ for label in self.labels: if (self.group_variable_name in label) & (":" not in label): return label raise NameError("Unable to find coefficient name for the treatment effect")
[docs] class InstrumentalVariable(ExperimentalDesign, IVDataValidator): """ A class to analyse instrumental variable style experiments. :param instruments_data: A pandas dataframe of instruments for our treatment variable. Should contain instruments Z, and treatment t :param data: A pandas dataframe of covariates for fitting the focal regression of interest. Should contain covariates X including treatment t and outcome y :param instruments_formula: A statistical model formula for the instrumental stage regression e.g. t ~ 1 + z1 + z2 + z3 :param formula: A statistical model formula for the \n focal regression e.g. y ~ 1 + t + x1 + x2 + x3 :param model: A PyMC model :param priors: An optional dictionary of priors for the mus and sigmas of both regressions. If priors are not specified we will substitue MLE estimates for the beta coefficients. Greater control can be achieved by specifying the priors directly e.g. priors = { "mus": [0, 0], "sigmas": [1, 1], "eta": 2, "lkj_sd": 2, } Example -------- >>> import pandas as pd >>> import causalpy as cp >>> from causalpy.pymc_experiments import InstrumentalVariable >>> from causalpy.pymc_models import InstrumentalVariableRegression >>> import numpy as np >>> N = 100 >>> e1 = np.random.normal(0, 3, N) >>> e2 = np.random.normal(0, 1, N) >>> Z = np.random.uniform(0, 1, N) >>> ## Ensure the endogeneity of the the treatment variable >>> X = -1 + 4 * Z + e2 + 2 * e1 >>> y = 2 + 3 * X + 3 * e1 >>> test_data = pd.DataFrame({"y": y, "X": X, "Z": Z}) >>> sample_kwargs = { ... "tune": 1, ... "draws": 5, ... "chains": 1, ... "cores": 4, ... "target_accept": 0.95, ... "progressbar": False, ... } >>> instruments_formula = "X ~ 1 + Z" >>> formula = "y ~ 1 + X" >>> instruments_data = test_data[["X", "Z"]] >>> data = test_data[["y", "X"]] >>> iv = InstrumentalVariable( ... instruments_data=instruments_data, ... data=data, ... instruments_formula=instruments_formula, ... formula=formula, ... model=InstrumentalVariableRegression(sample_kwargs=sample_kwargs), ... ) """
[docs] def __init__( self, instruments_data: pd.DataFrame, data: pd.DataFrame, instruments_formula: str, formula: str, model=None, priors=None, **kwargs, ): super().__init__(model=model, **kwargs) self.expt_type = "Instrumental Variable Regression" self.data = data self.instruments_data = instruments_data self.formula = formula self.instruments_formula = instruments_formula self.model = model self._input_validation() y, X = dmatrices(formula, self.data) self._y_design_info = y.design_info self._x_design_info = X.design_info self.labels = X.design_info.column_names self.y, self.X = np.asarray(y), np.asarray(X) self.outcome_variable_name = y.design_info.column_names[0] t, Z = dmatrices(instruments_formula, self.instruments_data) self._t_design_info = t.design_info self._z_design_info = Z.design_info self.labels_instruments = Z.design_info.column_names self.t, self.Z = np.asarray(t), np.asarray(Z) self.instrument_variable_name = t.design_info.column_names[0] self.get_naive_OLS_fit() self.get_2SLS_fit() # fit the model to the data COORDS = {"instruments": self.labels_instruments, "covariates": self.labels} self.coords = COORDS if priors is None: priors = { "mus": [self.ols_beta_first_params, self.ols_beta_second_params], "sigmas": [1, 1], "eta": 2, "lkj_sd": 1, } self.priors = priors self.model.fit( X=self.X, Z=self.Z, y=self.y, t=self.t, coords=COORDS, priors=self.priors )
[docs] def get_2SLS_fit(self): """ Two Stage Least Squares Fit This function is called by the experiment, results are used for priors if none are provided. """ first_stage_reg = sk_lin_reg().fit(self.Z, self.t) fitted_Z_values = first_stage_reg.predict(self.Z) X2 = self.data.copy(deep=True) X2[self.instrument_variable_name] = fitted_Z_values _, X2 = dmatrices(self.formula, X2) second_stage_reg = sk_lin_reg().fit(X=X2, y=self.y) betas_first = list(first_stage_reg.coef_[0][1:]) betas_first.insert(0, first_stage_reg.intercept_[0]) betas_second = list(second_stage_reg.coef_[0][1:]) betas_second.insert(0, second_stage_reg.intercept_[0]) self.ols_beta_first_params = betas_first self.ols_beta_second_params = betas_second self.first_stage_reg = first_stage_reg self.second_stage_reg = second_stage_reg
[docs] def get_naive_OLS_fit(self): """ Naive Ordinary Least Squares This function is called by the experiment. """ ols_reg = sk_lin_reg().fit(self.X, self.y) beta_params = list(ols_reg.coef_[0][1:]) beta_params.insert(0, ols_reg.intercept_[0]) self.ols_beta_params = dict(zip(self._x_design_info.column_names, beta_params)) self.ols_reg = ols_reg
[docs] class InversePropensityWeighting(ExperimentalDesign, PropensityDataValidator): """ A class to analyse inverse propensity weighting experiments. :param data: A pandas dataframe :param formula: A statistical model formula for the propensity model :param outcome_variable A string denoting the outcome variable in datq to be reweighted :param weighting_scheme: A string denoting which weighting scheme to use among: 'raw', 'robust', 'doubly robust' or 'overlap'. See Aronow and Miller "Foundations of Agnostic Statistics" for discussion and computation of these weighting schemes. :param model: A PyMC model Example -------- >>> import causalpy as cp >>> df = cp.load_data("nhefs") >>> seed = 42 >>> result = cp.pymc_experiments.InversePropensityWeighting( ... df, ... formula="trt ~ 1 + age + race", ... outcome_variable ="outcome", ... weighting_scheme="robust", ... model=cp.pymc_models.PropensityScore( ... sample_kwargs={ ... "draws": 100, ... "target_accept": 0.95, ... "random_seed": seed, ... "progressbar": False, ... }, ... ), ... ) """
[docs] def __init__( self, data: pd.DataFrame, formula: str, outcome_variable: str, weighting_scheme: str, model=None, **kwargs, ): super().__init__(model=model, **kwargs) self.expt_type = "Inverse Propensity Score Weighting" self.data = data self.formula = formula self.outcome_variable = outcome_variable self.weighting_scheme = weighting_scheme self._input_validation() t, X = dmatrices(formula, self.data) self._t_design_info = t.design_info self._t_design_info = X.design_info self.labels = X.design_info.column_names self.t, self.X = np.asarray(t), np.asarray(X) self.y = self.data[self.outcome_variable] COORDS = {"obs_ind": list(range(self.X.shape[0])), "coeffs": self.labels} self.coords = COORDS self.model.fit(X=self.X, t=self.t, coords=COORDS)
[docs] def make_robust_adjustments(self, ps): """This estimator is discussed in Aronow and Miller's book as being related to the Horvitz Thompson method""" X = pd.DataFrame(self.X, columns=self.labels) X["ps"] = ps X[self.outcome_variable] = self.y t = self.t.flatten() p_of_t = np.mean(t) X["i_ps"] = np.where(t == 1, (p_of_t / X["ps"]), (1 - p_of_t) / (1 - X["ps"])) n_ntrt = X[t == 0].shape[0] n_trt = X[t == 1].shape[0] outcome_trt = X[t == 1][self.outcome_variable] outcome_ntrt = X[t == 0][self.outcome_variable] i_propensity0 = X[t == 0]["i_ps"] i_propensity1 = X[t == 1]["i_ps"] weighted_outcome1 = outcome_trt * i_propensity1 weighted_outcome0 = outcome_ntrt * i_propensity0 return weighted_outcome0, weighted_outcome1, n_ntrt, n_trt
[docs] def make_raw_adjustments(self, ps): """This estimator is discussed in Aronow and Miller as the simplest of base form of inverse propensity weighting schemes""" X = pd.DataFrame(self.X, columns=self.labels) X["ps"] = ps X[self.outcome_variable] = self.y t = self.t.flatten() X["ps"] = np.where(t, X["ps"], 1 - X["ps"]) X["i_ps"] = 1 / X["ps"] n_ntrt = n_trt = len(X) outcome_trt = X[t == 1][self.outcome_variable] outcome_ntrt = X[t == 0][self.outcome_variable] i_propensity0 = X[t == 0]["i_ps"] i_propensity1 = X[t == 1]["i_ps"] weighted_outcome1 = outcome_trt * i_propensity1 weighted_outcome0 = outcome_ntrt * i_propensity0 return weighted_outcome0, weighted_outcome1, n_ntrt, n_trt
[docs] def make_overlap_adjustments(self, ps): """This weighting scheme was adapted from Lucy D’Agostino McGowan's blog on Propensity Score Weights referenced in the primary CausalPy explainer notebook""" X = pd.DataFrame(self.X, columns=self.labels) X["ps"] = ps X[self.outcome_variable] = self.y t = self.t.flatten() X["i_ps"] = np.where(t, (1 - X["ps"]) * t, X["ps"] * (1 - t)) n_ntrt = (1 - t[t == 0]) * X[t == 0]["i_ps"] n_trt = t[t == 1] * X[t == 1]["i_ps"] outcome_trt = X[t == 1][self.outcome_variable] outcome_ntrt = X[t == 0][self.outcome_variable] i_propensity0 = X[t == 0]["i_ps"] i_propensity1 = X[t == 1]["i_ps"] weighted_outcome1 = t[t == 1] * outcome_trt * i_propensity1 weighted_outcome0 = (1 - t[t == 0]) * outcome_ntrt * i_propensity0 return weighted_outcome0, weighted_outcome1, n_ntrt, n_trt
[docs] def make_doubly_robust_adjustment(self, ps): """The doubly robust weighting scheme is also discussed in Aronow and Miller, but a bit more generally than our implementation here. Here we have specified the outcome model to be a simple OLS model. In this way the compromise between the outcome model and the propensity model is always done with OLS.""" X = pd.DataFrame(self.X, columns=self.labels) X["ps"] = ps t = self.t.flatten() m0 = sk_lin_reg().fit(X[t == 0].astype(float), self.y[t == 0]) m1 = sk_lin_reg().fit(X[t == 1].astype(float), self.y[t == 1]) m0_pred = m0.predict(X) m1_pred = m1.predict(X) ## Compromise between outcome and treatement assignment model weighted_outcome0 = (1 - t) * (self.y - m0_pred) / (1 - X["ps"]) + m0_pred weighted_outcome1 = t * (self.y - m1_pred) / X["ps"] + m1_pred return weighted_outcome0, weighted_outcome1, None, None
[docs] def get_ate(self, i, idata, method="doubly_robust"): ### Post processing the sample posterior distribution for propensity scores ### One sample at a time. ps = idata["posterior"]["p"].stack(z=("chain", "draw"))[:, i].values if method == "robust": ( weighted_outcome_ntrt, weighted_outcome_trt, n_ntrt, n_trt, ) = self.make_robust_adjustments(ps) ntrt = weighted_outcome_ntrt.sum() / n_ntrt trt = weighted_outcome_trt.sum() / n_trt elif method == "raw": ( weighted_outcome_ntrt, weighted_outcome_trt, n_ntrt, n_trt, ) = self.make_raw_adjustments(ps) ntrt = weighted_outcome_ntrt.sum() / n_ntrt trt = weighted_outcome_trt.sum() / n_trt elif method == "overlap": ( weighted_outcome_ntrt, weighted_outcome_trt, n_ntrt, n_trt, ) = self.make_overlap_adjustments(ps) ntrt = np.sum(weighted_outcome_ntrt) / np.sum(n_ntrt) trt = np.sum(weighted_outcome_trt) / np.sum(n_trt) else: ( weighted_outcome_ntrt, weighted_outcome_trt, n_ntrt, n_trt, ) = self.make_doubly_robust_adjustment(ps) trt = np.mean(weighted_outcome_trt) ntrt = np.mean(weighted_outcome_ntrt) ate = trt - ntrt return [ate, trt, ntrt]
[docs] def plot_ATE(self, idata=None, method=None, prop_draws=100, ate_draws=300): if idata is None: idata = self.idata if method is None: method = self.weighting_scheme def plot_weights(bins, top0, top1, ax, color="population"): colors_dict = { "population": ["orange", "skyblue", 0.6], "pseudo_population": ["grey", "grey", 0.1], } ax.axhline(0, c="gray", linewidth=1) bars0 = ax.bar( bins[:-1] + 0.025, top0, width=0.04, facecolor=colors_dict[color][0], alpha=colors_dict[color][2], ) bars1 = ax.bar( bins[:-1] + 0.025, -top1, width=0.04, facecolor=colors_dict[color][1], alpha=colors_dict[color][2], ) for bars in (bars0, bars1): for bar in bars: bar.set_edgecolor("black") def make_hists(idata, i, axs, method=method): p_i = az.extract(idata)["p"][:, i].values if method == "raw": weight0 = 1 / (1 - p_i[self.t.flatten() == 0]) weight1 = 1 / (p_i[self.t.flatten() == 1]) elif method == "overlap": t = self.t.flatten() weight1 = (1 - p_i[t == 1]) * t[t == 1] weight0 = p_i[t == 0] * (1 - t[t == 0]) else: t = self.t.flatten() p_of_t = np.mean(t) weight1 = p_of_t / p_i[t == 1] weight0 = (1 - p_of_t) / (1 - p_i[t == 0]) bins = np.arange(0.025, 0.99, 0.005) top0, _ = np.histogram(p_i[self.t.flatten() == 0], bins=bins) top1, _ = np.histogram(p_i[self.t.flatten() == 1], bins=bins) plot_weights(bins, top0, top1, axs[0]) top0, _ = np.histogram( p_i[self.t.flatten() == 0], bins=bins, weights=weight0 ) top1, _ = np.histogram( p_i[self.t.flatten() == 1], bins=bins, weights=weight1 ) plot_weights(bins, top0, top1, axs[0], color="pseudo_population") mosaic = """AAAAAA BBBBCC""" fig, axs = plt.subplot_mosaic(mosaic, figsize=(20, 13)) axs = [axs[k] for k in axs.keys()] axs[0].axvline( 0.1, linestyle="--", label="Low Extreme Propensity Scores", color="black" ) axs[0].axvline( 0.9, linestyle="--", label="Hi Extreme Propensity Scores", color="black" ) axs[0].set_title( "Weighted and Unweighted Draws from the Posterior \n Propensity Scores Distribution", fontsize=20, ) axs[0].set_ylabel("Counts of Observations") axs[0].set_xlabel("Propensity Scores") custom_lines = [ Line2D([0], [0], color="skyblue", lw=2), Line2D([0], [0], color="orange", lw=2), Line2D([0], [0], color="grey", lw=2), Line2D([0], [0], color="black", lw=2, linestyle="--"), ] axs[0].legend( custom_lines, ["Treatment PS", "Control PS", "Weighted Pseudo Population", "Extreme PS"], ) [make_hists(idata, i, axs) for i in range(prop_draws)] ate_df = pd.DataFrame( [self.get_ate(i, idata, method=method) for i in range(ate_draws)], columns=["ATE", "Y(1)", "Y(0)"], ) axs[1].hist( ate_df["Y(1)"], label="E(Y(1))", ec="black", bins=10, alpha=0.6, color="skyblue", ) axs[1].hist( ate_df["Y(0)"], label="E(Y(0))", ec="black", bins=10, alpha=0.6, color="orange", ) axs[1].legend() axs[1].set_xlabel(self.outcome_variable) axs[1].set_title( f"The Outcomes \n Under the {method} re-weighting scheme", fontsize=20 ) axs[2].hist( ate_df["ATE"], label="ATE", ec="black", bins=10, color="grey", alpha=0.6, ) axs[2].set_xlabel("Difference") axs[2].axvline(ate_df["ATE"].mean(), label="E(ATE)") axs[2].legend() axs[2].set_title("Average Treatment Effect", fontsize=20) return fig
[docs] def weighted_percentile(self, data, weights, perc): """ perc : percentile in [0-1]! """ assert 0 <= perc <= 1 ix = np.argsort(data) data = data[ix] # sort data weights = weights[ix] # sort weights cdf = (np.cumsum(weights) - 0.5 * weights) / np.sum( weights ) # 'like' a CDF function return np.interp(perc, cdf, data)
[docs] def plot_balance_ecdf(self, covariate, idata=None, weighting_scheme=None): """ Plotting function takes a single covariate and shows the differences in the ECDF between the treatment and control groups before and after weighting. It provides a visual check on the balance achieved by using the different weighting schemes """ if idata is None: idata = self.idata if weighting_scheme is None: weighting_scheme = self.weighting_scheme ps = az.extract(idata)["p"].mean(dim="sample").values X = pd.DataFrame(self.X, columns=self.labels) X["ps"] = ps t = self.t.flatten() if weighting_scheme == "raw": w1 = 1 / ps[t == 1] w0 = 1 / (1 - ps[t == 0]) elif weighting_scheme == "robust": p_of_t = np.mean(t) w1 = p_of_t / (ps[t == 1]) w0 = (1 - p_of_t) / (1 - ps[t == 0]) else: w1 = (1 - ps[t == 1]) * t[t == 1] w0 = ps[t == 0] * (1 - t[t == 0]) fig, axs = plt.subplots(1, 2, figsize=(20, 6)) raw_trt = [ self.weighted_percentile( X[t == 1][covariate].values, np.ones(len(X[t == 1])), p ) for p in np.linspace(0, 1, 1000) ] raw_ntrt = [ self.weighted_percentile( X[t == 0][covariate].values, np.ones(len(X[t == 0])), p ) for p in np.linspace(0, 1, 1000) ] w_trt = [ self.weighted_percentile(X[t == 1][covariate].values, w1, p) for p in np.linspace(0, 1, 1000) ] w_ntrt = [ self.weighted_percentile(X[t == 0][covariate].values, w0, p) for p in np.linspace(0, 1, 1000) ] axs[0].plot( np.linspace(0, 1, 1000), raw_trt, color="skyblue", label="Raw Treated" ) axs[0].plot( np.linspace(0, 1, 1000), raw_ntrt, color="orange", label="Raw Control" ) axs[0].set_title(f"ECDF \n Raw: {covariate}") axs[1].set_title( f"ECDF \n Weighted {weighting_scheme} adjustment for {covariate}" ) axs[1].plot( np.linspace(0, 1, 1000), w_trt, color="skyblue", label="Reweighted Treated" ) axs[1].plot( np.linspace(0, 1, 1000), w_ntrt, color="orange", label="Reweighted Control", ) axs[1].set_xlabel("Quantiles") axs[0].set_xlabel("Quantiles") axs[1].legend() axs[0].legend() return fig