I. Introduction: The Challenge and Opportunity of Marketing Effectiveness Evaluation

Companies pour enormous sums into marketing campaigns every year, yet accurately measuring the true return on that spend remains a long-standing challenge.

Traditional evaluation methods are often seriously flawed: a simple "before vs. after" comparison ignores the underlying growth trend, while "participant-only analysis" suffers heavily from selection bias.

Consider this scenario: an e-commerce platform runs a large membership promotion, and sales rise noticeably afterward. On the surface the campaign looks like a success — but was the lift really caused by the campaign? Seasonality, competitor promotions, and macroeconomic shifts may all be confounded with it. Without a sound evaluation method, the company may base its next decisions on wrong conclusions and misallocate resources.

II. Case Background and Business Problem Definition

1. Business Scenario

A large e-commerce platform, "Yougou" (优购网), ran a large-scale members-only promotion during the 2023 Double Eleven period. The campaign offered gold members exclusive discounts, member-only coupons, and priority shipping. It lasted 30 days, covered roughly 5 million gold members, and had a total marketing budget of about 20 million RMB.

After the campaign, preliminary figures showed:

- Gold-member sales during the campaign grew 45% year over year
- Member activity rose 30%
- New member registrations increased 25%

Behind these headline numbers, however, lie serious selection bias and confounding:

- Gold members are already high-value users with a naturally higher propensity to buy
- Overall e-commerce traffic surges during Double Eleven, a seasonal effect
- Competitors ran promotions during the same period

2. Core Evaluation Challenges

| Challenge | Manifestation | Potential impact |
| --- | --- | --- |
| Selection bias | Gold members self-select into membership and already have stronger purchasing power | Overestimates the campaign effect |
| Seasonality | Overall consumption rises during Double Eleven | Confounds the campaign effect |
| External competition | Competitor promotions influence user behavior | Hard to isolate the net effect |
| Long-term effects | The campaign may affect users' long-term value | Short-term analysis is incomplete |

3. Evaluation Objectives and Success Metrics

Based on the business needs, we define clear evaluation objectives.

Primary objective: quantify the net incremental revenue of the membership promotion, after stripping out natural growth and selection bias.

Specific metrics:

- Short-term revenue: incremental sales and incremental profit during the campaign
- Medium-term impact: change in member retention and in customer lifetime value
- Return on investment: ROI and incremental ROI

4. Causal Identification Strategy

To address these challenges we adopt a multi-method validation strategy:

- Experimental design: a fully randomized experiment is not feasible for business reasons, so we rely on quasi-experimental designs
- Data strategy: integrate multiple data sources and construct a suitable control group
- Analysis methods: combine difference-in-differences (DID), propensity score matching (PSM), and related methods

With the business problem and evaluation framework defined, the next step is to collect and prepare the data to support the analysis.
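Before moving on, note that the ROI metrics defined above reduce to simple arithmetic. A minimal sketch — the figures below are hypothetical, not the campaign's actual results:

```python
def incremental_roi(incremental_revenue, margin_rate, marketing_cost):
    """Incremental ROI = incremental profit attributable to the campaign / marketing spend."""
    incremental_profit = incremental_revenue * margin_rate
    return incremental_profit / marketing_cost

# Hypothetical numbers: 30M RMB incremental sales, 20% margin, 20M RMB spend
roi = incremental_roi(30_000_000, 0.20, 20_000_000)
print(f"incremental ROI: {roi:.2f}")  # 6M profit / 20M spend = 0.30
```

The point of "incremental" is that only the causally attributable revenue enters the numerator — which is exactly what the rest of this case study tries to estimate.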

III. Data Collection and Exploratory Analysis

1. Data Source Integration and Preprocessing

To evaluate the campaign comprehensively, we integrate several data sources:

```python
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings

warnings.filterwarnings('ignore')

# Fix the random seed for reproducibility
np.random.seed(42)

def generate_marketing_data(n_users=100000, start_date='2023-09-01', end_date='2023-12-31'):
    """Generate a simulated marketing-campaign dataset (mimicking real business data)."""
    # Date ranges
    date_range = pd.date_range(start=start_date, end=end_date)
    pre_period = [d for d in date_range if d < pd.Timestamp('2023-11-01')]
    campaign_period = [d for d in date_range
                       if pd.Timestamp('2023-11-01') <= d <= pd.Timestamp('2023-11-30')]
    post_period = [d for d in date_range if d > pd.Timestamp('2023-11-30')]

    # User-level features
    users = []
    for i in range(n_users):
        user = {
            'user_id': i,
            'age': max(18, min(70, int(np.random.normal(35, 10)))),
            'income': max(2000, np.random.lognormal(10.2, 0.3)),
            'city_tier': np.random.choice([1, 2, 3], p=[0.2, 0.5, 0.3]),
            'signup_date': pd.Timestamp('2023-01-01') + timedelta(days=int(np.random.randint(0, 300))),
            'is_gold_member': np.random.choice([0, 1], p=[0.7, 0.3])  # 30% are gold members
        }
        # Membership correlates with user value
        if user['is_gold_member'] == 1:
            user['income'] = user['income'] * 1.5  # members earn more
            user['age'] = max(25, user['age'])     # members skew older
        users.append(user)
    user_df = pd.DataFrame(users)

    # User behavior data
    behavior_data = []
    for user in users:
        user_id = user['user_id']
        is_gold = user['is_gold_member']
        # Baseline purchase probability (higher for members)
        base_purchase_prob = 0.1 if is_gold == 0 else 0.2
        for date in date_range:
            # Seasonal effect: purchase probability rises during Double Eleven
            seasonal_effect = 1.0
            if pd.Timestamp('2023-11-01') <= date <= pd.Timestamp('2023-11-30'):
                seasonal_effect = 2.0  # overall purchase probability doubles
            # Campaign effect: applies only to members during the campaign
            campaign_effect = 1.0
            if is_gold == 1 and date in campaign_period:
                campaign_effect = 1.5  # extra 50% lift from the campaign
            # Purchase probability, capped at 0.8
            purchase_prob = min(base_purchase_prob * seasonal_effect * campaign_effect, 0.8)
            # Draw the purchase event
            has_purchase = np.random.binomial(1, purchase_prob)
            period = ('pre' if date in pre_period
                      else 'campaign' if date in campaign_period else 'post')
            if has_purchase:
                # Purchase amount (members have a higher average ticket)
                base_amount = (np.random.lognormal(5.5, 0.8) if is_gold == 0
                               else np.random.lognormal(6.0, 0.7))
                # Amounts rise during the campaign
                if date in campaign_period:
                    base_amount = base_amount * 1.2
                amount = max(10, base_amount)  # minimum amount: 10 RMB
            else:
                amount = 0
            behavior_data.append({
                'user_id': user_id,
                'date': date,
                'is_gold_member': is_gold,
                'purchase_amount': amount,
                'has_purchase': has_purchase,
                'period': period
            })
    behavior_df = pd.DataFrame(behavior_data)

    # Merge user features onto the behavior data. Drop the duplicated membership
    # column first, otherwise the merge would produce is_gold_member_x /
    # is_gold_member_y suffix columns and break downstream code.
    final_df = behavior_df.merge(user_df.drop(columns=['is_gold_member']), on='user_id')
    return final_df, user_df

# Generate the data
print("Generating simulated data...")
marketing_data, user_info = generate_marketing_data(n_users=50000)
print(f"Done. Total records: {len(marketing_data)}")
print(f"Users: {marketing_data['user_id'].nunique()}")
print(f"Date range: {marketing_data['date'].min()} to {marketing_data['date'].max()}")
```

2. Data Quality Checks and Cleaning

```python
# Data quality checks
def data_quality_check(df):
    """Run a comprehensive data-quality check."""
    quality_report = {}
    # Basic statistics
    quality_report['total_records'] = len(df)
    quality_report['unique_users'] = df['user_id'].nunique()
    quality_report['date_range'] = f"{df['date'].min()} to {df['date'].max()}"
    # Missing values
    missing_data = df.isnull().sum()
    quality_report['missing_values'] = missing_data[missing_data > 0].to_dict()
    # Outlier detection (IQR rule on positive purchase amounts)
    purchase_amounts = df[df['purchase_amount'] > 0]['purchase_amount']
    Q1 = purchase_amounts.quantile(0.25)
    Q3 = purchase_amounts.quantile(0.75)
    IQR = Q3 - Q1
    outliers = purchase_amounts[(purchase_amounts < Q1 - 1.5 * IQR) |
                                (purchase_amounts > Q3 + 1.5 * IQR)]
    quality_report['outlier_count'] = len(outliers)
    quality_report['outlier_ratio'] = len(outliers) / len(purchase_amounts)
    # Consistency: each user should have a single membership status
    gold_member_consistency = df.groupby('user_id')['is_gold_member'].nunique()
    quality_report['inconsistent_gold_status'] = int((gold_member_consistency > 1).sum())
    return quality_report

# Run the quality check
quality_report = data_quality_check(marketing_data)
print("Data quality report:")
for key, value in quality_report.items():
    print(f"{key}: {value}")

# Data cleaning (handle outliers)
def clean_data(df, outlier_threshold=0.02):
    """Clean the data."""
    # Work on a copy to avoid mutating the original
    cleaned_df = df.copy()
    # Winsorize purchase amounts. Clip only the positive amounts — a blanket
    # clip would raise all the zero (no-purchase) rows up to the lower bound.
    purchase_amounts = cleaned_df[cleaned_df['purchase_amount'] > 0]['purchase_amount']
    lower_bound = purchase_amounts.quantile(outlier_threshold)
    upper_bound = purchase_amounts.quantile(1 - outlier_threshold)
    mask = cleaned_df['purchase_amount'] > 0
    cleaned_df['purchase_amount_clean'] = cleaned_df['purchase_amount']
    cleaned_df.loc[mask, 'purchase_amount_clean'] = cleaned_df.loc[mask, 'purchase_amount'].clip(
        lower=lower_bound, upper=upper_bound)
    # Enforce a consistent membership status per user
    user_gold_status = cleaned_df.groupby('user_id')['is_gold_member'].first()
    cleaned_df = cleaned_df.drop('is_gold_member', axis=1)
    cleaned_df = cleaned_df.merge(user_gold_status.reset_index(), on='user_id')
    n_clipped = (cleaned_df['purchase_amount'] != cleaned_df['purchase_amount_clean']).sum()
    print(f"Outlier handling: winsorized the amounts of {n_clipped} records")
    return cleaned_df

# Run the cleaning
marketing_clean = clean_data(marketing_data)
```

3. Exploratory Data Analysis

```python
# Exploratory data analysis
def exploratory_analysis(df, user_info):
    """Full exploratory analysis."""
    # Make the periods sort chronologically rather than alphabetically,
    # so grouped results line up with the 'pre / campaign / post' labels below
    df = df.copy()
    df['period'] = pd.Categorical(df['period'], categories=['pre', 'campaign', 'post'], ordered=True)

    # 1. User feature distributions
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))

    # Membership split (sort_index so labels line up with 0=regular, 1=gold)
    member_dist = df.groupby('user_id')['is_gold_member'].first().value_counts().sort_index()
    axes[0, 0].pie(member_dist.values, labels=['Regular', 'Gold member'], autopct='%1.1f%%')
    axes[0, 0].set_title('Membership distribution')

    # Age distribution
    user_ages = user_info.groupby('user_id')['age'].first()
    axes[0, 1].hist(user_ages, bins=20, alpha=0.7, edgecolor='black')
    axes[0, 1].set_xlabel('Age')
    axes[0, 1].set_ylabel('Users')
    axes[0, 1].set_title('User age distribution')

    # Income distribution
    user_income = user_info.groupby('user_id')['income'].first()
    axes[0, 2].hist(user_income, bins=30, alpha=0.7, edgecolor='black')
    axes[0, 2].set_xlabel('Income')
    axes[0, 2].set_ylabel('Users')
    axes[0, 2].set_title('User income distribution')

    # 2. Purchase behavior over time
    daily_stats = df.groupby('date').agg({
        'purchase_amount_clean': 'sum',
        'has_purchase': 'sum',
        'user_id': 'nunique'
    }).reset_index()
    daily_stats['avg_spend'] = daily_stats['purchase_amount_clean'] / daily_stats['has_purchase']
    daily_stats['purchase_rate'] = daily_stats['has_purchase'] / daily_stats['user_id']

    axes[1, 0].plot(daily_stats['date'], daily_stats['purchase_amount_clean'])
    axes[1, 0].axvline(x=pd.Timestamp('2023-11-01'), color='red', linestyle='--', label='Campaign start')
    axes[1, 0].axvline(x=pd.Timestamp('2023-11-30'), color='red', linestyle='--', label='Campaign end')
    axes[1, 0].set_xlabel('Date')
    axes[1, 0].set_ylabel('Daily sales')
    axes[1, 0].set_title('Daily sales trend')
    axes[1, 0].legend()
    axes[1, 0].tick_params(axis='x', rotation=45)

    # 3. Gold members vs. regular users
    member_daily = df.groupby(['date', 'is_gold_member']).agg({
        'purchase_amount_clean': 'sum',
        'has_purchase': 'sum'
    }).reset_index()
    gold_daily = member_daily[member_daily['is_gold_member'] == 1]
    regular_daily = member_daily[member_daily['is_gold_member'] == 0]
    axes[1, 1].plot(gold_daily['date'], gold_daily['purchase_amount_clean'], label='Gold members')
    axes[1, 1].plot(regular_daily['date'], regular_daily['purchase_amount_clean'], label='Regular users')
    axes[1, 1].axvline(x=pd.Timestamp('2023-11-01'), color='red', linestyle='--')
    axes[1, 1].axvline(x=pd.Timestamp('2023-11-30'), color='red', linestyle='--')
    axes[1, 1].set_xlabel('Date')
    axes[1, 1].set_ylabel('Daily sales')
    axes[1, 1].set_title('Sales by membership type')
    axes[1, 1].legend()
    axes[1, 1].tick_params(axis='x', rotation=45)

    # 4. Purchase rates by period
    purchase_rates = df.groupby(['period', 'is_gold_member']).agg({
        'has_purchase': 'mean',
        'purchase_amount_clean': 'mean'
    }).reset_index()
    x = np.arange(3)  # pre, campaign, post (ordered by the categorical above)
    width = 0.35
    gold_rates = purchase_rates[purchase_rates['is_gold_member'] == 1]['has_purchase']
    regular_rates = purchase_rates[purchase_rates['is_gold_member'] == 0]['has_purchase']
    axes[1, 2].bar(x - width/2, gold_rates, width, label='Gold members')
    axes[1, 2].bar(x + width/2, regular_rates, width, label='Regular users')
    axes[1, 2].set_xlabel('Period')
    axes[1, 2].set_ylabel('Purchase rate')
    axes[1, 2].set_title('Purchase rate by period')
    axes[1, 2].set_xticks(x)
    axes[1, 2].set_xticklabels(['Pre', 'Campaign', 'Post'])
    axes[1, 2].legend()

    plt.tight_layout()
    plt.show()
    return daily_stats, purchase_rates

# Run the exploratory analysis
daily_stats, purchase_rates = exploratory_analysis(marketing_clean, user_info)

# Key statistics
print("\nKey statistics:")
print("=" * 50)
pre_campaign = marketing_clean[marketing_clean['period'] == 'pre']
campaign_period = marketing_clean[marketing_clean['period'] == 'campaign']
gold_pre = pre_campaign[pre_campaign['is_gold_member'] == 1]
gold_campaign = campaign_period[campaign_period['is_gold_member'] == 1]
print(f"Pre-campaign gold-member purchase rate: {gold_pre['has_purchase'].mean():.3f}")
print(f"Campaign gold-member purchase rate: {gold_campaign['has_purchase'].mean():.3f}")
print(f"Nominal growth: {(gold_campaign['has_purchase'].mean() / gold_pre['has_purchase'].mean() - 1) * 100:.1f}%")
```

The exploration gives us a clear picture of the data's characteristics and underlying patterns. Next we choose appropriate causal-inference methods to estimate the campaign's true effect.

IV. Causal Inference Method Selection and Principles

1. Causal Graph and Identification Assumptions

Before modeling, we need to make the causal relationships and identification assumptions explicit. We first lay out the causal graph for this campaign.
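The causal graph appears as a figure in the original post. As a stand-in, the structure can be written down as a plain adjacency dict — the edges below are a hypothetical sketch reconstructed from the confounders discussed in section II, not taken from the figure itself:

```python
# Assumed causal structure (hypothetical sketch, not the original figure):
# user traits (income, age, city tier) drive both membership and purchasing;
# the campaign reaches users only through gold membership; season and
# competitor activity also move purchases.
causal_edges = {
    'user_traits':       ['gold_membership', 'purchases'],
    'gold_membership':   ['campaign_exposure'],
    'campaign_exposure': ['purchases'],
    'season':            ['purchases'],
    'competitors':       ['purchases'],
}

def parents(graph, node):
    """Return the direct causes of `node` in the edge dict."""
    return sorted(src for src, dsts in graph.items() if node in dsts)

# 'purchases' has several direct causes besides the campaign — exactly the
# confounding (user traits) and co-occurring shocks (season, competitors)
# that the identification strategy must deal with.
print(parents(causal_edges, 'purchases'))
```

Reading off this structure, `user_traits` opens a backdoor path between exposure and purchases (hence conditioning/matching), while `season` and `competitors` are common shocks (hence a differenced comparison).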

Based on the causal graph, we state the following identification assumptions:

| Assumption | Content | Verification |
| --- | --- | --- |
| Conditional independence | Given the observed user features, treatment assignment is independent of the potential outcomes | Balance tests |
| Common support | The propensity-score distributions of treated and control units overlap | Propensity-score distribution plots |
| Parallel trends | Treated and control groups follow parallel trends before the campaign | Pre-period trend tests |
| No spillover | The campaign effect does not spill over into the control group | Spatial-correlation checks |

2. Method Comparison and Selection

For this evaluation we consider the following causal-inference methods:

```python
# Method-selection evaluation framework
from scipy import stats

def method_selection_evaluation(df):
    """Assess how suitable each causal-inference method is for this data."""
    evaluation_results = {}

    # 1. Sample sizes
    n_treated = df[df['is_gold_member'] == 1]['user_id'].nunique()
    n_control = df[df['is_gold_member'] == 0]['user_id'].nunique()
    evaluation_results['sample_sizes'] = {'treated': n_treated, 'control': n_control}

    # 2. Pre-period trend comparison
    pre_period_data = df[df['period'] == 'pre']
    trend_comparison = pre_period_data.groupby(['is_gold_member', 'date']).agg({
        'purchase_amount_clean': 'mean',
        'has_purchase': 'mean'
    }).reset_index()
    # Correlation of the two groups' pre-period trends
    gold_trend = trend_comparison[trend_comparison['is_gold_member'] == 1].set_index('date')['purchase_amount_clean']
    regular_trend = trend_comparison[trend_comparison['is_gold_member'] == 0].set_index('date')['purchase_amount_clean']
    trend_correlation = gold_trend.corr(regular_trend)
    evaluation_results['pre_trend_correlation'] = trend_correlation

    # 3. Covariate balance
    user_features = df.groupby('user_id').first()[['age', 'income', 'city_tier']]
    user_features['is_gold_member'] = df.groupby('user_id')['is_gold_member'].first()
    balance_results = {}
    for feature in ['age', 'income', 'city_tier']:
        treated_vals = user_features[user_features['is_gold_member'] == 1][feature]
        control_vals = user_features[user_features['is_gold_member'] == 0][feature]
        std_diff = (treated_vals.mean() - control_vals.mean()) / np.sqrt(
            (treated_vals.std()**2 + control_vals.std()**2) / 2)
        t_stat, p_value = stats.ttest_ind(treated_vals, control_vals)
        balance_results[feature] = {
            'std_diff': std_diff,
            'p_value': p_value,
            'balanced': abs(std_diff) < 0.1 and p_value > 0.05
        }
    evaluation_results['covariate_balance'] = balance_results

    # 4. Method recommendations
    method_recommendations = []
    # DID applicability
    if trend_correlation > 0.8:
        method_recommendations.append({
            'method': 'DID', 'confidence': 'high',
            'reason': 'Pre-period trends are highly correlated; parallel trends looks plausible'})
    else:
        method_recommendations.append({
            'method': 'DID', 'confidence': 'medium',
            'reason': 'The parallel-trends assumption needs further testing'})
    # PSM applicability
    imbalance_count = sum(1 for feature in balance_results.values() if not feature['balanced'])
    if imbalance_count > 0:
        method_recommendations.append({
            'method': 'PSM', 'confidence': 'high',
            'reason': f'{imbalance_count} covariates are imbalanced; matching is needed'})
    else:
        method_recommendations.append({
            'method': 'PSM', 'confidence': 'medium',
            'reason': 'Covariates are roughly balanced; PSM can still improve precision'})
    # Sample size supports triangulating across methods
    if n_treated > 1000 and n_control > 1000:
        method_recommendations.append({
            'method': 'combined methods', 'confidence': 'high',
            'reason': 'Sample size is large enough to triangulate across methods'})
    evaluation_results['method_recommendations'] = method_recommendations
    return evaluation_results

# Run the method-selection evaluation
method_evaluation = method_selection_evaluation(marketing_clean)
print("Method-selection results:")
print("=" * 50)
print(f"Treated sample: {method_evaluation['sample_sizes']['treated']}")
print(f"Control sample: {method_evaluation['sample_sizes']['control']}")
print(f"Pre-period trend correlation: {method_evaluation['pre_trend_correlation']:.3f}")
print("\nCovariate balance:")
for feature, result in method_evaluation['covariate_balance'].items():
    balanced_status = "balanced" if result['balanced'] else "imbalanced"
    print(f"{feature}: std diff={result['std_diff']:.3f}, p={result['p_value']:.3f} ({balanced_status})")
print("\nRecommendations:")
for recommendation in method_evaluation['method_recommendations']:
    print(f"{recommendation['method']}: confidence {recommendation['confidence']} - {recommendation['reason']}")
```

3. Final Analysis Method

Based on the evaluation, we adopt the following strategy:

- Primary method: difference-in-differences (DID) combined with propensity score matching (PSM)
- Auxiliary method: synthetic control, as a robustness check
- Estimands: the average treatment effect (ATE) and heterogeneous treatment effects

With the analysis methods fixed, we next build the causal models that estimate the campaign's true effect.
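The synthetic-control robustness check mentioned above is not implemented later in this walkthrough, so here is a minimal sketch of the idea: express the treated unit as a convex combination of control units that tracks it well pre-treatment, then compare post-treatment. The data below are hypothetical, and the simplex-constrained fit uses `scipy.optimize.minimize` with SLSQP:

```python
import numpy as np
from scipy.optimize import minimize

def synthetic_control_weights(y_treated_pre, Y_control_pre):
    """Find simplex weights w (w >= 0, sum(w) = 1) so that the weighted
    combination of control series tracks the treated series pre-treatment."""
    n_controls = Y_control_pre.shape[1]

    def loss(w):
        # Pre-period fit error of the synthetic control
        return np.sum((y_treated_pre - Y_control_pre @ w) ** 2)

    w0 = np.full(n_controls, 1.0 / n_controls)  # start from equal weights
    res = minimize(loss, w0, method='SLSQP',
                   bounds=[(0.0, 1.0)] * n_controls,
                   constraints=[{'type': 'eq', 'fun': lambda w: w.sum() - 1.0}])
    return res.x

# Hypothetical example: 3 control series over 60 pre-period days; the treated
# series is an exact 50/50 mix of controls 0 and 2, so the recovered weights
# should concentrate there.
rng = np.random.default_rng(0)
Y_control_pre = rng.normal(100, 5, size=(60, 3))
y_treated_pre = 0.5 * Y_control_pre[:, 0] + 0.5 * Y_control_pre[:, 2]
w = synthetic_control_weights(y_treated_pre, Y_control_pre)
print(np.round(w, 2))
```

In the campaign setting, the "units" could be user segments or regions untouched by the promotion; the gap between the treated series and its synthetic counterpart during November is the robustness estimate of the effect.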

V. Model Building and Effect Estimation

1. Data Preparation and Feature Engineering

Before building the causal models, we prepare the data format and features:

```python
# Data preparation and feature engineering
def prepare_causal_analysis_data(df):
    """Build the user-level dataset for causal analysis."""
    # User-level attributes
    user_level_data = df.groupby('user_id').agg({
        'age': 'first',
        'income': 'first',
        'city_tier': 'first',
        'is_gold_member': 'first',
        'signup_date': 'first'
    }).reset_index()

    # Pre-campaign behavior features (covariates)
    pre_period_data = df[df['period'] == 'pre']
    pre_behavior = pre_period_data.groupby('user_id').agg({
        'purchase_amount_clean': ['sum', 'mean', 'count'],
        'has_purchase': 'sum'
    }).reset_index()
    # Flatten the MultiIndex columns
    pre_behavior.columns = ['user_id', 'pre_total_spend', 'pre_avg_spend',
                            'pre_purchase_count', 'pre_purchase_days']
    pre_behavior['pre_purchase_rate'] = (
        pre_behavior['pre_purchase_days'] / pre_period_data['date'].nunique())

    # Campaign-period outcomes
    campaign_data = df[df['period'] == 'campaign']
    campaign_behavior = campaign_data.groupby('user_id').agg({
        'purchase_amount_clean': 'sum',
        'has_purchase': 'sum'
    }).reset_index()
    campaign_behavior.columns = ['user_id', 'campaign_total_spend', 'campaign_purchase_days']

    # Merge all features
    analysis_data = user_level_data.merge(pre_behavior, on='user_id', how='left')
    analysis_data = analysis_data.merge(campaign_behavior, on='user_id', how='left')
    # New users may have no activity in some period
    analysis_data = analysis_data.fillna(0)

    # Tenure features
    analysis_data['tenure_days'] = (pd.Timestamp('2023-11-01') - analysis_data['signup_date']).dt.days
    analysis_data['tenure_group'] = pd.cut(
        analysis_data['tenure_days'],
        bins=[0, 30, 90, 180, 365, float('inf')],
        labels=['new', '1-3 months', '3-6 months', '6-12 months', 'veteran'])
    return analysis_data

# Build the analysis dataset
analysis_df = prepare_causal_analysis_data(marketing_clean)
print(f"Analysis dataset shape: {analysis_df.shape}")
print(f"Gold-member share: {analysis_df['is_gold_member'].mean():.3f}")

# Baseline differences between treated and control
print("\nBaseline comparison, treated vs. control:")
baseline_comparison = analysis_df.groupby('is_gold_member').agg({
    'age': 'mean',
    'income': 'mean',
    'pre_total_spend': 'mean',
    'pre_purchase_rate': 'mean',
    'tenure_days': 'mean'
}).round(2)
print(baseline_comparison)
```

2. Propensity Score Matching

We first use propensity score matching to build comparable treated and control groups:

代码语言:python复制from sklearn.linear_model import LogisticRegression

from sklearn.ensemble import RandomForestClassifier

from sklearn.preprocessing import StandardScaler

import matplotlib.pyplot as plt

def propensity_score_matching(data, treatment_col='is_gold_member', outcome_col='campaign_total_spend'):

"""实现倾向得分匹配"""

# 准备特征矩阵

feature_cols = ['age', 'income', 'city_tier', 'pre_total_spend', 'pre_purchase_rate', 'tenure_days']

X = data[feature_cols]

T = data[treatment_col]

Y = data[outcome_col]

# 标准化特征

scaler = StandardScaler()

X_scaled = scaler.fit_transform(X)

# 估计倾向得分(使用随机森林)

ps_model = RandomForestClassifier(n_estimators=100, random_state=42)

ps_model.fit(X_scaled, T)

data['propensity_score'] = ps_model.predict_proba(X_scaled)[:, 1]

# 可视化倾向得分分布

plt.figure(figsize=(10, 6))

for treatment_status in [0, 1]:

subset = data[data[treatment_col] == treatment_status]

plt.hist(subset['propensity_score'], alpha=0.7,

label=f"{'处理组' if treatment_status == 1 else '对照组'}",

bins=30, density=True)

plt.xlabel('倾向得分')

plt.ylabel('密度')

plt.title('处理组和对照组的倾向得分分布')

plt.legend()

plt.show()

# 执行最近邻匹配

from sklearn.neighbors import NearestNeighbors

treated = data[data[treatment_col] == 1]

control = data[data[treatment_col] == 0]

# 为每个处理组个体找到最近邻

nbrs = NearestNeighbors(n_neighbors=1, metric='euclidean')

nbrs.fit(control[['propensity_score']])

distances, indices = nbrs.kneighbors(treated[['propensity_score']])

# 应用卡尺(最大距离0.1)

caliper = 0.1

matched_control_indices = []

for i, distance in enumerate(distances):

if distance[0] <= caliper:

matched_control_indices.append(indices[i][0])

matched_control = control.iloc[matched_control_indices]

matched_treated = treated.iloc[[i for i, d in enumerate(distances) if d[0] <= caliper]]

matched_data = pd.concat([matched_treated, matched_control])

print(f"匹配前: 处理组{len(treated)}人, 对照组{len(control)}人")

print(f"匹配后: 处理组{len(matched_treated)}人, 对照组{len(matched_control)}人")

print(f"匹配率: {len(matched_treated) / len(treated):.3f}")

# 检查匹配后平衡性

print("\n匹配后平衡性检查:")

for feature in feature_cols:

treated_mean = matched_treated[feature].mean()

control_mean = matched_control[feature].mean()

treated_std = matched_treated[feature].std()

control_std = matched_control[feature].std()

std_diff = (treated_mean - control_mean) / np.sqrt((treated_std**2 + control_std**2) / 2)

print(f"{feature}: 标准化差异 = {std_diff:.3f}")

return matched_data, matched_treated, matched_control

# 执行倾向得分匹配

matched_data, matched_treated, matched_control = propensity_score_matching(analysis_df)3. 双重差分模型实现现在实现双重差分模型来估计因果效应:
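The regression implementation below operationalizes a simple idea: DID is the difference of two differences, (treated post − treated pre) − (control post − control pre), which nets out both the groups' level gap and the common seasonal shift. A minimal numeric sketch with made-up group means:

```python
def did_2x2(treated_pre, treated_post, control_pre, control_post):
    """Plain 2x2 difference-in-differences of group means."""
    return (treated_post - treated_pre) - (control_post - control_pre)

# Hypothetical group means (RMB): both groups rise seasonally, but the
# treated group rises by an extra 90 — that surplus is the DID estimate.
print(did_2x2(treated_pre=400, treated_post=700,
              control_pre=300, control_post=510))  # -> 90
```

The regression that follows recovers exactly this quantity as the coefficient on the period × treated interaction term, with standard errors attached.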

```python
import statsmodels.api as sm
import statsmodels.formula.api as smf

def difference_in_differences_analysis(matched_data, pre_period_data, campaign_period_data):
    """Difference-in-differences on the matched sample."""
    # Treated users: pre and campaign periods
    treated_users = matched_data[matched_data['is_gold_member'] == 1]['user_id']
    treated_pre = pre_period_data[pre_period_data['user_id'].isin(treated_users)]
    treated_pre_agg = treated_pre.groupby('user_id').agg({
        'purchase_amount_clean': 'sum',
        'has_purchase': 'sum'
    }).reset_index()
    treated_pre_agg['period'] = 0   # 0 = pre-campaign
    treated_pre_agg['treated'] = 1  # 1 = treated group

    treated_campaign = campaign_period_data[campaign_period_data['user_id'].isin(treated_users)]
    treated_campaign_agg = treated_campaign.groupby('user_id').agg({
        'purchase_amount_clean': 'sum',
        'has_purchase': 'sum'
    }).reset_index()
    treated_campaign_agg['period'] = 1  # 1 = campaign period
    treated_campaign_agg['treated'] = 1

    # Control users: pre and campaign periods
    control_users = matched_data[matched_data['is_gold_member'] == 0]['user_id']
    control_pre = pre_period_data[pre_period_data['user_id'].isin(control_users)]
    control_pre_agg = control_pre.groupby('user_id').agg({
        'purchase_amount_clean': 'sum',
        'has_purchase': 'sum'
    }).reset_index()
    control_pre_agg['period'] = 0
    control_pre_agg['treated'] = 0  # 0 = control group

    control_campaign = campaign_period_data[campaign_period_data['user_id'].isin(control_users)]
    control_campaign_agg = control_campaign.groupby('user_id').agg({
        'purchase_amount_clean': 'sum',
        'has_purchase': 'sum'
    }).reset_index()
    control_campaign_agg['period'] = 1
    control_campaign_agg['treated'] = 0

    # Stack all four cells
    did_data = pd.concat([treated_pre_agg, treated_campaign_agg,
                          control_pre_agg, control_campaign_agg])
    # Interaction term: the DID coefficient of interest
    did_data['post_treatment'] = did_data['period'] * did_data['treated']

    # DID regression
    model = smf.ols('purchase_amount_clean ~ period + treated + post_treatment', data=did_data).fit()
    did_estimate = model.params['post_treatment']
    did_std_err = model.bse['post_treatment']
    did_p_value = model.pvalues['post_treatment']

    print("DID model results:")
    print(model.summary())

    # Visualize the DID setup
    summary_stats = did_data.groupby(['treated', 'period']).agg({
        'purchase_amount_clean': 'mean'
    }).reset_index()
    plt.figure(figsize=(10, 6))
    for treated_status in [0, 1]:
        subset = summary_stats[summary_stats['treated'] == treated_status]
        label = 'Treated' if treated_status == 1 else 'Control'
        plt.plot(subset['period'], subset['purchase_amount_clean'], 'o-', label=label, linewidth=2)
    plt.axvline(x=0.5, color='red', linestyle='--', alpha=0.7, label='Campaign start')
    plt.xlabel('Period (0 = pre, 1 = campaign)')
    plt.ylabel('Mean spend')
    plt.title('Difference-in-differences')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()
    return model, did_estimate, did_std_err, did_p_value

# Data for the DID analysis
pre_period_data = marketing_clean[marketing_clean['period'] == 'pre']
campaign_period_data = marketing_clean[marketing_clean['period'] == 'campaign']

# Run the DID analysis
did_model, did_estimate, did_std_err, did_p_value = difference_in_differences_analysis(
    matched_data, pre_period_data, campaign_period_data
)

print(f"\nDID estimates:")
print(f"Average treatment effect (ATE): {did_estimate:.2f}")
print(f"Standard error: {did_std_err:.2f}")
print(f"p-value: {did_p_value:.4f}")
print(f"95% CI: [{did_estimate - 1.96 * did_std_err:.2f}, {did_estimate + 1.96 * did_std_err:.2f}]")
```

4. Heterogeneous Effects

Beyond the average effect, we also care about how the effect varies across user segments:

```python
def heterogeneous_effects_analysis(matched_data, marketing_clean):
    """Analyze heterogeneity in the treatment effect."""
    # User features and campaign outcomes
    user_features = matched_data[['user_id', 'age', 'income', 'city_tier',
                                  'pre_total_spend', 'pre_purchase_rate', 'tenure_days']]
    campaign_results = marketing_clean[marketing_clean['period'] == 'campaign']
    campaign_results = campaign_results.groupby('user_id').agg({
        'purchase_amount_clean': 'sum'
    }).reset_index()
    analysis_data = user_features.merge(campaign_results, on='user_id')
    analysis_data = analysis_data.merge(matched_data[['user_id', 'is_gold_member']], on='user_id')

    # Segmentation dimensions
    analysis_data['age_group'] = pd.cut(analysis_data['age'],
                                        bins=[0, 25, 35, 45, 55, 100],
                                        labels=['<25', '25-35', '35-45', '45-55', '55+'])
    analysis_data['income_group'] = pd.cut(analysis_data['income'],
                                           bins=[0, 5000, 10000, 20000, 50000, float('inf')],
                                           labels=['<5k', '5k-10k', '10k-20k', '20k-50k', '50k+'])
    analysis_data['value_group'] = pd.qcut(analysis_data['pre_total_spend'], 4,
                                           labels=['low', 'mid-low', 'mid-high', 'high'])

    # Treatment effect within each segment
    hetero_results = []
    for dimension in ['age_group', 'income_group', 'value_group']:
        for group in analysis_data[dimension].dropna().unique():
            group_data = analysis_data[analysis_data[dimension] == group]
            treated_avg = group_data[group_data['is_gold_member'] == 1]['purchase_amount_clean'].mean()
            control_avg = group_data[group_data['is_gold_member'] == 0]['purchase_amount_clean'].mean()
            # Simple difference of means as the effect estimate
            treatment_effect = treated_avg - control_avg
            hetero_results.append({
                'dimension': dimension,
                'group': group,
                'treated_avg': treated_avg,
                'control_avg': control_avg,
                'treatment_effect': treatment_effect,
                'sample_size': len(group_data)
            })
    hetero_df = pd.DataFrame(hetero_results)

    # Visualize the heterogeneity
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))
    for i, dimension in enumerate(['age_group', 'income_group', 'value_group']):
        dim_data = hetero_df[hetero_df['dimension'] == dimension]
        axes[i].bar(range(len(dim_data)), dim_data['treatment_effect'])
        axes[i].set_xlabel(dimension)
        axes[i].set_ylabel('Treatment effect')
        axes[i].set_title(f'Effect heterogeneity by {dimension}')
        axes[i].set_xticks(range(len(dim_data)))
        axes[i].set_xticklabels(dim_data['group'], rotation=45)
        # Value labels on the bars
        for j, effect in enumerate(dim_data['treatment_effect']):
            axes[i].text(j, effect + (0.1 if effect >= 0 else -0.5), f'{effect:.1f}',
                         ha='center', va='bottom' if effect >= 0 else 'top')
    plt.tight_layout()
    plt.show()
    return hetero_df

# Run the heterogeneity analysis
hetero_effects = heterogeneous_effects_analysis(matched_data, marketing_clean)
print("Heterogeneous-effect results:")
print(hetero_effects[['dimension', 'group', 'treatment_effect', 'sample_size']].round(2))
```

With the models built, we now have initial estimates of the campaign's effect. The next step is rigorous sensitivity analysis to check how reliable those estimates are.

VI. Sensitivity Analysis and Robustness Checks

1. Sensitivity of the Propensity Score Matching

```python
def sensitivity_analysis_psm(matched_data, analysis_df, outcome_col='campaign_total_spend'):
    """Sensitivity analysis for the propensity-score-matching results."""
    sensitivity_results = {}
    from sklearn.neighbors import NearestNeighbors

    # 1. Compare different matching methods
    def different_matching_methods(data, method='nearest', k=1, caliper=None):
        """Run alternative matching schemes."""
        treated = data[data['is_gold_member'] == 1]
        control = data[data['is_gold_member'] == 0]
        if method == 'nearest':
            nbrs = NearestNeighbors(n_neighbors=k, metric='euclidean')
            nbrs.fit(control[['propensity_score']])
            distances, indices = nbrs.kneighbors(treated[['propensity_score']])
            # Actually apply the caliper: drop neighbors farther away than it
            # (otherwise every treated unit is matched and the caliper has no effect)
            if caliper is not None:
                indices = [idx[d <= caliper] for d, idx in zip(distances, indices)]
        elif method == 'radius':
            nbrs = NearestNeighbors(radius=caliper, metric='euclidean')
            nbrs.fit(control[['propensity_score']])
            distances, indices = nbrs.radius_neighbors(treated[['propensity_score']])
        # Collect the matches
        matched_control_indices = []
        for i, match_indices in enumerate(indices):
            if len(match_indices) > 0:
                if method == 'nearest':
                    matched_control_indices.extend(match_indices[:k])
                else:  # radius matching
                    matched_control_indices.extend(match_indices)
        matched_control = control.iloc[matched_control_indices]
        matched_treated = treated.iloc[[i for i in range(len(treated)) if len(indices[i]) > 0]]
        return matched_treated, matched_control

    methods = [
        ('1:1 nearest neighbor', 'nearest', 1, 0.1),
        ('1:3 nearest neighbor', 'nearest', 3, 0.1),
        ('radius matching', 'radius', None, 0.05)
    ]
    method_comparison = []
    for method_name, method_type, k, caliper in methods:
        matched_treated, matched_control = different_matching_methods(
            analysis_df, method_type, k, caliper)
        if len(matched_treated) > 0 and len(matched_control) > 0:
            ate = matched_treated[outcome_col].mean() - matched_control[outcome_col].mean()
            method_comparison.append({
                'method': method_name,
                'treated_sample': len(matched_treated),
                'control_sample': len(matched_control),
                'ate': ate
            })
    sensitivity_results['matching_methods'] = pd.DataFrame(method_comparison)

    # 2. Compare different propensity-score models
    def different_ps_models(data, model_type='logistic'):
        """Estimate propensity scores with different classifiers."""
        feature_cols = ['age', 'income', 'city_tier', 'pre_total_spend', 'pre_purchase_rate', 'tenure_days']
        X = data[feature_cols]
        T = data['is_gold_member']
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        if model_type == 'logistic':
            model = LogisticRegression(random_state=42)
        elif model_type == 'random_forest':
            model = RandomForestClassifier(n_estimators=100, random_state=42)
        elif model_type == 'gradient_boosting':
            from sklearn.ensemble import GradientBoostingClassifier
            model = GradientBoostingClassifier(n_estimators=100, random_state=42)
        model.fit(X_scaled, T)
        return model.predict_proba(X_scaled)[:, 1]

    ps_model_comparison = []
    for model_name in ['logistic', 'random_forest', 'gradient_boosting']:
        analysis_df_copy = analysis_df.copy()
        analysis_df_copy['propensity_score'] = different_ps_models(analysis_df, model_name)
        # Same matching scheme for every propensity model
        matched_treated, matched_control = different_matching_methods(
            analysis_df_copy, 'nearest', 1, 0.1)
        if len(matched_treated) > 0 and len(matched_control) > 0:
            ate = matched_treated[outcome_col].mean() - matched_control[outcome_col].mean()
            ps_model_comparison.append({
                'ps_model': model_name,
                'ate': ate,
                'treated_sample': len(matched_treated)
            })
    sensitivity_results['ps_models'] = pd.DataFrame(ps_model_comparison)

    # 3. Caliper sensitivity
    caliper_sensitivity = []
    for caliper in [0.01, 0.05, 0.1, 0.2]:
        matched_treated, matched_control = different_matching_methods(
            analysis_df, 'nearest', 1, caliper)
        if len(matched_treated) > 0 and len(matched_control) > 0:
            ate = matched_treated[outcome_col].mean() - matched_control[outcome_col].mean()
            caliper_sensitivity.append({
                'caliper': caliper,
                'ate': ate,
                'matched_pairs': len(matched_treated),
                'match_rate': len(matched_treated) / len(analysis_df[analysis_df['is_gold_member'] == 1])
            })
    sensitivity_results['caliper_sensitivity'] = pd.DataFrame(caliper_sensitivity)
    return sensitivity_results

# Run the sensitivity analysis
sensitivity_results = sensitivity_analysis_psm(matched_data, analysis_df)
print("PSM sensitivity analysis:")
print("\n1. Matching methods:")
print(sensitivity_results['matching_methods'])
print("\n2. Propensity-score models:")
print(sensitivity_results['ps_models'])
print("\n3. Caliper sensitivity:")
print(sensitivity_results['caliper_sensitivity'])
```

2. Placebo Tests and Falsification

```python
def placebo_tests(marketing_clean, analysis_df):
    """Placebo tests to validate the method."""
    placebo_results = {}

    # 1. Pre-period placebo: simulate a fake campaign inside the pre-period
    pre_data = marketing_clean[marketing_clean['period'] == 'pre']
    # Split the pre-period into a pseudo-pre half and a pseudo-campaign half
    pre_dates = pre_data['date'].unique()
    split_date = pre_dates[len(pre_dates) // 2]
    pseudo_pre = pre_data[pre_data['date'] < split_date]
    pseudo_campaign = pre_data[pre_data['date'] >= split_date]

    # Aggregate per user (vectorized; a per-user filtering loop over 50k users
    # against millions of rows would be prohibitively slow)
    pre_sums = pseudo_pre.groupby('user_id')['purchase_amount_clean'].sum()
    campaign_sums = pseudo_campaign.groupby('user_id')['purchase_amount_clean'].sum()
    treat_map = analysis_df.set_index('user_id')['is_gold_member']

    pseudo_did_df = pd.concat([
        pd.DataFrame({'user_id': pre_sums.index, 'outcome': pre_sums.values, 'period': 0}),
        pd.DataFrame({'user_id': campaign_sums.index, 'outcome': campaign_sums.values, 'period': 1}),
    ], ignore_index=True)
    pseudo_did_df['treated'] = pseudo_did_df['user_id'].map(treat_map)
    pseudo_did_df['post_treatment'] = pseudo_did_df['period'] * pseudo_did_df['treated']

    # Placebo DID regression
    placebo_model = smf.ols('outcome ~ period + treated + post_treatment', data=pseudo_did_df).fit()
    placebo_effect = placebo_model.params['post_treatment']
    placebo_p_value = placebo_model.pvalues['post_treatment']
    placebo_results['pre_period_placebo'] = {
        'effect': placebo_effect,
        'p_value': placebo_p_value,
        'significant': placebo_p_value < 0.05
    }

    # 2. Random-assignment test
    n_randomizations = 100
    random_effects = []
    for i in range(n_randomizations):
        # Randomly reassign treatment, keeping the 30% treated share
        analysis_df_copy = analysis_df.copy()
        analysis_df_copy['random_treated'] = np.random.choice([0, 1], size=len(analysis_df_copy),
                                                              p=[0.7, 0.3])
        treated_avg = analysis_df_copy[analysis_df_copy['random_treated'] == 1]['campaign_total_spend'].mean()
        control_avg = analysis_df_copy[analysis_df_copy['random_treated'] == 0]['campaign_total_spend'].mean()
        random_effects.append(treated_avg - control_avg)
    random_effects = np.array(random_effects)
    actual_effect = (analysis_df[analysis_df['is_gold_member'] == 1]['campaign_total_spend'].mean()
                     - analysis_df[analysis_df['is_gold_member'] == 0]['campaign_total_spend'].mean())
    # p-value: where the actual effect falls in the randomization distribution
    randomization_p_value = np.mean(np.abs(random_effects) >= np.abs(actual_effect))
    placebo_results['randomization_test'] = {
        'actual_effect': actual_effect,
        'randomization_p_value': randomization_p_value,
        'significant': randomization_p_value < 0.05
    }

    # Visualize the randomization test
    plt.figure(figsize=(10, 6))
    plt.hist(random_effects, bins=30, alpha=0.7, edgecolor='black', density=True)
    plt.axvline(x=actual_effect, color='red', linestyle='--', linewidth=2,
                label=f'Actual effect: {actual_effect:.2f}')
    plt.axvline(x=np.mean(random_effects), color='blue', linestyle='--', linewidth=2,
                label=f'Mean random effect: {np.mean(random_effects):.2f}')
    plt.xlabel('Treatment effect')
    plt.ylabel('Density')
    plt.title('Randomization placebo test')
    plt.legend()
    plt.show()
    return placebo_results

# Run the placebo tests
placebo_results = placebo_tests(marketing_clean, analysis_df)
print("Placebo test results:")
print("\n1. Pre-period placebo:")
placebo_test = placebo_results['pre_period_placebo']
print(f"Pseudo-effect: {placebo_test['effect']:.2f}, p-value: {placebo_test['p_value']:.4f}")
print(f"Significant: {'yes' if placebo_test['significant'] else 'no'} (should be no)")
print("\n2. Randomization test:")
random_test = placebo_results['randomization_test']
print(f"Actual effect: {random_test['actual_effect']:.2f}")
print(f"Randomization p-value: {random_test['randomization_p_value']:.4f}")
print(f"Significant: {'yes' if random_test['significant'] else 'no'} (should be yes)")
```

3. Rosenbaum Bounds Sensitivity Analysis

    """Rosenbaum边界敏感性分析"""
    # 准备匹配对数据
    treated_users = matched_data[matched_data['is_gold_member'] == 1]['user_id']
    control_users = matched_data[matched_data['is_gold_member'] == 0]['user_id']

    # 简化:假设1:1匹配,计算匹配对内的差异
    matched_pairs = []
    n_pairs = min(len(treated_users), len(control_users))
    for i, treated_id in enumerate(treated_users.iloc[:n_pairs]):
        control_id = control_users.iloc[i]
        treated_outcome = matched_data[matched_data['user_id'] == treated_id][outcome_col].values[0]
        control_outcome = matched_data[matched_data['user_id'] == control_id][outcome_col].values[0]
        matched_pairs.append({
            'pair_id': i,
            'treated_outcome': treated_outcome,
            'control_outcome': control_outcome,
            'difference': treated_outcome - control_outcome
        })

    pairs_df = pd.DataFrame(matched_pairs)

    # 符号检验(最保守的检验)
    positive_differences = len(pairs_df[pairs_df['difference'] > 0])
    total_pairs = len(pairs_df)
    # 注意:stats.binom_test在SciPy 1.12起已被移除,改用binomtest
    sign_test_p = stats.binomtest(positive_differences, total_pairs, 0.5).pvalue

    # Rosenbaum边界分析(简化版)
    gamma_values = [1.0, 1.2, 1.5, 2.0, 2.5, 3.0]
    sensitivity_results = []
    for gamma in gamma_values:
        # 计算在给定gamma下的p值边界
        # 简化计算:使用大样本近似
        mean_diff = pairs_df['difference'].mean()
        std_diff = pairs_df['difference'].std()
        # Rosenbaum边界公式(简化)
        bound_statistic = mean_diff / (std_diff / np.sqrt(total_pairs))
        bound_statistic_adjusted = bound_statistic / np.sqrt(gamma)
        # 对应的p值
        p_value_upper = 2 * (1 - stats.norm.cdf(np.abs(bound_statistic_adjusted)))
        sensitivity_results.append({
            'gamma': gamma,
            'p_value_upper_bound': p_value_upper,
            'still_significant': p_value_upper < 0.05
        })

    sensitivity_df = pd.DataFrame(sensitivity_results)

    # 可视化敏感性分析
    plt.figure(figsize=(10, 6))
    plt.plot(sensitivity_df['gamma'], sensitivity_df['p_value_upper_bound'], 'o-', linewidth=2)
    plt.axhline(y=0.05, color='red', linestyle='--', label='显著性阈值 (0.05)')
    plt.xlabel('Gamma (隐藏偏误程度)')
    plt.ylabel('p值上界')
    plt.title('Rosenbaum边界敏感性分析')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

    # 找到临界gamma值
    critical_gamma = sensitivity_df[sensitivity_df['p_value_upper_bound'] >= 0.05]['gamma'].min()
    if pd.isna(critical_gamma):
        critical_gamma = f"> {gamma_values[-1]}"

    return {
        'sign_test_p': sign_test_p,
        'sensitivity_analysis': sensitivity_df,
        'critical_gamma': critical_gamma
    }

# 执行Rosenbaum边界分析
rosenbaum_results = rosenbaum_bound_sensitivity(matched_data)

print("Rosenbaum边界敏感性分析结果:")
print(f"符号检验p值: {rosenbaum_results['sign_test_p']:.4f}")
print(f"临界Gamma值: {rosenbaum_results['critical_gamma']}")
print("\n详细敏感性分析:")
print(rosenbaum_results['sensitivity_analysis'])

通过严格的敏感性分析,我们对估计结果的稳健性有了更深入的理解。现在可以将这些统计结果转化为具体的业务洞察和投资回报计算。
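附:上文的随机化检验依赖本案例的数据与变量,为便于理解其核心逻辑,下面给出一个可独立运行的最小示意(数据为模拟,处理组、对照组的均值与效应大小均为假设,并非本案例的实际数值):

```python
import numpy as np

# 置换检验(randomization test)最小示例:模拟数据,效应大小为假设
rng = np.random.default_rng(42)
treated = rng.normal(105, 10, 500)   # 处理组:均值比对照组高约5
control = rng.normal(100, 10, 500)   # 对照组
observed = treated.mean() - control.mean()

# 反复随机打乱分组标签,构造"无真实效应"假设下的效应分布
pooled = np.concatenate([treated, control])
perm_effects = []
for _ in range(2000):
    perm = rng.permutation(pooled)
    perm_effects.append(perm[:500].mean() - perm[500:].mean())
perm_effects = np.array(perm_effects)

# 双侧p值:随机效应至少与观测效应一样极端的比例
p_value = np.mean(np.abs(perm_effects) >= np.abs(observed))
print(f"观测效应: {observed:.2f}, 置换p值: {p_value:.4f}")
```

由于真实效应(约5)远大于置换分布的波动,这里的p值会非常接近0;正文中"随机化p值"的计算方式与此一致。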

VII. 业务洞察与ROI计算

1. 效果估计汇总与业务解释

代码语言:python

def business_insights_summary(did_estimate, did_std_err, hetero_effects, matched_data, marketing_clean):
    """将统计结果转化为业务洞察"""
    insights = {}

    # 1. 总体效果估计
    confidence_interval = [did_estimate - 1.96 * did_std_err, did_estimate + 1.96 * did_std_err]
    insights['overall_effect'] = {
        'point_estimate': did_estimate,
        'confidence_interval': confidence_interval,
        'interpretation': f"活动平均为每位金牌会员带来{did_estimate:.2f}元的增量消费"
    }

    # 2. 总增量收益计算
    n_gold_members = marketing_clean['is_gold_member'].sum()
    total_incremental_revenue = did_estimate * n_gold_members
    insights['total_incremental'] = {
        'n_gold_members': n_gold_members,
        'incremental_per_member': did_estimate,
        'total_incremental_revenue': total_incremental_revenue,
        'interpretation': f"活动预计带来总收入增量{total_incremental_revenue:,.0f}元"
    }

    # 3. 异质性洞察:找出效果最好/最差的细分群体
    best_segment = hetero_effects.loc[hetero_effects['treatment_effect'].idxmax()]
    worst_segment = hetero_effects.loc[hetero_effects['treatment_effect'].idxmin()]
    insights['heterogeneity'] = {
        'best_segment': {
            'dimension': best_segment['dimension'],
            'group': best_segment['group'],
            'effect': best_segment['treatment_effect']
        },
        'worst_segment': {
            'dimension': worst_segment['dimension'],
            'group': worst_segment['group'],
            'effect': worst_segment['treatment_effect']
        }
    }

    # 4. 成本效益分析准备
    campaign_costs = {
        'discount_costs': 15000000,   # 折扣成本1500万
        'marketing_spend': 5000000,   # 营销投入500万
        'operational_costs': 2000000  # 运营成本200万
    }
    total_costs = sum(campaign_costs.values())
    insights['cost_analysis'] = {
        'total_costs': total_costs,
        'cost_breakdown': campaign_costs,
        'incremental_revenue': total_incremental_revenue
    }

    return insights

# 生成业务洞察
business_insights = business_insights_summary(did_estimate, did_std_err, hetero_effects, matched_data, marketing_clean)

print("业务洞察总结:")
print("=" * 50)
print(f"1. 总体效果: {business_insights['overall_effect']['interpretation']}")
print(f"   95%置信区间: [{business_insights['overall_effect']['confidence_interval'][0]:.2f}, {business_insights['overall_effect']['confidence_interval'][1]:.2f}]")
print(f"\n2. 总增量收益: {business_insights['total_incremental']['interpretation']}")
print(f"   影响会员数: {business_insights['total_incremental']['n_gold_members']:,}人")
print(f"\n3. 异质性分析:")
best_segment = business_insights['heterogeneity']['best_segment']
worst_segment = business_insights['heterogeneity']['worst_segment']
print(f"   最有效群体: {best_segment['dimension']}-{best_segment['group']}, 效应值: {best_segment['effect']:.2f}元")
print(f"   效果最差群体: {worst_segment['dimension']}-{worst_segment['group']}, 效应值: {worst_segment['effect']:.2f}元")
print(f"\n4. 成本分析:")
print(f"   总活动成本: {business_insights['cost_analysis']['total_costs']:,.0f}元")
print(f"   增量收入: {business_insights['cost_analysis']['incremental_revenue']:,.0f}元")

2. ROI计算与投资决策支持

代码语言:python

def roi_calculation(business_insights, profit_margin=0.3):

    """计算投资回报率及相关指标"""
    cost_data = business_insights['cost_analysis']
    incremental_revenue = cost_data['incremental_revenue']
    total_costs = cost_data['total_costs']

    # 增量利润(考虑毛利率)
    incremental_profit = incremental_revenue * profit_margin

    # 投资回报率计算
    roi = (incremental_profit - total_costs) / total_costs * 100  # 百分比

    # 相关指标
    romi = incremental_revenue / total_costs  # 营销投资回报率
    break_even_romi = 1 / profit_margin  # 盈亏平衡点

    # 增量成本收益率
    incremental_cost_ratio = incremental_profit / total_costs

    roi_results = {
        'incremental_revenue': incremental_revenue,
        'incremental_profit': incremental_profit,
        'total_costs': total_costs,
        'roi_percentage': roi,
        'romi': romi,
        'break_even_romi': break_even_romi,
        'incremental_cost_ratio': incremental_cost_ratio,
        'profit_margin': profit_margin
    }

    # 投资建议
    if roi > 0:
        investment_advice = "建议继续投资"
        confidence_level = "高"
    elif romi > break_even_romi:
        # 注意:romi > break_even_romi 在数学上等价于 roi > 0,此分支实际不会被触发;
        # 若想表达"增量收入已覆盖成本但增量利润尚未覆盖",可改用 romi > 1
        investment_advice = "考虑优化后继续"
        confidence_level = "中"
    else:
        investment_advice = "建议停止或大幅调整"
        confidence_level = "高"

    roi_results['investment_advice'] = investment_advice
    roi_results['confidence_level'] = confidence_level
    return roi_results

# 计算ROI
roi_results = roi_calculation(business_insights)

print("\n投资回报分析:")
print("=" * 50)
print(f"增量收入: {roi_results['incremental_revenue']:,.0f}元")
print(f"增量利润(毛利率{roi_results['profit_margin']:.0%}): {roi_results['incremental_profit']:,.0f}元")
print(f"总成本: {roi_results['total_costs']:,.0f}元")
print(f"ROI: {roi_results['roi_percentage']:.1f}%")
print(f"ROMI: {roi_results['romi']:.2f} (盈亏平衡: {roi_results['break_even_romi']:.2f})")
print(f"增量成本收益率: {roi_results['incremental_cost_ratio']:.2f}")
print(f"\n投资建议: {roi_results['investment_advice']} (置信度: {roi_results['confidence_level']})")

3. 战略建议与优化方向

代码语言:python

def strategic_recommendations(business_insights, hetero_effects, roi_results):

    """基于分析结果提出战略建议"""
    recommendations = []

    # 1. 总体投资建议
    if roi_results['roi_percentage'] > 20:
        recommendations.append({
            'category': '投资策略',
            'recommendation': '扩大投资',
            'details': f"活动ROI达{roi_results['roi_percentage']:.1f}%,高于公司要求的最低回报率",
            'priority': '高'
        })
    elif roi_results['roi_percentage'] > 0:
        recommendations.append({
            'category': '投资策略',
            'recommendation': '维持投资',
            'details': '活动有正向回报,但可进一步优化提升效果',
            'priority': '中'
        })
    else:
        recommendations.append({
            'category': '投资策略',
            'recommendation': '重新设计',
            'details': '活动投资回报为负,需要重新评估活动设计',
            'priority': '高'
        })

    # 2. 目标客群优化
    best_segment = business_insights['heterogeneity']['best_segment']
    worst_segment = business_insights['heterogeneity']['worst_segment']
    recommendations.append({
        'category': '目标客群',
        'recommendation': '聚焦高响应群体',
        'details': f"重点针对{best_segment['dimension']}的{best_segment['group']}群体,该群体响应效果最佳",
        'priority': '高'
    })
    recommendations.append({
        'category': '目标客群',
        'recommendation': '优化低响应群体策略',
        'details': f"对{worst_segment['dimension']}的{worst_segment['group']}群体需要调整营销信息或优惠力度",
        'priority': '中'
    })

    # 3. 活动设计优化:效应变异系数大于0.5时建议个性化
    if hetero_effects['treatment_effect'].std() / hetero_effects['treatment_effect'].mean() > 0.5:
        recommendations.append({
            'category': '活动设计',
            'recommendation': '个性化活动设计',
            'details': '不同群体响应差异显著,建议采用个性化活动方案',
            'priority': '中'
        })

    # 4. 测量体系优化
    recommendations.append({
        'category': '测量体系',
        'recommendation': '建立常态化测量框架',
        'details': '建议建立标准化的营销活动效果测量流程,提高未来评估效率',
        'priority': '中'
    })

    return pd.DataFrame(recommendations)

# 生成战略建议
strategy_recommendations = strategic_recommendations(business_insights, hetero_effects, roi_results)

print("\n战略建议:")
print("=" * 50)
for idx, row in strategy_recommendations.iterrows():
    print(f"{idx + 1}. [{row['category']}] {row['recommendation']} (优先级: {row['priority']})")
    print(f"   详情: {row['details']}")
    print()

4. 可视化报告与执行摘要

代码语言:python

def create_executive_summary(business_insights, roi_results, strategy_recommendations):

    """生成执行摘要可视化报告"""
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))

    # 1. 总体效果可视化
    effect_data = [business_insights['overall_effect']['point_estimate']]
    ci_lower, ci_upper = business_insights['overall_effect']['confidence_interval']
    errors = [[effect_data[0] - ci_lower], [ci_upper - effect_data[0]]]
    axes[0, 0].bar(['平均处理效应'], effect_data, yerr=errors, capsize=10, alpha=0.7)
    axes[0, 0].set_ylabel('增量消费金额 (元)')
    axes[0, 0].set_title('营销活动平均处理效应')
    axes[0, 0].grid(True, alpha=0.3)

    # 2. ROI分析
    roi_metrics = ['ROI', 'ROMI']
    roi_values = [roi_results['roi_percentage'], roi_results['romi']]
    axes[0, 1].bar(roi_metrics, roi_values, alpha=0.7)
    axes[0, 1].axhline(y=0, color='red', linestyle='--', label='盈亏平衡')
    axes[0, 1].set_ylabel('百分比 / 比率')
    axes[0, 1].set_title('投资回报指标')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)

    # 3. 异质性效应(hetero_effects来自外层作用域,未作为参数传入)
    hetero_subset = hetero_effects.nlargest(5, 'treatment_effect')
    axes[1, 0].barh([f"{row['group']}({row['dimension']})" for _, row in hetero_subset.iterrows()],
                    hetero_subset['treatment_effect'], alpha=0.7)
    axes[1, 0].set_xlabel('处理效应 (元)')
    axes[1, 0].set_title('效果最好的五个细分群体')
    axes[1, 0].grid(True, alpha=0.3)

    # 4. 成本收益分析
    categories = ['增量收入', '增量利润', '总成本']
    values = [business_insights['cost_analysis']['incremental_revenue'] / 10000,
              roi_results['incremental_profit'] / 10000,
              roi_results['total_costs'] / 10000]
    axes[1, 1].bar(categories, values, alpha=0.7)
    axes[1, 1].set_ylabel('金额 (万元)')
    axes[1, 1].set_title('成本收益分析')
    axes[1, 1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # 文本执行摘要
    print("执行摘要")
    print("=" * 60)
    print("活动名称: 双十一金牌会员专享促销")
    print("评估期间: 2023年11月1日-11月30日")
    print(f"覆盖用户: {business_insights['total_incremental']['n_gold_members']:,}名金牌会员")
    print()
    print("核心发现:")
    print(f"• 平均为每位会员带来{business_insights['overall_effect']['point_estimate']:.1f}元增量消费")
    print(f"• 总增量收入{business_insights['total_incremental']['total_incremental_revenue']:,.0f}元")
    print(f"• 活动ROI为{roi_results['roi_percentage']:.1f}%,投资回报{roi_results['romi']:.2f}")
    print()
    print("战略建议:")
    # 'priority'为文本("高"/"中"),不能直接用nlargest排序,先映射为数值再取前三条
    priority_rank = strategy_recommendations['priority'].map({'高': 0, '中': 1, '低': 2})
    top_recs = strategy_recommendations.loc[priority_rank.sort_values().index].head(3)
    for idx, row in top_recs.iterrows():
        print(f"• {row['recommendation']}: {row['details']}")

# 生成执行摘要
create_executive_summary(business_insights, roi_results, strategy_recommendations)

通过完整的业务分析和ROI计算,我们已经将统计结果转化为具体的商业洞察和行动建议。最后,让我们总结本次评估的方法论和实践经验。
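在进入总结之前,正文中ROI、ROMI与盈亏平衡ROMI的换算关系可以用一个可独立运行的小例子来验证(以下数字均为假设示意,并非本案例的实际估计结果):

```python
# 假设:增量收入3000万元,毛利率30%,总成本2200万元(口径同正文:折扣+营销+运营)
incremental_revenue = 30_000_000
profit_margin = 0.3
total_costs = 22_000_000

incremental_profit = incremental_revenue * profit_margin           # 900万元
roi = (incremental_profit - total_costs) / total_costs * 100       # (900-2200)/2200 ≈ -59.1%
romi = incremental_revenue / total_costs                           # 30/22 ≈ 1.36
break_even_romi = 1 / profit_margin                                # 1/0.3 ≈ 3.33

print(f"ROI: {roi:.1f}%, ROMI: {romi:.2f}, 盈亏平衡ROMI: {break_even_romi:.2f}")
# "ROMI超过1/毛利率"与"ROI为正"是同一条件的两种表述
assert (romi > break_even_romi) == (roi > 0)
```

这个例子也说明了为什么ROMI看似大于1(收入超过成本)时活动仍可能亏损:只有当ROMI超过1/毛利率时,增量利润才能覆盖活动成本。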