# Copyright 2024 The PyMC Labs Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pymc as pm
import pytensor.tensor as pt
from pymc.distributions.continuous import PositiveContinuous
from pymc.distributions.dist_math import check_parameters
from pytensor.tensor.random.op import RandomVariable
# Public API: the three continuous-time CLV distribution classes defined below.
__all__ = [
"ContContract",
"ContNonContract",
"ParetoNBD",
]
class ContNonContractRV(RandomVariable):
    """Random-variable op for the continuous non-contractual CLV process.

    Each draw is a pair ``(t_x, x)``: the time of the last purchase and the
    number of purchases observed over the censoring window ``[0, T]``.
    """

    name = "continuous_non_contractual"
    ndim_supp = 1
    # One entry per distribution parameter (lam, p, T), all scalars.
    # NOTE(review): original listed four entries for three parameters, which
    # RandomVariable rejects when building the node.
    ndims_params = [0, 0, 0]
    dtype = "floatX"
    _print_name = ("ContNonContract", "\\operatorname{ContNonContract}")

    def make_node(self, rng, size, dtype, lam, p, T):
        """Ensure ``T`` is a tensor, then defer to the base implementation."""
        T = pt.as_tensor_variable(T)
        return super().make_node(rng, size, dtype, lam, p, T)

    @classmethod
    def rng_fn(cls, rng, lam, p, T, size):
        """Draw ``(t_x, x)`` samples.

        The latent purchase count is the minimum of a Poisson(lam * T) count
        and a Geometric(p) dropout count; given ``x > 0`` purchases, the
        last-purchase time is Beta-distributed, scaled by ``T``.
        """
        size = pm.distributions.shape_utils.to_tuple(size)
        # TODO: broadcast sizes
        lam = np.asarray(lam)
        p = np.asarray(p)
        T = np.asarray(T)
        if size == ():
            size = np.broadcast_shapes(lam.shape, p.shape, T.shape)
        lam = np.broadcast_to(lam, size)
        p = np.broadcast_to(p, size)
        T = np.broadcast_to(T, size)

        x_1 = rng.poisson(lam * T)
        x_2 = rng.geometric(p)
        x = np.minimum(x_1, x_2)

        # Mask of customers with ZERO observed purchases (the original comment
        # mislabelled this as "non-zero purchases").
        zero_purchases = x == 0
        if x.shape == ():
            # Scalar draw: no masking machinery needed.
            if zero_purchases:
                return np.array([0, 0])
            return np.array([rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T, x])

        x[zero_purchases] = 1.0  # temporary to avoid errors in rng.beta below
        t_x = rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T
        x[zero_purchases] = 0.0
        t_x[zero_purchases] = 0.0
        return np.stack([t_x, x], axis=-1)

    def _supp_shape_from_params(*args, **kwargs):
        # Each draw is the pair (t_x, x).
        return (2,)


