本人在淘天集团移动中间件团队亲历过不少有意思的项目, 本文总结了其中之一: 轻量级流程引擎;
同其他已在编排领域深内耕多年的框架相比, 本流程引擎还显得十分稚嫩, 甚至有重复造轮之嫌, 然而与老前辈们相比它又确实有一点点自己的小个性, 也确实满足了特定场景下的业务需求, 该篇即总结一下本人在相关领域内的一些实践经历;
(注: 本文涉及的相关代码已做过脱敏及混淆, 不一定能实际执行)
背景 & 简介
淘系移动中间件的新版控制台, 在各种日常运维和配置发布场景中, 广泛使用了形态多变、需求各异的多种工作流程; 为了尽可能轻量化并降低对外部系统的依赖, 新控制台在最近几年的迭代与演进中, 沉淀出了一套轻量而不失丰富、多变但不显冗余的流程引擎, 并在多条产品线中落地生根, 包括但不限于:
- API 网关 (MTOP) 的接口统一发布
- API 网关的接口归属应用迁移
- SSR (Server-Side-Render) 投放的流量接入
- 统一接入层 (tengine) 流量防护 (前置限流等) 配置下发
- 统一接入层的金丝雀 (精细) 路由配置下发
- 面向大语言模型的 MCP Server 统一发布
该平台使用的流程引擎 (以下简称本框架) 的能力发展主要经历了以下几个阶段:
- 早期孵化: 早在流量网关只能于旧控制台上运维的时代是没有流程的概念的, 所有关于 API 的操作都是零散、互相无关、没有统筹规划的; 感谢前同事在当时做了最初的尝试, 搭建起了本框架的内核雏形, 规范了 Task - Step 交互的 基本原则, 本人也是在这个阶段接手了本项目;
- 状态管控: 当业务开始产生真实的诉求, 本框架顺势补齐了自身最大的缺陷: 状态一致性保证; 因为业务可能会随时取消一个发布任务, 当任务中的部分步骤已经执行成功时, 无任何补偿措施地取消该任务会导致资源整体状态不一致, 这是业务所不能容忍的, 于是本框架推出了 具备补偿机制 的版本, 这意味着本框架实现了对流程生命周期的可控托管, 意味着本框架在生产环境基本可用;
- 可定义化: 当新控制台计划将发布任务的概览以可视化透出到界面时, 本框架又遇到了任务定义职责不清的问题: 任务的定义被隐式混淆于任务实例的构建逻辑中, 这导致无法给用户提供一个全图视野的任务概览, 只能随任务的执行而逐步展开, 这让可视化的效果大打折扣; 本框架以此为契机将 任务定义 (Task Definition) 显式独立出来, 任务定义和任务实例 (Task Instance) 的生命周期至此彻底切割;
- 产品化尝试: 随着新控制台能力的不断丰富, 越来越多场景的工作流程亦试图接入进来, 此刻接入成本的问题开始凸显, 本框架于是迈出产品化尝试的第一步: 在任务定义的基础之上进一步实现了 可配置化, 允许在 diamond 等平台独立维护任务定义, 降低流程的管理成本, 增强任务定义的动态性;
时至今日, 本框架已形成了一个五脏俱全的小生态, 为上文提及的业务场景持续输出技术价值; 同其他已在编排领域深内耕多年的框架相比, 本框架还显得十分稚嫩, 甚至有重复造轮之嫌, 然而与老前辈们相比它又确实有一点点自己的小个性, 也确实满足了特定场景下的业务需求, 不过以当前的产品化水平, 只能支撑部门内部的需求, 如果想要更进一步, 还有很长的路要走;
本文便以分享为目的, 介绍一下本框架的设计思想, 抛砖引玉;
基本原理
设计哲学
本框架具备微内核、高可扩展的特性:
内核层仅定义了 Task 和 Step 的极简模型:
1
2
3
4
5
6TaskCtx StepCtx
1 ^ ^ 1
| |
1 | | 1
Task ----> Step
1 *在内核之上做丰富的能力扩展:
- 编排能力扩展: SequentialStep (串行步骤)、CompositeStep (并行步骤);
- 有状态能力扩展: StatefulTask、StatefulStep (会激活存储模块);
- 可补偿能力扩展: CompensableTask、CompensableStep (允许取消任务时自动调度用户配置的补偿步骤);
- 自动调度能力扩展: AutoExeStep (会激活调度模块);
- 第三方异步流程扩展: 对接外部系统, 比如安全审核、changefree 等;
扁平的继承结构, 优先使用组合的方式拼搭任务能力;
模型细节

