首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将Kafka与Apache方解石集成

将Kafka与Apache方解石集成
EN

Stack Overflow用户
提问于 2017-02-24 02:33:00
回答 1查看 646关注 0票数 1

我正在尝试将方解石与Kafka集成,我引用了CsvStreamableTable。

使用fowlloing代码将每个ConsumerRecord转换为Object[]:

代码语言:javascript
复制
static class ArrayRowConverter extends RowConverter<Object[]> {
    private List<Schema.Field> fields;

    public ArrayRowConverter(List<Schema.Field> fields) {
        this.fields = fields;
    }

    @Override
    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
        Object[] objects = new Object[fields.size()+1];
        int i = 0 ;
        objects[i++] = consumerRecord.timestamp();
        for(Schema.Field field : this.fields) {
            Object obj = consumerRecord.value().get(field.name());
            if( obj instanceof Utf8 ){
                objects[i ++] = obj.toString();
            }else {
                objects[i ++] = obj;
            }
        }
        return objects;
    }
}

枚举器的实现如下所示,一个线程不断轮询来自kafka的记录,并将它们放入一个队列,getRecord()方法从该队列轮询:

代码语言:javascript
复制
public E current() {
    return current;
}

public boolean moveNext() {
for(;;) {
    if(cancelFlag.get()) {
        return false;
    }
    ConsumerRecord<String, GenericRecord> record = getRecord();
    if(record ==  null) {
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        continue;
    }
    current = rowConvert.convertRow(record);
    return true;
    }
}

我测试了SELECT STREAM * FROM Kafka.clicks,它工作得很好。rowtime是显式添加的第一列,取值为Kafka的记录时间戳。

但当我试着

代码语言:javascript
复制
SELECT STREAM FLOOR(rowtime TO HOUR) 
AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime TO HOUR), ip

它抛出异常

代码语言:javascript
复制
java.sql.SQLException: Error while executing SQL "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line 1, column 119: Streaming aggregation requires at least one monotonic expression in GROUP BY clause
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
EN

回答 1

Stack Overflow用户

发布于 2017-02-24 08:56:18

您需要声明"ROWTIME“列是单调的。在MockCatalogReader中,请注意“订单”和“发货”流中的"ROWTIME“是如何声明为单调的。这就是为什么SqlValidatorTest.testStreamGroupBy()中的一些查询是有效的,而另一些查询是无效的。验证器依赖的关键方法是SqlValidatorTable.getMonotonicity(String columnName)

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42423559

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档