continuous_non_contractual = ContNonContractRV()
class ContNonContract(PositiveContinuous):
    r"""
    Individual-level model for the customer lifetime value. See equation (3)
    from Fader et al. (2005) [1]_.

    .. math::
        f(\lambda, p | x, t_1, \dots, t_x, T)
        = f(\lambda, p | t_x, T) = (1 - p)^x \lambda^x \exp(-\lambda T)
        + \delta_{x > 0} p (1 - p)^{x-1} \lambda^x \exp(-\lambda t_x)

    ======== ===============================================
    Support  :math:`t_j > 0` for :math:`j = 1, \dots, x`
    Mean     :math:`\mathbb{E}[X(t) | \lambda, p] = \frac{1}{p} - \frac{1}{p}\exp\left(-\lambda p \min(t, T)\right)`
    ======== ===============================================

    References
    ----------
    .. [1] Fader, Peter S., Bruce GS Hardie, and Ka Lok Lee. "“Counting your customers”
           the easy way: An alternative to the Pareto/NBD model." Marketing science
           24.2 (2005): 275-284.
    """

    rv_op = continuous_non_contractual

    @classmethod
    def dist(cls, lam, p, T, **kwargs):
        """Create the distribution from parameters ``(lam, p, T)``."""
        return super().dist([lam, p, T], **kwargs)

    def logp(value, lam, p, T):
        """Log-probability of an observed ``(t_x, x)`` pair."""
        t_x = value[..., 0]
        x = value[..., 1]

        zero_observations = pt.eq(x, 0)

        # A: customer still active at T; B: customer dropped out right after
        # the last purchase at t_x (only meaningful for x > 0).
        A = x * pt.log(1 - p) + x * pt.log(lam) - lam * T
        B = pt.log(p) + (x - 1) * pt.log(1 - p) + x * pt.log(lam) - lam * t_x

        logp = pt.switch(
            zero_observations,
            A,
            pt.logaddexp(A, B),
        )

        # Impossible observations make the whole likelihood -inf.
        # BUG FIX: the check previously used pt.ge(t_x, 0), which flagged the
        # valid (t_x == 0, x == 0) case — the very case the switch above
        # assigns term A to, and the case rng_fn generates for zero
        # purchases. Only a strictly positive recency with zero purchases
        # is inconsistent.
        logp = pt.switch(
            pt.any(
                (
                    pt.and_(pt.gt(t_x, 0), zero_observations),
                    pt.lt(t_x, 0),
                    pt.lt(x, 0),
                    pt.gt(t_x, T),
                ),
            ),
            -np.inf,
            logp,
        )

        return check_parameters(
            logp,
            lam > 0,
            0 <= p,
            p <= 1,
            msg="lam > 0, 0 <= p <= 1",
        )
class ContContractRV(RandomVariable):
    """Random-variable op for the continuous contractual CLV process.

    Each draw is a triple ``(t_x, x, churn)``: last-purchase time, number of
    purchases in ``[0, T]``, and a 0/1 indicator of explicit dropout.
    """

    name = "continuous_contractual"
    ndim_supp = 1
    # One entry per distribution parameter (lam, p, T), all scalars.
    # NOTE(review): original listed four entries for three parameters, which
    # RandomVariable rejects when building the node.
    ndims_params = [0, 0, 0]
    dtype = "floatX"
    _print_name = ("ContinuousContractual", "\\operatorname{ContinuousContractual}")

    def make_node(self, rng, size, dtype, lam, p, T):
        """Ensure ``T`` is a tensor, then defer to the base implementation."""
        T = pt.as_tensor_variable(T)
        return super().make_node(rng, size, dtype, lam, p, T)

    def __call__(self, lam, p, T, size=None, **kwargs):
        return super().__call__(lam, p, T, size=size, **kwargs)

    @classmethod
    def rng_fn(cls, rng, lam, p, T, size):
        """Draw ``(t_x, x, churn)`` samples.

        Same generative mechanism as ContNonContractRV, plus an explicit
        churn indicator: churn is 1 when the geometric dropout count binds
        before the Poisson purchase count.
        """
        size = pm.distributions.shape_utils.to_tuple(size)
        # TODO: broadcast sizes
        lam = np.asarray(lam)
        p = np.asarray(p)
        T = np.asarray(T)
        if size == ():
            size = np.broadcast_shapes(lam.shape, p.shape, T.shape)
        lam = np.broadcast_to(lam, size)
        p = np.broadcast_to(p, size)
        T = np.broadcast_to(T, size)

        x_1 = rng.poisson(lam * T)
        x_2 = rng.geometric(p)
        x = np.minimum(x_1, x_2)

        # Mask of customers with ZERO observed purchases (the original comment
        # mislabelled this as "non-zero purchases").
        zero_purchases = x == 0
        if x.shape == ():
            # Scalar draw: no masking machinery needed.
            if zero_purchases:
                return np.array([0, 0, float(x_1 > x_2)])
            return np.array(
                [rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T, x, float(x_1 > x_2)]
            )

        x[zero_purchases] = 1.0  # temporary to avoid errors in rng.beta below
        t_x = rng.beta(x, np.maximum(x_1 + 1 - x_2, 1)) * T
        x[zero_purchases] = 0.0
        t_x[zero_purchases] = 0.0
        return np.stack([t_x, x, (x_1 > x_2).astype(float)], axis=-1)

    def _supp_shape_from_params(*args, **kwargs):
        # Each draw is the triple (t_x, x, churn).
        return (3,)


