前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何在Mule 4 Beta中实现自动流式传输

如何在Mule 4 Beta中实现自动流式传输

作者头像
Steve Wang
发布2018-06-07 16:40:44
2.1K0
发布2018-06-07 16:40:44
举报
文章被收录于专栏:从流域到海域从流域到海域

How Automatic Streaming in Mule 4 Beta Works

原文作者:Mariano Gonzalez

原文地址:https://dzone.com/articles/how-automatic-streaming-in-mule-4-beta-works

译者微博:@从流域到海域

译者博客:blog.csdn.net/solo95

如何在Mule 4 Beta中实现自动流式传输

现在流传输就像喝啤酒那样简单!Mule 4使您能够处理,访问,转换以及传输数据的方式有了令人难以置信的改善。对于特定的流式传输,Mule 4支持多个并行数据读取,没有副作用,并且用户无需先将数据缓存到内存中。

很多人不熟悉流传输的概念。因此,在我们深入了解Mule 4的流媒体特性之前,我们首先介绍一些能比较突出其价值的用例。

示例1:HTTP> 2 Files

在这个简单的流程中,您从HTTP(比方说,带有JSON的POST)接收内容,然后将其写入两个文件。运行后得到的结果是什么?第一个文件被正确写入。第二个文件被创建,但其内容为空。

示例2:HTTP> Logs> File

这个例子接收到相同的JSON POST,但是这一次它会记录它并将其写入文件。这个流程的输出是你所期望的。其中内容被记录并且文件也被写入。但行为是否正确?最简洁的答案是不。

长然而简洁的原因是,为了记录有效载荷,记录器必须完全处理掉(consume)流,这意味着它的全部内容将被加载到内存中。消息传到文件连接器时,内容已全部在内存中。大多数时候,这并不是问题; 但如果内容体量过大并且将其加载到内存中,则应用程序很可能会耗尽内存 - 这威胁到应用程序的稳定性。

示例3:HTTP> Scatter-Gather> Whatever

现在,让我们尝试同样的例子但使用分散收集组件(scatter-gather component)(仅用于说明目的)。这种情况只是失败。一个流不能同时被两个不同的线程使用,因此该组件只有两个选项:

  1. 将整个流加载到内存中(如记录器一样)。
  2. 失败。

分散收集组件选择了后者。

但为什么?

这是我们真正需要了解流式传输含义含义的部分。处理流有两个问题:

  1. 它只能被读取一次。
  2. 它不能并行读取。

赫拉克利特说,你不能在同一条河流洗两次澡。这是因为每次洗澡时,组成这条河流的水滴都不相同。喝一品脱啤酒也是如此。你喝的每一口都是一口不能再喝的。流传输中也发生了同样的事情。

流的思路是,为了避免完全将潜在的大块数据加载到内存中,您可以通过一次一小口一小口地加载它。这意味着,虽然你仍在“消化”(即处理)第一口饮料,但第二口饮料已经通过你的咽喉(AKA网络,磁盘IO等)。这不仅节省了内存,而且还提高了性能。问题是啜饮过的(即处理过的流)不能被回收!

回到示例1,在第一个文件出站后“饮用”数据流以处理它(将其写入磁盘)之后,数据流变空了(其中没有啤酒)。为了使示例正常工作,需要在第一个文件出站处理器之前放置一个<object-to-string />转换器。这样做效果并不明显,并且会迫使Mule将流的内容完全加载到内存中。

同样在示例2中,记录器必须将整个内容加载到内存中并替换掉消息有效负载。又一次,所有内容都被加载到内存中。

可重复流的介绍

那是否有一种方法可以再次让同样的啤酒倒满杯子?

在Mule 4中,你不再需要担心回答以下问题:

  • 哪些组件正在流式传输,哪些不是?
  • 流在是在此时被处理的吗?
  • 流到底在哪个位置?
  • 流在深层次意味着什么?

Mule 4现在确保任何需要读取流的组件都能够这样做,而不管哪些组件已经被篡改。该流将始终可用并将处于其起始位置。

文件存储可重复流

文件存储可重复流需要缓冲,而且我们有不同的缓冲策略。Mule现在在内存中保留了一部分内容。如果流内容小于该缓冲区的大小,那么我们很好。如果内容量较大,Mule会先将缓冲区的内容备份到磁盘,然后清除内存。这是Mule 4的默认策略。

