分销系统是现代电商和社交电商平台的核心组件之一,它通过多级分佣机制激励用户推广产品,形成裂变式增长。一个高效、公平、稳定的分销系统离不开精心设计的算法支持。本文将深入探讨分销系统中常用的核心算法及其实现原理。
邻接表模型(最常用):
CREATE TABLE distribution_relations (
id INT PRIMARY KEY AUTO_INCREMENT,
user_id INT NOT NULL,
parent_id INT NULL,
level INT NOT NULL DEFAULT 0,
path VARCHAR(1000) NOT NULL DEFAULT '',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_parent_id (parent_id),
INDEX idx_path (path(255))
);路径枚举算法:
class DistributionTree:
def __init__(self):
self.relations = {}
def add_user(self, user_id, parent_id=None):
"""添加用户到分销树"""
if parent_id is None:
# 根节点
self.relations[user_id] = {
'path': str(user_id),
'level': 0
}
else:
parent = self.relations.get(parent_id)
if parent:
path = f"{parent['path']}.{user_id}"
level = parent['level'] + 1
self.relations[user_id] = {
'path': path,
'level': level,
'parent_id': parent_id
}
def get_ancestors(self, user_id, max_level=None):
"""获取指定用户的所有上级"""
user = self.relations.get(user_id)
if not user or 'path' not in user:
return []
ancestor_ids = user['path'].split('.')[:-1] # 排除自己
if max_level:
ancestor_ids = ancestor_ids[-max_level:] # 只取最近N级
return [int(uid) for uid in ancestor_ids]对于需要频繁查询多层关系的系统,闭包表是更优选择:
CREATE TABLE relation_closure (
ancestor INT NOT NULL,
descendant INT NOT NULL,
depth INT NOT NULL,
PRIMARY KEY (ancestor, descendant),
INDEX idx_descendant (descendant)
);
-- 查找用户的所有上级(包括间接上级)
SELECT ancestor, depth
FROM relation_closure
WHERE descendant = ?
ORDER BY depth DESC;class CommissionCalculator:
def __init__(self, commission_rules):
"""
commission_rules示例:
[
{'level': 1, 'rate': 0.10}, # 一级分佣10%
{'level': 2, 'rate': 0.05}, # 二级分佣5%
{'level': 3, 'rate': 0.02}, # 三级分佣2%
]
"""
self.rules = sorted(commission_rules, key=lambda x: x['level'])
def calculate_commission(self, order_amount, ancestors):
"""计算多级佣金分配"""
commissions = {}
for rule in self.rules:
level = rule['level']
rate = rule['rate']
# 检查是否有该级别的上级
if level - 1 < len(ancestors):
ancestor_id = ancestors[level - 1]
commission = order_amount * rate
# 累积佣金(同一用户可能在多级中出现)
if ancestor_id in commissions:
commissions[ancestor_id] += commission
else:
commissions[ancestor_id] = commission
return commissionsclass WeightedCommissionCalculator:
def calculate_weighted_commission(self, order_amount, team_performance):
"""
基于团队业绩的加权分佣
team_performance: {user_id: performance_score}
"""
total_performance = sum(team_performance.values())
if total_performance == 0:
return {}
commissions = {}
total_commission_pool = order_amount * 0.15 # 假设总佣金池为订单金额的15%
for user_id, performance in team_performance.items():
weight = performance / total_performance
commissions[user_id] = total_commission_pool * weight
return commissionsclass TeamStatistics:
def __init__(self, tree_storage):
self.tree = tree_storage
def count_team_members(self, leader_id, max_depth=None):
"""统计团队总人数(递归算法)"""
count = 0
stack = [(leader_id, 0)]
while stack:
current_id, current_depth = stack.pop()
# 获取直接下级
children = self.tree.get_children(current_id)
for child in children:
if max_depth is None or current_depth + 1 <= max_depth:
count += 1
stack.append((child['user_id'], current_depth + 1))
return count
def get_team_performance(self, leader_id, start_date, end_date):
"""统计团队总业绩(使用闭包表优化)"""
# 获取所有团队成员
team_members = self.tree.get_all_descendants(leader_id)
member_ids = [member['user_id'] for member in team_members]
# 批量查询业绩
query = """
SELECT user_id, SUM(order_amount) as total_sales
FROM orders
WHERE user_id IN %s
AND order_status = 'completed'
AND created_at BETWEEN %s AND %s
GROUP BY user_id
"""
# 执行查询并汇总
total_performance = sum(result['total_sales'] for result in query_results)
return total_performance# 使用Redis Sorted Set实现实时团队排名
# 添加或更新团队业绩
ZADD team_performance {current_timestamp} {leader_id}:{performance}
# 获取TOP 10团队
ZREVRANGEBYSCORE team_performance +inf -inf LIMIT 0 10
# 获取用户排名
ZREVRANK team_performance {leader_id}class CycleDetection:
def has_cycle(self, user_id, parent_id, existing_relations):
"""检测添加关系是否会形成环形结构"""
if user_id == parent_id:
return True # 不能设置自己为上级
# 广度优先搜索检测环路
visited = set([user_id])
queue = [parent_id]
while queue:
current = queue.pop(0)
if current == user_id:
return True # 发现环路
if current in visited:
continue
visited.add(current)
# 获取当前节点的上级
current_parent = existing_relations.get(current)
if current_parent and current_parent != 0:
queue.append(current_parent)
return Falseclass WithdrawRiskControl:
def __init__(self):
self.rules = [
self._check_min_withdraw_amount,
self._check_daily_limit,
self._check_abnormal_pattern,
self._check_blacklist
]
def evaluate_withdraw_request(self, user_id, amount, history):
"""评估提现请求风险"""
risk_score = 0
violations = []
for rule in self.rules:
result = rule(user_id, amount, history)
if not result['passed']:
risk_score += result['risk_score']
violations.append(result['reason'])
return {
'risk_score': risk_score,
'violations': violations,
'approved': risk_score < 50 # 阈值
}
def _check_abnormal_pattern(self, user_id, amount, history):
"""检测异常提现模式"""
# 检查提现频率
recent_withdrawals = history.get_recent_withdrawals(user_id, days=7)
if len(recent_withdrawals) > 5:
return {
'passed': False,
'risk_score': 30,
'reason': '提现频率过高'
}
return {'passed': True}class BatchCommissionProcessor:
def process_batch_orders(self, orders):
"""批量处理订单佣金计算"""
# 按用户分组订单
user_orders = {}
for order in orders:
user_id = order['user_id']
if user_id not in user_orders:
user_orders[user_id] = []
user_orders[user_id].append(order)
# 批量查询用户关系
user_ids = list(user_orders.keys())
user_ancestors = self.batch_get_ancestors(user_ids)
# 并行计算佣金
commission_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for user_id, orders in user_orders.items():
future = executor.submit(
self._calculate_user_commissions,
user_id, orders, user_ancestors[user_id]
)
futures.append(future)
for future in as_completed(futures):
commission_results.extend(future.result())
return commission_resultsclass DistributionCache:
def __init__(self, redis_client):
self.redis = redis_client
def get_user_ancestors(self, user_id, ttl=3600):
"""获取用户上级(带缓存)"""
cache_key = f"user_ancestors:{user_id}"
# 尝试从缓存获取
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# 缓存未命中,从数据库查询
ancestors = self.db.get_user_ancestors(user_id)
# 写入缓存
self.redis.setex(cache_key, ttl, json.dumps(ancestors))
return ancestors
def invalidate_user_cache(self, user_id):
"""用户关系变更时清理缓存"""
patterns = [
f"user_ancestors:{user_id}",
f"user_descendants:{user_id}",
f"team_stats:{user_id}:*"
]
for pattern in patterns:
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)分销系统算法的设计需要在性能、准确性和可维护性之间找到平衡。随着业务规模的增长,算法也需要不断优化和迭代。建议在实际开发中:
正确的算法选择能够支撑分销系统处理数百万甚至数千万用户的关系网络,同时保证佣金计算的准确性和实时性,为业务发展提供坚实的技术基础。