# Copyright 2024 The PyMC Labs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Media Mix Model with delayed adstock and logistic saturation class."""
import json
from pathlib import Path
from typing import Any
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd
import pymc as pm
import seaborn as sns
from pytensor.tensor import TensorVariable
from xarray import DataArray, Dataset
from pymc_marketing.constants import DAYS_IN_YEAR
from pymc_marketing.mmm.base import MMM
from pymc_marketing.mmm.lift_test import (
add_logistic_empirical_lift_measurements_to_likelihood,
scale_lift_measurements,
)
from pymc_marketing.mmm.preprocessing import MaxAbsScaleChannels, MaxAbsScaleTarget
from pymc_marketing.mmm.transformers import geometric_adstock, logistic_saturation
from pymc_marketing.mmm.tvp import create_time_varying_intercept, infer_time_index
from pymc_marketing.mmm.utils import (
apply_sklearn_transformer_across_dim,
create_new_spend_data,
generate_fourier_modes,
)
from pymc_marketing.mmm.validating import ValidateControlColumns
# Public API of this module: only the fully-mixed-in model class is exported.
__all__ = ["DelayedSaturatedMMM"]
[docs]
class BaseDelayedSaturatedMMM(MMM):
"""Base class for a media mix model with delayed adstock and logistic saturation class (see [1]_).
References
----------
.. [1] Jin, Yuxue, et al. “Bayesian methods for media mix modeling with carryover and shape effects.” (2017).
"""
_model_type = "DelayedSaturatedMMM"
version = "0.0.2"
[docs]
    def __init__(
        self,
        date_column: str,
        channel_columns: list[str],
        adstock_max_lag: int,
        time_varying_intercept: bool = False,
        model_config: dict | None = None,
        sampler_config: dict | None = None,
        validate_data: bool = True,
        control_columns: list[str] | None = None,
        yearly_seasonality: int | None = None,
        **kwargs,
    ) -> None:
        """Constructor method.

        Parameters
        ----------
        date_column : str
            Column name of the date variable.
        channel_columns : List[str]
            Column names of the media channel variables.
        adstock_max_lag : int
            Number of lags to consider in the adstock transformation.
        time_varying_intercept : bool, optional
            Whether to consider time-varying intercept, by default False.
        model_config : Dictionary, optional
            dictionary of parameters that initialise model configuration.
            Class-default defined by the user default_model_config method.
        sampler_config : Dictionary, optional
            dictionary of parameters that initialise sampler configuration.
            Class-default defined by the user default_sampler_config method.
        validate_data : bool, optional
            Whether to validate the data before fitting to model, by default True.
        control_columns : Optional[List[str]], optional
            Column names of control variables to be added as additional regressors, by default None
        yearly_seasonality : Optional[int], optional
            Number of Fourier modes to model yearly seasonality, by default None.
        **kwargs
            Additional keyword arguments. Not used by this constructor
            (they are accepted for signature compatibility with subclasses/mixins).
        """
        # Store configuration the base class does not take care of.
        self.control_columns = control_columns
        self.adstock_max_lag = adstock_max_lag
        self.time_varying_intercept = time_varying_intercept
        self.yearly_seasonality = yearly_seasonality
        self.date_column = date_column
        self.validate_data = validate_data
        # Delegate shared bookkeeping (channel columns, config merging) to MMM.
        super().__init__(
            date_column=date_column,
            channel_columns=channel_columns,
            model_config=model_config,
            sampler_config=sampler_config,
            adstock_max_lag=adstock_max_lag,
        )
@property
def default_sampler_config(self) -> dict:
return {}
@property
def output_var(self):
"""Defines target variable for the model"""
return "y"
    def _generate_and_preprocess_model_data(  # type: ignore
        self, X: pd.DataFrame | pd.Series, y: pd.Series | np.ndarray
    ) -> None:
        """Applies preprocessing to the data before fitting the model.

        If validate is True, it will check if the data is valid for the model.
        Sets self.model_coords based on provided dataset.

        Parameters
        ----------
        X : Union[pd.DataFrame, pd.Series], shape (n_obs, n_features)
        y : Union[pd.Series, np.ndarray], shape (n_obs,)

        Sets
        ----
        preprocessed_data : Dict[str, Union[pd.DataFrame, pd.Series]]
            Preprocessed data for the model.
        X : pd.DataFrame
            A filtered version of the input `X`, such that it is guaranteed that
            it contains only the `date_column`, the columns that are specified
            in the `channel_columns` and `control_columns`, and fourier features
            if `yearly_seasonality=True`.
        y : Union[pd.Series, np.ndarray]
            The target variable for the model (as provided).
        _time_index : np.ndarray
            The index of the date column. Used by TVP.
        _time_index_mid : int
            The middle index of the date index. Used by TVP.
        _time_resolution : int
            The time resolution of the date index. Used by TVP.
        """
        date_data = X[self.date_column]
        channel_data = X[self.channel_columns]
        # "date" is mutable so that out-of-sample prediction can swap in new dates.
        self.coords_mutable: dict[str, Any] = {
            "date": date_data,
        }
        coords: dict[str, Any] = {
            "channel": self.channel_columns,
        }
        # Rebuild X with only the columns the model actually uses, in a known order:
        # date first, then channels, then (optionally) controls and fourier features.
        new_X_dict = {
            self.date_column: date_data,
        }
        X_data = pd.DataFrame.from_dict(new_X_dict)
        X_data = pd.concat([X_data, channel_data], axis=1)
        control_data: pd.DataFrame | pd.Series | None = None
        if self.control_columns is not None:
            control_data = X[self.control_columns]
            coords["control"] = self.control_columns
            X_data = pd.concat([X_data, control_data], axis=1)
        fourier_features: pd.DataFrame | None = None
        if self.yearly_seasonality is not None:
            fourier_features = self._get_fourier_models_data(X=X)
            # Remember the generated column names so build_model/_data_setter can find them.
            self.fourier_columns = fourier_features.columns
            coords["fourier_mode"] = fourier_features.columns.to_numpy()
            X_data = pd.concat([X_data, fourier_features], axis=1)
        self.model_coords = coords
        if self.validate_data:
            self.validate("X", X_data)
            self.validate("y", y)
        # Preprocess (e.g. scaling mixins) AFTER validation, on the filtered data.
        self.preprocessed_data: dict[str, pd.DataFrame | pd.Series] = {
            "X": self.preprocess("X", X_data),  # type: ignore
            "y": self.preprocess("y", y),  # type: ignore
        }
        self.X: pd.DataFrame = X_data
        self.y: pd.Series | np.ndarray = y
        if self.time_varying_intercept:
            # Integer time index plus midpoint/resolution feed the HSGP-based
            # time-varying intercept (see create_time_varying_intercept).
            self._time_index = np.arange(0, X.shape[0])
            self._time_index_mid = X.shape[0] // 2
            # Assumes the date column is evenly spaced — TODO confirm upstream validation.
            self._time_resolution = (
                self.X[self.date_column].iloc[1] - self.X[self.date_column].iloc[0]
            ).days
