
在用户行为分析、服务器资源调度、产品运营监控等场景中,“一天内的最大在线人数” 和 “维持最大在线人数的最长持续时间” 是核心指标 —— 比如游戏运营需要知道峰值在线人数以扩容服务器,视频平台需要根据峰值时长优化带宽分配。但当日志量达到百万、甚至亿级时,“逐秒遍历统计” 的暴力方案会彻底失效,必须设计基于 “事件排序” 的高效算法。
本文以日志格式{userid, login_time, logout_time}(时间单位:秒,范围 0~86399,代表一天 24 小时)为例,完整讲解两大问题的解决思路与落地代码。
先明确一个关键观察:用户的在线状态变化只发生在 “登录时间” 和 “登出时间” 两个节点,中间时间段的在线状态是连续的。基于此,我们可以将每条日志转换成两个 “状态事件”,再通过排序和遍历事件计算在线人数,彻底规避 “逐秒统计” 的性能瓶颈。
对任意一条日志(user1, 100, 200)(user1 在第 100 秒登录,第 200 秒登出):
当多个事件的time相同时,需按 “登出事件优先” 排序,避免计算错误:
适用于日志量≤1000 万条的场景(Python 处理 1000 万事件排序约需 10~20 秒):
def calculate_max_online(logs): """ 计算一天内的最大在线人数 :param logs: 日志列表,每条日志格式为 (userid, login_time, logout_time) :return: max_online(最大在线人数) """ events = [] # 1. 日志预处理与事件转换 for userid, login, logout in logs: # 过滤异常日志 if login < 0 or logout < login or logout > 86399: continue # 登录事件(type=1),登出事件(type=-1) events.append((login, 1)) events.append((logout, -1)) # 2. 事件排序:time升序,time相同则登出事件(-1)在前 # 排序key:(time, type),因为-1 < 1,所以相同time时-1排在前面 events.sort(key=lambda x: (x[0], x[1])) # 3. 遍历计算在线人数 current_online = 0 max_online = 0 for time, delta in events: current_online += delta # 更新最大在线人数(注意:current_online可能为负,需取非负后比较) if current_online > max_online: max_online = current_online return max_online# 测试示例if __name__ == "__main__": # 测试日志:(userid, login_time, logout_time) test_logs = [ ("user1", 100, 200), # 100秒登录,200秒登出 ("user2", 150, 250), # 150秒登录,250秒登出 ("user3", 180, 300), # 180秒登录,300秒登出 ("user4", 200, 280), # 200秒登录,280秒登出(与user1登出时间相同) ] print("最大在线人数:", calculate_max_online(test_logs)) # 输出3(180~200秒时,user1、user2、user3同时在线)当日志量超 1 亿条时,单机内存无法承载,需用 Spark 进行分布式处理,核心逻辑与单机版一致,只是通过 RDD/DataFrame 实现并行计算:
import org.apache.spark.sql.SparkSessionobject MaxOnlineCalculator { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("MaxOnlineCalculator") .master("yarn") // 生产环境用yarn或k8s .getOrCreate() import spark.implicits._ // 1. 读取日志(假设日志存储在HDFS,格式为userid,login_time,logout_time) val logsDF = spark.read .option("header", "false") .csv("hdfs://path/to/logs.csv") .toDF("userid", "login_time", "logout_time") .select( $"userid", $"login_time".cast("long"), $"logout_time".cast("long") ) // 2. 预处理+事件转换(并行执行) val eventsDF = logsDF .filter("login_time >= 0 and logout_time >= login_time and logout_time <= 86399") .flatMap(row => { val login = row.getAs[Long]("login_time") val logout = row.getAs[Long]("logout_time") // 返回两个事件:(time, delta) Seq((login, 1L), (logout, -1L)) }) .toDF("time", "delta") // 3. 事件排序(分布式排序,按time和delta排序) val sortedEvents = eventsDF .orderBy($"time", $"delta") // delta=-1排在1前面,符合排序原则 // 4. 遍历计算最大在线人数(用reduceByKey累加,避免全局洗牌) // 这里用累加器实现全局计数(适用于需实时更新最大值的场景) val maxOnlineAcc = spark.sparkContext.longAccumulator("maxOnline") var currentOnline = 0L sortedEvents.foreach(row => { val time = row.getAs[Long]("time") val delta = row.getAs[Long]("delta") currentOnline += delta if (currentOnline > maxOnlineAcc.value) { maxOnlineAcc.add(currentOnline - maxOnlineAcc.value) } }) // 输出结果 println(s"一天内最大在线人数:${maxOnlineAcc.value}") spark.stop() }}最大在线人数可能在一天内出现多次(如早高峰 9 点和晚高峰 20 点都达到峰值),需找到 “每次峰值持续的时间段”,计算每个时间段的时长,最终取最大值。
def calculate_max_online_duration(logs): """ 计算一天内最大在线人数的最长持续时间 :param logs: 日志列表,每条日志格式为 (userid, login_time, logout_time) :return: (max_online, max_duration) (最大在线人数,最长持续时间,单位:秒) """ # 步骤1:生成排序后的事件(同问题1) events = [] for userid, login, logout in logs: if login < 0 or logout < login or logout > 86399: continue events.append((login, 1)) events.append((logout, -1)) # 排序:time升序,相同time时登出事件在前 events.sort(key=lambda x: (x[0], x[1])) # 步骤2:先计算max_online(同问题1) current_online = 0 max_online = 0 for time, delta in events: current_online += delta if current_online > max_online: max_online = current_online # 步骤3:计算最长峰值持续时间 current_online = 0 max_duration = 0 peak_start = None # 峰值开始时间 for i in range(len(events)): time, delta = events[i] prev_online = current_online # 事件发生前的在线人数 current_online += delta # 情况1:事件后进入峰值状态(prev_online < max_online,current_online == max_online) if prev_online < max_online and current_online == max_online: peak_start = time # 情况2:事件后退出峰值状态(prev_online == max_online,current_online < max_online) elif prev_online == max_online and current_online < max_online: if peak_start is not None: # 峰值结束时间为当前事件时间 duration = time - peak_start if duration > max_duration: max_duration = duration peak_start = None # 重置峰值开始时间 # 情况3:遍历到最后一个事件,且仍处于峰值状态 if i == len(events) - 1 and current_online == max_online and peak_start is not None: duration = 86399 - peak_start # 结束时间为一天的最后一秒 if duration > max_duration: max_duration = duration return max_online, max_duration# 测试示例if __name__ == "__main__": test_logs = [ ("user1", 100, 200), ("user2", 150, 250), ("user3", 180, 300), ("user4", 200, 280), ("user5", 220, 280), # 220~280秒时,user2、user3、user4、user5同时在线(峰值4) ] max_online, max_duration = calculate_max_online_duration(test_logs) print(f"最大在线人数:{max_online},最长持续时间:{max_duration}秒") # 输出:最大4,最长60秒(220~280秒)在 Spark 中计算最长持续时间,需避免 “全局遍历”(会导致数据集中到一个节点),可通过 “窗口函数” 标记峰值时间段,再计算每个时间段的时长:
处理海量日志的 “最大在线人数” 和 “最长持续时间”,核心是 “将连续状态转换为离散事件”:
这套方案不仅适用于用户在线统计,还可扩展到 “服务器连接数峰值”“接口调用峰值” 等类似场景 —— 只要是 “连续状态随离散事件变化” 的问题,都能复用 “事件排序 + 遍历计算” 的思路。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。