
CMCC Real-Time Recharge Monitoring Platform Project

Author: 曼路 · Published 2019-05-26 10:01:51 (originally posted 2018-10-23)

1. Project Introduction

1.1 Project Background

CMCC (China Mobile) has a large number of subsidiary organizations, roughly one per province, and each province in turn has a large number of recharge agencies.

Today, getting a picture of recharges across the whole platform means first producing per-agency statistics within each province and then rolling the numbers up level by level. The process is cumbersome and the reporting cycle is long. Recharging also involves interface calls between the various subsystems of China Mobile's internal information systems, so monitoring those interfaces for failures is another key requirement. Building a platform that monitors recharges nationwide in real time, giving a live view of recharge volumes and interface-call health across the whole network, is therefore highly valuable.

1.2 Technology Selection

Key challenges

China Mobile has a very large number of recharge agencies, so the volume of recharge data is large.

The data has strict real-time requirements.

Candidate technologies

Real-time stream processing with Storm

Real-time stream processing with Spark Streaming

Real-time stream processing with Flink

Comparative analysis

Project data volume

  1. Roughly 20 to 30 million orders per day, each record about 0.5 KB, i.e. around 15 GB of order data per day (a rough sanity check of these figures follows below).
  2. Of these, roughly 5 to 10 million recharges succeed; on a normal day about 5 to 6 million, with noticeably higher volumes at the start and end of each month.
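
A quick sanity check of those numbers, nothing more than back-of-envelope arithmetic:

// ~30 million orders per day at ~0.5 KB per record
val recordsPerDay  = 30000000L
val bytesPerRecord = 512L
val gbPerDay = recordsPerDay * bytesPerRecord / 1024.0 / 1024 / 1024   // ≈ 14.3 GB, matching the ~15 GB above
val avgRecordsPerSecond = recordsPerDay / 86400                        // ≈ 347 records/s on average; peaks are much higher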

1.3 Project Requirements

1. Daily business overview

Network-wide recharge order count, recharge amount, success rate and average recharge duration.

2. Hourly business trend

Mainly the network-wide order count and success rate per hour.

3. Business quality

Number of failed recharge transactions per province.

4. Real-time recharge statistics

Recharge amount and recharge count per minute.

1.4 Log Data

{"bussinessRst":"0000","channelCode":"6900","chargefee":"1000","clientIp":"117.136.79.101","gateway_id":"WXPAY","interFacRst":"0000","logOutTime":"20170412030030067","orderId":"384663607178845909","payPhoneNo":"15015541313","phoneno":"15015541313","provinceCode":"200","rateoperateid":"1513","receiveNotifyTime":"20170412030030017","requestId":"20170412030007090581518228485394","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"1000","srcChannel":"11","sysId":"01"}
{"bussinessRst":"0000","channelCode":"6900","chargefee":"1000","clientIp":"117.136.79.101","endReqTime":"20170412030030230","idType":"01","interFacRst":"0000","logOutTime":"20170412030030230","orderId":"384663607178845909","prodCnt":"1","provinceCode":"200","requestId":"20170412030007090581518228485394","retMsg":"成功","serverIp":"172.16.59.241","serverPort":"8088","serviceName":"sendRechargeReq","shouldfee":"1000","startReqTime":"20170412030030080","sysId":"01"}
{"bussinessRst":"0000","channelCode":"0702","chargefee":"1000","clientIp":"101.204.129.105","gateway_id":"CMPAY","interFacRst":"0000","logOutTime":"20170412030031580","orderId":"384663613178752811","payPhoneNo":"","phoneno":"18200222444","provinceCode":"280","rateoperateid":"1514","receiveNotifyTime":"20170412030031554","requestId":"20170412030013393282687799171031","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"995","srcChannel":"00","sysId":"01"}
{"bussinessRst":"0000","channelCode":"0702","chargefee":"1000","clientIp":"101.204.129.105","endReqTime":"20170412030031698","idType":"01","interFacRst":"0000","logOutTime":"20170412030031698","orderId":"384663613178752811","prodCnt":"1","provinceCode":"280","requestId":"20170412030013393282687799171031","retMsg":"成功","serverIp":"172.16.59.241","serverPort":"8088","serviceName":"sendRechargeReq","shouldfee":"995","startReqTime":"20170412030031592","sysId":"01"}
{"bussinessRst":"0000","channelCode":"6900","chargefee":"20000","clientIp":"112.17.244.230","gateway_id":"WXPAY","interFacRst":"0000","logOutTime":"20170412030032234","orderId":"384663617163048689","payPhoneNo":"15857207376","phoneno":"15857207376","provinceCode":"571","rateoperateid":"","receiveNotifyTime":"20170412030032194","requestId":"20170412030017876364973282669502","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"20000","srcChannel":"11","sysId":"01"}

The sample log files have been uploaded here: https://download.csdn.net/download/qq_32539825/10739951
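
For reference, a single record can be parsed with fastjson, the same library the Spark job uses in section 3.1. This is only a sketch; the field names follow the samples above, and the shortened line below is purely illustrative:

import com.alibaba.fastjson.JSON

// one (shortened) log line, fields as in the samples above
val line = """{"bussinessRst":"0000","chargefee":"1000","provinceCode":"200","requestId":"20170412030007090581518228485394","receiveNotifyTime":"20170412030030017","serviceName":"payNotifyReq"}"""
val obj = JSON.parseObject(line)

val success  = obj.getString("bussinessRst") == "0000"      // "0000" marks success
val fee      = obj.getString("chargefee").toDouble          // recharge amount
val province = obj.getString("provinceCode")                // province code, mapped to a name later
val day      = obj.getString("requestId").substring(0, 8)   // requestId starts with yyyyMMddHHmmssSSS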

2. Project Workflow

2.1 Flume watches a directory and forwards the data to Kafka

Flume monitors the directory in real time; whenever a new file appears there, its contents are sent to Kafka. The Kafka topic used here is created automatically.

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe and configure the source component: r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data/cmccdata

# Describe and configure the sink component: k1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = myTopic
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy


# Describe and configure the channel component; a memory channel is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
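
Assuming the configuration above is saved as flume-kafka.conf (the file name is arbitrary), the agent is started with the standard flume-ng command; any file dropped into /root/data/cmccdata is then shipped to the myTopic topic:

flume-ng agent --conf $FLUME_HOME/conf --conf-file flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console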

2.2 Kafka as the message queue

The messages produced by Flume are stored in Kafka; within a partition, every message is assigned a sequential offset.

* How do we keep Kafka from losing data?

Kafka's ack mechanism: every time a message is sent, the producer receives an acknowledgement confirming that the message was received. The acks setting controls how strong that guarantee is:

0: do not wait for any acknowledgement from the broker. 1: wait until the partition leader has persisted the message. -1 (all): wait until every replica of the partition has persisted the message.

* How does Spark Streaming control how much Kafka data it consumes per batch?

When setting up the streaming job, configure spark.streaming.kafka.maxRatePerPartition. Records pulled per batch = maxRatePerPartition × number of partitions × batch interval (in seconds).
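
A worked example using the values that appear later in the code; the partition count is an assumption, since it depends on how myTopic was created:

val maxRatePerPartition = 10000  // spark.streaming.kafka.maxRatePerPartition, as set in cmcc_MonitorV2
val partitions          = 3      // assumption: e.g. one partition per broker
val batchIntervalSec    = 2      // Seconds(2) in the StreamingContext
val maxPerBatch = maxRatePerPartition * partitions * batchIntervalSec   // at most 60,000 records per batch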

2.3 Data processing with Spark Streaming

The real-time stream is processed according to the requirements above; the code is shown in section 3.1.

2.4 Storing offsets in MySQL

Offsets are persisted so that messages are not consumed twice after a restart.

2.5 Storing business data in Redis

The aggregated metrics of the real-time stream are kept in Redis: a distributed, in-memory store, and therefore fast.

Question: how do we keep the offsets and the business data in sync?

Ideally the offsets and the business data would be stored in the same database and committed in a single transaction.

The business aggregation runs on the executors, while the offsets are saved on the driver. So after a batch is processed, the business results are first staged in a temporary table; when the driver saves the offsets, it reads the staged results and writes them together with the offsets in one transaction, so both land in the database together. A sketch of this idea follows.
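
A minimal sketch of that idea with scalikejdbc, the library the project already uses for offsets. This is not what the code in section 3 actually does (there, business data goes to Redis and only the offsets go to MySQL); the daily_summary table below is hypothetical:

import org.apache.spark.streaming.kafka010.OffsetRange
import scalikejdbc._

// assumes DBs.setup() has been called, as in OffsetManager
def saveResultsAndOffsets(day: String, totals: Map[String, Long],
                          offsetRanges: Array[OffsetRange], groupId: String): Unit = {
  DB.localTx { implicit session =>
    // 1. write the staged business aggregates (hypothetical table daily_summary)
    totals.foreach { case (metric, value) =>
      SQL("replace into daily_summary(day, metric, value) values(?,?,?)")
        .bind(day, metric, value).update().apply()
    }
    // 2. write the offsets in the same transaction, mirroring OffsetManager.saveCurrentOffset
    offsetRanges.foreach { or =>
      SQL("replace into streaming_offset values(?,?,?,?)")
        .bind(or.topic, or.partition, or.untilOffset, groupId).update().apply()
    }
    // both writes commit together, or neither does
  }
}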

3. Implementation

3.1 Collecting, processing and storing the data

Flume + Kafka + Spark Streaming + MySQL + Redis

Code structure:

cmcc_MonitorV2.scala (main class)
package cn.pig.app

import cn.pig.utils._
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object cmcc_MonitorV2 {

  def main(args: Array[String]): Unit = {
    // suppress Spark's own log output
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    // SparkConf settings
    val conf = new SparkConf().setAppName("中国移动实时监控平台_V2").setMaster("local[*]")
    // serialize RDDs with Kryo to save memory
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "10000") // cap on records pulled per partition per second
    conf.set("spark.streaming.kafka.stopGracefullyOnShutdown", "true") // shut down gracefully

    // create the StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(conf, Seconds(2))

    // read the offsets previously stored in MySQL
    val currentOffset: Map[TopicPartition, Long] = OffsetManager.getMyCurrentOffset

    // broadcast the province-code -> province-name mapping
    val provinceName: Broadcast[Map[String, AnyRef]] = ssc.sparkContext.broadcast(AppParameters.provinces)

    // create a direct stream that reads from Kafka, starting at the stored offsets
    val stream = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](AppParameters.topic, AppParameters.kafkaParams, currentOffset))

    // run the computations for every micro-batch
    stream.foreachRDD(baseRdd => {

      // capture this batch's offset ranges before any transformation
      val offsetRanges: Array[OffsetRange] = baseRdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // parse the raw JSON records into the base tuples shared by all metrics
      val baseData = ApiUtils.Api_BaseDate(baseRdd)

      // daily business overview
      ApiUtils.Api_general_total(baseData)

      // hourly recharge trend
      ApiUtils.api_general_hour(baseData)

      // per-province recharge distribution
      ApiUtils.api_general_province(baseData, provinceName)

      // recharge amount and order count per minute
      ApiUtils.api_realtime_minute(baseData)

      // persist the offsets once the batch has been processed
      OffsetManager.saveCurrentOffset(offsetRanges)

    })

    ssc.start()

    ssc.awaitTermination()

  }



}
ApiUtils.scala (the four business-metric methods live here and are called from the main class)
package cn.pig.utils

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object ApiUtils {


  /**
    * Parse the raw Kafka records, build the base tuples used by every metric,
    * and cache them so the four downstream computations reuse the same data.
    * @param baseRdd raw records read from Kafka
    * @return RDD of (date, hour, List(order, success, fee, duration), provinceCode, minute)
    */
  def Api_BaseDate(baseRdd: RDD[ConsumerRecord[String, String]]): RDD[(String, String, List[Double], String, String)] = {
    val baseData: RDD[(String, String, List[Double], String, String)] =
      baseRdd.map(rdd => JSON.parseObject(rdd.value()))
        .filter(x => x.getString("serviceName").equals("reChargeNotifyReq"))
        .map(rdd => {
          // business result code ("0000" means success)
          val result = rdd.getString("bussinessRst")
          // recharge amount
          val fee = rdd.getString("chargefee").toDouble
          // province code
          val provinceCode = rdd.getString("provinceCode")
          // the requestId starts with the request timestamp (yyyyMMddHHmmssSSS...)
          val requestId = rdd.getString("requestId")
          // date (yyyyMMdd)
          val data = requestId.substring(0, 8)
          // hour
          val hour = requestId.substring(8, 10)
          // minute
          val minute = requestId.substring(10, 12)
          // time the recharge notification was received
          val receiveTime = rdd.getString("receiveNotifyTime")

          // recharge duration in milliseconds
          val time = CalculateTools.getDate(requestId, receiveTime)
          // (success flag, amount, duration); zeros when the recharge failed
          val SuccedResult: (Int, Double, Long) = if (result.equals("0000")) (1, fee, time) else (0, 0, 0)

          (data, hour, List[Double](1, SuccedResult._1, SuccedResult._2, SuccedResult._3), provinceCode, minute)

        }).cache()
    baseData
  }


  /**
    * Daily business overview:
    * order count, successful-order count, recharge amount and total recharge duration,
    * written to the Redis hash "A-<date>"
    * @param baseData
    */
  def Api_general_total(baseData: RDD[(String, String, List[Double], String,String)]) = {

    baseData.map(tp => (tp._1, tp._3)).reduceByKey((list1, list2) => {
      list1.zip(list2).map(tp => tp._1 + tp._2)
    }).foreachPartition(partition => {

      // accumulate the day's totals into the Redis hash "A-<date>"
      val redis = Jpools.getJedis
      partition.foreach(tp => {
        redis.hincrBy("A-" + tp._1, "total", tp._2(0).toLong)
        redis.hincrBy("A-" + tp._1, "success", tp._2(1).toLong)
        redis.hincrBy("A-" + tp._1, "money", tp._2(2).toLong)
        redis.hincrBy("A-" + tp._1, "time", tp._2(3).toLong)
        //redis.expire("A-" + tp._1, 60 * 60 * 48)
      })
      redis.close()
    })
  }

  /**
    * Hourly recharge trend:
    * order count and successful-order count per hour, written to the Redis hash "B-<date>"
    * @param baseData
    */
   def api_general_hour(baseData: RDD[(String, String, List[Double], String,String)]) = {
    baseData.map(tp => ((tp._1, tp._2), (tp._3))).reduceByKey((list1, list2) => {
      list1.zip(list2).map(tp => tp._1 + tp._2)
    }).foreachPartition(tp => {
      val redis = Jpools.getJedis
      tp.foreach(data => {
        redis.hincrBy("B-" + data._1._1, "total-" + data._1._2, data._2(0).toLong)
        redis.hincrBy("B-" + data._1._1, "success-" + data._1._2, data._2(1).toLong)
        //redis.expire("B-" + data._1._1, 60 * 60 * 48)
      })
      redis.close()
    })
  }

  /**
    * Per-province recharge distribution, written to the Redis hash "C-<date>".
    * Province names are resolved through the broadcast variable.
    * Note: as written, the value stored per province is the successful-recharge count
    * (index 1 of the metrics list), which is what the map page later displays.
    */
  def api_general_province(baseData: RDD[(String, String, List[Double], String,String)],provinceName:Broadcast[Map[String, AnyRef]]) = {
    baseData.map(tp => ((tp._1,tp._4),tp._3)).reduceByKey((list1,list2)=>{
      list1.zip(list2).map(tp =>tp._1+tp._2)
    })
      .foreachPartition(tp => {
        val redis = Jpools.getJedis
        tp.foreach(data =>{
          //redis.hincrBy("C-"+data._1._1,"total-"+provinceName.value.getOrElse(data._1._2,data._1._2),data._2(0).toLong)
          redis.hincrBy("C-"+data._1._1,provinceName.value.getOrElse(data._1._2,data._1._2)+"",data._2(1).toLong)
         // redis.expire("C-"+data._1._1,60*60*48)
        })
        redis.close()
      })
  }

  /**
    * Per-minute recharge count and amount, written to the Redis hash "D-<date>"
    * with fields "Num-<HHmm>" and "Money-<HHmm>"
    */
  def api_realtime_minute(baseData: RDD[(String, String, List[Double], String,String)]) = {
    baseData.map(tp => ((tp._1,tp._2,tp._5),List(tp._3(1),tp._3(2)))).reduceByKey((list1,list2)=>{
      list1.zip(list2).map(tp =>tp._1+tp._2)
    })
      .foreachPartition(tp => {
        val redis = Jpools.getJedis
        tp.foreach(data =>{
          redis.hincrBy("D-"+data._1._1,"Num-"+data._1._2+data._1._3,data._2(0).toLong)
          redis.hincrBy("D-"+data._1._1,"Money-"+data._1._2+data._1._3,data._2(1).toLong)
          //redis.expire("D-"+data._1._1,60*60*48)

        })
        redis.close()
      })

  }





}

application.conf (configuration file, placed under the resources directory)

//Kafka parameters
kafka.topic = "myTopic"
kafka.groupId = "lkp"
kafka.broker = "hadoop01:9092,hadoop02:9092,hadoop03:9092"

//Redis parameters
redis.host = "hadoop02"   //host name
redis.index = "2"         //which Redis database index to write to

//MySQL settings (read by scalikejdbc)
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop01:3306/test?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"


//province code -> province name mapping
province = {
  100 = "北京"
  200 = "广东"
  210 = "上海"
  220 = "天津"
  230 = "重庆"
  240 = "辽宁"
  250 = "江苏"
  270 = "湖北"
  280 = "四川"
  290 = "陕西"
  311 = "河北"
  351 = "山西"
  371 = "河南"
  431 = "吉林"
  451 = "黑龙江"
  471 = "内蒙古"
  531 = "山东"
  551 = "安徽"
  571 = "浙江"
  591 = "福建"
  731 = "湖南"
  771 = "广西"
  791 = "江西"
  851 = "贵州"
  871 = "云南"
  891 = "西藏"
  898 = "海南"
  931 = "甘肃"
  951 = "宁夏"
  971 = "青海"
  991 = "新疆"
}

AppParameters.scala (loads the configuration file for the other classes)

package cn.pig.utils

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer

object AppParameters {

  /**
    * Kafka settings
    */
  // configuration is loaded in this order: application.conf --> application.json --> application.properties

  val config = ConfigFactory.load()

  val topic: Array[String] = config.getString("kafka.topic").split(",")
  val groupId = config.getString("kafka.groupId")
  val brokers = config.getString("kafka.broker")

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  /**
    * Redis settings:
    * host and database index
    */
  val redis_host = config.getString("redis.host")

  val redis_index = config.getString("redis.index").toInt

  /**
    * Province mapping (code -> name), read from the province block of application.conf
    */
  import scala.collection.JavaConversions._
  val provinces = config.getObject("province").unwrapped().toMap
}

CalculateTools.scala (computes the recharge duration)

package cn.pig.utils

import java.text.SimpleDateFormat

import org.apache.commons.lang.time.FastDateFormat

object CalculateTools {

  // recharge duration in ms: receive-notify time minus the request start time
  // encoded in the first 17 characters of the requestId
  def getDate(requestId: String, endTime: String): Long = {
    val startTime = requestId.substring(0, 17)
    // FastDateFormat (commented out below) would be the thread-safe alternative
    //val format = FastDateFormat.getInstance("yyyyMMddHHmmssSSS")

    val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
    format.parse(endTime).getTime - format.parse(startTime).getTime
  }

}

Jpools.scala (Redis connection pool)

package cn.pig.utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool


/**
  * Redis connection pool
  */
object Jpools {
  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5)      // maximum idle connections (the pool default is 8)
  poolConfig.setMaxTotal(2000)  // maximum total connections (the pool default is 8)

  private lazy val jedisPool = new JedisPool(poolConfig,AppParameters.redis_host,6379)

  def getJedis = {
    val jedis = jedisPool.getResource()
    jedis.select(AppParameters.redis_index)
    jedis
  }

}

OffsetManager.scala (stores offsets in MySQL via scalikejdbc)

package cn.pig.utils

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scalikejdbc._
import scalikejdbc.config._

object OffsetManager {
  // load the db.default.* settings from application.conf
  DBs.setup()

  /**
    * Read the offsets previously stored for this consumer group
    * @return
    */
  def getMyCurrentOffset :Map[TopicPartition,Long] = {
    DB.readOnly(implicit session =>
      SQL("select * from streaming_offset where groupId  = ?").bind(AppParameters.groupId)
        .map(rs =>
          (
            new TopicPartition(rs.string("topicName"),rs.int("partitionId")),
            rs.long("offset")
          )
        ).list().apply().toMap
    )


  }

  /**
    * Persist the offsets of the current batch
    */
  def saveCurrentOffset(offsetRanges: Array[OffsetRange]): Unit ={

    DB.localTx(implicit session =>{
      offsetRanges.foreach(or =>{
        SQL("replace into streaming_offset values(?,?,?,?)")
          .bind(or.topic,or.partition,or.untilOffset,AppParameters.groupId)
          .update()
          .apply()
      })
    })


  }


}
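
A streaming_offset table along the following lines matches the reads and writes above; the column names come from getMyCurrentOffset, the column order from the REPLACE INTO statement, and the primary key is what makes REPLACE INTO behave as an upsert per topic, partition and group (the exact types are an assumption):

CREATE TABLE streaming_offset (
  topicName   VARCHAR(128) NOT NULL,
  partitionId INT          NOT NULL,
  `offset`    BIGINT       NOT NULL,
  groupId     VARCHAR(128) NOT NULL,
  PRIMARY KEY (topicName, partitionId, groupId)
);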

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.com.pig</groupId>
    <artifactId>cmcc_moniter</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.1</spark.version>
        <mysql.version>5.1.38</mysql.version>
        <redis.version>2.9.0</redis.version>
        <config.version>1.3.3</config.version>
        <fastjson.version>1.2.47</fastjson.version>
        <scalikejdbc.version>2.5.0</scalikejdbc.version>
    </properties>

    <dependencies>

        <!-- spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
        </dependency>
        <!-- streaming-kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- pin the kafka-clients API version -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>${config.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc_2.11</artifactId>
            <version>2.5.0</version>
        </dependency>


        <!-- scalikejdbc-config -->
        <dependency>
            <groupId>org.scalikejdbc</groupId>
            <artifactId>scalikejdbc-config_2.11</artifactId>
            <version>${scalikejdbc.version}</version>
        </dependency>





    </dependencies>
</project>

3.2 Dynamic web pages

IDEA + Servlet + ECharts

MapVo.java (bean for the map data)

package cn.pig.cmcc.beans;

/**
 * Bean holding one province's recharge count for the map
 */
public class MapVo {
    /**
     * province name and value
     */
    private String name;
    private int value;

    public MapVo(String name, int value) {
        this.name = name;
        this.value = value;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }

    public MapVo() {
    }
}

MinutesKpiVo.class

package cn.pig.cmcc.beans;

public class MinutesKpiVo {
    private String money;
    private String counts;

    public MinutesKpiVo(String money, String counts) {
        this.money = money;
        this.counts = counts;
    }

    public MinutesKpiVo() {
    }

    public String getMoney() {
        return money;
    }

    public void setMoney(String money) {
        this.money = money;
    }

    public String getCounts() {
        return counts;
    }

    public void setCounts(String counts) {
        this.counts = counts;
    }
}

Servlet.java (Servlet that serves the map request)

package cn.pig.cmcc.controller;

import cn.pig.cmcc.beans.MapVo;
import cn.pig.cmcc.services.IMapIndexService;
import cn.pig.cmcc.services.MapIndexService;
import com.alibaba.fastjson.JSONObject;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;

/**
 * Servlet endpoint for the per-province recharge success distribution
 */
@WebServlet(name = "Servlet",urlPatterns = "/mapIndex.cmcc")
public class Servlet extends HttpServlet {
    // service instance
    IMapIndexService service = new MapIndexService();
    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        resp.setCharacterEncoding("utf-8");
        resp.setContentType("application/json");

        // parameter passed from the front end
        String day = req.getParameter("day");
        // query the service
        List<MapVo> voList = service.findAllBy(day);

        // serialize the result to JSON
        String jsonStr = JSONObject.toJSONString(voList);
        // write the JSON string back to the client
        resp.getWriter().write(jsonStr);
    }
}

Servlet_Minute.java (Servlet that returns the current minute's recharge amount and count)

package cn.pig.cmcc.controller;

import cn.pig.cmcc.beans.MapVo;
import cn.pig.cmcc.beans.MinutesKpiVo;
import cn.pig.cmcc.services.IMapIndexService;
import cn.pig.cmcc.services.IMinuteService;
import cn.pig.cmcc.services.MapIndexService;
import cn.pig.cmcc.services.MinuteKpiService;
import com.alibaba.fastjson.JSONObject;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * Servlet endpoint for the per-minute recharge KPIs
 */
@WebServlet(name = "Servlet_Minute",urlPatterns = "/minutesKpi.cmcc")
public class Servlet_Minute extends HttpServlet {
    // service instance
    IMinuteService service = new MinuteKpiService();
    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

        resp.setContentType("application/json;charset=utf-8");
        // parameter passed from the front end
        String day = req.getParameter("day");
        // current time, formatted as HHmm to match the Redis field suffix
        Date date = new Date();
        SimpleDateFormat format = new SimpleDateFormat("HHmm");
        String time =  format.format(date);

        MinutesKpiVo vo = service.findBy(day,time);
        // serialize the result to JSON and write it out
        resp.getWriter().write(JSONObject.toJSONString(vo));

    }
}
IMapIndexService.java (interface of the map service)
package cn.pig.cmcc.services;

import cn.pig.cmcc.beans.MapVo;

import java.util.List;

public interface IMapIndexService {
    /**
     * Look up the per-province counts for the given date
     * @param day
     * @return
     */
    List<MapVo> findAllBy(String day);
}
IMinuteService.java (interface of the per-minute service)
package cn.pig.cmcc.services;

import cn.pig.cmcc.beans.MinutesKpiVo;

public interface IMinuteService {
    /**
     * Fetch the KPIs for the given date and HHmm time
     * @param date
     * @param hourMinutes
     * @return
     */
    MinutesKpiVo findBy(String date,String hourMinutes);
}
MapIndexService.java (map service implementation)
package cn.pig.cmcc.services;

import cn.pig.cmcc.beans.MapVo;
import cn.pig.cmcc.utils.Constants;
import cn.pig.cmcc.utils.Jpools;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapIndexService implements  IMapIndexService{
    @Override
    public List<MapVo> findAllBy(String day) {
        List<MapVo> list = new ArrayList<>();

        // read all province fields of the hash "C-<day>" from Redis
        Jedis jedis = Jpools.getJedis();
        Map<String,String> all = jedis.hgetAll(Constants.MAP_PREFIX + day);
        for (Map.Entry<String,String> entry : all.entrySet()) {
            MapVo map = new MapVo();
            map.setName(entry.getKey());
            map.setValue(Integer.parseInt(entry.getValue()));
            list.add(map);
        }
        // return the connection to the pool
        jedis.close();

        return list;
    }
}
MinuteKpiService.java (per-minute KPI service implementation)
package cn.pig.cmcc.services;

import cn.pig.cmcc.beans.MinutesKpiVo;
import cn.pig.cmcc.utils.Constants;
import cn.pig.cmcc.utils.Jpools;
import redis.clients.jedis.Jedis;

public class MinuteKpiService implements IMinuteService {

    /**
     * Fetch the per-minute KPIs for the given date and time
     * @param day
     * @param hourMinutes
     * @return
     */
    @Override
    public MinutesKpiVo findBy(String day, String hourMinutes) {
        MinutesKpiVo vo = new MinutesKpiVo();
        // read from Redis
        Jedis jedis = Jpools.getJedis();
        // recharge amount for the requested minute
        String money = jedis.hget(Constants.MUNITE_PREFIX + day, Constants.MINUTES_FIELD_M_PREFIX + hourMinutes);
        // recharge count for the requested minute
        String num = jedis.hget(Constants.MUNITE_PREFIX + day, Constants.MINUTES_FIELD_NUM_PREFIX + hourMinutes);

        jedis.close();
        vo.setCounts(num);
        vo.setMoney(money);
        return vo;
    }
}

Constants.java (Redis key and field prefixes)

package cn.pig.cmcc.utils;

public class Constants {
    /**
     * Prefix of the per-province (map) hash key
     */
    public static final String MAP_PREFIX="C-";

    /**
     * Prefix of the per-minute hash key
     */
    public static final String MUNITE_PREFIX="D-";

    /**
     * Field prefix for the per-minute recharge count
     */
    public static final String MINUTES_FIELD_NUM_PREFIX="Num-";
    /**
     * Field prefix for the per-minute recharge amount
     */
    public static final String MINUTES_FIELD_M_PREFIX="Money-";
}

Jpools.java (Redis connection pool for the web application)

package cn.pig.cmcc.utils;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Redis connection pool helper
 */
public class Jpools {
    private static Config load = ConfigFactory.load();
    private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    private static JedisPool jedisPool = null;

    static{
        poolConfig.setMaxIdle(load.getInt("redis.pool.maxIdle"));
        poolConfig.setMaxTotal(load.getInt("redis.pool.maxActive"));

        jedisPool = new JedisPool(poolConfig,load.getString("redis.ip"),load.getInt("redis.port"));
    }

    /**
     * Hand out a Jedis connection from the pool, selected to the configured database
     * @return
     */
    public static Jedis getJedis(){
        Jedis jedis = jedisPool.getResource();
        jedis.select(load.getInt("redis.db.index"));
        return jedis;
    }
}

application.conf

# Maximum number of connection instances the pool may create; the default is 8.
# A value of -1 means unlimited; once maxActive jedis instances have been handed out the pool is exhausted.
redis.pool.maxActive=1024
# Maximum number of idle jedis instances kept in the pool; the default is also 8.
redis.pool.maxIdle=200
# Validate an instance before it is borrowed; if true, every borrowed jedis instance is usable.
redis.pool.testOnBorrow=true
# Validate an instance when it is returned to the pool.
redis.pool.testOnReturn=true
# Redis server address
redis.ip="hadoop02"
# Redis port
redis.port=6379
# Redis database index
redis.db.index = 5

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.pig</groupId>
  <artifactId>cmcc_visual_2</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>cmcc_visual_2 Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>

    <dependency>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>jsp-api</artifactId>
      <version>2.1</version>
    </dependency>


    <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>


    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.4.2</version>
    </dependency>


    <dependency>
      <groupId>com.typesafe</groupId>
      <artifactId>config</artifactId>
      <version>1.3.1</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.36</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>cmcc_visual_2</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.7.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.20.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

index.jsp

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
    <meta charset="utf-8">
    <title>CMS Report</title>
</head>

<body>
<!-- a DOM container with an explicit size for ECharts to render into -->

<div id="main" style="height:500px;border:1px solid #ccc;padding:10px;"></div>



<script src="js/jquery-3.1.1.min.js"></script>
<script src="js/echarts2.x/echarts-all.js"></script>
<%--<script src="js/echarts/echarts.min.js"></script>--%>
<script src="js/echarts2.x/theme/dark.js"></script>
<script type="text/javascript">

    // initialize the ECharts chart on the prepared DOM node
    var myChart = echarts.init(document.getElementById('main'));
    myChart.setTheme(dark);

    var option = {
        title : {
            text: '全国充值业务成功量分布',
            x:'center'
        },
        tooltip : {
            trigger: 'item'
        },
        legend: {
            orient: 'vertical',
            x:'left',
            data:['成功量']
        },
        dataRange: {
//            min: 0,
//            max: 2500,
//            x: 'left',
//            y: 'bottom',
//            text:['高','低'],           // legend text, defaults to the numeric labels
//            calculable : true

            x: 'left',
            y: 'bottom',
            splitList: [
                {start: 10000, color: '#E1022A'}, // label: custom label text, color: custom color
                {start: 5000, end: 9999, color: '#E19106'},
                {start: 1000, end: 4999, color:'#6CAF00'},
                {start: 1, end: 999, color:'#A3E10A'},
                {start: 0, end: 0, color: '#C5CDDB'}
            ]// ,
            // color: ['#FF7E50', '#E09107', '#A3E00B']
        },
        toolbox: {
            show: true,
            orient : 'vertical',
            x: 'right',
            y: 'center',
            feature : {
                mark : {show: true},
                dataView : {show: true, readOnly: false},
                restore : {show: true},
                saveAsImage : {show: true}
            }
        },
        roamController: {
            show: true,
            x: 'right',
            mapTypeControl: {
                'china': true
            }
        },
        series : [
            {
                name: '成功量',
                type: 'map',
                mapType: 'china',
                roam: false,
                itemStyle:{
                    normal:{label:{show:true}},
                    emphasis:{label:{show:true}}
                },
                data: []
            }
        ]
    };


    // fetch the data via Ajax and fill the map series
    $.get("mapIndex.cmcc", { day: "20170412"},function(data){
        option.series[0].data = data;
        // load the data into the chart
        myChart.setOption(option);
    });


</script>
</body>
</html>

realtime.jsp

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
    <meta charset="utf-8">
    <title>demo</title>
    <!-- the ECharts scripts are included at the bottom of the page -->

</head>
<body>
<h1>每分钟实时充值统计</h1>

<!-- a DOM container with an explicit size for ECharts to render into -->
<div id="main" style="height:500px;border:1px solid #ccc;padding:10px;"></div>
<script src="js/jquery-3.1.1.min.js"></script>
<script src="js/echarts2.x/echarts-all.js"></script>
<script type="text/javascript">var timeTicket;var timeOutTicket</script>
<script type="text/javascript">

    var now = new Date();

    function pad2(n) { return n < 10 ? '0' + n : n }
    function generateTimeRequestNumber(date) {
        return date.getFullYear().toString() + pad2(date.getMonth() + 1) + pad2(date.getDate()) + pad2(date.getHours()) + pad2(date.getMinutes());//+ pad2(date.getSeconds());
    }
    // initialize the chart object
    var myChart = echarts.init(document.getElementById('main'));
    myChart.setTheme("macarons");

    var option = {
        title : {
            text: '实时充值'
        },
        tooltip : {
            trigger: 'axis'
        },
        legend: {
            data:['充值笔数', '充值金额']
        },
        toolbox: {
            show : true,
            feature : {
                mark : {show: true},
                dataView : {show: true, readOnly: false},
                magicType : {show: true, type: ['line', 'bar']},
                restore : {show: true},
                saveAsImage : {show: true}
            }
        },
        dataZoom : {
            show : false,
            start : 0,
            end : 100
        },
        xAxis : [
            {
                type : 'category',
                boundaryGap : true,
                data : (function (){
                    var res = [];
                    var len = 10;
                    while (len--) {
                        res.unshift(generateTimeRequestNumber(now));
                        now = new Date(now - 60000);
                    }
                    return res;
                })()
            },
            {
                type : 'category',
                boundaryGap : true,
                data : (function (){
                    var res = [];
                    var len = 10;
                    while (len--) {
                        res.push(len + 1);
                    }
                    return res;
                })()
            }
        ],
        yAxis : [
            {
                type : 'value',
                scale: true,
                name : '笔数',
                boundaryGap: [0, 0]
            },
            {
                type : 'value',
                scale: true,
                name : '金额',
                boundaryGap: [0, 0]
            }
        ],
        series : [
            {
                name:'充值金额',
                type:'bar',
                xAxisIndex: 1,
                yAxisIndex: 1,
                data:(function (){
                    var res = [];
                    var len = 10;
                    while (len--) {
                        res.push(0);
                    }
                    return res;
                })()
            },
            {
                name:'充值笔数',
                type:'line',
                data:(function (){
                    var res = [];
                    var len = 10;
                    while (len--) {
                        res.push(0);
                    }
                    return res;
                })()
            }
        ]
    };

    clearInterval(timeTicket);

    timeTicket = setInterval(function () {
        var lastData = 0;
        var d = 0;

        var axisData = generateTimeRequestNumber(new Date());

        // Ajax call to the backend, which reads the latest minute's data from Redis
        $.get("minutesKpi.cmcc", { day: "20170412"},function(data){
            d = parseInt(data.money); // recharge amount
            lastData = data.counts; // recharge count

            // addData: ECharts 2.x dynamic-data API
            myChart.addData([
                [
                    0,        // series index
                    d,        // new data point (bar: recharge amount)
                    false,    // insert at the head of the queue?
                    false     // grow the queue? false keeps the length fixed (the oldest point is dropped)
                ],
                [
                    1,        // series index
                    lastData, // new data point (line: recharge count)
                    false,    // insert at the head of the queue?
                    false,    // grow the queue? false keeps the length fixed (the oldest point is dropped)
                    axisData  // x-axis label (current time)
                ]
            ]);
        });
    }, 60000); // poll the backend every 60 seconds

    myChart.setOption(option);
</script>
</body>
</html>

4. Results

1. Offsets stored in MySQL

2. Business data stored in Redis (example inspection commands below)

3. The map page

4. Real-time recharge chart (per minute)
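
The stored hashes can be inspected with redis-cli; the key names follow the prefixes used in ApiUtils and Constants. Note that the Spark job writes to database index 2 (redis.index in its application.conf) while the web application reads index 5 (redis.db.index), so the two settings have to point at the same database for the pages to show data.

# daily totals: fields total / success / money / time
redis-cli -h hadoop02 -n 2 HGETALL A-20170412
# hourly trend: fields total-<HH> / success-<HH>
redis-cli -h hadoop02 -n 2 HGETALL B-20170412
# per-province counts: the field is the province name
redis-cli -h hadoop02 -n 2 HGETALL C-20170412
# one minute's recharge amount, e.g. 03:00
redis-cli -h hadoop02 -n 2 HGET D-20170412 Money-0300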