def _save_input_params(self, idata) -> None:
"""Saves input parameters to the attrs of idata."""
idata.attrs["date_column"] = json.dumps(self.date_column)
idata.attrs["control_columns"] = json.dumps(self.control_columns)
idata.attrs["channel_columns"] = json.dumps(self.channel_columns)
idata.attrs["adstock_max_lag"] = json.dumps(self.adstock_max_lag)
idata.attrs["validate_data"] = json.dumps(self.validate_data)
idata.attrs["yearly_seasonality"] = json.dumps(self.yearly_seasonality)
    def _create_likelihood_distribution(
        self,
        dist: dict,
        mu: TensorVariable,
        observed: np.ndarray | pd.Series,
        dims: str,
    ) -> TensorVariable:
        """
        Create and return a likelihood distribution for the model.

        This method prepares the distribution and its parameters as specified in the
        configuration dictionary, validates them, and constructs the likelihood
        distribution using PyMC.

        Parameters
        ----------
        dist : Dict
            A configuration dictionary that must contain a 'dist' key with the name of
            the distribution and a 'kwargs' key with parameters for the distribution.
        mu : TensorVariable
            Expected value of the likelihood; reserved, so it must not appear in
            ``dist["kwargs"]``.
        observed : Union[np.ndarray, pd.Series]
            The observed data to which the likelihood distribution will be fitted.
        dims : str
            The dimensions of the data.

        Returns
        -------
        TensorVariable
            The likelihood distribution constructed with PyMC.

        Raises
        ------
        ValueError
            If the distribution is not in the allowed list, if 'kwargs' key is
            missing in `dist`, or the parameter configuration does not contain
            'dist' and 'kwargs' keys, or if 'mu' is present in the nested
            'kwargs'.
        """
        # Only location-scale style distributions that accept a `mu` parameter
        # are usable here, since `mu` is injected below.
        allowed_distributions = [
            "Normal",
            "StudentT",
            "Laplace",
            "Logistic",
            "LogNormal",
            "Wald",
            "TruncatedNormal",
            "Gamma",
            "AsymmetricLaplace",
            "VonMises",
        ]
        if dist["dist"] not in allowed_distributions:
            raise ValueError(
                f"""
                The distribution used for the likelihood is not allowed.
                Please, use one of the following distributions: {allowed_distributions}.
                """
            )
        # Validate that 'kwargs' is present and is a dictionary
        if "kwargs" not in dist or not isinstance(dist["kwargs"], dict):
            raise ValueError(
                "The 'kwargs' key must be present in the 'dist' dictionary and be a dictionary itself."
            )
        if "mu" in dist["kwargs"]:
            raise ValueError(
                "The 'mu' key is not allowed directly within 'kwargs' of the main distribution as it is reserved."
            )
        # Resolve each likelihood parameter: either a nested prior distribution
        # (a dict with 'dist'/'kwargs') or a fixed numeric value.
        parameter_distributions = {}
        for param, param_config in dist["kwargs"].items():
            # Check if param_config is a dictionary with a 'dist' key
            if isinstance(param_config, dict) and "dist" in param_config:
                # Prepare nested distribution
                if "kwargs" not in param_config:
                    raise ValueError(
                        f"The parameter configuration for '{param}' must contain 'kwargs'."
                    )
                parameter_distributions[param] = self._get_distribution(
                    dist=param_config
                )(**param_config["kwargs"], name=f"likelihood_{param}")
            elif isinstance(param_config, int | float):
                # Use the value directly
                parameter_distributions[param] = param_config
            else:
                raise ValueError(
                    f"""
                    Invalid parameter configuration for '{param}'.
                    It must be either a dictionary with a 'dist' key or a numeric value.
                    """
                )
        # Extract the likelihood distribution name and instantiate it
        likelihood_dist_name = dist["dist"]
        likelihood_dist = self._get_distribution(dist={"dist": likelihood_dist_name})
        return likelihood_dist(
            name=self.output_var,
            mu=mu,
            observed=observed,
            dims=dims,
            **parameter_distributions,
        )
