Source code for pymc_marketing.mmm.budget_optimizer

#   Copyright 2024 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Budget optimization module."""

import numpy as np
from pandas import DataFrame
from scipy.optimize import minimize

from pymc_marketing.mmm.transformers import michaelis_menten
from pymc_marketing.mmm.utils import sigmoid_saturation


[docs] def calculate_expected_contribution( method: str, parameters: dict[str, tuple[float, float]], budget: dict[str, float], ) -> dict[str, float]: """ Calculate expected contributions using the specified model. This function calculates the expected contributions for each channel based on the chosen model. The selected model can be either the Michaelis-Menten model or the sigmoid model, each described by specific parameters. As the allocated budget varies, the expected contribution is computed according to the chosen model. Parameters ---------- method : str The model to use for contribution estimation. Choose from 'michaelis-menten' or 'sigmoid'. parameters : Dict Model-specific parameters for each channel. For 'michaelis-menten', each entry is a tuple (L, k) where: - L is the maximum potential contribution. - k is the budget at which the contribution is half of its maximum. For 'sigmoid', each entry is a tuple (alpha, lam) where: - alpha controls the slope of the curve. - lam is the budget at which the curve transitions. budget : Dict The total budget. Returns ------- Dict A dictionary with channels as keys and their respective contributions as values. The key 'total' contains the total expected contribution across all channels. Raises ------ ValueError If the specified `method` is not recognized. """ total_expected_contribution = 0.0 contributions = {} for channel, channe_budget in budget.items(): if method == "michaelis-menten": L, k = parameters[channel] contributions[channel] = michaelis_menten(channe_budget, L, k) elif method == "sigmoid": alpha, lam = parameters[channel] contributions[channel] = sigmoid_saturation(channe_budget, alpha, lam) else: raise ValueError("`method` must be either 'michaelis-menten' or 'sigmoid'.") total_expected_contribution += contributions[channel] contributions["total"] = total_expected_contribution return contributions
[docs] def objective_distribution( x: list[float], method: str, channels: list[str], parameters: dict[str, tuple[float, float]], ) -> float: """ Compute the total contribution for a given budget distribution. This function calculates the negative sum of contributions for a proposed budget distribution using the Michaelis-Menten model. This value will be minimized in the optimization process to maximize the total expected contribution. Parameters ---------- x : List of float The proposed budget distribution across channels. channels : List of str The List of channels for which the budget is being optimized. parameters : Dict Michaelis-Menten parameters for each channel as described in `calculate_expected_contribution`. Returns ------- float Negative of the total expected contribution for the given budget distribution. """ sum_contributions = 0.0 for channel, budget in zip(channels, x, strict=False): if method == "michaelis-menten": L, k = parameters[channel] sum_contributions += michaelis_menten(budget, L, k) elif method == "sigmoid": alpha, lam = parameters[channel] sum_contributions += sigmoid_saturation(budget, alpha, lam) else: raise ValueError("`method` must be either 'michaelis-menten' or 'sigmoid'.") return -1 * sum_contributions
[docs] def optimize_budget_distribution( method: str, total_budget: int, budget_ranges: dict[str, tuple[float, float]] | None, parameters: dict[str, tuple[float, float]], channels: list[str], ) -> dict[str, float]: """ Optimize the budget allocation across channels to maximize total contribution. Using the Michaelis-Menten or Sigmoid function, this function seeks the best budget distribution across channels that maximizes the total expected contribution. This function leverages the Sequential Least Squares Quadratic Programming (SLSQP) optimization algorithm to find the best budget distribution across channels that maximizes the total expected contribution based on the Michaelis-Menten or Sigmoid functions. The optimization is constrained such that: 1. The sum of budgets across all channels equals the total available budget. 2. The budget allocated to each individual channel lies within its specified range. The SLSQP method is particularly suited for this kind of problem as it can handle both equality and inequality constraints. Parameters ---------- total_budget : int The total budget to be distributed across channels. budget_ranges : Dict or None An optional dictionary defining the minimum and maximum budget for each channel. If not provided, the budget for each channel is constrained between 0 and its L value. parameters : Dict Michaelis-Menten parameters for each channel as described in `calculate_expected_contribution`. channels : list of str The list of channels for which the budget is being optimized. Returns ------- Dict A dictionary with channels as keys and the optimal budget for each channel as values. """ # Check if budget_ranges is the correct type if not isinstance(budget_ranges, dict | type(None)): raise TypeError("`budget_ranges` should be a dictionary or None.") if budget_ranges is None: budget_ranges = { channel: (0, min(total_budget, parameters[channel][0])) for channel in channels } initial_guess = [total_budget // len(channels)] * len(channels) bounds = [budget_ranges[channel] for channel in channels] constraints = {"type": "eq", "fun": lambda x: np.sum(x) - total_budget} result = minimize( lambda x: objective_distribution(x, method, channels, parameters), initial_guess, method="SLSQP", bounds=bounds, constraints=constraints, ) return { channel: budget for channel, budget in zip(channels, result.x, strict=False) }
[docs] def budget_allocator( method: str, total_budget: int, channels: list[str], parameters: dict[str, tuple[float, float]], budget_ranges: dict[str, tuple[float, float]] | None, ) -> DataFrame: optimal_budget = optimize_budget_distribution( method=method, total_budget=total_budget, budget_ranges=budget_ranges, parameters=parameters, channels=channels, ) expected_contribution = calculate_expected_contribution( method=method, parameters=parameters, budget=optimal_budget ) optimal_budget.update({"total": sum(optimal_budget.values())}) return DataFrame( { "estimated_contribution": expected_contribution, "optimal_budget": optimal_budget, } )