continuous_contractual = ContContractRV()
class ContContract(PositiveContinuous):
    r"""
    Distribution class of a continuous contractual data-generating process,
    that is where purchases can occur at any time point (continuous) and
    churning/dropping out is explicit (contractual).

    .. math::
        f(\lambda, p | d, x, t_1, \dots, t_x, T)
        = f(\lambda, p | t_x, T) = (1 - p)^{x-1} \lambda^x \exp(-\lambda t_x)
        p^d \left\{(1-p)\exp(-\lambda*(T - t_x))\right\}^{1 - d}

    ======== ===============================================
    Support  :math:`t_j > 0` for :math:`j = 1, \dots, x`
    Mean     :math:`\mathbb{E}[X(t) | \lambda, p, d] = \frac{1}{p} - \frac{1}{p}\exp\left(-\lambda p \min(t, T)\right)`
    ======== ===============================================
    """

    rv_op = continuous_contractual

    @classmethod
    def dist(cls, lam, p, T, **kwargs):
        """Create the distribution from parameters ``(lam, p, T)``."""
        return super().dist([lam, p, T], **kwargs)

    def logp(value, lam, p, T):
        """Log-probability of an observed ``(t_x, x, churn)`` triple."""
        t_x = value[..., 0]
        x = value[..., 1]
        churn = value[..., 2]

        zero_observations = pt.eq(x, 0)

        # Purchase-process term plus churn/survival term.
        logp = (x - 1) * pt.log(1 - p) + x * pt.log(lam) - lam * t_x
        logp += churn * pt.log(p) + (1 - churn) * (pt.log(1 - p) - lam * (T - t_x))

        # Customers with no purchases contribute only the survival probability.
        logp = pt.switch(
            zero_observations,
            -lam * T,
            logp,
        )

        # BUG FIX: this check previously read
        # pt.any(pt.or_(pt.lt(t_x, 0), zero_observations)), which drove the
        # whole likelihood to -inf whenever ANY customer had zero purchases,
        # contradicting the -lam * T branch just above. Only a strictly
        # positive recency with zero purchases is inconsistent (matching
        # ContNonContract); t_x < 0 is covered by the final check below.
        logp = pt.switch(
            pt.any(pt.and_(pt.gt(t_x, 0), zero_observations)),
            -np.inf,
            logp,
        )

        # The churn indicator must be exactly 0 or 1.
        logp = pt.switch(
            pt.all(
                pt.or_(pt.eq(churn, 0), pt.eq(churn, 1)),
            ),
            logp,
            -np.inf,
        )

        # Remaining impossible observations: negative recency or counts, or
        # a recency beyond the observation window.
        logp = pt.switch(
            pt.any(
                (
                    pt.lt(t_x, 0),
                    pt.lt(x, 0),
                    pt.gt(t_x, T),
                ),
            ),
            -np.inf,
            logp,
        )

        return check_parameters(
            logp,
            lam > 0,
            0 <= p,
            p <= 1,
            pt.all(0 < T),
            # Message now lists every constraint actually checked above.
            msg="lam > 0, 0 <= p <= 1, T > 0",
        )
