FTP 数据源

最近更新时间:2024-12-11 16:06:12

我的收藏

使用限制

FTP Reader 实现了从远程 FTP 文件读取数据并转为数据同步协议的功能,远程 FTP 文件本身是无结构化数据存储。目前 FTP Reader 支持的功能如下所示:
支持
不支持
支持且仅支持读取 TXT 的文件,并要求 TXT 中的 schema 为一张二维表。
支持类 CSV 格式文件,自定义分隔符。
支持多种类型数据读取(使用 STRING 表示)、支持列裁剪和列常量。
支持递归读取、支持文件名过滤。
支持文本压缩,支持无压缩、zip、gzip、bzip2。
多个 File 可以支持并发读取。

单个 File 多线程并发读取,此处涉及到单个 File 内部切分算法。
单个 File 在压缩情况下,从技术上无法支持多线程并发读取。

FTP Writer 实现了从数据集成协议转为 FTP 文件功能,FTP 文件本身是无结构化数据存储。目前 FTP Writer 支持的功能如下:
支持
不支持
支持且仅支持写入文本类型(不支持 BLOB,如视频数据)的文件,且要求文本中 schema 为一张二维表。
支持类 CSV 和 TEXT 格式的文件,自定义分隔符。
写出时不支持文本压缩。
支持多线程写入,每个线程写入不同子文件。

单个文件不能支持并发写入。
FTP 本身不提供数据类型,FTP Writer 均将数据以 STRING类型写入 FTP 文件。


FTP 离线单表读取节点配置




参数
说明
数据来源
选择当前项目中可用的 FTP 数据源。
同步方式
FTP 支持两种同步方式:
数据同步:解析结构化数据内容,按字段关系进行数据内容映射与同步。
文件传输:不做内容解析传输整个文件,可应用于非结构化数据同步。
注意:
文件传输仅支持来源端、目标端均为文件类型(COS/HDFS/SFTP/FTP)的数据源,且来源端、目标端同步方式均需要为文件传输。
文件路径
远程 FTP 文件系统的路径和文件名信息,需要填写包含路径和文件后缀的完整文件路径和文件名。这里可以支持填写多个路径。
当指定单个远程 FTP 文件,FTP 暂时只能使用单线程进行数据抽取。后期会在非压缩文件情况下针对单个 File 进行多线程并发读取。
当指定多个远程 FTP 文件,FTP 支持使用多线程进行数据抽取。线程并发数通过通道数指定。
当指定通配符,FTP 尝试遍历出多个文件信息。例如,指定/代表读取/目录下所有的文件,指定 /bazhen/ 代表读取 bazhen 目录下所有的文件。FTP 目前仅支持星号(*)作为文件通配符,并支持使用调度参数配合调度,灵活配置文件名与文件路径。
文件类型
支持两种文件类型:TXT、CSV。
TXT:表示 TextFile 文件格式。
CSV:表示普通 HDFS 文件格式(逻辑二维表)。
字段分隔符
读取的字段分隔符,FTP 在读取数据时,需要指定字段分隔符,如果不指定会默认为(,),界面配置也会默认填写(,)。
行分割符(选填)
linux 默认是 \\n , windows 默认是 \\r\\n,手填支持单个字符作为行分割符。
编码
读取文件的编码配置。支持 utf8 和 gbk 两种编码。
空值转换
读取时,将指定字符串转为 null。
文本压缩类型
支持无压缩、zip、gzip、bzip2
跳过表头
否:读取时,不跳过表头。
是:读取时,跳过表头。
高级设置(选填)
可根据业务需求配置参数。
关于文件路径说明:
通常不建议您使用星号(*),易导致任务运行报 JVM 内存溢出的错误。
数据同步会将一个作业下同步的所有 Text File 视作同一张数据表。您必须自己保证所有的 File 能够适配同一套 Schema 信息。
您必须保证读取文件为类 CSV 格式,并且提供给数据同步系统权限可读。
如果 Path 指定的路径下没有符合匹配的文件抽取,同步任务将报错。

FTP 离线单表写入节点配置