[docs]
    def build_model(
        self,
        X: pd.DataFrame,
        y: pd.Series | np.ndarray,
        **kwargs,
    ) -> None:
        """
        Builds a probabilistic model using PyMC for marketing mix modeling.

        The model incorporates channels, control variables, and Fourier components,
        applying adstock and saturation transformations to the channel data. The
        final model is constructed with multiple factors contributing to the
        response variable.

        Parameters
        ----------
        X : pd.DataFrame
            The input data for the model, which should include columns for channels,
            control variables (if applicable), and Fourier components (if applicable).
        y : Union[pd.Series, np.ndarray]
            The target/response variable for the modeling.
        **kwargs : dict
            Additional keyword arguments that might be required by underlying methods
            or utilities.

        Attributes Set
        --------------
        model : pm.Model
            The PyMC model object containing all the defined stochastic and
            deterministic variables.

        Examples
        --------
        custom_config = {
            'intercept': {'dist': 'Normal', 'kwargs': {'mu': 0, 'sigma': 2}},
            'beta_channel': {'dist': 'LogNormal', 'kwargs': {'mu': 1, 'sigma': 3}},
            'alpha': {'dist': 'Beta', 'kwargs': {'alpha': 1, 'beta': 3}},
            'lam': {'dist': 'Gamma', 'kwargs': {'alpha': 3, 'beta': 1}},
            'likelihood': {'dist': 'Normal',
                           'kwargs': {'sigma': {'dist': 'HalfNormal', 'kwargs': {'sigma': 2}}}
                           },
            'gamma_control': {'dist': 'Normal', 'kwargs': {'mu': 0, 'sigma': 2}},
            'gamma_fourier': {'dist': 'Laplace', 'kwargs': {'mu': 0, 'b': 1}}
        }

        model = DelayedSaturatedMMM(
            date_column="date_week",
            channel_columns=["x1", "x2"],
            control_columns=[
                "event_1",
                "event_2",
                "t",
            ],
            adstock_max_lag=8,
            yearly_seasonality=2,
            model_config=custom_config,
        )
        """
        # Resolve prior distribution classes from the configuration up front,
        # before entering the model context.
        self.intercept_dist = self._get_distribution(
            dist=self.model_config["intercept"]
        )
        self.beta_channel_dist = self._get_distribution(
            dist=self.model_config["beta_channel"]
        )
        self.lam_dist = self._get_distribution(dist=self.model_config["lam"])
        self.alpha_dist = self._get_distribution(dist=self.model_config["alpha"])
        self.gamma_control_dist = self._get_distribution(
            dist=self.model_config["gamma_control"]
        )
        self.gamma_fourier_dist = self._get_distribution(
            dist=self.model_config["gamma_fourier"]
        )
        # Filters/scales the data and populates model_coords / coords_mutable.
        self._generate_and_preprocess_model_data(X, y)
        with pm.Model(
            coords=self.model_coords,
            coords_mutable=self.coords_mutable,
        ) as self.model:
            # Mutable containers so new data can be swapped in for prediction.
            channel_data_ = pm.MutableData(
                name="channel_data",
                value=self.preprocessed_data["X"][self.channel_columns],
                dims=("date", "channel"),
            )
            target_ = pm.MutableData(
                name="target",
                value=self.preprocessed_data["y"],
                dims="date",
            )
            if self.time_varying_intercept:
                time_index = pm.Data(
                    "time_index",
                    self._time_index,
                    dims="date",
                )
                intercept = create_time_varying_intercept(
                    time_index,
                    self._time_index_mid,
                    self._time_resolution,
                    self.intercept_dist,
                    self.model_config,
                )
            else:
                intercept = self.intercept_dist(
                    name="intercept", **self.model_config["intercept"]["kwargs"]
                )
            # Per-channel effect size, adstock decay (alpha) and saturation (lam) priors.
            beta_channel = self.beta_channel_dist(
                name="beta_channel",
                **self.model_config["beta_channel"]["kwargs"],
                dims=("channel",),
            )
            alpha = self.alpha_dist(
                name="alpha",
                dims="channel",
                **self.model_config["alpha"]["kwargs"],
            )
            lam = self.lam_dist(
                name="lam",
                dims="channel",
                **self.model_config["lam"]["kwargs"],
            )
            # Transformation pipeline: adstock (carry-over) then logistic saturation,
            # then a linear effect per channel.
            channel_adstock = pm.Deterministic(
                name="channel_adstock",
                var=geometric_adstock(
                    x=channel_data_,
                    alpha=alpha,
                    l_max=self.adstock_max_lag,
                    normalize=True,
                    axis=0,
                ),
                dims=("date", "channel"),
            )
            channel_adstock_saturated = pm.Deterministic(
                name="channel_adstock_saturated",
                var=logistic_saturation(x=channel_adstock, lam=lam),
                dims=("date", "channel"),
            )
            channel_contributions_var = channel_adstock_saturated * beta_channel
            channel_contributions = pm.Deterministic(
                name="channel_contributions",
                var=channel_contributions_var,
                dims=("date", "channel"),
            )
            mu_var = intercept + channel_contributions.sum(axis=-1)
            # Optional additive control-variable contribution.
            if (
                self.control_columns is not None
                and len(self.control_columns) > 0
                and all(
                    column in self.preprocessed_data["X"].columns
                    for column in self.control_columns
                )
            ):
                gamma_control = self.gamma_control_dist(
                    name="gamma_control",
                    dims="control",
                    **self.model_config["gamma_control"]["kwargs"],
                )
                control_data_ = pm.MutableData(
                    name="control_data",
                    value=self.preprocessed_data["X"][self.control_columns],
                    dims=("date", "control"),
                )
                control_contributions = pm.Deterministic(
                    name="control_contributions",
                    var=control_data_ * gamma_control,
                    dims=("date", "control"),
                )
                mu_var += control_contributions.sum(axis=-1)
            # Optional additive yearly-seasonality (Fourier-mode) contribution.
            if (
                hasattr(self, "fourier_columns")
                and self.fourier_columns is not None
                and len(self.fourier_columns) > 0
                and all(
                    column in self.preprocessed_data["X"].columns
                    for column in self.fourier_columns
                )
            ):
                fourier_data_ = pm.MutableData(
                    name="fourier_data",
                    value=self.preprocessed_data["X"][self.fourier_columns],
                    dims=("date", "fourier_mode"),
                )
                gamma_fourier = self.gamma_fourier_dist(
                    name="gamma_fourier",
                    dims="fourier_mode",
                    **self.model_config["gamma_fourier"]["kwargs"],
                )
                fourier_contribution = pm.Deterministic(
                    name="fourier_contributions",
                    var=fourier_data_ * gamma_fourier,
                    dims=("date", "fourier_mode"),
                )
                mu_var += fourier_contribution.sum(axis=-1)
            mu = pm.Deterministic(name="mu", var=mu_var, dims="date")
            self._create_likelihood_distribution(
                dist=self.model_config["likelihood"],
                mu=mu,
                observed=target_,
                dims="date",
            )
