Browse Source

[DS-6891][MasterServer] reduce db operation when process instance running (#6904)

* remove taskInstanceCacheManager

* remove taskInstanceCacheManager

* [DS-6891][MasterServer] reduce db operation when process instance running

* [DS-6891][MasterServer] reduce db operation when process instance running

* fix test

* fix Transactional method call

* checkstyle

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
28872d8706
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 64
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java
  2. 155
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java
  3. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  5. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  6. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  7. 108
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  8. 412
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  9. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  10. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  11. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  12. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  13. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  14. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  15. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  16. 9
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  17. 10
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  18. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java
  19. 39
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  20. 177
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java
  21. 9
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  22. 10
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
  23. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
  24. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java
  25. 6
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java
  26. 49
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
  27. 91
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  28. 22
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java

64
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java vendored

@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
/**
* Contract for a cache of {@link TaskInstance} objects addressed by task instance id.
* Entries can be created from a {@link TaskExecutionContext}, a {@link TaskExecuteAckCommand}
* or a {@link TaskExecuteResponseCommand}, looked up by id, and removed by id.
*/
public interface TaskInstanceCacheManager {
/**
* Look up a task instance by its id.
*
* @param taskInstanceId id of the task instance to look up
* @return the task instance for that id
*         (NOTE(review): behavior on a miss is decided by the implementation - confirm whether null is possible)
*/
TaskInstance getByTaskInstanceId(Integer taskInstanceId);
/**
* Cache a task instance derived from the execution context of a task.
*
* @param taskExecutionContext execution context carrying the task instance id and task attributes
*/
void cacheTaskInstance(TaskExecutionContext taskExecutionContext);
/**
* Cache a task instance derived from a task execute ack command.
*
* @param taskAckCommand ack command carrying the task instance id and the acknowledged attributes
*/
void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand);
/**
* Cache a task instance derived from a task execute response command.
*
* @param taskExecuteResponseCommand response command carrying the task instance id and the reported result
*/
void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand);
/**
* Remove the cached task instance with the given id.
*
* @param taskInstanceId id of the task instance to remove from the cache
*/
void removeByTaskInstanceId(Integer taskInstanceId);
}

155
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java vendored

@ -1,155 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache.impl;
import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * In-memory cache of task instances, keyed by task instance id.
 *
 * <p>Entries are written from a task execution context and from ack/response commands. A daemon
 * {@link Timer} re-reads every cached id through {@link ProcessService} at a fixed rate and replaces
 * the cached entry when the stored state is {@link ExecutionStatus#NEED_FAULT_TOLERANCE}.
 */
@Component
public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {

    /**
     * taskInstance cache, task instance id as key
     */
    private final Map<Integer, TaskInstance> taskInstanceCache = new ConcurrentHashMap<>();

    /**
     * process service, used to load task instances that are not cached
     */
    @Autowired
    private ProcessService processService;

    /**
     * taskInstance cache refresh timer; stays null until {@link #init()} has run
     */
    private Timer refreshTaskInstanceTimer = null;

    /**
     * Start the periodic cache refresh.
     */
    @PostConstruct
    public void init() {
        //issue#5539 add thread to fetch task state from database in a fixed rate
        this.refreshTaskInstanceTimer = new Timer(true);
        refreshTaskInstanceTimer.scheduleAtFixedRate(
                new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS
        );
    }

    /**
     * Stop the periodic cache refresh. Safe to call even when {@link #init()} was never invoked.
     */
    @PreDestroy
    public void close() {
        if (this.refreshTaskInstanceTimer != null) {
            this.refreshTaskInstanceTimer.cancel();
        }
    }

    /**
     * get taskInstance by taskInstance id
     *
     * <p>On a cache miss the instance is loaded through the process service and cached. When that
     * lookup yields {@code null}, nothing is cached and {@code null} is returned.
     *
     * @param taskInstanceId taskInstanceId
     * @return the cached or freshly loaded taskInstance, or {@code null} when none could be loaded
     */
    @Override
    public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
        return taskInstanceCache.computeIfAbsent(taskInstanceId, k -> processService.findTaskInstanceById(taskInstanceId));
    }

    /**
     * cache taskInstance
     *
     * <p>Replaces any existing entry with an instance built from the execution context.
     *
     * @param taskExecutionContext taskExecutionContext
     */
    @Override
    public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) {
        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setId(taskExecutionContext.getTaskInstanceId());
        taskInstance.setName(taskExecutionContext.getTaskName());
        taskInstance.setStartTime(taskExecutionContext.getStartTime());
        taskInstance.setTaskType(taskExecutionContext.getTaskType());
        taskInstance.setExecutePath(taskExecutionContext.getExecutePath());
        taskInstanceCache.put(taskExecutionContext.getTaskInstanceId(), taskInstance);
    }

    /**
     * cache taskInstance
     *
     * <p>Replaces any existing entry with an instance built from the ack command.
     *
     * @param taskAckCommand taskAckCommand
     */
    @Override
    public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) {
        TaskInstance taskInstance = new TaskInstance();
        // keep the cached object consistent with the key it is stored under
        taskInstance.setId(taskAckCommand.getTaskInstanceId());
        taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
        taskInstance.setStartTime(taskAckCommand.getStartTime());
        taskInstance.setHost(taskAckCommand.getHost());
        taskInstance.setExecutePath(taskAckCommand.getExecutePath());
        taskInstance.setLogPath(taskAckCommand.getLogPath());
        taskInstanceCache.put(taskAckCommand.getTaskInstanceId(), taskInstance);
    }

    /**
     * cache taskInstance
     *
     * <p>Updates state and end time of the known instance. When the instance is neither cached nor
     * loadable through the process service, the command is ignored instead of failing.
     *
     * @param taskExecuteResponseCommand taskExecuteResponseCommand
     */
    @Override
    public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) {
        Integer taskInstanceId = taskExecuteResponseCommand.getTaskInstanceId();
        TaskInstance taskInstance = getByTaskInstanceId(taskInstanceId);
        if (taskInstance == null) {
            // the lookup found nothing, so there is no entry to update
            return;
        }
        taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
        taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
        taskInstanceCache.put(taskInstanceId, taskInstance);
    }

    /**
     * remove taskInstance by taskInstanceId
     *
     * @param taskInstanceId taskInstanceId
     */
    @Override
    public void removeByTaskInstanceId(Integer taskInstanceId) {
        taskInstanceCache.remove(taskInstanceId);
    }

    /**
     * Timer task that re-reads every cached task instance and swaps in the stored instance when its
     * state is {@link ExecutionStatus#NEED_FAULT_TOLERANCE}.
     */
    class RefreshTaskInstanceTimerTask extends TimerTask {
        @Override
        public void run() {
            // NOTE(review): an unchecked exception thrown by the lookup escapes run() and terminates
            // the Timer thread, which stops all further refreshes - consider catching and logging it.
            for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
                TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
                if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
                    // computeIfPresent: do not resurrect an entry that was removed in the meantime
                    taskInstanceCache.computeIfPresent(taskInstanceEntry.getKey(), (k, v) -> taskInstance);
                }
            }
        }
    }
}

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -130,14 +130,16 @@ public class TaskPriorityQueueConsumer extends Thread {
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to dispatch anymore
return true;
} else {
result = dispatcher.dispatch(executionContext);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to dispatch anymore
return true;
}
}
result = dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("dispatch error: {}", e.getMessage(),e);
logger.error("dispatch error: {}", e.getMessage(), e);
}
return result;
}
@ -153,4 +155,18 @@ public class TaskPriorityQueueConsumer extends Thread {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId);
return taskInstance.getState().typeIsFinished();
}
/**
 * Tell whether the state of the queued task should be checked now.
 * A check is due when more than {@code Constants.SECOND_TIME_MILLIS} has elapsed since the
 * checkpoint stored on the task priority; in that case the checkpoint is moved to the current time.
 *
 * @param taskPriority queued task whose checkpoint is read and, when a check is due, refreshed
 * @return true when a check is due, false otherwise
 */