class ParetoNBDRV(RandomVariable):
    """Random-variable op for the Pareto/NBD process.

    Each draw is a pair ``(t_x, x)`` simulated from per-customer latent
    rates: purchase rate lam ~ Gamma(r, alpha) and dropout rate
    mu ~ Gamma(s, beta).
    """

    name = "pareto_nbd"
    ndim_supp = 1
    # One entry per distribution parameter (r, alpha, s, beta, T), all scalars.
    ndims_params = [0, 0, 0, 0, 0]
    dtype = "floatX"
    _print_name = ("ParetoNBD", "\\operatorname{ParetoNBD}")

    def make_node(self, rng, size, dtype, r, alpha, s, beta, T):
        """Convert all parameters to tensors before deferring to the base class."""
        r = pt.as_tensor_variable(r)
        alpha = pt.as_tensor_variable(alpha)
        s = pt.as_tensor_variable(s)
        beta = pt.as_tensor_variable(beta)
        T = pt.as_tensor_variable(T)
        return super().make_node(rng, size, dtype, r, alpha, s, beta, T)

    def __call__(self, r, alpha, s, beta, T, size=None, **kwargs):
        return super().__call__(r, alpha, s, beta, T, size=size, **kwargs)

    @classmethod
    def rng_fn(cls, rng, r, alpha, s, beta, T, size):
        """Draw ``(t_x, x)`` samples by per-customer forward simulation.

        For each customer, draws the latent Poisson rate and dropout rate
        from their Gamma priors, then simulates exponential inter-purchase
        waits until either dropout or the censoring time ``T``.
        """
        size = pm.distributions.shape_utils.to_tuple(size)
        r = np.asarray(r)
        alpha = np.asarray(alpha)
        s = np.asarray(s)
        beta = np.asarray(beta)
        T = np.asarray(T)
        if size == ():
            size = np.broadcast_shapes(
                r.shape, alpha.shape, s.shape, beta.shape, T.shape
            )
        r = np.broadcast_to(r, size)
        alpha = np.broadcast_to(alpha, size)
        s = np.broadcast_to(s, size)
        beta = np.broadcast_to(beta, size)
        T = np.broadcast_to(T, size)

        output = np.zeros(shape=size + (2,))  # noqa:RUF005

        # numpy's gamma is parameterized by scale = 1/rate.
        lam = rng.gamma(shape=r, scale=1 / alpha, size=size)
        mu = rng.gamma(shape=s, scale=1 / beta, size=size)

        def sim_data(lam, mu, T):
            # Simulate one customer: accumulate purchases until the next wait
            # would cross dropout time or the end of the observation window.
            t = 0
            n = 0

            dropout_time = rng.exponential(scale=1 / mu)
            wait = rng.exponential(scale=1 / lam)

            final_t = min(dropout_time, T)
            while (t + wait) < final_t:
                t += wait
                n += 1
                wait = rng.exponential(scale=1 / lam)

            return np.array(
                [
                    t,
                    n,
                ],
            )

        # Python-level loop over customers; acceptable since each simulation
        # is a variable-length while loop that cannot be vectorized directly.
        for index in np.ndindex(*size):
            output[index] = sim_data(lam[index], mu[index], T[index])

        return output

    def _supp_shape_from_params(*args, **kwargs):
        # Each draw is the pair (t_x, x).
        return (2,)


