Data Science & Analytics Python Interview Guide: Coding Challenges Solved
Senior Data Engineer at Meta, Python expert with 10+ years of experience
Summary
Expert solutions to Python coding challenges in data science and analytics interviews, with detailed explanations and best practices.
DS/DA Python Interview Preparation Guide
Python Fundamentals & Data Structures

Question: Google - Large-Scale Data Processing Optimization

Background: You're interviewing for a Data Scientist position at Google. The team works on optimizing YouTube's recommendation system, which processes billions of user interactions daily.

Question: "We have a Python function that processes user viewing history to extract features for our recommendation model. The current implementation is too slow for our scale. Here's the simplified version:
def extract_user_features(user_history):
    features = {}
    for video_id, timestamp, duration in user_history:
        if video_id not in features:
            features[video_id] = {'count': 0, 'total_time': 0, 'avg_time': 0}
        features[video_id]['count'] += 1
        features[video_id]['total_time'] += duration
        features[video_id]['avg_time'] = features[video_id]['total_time'] / features[video_id]['count']
    return features
This function needs to process M+ records per hour. Optimize this code for performance and memory efficiency. Also, explain how you would handle the case where the data doesn't fit in memory."
Approach:
This question captures Google's core challenge: large-scale data processing. A platform like YouTube handles billions of user interactions every day, so the bar for performance and memory efficiency is extremely high.
Understanding and clarifying the problem: First pin down a few key points: what is the exact data volume? Is real-time processing required? What are the memory limits? Is distributed processing an option? Is the data already sorted?
Performance bottleneck analysis: The main problems with the current code are: 1) the average is recomputed for every record, causing redundant work; 2) frequent dictionary lookups and updates; 3) inefficient memory use; 4) no use of vectorized operations.
Optimization plan:
First level of optimization - algorithmic improvements:
from collections import defaultdict
import numpy as np

def extract_user_features_optimized_v1(user_history):
    # Use defaultdict to avoid repeated key-existence checks
    features = defaultdict(lambda: {'count': 0, 'total_time': 0})
    for video_id, timestamp, duration in user_history:
        features[video_id]['count'] += 1
        features[video_id]['total_time'] += duration
    # Compute the averages once at the end instead of on every record
    for video_id in features:
        features[video_id]['avg_time'] = features[video_id]['total_time'] / features[video_id]['count']
    return dict(features)
Second level of optimization - vectorized aggregation with pandas/NumPy:
import pandas as pd
import numpy as np

def extract_user_features_optimized_v2(user_history):
    # Convert to a DataFrame for vectorized operations
    df = pd.DataFrame(user_history, columns=['video_id', 'timestamp', 'duration'])
    # Efficient aggregation with groupby
    features = df.groupby('video_id')['duration'].agg(['count', 'sum', 'mean']).to_dict('index')
    # Rename the fields to match the original output format
    result = {}
    for video_id, stats in features.items():
        result[video_id] = {
            'count': stats['count'],
            'total_time': stats['sum'],
            'avg_time': stats['mean']
        }
    return result
Third level of optimization - memory efficiency and large-scale data handling:
def extract_user_features_streaming(user_history_iterator, chunk_size=1000000):
    """
    Stream-process large-scale data that cannot all be loaded into memory.
    """
    features = defaultdict(lambda: {'count': 0, 'total_time': 0})
    chunk = []
    for record in user_history_iterator:
        chunk.append(record)
        if len(chunk) >= chunk_size:
            # Process the current chunk
            df_chunk = pd.DataFrame(chunk, columns=['video_id', 'timestamp', 'duration'])
            chunk_features = df_chunk.groupby('video_id')['duration'].agg(['count', 'sum'])
            # Merge into the running totals
            for video_id, stats in chunk_features.iterrows():
                features[video_id]['count'] += stats['count']
                features[video_id]['total_time'] += stats['sum']
            chunk = []  # Clear the chunk to free memory
    # Process the last, possibly incomplete chunk
    if chunk:
        df_chunk = pd.DataFrame(chunk, columns=['video_id', 'timestamp', 'duration'])
        chunk_features = df_chunk.groupby('video_id')['duration'].agg(['count', 'sum'])
        for video_id, stats in chunk_features.iterrows():
            features[video_id]['count'] += stats['count']
            features[video_id]['total_time'] += stats['sum']
    # Compute the averages at the end
    for video_id in features:
        features[video_id]['avg_time'] = features[video_id]['total_time'] / features[video_id]['count']
    return dict(features)
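A minimal usage sketch, assuming the interaction log lives in a CSV file whose path and column order are hypothetical: the generator yields one record at a time, so only one chunk is ever held in memory.

import csv

def iter_user_history(path):
    # Lazily yield (video_id, timestamp, duration) tuples from a CSV log
    with open(path, newline='') as f:
        for row in csv.reader(f):
            yield row[0], row[1], float(row[2])

features = extract_user_features_streaming(iter_user_history('user_history.csv'), chunk_size=500_000)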
Distributed processing approach:
At Google's scale, a single machine will never be enough. I would recommend Apache Beam / Dataflow for distributed processing:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def create_feature_extraction_pipeline():
    def extract_features(element):
        video_id, duration_list = element
        count = len(duration_list)
        total_time = sum(duration_list)
        avg_time = total_time / count if count > 0 else 0
        return {
            'video_id': video_id,
            'count': count,
            'total_time': total_time,
            'avg_time': avg_time
        }

    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as pipeline:
        features = (
            pipeline
            | 'Read Data' >> beam.io.ReadFromBigQuery(query="SELECT video_id, duration FROM user_history")
            | 'Group by Video' >> beam.Map(lambda x: (x['video_id'], x['duration']))
            | 'Group' >> beam.GroupByKey()
            | 'Extract Features' >> beam.Map(extract_features)
            | 'Write Results' >> beam.io.WriteToBigQuery(table='features_table')
        )
Memory optimization strategies:
1. Data type optimization: use smaller dtypes where possible, e.g. downcast wide numeric columns (see the sketch below).
2. Memory mapping: use memory-mapped files for very large inputs.
3. Generator pattern: avoid loading the entire dataset at once.
4. Garbage collection: release objects that are no longer needed promptly.
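A minimal sketch of the first two ideas (dtype downcasting and memory mapping); the column selection, file name, and dtype below are illustrative assumptions rather than values from the original pipeline.

import numpy as np
import pandas as pd

def downcast_columns(df: pd.DataFrame) -> pd.DataFrame:
    # Shrink wide numeric columns to the smallest dtype that still fits the data
    for col in df.select_dtypes(include='integer').columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    for col in df.select_dtypes(include='float').columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    return df

# Memory-map a large binary array of durations instead of loading it all
durations = np.memmap('durations.bin', dtype='float32', mode='r')
partial_total = durations[:1_000_000].sum()  # only the touched pages are read into memory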
Performance testing and monitoring:
import time
import psutil
import gc

def benchmark_function(func, data, iterations=3):
    """
    Benchmark a function's runtime and memory usage.
    """
    times = []
    memory_usage = []
    for i in range(iterations):
        gc.collect()  # Force a garbage-collection pass before measuring
        start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        start_time = time.time()
        result = func(data)
        end_time = time.time()
        end_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        times.append(end_time - start_time)
        memory_usage.append(end_memory - start_memory)
    return {
        'avg_time': np.mean(times),
        'avg_memory': np.mean(memory_usage),
        'result_size': len(result)
    }
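A possible way to use the benchmark, comparing the original and optimized versions on synthetic data; the record count and ID range are illustrative assumptions.

import random

sample_history = [(random.randint(1, 10_000), i, random.uniform(1, 600))
                  for i in range(1_000_000)]

for fn in (extract_user_features, extract_user_features_optimized_v1,
           extract_user_features_optimized_v2):
    stats = benchmark_function(fn, sample_history)
    print(f"{fn.__name__}: {stats['avg_time']:.2f}s, {stats['avg_memory']:.1f} MB")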
Key points for the interview answer:
1. Show systems thinking: optimize beyond the code and consider the overall architecture.
2. Quantify: back up claims with concrete performance numbers.
3. Scalability: plan for future data growth.
4. Monitoring and maintenance: explain how system performance would be monitored.
5. Trade-offs: discuss the pros and cons of the different approaches.
This question shows how much Google values large-scale data processing skills and an engineer's system-design thinking.

Question: Meta - Social Network Graph Data Structure

Background: You're interviewing for a Data Scientist role at Meta. The team is working on analyzing friendship networks to improve friend recommendation algorithms.

Question: "We need to analyze the friendship graph of Facebook users to identify potential friend recommendations. Given a user ID, we want to find mutual friends and calculate a friendship score based on common connections. Design a Python data structure and algorithm to efficiently handle this for a graph with billions of users and billions of friendship connections. Here's what we need:
How would you implement this efficiently?"

Approach:
This question reflects Meta's core business challenge as a social-network company: handling a graph at enormous scale while keeping queries efficient and close to real time. It is as much a system-design problem as an algorithm problem.
Understanding and clarifying the problem: First clarify: is the graph directed or undirected? Do different relationship types need to be modeled? How frequent are real-time updates? Roughly what query QPS is expected? Is some latency acceptable?
Data structure design:
Given Meta's scale, we need a distributed graph storage system; a single machine's memory cannot hold the complete graph for 3 billion users.
from collections import defaultdict, deque
import heapq
import redis
import json
from typing import Set, List, Tuple, Dict
import numpy as np
class DistributedSocialGraph:
def __init__(self, redis_cluster_nodes, num_shards=1000):
"""
分布式社交图数据结构
使用Redis集群进行分片存储
"""
self.redis_cluster = redis.RedisCluster(startup_nodes=redis_cluster_nodes)
self.num_shards = num_shards
def _get_shard_key(self, user_id: int) -> str:
"""根据用户ID计算分片键"""
shard_id = user_id % self.num_shards
return f"shard:{shard_id}:user:{user_id}"
def add_friendship(self, user1: int, user2: int):
"""添加友谊关系"""
# 双向添加关系
key1 = self._get_shard_key(user1)
key2 = self._get_shard_key(user2)
# 使用Redis的集合数据结构存储朋友列表
self.redis_cluster.sadd(f"{key1}:friends", user2)
self.redis_cluster.sadd(f"{key2}:friends", user1)
# 更新朋友数量统计
self.redis_cluster.incr(f"{key1}:friend_count")
self.redis_cluster.incr(f"{key2}:friend_count")
def get_friends(self, user_id: int) -> Set[int]:
"""获取用户的所有朋友"""
key = self._get_shard_key(user_id)
friends = self.redis_cluster.smembers(f"{key}:friends")
return {int(friend) for friend in friends}
def find_mutual_friends(self, user1: int, user2: int) -> Set[int]:
"""查找两个用户的共同朋友"""
key1 = self._get_shard_key(user1)
key2 = self._get_shard_key(user2)
# 使用Redis的集合交集操作
mutual_friends = self.redis_cluster.sinter(
f"{key1}:friends",
f"{key2}:friends"
)
return {int(friend) for friend in mutual_friends}
def calculate_friendship_score(self, user1: int, user2: int) -> float:
"""
计算友谊分数
基于共同朋友数量、朋友网络重叠度等因素
"""
# 如果已经是朋友,返回0
if self.are_friends(user1, user2):
return 0.0
mutual_friends = self.find_mutual_friends(user1, user2)
mutual_count = len(mutual_friends)
if mutual_count == 0:
return 0.0
# 获取两个用户的朋友总数
user1_friend_count = self.get_friend_count(user1)
user2_friend_count = self.get_friend_count(user2)
# 计算Jaccard相似度
total_friends = user1_friend_count + user2_friend_count - mutual_count
jaccard_similarity = mutual_count / total_friends if total_friends > 0 else 0
# 考虑共同朋友的质量(他们的朋友数量)
mutual_friend_quality = 0
for friend_id in mutual_friends:
friend_count = self.get_friend_count(friend_id)
# 朋友数量适中的用户权重更高
quality_score = 1.0 / (1.0 + abs(friend_count - 150)) # 150是经验值
mutual_friend_quality += quality_score
# 综合分数计算
base_score = mutual_count * 0.4 # 共同朋友数量权重
similarity_score = jaccard_similarity * 0.3 # 相似度权重
quality_score = mutual_friend_quality * 0.3 # 质量权重
return base_score + similarity_score + quality_score
def recommend_friends(self, user_id: int, top_k: int = 10) -> List[Tuple[int, float]]:
"""
为用户推荐朋友
使用二度连接算法
"""
user_friends = self.get_friends(user_id)
candidates = defaultdict(int)
# 遍历用户的所有朋友
for friend_id in user_friends:
friend_friends = self.get_friends(friend_id)
# 朋友的朋友作为候选
for candidate_id in friend_friends:
if candidate_id != user_id and candidate_id not in user_friends:
candidates[candidate_id] += 1
# 计算每个候选者的友谊分数
scored_candidates = []
for candidate_id, mutual_count in candidates.items():
if mutual_count >= 2: # 至少有2个共同朋友
score = self.calculate_friendship_score(user_id, candidate_id)
scored_candidates.append((candidate_id, score))
# 返回分数最高的top_k个候选者
scored_candidates.sort(key=lambda x: x[1], reverse=True)
return scored_candidates[:top_k]
def are_friends(self, user1: int, user2: int) -> bool:
"""检查两个用户是否是朋友"""
key1 = self._get_shard_key(user1)
return self.redis_cluster.sismember(f"{key1}:friends", user2)
def get_friend_count(self, user_id: int) -> int:
"""获取用户的朋友数量"""
key = self._get_shard_key(user_id)
count = self.redis_cluster.get(f"{key}:friend_count")
return int(count) if count else 0
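Note that RealTimeFriendshipUpdater further below calls graph.remove_friendship, which DistributedSocialGraph does not define. A minimal sketch of that method, mirroring add_friendship under the same key scheme (an assumption, not part of the original class):

    def remove_friendship(self, user1: int, user2: int):
        """Remove a friendship in both directions (hypothetical counterpart to add_friendship)."""
        key1 = self._get_shard_key(user1)
        key2 = self._get_shard_key(user2)
        self.redis_cluster.srem(f"{key1}:friends", user2)
        self.redis_cluster.srem(f"{key2}:friends", user1)
        self.redis_cluster.decr(f"{key1}:friend_count")
        self.redis_cluster.decr(f"{key2}:friend_count")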
Memory-optimized version with a local cache:
For frequently accessed users, a local in-process cache can improve performance:
from functools import lru_cache
import pickle
class OptimizedSocialGraph:
def __init__(self, cache_size=100000):
"""
优化版本的社交图,使用LRU缓存
"""
self.adjacency_list = defaultdict(set)
self.cache_size = cache_size
@lru_cache(maxsize=10000)
def get_friends_cached(self, user_id: int) -> frozenset:
"""缓存版本的获取朋友列表"""
return frozenset(self.adjacency_list[user_id])
def find_mutual_friends_optimized(self, user1: int, user2: int) -> Set[int]:
"""优化版本的查找共同朋友"""
friends1 = self.get_friends_cached(user1)
friends2 = self.get_friends_cached(user2)
# 选择较小的集合进行遍历,提高效率
if len(friends1) < len(friends2):
return friends1 & friends2
else:
return friends2 & friends1
def batch_recommend_friends(self, user_ids: List[int]) -> Dict[int, List[Tuple[int, float]]]:
"""
批量推荐朋友,提高吞吐量
"""
results = {}
# 预加载所有需要的朋友列表
all_friends = {}
for user_id in user_ids:
all_friends[user_id] = self.get_friends_cached(user_id)
for user_id in user_ids:
results[user_id] = self.recommend_friends(user_id)
return results
Real-time update handling:
import asyncio
from kafka import KafkaConsumer, KafkaProducer
import json
class RealTimeFriendshipUpdater:
def __init__(self, graph: DistributedSocialGraph):
self.graph = graph
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
async def handle_friendship_update(self, user1: int, user2: int, action: str):
"""处理实时的友谊关系更新"""
if action == "add":
self.graph.add_friendship(user1, user2)
# 异步更新推荐缓存
await self.update_recommendation_cache(user1, user2)
elif action == "remove":
self.graph.remove_friendship(user1, user2)
# 发送更新事件到下游系统
event = {
"user1": user1,
"user2": user2,
"action": action,
"timestamp": time.time()
}
self.producer.send("friendship_updates", event)
async def update_recommendation_cache(self, user1: int, user2: int):
"""更新推荐缓存"""
# 获取受影响的用户(朋友的朋友)
affected_users = set()
user1_friends = self.graph.get_friends(user1)
user2_friends = self.graph.get_friends(user2)
affected_users.update(user1_friends)
affected_users.update(user2_friends)
# 异步更新这些用户的推荐
tasks = []
for user_id in affected_users:
task = asyncio.create_task(self.refresh_user_recommendations(user_id))
tasks.append(task)
await asyncio.gather(*tasks)
async def refresh_user_recommendations(self, user_id: int):
"""刷新单个用户的推荐"""
recommendations = self.graph.recommend_friends(user_id)
# 更新到缓存系统
cache_key = f"recommendations:{user_id}"
self.graph.redis_cluster.setex(
cache_key,
3600, # 1小时过期
json.dumps(recommendations)
)
Performance optimization strategies:
1. Sharding: consistent-hash sharding keyed on user ID (see the sketch after this list).
2. Cache hierarchy: local in-process cache + Redis cache + database.
3. Asynchronous processing: push non-real-time updates through a message queue.
4. Precomputation: precompute recommendations for active users.
5. Index optimization: build dedicated indexes for the most common queries.
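A minimal sketch of consistent-hash sharding keyed on user ID, as an alternative to the plain modulo used in _get_shard_key above; the node names and virtual-node count are illustrative assumptions.

import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, nodes, virtual_nodes=100):
        # Place several virtual points per physical node on the hash ring
        self.ring = sorted(
            (self._hash(f"{node}#{i}"), node)
            for node in nodes for i in range(virtual_nodes)
        )
        self.keys = [h for h, _ in self.ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def get_node(self, user_id: int) -> str:
        # Walk clockwise to the first virtual node at or after the key's hash
        idx = bisect.bisect(self.keys, self._hash(str(user_id))) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["shard-0", "shard-1", "shard-2"])
print(ring.get_node(123456789))

Compared with modulo sharding, adding or removing a shard only remaps the keys adjacent to that shard's points on the ring.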
Key points for the interview answer:
1. Scale awareness: show you understand what a 3-billion-user graph implies.
2. System design: treat it as a distributed-architecture problem, not just an algorithm.
3. Performance trade-offs: compare the time and space complexity of the options.
4. Practical relevance: tie the design to Meta's actual business scenario.
5. Extensibility: plan for future growth and new features.
This question shows how much Meta values large-scale graph algorithms and system-design skills, along with a deep understanding of its core social-network business.
Data Processing & Analysis

Question: Amazon - E-commerce Data Pipeline Optimization

Background: You're interviewing for a Senior Data Scientist role at Amazon. The team manages the product recommendation engine that processes customer browsing and purchase data in real time.

Question: "We have a data pipeline that processes customer interaction data from our e-commerce platform. The pipeline needs to handle M+ events per hour during peak shopping periods like Black Friday. Here's our current data processing logic:
import pandas as pd
import numpy as np

def process_customer_events(events_df):
    # Events format: user_id, product_id, event_type, timestamp, session_id
    # Calculate session-based metrics
    session_metrics = events_df.groupby(['user_id', 'session_id']).agg({
        'product_id': 'nunique',      # unique products viewed
        'event_type': 'count',        # total events
        'timestamp': ['min', 'max']   # session duration
    })
    # Calculate user behavior patterns
    user_patterns = events_df.groupby('user_id').agg({
        'session_id': 'nunique',      # number of sessions
        'product_id': 'nunique',      # unique products
        'event_type': lambda x: (x == 'purchase').sum()  # purchases
    })
    return session_metrics, user_patterns
This pipeline is becoming too slow and memory-intensive. Optimize it for real-time processing and explain how you would handle data quality issues, missing values, and implement incremental updates."
Approach:
This question captures the core challenge Amazon faces as an e-commerce giant: processing massive volumes of user-behavior data in real time. The answer has to cover performance optimization, memory management, data quality, and incremental updates.
Understanding and clarifying the problem: First clarify: what latency is required? Is exactly-once processing needed? What do the data quality problems actually look like? How frequently do incremental updates arrive? Are approximate results acceptable?
Performance bottleneck analysis: The main issues with the current code: 1) multiple groupby passes are inefficient; 2) memory use is high; 3) vectorized operations are not fully exploited; 4) there is no incremental processing; 5) there are no data quality checks.
Optimization plan:
First level of optimization - vectorization and memory efficiency:
import pandas as pd
import numpy as np
from typing import Tuple, Dict
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, timedelta
class OptimizedEventProcessor:
def __init__(self, chunk_size: int = 1000000):
self.chunk_size = chunk_size
self.session_cache = {}
self.user_cache = {}
def validate_and_clean_data(self, events_df: pd.DataFrame) -> pd.DataFrame:
"""
数据质量检查和清洗
"""
original_size = len(events_df)
# 1. 删除必要字段为空的记录
events_df = events_df.dropna(subset=['user_id', 'product_id',
'event_type', 'timestamp'])
# 2. 数据类型转换和验证
events_df['user_id'] = pd.to_numeric(events_df['user_id'],
errors='coerce')
events_df['product_id'] = pd.to_numeric(events_df['product_id'],
errors='coerce')
events_df['timestamp'] = pd.to_datetime(events_df['timestamp'],
errors='coerce')
# 3. 删除转换失败的记录
events_df = events_df.dropna()
# 4. 验证事件类型
valid_event_types = {'view', 'cart', 'purchase', 'click'}
events_df = events_df[events_df['event_type'].isin(valid_event_types)]
# 5. 时间范围验证(过滤异常时间戳)
now = datetime.now()
one_year_ago = now - timedelta(days=365)
events_df = events_df[
(events_df['timestamp'] >= one_year_ago) &
(events_df['timestamp'] <= now)
]
# 6. 生成session_id(如果缺失)
if 'session_id' not in events_df.columns or
events_df['session_id'].isna().any():
events_df = self._generate_session_ids(events_df)
print(f"Data cleaning: {original_size} -> {len(events_df)} records
({len(events_df)/original_size*100:.1f}% retained)")
return events_df
def _generate_session_ids(self, events_df: pd.DataFrame) -> pd.DataFrame:
"""
基于时间间隔生成session_id
"""
events_df = events_df.sort_values(['user_id', 'timestamp'])
# 计算时间差(30分钟无活动则认为是新session)
events_df['time_diff'] = events_df.groupby('user_id')
['timestamp'].diff()
events_df['new_session'] = (events_df['time_diff'] >
timedelta(minutes=30)) | events_df['time_diff'].isna()
# 生成session_id
events_df['session_id'] = events_df.groupby('user_id')
['new_session'].cumsum()
events_df['session_id'] = events_df['user_id'].astype(str) + '_' +
events_df['session_id'].astype(str)
# 清理临时列
events_df = events_df.drop(['time_diff', 'new_session'], axis=1)
return events_df
def process_events_optimized(self, events_df: pd.DataFrame) ->
Tuple[pd.DataFrame, pd.DataFrame]:
"""
优化版本的事件处理
"""
# 数据清洗
events_df = self.validate_and_clean_data(events_df)
# 使用更高效的数据类型
events_df['user_id'] = events_df['user_id'].astype('int32')
events_df['product_id'] = events_df['product_id'].astype('int32')
events_df['event_type'] = events_df['event_type'].astype('category')
# 预计算一些常用字段
events_df['is_purchase'] = (events_df['event_type'] ==
'purchase').astype('int8')
# 一次性计算所有session指标
session_agg = events_df.groupby(['user_id', 'session_id']).agg({
'product_id': 'nunique',
'event_type': 'count',
'timestamp': ['min', 'max'],
'is_purchase': 'sum'
}).reset_index()
# 扁平化列名
session_agg.columns = ['user_id', 'session_id', 'unique_products',
'total_events',
'session_start', 'session_end', 'purchases']
# 计算session时长(分钟)
session_agg['session_duration_minutes'] = (
session_agg['session_end'] - session_agg['session_start']
).dt.total_seconds() / 60
# 一次性计算所有用户指标
user_agg = events_df.groupby('user_id').agg({
'session_id': 'nunique',
'product_id': 'nunique',
'is_purchase': 'sum',
'event_type': 'count'
}).reset_index()
user_agg.columns = ['user_id', 'total_sessions', 'unique_products',
'total_purchases', 'total_events']
# 计算转化率
user_agg['conversion_rate'] = user_agg['total_purchases'] /
user_agg['total_events']
return session_agg, user_agg
def process_events_streaming(self, events_iterator) -> Dict:
"""
流式处理版本,适用于实时数据
"""
session_metrics = {}
user_metrics = {}
for chunk in events_iterator:
chunk_df = pd.DataFrame(chunk)
if chunk_df.empty:
continue
# 处理当前chunk
session_chunk, user_chunk = self.process_events_optimized(chunk_df)
# 增量更新session指标
for _, row in session_chunk.iterrows():
key = (row['user_id'], row['session_id'])
if key in session_metrics:
# 合并已存在的session数据
existing = session_metrics[key]
session_metrics[key] = {
'unique_products': max(existing['unique_products'],
row['unique_products']),
'total_events': existing['total_events'] +
row['total_events'],
'session_start': min(existing['session_start'],
row['session_start']),
'session_end': max(existing['session_end'],
row['session_end']),
'purchases': existing['purchases'] + row['purchases']
}
else:
session_metrics[key] = row.to_dict()
# 增量更新用户指标
for _, row in user_chunk.iterrows():
user_id = row['user_id']
if user_id in user_metrics:
existing = user_metrics[user_id]
user_metrics[user_id] = {
'total_sessions': max(existing['total_sessions'],
row['total_sessions']),
'unique_products': max(existing['unique_products'],
row['unique_products']),
'total_purchases': existing['total_purchases'] +
row['total_purchases'],
'total_events': existing['total_events'] +
row['total_events']
}
else:
user_metrics[user_id] = row.to_dict()
return {'session_metrics': session_metrics, 'user_metrics':
user_metrics}
Real-time processing architecture:
import asyncio
from kafka import KafkaConsumer, KafkaProducer
import redis
import json
from concurrent.futures import ThreadPoolExecutor
import logging
class RealTimeEventProcessor:
def __init__(self, kafka_config: Dict, redis_config: Dict):
self.kafka_consumer = KafkaConsumer(
'customer_events',
bootstrap_servers=kafka_config['servers'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='latest',
group_id='event_processor'
)
self.kafka_producer = KafkaProducer(
bootstrap_servers=kafka_config['servers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.redis_client = redis.Redis(**redis_config)
self.processor = OptimizedEventProcessor()
self.executor = ThreadPoolExecutor(max_workers=4)
# 设置日志
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
async def process_real_time_events(self):
"""
实时处理事件流
"""
batch = []
batch_size = 1000
last_process_time = datetime.now()
try:
for message in self.kafka_consumer:
event = message.value
batch.append(event)
# 批量处理或超时处理
current_time = datetime.now()
if (len(batch) >= batch_size or
(current_time - last_process_time).seconds >= 5):
# 异步处理批次
await self.process_batch(batch)
batch = []
last_process_time = current_time
except Exception as e:
self.logger.error(f"Error in real-time processing: {e}")
async def process_batch(self, events: List[Dict]):
"""
处理事件批次
"""
try:
# 转换为DataFrame
events_df = pd.DataFrame(events)
# 异步处理
loop = asyncio.get_event_loop()
session_metrics, user_metrics = await loop.run_in_executor(
self.executor,
self.processor.process_events_optimized,
events_df
)
# 更新Redis缓存
await self.update_metrics_cache(session_metrics, user_metrics)
# 发送处理结果到下游
await self.send_processed_metrics(session_metrics, user_metrics)
self.logger.info(f"Processed batch of {len(events)} events")
except Exception as e:
self.logger.error(f"Error processing batch: {e}")
async def update_metrics_cache(self, session_metrics: pd.DataFrame,
user_metrics: pd.DataFrame):
"""
更新Redis缓存
"""
pipe = self.redis_client.pipeline()
# 更新session指标
for _, row in session_metrics.iterrows():
key = f"session:{row['user_id']}:{row['session_id']}"
pipe.hset(key, mapping=row.to_dict())
pipe.expire(key, 86400) # 24小时过期
# 更新用户指标
for _, row in user_metrics.iterrows():
key = f"user:{row['user_id']}"
# 增量更新用户指标
existing = self.redis_client.hgetall(key)
if existing:
# 合并现有数据
for field in ['total_sessions', 'unique_products',
'total_purchases', 'total_events']:
if field in existing:
row[field] = max(int(existing[field]), row[field])
pipe.hset(key, mapping=row.to_dict())
pipe.expire(key, 604800) # 7天过期
await asyncio.get_event_loop().run_in_executor(None, pipe.execute)
async def send_processed_metrics(self, session_metrics: pd.DataFrame,
user_metrics: pd.DataFrame):
"""
发送处理结果到下游系统
"""
# 发送到推荐系统
for _, row in user_metrics.iterrows():
if row['conversion_rate'] > 0.1: # 高转化用户
recommendation_event = {
'user_id': row['user_id'],
'user_type': 'high_conversion',
'metrics': row.to_dict(),
'timestamp': datetime.now().isoformat()
}
self.kafka_producer.send('user_recommendations',
recommendation_event)
# 发送到分析系统
analytics_event = {
'session_count': len(session_metrics),
'user_count': len(user_metrics),
'avg_conversion_rate': user_metrics['conversion_rate'].mean(),
'timestamp': datetime.now().isoformat()
}
self.kafka_producer.send('analytics_metrics', analytics_event)
Data quality monitoring:
class DataQualityMonitor:
def __init__(self, redis_client):
self.redis_client = redis_client
self.quality_thresholds = {
'null_rate': 0.05, # 最大5%空值率
'duplicate_rate': 0.02, # 最大2%重复率
'outlier_rate': 0.01, # 最大1%异常值率
}
def check_data_quality(self, events_df: pd.DataFrame) -> Dict:
"""
检查数据质量
"""
total_records = len(events_df)
quality_report = {}
# 1. 空值检查
null_counts = events_df.isnull().sum()
quality_report['null_rates'] = (null_counts / total_records).to_dict()
# 2. 重复记录检查
duplicate_count = events_df.duplicated().sum()
quality_report['duplicate_rate'] = duplicate_count / total_records
# 3. 时间戳异常检查
now = datetime.now()
future_timestamps = (events_df['timestamp'] > now).sum()
old_timestamps = (events_df['timestamp'] < now -
timedelta(days=30)).sum()
quality_report['timestamp_anomalies'] = (future_timestamps +
old_timestamps) / total_records
# 4. 用户行为异常检查
user_event_counts = events_df['user_id'].value_counts()
outlier_users = (user_event_counts >
user_event_counts.quantile(0.99)).sum()
quality_report['user_behavior_outliers'] = outlier_users /
len(user_event_counts)
# 5. 整体质量评分
quality_score = self._calculate_quality_score(quality_report)
quality_report['overall_score'] = quality_score
# 6. 存储质量报告
self._store_quality_report(quality_report)
return quality_report
def _calculate_quality_score(self, quality_report: Dict) -> float:
"""
计算整体数据质量评分
"""
score = 100.0
# 扣分规则
for field, null_rate in quality_report['null_rates'].items():
if null_rate > self.quality_thresholds['null_rate']:
score -= (null_rate - self.quality_thresholds['null_rate']) *
100
if quality_report['duplicate_rate'] >
self.quality_thresholds['duplicate_rate']:
score -= (quality_report['duplicate_rate'] -
self.quality_thresholds['duplicate_rate']) * 200
if quality_report['timestamp_anomalies'] >
self.quality_thresholds['outlier_rate']:
score -= quality_report['timestamp_anomalies'] * 300
return max(0, score)
def _store_quality_report(self, quality_report: Dict):
"""
存储质量报告到Redis
"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
key = f"data_quality:{timestamp}"
self.redis_client.hset(key, mapping={
k: json.dumps(v) if isinstance(v, dict) else str(v)
for k, v in quality_report.items()
})
self.redis_client.expire(key, 604800) # 7天过期
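A hedged usage sketch: run the quality check on each incoming batch and skip batches whose score falls below a cutoff before they reach OptimizedEventProcessor; the Redis connection parameters and the 80-point threshold are assumptions.

import redis

monitor = DataQualityMonitor(redis.Redis(host='localhost', port=6379))
processor = OptimizedEventProcessor()

def process_if_clean(events_df):
    report = monitor.check_data_quality(events_df)
    if report['overall_score'] < 80:
        # Quarantine low-quality batches instead of polluting downstream metrics
        print(f"Batch rejected, quality score {report['overall_score']:.1f}")
        return None
    return processor.process_events_optimized(events_df)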
Key points for the interview answer:
1. Performance optimization: show deep understanding of pandas tuning and memory management.
2. Real-time processing: demonstrate command of streaming architectures.
3. Data quality: stress the importance of continuous data quality monitoring.
4. Scalability: design with Amazon-scale traffic in mind.
5. Business understanding: ground the answer in real e-commerce needs.
This question shows how much Amazon values large-scale data processing skills and systems-engineering thinking.

Question: Netflix - Content Recommendation Data Analysis

Background: You're interviewing for a Data Scientist position at Netflix. The team is responsible for analyzing viewing patterns to improve the recommendation algorithm.

Question: "We need to analyze user viewing behavior to identify content preferences and improve our recommendation system. We have viewing data with the following structure: user_id, content_id, watch_time, total_duration, genre, release_year, device_type, timestamp. Your task is to:
1. Calculate user engagement metrics (completion rate, binge-watching patterns)
2. Identify content performance across different user segments
3. Detect seasonal viewing trends
4. Build a content similarity matrix based on user co-viewing patterns
The dataset contains M+ viewing records. How would you approach this analysis efficiently in Python?"

Approach:
This question reflects Netflix's core data challenge as a streaming giant: processing massive viewing logs, extracting user-behavior insights, and feeding the recommendation system. It is a classic large-scale data analysis and feature engineering problem.
Understanding and clarifying the problem: First clarify: what time span does the data cover? Is real-time analysis required? How are user segments defined? How is content similarity defined? How often do the results need to be refreshed?
Data structures and preprocessing:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics.pairwise import cosine_similarity
from scipy.sparse import csr_matrix
import dask.dataframe as dd
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')
class NetflixViewingAnalyzer:
def __init__(self, chunk_size: int = 5000000):
"""
Netflix观看数据分析器
"""
self.chunk_size = chunk_size
self.user_segments = {}
self.content_metadata = {}
def preprocess_viewing_data(self, viewing_df: pd.DataFrame) ->
pd.DataFrame:
"""
预处理观看数据
"""
print(f"Processing {len(viewing_df):,} viewing records...")
# 数据类型优化
viewing_df['user_id'] = viewing_df['user_id'].astype('int32')
viewing_df['content_id'] = viewing_df['content_id'].astype('int32')
viewing_df['watch_time'] = viewing_df['watch_time'].astype('float32')
viewing_df['total_duration'] =
viewing_df['total_duration'].astype('float32')
viewing_df['release_year'] = viewing_df['release_year'].astype('int16')
viewing_df['genre'] = viewing_df['genre'].astype('category')
viewing_df['device_type'] =
viewing_df['device_type'].astype('category')
viewing_df['timestamp'] = pd.to_datetime(viewing_df['timestamp'])
# 计算基础指标
viewing_df['completion_rate'] = np.clip(
viewing_df['watch_time'] / viewing_df['total_duration'], 0, 1
)
# 提取时间特征
viewing_df['hour'] = viewing_df['timestamp'].dt.hour
viewing_df['day_of_week'] = viewing_df['timestamp'].dt.dayofweek
viewing_df['month'] = viewing_df['timestamp'].dt.month
viewing_df['is_weekend'] = viewing_df['day_of_week'].isin([5, 6])
# 内容年龄分类
current_year = datetime.now().year
viewing_df['content_age'] = current_year - viewing_df['release_year']
viewing_df['content_age_category'] = pd.cut(
viewing_df['content_age'],
bins=[0, 1, 3, 5, 10, 100],
labels=['New', 'Recent', 'Moderate', 'Old', 'Classic']
)
# 过滤异常数据
viewing_df = viewing_df[
(viewing_df['watch_time'] > 0) &
(viewing_df['total_duration'] > 0) &
(viewing_df['completion_rate'] <= 1.0)
]
print(f"After preprocessing: {len(viewing_df):,} records")
return viewing_df
def calculate_user_engagement_metrics(self, viewing_df: pd.DataFrame) ->
pd.DataFrame:
"""
计算用户参与度指标
"""
print("Calculating user engagement metrics...")
# 基础用户指标
user_metrics = viewing_df.groupby('user_id').agg({
'content_id': 'nunique', # 观看的独特内容数
'watch_time': ['sum', 'mean'], # 总观看时长和平均观看时长
'completion_rate': ['mean', 'std'], # 平均完成率和标准差
'timestamp': ['count', 'min', 'max'], # 观看次数和时间范围
'genre': lambda x: x.mode().iloc[0] if not x.empty else 'Unknown',
# 最喜欢的类型
'device_type': lambda x: x.mode().iloc[0] if not x.empty else
'Unknown' # 主要设备
}).reset_index()
# 扁平化列名
user_metrics.columns = [
'user_id', 'unique_content', 'total_watch_time', 'avg_watch_time',
'avg_completion_rate', 'completion_rate_std', 'total_sessions',
'first_watch', 'last_watch', 'favorite_genre', 'primary_device'
]
# 计算用户活跃天数
user_metrics['active_days'] = (
user_metrics['last_watch'] - user_metrics['first_watch']
).dt.days + 1
# 计算平均每日观看时长
user_metrics['daily_watch_time'] = (
user_metrics['total_watch_time'] / user_metrics['active_days']
)
# 识别狂欢观看模式
binge_patterns = self._identify_binge_patterns(viewing_df)
user_metrics = user_metrics.merge(binge_patterns, on='user_id',
how='left')
# 用户分群
user_metrics = self._segment_users(user_metrics)
return user_metrics
def _identify_binge_patterns(self, viewing_df: pd.DataFrame) ->
pd.DataFrame:
"""
识别狂欢观看模式
"""
# 按用户和日期分组,计算每日观看时长
daily_viewing = viewing_df.groupby(['user_id',
viewing_df['timestamp'].dt.date]).agg({
'watch_time': 'sum',
'content_id': 'nunique'
}).reset_index()
daily_viewing.columns = ['user_id', 'date', 'daily_watch_time',
'daily_unique_content']
# 定义狂欢观看:单日观看超过4小时或观看超过3个不同内容
daily_viewing['is_binge_day'] = (
(daily_viewing['daily_watch_time'] > 240) | # 4小时 = 240分钟
(daily_viewing['daily_unique_content'] >= 3)
)
# 计算用户的狂欢观看指标
binge_metrics = daily_viewing.groupby('user_id').agg({
'is_binge_day': ['sum', 'mean'],
'daily_watch_time': 'max'
}).reset_index()
binge_metrics.columns = ['user_id', 'binge_days', 'binge_frequency',
'max_daily_watch_time']
return binge_metrics
def _segment_users(self, user_metrics: pd.DataFrame) -> pd.DataFrame:
"""
用户分群
"""
# 基于观看行为进行用户分群
conditions = [
(user_metrics['daily_watch_time'] >= 120) &
(user_metrics['binge_frequency'] >= 0.3),
(user_metrics['daily_watch_time'] >= 60) &
(user_metrics['avg_completion_rate'] >= 0.7),
(user_metrics['daily_watch_time'] >= 30) &
(user_metrics['total_sessions'] >= 50),
(user_metrics['daily_watch_time'] >= 15) &
(user_metrics['active_days'] >= 30),
]
choices = ['Heavy Binger', 'Quality Viewer', 'Regular User', 'Casual
Viewer']
user_metrics['user_segment'] = np.select(conditions, choices,
default='Light User')
return user_metrics
def analyze_content_performance(self, viewing_df: pd.DataFrame,
user_metrics: pd.DataFrame) -> pd.DataFrame:
"""
分析内容表现
"""
print("Analyzing content performance...")
# 合并用户分群信息
viewing_with_segments = viewing_df.merge(
user_metrics[['user_id', 'user_segment']],
on='user_id',
how='left'
)
# 内容基础指标
content_metrics = viewing_with_segments.groupby('content_id').agg({
'user_id': 'nunique', # 独特观看用户数
'watch_time': ['sum', 'mean'], # 总观看时长和平均观看时长
'completion_rate': ['mean', 'std', lambda x: (x >= 0.8).mean()], #
完成率指标
'timestamp': 'count', # 总观看次数
'genre': 'first', # 类型
'release_year': 'first', # 发布年份
'total_duration': 'first' # 总时长
}).reset_index()
# 扁平化列名
content_metrics.columns = [
'content_id', 'unique_viewers', 'total_watch_time',
'avg_watch_time',
'avg_completion_rate', 'completion_rate_std',
'high_completion_rate',
'total_views', 'genre', 'release_year', 'total_duration'
]
# 计算内容受欢迎程度指标
content_metrics['popularity_score'] = (
content_metrics['unique_viewers'] * 0.4 +
content_metrics['avg_completion_rate'] * 100 * 0.3 +
content_metrics['total_views'] * 0.3
)
# 按用户分群分析内容表现
segment_performance = viewing_with_segments.groupby(['content_id',
'user_segment']).agg({
'completion_rate': 'mean',
'user_id': 'nunique'
}).reset_index()
segment_pivot = segment_performance.pivot(
index='content_id',
columns='user_segment',
values='completion_rate'
).fillna(0)
# 合并分群表现数据
content_metrics = content_metrics.merge(
segment_pivot,
left_on='content_id',
right_index=True,
how='left'
)
return content_metrics
def detect_seasonal_trends(self, viewing_df: pd.DataFrame) -> Dict:
"""
检测季节性观看趋势
"""
print("Detecting seasonal viewing trends...")
# 按月份分析
monthly_trends = viewing_df.groupby(['month', 'genre']).agg({
'watch_time': 'sum',
'user_id': 'nunique',
'completion_rate': 'mean'
}).reset_index()
# 按小时分析
hourly_trends = viewing_df.groupby(['hour', 'device_type']).agg({
'watch_time': 'sum',
'user_id': 'nunique'
}).reset_index()
# 周末vs工作日分析
weekend_analysis = viewing_df.groupby(['is_weekend', 'genre']).agg({
'watch_time': 'mean',
'completion_rate': 'mean',
'user_id': 'nunique'
}).reset_index()
# 内容年龄趋势
content_age_trends = viewing_df.groupby(['content_age_category',
'month']).agg({
'watch_time': 'sum',
'completion_rate': 'mean'
}).reset_index()
return {
'monthly_trends': monthly_trends,
'hourly_trends': hourly_trends,
'weekend_analysis': weekend_analysis,
'content_age_trends': content_age_trends
}
def build_content_similarity_matrix(self, viewing_df: pd.DataFrame,
min_common_users: int = 50) -> np.ndarray:
"""
基于用户共同观看模式构建内容相似度矩阵
"""
print("Building content similarity matrix...")
# 过滤低质量观看记录(完成率低于20%)
quality_viewing = viewing_df[viewing_df['completion_rate'] >=
0.2].copy()
# 创建用户-内容矩阵
user_content_matrix = quality_viewing.pivot_table(
index='user_id',
columns='content_id',
values='completion_rate',
fill_value=0
)
print(f"User-content matrix shape: {user_content_matrix.shape}")
# 过滤观看用户数少的内容
content_user_counts = (user_content_matrix > 0).sum(axis=0)
popular_content = content_user_counts[content_user_counts >=
min_common_users].index
filtered_matrix = user_content_matrix[popular_content]
print(f"Filtered matrix shape: {filtered_matrix.shape}")
# 转换为稀疏矩阵以节省内存
sparse_matrix = csr_matrix(filtered_matrix.T) # 转置:内容x用户
# 计算余弦相似度
similarity_matrix = cosine_similarity(sparse_matrix)
# 创建内容ID到索引的映射
content_to_idx = {content_id: idx for idx, content_id in
enumerate(popular_content)}
idx_to_content = {idx: content_id for content_id, idx in
content_to_idx.items()}
return similarity_matrix, content_to_idx, idx_to_content
def get_similar_content(self, content_id: int, similarity_matrix:
np.ndarray,
content_to_idx: Dict, idx_to_content: Dict, top_k:
int = 10) -> List[Tuple[int, float]]:
"""
获取相似内容推荐
"""
if content_id not in content_to_idx:
return []
content_idx = content_to_idx[content_id]
similarities = similarity_matrix[content_idx]
# 获取最相似的内容(排除自身)
similar_indices = np.argsort(similarities)[::-1][1:top_k+1]
similar_content = [
(idx_to_content[idx], similarities[idx])
for idx in similar_indices
]
return similar_content
def generate_comprehensive_report(self, viewing_df: pd.DataFrame) -> Dict:
"""
生成综合分析报告
"""
print("Generating comprehensive analysis report...")
# 预处理数据
viewing_df = self.preprocess_viewing_data(viewing_df)
# 计算各项指标
user_metrics = self.calculate_user_engagement_metrics(viewing_df)
content_metrics = self.analyze_content_performance(viewing_df,
user_metrics)
seasonal_trends = self.detect_seasonal_trends(viewing_df)
similarity_matrix, content_to_idx, idx_to_content =
self.build_content_similarity_matrix(viewing_df)
# 生成关键洞察
insights = self._generate_key_insights(user_metrics, content_metrics,
seasonal_trends)
return {
'user_metrics': user_metrics,
'content_metrics': content_metrics,
'seasonal_trends': seasonal_trends,
'similarity_matrix': similarity_matrix,
'content_mappings': {'content_to_idx': content_to_idx,
'idx_to_content': idx_to_content},
'key_insights': insights
}
def _generate_key_insights(self, user_metrics: pd.DataFrame,
content_metrics: pd.DataFrame,
seasonal_trends: Dict) -> Dict:
"""
生成关键业务洞察
"""
insights = {}
# 用户洞察
insights['user_insights'] = {
'total_users': len(user_metrics),
'avg_daily_watch_time': user_metrics['daily_watch_time'].mean(),
'binge_user_percentage': (user_metrics['binge_frequency'] >
0.2).mean() * 100,
'top_user_segment': user_metrics['user_segment'].mode().iloc[0],
'avg_completion_rate': user_metrics['avg_completion_rate'].mean()
}
# 内容洞察
insights['content_insights'] = {
'total_content': len(content_metrics),
'avg_completion_rate':
content_metrics['avg_completion_rate'].mean(),
'most_popular_genre': content_metrics.groupby('genre')
['popularity_score'].mean().idxmax(),
'high_performing_content_count':
(content_metrics['avg_completion_rate'] > 0.8).sum()
}
# 季节性洞察
monthly_data = seasonal_trends['monthly_trends']
peak_month = monthly_data.groupby('month')['watch_time'].sum().idxmax()
insights['seasonal_insights'] = {
'peak_viewing_month': peak_month,
'weekend_vs_weekday_ratio':
seasonal_trends['weekend_analysis'].groupby('is_weekend')
['watch_time'].mean().iloc[1] /
seasonal_trends['weekend_analysis'].groupby('is_weekend')
['watch_time'].mean().iloc[0]
}
return insights
Optimizing for very large datasets:
import dask.dataframe as dd
from dask.distributed import Client
import pyarrow.parquet as pq
class ScalableNetflixAnalyzer:
def __init__(self, dask_client=None):
"""
可扩展的Netflix分析器,使用Dask处理大规模数据
"""
self.client = dask_client or Client('localhost:8786')
def process_large_dataset(self, data_path: str) -> Dict:
"""
处理大规模数据集(500M+记录)
"""
print("Loading large dataset with Dask...")
# 使用Dask读取分区数据
df = dd.read_parquet(data_path, engine='pyarrow')
print(f"Dataset shape: {df.shape[0].compute():,} rows")
# 并行计算用户指标
user_metrics = self._compute_user_metrics_parallel(df)
# 并行计算内容指标
content_metrics = self._compute_content_metrics_parallel(df)
# 计算相似度矩阵(采样方法)
similarity_data = self._compute_similarity_sampled(df)
return {
'user_metrics': user_metrics,
'content_metrics': content_metrics,
'similarity_data': similarity_data
}
def _compute_user_metrics_parallel(self, df: dd.DataFrame) -> pd.DataFrame:
"""
并行计算用户指标
"""
# 使用Dask的groupby操作
user_agg = df.groupby('user_id').agg({
'content_id': 'nunique',
'watch_time': ['sum', 'mean'],
'completion_rate': ['mean', 'std'],
'timestamp': 'count'
})
# 计算结果并转换为pandas
return user_agg.compute()
def _compute_content_metrics_parallel(self, df: dd.DataFrame) ->
pd.DataFrame:
"""
并行计算内容指标
"""
content_agg = df.groupby('content_id').agg({
'user_id': 'nunique',
'watch_time': ['sum', 'mean'],
'completion_rate': ['mean', 'std'],
'timestamp': 'count'
})
return content_agg.compute()
def _compute_similarity_sampled(self, df: dd.DataFrame, sample_rate: float
= 0.1) -> Dict:
"""
使用采样方法计算内容相似度
"""
# 采样数据以减少计算量
sampled_df = df.sample(frac=sample_rate).compute()
# 使用采样数据构建相似度矩阵
analyzer = NetflixViewingAnalyzer()
similarity_matrix, content_to_idx, idx_to_content =
analyzer.build_content_similarity_matrix(sampled_df)
return {
'similarity_matrix': similarity_matrix,
'content_to_idx': content_to_idx,
'idx_to_content': idx_to_content,
'sample_rate': sample_rate
}
Key points for the interview answer:
1. Large-scale data processing: show familiarity with Dask and distributed computation.
2. Feature engineering: demonstrate depth in modeling user viewing behavior.
3. Recommendation systems: connect the analysis to Netflix's core business.
4. Performance optimization: account for memory efficiency and computational complexity.
5. Business insight: deliver actionable findings, not just metrics.
This question shows how much Netflix values large-scale user-behavior analysis and the data engineering behind its recommendation system.
Machine Learning Implementation

Question: Uber - Dynamic Pricing ML Model

Background: You're interviewing for a Senior Data Scientist role at Uber. The team is working on improving the dynamic pricing algorithm that adjusts ride prices based on supply, demand, and various external factors.

Question: "We need to build a machine learning model that predicts optimal pricing multipliers for different areas and times. The model should consider factors like:
- Historical demand patterns
- Driver availability
- Weather conditions
- Local events
- Traffic conditions
- Time of day/week
Design and implement a Python solution that can:
1. Process real-time data streams
2. Train and update the model incrementally
3. Provide price predictions with confidence intervals
4. Handle concept drift and model degradation
5. Ensure fairness across different demographic areas
How would you approach this end-to-end ML pipeline?"

Approach:
This question captures a core challenge for Uber as a ride-sharing platform: dynamic pricing optimization. It is a classic real-time machine learning problem involving multi-source data fusion, online learning, and fairness constraints.
Understanding and clarifying the problem: First clarify: how often are prices adjusted? What latency is allowed for model updates? How exactly is fairness defined? How much historical data is available? How reliable are the external data sources?
System architecture design:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error
import joblib
import redis
import json
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import logging
import warnings
warnings.filterwarnings('ignore')
class UberDynamicPricingML:
def __init__(self, redis_config: Dict, model_config: Dict):
"""
Uber动态定价机器学习系统
"""
self.redis_client = redis.Redis(**redis_config)
self.model_config = model_config
# 模型组件
self.primary_model = None
self.backup_model = None
self.scaler = StandardScaler()
self.label_encoders = {}
# 特征工程组件
self.feature_columns = []
self.categorical_columns = ['area_id', 'weather_condition',
'event_type']
self.numerical_columns = ['driver_count', 'demand_count',
'temperature', 'traffic_index']
# 模型性能监控
self.performance_history = []
self.drift_detector = ConceptDriftDetector()
# 公平性约束
self.fairness_constraints = FairnessConstraints()
# 日志设置
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def prepare_features(self, raw_data: pd.DataFrame) -> pd.DataFrame:
"""
特征工程和数据预处理
"""
df = raw_data.copy()
# 时间特征提取
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['month'] = df['timestamp'].dt.month
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
df['is_rush_hour'] = df['hour'].isin([7, 8, 9, 17, 18, 19]).astype(int)
# 需求供给比特征
df['demand_supply_ratio'] = df['demand_count'] / (df['driver_count'] +
1) # 避免除零
df['supply_shortage'] = np.maximum(0, df['demand_count'] -
df['driver_count'])
# 历史价格特征
df = self._add_historical_price_features(df)
# 天气影响特征
df = self._add_weather_features(df)
# 事件影响特征
df = self._add_event_features(df)
# 地理区域特征
df = self._add_area_features(df)
# 滞后特征(前一小时的数据)
df = self._add_lag_features(df)
return df
def _add_historical_price_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
添加历史价格特征
"""
# 从Redis获取历史价格数据
for _, row in df.iterrows():
area_id = row['area_id']
timestamp = row['timestamp']
# 获取过去24小时的价格数据
historical_prices = self._get_historical_prices(area_id, timestamp,
hours=24)
if historical_prices:
df.loc[df.index == row.name, 'avg_price_24h'] =
np.mean(historical_prices)
df.loc[df.index == row.name, 'max_price_24h'] =
np.max(historical_prices)
df.loc[df.index == row.name, 'price_volatility'] =
np.std(historical_prices)
else:
df.loc[df.index == row.name, 'avg_price_24h'] = 1.0 # 默认基础价
格
df.loc[df.index == row.name, 'max_price_24h'] = 1.0
df.loc[df.index == row.name, 'price_volatility'] = 0.0
return df
def _add_weather_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
添加天气影响特征
"""
# 天气条件编码
weather_impact = {
'clear': 1.0,
'cloudy': 1.1,
'rain': 1.3,
'heavy_rain': 1.5,
'snow': 1.4,
'storm': 1.6
}
df['weather_impact'] =
df['weather_condition'].map(weather_impact).fillna(1.0)
# 温度影响(极端温度增加需求)
df['temp_impact'] = 1.0
df.loc[df['temperature'] < 0, 'temp_impact'] = 1.2 # 严寒
df.loc[df['temperature'] > 35, 'temp_impact'] = 1.15 # 酷热
return df
def _add_event_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
添加事件影响特征
"""
event_impact = {
'none': 1.0,
'concert': 1.4,
'sports': 1.3,
'conference': 1.2,
'festival': 1.5,
'emergency': 2.0
}
df['event_impact'] = df['event_type'].map(event_impact).fillna(1.0)
# 事件距离影响(距离事件地点越近影响越大)
df['event_distance_impact'] = np.exp(-df['event_distance'] / 5.0) #
5km衰减
return df
def _add_area_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
添加地理区域特征
"""
# 从Redis获取区域特征
area_features = self._get_area_features()
df = df.merge(area_features, on='area_id', how='left')
# 填充缺失值
df['area_income_level'] = df['area_income_level'].fillna('medium')
df['area_density'] =
df['area_density'].fillna(df['area_density'].median())
return df
def _add_lag_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
添加滞后特征
"""
# 按区域排序
df = df.sort_values(['area_id', 'timestamp'])
# 添加滞后特征
lag_columns = ['demand_count', 'driver_count', 'price_multiplier']
for col in lag_columns:
if col in df.columns:
df[f'{col}_lag1h'] = df.groupby('area_id')[col].shift(1)
df[f'{col}_lag2h'] = df.groupby('area_id')[col].shift(2)
return df
def train_model(self, training_data: pd.DataFrame, target_column: str =
'price_multiplier'):
"""
训练动态定价模型
"""
self.logger.info("Starting model training...")
# 特征工程
features_df = self.prepare_features(training_data)
# 准备训练数据
X, y = self._prepare_training_data(features_df, target_column)
# 时间序列交叉验证
tscv = TimeSeriesSplit(n_splits=5)
# 训练主模型(Gradient Boosting)
self.primary_model = GradientBoostingRegressor(
n_estimators=200,
learning_rate=0.1,
max_depth=6,
random_state=42
)
# 训练备用模型(Random Forest)
self.backup_model = RandomForestRegressor(
n_estimators=100,
max_depth=8,
random_state=42
)
# 交叉验证评估
cv_scores = []
for train_idx, val_idx in tscv.split(X):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
# 训练模型
self.primary_model.fit(X_train, y_train)
# 预测和评估
y_pred = self.primary_model.predict(X_val)
score = mean_absolute_error(y_val, y_pred)
cv_scores.append(score)
# 在全部数据上训练最终模型
self.primary_model.fit(X, y)
self.backup_model.fit(X, y)
# 保存特征列信息
self.feature_columns = X.columns.tolist()
# 评估模型性能
train_pred = self.primary_model.predict(X)
train_mae = mean_absolute_error(y, train_pred)
self.logger.info(f"Model training completed. CV MAE:
{np.mean(cv_scores):.4f}, Train MAE: {train_mae:.4f}")
# 保存模型
self._save_model()
return {
'cv_scores': cv_scores,
'train_mae': train_mae,
'feature_importance': dict(zip(self.feature_columns,
self.primary_model.feature_importances_))
}
def _prepare_training_data(self, features_df: pd.DataFrame, target_column:
str) -> Tuple[pd.DataFrame, pd.Series]:
"""
准备训练数据
"""
# 删除缺失值
features_df = features_df.dropna()
# 分离特征和目标变量
y = features_df[target_column]
# 选择特征列
feature_cols = [col for col in features_df.columns if col not in
[target_column, 'timestamp']]
X = features_df[feature_cols]
# 处理分类变量
for col in self.categorical_columns:
if col in X.columns:
if col not in self.label_encoders:
self.label_encoders[col] = LabelEncoder()
X[col] =
self.label_encoders[col].fit_transform(X[col].astype(str))
else:
X[col] =
self.label_encoders[col].transform(X[col].astype(str))
# 标准化数值特征
numerical_cols = [col for col in self.numerical_columns if col in
X.columns]
if numerical_cols:
X[numerical_cols] = self.scaler.fit_transform(X[numerical_cols])
return X, y
def predict_price_multiplier(self, input_data: Dict) -> Dict:
"""
预测价格倍数
"""
try:
# 转换输入数据为DataFrame
input_df = pd.DataFrame([input_data])
# 特征工程
features_df = self.prepare_features(input_df)
# 准备预测数据
X = self._prepare_prediction_data(features_df)
# 主模型预测
primary_pred = self.primary_model.predict(X)[0]
# 备用模型预测(用于置信区间)
backup_pred = self.backup_model.predict(X)[0]
# 计算预测置信区间
pred_mean = (primary_pred + backup_pred) / 2
pred_std = abs(primary_pred - backup_pred) / 2
confidence_interval = (pred_mean - 1.96 * pred_std, pred_mean +
1.96 * pred_std)
# 应用公平性约束
adjusted_pred = self.fairness_constraints.apply_constraints(
pred_mean, input_data['area_id']
)
# 应用业务约束(价格倍数范围)
final_pred = np.clip(adjusted_pred, 0.8, 3.0)
return {
'price_multiplier': final_pred,
'confidence_interval': confidence_interval,
'raw_prediction': pred_mean,
'fairness_adjusted': adjusted_pred != pred_mean,
'model_version': self._get_model_version()
}
except Exception as e:
self.logger.error(f"Prediction error: {e}")
return {'price_multiplier': 1.0, 'error': str(e)}
def _prepare_prediction_data(self, features_df: pd.DataFrame) ->
pd.DataFrame:
"""
准备预测数据
"""
# 选择特征列
X = features_df[self.feature_columns]
# 处理分类变量
for col in self.categorical_columns:
if col in X.columns and col in self.label_encoders:
X[col] = self.label_encoders[col].transform(X[col].astype(str))
# 标准化数值特征
numerical_cols = [col for col in self.numerical_columns if col in
X.columns]
if numerical_cols:
X[numerical_cols] = self.scaler.transform(X[numerical_cols])
return X
def update_model_online(self, new_data: pd.DataFrame, target_column: str =
'price_multiplier'):
"""
在线增量更新模型
"""
self.logger.info("Performing online model update...")
try:
# 特征工程
features_df = self.prepare_features(new_data)
X, y = self._prepare_training_data(features_df, target_column)
# 检测概念漂移
drift_detected = self.drift_detector.detect_drift(X, y)
if drift_detected:
self.logger.warning("Concept drift detected! Triggering model
retraining...")
# 重新训练模型
self.train_model(new_data, target_column)
else:
# 增量更新(使用SGD)
if hasattr(self.primary_model, 'partial_fit'):
self.primary_model.partial_fit(X, y)
else:
# 对于不支持增量学习的模型,使用warm start
self.primary_model.n_estimators += 10
self.primary_model.fit(X, y)
# 更新性能监控
self._update_performance_monitoring(X, y)
except Exception as e:
self.logger.error(f"Online update error: {e}")
def _update_performance_monitoring(self, X: pd.DataFrame, y: pd.Series):
"""
更新模型性能监控
"""
# 预测性能
y_pred = self.primary_model.predict(X)
mae = mean_absolute_error(y, y_pred)
mse = mean_squared_error(y, y_pred)
# 记录性能历史
performance_record = {
'timestamp': datetime.now().isoformat(),
'mae': mae,
'mse': mse,
'sample_size': len(y)
}
self.performance_history.append(performance_record)
# 保持最近100条记录
if len(self.performance_history) > 100:
self.performance_history = self.performance_history[-100:]
# 存储到Redis
self.redis_client.lpush(
'model_performance_history',
json.dumps(performance_record)
)
self.redis_client.ltrim('model_performance_history', 0, 99)
def _get_historical_prices(self, area_id: str, timestamp: datetime, hours:
int = 24) -> List[float]:
"""
获取历史价格数据
"""
end_time = timestamp
start_time = end_time - timedelta(hours=hours)
# 从Redis获取历史价格
price_key = f"prices:{area_id}"
price_data = self.redis_client.zrangebyscore(
price_key,
start_time.timestamp(),
end_time.timestamp(),
withscores=True
)
return [float(price) for price, _ in price_data]
def _get_area_features(self) -> pd.DataFrame:
"""
获取区域特征数据
"""
# 从Redis获取区域特征
area_data = self.redis_client.hgetall('area_features')
if not area_data:
# 返回默认特征
return pd.DataFrame({
'area_id': [],
'area_income_level': [],
'area_density': []
})
# 解析区域数据
areas = []
for area_id, features_json in area_data.items():
features = json.loads(features_json)
features['area_id'] = area_id.decode('utf-8')
areas.append(features)
return pd.DataFrame(areas)
def _save_model(self):
"""
保存模型到文件和Redis
"""
model_data = {
'primary_model': self.primary_model,
'backup_model': self.backup_model,
'scaler': self.scaler,
'label_encoders': self.label_encoders,
'feature_columns': self.feature_columns,
'version': datetime.now().isoformat()
}
# 保存到文件
joblib.dump(model_data, 'uber_pricing_model.pkl')
# 保存模型版本信息到Redis
self.redis_client.hset(
'model_metadata',
'version',
model_data['version']
)
def _get_model_version(self) -> str:
"""
获取当前模型版本
"""
version = self.redis_client.hget('model_metadata', 'version')
return version.decode('utf-8') if version else 'unknown'
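The class above imports SGDRegressor and mentions SGD-based incremental updates, but GradientBoostingRegressor has no partial_fit. One way to reconcile this, sketched here as an assumption rather than the original design, is to keep a lightweight online learner alongside the batch-trained model:

from sklearn.linear_model import SGDRegressor

class OnlinePricingComponent:
    """Lightweight online learner that absorbs new samples between full retrains."""

    def __init__(self):
        self.model = SGDRegressor(loss='squared_error', learning_rate='adaptive', eta0=0.01)
        self.is_fitted = False

    def update(self, X, y):
        # partial_fit updates the weights in place using only the new mini-batch
        self.model.partial_fit(X, y)
        self.is_fitted = True

    def predict(self, X):
        return self.model.predict(X) if self.is_fitted else None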
class ConceptDriftDetector:
"""
概念漂移检测器
"""
def __init__(self, window_size: int = 1000, threshold: float = 0.1):
self.window_size = window_size
self.threshold = threshold
self.reference_data = None
def detect_drift(self, X: pd.DataFrame, y: pd.Series) -> bool:
"""
检测概念漂移
"""
if self.reference_data is None:
self.reference_data = {'X': X, 'y': y}
return False
# 使用Kolmogorov-Smirnov测试检测分布变化
from scipy.stats import ks_2samp
drift_detected = False
# 检测特征分布变化
for col in X.columns:
if X[col].dtype in ['int64', 'float64']:
statistic, p_value = ks_2samp(
self.reference_data['X'][col],
X[col]
)
if p_value < self.threshold:
drift_detected = True
break
# 检测目标变量分布变化
if not drift_detected:
statistic, p_value = ks_2samp(
self.reference_data['y'],
y
)
if p_value < self.threshold:
drift_detected = True
# 更新参考数据
if len(X) > self.window_size:
self.reference_data = {'X': X.tail(self.window_size), 'y':
y.tail(self.window_size)}
return drift_detected
class FairnessConstraints:
"""
公平性约束处理器
"""
def __init__(self):
self.area_income_mapping = {}
self.fairness_adjustments = {
'low_income': 0.95, # 低收入区域价格调整系数
'medium_income': 1.0, # 中等收入区域
'high_income': 1.05 # 高收入区域
}
def apply_constraints(self, predicted_price: float, area_id: str) -> float:
"""
应用公平性约束
"""
# 获取区域收入水平
income_level = self._get_area_income_level(area_id)
# 应用调整系数
adjustment_factor = self.fairness_adjustments.get(income_level, 1.0)
adjusted_price = predicted_price * adjustment_factor
return adjusted_price
def _get_area_income_level(self, area_id: str) -> str:
"""
获取区域收入水平
"""
# 这里应该从数据库或缓存中获取真实的区域收入数据
# 为了演示,使用简化的映射
if area_id not in self.area_income_mapping:
# 默认为中等收入
self.area_income_mapping[area_id] = 'medium_income'
return self.area_income_mapping[area_id]
Real-time data processing pipeline:
import asyncio
from kafka import KafkaConsumer, KafkaProducer
import json
class RealTimePricingPipeline:
def __init__(self, ml_model: UberDynamicPricingML, kafka_config: Dict):
self.ml_model = ml_model
self.kafka_consumer = KafkaConsumer(
'ride_requests',
'driver_locations',
'external_data',
bootstrap_servers=kafka_config['servers'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.kafka_producer = KafkaProducer(
bootstrap_servers=kafka_config['servers'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.data_buffer = {}
async def process_real_time_stream(self):
"""
处理实时数据流
"""
for message in self.kafka_consumer:
topic = message.topic
data = message.value
if topic == 'ride_requests':
await self._handle_ride_request(data)
elif topic == 'driver_locations':
await self._update_driver_availability(data)
elif topic == 'external_data':
await self._update_external_factors(data)
async def _handle_ride_request(self, request_data: Dict):
"""
处理乘车请求
"""
area_id = request_data['pickup_area_id']
# 聚合当前区域数据
current_data = self._aggregate_area_data(area_id)
# 预测价格倍数
prediction = self.ml_model.predict_price_multiplier(current_data)
# 发送定价决策
pricing_decision = {
'request_id': request_data['request_id'],
'area_id': area_id,
'price_multiplier': prediction['price_multiplier'],
'confidence': prediction.get('confidence_interval', [1.0, 1.0]),
'timestamp': datetime.now().isoformat()
}
self.kafka_producer.send('pricing_decisions', pricing_decision)
def _aggregate_area_data(self, area_id: str) -> Dict:
"""
聚合区域数据
"""
# 从缓存中获取最新的区域数据
area_data = self.ml_model.redis_client.hgetall(f'area_data:{area_id}')
if not area_data:
# 返回默认数据
return {
'area_id': area_id,
'demand_count': 10,
'driver_count': 8,
'weather_condition': 'clear',
'temperature': 20,
'traffic_index': 1.0,
'event_type': 'none',
'event_distance': 10.0,
'timestamp': datetime.now().isoformat()
}
# 解析缓存数据
parsed_data = {}
for key, value in area_data.items():
try:
parsed_data[key.decode('utf-8')] =
json.loads(value.decode('utf-8'))
except:
parsed_data[key.decode('utf-8')] = value.decode('utf-8')
return parsed_data
Key points for the interview answer:
1. End-to-end ML system: show understanding of the complete ML pipeline.
2. Real-time processing: demonstrate command of streaming data processing.
3. Fairness: emphasize the importance of algorithmic fairness.
4. Concept drift: show depth of thinking about ongoing model maintenance.
5. Business understanding: ground the answer in Uber's actual pricing scenario.
This question shows how much Uber values real-time machine learning systems and algorithmic fairness, along with a deep understanding of complex business scenarios.
Data Visualization & Statistical Analysis

Question: Airbnb - Host Performance Analytics Dashboard

Background: You're interviewing for a Data Analyst role at Airbnb. The team needs to build an analytics dashboard that helps hosts understand their performance and optimize their listings.

Question: "We want to create a comprehensive host performance analytics system that provides:
1. Revenue optimization insights
2. Booking pattern analysis
3. Competitive positioning
4. Seasonal trend identification
5. Guest satisfaction correlation analysis
Design a Python solution that can:
- Process booking and review data efficiently
- Generate interactive visualizations
- Perform statistical significance testing
- Provide actionable recommendations
- Handle data quality issues and outliers
The system should be scalable and provide real-time insights for M+ listings globally."

Approach:
This question captures Airbnb's core data challenge as a home-sharing platform: building a comprehensive analytics system that helps hosts optimize how they run their listings. It spans multi-dimensional data analysis, statistical testing, and visualization design.
Understanding and clarifying the problem: First clarify: how often is the data refreshed? How interactive do the visualizations need to be? What confidence level is required for statistical significance? How sophisticated should the recommendation logic be? What technology stack will the user interface use?
System architecture design:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.offline as pyo
from scipy import stats
from scipy.stats import pearsonr, spearmanr, chi2_contingency
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest
import warnings
from typing import Dict
warnings.filterwarnings('ignore')
class AirbnbHostAnalytics:
def __init__(self):
"""
Airbnb房东表现分析系统
"""
self.booking_data = None
self.review_data = None
self.listing_data = None
self.market_data = None
# 分析配置
self.confidence_level = 0.95
self.outlier_threshold = 0.05
# 可视化配置
self.color_palette = px.colors.qualitative.Set3
self.plot_template = 'plotly_white'
def load_and_preprocess_data(self, booking_df: pd.DataFrame, review_df:
pd.DataFrame,
listing_df: pd.DataFrame) -> Dict:
"""
加载和预处理数据
"""
print("Loading and preprocessing Airbnb data...")
# 数据类型优化
booking_df = self._optimize_booking_data(booking_df)
review_df = self._optimize_review_data(review_df)
listing_df = self._optimize_listing_data(listing_df)
# 数据质量检查
quality_report = self._check_data_quality(booking_df, review_df,
listing_df)
# 异常值检测和处理
booking_df = self._handle_outliers(booking_df)
# 特征工程
booking_df = self._engineer_booking_features(booking_df)
review_df = self._engineer_review_features(review_df)
# 数据整合
integrated_data = self._integrate_datasets(booking_df, review_df,
listing_df)
self.booking_data = booking_df
self.review_data = review_df
self.listing_data = listing_df
return {
'integrated_data': integrated_data,
'quality_report': quality_report,
'data_summary': self._generate_data_summary(integrated_data)
}
def _optimize_booking_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
优化预订数据类型
"""
df = df.copy()
# 数据类型转换
df['listing_id'] = df['listing_id'].astype('int32')
df['host_id'] = df['host_id'].astype('int32')
df['guest_id'] = df['guest_id'].astype('int32')
df['price'] = df['price'].astype('float32')
df['nights'] = df['nights'].astype('int16')
df['guests'] = df['guests'].astype('int8')
# 日期处理
df['check_in'] = pd.to_datetime(df['check_in'])
df['check_out'] = pd.to_datetime(df['check_out'])
df['booking_date'] = pd.to_datetime(df['booking_date'])
return df
def _optimize_review_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
优化评论数据类型
"""
df = df.copy()
df['listing_id'] = df['listing_id'].astype('int32')
df['reviewer_id'] = df['reviewer_id'].astype('int32')
df['rating'] = df['rating'].astype('float32')
df['review_date'] = pd.to_datetime(df['review_date'])
return df
def _optimize_listing_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
优化房源数据类型
"""
df = df.copy()
df['listing_id'] = df['listing_id'].astype('int32')
df['host_id'] = df['host_id'].astype('int32')
df['latitude'] = df['latitude'].astype('float32')
df['longitude'] = df['longitude'].astype('float32')
df['bedrooms'] = df['bedrooms'].astype('int8')
df['bathrooms'] = df['bathrooms'].astype('float32')
df['accommodates'] = df['accommodates'].astype('int8')
# 分类变量
categorical_cols = ['property_type', 'room_type', 'neighborhood',
'city']
for col in categorical_cols:
if col in df.columns:
df[col] = df[col].astype('category')
return df
def _check_data_quality(self, booking_df: pd.DataFrame, review_df:
pd.DataFrame,
listing_df: pd.DataFrame) -> Dict:
"""
数据质量检查
"""
quality_report = {}
# 检查缺失值
quality_report['missing_values'] = {
'booking': booking_df.isnull().sum().to_dict(),
'review': review_df.isnull().sum().to_dict(),
'listing': listing_df.isnull().sum().to_dict()
}
# 检查重复记录
quality_report['duplicates'] = {
'booking': booking_df.duplicated().sum(),
'review': review_df.duplicated().sum(),
'listing': listing_df.duplicated().sum()
}
# 检查数据一致性
quality_report['consistency'] = self._check_data_consistency(booking_df, review_df, listing_df)
return quality_report
def _check_data_consistency(self, booking_df: pd.DataFrame, review_df:
pd.DataFrame,
listing_df: pd.DataFrame) -> Dict:
"""
检查数据一致性
"""
consistency_issues = {}
# 检查listing_id一致性
booking_listings = set(booking_df['listing_id'].unique())
review_listings = set(review_df['listing_id'].unique())
listing_listings = set(listing_df['listing_id'].unique())
consistency_issues['missing_listings_in_booking'] = len(review_listings
- booking_listings)
consistency_issues['missing_listings_in_review'] = len(booking_listings
- review_listings)
consistency_issues['orphaned_bookings'] = len(booking_listings -
listing_listings)
# 检查日期逻辑
invalid_dates = booking_df[booking_df['check_in'] >=
booking_df['check_out']]
consistency_issues['invalid_date_ranges'] = len(invalid_dates)
# 检查价格合理性
negative_prices = booking_df[booking_df['price'] <= 0]
consistency_issues['negative_prices'] = len(negative_prices)
return consistency_issues
def _handle_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
"""
处理异常值
"""
df = df.copy()
# 使用Isolation Forest检测异常值
numerical_cols = ['price', 'nights']
for col in numerical_cols:
if col in df.columns:
# 检测异常值
isolation_forest = IsolationForest(contamination=self.outlier_threshold, random_state=42)
outliers = isolation_forest.fit_predict(df[[col]].fillna(df[col].median()))
# 标记异常值
df[f'{col}_outlier'] = (outliers == -1)
# 处理异常值(使用分位数截断)
q1 = df[col].quantile(0.01)
q99 = df[col].quantile(0.99)
df[col] = np.clip(df[col], q1, q99)
return df
def _engineer_booking_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
预订数据特征工程
"""
df = df.copy()
# 时间特征
df['check_in_month'] = df['check_in'].dt.month
df['check_in_day_of_week'] = df['check_in'].dt.dayofweek
df['check_in_quarter'] = df['check_in'].dt.quarter
df['is_weekend'] = df['check_in_day_of_week'].isin([5, 6])
# 预订提前期
df['lead_time'] = (df['check_in'] - df['booking_date']).dt.days
# 收入计算
df['total_revenue'] = df['price'] * df['nights']
df['revenue_per_guest'] = df['total_revenue'] / df['guests']
# 季节性标记
df['season'] = df['check_in_month'].map({
12: 'Winter', 1: 'Winter', 2: 'Winter',
3: 'Spring', 4: 'Spring', 5: 'Spring',
6: 'Summer', 7: 'Summer', 8: 'Summer',
9: 'Fall', 10: 'Fall', 11: 'Fall'
})
return df
def _engineer_review_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
评论数据特征工程
"""
df = df.copy()
# 时间特征
df['review_month'] = df['review_date'].dt.month
df['review_year'] = df['review_date'].dt.year
# 评分分类
df['rating_category'] = pd.cut(
df['rating'],
bins=[0, 3, 4, 4.5, 5],
labels=['Poor', 'Fair', 'Good', 'Excellent']
)
return df
def _integrate_datasets(self, booking_df: pd.DataFrame, review_df:
pd.DataFrame,
listing_df: pd.DataFrame) -> pd.DataFrame:
"""
整合数据集
"""
# 聚合预订数据
booking_agg = booking_df.groupby('listing_id').agg({
'total_revenue': ['sum', 'mean'],
'nights': ['sum', 'mean'],
'guests': 'sum',
'price': 'mean',
'lead_time': 'mean',
'booking_date': ['count', 'min', 'max']
}).reset_index()
# 扁平化列名
booking_agg.columns = [
'listing_id', 'total_revenue_sum', 'total_revenue_mean',
'total_nights', 'avg_nights', 'total_guests', 'avg_price',
'avg_lead_time', 'total_bookings', 'first_booking', 'last_booking'
]
# 聚合评论数据
review_agg = review_df.groupby('listing_id').agg({
'rating': ['mean', 'std', 'count'],
'review_date': ['min', 'max']
}).reset_index()
# 扁平化列名
review_agg.columns = [
'listing_id', 'avg_rating', 'rating_std', 'total_reviews',
'first_review', 'last_review'
]
# 合并数据
integrated_data = listing_df.merge(booking_agg, on='listing_id',
how='left')
integrated_data = integrated_data.merge(review_agg, on='listing_id',
how='left')
# 填充缺失值
integrated_data['total_bookings'] = integrated_data['total_bookings'].fillna(0)
integrated_data['total_reviews'] = integrated_data['total_reviews'].fillna(0)
integrated_data['avg_rating'] = integrated_data['avg_rating'].fillna(0)
return integrated_data
def analyze_revenue_optimization(self, integrated_data: pd.DataFrame) -> Dict:
"""
收入优化分析
"""
print("Performing revenue optimization analysis...")
# 价格弹性分析
price_elasticity = self._analyze_price_elasticity(integrated_data)
# 季节性收入分析
seasonal_revenue = self._analyze_seasonal_revenue()
# 竞争定位分析
competitive_analysis = self._analyze_competitive_positioning(integrated_data)
# 收入驱动因素分析
revenue_drivers = self._analyze_revenue_drivers(integrated_data)
return {
'price_elasticity': price_elasticity,
'seasonal_revenue': seasonal_revenue,
'competitive_analysis': competitive_analysis,
'revenue_drivers': revenue_drivers
}
def _analyze_price_elasticity(self, df: pd.DataFrame) -> Dict:
"""
价格弹性分析
"""
# 按价格区间分析预订量
df['price_bin'] = pd.qcut(df['avg_price'], q=10, labels=False)
price_elasticity = df.groupby('price_bin').agg({
'avg_price': 'mean',
'total_bookings': 'mean',
'avg_rating': 'mean'
}).reset_index()
# 计算弹性系数
price_changes = price_elasticity['avg_price'].pct_change()
booking_changes = price_elasticity['total_bookings'].pct_change()
elasticity_coefficients = booking_changes / price_changes
elasticity_coefficients = elasticity_coefficients.replace([np.inf, -
np.inf], np.nan).dropna()
return {
'price_elasticity_data': price_elasticity,
'avg_elasticity': elasticity_coefficients.mean(),
'elasticity_by_segment': elasticity_coefficients.to_dict()
}
def _analyze_seasonal_revenue(self) -> Dict:
"""
季节性收入分析
"""
if self.booking_data is None:
return {}
# 按月份分析收入
monthly_revenue = self.booking_data.groupby('check_in_month').agg({
'total_revenue': ['sum', 'mean'],
'price': 'mean',
'nights': 'mean'
}).reset_index()
# 按季节分析
seasonal_revenue = self.booking_data.groupby('season').agg({
'total_revenue': ['sum', 'mean'],
'price': 'mean',
'nights': 'mean'
}).reset_index()
return {
'monthly_revenue': monthly_revenue,
'seasonal_revenue': seasonal_revenue
}
def _analyze_competitive_positioning(self, df: pd.DataFrame) -> Dict:
"""
竞争定位分析
"""
# 按邻域和房型分析
competitive_metrics = df.groupby(['neighborhood', 'room_type']).agg({
'avg_price': ['mean', 'median', 'std'],
'avg_rating': 'mean',
'total_bookings': 'mean'
}).reset_index()
# 计算市场份额
total_bookings_by_neighborhood = df.groupby('neighborhood')['total_bookings'].sum()
df['market_share'] = df.apply(
lambda row: row['total_bookings'] /
total_bookings_by_neighborhood[row['neighborhood']] * 100,
axis=1
)
# 识别高表现房源
top_performers = df[
(df['avg_rating'] >= df['avg_rating'].quantile(0.8)) &
(df['total_bookings'] >= df['total_bookings'].quantile(0.8))
]
return {
'competitive_metrics': competitive_metrics,
'top_performers': top_performers,
'market_share_analysis': df.groupby('neighborhood')
['market_share'].describe()
}
def _analyze_revenue_drivers(self, df: pd.DataFrame) -> Dict:
"""
收入驱动因素分析
"""
# 相关性分析
numerical_cols = ['avg_price', 'total_bookings', 'avg_rating',
'bedrooms', 'bathrooms', 'accommodates']
correlation_matrix = df[numerical_cols].corr()
# 统计显著性检验
significance_tests = {}
for col in numerical_cols:
if col != 'total_revenue_sum':
correlation, p_value = pearsonr(df[col].fillna(0),
df['total_revenue_sum'].fillna(0))
significance_tests[col] = {
'correlation': correlation,
'p_value': p_value,
'significant': p_value < (1 - self.confidence_level)
}
return {
'correlation_matrix': correlation_matrix,
'significance_tests': significance_tests
}
def create_interactive_dashboard(self, integrated_data: pd.DataFrame,
analysis_results: Dict) -> str:
"""
创建交互式仪表板
"""
print("Creating interactive dashboard...")
# 创建子图
fig = make_subplots(
rows=3, cols=2,
subplot_titles=[
'Revenue by Season', 'Price vs Booking Correlation',
'Rating Distribution', 'Competitive Positioning',
'Revenue Drivers', 'Performance Trends'
],
specs=[
[{"type": "bar"}, {"type": "scatter"}],
[{"type": "histogram"}, {"type": "box"}],
[{"type": "heatmap"}, {"type": "scatter"}]
]
)
# 1. 季节性收入分析
if 'seasonal_revenue' in analysis_results:
seasonal_data = analysis_results['seasonal_revenue']['seasonal_revenue']
fig.add_trace(
go.Bar(
x=seasonal_data['season'],
y=seasonal_data[('total_revenue', 'sum')],
name='Seasonal Revenue',
marker_color=self.color_palette[0]
),
row=1, col=1
)
# 2. 价格与预订相关性
fig.add_trace(
go.Scatter(
x=integrated_data['avg_price'],
y=integrated_data['total_bookings'],
mode='markers',
name='Price vs Bookings',
marker=dict(
color=integrated_data['avg_rating'],
colorscale='Viridis',
showscale=True,
colorbar=dict(title="Rating")
)
),
row=1, col=2
)
# 3. 评分分布
fig.add_trace(
go.Histogram(
x=integrated_data['avg_rating'],
name='Rating Distribution',
marker_color=self.color_palette[2]
),
row=2, col=1
)
# 4. 竞争定位箱线图
fig.add_trace(
go.Box(
y=integrated_data['avg_price'],
x=integrated_data['room_type'],
name='Price by Room Type',
marker_color=self.color_palette[3]
),
row=2, col=2
)
# 5. 收入驱动因素热力图
if 'revenue_drivers' in analysis_results:
corr_matrix = analysis_results['revenue_drivers']['correlation_matrix']
fig.add_trace(
go.Heatmap(
z=corr_matrix.values,
x=corr_matrix.columns,
y=corr_matrix.index,
colorscale='RdBu',
name='Correlation Matrix'
),
row=3, col=1
)
# 6. 表现趋势
fig.add_trace(
go.Scatter(
x=integrated_data['total_bookings'],
y=integrated_data['total_revenue_sum'],
mode='markers',
name='Performance Trend',
marker=dict(
size=integrated_data['avg_rating'] * 2,
color=self.color_palette[5]
)
),
row=3, col=2
)
# 更新布局
fig.update_layout(
height=1200,
title_text="Airbnb Host Performance Analytics Dashboard",
template=self.plot_template,
showlegend=False
)
# 保存为HTML文件
dashboard_html = "airbnb_host_dashboard.html"
pyo.plot(fig, filename=dashboard_html, auto_open=False)
return dashboard_html
def perform_statistical_analysis(self, integrated_data: pd.DataFrame) ->
Dict:
"""
执行统计分析
"""
print("Performing statistical analysis...")
statistical_results = {}
# 1. A/B测试模拟:不同价格策略的效果
ab_test_results = self._simulate_ab_test(integrated_data)
statistical_results['ab_test'] = ab_test_results
# 2. 方差分析:不同房型的收入差异
anova_results = self._perform_anova_analysis(integrated_data)
statistical_results['anova'] = anova_results
# 3. 卡方检验:评分与预订量的关系
chi_square_results = self._perform_chi_square_test(integrated_data)
statistical_results['chi_square'] = chi_square_results
# 4. 回归分析:收入预测模型
regression_results = self._perform_regression_analysis(integrated_data)
statistical_results['regression'] = regression_results
return statistical_results
def _simulate_ab_test(self, df: pd.DataFrame) -> Dict:
"""
模拟A/B测试:价格策略效果
"""
# 将房源随机分为两组
np.random.seed(42)
df['test_group'] = np.random.choice(['A', 'B'], size=len(df))
# 模拟不同价格策略
df_test = df.copy()
df_test.loc[df_test['test_group'] == 'B', 'avg_price'] *= 1.1  # B组价格提高10%
# 计算两组的收入差异
group_a_revenue = df_test[df_test['test_group'] == 'A']['total_revenue_sum'].mean()
group_b_revenue = df_test[df_test['test_group'] == 'B']['total_revenue_sum'].mean()
# 执行t检验
group_a_data = df_test[df_test['test_group'] == 'A']['total_revenue_sum'].dropna()
group_b_data = df_test[df_test['test_group'] == 'B']['total_revenue_sum'].dropna()
t_stat, p_value = stats.ttest_ind(group_a_data, group_b_data)
return {
'group_a_mean': group_a_revenue,
'group_b_mean': group_b_revenue,
'difference': group_b_revenue - group_a_revenue,
'percentage_change': (group_b_revenue - group_a_revenue) /
group_a_revenue * 100,
't_statistic': t_stat,
'p_value': p_value,
'significant': p_value < (1 - self.confidence_level)
}
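# Note: stats.ttest_ind above assumes equal variances by default; with skewed revenue data it
# would be reasonable to pass equal_var=False (Welch's t-test) or to fall back on a
# non-parametric alternative such as stats.mannwhitneyu.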
def _perform_anova_analysis(self, df: pd.DataFrame) -> Dict:
"""
方差分析:不同房型的收入差异
"""
# 按房型分组
room_type_groups = [
df[df['room_type'] == room_type]['total_revenue_sum'].dropna()
for room_type in df['room_type'].unique()
]
# 执行单因素方差分析
f_stat, p_value = stats.f_oneway(*room_type_groups)
# 计算各组统计量
group_stats = df.groupby('room_type')['total_revenue_sum'].agg(['mean',
'std', 'count'])
return {
'f_statistic': f_stat,
'p_value': p_value,
'significant': p_value < (1 - self.confidence_level),
'group_statistics': group_stats.to_dict()
}
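# Note: a significant one-way ANOVA only tells us that at least one room type differs; a
# post-hoc comparison (for example Tukey's HSD via statsmodels' pairwise_tukeyhsd) would be
# needed to identify which room types actually differ from each other.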
def _perform_chi_square_test(self, df: pd.DataFrame) -> Dict:
"""
卡方检验:评分与预订量的关系
"""
# 创建评分和预订量的分类变量
df['rating_category'] = pd.cut(df['avg_rating'], bins=3, labels=['Low',
'Medium', 'High'])
df['booking_category'] = pd.cut(df['total_bookings'], bins=3, labels=
['Low', 'Medium', 'High'])
# 创建列联表
contingency_table = pd.crosstab(df['rating_category'],
df['booking_category'])
# 执行卡方检验
chi2_stat, p_value, dof, expected = chi2_contingency(contingency_table)
return {
'chi2_statistic': chi2_stat,
'p_value': p_value,
'degrees_of_freedom': dof,
'significant': p_value < (1 - self.confidence_level),
'contingency_table': contingency_table.to_dict(),
'expected_frequencies': expected.tolist()
}
def _perform_regression_analysis(self, df: pd.DataFrame) -> Dict:
"""
回归分析:收入预测模型
"""
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
# 准备特征变量
feature_cols = ['avg_price', 'bedrooms', 'bathrooms', 'accommodates',
'avg_rating']
X = df[feature_cols].fillna(df[feature_cols].median())
y = df['total_revenue_sum'].fillna(0)
# 训练线性回归模型
model = LinearRegression()
model.fit(X, y)
# 预测和评估
y_pred = model.predict(X)
r2 = r2_score(y, y_pred)
mse = mean_squared_error(y, y_pred)
# 特征重要性
feature_importance = dict(zip(feature_cols, model.coef_))
return {
'r2_score': r2,
'mse': mse,
'feature_importance': feature_importance,
'intercept': model.intercept_,
'model_equation': self._format_regression_equation(model,
feature_cols)
}
def _format_regression_equation(self, model, feature_cols: List[str]) -> str:
"""
格式化回归方程
"""
equation = f"Revenue = {model.intercept_:.2f}"
for i, col in enumerate(feature_cols):
coef = model.coef_[i]
sign = "+" if coef >= 0 else ""
equation += f" {sign}{coef:.2f}*{col}"
return equation
def generate_actionable_recommendations(self, integrated_data: pd.DataFrame, analysis_results: Dict) -> List[Dict]:
"""
生成可操作的建议
"""
recommendations = []
# 基于价格弹性的建议
if 'price_elasticity' in analysis_results:
elasticity = analysis_results['price_elasticity']['avg_elasticity']
if elasticity < -1:  # 弹性需求
recommendations.append({
'category': 'Pricing Strategy',
'recommendation': 'Consider reducing prices to increase booking volume',
'rationale': f'Price elasticity of {elasticity:.2f} indicates elastic demand',
'priority': 'High'
})
elif elasticity > -0.5:  # 非弹性需求
recommendations.append({
'category': 'Pricing Strategy',
'recommendation': 'Consider increasing prices to maximize revenue',
'rationale': f'Price elasticity of {elasticity:.2f} indicates inelastic demand',
'priority': 'High'
})
# 基于评分的建议
low_rating_threshold = integrated_data['avg_rating'].quantile(0.25)
low_rating_listings = integrated_data[integrated_data['avg_rating'] <
low_rating_threshold]
if len(low_rating_listings) > 0:
recommendations.append({
'category': 'Guest Satisfaction',
'recommendation': f'Focus on improving {len(low_rating_listings)} listings with ratings below {low_rating_threshold:.1f}',
'rationale': 'Low ratings significantly impact booking conversion',
'priority': 'High'
})
# 基于季节性的建议
if 'seasonal_revenue' in analysis_results:
seasonal_data = analysis_results['seasonal_revenue']['seasonal_revenue']
peak_season = seasonal_data.loc[seasonal_data[('total_revenue',
'sum')].idxmax(), 'season']
recommendations.append({
'category': 'Seasonal Strategy',
'recommendation': f'Optimize pricing and availability for {peak_season} season',
'rationale': f'{peak_season} shows highest revenue potential',
'priority': 'Medium'
})
# 基于竞争分析的建议
if 'competitive_analysis' in analysis_results:
top_performers = analysis_results['competitive_analysis']['top_performers']
avg_top_performer_price = top_performers['avg_price'].mean()
avg_market_price = integrated_data['avg_price'].mean()
if avg_top_performer_price > avg_market_price * 1.1:
recommendations.append({
'category': 'Competitive Positioning',
'recommendation': 'Consider premium positioning strategy',
'rationale': f'Top performers charge {(avg_top_performer_price/avg_market_price-1)*100:.1f}% above market average',
'priority': 'Medium'
})
return recommendations
def _generate_data_summary(self, df: pd.DataFrame) -> Dict:
"""
生成数据摘要
"""
return {
'total_listings': len(df),
'total_hosts': df['host_id'].nunique(),
'avg_price': df['avg_price'].mean(),
'avg_rating': df['avg_rating'].mean(),
'total_revenue': df['total_revenue_sum'].sum(),
'avg_bookings_per_listing': df['total_bookings'].mean()
}
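The class above is defined but never exercised in the listing, so here is a minimal, illustrative driver showing how the pieces are meant to fit together. The bookings, reviews, and listings DataFrames are hypothetical placeholders and would need the columns referenced in the preprocessing methods.
# Illustrative end-to-end run (the three input DataFrames are assumed to exist)
analytics = AirbnbHostAnalytics()

prepared = analytics.load_and_preprocess_data(bookings, reviews, listings)
integrated = prepared['integrated_data']

revenue_insights = analytics.analyze_revenue_optimization(integrated)
statistical_results = analytics.perform_statistical_analysis(integrated)
dashboard_path = analytics.create_interactive_dashboard(integrated, revenue_insights)
recommendations = analytics.generate_actionable_recommendations(integrated, revenue_insights)

print(f"Dashboard written to {dashboard_path}")
for rec in recommendations:
    print(f"[{rec['priority']}] {rec['category']}: {rec['recommendation']}")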
Key points for your interview answer:
1. Data visualization: demonstrate command of modern visualization tools such as Plotly
2. Statistical analysis: show understanding of hypothesis testing, ANOVA, and related statistical methods
3. Business insight: provide actionable recommendations grounded in Airbnb's actual business scenario
4. Data quality: emphasize the importance of data quality checks and outlier handling
5. Scalability: consider performance optimization for large-scale data processing
This question reflects Airbnb's comprehensive expectations of data analysts across business insight, statistical analysis, and visualization skills.
Interview Preparation Guide
Key Technical Preparation Areas
1. Core Python skills - Data structures and algorithms: be fluent with advanced pandas and numpy usage, and understand time and space complexity - Object-oriented programming: design clean class structures and understand inheritance, encapsulation, and polymorphism - Asynchronous programming: know how to apply asyncio, multithreading, and multiprocessing to data processing - Memory management: understand Python's memory model and be able to optimize memory usage when processing large datasets
2. Data processing skills - ETL pipelines: design and implement extract-transform-load workflows with confidence - Data quality: master best practices for data cleaning, outlier detection, and missing-value handling - Big data processing: know distributed computing frameworks such as Dask and Spark - Real-time data: understand streaming data processing and real-time analytics
3. Machine learning implementation - Model selection: understand when different algorithms apply and what their trade-offs are - Feature engineering: master feature selection, feature construction, and feature scaling - Model evaluation: use cross-validation, grid search, and other tuning techniques fluently - Production deployment: understand model versioning, A/B testing, monitoring, and other production concerns
4. Statistical analysis - Hypothesis testing: master t-tests, chi-square tests, ANOVA, and related methods - Correlation analysis: understand the differences between Pearson, Spearman, and other correlation measures - Regression analysis: use linear and logistic regression fluently - Experiment design: understand A/B test design and how to interpret results (a short hypothesis-testing sketch follows this list)
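To make the hypothesis-testing and correlation points in item 4 concrete, here is a minimal, self-contained sketch using scipy. The two synthetic samples are purely illustrative stand-ins for whatever metric you are asked to compare; Welch's t-test is chosen because it does not assume equal variances.
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
control = rng.normal(loc=100, scale=15, size=500)    # e.g., revenue per listing under strategy A
treatment = rng.normal(loc=104, scale=15, size=500)  # e.g., revenue per listing under strategy B

# Welch's t-test: does the treatment group differ from control?
t_stat, p_value = stats.ttest_ind(treatment, control, equal_var=False)
print(f"t = {t_stat:.2f}, p = {p_value:.4f}, significant at alpha=0.05: {p_value < 0.05}")

# Pearson (linear) vs. Spearman (rank) correlation between two related metrics
x = rng.normal(size=500)
y = 0.6 * x + rng.normal(scale=0.8, size=500)
pearson_r, _ = stats.pearsonr(x, y)
spearman_rho, _ = stats.spearmanr(x, y)
print(f"Pearson r = {pearson_r:.2f}, Spearman rho = {spearman_rho:.2f}")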
Company-Specific Preparation Strategies
Large tech companies (Google, Meta, Amazon) - Focus on large-scale data processing and system design - Emphasize algorithmic efficiency and code optimization - Prepare end-to-end designs of machine learning systems - Learn the company's specific tech stack and business scenarios
Financial firms (JPMorgan, Goldman Sachs) - Focus on data processing for risk analysis and compliance - Understand financial time-series analysis - Prepare for questions on regulatory reporting and risk metrics - Emphasize data accuracy and auditability
Tech unicorns (Uber, Airbnb, Netflix) - Focus on real-time data processing and recommendation systems - Understand user behavior analysis and personalization algorithms - Prepare for A/B testing and causal inference questions - Emphasize business impact and product thinking
Interview Technique Tips
1. Writing the code - Clear structure: organize code with classes and functions, and avoid overly long functions - Complete comments: document complex logic clearly - Error handling: consider edge cases and exceptions - Performance: proactively discuss time complexity and optimization options
2. Solving the problem - Clarify requirements: proactively ask about data scale, performance requirements, and business constraints - Think in steps: break complex problems into manageable sub-problems - Weigh trade-offs: compare the pros and cons of alternative approaches - Business understanding: connect the technical solution to business goals
3. Communicating clearly - Logical flow: move from problem understanding to solution design, then implementation, then optimization - Technical depth: be able to go deep on algorithm principles and implementation details - Business awareness: understand how the technical solution affects the business - Learning ability: show that you keep learning and can adapt to new technologies
Common Interview Topics
1. Data processing - Large-scale data cleaning and preprocessing - Real-time stream processing - Data quality monitoring and anomaly detection - Multi-source data integration and consistency handling
2. Analysis and modeling - User behavior analysis and segmentation - Recommendation system design and optimization - Time-series forecasting and anomaly detection - A/B test design and result analysis
3. System design - Data pipeline architecture - Machine learning system design - Real-time analytics architecture - Data warehouse and data lake design
4. Business applications - Revenue optimization and pricing strategy - Risk assessment and fraud detection - Operational efficiency optimization - Customer lifetime value analysis
Tech Stack Checklist
Must-have skills: ✅ Python (pandas, numpy, scikit-learn) ✅ SQL (complex queries, window functions, performance tuning) ✅ Statistics fundamentals (hypothesis testing, regression analysis) ✅ Data visualization (matplotlib, seaborn, plotly)
Advanced skills: ✅ Big data processing (Spark, Dask) ✅ Cloud platforms (AWS, GCP, Azure) ✅ Containerization (Docker, Kubernetes) ✅ Version control (Git, MLflow)
Bonus skills: ✅ Deep learning (TensorFlow, PyTorch) ✅ Real-time processing (Kafka, Redis) ✅ Data engineering (Airflow, dbt) ✅ Business understanding (product analytics, business intelligence)
Final Advice
Before the interview: 1. Organize your project experience: prepare two or three in-depth projects and be able to explain the technical details and business impact 2. Practice coding: work through data-processing problems on platforms such as LeetCode and HackerRank 3. Do mock interviews: practice with peers so you can communicate well under pressure 4. Research the company: understand the target company's business model and technical challenges
During the interview: 1. Stay calm: when you hit an unfamiliar question, be honest about it and show how you would learn 2. Communicate proactively: clarify the requirements early to avoid misunderstanding the problem 3. Show your thinking: think out loud so the interviewer can follow your reasoning 4. Mind the details: watch for edge cases and performance considerations in your code
After the interview: 1. Reflect: write down the questions you were asked and how you performed 2. Close the gaps: study the weak areas the interview exposed 3. Send a thank-you note: reaffirm your interest in the role 4. Keep learning: stay on top of new technologies and industry trends
Remember: a Python interview tests more than technical skill; it is a showcase of your overall ability. With thorough preparation and practice, you can stand out and land the DS/DA role you want.