我已经编写了一段代码来从kinesis流中获取记录到一个Lambda函数,该函数给出数据、分区ID和序列号的输出有效负载,然后我尝试调用第二个lambda从第一个lambda中获取序列号和分区ID,然后第二个lambda从kinesis流中提取数据。我坚持使用序列号和分区ID从kinesis流中获取数据。
下面是调用一个lambda到另一个lambda的代码。
public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
private static final String regionName = "us-east-1";
private static final String functionName = "Test1";
@Override
public Object handleRequest(KinesisEvent input, Context context) {
context.getLogger().log("Input: " + input);
List<KinesisEventRecord> records = input.getRecords();
for (KinesisEventRecord rec : records){
ByteBuffer recdata = rec.getKinesis().getData();
String data = new String( recdata.array(), Charset.forName("UTF-8") );
context.getLogger().log("Data: " +data);
context.getLogger().log("Partition key: " +rec.getKinesis().getPartitionKey());
context.getLogger().log("Sequence Number: " +rec.getKinesis().getSequenceNumber());
}
//call another lambda function
try {
AWSLambdaClient lambda = new AWSLambdaClient();
Region region = Region.getRegion(Regions.fromName(regionName));
lambda.setRegion(region);
InvokeRequest invokeRequest = new InvokeRequest();
invokeRequest.setFunctionName(functionName);
invokeRequest.setPayload("\" AWS Lambda Test - internal call\"");
System.out.println(
lambda.invoke(invokeRequest).getPayload());
} catch (Exception e) {
System.out.println(e.getMessage());
}
// TODO: implement your handler
return null;
}
}
下面是我尝试使用序列号和分区ID获取记录的代码。
public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
private static final String streamName = "Test";
private static final String partitionKey = "123456676454";
private static final String sequenceNumber = "12345"
public Object handleRequest(KinesisEvent input, Context context) {
context.getLogger().log("Input: " + input);
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setStreamName(streamName);
getRecordsRequest.setPartitionKey(partitionKey);
getRecordsRequest.setsequenceNumber(sequenceNumber);
KinesisEvent.getRecord(getRecord);
}
}
请告诉我使用序列号和分区ID从kinesis流中获取记录的方法。
https://stackoverflow.com/questions/38216952
复制相似问题