Spark 如何写入HBase/Redis/MySQL/Kafka

这篇文章是给Spark初学者写的,老手就不要看了。文章谈及如何和HBase/Redis/MySQL/Kafka等进行交互的方法,主要是为了让大家明白其内部机制

一些概念

一个partition 对应一个task,一个task 必定存在于一个Executor,一个Executor 对应一个JVM.

  • Partition 是一个可迭代数据集合
  • Task 本质是作用于Partition的线程

问题

Task 里如何使用Kafka Producer 将数据发送到Kafaka呢。 其他譬如HBase/Redis/MySQL 也是如此。

解决方案

直观的解决方案自然是能够在Executor(JVM)里有个Prodcuer Pool(或者共享单个Producer实例),但是我们的代码都是 现在Driver端执行,然后将一些函数序列化到Executor端执行,这里就有序列化问题,正常如Pool,Connection都是无法序列化的。

一个简单的解决办法是定义个Object 类,

譬如

object SimpleHBaseClient {
  private val DEFAULT_ZOOKEEPER_QUORUM = "127.0.0.1:2181"

  private lazy val (table, conn) = createConnection

  def bulk(items:Iterator) = {
      items.foreach(conn.put(_))
      conn.flush....
  } 
 ......
}

然后保证这个类在map,foreachRDD等函数下使用,譬如:

dstream.foreachRDD{ rdd =>
    rdd.foreachPartition{iter=>
        SimpleHBaseClient.bulk(iter)  
    }
}

为什么要保证放到foreachRDD /map 等这些函数里呢? Spark的机制是先将用户的程序作为一个单机运行(运行者是Driver),Driver通过序列化机制,将对应算子规定的函数发送到Executor进行执行。这里,foreachRDD/map 等函数都是会发送到Executor执行的,Driver端并不会执行。里面引用的object 类 会作为一个stub 被序列化过去,object内部属性的的初始化其实是在Executor端完成的,所以可以避过序列化的问题。

Pool也是类似的做法。然而我们并不建议使用pool,因为Spark 本身已经是分布式的,举个例子可能有100个executor,如果每个executor再搞10个connection 的pool,则会有100*10 个链接,Kafka也受不了。一个Executor 维持一个connection就好。

关于Executor挂掉丢数据的问题,其实就看你什么时候flush,这是一个性能的权衡。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

hadoop系列之基础系列

一、Hadoop基础 1、分布式概念 通过爬虫-->爬到网页存储-->查找关键字 一台机器存储是有限的 Google采用多台机器,...

3817
来自专栏Hadoop实操

Sqoop抽数到Hive表异常分析(之二)

使用Sqoop抽取MySQL数据到Hive表时,抽取语句正常执行在数据Load到Hive表时报“Operation category READ is not s...

1473
来自专栏美团技术团队

【技术博客】Spark性能优化指南——基础篇

前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、...

5306
来自专栏cloudskyme

hadoop使用(五)

第1章 引言 1.1 编写目的 对关于hadoop的文档及资料进行进一步的整理。 1.2 相关网站    毋庸置疑 http://hadoop.apache.o...

3495
来自专栏Hadoop实操

如何使用Oozie API接口向非Kerberos环境的CDH集群提交Shell工作流

前面Fayson介绍了《如何使用Oozie API接口向非Kerberos环境的CDH集群提交Spark作业》和《如何使用Oozie API接口向非Kerber...

3367
来自专栏Spark学习技巧

Spark调优系列之序列化方式调优

由于大多数的spark计算是基于内存的的天性,spark应用的瓶颈一般受制于集群的CPU,网络带宽,内存。大部分情况下,如果内存适合当前数据量的计算,那么瓶颈往...

3179
来自专栏美图数据技术团队

快速、安全、可靠!Yarn!| MTdata小讲堂

Yarn 的全称是 Yet Anther Resource Negotiator(另一种资源协商者)。它作为 Hadoop 的一个组件,官方对它的定义是一个工作...

1282
来自专栏LuckQI

Spark计算RDD介绍

1202
来自专栏赵俊的Java专栏

Python 版 WordCount

2153
来自专栏大数据-Hadoop、Spark

2018-08-08

1、spark程序停-启,实时数据量一下子太多,如何处理 2、spark程序数据丢失,如何处理?duration是多少?

852

扫码关注云+社区

领取腾讯云代金券