Source code for pymc_marketing.clv.utils

#   Copyright 2024 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import warnings
from datetime import date, datetime

import numpy as np
import pandas as pd
import xarray
from numpy import datetime64

__all__ = [
    "to_xarray",
    "customer_lifetime_value",
    "rfm_summary",
    "rfm_train_test_split",
]


def to_xarray(customer_id, *arrays, dim: str = "customer_id"):
    """Convert vector arrays to xarray with a common dim (default "customer_id")."""
    dims = (dim,)
    coords = {dim: np.asarray(customer_id)}

    res = tuple(
        xarray.DataArray(data=array, coords=coords, dims=dims) for array in arrays
    )

    return res[0] if len(arrays) == 1 else res
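
# Usage sketch for `to_xarray` (illustrative only; the toy IDs and values
# below are made up). A single input array returns a lone DataArray, while
# several input arrays return a tuple of DataArrays, all indexed by the
# shared "customer_id" dim.
def _example_to_xarray():
    ids = np.array([1, 2, 3])
    spend = np.array([10.0, 20.0, 15.0])
    visits = np.array([4, 2, 7])

    spend_da = to_xarray(ids, spend)  # single array -> single DataArray
    spend_da, visits_da = to_xarray(ids, spend, visits)  # several -> tuple

    assert spend_da.dims == ("customer_id",)
    return spend_da, visits_da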

def customer_lifetime_value(
    transaction_model,
    customer_id: pd.Series | np.ndarray,
    frequency: pd.Series | np.ndarray,
    recency: pd.Series | np.ndarray,
    T: pd.Series | np.ndarray,
    monetary_value: pd.Series | np.ndarray | xarray.DataArray,
    time: int = 12,
    discount_rate: float = 0.01,
    freq: str = "D",
) -> xarray.DataArray:
    """
    Compute the average lifetime value for a group of one or more customers,
    applying Discounted Cash Flow.

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L449

    Parameters
    ----------
    transaction_model: CLVModel
        The model used to predict future transactions.
    customer_id: array_like
        Customer unique identifiers. Must not repeat.
    frequency: array_like
        The frequency vector of customers' purchases (denoted x in literature).
    recency: array_like
        The recency vector of customers' purchases (denoted t_x in literature).
    T: array_like
        The vector of customers' age (time since first purchase).
    monetary_value: array_like
        The monetary value vector of customers' purchases (denoted m in literature).
    time: int, optional
        The expected customer lifetime in months. Default: 12
    discount_rate: float, optional
        The monthly adjusted discount rate. Default: 0.01
    freq: string, optional
        Unit of time of the purchase history. Defaults to "D" for daily.
        Other options are "W" (weekly), "M" (monthly), and "H" (hourly).
        Example: if your dataset contains information about weekly purchases,
        you should use "W".

    Returns
    -------
    xarray.DataArray
        The estimated customer lifetime values.
    """

    def _squeeze_dims(x: xarray.DataArray):
        # drop singleton chain/draw dims so arithmetic broadcasts cleanly
        dims_to_squeeze: tuple[str, ...] = ()
        if "chain" in x.dims and len(x.chain) == 1:
            dims_to_squeeze += ("chain",)
        if "draw" in x.dims and len(x.draw) == 1:
            dims_to_squeeze += ("draw",)
        if dims_to_squeeze:
            x = x.squeeze(dims_to_squeeze)
        return x

    if discount_rate == 0.0:
        # no discount rate: just compute a single time step from 0 to `time`
        steps = np.arange(time, time + 1)
    else:
        steps = np.arange(1, time + 1)

    factor = {"W": 4.345, "M": 1.0, "D": 30, "H": 30 * 24}[freq]

    # Monetary value can be passed as a DataArray, with entries per chain and
    # draw, or as a simple vector
    if not isinstance(monetary_value, xarray.DataArray):
        monetary_value = to_xarray(customer_id, monetary_value)
    monetary_value = _squeeze_dims(monetary_value)

    frequency, recency, T = to_xarray(customer_id, frequency, recency, T)

    clv = xarray.DataArray(0.0)

    # FIXME: This is a hotfix for ParetoNBDModel, as it has a different API
    #  from BetaGeoModel. We should harmonize them!
    from pymc_marketing.clv.models import ParetoNBDModel

    if isinstance(transaction_model, ParetoNBDModel):
        transaction_data = pd.DataFrame(
            {
                "customer_id": customer_id,
                "frequency": frequency,
                "recency": recency,
                "T": T,
            }
        )

        def expected_purchases(*, t, **kwargs):
            return transaction_model.expected_purchases(
                future_t=t,
                data=transaction_data,
            )

    else:
        expected_purchases = transaction_model.expected_num_purchases

    # TODO: Vectorize computation so that we perform a single call to expected_num_purchases
    prev_expected_num_purchases = _squeeze_dims(
        expected_purchases(
            customer_id=customer_id,
            frequency=frequency,
            recency=recency,
            T=T,
            t=0,
        )
    )
    for i in steps * factor:
        # since the prediction of number of transactions is cumulative,
        # we have to subtract off the previous periods
        new_expected_num_purchases = _squeeze_dims(
            expected_purchases(
                customer_id=customer_id,
                frequency=frequency,
                recency=recency,
                T=T,
                t=i,
            )
        )
        expected_transactions = new_expected_num_purchases - prev_expected_num_purchases
        prev_expected_num_purchases = new_expected_num_purchases

        # sum up the CLV estimates of all the periods and apply discounted cash flow
        clv = clv + (monetary_value * expected_transactions) / (1 + discount_rate) ** (
            i / factor
        )

    # Add back squeezed chain/draw dims
    if "draw" not in clv.dims:
        clv = clv.expand_dims({"draw": 1})
    if "chain" not in clv.dims:
        clv = clv.expand_dims({"chain": 1})

    return clv.transpose("chain", "draw", "customer_id")
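
# Usage sketch for `customer_lifetime_value` (illustrative only). A real call
# takes a fitted CLV model; `_StubModel` below is a made-up stand-in whose
# `expected_num_purchases` grows linearly in `t`, chosen just so the example
# runs end to end. All IDs and values are hypothetical.
def _example_customer_lifetime_value():
    class _StubModel:
        def expected_num_purchases(self, *, customer_id, frequency, recency, T, t):
            # pretend each customer cumulatively makes 0.1 purchases per period
            return to_xarray(customer_id, np.full(len(customer_id), 0.1) * t)

    ids = np.array([1, 2, 3])
    clv = customer_lifetime_value(
        transaction_model=_StubModel(),
        customer_id=ids,
        frequency=np.array([5.0, 1.0, 3.0]),
        recency=np.array([30.0, 10.0, 25.0]),
        T=np.array([40.0, 40.0, 40.0]),
        monetary_value=np.array([20.0, 35.0, 15.0]),
        time=12,
        discount_rate=0.01,
        freq="D",
    )
    return clv  # DataArray with dims ("chain", "draw", "customer_id")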

def _find_first_transactions(
    transactions: pd.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    monetary_value_col: str | None = None,
    datetime_format: str | None = None,
    observation_period_end: str | pd.Period | datetime | None = None,
    time_unit: str = "D",
    sort_transactions: bool | None = True,
) -> pd.DataFrame:
    """
    Return a DataFrame flagging first transactions.

    This takes a DataFrame of transaction data of the form:
        customer_id, datetime [, monetary_value]
    and appends a boolean column named 'first' to the transaction log, flagging
    the first transaction of each customer_id; all other rows are repeat transactions.

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L148

    Parameters
    ----------
    transactions: :obj: DataFrame
        A Pandas DataFrame that contains the customer_id col and the datetime col.
    customer_id_col: string
        Column in the transactions DataFrame that denotes the customer_id.
    datetime_col: string
        Column in the transactions DataFrame that denotes the datetime the purchase was made.
    monetary_value_col: string, optional
        Column in the transactions DataFrame that denotes the monetary value of the transaction.
        Optional; only needed for spend estimation models like the Gamma-Gamma model.
    datetime_format: string, optional
        A string that represents the timestamp format. Useful if Pandas
        can't understand the provided format.
    observation_period_end: Union[str, pd.Period, datetime], optional
        A string or datetime to denote the final date of the study.
        Events after this date are truncated. If not given, defaults to the max of 'datetime_col'.
    time_unit: string, optional
        Time granularity for study.
        Default: 'D' for days.
        Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    sort_transactions: bool, optional
        Default: True
        If raw data is already sorted in chronological order, set to `False` to improve computational efficiency.
    """
    select_columns = [customer_id_col, datetime_col]

    if observation_period_end is None:
        observation_period_end = transactions[datetime_col].max()

    if isinstance(observation_period_end, pd.Period):
        observation_period_end = observation_period_end.to_timestamp()
    if isinstance(observation_period_end, str):
        observation_period_end = pd.to_datetime(observation_period_end)

    if monetary_value_col:
        select_columns.append(monetary_value_col)

    if sort_transactions:
        transactions = transactions[select_columns].sort_values(select_columns).copy()

    # convert date column into a DateTimeIndex for time-wise grouping and truncating
    transactions[datetime_col] = pd.to_datetime(
        transactions[datetime_col], format=datetime_format
    )
    transactions = (
        transactions.set_index(datetime_col).to_period(time_unit).to_timestamp()
    )

    mask = pd.to_datetime(transactions.index) <= pd.to_datetime(observation_period_end)
    transactions = transactions.loc[mask].reset_index()

    period_groupby = transactions.groupby(
        [datetime_col, customer_id_col], sort=False, as_index=False
    )

    if monetary_value_col:
        # when processing a monetary column, make sure to
        # sum together transactions made in the same period
        period_transactions = period_groupby.sum()
    else:
        # by calling head() on the groupby object, the datetime and customer_id columns
        # will be reduced to the first transaction of that time period
        period_transactions = period_groupby.head(1)

    # create a new column for flagging first transactions
    period_transactions = period_transactions.copy()
    period_transactions.loc[:, "first"] = False

    # find all first transactions and store as an index
    first_transactions = (
        period_transactions.groupby(customer_id_col, sort=True, as_index=False)
        .head(1)
        .index
    )

    # flag first transactions as True
    period_transactions.loc[first_transactions, "first"] = True
    select_columns.append("first")

    # reset datetime_col to period
    period_transactions[datetime_col] = period_transactions[datetime_col].dt.to_period(
        time_unit
    )

    return period_transactions[select_columns]

def clv_summary(*args, **kwargs):
    """Deprecated alias for rfm_summary."""
    warnings.warn("clv_summary was renamed to rfm_summary", UserWarning, stacklevel=1)
    return rfm_summary(*args, **kwargs)

def rfm_summary(
    transactions: pd.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    monetary_value_col: str | None = None,
    datetime_format: str | None = None,
    observation_period_end: str | pd.Period | datetime | None = None,
    time_unit: str = "D",
    time_scaler: float | None = 1,
    include_first_transaction: bool | None = False,
    sort_transactions: bool | None = True,
) -> pd.DataFrame:
    """
    Summarize transaction data for use in CLV modeling and/or RFM segmentation.

    This transforms a DataFrame of transaction data of the form:
        customer_id, datetime [, monetary_value]
    to a DataFrame of the form:
        customer_id, frequency, recency, T [, monetary_value]

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L230

    Parameters
    ----------
    transactions: :obj: DataFrame
        A Pandas DataFrame that contains the customer_id col and the datetime col.
    customer_id_col: string
        Column in the transactions DataFrame that denotes the customer_id.
    datetime_col: string
        Column in the transactions DataFrame that denotes the datetime the purchase was made.
    monetary_value_col: string, optional
        Column in the transactions DataFrame that denotes the monetary value of the transaction.
        Optional; only needed for spend estimation models like the Gamma-Gamma model.
    datetime_format: string, optional
        A string that represents the timestamp format. Useful if Pandas
        can't understand the provided format.
    observation_period_end: Union[str, pd.Period, datetime], optional
        A string or datetime to denote the final date of the study.
        Events after this date are truncated. If not given, defaults to the max of 'datetime_col'.
    time_unit: string, optional
        Time granularity for study.
        Default: 'D' for days.
        Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    time_scaler: float, optional
        Default: 1. Useful for scaling recency & T to a different time granularity.
        Example: with time_unit='D' and time_scaler=1, we get recency=591 and T=632;
        with time_unit='h' and time_scaler=24, we get recency=590.125 and T=631.375.
        This is useful if predictions in a different time granularity are desired,
        and can also help with model convergence for study periods of many years.
    include_first_transaction: bool, optional
        Default: False
        For predictive CLV modeling, this should be False.
        Set to True if performing RFM segmentation.
    sort_transactions: bool, optional
        Default: True
        If raw data is already sorted in chronological order, set to `False` to improve computational efficiency.

    Returns
    -------
    :obj: DataFrame:
        customer_id, frequency, recency, T [, monetary_value]
    """
    if observation_period_end is None:
        observation_period_end_ts = (
            pd.to_datetime(transactions[datetime_col].max(), format=datetime_format)
            .to_period(time_unit)
            .to_timestamp()
        )
    elif isinstance(observation_period_end, pd.Period):
        observation_period_end_ts = observation_period_end.to_timestamp()
    else:
        observation_period_end_ts = (
            pd.to_datetime(observation_period_end, format=datetime_format)
            .to_period(time_unit)
            .to_timestamp()
        )

    # label repeated transactions
    repeated_transactions = _find_first_transactions(  # type: ignore
        transactions,
        customer_id_col,
        datetime_col,
        monetary_value_col,
        datetime_format,
        observation_period_end_ts,
        time_unit,
        sort_transactions,
    )
    # reset datetime_col to timestamp
    repeated_transactions[datetime_col] = repeated_transactions[
        datetime_col
    ].dt.to_timestamp()

    # count all orders by customer
    customers = repeated_transactions.groupby(customer_id_col, sort=False)[
        datetime_col
    ].agg(["min", "max", "count"])

    if not include_first_transaction:
        # subtract 1 from count, as we ignore the first order
        customers["frequency"] = customers["count"] - 1
    else:
        customers["frequency"] = customers["count"]

    customers["T"] = (
        (observation_period_end_ts - customers["min"])
        / np.timedelta64(1, time_unit)
        / time_scaler
    )
    customers["recency"] = (
        (pd.to_datetime(customers["max"]) - pd.to_datetime(customers["min"]))  # type: ignore
        / np.timedelta64(1, time_unit)
        / time_scaler
    )

    summary_columns = ["frequency", "recency", "T"]

    if monetary_value_col:
        if not include_first_transaction:
            # create an index of all the first purchases
            first_purchases = repeated_transactions[
                repeated_transactions["first"]
            ].index
            # by setting the monetary_value cells of all the first purchases to NaN,
            # those values will be excluded from the mean value calculation
            repeated_transactions.loc[first_purchases, monetary_value_col] = np.nan
        customers["monetary_value"] = (
            repeated_transactions.groupby(customer_id_col)[monetary_value_col]
            .mean()
            .fillna(0)
        )
        summary_columns.append("monetary_value")

    summary_df = customers[summary_columns].astype(float)
    summary_df = summary_df.reset_index().rename(
        columns={customer_id_col: "customer_id"}
    )

    return summary_df
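
# Usage sketch for `rfm_summary` (toy transaction log with made-up IDs,
# dates, and amounts). Produces one row per customer with frequency, recency,
# T, and the mean monetary value of repeat purchases.
def _example_rfm_summary():
    transactions = pd.DataFrame(
        {
            "id": [1, 1, 1, 2, 2],
            "date": [
                "2024-01-01", "2024-02-01", "2024-03-01",
                "2024-01-15", "2024-02-15",
            ],
            "spend": [10.0, 20.0, 30.0, 15.0, 25.0],
        }
    )
    return rfm_summary(
        transactions,
        customer_id_col="id",
        datetime_col="date",
        monetary_value_col="spend",
        observation_period_end="2024-03-31",
    )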

def rfm_train_test_split(
    transactions: pd.DataFrame,
    customer_id_col: str,
    datetime_col: str,
    train_period_end: float | str | datetime | datetime64 | date,
    test_period_end: float | str | datetime | datetime64 | date | None = None,
    time_unit: str = "D",
    time_scaler: float | None = 1,
    datetime_format: str | None = None,
    monetary_value_col: str | None = None,
    include_first_transaction: bool | None = False,
    sort_transactions: bool | None = True,
) -> pd.DataFrame:
    """
    Summarize transaction data and split it into training and test datasets for CLV modeling.

    This can also be used to evaluate the impact of a time-based intervention like a marketing campaign.

    This transforms a DataFrame of transaction data of the form:
        customer_id, datetime [, monetary_value]
    to a DataFrame of the form:
        customer_id, frequency, recency, T [, monetary_value], test_frequency [, test_monetary_value], test_T

    Note this function will exclude new customers whose first transactions occurred during the test period.

    Adapted from lifetimes package
    https://github.com/CamDavidsonPilon/lifetimes/blob/41e394923ad72b17b5da93e88cfabab43f51abe2/lifetimes/utils.py#L27

    Parameters
    ----------
    transactions: :obj: DataFrame
        A Pandas DataFrame that contains the customer_id col and the datetime col.
    customer_id_col: string
        Column in the transactions DataFrame that denotes the customer_id.
    datetime_col: string
        Column in the transactions DataFrame that denotes the datetime the purchase was made.
    train_period_end: Union[str, datetime]
        A string or datetime to denote the final time period for the training data.
        Events after this time period are used for the test data.
    test_period_end: Union[str, datetime], optional
        A string or datetime to denote the final time period of the study.
        Events after this date are truncated. If not given, defaults to the max of 'datetime_col'.
    time_unit: string, optional
        Time granularity for study.
        Default: 'D' for days.
        Possible values listed here:
        https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
    time_scaler: float, optional
        Default: 1. Useful for scaling recency & T to a different time granularity.
        Example: with time_unit='D' and time_scaler=1, we get recency=591 and T=632;
        with time_unit='h' and time_scaler=24, we get recency=590.125 and T=631.375.
        This is useful if predictions in months or years are desired,
        and can also help with model convergence for study periods of many years.
    datetime_format: string, optional
        A string that represents the timestamp format. Useful if Pandas
        can't understand the provided format.
    monetary_value_col: string, optional
        Column in the transactions DataFrame that denotes the monetary value of the transaction.
        Optional; only needed for spend estimation models like the Gamma-Gamma model.
    include_first_transaction: bool, optional
        Default: False
        For predictive CLV modeling, this should be False.
        Set to True if performing RFM segmentation.
    sort_transactions: bool, optional
        Default: True
        If raw data is already sorted in chronological order, set to `False` to improve computational efficiency.

    Returns
    -------
    :obj: DataFrame:
        customer_id, frequency, recency, T, test_frequency, test_T [, monetary_value, test_monetary_value]
    """
    if test_period_end is None:
        test_period_end = transactions[datetime_col].max()

    transaction_cols = [customer_id_col, datetime_col]
    if monetary_value_col:
        transaction_cols.append(monetary_value_col)
    transactions = transactions[transaction_cols].copy()

    transactions[datetime_col] = pd.to_datetime(
        transactions[datetime_col], format=datetime_format
    )
    test_period_end = pd.to_datetime(test_period_end, format=datetime_format)
    train_period_end = pd.to_datetime(train_period_end, format=datetime_format)

    # create training dataset
    training_transactions = transactions.loc[
        transactions[datetime_col] <= train_period_end
    ]

    if training_transactions.empty:
        error_msg = """No training data available. Check `train_period_end`
        and confirm values in `transactions` occur prior to that time period."""
        raise ValueError(error_msg)

    training_rfm_data = rfm_summary(
        training_transactions,
        customer_id_col,
        datetime_col,
        monetary_value_col=monetary_value_col,
        datetime_format=datetime_format,
        observation_period_end=train_period_end,
        time_unit=time_unit,
        time_scaler=time_scaler,
        include_first_transaction=include_first_transaction,
        sort_transactions=sort_transactions,
    )

    # create test dataset
    test_transactions = transactions.loc[
        (test_period_end >= transactions[datetime_col])
        & (transactions[datetime_col] > train_period_end)
    ].copy()

    if test_transactions.empty:
        error_msg = """No test data available. Check `train_period_end` and
        `test_period_end` and confirm values in `transactions` occur between those time periods."""
        raise ValueError(error_msg)

    test_transactions[datetime_col] = test_transactions[datetime_col].dt.to_period(
        time_unit
    )

    # create dataframe with customer_id and test_frequency columns
    test_rfm_data = (
        test_transactions.groupby([customer_id_col, datetime_col], sort=False)[
            datetime_col
        ]
        .agg(lambda r: 1)
        .groupby(level=customer_id_col)
        .count()
    ).reset_index()

    test_rfm_data = test_rfm_data.rename(
        columns={customer_id_col: "customer_id", datetime_col: "test_frequency"}
    )

    if monetary_value_col:
        test_monetary_value = (
            test_transactions.groupby([customer_id_col, datetime_col])[
                monetary_value_col
            ]
            .sum()
            .groupby(customer_id_col)
            .mean()
        )

        test_rfm_data = test_rfm_data.merge(
            test_monetary_value,
            left_on="customer_id",
            right_on=customer_id_col,
            how="inner",
        )

        test_rfm_data = test_rfm_data.rename(
            columns={monetary_value_col: "test_monetary_value"}
        )

    train_test_rfm_data = training_rfm_data.merge(
        test_rfm_data, on="customer_id", how="left"
    )
    train_test_rfm_data.fillna(0, inplace=True)

    time_delta = (
        test_period_end.to_period(time_unit) - train_period_end.to_period(time_unit)
    ).n
    train_test_rfm_data["test_T"] = time_delta / time_scaler  # type: ignore

    return train_test_rfm_data
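
# Usage sketch for `rfm_train_test_split` (toy transaction log; the IDs,
# dates, and amounts are made up). Transactions up to `train_period_end` feed
# the training RFM summary; later ones, up to `test_period_end`, populate the
# test_frequency, test_monetary_value, and test_T columns.
def _example_rfm_train_test_split():
    transactions = pd.DataFrame(
        {
            "id": [1, 1, 1, 2, 2, 2],
            "date": [
                "2024-01-01", "2024-02-01", "2024-04-01",
                "2024-01-15", "2024-02-15", "2024-05-01",
            ],
            "spend": [10.0, 20.0, 30.0, 15.0, 25.0, 40.0],
        }
    )
    return rfm_train_test_split(
        transactions,
        customer_id_col="id",
        datetime_col="date",
        train_period_end="2024-03-01",
        test_period_end="2024-05-31",
        monetary_value_col="spend",
    )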