private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
    final long currentMillis = System.currentTimeMillis();
    final boolean checkDue = currentMillis - taskPriority.getCheckpoint() > Constants.SECOND_TIME_MILLIS;
    if (checkDue) {
        taskPriority.setCheckpoint(currentMillis);
    }
    return checkDue;
}
}

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -24,8 +24,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -49,14 +47,8 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final TaskResponseService taskResponseService;
/**
* taskInstance cache manager
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskAckProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
/**
@ -71,8 +63,6 @@ public class TaskAckProcessor implements NettyRequestProcessor {
TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -23,8 +23,6 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -48,14 +46,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final TaskResponseService taskResponseService;
/**
* taskInstance cache manager
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskResponseProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
/**
@ -72,8 +64,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", responseCommand);
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -133,6 +133,15 @@ public class StateEventResponseService {
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
switch (stateEvent.getType()) {
case TASK_STATE_CHANGE:
workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());
break;
case PROCESS_STATE_CHANGE:
workflowExecuteThread.refreshProcessInstance(stateEvent.getProcessInstanceId());
break;
default:
}
workflowExecuteThread.addStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS);
} catch (Exception e) {

108
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -142,54 +142,28 @@ public class TaskResponseService {
*/
private void persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent();
Channel channel = taskResponseEvent.getChannel();
int taskInstanceId = taskResponseEvent.getTaskInstanceId();
int processInstanceId = taskResponseEvent.getProcessInstanceId();
TaskInstance taskInstance;
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread != null && workflowExecuteThread.checkTaskInstanceById(taskInstanceId)) {
taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
} else {
taskInstance = processService.findTaskInstanceById(taskInstanceId);
}
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
switch (event) {
case ACK:
try {
if (taskInstance != null) {
ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
processService.changeTaskState(taskInstance, status,
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath(),
taskResponseEvent.getTaskInstanceId());
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
} catch (Exception e) {
logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskAckCommand.convert2Command());
}
handleAckEvent(taskResponseEvent, taskInstance);
break;
case RESULT:
try {
if (taskInstance != null) {
processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool()
);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
} catch (Exception e) {
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskResponseCommand.convert2Command());
}
handleResultEvent(taskResponseEvent, taskInstance);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
@ -200,7 +174,59 @@ public class TaskResponseService {
}
}
public BlockingQueue<TaskResponseEvent> getEventQueue() {
return eventQueue;
/**
 * Handle an ack event reported for a task instance and answer on the event's channel.
 * A task instance that is already in a finished state is left untouched (a warning is logged);
 * otherwise its state is changed through the process service. A success ack is written when no
 * exception occurs, including when the task instance is null; a failure ack is written otherwise.
 *
 * @param taskResponseEvent ack event carrying the new state, start time, worker address and paths
 * @param taskInstance task instance the event refers to, may be null
 */
private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
    final Channel replyChannel = taskResponseEvent.getChannel();
    try {
        final boolean alreadyFinished = taskInstance != null && taskInstance.getState().typeIsFinished();
        if (alreadyFinished) {
            logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
        } else if (taskInstance != null) {
            processService.changeTaskState(
                    taskInstance,
                    taskResponseEvent.getState(),
                    taskResponseEvent.getStartTime(),
                    taskResponseEvent.getWorkerAddress(),
                    taskResponseEvent.getExecutePath(),
                    taskResponseEvent.getLogPath());
        }
        // a missing or already finished task instance is still acked as success
        DBTaskAckCommand successAck = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
        replyChannel.writeAndFlush(successAck.convert2Command());
    } catch (Exception e) {
        logger.error("worker ack master error", e);
        DBTaskAckCommand failureAck = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
        replyChannel.writeAndFlush(failureAck.convert2Command());
    }
}
/**
 * Handle a result event reported for a task instance and answer on the event's channel.
 * When the task instance is present its state is changed through the process service. A success
 * response is written when no exception occurs, including when the task instance is null; a
 * failure response is written otherwise.
 *
 * @param taskResponseEvent result event carrying the final state, end time, process id, app ids and var pool
 * @param taskInstance task instance the event refers to, may be null
 */
private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
    final Channel replyChannel = taskResponseEvent.getChannel();
    try {
        final boolean taskInstancePresent = taskInstance != null;
        if (taskInstancePresent) {
            processService.changeTaskState(
                    taskInstance,
                    taskResponseEvent.getState(),
                    taskResponseEvent.getEndTime(),
                    taskResponseEvent.getProcessId(),
                    taskResponseEvent.getAppIds(),
                    taskResponseEvent.getVarPool());
        }
        // a missing task instance is still answered with success
        DBTaskResponseCommand successResponse = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
        replyChannel.writeAndFlush(successResponse.convert2Command());
    } catch (Exception e) {
        logger.error("worker response master error", e);
        DBTaskResponseCommand failureResponse = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
        replyChannel.writeAndFlush(failureResponse.convert2Command());
    }
}
}

