前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据-Flink编程

大数据-Flink编程

作者头像
码客说
发布2022-10-04 21:35:27
1K0
发布2022-10-04 21:35:27
举报
文章被收录于专栏:码客码客

加载数据

代码中加载

代码语言:javascript
复制
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)

  def main(args: Array[String]): Unit = {
    //设置用户名,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    val env = ExecutionEnvironment.getExecutionEnvironment
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "Wilson", 178.8),
      (17, "Edith", 168.8),
      (18, "Joyce", 174.8),
      (18, "May", 195.8),
      (18, "Gloria", 182.7),
      (21, "Jessie", 184.8)
    )
    stu.print
  }
}

从文件中加载

代码语言:javascript
复制
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
object WordCount {
  case class Student(id: String, name: String, sex: String, age: Int, department: String)
  def main(args: Array[String]): Unit = {
    //设置用户名,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val stu_list: DataSet[Student] = environment.readCsvFile[Student](
      filePath = "file:///D:/bigdata_study/stu_list.txt",
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      quoteCharacter = null,
      ignoreFirstLine = false,
      ignoreComments = "#",
      lenient = false,
      includedFields = Array[Int](0, 1, 2, 3, 4),
      pojoFields = Array[String]("id", "name", "sex", "age", "department")
    )
    println("-------------原数据----------")
    stu_list.print
  }
}

Kafka中加载

代码语言:javascript
复制
import java.util
import java.util.Properties

import com.google.gson.Gson
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
import org.elasticsearch.client.{Requests, RestClientBuilder}

/**
* 红尘丶世界
*
*/
object FLink_Kafka_ES {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 非常关键,一定要设置启动检查点!!
    env.enableCheckpointing(1000)

    //设置kafka topic
    val topic: String = "test"
    //配置kafka参数
    val props: Properties = new Properties
    props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.setProperty("group.id", "test01")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    //导入隐式转换
    import org.apache.flink.streaming.connectors.kafka._
    import org.apache.flink.api.scala._
    import scala.collection.JavaConverters._

    val consumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), props)
    //设置最新的数据进行消费
    consumer.setStartFromLatest()
    //构建数据源
    val kafkaSource: DataStream[String] = env.addSource(consumer)
    //进行转换
    val mapDS: DataStream[Map[String, AnyRef]] = kafkaSource.map(x => {
      //创建Gson解析对象, 把json转化成map
      (new Gson).fromJson(x, classOf[util.Map[String, AnyRef]]).asScala.toMap
    })

    //配置节点信息
    val httpHosts: util.ArrayList[HttpHost] = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("192.168.100.111", 9200, "http"))
    //构建es sink
    val esSinkBuilder: ElasticsearchSink.Builder[Map[String, AnyRef]] = new ElasticsearchSink.Builder[Map[String, AnyRef]](
      httpHosts,
      new ElasticsearchSinkFunction[Map[String, AnyRef]] {
        override def process(t: Map[String, AnyRef], runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          val map: util.Map[String, AnyRef] = t.asJava
          val indexRequest: IndexRequest = Requests
          .indexRequest()
          .index("flink_kafka")
          //.`type`("kafka_data") //非必选项ES 7.x中不需要再设置文档
          //.create(false) //是否自动创建索引,不推荐使用,最好提前在es中进行Mapping映射,当然如果你的时间字段能够被ES自动识别可以让它自动创建
          //因为ES命名的问题,无法直接使用ES的命名
          //如需使用 x.x 命名格式, 可以考虑嵌套map或者json
          //如使用嵌套map需注意把所有的 map 都需要转化成 java.util.map 否则会爆类型异常
          .source(map)
          //发送请求,写入数据
          requestIndexer.add(indexRequest)
          //写入数据成功输出一下
          println("data saved successfully")
        }
      })
    //以下的一些配置可作为生产环境使用, es容错需配合flink 检查点使用
    //设置最大并行度
    //esSinkBuilder.setBulkFlushMaxActions(1)
    //设置es sink 的参数
    esSinkBuilder.setRestClientFactory(
      new RestClientFactory {
        override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
          restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback {
            override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
              val provider: BasicCredentialsProvider = new BasicCredentialsProvider()
              //设置用户名和密码
              val credentials: UsernamePasswordCredentials = new UsernamePasswordCredentials("elastic", "123456")
              provider.setCredentials(AuthScope.ANY, credentials)
              httpClientBuilder.setDefaultCredentialsProvider(provider)
            }
          })
        }
      })
    //进行重试的时间间隔。对于指数型则表示起始的基数
    esSink.setBulkFlushBackoffDelay(1)
    //失败重试的次数
    esSink.setBulkFlushBackoffRetries(3)
    //重试策略,又可以分为以下两种类型
    //a、指数型,表示多次重试之间的时间间隔按照指数方式进行增长。eg:2 -> 4 -> 8 ...
    //b、常数型,表示多次重试之间的时间间隔为固定常数。eg:2 -> 2 -> 2 ...
    esSink.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL)
    //设置批量提交时间间隔
    //esSink.setBulkFlushInterval(100)
    //该配置表示批量写入ES时的记录条数
    esSink.setBulkFlushMaxActions(1)
    //设置批量提交的最大字节 以MB为单位
    //esSink.setBulkFlushMaxSizeMb(16)
    //es 容错处理
    esSink.setFailureHandler(
      new ActionRequestFailureHandler {
        override def onFailure(actionRequest: ActionRequest, throwable: Throwable, i: Int, requestIndexer: RequestIndexer): Unit = {
          if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent) {
            // full queue; re-add document for indexing
            requestIndexer.add(actionRequest)
          } else if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent) {
            // malformed document; simply drop request without failing sink
            println("WARN   数据格式出错了")
          } else {
            // for all other failures, fail the sink;
            // here the failure is simply rethrown, but users can also choose to throw custom exceptions
            println("ES 出问题了")
            throw throwable
          }
        }
      }
    )
    esSink
  }
  //设置最大并行度
  mapDS.setMaxParallelism(1)
  //把数据sink到es
  mapDS.addSink(esSinkBuilder.build())

  //生产数据命令如下
  // $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic test
  //kafka中输入的测试数据 
  // {"id":1,"completed":false,"title":"delectus aut autem","userId":1}

  //查看索引
  //Get _cat/indices
  //查看索引中的内容
  //Get flink_kafka/_search
  //批量请求的配置;这将指示接收器在每个元素之后发出请求,否则将对它们进行缓冲。
  env.execute("Kafka_Flink")
}
}

