前言

本文记录argo-workflow学习过程,具体操作练习请参照官网
argo官网
参考文章:
https://blog.csdn.net/oqqYuan1234567890/article/details/104271124?spm=1001.2014.3001.5501

主要初略研究argo-workflow各组件之间的作用和关系

由于刚接触K8S 更多深入问题后续补充


一、Argo workflow what?

Argo workflow专门设计解决Kubernetes工作流任务编排问题。

  • 定义工作流,其中工作流中的每个步骤都是一个容器。
  • 将多步骤工作流建模为一系列任务,或者使用图形(DAG)捕获任务之间的依赖关系。
  • 使用Kubernetes上的Argo Workflow,可以在短时间内轻松运行用于计算机学习或数据处理的计算密集型作业。
  • 在Kubernetes上本地运行CI / CD管道而无需配置复杂的软件开发产品。

一、Argo workflow Why?

  • 从头开始设计容器,而没有传统VM和基于服务器的环境的开销和限制。
  • 与云厂商无关,可以在任何Kubernetes集群上运行。
  • 在Kubernetes上轻松编排高度并行的工作。
  • Argo Workflows使一台云级超级计算机触手可及!

二、Argo workflow How?

1.安装

参照官网 https://argoproj.github.io/argo-workflows/installation/

官方提供了三种安装方式
集群安装 命名空间安装和托管在其他命名空间安装

后期要和argo-Events配合时候 由于两者在不同命名空间下,需要使用集群安装。具体可根据需求自定义安装方式。

安装官网以集群方式安装只安装了argo-server和workflow-controller

https://github.com/argoproj/argo-workflows/tree/master/manifests/cluster-install
若要在argo-workflow传递内容参数
使用artifacts需要配置artifacts存储
在这里插入图片描述
在argo命名空间里安装minio https://github.com/argoproj/argo-workflows/tree/master/manifests/quick-start/base/minio
并编辑 workflow-controller ConfigMap 引用argo-artifacts
https://github.com/argoproj/argo-workflows/blob/master/manifests/quick-start/base/artifact-repositories-configmap.yaml
在这里插入图片描述

2.argo-wrokflow整体架构

在这里插入图片描述

1.argo artifacts 传递

如上述架构图所示,传递artifaces主要通过init 和wait contaienr传递
在这里插入图片描述
Workflow 由一个 entrypoint 及一系列 template 组成,entrypoint 定义了这个 workflow 执行的入口,而 template 会实际去执行一个 Pod,其中,用户定义的内容会在 Pod 中以 Main Container 体现。此外,还有两个 Sidecar 来辅助运行。如上图init wait 所示。
在这里插入图片描述
在这里插入图片描述在这里插入图片描述在这里插入图片描述
在这里插入图片描述在这里插入图片描述
在运行 Workflow 时,一个常见的场景是输出产物的传递。通常,一个 Step 的输出产物可以用作后续步骤的输入产物。在 Argo 中,产物可以通过 Artifact 或是 Parameter 传递。
在这里插入图片描述
如图上图所示,默认情况下,Artifact 被打包为 tar 包和 gzip 包,我们也可以使用 archive 字段指定存档策略。
在上面的例子里,名为 whalesay 的 template 使用 cowsay 命令生成一个名为 /tmp/hello-world.txt 的文件,然后将该文件作为一个名为 hello-art 的 Artifact 输出。名为 print-message 的 template 接受一个名为 message 的输入 Artifact,在 /tmp/message 的路径上解包它,然后使用 cat 命令打印 /tmp/message 的内容。
在前面 Sidecar 介绍中提到过,Init Container 主要用于拉取 Artifact 产物。这些 Sidecar 正是产物传递的关键。下面,我们通过介绍另一种产物传递的方式来体验 Argo 中传递产物的关键。

2.Argo 流程引擎

Argo 的步骤间可以传递信息,即下一步(容器)可以获取上一步(容器)的结果。结果传递有 2 种:

文件(Artifacts):上一步容器新生成的文件,会直接出现在下一步容器里面。
信息(Parameters):上一步的执行结果信息(如某文件内容),下一步也可以拿到。