@property
def default_model_config(self) -> dict:
return {
"intercept": {
"dist": "Normal",
"kwargs": {"mu": 0, "sigma": 2},
},
"beta_channel": {"dist": "HalfNormal", "kwargs": {"sigma": 2}},
"alpha": {"dist": "Beta", "kwargs": {"alpha": 1, "beta": 3}},
"lam": {"dist": "Gamma", "kwargs": {"alpha": 3, "beta": 1}},
"likelihood": {
"dist": "Normal",
"kwargs": {
"sigma": {"dist": "HalfNormal", "kwargs": {"sigma": 2}},
},
},
"gamma_control": {"dist": "Normal", "kwargs": {"mu": 0, "sigma": 2}},
"gamma_fourier": {"dist": "Laplace", "kwargs": {"mu": 0, "b": 1}},
"intercept_tvp_kwargs": {
"m": 200,
"L": None,
"eta_lam": 1,
"ls_mu": None,
"ls_sigma": 10,
"cov_func": None,
},
}
def _get_fourier_models_data(self, X) -> pd.DataFrame:
"""Generates fourier modes to model seasonality.
References
----------
https://www.pymc.io/projects/examples/en/latest/time_series/Air_passengers-Prophet_with_Bayesian_workflow.html
"""
if self.yearly_seasonality is None:
raise ValueError("yearly_seasonality must be specified.")
date_data: pd.Series = pd.to_datetime(
arg=X[self.date_column], format="%Y-%m-%d"
)
periods: npt.NDArray[np.float_] = (
date_data.dt.dayofyear.to_numpy() / DAYS_IN_YEAR
)
return generate_fourier_modes(
periods=periods,
n_order=self.yearly_seasonality,
)
[docs]
def channel_contributions_forward_pass(
self, channel_data: npt.NDArray[np.float_]
) -> npt.NDArray[np.float_]:
"""Evaluate the channel contribution for a given channel data and a fitted model, ie. the forward pass.
Parameters
----------
channel_data : array-like
Input channel data. Result of all the preprocessing steps.
Returns
-------
array-like
Transformed channel data.
"""
alpha_posterior = self.fit_result["alpha"].to_numpy()
lam_posterior = self.fit_result["lam"].to_numpy()
lam_posterior_expanded = np.expand_dims(a=lam_posterior, axis=2)
beta_channel_posterior = self.fit_result["beta_channel"].to_numpy()
beta_channel_posterior_expanded = np.expand_dims(
a=beta_channel_posterior, axis=2
)
geometric_adstock_posterior = geometric_adstock(
x=channel_data,
alpha=alpha_posterior,
l_max=self.adstock_max_lag,
normalize=True,
axis=0,
)
logistic_saturation_posterior = logistic_saturation(
x=geometric_adstock_posterior,
lam=lam_posterior_expanded,
)
channel_contribution_forward_pass = (
beta_channel_posterior_expanded * logistic_saturation_posterior
)
return channel_contribution_forward_pass.eval()
@property
def _serializable_model_config(self) -> dict[str, Any]:
def ndarray_to_list(d: dict) -> dict:
new_d = d.copy() # Copy the dictionary to avoid mutating the original one
for key, value in new_d.items():
if isinstance(value, np.ndarray):
new_d[key] = value.tolist()
elif isinstance(value, dict):
new_d[key] = ndarray_to_list(value)
return new_d
serializable_config = self.model_config.copy()
return ndarray_to_list(serializable_config)
[docs]
    @classmethod
    def load(cls, fname: str):
        """
        Creates a DelayedSaturatedMMM instance from a file,
        instantiating the model with the saved original input parameters.
        Loads inference data for the model.

        Parameters
        ----------
        fname : string
            This denotes the name with path from where idata should be loaded from.

        Returns
        -------
        Returns an instance of DelayedSaturatedMMM.

        Raises
        ------
        ValueError
            If the inference data that is loaded doesn't match with the model.
        """
        filepath = Path(str(fname))
        idata = az.from_netcdf(filepath)
        # Undo the JSON round-trip (lists -> tuples / numpy arrays) so the
        # reconstructed config hashes to the same model id as the original.
        model_config = cls._model_config_formatting(
            json.loads(idata.attrs["model_config"])
        )
        model = cls(
            date_column=json.loads(idata.attrs["date_column"]),
            control_columns=json.loads(idata.attrs["control_columns"]),
            channel_columns=json.loads(idata.attrs["channel_columns"]),
            adstock_max_lag=json.loads(idata.attrs["adstock_max_lag"]),
            validate_data=json.loads(idata.attrs["validate_data"]),
            yearly_seasonality=json.loads(idata.attrs["yearly_seasonality"]),
            model_config=model_config,
            sampler_config=json.loads(idata.attrs["sampler_config"]),
        )
        model.idata = idata
        # Rebuild the PyMC graph from the training data stored in the idata.
        dataset = idata.fit_data.to_dataframe()
        X = dataset.drop(columns=[model.output_var])
        y = dataset[model.output_var].values
        model.build_model(X, y)
        # All previously used data is in idata.
        # Guard against loading inference data produced by a different model/config.
        if model.id != idata.attrs["id"]:
            error_msg = f"""The file '{fname}' does not contain an inference data of the same model
            or configuration as '{cls._model_type}'"""
            raise ValueError(error_msg)
        return model
    def _data_setter(
        self,
        X: np.ndarray | pd.DataFrame,
        y: np.ndarray | pd.Series | None = None,
    ) -> None:
        """
        Sets new data in the model.

        This function accepts data in various formats and sets them into the
        model using the PyMC's `set_data` method. The data corresponds to the
        channel data and the target.

        Parameters
        ----------
        X : Union[np.ndarray, pd.DataFrame]
            Data for the channel. It can be a numpy array or pandas DataFrame.
            If it's a DataFrame, the columns corresponding to self.channel_columns
            are used. If it's an ndarray, it's used directly.
        y : Union[np.ndarray, pd.Series], optional
            Target data. It can be a numpy array or a pandas Series.
            If it's a Series, its values are used. If it's an ndarray, it's used
            directly. The default is None.

        Raises
        ------
        RuntimeError
            If the data for the channel is not provided in `X`.
        TypeError
            If `X` is not a pandas DataFrame or a numpy array, or
            if `y` is not a pandas Series or a numpy array and is not None.

        Returns
        -------
        None
        """
        # NOTE: despite the annotation, only DataFrames are currently supported,
        # since column access by name is required below.
        if not isinstance(X, pd.DataFrame):
            msg = "X must be a pandas DataFrame in order to access the columns"
            raise TypeError(msg)
        new_channel_data: np.ndarray | None = None
        # New "date" coordinate values accompany the new data (mutable coord).
        coords = {"date": X[self.date_column].to_numpy()}
        try:
            new_channel_data = X[self.channel_columns].to_numpy()
        except KeyError as e:
            raise RuntimeError("New data must contain channel_data!") from e

        def identity(x):
            return x

        # Apply the same scaling used at fit time, if a transformer exists.
        channel_transformation = (
            identity
            if not hasattr(self, "channel_transformer")
            else self.channel_transformer.transform
        )
        data: dict[str, np.ndarray | Any] = {
            "channel_data": channel_transformation(new_channel_data)
        }
        if self.control_columns is not None:
            control_data = X[self.control_columns].to_numpy()
            control_transformation = (
                identity
                if not hasattr(self, "control_transformer")
                else self.control_transformer.transform
            )
            data["control_data"] = control_transformation(control_data)
        if hasattr(self, "fourier_columns"):
            # Fourier features are recomputed from the new dates rather than scaled.
            data["fourier_data"] = self._get_fourier_models_data(X)
        if self.time_varying_intercept:
            data["time_index"] = infer_time_index(
                X[self.date_column], self.X[self.date_column], self._time_resolution
            )
        if y is not None:
            if isinstance(y, pd.Series):
                data["target"] = (
                    y.to_numpy()
                )  # convert Series to numpy array explicitly
            elif isinstance(y, np.ndarray):
                data["target"] = y
            else:
                raise TypeError("y must be either a pandas Series or a numpy array")
        else:
            # No target supplied: fill with zeros of the right length/dtype so the
            # shared variable keeps a shape consistent with the new dates.
            dtype = self.preprocessed_data["y"].dtype  # type: ignore
            data["target"] = np.zeros(X.shape[0], dtype=dtype)  # type: ignore
        with self.model:
            pm.set_data(data, coords=coords)
