From 28872d8706f7414868d6101600f9f8bbd3c9d15f Mon Sep 17 00:00:00 2001 From: wind Date: Fri, 19 Nov 2021 18:31:24 +0800 Subject: [PATCH] [DS-6891][MasterServer] reduce db operation when process instance running (#6904) * remove taskInstanceCacheManager * remove taskInstanceCacheManager * [DS-6891][MasterServer] reduce db operation when process instance running * [DS-6891][MasterServer] reduce db operation when process instance running * fix test * fix Transactional method call * checkstyle Co-authored-by: caishunfeng <534328519@qq.com> --- .../cache/TaskInstanceCacheManager.java | 64 --- .../impl/TaskInstanceCacheManagerImpl.java | 155 ------- .../consumer/TaskPriorityQueueConsumer.java | 28 +- .../master/processor/TaskAckProcessor.java | 10 - .../processor/TaskResponseProcessor.java | 10 - .../queue/StateEventResponseService.java | 9 + .../processor/queue/TaskResponseService.java | 108 +++-- .../master/runner/WorkflowExecuteThread.java | 412 ++++++++++++------ .../master/runner/task/BaseTaskProcessor.java | 8 +- .../runner/task/CommonTaskProcessor.java | 2 +- .../runner/task/ConditionTaskProcessor.java | 2 +- .../runner/task/DependentTaskProcessor.java | 3 +- .../master/runner/task/SubTaskProcessor.java | 2 +- .../runner/task/SwitchTaskProcessor.java | 2 +- .../server/master/ConditionsTaskTest.java | 2 +- .../server/master/DependentTaskTest.java | 9 +- .../server/master/SubProcessTaskTest.java | 10 +- .../server/master/SwitchTaskTest.java | 2 +- .../master/WorkflowExecuteThreadTest.java | 39 +- .../TaskInstanceCacheManagerImplTest.java | 177 -------- .../TaskPriorityQueueConsumerTest.java | 9 - .../processor/TaskAckProcessorTest.java | 10 +- .../runner/task/CommonTaskProcessorTest.java | 1 - .../server/registry/DependencyConfig.java | 6 - .../TaskCallbackServiceTestConfig.java | 6 - .../service/alert/ProcessAlertManager.java | 49 ++- .../service/process/ProcessService.java | 91 ++-- .../service/queue/TaskPriority.java | 22 +- 28 files changed, 504 insertions(+), 744 deletions(-) delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java deleted file mode 100644 index 1388c5b73d..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.cache; - -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; -import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; - -/** - * task instance state manager - */ -public interface TaskInstanceCacheManager { - - /** - * get taskInstance by taskInstance id - * - * @param taskInstanceId taskInstanceId - * @return taskInstance - */ - TaskInstance getByTaskInstanceId(Integer taskInstanceId); - - /** - * cache taskInstance - * - * @param taskExecutionContext taskExecutionContext - */ - void cacheTaskInstance(TaskExecutionContext taskExecutionContext); - - /** - * cache taskInstance - * - * @param taskAckCommand taskAckCommand - */ - void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand); - - /** - * cache taskInstance - * - * @param taskExecuteResponseCommand taskExecuteResponseCommand - */ - void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand); - - /** - * remove taskInstance by taskInstanceId - * @param taskInstanceId taskInstanceId - */ - void removeByTaskInstanceId(Integer taskInstanceId); -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java deleted file mode 100644 index dd2d6eb854..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.cache.impl; - -import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS; - -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; -import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * taskInstance state manager - */ -@Component -public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { - - /** - * taskInstance cache - */ - private Map taskInstanceCache = new ConcurrentHashMap<>(); - - /** - * process service - */ - @Autowired - private ProcessService processService; - - /** - * taskInstance cache refresh timer - */ - private Timer refreshTaskInstanceTimer = null; - - @PostConstruct - public void init() { - //issue#5539 add thread to fetch task state from database in a fixed rate - this.refreshTaskInstanceTimer = new Timer(true); - refreshTaskInstanceTimer.scheduleAtFixedRate( - new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS - ); - } - - @PreDestroy - public void close() { - this.refreshTaskInstanceTimer.cancel(); - } - - /** - * get taskInstance by taskInstance id - * - * @param taskInstanceId taskInstanceId - * @return taskInstance - */ - @Override - public TaskInstance getByTaskInstanceId(Integer taskInstanceId) { - return taskInstanceCache.computeIfAbsent(taskInstanceId, k -> processService.findTaskInstanceById(taskInstanceId)); - } - - /** - * cache taskInstance - * - * @param taskExecutionContext taskExecutionContext - */ - @Override - public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) { - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(taskExecutionContext.getTaskInstanceId()); - taskInstance.setName(taskExecutionContext.getTaskName()); - taskInstance.setStartTime(taskExecutionContext.getStartTime()); - taskInstance.setTaskType(taskExecutionContext.getTaskType()); - taskInstance.setExecutePath(taskExecutionContext.getExecutePath()); - taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance); - } - - /** - * cache taskInstance - * - * @param taskAckCommand taskAckCommand - */ - @Override - public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) { - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus())); - taskInstance.setStartTime(taskAckCommand.getStartTime()); - taskInstance.setHost(taskAckCommand.getHost()); - taskInstance.setExecutePath(taskAckCommand.getExecutePath()); - taskInstance.setLogPath(taskAckCommand.getLogPath()); - taskInstanceCache.put(taskAckCommand.getTaskInstanceId(), taskInstance); - } - - /** - * cache taskInstance - * - * @param taskExecuteResponseCommand taskExecuteResponseCommand - */ - @Override - public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) { - TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId()); - taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus())); - taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime()); - taskInstanceCache.put(taskExecuteResponseCommand.getTaskInstanceId(), taskInstance); - } - - /** - * remove taskInstance by taskInstanceId - * @param taskInstanceId taskInstanceId - */ - @Override - public void removeByTaskInstanceId(Integer taskInstanceId) { - taskInstanceCache.remove(taskInstanceId); - } - - class RefreshTaskInstanceTimerTask extends TimerTask { - @Override - public void run() { - for (Entry taskInstanceEntry : taskInstanceCache.entrySet()) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey()); - if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) { - taskInstanceCache.computeIfPresent(taskInstanceEntry.getKey(), (k, v) -> taskInstance); - } - } - - } - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 827759bf9d..290164b868 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -130,14 +130,16 @@ public class TaskPriorityQueueConsumer extends Thread { TaskExecutionContext context = taskPriority.getTaskExecutionContext(); ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); - if (taskInstanceIsFinalState(taskPriority.getTaskId())) { - // when task finish, ignore this task, there is no need to dispatch anymore - return true; - } else { - result = dispatcher.dispatch(executionContext); + if (isTaskNeedToCheck(taskPriority)) { + if (taskInstanceIsFinalState(taskPriority.getTaskId())) { + // when task finish, ignore this task, there is no need to dispatch anymore + return true; + } } + + result = dispatcher.dispatch(executionContext); } catch (ExecuteException e) { - logger.error("dispatch error: {}", e.getMessage(),e); + logger.error("dispatch error: {}", e.getMessage(), e); } return result; } @@ -153,4 +155,18 @@ public class TaskPriorityQueueConsumer extends Thread { TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); return taskInstance.getState().typeIsFinished(); } + + /** + * check if task need to check state, if true, refresh the checkpoint + * @param taskPriority + * @return + */ + private boolean isTaskNeedToCheck(TaskPriority taskPriority) { + long now = System.currentTimeMillis(); + if (now - taskPriority.getCheckpoint() > Constants.SECOND_TIME_MILLIS) { + taskPriority.setCheckpoint(now); + return true; + } + return false; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 87612321c3..27b8991541 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -24,8 +24,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; -import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; -import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -49,14 +47,8 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private final TaskResponseService taskResponseService; - /** - * taskInstance cache manager - */ - private final TaskInstanceCacheManager taskInstanceCacheManager; - public TaskAckProcessor() { this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); - this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } /** @@ -71,8 +63,6 @@ public class TaskAckProcessor implements NettyRequestProcessor { TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class); logger.info("taskAckCommand : {}", taskAckCommand); - taskInstanceCacheManager.cacheTaskInstance(taskAckCommand); - String workerAddress = ChannelUtils.toAddress(channel).getAddress(); ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 405e6be48f..aa324ebbde 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; -import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -48,14 +46,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private final TaskResponseService taskResponseService; - /** - * taskInstance cache manager - */ - private final TaskInstanceCacheManager taskInstanceCacheManager; - public TaskResponseProcessor() { this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); - this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); } /** @@ -72,8 +64,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor { TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class); logger.info("received command : {}", responseCommand); - taskInstanceCacheManager.cacheTaskInstance(responseCommand); - // TaskResponseEvent TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index 72e2355325..9a7fc59178 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -133,6 +133,15 @@ public class StateEventResponseService { } WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + switch (stateEvent.getType()) { + case TASK_STATE_CHANGE: + workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId()); + break; + case PROCESS_STATE_CHANGE: + workflowExecuteThread.refreshProcessInstance(stateEvent.getProcessInstanceId()); + break; + default: + } workflowExecuteThread.addStateEvent(stateEvent); writeResponse(stateEvent, ExecutionStatus.SUCCESS); } catch (Exception e) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 9af1ae243b..7a0af9d667 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -142,54 +142,28 @@ public class TaskResponseService { */ private void persist(TaskResponseEvent taskResponseEvent) { Event event = taskResponseEvent.getEvent(); - Channel channel = taskResponseEvent.getChannel(); + int taskInstanceId = taskResponseEvent.getTaskInstanceId(); + int processInstanceId = taskResponseEvent.getProcessInstanceId(); + + TaskInstance taskInstance; + WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) { + taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); + } else { + taskInstance = processService.findTaskInstanceById(taskInstanceId); + } - TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); switch (event) { case ACK: - try { - if (taskInstance != null) { - ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState(); - processService.changeTaskState(taskInstance, status, - taskResponseEvent.getStartTime(), - taskResponseEvent.getWorkerAddress(), - taskResponseEvent.getExecutePath(), - taskResponseEvent.getLogPath(), - taskResponseEvent.getTaskInstanceId()); - } - // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskAckCommand.convert2Command()); - } catch (Exception e) { - logger.error("worker ack master error", e); - DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskAckCommand.convert2Command()); - } + handleAckEvent(taskResponseEvent, taskInstance); break; case RESULT: - try { - if (taskInstance != null) { - processService.changeTaskState(taskInstance, taskResponseEvent.getState(), - taskResponseEvent.getEndTime(), - taskResponseEvent.getProcessId(), - taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId(), - taskResponseEvent.getVarPool() - ); - } - // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); - channel.writeAndFlush(taskResponseCommand.convert2Command()); - } catch (Exception e) { - logger.error("worker response master error", e); - DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1); - channel.writeAndFlush(taskResponseCommand.convert2Command()); - } + handleResultEvent(taskResponseEvent, taskInstance); break; default: throw new IllegalArgumentException("invalid event type : " + event); } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId()); + if (workflowExecuteThread != null) { StateEvent stateEvent = new StateEvent(); stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); @@ -200,7 +174,59 @@ public class TaskResponseService { } } - public BlockingQueue getEventQueue() { - return eventQueue; + /** + * handle ack event + * @param taskResponseEvent + * @param taskInstance + */ + private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { + Channel channel = taskResponseEvent.getChannel(); + try { + if (taskInstance != null) { + if (taskInstance.getState().typeIsFinished()) { + logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState()); + } else { + processService.changeTaskState(taskInstance, taskResponseEvent.getState(), + taskResponseEvent.getStartTime(), + taskResponseEvent.getWorkerAddress(), + taskResponseEvent.getExecutePath(), + taskResponseEvent.getLogPath() + ); + } + } + // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskAckCommand.convert2Command()); + } catch (Exception e) { + logger.error("worker ack master error", e); + DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1); + channel.writeAndFlush(taskAckCommand.convert2Command()); + } + } + + /** + * handle result event + * @param taskResponseEvent + * @param taskInstance + */ + private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { + Channel channel = taskResponseEvent.getChannel(); + try { + if (taskInstance != null) { + processService.changeTaskState(taskInstance, taskResponseEvent.getState(), + taskResponseEvent.getEndTime(), + taskResponseEvent.getProcessId(), + taskResponseEvent.getAppIds(), + taskResponseEvent.getVarPool() + ); + } + // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + } catch (Exception e) { + logger.error("worker response master error", e); + DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1); + channel.writeAndFlush(taskResponseCommand.convert2Command()); + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index ad12abeb7d..44eac21f57 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -86,9 +86,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; -import com.google.common.collect.Table; /** * master exec thread,split dag @@ -99,100 +97,116 @@ public class WorkflowExecuteThread implements Runnable { * logger of WorkflowExecuteThread */ private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class); + /** - * runing TaskNode + * master config */ - private final Map activeTaskProcessorMaps = new ConcurrentHashMap<>(); + private MasterConfig masterConfig; /** - * process instance + * process service */ - private ProcessInstance processInstance; + private ProcessService processService; + /** - * submit failure nodes + * alert manager */ - private boolean taskFailedSubmit = false; + private ProcessAlertManager processAlertManager; /** - * recover node id list + * netty executor manager */ - private List recoverNodeIdList = new ArrayList<>(); + private NettyExecutorManager nettyExecutorManager; /** - * error task list + * process instance */ - private Map errorTaskList = new ConcurrentHashMap<>(); + private ProcessInstance processInstance; /** - * complete task list + * process definition */ - private Map completeTaskList = new ConcurrentHashMap<>(); + private ProcessDefinition processDefinition; /** - * ready to submit task queue + * the object of DAG */ - private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); + private DAG dag; /** - * depend failed task map + * key of workflow */ - private Map dependFailedTask = new ConcurrentHashMap<>(); + private String key; /** - * forbidden task map + * start flag, true: start nodes submit completely */ - private Map forbiddenTaskList = new ConcurrentHashMap<>(); + private boolean isStart = false; /** - * skip task map + * submit failure nodes */ - private Map skipTaskNodeList = new ConcurrentHashMap<>(); + private boolean taskFailedSubmit = false; /** - * recover tolerance fault task list + * task instance hash map, taskId as key */ - private List recoverToleranceFaultTaskList = new ArrayList<>(); + private Map taskInstanceMap = new ConcurrentHashMap<>(); /** - * alert manager + * running TaskNode, taskId as key */ - private ProcessAlertManager processAlertManager; + private final Map activeTaskProcessorMaps = new ConcurrentHashMap<>(); /** - * the object of DAG + * valid task map, taskCode as key, taskId as value */ - private DAG dag; + private Map validTaskMap = new ConcurrentHashMap<>(); /** - * process service + * error task map, taskCode as key, taskId as value */ - private ProcessService processService; + private Map errorTaskMap = new ConcurrentHashMap<>(); /** - * master config + * complete task map, taskCode as key, taskId as value */ - private MasterConfig masterConfig; + private Map completeTaskMap = new ConcurrentHashMap<>(); /** - * + * depend failed task map, taskCode as key, taskId as value */ - private NettyExecutorManager nettyExecutorManager; + private Map dependFailedTaskMap = new ConcurrentHashMap<>(); - private ConcurrentLinkedQueue stateEvents = new ConcurrentLinkedQueue<>(); + /** + * forbidden task map, code as key + */ + private Map forbiddenTaskMap = new ConcurrentHashMap<>(); - private List complementListDate = Lists.newLinkedList(); + /** + * skip task map, code as key + */ + private Map skipTaskNodeMap = new ConcurrentHashMap<>(); - private Table taskInstanceHashMap = HashBasedTable.create(); - private ProcessDefinition processDefinition; - private String key; + /** + * complement date list + */ + private List complementListDate = Lists.newLinkedList(); + /** + * task timeout check list + */ private ConcurrentHashMap taskTimeoutCheckList; /** - * start flag, true: start nodes submit completely - * + * state event queue */ - private boolean isStart = false; + private ConcurrentLinkedQueue stateEvents = new ConcurrentLinkedQueue<>(); + + /** + * ready to submit task queue + */ + private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); /** * constructor of WorkflowExecuteThread @@ -200,7 +214,6 @@ public class WorkflowExecuteThread implements Runnable { * @param processInstance processInstance * @param processService processService * @param nettyExecutorManager nettyExecutorManager - * @param taskTimeoutCheckList */ public WorkflowExecuteThread(ProcessInstance processInstance , ProcessService processService @@ -232,7 +245,6 @@ public class WorkflowExecuteThread implements Runnable { /** * the process start nodes are submitted completely. - * @return */ public boolean isStart() { return this.isStart; @@ -286,9 +298,10 @@ public class WorkflowExecuteThread implements Runnable { private boolean stateEventHandler(StateEvent stateEvent) { logger.info("process event: {}", stateEvent.toString()); - if (!checkStateEvent(stateEvent)) { + if (!checkProcessInstance(stateEvent)) { return false; } + boolean result = false; switch (stateEvent.getType()) { case PROCESS_STATE_CHANGE: @@ -314,16 +327,11 @@ public class WorkflowExecuteThread implements Runnable { } private boolean taskTimeout(StateEvent stateEvent) { - - if (taskInstanceHashMap.containsRow(stateEvent.getTaskInstanceId())) { + if (!checkTaskInstanceByStateEvent(stateEvent)) { return true; } - TaskInstance taskInstance = taskInstanceHashMap - .row(stateEvent.getTaskInstanceId()) - .values() - .iterator().next(); - + TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId()); if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) { return true; } @@ -344,7 +352,16 @@ public class WorkflowExecuteThread implements Runnable { } private boolean taskStateChangeHandler(StateEvent stateEvent) { - TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); + if (!checkTaskInstanceByStateEvent(stateEvent)) { + return true; + } + + TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId()); + if (task.getState() == null) { + logger.error("task state is null, state handler error: {}", stateEvent); + return true; + } + if (task.getState().typeIsFinished()) { taskFinished(task); } else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) { @@ -356,7 +373,7 @@ public class WorkflowExecuteThread implements Runnable { taskFinished(task); } } else { - logger.error("state handler error: {}", stateEvent.toString()); + logger.error("state handler error: {}", stateEvent); } return true; } @@ -382,10 +399,11 @@ public class WorkflowExecuteThread implements Runnable { } return; } - ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId()); - completeTaskList.put(Long.toString(task.getTaskCode()), task); + + completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); activeTaskProcessorMaps.remove(task.getId()); taskTimeoutCheckList.remove(task.getId()); + if (task.getState().typeIsSuccess()) { processInstance.setVarPool(task.getVarPool()); processService.saveProcessInstance(processInstance); @@ -395,7 +413,7 @@ public class WorkflowExecuteThread implements Runnable { || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { submitPostNode(Long.toString(task.getTaskCode())); } else { - errorTaskList.put(Long.toString(task.getTaskCode()), task); + errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); if (processInstance.getFailureStrategy() == FailureStrategy.END) { killAllTasks(); } @@ -404,20 +422,102 @@ public class WorkflowExecuteThread implements Runnable { this.updateProcessInstanceState(); } - private boolean checkStateEvent(StateEvent stateEvent) { + /** + * update process instance + */ + public void refreshProcessInstance(int processInstanceId) { + logger.info("process instance update: {}", processInstanceId); + processInstance = processService.findProcessInstanceById(processInstanceId); + processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + processInstance.setProcessDefinition(processDefinition); + } + + /** + * update task instance + */ + public void refreshTaskInstance(int taskInstanceId) { + logger.info("task instance update: {} ", taskInstanceId); + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); + if (taskInstance == null) { + logger.error("can not find task instance, id:{}", taskInstanceId); + return; + } + processService.packageTaskInstance(taskInstance, processInstance); + taskInstanceMap.put(taskInstance.getId(), taskInstance); + + validTaskMap.remove(Long.toString(taskInstance.getTaskCode())); + if (Flag.YES == taskInstance.getFlag()) { + validTaskMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance.getId()); + } + } + + /** + * check process instance by state event + */ + public boolean checkProcessInstance(StateEvent stateEvent) { if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { logger.error("mismatch process instance id: {}, state event:{}", this.processInstance.getId(), - stateEvent.toString()); + stateEvent); + return false; + } + return true; + } + + /** + * check if task instance exist by state event + */ + public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) { + if (stateEvent.getTaskInstanceId() == 0) { + logger.error("task instance id null, state event:{}", stateEvent); + return false; + } + if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) { + logger.error("mismatch task instance id, event:{}", stateEvent); return false; } return true; } + /** + * check if task instance exist by task code + */ + public boolean checkTaskInstanceByCode(long taskCode) { + if (taskInstanceMap == null || taskInstanceMap.size() == 0) { + return false; + } + for (TaskInstance taskInstance : taskInstanceMap.values()) { + if (taskInstance.getTaskCode() == taskCode) { + return true; + } + } + return false; + } + + /** + * check if task instance exist by id + */ + public boolean checkTaskInstanceById(int taskInstanceId) { + if (taskInstanceMap == null || taskInstanceMap.size() == 0) { + return false; + } + return taskInstanceMap.containsKey(taskInstanceId); + } + + /** + * get task instance from memory + */ + public TaskInstance getTaskInstance(int taskInstanceId) { + if (taskInstanceMap.containsKey(taskInstanceId)) { + return taskInstanceMap.get(taskInstanceId); + } + return null; + } + private boolean processStateChangeHandler(StateEvent stateEvent) { try { logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus()); - processInstance = processService.findProcessInstanceById(this.processInstance.getId()); if (processComplementData()) { return true; } @@ -477,7 +577,7 @@ public class WorkflowExecuteThread implements Runnable { processInstance.setStartTime(new Date()); processInstance.setEndTime(null); processService.saveProcessInstance(processInstance); - this.taskInstanceHashMap.clear(); + this.taskInstanceMap.clear(); startProcess(); return true; } @@ -491,7 +591,7 @@ public class WorkflowExecuteThread implements Runnable { } private void startProcess() throws Exception { - if (this.taskInstanceHashMap.size() == 0) { + if (this.taskInstanceMap.size() == 0) { isStart = false; buildFlowDag(); initTaskQueue(); @@ -505,25 +605,22 @@ public class WorkflowExecuteThread implements Runnable { */ private void endProcess() { this.stateEvents.clear(); - processInstance.setEndTime(new Date()); - ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion()); if (processDefinition.getExecutionType().typeIsSerialWait()) { checkSerialProcess(processDefinition); } - processService.updateProcessInstance(processInstance); if (processInstance.getState().typeIsWaitingThread()) { processService.createRecoveryWaitingThreadCommand(null, processInstance); } - List taskInstances = processService.findValidTaskListByProcessId(processInstance.getId()); - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser); + if (processAlertManager.isNeedToSendWarning(processInstance)) { + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); + processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); + } } public void checkSerialProcess(ProcessDefinition processDefinition) { - this.processInstance = processService.findProcessInstanceById(processInstance.getId()); int nextInstanceId = processInstance.getNextProcessInstanceId(); if (nextInstanceId == 0) { - ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),ExecutionStatus.SERIAL_WAIT.getCode()); + ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode()); if (nextProcessInstance == null) { return; } @@ -553,19 +650,21 @@ public class WorkflowExecuteThread implements Runnable { } processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); - recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); + processInstance.setProcessDefinition(processDefinition); + + List recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); List taskNodeList = - processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList()); - forbiddenTaskList.clear(); + processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList()); + forbiddenTaskMap.clear(); taskNodeList.forEach(taskNode -> { if (taskNode.isForbidden()) { - forbiddenTaskList.put(Long.toString(taskNode.getCode()), taskNode); + forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode); } }); // generate process to get DAG info - List recoveryNodeCodeList = getRecoveryNodeCodeList(); + List recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType()); @@ -584,19 +683,23 @@ public class WorkflowExecuteThread implements Runnable { taskFailedSubmit = false; activeTaskProcessorMaps.clear(); - dependFailedTask.clear(); - completeTaskList.clear(); - errorTaskList.clear(); - List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); - for (TaskInstance task : taskInstanceList) { + dependFailedTaskMap.clear(); + completeTaskMap.clear(); + errorTaskMap.clear(); + + List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); + for (TaskInstance task : validTaskInstanceList) { + validTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + taskInstanceMap.put(task.getId(), task); + if (task.isTaskComplete()) { - completeTaskList.put(Long.toString(task.getTaskCode()), task); + completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); } if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { continue; } if (task.getState().typeIsFailure() && !task.taskCanRetry()) { - errorTaskList.put(Long.toString(task.getTaskCode()), task); + errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); } } @@ -637,31 +740,32 @@ public class WorkflowExecuteThread implements Runnable { && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } + + // package task instance before submit + processService.packageTaskInstance(taskInstance, processInstance); + boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval()); - if (submit) { - this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance); - activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor); - taskProcessor.run(); - addTimeoutCheck(taskInstance); - TaskDefinition taskDefinition = processService.findTaskDefinition( - taskInstance.getTaskCode(), - taskInstance.getTaskDefinitionVersion()); - taskInstance.setTaskDefine(taskDefinition); - if (taskProcessor.taskState().typeIsFinished()) { - StateEvent stateEvent = new StateEvent(); - stateEvent.setProcessInstanceId(this.processInstance.getId()); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setExecutionStatus(taskProcessor.taskState()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - this.stateEvents.add(stateEvent); - } - return taskInstance; - } else { + if (!submit) { logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); return null; } + taskInstanceMap.put(taskInstance.getId(), taskInstance); + activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor); + taskProcessor.run(); + + addTimeoutCheck(taskInstance); + + if (taskProcessor.taskState().typeIsFinished()) { + StateEvent stateEvent = new StateEvent(); + stateEvent.setProcessInstanceId(this.processInstance.getId()); + stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setExecutionStatus(taskProcessor.taskState()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + this.stateEvents.add(stateEvent); + } + return taskInstance; } catch (Exception e) { logger.error("submit standby task error", e); return null; @@ -688,11 +792,12 @@ public class WorkflowExecuteThread implements Runnable { if (taskTimeoutCheckList.containsKey(taskInstance.getId())) { return; } - TaskDefinition taskDefinition = processService.findTaskDefinition( - taskInstance.getTaskCode(), - taskInstance.getTaskDefinitionVersion() - ); - taskInstance.setTaskDefine(taskDefinition); + TaskDefinition taskDefinition = taskInstance.getTaskDefine(); + if (taskDefinition == null) { + logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); + return; + } + if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) { this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); } else { @@ -711,8 +816,8 @@ public class WorkflowExecuteThread implements Runnable { * @return TaskInstance */ private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) { - List taskInstanceList = processService.findValidTaskListByProcessId(this.processInstance.getId()); - for (TaskInstance taskInstance : taskInstanceList) { + List validTaskInstanceList = getValidTaskList(); + for (TaskInstance taskInstance : validTaskInstanceList) { if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == taskVersion) { return taskInstance; } @@ -805,7 +910,11 @@ public class WorkflowExecuteThread implements Runnable { Map allTaskInstance = new HashMap<>(); if (CollectionUtils.isNotEmpty(preTask)) { for (String preTaskCode : preTask) { - TaskInstance preTaskInstance = completeTaskList.get(preTaskCode); + Integer taskId = completeTaskMap.get(preTaskCode); + if (taskId == null) { + continue; + } + TaskInstance preTaskInstance = taskInstanceMap.get(taskId); if (preTaskInstance == null) { continue; } @@ -854,12 +963,35 @@ public class WorkflowExecuteThread implements Runnable { } } + /** + * get complete task instance map, taskCode as key + */ + private Map getCompleteTaskInstanceMap() { + Map completeTaskInstanceMap = new HashMap<>(); + for (Integer taskInstanceId : completeTaskMap.values()) { + TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId); + completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance); + } + return completeTaskInstanceMap; + } + + /** + * get valid task list + */ + private List getValidTaskList() { + List validTaskInstanceList = new ArrayList<>(); + for (Integer taskInstanceId : validTaskMap.values()) { + validTaskInstanceList.add(taskInstanceMap.get(taskInstanceId)); + } + return validTaskInstanceList; + } + private void submitPostNode(String parentNodeCode) { - Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeList, dag, completeTaskList); + Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); List taskInstances = new ArrayList<>(); for (String taskNode : submitTaskNodeList) { TaskNode taskNodeObject = dag.getNode(taskNode); - if (taskInstanceHashMap.containsColumn(taskNodeObject.getCode())) { + if (checkTaskInstanceByCode(taskNodeObject.getCode())) { continue; } TaskInstance task = createTaskInstance(processInstance, taskNodeObject); @@ -873,15 +1005,16 @@ public class WorkflowExecuteThread implements Runnable { continue; } - if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) { + if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) { logger.info("task {} has already run success", task.getName()); continue; } if (task.getState().typeIsPause() || task.getState().typeIsCancel()) { logger.info("task {} stopped, the state is {}", task.getName(), task.getState()); - } else { - addTaskToStandByList(task); + continue; } + + addTaskToStandByList(task); } submitStandByTask(); updateProcessInstanceState(); @@ -903,15 +1036,16 @@ public class WorkflowExecuteThread implements Runnable { List depCodeList = taskNode.getDepList(); for (String depsNode : depCodeList) { if (!dag.containsNode(depsNode) - || forbiddenTaskList.containsKey(depsNode) - || skipTaskNodeList.containsKey(depsNode)) { + || forbiddenTaskMap.containsKey(depsNode) + || skipTaskNodeMap.containsKey(depsNode)) { continue; } // dependencies must be fully completed - if (!completeTaskList.containsKey(depsNode)) { + if (!completeTaskMap.containsKey(depsNode)) { return DependResult.WAITING; } - ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState(); + Integer depsTaskId = completeTaskMap.get(depsNode); + ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState(); if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) { return DependResult.NON_EXEC; } @@ -923,7 +1057,7 @@ public class WorkflowExecuteThread implements Runnable { return DependResult.FAILED; } } - logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskList.keySet().toArray())); + logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray())); return DependResult.SUCCESS; } @@ -933,12 +1067,13 @@ public class WorkflowExecuteThread implements Runnable { private boolean dependTaskSuccess(String dependNodeName, String nextNodeName) { if (dag.getNode(dependNodeName).isConditionsTask()) { //condition task need check the branch to run - List nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList); + List nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); if (!nextTaskList.contains(nextNodeName)) { return false; } } else { - ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState(); + Integer taskInstanceId = completeTaskMap.get(dependNodeName); + ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState(); if (depTaskState.typeIsFailure()) { return false; } @@ -954,9 +1089,10 @@ public class WorkflowExecuteThread implements Runnable { */ private List getCompleteTaskByState(ExecutionStatus state) { List resultList = new ArrayList<>(); - for (Map.Entry entry : completeTaskList.entrySet()) { - if (entry.getValue().getState() == state) { - resultList.add(entry.getValue()); + for (Integer taskInstanceId : completeTaskMap.values()) { + TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId); + if (taskInstance != null && taskInstance.getState() == state) { + resultList.add(taskInstance); } } return resultList; @@ -990,10 +1126,10 @@ public class WorkflowExecuteThread implements Runnable { if (this.taskFailedSubmit) { return true; } - if (this.errorTaskList.size() > 0) { + if (this.errorTaskMap.size() > 0) { return true; } - return this.dependFailedTask.size() > 0; + return this.dependFailedTaskMap.size() > 0; } /** @@ -1049,7 +1185,6 @@ public class WorkflowExecuteThread implements Runnable { /** * generate the latest process instance status by the tasks state * - * @param instance * @return process instance execution status */ private ExecutionStatus getProcessInstanceState(ProcessInstance instance) { @@ -1130,8 +1265,7 @@ public class WorkflowExecuteThread implements Runnable { * after each batch of tasks is executed, the status of the process instance is updated */ private void updateProcessInstanceState() { - ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId()); - ExecutionStatus state = getProcessInstanceState(instance); + ExecutionStatus state = getProcessInstanceState(processInstance); if (processInstance.getState() != state) { logger.info( "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", @@ -1139,9 +1273,12 @@ public class WorkflowExecuteThread implements Runnable { processInstance.getState(), state, processInstance.getCommandType()); - instance.setState(state); - processService.updateProcessInstance(instance); - processInstance = instance; + processInstance.setState(state); + if (state.typeIsFinished()) { + processInstance.setEndTime(new Date()); + } + processService.updateProcessInstance(processInstance); + StateEvent stateEvent = new StateEvent(); stateEvent.setExecutionStatus(processInstance.getState()); stateEvent.setProcessInstanceId(this.processInstance.getId()); @@ -1254,7 +1391,8 @@ public class WorkflowExecuteThread implements Runnable { task.setState(retryTask.getState()); logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); removeTaskFromStandbyList(task); - completeTaskList.put(Long.toString(task.getTaskCode()), task); + completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + taskInstanceMap.put(task.getId(), task); submitPostNode(Long.toString(task.getTaskCode())); continue; } @@ -1281,7 +1419,7 @@ public class WorkflowExecuteThread implements Runnable { } } else if (DependResult.FAILED == dependResult) { // if the dependency fails, the current node is not submitted and the state changes to failure. - dependFailedTask.put(Long.toString(task.getTaskCode()), task); + dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); removeTaskFromStandbyList(task); logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult); } else if (DependResult.NON_EXEC == dependResult) { @@ -1366,10 +1504,10 @@ public class WorkflowExecuteThread implements Runnable { * * @return recovery node code list */ - private List getRecoveryNodeCodeList() { + private List getRecoveryNodeCodeList(List recoverNodeList) { List recoveryNodeCodeList = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(recoverNodeIdList)) { - for (TaskInstance task : recoverNodeIdList) { + if (CollectionUtils.isNotEmpty(recoverNodeList)) { + for (TaskInstance task : recoverNodeList) { recoveryNodeCodeList.add(Long.toString(task.getTaskCode())); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index fb14d965cb..7194ff950c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -161,8 +161,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { * @return TaskExecutionContext */ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) { - processService.setTaskInstanceDetail(taskInstance); - int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); @@ -172,12 +170,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { taskInstance.getStartTime(), taskInstance.getHost(), null, - null, - taskInstance.getId()); + null + ); return null; } // set queue for process instance, user-specified queue takes precedence over tenant queue - String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); + String userQueue = processService.queryUserQueueByProcessInstance(taskInstance.getProcessInstance()); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); taskInstance.setResources(getResourceFullNames(taskInstance)); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index ee1c548525..7b193bfe15 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -65,7 +65,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { this.processInstance = processInstance; - this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval); + this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index ee1cf8215e..ee03602b42 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -72,7 +72,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { this.processInstance = processInstance; - this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); if (this.taskInstance == null) { return false; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 8c3a287778..d873a81b86 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.Date; @@ -81,7 +80,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { this.processInstance = processInstance; this.taskInstance = task; - this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); if (this.taskInstance == null) { return false; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index e0cd3e8603..421efd3937 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -54,7 +54,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { taskDefinition = processService.findTaskDefinition( task.getTaskCode(), task.getTaskDefinitionVersion() ); - this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval); + this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval); if (this.taskInstance == null) { return false; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 116a8d556d..70013e176d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -63,7 +63,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { this.processInstance = processInstance; - this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval); + this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval); if (this.taskInstance == null) { return false; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java index c8a4ae4545..4207d3079c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java @@ -89,7 +89,7 @@ public class ConditionsTaskTest { // for MasterBaseTaskExecThread.submit Mockito.when(processService - .submitTask(taskInstance)) + .submitTask(processInstance, taskInstance)) .thenReturn(taskInstance); // for MasterBaseTaskExecThread.call Mockito.when(processService diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 1fa37cc7f5..74017f032b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -93,6 +93,7 @@ public class DependentTaskTest { Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); processInstance = getProcessInstance(); + taskInstance = getTaskInstance(); // for MasterBaseTaskExecThread.call // for DependentTaskExecThread.waitTaskQuit @@ -102,7 +103,7 @@ public class DependentTaskTest { // for MasterBaseTaskExecThread.submit Mockito.when(processService - .submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000))) + .submitTask(processInstance, taskInstance)) .thenAnswer(i -> taskInstance); // for DependentTaskExecThread.initTaskParameters @@ -346,6 +347,12 @@ public class DependentTaskTest { return processInstance; } + private TaskInstance getTaskInstance() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1000); + return taskInstance; + } + /** * task that dependent on others (and to be tested here) * notice: should be filled with setDependence() and be passed to setupTaskInstance() diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java index 5fcfc77bcc..8af61908e6 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java @@ -74,6 +74,8 @@ public class SubProcessTaskTest { Mockito.when(applicationContext.getBean(AlertDao.class)).thenReturn(alertDao); processInstance = getProcessInstance(); + TaskInstance taskInstance = getTaskInstance(); + Mockito.when(processService .findProcessInstanceById(processInstance.getId())) .thenReturn(processInstance); @@ -85,7 +87,7 @@ public class SubProcessTaskTest { // for MasterBaseTaskExecThread.submit Mockito.when(processService - .submitTask(Mockito.any())) + .submitTask(processInstance, taskInstance)) .thenAnswer(t -> t.getArgument(0)); TaskDefinition taskDefinition = new TaskDefinition(); @@ -147,6 +149,12 @@ public class SubProcessTaskTest { return processInstance; } + private TaskInstance getTaskInstance() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1000); + return taskInstance; + } + private ProcessInstance getSubProcessInstance(ExecutionStatus executionStatus) { ProcessInstance processInstance = new ProcessInstance(); processInstance.setId(102); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java index 3221edb9aa..4516d9dba2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java @@ -81,7 +81,7 @@ public class SwitchTaskTest { // for MasterBaseTaskExecThread.submit Mockito.when(processService - .submitTask(taskInstance)) + .submitTask(processInstance, taskInstance)) .thenReturn(taskInstance); // for MasterBaseTaskExecThread.call Mockito.when(processService diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 911d0d7a4b..87bc0428fc 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -162,39 +162,52 @@ public class WorkflowExecuteThreadTest { public void testGetPreVarPool() { try { Set preTaskName = new HashSet<>(); - preTaskName.add("test1"); - preTaskName.add("test2"); - Map completeTaskList = new ConcurrentHashMap<>(); + preTaskName.add(Long.toString(1)); + preTaskName.add(Long.toString(2)); TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance1 = new TaskInstance(); taskInstance1.setId(1); - taskInstance1.setName("test1"); + taskInstance1.setTaskCode(1); taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]"); taskInstance1.setEndTime(new Date()); TaskInstance taskInstance2 = new TaskInstance(); taskInstance2.setId(2); - taskInstance2.setName("test2"); + taskInstance2.setTaskCode(2); taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]"); taskInstance2.setEndTime(new Date()); - completeTaskList.put("test1", taskInstance1); - completeTaskList.put("test2", taskInstance2); + Map taskInstanceMap = new ConcurrentHashMap<>(); + taskInstanceMap.put(taskInstance1.getId(), taskInstance1); + taskInstanceMap.put(taskInstance2.getId(), taskInstance2); + + Map completeTaskList = new ConcurrentHashMap<>(); + completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId()); + completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance2.getId()); Class masterExecThreadClass = WorkflowExecuteThread.class; - Field field = masterExecThreadClass.getDeclaredField("completeTaskList"); - field.setAccessible(true); - field.set(workflowExecuteThread, completeTaskList); + Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap"); + completeTaskMapField.setAccessible(true); + completeTaskMapField.set(workflowExecuteThread, completeTaskList); + + Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap"); + taskInstanceMapField.setAccessible(true); + taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); workflowExecuteThread.getPreVarPool(taskInstance, preTaskName); Assert.assertNotNull(taskInstance.getVarPool()); + taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]"); - completeTaskList.put("test2", taskInstance2); - field.setAccessible(true); - field.set(workflowExecuteThread, completeTaskList); + completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId()); + + completeTaskMapField.setAccessible(true); + completeTaskMapField.set(workflowExecuteThread, completeTaskList); + taskInstanceMapField.setAccessible(true); + taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); + workflowExecuteThread.getPreVarPool(taskInstance, preTaskName); Assert.assertNotNull(taskInstance.getVarPool()); } catch (Exception e) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java deleted file mode 100644 index f6098454ba..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.cache.impl; - -import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS; - -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; - -import java.util.Calendar; -import java.util.Date; -import java.util.concurrent.TimeUnit; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class TaskInstanceCacheManagerImplTest { - - @InjectMocks - private TaskInstanceCacheManagerImpl taskInstanceCacheManager; - - @Mock(name = "processService") - private ProcessService processService; - - @Before - public void before() { - - TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand(); - taskExecuteAckCommand.setStatus(1); - taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker"); - taskExecuteAckCommand.setHost("worker007"); - taskExecuteAckCommand.setLogPath("/temp/worker.log"); - taskExecuteAckCommand.setStartTime(new Date(1970, Calendar.AUGUST,7)); - taskExecuteAckCommand.setTaskInstanceId(0); - - taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand); - - } - - @Test - public void testInit() throws InterruptedException { - - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(0); - taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); - taskInstance.setExecutePath("/dolphinscheduler/worker"); - taskInstance.setHost("worker007"); - taskInstance.setLogPath("/temp/worker.log"); - taskInstance.setProcessInstanceId(0); - - Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance); - - taskInstanceCacheManager.init(); - TimeUnit.MILLISECONDS.sleep(CACHE_REFRESH_TIME_MILLIS + 1000); - - Assert.assertEquals(taskInstance.getState(), taskInstanceCacheManager.getByTaskInstanceId(0).getState()); - - } - - @Test - public void getByTaskInstanceIdFromCache() { - TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0); - - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(0); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - taskInstance.setExecutePath("/dolphinscheduler/worker"); - taskInstance.setHost("worker007"); - taskInstance.setLogPath("/temp/worker.log"); - taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7)); - - Assert.assertEquals(taskInstance.toString(), instanceGot.toString()); - - } - - @Test - public void getByTaskInstanceIdFromDatabase() { - - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(1); - taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - taskInstance.setExecutePath("/dolphinscheduler/worker"); - taskInstance.setHost("worker007"); - taskInstance.setLogPath("/temp/worker.log"); - taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7)); - - Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); - - TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1); - - Assert.assertEquals(taskInstance, instanceGot); - - } - - @Test - public void cacheTaskInstanceByTaskExecutionContext() { - TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); - taskExecutionContext.setTaskInstanceId(2); - taskExecutionContext.setTaskName("blackberrier test"); - taskExecutionContext.setStartTime(new Date(1970, Calendar.AUGUST,7)); - taskExecutionContext.setTaskType(TaskType.SPARK.getDesc()); - taskExecutionContext.setExecutePath("/tmp"); - - taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext); - - TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2); - - Assert.assertEquals(taskInstance.getId(), 2); - Assert.assertEquals(taskInstance.getName(), "blackberrier test"); - Assert.assertEquals(taskInstance.getStartTime(), new Date(1970, Calendar.AUGUST, 7)); - Assert.assertEquals(taskInstance.getTaskType(), TaskType.SPARK.getDesc()); - Assert.assertEquals(taskInstance.getExecutePath(), "/tmp"); - - } - - @Test - public void testCacheTaskInstanceByTaskExecuteAckCommand() { - TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0); - - Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState()); - Assert.assertEquals(new Date(1970, Calendar.AUGUST, 7), taskInstance.getStartTime()); - Assert.assertEquals("worker007", taskInstance.getHost()); - Assert.assertEquals("/dolphinscheduler/worker", taskInstance.getExecutePath()); - Assert.assertEquals("/temp/worker.log", taskInstance.getLogPath()); - - } - - @Test - public void testCacheTaskInstanceByTaskExecuteResponseCommand() { - TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(); - responseCommand.setTaskInstanceId(0); - responseCommand.setStatus(9); - responseCommand.setEndTime(new Date(1970, Calendar.AUGUST, 8)); - - taskInstanceCacheManager.cacheTaskInstance(responseCommand); - - TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0); - - Assert.assertEquals(new Date(1970, Calendar.AUGUST, 8), taskInstance.getEndTime()); - Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState()); - - } - - @Test - public void removeByTaskInstanceId() { - taskInstanceCacheManager.removeByTaskInstanceId(0); - Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0)); - - } -} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index f9d51a910e..99b6f797bb 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -76,8 +76,6 @@ public class TaskPriorityQueueConsumerTest { tenant.setUpdateTime(new Date()); Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2); - - Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1); } @Test @@ -101,7 +99,6 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); taskPriorityQueue.put(taskPriority); @@ -129,7 +126,6 @@ public class TaskPriorityQueueConsumerTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); taskPriorityQueue.put(taskPriority); @@ -171,7 +167,6 @@ public class TaskPriorityQueueConsumerTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); taskPriorityQueue.put(taskPriority); @@ -211,7 +206,6 @@ public class TaskPriorityQueueConsumerTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); taskPriorityQueue.put(taskPriority); @@ -271,7 +265,6 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); @@ -310,7 +303,6 @@ public class TaskPriorityQueueConsumerTest { taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskInstance.setTaskDefine(taskDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); TaskPriority taskPriority = new TaskPriority(); @@ -342,7 +334,6 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java index e215d4cdb6..823ffa2cd7 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java @@ -18,20 +18,16 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; -import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; -import java.net.InetSocketAddress; import java.util.Date; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -39,7 +35,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import io.netty.channel.Channel; /** - * task ack processor test + * task ack processor test */ @RunWith(PowerMockRunner.class) @PrepareForTest({SpringApplicationContext.class, TaskResponseEvent.class}) @@ -47,7 +43,6 @@ public class TaskAckProcessorTest { private TaskAckProcessor taskAckProcessor; private TaskResponseService taskResponseService; - private TaskInstanceCacheManagerImpl taskInstanceCacheManager; private ProcessService processService; private TaskExecuteAckCommand taskExecuteAckCommand; private TaskResponseEvent taskResponseEvent; @@ -60,9 +55,6 @@ public class TaskAckProcessorTest { taskResponseService = PowerMockito.mock(TaskResponseService.class); PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService); - taskInstanceCacheManager = PowerMockito.mock(TaskInstanceCacheManagerImpl.class); - PowerMockito.when(SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class)).thenReturn(taskInstanceCacheManager); - processService = PowerMockito.mock(ProcessService.class); PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java index e7afa143bb..55828e1659 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java @@ -86,7 +86,6 @@ public class CommonTaskProcessorTest { taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); taskInstance.setTaskDefine(taskDefinition); - Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1); Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java index 4429e7cd72..8d1faa80a9 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java @@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager; @@ -67,11 +66,6 @@ public class DependencyConfig { return Mockito.mock(AlertMapper.class); } - @Bean - public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() { - return Mockito.mock(TaskInstanceCacheManagerImpl.class); - } - @Bean public ProcessService processService() { return Mockito.mock(ProcessService.class); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java index f4876a697f..0ac237264d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java @@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.service.process.ProcessService; import org.mockito.Mockito; @@ -59,11 +58,6 @@ public class TaskCallbackServiceTestConfig { return Mockito.mock(AlertMapper.class); } - @Bean - public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() { - return Mockito.mock(TaskInstanceCacheManagerImpl.class); - } - @Bean public ProcessService processService() { return Mockito.mock(ProcessService.class); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java index c2db5657db..669bd4e4c8 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java @@ -205,43 +205,54 @@ public class ProcessAlertManager { List taskInstances, ProjectUser projectUser) { - if (Flag.YES == processInstance.getIsSubProcess()) { + if (!isNeedToSendWarning(processInstance)) { return; } - boolean sendWarnning = false; + + Alert alert = new Alert(); + + String cmdName = getCommandCnName(processInstance.getCommandType()); + String success = processInstance.getState().typeIsSuccess() ? "success" : "failed"; + alert.setTitle(cmdName + " " + success); + String content = getContentProcessInstance(processInstance, taskInstances,projectUser); + alert.setContent(content); + alert.setAlertGroupId(processInstance.getWarningGroupId()); + alert.setCreateTime(new Date()); + alertDao.addAlert(alert); + logger.info("add alert to db , alert: {}", alert); + } + + /** + * check if need to be send warning + * + * @param processInstance + * @return + */ + public boolean isNeedToSendWarning(ProcessInstance processInstance) { + if (Flag.YES == processInstance.getIsSubProcess()) { + return false; + } + boolean sendWarning = false; WarningType warningType = processInstance.getWarningType(); switch (warningType) { case ALL: if (processInstance.getState().typeIsFinished()) { - sendWarnning = true; + sendWarning = true; } break; case SUCCESS: if (processInstance.getState().typeIsSuccess()) { - sendWarnning = true; + sendWarning = true; } break; case FAILURE: if (processInstance.getState().typeIsFailure()) { - sendWarnning = true; + sendWarning = true; } break; default: } - if (!sendWarnning) { - return; - } - Alert alert = new Alert(); - - String cmdName = getCommandCnName(processInstance.getCommandType()); - String success = processInstance.getState().typeIsSuccess() ? "success" : "failed"; - alert.setTitle(cmdName + " " + success); - String content = getContentProcessInstance(processInstance, taskInstances,projectUser); - alert.setContent(content); - alert.setAlertGroupId(processInstance.getWarningGroupId()); - alert.setCreateTime(new Date()); - alertDao.addAlert(alert); - logger.info("add alert to db , alert: {}", alert); + return sendWarning; } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index bf0706c40a..fbf29b3bfe 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -100,6 +100,7 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -1087,24 +1088,17 @@ public class ProcessService { /** * retry submit task to db */ - public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { - + public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) { int retryTimes = 1; - boolean submitDB = false; TaskInstance task = null; while (retryTimes <= commitRetryTimes) { try { - if (!submitDB) { - // submit task to db - task = submitTask(taskInstance); - if (task != null && task.getId() != 0) { - submitDB = true; - break; - } - } - if (!submitDB) { - logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); + // submit task to db + task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance); + if (task != null && task.getId() != 0) { + break; } + logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); Thread.sleep(commitInterval); } catch (Exception e) { logger.error("task commit to mysql failed", e); @@ -1118,12 +1112,12 @@ public class ProcessService { * submit task to db * submit sub process to command * + * @param processInstance processInstance * @param taskInstance taskInstance * @return task instance */ @Transactional(rollbackFor = Exception.class) - public TaskInstance submitTask(TaskInstance taskInstance) { - ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) { logger.info("start submit task : {}, instance id:{}, state: {}", taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState()); //submit to db @@ -1131,8 +1125,9 @@ public class ProcessService { if (task == null) { logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); - return task; + return null; } + if (!task.getState().typeIsFinished()) { createSubWorkProcess(processInstance, task); } @@ -1383,7 +1378,7 @@ public class ProcessService { } taskInstance.setExecutorId(processInstance.getExecutorId()); taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); - taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState)); + taskInstance.setState(getSubmitTaskState(taskInstance, processInstance)); if (taskInstance.getSubmitTime() == null) { taskInstance.setSubmitTime(new Date()); } @@ -1406,10 +1401,10 @@ public class ProcessService { * if all of above are not satisfied, return submit success * * @param taskInstance taskInstance - * @param processInstanceState processInstanceState + * @param processInstance processInstance * @return process instance state */ - public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) { + public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) { ExecutionStatus state = taskInstance.getState(); // running, delayed or killed // the task already exists in task queue @@ -1423,10 +1418,10 @@ public class ProcessService { } //return pasue /stop if process instance state is ready pause / stop // or return submit success - if (processInstanceState == ExecutionStatus.READY_PAUSE) { + if (processInstance.getState() == ExecutionStatus.READY_PAUSE) { state = ExecutionStatus.PAUSE; - } else if (processInstanceState == ExecutionStatus.READY_STOP - || !checkProcessStrategy(taskInstance)) { + } else if (processInstance.getState() == ExecutionStatus.READY_STOP + || !checkProcessStrategy(taskInstance, processInstance)) { state = ExecutionStatus.KILL; } else { state = ExecutionStatus.SUBMITTED_SUCCESS; @@ -1440,8 +1435,7 @@ public class ProcessService { * @param taskInstance taskInstance * @return check strategy result */ - private boolean checkProcessStrategy(TaskInstance taskInstance) { - ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId()); + private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) { FailureStrategy failureStrategy = processInstance.getFailureStrategy(); if (failureStrategy == FailureStrategy.CONTINUE) { return true; @@ -1535,39 +1529,15 @@ public class ProcessService { } /** - * package task instanceļ¼Œassociate processInstance and processDefine - * - * @param taskInstId taskInstId - * @return task instance + * package task instance */ - public TaskInstance getTaskInstanceDetailByTaskId(int taskInstId) { - // get task instance - TaskInstance taskInstance = findTaskInstanceById(taskInstId); - if (taskInstance == null) { - return null; - } - setTaskInstanceDetail(taskInstance); - return taskInstance; - } - - /** - * package task instanceļ¼Œassociate processInstance and processDefine - * - * @param taskInstance taskInstance - * @return task instance - */ - public void setTaskInstanceDetail(TaskInstance taskInstance) { - // get process instance - ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - // get process define - ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); + public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) { taskInstance.setProcessInstance(processInstance); - taskInstance.setProcessDefine(processDefine); - TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + taskInstance.setProcessDefine(processInstance.getProcessDefinition()); + TaskDefinition taskDefinition = this.findTaskDefinition( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); - updateTaskDefinitionResources(taskDefinition); + this.updateTaskDefinitionResources(taskDefinition); taskInstance.setTaskDefine(taskDefinition); } @@ -1576,7 +1546,7 @@ public class ProcessService { * * @param taskDefinition the given {@link TaskDefinition} */ - private void updateTaskDefinitionResources(TaskDefinition taskDefinition) { + public void updateTaskDefinitionResources(TaskDefinition taskDefinition) { Map taskParameters = JSONUtils.parseObject( taskDefinition.getTaskParams(), new TypeReference>() { @@ -1757,12 +1727,10 @@ public class ProcessService { * @param host host * @param executePath executePath * @param logPath logPath - * @param taskInstId taskInstId */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, String executePath, - String logPath, - int taskInstId) { + String logPath) { taskInstance.setState(state); taskInstance.setStartTime(startTime); taskInstance.setHost(host); @@ -1786,14 +1754,12 @@ public class ProcessService { * * @param state state * @param endTime endTime - * @param taskInstId taskInstId * @param varPool varPool */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date endTime, int processId, String appIds, - int taskInstId, String varPool) { taskInstance.setPid(processId); taskInstance.setAppLink(appIds); @@ -2047,15 +2013,14 @@ public class ProcessService { } /** - * query user queue by process instance id + * query user queue by process instance * - * @param processInstanceId processInstanceId + * @param processInstance processInstance * @return queue */ - public String queryUserQueueByProcessInstanceId(int processInstanceId) { + public String queryUserQueueByProcessInstance(ProcessInstance processInstance) { String queue = ""; - ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); if (processInstance == null) { return queue; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java index d78fb98b79..2cbd2987a8 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.Objects; /** - * task priority info + * task priority info */ public class TaskPriority implements Comparable { @@ -62,7 +62,14 @@ public class TaskPriority implements Comparable { */ private Map context; - public TaskPriority(){} + /** + * checkpoint + */ + private long checkpoint; + + public TaskPriority() { + this.checkpoint = System.currentTimeMillis(); + } public TaskPriority(int processInstancePriority, int processInstanceId, @@ -73,6 +80,7 @@ public class TaskPriority implements Comparable { this.taskInstancePriority = taskInstancePriority; this.taskId = taskId; this.groupName = groupName; + this.checkpoint = System.currentTimeMillis(); } public int getProcessInstancePriority() { @@ -131,6 +139,14 @@ public class TaskPriority implements Comparable { this.taskExecutionContext = taskExecutionContext; } + public long getCheckpoint() { + return checkpoint; + } + + public void setCheckpoint(long checkpoint) { + this.checkpoint = checkpoint; + } + @Override public int compareTo(TaskPriority other) { if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) { @@ -174,7 +190,7 @@ public class TaskPriority implements Comparable { } TaskPriority that = (TaskPriority) o; return processInstancePriority == that.processInstancePriority - && processInstanceId == that.processInstanceId + && processInstanceId == that.processInstanceId && taskInstancePriority == that.taskInstancePriority && taskId == that.taskId && Objects.equals(groupName, that.groupName);