# Copyright 2024 The PyMC Labs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
from collections.abc import Sequence
from typing import Any, Literal, cast
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytensor.tensor as pt
import xarray
from numpy import exp, log
from pymc.util import RandomState
from pytensor.compile import Mode, get_default_mode
from pytensor.graph import Constant, node_rewriter
from pytensor.scalar import Grad2F1Loop
from pytensor.tensor.elemwise import Elemwise
from scipy.special import betaln, gammaln, hyp2f1
from xarray_einstats.stats import logsumexp as xr_logsumexp
from pymc_marketing.clv.distributions import ParetoNBD
from pymc_marketing.clv.models.basic import CLVModel
from pymc_marketing.clv.utils import to_xarray
@node_rewriter([Elemwise])
def local_reduce_max_num_iters_hyp2f1_grad(fgraph, node):
"""Rewrite that reduces the maximum number of iterations in the hyp2f1 grad scalar loop.
This is critical to get NUTS to converge in the beginning.
Otherwise, it can take a long time to get started.
"""
if not isinstance(node.op.scalar_op, Grad2F1Loop):
return
max_steps, *other_args = node.inputs
# max_steps = switch(skip_loop, 0, 1e6) by default
if max_steps.owner and max_steps.owner.op == pt.switch:
cond, zero, max_steps_const = max_steps.owner.inputs
if (isinstance(zero, Constant) and np.all(zero.data == 0)) and (
isinstance(max_steps_const, Constant)
and np.all(max_steps_const.data == 1e6)
):
new_max_steps = pt.switch(cond, zero, np.array(int(1e5), dtype="int32"))
return node.op.make_node(new_max_steps, *other_args).outputs
pytensor.compile.optdb["specialize"].register(
"local_reduce_max_num_iters_hyp2f1_grad",
local_reduce_max_num_iters_hyp2f1_grad,
use_db_name_as_tag=False, # Not included by default
)
[docs]
class ParetoNBDModel(CLVModel):
"""Pareto Negative Binomial Model (Pareto/NBD).
Model for continuous, non-contractual customers, first introduced by Schmittlein, et al. [1]_,
with additional derivations and predictive methods by Hardie & Fader [2]_ [3]_ [4]_.
The Pareto/NBD model assumes the time duration a customer is active follows a Gamma distribution,
and time between purchases is also Gamma-distributed while the customer is still active.
This model requires data to be summarized by recency, frequency, and T for each customer,
using `clv.rfm_summary()` or equivalent. Covariates impacting customer dropouts and transaction rates are optional.
Parameters
----------
data: pd.DataFrame
DataFrame containing the following columns:
* `frequency`: number of repeat purchases
* `recency`: time between the first and the last purchase
* `T`: time between the first purchase and the end of the observation period.
Model assumptions require T >= recency
* `customer_id`: unique customer identifier
Along with optional covariate columns.
model_config: dict, optional
Dictionary containing model parameters and covariate column names:
* `r_prior`: Shape parameter of time between purchases; defaults to `Weibull(alpha=2, beta=1)`
* `alpha_prior`: Scale parameter of time between purchases; defaults to `Weibull(alpha=2, beta=10)`
* `s_prior`: Shape parameter of time until dropout; defaults to `Weibull(alpha=2, beta=1)`
* `beta_prior`: Scale parameter of time until dropout; defaults to `Weibull(alpha=2, beta=10)`
* `purchase_covariates_prior`: Coefficients for purchase rate covariates; defaults to `Normal(0, 3)`
* `dropout_covariates_prior`: Coefficients for dropout covariates; defaults to `Normal.dist(0, 3)`
* `purchase_covariate_cols`: List containing column names of covariates for customer purchase rates.
* `dropout_covariate_cols:`: List containing column names of covariates for customer dropouts.
If not provided, the model will use default priors specified in the `default_model_config` class attribute.
sampler_config: dict, optional
Dictionary of sampler parameters. Defaults to None.
Examples
--------
.. code-block:: python
import pymc as pm
from pymc_marketing.clv import ParetoNBDModel, rfm_summary
rfm_df = rfm_summary(raw_data,'id_col_name','date_col_name')
# Initialize model with customer data; `model_config` parameter is optional
model = ParetoNBDModel(
data=rfm_df,
model_config={
"r_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 1}},
"alpha_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 10}},
"s_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 1}},
"beta_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 10}},
},
)
# Fit model quickly to large datasets via the default Maximum a Posteriori method
model.fit(fit_method='map')
print(model.fit_summary())
# Use 'mcmc' for more informative predictions and reliable performance on smaller datasets
model.fit(fit_method='mcmc')
print(model.fit_summary())
# Predict number of purchases for customers over the next 10 time periods
expected_purchases = model.expected_purchases(
data=rfm_df,
future_t=10,
)
# Predict probability of customer making 'n' purchases over 't' time periods
# Data parameter is omitted here because predictions are ran on original dataset
expected_num_purchases = model.expected_purchase_probability(
n=[0, 1, 2, 3],
future_t=[10,20,30,40],
)
new_data = pd.DataFrame(
data = {
"customer_id": [0, 1, 2, 3],
"frequency": [5, 2, 1, 8],
"recency": [7, 4, 2.5, 11],
"T": [10, 8, 10, 22]
}
)
# Predict probability customers will still be active in 'future_t' time periods
probability_alive = model.expected_probability_alive(
data=new_data,
future_t=[0, 3, 6, 9],
)
# Predict number of purchases for a new customer over 't' time periods.
expected_purchases_new_customer = model.expected_purchases_new_customer(
t=[2, 5, 7, 10],
)
References
----------
.. [1] David C. Schmittlein, Donald G. Morrison and Richard Colombo.
"Counting Your Customers: Who Are They and What Will They Do Next".
Management Science,Vol. 33, No. 1 (Jan., 1987), pp. 1-24.
.. [2] Fader, Peter & G. S. Hardie, Bruce (2005).
"A Note on Deriving the Pareto/NBD Model and Related Expressions".
http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf
.. [3] Fader, Peter & G. S. Hardie, Bruce (2014).
"Additional Results for the Pareto/NBD Model".
https://www.brucehardie.com/notes/015/additional_pareto_nbd_results.pdf
.. [4] Fader, Peter & G. S. Hardie, Bruce (2014).
"Deriving the Conditional PMF of the Pareto/NBD Model".
https://www.brucehardie.com/notes/028/pareto_nbd_conditional_pmf.pdf
.. [5] Fader, Peter & G. S. Hardie, Bruce (2007).
"Incorporating Time-Invariant Covariates into the Pareto/NBD and BG/NBD Models".
https://www.brucehardie.com/notes/019/time_invariant_covariates.pdf
"""
_model_type = "Pareto/NBD" # Pareto Negative-Binomial Distribution
[docs]
def __init__(
self,
data: pd.DataFrame,
*,
model_config: dict | None = None,
sampler_config: dict | None = None,
):
super().__init__(
data=data,
model_config=model_config,
sampler_config=sampler_config,
)
self.purchase_covariate_cols = list(
self.model_config["purchase_covariate_cols"]
)
self.dropout_covariate_cols = list(self.model_config["dropout_covariate_cols"])
self.covariate_cols = self.purchase_covariate_cols + self.dropout_covariate_cols
self._validate_cols(
data,
required_cols=[
"customer_id",
"frequency",
"recency",
"T",
*self.covariate_cols,
],
must_be_unique=["customer_id"],
)
@property
def default_model_config(self) -> dict[str, Any]:
return {
"r_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 1}},
"alpha_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 10}},
"s_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 1}},
"beta_prior": {"dist": "Weibull", "kwargs": {"alpha": 2, "beta": 10}},
"purchase_coefficient_prior": {
"dist": "Normal",
"kwargs": {"mu": 0, "sigma": 1},
},
"dropout_coefficient_prior": {
"dist": "Normal",
"kwargs": {"mu": 0, "sigma": 1},
},
"purchase_covariate_cols": [],
"dropout_covariate_cols": [],
}
[docs]
def build_model(self) -> None: # type: ignore[override]
r_prior = self._create_distribution(self.model_config["r_prior"])
alpha_scale_prior = self._create_distribution(self.model_config["alpha_prior"])
s_prior = self._create_distribution(self.model_config["s_prior"])
beta_scale_prior = self._create_distribution(self.model_config["beta_prior"])
if self.purchase_covariate_cols:
purchase_coefficient_prior = self._create_distribution(
self.model_config["purchase_coefficient_prior"],
shape=len(self.purchase_covariate_cols),
)
if self.dropout_covariate_cols:
dropout_coefficient_prior = self._create_distribution(
self.model_config["dropout_coefficient_prior"],
shape=len(self.dropout_covariate_cols),
)
coords = {
"purchase_covariate": self.purchase_covariate_cols,
"dropout_covariate": self.dropout_covariate_cols,
"obs_var": ["recency", "frequency"],
}
mutable_coords = {"customer_id": self.data["customer_id"]}
with pm.Model(coords=coords, coords_mutable=mutable_coords) as self.model:
r = self.model.register_rv(r_prior, name="r")
if self.purchase_covariate_cols:
purchase_data = pm.MutableData(
"purchase_data",
self.data[self.purchase_covariate_cols],
dims=["customer_id", "purchase_covariate"],
)
purchase_coefficient = self.model.register_rv(
purchase_coefficient_prior,
name="purchase_coefficient",
dims=["purchase_covariate"],
)
alpha_scale = self.model.register_rv(
alpha_scale_prior, name="alpha_scale"
)
alpha = pm.Deterministic(
"alpha",
(
alpha_scale
* pm.math.exp(-pm.math.dot(purchase_data, purchase_coefficient))
),
dims="customer_id",
)
else:
alpha = self.model.register_rv(alpha_scale_prior, name="alpha")
# churn priors
s = self.model.register_rv(s_prior, name="s")
if self.dropout_covariate_cols:
dropout_data = pm.MutableData(
"dropout_data",
self.data[self.dropout_covariate_cols],
dims=["customer_id", "dropout_covariate"],
)
dropout_coefficient = self.model.register_rv(
dropout_coefficient_prior,
name="dropout_coefficient",
dims=["dropout_covariate"],
)
beta_scale = self.model.register_rv(beta_scale_prior, name="beta_scale")
beta = pm.Deterministic(
"beta",
(
beta_scale
* pm.math.exp(-pm.math.dot(dropout_data, dropout_coefficient))
),
dims="customer_id",
)
else:
beta = self.model.register_rv(beta_scale_prior, name="beta")
ParetoNBD(
name="recency_frequency",
r=r,
alpha=alpha,
s=s,
beta=beta,
T=self.data["T"],
observed=np.stack(
(self.data["recency"], self.data["frequency"]), axis=1
),
dims=["customer_id", "obs_var"],
)
[docs]
def fit(self, fit_method: str = "map", **kwargs): # type: ignore
"""Infer posteriors of model parameters to run predictions.
Parameters
----------
fit_method: str
Method used to fit the model. Options are:
* "map": Posterior point estimates via Maximum a Posteriori (default)
* "mcmc": Full posterior distributions via No U-Turn Sampler (NUTS)
kwargs:
Other keyword arguments passed to the underlying PyMC routines
"""
mode = get_default_mode()
if fit_method == "mcmc":
# Include rewrite in mode
opt_qry = mode.provided_optimizer.including(
"local_reduce_max_num_iters_hyp2f1_grad"
)
mode = Mode(linker=mode.linker, optimizer=opt_qry)
with pytensor.config.change_flags(mode=mode, on_opt_error="raise"):
# Suppress annoying warning
with warnings.catch_warnings():
warnings.filterwarnings(
message="""
Optimization Warning: The Op hyp2f1 does not provide a C implementation.
As well as being potentially slow, this also disables loop fusion.
""",
action="ignore",
category=UserWarning,
)
return super().fit(fit_method, **kwargs)
@staticmethod
def _logp(
r: xarray.DataArray,
alpha: xarray.DataArray,
s: xarray.DataArray,
beta: xarray.DataArray,
x: xarray.DataArray,
t_x: xarray.DataArray,
T: xarray.DataArray,
) -> xarray.DataArray:
"""
Utility function for using ParetoNBD log-likelihood in predictive methods.
"""
# Add one dummy dimension to the right of the scalar parameters, so they broadcast with the `T` vector
pareto_dist = ParetoNBD.dist(
alpha=alpha.values[..., None]
if "customer_id" not in alpha.dims
else alpha.values,
r=r.values[..., None],
beta=beta.values[..., None]
if "customer_id" not in beta.dims
else beta.values,
s=s.values[..., None],
T=T.values,
)
values = np.vstack((t_x.values, x.values)).T
# TODO: Instead of compiling this function everytime this method is called
# we could compile it once (with mutable inputs) and cache it for reuse with new inputs.
loglike = pm.logp(pareto_dist, values).eval()
return xarray.DataArray(data=loglike, dims=("chain", "draw", "customer_id"))
def _extract_predictive_variables(
self,
data: pd.DataFrame,
customer_varnames: Sequence[str] = (),
) -> xarray.Dataset:
"""Utility function assigning default customer arguments
for predictive methods and converting to xarrays.
"""
self._validate_cols(
data,
required_cols=[
"customer_id",
*customer_varnames,
*self.purchase_covariate_cols,
*self.dropout_covariate_cols,
],
must_be_unique=["customer_id"],
)
customer_id = data["customer_id"]
model_coords = self.model.coords # type: ignore
if self.purchase_covariate_cols:
purchase_xarray = xarray.DataArray(
data[self.purchase_covariate_cols],
dims=["customer_id", "purchase_covariate"],
coords=[customer_id, list(model_coords["purchase_covariate"])],
)
alpha_scale = self.fit_result["alpha_scale"]
purchase_coefficient = self.fit_result["purchase_coefficient"]
alpha = alpha_scale * np.exp(
-xarray.dot(
purchase_coefficient, purchase_xarray, dims="purchase_covariate"
)
)
alpha.name = "alpha"
else:
alpha = self.fit_result["alpha"]
if self.dropout_covariate_cols:
dropout_xarray = xarray.DataArray(
data[self.dropout_covariate_cols],
dims=["customer_id", "dropout_covariate"],
coords=[customer_id, list(model_coords["dropout_covariate"])],
)
beta_scale = self.fit_result["beta_scale"]
dropout_coefficient = self.fit_result["dropout_coefficient"]
beta = beta_scale * np.exp(
-xarray.dot(
dropout_coefficient, dropout_xarray, dims="dropout_covariate"
)
)
beta.name = "beta"
else:
beta = self.fit_result["beta"]
r = self.fit_result["r"]
s = self.fit_result["s"]
customer_vars = to_xarray(
data["customer_id"],
*[data[customer_varname] for customer_varname in customer_varnames],
)
if len(customer_varnames) == 1:
customer_vars = [customer_vars]
return xarray.combine_by_coords(
(
r,
alpha,
s,
beta,
*customer_vars,
)
)
[docs]
def expected_purchases(
self,
data: pd.DataFrame | None = None,
*,
future_t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""
Given *recency*, *frequency*, and *T* for an individual customer, this method predicts the
expected number of future purchases across *future_t* time periods.
Adapted from equation (41) In Bruce Hardie's notes [2]_, and `lifetimes` package:
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/pareto_nbd_fitter.py#L242
Parameters
----------
data: pd.DataFrame, optional
Dataframe containing the following columns:
* `customer_id`: unique customer identifier
* `frequency`: number of repeat purchases
* `recency`: time between the first and the last purchase
* `T`: time between the first purchase and the end of the observation period.
Model assumptions require T >= recency
* `future_t`: Number of time periods to predict expected purchases.
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
future_t: array_like, optional
Number of time periods to predict expected purchases.
Not needed if `data` parameter is provided with a `future_t` column.
References
----------
.. [2] Fader, Peter & G. S. Hardie, Bruce (2005).
"A Note on Deriving the Pareto/NBD Model and Related Expressions."
http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf
"""
if data is None:
data = self.data
if future_t is not None:
data = data.assign(future_t=future_t)
dataset = self._extract_predictive_variables(
data, customer_varnames=["frequency", "recency", "T", "future_t"]
)
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
x = dataset["frequency"]
t_x = dataset["recency"]
T = dataset["T"]
future_t = dataset["future_t"]
loglike = self._logp(r, alpha, s, beta, x, t_x, T)
first_term = (
gammaln(r + x)
- gammaln(r)
+ r * log(alpha)
+ s * log(beta)
- (r + x) * log(alpha + T)
- s * log(beta + T)
)
second_term = log(r + x) + log(beta + T) - log(alpha + T)
third_term = log(
(1 - ((beta + T) / (beta + T + future_t)) ** (s - 1)) / (s - 1)
)
exp_purchases = exp(first_term + second_term + third_term - loglike)
return exp_purchases.transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
[docs]
def expected_probability_alive(
self,
data: pd.DataFrame | None = None,
*,
future_t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""
Compute the probability that a customer with history *frequency*, *recency*, and *T*
is currently active. Can also estimate alive probability for *future_t* periods into the future.
Adapted from equation (18) in Bruce Hardie's notes [3]_.
Parameters
----------
data: pd.DataFrame, optional
Dataframe containing the following columns:
* `customer_id`: unique customer identifier
* `frequency`: number of repeat purchases
* `recency`: time between the first and the last purchase
* `T`: time between the first purchase and the end of the observation period.
Model assumptions require T >= recency
* `future_t`: Number of time periods in the future to estimate alive probability; defaults to 0.
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
future_t: array_like, optional
Number of time periods in the future to estimate alive probability; defaults to 0.
Not needed if `data` parameter is provided with a `future_t` column.
References
----------
.. [3] Fader, Peter & G. S. Hardie, Bruce (2014).
"Additional Results for the Pareto/NBD Model."
https://www.brucehardie.com/notes/015/additional_pareto_nbd_results.pdf
"""
if data is None:
data = self.data
if future_t is not None:
data = data.assign(future_t=future_t)
if "future_t" not in data:
data = data.assign(future_t=0)
dataset = self._extract_predictive_variables(
data, customer_varnames=["frequency", "recency", "T", "future_t"]
)
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
x = dataset["frequency"]
t_x = dataset["recency"]
T = dataset["T"]
future_t = dataset["future_t"]
loglike = self._logp(r, alpha, s, beta, x, t_x, T)
term1 = gammaln(r + x) - gammaln(r)
term2 = r * log(alpha / (alpha + T))
term3 = -x * log(alpha + T)
term4 = s * log(beta / (beta + T + future_t))
prob_alive = exp(term1 + term2 + term3 + term4 - loglike)
return prob_alive.transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
[docs]
def expected_purchase_probability(
self,
data: pd.DataFrame | None = None,
*,
n_purchases: int | None = None,
future_t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""
Estimate probability of *n_purchases* over *future_t* time periods,
given an individual customer's current *frequency*, *recency*, and *T*.
Adapted from equation (16) in Bruce Hardie's notes [4]_, and `lifetimes` package:
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/pareto_nbd_fitter.py#L388
Parameters
----------
data: pd.DataFrame
Optional dataframe containing the following columns:
* `customer_id`: unique customer identifier
* `frequency`: number of repeat purchases
* `recency`: time between the first and the last purchase
* `T`: time between the first purchase and the end of the observation period.
Model assumptions require T >= recency
* `future_t`: Number of time periods to predict expected purchases.
* `n_purchases`: Number of purchases to predict probability for.
Currently restricted to the same number for all customers.
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
n_purchases: int, optional
Number of purchases predicted.
Not needed if `data` parameter is provided with a `n_purchases` column.
future_t: array_like, optional
Time periods over which the probability should be estimated.
Not needed if `data` parameter is provided with a `future_t` column.
References
----------
.. [4] Fader, Peter & G. S. Hardie, Bruce (2014).
"Deriving the Conditional PMF of the Pareto/NBD Model."
https://www.brucehardie.com/notes/028/pareto_nbd_conditional_pmf.pdf
"""
if data is None:
data = self.data
if n_purchases is not None:
data = data.assign(n_purchases=n_purchases)
if future_t is not None:
data = data.assign(future_t=future_t)
dataset = self._extract_predictive_variables(
data,
customer_varnames=["frequency", "recency", "T", "future_t", "n_purchases"],
)
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
x = dataset["frequency"]
t_x = dataset["recency"]
T = dataset["T"]
future_t = cast(xarray.DataArray, dataset["future_t"])
n_purchases = cast(int, dataset["n_purchases"].values[0].item())
if not np.all(n_purchases == dataset["n_purchases"].values):
raise NotImplementedError(
"expected_purchase_probability with distinct numbers of `n_purchases` not implemented"
)
loglike = self._logp(r, alpha, s, beta, x, t_x, T)
_alpha_less_than_beta = alpha < beta
min_of_alpha_beta = xarray.where(_alpha_less_than_beta, alpha, beta)
max_of_alpha_beta = xarray.where(_alpha_less_than_beta, beta, alpha)
p = xarray.where(_alpha_less_than_beta, r + x + n_purchases, s + 1)
abs_alpha_beta = max_of_alpha_beta - min_of_alpha_beta
log_p_zero = (
gammaln(r + x)
+ r * log(alpha)
+ s * log(beta)
- (gammaln(r) + (r + x) * log(alpha + T) + s * log(beta + T) + loglike)
)
log_B_one = (
gammaln(r + x + n_purchases)
+ r * log(alpha)
+ s * log(beta)
- (
gammaln(r)
+ (r + x + n_purchases) * log(alpha + T + future_t)
+ s * log(beta + T + future_t)
)
)
log_B_two = (
r * log(alpha)
+ s * log(beta)
+ gammaln(r + s + x)
+ betaln(r + x + n_purchases, s + 1)
+ log(
hyp2f1(
r + s + x,
p,
r + s + x + n_purchases + 1,
abs_alpha_beta / (max_of_alpha_beta + T),
)
)
- (gammaln(r) + gammaln(s) + (r + s + x) * log(max_of_alpha_beta + T))
)
def _log_B_three(i):
return (
r * log(alpha)
+ s * log(beta)
+ gammaln(r + s + x + i)
+ betaln(r + x + n_purchases, s + 1)
+ log(
hyp2f1(
r + s + x + i,
p,
r + s + x + n_purchases + 1,
abs_alpha_beta / (max_of_alpha_beta + T + future_t),
)
)
- (
gammaln(r)
+ gammaln(s)
+ (r + s + x + i) * log(max_of_alpha_beta + T + future_t)
)
)
zeroth_term = (n_purchases == 0) * (1 - exp(log_p_zero))
# ignore numerical errors when future_t <= 0,
# this is an unusual edge case in practice, so refactoring is unwarranted
with np.errstate(divide="ignore", invalid="ignore"):
first_term = (
n_purchases * log(xarray.DataArray(future_t))
- gammaln(n_purchases + 1)
+ log_B_one
- loglike
)
second_term = log_B_two - loglike
third_term = xr_logsumexp(
xarray.concat(
[
i * log(cast(xarray.DataArray, future_t))
- gammaln(i + 1)
+ _log_B_three(i)
- loglike
for i in range(n_purchases + 1)
],
dim="concat_dim_",
),
dims="concat_dim_",
)
purchase_prob = zeroth_term + exp(
xr_logsumexp(
xarray.concat([first_term, second_term, third_term], dim="_concat_dim"),
b=xarray.DataArray(data=[1, 1, -1], dims="_concat_dim"),
dims="_concat_dim",
)
)
purchase_prob = xarray.where(
cast(xarray.DataArray, future_t) <= 0,
purchase_prob.fillna(0),
purchase_prob,
)
return purchase_prob.transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
[docs]
def expected_purchases_new_customer(
self,
data: pd.DataFrame | None = None,
*,
t: int | np.ndarray | pd.Series | None = None,
) -> xarray.DataArray:
"""
Expected number of purchases for a new customer across *t* time periods.
In a model with covariates, if `data` is not specified, the dataset used for fitting will be used.
A prediction will be computed for a new customer with each set of covariates.
This is not a conditional prediction on the observed customers!
Adapted from equation (27) in Bruce Hardie's notes [2]_, and `lifetimes` package:
https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/fitters/pareto_nbd_fitter.py#L359
Parameters
----------
data: pd.DataFrame, optional
Dataframe containing the following columns:
* `customer_id`: unique customer identifier
* `t`: Number of time periods to predict expected purchases.
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
t: array_like, optional
Number of time periods over which to estimate purchases.
Not needed if `data` parameter is provided with a `t` column.
References
----------
.. [2] Fader, Peter & G. S. Hardie, Bruce (2005).
"A Note on Deriving the Pareto/NBD Model and Related Expressions."
http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf
"""
if data is None:
data = self.data
if t is not None:
data = data.assign(t=t)
dataset = self._extract_predictive_variables(data, customer_varnames=["t"])
r = dataset["r"]
alpha = dataset["alpha"]
s = dataset["s"]
beta = dataset["beta"]
t = dataset["t"]
first_term = r * beta / alpha / (s - 1)
second_term = 1 - (beta / (beta + t)) ** (s - 1)
return (first_term * second_term).transpose(
"chain", "draw", "customer_id", missing_dims="ignore"
)
[docs]
def distribution_new_customer(
self,
data: pd.DataFrame | None = None,
*,
T: int | np.ndarray | pd.Series | None = None,
random_seed: RandomState | None = None,
var_names: Sequence[
Literal["dropout", "purchase_rate", "recency_frequency"]
] = (
"dropout",
"purchase_rate",
"recency_frequency",
),
) -> xarray.Dataset:
"""Utility function for posterior predictive sampling of dropout, purchase rate
and frequency/recency of new customers.
In a model with covariates, if `data` is not specified, the dataset used for fitting will be used.
A prediction will be computed for a new customer with each set of covariates.
This is not a conditional prediction on the observed customers!
Parameters
----------
data: pd.DataFrame, Optional
DataFrame containing the following columns:
* `customer_id`: unique customer identifier
* `T`: time between the first purchase and the end of the observation period.
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
T: array_like, optional
time between the first purchase and the end of the observation period.
Not needed if `data` parameter is provided with a `T` column.
random_seed: RandomState, optional
Random state to use for sampling.
var_names: Sequence[str]
Names of the variables to sample from. Defaults to ["dropout", "purchase_rate", "recency_frequency"].
"""
if data is None:
data = self.data
if T is not None:
data = data.assign(T=T)
dataset = self._extract_predictive_variables(data, customer_varnames=["T"])
T = dataset["T"].values
# Delete "T" so we can pass dataset directly to `sample_posterior_predictive`
del dataset["T"]
if dataset.sizes["chain"] == 1 and dataset.sizes["draw"] == 1:
# For map fit add a dummy draw dimension
dataset = dataset.squeeze("draw").expand_dims(draw=range(1000))
coords = self.model.coords.copy() # type: ignore
coords["customer_id"] = data["customer_id"]
with pm.Model(coords=coords) as pred_model:
if self.purchase_covariate_cols:
alpha = pm.Flat("alpha", dims=["customer_id"])
else:
alpha = pm.Flat("alpha")
if self.dropout_covariate_cols:
beta = pm.Flat("beta", dims=["customer_id"])
else:
beta = pm.Flat("beta")
r = pm.Flat("r")
s = pm.Flat("s")
pm.Gamma(
"purchase_rate",
alpha=r,
beta=alpha,
dims=pred_model.named_vars_to_dims.get("alpha"),
)
pm.Gamma(
"dropout",
alpha=s,
beta=beta,
dims=pred_model.named_vars_to_dims.get("beta"),
)
ParetoNBD(
name="recency_frequency",
r=r,
alpha=alpha,
s=s,
beta=beta,
T=T,
dims=["customer_id", "obs_var"],
)
return pm.sample_posterior_predictive(
dataset,
var_names=var_names,
random_seed=random_seed,
predictions=True,
).predictions
[docs]
def distribution_new_customer_dropout(
self,
data: pd.DataFrame | None = None,
*,
random_seed: RandomState | None = None,
) -> xarray.Dataset:
"""Sample from the Gamma distribution representing dropout times for new customers.
This is the duration of time a new customer is active before churning, or dropping out.
Parameters
----------
data: pd.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: unique customer identifier
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
random_seed: RandomState, optional
Random state to use for sampling.
Returns
-------
xr.Dataset
Dataset containing the posterior samples for the population-level dropout rate.
"""
return self.distribution_new_customer(
data=data,
random_seed=random_seed,
var_names=["dropout"],
)["dropout"]
[docs]
def distribution_new_customer_purchase_rate(
self,
data: pd.DataFrame | None = None,
*,
random_seed: RandomState | None = None,
) -> xarray.Dataset:
"""Sample from the Gamma distribution representing purchase rates for new customers.
This is the purchase rate for a new customer and determines the time between
purchases for any new customer.
Parameters
----------
data: pd.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: unique customer identifier
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
random_seed : RandomState, optional
Random state to use for sampling.
Returns
-------
xr.Dataset
Dataset containing the posterior samples for the population-level purchase rate.
"""
return self.distribution_new_customer(
data=data,
random_seed=random_seed,
var_names=["purchase_rate"],
)["purchase_rate"]
[docs]
def distribution_new_customer_recency_frequency(
self,
data: pd.DataFrame | None = None,
*,
T: int | np.ndarray | pd.Series | None = None,
random_seed: RandomState | None = None,
) -> xarray.Dataset:
"""Pareto/NBD process representing purchases across the customer population.
This is the distribution of purchase frequencies given 'T' observation periods for each customer.
Parameters
----------
data: pd.DataFrame, optional
DataFrame containing the following columns:
* `customer_id`: unique customer identifier
* `T`: time between the first purchase and the end of the observation period.
* covariates: Purchase and dropout covariate columns if original model had any.
If not provided, the method will use the fit dataset.
T: array_like, optional
Number of observation periods for each customer. If not provided, T values from fit dataset will be used.
Not needed if `data` parameter is provided with a `T` column.
random_seed : RandomState, optional
Random state to use for sampling.
Returns
-------
xr.Dataset
Dataset containing the posterior samples for the customer population.
"""
return self.distribution_new_customer(
data=data,
T=T,
random_seed=random_seed,
var_names=["recency_frequency"],
)["recency_frequency"]