@classmethod
def _model_config_formatting(cls, model_config: dict) -> dict:
"""
Because of json serialization, model_config values that were originally tuples
or numpy are being encoded as lists. This function converts them back to tuples
and numpy arrays to ensure correct id encoding.
"""
def format_nested_dict(d: dict) -> dict:
for key, value in d.items():
if isinstance(value, dict):
d[key] = format_nested_dict(value)
elif isinstance(value, list):
# Check if the key is "dims" to convert it to tuple
if key == "dims":
d[key] = tuple(value)
# Convert all other lists to numpy arrays
else:
d[key] = np.array(value)
return d
return format_nested_dict(model_config.copy())
[docs]
class DelayedSaturatedMMM(
MaxAbsScaleTarget,
MaxAbsScaleChannels,
ValidateControlColumns,
BaseDelayedSaturatedMMM,
):
"""Media Mix Model with delayed adstock and logistic saturation class (see [1]_).
Given a time series target variable :math:`y_{t}` (e.g. sales or conversions), media variables
:math:`x_{m, t}` (e.g. impressions, clicks or costs) and a set of control covariates :math:`z_{c, t}` (e.g. holidays, special events)
we consider a Bayesian linear model of the form:
.. math::
y_{t} = \\alpha + \\sum_{m=1}^{M}\\beta_{m}f(x_{m, t}) + \\sum_{c=1}^{C}\\gamma_{c}z_{c, t} + \\varepsilon_{t},
where :math:`\\alpha` is the intercept, :math:`f` is a media transformation function and :math:`\\varepsilon_{t}` is the error term
which we assume is normally distributed. The function :math:`f` encodes the contribution of media on the target variable.
Typically we consider two types of transformation: adstock (carry-over) and saturation effects.
Notes
-----
Here are some important notes about the model:
1. Before fitting the model, we scale the target variable and the media channels using the maximum absolute value of each variable.
This enables us to have a more stable model and better convergence. If control variables are present, we do not scale them!
If needed please do it before passing the data to the model.
2. We allow to add yearly seasonality controls as Fourier modes.
You can use the `yearly_seasonality` parameter to specify the number of Fourier modes to include.
3. This class also allows us to calibrate the model using:
* Custom priors for the parameters via the `model_config` parameter. You can also set the likelihood distribution.
* Adding lift tests to the likelihood function via the :meth:`add_lift_test_measurements <pymc_marketing.mmm.delayed_saturated_mmm.DelayedSaturatedMMM.add_lift_test_measurements>` method.
For details on a vanilla implementation in PyMC, see [2]_.
Examples
--------
Here is an example of how to instantiate the model with the default configuration:
.. code-block:: python
import numpy as np
import pandas as pd
from pymc_marketing.mmm import DelayedSaturatedMMM
data_url = "https://raw.githubusercontent.com/pymc-labs/pymc-marketing/main/data/mmm_example.csv"
data = pd.read_csv(data_url, parse_dates=["date_week"])
mmm = DelayedSaturatedMMM(
date_column="date_week",
channel_columns=["x1", "x2"],
control_columns=[
"event_1",
"event_2",
"t",
],
adstock_max_lag=8,
yearly_seasonality=2,
)
Now we can fit the model with the data:
.. code-block:: python
# Set features and target
X = data.drop("y", axis=1)
y = data["y"]
# Fit the model
idata = mmm.fit(X, y)
We can also define custom priors for the model:
.. code-block:: python
my_model_config = {
"beta_channel": {
"dist": "LogNormal",
"kwargs": {"mu": np.array([2, 1]), "sigma": 1},
},
"likelihood": {
"dist": "Normal",
"kwargs": {"sigma": {"dist": "HalfNormal", "kwargs": {"sigma": 2}}},
},
}
mmm = DelayedSaturatedMMM(
model_config=my_model_config,
date_column="date_week",
channel_columns=["x1", "x2"],
control_columns=[
"event_1",
"event_2",
"t",
],
adstock_max_lag=8,
yearly_seasonality=2,
)
As you can see, we can configure all prior and likelihood distributions via the `model_config`.
The `fit` method accepts keyword arguments that are passed to the PyMC sampling method.
For example, to change the number of samples and chains, and using a JAX implementation of NUTS we can do:
.. code-block:: python
sampler_kwargs = {
"draws": 2_000,
"target_accept": 0.9,
"chains": 5,
"random_seed": 42,
}
idata = mmm.fit(X, y, nuts_sampler="numpyro", **sampler_kwargs)
References
----------
.. [1] Jin, Yuxue, et al. “Bayesian methods for media mix modeling with carryover and shape effects.” (2017).
.. [2] Orduz, J. `"Media Effect Estimation with PyMC: Adstock, Saturation & Diminishing Returns" <https://juanitorduz.github.io/pymc_mmm/>`_.
""" # noqa: E501
[docs]
def channel_contributions_forward_pass(
self, channel_data: npt.NDArray[np.float_]
) -> npt.NDArray[np.float_]:
"""Evaluate the channel contribution for a given channel data and a fitted model, ie. the forward pass.
We return the contribution in the original scale of the target variable.
Parameters
----------
channel_data : array-like
Input channel data. Result of all the preprocessing steps.
Returns
-------
array-like
Transformed channel data.
"""
channel_contribution_forward_pass = super().channel_contributions_forward_pass(
channel_data=channel_data
)
target_transformed_vectorized = np.vectorize(
self.target_transformer.inverse_transform,
excluded=[1, 2],
signature="(m, n) -> (m, n)",
)
return target_transformed_vectorized(channel_contribution_forward_pass)
[docs]
def get_channel_contributions_forward_pass_grid(
self, start: float, stop: float, num: int
) -> DataArray:
"""Generate a grid of scaled channel contributions for a given grid of shared values.
Parameters
----------
start : float
Start of the grid. It must be equal or greater than 0.
stop : float
End of the grid. It must be greater than start.
num : int
Number of points in the grid.
Returns
-------
DataArray
Grid of channel contributions.
"""
if start < 0:
raise ValueError("start must be greater than or equal to 0.")
share_grid = np.linspace(start=start, stop=stop, num=num)
channel_contributions = []
for delta in share_grid:
channel_data = (
delta * self.preprocessed_data["X"][self.channel_columns].to_numpy()
)
channel_contribution_forward_pass = self.channel_contributions_forward_pass(
channel_data=channel_data
)
channel_contributions.append(channel_contribution_forward_pass)
return DataArray(
data=np.array(channel_contributions),
dims=("delta", "chain", "draw", "date", "channel"),
coords={
"delta": share_grid,
"date": self.X[self.date_column],
"channel": self.channel_columns,
},
)
[docs]
    def plot_channel_contributions_grid(
        self,
        start: float,
        stop: float,
        num: int,
        absolute_xrange: bool = False,
        **plt_kwargs: Any,
    ) -> plt.Figure:
        """Plots a grid of scaled channel contributions for a given grid of share values.

        Parameters
        ----------
        start : float
            Start of the grid. It must be equal or greater than 0.
        stop : float
            End of the grid. It must be greater than start.
        num : int
            Number of points in the grid.
        absolute_xrange : bool, optional
            If True, the x-axis is in absolute values (input units), otherwise it is in
            relative percentage values, by default False.
        **plt_kwargs : Any
            Keyword arguments forwarded to ``plt.subplots``.

        Returns
        -------
        plt.Figure
            Plot of grid of channel contributions.
        """
        share_grid = np.linspace(start=start, stop=stop, num=num)
        contributions = self.get_channel_contributions_forward_pass_grid(
            start=start, stop=stop, num=num
        )
        fig, ax = plt.subplots(**plt_kwargs)
        for i, channel in enumerate(self.channel_columns):
            # Total contribution over the whole date range, per posterior sample.
            channel_contribution_total = contributions.sel(channel=channel).sum(
                dim="date"
            )
            # HDI band of the total contribution across posterior samples
            # (az.hdi's default credible-interval width).
            hdi_contribution = az.hdi(ary=channel_contribution_total).x
            total_channel_input = self.X[channel].sum()
            # x-axis either in input units (delta * observed total) or raw delta.
            x_range = (
                total_channel_input * share_grid if absolute_xrange else share_grid
            )
            ax.fill_between(
                x=x_range,
                y1=hdi_contribution[:, 0],
                y2=hdi_contribution[:, 1],
                color=f"C{i}",
                label=f"{channel} $94\%$ HDI contribution",  # noqa: W605
                alpha=0.4,
            )
            sns.lineplot(
                x=x_range,
                y=channel_contribution_total.mean(dim=("chain", "draw")),
                color=f"C{i}",
                marker="o",
                label=f"{channel} contribution mean",
                ax=ax,
            )
            if absolute_xrange:
                # Mark the channel's current observed total spend for reference.
                ax.axvline(
                    x=total_channel_input,
                    color=f"C{i}",
                    linestyle="--",
                    label=f"{channel} current total input",
                )
        if not absolute_xrange:
            # delta = 1 corresponds to the observed spend level.
            ax.axvline(x=1, color="black", linestyle="--", label=r"$\delta = 1$")
        ax.legend(loc="center left", bbox_to_anchor=(1, 0.5))
        x_label = "input" if absolute_xrange else r"$\delta$"
        ax.set(
            title="Channel contribution as a function of cost share",
            xlabel=x_label,
            ylabel="contribution",
        )
        return fig
