专栏首页品茗IT从Java流到Spring Cloud Stream,流到底为我们做了什么?

从Java流到Spring Cloud Stream,流到底为我们做了什么?

从Java流到Spring Cloud Stream,流到底为我们做了什么?

一、概述

首先,网络释义:流是一个相对抽象的概念,所谓流就是一个传输数据的通道,这个通道可以传输相应类型的数据。进而完成数据的传输。这个通道被实现为一个具体的对象。

当你第一次学习JAVA流,各种InputStream、OutputStream,觉得看到你眼花,然而,你眼花的太早了,了解的越多,你越会发现,Java流、Jdk8 stream、Spring Cloud Stream、kafkaStream、Spark Streaming、Apache Storm等(这些还只是我听过名字的),怎么流越来越多了?怎么什么都叫流?流到底是什么?

那就让我来告诉你吧,本篇整理了下Java应用中为人所知的流及概念,让你对流有一个清晰的认识。

二、Java流

从功能上区分,可以分为输入输出流:

  • 输入流:从外部空间(文件、网络连接、内存块)读入字节序列的管道(对象)。
  • 输出流:可以向外部空间(文件、网络连接、内存块)写入字节序列的管道(对象)。

从读取方式来区分,可以分为字节流字符流:

  • InputStream (字节输入流);
  • OutputStream(字节输出流);
  • Reader(字符输入流);
  • Writer(字符输出流);

2.1 InputStream

InputStream 类是字节输入流的父类。InputStream 类的常用子类如下。

  • ByteArrayInputStream 类:将字节数组转换为字节输入流,从中读取字节。
  • FileInputStream 类:从文件中读取数据。
  • PipedInputStream 类:连接到一个 PipedOutputStream(管道输出流)。
  • SequenceInputStream 类:将多个字节输入流串联成一个字节输入流。
  • ObjectInputStream 类:将对象反序列化。
  • BufferedInputStream 类:缓冲输入流,为另一个输入流添加一些功能。

2.2 OutputStream

OutputStream 类是字节输出流的父类。OutputStream 类的常用子类如下。

  • ByteArrayOutputStream 类:向内存缓冲区的字节数组中写数据。
  • FileOutputStream 类:向文件中写数据。
  • PrintStream 类:继承了FilterOutputStream.是"装饰类"的一种,为其他的输出流添加功能.使它们能够方便打印各种数据值的表示形式。
  • PipedOutputStream 类:连接到一个 PipedlntputStream(管道输入流)。
  • ObjectOutputStream 类:将对象序列化。
  • BufferedOutputStream 类:缓冲的输出流。通过设置这种输出流,应用程序就可以将各个字节写入底层输出流中,而不必针对每次字节写入调用底层系统。

2.3 Reader

Reader 类是字符流输入类的父类;Reader 类的常用子类如下。

  • CharArrayReader 类:将字符数组转换为字符输入流,从中读取字符。
  • StringReader 类:将字符串转换为字符输入流,从中读取字符。
  • BufferedReader 类:为其他字符输入流提供读缓冲区。
  • PipedReader 类:连接到一个 PipedWriter。
  • InputStreamReader 类:将字节输入流转换为字符输入流,可以指定字符编码。
  • FileReader 类:继承自InputStreamReader,该类按字符读取文件流中数据。

2.4 Writer

Writer 类是所有字符输出流的父类,Writer 类的常用子类如下。

  • PrintWriter类:也是装饰器模式,向输出流打印对象的格式化表示形式
  • CharArrayWriter 类:向内存缓冲区的字符数组写数据。
  • StringWriter 类:向内存缓冲区的字符串(StringBuffer)写数据。
  • BufferedWriter 类:为其他字符输出流提供写缓冲区。
  • PipedWriter 类:连接到一个 PipedReader。
  • OutputStreamReader 类:将字节输出流转换为字符输出流,可以指定字符编码。
  • FileWriter类:继承自OutputStreamReader,该类按字符向文件流中写入数据;

结论:从以上的各种流可以看出,Java IO包中的所有流,不论网络数据还是文件数据,都是为了将数据从缓冲区拿出来,再把数据塞回缓冲区。属于传统意义上的IO流。

三、Jdk8 Stream流

