前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >教育行业PyFlink整合FlinkML机器学习场景实践总结

教育行业PyFlink整合FlinkML机器学习场景实践总结

作者头像
用户9421738
发布2024-10-25 13:02:53
发布2024-10-25 13:02:53
15600
代码可运行
举报
文章被收录于专栏:大数据从业者
运行总次数:0
代码可运行

前言

本文主要记录教育行业高校PyFlink整合Flink ML的场景案例实践总结。PyFlink是可以使用Python语言开发Apache Flink的功能API,允许构建批或流任务、机器学习、ETL等场景,分为Table API和DataStreamAPI。

FlinkML类库提供机器学习API、简化构建机器学习流式管道的复杂度,支持Java、Python语言,提供分类、聚类、回归、推荐、特征工程等多种场景的默认实现。

Flink ML模块

源码编译

代码语言:javascript
代码运行次数:0
复制
git clone -b release-2.3.0 https://github.com/apache/flink-ml.git
cd flink-ml  &&  mvn clean package -DskipTests -Pflink-1.17

拷贝Jar到Flink部署目录

代码语言:javascript
代码运行次数:0
复制
cp flink-ml-dist/target/flink-ml-2.3.0-bin/flink-ml-2.3.0/lib/flink-ml-uber-1.17-2.3.0.jar /home/myHadoopCluster/flink-1.17.1/lib/
cp flink-ml-dist/target/flink-ml-2.3.0-bin/flink-ml-2.3.0/lib/statefun-flink-core-3.2.0.jar /home/myHadoopCluster/flink-1.17.1/lib/
cp flink-ml-dist/target/flink-ml-2.3.0-bin/flink-ml-2.3.0/lib/flink-ml-examples-1.17-2.3.0.jar /home/myHadoopCluster/flink-1.17.1/examples/

验证Java语言测试用例

代码语言:javascript
代码运行次数:0
复制
./flink run -t yarn-per-job -c org.apache.flink.ml.examples.clustering.KMeansExample ../examples/flink-ml-examples-1.17-2.3.0.jar

Python虚拟环境

Flink ML使用Python语言开发,需要特定版本:3.7或者3.8。如果你的集群环境Python版本不满足,建议使用如下脚本工具setup-pyflink-virtual-env.sh构建Python虚拟环境:

代码语言:javascript
代码运行次数:0
复制
set -e
# download miniconda.sh
sys_os=$(uname -s)
echo "Detected OS: ${sys_os}"
sys_machine=$(uname -m)
echo "Detected machine: ${sys_machine}"

if [[ ${sys_os} == "Darwin" ]]; then
    wget "https://repo.anaconda.com/miniconda/Miniconda3-py310_23.5.2-0-MacOSX-${sys_machine}.sh" -O "miniconda.sh"
elif [[ ${sys_os} == "Linux" ]]; then
    wget "https://repo.anaconda.com/miniconda/Miniconda3-py310_23.5.2-0-Linux-${sys_machine}.sh" -O "miniconda.sh"
else
    echo "Unsupported OS: ${sys_os}"
    exit 1
fi

# add the execution permission
chmod +x miniconda.sh

# create python virtual environment
./miniconda.sh -b -p venv

# activate the conda python virtual environment
source venv/bin/activate ""

# install PyFlink dependency
if [[ $1 = "" ]]; then
    # install the latest version of pyflink
    pip install apache-flink
else
    # install the specified version of pyflink
    pip install "apache-flink==$1"
fi

# deactivate the conda python virtual environment
conda deactivate

# remove the cached packages
rm -rf venv/pkgs

# package the prepared conda python virtual environment
zip -r venv.zip venv
代码语言:javascript
代码运行次数:0
复制
sh setup-pyflink-virtual-env.sh 1.17.1

脚本流程主要是先下载miniconda.sh构建Python虚拟环境venv、然后通过pip安装pyflink的依赖apache-flink、最后打成压缩包venv.zip。最终结果如下:

venv虚拟环境验证

代码语言:javascript
代码运行次数:0
复制
source venv/bin/activate  # 声明环境变量
python --version     # 查看Python版本号
conda deactivate    # 撤销环境变量

pyflink依赖验证(local模式)

代码语言:javascript
代码运行次数:0
复制
source venv/bin/activate
python /home/myHadoopCluster/flink-1.17.1/examples/python/datastream/word_count.py

PyFlink on Yarn实践

通常真实现场环境都是Pyflink提交作业到yarn集群,使用统一的资源管理。针对Python虚拟环境的使用,分为三种方法:

方法1:每个pyflink作业提交时自行上传venv.zip

将示例代码和venv.zip放置到特定目录,如:/tmp/myApp

代码语言:javascript
代码运行次数:0
复制
./flink run-application  -t yarn-application   -Dyarn.ship-files=/tmp/myApp/ -pyarch myApp/venv.zip -pyclientexec venv.zip/venv/bin/python3 -pyexec venv.zip/venv/bin/python3 -pyfs myApp -pym word_count --output hdfs:///tmp/       

方法2:提前上传venv.zip到hdfs供所有pyflink作业使用

代码语言:javascript
代码运行次数:0
复制
hdfs dfs -put ven.zip hdfs:///flink/
代码语言:javascript
代码运行次数:0
复制
./flink run-application  -t yarn-application -Dyarn.ship-files=/home/myHadoopCluster/flink-1.18.1/examples/python/datastream/  -pyarch hdfs:///flink/venv.zip  -pyclientexec venv.zip/venv/bin/python3 -pyexec venv.zip/venv/bin/python3 -pyfs datastream -pym word_count --output hdfs:///tmp    

方法3:提前在每个yarn集群节点本地放置相同路径venv.zip解压文件夹

代码语言:javascript
代码运行次数:0
复制
./flink run-application  -t yarn-application -pyclientexec /tmp/venv/bin/python3 -pyexec /tmp/venv/bin/python3 -Dyarn.ship-files=/home/myHadoopCluster/flink-1.18.1/examples/python/datastream/word_count.py -py word_count.py   --output hdfs:///tmp/

推荐使用方法3,性能最好。但是,需要注意一点:yarn集群节点扩容时,新节点需要部署相同venv目录即可!

总结

本文记录如何使用conda构建Python虚拟环境、如何使用PyFlink整合使用FlinkML类库。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-10-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据从业者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Python虚拟环境
  • PyFlink on Yarn实践
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档