[docs]
def new_spend_contributions(
    self,
    spend: np.ndarray | None = None,
    one_time: bool = True,
    spend_leading_up: np.ndarray | None = None,
    prior: bool = False,
    original_scale: bool = True,
    **sample_posterior_predictive_kwargs,
) -> DataArray:
    """Return the upcoming contributions for a given spend.

    The spend can be one time or constant over the period. The spend leading up to the
    period can also be specified in order account for the lagged effect of the spend.

    Parameters
    ----------
    spend : np.ndarray, optional
        Array of spend for each channel. If None, the average spend for each channel is used, by default None.
    one_time : bool, optional
        Whether the spends for each channel are only at the start of the period.
        If True, all spends after the initial spend are zero.
        If False, all spends after the initial spend are the same as the initial spend.
        By default True.
    spend_leading_up : np.array, optional
        Array of spend for each channel leading up to the spend, by default None or 0 for each channel.
        Use this parameter to account for the lagged effect of the spend.
    prior : bool, optional
        Whether to use the prior or posterior, by default False (posterior)
    original_scale : bool, optional
        Whether to return the contributions in the original scale of the target
        variable, by default True.
    **sample_posterior_predictive_kwargs
        Additional keyword arguments passed to pm.sample_posterior_predictive

    Returns
    -------
    DataArray
        Upcoming contributions for each channel, indexed by
        ``time_since_spend`` and ``channel``.

    Examples
    --------
    Channel contributions from 1 unit on each channel only once.

    .. code-block:: python

        n_channels = len(model.channel_columns)
        spend = np.ones(n_channels)
        new_spend_contributions = model.new_spend_contributions(spend=spend)

    Channel contributions from continuously spending 1 unit on each channel.

    .. code-block:: python

        n_channels = len(model.channel_columns)
        spend = np.ones(n_channels)
        new_spend_contributions = model.new_spend_contributions(spend=spend, one_time=False)

    Channel contributions from 1 unit on each channel only once but with 1 unit leading up to the spend.

    .. code-block:: python

        n_channels = len(model.channel_columns)
        spend = np.ones(n_channels)
        spend_leading_up = np.ones(n_channels)
        new_spend_contributions = model.new_spend_contributions(spend=spend, spend_leading_up=spend_leading_up)
    """
    # Default to the historical average spend per channel.
    if spend is None:
        spend = self.X.loc[:, self.channel_columns].mean().to_numpy()  # type: ignore

    n_channels = len(self.channel_columns)
    if len(spend) != n_channels:
        raise ValueError("spend must be the same length as the number of channels")

    # Build a synthetic spend series spanning adstock_max_lag periods before
    # and after the spend event.
    new_data = create_new_spend_data(
        spend=spend,
        adstock_max_lag=self.adstock_max_lag,
        one_time=one_time,
        spend_leading_up=spend_leading_up,
    )

    # NOTE(review): the channel scaling is skipped when prior=True — confirm
    # this asymmetry between prior and posterior paths is intentional.
    new_data = (
        self.channel_transformer.transform(new_data) if not prior else new_data
    )

    idata: Dataset = self.fit_result if not prior else self.prior

    coords = {
        "time_since_spend": np.arange(
            -self.adstock_max_lag, self.adstock_max_lag + 1
        ),
        "channel": self.channel_columns,
    }
    # Rebuild only the media portion of the model. The variable names (alpha,
    # lam, beta_channel) match the fitted model, so sample_posterior_predictive
    # can reuse the existing trace for them.
    with pm.Model(coords=coords):
        alpha = pm.Uniform("alpha", lower=0, upper=1, dims=("channel",))
        lam = pm.HalfFlat("lam", dims=("channel",))
        beta_channel = pm.HalfFlat("beta_channel", dims=("channel",))

        # Same as the forward pass of the model
        channel_adstock = geometric_adstock(
            x=new_data,
            alpha=alpha,
            l_max=self.adstock_max_lag,
            normalize=True,
            axis=0,
        )
        channel_adstock_saturated = logistic_saturation(x=channel_adstock, lam=lam)
        pm.Deterministic(
            name="channel_contributions",
            var=channel_adstock_saturated * beta_channel,
            dims=("time_since_spend", "channel"),
        )

        samples = pm.sample_posterior_predictive(
            idata,
            var_names=["channel_contributions"],
            **sample_posterior_predictive_kwargs,
        )

    channel_contributions = samples.posterior_predictive["channel_contributions"]

    if original_scale:
        # Map the contributions back to the original scale of the target.
        channel_contributions = apply_sklearn_transformer_across_dim(
            data=channel_contributions,
            func=self.get_target_transformer().inverse_transform,
            dim_name="time_since_spend",
            combined=False,
        )

    return channel_contributions