Java 8 API添加了一个新的抽象称为流Stream。Stream是元素的集合,这点让Stream看起来有些类似Iterator,可以支持顺序和并行的对原Stream进行汇聚的操作,可以称之为高级版本的Iterator。

Stream流和传统的IO流,它们都叫流,却是两个完全不一样的概念和东西。

Stream(流)是一个来自数据源的元素队列并支持聚合操作:

  • 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
  • 数据源 流的来源。 可以是集合,数组,I/O channel, 产生器generator 等。
  • 聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。

3.1 创建流

  1. 调用集合的stream()方法或者parallelStream()方法。
  2. Stream.of()方法,有针对int,long的专用流IntStream,LongStream。

3.2 使用流

String names = "abc,asdasd,2423sad,,cff,,";
List<String> listNames = Arrays.asList(names.split(",")).stream().map(s -> s + "heihei")
		.filter(s -> s == null ? false : true).collect(Collectors.toList());
List<String> listNamesParrel = Arrays.asList(names.split(",")).parallelStream().map(s -> s + "heihei")
		.filter(s -> s == null ? false : true).collect(Collectors.toList());
System.out.println(listNames); //[abcheihei, asdasdheihei, 2423sadheihei, heihei, cffheihei]
System.out.println(listNamesParrel);//[abcheihei, asdasdheihei, 2423sadheihei, heihei, cffheihei]

IntStream is = IntStream.of(1,2,3,4,5);
System.out.println(is.average().getAsDouble()); //3.0

结论:Jdk8的Stream流,只是对遍历的一种改造,同时支持并行计算(parallelStream,它通过默认的ForkJoinPool,可能提高你的多线程任务的速度),但一定要注意parallelStream会产生线程安全问题,所以在parallelStream里面使用的外部变量,比如集合一定要使用线程安全集合,不然就会引发多线程安全问题。

3.3 Stream的性能

Stream API借助于同样新出现的Lambda表达式,极大的提高编程效率和程序可读性。同时,它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用fork/join并行方式来拆分任务和加速处理过程。

Java8中首次出现的 java.util.stream是一个函数式语言+多核时代综合影响的产物。

开始使用 Java 8 的第一件事情是在实践中使用 lambda 表达式和流。但是请记住:它确实非常好,好到可能会让你上瘾!但是,我们也看到了,使用传统迭代器和 for-each 循环的 Java 编程风格比 Java 8 中的新方式性能高很多。

当然,这也不是绝对的。但这确实是一个相当常见的例子,它显示可能会有大约 5 倍的性能差距。如果这影响到系统的核心功能或成为系统一个新的瓶颈,那就相当可怕了。

四、Spring Cloud Stream

了解SpringCloud流的时候,我们会发现,SpringCloud还有个Data Flow(数据流)的项目,下面是它们的区别:

  1. Spring Cloud Stream:数据流操作开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。是一套用于创建消息驱动(message-driven)微服务的框架。通过向主程序添加@EnableBinding,可以立即连接到消息代理,通过向方法添加@StreamListener,您将收到流处理事件。
  2. Spring Cloud Data Flow:大数据操作工具,作为Spring XD的替代产品,它是一个混合计算模型,结合了流数据与批量数据的处理方式。是构建数据集成和实时数据处理流水线的工具包。
  3. Spring Cloud Data Flow的其中一个章节是包含了Spring Cloud Stream,所以应该说Spring Cloud Data Flow的范围更广,是类似于一种解决方案的集合,而Spring Cloud Stream只是一套消息驱动的框架。
  4. Spring Cloud Stream是在Spring Integration的基础上发展起来的。它为开发人员提供了一致的开发经验,以构建可以包含企业集成模式以与外部系统(例如数据库,消息代理等)连接的应用程序。

在这里插入图片描述

如图所示,Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。

结论:Spring Cloud Stream以消息作为流的基本单位,所以它已经不是狭义上的IO流,而是广义上的数据流动,从生产者到消费者的数据流动。

五、其他

其他的流还有kafkaStream、Spark Streaming、Apache Storm等,这些我只是叫得上名字,kafkaStream有了一些基本了解,但没实际应用过。

