前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink维表关联系列之自定义异步查询

flink维表关联系列之自定义异步查询

作者头像
Flink实战剖析
发布2022-04-18 11:57:56
4940
发布2022-04-18 11:57:56
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

维表关联系列目录: 一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询

在异步IO查询外部存储时,对于提供异步查询的客户端来说可以直接使用,但是对于没有提供异步查询的客户端应该怎么做呢?我们可以将查询请求丢到一个线程池中,将这个线程池看做是一个异步的客户端来帮助我们完成查询请求。

通过线程池方式来帮助我们完成异步请求关键在于线程池的core大小如何设置,如果设置过大,会到导致创建很多个线程,势必会造成CPU的压力比较大,由于大多数情况下集群是没有做CPU隔离策略的,就会影响到其他任务;如果设置过小,在处理的速度上根不上就会导致任务阻塞。可以做一个粗略的估算:假如任务中单个Task需要做维表关联查询的数据每秒会产生1000条,也就是1000的TPS,我们希望能够在1s以内处理完这1000条数据,如果外部单次查询耗时是10ms, 那我们就需要10个并发同时执行,也就是我们需要的coreSize 是10。 以查询mysql为例:

代码语言:javascript
复制
class ExecSideFunction extends RichAsyncFunction[String, String] {



  var executors: Executor = _

  var sqlTemplate: String = _



  override def open(parameters: Configuration): Unit = {

    executors = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue[Runnable](1000))

    sqlTemplate = "select value from tbl1 where id=?"

  }



  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {



    executors.execute(new Runnable {

      override def run(): Unit = {

        val con = ConnectionFactory.getConnection("sourceId").asInstanceOf[Connection]

        val sql = sqlTemplate.replace("?", parseKey(input))

        MysqlUtil.executeSelect(con, sql, rs => {

          val res = new util.ArrayList[String]()

          while (rs.next()) {

            val v = rs.getString("value")

            res.add(fillData(input, v))

          }

          resultFuture.complete(res)

        })

        con.close()

      }

    })

  }



  private def parseKey(input: String): String = {

    ""

    }



  private def fillData(input: String, v: String): String = {

    ""

  }

}

end

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档