依赖

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.cloudfall</groupId>
  <artifactId>flink_elk</artifactId>
  <version>1.0-SNAPSHOT</version>

  <!-- 版本管理 -->
  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.12.15</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
    <flink.version>1.9.3</flink.version>
    <scala.binary.version>2.12.15</scala.binary.version>
  </properties>

  <dependencies>
    <!-- 导入scala的依赖 -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- 导入flink streaming 和 scala的依赖 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>scala-library</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- 导入flink和scala的依赖 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- 指定flink-client API的版本 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>scala-parser-combinators_${scala.compat.version}</artifactId>
          <groupId>org.scala-lang.modules</groupId>
        </exclusion>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- 指定flink-connector-elasticsearch的依赖 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch7_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- 指定flink-connector-kafka的依赖 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_${scala.compat.version}</artifactId>
      <version>${flink.version}</version>
      <exclusions>
        <exclusion>
          <artifactId>slf4j-api</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
          <artifactId>snappy-java</artifactId>
          <groupId>org.xerial.snappy</groupId>
        </exclusion>
      </exclusions>
    </dependency>

    <!--       指定fast json的依赖 -->
    <!--        <dependency>-->
    <!--            <groupId>com.alibaba</groupId>-->
    <!--            <artifactId>fastjson</artifactId>-->
    <!--            <version>1.2.60</version>-->
    <!--        </dependency>-->
    
    <!-- 指定Google json 的依赖 -->
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.3.1</version>
    </dependency>

  </dependencies>
  <!-- 打包插件-->
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.5.1</version>
        <configuration>
          <source>${maven.compiler.source}</source>
          <target>${maven.compiler.target}</target>
          <!--<encoding>${project.build.sourceEncoding}</encoding>-->
        </configuration>
      </plugin>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <!--<arg>-make:transitive</arg>-->
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>

            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer
                             implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>cn.cloudFall.FLinkKafka</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

数据导出

导出到HDFS

代码语言:javascript
复制
//写入到HDFS
val output2 = "hdfs://bdedev/flink/Student002.csv"
ds2.writeAsCsv(output2, rowDelimiter = "\n", fieldDelimiter = "|||", WriteMode.OVERWRITE)
env.execute()

导出到文件

代码语言:javascript
复制
//写入到文件
val output2 = "file:///D:/bigdata_study/result001.txt"
ds3.writeAsCsv(output2, rowDelimiter = "\n", fieldDelimiter = ",", WriteMode.OVERWRITE)
env.execute()

值转换

Flink的Transformation转换主要包括四种:

  1. 单数据流基本转换
  2. 基于Key的分组转换
  3. 多数据流转换
  4. 数据重分布转换

单数据流基本转换

image-20220926175102708
image-20220926175102708

基于Key的分组转换

对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。

groupBy会将一个DataSet转化为一个GroupedDataSet,聚合操作会将GroupedDataSet转化为DataSet。如果聚合前每个元素数据类型是T,聚合后的数据类型仍为T。

image-20220926180418786
image-20220926180418786

aggregation

常见的聚合操作有summaxmin等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟groupBy相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。

与批处理不同,这些聚合函数是对流数据进行数据,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。sum算子的功能对该字段进行加和,并将结果保存在该字段上。min操作无法确定其他字段的数值。

代码语言:javascript
复制
val tupleStream = env.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
tupleStream.groupBy(0).sum(1).print()

第0个分组,第1个求和

结果

(1,3,8) (0,3,2)

reduce

代码语言:javascript
复制
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}

object WordCount {
  def main(args: Array[String]): Unit = {
    //设置用户名,避免权限错误
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds1: DataSet[String] = env.fromElements(
      "good good study", "day day up"
    )
    val group_ds  = ds1.flatMap(line => line.split(" ")).map(word => (word, 1)).groupBy(0)
    val ds3 = group_ds.reduce((a, b) => (a._1, a._2 + b._2))
    ds3.sortPartition(0, Order.ASCENDING).print
  }
}

结果

(up,1) (day,2) (good,2) (study,1)

机器学习Alink

Spark对应的机器学习框架SparkML

Flink对应的机器学习框架FlinkML/Alink

FlinkML

https://github.com/apache/flink-ml

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.12</artifactId>
  <version>1.9.3</version>
</dependency>

Alink

Alink与SparkML算法相比,Alink算法更全面,性能更优异,场景更丰富(同时支持流批),本地化更出色(支持中文分词)是快速搭建在线机器学习系统的不二之选。

镜像仓库:Alink

教程:https://www.yuque.com/pinshu/alink_tutorial/book_java

img
img
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 加载数据
    • 代码中加载
      • 从文件中加载
        • Kafka中加载
        • 数据导出
        • 值转换
          • 单数据流基本转换
            • 基于Key的分组转换
              • aggregation
              • reduce
          • 机器学习Alink
          相关产品与服务
          Elasticsearch Service
          腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档