1. HDFS部署 (k8s)

hdfs的作用是用户保存flink的检查点与保持点
但是大数据集群目前还是建议单独使用CDH或者HDP部署
目前文中的这种hdfs on k8s方式做高可用不太方便

1.1 配置文件

hdfs-conf.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: dev-flink-plat
  name: kube-hadoop-conf
data:
  HDFS_MASTER_SERVICE: hadoop-hdfs-master
  HDOOP_YARN_MASTER: hadoop-yarn-master

hdfs-service.yaml

apiVersion: v1
kind: Service
metadata:
  namespace: dev-flink-plat
  name: hadoop-hdfs-master
spec:
  type: NodePort
  selector:
    app: hdfs-master
  ports:
    - name: rpc
      port: 9000
      targetPort: 9000
    - name: http
      port: 50070
      targetPort: 50070
      nodePort: 32007

hdfs-namenode.yaml

没有配置secondary-namenode,而是通过ReplicationController的replicas来保证namenode的副本数

apiVersion: v1
kind: ReplicationController
metadata:
  namespace: dev-flink-plat
  name: hdfs-master
  labels:
    app: hdfs-master
spec:
  replicas: 1
  selector:
    name: hdfs-master
  template:
    metadata:
      labels:
        name: hdfs-master
    spec:
      containers:
        - name: hdfs-master
          image: kubeguide/hadoop:latest
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9000
            - containerPort: 50070
          env:
            - name: HADOOP_NODE_TYPE
              value: namenode
            - name: HDFS_MASTER_SERVICE
              valueFrom:
                configMapKeyRef:
                  name: kube-hadoop-conf
                  key: HDFS_MASTER_SERVICE
            - name: HDOOP_YARN_MASTER
              valueFrom:
                configMapKeyRef:
                  name: kube-hadoop-conf
                  key: HDOOP_YARN_MASTER
      restartPolicy: Always

hdfs-datanode.yaml

apiVersion: v1
kind: Pod
metadata:
    namespace: dev-flink-plat
    name: hadoop-datanode
    labels:
      app: hadoop-datanode
spec:
  containers:
    - name: hadoop-datanode
      image: kubeguide/hadoop:latest
      imagePullPolicy: IfNotPresent
      ports:
        - containerPort: 9000
        - containerPort: 50070    
      env:
        - name: HADOOP_NODE_TYPE
          value: datanode
        - name: HDFS_MASTER_SERVICE
          valueFrom:
            configMapKeyRef:
              name: kube-hadoop-conf
              key: HDFS_MASTER_SERVICE
        - name: HDOOP_YARN_MASTER
          valueFrom:
            configMapKeyRef:
              name: kube-hadoop-conf
              key: HDOOP_YARN_MASTER        
  restartPolicy: Always

2. Flink部署 (k8s)

2.1 Flink部署方式

  1. flink standalone
  2. flink on yarn
  3. flink on k8s

我们采用的是flink on k8s的部署方式

2.2 flink on k8s 部署方案

Flink 选择 Kubernetes 的主要原因是结合 Flink 和 Kubernetes 的长稳性。
① Flink 特性:提供的实时服务是需要长时间、稳定地运行,常应用于电信网络质量监控、实时风控、实时推荐等稳定性要求较高的场景;
② Kubernetes 优势:为应用提供了部署、管理能力,同时保证其稳定运行。Kubernetes 具有很好的生态,可以集成各种运维工具,例如 prometheus、主流日志采集工具等。Kubernetes 具有很好的扩缩容机制,可以大大提高资源利用率。

Session 模式

预先构建 Flink 集群,且该集群长期处于运行状态,但不能自动扩缩容。用户通过 client 提交作业到运行中的 JobManager,而 JobManager 将任务分配到运行中的 TaskManager

优点缺点
Flink 集群是预先启动运行的。用户提交作业的时候,作业可以立即分配到 TaskManager,即作业启动速度快资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反正 TaskManager 资源不足
作业隔离性差,多个作业的任务存在资源竞争,相互影响。如果一个作业异常导致 TaskManager 挂了,该 TaskManager 上的全部作业都会被重启

Application 模式

每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。

注意:
① Flink 镜像需要包含作业即Application 依赖的 Class
② 启动作业的时候需要指定 Main 函数入口类

优点缺点
一个作业独占一个集群,作业的隔离性好资源利用率低,提前确定 TaskManager 数量,如果作业需要的资源少,则大量 TaskManager 处于闲置状态。反之 TaskManager 资源不足。同时,JobManager 不能复用

Flink Native Session 模式

