Stream-1:
[KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752}
[KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784}
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
[KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
[KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
[KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
[KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
[KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
[KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
[KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}
Stream-2:
[KSTREAM-SOURCE-0000000002]: null, {"id": 1, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 2, "name": "e", "age": 78}
[KSTREAM-SOURCE-0000000002]: null, {"id": 12, "name": "d", "age": 67}
[KSTREAM-SOURCE-0000000002]: null, {"id": 21, "name": "e", "age": 78}
Now I want to join these two streams and retrieve only the rows of stream-1 that do not appear in stream-2. My input stream data is in AVRO format.
Expected output:
[KSTREAM-SOURCE-0000000000]: null, {"id": 3, "name": "julia", "age": 25, "updated_at": 1525774480827}
[KSTREAM-SOURCE-0000000000]: null, {"id": 4, "name": "jamie", "age": 22, "updated_at": 1525774480875}
[KSTREAM-SOURCE-0000000000]: null, {"id": 5, "name": "jenny", "age": 27, "updated_at": 1525774482927}
[KSTREAM-SOURCE-0000000000]: null, {"id": 6, "name": "kishore", "age": 27, "updated_at": 1525775063908}
[KSTREAM-SOURCE-0000000000]: null, {"id": 7, "name": "purna", "age": 27, "updated_at": 1525775072006}
[KSTREAM-SOURCE-0000000000]: null, {"id": 8, "name": "xxx", "age": 10, "updated_at": 1525783464123}
[KSTREAM-SOURCE-0000000000]: null, {"id": 9, "name": "yyy", "age": 10, "updated_at": 1525783667644}
[KSTREAM-SOURCE-0000000000]: null, {"id": 10, "name": "zzz", "age": 10, "updated_at": 1525783741814}
So which join operation should I use, and how can I get the expected output? Can anyone help me with this?
Posted on 2018-05-21 20:23:49
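If you look at the documentation here: kafka streams join semantics, you can use a left join and have your value joiner return null only when the value from stream2 is set. Filtering out the null results afterwards leaves only the stream1 records that have no match in stream2.
Some pseudocode: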
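// Return the stream1 value when stream2 has no match, otherwise null.
valueJoiner = (s1, s2) -> {
    if (s2 != null) {
        return null;
    } else {
        return s1;
    }
};
// A KStream-KStream join also takes a JoinWindows argument, omitted here.
stream1.leftJoin(stream2, valueJoiner)
       .filter((key, value) -> value != null);
Disclaimer: I have not tested this.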
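To make the semantics concrete, here is a minimal plain-Java sketch of the same idea (left join, joiner returns null on a match, then drop the nulls), run against the ids from the question. It does not use the Kafka Streams API: the class `AntiJoinSketch`, the methods `leftJoin` and `antiJoin`, and the use of maps keyed by id are all assumptions made for illustration. Note that Kafka Streams joins match on the record key, and the sample records above have null keys, so in a real topology both streams would presumably need to be re-keyed by id first.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of "left join + null filter"; not Kafka Streams code.
class AntiJoinSketch {

    // Left join on the map key: the joiner is called once per left entry,
    // with a null right value when the right side has no entry for that key.
    static Map<Integer, String> leftJoin(Map<Integer, String> left,
                                         Map<Integer, String> right,
                                         BiFunction<String, String, String> joiner) {
        Map<Integer, String> joined = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : left.entrySet()) {
            joined.put(e.getKey(), joiner.apply(e.getValue(), right.get(e.getKey())));
        }
        return joined;
    }

    // Anti-join: keep only left entries whose key is absent on the right.
    static Map<Integer, String> antiJoin(Map<Integer, String> left,
                                         Map<Integer, String> right) {
        // Joiner returns null on a match and the left value otherwise.
        Map<Integer, String> joined =
                leftJoin(left, right, (s1, s2) -> s2 == null ? s1 : null);
        // Drop the entries the joiner marked with null.
        joined.values().removeIf(v -> v == null);
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> stream1 = new LinkedHashMap<>();
        String[] names = {"john", "jane", "julia", "jamie", "jenny",
                          "kishore", "purna", "xxx", "yyy", "zzz"};
        for (int i = 0; i < names.length; i++) {
            stream1.put(i + 1, names[i]);
        }

        Map<Integer, String> stream2 = new LinkedHashMap<>();
        stream2.put(1, "d");
        stream2.put(2, "e");
        stream2.put(12, "d");
        stream2.put(21, "e");

        // Ids 1 and 2 appear in both inputs and are removed.
        System.out.println(antiJoin(stream1, stream2).keySet());
        // prints [3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

The result matches the expected output in the question. One caveat: this sketch sees both inputs in full, whereas a windowed stream-stream join processes records as they arrive, so when the null-padded left-join results are emitted depends on the join type and the Kafka Streams version.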
https://stackoverflow.com/questions/50447037