这里有一个细节需要解释: 如上图, Task 和 TaskStep 被实际设计为 1:1 的关系, 这是为了更清晰地区分任务和步骤的职责边界, 例如:
- 假设 Task 和 TaskStep 是 1:N 的关系:
- 那么 task 就需要感知 step 的编排逻辑, 要负责去按照约定顺序调度多个 step;
- 当任务执行了一半后需要取消, task 还需要额外负责处理这 N 个 step 补偿的顺序关系, 亲自编排补偿步骤 $step^´$;
- 而在 1:1 的关系下:
- task 只需要无脑去调用 step.execute(), 至于 step 如何去处理内部多个子步骤的调用顺序, 完全不用关心;
- 当任务执行了一半后需要取消, 一个 CompensableTask 只需要规范其步骤也必须是 CompensableTaskStep, 就可以将步骤的补偿逻辑屏蔽, 全权交给 CompensableTaskStep 自己去实现;
下文中我们将直接被 task 关联依赖的 step 称为: Task 直隶的统领型步骤;
执行模式
本着轻量化的原则, 该框架在任务调度之外不会使用额外资源维护各任务的状态, 并且为了简化调度逻辑, 当一个任务被派发到执行器, 一律从任务入口处重新执行, 任务 (及相关步骤) 是否需要加载状态的判断、以及实际的状态拉起动作 被延迟到其真正执行的那一刻, 这意味着:
- 当一个任务 (及其步骤) 未配置任何 stateful 能力的扩展, 则该任务的每次调度都等于从头重新执行一遍, 即使之前的调度已经对该任务的部分步骤完成了执行;
- 当一个任务 (及其步骤) 配置了 stateful 能力扩展, 则任务及相关步骤的每次执行都会保存其最新状态, 在任务的下一次调度中, 已执行的步骤会加载恢复到最新状态, 并从最新状态继续执行, 如果步骤已经执行成功, 执行器会越过该步骤并按照编排顺序执行下一个步骤;
无状态的执行模式有助于降低任务调度间隙的内存消耗, 由于大部分任务都是 IO 密集型的, 如果后期能进一步对执行的调用模型做一些反应式的改造, 本框架将能撑起很高的任务并发;
作用域 & 约束关系
- TaskStep 由 Task 生成, Task 的生命周期 > TaskStep 的生命周期;
- 一个 TaskStep 只能属于一个 Task, 一个 Task 可以基于组合型的 TaskStep 派生出多个 TaskStep;
- 一个 Task 只有一个 TaskContext, 多个 TaskStep 可以共享同一个 TaskContext;
- 一个 TaskStep 只有一个 TaskStepContext, 不同 TaskStep 之间的 TaskStepContext 互相隔离, 不能共享;
由以上约束关系可以得知 TaskStep 间的通信方式如下:
- 同一个 TaskStep, 在不同的调度批号 (schedulerx 任务) 中, 可以通过对应的 TaskStepContext 传递信息;
- 不同的 TaskStep, 可以通过其所在 Task 的 TaskContext 传递信息;
状态流转
任务的状态取决于步骤的状态, 步骤的状态取决于步骤自身或其嵌套的子步骤的执行状态;
- 任务的状态流转:task status machine
- 步骤的状态流转:step status machine
组合优先架构
本框架以 “组合优先” 的理念为指导, 提供了 steteful、autoExecute、compensate (补偿) 等多种可以拼搭组合的能力;
TaskStep 的组合能力

