DolphinScheduler Task Scheduling Source Code Analysis
Contents
3.4.2. taskEventService handles the TaskEvent
3.4.3. TaskResultEventHandler handles the TaskEvent
3.5.1. EventExecuteService handles the StateEvent
3.5.2. WorkflowExecuteThread handles the StateEvent
3.5.3. TaskStateEventHandler handles the StateEvent
3.5.4. WorkflowExecuteThread schedules downstream tasks
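1. Database tables
t_ds_process_definition: workflow definition table
A row is inserted into this table whenever a new workflow is created.
t_ds_process_definition_log
t_ds_process_instance: workflow instance table
A row is inserted each time a workflow runs.
t_ds_task_definition: task definition table
Rows are inserted when nodes are dragged into a workflow and the workflow is saved.
t_ds_task_definition_log
t_ds_process_task_relation: task relation table
Stores the edges between nodes.
t_ds_process_task_relation_log
t_ds_task_instance: task instance table
A row is inserted each time a task in a workflow runs.
t_ds_command: command table. To start a workflow, the client sends an HTTP request to the API server, and the endpoint writes the workflow to be run into this table, where the master finds it by scanning.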
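2. Overall execution flow
- The user clicks the start-workflow button in the UI.
- The API server wraps the request in a command and writes it to the database.
- The master scans the command, builds the DAG, initializes it, and submits the source tasks to the priority queue.
- The task consumer takes tasks from the queue and selects a worker for each one.
- The worker receives the dispatch message and starts the task.
- The worker returns the result to the master, and the master updates the task information in the database.
- The master then submits the downstream tasks of the finished source task.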
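The flow above can be modelled in a few lines. The following is only a minimal in-memory sketch, assuming a workflow given as a map from each task to its downstream tasks; `FlowSketch`, `run`, and `pendingUpstream` are hypothetical names for illustration and are not DolphinScheduler classes. A plain FIFO queue stands in for the priority queue, and "running" a task is reduced to recording its name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the flow above; not DolphinScheduler code.
class FlowSketch {

    /** edges maps every task name to the list of its direct downstream tasks. */
    static List<String> run(Map<String, List<String>> edges) {
        // Count how many upstream tasks each task is still waiting for.
        Map<String, Integer> pendingUpstream = new LinkedHashMap<>();
        for (String task : edges.keySet()) {
            pendingUpstream.putIfAbsent(task, 0);
        }
        for (List<String> children : edges.values()) {
            for (String child : children) {
                pendingUpstream.merge(child, 1, Integer::sum);
            }
        }

        // "Master": submit the source tasks (no upstream) to the queue.
        Deque<String> taskQueue = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : pendingUpstream.entrySet()) {
            if (entry.getValue() == 0) {
                taskQueue.add(entry.getKey());
            }
        }

        List<String> executionOrder = new ArrayList<>();
        while (!taskQueue.isEmpty()) {
            // "Consumer" takes a task; "worker" runs it and reports back.
            String task = taskQueue.poll();
            executionOrder.add(task);
            // "Master": submit each downstream task once all its upstream tasks are done.
            for (String child : edges.getOrDefault(task, List.of())) {
                if (pendingUpstream.merge(child, -1, Integer::sum) == 0) {
                    taskQueue.add(child);
                }
            }
        }
        return executionOrder;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("A", List.of("B", "C"));
        edges.put("B", List.of("D"));
        edges.put("C", List.of("D"));
        edges.put("D", List.of());
        System.out.println(run(edges)); // prints [A, B, C, D]
    }
}
```

Task D is submitted only after both B and C have finished, which mirrors the rule that downstream nodes are submitted after their upstream nodes complete.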
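3. Source code analysis
3.1 API server: entry point for starting a workflow
The entry point is the ExecutorController.startProcessInstance() method.
It ultimately inserts a row into the MySQL table t_ds_command that describes the workflow to be run.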
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ApiException(START_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCode") long processDefinitionCode,
@RequestParam(value = "scheduleTime") String scheduleTime,
@RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
@RequestParam(value = "startNodeList", required = false) String startNodeList,
@RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
@RequestParam(value = "execType", required = false) CommandType execType,
@RequestParam(value = "warningType") WarningType warningType,
@RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
@RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
}
Map<String, String> startParamMap = null;
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}
if (complementDependentMode == null) {
complementDependentMode = ComplementDependentMode.OFF_MODE;
}
// build the command and persist it to the database
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, complementDependentMode);
return returnDataList(result);
}
3.2 Master schedules tasks
3.2.1 Master startup
public void run() throws SchedulerException {
// init rpc server
this.masterRPCServer.start(); // start the RPC server used to communicate with workers
// install task plugin
this.taskPluginManager.loadPlugin(); // load the task plugins
// self tolerant
this.masterRegistryClient.init(); // load the registry information used for high availability
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
// command (process) scan thread
this.masterSchedulerBootstrap.init();
this.masterSchedulerBootstrap.start();
// event handling threads
this.eventExecuteService.start();
this.failoverExecuteThread.start();
// presumably the timed (cron) scheduler
this.schedulerApi.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("MasterServer shutdownHook");
}
}));
}
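3.2.2 Command scanning
Once the thread starts, it loops and keeps scanning the command table. Each command it finds is converted into a ProcessInstance and persisted. The master then creates a WorkflowExecuteRunnable for the instance, caches it, and adds a START_WORKFLOW event carrying the instance ID to workflowEventQueue.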
public void run() {
while (Stopper.isRunning()) {
try {
// todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
// indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// convert the commands into ProcessInstances and persist them
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
// indicate that the command transform to processInstance error, sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
MasterServerMetrics.incMasterConsumeCommand(commands.size());
processInstances.forEach(processInstance -> {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
logger.error("The workflow instance is already been cached, this case shouldn't be happened");
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
processService,
nettyExecutorManager,
processAlertManager,
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable); // cache the runnable so that the workflow event looper can fetch it later
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
});
} catch (InterruptedException interruptedException) {
logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Master schedule workflow error", e);
// sleep for 1s here to avoid the database down cause the exception boom
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}
3.2.3. Consuming workflow events
The start() method of the command scan thread also starts the workflowEventLooper thread, which consumes WorkflowEvents.
@Override
public synchronized void start() {
logger.info("Master schedule bootstrap starting..");
super.start();
workflowEventLooper.start(); // start the workflow event looper thread
logger.info("Master schedule bootstrap started...");
}
The looper takes WorkflowEvents from workflowEventQueue and hands each one to the matching workflowEventHandler.
public void run() {
WorkflowEvent workflowEvent = null;
while (Stopper.isRunning()) {
try {
workflowEvent = workflowEventQueue.poolEvent(); // take the next workflow event
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
WorkflowEventHandler workflowEventHandler =
workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType()); // look up the handler registered for this event type
workflowEventHandler.handleWorkflowEvent(workflowEvent);
} catch (InterruptedException e) {
logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
Thread.currentThread().interrupt();
break;
} catch (WorkflowEventHandleException workflowEventHandleException) {
logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
workflowEvent, workflowEventHandleException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (WorkflowEventHandleError workflowEventHandleError) {
logger.error("Handle workflow event error, will drop this event, event: {}",
workflowEvent,
workflowEventHandleError);
} catch (Exception unknownException) {
logger.error(
"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
workflowEvent, unknownException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
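The looper distinguishes two failure modes: a WorkflowEventHandleException (or an unknown exception) puts the event back on the queue, while a WorkflowEventHandleError drops it. The sketch below isolates that retry-or-drop decision. It is a simplified illustration under stated assumptions: `EventLoopSketch`, `RetryableException`, `FatalException`, and `Handler` are hypothetical stand-ins, events are plain strings, and the loop uses a non-blocking `poll()` and no sleep so that it terminates when the queue is empty, whereas the real looper blocks and runs until the master stops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the retry-or-drop policy; not DolphinScheduler code.
class EventLoopSketch {

    /** Recoverable failure: the event is re-queued (stand-in for WorkflowEventHandleException). */
    static class RetryableException extends Exception {
    }

    /** Unrecoverable failure: the event is dropped (stand-in for WorkflowEventHandleError). */
    static class FatalException extends Exception {
    }

    interface Handler {
        void handle(String event) throws RetryableException, FatalException;
    }

    /** Processes events until the queue is empty and returns the events that were dropped. */
    static List<String> drain(Queue<String> queue, Handler handler) {
        List<String> dropped = new ArrayList<>();
        String event;
        while ((event = queue.poll()) != null) {
            try {
                handler.handle(event);
            } catch (RetryableException e) {
                queue.add(event);   // try again later
            } catch (FatalException e) {
                dropped.add(event); // retrying cannot help, so give up on this event
            }
        }
        return dropped;
    }
}
```

One consequence of this policy is that a handler which always throws the retryable exception keeps its event circulating forever, so fatal conditions need to be signalled with the dropping exception type.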
3.2.4. Workflow event handling logic
The handler fetches the WorkflowExecuteRunnable from the cache and invokes its call() method asynchronously.
@Override
public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
// fetch the WorkflowExecuteRunnable from the cache
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
if (workflowExecuteRunnable == null) {
throw new WorkflowEventHandleError(
"The workflow start event is invalid, cannot find the workflow instance from cache");
}
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessInstanceMetrics.incProcessInstanceSubmit();
// invoke call() asynchronously to run the workflow startup logic
CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
// submit failed will resend the event to workflow event queue
logger.info("Success submit the workflow instance"); // the returned submit status is SUCCESS
if (processInstance.getTimeout() > 0) { // a timeout is configured, so register the instance for timeout checks
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
} else { // submission failed: re-queue the start event so that call() runs again
logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
workflowEvent);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
}
});
}
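The handler's pattern is "run the startup on a pool, then react to the returned status". A minimal sketch of that pattern with the JDK's `CompletableFuture` follows. `AsyncSubmitSketch`, `SubmitStatus`, and `retryQueue` are hypothetical names; the retry queue stands in for workflowEventQueue and holds bare workflow IDs.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch of asynchronous submission with re-queue on failure.
class AsyncSubmitSketch {

    enum SubmitStatus { SUCCESS, FAILED }

    /**
     * Runs the startup logic on the given pool. When it reports anything other
     * than SUCCESS, the workflow id is put back on the retry queue.
     */
    static CompletableFuture<Void> submit(int workflowId,
                                          Supplier<SubmitStatus> startup,
                                          Executor pool,
                                          Queue<Integer> retryQueue) {
        return CompletableFuture.supplyAsync(startup, pool)
                .thenAccept(status -> {
                    if (status != SubmitStatus.SUCCESS) {
                        retryQueue.add(workflowId);
                    }
                });
    }
}
```

Note that `thenAccept` runs only when the supplier completes normally. That fits the excerpt above because call() catches exceptions itself and returns a FAILED status instead of throwing.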
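3.2.5. WorkflowExecuteRunnable execution logic
- Build the workflow's directed acyclic graph (DAG).
- Initialize the task scheduling state.
- Submit the source tasks to the task priority queue.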
@Override
public WorkflowSubmitStatue call() {
if (isStart()) {
// This case should not been happened
logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
}
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
buildFlowDag(); // build the DAG
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
initTaskQueue(); // initialize the task scheduling state
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
submitPostNode(null); // submit tasks to the queue; source nodes go first, and their downstream nodes are submitted only after they finish
workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
return WorkflowSubmitStatue.SUCCESS;
} catch (Exception e) {
logger.error("Start workflow error", e);
return WorkflowSubmitStatue.FAILED;
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
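Because call() advances workflowRunnableStatus only after a stage completes, a retried start event appears to resume at the stage that failed instead of repeating earlier stages. The sketch below demonstrates that property in isolation. `StagedStartSketch`, its `log` list, and the `failQueueInitOnce` fault-injection flag are hypothetical and exist only for the demonstration; the stage names mirror the enum values in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a resumable, staged startup; not DolphinScheduler code.
class StagedStartSketch {

    enum Status { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

    Status status = Status.CREATED;
    final List<String> log = new ArrayList<>();
    boolean failQueueInitOnce = true; // fault injection for the demonstration only

    /** Returns true when every stage has completed, false when a stage failed. */
    boolean call() {
        try {
            if (status == Status.CREATED) {
                log.add("buildDag");
                status = Status.INITIALIZE_DAG;
            }
            if (status == Status.INITIALIZE_DAG) {
                if (failQueueInitOnce) {
                    failQueueInitOnce = false;
                    throw new IllegalStateException("simulated queue init failure");
                }
                log.add("initQueue");
                status = Status.INITIALIZE_QUEUE;
            }
            if (status == Status.INITIALIZE_QUEUE) {
                log.add("submitSources");
                status = Status.STARTED;
            }
            return true;
        } catch (RuntimeException e) {
            return false; // status keeps the last completed stage
        }
    }

    public static void main(String[] args) {
        StagedStartSketch workflow = new StagedStartSketch();
        System.out.println(workflow.call()); // false: the second stage failed
        System.out.println(workflow.call()); // true: resumed without rebuilding the DAG
        System.out.println(workflow.log);    // [buildDag, initQueue, submitSources]
    }
}
```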
3.2.6. Task consumption
The TaskPriorityQueueConsumer.run method
// started through the @PostConstruct annotation
@PostConstruct
public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
logger.info("Task priority queue consume thread staring");
super.start();
logger.info("Task priority queue consume thread started");
}
@Override
public void run() {
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
while (Stopper.isRunning()) {
try {
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
if (fetchTaskNum == failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
} catch (Exception e) {
TaskMetrics.incTaskDispatchError();
logger.error("dispatcher task error", e);
}
}
}
batchDispatch
/**
* batch dispatch with thread pool
*/
public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(fetchTaskNum);
for (int i = 0; i < fetchTaskNum; i++) { // take tasks from the priority queue
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(taskPriority)) {
latch.countDown();
continue;
}
consumerThreadPoolExecutor.submit(() -> { // dispatch the task asynchronously on the consumer thread pool
try {
boolean dispatchResult = this.dispatchTask(taskPriority);
if (!dispatchResult) {
failedDispatchTasks.add(taskPriority);
}
} finally {
// make sure the latch countDown
latch.countDown();
}
});
}
latch.await();
return failedDispatchTasks;
}
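batchDispatch sizes a CountDownLatch to the batch, counts it down both for empty polls and for finished dispatches, and waits for the whole batch before returning the failures. A self-contained sketch of the same shape follows, assuming tasks are plain integers and the dispatch step is a caller-supplied predicate; `BatchDispatchSketch` and its parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical sketch of latch-based batch dispatch; not DolphinScheduler code.
class BatchDispatchSketch {

    /** Dispatches up to fetchNum tasks in parallel and returns the ones that failed. */
    static List<Integer> batchDispatch(BlockingQueue<Integer> queue,
                                       int fetchNum,
                                       ExecutorService pool,
                                       Predicate<Integer> dispatch) throws InterruptedException {
        List<Integer> failed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(fetchNum);
        for (int i = 0; i < fetchNum; i++) {
            Integer task = queue.poll(10, TimeUnit.MILLISECONDS);
            if (task == null) {
                latch.countDown(); // nothing to dispatch for this slot
                continue;
            }
            pool.submit(() -> {
                try {
                    if (!dispatch.test(task)) {
                        failed.add(task);
                    }
                } finally {
                    latch.countDown(); // always release the slot, even if dispatch throws
                }
            });
        }
        latch.await(); // wait until every slot has been accounted for
        return failed;
    }
}
```

Counting down in `finally` matters: if a dispatch threw and the latch were not released, `await()` would block the consumer loop indefinitely.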
3.2.7. Task dispatch
/**
* Dispatch task to worker.
*
* @param taskPriority taskPriority
* @return dispatch result, return true if dispatch success, return false if dispatch failed.
*/
protected boolean dispatchTask(TaskPriority taskPriority) {
TaskMetrics.incTaskDispatch();
boolean result = false;
try {
WorkflowExecuteRunnable workflowExecuteRunnable = // look up the WorkflowExecuteRunnable in the cache
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
if (workflowExecuteRunnable == null) {
logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
return true;
}
Optional<TaskInstance> taskInstanceOptional = // look up the task instance
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
if (!taskInstanceOptional.isPresent()) {
logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
taskPriority);
// we return true, so that we will drop this task.
return true;
}
TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext = // build the execution context
new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to dispatch anymore
return true;
}
}
// dispatch the task
result = dispatcher.dispatch(executionContext);
if (result) {
logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
addDispatchEvent(context, executionContext);
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
}
} catch (RuntimeException | ExecuteException e) {
logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
}
return result;
}
The message is actually sent to the worker over Netty by the following two methods.
/**
* task dispatch
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
// get executor manager
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
// host select: choose a worker node
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
context.getCommand(), context.getWorkerGroup());
return false;
}
context.setHost(host); // store the selected host in the context
executorManager.beforeExecute(context);
try {
// task execute: send a message to the worker over Netty, telling it to run this task
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
// all nodes
Set<String> allNodes = getAllNodes(context);
// fail nodes
Set<String> failNodeSet = new HashSet<>();
// build command accord executeContext
Command command = context.getCommand();
// execute task host
Host host = context.getHost();
boolean success = false;
while (!success) {
try {
doExecute(host, command);
success = true;
context.setHost(host);
// We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host
// is not belongs to the down worker ISSUE-10842.
context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error", command), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
logger.error("retry execute command : {} host : {}", command, host);
} else {
throw new ExecuteException("fail after try all nodes");
}
} catch (Throwable t) {
throw new ExecuteException("fail after try all nodes");
}
}
}
return success;
}
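The execute method retries on another node whenever sending fails, and gives up once every node has been tried. The sketch below shows that loop on its own. It assumes hosts are plain strings and the network send is a caller-supplied predicate; `HostFailoverSketch` is a hypothetical name, and a `TreeSet` is used only to make the retry order deterministic, which the original code does not guarantee.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical sketch of "retry on the remaining nodes"; not DolphinScheduler code.
class HostFailoverSketch {

    /** Tries firstHost, then each remaining host, and returns the host that accepted the command. */
    static String execute(String firstHost, Set<String> allHosts, Predicate<String> send) {
        Set<String> failedHosts = new HashSet<>();
        String host = firstHost;
        while (true) {
            if (send.test(host)) {
                return host;
            }
            failedHosts.add(host);
            // Candidates are all nodes minus the ones that have already failed.
            Set<String> remaining = new TreeSet<>(allHosts);
            remaining.removeAll(failedHosts);
            if (remaining.isEmpty()) {
                throw new IllegalStateException("fail after try all nodes");
            }
            host = remaining.iterator().next();
        }
    }

    public static void main(String[] args) {
        Set<String> workers = Set.of("w1", "w2", "w3");
        // Only w3 accepts the command in this example.
        System.out.println(execute("w1", workers, "w3"::equals)); // prints w3
    }
}
```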
3.3. Worker executes tasks
3.3.1. Worker startup
public void run() {
this.workerRpcServer.start(); // start the RPC communication components
this.workerRpcClient.start();
this.taskPluginManager.loadPlugin();
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.handleDeadServer();
// start the worker manager thread, which consumes the queued tasks and executes them
this.workerManagerThread.start();
this.messageRetryRunner.start();
/*
* registry hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("WorkerServer shutdown hook");
}
}));
}
3.3.2. Worker consumes the task dispatch command
The processor extracts the task execution context from the command, wraps it in a TaskExecuteThread, and submits it to workerManager.
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);
if (taskDispatchCommand == null) {
logger.error("task execute request command content is null");
return;
}
final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("task execute request message: {}", taskDispatchCommand);
TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext(); // get the task execution context
if (taskExecutionContext == null) {
logger.error("task execution context is null");
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
// todo custom logger
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { // not a dry run
boolean osUserExistFlag;
//if Using distributed is true and Currently supported systems are linux,Should not let it automatically
//create tenants,so TenantAutoCreate has no effect
if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
//use the id command to judge in linux
osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
} else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
// if not exists this user, then create
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
} else {
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
}
// check if the OS user exists
if (!osUserExistFlag) { // the OS user for this tenant does not exist
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(),
taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, // report the failure to the master
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) { // creating the working directory failed
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
execLocalPath,
taskExecutionContext.getTaskInstanceId(),
ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
}
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s",
taskExecutionContext.getTaskInstanceId(),
remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
}
// submit the task to workerManager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
masterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate));
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getWaitSubmitQueueSize(),
taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
3.3.3. workerManager consumes tasks
public void start() {
logger.info("Worker manager thread starting");
Thread thread = new Thread(this, this.getClass().getName());
thread.setDaemon(true);
thread.start();
logger.info("Worker manager thread started");
}
public void submit(TaskExecuteThread taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread); // register in the map of currently running tasks
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread); // submit the task to the thread pool
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) { // success callback: remove the task from the running map
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
@Override
public void onFailure(Throwable throwable) {
logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
throwable);
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); // remove the task on failure as well
}
};
Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
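The submit method keeps a map of running tasks and uses completion callbacks to remove an entry whether the task succeeds or fails. The excerpt does this with Guava's `Futures.addCallback`. The sketch below swaps in the JDK's `CompletableFuture.whenComplete` to stay dependency-free, so it illustrates the bookkeeping rather than the Guava API. `RunningTaskRegistrySketch` and `running` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical sketch of "track while running, remove on completion".
class RunningTaskRegistrySketch {

    final Map<Integer, Runnable> running = new ConcurrentHashMap<>();

    /** Registers the task, runs it on the pool, and unregisters it on success or failure. */
    CompletableFuture<Void> submit(int taskInstanceId, Runnable task, Executor pool) {
        running.put(taskInstanceId, task);
        return CompletableFuture.runAsync(task, pool)
                // whenComplete runs for both outcomes; error is null on success.
                .whenComplete((ignored, error) -> running.remove(taskInstanceId));
    }
}
```

Keeping the entry until completion is what lets a later kill request find the running task by its instance ID.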
3.3.4. Task execution
@Override
public void run() {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { // dry run: report success without executing
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
logger.info("Task dry run success");
return;
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
// callback task execute running
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RUNNING);
// copy hdfs/minio file to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); // get the task factory (channel) for this task type
if (null == taskChannel) {
throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
}
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskLogName(taskLogName);
// set the name of the current thread
Thread.currentThread().setName(taskLogName);
task = taskChannel.createTask(taskExecutionContext); // create the task
// task init
this.task.init(); // initialize the task
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
this.task.handle(); // the task's actual execution logic
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
}
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode())); // record the task's exit status
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); // report failure
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
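Stripped of logging, resource download, and plugin lookup, the run method reports twice: a running message before the task body, and a result message from the `finally` block no matter how the body ends. The sketch below reduces it to that skeleton. `TaskLifecycleSketch` is hypothetical, messages are plain strings modelled on the command types and status names in the excerpt, and the returned list stands in for what would be sent to the master.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the worker's status reporting order; not DolphinScheduler code.
class TaskLifecycleSketch {

    /** Runs the task body and returns the messages that would be sent to the master, in order. */
    static List<String> run(Runnable taskBody) {
        List<String> sent = new ArrayList<>();
        // Tell the master the task has started before doing any work.
        sent.add("TASK_EXECUTE_RUNNING:RUNNING_EXECUTION");
        String finalStatus = "FAILURE";
        try {
            taskBody.run();
            finalStatus = "SUCCESS";
        } catch (Throwable e) {
            finalStatus = "FAILURE";
        } finally {
            // The result is reported from finally, so the master always gets a final status.
            sent.add("TASK_EXECUTE_RESULT:" + finalStatus);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> { }));
        System.out.println(run(() -> { throw new IllegalStateException("task failed"); }));
    }
}
```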
3.4. Master receives task feedback
3.4.1. Master receives the feedback message
@PostConstruct
private void init() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.WORKFLOW_EXECUTING_DATA_REQUEST, workflowExecutingDataRequestProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
}
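The init method is essentially a registry that maps each command type to the processor that handles it. A minimal sketch of such a registry follows. `ProcessorRegistrySketch`, its reduced `CommandType` enum, and the string-based `Processor` interface are hypothetical simplifications; the real server works with Netty channels and Command objects.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch of command-type based routing; not the NettyRemotingServer API.
class ProcessorRegistrySketch {

    enum CommandType { TASK_EXECUTE_RUNNING, TASK_EXECUTE_RESULT, TASK_REJECT }

    interface Processor {
        String process(String body);
    }

    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    /** Routes the message to the processor registered for its type. */
    String dispatch(CommandType type, String body) {
        Processor processor = processors.get(type);
        if (processor == null) {
            return "unsupported: " + type;
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch server = new ProcessorRegistrySketch();
        server.registerProcessor(CommandType.TASK_EXECUTE_RESULT, body -> "result handled: " + body);
        System.out.println(server.dispatch(CommandType.TASK_EXECUTE_RESULT, "task 7"));
        System.out.println(server.dispatch(CommandType.TASK_REJECT, "task 8"));
    }
}
```

The same processor instance can be registered under several types, as the excerpt does with loggerRequestProcessor and taskEventProcessor.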
The processor registered for TASK_EXECUTE_RESULT wraps the message in a taskResultEvent and submits it to taskEventService.
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultCommand.class);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel,
taskExecuteResultMessage.getMessageSenderAddress());
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId());
logger.info("Received task execute result, event: {}", taskResultEvent);
taskEventService.addEvent(taskResultEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
3.4.2. taskEventService handles the TaskEvent
When TaskEventService starts, it launches a TaskEventDispatchThread and a TaskEventHandlerThread.
The event service consists of these two components.
@PostConstruct
public void start() {
this.taskEventThread = new TaskEventDispatchThread(); // event dispatch thread
logger.info("TaskEvent dispatch thread starting");
this.taskEventThread.start();
logger.info("TaskEvent dispatch thread started");
this.taskEventHandlerThread = new TaskEventHandlerThread();
logger.info("TaskEvent handle thread staring");
this.taskEventHandlerThread.start(); // event handler thread
logger.info("TaskEvent handle thread started");
}
class TaskEventDispatchThread extends BaseDaemonThread {
protected TaskEventDispatchThread() {
super("TaskEventLoopThread");
}
@Override
public void run() {
while (Stopper.isRunning()) {
try {
// if not task event, blocking here
TaskEvent taskEvent = eventQueue.take(); // take the next TaskEvent
taskExecuteThreadPool.submitTaskEvent(taskEvent); // submit the TaskEvent to taskExecuteThreadPool
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("persist task error", e);
}
}
logger.info("StateEventResponseWorker stopped");
}
}
public void submitTaskEvent(TaskEvent taskEvent) {
if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
return;
} // create the TaskExecuteRunnable (one per process instance) and add the event to it
TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
(processInstanceId) -> new TaskExecuteRunnable(processInstanceId, taskEventHandlerMap));
taskExecuteRunnable.addEvent(taskEvent);
}
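The dispatch step above takes events off one shared queue and routes each to a per-workflow queue that is created on first use. A minimal sketch of that routing follows; `DispatchSketch`, `Event`, and `WorkflowEvents` are hypothetical stand-ins rather than DolphinScheduler classes, and `drainOnce` uses a non-blocking `poll()` instead of the blocking `take()` in the listing so that it can run to completion.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for illustration only; these are not DolphinScheduler classes.
class DispatchSketch {

    static final class Event {
        final int workflowId;
        final String payload;

        Event(int workflowId, String payload) {
            this.workflowId = workflowId;
            this.payload = payload;
        }
    }

    // Plays the role of TaskExecuteRunnable: one event queue per workflow instance.
    static final class WorkflowEvents {
        final int workflowId;
        final Queue<Event> events = new ConcurrentLinkedQueue<>();

        WorkflowEvents(int workflowId) {
            this.workflowId = workflowId;
        }
    }

    final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
    final ConcurrentHashMap<Integer, WorkflowEvents> byWorkflow = new ConcurrentHashMap<>();

    // Routes one event to the queue of its workflow, creating that queue on first use.
    void route(Event event) {
        byWorkflow.computeIfAbsent(event.workflowId, WorkflowEvents::new).events.add(event);
    }

    // Non-blocking variant of the dispatch loop: routes whatever is queued right now.
    int drainOnce() {
        int routed = 0;
        Event event;
        while ((event = eventQueue.poll()) != null) {
            route(event);
            routed++;
        }
        return routed;
    }

    public static void main(String[] args) {
        DispatchSketch sketch = new DispatchSketch();
        sketch.eventQueue.add(new Event(1, "RUNNING"));
        sketch.eventQueue.add(new Event(2, "RUNNING"));
        sketch.eventQueue.add(new Event(1, "RESULT"));
        sketch.drainOnce();
        System.out.println(sketch.byWorkflow.get(1).events.size()); // prints 2
    }
}
```

Keeping one queue per workflow instance means events of the same workflow stay in arrival order, while different workflows can be handled in parallel.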
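Handling the TaskEvent: the handler thread polls the thread pool, and the call chain ends in taskExecuteThread.run() (a TaskExecuteRunnable), which processes the queued TaskEvents.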
class TaskEventHandlerThread extends BaseDaemonThread {
protected TaskEventHandlerThread() {
super("TaskEventHandlerThread");
}
@Override
public void run() {
logger.info("event handler thread started");
while (Stopper.isRunning()) {
try {
taskExecuteThreadPool.eventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("TaskEvent handle thread interrupted, will return this loop");
break;
} catch (Exception e) {
logger.error("event handler thread error", e);
}
}
}
}
public void eventHandler() {
for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) {
executeEvent(taskExecuteThread);
}
}
public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
if (taskExecuteThread.isEmpty()) {
return;
}
if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
return;
}
multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
ListenableFuture future = this.submitListenable(taskExecuteThread::run);
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
@Override
public void onSuccess(Object result) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
});
}
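The multiThreadFilterMap check in executeEvent ensures that at most one pool thread works on a given workflow instance at a time, and both callbacks release the entry again. A minimal sketch of the same guard follows. It is not the DolphinScheduler implementation: `SingleFlightSketch` and `trySubmit` are hypothetical names, it uses `CompletableFuture` instead of Spring's `ListenableFuture`, and it swaps the `containsKey` plus `put` pair for an atomic `putIfAbsent`, which would also be safe if several threads submitted concurrently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; names are illustrative and not taken from DolphinScheduler.
class SingleFlightSketch {

    private final ConcurrentHashMap<Integer, Boolean> inFlight = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    SingleFlightSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // Submits work for a key unless work for that key is already running.
    // Returns null when rejected, otherwise a future that completes after the key is released.
    CompletableFuture<Void> trySubmit(int key, Runnable work) {
        if (inFlight.putIfAbsent(key, Boolean.TRUE) != null) {
            return null;
        }
        return CompletableFuture.runAsync(work, pool)
                // Release the key on success and on failure alike.
                .<Void>handle((ignored, error) -> {
                    inFlight.remove(key);
                    return null;
                });
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        SingleFlightSketch guard = new SingleFlightSketch(pool);
        CountDownLatch gate = new CountDownLatch(1);

        CompletableFuture<Void> first = guard.trySubmit(7, () -> awaitQuietly(gate));
        CompletableFuture<Void> second = guard.trySubmit(7, () -> { });
        System.out.println(second == null); // prints true: rejected while the first run is in flight

        gate.countDown();
        first.join();
        System.out.println(guard.trySubmit(7, () -> { }) != null); // prints true: accepted again
        pool.shutdown();
    }
}
```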
public void run() {
while (!this.events.isEmpty()) {
// we handle the task event belongs to one task serial, so if the event comes in wrong order,
TaskEvent event = this.events.peek();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
logger.info("Handle task event begin: {}", event); // look up the handler for this event type
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
events.remove(event);
logger.info("Handle task event finished: {}", event);
} catch (TaskEventHandleException taskEventHandleException) {
// we don't need to resubmit this event, since the worker will resubmit this event
logger.error("Handle task event failed, this event will be retry later, event: {}", event,
taskEventHandleException);
} catch (TaskEventHandleError taskEventHandleError) {
logger.error("Handle task event error, this event will be removed, event: {}", event,
taskEventHandleError);
events.remove(event);
} catch (Exception unknownException) {
logger.error("Handle task event error, get a unknown exception, this event will be removed, event: {}",
event, unknownException);
events.remove(event);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
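The run() method above peeks at the head event and removes it only once the outcome is known: a TaskEventHandleException leaves the event in the queue for a retry, while a TaskEventHandleError or an unknown exception discards it. A minimal sketch of this peek-then-remove pattern follows. `RetryableException`, `FatalException`, and `drain` are hypothetical names. One deliberate difference, an assumption of this sketch: on a retryable failure it ends the current pass, whereas the loop in the listing peeks the same event again straight away.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch; the exception and method names are illustrative only.
class PeekThenRemoveSketch {

    // Transient failure: keep the event so that it can be retried.
    static class RetryableException extends Exception {
        RetryableException(String message) {
            super(message);
        }
    }

    // Permanent failure: the event can never succeed, so it is dropped.
    static class FatalException extends Exception {
        FatalException(String message) {
            super(message);
        }
    }

    interface Handler {
        void handle(String event) throws RetryableException, FatalException;
    }

    // Processes events in order and returns how many were removed from the queue.
    static int drain(Queue<String> events, Handler handler) {
        int removed = 0;
        while (!events.isEmpty()) {
            String event = events.peek(); // look at the head without removing it
            try {
                handler.handle(event);
                events.remove(); // handled: safe to drop
                removed++;
            } catch (RetryableException e) {
                break; // keep the event at the head and retry in a later pass
            } catch (FatalException e) {
                events.remove(); // cannot succeed: drop it so that it does not block the queue
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Queue<String> events = new ArrayDeque<>();
        events.add("ok");
        events.add("bad");
        events.add("flaky");
        int removed = drain(events, event -> {
            if (event.equals("bad")) {
                throw new FatalException("unrecoverable");
            }
            if (event.equals("flaky")) {
                throw new RetryableException("try again later");
            }
        });
        System.out.println(removed + " removed, head is " + events.peek());
    }
}
```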
3.4.3. TaskResultEventHandler processes the TaskEvent
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
// get the WorkflowExecuteRunnable
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, cannot find related workflow instance from cache, will discard this event");
} // get the TaskInstance
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, cannot find the taskInstance from cache, will discord this event");
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, the task instance is already finished, will discord this event");
}
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
TaskInstance oldTaskInstance = new TaskInstance();
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
try {
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
taskInstance.setState(taskEvent.getState());
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.updateTaskInstance(taskInstance); // persist the updated task information
sendAckToWorker(taskEvent); // send an acknowledgement to the worker
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex);
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent); // submit the task state change event to workflowExecuteThreadPool
}
/**
* submit state event
*/
public void submitStateEvent(StateEvent stateEvent) {
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}", stateEvent);
return;
}
workflowExecuteThread.addStateEvent(stateEvent);
logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
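handleTaskEvent copies the task instance before mutating it, and restores the copy if the database update fails, so the in-memory state never runs ahead of what was persisted. A minimal sketch of that snapshot-and-rollback step follows. `Task`, `TaskStore`, and `applyResult` are hypothetical names, and the sketch returns a boolean where the listing throws TaskEventHandleError.

```java
// Hypothetical sketch; Task and TaskStore are illustrative types, not DolphinScheduler classes.
class SnapshotRollbackSketch {

    static final class Task {
        String state;
        String host;

        Task(String state, String host) {
            this.state = state;
            this.host = host;
        }

        Task copy() {
            return new Task(state, host);
        }

        void restoreFrom(Task other) {
            this.state = other.state;
            this.host = other.host;
        }
    }

    // Stands in for the persistence layer; update may fail.
    interface TaskStore {
        void update(Task task) throws Exception;
    }

    // Applies the reported result to the in-memory task and persists it.
    // Returns true if persisted; on failure the in-memory task is rolled back and false is returned.
    static boolean applyResult(Task task, String newState, String newHost, TaskStore store) {
        Task snapshot = task.copy();
        try {
            task.state = newState;
            task.host = newHost;
            store.update(task);
            return true;
        } catch (Exception e) {
            task.restoreFrom(snapshot);
            return false;
        }
    }

    public static void main(String[] args) {
        Task task = new Task("RUNNING", "worker-1");
        boolean saved = applyResult(task, "SUCCESS", "worker-1", t -> {
            throw new Exception("db unavailable");
        });
        System.out.println(saved + " " + task.state); // prints: false RUNNING
    }
}
```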
3.5. Master closes the loop by submitting downstream tasks
3.5.1. EventExecuteService processes the stateEvent
@Override
public void run() {
while (Stopper.isRunning()) {
try {
eventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (InterruptedException interruptedException) {
logger.warn("Master event service interrupted, will exit this loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Master event execute service error", e);
}
}
}
private void eventHandler() {
for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
executeEvent submits workflowExecuteThread.handleEvents to the thread pool to process the events.
/**
* Handle the events belong to the given workflow.
*/
public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
logger.warn("The workflow has been executed by another thread");
return;
}
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
ListenableFuture<?> future = this.submitListenable(workflowExecuteThread::handleEvents);
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
try {
logger.error("Workflow instance events handle failed", ex);
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
@Override
public void onSuccess(Object result) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("Workflow instance is finished.");
}
} catch (Exception e) {
logger.error("Workflow instance is finished, but notify changed error", e);
} finally {
// make sure the process has been removed from multiThreadFilterMap
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
});
}
3.5.2. workflowExecuteThread processes the stateEvent
/**
* handle event
*/
public void handleEvents() {
if (!isStart()) {
logger.info(
"The workflow instance is not started, will not handle its state event, current state event size: {}",
stateEvents);
return;
}
StateEvent stateEvent = null;
while (!this.stateEvents.isEmpty()) {
try {
stateEvent = this.stateEvents.peek();
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId());
// if state handle success then will remove this state, otherwise will retry this state next time.
// The state should always handle success except database error.
checkProcessInstance(stateEvent);
StateEventHandler stateEventHandler =
StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
.orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
logger.info("Begin to handle state event, {}", stateEvent);
if (stateEventHandler.handleStateEvent(this, stateEvent)) { // call the stateEventHandler to process the event
this.stateEvents.remove(stateEvent);
}
} catch (StateEventHandleError stateEventHandleError) {
logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
this.stateEvents.remove(stateEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleException stateEventHandleException) {
logger.error("State event handle error, will retry this event: {}",
stateEvent,
stateEventHandleException);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
logger.error("State event handle error, get a unknown exception, will retry this event: {}",
stateEvent,
e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
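handleEvents looks up a handler by event type and treats a missing handler as an unrecoverable error. A minimal sketch of such a lookup follows, assuming a plain map-based registry; how StateEventHandlerManager actually populates its handlers is not shown in this article. `HandlerRegistrySketch`, `SOME_OTHER_EVENT`, and the `Handler` interface are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a type-to-handler registry; not the StateEventHandlerManager implementation.
class HandlerRegistrySketch {

    // TASK_STATE_CHANGE appears in the listing above; SOME_OTHER_EVENT is made up for this sketch.
    enum EventType { TASK_STATE_CHANGE, SOME_OTHER_EVENT }

    // Returns true when the event was fully handled and may be removed from the queue.
    interface Handler {
        boolean handle(String payload);
    }

    private final Map<EventType, Handler> handlers = new EnumMap<>(EventType.class);

    void register(EventType type, Handler handler) {
        handlers.put(type, handler);
    }

    Optional<Handler> find(EventType type) {
        return Optional.ofNullable(handlers.get(type));
    }

    // Dispatches to the registered handler, or fails loudly when none is registered.
    boolean dispatch(EventType type, String payload) {
        Handler handler = find(type).orElseThrow(
                () -> new IllegalStateException("Cannot find handler for " + type));
        return handler.handle(payload);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register(EventType.TASK_STATE_CHANGE, payload -> payload.equals("SUCCESS"));
        System.out.println(registry.dispatch(EventType.TASK_STATE_CHANGE, "SUCCESS")); // prints true
    }
}
```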
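3.5.3. TaskStateEventHandler processes the stateEvent
The handler checks whether the event reports a finished task. If it does, it calls workflowExecuteRunnable.taskFinished, which submits the downstream tasks.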
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleException, StateEventHandleError {
measureTaskState(stateEvent);
workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError(
"Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId()));
if (task.getState() == null) {
throw new StateEventHandleError("Task state event handle error due to task state is null");
}
Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode())
&& completeTaskMap.get(task.getTaskCode()) == task.getId()) {
logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true;
}
// call the taskFinished method of workflowExecuteRunnable
workflowExecuteRunnable.taskFinished(task);
if (task.getTaskGroupId() > 0) {
workflowExecuteRunnable.releaseTaskGroup(task);
}
return true;
}
Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
if (activeTaskProcessMap.containsKey(task.getTaskCode())) {
ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode());
iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
if (iTaskProcessor.taskInstance().getState() != task.getState()) {
task.setState(iTaskProcessor.taskInstance().getState());
}
workflowExecuteRunnable.taskFinished(task);
}
return true;
}
throw new StateEventHandleException(
"Task state event handle error, due to the task is not in activeTaskProcessorMaps");
}
3.5.4. workflowExecuteThread schedules downstream tasks
public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
logger.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState());
try {
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// todo: merge the last taskInstance
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
Long.toString(taskInstance.getTaskCode()),
dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
}
} else if (taskInstance.getState().typeIsFinished()) {
// todo: when the task instance type is pause, then it should not in completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
taskInstance.getTaskCode(),
taskInstance.getState());
this.updateProcessInstanceState();
} catch (Exception ex) {
logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
// remove the task from complete map, so that we can finish in the next time.
completeTaskMap.remove(taskInstance.getTaskCode());
throw ex;
}
}
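The branching in taskFinished can be read as a decision table: a successful task submits its downstream nodes unless the workflow is blocked, a retryable task is retried unless the workflow is stopping, and a failed task either continues downstream or is recorded as an error depending on the failure strategy. A minimal sketch follows that reduces the method to a pure function. The `Action` enum and the boolean parameters are hypothetical simplifications, and `hasDownstream` only approximates the `DagHelper.haveAllNodeAfterNode` check.

```java
// Hypothetical sketch; it mirrors only the branch order of taskFinished, not its side effects.
class TaskFinishedDecisionSketch {

    enum TaskState { SUCCESS, FAILURE, OTHER_FINISHED }

    enum FailureStrategy { CONTINUE, END }

    enum Action { SUBMIT_DOWNSTREAM, COMPLETE_ONLY, RETRY, RECORD_ERROR, RECORD_ERROR_AND_KILL_ALL }

    static Action decide(TaskState state,
                         boolean canRetry,
                         boolean workflowStopping,
                         boolean workflowBlocked,
                         FailureStrategy strategy,
                         boolean hasDownstream) {
        if (state == TaskState.SUCCESS) {
            // A blocked workflow records the completion but does not move on.
            return workflowBlocked ? Action.COMPLETE_ONLY : Action.SUBMIT_DOWNSTREAM;
        }
        if (canRetry && !workflowStopping) {
            return Action.RETRY;
        }
        if (state == TaskState.FAILURE) {
            if (strategy == FailureStrategy.CONTINUE && hasDownstream) {
                return Action.SUBMIT_DOWNSTREAM;
            }
            return strategy == FailureStrategy.END
                    ? Action.RECORD_ERROR_AND_KILL_ALL
                    : Action.RECORD_ERROR;
        }
        // Any other finished state is only recorded as complete.
        return Action.COMPLETE_ONLY;
    }

    public static void main(String[] args) {
        // A failed task with retries left, in a workflow that is not stopping, is retried.
        System.out.println(decide(TaskState.FAILURE, true, false, false, FailureStrategy.END, true));
    }
}
```

In every branch the real method then calls updateProcessInstanceState(), so the workflow state is re-evaluated after each finished task.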
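4. Summary
4.1. Role of each component
- ApiServer handles the interface requests from the front end.
- Master handles task scheduling for workflows: it selects a suitable worker according to the dispatch strategy to run each task, and updates the task state in the db.
- Worker receives tasks from the master, runs them, and reports the results back to the master.
4.2. Role of each thread
- MasterSchedulerBootstrap: the command scanning thread. It scans the commands written by the apiserver, creates the processInstance, and saves it to the db.
- WorkflowEventLooper: the WorkflowEvent handling thread. It processes the WorkflowEvents written by MasterSchedulerBootstrap.
- WorkflowExecuteRunnable: handles the workflow start-up logic. It builds the DAG (directed acyclic graph), initializes the scheduling configuration, and submits tasks to the task queue.
- TaskPriorityQueueConsumer: consumes tasks from the priority queue, asynchronously selects a worker through the dispatcher, and sends that worker a command to start the task.
- TaskDispatchProcessor: runs on the worker node and processes the commands sent by the master.
- WorkerManagerThread: runs on the worker node and manages the tasks that are currently running.
- TaskExecuteThread: the task execution thread. It runs the task logic and returns the execution result to the master.
- TaskExecuteResponseProcessor: processes the task results returned by the worker and wraps them into taskEvents.
- TaskEventService: processes taskEvents.
- TaskResultEventHandler: called by TaskEventService; handles the task result.
- EventExecuteService: processes the events produced by workflows.
- TaskStateEventHandler: processes the events of WorkflowExecuteRunnable whose type is TaskStateEvent. If the task state indicates that the task has finished, it also submits the downstream tasks for scheduling.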