[docs]
def plot_new_spend_contributions(
    self,
    spend_amount: float,
    one_time: bool = True,
    lower: float = 0.025,
    upper: float = 0.975,
    ylabel: str = "Sales",
    idx: slice | None = None,
    channels: list[str] | None = None,
    prior: bool = False,
    original_scale: bool = True,
    ax: plt.Axes | None = None,
    **sample_posterior_predictive_kwargs,
) -> plt.Axes:
    """Plot the upcoming sales for a given spend amount.

    Calls the new_spend_contributions method and plots the results. For more
    control over the plot, use new_spend_contributions directly.

    Parameters
    ----------
    spend_amount : float
        The amount of spend for each channel
    one_time : bool, optional
        Whether the spend are one time (at start of period) or constant (over period), by default True (one time)
    lower : float, optional
        The lower quantile for the confidence interval, by default 0.025
    upper : float, optional
        The upper quantile for the confidence interval, by default 0.975
    ylabel : str, optional
        The label for the y-axis, by default "Sales"
    idx : slice, optional
        The index slice of days to plot, by default None or only the positive days.
        More specifically, slice(0, None, None)
    channels : List[str], optional
        The channels to plot, by default None or all channels
    prior : bool, optional
        Whether to use the prior or posterior, by default False (posterior)
    original_scale : bool, optional
        Whether to plot in the original scale of the target variable, by default True
    ax : plt.Axes, optional
        The axes to plot on, by default None or current axes
    **sample_posterior_predictive_kwargs
        Additional keyword arguments passed to pm.sample_posterior_predictive

    Returns
    -------
    plt.Axes
        The plot of upcoming sales for the given spend amount
    """
    # Guard clauses: quantiles must be valid and correctly ordered.
    if not (0 <= lower <= 1) or not (0 <= upper <= 1):
        raise ValueError("lower and upper must be between 0 and 1")
    if lower > upper:
        raise ValueError("lower must be less than or equal to upper")

    if ax is None:
        ax = plt.gca()

    n_channels = len(self.channel_columns)
    uniform_spend = np.ones(n_channels) * spend_amount
    contributions = self.new_spend_contributions(
        uniform_spend,
        one_time=one_time,
        spend_leading_up=uniform_spend,
        prior=prior,
        original_scale=original_scale,
        **sample_posterior_predictive_kwargs,
    )

    grouped = contributions.to_series().groupby(
        level=["time_since_spend", "channel"]
    )

    if idx is None:
        # Default: only the periods at and after the spend event.
        idx = pd.IndexSlice[0:]

    ci_bounds = (
        grouped.quantile([lower, upper]).unstack("channel").unstack().loc[idx]
    )

    if not channels:
        channels = self.channel_columns  # type: ignore
    for channel in channels:  # type: ignore
        ax.fill_between(
            ci_bounds.index,
            ci_bounds[channel][lower],
            ci_bounds[channel][upper],
            label=f"{channel} {100 * (upper - lower):.0f}% CI",
            alpha=0.5,
        )

    mean_contributions = grouped.mean().unstack("channel").loc[idx, channels]
    # Match the mean-line colors to the CI fill colors (default cycle order).
    palette = [f"C{i}" for i in range(len(channels))]  # type: ignore
    mean_contributions.add_suffix(" mean").plot(ax=ax, color=palette, alpha=0.75)

    ax.legend().set_title("Channel")
    ax.set(
        xlabel="Time since spend",
        ylabel=ylabel,
        title=f"Upcoming sales for {spend_amount:.02f} spend",
    )
    return ax