pareto_nbd = ParetoNBDRV()
class ParetoNBD(PositiveContinuous):
    r"""
    Population-level distribution class for a continuous, non-contractual, Pareto/NBD process,
    based on Schmittlein, et al. in [2]_.

    The likelihood function is derived from equations (22) and (23) of [3]_, with terms
    rearranged for numerical stability.
    The modified expression is provided below:

    .. math::
        \begin{align}
        \text{if }\alpha > \beta: \\
        \\
        \mathbb{L}(r, \alpha, s, \beta | x, t_x, T) &=
        \frac{\Gamma(r+x)\alpha^r\beta^s}{\Gamma(r)(\alpha +t_x)^{r+s+x}}
        [(\frac{s}{r+s+x})_2F_1(r+s+x,s+1;r+s+x+1;\frac{\alpha-\beta}{\alpha+t_x}) \\
        &+ (\frac{r+x}{r+s+x})
        \frac{_2F_1(r+s+x,s;r+s+x+1;\frac{\alpha-\beta}{\alpha+T})(\alpha +t_x)^{r+s+x}}
        {(\alpha +T)^{r+s+x}}] \\
        \\
        \text{if }\beta >= \alpha: \\
        \\
        \mathbb{L}(r, \alpha, s, \beta | x, t_x, T) &=
        \frac{\Gamma(r+x)\alpha^r\beta^s}{\Gamma(r)(\beta +t_x)^{r+s+x}}
        [(\frac{s}{r+s+x})_2F_1(r+s+x,r+x;r+s+x+1;\frac{\beta-\alpha}{\beta+t_x}) \\
        &+ (\frac{r+x}{r+s+x})
        \frac{_2F_1(r+s+x,r+x+1;r+s+x+1;\frac{\beta-\alpha}{\beta+T})(\beta +t_x)^{r+s+x}}
        {(\beta +T)^{r+s+x}}]
        \end{align}

    ======== ===============================================
    Support  :math:`t_j > 0` for :math:`j = 1, \dots, x`
    Mean     :math:`\mathbb{E}[X(t) | r, \alpha, s, \beta] = \frac{r\beta}{\alpha(s-1)}[1-(\frac{\beta}{\beta + t})^{s-1}]`
    ======== ===============================================

    References
    ----------
    .. [2] David C. Schmittlein, Donald G. Morrison and Richard Colombo.
           "Counting Your Customers: Who Are They and What Will They Do Next."
           Management Science,Vol. 33, No. 1 (Jan., 1987), pp. 1-24.
    .. [3] Fader, Peter & G. S. Hardie, Bruce (2005).
           "A Note on Deriving the Pareto/NBD Model and Related Expressions."
           http://brucehardie.com/notes/009/pareto_nbd_derivations_2005-11-05.pdf
    """  # noqa: E501

    rv_op = pareto_nbd

    @classmethod
    def dist(cls, r, alpha, s, beta, T, **kwargs):
        """Create the distribution from parameters ``(r, alpha, s, beta, T)``."""
        return super().dist([r, alpha, s, beta, T], **kwargs)

    def logp(value, r, alpha, s, beta, T):
        """Log-likelihood of an observed ``(t_x, x)`` pair.

        Implements the numerically-stabilized form of equations (22)/(23)
        from Fader & Hardie (2005): the branch on alpha >= beta selects which
        Gauss-hypergeometric argument stays in [0, 1).
        """
        t_x = value[..., 0]
        x = value[..., 1]

        rsx = r + s + x
        rx = r + x

        cond = alpha >= beta
        larger_param = pt.switch(cond, alpha, beta)
        smaller_param = pt.switch(cond, beta, alpha)
        param_diff = larger_param - smaller_param
        hyp2f1_t1_2nd_param = pt.switch(cond, s + 1, rx)
        hyp2f1_t2_2nd_param = pt.switch(cond, s, rx + 1)

        # This term is factored out of the denominator of hyp2f_t1 for numerical stability
        refactored = rsx * pt.log(larger_param + t_x)

        hyp2f1_t1 = pt.log(
            pt.hyp2f1(
                rsx, hyp2f1_t1_2nd_param, rsx + 1, param_diff / (larger_param + t_x)
            )
        )
        hyp2f1_t2 = (
            pt.log(
                pt.hyp2f1(
                    rsx, hyp2f1_t2_2nd_param, rsx + 1, param_diff / (larger_param + T)
                )
            )
            - rsx * pt.log(larger_param + T)
            + refactored
        )

        # A1: log of Gamma(r+x) * alpha^r * beta^s / (Gamma(r) * (larger+t_x)^(r+s+x))
        A1 = (
            pt.gammaln(rx)
            - pt.gammaln(r)
            + r * pt.log(alpha)
            + s * pt.log(beta)
            - refactored
        )
        A2 = pt.log(s) - pt.log(rsx) + hyp2f1_t1
        A3 = pt.log(rx) - pt.log(rsx) + hyp2f1_t2
        logp = A1 + pt.logaddexp(A2, A3)

        # Impossible observations: negative recency or counts, or a recency
        # beyond the observation window.
        logp = pt.switch(
            pt.or_(
                pt.or_(
                    pt.lt(t_x, 0),
                    pt.lt(x, 0),
                ),
                pt.gt(t_x, T),
            ),
            -np.inf,
            logp,
        )

        return check_parameters(
            logp,
            r > 0,
            alpha > 0,
            s > 0,
            beta > 0,
            msg="r > 0, alpha > 0, s > 0, beta > 0",
        )