-
Notifications
You must be signed in to change notification settings - Fork 2
Description
工作流(workflow),是对工作流程及其各操作步骤之间业务规则的抽象、概括描述。工作流建模,即将工作流程中的工作如何前后组织在一起的逻辑和规则,在计算机中以恰当的模型表达并对其实施计算。工作流要解决的主要问题是:为实现某个业务目标,利用计算机在多个参与者之间按某种预定规则自动传递文档、信息或者任务。 by: wikipedia
要实现一个工作流,几个必要的工作:
- 工作流表达,包括数据结构、数据流向表达、节点间的依赖关系表达等
- 工作流执行,包括给节点提供执行上下文,以及分配计算资源(可能分布式)
- 元数据管理
- 可观测,获取工作流及执行状态
用户故事(UserStory)
两种工作流的实现形式:
- 工作流平台:类似于airflow,用户可以通过定义DAG文件、或api调用提交工作流任务,工作流平台负责解析和执行
- 如果编写 DAG 的是文本,那么只能使用预定义的Operator,要添加自己的Operator,得修改工作流引擎的代码
- 如果编写 DAG 的是编程语言,例如Python,用户可以自行定义Operator
- 嵌入应用程序的SDK:通过程序调用创建工作流,Operator可以自行定义,工作流调度器、执行器和应用程序是一体的
第一种更适合一些Operator固定几类或者很少扩展的场景,例如 CI/CD 系统;第二种更适合需要灵活定制 Operator 的场景,例如对应用的运维操作。
两种形式工作流引擎是一样的,不过是使用的两种不同方式,本方案实现的可以满足两种使用场景。
名词解释
涉及工作流的术语都与airflow 等成熟的工作流概念对齐,减少混淆和误解:
- Operator:用于定义工作流中的任务,可以有多种 Operator,例如 BashOperator、PythonOperator
- Task:指Operator 在特定执行时间点上的具体执行实例
- Scheduler:负责按照预定的调度策略执行任务
- Executor:复杂任务的实际执行,可以是本地执行、分布式执行、Kubernetes执行
- Metadata:存储工作流和任务的元数据,如任务状态、执行记录等
工作流的表示(数据结构)
工作流用有向无环图数据结构(DAG)表示,由多个 Operator 组成,依赖关系通过 Operator.In 和 Operator.Out 表示,都可以多个,由此可以推导出 Flow的执行顺序。Operator.Task 字段表示要执行的任务。
Operator
Operator 是图中一个节点,包含了要执行的任务(task)和执行task时的策略,如超时、重试次数等。
Task 表达
Task 因业务不同各有不同,可分为 1. 有外部依赖,需要网络访问 和 2. 本地计算两类。Task 是一个函数,可以起到互相区分作用的特征有函数名、参数个数和参数类型,选择哪几个区分,会影响工作流的易用性和灵活性。
- 函数名允许定制,函数名是区分业务和任务的重要标识,函数名不允许定制会极大削弱可维护性;
- 参数个数
- 参数类型
Edge
根据Node的In和Out字段可以推导出Node之间的链接关系,但是无法对Node之间的联系赋值,例如
- 权重
- 优先级
- 其它附加信息
维护 Edge 则可以表达这些内容
Edge 添加一个 Condition 属性,可以实现:
- 条件执行,某些任务需要根据特定条件决定是否执行下一个任务;
- 分支和决策,工作流可能在某些节点上需要做决策,选择不同的路径继续执行;
- 错误处理和回退,在某些情况下,如果任务失败,可以根据条件选择回退路径或错误处理路径;
- 动态控制,可以根据运行时的参数动态调整工作流的执行路径。
工作流的执行
工作流的执行可以参考关系数据库对SQL的执行流程:
- 解析SQL,生成
执行计划
- 按
执行计划
由数据库引擎执行
工作流也需要先被解析成执行计划,然后才被工作流引擎执行。
解析构图
调用者通过sdk或api创建一个工作流,引擎需要依据节点之间依赖关系构图生成执行计划,然后按计划执行。这里关键是
节点依赖关系`,我们有两种方式获得:
- 用户硬编码指定,例如上述通过 Edge 指定 To,指定当前Node 的下一个 Node
- 根据参数推导,
调度器(Scheduler)
调度器的主要作用:
- 解析 DAG:解析用户传入的工作流配置为内部表示格式(DAG数据结构)
- 管理任务状态 & 分发任务:周期性的检查 DAG 中的任务是否满足执行条件,更新状态,决定是否将任务放入执行队列
- 任务依赖管理:处理任务之间的依赖关系,确保任务按定义的依赖顺序执行
- 任务重试:处理任务失败后的重试逻辑,重新调度需要重试的任务
- 任务并发控制:管理任务的并发执行,确保资源的合理利用
调度器的工作流程:

- 加载DAG:调度器定期扫描 DAG 定义
- 创建DAG运行实例:根据 DAG 的调度规则(如cron 表达式或时间间隔)创建DAG运行实例(DagRuns)
- 任务实例化:根据DAG 运行实例的执行时间和任务的依赖关系,创建任务实例(TaskInstances)
- 任务状态检查:检查任务实例的状态,确定哪些任务需要执行、重试或跳过
- 任务调度:将需要执行的任务实例放入任务队列,交由执行器(Executor)处理
用户传入工作流格式及存储形式
todo:像ariflow一样存储为文件,还是放到RDBMS,还是Redis,还是直接是队列中间件?
执行队列(Queue)
执行队列是调度器和执行器之间的桥梁,调度器负责将任务实例放入执行队列,而执行器则从执行队列中获取任务并执行。
执行器
执行包括执行当前节点,并用当前节点的输出填充下游节点的执行参数。
执行当前节点
todo: 考虑执行时 op配置的超时、重试策略
todo:这里注意能重试的op必须是幂等的
当前节点输出填充下游节点执行参数
todo: op之间的输入和输出如何传递?
分布式执行
分布式执行是分布在不同机器/容器上的Executor通过抢占任务队列中的任务实现的,因为抢占,所以任务队列需要有锁保护。
metadata 和 状态轮转
todo: 集中式数据库 or redis
todo: 状态定义
管理监控
- 工作流执行过程,落点
- 需要能够读取工作流的实时状态