首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用序列号和分区ID从Kinesis获取记录

使用序列号和分区ID从Kinesis获取记录
EN

Stack Overflow用户
提问于 2016-07-06 13:38:25
回答 0查看 1.5K关注 0票数 0

我已经编写了一段代码来从kinesis流中获取记录到一个Lambda函数,该函数给出数据、分区ID和序列号的输出有效负载,然后我尝试调用第二个lambda从第一个lambda中获取序列号和分区ID,然后第二个lambda从kinesis流中提取数据。我坚持使用序列号和分区ID从kinesis流中获取数据。

下面是调用一个lambda到另一个lambda的代码。

代码语言:javascript
运行
复制
public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
        private static final String regionName = "us-east-1";
        private static final String functionName = "Test1";


    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
        context.getLogger().log("Input: " + input);
        List<KinesisEventRecord> records = input.getRecords();
        for (KinesisEventRecord rec : records){
            ByteBuffer recdata = rec.getKinesis().getData();
             String data = new String( recdata.array(), Charset.forName("UTF-8") );
            context.getLogger().log("Data: " +data);
            context.getLogger().log("Partition key: " +rec.getKinesis().getPartitionKey());
            context.getLogger().log("Sequence Number: " +rec.getKinesis().getSequenceNumber());
        }

        //call another lambda function
        try {
            AWSLambdaClient lambda = new AWSLambdaClient();
            Region region = Region.getRegion(Regions.fromName(regionName));
            lambda.setRegion(region);
            InvokeRequest invokeRequest = new InvokeRequest();
            invokeRequest.setFunctionName(functionName);
            invokeRequest.setPayload("\" AWS Lambda Test - internal call\"");
            System.out.println(
                    lambda.invoke(invokeRequest).getPayload());
        } catch (Exception e) {
            System.out.println(e.getMessage());

        }

        // TODO: implement your handler
        return null;
    }

}

下面是我尝试使用序列号和分区ID获取记录的代码。

代码语言:javascript
运行
复制
public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
private static final String streamName = "Test";
private static final String partitionKey = "123456676454";
private static final String sequenceNumber = "12345"

public Object handleRequest(KinesisEvent input, Context context) {
    context.getLogger().log("Input: " + input);
    GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setStreamName(streamName);
    getRecordsRequest.setPartitionKey(partitionKey);
    getRecordsRequest.setsequenceNumber(sequenceNumber);
    KinesisEvent.getRecord(getRecord);

}
        }

请告诉我使用序列号和分区ID从kinesis流中获取记录的方法。

EN

回答

页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38216952

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档