412
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -86,9 +86,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
/**
* master exec thread,split dag
@ -99,100 +97,116 @@ public class WorkflowExecuteThread implements Runnable {
* logger of WorkflowExecuteThread
*/
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThread.class);
/**
* runing TaskNode
* master config
*/
private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
private MasterConfig masterConfig;
/**
* process instance
* process service
*/
private ProcessInstance processInstance;
private ProcessService processService;
/**
* submit failure nodes
* alert manager
*/
private boolean taskFailedSubmit = false;
private ProcessAlertManager processAlertManager;
/**
* recover node id list
* netty executor manager
*/
private List<TaskInstance> recoverNodeIdList = new ArrayList<>();
private NettyExecutorManager nettyExecutorManager;
/**
* error task list
* process instance
*/
private Map<String, TaskInstance> errorTaskList = new ConcurrentHashMap<>();
private ProcessInstance processInstance;
/**
* complete task list
* process definition
*/
private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
private ProcessDefinition processDefinition;
/**
* ready to submit task queue
* the object of DAG
*/
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
private DAG<String, TaskNode, TaskNodeRelation> dag;
/**
* depend failed task map
* key of workflow
*/
private Map<String, TaskInstance> dependFailedTask = new ConcurrentHashMap<>();
private String key;
/**
* forbidden task map
* start flag, true: start nodes submit completely
*/
private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>();
private boolean isStart = false;
/**
* skip task map
* submit failure nodes
*/
private Map<String, TaskNode> skipTaskNodeList = new ConcurrentHashMap<>();
private boolean taskFailedSubmit = false;
/**
* recover tolerance fault task list
* task instance hash map, taskId as key
*/
private List<TaskInstance> recoverToleranceFaultTaskList = new ArrayList<>();
private Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
/**
* alert manager
* running TaskNode, taskId as key
*/
private ProcessAlertManager processAlertManager;
private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
/**
* the object of DAG
* valid task map, taskCode as key, taskId as value
*/
private DAG<String, TaskNode, TaskNodeRelation> dag;
private Map<String, Integer> validTaskMap = new ConcurrentHashMap<>();
/**
* process service
* error task map, taskCode as key, taskId as value
*/
private ProcessService processService;
private Map<String, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
* master config
* complete task map, taskCode as key, taskId as value
*/
private MasterConfig masterConfig;
private Map<String, Integer> completeTaskMap = new ConcurrentHashMap<>();
/**
*
* depend failed task map, taskCode as key, taskId as value
*/
private NettyExecutorManager nettyExecutorManager;
private Map<String, Integer> dependFailedTaskMap = new ConcurrentHashMap<>();
private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
/**
* forbidden task map, code as key
*/
private Map<String, TaskNode> forbiddenTaskMap = new ConcurrentHashMap<>();
private List<Date> complementListDate = Lists.newLinkedList();
/**
* skip task map, code as key
*/
private Map<String, TaskNode> skipTaskNodeMap = new ConcurrentHashMap<>();
private Table<Integer, Long, TaskInstance> taskInstanceHashMap = HashBasedTable.create();
private ProcessDefinition processDefinition;
private String key;
/**
* complement date list
*/
private List<Date> complementListDate = Lists.newLinkedList();
/**
* task timeout check list
*/
private ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList;
/**
* start flag, true: start nodes submit completely
*
* state event queue
*/
private boolean isStart = false;
private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
/**
* ready to submit task queue
*/
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* constructor of WorkflowExecuteThread
@ -200,7 +214,6 @@ public class WorkflowExecuteThread implements Runnable {
* @param processInstance processInstance
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
* @param taskTimeoutCheckList
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, ProcessService processService
@ -232,7 +245,6 @@ public class WorkflowExecuteThread implements Runnable {
/**
* the process start nodes are submitted completely.
* @return
*/
public boolean isStart() {
return this.isStart;
@ -286,9 +298,10 @@ public class WorkflowExecuteThread implements Runnable {
private boolean stateEventHandler(StateEvent stateEvent) {
logger.info("process event: {}", stateEvent.toString());
if (!checkStateEvent(stateEvent)) {
if (!checkProcessInstance(stateEvent)) {
return false;
}
boolean result = false;
switch (stateEvent.getType()) {
case PROCESS_STATE_CHANGE:
@ -314,16 +327,11 @@ public class WorkflowExecuteThread implements Runnable {
}
private boolean taskTimeout(StateEvent stateEvent) {
if (taskInstanceHashMap.containsRow(stateEvent.getTaskInstanceId())) {
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}
TaskInstance taskInstance = taskInstanceHashMap
.row(stateEvent.getTaskInstanceId())
.values()
.iterator().next();
TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId());
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
}
@ -344,7 +352,16 @@ public class WorkflowExecuteThread implements Runnable {
}
private boolean taskStateChangeHandler(StateEvent stateEvent) {
TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}
TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
if (task.getState() == null) {
logger.error("task state is null, state handler error: {}", stateEvent);
return true;
}
if (task.getState().typeIsFinished()) {
taskFinished(task);
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
@ -356,7 +373,7 @@ public class WorkflowExecuteThread implements Runnable {
taskFinished(task);
}
} else {
logger.error("state handler error: {}", stateEvent.toString());
logger.error("state handler error: {}", stateEvent);
}
return true;
}
@ -382,10 +399,11 @@ public class WorkflowExecuteThread implements Runnable {
}
return;
}
ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
completeTaskList.put(Long.toString(task.getTaskCode()), task);
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
processService.saveProcessInstance(processInstance);
@ -395,7 +413,7 @@ public class WorkflowExecuteThread implements Runnable {
|| DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
submitPostNode(Long.toString(task.getTaskCode()));
} else {
errorTaskList.put(Long.toString(task.getTaskCode()), task);
errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@ -404,20 +422,102 @@ public class WorkflowExecuteThread implements Runnable {
this.updateProcessInstanceState();
}
private boolean checkStateEvent(StateEvent stateEvent) {
/**
 * Reload the process instance and its process definition through the process service and make
 * them the current ones of this thread.
 *
 * <p>When no process instance is found, an error is logged and the current process instance and
 * definition are kept. The fields are only replaced after the reloaded instance has its
 * definition attached, so they never point at a half-initialised instance.
 *
 * @param processInstanceId id of the process instance to reload
 */
public void refreshProcessInstance(int processInstanceId) {
    logger.info("process instance update: {}", processInstanceId);
    ProcessInstance refreshedInstance = processService.findProcessInstanceById(processInstanceId);
    if (refreshedInstance == null) {
        // same handling as refreshTaskInstance: keep the in-memory state instead of failing with a NullPointerException
        logger.error("can not find process instance, id:{}", processInstanceId);
        return;
    }
    ProcessDefinition refreshedDefinition = processService.findProcessDefinition(refreshedInstance.getProcessDefinitionCode(),
            refreshedInstance.getProcessDefinitionVersion());
    refreshedInstance.setProcessDefinition(refreshedDefinition);
    processInstance = refreshedInstance;
    processDefinition = refreshedDefinition;
}
/**
 * Reload a task instance through the process service and store it in the in-memory maps.
 * The reloaded instance replaces the entry in the task instance map; the valid task map entry
 * for its task code is removed and re-added only when the instance flag is {@code Flag.YES}.
 * When the task instance cannot be found, an error is logged and nothing is changed.
 *
 * @param taskInstanceId id of the task instance to reload
 */
public void refreshTaskInstance(int taskInstanceId) {
    logger.info("task instance update: {} ", taskInstanceId);
    final TaskInstance latest = processService.findTaskInstanceById(taskInstanceId);
    if (latest != null) {
        processService.packageTaskInstance(latest, processInstance);
        taskInstanceMap.put(latest.getId(), latest);

        final String taskCodeKey = Long.toString(latest.getTaskCode());
        validTaskMap.remove(taskCodeKey);
        if (latest.getFlag() == Flag.YES) {
            validTaskMap.put(taskCodeKey, latest.getId());
        }
    } else {
        logger.error("can not find task instance, id:{}", taskInstanceId);
    }
}
/**
 * Check that a state event belongs to the process instance handled by this thread.
 *
 * Compares the id of the in-memory process instance with the process instance id
 * carried by the event; a mismatch is logged as an error.
 *
 * @param stateEvent the state event to verify
 * @return true when the ids match, false otherwise
 */
public boolean checkProcessInstance(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.error("mismatch process instance id: {}, state event:{}",
this.processInstance.getId(),
stateEvent.toString());
stateEvent);
return false;
}
return true;
}
/**
 * Check that the task instance referenced by a state event is known to this thread.
 *
 * <p>The event is rejected (with an error log) when it carries a task instance id
 * of 0, or when that id is not present in {@code taskInstanceMap}.
 *
 * @param stateEvent the state event to verify
 * @return true when the event references a task instance held in memory, false otherwise
 */
public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) {
    final boolean idMissing = stateEvent.getTaskInstanceId() == 0;
    if (idMissing) {
        logger.error("task instance id null, state event:{}", stateEvent);
        return false;
    }

    final boolean knownTask = taskInstanceMap.containsKey(stateEvent.getTaskInstanceId());
    if (!knownTask) {
        logger.error("mismatch task instance id, event:{}", stateEvent);
        return false;
    }

    return true;
}
/**
 * Check whether any task instance held in memory has the given task code.
 *
 * @param taskCode task code to look for
 * @return true when at least one entry of {@code taskInstanceMap} has this task code;
 *         false when there is none or the map is null or empty
 */
public boolean checkTaskInstanceByCode(long taskCode) {
    if (taskInstanceMap == null || taskInstanceMap.isEmpty()) {
        return false;
    }
    return taskInstanceMap.values().stream()
            .anyMatch(candidate -> candidate.getTaskCode() == taskCode);
}
/**
 * Check whether a task instance with the given id is held in memory.
 *
 * @param taskInstanceId id of the task instance to look for
 * @return true when {@code taskInstanceMap} is non-null, non-empty and contains the id
 */
public boolean checkTaskInstanceById(int taskInstanceId) {
    final boolean hasEntries = taskInstanceMap != null && !taskInstanceMap.isEmpty();
    return hasEntries && taskInstanceMap.containsKey(taskInstanceId);
}
/**
 * Get a task instance from memory.
 *
 * <p>Uses a single map lookup: {@code Map.get} already yields {@code null} for an
 * unknown key, so a preceding {@code containsKey} check is redundant and would not
 * be atomic with the read.
 *
 * @param taskInstanceId id of the task instance
 * @return the task instance held in {@code taskInstanceMap}, or null when the id is unknown
 */
public TaskInstance getTaskInstance(int taskInstanceId) {
    return taskInstanceMap.get(taskInstanceId);
}
private boolean processStateChangeHandler(StateEvent stateEvent) {
try {
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
processInstance = processService.findProcessInstanceById(this.processInstance.getId());
if (processComplementData()) {
return true;
}
@ -477,7 +577,7 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.setStartTime(new Date());
processInstance.setEndTime(null);
processService.saveProcessInstance(processInstance);
this.taskInstanceHashMap.clear();
this.taskInstanceMap.clear();
startProcess();
return true;
}
@ -491,7 +591,7 @@ public class WorkflowExecuteThread implements Runnable {
}
private void startProcess() throws Exception {
if (this.taskInstanceHashMap.size() == 0) {
if (this.taskInstanceMap.size() == 0) {
isStart = false;
buildFlowDag();
initTaskQueue();
@ -505,25 +605,22 @@ public class WorkflowExecuteThread implements Runnable {
*/
private void endProcess() {
this.stateEvents.clear();
processInstance.setEndTime(new Date());
ProcessDefinition processDefinition = this.processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),processInstance.getProcessDefinitionVersion());
if (processDefinition.getExecutionType().typeIsSerialWait()) {
checkSerialProcess(processDefinition);
}
processService.updateProcessInstance(processInstance);
if (processInstance.getState().typeIsWaitingThread()) {
processService.createRecoveryWaitingThreadCommand(null, processInstance);
}
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
if (processAlertManager.isNeedToSendWarning(processInstance)) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
}
}
public void checkSerialProcess(ProcessDefinition processDefinition) {
this.processInstance = processService.findProcessInstanceById(processInstance.getId());
int nextInstanceId = processInstance.getNextProcessInstanceId();
if (nextInstanceId == 0) {
ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(),ExecutionStatus.SERIAL_WAIT.getCode());
ProcessInstance nextProcessInstance = this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), ExecutionStatus.SERIAL_WAIT.getCode());
if (nextProcessInstance == null) {
return;
}
@ -553,19 +650,21 @@ public class WorkflowExecuteThread implements Runnable {
}
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList =
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
forbiddenTaskList.clear();
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
forbiddenTaskMap.clear();
taskNodeList.forEach(taskNode -> {
if (taskNode.isForbidden()) {
forbiddenTaskList.put(Long.toString(taskNode.getCode()), taskNode);
forbiddenTaskMap.put(Long.toString(taskNode.getCode()), taskNode);
}
});
// generate process to get DAG info
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList();
List<String> recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList);
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
ProcessDag processDag = generateFlowDag(taskNodeList,
startNodeNameList, recoveryNodeCodeList, processInstance.getTaskDependType());
@ -584,19 +683,23 @@ public class WorkflowExecuteThread implements Runnable {
taskFailedSubmit = false;
activeTaskProcessorMaps.clear();
dependFailedTask.clear();
completeTaskList.clear();
errorTaskList.clear();
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : taskInstanceList) {
dependFailedTaskMap.clear();
completeTaskMap.clear();
errorTaskMap.clear();
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
completeTaskList.put(Long.toString(task.getTaskCode()), task);
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
errorTaskList.put(Long.toString(task.getTaskCode()), task);
errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
}
}
@ -637,31 +740,32 @@ public class WorkflowExecuteThread implements Runnable {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getTaskCommitRetryTimes(), masterConfig.getTaskCommitInterval());
if (submit) {
this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
taskProcessor.run();
addTimeoutCheck(taskInstance);
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
taskInstance.setTaskDefine(taskDefinition);
if (taskProcessor.taskState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(taskProcessor.taskState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(stateEvent);
}
return taskInstance;
} else {
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return null;
}
taskInstanceMap.put(taskInstance.getId(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
taskProcessor.run();
addTimeoutCheck(taskInstance);
if (taskProcessor.taskState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(taskProcessor.taskState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(stateEvent);
}
return taskInstance;
} catch (Exception e) {
logger.error("submit standby task error", e);
return null;
@ -688,11 +792,12 @@ public class WorkflowExecuteThread implements Runnable {
if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion()
);
taskInstance.setTaskDefine(taskDefinition);
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag() || taskInstance.taskCanRetry()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
} else {
@ -711,8 +816,8 @@ public class WorkflowExecuteThread implements Runnable {
* @return TaskInstance
*/
private TaskInstance findTaskIfExists(Long taskCode, int taskVersion) {
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(this.processInstance.getId());
for (TaskInstance taskInstance : taskInstanceList) {
List<TaskInstance> validTaskInstanceList = getValidTaskList();
for (TaskInstance taskInstance : validTaskInstanceList) {
if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == taskVersion) {
return taskInstance;
}
@ -805,7 +910,11 @@ public class WorkflowExecuteThread implements Runnable {
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskCode : preTask) {
TaskInstance preTaskInstance = completeTaskList.get(preTaskCode);
Integer taskId = completeTaskMap.get(preTaskCode);
if (taskId == null) {
continue;
}
TaskInstance preTaskInstance = taskInstanceMap.get(taskId);
if (preTaskInstance == null) {
continue;
}
@ -854,12 +963,35 @@ public class WorkflowExecuteThread implements Runnable {
}
}
/**
 * Build a map of the completed task instances, keyed by task code (as a string).
 *
 * <p>{@code completeTaskMap} only stores task instance ids; each id is resolved
 * through {@code taskInstanceMap}. Ids that no longer resolve to a task instance
 * are skipped rather than dereferenced, matching the null handling in
 * {@code getCompleteTaskByState}.
 *
 * @return a new map from task code to completed task instance; never null
 */
private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
    Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
    for (Integer taskInstanceId : completeTaskMap.values()) {
        TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
        if (taskInstance == null) {
            // id recorded as complete but no instance held in memory: nothing to expose
            continue;
        }
        completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance);
    }
    return completeTaskInstanceMap;
}
/**
 * Collect the valid task instances held in memory.
 *
 * <p>{@code validTaskMap} only stores task instance ids; each id is resolved through
 * {@code taskInstanceMap}. Ids that do not resolve to a task instance are left out,
 * so the returned list never contains null elements.
 *
 * @return a new list of the valid task instances; never null
 */
private List<TaskInstance> getValidTaskList() {
    List<TaskInstance> validTaskInstanceList = new ArrayList<>();
    for (Integer taskInstanceId : validTaskMap.values()) {
        TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
        if (taskInstance != null) {
            validTaskInstanceList.add(taskInstance);
        }
    }
    return validTaskInstanceList;
}
private void submitPostNode(String parentNodeCode) {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeList, dag, completeTaskList);
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode);
if (taskInstanceHashMap.containsColumn(taskNodeObject.getCode())) {
if (checkTaskInstanceByCode(taskNodeObject.getCode())) {
continue;
}
TaskInstance task = createTaskInstance(processInstance, taskNodeObject);
@ -873,15 +1005,16 @@ public class WorkflowExecuteThread implements Runnable {
continue;
}
if (completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
if (completeTaskMap.containsKey(Long.toString(task.getTaskCode()))) {
logger.info("task {} has already run success", task.getName());
continue;
}
if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
} else {
addTaskToStandByList(task);
continue;
}
addTaskToStandByList(task);
}
submitStandByTask();
updateProcessInstanceState();
@ -903,15 +1036,16 @@ public class WorkflowExecuteThread implements Runnable {
List<String> depCodeList = taskNode.getDepList();
for (String depsNode : depCodeList) {
if (!dag.containsNode(depsNode)
|| forbiddenTaskList.containsKey(depsNode)
|| skipTaskNodeList.containsKey(depsNode)) {
|| forbiddenTaskMap.containsKey(depsNode)
|| skipTaskNodeMap.containsKey(depsNode)) {
continue;
}
// dependencies must be fully completed
if (!completeTaskList.containsKey(depsNode)) {
if (!completeTaskMap.containsKey(depsNode)) {
return DependResult.WAITING;
}
ExecutionStatus depTaskState = completeTaskList.get(depsNode).getState();
Integer depsTaskId = completeTaskMap.get(depsNode);
ExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
}
@ -923,7 +1057,7 @@ public class WorkflowExecuteThread implements Runnable {
return DependResult.FAILED;
}
}
logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskList.keySet().toArray()));
logger.info("taskCode: {} completeDependTaskList: {}", taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
return DependResult.SUCCESS;
}
@ -933,12 +1067,13 @@ public class WorkflowExecuteThread implements Runnable {
private boolean dependTaskSuccess(String dependNodeName, String nextNodeName) {
if (dag.getNode(dependNodeName).isConditionsTask()) {
//condition task need check the branch to run
List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeList, dag, completeTaskList);
List<String> nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
if (!nextTaskList.contains(nextNodeName)) {
return false;
}
} else {
ExecutionStatus depTaskState = completeTaskList.get(dependNodeName).getState();
Integer taskInstanceId = completeTaskMap.get(dependNodeName);
ExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
if (depTaskState.typeIsFailure()) {
return false;
}
@ -954,9 +1089,10 @@ public class WorkflowExecuteThread implements Runnable {
*/
private List<TaskInstance> getCompleteTaskByState(ExecutionStatus state) {
List<TaskInstance> resultList = new ArrayList<>();
for (Map.Entry<String, TaskInstance> entry : completeTaskList.entrySet()) {
if (entry.getValue().getState() == state) {
resultList.add(entry.getValue());
for (Integer taskInstanceId : completeTaskMap.values()) {
TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
if (taskInstance != null && taskInstance.getState() == state) {
resultList.add(taskInstance);
}
}
return resultList;
@ -990,10 +1126,10 @@ public class WorkflowExecuteThread implements Runnable {
if (this.taskFailedSubmit) {
return true;
}
if (this.errorTaskList.size() > 0) {
if (this.errorTaskMap.size() > 0) {
return true;
}
return this.dependFailedTask.size() > 0;
return this.dependFailedTaskMap.size() > 0;
}
/**
@ -1049,7 +1185,6 @@ public class WorkflowExecuteThread implements Runnable {
/**
* generate the latest process instance status by the tasks state
*
* @param instance
* @return process instance execution status
*/
private ExecutionStatus getProcessInstanceState(ProcessInstance instance) {
@ -1130,8 +1265,7 @@ public class WorkflowExecuteThread implements Runnable {
* after each batch of tasks is executed, the status of the process instance is updated
*/
private void updateProcessInstanceState() {
ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
ExecutionStatus state = getProcessInstanceState(instance);
ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
logger.info(
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
@ -1139,9 +1273,12 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.getState(), state,
processInstance.getCommandType());
instance.setState(state);
processService.updateProcessInstance(instance);
processInstance = instance;
processInstance.setState(state);
if (state.typeIsFinished()) {
processInstance.setEndTime(new Date());
}
processService.updateProcessInstance(processInstance);
StateEvent stateEvent = new StateEvent();
stateEvent.setExecutionStatus(processInstance.getState());
stateEvent.setProcessInstanceId(this.processInstance.getId());
@ -1254,7 +1391,8 @@ public class WorkflowExecuteThread implements Runnable {
task.setState(retryTask.getState());
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
removeTaskFromStandbyList(task);
completeTaskList.put(Long.toString(task.getTaskCode()), task);
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
taskInstanceMap.put(task.getId(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
}
@ -1281,7 +1419,7 @@ public class WorkflowExecuteThread implements Runnable {
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(Long.toString(task.getTaskCode()), task);
dependFailedTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
@ -1366,10 +1504,10 @@ public class WorkflowExecuteThread implements Runnable {
*
* @return recovery node code list
*/
private List<String> getRecoveryNodeCodeList() {
private List<String> getRecoveryNodeCodeList(List<TaskInstance> recoverNodeList) {
List<String> recoveryNodeCodeList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(recoverNodeIdList)) {
for (TaskInstance task : recoverNodeIdList) {
if (CollectionUtils.isNotEmpty(recoverNodeList)) {
for (TaskInstance task : recoverNodeList) {
recoveryNodeCodeList.add(Long.toString(task.getTaskCode()));
}
}

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -161,8 +161,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* @return TaskExecutionContext
*/
protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) {
processService.setTaskInstanceDetail(taskInstance);
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
@ -172,12 +170,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null,
taskInstance.getId());
null
);
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
String userQueue = processService.queryUserQueueByProcessInstance(taskInstance.getProcessInstance());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setResources(getResourceFullNames(taskInstance));

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -65,7 +65,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval);
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -72,7 +72,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
@ -81,7 +80,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = task;
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -54,7 +54,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
taskDefinition = processService.findTaskDefinition(
task.getTaskCode(), task.getTaskDefinitionVersion()
);
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
this.taskInstance = processService.submitTaskWithRetry(processInstance, task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -63,7 +63,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
if (this.taskInstance == null) {
return false;

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -89,7 +89,7 @@ public class ConditionsTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
.submitTask(taskInstance))
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
Mockito.when(processService

9
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -93,6 +93,7 @@ public class DependentTaskTest {
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstance = getProcessInstance();
taskInstance = getTaskInstance();
// for MasterBaseTaskExecThread.call
// for DependentTaskExecThread.waitTaskQuit
@ -102,7 +103,7 @@ public class DependentTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
.submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000)))
.submitTask(processInstance, taskInstance))
.thenAnswer(i -> taskInstance);
// for DependentTaskExecThread.initTaskParameters
@ -346,6 +347,12 @@ public class DependentTaskTest {
return processInstance;
}
/**
 * Create the task instance fixture for this test, with its id set to 1000.
 */
private TaskInstance getTaskInstance() {
    TaskInstance fixture = new TaskInstance();
    fixture.setId(1000);
    return fixture;
}
/**
* task that dependent on others (and to be tested here)
* notice: should be filled with setDependence() and be passed to setupTaskInstance()

10
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -74,6 +74,8 @@ public class SubProcessTaskTest {
Mockito.when(applicationContext.getBean(AlertDao.class)).thenReturn(alertDao);
processInstance = getProcessInstance();
TaskInstance taskInstance = getTaskInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
.thenReturn(processInstance);
@ -85,7 +87,7 @@ public class SubProcessTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
.submitTask(Mockito.any()))
.submitTask(processInstance, taskInstance))
.thenAnswer(t -> t.getArgument(0));
TaskDefinition taskDefinition = new TaskDefinition();
@ -147,6 +149,12 @@ public class SubProcessTaskTest {
return processInstance;
}
/**
 * Create the task instance fixture for this test, with its id set to 1000.
 */
private TaskInstance getTaskInstance() {
    TaskInstance fixture = new TaskInstance();
    fixture.setId(1000);
    return fixture;
}
private ProcessInstance getSubProcessInstance(ExecutionStatus executionStatus) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(102);

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SwitchTaskTest.java

@ -81,7 +81,7 @@ public class SwitchTaskTest {
// for MasterBaseTaskExecThread.submit
Mockito.when(processService
.submitTask(taskInstance))
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
// for MasterBaseTaskExecThread.call
Mockito.when(processService

39
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -162,39 +162,52 @@ public class WorkflowExecuteThreadTest {
public void testGetPreVarPool() {
try {
Set<String> preTaskName = new HashSet<>();
preTaskName.add("test1");
preTaskName.add("test2");
Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
preTaskName.add(Long.toString(1));
preTaskName.add(Long.toString(2));
TaskInstance taskInstance = new TaskInstance();
TaskInstance taskInstance1 = new TaskInstance();
taskInstance1.setId(1);
taskInstance1.setName("test1");
taskInstance1.setTaskCode(1);
taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]");
taskInstance1.setEndTime(new Date());
TaskInstance taskInstance2 = new TaskInstance();
taskInstance2.setId(2);
taskInstance2.setName("test2");
taskInstance2.setTaskCode(2);
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
taskInstance2.setEndTime(new Date());
completeTaskList.put("test1", taskInstance1);
completeTaskList.put("test2", taskInstance2);
Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
Map<String, Integer> completeTaskList = new ConcurrentHashMap<>();
completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance1.getId());
completeTaskList.put(Long.toString(taskInstance1.getTaskCode()), taskInstance2.getId());
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Field field = masterExecThreadClass.getDeclaredField("completeTaskList");
field.setAccessible(true);
field.set(workflowExecuteThread, completeTaskList);
Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
completeTaskMapField.setAccessible(true);
completeTaskMapField.set(workflowExecuteThread, completeTaskList);
Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap");
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
Assert.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
completeTaskList.put("test2", taskInstance2);
field.setAccessible(true);
field.set(workflowExecuteThread, completeTaskList);
completeTaskList.put(Long.toString(taskInstance2.getTaskCode()), taskInstance2.getId());
completeTaskMapField.setAccessible(true);
completeTaskMapField.set(workflowExecuteThread, completeTaskList);
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
Assert.assertNotNull(taskInstance.getVarPool());
} catch (Exception e) {

177
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java vendored

@ -1,177 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache.impl;
import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
 * Unit tests for {@link TaskInstanceCacheManagerImpl}.
 *
 * <p>Every test starts with task instance {@code 0} already cached from a
 * {@link TaskExecuteAckCommand} (see {@link #before()}).
 */
@RunWith(MockitoJUnitRunner.class)
public class TaskInstanceCacheManagerImplTest {

    /** Worker-side values shared by the ack command fixture and the expected task instances. */
    private static final String EXECUTE_PATH = "/dolphinscheduler/worker";
    private static final String HOST = "worker007";
    private static final String LOG_PATH = "/temp/worker.log";

    @InjectMocks
    private TaskInstanceCacheManagerImpl taskInstanceCacheManager;

    @Mock(name = "processService")
    private ProcessService processService;

    /**
     * Caches task instance 0 through an ack command so the cache-hit tests have an entry to read.
     */
    @Before
    public void before() {
        TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand();
        // status 1 is read back as ExecutionStatus.RUNNING_EXECUTION in the tests below
        taskExecuteAckCommand.setStatus(1);
        taskExecuteAckCommand.setExecutePath(EXECUTE_PATH);
        taskExecuteAckCommand.setHost(HOST);
        taskExecuteAckCommand.setLogPath(LOG_PATH);
        taskExecuteAckCommand.setStartTime(augustDate(7));
        taskExecuteAckCommand.setTaskInstanceId(0);
        taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand);
    }

    /**
     * After {@code init()} and one refresh interval, the cached state of task 0 must match the
     * state returned by {@code processService}.
     */
    @Test
    public void testInit() throws InterruptedException {
        TaskInstance taskInstance = newTaskInstance(0, ExecutionStatus.NEED_FAULT_TOLERANCE);
        taskInstance.setProcessInstanceId(0);
        Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance);

        taskInstanceCacheManager.init();
        // presumably init() starts a periodic refresh; wait slightly longer than one interval
        TimeUnit.MILLISECONDS.sleep(CACHE_REFRESH_TIME_MILLIS + 1000);

        Assert.assertEquals(taskInstance.getState(), taskInstanceCacheManager.getByTaskInstanceId(0).getState());
    }

    /**
     * Task 0 was cached in {@link #before()}, so the lookup must return the values of the ack command.
     */
    @Test
    public void getByTaskInstanceIdFromCache() {
        TaskInstance expected = newTaskInstance(0, ExecutionStatus.RUNNING_EXECUTION);
        expected.setStartTime(augustDate(7));

        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0);

        // compared through toString(), exactly as the original test did
        Assert.assertEquals(expected.toString(), instanceGot.toString());
    }

    /**
     * Task 1 was never cached, so the lookup must return what {@code processService} provides.
     */
    @Test
    public void getByTaskInstanceIdFromDatabase() {
        TaskInstance taskInstance = newTaskInstance(1, ExecutionStatus.RUNNING_EXECUTION);
        taskInstance.setStartTime(augustDate(7));
        Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);

        TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1);

        Assert.assertEquals(taskInstance, instanceGot);
    }

    /**
     * Caching a {@link TaskExecutionContext} must copy its id, name, start time, type and execute path.
     */
    @Test
    public void cacheTaskInstanceByTaskExecutionContext() {
        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
        taskExecutionContext.setTaskInstanceId(2);
        taskExecutionContext.setTaskName("blackberrier test");
        taskExecutionContext.setStartTime(augustDate(7));
        taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
        taskExecutionContext.setExecutePath("/tmp");
        taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext);

        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2);

        // assertEquals takes (expected, actual); keeping that order makes failure messages correct
        Assert.assertEquals(2, taskInstance.getId());
        Assert.assertEquals("blackberrier test", taskInstance.getName());
        Assert.assertEquals(augustDate(7), taskInstance.getStartTime());
        Assert.assertEquals(TaskType.SPARK.getDesc(), taskInstance.getTaskType());
        Assert.assertEquals("/tmp", taskInstance.getExecutePath());
    }

    /**
     * The ack command cached in {@link #before()} must be reflected field by field.
     */
    @Test
    public void testCacheTaskInstanceByTaskExecuteAckCommand() {
        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);

        Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState());
        Assert.assertEquals(augustDate(7), taskInstance.getStartTime());
        Assert.assertEquals(HOST, taskInstance.getHost());
        Assert.assertEquals(EXECUTE_PATH, taskInstance.getExecutePath());
        Assert.assertEquals(LOG_PATH, taskInstance.getLogPath());
    }

    /**
     * Caching a response command must update the end time and state of the already cached task 0.
     */
    @Test
    public void testCacheTaskInstanceByTaskExecuteResponseCommand() {
        TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
        responseCommand.setTaskInstanceId(0);
        // status 9 is read back as ExecutionStatus.KILL below
        responseCommand.setStatus(9);
        responseCommand.setEndTime(augustDate(8));
        taskInstanceCacheManager.cacheTaskInstance(responseCommand);

        TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);

        Assert.assertEquals(augustDate(8), taskInstance.getEndTime());
        Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState());
    }

    /**
     * After removal the lookup yields {@code null}; {@code processService} is not stubbed for id 0 here.
     */
    @Test
    public void removeByTaskInstanceId() {
        taskInstanceCacheManager.removeByTaskInstanceId(0);

        Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0));
    }

    /**
     * Builds the task instance fixture used across the tests.
     *
     * @param id    task instance id
     * @param state task state
     * @return a new task instance carrying the shared execute path, host and log path
     */
    private static TaskInstance newTaskInstance(int id, ExecutionStatus state) {
        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setId(id);
        taskInstance.setState(state);
        taskInstance.setExecutePath(EXECUTE_PATH);
        taskInstance.setHost(HOST);
        taskInstance.setLogPath(LOG_PATH);
        return taskInstance;
    }

    /**
     * Returns a fresh fixture date for the given day of August.
     *
     * <p>The deprecated {@code Date(int, int, int)} constructor is kept on purpose so the fixture
     * values stay identical to the original test. It adds 1900 to the year argument, so this is
     * not a date in calendar year 1970; the value is only ever compared with itself.
     *
     * @param dayOfMonth day of the month
     * @return a new {@link Date} instance (never shared, because {@link Date} is mutable)
     */
    private static Date augustDate(int dayOfMonth) {
        return new Date(1970, Calendar.AUGUST, dayOfMonth);
    }
}

9
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -76,8 +76,6 @@ public class TaskPriorityQueueConsumerTest {
tenant.setUpdateTime(new Date());
Mockito.doReturn(tenant).when(processService).getTenantForProcess(1, 2);
Mockito.doReturn("default").when(processService).queryUserQueueByProcessInstanceId(1);
}
@Test
@ -101,7 +99,6 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@ -129,7 +126,6 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@ -171,7 +167,6 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@ -211,7 +206,6 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
taskPriorityQueue.put(taskPriority);
@ -271,7 +265,6 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
@ -310,7 +303,6 @@ public class TaskPriorityQueueConsumerTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority();
@ -342,7 +334,6 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");

10
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java

@ -18,20 +18,16 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.net.InetSocketAddress;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@ -39,7 +35,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
/**
* task ack processor test
* task ack processor test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskResponseEvent.class})
@ -47,7 +43,6 @@ public class TaskAckProcessorTest {
private TaskAckProcessor taskAckProcessor;
private TaskResponseService taskResponseService;
private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
private ProcessService processService;
private TaskExecuteAckCommand taskExecuteAckCommand;
private TaskResponseEvent taskResponseEvent;
@ -60,9 +55,6 @@ public class TaskAckProcessorTest {
taskResponseService = PowerMockito.mock(TaskResponseService.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService);
taskInstanceCacheManager = PowerMockito.mock(TaskInstanceCacheManagerImpl.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class)).thenReturn(taskInstanceCacheManager);
processService = PowerMockito.mock(ProcessService.class);
PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java

@ -86,7 +86,6 @@ public class CommonTaskProcessorTest {
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).getTaskInstanceDetailByTaskId(1);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RandomHostManager;
@ -67,11 +66,6 @@ public class DependencyConfig {
return Mockito.mock(AlertMapper.class);
}
@Bean
public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() {
return Mockito.mock(TaskInstanceCacheManagerImpl.class);
}
@Bean
public ProcessService processService() {
return Mockito.mock(ProcessService.class);

6
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTestConfig.java

@ -36,7 +36,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.mockito.Mockito;
@ -59,11 +58,6 @@ public class TaskCallbackServiceTestConfig {
return Mockito.mock(AlertMapper.class);
}
@Bean
public TaskInstanceCacheManagerImpl taskInstanceCacheManagerImpl() {
return Mockito.mock(TaskInstanceCacheManagerImpl.class);
}
@Bean
public ProcessService processService() {
return Mockito.mock(ProcessService.class);

49
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

@ -205,43 +205,54 @@ public class ProcessAlertManager {
List<TaskInstance> taskInstances,
ProjectUser projectUser) {
if (Flag.YES == processInstance.getIsSubProcess()) {
if (!isNeedToSendWarning(processInstance)) {
return;
}
boolean sendWarnning = false;
Alert alert = new Alert();
String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success);
String content = getContentProcessInstance(processInstance, taskInstances,projectUser);
alert.setContent(content);
alert.setAlertGroupId(processInstance.getWarningGroupId());
alert.setCreateTime(new Date());
alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
}
/**
* check whether a warning needs to be sent
*
* @param processInstance
* @return
*/
public boolean isNeedToSendWarning(ProcessInstance processInstance) {
if (Flag.YES == processInstance.getIsSubProcess()) {
return false;
}
boolean sendWarning = false;
WarningType warningType = processInstance.getWarningType();
switch (warningType) {
case ALL:
if (processInstance.getState().typeIsFinished()) {
sendWarnning = true;
sendWarning = true;
}
break;
case SUCCESS:
if (processInstance.getState().typeIsSuccess()) {
sendWarnning = true;
sendWarning = true;
}
break;
case FAILURE:
if (processInstance.getState().typeIsFailure()) {
sendWarnning = true;
sendWarning = true;
}
break;
default:
}
if (!sendWarnning) {
return;
}
Alert alert = new Alert();
String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success);
String content = getContentProcessInstance(processInstance, taskInstances,projectUser);
alert.setContent(content);
alert.setAlertGroupId(processInstance.getWarningGroupId());
alert.setCreateTime(new Date());
alertDao.addAlert(alert);
logger.info("add alert to db , alert: {}", alert);
return sendWarning;
}
/**

91
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -100,6 +100,7 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@ -1087,24 +1088,17 @@ public class ProcessService {
/**
* retry submit task to db
*/
public TaskInstance submitTask(TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
public TaskInstance submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, int commitInterval) {
int retryTimes = 1;
boolean submitDB = false;
TaskInstance task = null;
while (retryTimes <= commitRetryTimes) {
try {
if (!submitDB) {
// submit task to db
task = submitTask(taskInstance);
if (task != null && task.getId() != 0) {
submitDB = true;
break;
}
}
if (!submitDB) {
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
// submit task to db
task = SpringApplicationContext.getBean(ProcessService.class).submitTask(processInstance, taskInstance);
if (task != null && task.getId() != 0) {
break;
}
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
logger.error("task commit to mysql failed", e);
@ -1118,12 +1112,12 @@ public class ProcessService {
* submit task to db
* submit sub process to command
*
* @param processInstance processInstance
* @param taskInstance taskInstance
* @return task instance
*/
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("start submit task : {}, instance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
@ -1131,8 +1125,9 @@ public class ProcessService {
if (task == null) {
logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
return task;
return null;
}
if (!task.getState().typeIsFinished()) {
createSubWorkProcess(processInstance, task);
}
@ -1383,7 +1378,7 @@ public class ProcessService {
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
}
@ -1406,10 +1401,10 @@ public class ProcessService {
* if all of above are not satisfied, return submit success
*
* @param taskInstance taskInstance
* @param processInstanceState processInstanceState
* @param processInstance processInstance
* @return process instance state
*/
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState) {
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus state = taskInstance.getState();
// running, delayed or killed
// the task already exists in task queue
@ -1423,10 +1418,10 @@ public class ProcessService {
}
//return pause / stop if process instance state is ready pause / stop
// or return submit success
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
state = ExecutionStatus.PAUSE;
} else if (processInstanceState == ExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance)) {
} else if (processInstance.getState() == ExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance, processInstance)) {
state = ExecutionStatus.KILL;
} else {
state = ExecutionStatus.SUBMITTED_SUCCESS;
@ -1440,8 +1435,7 @@ public class ProcessService {
* @param taskInstance taskInstance
* @return check strategy result
*/
private boolean checkProcessStrategy(TaskInstance taskInstance) {
ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
private boolean checkProcessStrategy(TaskInstance taskInstance, ProcessInstance processInstance) {
FailureStrategy failureStrategy = processInstance.getFailureStrategy();
if (failureStrategy == FailureStrategy.CONTINUE) {
return true;
@ -1535,39 +1529,15 @@ public class ProcessService {
}
/**
* package task instanceassociate processInstance and processDefine
*
* @param taskInstId taskInstId
* @return task instance
* package task instance
*/
public TaskInstance getTaskInstanceDetailByTaskId(int taskInstId) {
// get task instance
TaskInstance taskInstance = findTaskInstanceById(taskInstId);
if (taskInstance == null) {
return null;
}
setTaskInstanceDetail(taskInstance);
return taskInstance;
}
/**
* package task instanceassociate processInstance and processDefine
*
* @param taskInstance taskInstance
* @return task instance
*/
public void setTaskInstanceDetail(TaskInstance taskInstance) {
// get process instance
ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance processInstance) {
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.setProcessDefine(processInstance.getProcessDefinition());
TaskDefinition taskDefinition = this.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
updateTaskDefinitionResources(taskDefinition);
this.updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
}
@ -1576,7 +1546,7 @@ public class ProcessService {
*
* @param taskDefinition the given {@link TaskDefinition}
*/
private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
public void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
Map<String, Object> taskParameters = JSONUtils.parseObject(
taskDefinition.getTaskParams(),
new TypeReference<Map<String, Object>>() {
@ -1757,12 +1727,10 @@ public class ProcessService {
* @param host host
* @param executePath executePath
* @param logPath logPath
* @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
String executePath,
String logPath,
int taskInstId) {
String logPath) {
taskInstance.setState(state);
taskInstance.setStartTime(startTime);
taskInstance.setHost(host);
@ -1786,14 +1754,12 @@ public class ProcessService {
*
* @param state state
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstId,
String varPool) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
@ -2047,15 +2013,14 @@ public class ProcessService {
}
/**
* query user queue by process instance id
* query user queue by process instance
*
* @param processInstanceId processInstanceId
* @param processInstance processInstance
* @return queue
*/
public String queryUserQueueByProcessInstanceId(int processInstanceId) {
public String queryUserQueueByProcessInstance(ProcessInstance processInstance) {
String queue = "";
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
if (processInstance == null) {
return queue;
}

22
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java

@ -23,7 +23,7 @@ import java.util.Map;
import java.util.Objects;
/**
* task priority info
* task priority info
*/
public class TaskPriority implements Comparable<TaskPriority> {
@ -62,7 +62,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
*/
private Map<String, String> context;
public TaskPriority(){}
/**
* checkpoint
*/
private long checkpoint;
public TaskPriority() {
this.checkpoint = System.currentTimeMillis();
}
public TaskPriority(int processInstancePriority,
int processInstanceId,
@ -73,6 +80,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
this.taskInstancePriority = taskInstancePriority;
this.taskId = taskId;
this.groupName = groupName;
this.checkpoint = System.currentTimeMillis();
}
public int getProcessInstancePriority() {
@ -131,6 +139,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
this.taskExecutionContext = taskExecutionContext;
}
public long getCheckpoint() {
return checkpoint;
}
public void setCheckpoint(long checkpoint) {
this.checkpoint = checkpoint;
}
@Override
public int compareTo(TaskPriority other) {
if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
@ -174,7 +190,7 @@ public class TaskPriority implements Comparable<TaskPriority> {
}
TaskPriority that = (TaskPriority) o;
return processInstancePriority == that.processInstancePriority
&& processInstanceId == that.processInstanceId
&& processInstanceId == that.processInstanceId
&& taskInstancePriority == that.taskInstancePriority
&& taskId == that.taskId
&& Objects.equals(groupName, that.groupName);

Loading…
Cancel
Save