
Submitting Flink Jobs from Code

码客说 · Published 2023-01-08 11:15:46

Preface

This article uses Flink 1.12.7.

Submitting a job from code

Prepare the directories and files

hadoop fs -mkdir -p /jar/userTask
hadoop fs -mkdir -p /jar/flink12/libdist
hadoop fs -mkdir -p /jar/flink12/lib

Copy the required files

hadoop fs -put $FLINK_HOME/examples/batch/WordCount.jar /jar/userTask/WordCount.jar
hadoop fs -put $FLINK_HOME/lib/flink-dist_2.12-1.12.7.jar /jar/flink12/libdist/flink-dist_2.12-1.12.7.jar
hadoop fs -put $FLINK_HOME/lib/* /jar/flink12/lib/

To browse the files, open the HDFS explorer at either address:

http://hadoop01:50070/explorer.html#/

http://hadoop02:50070/explorer.html#/

Test it on the server first:

flink run-application -t yarn-application hdfs://hacluster/jar/userTask/WordCount.jar --output hdfs://hacluster/bigdata_study/output03
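For reference, the same command line can be assembled programmatically. The sketch below is a hypothetical helper (`FlinkRunCommand` is not part of any Flink API); it simply joins the arguments used above:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): assembles the
// `flink run-application` command line as a single string.
public class FlinkRunCommand {
    public static String build(String target, String jarPath, List<String> jobArgs) {
        List<String> parts = new ArrayList<>();
        parts.add("flink");
        parts.add("run-application");
        parts.add("-t");
        parts.add(target);       // e.g. yarn-application
        parts.add(jarPath);      // user jar, local or hdfs://
        parts.addAll(jobArgs);   // arguments passed to the job's main()
        return String.join(" ", parts);
    }

    public static void main(String[] args) {
        System.out.println(build(
                "yarn-application",
                "hdfs://hacluster/jar/userTask/WordCount.jar",
                List.of("--output", "hdfs://hacluster/bigdata_study/output03")));
    }
}
```

Handing such a string to the SSH helpers shown later in this article is one way to trigger the submission remotely.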

Add the dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-yarn_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

Code

package cn.psvmc;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

public class RunFlinkJob {
    public static void main(String[] args) {
        // Local Flink configuration directory, used to load Flink's configuration.
        // If you hit "org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function."
        // add the following to flink-conf.yaml:
        // classloader.resolve-order: parent-first
        String configurationDirectory = "/data/tools/bigdata/flink-1.12.7/conf";

        // HDFS directory that holds the Flink cluster jars
        String flinkLibs = "hdfs://hacluster/jar/flink12/lib";
        // User jar
        String userJarPath = "hdfs://hacluster/jar/userTask/WordCount.jar";
        String flinkDistJar = "hdfs://hacluster/jar/flink12/libdist/flink-dist_2.12-1.12.7.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/yarn-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/hdfs-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/core-site.xml"));
        YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        // Log-related setup; without this the logs are not visible
        YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
                .create(yarnClient);

        // Load the Flink configuration
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
                configurationDirectory
        );

        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

        flinkConfiguration.set(
                PipelineOptions.JARS,
                Collections.singletonList(userJarPath)
        );

        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString())
        );

//        flinkConfiguration.set(
//                YarnConfigOptions.FLINK_DIST_JAR,
//                flinkDistJar
//        );

        // Use APPLICATION deployment mode
        flinkConfiguration.set(
                DeploymentOptions.TARGET,
                YarnDeploymentTarget.APPLICATION.getName()
        );

        // yarn application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "zApplication");

//        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
//        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));


        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

        ClusterSpecification clusterSpecification = new ClusterSpecification
                .ClusterSpecificationBuilder()
                .createClusterSpecification();

        // Set the user jar's arguments and main class
//        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, "org.apache.flink.examples.java.wordcount.WordCount");
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true
        );

        try {
            ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                    clusterSpecification,
                    appConfig
            );

            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

            ApplicationId applicationId = clusterClient.getClusterId();
            String webInterfaceURL = clusterClient.getWebInterfaceURL();
            System.out.println("applicationId is " + applicationId);
            System.out.println("webInterfaceURL is " + webInterfaceURL);

            // To tear the cluster down:
            // yarnClusterDescriptor.killCluster(applicationId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
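After deployment, the returned webInterfaceURL can be polled through Flink's monitoring REST API (GET /jobs on the JobManager lists job IDs and statuses). The sketch below uses only the JDK; the local stub server is an assumption standing in for a real JobManager so the example runs on its own:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class RestPollDemo {
    // Fetches the JSON returned by the JobManager's GET /jobs endpoint.
    public static String fetchJobs(String webInterfaceURL) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(webInterfaceURL + "/jobs")).build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        // Stub JobManager returning a canned /jobs response (demo assumption).
        HttpServer stub = HttpServer.create(new InetSocketAddress(0), 0);
        byte[] body = "{\"jobs\":[{\"id\":\"abc\",\"status\":\"RUNNING\"}]}"
                .getBytes(StandardCharsets.UTF_8);
        stub.createContext("/jobs", exchange -> {
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        stub.start();
        String url = "http://localhost:" + stub.getAddress().getPort();
        System.out.println(fetchJobs(url));
        stub.stop(0);
    }
}
```

Against a real deployment you would pass `clusterClient.getWebInterfaceURL()` to `fetchJobs` instead of the stub's address.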

Check YARN

http://hadoop02:8088/cluster

Executing a script remotely

package cn.psvmc;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.sun.istack.logging.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ConnectionSSH {
  private static final Logger logger = Logger.getLogger(ConnectionSSH.class);
  public static void main(String[] args) throws JSchException, IOException {
    JSch jsch = new JSch();
    String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
    jsch.addIdentity(pubKeyPath);
    String username = "root";
    String host = "192.168.7.101";
    Session session = jsch.getSession(username, host, 22); // prepare the session
    session.setConfig("StrictHostKeyChecking", "no");
    session.connect();
    String command = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
    ChannelExec channel=(ChannelExec)session.openChannel("exec");
    channel.setCommand(command);
    BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
    channel.connect();

    String msg;
    while((msg = in.readLine()) != null){
      System.out.println(msg);
    }
    channel.disconnect();
    session.disconnect();
  }
}

Using a password

JSch jsch = new JSch();
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the session
session.setConfig("StrictHostKeyChecking", "no");
session.setPassword("zhangjian");
session.connect();

Using a key

JSch jsch = new JSch();
String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the session
session.setConfig("StrictHostKeyChecking", "no");
session.connect();

Executing a script remotely, take 2

Besides running commands, this class can also copy files.

Dependency:

<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build210</version>
</dependency>

Utility class

package cn.psvmc;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import java.io.*;
/**
 * Connects to a Linux server and executes shell commands.
 */
public class ConnectLinuxCommand {
    private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);

    private static final String DEFAULTCHARTSET = "UTF-8";
    private static Connection conn;

    /**
     * @Title: login
     * @Description: Log in to the remote Linux server with username and password
     * @return: Boolean
     */
    public static Boolean login(RemoteConnect remoteConnect) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();// connect
            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword());// authenticate
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
        boolean flag = true;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();// connect
            boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
            System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
            logger.info("Authentication succeeded!");
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect connection info object
     * @param keyFile       a File pointing to a file containing the user's DSA or RSA private key in OpenSSH key format (PEM; the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----" tag must not be missing)
     * @param keyfilePass   passphrase used to decrypt the key file if it is encrypted; may be null otherwise
     * @return Boolean
     * @Title: loginByKey
     * @Description: Log in to the remote Linux server with a key file
     */
    public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect connection info object
     * @param keys          a char[] containing the user's DSA or RSA private key in OpenSSH key format (the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----" tag must not be missing); the array may contain newlines
     * @param keyPass       passphrase used to decrypt the key if it is encrypted; may be null otherwise
     * @return Boolean
     * @Title: loginByCharsKey
     * @Description: Log in to the remote Linux server with a key
     */
    public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param cmd shell command
     * @Title: execute
     * @Description: Execute a shell script or command remotely
     * @return: result of the command once it completes
     */
    public static String runCmd(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession();// open a session
            session.execCommand(cmd);// run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            // If stdout is blank, the command failed, so read stderr instead
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULTCHARTSET);
            }
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @return String the command's output on success; an empty string (not null) on failure
     * @Title: executeSuccess
     * @Description: Execute a shell script or command remotely
     */
    public static String runCmdSuccess(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession();// open a session
            session.execCommand(cmd);// run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @param in      input stream
     * @param charset encoding
     * @return String the stream contents as plain text
     * @Title: processStdout
     * @Description: Parse the output of a remote command
     */
    public static String processStdout(InputStream in, String charset) {
        InputStream stdout = new StreamGobbler(in);
        StringBuilder buffer = new StringBuilder();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line = null;
            while ((line = br.readLine()) != null) {
                buffer.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }
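The reading loop in processStdout can be exercised without any SSH session; the standalone sketch below mirrors its logic with plain JDK streams (the ganymed StreamGobbler wrapper is left out):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Standalone version of processStdout's loop: reads a stream line by line
// and rejoins the lines with '\n'.
public class StdoutDemo {
    public static String readAll(InputStream in, String charset) throws IOException {
        StringBuilder buffer = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(in, charset))) {
            String line;
            while ((line = br.readLine()) != null) {
                buffer.append(line).append("\n");
            }
        }
        return buffer.toString();
    }

    public static void main(String[] args) throws IOException {
        InputStream fake = new ByteArrayInputStream(
                "Job has been submitted with JobID abc123".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(fake, "UTF-8"));
    }
}
```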

    /**
     * @return String
     * @Description: Connect to the Linux server with username and password and run a command
     */
    public static String runCmd(String ip, String userName, String password, String commandStr) {
        logger.info(
                "ConnectLinuxCommand  scpGet===" +
                        "ip:" + ip +
                        "  userName:" + userName +
                        "  commandStr:" + commandStr
        );

        String returnStr = "";
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        try {
            if (login(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnStr;
    }

    public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
        logger.info("ConnectLinuxCommand  scpGet===" + "ip:" + ip + "  userName:" + userName + "  commandStr:"
                + commandStr);

        String returnStr = "";
        boolean result = true;
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        try {
            if (loginWithoutPwd(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    /**
     * @param password   password (remote server)
     * @param remoteFile file location (remote server)
     * @param localDir   local directory on this machine
     * @Title: scpGet
     * @Description: Pull a file from a remote server into a local directory
     */
    public static void scpPull(String ip, String userName, String password, String remoteFile, String localDir)
            throws IOException {

        logger.info("ConnectLinuxCommand  scpGet===" + "ip:" + ip + "  userName:" + userName + "  remoteFile:"
                + remoteFile + "  localDir:" + localDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.get(remoteFile, localDir);
            conn.close();
        }
    }


    /**
     * Copies a file to another machine.
     * @param ip remote IP
     * @param userName remote user name
     * @param password remote password
     * @param localFile local file
     * @param remoteDir remote directory
     * @throws IOException on failure
     */
    public static void scpPush(String ip, String userName, String password, String localFile, String remoteDir)
            throws IOException {
        logger.info("ConnectLinuxCommand  scpPut===" + "ip:" + ip + "  userName:" + userName + "  localFile:"
                + localFile + "  remoteDir:" + remoteDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.put(localFile, remoteDir);
            conn.close();
        }
    }
}

RemoteConnect

public class RemoteConnect {
    String ip;
    String userName;
    String password;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Test

package cn.psvmc;

public class CLCTest {
    public static void main(String[] args) {
        mTest1();
    }

    public static  void mTest1() {
        System.out.println("--------------------------------------");
        String commandStr="flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
        String result=ConnectLinuxCommand.runCmd("192.168.7.101","root","zhangjian",commandStr);
        System.out.println("Result: " + result);
        System.out.println("--------------------------------------");
    }

    public static void mTest2() {
        try {
            ConnectLinuxCommand.scpPull("192.168.7.101","root","zhangjian", "/root/test.txt", "d:/aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void mTest3() {
        try {
            ConnectLinuxCommand.scpPush("192.168.7.101","root","zhangjian", "d:/aa/test2.txt", "/root/");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Originally published 2023-01-04 on the author's personal site/blog.