1:文件怎么从上一个容器跑到下一个容器里的?(HOW)
Argo 流程,可以指定 2 个步骤之间,传递结果文件(Artifact)。即假设流程为:A->B,那么 A 容器跑完,B 容器可以取得上一个容器的输出文件。

如下:gitcheck 容器通过git 拉取了git文件并将其存入了目录/usr/src,mvn-docker 容器将git文件从/usr/src取出并使用进而完成ci。
在这里插入图片描述
在这里插入图片描述
流程上的每个步骤,都对应执行一个容器。 在 A 跑完后容器就退出了,然后才跑的 B(这时候已经没有 A 容器在运行了)。

所以 Argo 怎么把一个文件从 A 容器“拷贝”到 B 容器里面的?

2:通过一个中间库中转
在这里插入图片描述
ar go通过一个中转库,先把文件通过压缩的方式存储进一个库中,A容器完成任务后,B容器将其从库中取出并解压使用。
argo库配置可参考官方网站https://argoproj.github.io/argo-workflows/configure-artifact-repository/

** 3:分析pod yaml**
在argo中,一个templates对应一个pod。任务里写了两个template,在这里插入图片描述
分析容器,发现一个pod里会有三个容器,分别为init,wait,main。这三个文件是用来做什么的呢?
在这里插入图片描述

3.init、main、wait分析

A. init
[root@k8s-master cci-ci]# kubectl describe pod argoworkflow-ci4p8rb-1871813636  -n argo 
Name:         argoworkflow-ci4p8rb-1871813636
Namespace:    argo
Priority:     0
Node:         k8s-master/192.168.43.3
Start Time:   Sun, 26 Sep 2021 10:22:35 +0800
Labels:       workflows.argoproj.io/completed=true
              workflows.argoproj.io/workflow=argoworkflow-ci4p8rb