以下是几种经典的能力:
StatefulTaskStep: 有状态能力增强
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public StatefulTaskStep(AbstractTaskStep delegatedTaskStep, TaskPersistence taskPersistence) {
super(delegatedTaskStep);
this.taskPersistence = taskPersistence;
// 恢复步骤状态
recoverTaskStep();
}
public TaskStepResult execute() {
try {
val taskStepResult = delegatedTaskStep.execute();
setStatus(taskStepResult.getStatus());
return taskStepResult;
} finally {
// 保存步骤状态
saveTaskStep();
}
}
private void saveTaskStep() {
TaskStepSnapshot taskStepSnapshot = TaskStepSnapshot.builder()
.taskId(taskId)
.name(delegatedTaskStep.getName())
.type(delegatedTaskStep.getType())
.......
.build();
taskPersistence.saveTaskStep(taskStepSnapshot);
}
private void recoverTaskStep() {
val taskStepSnapshot = taskPersistence.recoverTaskStep(taskId, delegatedTaskStep.name);
val taskStepContext = taskStepSnapshot.getTaskStepContext();
delegatedTaskStep.saveStepContext(taskStepContext);
}其本质是:
- 在构造步骤实例时恢复被代理步骤上次执行后的上下文;
- 在被代理步骤执行完成后保存步骤最新的上下文;
AutoExecuteTaskStep: 自动调度能力增强
AutoExecuteTaskStep 实现自动调度的原理是借助 StatefulTaskStep 将 autoExecute 类型的 taskStep 上下文写入持久层, 然后由 AutoExecuteScheduler 扫描持久层追踪正在执行中的 autoExeTaskStep, 并执行相应 task;1
2
3
4
5
6
7
8
9
10
11
12
13public AutoExecuteTaskStep(String name,
AbstractTaskStep scheduledTaskStep,
TaskPersistence taskPersistence) {
super(name, StatefulTaskStep.wrap(new DoAutoExecuteStep(
name, scheduledTaskStep), taskPersistence));
}
private static final class DoAutoExecuteStep extends DelegateTaskStep {
public String getType() {
return "autoExecute";
}
}可以看到, AutoExecuteTaskStep 自身就极致利用了组合代理的能力, 通过两个维度的简单代理实现自动调度的核心能力:
- 第一个维度: 通过一个间接的 DoAutoExecuteStep 代理用户真实的业务步骤, 并标识自身的 type 为约定的 autoExecute, 以插桩的形式指引调度器识别出需要自动调度的 task;
- 第二个维度: 通过 StatefulTaskStep 代理 DoAutoExecuteStep, 以持久化 DoAutoExecuteStep 的状态, 从而允许调度器扫描持久层列举需要自动调度的 task;
能力的组合
框架提供了便于拼搭 stateful 与 autoExecute 能力的包装工具:
1 | public static AbstractTaskStep wrap(AbstractTaskStep taskStep, |
如果用户希望一个步骤既要拥有状态持久化的能力, 同时也要拥有被自动调度的能力, 可以如下实现:
1 | AbstractTaskStep wrap(AbstractTaskStep taskStep) { |
执行以上代码将会得到如下状态:

当然下文会讲到, 用户并不需要如此亲自编写代码, 本框架提供了便捷的方式帮用户自动生成相关能力叠加;
补充: 高级能力
除了 stateful、autoExecute 等常用能力外, 框架内还有其他高级能力使用了该组合机制, 比如与补偿相关的顺序执行可补偿步骤 (SequentialCompensableTaskStep) 和 并行执行可补偿步骤 (CompositeCompensableTaskStep), 这些补偿相关的步骤没有被设计为直接对用户开放, 而是通过更友好的方式被上层包装, 下文将会具体介绍;
Task 的组合能力
与 TaskStep 类似, Task 也提供了诸如 stateful、compensate 等多种可以拼搭组合的任务能力; 不过与 TaskStep 略有不同的是:- TaskStep 是直接面向具体业务细节的, 不同的业务场景差异很大, 灵活度太高, 无法沉淀出一套通用的步骤范式, 只能提供一些最基础的 stateful、autoExecute 等原子能力;
- 而 Task 不是面向具体业务细节的, 它是可以被更抽象的场景所概括描述的, 比如说内部系统的审批流、配置发布的流程等, 所以框架针对这类通用运维场景沉淀出了一个最佳实践OpsTask / CompensableOpsTask;

- OpsTask 的组合链路: StatefulTask (AbstractTask);
- CompensableOpsTask 的组合链路: StatefulTask (CompensableTask);
OpsTask 维护了通用运维任务所需要的业务强类型数据结构 globalBusData 及对应的序列化/反序列化方法 taskDataSerDeser, 在任务生命周期中的每一次调度, 都会在初始化时帮用户恢复最新的数据, 调度结束后再将更新后的数据状态持久化;
CompensableOpsTask 是 OpsTask 的可补偿版本, 除了 OpsTask 的基本能力外, CompensableOpsTask 还会帮助用户管理补偿编排逻辑;
OpsTask / CompensableOpsTask 帮用户设定好了组合模式, 可以开箱即用, 用户直接继承使用即可, 当然如果用户有自定义需求, 也可以选择自己去组合 stateful 等原子能力;
根据目前的实际使用情况, 大部分业务的任务都选择了具有补偿能力的 CompensableOpsTask, 只有少数简单的没有数据一致性要求的场景选择了 OpsTask, 暂没有业务选择自定义 Task;
任务取消的补偿机制
由于任务中存在多个步骤, 在执行任意一个步骤时, 用户都可能取消, 此时当前步骤之前的步骤已经执行成功, 当前步骤之后的步骤均未执行; 要想让任务取消后保持任务执行前的状态, 就需要引入回退补偿机制: Hector & Kenneth: Sagas (1987)

由用户定义自己的步骤 $(S_1, S_2, ….. S_n)$, 及对应的补偿步骤 $(S_1^´, S_2^´, ….. S_n^´)$, CompensableTask 负责根据当前状态自适应地生成补偿步骤, 比如:
某任务共有 $S_1、S_2、S_3$ 三个步骤, 用户执行完成了 $S_1、S_2$ 后取消任务, 则此时任务的步骤链路变成: $S_1 \Rightarrow S_2 \Rightarrow S_2^´ \Rightarrow S_1^´$;
Task 和 TaskStep 分别在各自的维度实现了补偿能力:
TaskStep 的补偿原理

CompensateAwareTaskStep
对补偿有感知能力的步骤, 着重点是步骤的性质分解, 表达的是对任务中的 (原子或复合) 步骤 能够识别自身已执行和待补偿 (子) 步骤的一种抽象;
顾名思义, 该接口在本框架的语境下就是能够识别出已被执行的 “逻辑步骤”, 以及能够推导出已执行步骤对应的 “补偿逻辑步骤”:
1 | public interface CompensateAwareTaskStep { |
对于原子步骤、串行步骤、并行步骤这三种类型, 将分别围绕各自的编排特性完成对以上接口的实现:
原子步骤的补偿版本:
AtomicCompensateAwareStep
:
感知补偿的原子步骤的逻辑最简单, 需要用户同时给定目标步骤 (normalStep) 和对应的补偿步骤 (compensateStep):- executedTaskStep 方法: 当 normalStep 已执行则返回 normalStep, 否则返回 Optional.empty();
- compensateTaskStep 方法: 当用户没有配置补偿步骤 (NoneTaskStep) 返回 Optional.empty(), 或者当 normalStep 的状态需要补偿, 则返回 compensateStep;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21class AtomicCompensateAwareStep implements CompensateAwareTaskStep {
private final AbstractTaskStep normalStep;
private final AbstractTaskStep compensateStep;
public Optional<AbstractTaskStep> executedTaskStep() {
return normalStep.getStatus().isProcessedStatus()
? Optional.of(normalStep)
: Optional.empty();
}
public Optional<AbstractTaskStep> compensateTaskStep() {
if (!TaskStepStatus.needCompensate(normalStep.getStatus())) {
return Optional.empty();
}
// 补偿步骤类型为占位步骤, 标识用户认为无需针对目标步骤补偿
if (compensateStep instanceof NoneTaskStep) {
return Optional.empty();
}
return Optional.ofNullable(compensateStep);
}
}
串行步骤的补偿版本:
SequentialCompensableTaskStep
:
感知补偿的串行步骤的逻辑是在原子补偿步骤的基础上, 结合了自身串行编排的特征, 需要用户同时给定 n 组目标步骤和对应补偿步骤的配对 $(S_1, S_1^´), (S_2, S_2^´) ….. (S_n, S_n^´)$, SequentialCompensableTaskStep 基于这 n 组配对构建一个 List< CompensateAwareTaskStep> compensateAwareSteps:- executedTaskStep 方法: 对 compensateAwareSteps 中的元素遍历执行 executedTaskStep 方法获取一个 executedSteps 列表, 当列表不为空, 以此构建一个新的 SequentialTaskStep 返回, 否则返回 Optional.empty();
- compensateTaskStep 方法: 对 compensateAwareSteps 中的元素遍历执行 compensateTaskStep 方法获取一个 compensateSteps 列表, 当列表不为空, 对此做 reverse 构建一个反向的 SequentialTaskStep 返回, 否则返回 Optional.empty();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class SequentialCompensableTaskStep extends DelegateTaskStep
implements CompensableTaskStep, CompensateAwareTaskStep {
public Optional<AbstractTaskStep> executedTaskStep() {
List<AbstractTaskStep> executedSteps = Compensates.getExecutedSteps(compensateAwareSteps);
return CollectionUtils.isNotEmpty(executedSteps)
? Optional.of(buildCompensatingSequentialTaskStep(executedSteps))
: Optional.empty();
}
public Optional<AbstractTaskStep> compensateTaskStep() {
final List<AbstractTaskStep> compensateSteps = Compensates.getCompensateSteps(compensateAwareSteps);
final List<AbstractTaskStep> compensateStepChain = Lists.reverse(compensateSteps);
return CollectionUtils.isNotEmpty(compensateStepChain)
? Optional.of(buildCompensatingSequentialTaskStep(compensateStepChain))
: Optional.empty();
}
}
并行步骤的补偿版本:
CompositeCompensableTaskStep
:
感知补偿的并行步骤的逻辑是在原子补偿步骤的基础上, 结合了自身并行编排的特征, 需要用户同时给定 n 组目标步骤和对应补偿步骤的配对 $(S_1, S_1^´), (S_2, S_2^´) ….. (S_n, S_n^´)$, CompositeCompensableTaskStep 基于这 n 组配对构建一个 List< CompensateAwareTaskStep> compensateAwareSteps:- executedTaskStep 方法: 对 compensateAwareSteps 中的元素遍历执行 executedTaskStep 方法获取一个 executedSteps 列表, 当列表不为空, 以此构建一个新的 CompositeTaskStep 返回, 否则返回 Optional.empty();
- compensateTaskStep 方法: 对 compensateAwareSteps 中的元素遍历执行 compensateTaskStep 方法获取一个 compensateSteps 列表, 当列表不为空, 以此构建一个新的 CompositeTaskStep 返回 (不需要 reverse, 因为是并行的, 不区分次序), 否则返回 Optional.empty();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class CompositeCompensableTaskStep extends DelegateTaskStep
implements CompensableTaskStep, CompensateAwareTaskStep {
public Optional<AbstractTaskStep> executedTaskStep() {
List<AbstractTaskStep> executedSteps = Compensates.getExecutedSteps(compensateAwareSteps);
return CollectionUtils.isNotEmpty(executedSteps)
? Optional.of(new CompositeTaskStep(name, taskContext, executedSteps))
: Optional.empty();
}
public Optional<AbstractTaskStep> compensateTaskStep() {
final List<AbstractTaskStep> compensateSteps = Compensates.getCompensateSteps(compensateAwareSteps);
return CollectionUtils.isNotEmpty(compensateSteps)
? Optional.of(new CompositeTaskStep(name, taskContext, compensateSteps))
: Optional.empty();
}
}
CompensableTaskStep
该接口是步骤补偿体系的另一个维度, 着重点是步骤的链路整合, 表达的是对任务中的 (原子或复合) 步骤基于当前的状态自适应生成 同时具备已执行和待补偿步骤的逻辑链路的抽象;
1 | public interface CompensableTaskStep extends TaskStep { |
串行补偿链路整合的实现:
SequentialCompensableTaskStep
:- 对 compensateAwareSteps 中的元素遍历分别生成已执行和待补偿的两部分, 并拼接两者构建新的 SequentialTaskStep;
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class SequentialCompensableTaskStep extends DelegateTaskStep
implements CompensableTaskStep, CompensateAwareTaskStep {
public TaskStep generateCompensableTaskStep() {
// 提取已执行过的普通步骤
final List<AbstractTaskStep> executedSteps = Compensates.getExecutedSteps(compensateAwareSteps);
// 根据原有步骤的执行状态生成适应性的补偿步骤
final List<AbstractTaskStep> compensateSteps = Compensates.getCompensateSteps(compensateAwareSteps);
final List<AbstractTaskStep> compensatingStepChain = Lists.newArrayList(executedSteps);
compensatingStepChain.addAll(Lists.reverse(compensateSteps));
return buildCompensatingSequentialTaskStep(compensatingStepChain);
}
}
- 对 compensateAwareSteps 中的元素遍历分别生成已执行和待补偿的两部分, 并拼接两者构建新的 SequentialTaskStep;
并行补偿链路整合的实现:
CompositeCompensableTaskStep
:- 并行补偿的链路整合没有做到如语义上的绝对并行, 而是出于实现复杂度的考虑, 降级为: 已执行和待补偿两部分 在整体上先后串行执行, 而各自在内部并行执行;
- 所以其实现是将 executedSteps 和 compensateSteps 两个 CompositeTaskStep 串为一个整体的 SequentialTaskStep;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class CompositeCompensableTaskStep extends DelegateTaskStep
implements CompensableTaskStep, CompensateAwareTaskStep {
public TaskStep generateCompensableTaskStep() {
// 提取已执行过的普通步骤
final List<AbstractTaskStep> executedSteps = Compensates.getExecutedSteps(compensateAwareSteps);
// 根据原有步骤的执行状态生成适应性的补偿步骤
final List<AbstractTaskStep> compensateSteps = Compensates.getCompensateSteps(compensateAwareSteps);
final CompositeTaskStep executedCompositeStep = new CompositeTaskStep(name, taskContext, executedSteps);
final CompositeTaskStep compensateCompositeStep = new CompositeTaskStep(COMPENSATE_STEP_DEFAULT_PREFIX + name, taskContext, compensateSteps);
// 为了使用统一的接口 (CompensateAwareTaskStep) 以便于管理, composite compensable step 的补偿逻辑
// 采用了由 executedStep + compensateStep 串联起的 sequentialTaskStep
final List<AbstractTaskStep> compensatingStepChain = ImmutableList.of(executedCompositeStep,compensateCompositeStep);
return new CompensatingSequentialTaskStep(name, taskContext, compensatingStepChain);
}
}
由以上描述可知, SequentialCompensableTaskStep 和 CompositeCompensableTaskStep 既是 CompensateAwareTaskStep 又是 CompensableTaskStep, 这意味着它们:
- 既可以作为一个直接由 Task 直隶的统领型步骤, 从整体上去整合普通链路与补偿链路;
- 又可以作为一个局部的被嵌套的子步骤, 提供基本的步骤拆分能力, 由上层统领型步骤去统一调度;
如此便可编织出一张允许任意递归灵活嵌套的步骤执行网络, 从而拥有强大的自适应补偿能力;
本框架目前只有复合型步骤实现了该接口, 为了简化实现, 原子步骤 AtomicCompensateAwareStep 未实现该接口, 因为原子步骤在语义上可以转化为只有一个子步骤的 SequentialTaskStep 或 CompositeTaskStep;
最后还需要额外补充一下: SequentialCompensableTaskStep 和 CompositeCompensableTaskStep 除了实现了上述两个核心接口外, 还直接继承了 DelegateTaskStep, 它们代理的 TaskStep 逻辑如下:
1 | public static SequentialCompensableTaskStep buildSequentialCompensableTaskStep(String name, TaskContext taskContext, |
1 | public static CompositeCompensableTaskStep buildCompositeCompensableTaskStep(String name, TaskContext taskContext, |
也即: 将 List
Task 的补偿原理
CompensableTask 是任务级补偿能力的核心: CompensableTask 只接受关联一个 CompensableTaskStep, 这就使得 CompensableTask 的补偿逻辑十分简洁:
1 | public abstract class CompensableTask extends AbstractTask { |
- 只需重写 AbstractTask 的两个方法: executeTask() 和 cancel();
- 对于 cancel() 方法:
- 先 Interrupt 原步骤;
- 执行 compensableTaskStep.generateCompensableTaskStep().execute();
- 对于 executeTask() 方法:
- 如果当前未取消: 执行 compensableTaskStep.execute();
- 如果当前正在取消中: 执行 compensableTaskStep.generateCompensableTaskStep().execute();
除此外无需任何多余的逻辑, 便能实现任务级的补偿能力;
复杂 case 举例
图示说明:
状态缩写:
- P (PENDING)
- R (RUNNING)
- SK (SKIPPED)
- SU (SUCCESS)
- INT (INTERRUPTED)
步骤标号: 用于区分步骤次序
- 数字: 单一步骤 (或并行包装步骤) 的次序;
- 字母: 并行包装步骤内各个真正并行步骤的次序;
- 上标 ´ : 表示对应数字 (字母) 步骤的补偿步骤;
本节用具体示例表示几种复杂任务拓扑结构下, 当任务取消后的瞬时状态:
- case 1:
2 号步骤中的两个子步骤是并行的;
SKIP 状态的步骤在补偿阶段被自适应跳过;取消后的步骤状态 - case 2:
取消前 a、b、c 号步骤作为整体, 和 d 号步骤是并行的;
取消后两条并行链路继续并行补偿;取消之前的步骤状态取消后的步骤状态
任务定义的演进
截止到本节之前, 其实本框架最核心的能力已经介绍完毕; 但是很明显, 如果让用户直接去裸用上述接口, 还是有一定理解及操作成本的, 本框架需要给用户提供一种易理解易上手的友好型交互使用模式;
定义层 (Definition Layer)
本框架考虑了将复合型步骤 (串行编排、并行编排) 的概念对用户屏蔽, 让用户专注于自身的业务步骤定义以及业务步骤间的关系管理, 为此引入了一个概念: Stage (阶段), 并将步骤定义下放到 Stage 内:
1 | class Stage { |
其行为特征如下:
- 同一个 stage 内的多个 step 并行执行;
- 不同 stage 按照 stage 的 order 从小打到依次执行;
- stage 有两种类型的 step:
- 原子步骤: 不能再继续分解;
- 嵌套步骤: 嵌套另一个 stage, 递归定义;
虽然从本质上讲, 一个 stage 等效于本框架中的 CompositeTaskStep, 多个 stages 按 order 排列等效于本框架中的 SequentialTaskStep, 但从语义上看, 对用户暴露 阶段 的概念更贴合人类的思维习惯, 用户可以基于 Stage 这个 middle layer 组织业务步骤之间的关系;
Stage 的数据结构
虽然本框架可以构建任意嵌套层级的任务结构, 但若将 Stage 作为一个整体来观察, 无论处于嵌套结构中的哪个位置, 同一层级同一作用域内的多个 stage 一定是线性排列的; 另外还需考虑到 Stage 的表达能力要同时兼顾补偿机制, 具备同时编排用户的普通 stage 和对应的补偿 $stage^´$ 的能力;
以上意味着 Stage 的数据结构要满足:
- 支持正反两个方向 (正向普通和逆向补偿) 的序列编排;
- 支持从任何一个正向的 stage 节点切换到补偿节点 $stage^´$ 并反向流转;
1
2
3
4[end] <- stage1´ <- stage2´ <- stage3´
^ ^ ^
| | |
[start] -> stage1 -> stage2 -> stage3 -> [end]
本框架为此构造了一种 “mirrored double list” 结构, 释义如下:
- mirrored: 普通 stage 与补偿 $stage^´$ 操作对象相同、操作内容相反, 构成轴对称镜像;
- double: 表示正向普通与逆向补偿两条任务推进的动线;
实现逻辑如下:
1 | // 泛型参数 DepType 的含义在下一节解释 |
其中的关键是两个指针 next 和 compensate:
- 普通 $stage_n$ 的 next 指向下一个 $stage_{n+1}$, $stage_n$ 的 compensate 指向与自己对应的补偿 $stage_n^´$;
- 补偿 $stage_n^´$ 的 next 指向下一个补偿 $stage_{n-1}^´$, 补偿 $stage_n^´$ 的 compensate 指向空;
附: 本框架基于 linked list 去编织 stage 镜像双链:
1 | public <DepType> Stage<DepType> buildStage(List<StageConfig> configs) { |
任务实例的解释执行
定义层作为一个中间桥接层, 向上对接用户的任务/步骤定义, 向下负责将用户的定义 interpret 为具体的任务实例, 具体的原则如下:
- 当一个 stage 没有配置对应的补偿 stage^´, 本框架会使用一个空实现的 NoneTaskStep 对其补偿逻辑占位;
- 从入口 stage 开始, 本框架会通过 stage 的 next 指针遍历, 依据 Stage 中的 Step 定义, 构建具体的 TaskStep 实例:
本框架引入了一个 step builder 的概念, 表示如何驱动具体步骤的实例化;
-泛型 DepType 表示构建步骤实例所依赖的具体资源, 用户可以定义一个总线性质的类去承载整个任务中所有步骤的依赖对象, 并由该任务的所有步骤统一依赖 (上文 OpsTask 即遵循此约定);step builder 的引入有效降低了中间定义层和步骤实例构造之间的耦合, 为 “一次编写, 处处使用” 的步骤复用场景打下了基础 (例如 API 统一发布和 API 归属应用迁移 两种场景, 依托 step builder 灵活的适配能力, 实现了步骤的 100% 复用);
另外, step builder 也为 定义的配置化 创造了条件: 实例构建的具体细节被屏蔽, 取而代之的是, step builder 的签名允许以极简的形式转化为任务定义的文本配置;
1
2
3
4
5
6
7
8
9
10
11class AtomicStep<DepType>: Step<DepType> {
val taskStepBuilder: TaskStepBuilder<DepType>
}
class NestedStep: Step<DepType> {
val nestedStage: Stage<DepType>
}
interface TaskStepBuilder<DepType> {
fun build(String name, TaskContext taskContext, DepType stepDependencies): TaskStep
}(stage 内并行编排) 扫描 stage 分别生成 normal 和 compensate 步骤实例的过程:
- 一个 stage 内如果只有一个 step 定义, 解释为步骤实例直接返回;
- 一个 stage 内如果有多个 step 定义, 将其分别解释后的步骤实例组合为一个 CompositeTaskStep;
- (stage 间串行编排) 当扫描完一段完整的 stage 链, 因为这一步可能是 CompensableTask 初始化时直接调用的, 需要返回一个 CompensableTaskStep (当然也可能不是, 比如嵌套 stage 的初始化):
- 如果只生成了一对 normal / compensate 步骤, 既可以构造一个 CompositeCompensableTaskStep, 也可以构造一个 SequentialCompensableTaskStep (参考此处说明);
- 如果生成了多对 normal / compensate 步骤, 按序传入, 构造一个 SequentialCompensableTaskStep;
任务实例的可视化
对任务定义的独立化, 还有一个有益的用途是解除任务实例的生命周期对任务定义的生命周期的绑架:
- 本框架的 执行模式 决定了一个任务中的步骤只有被执行了才会记录其状态, 否则不会留下任何痕迹;
- 如果没有任务定义的独立化, 那么当查询一个任务实例的执行概要就只能列出该任务已经执行过的步骤, 无法列出未执行的步骤, 用户无法建立起对任务执行进度的全局观, 有损用户体验;
任务定义的生命周期和任务实例的生命周期并不相同, 缺乏任务定义的视角、单纯基于任务实例执行现状去构造任务执行概要, 只能取得管中窥豹的效果; 而当任务定义被剥离出来后, 本框架可以基于任务定义的快照, 在此之上填充各步骤的状态概要, 从而得以更友好地向用户展示任务执行进度;
定义的配置化
即使上一小节引入了对用户更友好的 Stage 概念, 但这只是降低了用户对步骤编排的理解成本, 却并未降低用户的使用操作成本; 为此本框架在上一节的基础之上更进一步, 实现了任务定义的全面配置化, 比如以下任务定义 DSL:
1 | [ |
上述配置本框架将会为之生成如下任务结构:
正向流程:
1
2
3APIChangefreeRecord -| |- APIMetaGrayPublish --> APIMetaPublish -|
|---->| |----> APIBaselineRecord
SecurityAudit -------| |------------ APIRoutePublish ------------|补偿流程 1:
1
2
3APIChangefreeRecord -|
|----> APIChangefreeRecordstage´
SecurityAudit -------|补偿流程 2:
1
2
3APIChangefreeRecord -| |- APIMetaGrayPublish --> APIMetaPublish -| |- APIMetaPublish´ --> APIMetaGrayPublish´ -|
|---->| |---->| |----> APIChangefreeRecordstage´
SecurityAudit -------| |------------ APIRoutePublish ------------| |------------- APIRoutePublish´ ------------|
为了最大限度地迎合普通人的直观思维习惯, 本框架允许以自然结对的方式配置 normal 和 compensate 步骤定义, 即: 用户无需关注任务取消时补偿步骤应当如何编排, 只管按照正向流程的自然顺序依次定义即可, 本框架全权负责将 DSL 配置翻译为以 Stage 为核心的中间定义层, 再将中间定义层转译解释为真实的任务实例;
产品化展望
截止到本节之前, 本框架目前已实现的所有能力均已介绍完毕; 然而在产品化方面, 本框架仅仅处于最初级的阶段, 以下是本框架接下来可以发展的几个方向;
任务定义域
版本化
每一次任务定义的修改都应该被赋予一个唯一的版本, 而每一个任务实例也都应绑定到唯一的任务定义版本, 并伴随实例的整个生命周期;
如果没有版本化, 当任务定义修改 (比如前置增加一个步骤), 已执行或正在执行中的实例会因未曾执行过该新步骤而出现不可预知的结果; 而绑定任务定义版本的实例, 每次调度都会通过既定版本的任务定义解释执行, 与其他版本隔离, 保证执行结果的确定性;
低代码
本框架当前使用 diamond 维护任务定义的配置内容,「上 diamond 编写任务定义的 DSL」这种方式只能说在部门内部小规模使用勉强可以满足, 但要推广到外部就不够看了;
当前业内有影响力的流程引擎大多支持了 ISO 业务流程建模 BPMN 2.0 标准, 本框架也同样应当拥抱该业界规范, 实现图形化表达和 DSL 定义之间的互相转换, 从而允许开发者直接在图形界面上通过拖拽组件的方式实现便捷的流程编排, 更进一步降低接入及运维成本;
由于本框架编排流程的底层数据结构是基于「横(串行)」和「竖(并行)」两种基本逻辑单元组合交织而成, 对于任何 “无冗余依赖型的有向无环图” 都可以通过这种逻辑方式表达 (反例: 令 $G=(a,b,c | a \rightarrow b, b \rightarrow c, a \rightarrow c)$, 由于 c 对 b 有依赖, 因而 c 对 a 的依赖被 b 对 a 的依赖所阻塞, 导致 c 对 a 的依赖是冗余的);
当一个 DAG 的数据结构被本框架标准化后, 就可以很方便地接入补偿机制, 实现生命周期的自动托管, 这也算是本框架的一个小小优势吧;
调度域
分布式化
本框架当前虽已经沉淀了诸多可复用的常见步骤类型及通用实践经验, 但它们的代码都在淘系中间件控制台的 git 仓库内, 目前只能跑在控制台本地环境, 这意味着想要复用就必须局限在控制台内开发新的任务流程, 这极大限制了本框架的推广;
诚然, 轻量级的特性使得本框架也可以走 jar 包分发的路线, 允许用户在自己的应用中自主集成 sdk, 但如此就形成一个个应用孤岛, 无法聚集起生态, 管控亦没有抓手, 反馈与改进难以闭环, 因此走分布式化执行的路线是一个非常值得考虑的方向;

在这个领域内最值得参考的就是 SchedulerX2, 事实上 SchedulerX2 最近也推出了自己的流程编排能力: 一个分布式调度领域的框架进军编排领域是一个积累到一定程度自然而然的事情, 同理一个编排领域的框架涉足分布式调度领域, 某种角度上看也属殊途同归;
产品定价
流程引擎的服务定价是一件比较复杂的事情, 因为服务对象的形态差异较大, 对资源的要求不尽相同, 很难为每一个类型的任务给出合适精确的定价; 但得益于 无状态的执行模式, 本框架尽可能抹平了异构任务之间的复杂度差异 (结合 分布式化调度 效果更加), 从而有可能只基于任务特征及步骤特征构建一个极简的定价模型:
$$ P(type) = \alpha \cdot \vec{\gamma_t} \cdot \vec{\lambda_t} + (1 - \alpha) \cdot \sum_{n=1}^{cnt(type)} \vec{\gamma_{sn}} \cdot \vec{\lambda_{sn}} + \omega $$
其中:
- $P(type)$: 某类型任务的单价 (单位 元/次);
- $cnt(type)$: 指定类型的任务下步骤的编排数量;
- $\alpha$: 任务分量的定价权重;
- $\vec{\gamma_t}$ 与 $\vec{\lambda_t}$: 任务的特征向量及特征对应的定价因子;
- $\vec{\gamma_{sn}}$ 与 $\vec{\lambda_{sn}}$: 任务中指定步骤的特征向量及特征对应的定价因子;
- $\omega$: 损耗常量 (云/网络 等资源的成本分摊);
为简化计算, 我们只取最核心的特征分量, 令:
$$ \vec{\gamma_t} = \begin{bmatrix} steteful \\ stateless \end{bmatrix}, \vec{\gamma_{sn}} = \begin{bmatrix} stateful \\ stateless \\ autoExe \\ manualExe \end{bmatrix} $$
结合存储服务和调度服务分摊给本框架的费用, 就可以为每一种类型的任务分别算出各自的定价; 以上模型在保证本产品运营不亏损的前提下, 尽可能兼顾了公平;