Fivetran Data Loaders#
This notebook demonstrates how to use the PyMC-Marketing Fivetran data loaders to quickly prepare data for MMM modeling.
If you use Fivetran to sync data from various sources (such as Shopify, Google Ads, Facebook Ads, etc.) and have deployed the corresponding dbt packages, you can use our data loaders to streamline your MMM workflow.
Prerequisites#
Fivetran dbt packages: Make sure the relevant schemas are set up:
dbt_shopify for e-commerce data
dbt_ad_reporting for advertising data across multiple platforms
Database queries: Write queries against your database that follow the Fivetran dbt schema structure. The dbt packages standardize the output format, which makes it easier to work with data from multiple sources.
Workflow#
Once your data follows the Fivetran dbt schema, you can:
Use our data loaders to automatically process and format your data.
Generate the X (media channels) and y (target variable) datasets.
Train MMM models quickly with minimal data preprocessing.
Let's see this in action:
import pandas as pd
from pymc_marketing.data.fivetran import (
process_fivetran_ad_reporting,
process_fivetran_shopify_unique_orders,
)
from pymc_marketing.mmm.builders.yaml import build_mmm_from_yaml
from pymc_marketing.paths import data_dir
Loading data from the Fivetran dbt schema#
In a real-world scenario, you would typically query your database directly using the Fivetran dbt schema structure. For example:
import pandas as pd
from sqlalchemy import create_engine
# Create your database connection
engine = create_engine('your_database_connection_string')
# Query the standardized ad reporting table
query = "SELECT * FROM <schema>.ad_reporting__ad_report"
ad_data = pd.read_sql(query, engine)
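As a variation on the query above, it is often cheaper to select only the columns you need and push date filters into SQL rather than loading the full table. The sketch below is only an illustration: it uses an in-memory SQLite database as a stand-in for a real warehouse, the sample rows are made up, and the `?` placeholder style is specific to SQLite (other drivers may use a different parameter syntax).

```python
import sqlite3

import pandas as pd

# Assumption: an in-memory SQLite database stands in for the real warehouse.
conn = sqlite3.connect(":memory:")

# Made-up rows that reuse column names from the ad reporting schema.
pd.DataFrame(
    {
        "date_day": ["2025-07-09", "2025-07-10", "2025-07-11"],
        "platform": ["google_ads", "google_ads", "facebook_ads"],
        "impressions": [8000, 8800, 31000],
        "spend": [170.0, 185.75, 396.0],
    }
).to_sql("ad_reporting__ad_report", conn, index=False)

# Select only the needed columns and filter by date inside the query.
query = """
    SELECT date_day, platform, impressions, spend
    FROM ad_reporting__ad_report
    WHERE date_day >= ?
"""
ad_data = pd.read_sql(
    query,
    conn,
    params=("2025-07-10",),
    parse_dates=["date_day"],  # return real datetimes instead of strings
)
conn.close()

print(ad_data)
```

Parsing `date_day` at load time avoids a separate conversion step later in the pipeline.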
As mentioned, the ad_reporting__ad_report table is created by the dbt_ad_reporting package.
For this demonstration, we import a CSV file that follows exactly the same schema structure.
x_data = pd.read_csv(data_dir / "fivetran_examples/ad_report_schema.csv")
x_data.head()
x_data.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16 entries, 0 to 15
Data columns (total 16 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 source_relation 16 non-null object
1 date_day 16 non-null object
2 platform 16 non-null object
3 account_id 16 non-null object
4 account_name 16 non-null object
5 campaign_id 16 non-null object
6 campaign_name 16 non-null object
7 ad_group_id 16 non-null object
8 ad_group_name 16 non-null object
9 ad_id 16 non-null object
10 ad_name 16 non-null object
11 clicks 16 non-null int64
12 impressions 16 non-null int64
13 spend 16 non-null float64
14 conversions 16 non-null int64
15 conversions_value 16 non-null float64
dtypes: float64(2), int64(3), object(11)
memory usage: 2.1+ KB
Data Processing#
We do not need every column or all the granular detail in this raw data. The data is currently in long format (one row per ad/campaign/day combination), but for MMM modeling we need it in wide format, with metrics aggregated by platform and date.
We can transform it quickly with the process_fivetran_ad_reporting function, which:
Aggregates the data by platform and date
Converts it from long to wide format
Keeps only the essential columns needed for MMM modeling.
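To make the three steps above concrete, here is a rough plain-pandas approximation of the transformation. It is a sketch for intuition only, not the library's implementation: the sample rows are made up, and the `{platform}_{metric}` column naming is taken from the function's docstring.

```python
import pandas as pd

# Made-up long-format rows with the same column names as the ad report schema.
long_df = pd.DataFrame(
    {
        "date_day": ["2025-07-10", "2025-07-10", "2025-07-10", "2025-07-11"],
        "platform": ["facebook_ads", "facebook_ads", "google_ads", "facebook_ads"],
        "impressions": [10000, 20000, 8800, 31000],
        "spend": [120.0, 261.5, 185.75, 396.0],
    }
)
metrics = ["impressions", "spend"]

# 1. Aggregate the metrics by date and platform.
agg = long_df.groupby(["date_day", "platform"], as_index=False)[metrics].sum()

# 2. Pivot from long to wide; platform/date pairs with no data become 0.
wide = agg.pivot(index="date_day", columns="platform", values=metrics).fillna(0.0)

# 3. Flatten the (metric, platform) column index into "{platform}_{metric}".
wide.columns = [f"{platform}_{metric}" for metric, platform in wide.columns]
wide = wide.reset_index().rename(columns={"date_day": "date"})
wide["date"] = pd.to_datetime(wide["date"])

print(wide)
```

Every column other than the date, platform, and chosen metrics is dropped along the way, which is why only the essentials remain in the output.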
[!NOTE] Remember that tables like this usually contain information for all countries and all types of media activity. Filter by the type of media activity (for example, awareness or retargeting) and the country you want to analyze before applying the data helper functions; otherwise you will aggregate all media types and markets together.
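How you filter depends on how your accounts and campaigns are organized. As one possible sketch, suppose campaign names follow a hypothetical `<country>_<activity>_<label>` convention (this convention and the sample rows are assumptions, not part of the Fivetran schema); you could then filter before calling the loader like this:

```python
import pandas as pd

# Made-up rows; the campaign naming convention is hypothetical.
raw = pd.DataFrame(
    {
        "date_day": ["2025-07-10"] * 3,
        "platform": ["facebook_ads", "facebook_ads", "google_ads"],
        "campaign_name": [
            "US_awareness_summer",
            "DE_retargeting_summer",
            "US_awareness_brand",
        ],
        "impressions": [30000, 12000, 8800],
    }
)

# Keep only US awareness campaigns before any aggregation happens.
is_us = raw["campaign_name"].str.startswith("US_")
is_awareness = raw["campaign_name"].str.contains("_awareness_", regex=False)
filtered = raw[is_us & is_awareness].copy()

print(filtered)
```

The filtered frame can then be passed as `df` to the loader in place of the raw table.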
x = process_fivetran_ad_reporting(
df=x_data,
)
x.head()
| | date | facebook_ads_impressions | google_ads_impressions | tiktok_ads_impressions |
|---|---|---|---|---|
| 0 | 2025-07-10 | 30000.0 | 8800.0 | 21000.0 |
| 1 | 2025-07-11 | 31000.0 | 9200.0 | 22000.0 |
| 2 | 2025-07-12 | 32000.0 | 9500.0 | 0.0 |
By default we use impressions, but we could decide to model with spend instead, and switching takes a single parameter.
x_spend = process_fivetran_ad_reporting(df=x_data, value_columns="spend")
x_spend.head()
| | date | facebook_ads_spend | google_ads_spend | tiktok_ads_spend |
|---|---|---|---|---|
| 0 | 2025-07-10 | 381.5 | 185.75 | 229.5 |
| 1 | 2025-07-11 | 396.0 | 197.50 | 242.6 |
| 2 | 2025-07-12 | 410.5 | 203.80 | 0.0 |
You can adjust several things, such as the columns to pivot and the operation applied at aggregation time. The full function signature and docstring are shown here:
process_fivetran_ad_reporting?
Signature:
process_fivetran_ad_reporting(
df: pandas.core.frame.DataFrame,
value_columns: str | collections.abc.Sequence[str] = 'impressions',
*,
date_col: str = 'date_day',
platform_col: str = 'platform',
agg: str = 'sum',
fill_value: float | None = 0.0,
include_missing_dates: bool = False,
freq: str = 'D',
rename_date_to: str | None = 'date',
) -> pandas.core.frame.DataFrame
Docstring:
Process Fivetran's Ad Reporting schema's.
Compatible with Fivetran's Ad Reporting schema:
- ad_reporting__account_report: Each record represents daily metrics by account
- ad_reporting__campaign_report: Each record represents daily metrics by campaign and account
- ad_reporting__ad_group_report: Each record represents daily metrics by ad group, campaign and account
- ad_reporting__ad_report: Each record represents daily metrics by ad, ad group, campaign and account
The input data is expected to contain at least the following columns: a date column
(default: ``date_day``), a platform column (default: ``platform``), and one or more
metric columns such as ``spend`` or ``impressions``.
Parameters
----------
df
Input DataFrame in long format.
value_columns
A single column name or a sequence of column names to aggregate and pivot. For
example: "spend" or ["spend", "impressions"].
date_col
Name of the date column. Defaults to "date_day".
platform_col
Name of the platform column. Defaults to "platform".
agg
Aggregation method to apply during groupby (e.g., "sum", "mean"). Defaults to "sum".
fill_value
Value to use to fill missing values in the wide output. If None, missing values
are left as NaN.
include_missing_dates
If True, the output will include a continuous date index from the min to the max
date found in the input, with missing dates filled (using ``fill_value``).
freq
Frequency used when ``include_missing_dates`` is True. Defaults to daily ("D").
rename_date_to
If provided, the date column in the result will be renamed to this value (e.g.,
"date"). If None, the original ``date_col`` name is kept.
Returns
-------
pd.DataFrame
A wide-format DataFrame with one row per date and columns for each
``{platform}_{metric}`` combination.
File: ~/Documents/GitHub/pymc-marketing/pymc_marketing/data/fivetran.py
Type: function
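The docstring describes `include_missing_dates` as producing a continuous date index from the minimum to the maximum date, with gaps filled using `fill_value`. A plain-pandas sketch of that idea, on made-up data and without claiming to mirror the library's internals, could look like this:

```python
import pandas as pd

# Made-up wide-format data with a gap on 2025-07-11.
wide = pd.DataFrame(
    {
        "date": pd.to_datetime(["2025-07-10", "2025-07-12"]),
        "google_ads_impressions": [8800.0, 9500.0],
    }
)

# Build the continuous daily range and reindex onto it, filling gaps with 0.
full_range = pd.date_range(wide["date"].min(), wide["date"].max(), freq="D")
filled = (
    wide.set_index("date")
    .reindex(full_range, fill_value=0.0)
    .rename_axis("date")
    .reset_index()
)

print(filled)
```

A continuous date axis matters for MMM because carryover (adstock) effects are computed over consecutive periods, so silently missing days would distort them.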
All of the ad_reporting tables can provide information about the drivers (media channels) that affect your target metric. But how do you get information about the target itself?
Currently, Fivetran lets you get this information from Shopify through dbt_shopify. You can query the shopify__orders table and then use the data loaders to transform the dataset into an output ready for your media mix model.
y_data = pd.read_csv(data_dir / "fivetran_examples/shopify_orders_schema.csv")
y_data.head()
| | order_id | user_id | total_discounts | total_discounts_set | total_line_items_price | total_line_items_price_set | total_price | total_price_set | total_tax_set | total_tax | ... | count_discount_codes_applied | order_total_shipping_tax | order_tags | order_url_tags | number_of_fulfillments | fulfillment_services | tracking_companies | tracking_numbers | customer_order_seq_number | new_vs_repeat |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7013 | 102 | 19.6 | {"shop_money":{"amount":"19.60","currency_code... | 96.0 | {"shop_money":{"amount":"96.00","currency_code... | 97.03 | {"shop_money":{"amount":"97.03","currency_code... | {"shop_money":{"amount":"8.64","currency_code"... | 8.64 | ... | 0 | 1.0 | Welcome, Refund | utm_campaign:summer, utm_source:email | 2 | deliverr | USPS, UPS | TRK70131 | 1 | new |
| 1 | 7014 | 100 | 1.2 | {"shop_money":{"amount":"1.20","currency_code"... | 24.0 | {"shop_money":{"amount":"24.00","currency_code... | 28.13 | {"shop_money":{"amount":"28.13","currency_code... | {"shop_money":{"amount":"1.34","currency_code"... | 1.34 | ... | 2 | 0.2 | Welcome, Refund | utm_campaign:summer, utm_source:email | 2 | deliverr | USPS, UPS | TRK70141, TRK70142 | 2 | repeat |
| 2 | 7015 | 101 | 7.4 | {"shop_money":{"amount":"7.40","currency_code"... | 48.0 | {"shop_money":{"amount":"48.00","currency_code... | 42.63 | {"shop_money":{"amount":"42.63","currency_code... | {"shop_money":{"amount":"2.03","currency_code"... | 2.03 | ... | 0 | 0.0 | Promo, Welcome | utm_campaign:summer, utm_source:email | 2 | manual | UPS, USPS | TRK70151 | 3 | repeat |
| 3 | 7016 | 100 | 5.0 | {"shop_money":{"amount":"5.00","currency_code"... | 72.0 | {"shop_money":{"amount":"72.00","currency_code... | 74.50 | {"shop_money":{"amount":"74.50","currency_code... | {"shop_money":{"amount":"0.00","currency_code"... | 0.00 | ... | 0 | 0.0 | Promo, Refund | utm_campaign:summer, utm_source:email | 1 | shippo | USPS, UPS | TRK70161, TRK70162 | 4 | repeat |
| 4 | 7001 | 102 | 13.6 | {"shop_money":{"amount":"13.60","currency_code... | 72.0 | {"shop_money":{"amount":"72.00","currency_code... | 60.40 | {"shop_money":{"amount":"60.40","currency_code... | {"shop_money":{"amount":"0.00","currency_code"... | 0.00 | ... | 0 | 0.0 | VIP, Gift | utm_campaign:summer, utm_source:email | 1 | deliverr | USPS, FedEx | TRK70011, TRK70012 | 1 | new |
5 rows × 101 columns
y_data.info(verbose=True, show_counts=True)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24 entries, 0 to 23
Data columns (total 101 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 order_id 24 non-null int64
1 user_id 24 non-null int64
2 total_discounts 24 non-null float64
3 total_discounts_set 24 non-null object
4 total_line_items_price 24 non-null float64
5 total_line_items_price_set 24 non-null object
6 total_price 24 non-null float64
7 total_price_set 24 non-null object
8 total_tax_set 24 non-null object
9 total_tax 24 non-null float64
10 source_name 24 non-null object
11 subtotal_price 24 non-null float64
12 has_taxes_included 24 non-null bool
13 total_weight 24 non-null int64
14 total_tip_received 24 non-null float64
15 landing_site_base_url 24 non-null object
16 location_id 24 non-null int64
17 name 24 non-null object
18 note 11 non-null object
19 number 24 non-null int64
20 order_number 24 non-null int64
21 cancel_reason 12 non-null float64
22 cart_token 24 non-null object
23 checkout_token 24 non-null object
24 created_timestamp 24 non-null object
25 cancelled_timestamp 12 non-null object
26 closed_timestamp 24 non-null object
27 processed_timestamp 24 non-null object
28 updated_timestamp 24 non-null object
29 currency 24 non-null object
30 customer_id 24 non-null int64
31 email 24 non-null object
32 financial_status 24 non-null object
33 fulfillment_status 24 non-null object
34 referring_site 24 non-null object
35 billing_address_address_1 24 non-null object
36 billing_address_address_2 24 non-null object
37 billing_address_city 24 non-null object
38 billing_address_company 0 non-null float64
39 billing_address_country 24 non-null object
40 billing_address_country_code 24 non-null object
41 billing_address_first_name 24 non-null object
42 billing_address_last_name 24 non-null object
43 billing_address_latitude 24 non-null float64
44 billing_address_longitude 24 non-null float64
45 billing_address_name 24 non-null object
46 billing_address_phone 24 non-null object
47 billing_address_province 24 non-null object
48 billing_address_province_code 24 non-null int64
49 billing_address_zip 24 non-null int64
50 browser_ip 24 non-null object
51 total_shipping_price_set 24 non-null object
52 shipping_address_address_1 24 non-null object
53 shipping_address_address_2 0 non-null float64
54 shipping_address_city 24 non-null object
55 shipping_address_company 0 non-null float64
56 shipping_address_country 24 non-null object
57 shipping_address_country_code 24 non-null object
58 shipping_address_first_name 24 non-null object
59 shipping_address_last_name 24 non-null object
60 shipping_address_latitude 24 non-null float64
61 shipping_address_longitude 24 non-null float64
62 shipping_address_name 24 non-null object
63 shipping_address_phone 24 non-null object
64 shipping_address_province 24 non-null object
65 shipping_address_province_code 24 non-null int64
66 shipping_address_zip 24 non-null int64
67 token 24 non-null object
68 app_id 24 non-null int64
69 checkout_id 24 non-null int64
70 client_details_user_agent 24 non-null object
71 customer_locale 24 non-null object
72 order_status_url 24 non-null object
73 presentment_currency 24 non-null object
74 is_test_order 24 non-null bool
75 is_deleted 24 non-null bool
76 has_buyer_accepted_marketing 24 non-null bool
77 is_confirmed 24 non-null bool
78 _fivetran_synced 24 non-null object
79 source_relation 24 non-null object
80 orders_unique_key 24 non-null object
81 shipping_cost 24 non-null float64
82 order_adjustment_amount 24 non-null float64
83 order_adjustment_tax_amount 24 non-null float64
84 refund_subtotal 24 non-null float64
85 refund_total_tax 24 non-null float64
86 order_adjusted_total 24 non-null float64
87 line_item_count 24 non-null int64
88 shipping_discount_amount 24 non-null float64
89 percentage_calc_discount_amount 24 non-null float64
90 fixed_amount_discount_amount 24 non-null float64
91 count_discount_codes_applied 24 non-null int64
92 order_total_shipping_tax 24 non-null float64
93 order_tags 24 non-null object
94 order_url_tags 24 non-null object
95 number_of_fulfillments 24 non-null int64
96 fulfillment_services 24 non-null object
97 tracking_companies 24 non-null object
98 tracking_numbers 24 non-null object
99 customer_order_seq_number 24 non-null int64
100 new_vs_repeat 24 non-null object
dtypes: bool(5), float64(24), int64(17), object(55)
memory usage: 18.2+ KB
As in the previous examples, we use a CSV file for demonstration purposes. In a real-world scenario, however, you would query the appropriate schema and table directly from your Fivetran-connected database to get an output like this one.
# Query the standardized Shopify orders table
query = "SELECT * FROM <schema>.shopify__orders"
shopify_data = pd.read_sql(query, engine)
Once you have the data, you can apply the data loaders to simplify your media mix model pipeline.
[!NOTE] Remember, however, that tables like this usually contain information for all countries and transaction types. Filter by the transaction type and the country you want to analyze before applying the data helper functions; otherwise you will aggregate information from all transaction types and markets.
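As a hedged illustration of such a filter, the sketch below keeps only non-cancelled, non-test orders shipped to the US. The column names come from the shopify__orders schema listed above; the rows and the choice of filters are made up for the example, and the right criteria depend on your business.

```python
import pandas as pd

# Made-up orders using column names from the shopify__orders schema.
orders = pd.DataFrame(
    {
        "orders_unique_key": ["a", "b", "c", "d"],
        "processed_timestamp": [
            "2025-07-10 09:00:00",
            "2025-07-10 12:30:00",
            "2025-07-11 08:15:00",
            "2025-07-11 10:00:00",
        ],
        "cancelled_timestamp": [None, "2025-07-10 13:00:00", None, None],
        "is_test_order": [False, False, True, False],
        "shipping_address_country_code": ["US", "US", "US", "DE"],
    }
)

# Keep orders that were not cancelled, are not tests, and shipped to the US.
keep = (
    orders["cancelled_timestamp"].isna()
    & ~orders["is_test_order"]
    & (orders["shipping_address_country_code"] == "US")
)
filtered_orders = orders[keep].copy()

print(filtered_orders)
```

The filtered frame can then be passed as `df` to the loader in place of the raw table.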
y = process_fivetran_shopify_unique_orders(
df=y_data,
)
y.head()
| | date | orders |
|---|---|---|
| 0 | 2025-07-10 | 1 |
| 1 | 2025-07-11 | 2 |
| 2 | 2025-07-12 | 4 |
| 3 | 2025-07-13 | 5 |
| 4 | 2025-07-14 | 5 |
You can inspect the full function to see everything you can adjust.
process_fivetran_shopify_unique_orders?
Signature:
process_fivetran_shopify_unique_orders(
df: pandas.core.frame.DataFrame,
*,
date_col: str = 'processed_timestamp',
order_key_col: str = 'orders_unique_key',
rename_date_to: str = 'date',
) -> pandas.core.frame.DataFrame
Docstring:
Compute daily unique order counts from a (pre-filtered) Shopify orders dataset.
This function is designed for data following the Fivetran Shopify orders schema
(e.g., ``shopify__orders``). It assumes the input ``df`` is already filtered to
the desired subset (e.g., non-canceled, US-delivery, new-only orders).
Parameters
----------
df
Input DataFrame following the Shopify orders schema.
date_col
Column to derive the daily bucket from. Defaults to "processed_timestamp".
order_key_col
Unique order identifier column. Defaults to "orders_unique_key".
rename_date_to
Name of the date column in the result. Defaults to "date".
Returns
-------
pd.DataFrame
A DataFrame with two columns: ``rename_date_to`` and ``orders``, where
``orders`` is the unique order count per day.
File: ~/Documents/GitHub/pymc-marketing/pymc_marketing/data/fivetran.py
Type: function
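For intuition, the computation described in the docstring (bucket each order by day, then count unique order keys) can be approximated in plain pandas. This is a sketch on made-up rows, not the library's code:

```python
import pandas as pd

# Made-up orders; order "b" appears twice to show why uniqueness matters.
orders = pd.DataFrame(
    {
        "orders_unique_key": ["a", "b", "b", "c"],
        "processed_timestamp": [
            "2025-07-10 09:00:00",
            "2025-07-10 12:30:00",
            "2025-07-10 12:30:00",
            "2025-07-11 08:15:00",
        ],
    }
)

# Truncate timestamps to midnight, then count distinct order keys per day.
daily = (
    orders.assign(
        date=pd.to_datetime(orders["processed_timestamp"]).dt.normalize()
    )
    .groupby("date")["orders_unique_key"]
    .nunique()
    .rename("orders")
    .reset_index()
)

print(daily)
```

Counting unique keys rather than rows keeps duplicated records from inflating the target.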
data = x.merge(y, on="date", how="left").fillna(0)
data.head()
| | date | facebook_ads_impressions | google_ads_impressions | tiktok_ads_impressions | orders |
|---|---|---|---|---|---|
| 0 | 2025-07-10 | 30000.0 | 8800.0 | 21000.0 | 1 |
| 1 | 2025-07-11 | 31000.0 | 9200.0 | 22000.0 | 2 |
| 2 | 2025-07-12 | 32000.0 | 9500.0 | 0.0 | 4 |
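A left merge followed by a fill with 0 treats any media date without a matching target row as a day with zero orders, which may or may not be what you want. One optional way to check this, sketched here on made-up frames, is to use the `validate` and `indicator` parameters of pandas `merge` to catch duplicated dates and to list the dates that had no target row:

```python
import pandas as pd

# Made-up media and target frames; the target is missing 2025-07-12.
x = pd.DataFrame(
    {
        "date": pd.to_datetime(["2025-07-10", "2025-07-11", "2025-07-12"]),
        "google_ads_impressions": [8800.0, 9200.0, 9500.0],
    }
)
y = pd.DataFrame(
    {
        "date": pd.to_datetime(["2025-07-10", "2025-07-11"]),
        "orders": [1, 2],
    }
)

# validate raises if either side has duplicated dates;
# indicator adds a "_merge" column showing where each row came from.
merged = x.merge(y, on="date", how="left", validate="one_to_one", indicator=True)

# Dates present in the media data but absent from the target data.
missing_target = merged.loc[merged["_merge"] == "left_only", "date"].tolist()

# Fill only the target column, once the gaps have been reviewed.
data = merged.drop(columns="_merge").fillna({"orders": 0})

print(missing_target)
print(data)
```

If `missing_target` is not empty, it is worth confirming whether those days truly had no orders or whether the target data is incomplete.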
Alternative way to load the data using the data preprocessing functions
# data = process_fivetran_ad_reporting(
# df=x_data,
# ).merge(
# process_fivetran_shopify_unique_orders(
# df=y_data,
# ),
# on="date",
# how="left",
# )
# data.head()
Once all your data is prepared, you can train your model 🙌🏻
mmm = build_mmm_from_yaml(
X=data[
[
"date",
"facebook_ads_impressions",
"google_ads_impressions",
"tiktok_ads_impressions",
]
],
y=data["orders"],
config_path=data_dir / "config_files" / "multi_dimensional_fivetran.yml",
)
/Users/carlostrujillo/Documents/GitHub/pymc-marketing/pymc_marketing/mmm/builders/yaml.py:104: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
X[date_column] = pd.to_datetime(X[date_column])
mmm.model.to_graphviz()
mmm.sample_prior_predictive(
X=data[
[
"date",
"facebook_ads_impressions",
"google_ads_impressions",
"tiktok_ads_impressions",
]
],
y=data["orders"],
)
Sampling: [adstock_alpha, intercept_contribution, saturation_alpha, saturation_lam, y, y_sigma]
<xarray.Dataset> Size: 24kB
Dimensions: (date: 3, sample: 500)
Coordinates:
* date (date) datetime64[ns] 24B 2025-07-10 2025-07-11 2025-07-12
* sample (sample) object 4kB MultiIndex
* chain (sample) int64 4kB 0 0 0 0 0 0 0 0 0 0 0 ... 0 0 0 0 0 0 0 0 0 0 0
* draw (sample) int64 4kB 0 1 2 3 4 5 6 7 ... 493 494 495 496 497 498 499
Data variables:
y (date, sample) float64 12kB 2.442 14.47 10.69 ... 3.808 3.132 5.092
Attributes:
created_at: 2025-08-14T19:24:17.732780+00:00
arviz_version: 0.22.0
inference_library: pymc
inference_library_version: 5.25.1
pymc_marketing_version: 0.15.1
That's it: you are ready to fit your model and unlock insights from your data. If you want to know what to explore next, take a look at the following documents:
Marketing Mix Models and Budget Allocation.
%load_ext watermark
%watermark -n -u -v -iv -w -p pytensor
Last updated: Thu Aug 14 2025
Python implementation: CPython
Python version : 3.12.11
IPython version : 9.4.0
pytensor: 2.31.7
pymc_marketing: 0.15.1
pandas : 2.3.1
Watermark: 2.5.0