Annotations:  cni.projectcalico.org/containerID: 8251ff1f9633bc4e0329a63eb5f1cb6bd95a47badcad2001f912a6941296aaca
              cni.projectcalico.org/podIP: 
              cni.projectcalico.org/podIPs: 
              workflows.argoproj.io/node-name: argoworkflow-ci4p8rb[0].gitcheck
              workflows.argoproj.io/outputs:
                {"parameters":[{"name":"tag","value":"ebe2053","valueFrom":{"path":"/usr/src/git-commit"}}],"artifacts":[{"name":"source","path":"/usr/src...
              workflows.argoproj.io/template:
                {"name":"gitcheck","inputs":{"artifacts":[{"name":"git-repo","path":"/usr/src","git":{"repo":"http://192.168.43.3:9999/root/zykj.git","rev...
Status:       Succeeded
IP:           172.16.235.225
IPs:
  IP:           172.16.235.225
Controlled By:  Workflow/argoworkflow-ci4p8rb
Init Containers:
  init:
    Container ID:  docker://2f98cea524bdd198bec6bff966a81c96ba8edffca9222fbb51882cfa3af716e0
    Image:         argoproj/argoexec:v3.0.10
    Image ID:      docker-pullable://argoproj/argoexec@sha256:67e02b8d4ba4733a54e6fd31fda8f73c1e6180b5599a60edda02a492b2304142
    Port:          <none>
    Host Port:     <none>
    Command:
      argoexec
      init
      --loglevel
      info
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Sun, 26 Sep 2021 10:22:36 +0800
      Finished:     Sun, 26 Sep 2021 10:22:38 +0800
    Ready:          True
    Restart Count:  0
    Environment:
      ARGO_POD_NAME:  argoworkflow-ci4p8rb-1871813636 (v1:metadata.name)
      GODEBUG:        x509ignoreCN=0
    Mounts:
      /argo/inputs/artifacts from input-artifacts (rw)
      /argo/podmetadata from podmetadata (rw)
      /argo/secret/my-minio-cred from my-minio-cred (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-h2rmv (ro)
Containers:
  wait:
    Container ID:  docker://46331f618e0a455999389af4c09c6151fcae532016e7f31abe076599b4db5488
    Image:         argoproj/argoexec:v3.0.10
    Image ID:      docker-pullable://argoproj/argoexec@sha256:67e02b8d4ba4733a54e6fd31fda8f73c1e6180b5599a60edda02a492b2304142
    Port:          <none>
    Host Port:     <none>
    Command:
      argoexec
      wait
      --loglevel
      info
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Sun, 26 Sep 2021 10:22:39 +0800
      Finished:     Sun, 26 Sep 2021 10:22:42 +0800
    Ready:          False
    Restart Count:  0
    Environment:
      ARGO_POD_NAME:        argoworkflow-ci4p8rb-1871813636 (v1:metadata.name)
      GODEBUG:              x509ignoreCN=0
      ARGO_CONTAINER_NAME:  wait
    Mounts:
      /argo/podmetadata from podmetadata (rw)
      /argo/secret/my-minio-cred from my-minio-cred (ro)
      /mainctrfs/usr/src from input-artifacts (rw,path="git-repo")
      /var/run/docker.sock from docker-sock (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-h2rmv (ro)
  main:
    Container ID:  docker://42965aa8ed29bb92d8c9192b518a4825d1d15fd99df60b47b9a99babea93be44
    Image:         open-registry.going-link.com/zhenyun/cibase:0.7.0
    Image ID:      docker-pullable://open-registry.going-link.com/zhenyun/cibase@sha256:5a76091b98f9f1b4431df64dcbe7f4d02478f22d0119e8768c12e6c4398da40c
    Port:          <none>
    Host Port:     <none>
    Command:
      sh
      -c
    Args:
       git rev-parse --short HEAD > /usr/src/git-commit
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Sun, 26 Sep 2021 10:22:39 +0800
      Finished:     Sun, 26 Sep 2021 10:22:39 +0800
    Ready:          False
    Restart Count:  0
    Limits:
      cpu:     400m
      memory:  800Mi
    Requests:
      cpu:     100m
      memory:  100Mi
    Environment:
      ARGO_CONTAINER_NAME:  main
    Mounts:
      /usr/src from input-artifacts (rw,path="git-repo")
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-h2rmv (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             False 
  ContainersReady   False 
  PodScheduled      True 
Volumes:
  podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations
  docker-sock:
    Type:          HostPath (bare host directory volume)
    Path:          /var/run/docker.sock
    HostPathType:  Socket
  my-minio-cred:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  my-minio-cred
    Optional:    false
  input-artifacts:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  default-token-h2rmv:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-h2rmv
    Optional:    false
QoS Class:       Burstable
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:          <none>

如上图
主要关注

  • Pod 的 Annotations
  • InitContainers 启动的初始化容器
  • Containers 中的 wait 容器和 main 容器
  • Pod 的 Volumes 和每个容器的 Mounts

1:Init 容器
argo 创建的 Pod 的初始化容器argoproj/argoexec:v3.0.10执行了 argo exec init 命令,
从名字上可以猜测出,这个容器负责初始化 Pod 中的环境,比如获取来上一步的输入等等,对应的代码是 cmd/argoexec/commands/init.go,
在执行 argo exec init之后,第一个调用的函数应该是loadArtifacts()。这个方法中做了三件事: initExecutor()、wfExecutor.StageFiles()、wfExecutor.LoadArtifacts()

initExecutor 的代码如下

 func initExecutor() *executor.WorkflowExecutor {
 // 从 podAnnotationsPath加载模板
    tmpl, err := executor.LoadTemplate(podAnnotationsPath)
  //获取环境变量
    var cre executor.ContainerRuntimeExecutor
    switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
    case common.ContainerRuntimeExecutorK8sAPI:
        cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
    case common.ContainerRuntimeExecutorKub	elet:
        cre, err = kubelet.NewKubeletExecutor()
    case common.ContainerRuntimeExecutorPNS:
        cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
    default:
        cre, err = docker.NewDockerExecutor()
    }
//猜测将annotations
    wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
    yamlBytes, _ := json.Marshal(&wfExecutor.Template)
    return &wfExecutor
}

从 podAnnotationsPath加载模板,这个模板其实就是 Argo 中单步的执行模板,默认情况下它的值是 /argo/podmetadata/annotations,这正好是 init 容器的挂载,而这个挂载对应的卷是:

  podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations

DownwardAPI是一种 volume 的类型,可以将 Pod 和 Container 的字段通过挂载文件的方式提供给容器内的进程方案。那么这里就是将 Pod 的 Annotations 字段通过上面的路径提供给 init 容器,init 容器根据其中的 template 获取该 Pod 的输入输出。

接下来判断根据容器运行时进行判断,这里我们只考虑 docker 作为容器运行时的情况。最后调用NewExecutor实例化了一个 wfExecutor

StageFiles()
源代码如下:

func (we *WorkflowExecutor) StageFiles() error {
    var filePath string
    var body []byte
    switch we.Template.GetType() {
    case wfv1.TemplateTypeScript:
        log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
        filePath = common.ExecutorScriptSourcePath
        body = []byte(we.Template.Script.Source)
    case wfv1.TemplateTypeResource:
        log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
        filePath = common.ExecutorResourceManifestPath
        body = []byte(we.Template.Resource.Manifest)
    default:
        return nil
    }
    err := ioutil.WriteFile(filePath, body, 0644)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return nil
}

职责很简单,根据 template 的类型,写入到不同的文件中,比如 script 就写入到 /argo/staging/script。这就是我们在 main 容器中执行的脚本了。

LoadArtifacts

// LoadArtifacts loads artifacts from location to a container path
func (we *WorkflowExecutor) LoadArtifacts() error {
    for _, art := range we.Template.Inputs.Artifacts {
        artDriver, err := we.InitDriver(art)

        var artPath string
        mnt := common.FindOverlappingVolume(&we.Template, art.Path)
        if mnt == nil {
            artPath = path.Join(common.ExecutorArtifactBaseDir, art.Name)
        } else {
            // If we get here, it means the input artifact path overlaps with an user specified
            // volumeMount in the container. Because we also implement input artifacts as volume
            // mounts, we need to load the artifact into the user specified volume mount,
            // as opposed to the `input-artifacts` volume that is an implementation detail
            // unbeknownst to the user.
            log.Infof("Specified artifact path %s overlaps with volume mount at %s. Extracting to volume mount", art.Path, mnt.MountPath)
            artPath = path.Join(common.ExecutorMainFilesystemDir, art.Path)
        }

        // The artifact is downloaded to a temporary location, after which we determine if
        // the file is a tarball or not. If it is, it is first extracted then renamed to
        // the desired location. If not, it is simply renamed to the location.
        tempArtPath := artPath + ".tmp"
        err = artDriver.Load(&art, tempArtPath)
        if err != nil {
            return err
        }
        if isTarball(tempArtPath) {
            err = untar(tempArtPath, artPath)
            _ = os.Remove(tempArtPath)
        } else {
            err = os.Rename(tempArtPath, artPath)
        }

        if art.Mode != nil {
            err = os.Chmod(artPath, os.FileMode(*art.Mode))
        }
    }
    return nil
}

InitDriver是初始化 Artifacts 的驱动。Argo 支持多种类型的存储系统,在 v2.3.0 这个版本支持: s3, http, git, artifactory, hdfs, raw。

FindOverlappingVolume 是检查 artifacts 的路径和用户挂载的路径是否有重合。如果有,则返回深度最深的路径,如果没有,则返回 nil。如果返回 nil, 则使用 /argo/inputs/artifacts 作为 artifacts 的基础路径。否则使用 /mainctrfs 作为路径。

下面就是下载文件,解压文件并修改权限了。

注意在这里,init、wait和main容器都挂载了input-artifacts和argo-staging,并且 init 将输入和script放在了这两个卷中,所以其他几个卷都可以共享这些文件。

B. wait

wait容器的职责有以下几点:

  • 等待 main 容器结束
  • 杀死 sidecar
  • 保存日志
  • 保存 parameters
  • 保存 artifacts
  • 获取脚本的输出流
  • 将输出放在 Annotations 上

下面我们看这些功能点的实现:
等待 main 容器结束

// Wait is the sidecar container logic which waits for the main container to complete.
// Also monitors for updates in the pod annotations which may change (e.g. terminate)
// Upon completion, kills any sidecars after it finishes.
func (we *WorkflowExecutor) Wait() error {
    // WaitInit() 是初始化操作,只有 PSN 需要
    err := we.RuntimeExecutor.WaitInit()
    if err != nil {
        return err
    }
    log.Infof("Waiting on main container")
    // waitMainContainerStart的主要原理是周期轮询Pod中的所有容器,检查main容器的ContainerID字段
    // 不为空说明启动了
    mainContainerID, err := we.waitMainContainerStart()
    if err != nil {
        return err
    }
    log.Infof("main container started with container ID: %s", mainContainerID)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // monitorAnnotations是因为pod的annotations会更改
    annotationUpdatesCh := we.monitorAnnotations(ctx)
    // 超时会杀死
    go we.monitorDeadline(ctx, annotationUpdatesCh)

    // 这里是直接用ContainerRuntime去等待容器结束的,比如docker,直接调用docker wait
    err = we.RuntimeExecutor.Wait(mainContainerID)
    if err != nil {
        return err
    }
    log.Infof("Main container completed")
    return nil
}

杀死 sidecar
main 容器运行结束后,wait 容器会负责杀死其他容器(这个让我发现了之前用 sidecar 做 main 容器运行结束后的清理工作一直无效的原因)。

// KillSidecars kills any sidecars to the main container
func (we *WorkflowExecutor) KillSidecars() error {
    if len(we.Template.Sidecars) == 0 {
        log.Infof("No sidecars")
        return nil
    }
    log.Infof("Killing sidecars")
    pod, err := we.getPod()
    if err != nil {
        return err
    }
    sidecarIDs := make([]string, 0)
    // 遍历pod中的容器,排除main和wait,然后调用runtime来杀死容器
    for _, ctrStatus := range pod.Status.ContainerStatuses {
        if ctrStatus.Name == common.MainContainerName || ctrStatus.Name == common.WaitContainerName {
            continue
        }
        if ctrStatus.State.Terminated != nil {
            continue
        }
        containerID := containerID(ctrStatus.ContainerID)
        log.Infof("Killing sidecar %s (%s)", ctrStatus.Name, containerID)
        sidecarIDs = append(sidecarIDs, containerID)
    }
    if len(sidecarIDs) == 0 {
        return nil
    }
    return we.RuntimeExecutor.Kill(sidecarIDs)
}

保存 artifacts

保存 artifacts 和 保存 parameters 的操作是一样的。

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts() error {
    if len(we.Template.Outputs.Artifacts) == 0 {
        log.Infof("No output artifacts")
        return nil
    }
    log.Infof("Saving output artifacts")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    err = os.MkdirAll(tempOutArtDir, os.ModePerm)
    if err != nil {
        return errors.InternalWrapError(err)
    }

    for i, art := range we.Template.Outputs.Artifacts {
        err := we.saveArtifact(mainCtrID, &art)
        if err != nil {
            return err
        }
        we.Template.Outputs.Artifacts[i] = art
    }
    return nil
}

获取脚本的输出流

直接调用 runtime 去获取 main 容器的输出流,然后保存到 template.outputs 中

func (we *WorkflowExecutor) CaptureScriptResult() error {
    if we.Template.Script == nil {
        return nil
    }
    log.Infof("Capturing script output")
    mainContainerID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }
    reader, err := we.RuntimeExecutor.GetOutputStream(mainContainerID, false)
    if err != nil {
        return err
    }
    defer func() { _ = reader.Close() }()
    bytes, err := ioutil.ReadAll(reader)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    out := string(bytes)
    // Trims off a single newline for user convenience
    outputLen := len(out)
    if outputLen > 0 && out[outputLen-1] == '\n' {
        out = out[0 : outputLen-1]
    }
    we.Template.Outputs.Result = &out
    return nil
}

将输出放在 Annotations 上

将 outputs 存在 pod 的 annotations 上。

func (we *WorkflowExecutor) AnnotateOutputs(logArt *wfv1.Artifact) error {
    outputs := we.Template.Outputs.DeepCopy()
    if logArt != nil {
        outputs.Artifacts = append(outputs.Artifacts, *logArt)
    }

    if !outputs.HasOutputs() {
        return nil
    }
    log.Infof("Annotating pod with output")
    outputBytes, err := json.Marshal(outputs)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return we.AddAnnotation(common.AnnotationKeyOutputs, string(outputBytes))
}

总结

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