首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >读取缓慢的更改查找和丰富流输入集合的最佳方法是哪一种?

读取缓慢的更改查找和丰富流输入集合的最佳方法是哪一种?
EN

Stack Overflow用户
提问于 2019-11-28 11:09:05
回答 2查看 263关注 0票数 2

我使用的是Apache,它的流集合为1.5GB。我的查找表是一个JDBCio mysql响应。

当我在没有侧输入的情况下运行管道时,我的工作将在大约2分钟内完成。当我运行我的工作与旁输入,我的工作永远不会完成,泥潭和死亡。

下面是我用来存储查找的代码(~1M记录)

代码语言:javascript
运行
复制
  PCollectionView<Map<String,String>> sideData = pipeline.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
       "com.mysql.jdbc.Driver", "jdbc:mysql://ip")
      .withUsername("username")
      .withPassword("password"))
      .withQuery("select a_number from cell")
      .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
      .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
      public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getString(1), resultSet.getString(1));
      }
})).apply(View.asMap());

这是我的流集合的代码

代码语言:javascript
运行
复制
pipeline
.apply("ReadMyFile", TextIO.read().from("/home/data/**")
.watchForNewFiles(Duration.standardSeconds(60),  Watch.Growth.<String>never()))
.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))

下面是我的parDo在每个事件行( 10M条记录)上迭代的代码

代码语言:javascript
运行
复制
  .apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String,Integer> i = c.element();
    String sideInputData = c.sideInput(sideData).get(i.getKey());
    if (sideInputData == null) {
      c.output(i);
    } 
  }
 }).withSideInputs(sideData));

我使用的是flink集群,但是使用直接运行程序输出的结果是相同的。

集群:

2 cpu 6核24 ram内存

我做错什么了?我一直在跟踪这个

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-12-04 16:50:18

解决方案是创建一个“缓存”映射。

sideInput只触发一次,然后我将其缓存到一个等价的映射成功中。

所以,我避免为每一个sideInput做processElement。

代码语言:javascript
运行
复制
.apply(ParDo.of(new DoFn<KV<String,Integer>,KV<String,Integer>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
   if (isFirstTime) {
        myList = c.sideInput(sideData);
    }
    isFirstTime = false;
    boolean result = myList.containsKey(c.element().getKey());         
    if (result == false) {
      c.output(i);
    } 
  }
 }).withSideInputs(sideData));
票数 1
EN

Stack Overflow用户

发布于 2019-11-28 19:13:38

如果运行的数据少得多,我怀疑程序正在耗尽java进程的所有内存。您可以通过JVisualVM或JConsole对此进行监视。有很多关于这个问题的文章,我只是偶然发现了这一个的一个快速谷歌搜索。

如果内存耗尽,您的java进程大多忙于清理内存,您将看到性能大幅下降。在某种程度上,java放弃并失败了。

要解决这个问题,只需增加java堆大小就足够了。如何增加这个值取决于如何和在何处执行它。查看for的-Xmx参数或beam中的堆选项。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59087483

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档