在内存的可重复流中

你也可以采取内存策略。在这种模式下进行流式传输时,Mule永远不会使用磁盘来缓冲内容。如果超过缓冲区大小,则消息传送将失败。

代码语言:txt
复制
< file : read path = “bigFile.json” > 
  < repeatable - in - memory - stream initialBufferSize = “512” 
                               bufferSizeIncrement = “256” 
                               maxBufferSize = “2048” 
                               bufferUnit = “KB” / > 
< / file : read >

以并行方式读取流

到现在为止都很好!但是我们只解决了例子1和例子2的问题,例子3仍然没有解决。

让我们回到我们的啤酒故事。所以我们回到酒吧,喝了一杯啤酒。假设1品脱包含500毫升啤酒。由于这个世界很小,你碰巧碰到酒吧的一位老朋友,你开始分享你的啤酒。借助使用吸管,你们可以平行喝,但你永远不会喝你的和朋友一样的一小口。而且,由于你在分享,当啤酒喝完时,你没有喝到完整的 500cc,这意味着你失去了一些内容。

流传输中发生了同样的事情。如果两个线程同时从同一个流中读取,则一个线程将占用一些字节,另一个线程将占用其他字节,但是没有一个线程拥有完整的内容。因此,内容已损坏。

Mule 4中新的可重复的流框架自动解决了这个问题。所有可重复的流都支持并行访问。Mule 4将自动确保组件A读取流时,它不会在组件B中产生任何副作用,从而消除脏读操作!

禁用可重复流

虽然不常见,但有些情况下您可能想要禁用此功能并使用普通的旧的流(处理方式)。例如,你的用例可能并不需要这个,你不想为额外的内存或性能开销付费。再次,您可以使用以下方法禁用它:

代码语言:txt
复制
< file : read path = “bigFile.json” > 
  < non - repeatable - stream / > 
< / file : read >

请注意,通过禁用此功能,即使使用Mule 4,示例1,示例2和示例3的所有缺陷也会变为当前值

流媒体对象

原始字节流不是Mule 4支持的流式传输的唯一情况。早在2013年,Mule 3.5就发布了,我们引入了自动分页连接器的概念。这是一个允许连接器(如Salesforce)透明地访问分页数据的功能。这是一种流式传输!在底层,连接器读取了第一页,当它被使用时,它会去取下一页,从内存中丢弃前面的页面。实质上,这与从FTP流式传输文件完全相同。

文件存储自动分页

默认情况下,您现在将获得一个缓冲区,该缓冲区将大量对象保存到内存中,并使用该磁盘缓冲剩余的内容:

代码语言:txt
复制
< sfdc : query query = “dsql:...”  / >

就像以前一样,你也可以在内存中完全做到这一点:

代码语言:txt
复制
< sfdc : query query = “dsql:...” > 
  < repeatable - in - memory - iterable
    initialBufferSize = “100”  
    bufferSizeIncrement = “100”  
    maxBufferSize = “500”  / > 
< / sfdc : query >
缓冲区大小

但请注意,然而,控制缓冲区大小在这里需要不同的方法。在前面的例子中,所有的缓冲区大小都是以字节为单位来衡量的(或者是一个派生单位,如KB)。在这种情况下,我们会探讨以实例计数。

对象序列化

为了让FileStore策略将磁盘用作缓冲区,它需要序列化流式对象。这是否意味着它只适用于实现java.io序列化接口的对象?一点也不。就像批处理模块一样,该功能使用Kryo框架来序列化默认情况下JVM无法序列化的内容。尽管Kryo实现了很多黑魔法,但它既不强大也不是银弹(喻指新技术,尤指人们寄予厚望的某种新科技)。有些东西就是不能被序列化,所以尽量保持你的对象是简单的状态!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • How Automatic Streaming in Mule 4 Beta Works
  • 如何在Mule 4 Beta中实现自动流式传输
    • 示例1:HTTP> 2 Files
      • 示例2:HTTP> Logs> File
        • 示例3:HTTP> Scatter-Gather> Whatever
          • 但为什么?
        • 可重复流的介绍
          • 文件存储可重复流
          • 在内存的可重复流中
          • 以并行方式读取流
          • 禁用可重复流
          • 流媒体对象
      相关产品与服务
      文件存储
      文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档