前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.4 定义keys的几种方法

Flink1.4 定义keys的几种方法

作者头像
smartsi
发布2019-08-07 08:31:48
9680
发布2019-08-07 08:31:48
举报
文章被收录于专栏:SmartSiSmartSi

一些转换(例如,joincoGroupkeyBygroupBy)要求在一组元素上定义一个key。其他转换(ReduceGroupReduceAggregateWindows)允许在使用这些函数之前根据key对数据进行分组。

一个DataSet进行分组如下:

代码语言:javascript
复制
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);

DataStream也可以指定一个key:

代码语言:javascript
复制
DataStream<...> input = // [...]
DataStream<...> windowed = input
.keyBy(/*define key here*/)
.window(/*window specification*/);

Flink的数据模型不是基于键值对。因此,没有必要将数据集类型打包成keysvalueskeys是”虚拟”:它们只是被定义在实际数据之上的函数,以指导分组算子使用。

备注:

代码语言:javascript
复制
在下面的讨论中,我们将使用DataStream API和keyBy。对于DataSet API,你只需要替换为DataSet和groupBy即可。

下面介绍几种Flink定义keys方法。

1. 为Tuples类型定义keys

最简单的情况就是在元组的一个或多个字段上对元组进行分组。下面是在元组的第一个字段(整数类型)上进行分组:

Java版本:

代码语言:javascript
复制
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

Scala版本:

代码语言:javascript
复制
val input: DataStream[(Int, String, Long)] = // [...]
val keyed = input.keyBy(0)

下面,我们将在复合key上对元组进行分组,复合key包含元组的第一个和第二个字段:

Java版本:

代码语言:javascript
复制
DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

Scala版本:

代码语言:javascript
复制
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input.groupBy(0,1)

如果你有一个包含嵌套元组的DataStream,例如:

代码语言:javascript
复制
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

如果指定keyBy(0),则使用整个Tuple2作为key(以IntegerFloatkey)。如果要使用嵌套中Tuple2的某个字段,则必须使用下面介绍的字段表达式指定keys

2. 使用字段表达式定义keys

你可以使用基于字符串的字段表达式来引用嵌套字段以及定义keys来进行分组,排序,连接或coGrouping。字段表达式可以非常容易地选择(嵌套)复合类型(如TuplePOJO类型)中的字段。

在下面的例子中,我们有一个WC POJO,它有两个字段wordcount。如果想通过word字段分组,我们只需将word传递给keyBy()函数即可。

代码语言:javascript
复制
// some ordinary POJO (Plain old Java Object)
public class WC {
  public String word;
  public int count;
}
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

字段表达式语法:

(1) 按其字段名称选择POJO字段。例如,user是指向POJO类型的user字段。

(2) 通过字段名称或0到offset的数值字段索引来选择元组字段(field name or 0-offset field index)。例如,f05分别指向Java元组类型的第一和第六字段。

(3) 你可以在POJO和元组中选择嵌套字段。例如,user.zip是指POJO类型user字段中的zip字段。支持POJOTuples的任意嵌套和组合,如f1.user.zipuser.f3.1.zip

(4) 你可以使用*通配符表达式选择所有类型。这也适用于不是元组或POJO类型的类型。

Example:

代码语言:javascript
复制
public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3<Long, Long, String> word;
  public IntWritable hadoopCitizen;
}

下面是上述示例代码的有效字段表达式:

代码语言:javascript
复制
count:WC类中的count字段。
complex:递归地选择复合字段POJO类型ComplexNestedClass的所有字段。
complex.word.f2:选择嵌套字段Tuple3的最后一个字段。
complex.hadoopCitizen:选择Hadoop IntWritable类型。

3. 使用key Selector 函数定义keys

定义key的另一种方法是key选择器函数。key选择器函数将单个元素作为输入,并返回元素的key。key可以是任何类型的。

以下示例显示了一个key选择器函数,它只返回一个对象的字段:

Java版本:

代码语言:javascript
复制
public class WC {
  public String word; public int count;
}

DataStream<WC> words = // [...]
KeyedStream<WC> kyed = words.keyBy(new KeySelector<WC, String>() {
     public String getKey(WC wc) { return wc.word; }
});

Scala版本:

代码语言:javascript
复制
case class WC(word: String, count: Int)
val words: DataStream[WC] = // [...]
val keyed = words.keyBy( _.word )

备注:

代码语言:javascript
复制
Flink版本:1.4
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-01-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 为Tuples类型定义keys
  • 2. 使用字段表达式定义keys
  • 3. 使用key Selector 函数定义keys
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档