背景
我使用以下Boto3代码从S3下载文件。
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print (key)
if key.find('/') < 0 :
if len(key) > 4 and key[-5:].lower() ==
我正在开发一个应用程序,它将一些文件上传到s3桶,稍后,从s3桶中读取文件,并将其推送到数据库。
我使用Flink 1.4.2和fs.s3a API从s3桶读取和写入文件。
上传文件到s3桶没有任何问题,但是当我的应用程序的第二阶段--从s3读取这些上传的文件--启动时,我的应用程序会抛出错误
Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClient
我有一个简单的工作与trigger=15秒,Source=Kafka和Sink=S3。有没有可能知道从Kafka下载消息需要多长时间?或者说,如果我有Sink=Console,它会带回驱动程序上的数据,是否可以找到从Kafka下载数据的时间,以及将数据带回驱动程序的时间?
在写入S3时,我从驱动程序中获取这些用于查询的内容。有没有可能理解它花了多少时间从Kafka下载99998行数据,triggerExecution =44秒?
Streaming query made progress: {
id : 1383g52b-8de4-4e95-a3s9-aea73qe3ea56,
run
我正在尝试将文件从一个S3存储桶移动到另一个,并按日期将它们放入文件夹结构中。简而言之,目前所有文件都放在一个文件夹中,该文件夹中有超过500,000个文件,我现在需要对所有这些文件进行排序,并按月将它们放入文件夹中。
文件名类似于:"This_is_a_file_20150403.xml“
因此,我循环遍历S3存储桶中的所有文件,标记大小并获得日期。我创建了一个忽略日期的年月变量,并将它们移到另一个S3存储桶中。但文件名更改为:"This is a file 20150403.xml“
因此,当我尝试移动它时,AWS找不到该文件。为什么bash从文件名中删除了下划线?我尝试将