专栏首页工作笔记精华flink读取kafka数据并写入HDFS 转

flink读取kafka数据并写入HDFS 转

### 本地代码flink streaming读取远程环境的kafka的数据,写入远程环境的HDFS中;

public static void main(String[] args) throws Exception {
 
        // set up the streaming execution environment
        final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
   
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
//目标环境的IP地址和端口号
        properties.setProperty("bootstrap.servers", "192.168.0.1:9092");//kafka
//kafka版本0.8需要;
//        properties.setProperty("zookeeper.connect", "192.168.0.1:2181");//zookeepe
        properties.setProperty("group.id", "test-consumer-group"); //group.id
//第一种方式:
//这里很重要,填写hdfs-site.xml和core-site.xml的路径,可以把目标环境上的hadoop的这两个配置拉到本地来,这个是我放在了项目的resources目录下。
 //       properties.setProperty("fs.hdfs.hadoopconf", "E:\\Ali-Code\\cn-smart\\cn-components\\cn-flink\\src\\main\\resources");
//第二种方式: 
 properties.setProperty("fs.default-scheme","hdfs://ip:8020");
 
//根据不同的版本new不同的消费对象;
//        FlinkKafkaConsumer09<String> flinkKafkaConsumer09 = new FlinkKafkaConsumer09<String>("test0", new SimpleStringSchema(),properties);
        FlinkKafkaConsumer010<String> flinkKafkaConsumer010 = new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), properties);
//        flinkKafkaConsumer010.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
        DataStream<String> keyedStream = env.addSource(flinkKafkaConsumer010);
        keyedStream.print();
        // execute program
 
        System.out.println("*********** hdfs ***********************");
        BucketingSink<String> bucketingSink = new BucketingSink<>("/var"); //hdfs上的路径
        BucketingSink<String> bucketingSink1 = bucketingSink.setBucketer((Bucketer<String>) (clock, basePath, value) -> {
            return basePath;
        });
        bucketingSink.setWriter(new StringWriter<>())
                .setBatchSize(1024 * 1024 )  
                .setBatchRolloverInterval(2000); 
 
        keyedStream.addSink(bucketingSink);
    
        env.execute("test");
    }
在远程目标环境上hdfs的/var下面生成很多小目录,这些小目录是kafka中的数据;

问题:
1. 这种方式生成的hdfs文件不能够被spark sql去读取;

解决: 将数据写成parquet格式到hdfs上可解决这个问题;见另一篇博客

https://blog.csdn.net/u012798083/article/details/85852830

2. 如果出现大量inprocess的文件,怎么办?

解决: 将数据量加大一点;

3. 如何增加窗口处理?

解决:见另一篇博客:https://blog.csdn.net/u012798083/article/details/85852830

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • flink sql fromDataSet fromDataStream 使用row

    提示org.apache.flink.table.api.TableException: An input of GenericTypeInfo<Row> ca...

    stys35
  • Redis实现分布式锁3-使用LUA脚本实现分布式锁,解决原子性问题

    stys35
  • Storm(三)Java编写第一个本地模式demo

    本地模式 在本地模式下,Storm拓扑结构运行在本地计算机的单一JVM进程上。这个模式用于开发、测试以及调试,因为这是观察所有组件如何协同工作的最简单方法。在...

    stys35
  • [android] 天气app布局练习(四)

    陶士涵
  • web3.fromWei

    用户1408045
  • 使用Redis实现延时任务(一)

    最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。

    Throwable
  • bootstrap 轮播 幻灯片 常用样式

    <div id="myCarousel" class="carousel slide"> <ol class="carousel-indicators">...

    用户5760343
  • Flask配置Cors跨域

    跨域是指:浏览器A从服务器B获取的静态资源,包括Html、Css、Js,然后在Js中通过Ajax访问C服务器的静态资源或请求。即:浏览器A从B服务器拿的资源,资...

    用户1558882
  • Kafka运维填坑Kafka源码分析-汇总

    调用Runtime.getRuntime.halt(1)直接暴力退出了. 可参考Kafka issue: Unclean leader election an...

    扫帚的影子
  • Java Getter和Setter

    Author:杭州电子科技大学 2016级管理学院 工商管理 唐涛 16011324@hdu.edu.cn

    用户3906512

扫码关注云+社区

领取腾讯云代金券