但是这些工具,都是类似于Spring Cloud Stream,属于广义上的数据传输,属于大数据流的范畴。下面对这三种流做简单介绍。

kafkaStream:Kafka Streams是一个客户端程序库,用于处理和分析存储在Kafka中的数据,并将得到的数据写回Kafka或发送到外部系统。Kafka Stream基于一个重要的流处理概念。如正确的区分事件时间和处理时间,窗口支持,以及简单而有效的应用程序状态管理。Kafka Streams的入口门槛很低: 你可以快速的编写和在单台机器上运行一个小规模的概念证明(proof-of-concept);而你只需要运行你的应用程序部署到多台机器上,以扩展高容量的生产负载。Kafka Stream利用kafka的并行模型来透明的处理相同的应用程序作负载平衡。

Spark Streaming: Spark流是对于Spark核心API的拓展,从而支持对于实时数据流的可拓展,高吞吐量和容错性流处理。数据可以由多个源取得,例如:Kafka,Flume,Twitter,ZeroMQ,Kinesis或者TCP接口,同时可以使用由如map,reduce,join和window这样的高层接口描述的复杂算法进行处理。最终,处理过的数据可以被推送到文件系统,数据库和HDFS。

Apache Storm:这是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。他是一个流数据框架,具有最高的社区率。虽然Storm是无状态的,它通过ApacheZooKeeper管理分布式环境和集群状态。使用起来非常简单,并且还支持并行地对实时数据执行各种操作。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • SpringBoot入门建站全系列(七)日志组件的使用

    前面六篇已经对SpringBoot的基础用做了介绍,日常项目使用已经足够,本篇介绍下SpringBoot日志使用的注意事项。

    品茗IT
  • SpringBoot入门建站全系列(七)日志组件的使用

    前面六篇已经对SpringBoot的基础用做了介绍,日常项目使用已经足够,本篇介绍下SpringBoot日志使用的注意事项。

    品茗IT
  • SpringCloud微服务实战系列(十六)应用监控之SpringBootAdmin的使用

    Actuaotr是spring boot项目中非常强大的一个功能,有助于对应用程序进行监控和管理,通过restful api请求来监管、审计、收集应用的运行情况...

    品茗IT
  • Netflix时代之后Spring Cloud微服务的未来

    如果有人会问你有关Spring Cloud的问题,那么你想到的第一件事可能就是Netflix OSS的支持。对Eureka,Zuul或Ribbon等工具的支持不...

    用户1516716
  • Netflix时代之后Spring Cloud微服务的未来

    如果有人会问你有关Spring Cloud的问题,那么你想到的第一件事可能就是Netflix OSS的支持。对Eureka,Zuul或Ribbon等工具的支持不...

    程序猿DD
  • 从互联网+看中央经济工作会议

    孟昭莉 腾讯研究院首席经济学家,产业与经济研究中心主任   12月21日,中央经济工作会议落下帷幕。会议提出,要在适度扩大总需求的同时,提高供给体系质量和效...

    腾讯研究院
  • CDP-DC中Atlas集成FreeIPA的LDAP认证

    Cloudera从CM6.3版本开始,引入了Red Hat IdM来做整个集群的认证,Red Hat IdM对应的软件为FreeIPA,在本文中描述如何使用Fr...

    大数据杂货铺
  • Machine Learning最小可迭代产品No.75

    报告各位首长,我参与的第二个项目顺利上线啦~ 棒棒,又一次感觉自己做的东西是有价值的,这个项目是一个平台类产品,专注于提高线下零售的实施效率,希望后面的迭代会越...

    企鹅号小编
  • 金融科技向左,数字科技向右,分野成为主流

    在互联网金融持续遭遇监管的时候,人们开始寻找互联网金融的接棒者,以此来抵消互联网金融落幕带来的空白。在资本与巨头的联合推动下,金融科技一度成为互联网金融玩家们竞...

    孟永辉
  • Machine Learning最小可迭代产品No.75

    报告各位首长,我参与的第二个项目顺利上线啦~ 棒棒,又一次感觉自己做的东西是有价值的,这个项目是一个平台类产品,专注于提高线下零售的实施效率,希望后面的迭代会越...

    大蕉

扫码关注云+社区

领取腾讯云代金券