类似 Session 模式,需要预先构建 JobManager。不同点是用户通过 Flink Client 向 JobManager 提交作业后,根据作业需要的 Slot 数量,JobManager 直接向 Kubernetes 申请 TaskManager 资源,最后把作业提交到 TaskManager 上。

优点缺点
TaskManager 的资源是实时的、按需进行的创建,对资源的利用率更高作业真正运行起来的时间较长,因为需要等待 TaskManager 创建

Flink Native Application 模式

类似 Application 模式,每个作业独占一个 Flink 集群,当作业完成后,集群也会被回收。不同点是 Native 特性,即 Flink 直接与 Kubernetes 进行通信并按需申请资源,无需用户指定 TaskManager 资源的数量。

优点缺点
一个作业独占一个集群,作业的隔离性好一个作业独占一个集群,JobManager 不能复用
资源利用率相对较高,按需申请 JobManager 和 TaskManager作业启动较慢,在作业提交后,才开始创建 JobManager 和 TaskManager

运行模式总结

模式隔离性作业启动时间资源利用率资源按需创建
Session弱,作业共享集群较短,立即启动较低,集群长期存在
Application强,作业独享集群最长,等待集群创建完成一般,作业结束后释放资源
Native Session弱,作业共享集群一般,等待 TaskManager 创建较低,TaskManager 按需申请
Native Application强,作业独占集群一般, 等待集群创建完成最好,集群按需创建

Kubernetes 高可用 Services

Session 模式和 Application 模式集群都支持使用 Kubernetes 高可用服务。需要在 flink-configuration-configmap.yaml 中添加如下 Flink 配置项。

Note 配置了 HA 存储目录相对应的文件系统必须在运行时可用。请参阅自定义Flink 镜像和启用文件系统插件获取更多相关信息。

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
  ...
    kubernetes.cluster-id: <cluster-id>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs:///flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
  ...  

此外,你必须使用具有创建、编辑、删除 ConfigMap 权限的 service 账号启动 JobManager 和 TaskManager pod。请查看如何为 pod 配置 service 账号获取更多信息。

当启用了高可用,Flink 会使用自己的 HA 服务进行服务发现。因此,JobManager Pod 会使用 IP 地址而不是 Kubernetes 的 service 名称来作为 jobmanager.rpc.address 的配置项启动。

2.3 配置文件

flink-conf.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: fat-bigdata-cluster
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 50
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1524m
    taskmanager.memory.process.size: 4096m
    execution.target: kubernetes-session

    state.backend: filesystem
    state.checkpoints.dir: hdfs://192.168.5.131:25305/flink/cp
    state.savepoints.dir: hdfs://192.168.5.131:25305/flink/sp
    state.backend.incremental: true

    kubernetes.cluster-id: fat-bigdata-cluster-k8s-id
    classloader.resolve-order: parent-first
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs://192.168.5.131:25305/flink/recovery
    #restart-strategy: fixed-delay
    #restart-strategy.fixed-delay.attempts: 10
    #high-availability.jobmanager.port: 34560
    #metrics.internal.query-service.port: 34561

    kubernetes.namespace: fat-bigdata-cluster
    kubernetes.service-account: flink-bigdata-cluster


  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  namespace: fat-bigdata-cluster
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml

apiVersion: v1
kind: Service
metadata:
  namespace: dev-flink-plat
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

jobmanager-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: fat-bigdata-cluster
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.13.2
        env:
          - name: FLINK_PROPERTIES
            value: 'jobmanager.rpc.address: flink-jobmanager'
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
#        - name: flink-config-volume
#          mountPath: /opt/flink/conf/
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
#      - name: flink-config-volume
#        configMap:
#          name: flink-config
#          items:
#          - key: flink-conf.yaml
#            path: flink-conf.yaml
#          - key: log4j.properties
#            path: log4j.properties
      - name: flink-lib-volume
        hostPath:
          path: /home/sll/lib/
          type: Directory
taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: fat-bigdata-cluster
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.13.2
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: flink-lib-volume
          mountPath: /opt/flink/lib/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
      - name: flink-lib-volume
        hostPath:
          path: /home/sll/lib
          type: Directory
serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fat-bigdata-cluster
  namespace: fat-bigdata-cluster
automountServiceAccountToken: false

configmaps-cluster-role.yaml

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
   namespace: fat-bigdata-cluster
   name: configmaps-reader
rules:
 - apiGroups: [""]
   resources: ["configmaps"]
   verbs: ["update","create","get", "watch", "list"]

绑定clusterRole与serviceaccount

k create clusterrolebinding flink-reader-binding --clusterrole=configmaps-reader --serviceaccount=fat-bigdata-cluster:default 
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