参数
说明
数据去向
选择当前项目中可用的 FTP 数据源。
同步方式
FTP 支持两种同步方式:
数据同步:解析结构化数据内容,按字段关系进行数据内容映射与同步。
文件传输:不做内容解析传输整个文件,可应用于非结构化数据同步。
注意:
文件传输仅支持来源端、目标端均为文件类型(COS/HDFS/SFTP/FTP)的数据源,且来源端、目标端同步方式均需要为文件传输。
文件路径
文件系统的路径信息。路径支持使用‘*’作为通配符,指定通配符后将遍历多个文件信息。
文件名称
写入的文件名称,该文件名会添加随机的后缀作为实际写入名称。
写入模式
FTP 支持三种写入模式:
append:写入前不做任何处理,直接使用 filename 写入,保证文件名不冲突 。
nonConflict:文件名重复时报错 。
overwrite:写入前清理以文件名为前缀的所有文件。
文件类型
支持两种文件类型:TXT、CSV。
TXT:表示 TextFile 文件格式。
CSV:表示普通 HDFS 文件格式(逻辑二维表)。
字段分隔符
写入的字段分隔符。FTP 写入时的字段分隔符,需要您保证与创建的 FTP 表的字段分隔符一致,否则无法在 FTP 表中查到数据。可选:' \\t ' 、' \\u001 ' 、' | '、' 空格 ' 、 ' ;' ' , '。
行分割符(选填)
linux 默认是 \\n , windows 默认是 \\r\\n,手填支持单个字符作为行分割符。
编码
写入文件的编码配置。支持 utf8 和 gbk 两种编码。
空值转换
写入时,将 null 转为指定字符串。
是否包含表头
否:写入时,不跳过表头。
是:写入时,跳过表头。
标记完成文件
空白的 .ok 文件,标志任务传输完成。
不生成:任务完成后,不生成空白的 .ok 文件。(默认选中)
生成:任务完成后,生成空白的 .ok 文件
标记完成文件名称
填写时需包含完整文件路径、名称、后缀,默认在目标路径下按照目标文件名称及后缀生成 .ok 文件。
支持在数据同步、文件传输模式下使用时间调度参数${yyyyMMdd}等;
支持在文件传输模式下使用内置变量${filetrans_di_src}代表源文件名称。
例如 /user/test/${filetrans_di_src}_${yyyyMMdd}.ok。
注意:
在文件传输模式下目标文件名称中,如果使用 ${filetrans_di_src} 引用源文件名称,并且标记完成文件按照目标文件名称及后缀生成 .ok 文件时,生成的标记完成文件个数取决于来源端的文件数量。
高级设置(选填)
可根据业务需求配置参数。

数据类型转换支持

FTP 实现了读取和写入 FTP 双向通道的功能,远程 FTP 文件本身是无结构化数据存储,数据处理引擎在读取和写入时自动转换为 Bytes 类型。

常见问题

1. ftp 写入任务报错:请确认...拥有目录 ls 权限,errorMessage:connect timed out

问题原因
集成连接 FTP 暂仅支持被动模式,可能是 FTP 服务配置未开启被动模式。
客户端连接到 FTP 服务器的21端口,发送用户名和密码登录,登录成功后要 list 列表或者读取数据时,使用的是另外的端口(1024以上),可能是传输数据端口未开放。
解决方案:
确认 FTP 服务器配置:
1. 是否开启了被动模式
# 配置文件
pasv_enable=YES #开启被动模式
pasv_min_port=${number} # 被动模式最小端口
pasv_max_port=${number} # 被动模式最大端口
2. 服务端是否开启上述范围的端口。
在 pod 中安装 lftp 命令:
# 登录213 拷贝apk包到pod中:
cd /data/home/ryanrliao
kube ${资源组}
kubectl cp ftp/lftp-4.8.3-r2.apk -n ${资源组}/${pod名称}:/data/wedata/runner
# 进入pod后安装:
sudo su
apk add --allow-untrusted --no-network lftp-4.8.3-r2.apk
# 使用命令连接
lftp -u ${用户名},'${密码}' -p ${端口} ${ip}
# 使用主动模式
lftp -e "set ftp:passive-mode 0" -u ${用户名},'${密码}' -p ${端口} ${ip}
# 也可以用来连接sftp
lftp sftp://${用户名}:${密码}@${ip}:${端口}

FTP 脚本 Demo

如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数writer 参数
"job": {
"syncType": "data", //同步方式,支持数据同步和文件传输
"content": [
{
"reader": {
"parameter": {
"nullFormat": "", //空值转换
"compress": "zip", //文件压缩类型
"column": [
"*"
],
"skipHeader": "false", //是否跳过表头
"fieldDelimiter": ",", //字段分割符
"encoding": "utf-8", //编码类型
"path": "/path/source", //来源文件路径
"protocol": "ftp",
"password": "******",
"port": 21,
"lineDelimiter": "\\n", //行分割符
"host": "ip",
"fileType": "text", //文件类型
"username": "ftpuser"
},
"name": "ftpreader"
},
"transformer": [],
"writer": {
"parameter": {
"fileName": "sinkFile", //目标文件名
"nullFormat": "null", //空值转换
"writeMode": "truncate", //写入模式
"suffix": "txt", //文件后缀
"encoding": "utf-8",
"fieldDelimiter": ",", //字段分割符
"path": "/path/sink", //目标文件路径
"protocol": "ftp",
"password": "******",
"port": 21,
"lineDelimiter": "\\n", //行分隔符
"host": "ip",
"fileType": "text", //文件类型
"username": "ftpuser"
},
"name": "ftpwriter"
}
}
],
"setting": {
"errorLimit": { //脏数据阈值
"record": 0
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}