def _validate_data(self, X, y=None):
    """Override of the base validation hook: performs no checks and returns ``X`` unchanged."""
    return X
[docs]
def sample_posterior_predictive(
    self,
    X_pred,
    extend_idata: bool = True,
    combined: bool = True,
    include_last_observations: bool = False,
    original_scale: bool = True,
    **sample_posterior_predictive_kwargs,
):
    """
    Sample from the model's posterior predictive distribution.

    Parameters
    ----------
    X_pred : array, shape (n_pred, n_features)
        The input data used for prediction.
    extend_idata : Boolean determining whether the predictions should be added to inference data object.
        Defaults to True.
    combined: Combine chain and draw dims into sample. Won't work if a dim named sample already exists.
        Defaults to True.
    include_last_observations: Boolean determining whether to include the last observations of the training
        data in order to carry over costs with the adstock transformation.
        Assumes that X_pred are the next predictions following the training data.
        Defaults to False.
    original_scale: Boolean determining whether to return the predictions in the original scale
        of the target variable. Defaults to True.
    **sample_posterior_predictive_kwargs: Additional arguments to pass to pymc.sample_posterior_predictive

    Returns
    -------
    posterior_predictive_samples : DataArray, shape (n_pred, samples)
        Posterior predictive samples for each input X_pred
    """
    if include_last_observations:
        # Prepend the last adstock_max_lag training rows so the adstock
        # carryover from past spend is reflected in the first predictions.
        X_pred = pd.concat(
            [self.X.iloc[-self.adstock_max_lag :, :], X_pred], axis=0
        ).sort_values(by=self.date_column)

    self._data_setter(X_pred)

    with self.model:  # sample with new input data
        post_pred = pm.sample_posterior_predictive(
            self.idata, **sample_posterior_predictive_kwargs
        )
        if extend_idata:
            # join="right" keeps the new predictions when dims overlap.
            self.idata.extend(post_pred, join="right")  # type: ignore

    posterior_predictive_samples = az.extract(
        post_pred, "posterior_predictive", combined=combined
    )

    if include_last_observations:
        # Drop the warm-up rows that were prepended above.
        posterior_predictive_samples = posterior_predictive_samples.isel(
            date=slice(self.adstock_max_lag, None)
        )

    if original_scale:
        # Map predictions back to the original scale of the target variable.
        posterior_predictive_samples = apply_sklearn_transformer_across_dim(
            data=posterior_predictive_samples,
            func=self.get_target_transformer().inverse_transform,
            dim_name="date",
            combined=combined,
        )

    return posterior_predictive_samples
[docs]
def add_lift_test_measurements(
    self,
    df_lift_test: pd.DataFrame,
    dist: pm.Distribution = pm.Gamma,
    name: str = "lift_measurements",
) -> None:
    """Add lift tests to the model.

    The model difference of a channel's saturation curve is created
    from `x` and `x + delta_x` for each channel. This random variable is
    then conditioned using the empirical lift, `delta_y`, and `sigma` of the lift test
    with the specified distribution `dist`.

    The pseudo-code for the lift test is as follows:

    .. code-block:: python

        model_estimated_lift = (
            saturation_curve(x + delta_x)
            - saturation_curve(x)
        )
        empirical_lift = delta_y
        dist(abs(model_estimated_lift), sigma=sigma, observed=abs(empirical_lift))

    The model has to be built before adding the lift tests.

    Parameters
    ----------
    df_lift_test : pd.DataFrame
        DataFrame with lift test results with at least the following columns:
            * `channel`: channel name. Must be present in `channel_columns`.
            * `x`: x axis value of the lift test.
            * `delta_x`: change in x axis value of the lift test.
            * `delta_y`: change in y axis value of the lift test.
            * `sigma`: standard deviation of the lift test.
    dist : pm.Distribution, optional
        The distribution to use for the likelihood, by default pm.Gamma
    name : str, optional
        The name of the likelihood of the lift test contribution(s),
        by default "lift_measurements". Name change required if calling
        this method multiple times.

    Raises
    ------
    RuntimeError
        If the model has not been built yet.
    KeyError
        If the 'channel' column is not present in df_lift_test.

    Examples
    --------
    Build the model first then add lift test measurements.

    .. code-block:: python

        model = DelayedSaturatedMMM(
            date_column="date_week",
            channel_columns=["x1", "x2"],
            control_columns=[
                "event_1",
                "event_2",
            ],
            adstock_max_lag=8,
            yearly_seasonality=2,
        )
        X: pd.DataFrame = ...
        y: np.ndarray = ...
        model.build_model(X, y)
        df_lift_test = pd.DataFrame({
            "channel": ["x1", "x1"],
            "x": [1, 1],
            "delta_x": [0.1, 0.2],
            "delta_y": [0.1, 0.1],
            "sigma": [0.1, 0.1],
        })
        model.add_lift_test_measurements(df_lift_test)
    """
    if self.model is None:
        raise RuntimeError(
            "The model has not been built yet. Please, build the model first."
        )

    if "channel" not in df_lift_test.columns:
        raise KeyError(
            "The 'channel' column is required to map the lift measurements to the model."
        )

    # Scale the lift measurements into model space with the same transformers
    # used on the training data. get_target_transformer() is used (instead of
    # accessing target_transformer directly) for consistency with
    # sample_posterior_predictive and new_spend_contributions, and so this
    # also works when no target scaling was configured.
    df_lift_test_scaled = scale_lift_measurements(
        df_lift_test=df_lift_test,
        channel_col="channel",
        channel_columns=self.channel_columns,  # type: ignore
        channel_transform=self.channel_transformer.transform,
        target_transform=self.get_target_transformer().transform,
    )
    with self.model:
        add_logistic_empirical_lift_measurements_to_likelihood(
            df_lift_test=df_lift_test_scaled,
            # Variable names of the saturation parameters in the built model.
            lam_name="lam",
            beta_name="beta_channel",
            dist=dist,
            name=name,
        )