Flink Series (3): Setting Up a Flink Environment on Kubernetes

Author: yiduwangkai
Published 2019-09-17 15:57:38
Included in the column: 大数据进阶 (Advanced Big Data)

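Earlier posts covered some of Flink's basic components but never the environment setup, so this post walks through a basic deployment.

1. Why use a StatefulSet

For Flink, the main reason to use a StatefulSet (sts) is that pod hostnames are ordered. The potential benefits are:
- Pods whose hostnames end in -0 and -1 can be designated as JobManagers directly.
- A single StatefulSet can start a whole cluster, whereas Deployments need two: separate Deployments for the JobManager and the TaskManager.
- When a pod fails for any reason, the StatefulSet restarts it with the same hostname, so cluster recovery can in theory be faster than with a Deployment (whose pod hostnames are random every time).

2. Deploying Flink with a StatefulSet

2.1 The Docker entrypoint

Because the role (JobManager or TaskManager) is decided by hostname, the entrypoint has to check whether the pod's hostname matches one of the configured JobManager hostnames. With the arguments "cluster ha", the role to start is chosen automatically from the hostname; a role name can also be passed directly. The content of docker-entrypoint.sh is as follows: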

#!/bin/bash
# bash (not plain sh) is required: the script uses arrays and ${var//,/ } substitution

# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD="$1"
# In cluster mode, the pods listed in JOB_MANGER_HOSTS (e.g. <sts>-0, <sts>-1) run as jobmanagers
if [ "${ACTION_CMD}" == "cluster" ]; then
  # Split the comma-separated host list into an array
  jobmanagers=(${JOB_MANGER_HOSTS//,/ })
  ACTION_CMD="taskmanager"
  for jm in "${jobmanagers[@]}"
  do
      if [ "$(hostname -s)" == "${jm}" ]; then
          ACTION_CMD="jobmanager"
          echo "pod hostname match jobmanager config host, change action to jobmanager."
      fi
  done
fi
 
# In HA mode, replace the HA placeholders in flink-conf.yaml
if [ "$2" == "ha" ]; then
  sed -i -e "s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
  sed -i -e "s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g" "$FLINK_CONF_DIR/flink-conf.yaml"
fi
 
if [ "${ACTION_CMD}" == "help" ]; then
    echo "Usage: $(basename "$0") (cluster ha|jobmanager|taskmanager|local|help)"
    exit 0
elif [ "${ACTION_CMD}" == "jobmanager" ]; then
    JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
    echo "Starting Job Manager"
    sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    # Fall back to 1024 so the heap key is never written with an empty value
    sed -i -e "s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB:-1024}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
 
    # Print the effective configuration (skip empty lines and comments)
    echo "config file: " && grep -E '^[^#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground cluster
 
elif [ "${ACTION_CMD}" == "taskmanager" ]; then
    # Default the slot count to the number of visible processors
    TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
    echo "Starting Task Manager"
 
    sed -i -e "s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB:-1024}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
    sed -i -e "s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}/g" "$FLINK_CONF_DIR/flink-conf.yaml"
 
    echo "config file: " && grep -E '^[^#]' "$FLINK_CONF_DIR/flink-conf.yaml"
    exec "$FLINK_HOME/bin/taskmanager.sh" start-foreground
elif [ "${ACTION_CMD}" == "local" ]; then
    echo "Starting local cluster"
    exec "$FLINK_HOME/bin/jobmanager.sh" start-foreground local
fi
 
# Any other argument list is run as a plain command (e.g. "sleep 1d" for debugging)
exec "$@"
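
The array handling in the cluster branch is the part of the script that depends on bash features. As a hedged sketch, the same hostname-to-role decision can be written in POSIX sh, which also makes it testable on its own. The function name resolve_role and its two-argument interface are hypothetical, not part of the original entrypoint.

```shell
#!/bin/sh
# Hypothetical helper: decide the Flink role for a pod.
# $1 = short hostname of the pod (e.g. the output of "hostname -s")
# $2 = comma-separated JobManager hosts (the JOB_MANGER_HOSTS value)
# Prints "jobmanager" if $1 is in the list, otherwise "taskmanager".
resolve_role() {
    host="$1"
    hosts="$2"
    # Wrap both sides in commas so "flink-cluster-1" cannot match "flink-cluster-10"
    case ",${hosts}," in
        *",${host},"*) echo "jobmanager" ;;
        *)             echo "taskmanager" ;;
    esac
}
```

In the entrypoint, ACTION_CMD=$(resolve_role "$(hostname -s)" "$JOB_MANGER_HOSTS") would replace the array loop. The comma wrapping keeps the comparison exact, like the original equality test, while avoiding arrays.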

2.2 Distributing the HDFS and Flink configuration files with ConfigMaps

For an introduction to ConfigMaps, see: https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files

Q: Why use a ConfigMap?
A: Hadoop configuration files differ between environments, so they are awkward to bake into the image. That leaves two reasonable options: a ConfigMap, or a Pod InitContainer. An InitContainer could wget a remote configuration file, but that adds a dependency on a configuration service. By comparison, a ConfigMap is simpler.

Creating the ConfigMaps:

[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml
[root@rc-mzgjg ~]# kubectl create configmap flink-conf --from-file=flink-conf/log4j-console.properties --from-file=flink-conf/flink-conf.yaml

The describe command shows the ConfigMap named hdfs-conf and prints the file contents to the console:

[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name:         hdfs-conf
Namespace:    default
Labels:       <none>
Annotations:  <none>

Data
====
core-site.xml:

Using the ConfigMap through volumeMounts

For a Pod's container to use the configuration files, mount them into the container via volumeMounts. The demo below mounts the configuration files into the /home/xxxx/conf/hadoop directory:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-jm
spec:
  selector:
    matchLabels:
      app: flink-jm
  serviceName: flink-jm
  replicas: 2
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-jm
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: test
        imagePullPolicy: Always
        image: ip:5000/test:latest
        args: ["sleep", "1d"]
        volumeMounts:
        - name: hdfs-conf
          mountPath: /home/xxxx/conf/hadoop
      volumes:
      - name: hdfs-conf
        configMap:
        # Provide the name of the ConfigMap containing the files you want to add to the container
          name: hdfs-conf
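
Since Flink cannot start without these files, it can help to fail fast inside the container when a mount is missing. A minimal sketch, assuming the mount path and file names used in this article; check_conf_files is a hypothetical helper. ConfigMap entries appear as symlinks into ..data, so the check uses [ -f ], which follows symlinks and therefore reports a dangling link as missing.

```shell
#!/bin/sh
# Hypothetical helper: verify that every expected config file exists in a directory.
# $1 = mount directory (e.g. /home/xxxx/conf/hadoop), remaining args = file names.
# [ -f ] follows symlinks, so ConfigMap links of the form "file -> ..data/file"
# pass, while a dangling link is reported as missing.
check_conf_files() {
    dir="$1"
    shift
    missing=0
    for f in "$@"; do
        if [ ! -f "${dir}/${f}" ]; then
            echo "missing config file: ${dir}/${f}" >&2
            missing=1
        fi
    done
    return "$missing"
}
```

For example, check_conf_files /home/xxxx/conf/hadoop core-site.xml hdfs-site.xml || exit 1 near the top of the entrypoint would stop the pod with a clear message instead of a later Hadoop client error.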

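After the Pod is created, check the mounted configuration files:

[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml -> ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml -> ..data/hdfs-site.xml

The configuration files are symlinks pointing into the ..data directory.

Namespace sharing between containers in a Pod requires Kubernetes 1.10

The initial idea was to run several containers in one Pod, with one of them carrying the configuration files. Testing showed that the containers cannot access each other's data directories. The intended configuration, in which one container's image holds the hdfs-conf configuration files, was: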

containers:
     - name: hdfs-conf
       imagePullPolicy: Always
       image: ip:5000/hdfs-dev:2.6
       args: ["sleep", "1d"]
     - name: flink-jm
       imagePullPolicy: Always
       image: ip:5000/flink:1.4.2

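In practice, the two containers share only the network; their file directories are invisible to each other (files can only be shared by mounting a common volume, such as an emptyDir, into both containers). "Share Process Namespace between Containers in a Pod" is only supported starting with Kubernetes 1.10; see https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/

2.3 StatefulSet configuration

The Flink and Hadoop configuration files are distributed via ConfigMaps. The environment variables used by the StatefulSet are: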

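| Environment variable | Parameter (source) | Example value | Description |
|---|---|---|---|
| FLINK_CLUSTER_IDENT | namespace/StatefulSet.name | default/flink-cluster | Root path for the ZooKeeper HA settings and the HDFS checkpoint directory |
| FLINK_ZK_QUORUM | env: FLINK_ZK_QUORUM | ip:2181 | Address of the HA ZooKeeper ensemble |
| JOB_MANAGER_HEAP_MB | env: JOB_MANAGER_HEAP_MB, value: containers.resources.memory.limits minus 1024 (MiB) | 512 | JobManager heap size. Netty also uses off-heap memory, so the heap must stay below containers.resources.memory.limits, otherwise the container is easily OOM-killed |
| JOB_MANGER_HOSTS | StatefulSet.name-0,StatefulSet.name-1 | flink-cluster-0,flink-cluster-1 | JobManager hostnames; short hostnames are enough, no FQDN needed |
| TASK_MANAGER_HEAP_MB | env: TASK_MANAGER_HEAP_MB, value: containers.resources.memory.limits minus 1024 (MiB) | 512 | TaskManager heap size. Netty also uses off-heap memory, so the heap must stay below containers.resources.memory.limits, otherwise the container is easily OOM-killed |
| TASK_MANAGER_NUMBER_OF_TASK_SLOTS | containers.resources.cpu.limits | 2 | Number of TaskManager slots, set according to resources.cpu.limits |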
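
The heap and slot variables above are derived from the container's resource limits: heap = memory limit minus 1024 MiB of headroom, and slots = CPU limit. A minimal sketch of that arithmetic, assuming the memory limit is given in Mi and the CPU limit either as whole cores ("2") or millicores ("1500m"); the function names are hypothetical, and rounding millicores down with a floor of one slot is an assumption of this sketch, not something the setup specifies.

```shell
#!/bin/sh
# Hypothetical helpers mirroring the table: derive Flink settings from pod limits.

# $1 = memory limit in MiB, e.g. 1536 for "1536Mi".
# Leaves 1024 MiB of headroom for Netty off-heap memory and JVM overhead.
heap_mb_from_limit() {
    heap=$(( $1 - 1024 ))
    if [ "$heap" -le 0 ]; then
        echo "memory limit $1 MiB leaves no room for the heap" >&2
        return 1
    fi
    echo "$heap"
}

# $1 = CPU limit, either whole cores ("2") or millicores ("1500m").
# Millicores are rounded down, with a minimum of one slot (assumption).
slots_from_cpu_limit() {
    case "$1" in
        *m) slots=$(( ${1%m} / 1000 )) ;;
        *)  slots=$1 ;;
    esac
    if [ "$slots" -lt 1 ]; then
        slots=1
    fi
    echo "$slots"
}
```

With the limits used in the manifest (memory 1536Mi, cpu 2) these give a heap of 512 and 2 slots, matching the example values in the table.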

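Pod imagePullPolicy: the test environment uses Always, so the image is pulled every time, which makes verification easy; production uses IfNotPresent. If an image changes in production, its tag must be changed too, because with IfNotPresent a node that already holds the old image under the same tag will not pull the new one. The complete manifest is as follows: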

# headless service for statefulset
apiVersion: v1
kind: Service
metadata:
  name: flink-cluster
  labels:
    app: flink-cluster
spec:
  clusterIP: None
  ports:
    - port: 8080
      name: ui
  selector:
    app: flink-cluster
---
# create flink statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-cluster
spec:
  selector:
    matchLabels:
      app: flink-cluster
  serviceName: flink-cluster
  replicas: 4
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: flink-cluster
    spec:
      terminationGracePeriodSeconds: 2
      containers:
      - name: flink-cluster
        imagePullPolicy: Always
        image: ip:5000/flink:1.4.2
        args: ["cluster", "ha"]
        volumeMounts:
          - name: hdfs-conf
            mountPath: /home/xxxx/conf/hadoop
          - name: flink-conf
            mountPath: /home/xxxx/conf/flink
          - name: flink-log
            mountPath: /home/xxxx/logs
        resources:
          requests:
            memory: "1536Mi"
            cpu: 1
          limits:
            memory: "1536Mi"
            cpu: 2
        env:
        - name: JOB_MANGER_HOSTS
          value: "flink-cluster-0,flink-cluster-1"
        - name: FLINK_CLUSTER_IDENT
          value: "default/flink-cluster"
        - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
          value: "2"
        - name: FLINK_ZK_QUORUM
          value: "ip:2181"
        - name: JOB_MANAGER_HEAP_MB
          value: "512"
        - name: TASK_MANAGER_HEAP_MB
          value: "512"
        ports:
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8080
          name: flink-ui
      volumes:
        - name: hdfs-conf
          configMap:
          # Provide the name of the ConfigMap containing the files you want to add to the container
            name: hdfs-conf
        - name: flink-conf
          configMap:
            name: flink-conf
        - name: flink-log
          hostPath:
            # directory location on host
            path: /tmp
            # this field is optional
            type: Directory
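
Because the StatefulSet is bound to the headless Service flink-cluster, each pod gets a stable DNS name of the form <pod>.<service>.<namespace>.svc.<cluster-domain>. This is typically what hostname -f returns inside the JobManager pods, and therefore what the entrypoint writes into jobmanager.rpc.address. A small sketch that lists these names, assuming the default cluster domain cluster.local (a cluster may be configured with a different one); sts_pod_fqdns is a hypothetical helper.

```shell
#!/bin/sh
# Hypothetical helper: print the stable DNS names of a StatefulSet's pods.
# $1 = StatefulSet name, $2 = headless Service name, $3 = namespace,
# $4 = replica count, $5 = cluster domain (defaults to cluster.local; an assumption).
sts_pod_fqdns() {
    sts="$1"; svc="$2"; ns="$3"; replicas="$4"; domain="${5:-cluster.local}"
    i=0
    while [ "$i" -lt "$replicas" ]; do
        echo "${sts}-${i}.${svc}.${ns}.svc.${domain}"
        i=$((i + 1))
    done
}
```

For this manifest, sts_pod_fqdns flink-cluster flink-cluster default 4 prints four names; the first two belong to the JobManager pods listed in JOB_MANGER_HOSTS.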

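3. Exposing the Flink UI outside the cluster in the test environment

The test environment uses Flannel for pod networking, so the Flink UI's IP and port cannot be reached from outside the K8S cluster. A NodePort Service is therefore used to map the internal address out. The configuration is as follows: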

# only for test k8s cluster
# use service to expose flink jobmanager 0's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  name: flink-web-0
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-0
  type: NodePort
---
# use service to expose flink jobmanager 1's web port
apiVersion: v1
kind: Service
metadata:
  labels:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  name: flink-web-1
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: flink-cluster
    statefulset.kubernetes.io/pod-name: flink-cluster-1
  type: NodePort
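
The two Services above differ only in the pod index. As an illustrative sketch, a shell loop with a here-document can emit one Service per JobManager, so exposing another JobManager means changing a single argument list. The function name gen_web_services is hypothetical; piping its output into kubectl apply -f - is one way to use it.

```shell
#!/bin/sh
# Hypothetical generator: print one NodePort Service per JobManager pod index.
# $1 = StatefulSet name, remaining args = pod indexes to expose.
# Output is a multi-document YAML stream (documents separated by ---).
gen_web_services() {
    sts="$1"
    shift
    first=1
    for idx in "$@"; do
        # Separator before every document except the first
        [ "$first" -eq 1 ] || echo "---"
        first=0
        cat <<EOF
apiVersion: v1
kind: Service
metadata:
  labels:
    app: ${sts}
    statefulset.kubernetes.io/pod-name: ${sts}-${idx}
  name: flink-web-${idx}
  namespace: default
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app: ${sts}
    statefulset.kubernetes.io/pod-name: ${sts}-${idx}
  type: NodePort
EOF
    done
}
```

For example: gen_web_services flink-cluster 0 1 | kubectl apply -f - produces the same two Services as the manifest above.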

4. Deployment status

After the steps above, check the current StatefulSet status:

[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME            DESIRED   CURRENT   AGE   CONTAINERS      IMAGES
flink-cluster   4         4         1h    flink-cluster   ip:5000/flink:1.4.2

Pod status:

[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster -o wide
NAME              READY   STATUS    RESTARTS   AGE   IP    NODE
flink-cluster-0   1/1     Running   0          1h    ip1   ip5
flink-cluster-1   1/1     Running   0          1h    ip2   ip6
flink-cluster-2   1/1     Running   0          1h    ip3   ip7
flink-cluster-3   1/1     Running   0          1h    ip4   ip8

Related Service information:

[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster -o wide
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE   SELECTOR
flink-cluster   ClusterIP   None             <none>        8080/TCP         2h    app=flink-cluster
flink-web-0     NodePort    10.254.8.103     <none>        8080:30495/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1     NodePort    10.254.172.158   <none>        8080:30158/TCP   1h    app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1

Based on this Service information, the Flink UI can be reached at the IP address of any k8s node plus the NodePort (for example 30495 for flink-web-0).
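
To build the UI address without reading the table by eye, the node port can be extracted from the PORT(S) column. A minimal sketch: node_port_of and flink_ui_url are hypothetical helpers that parse entries such as 8080:30495/TCP. Alternatively, kubectl can print the field directly with kubectl get svc flink-web-0 -o jsonpath='{.spec.ports[0].nodePort}'.

```shell
#!/bin/sh
# Hypothetical helper: extract the node port from a kubectl PORT(S) entry.
# "8080:30495/TCP" -> "30495". Fails (prints nothing) if there is no node port.
node_port_of() {
    case "$1" in
        *:*/*)
            rest="${1#*:}"        # "30495/TCP"
            echo "${rest%%/*}"    # "30495"
            ;;
        *)
            return 1
            ;;
    esac
}

# Hypothetical helper: build the Flink UI URL from a node IP and a PORT(S) entry.
flink_ui_url() {
    port=$(node_port_of "$2") || return 1
    echo "http://$1:${port}"
}
```

For example, flink_ui_url <node-ip> 8080:30495/TCP gives the address of the UI served by flink-cluster-0.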

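One problem worth mentioning came up during setup, and it was permission related. The error log was:

ERROR setFile(null,true) call failed FileNotFoundException: no such file or directory

Cause: the Flink service lacked permission on the log directory.

Fix:
1. adduser flink (add the corresponding user)
2. chown -R flink:flink /home/xxxx/logs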
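
Because this failure only surfaces as a log4j FileNotFoundException, a startup check that names the directory and user makes it easier to diagnose. A minimal sketch, assuming the log directory /home/xxxx/logs from the manifest; ensure_log_dir is a hypothetical helper, and it only reports the problem rather than performing the adduser/chown fix itself.

```shell
#!/bin/sh
# Hypothetical startup check: make sure the log directory exists and is writable
# by the current user before Flink starts, and say clearly what is wrong if not.
# $1 = log directory (e.g. /home/xxxx/logs, the hostPath mount in the manifest).
ensure_log_dir() {
    dir="$1"
    if ! mkdir -p "$dir" 2>/dev/null; then
        echo "cannot create log directory $dir as user $(id -un)" >&2
        return 1
    fi
    if [ ! -w "$dir" ]; then
        echo "log directory $dir is not writable by $(id -un); fix ownership, e.g. chown -R $(id -un) $dir" >&2
        return 1
    fi
}
```

Calling ensure_log_dir /home/xxxx/logs || exit 1 before the exec lines in the entrypoint would turn the log4j error into a message that points at the directory and user involved.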

This article is part of the Tencent Cloud self-media sync program and was shared from the author's personal site/blog.