Browse Source

fix 10517 (#10541)

Co-authored-by: JinyLeeChina <jiny.li@foxmail.com>
2.0.7-release
JinYong Li 2 years ago committed by GitHub
parent
commit
7e3b5c238b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  2. 40
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  3. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  4. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
  5. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java
  6. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java
  7. 16
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  8. 19
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OshiTest.java
  9. 26
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  10. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProjectUser.java
  11. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  12. 25
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  13. 1
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
  14. 25
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  15. 2
      dolphinscheduler-dist/release-docs/LICENSE
  16. 16
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
  17. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  18. 3
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventChangeCommand.java
  19. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
  20. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
  21. 60
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  22. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
  23. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
  24. 71
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java
  25. 113
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
  26. 8
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
  27. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/MasterLogFilter.java
  28. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  29. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  30. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  31. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  32. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
  33. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
  34. 66
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java
  35. 29
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
  36. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
  37. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  38. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  39. 67
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  40. 177
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  41. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  42. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  43. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  44. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  45. 56
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java
  46. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  47. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java
  48. 127
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  49. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  50. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java
  51. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  52. 108
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  53. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  54. 63
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java
  55. 50
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
  56. 57
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  57. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  58. 1
      dolphinscheduler-server/src/main/resources/logback-master.xml
  59. 1
      dolphinscheduler-server/src/main/resources/logback-worker.xml
  60. 2
      dolphinscheduler-server/src/main/resources/worker.properties
  61. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  62. 20
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
  63. 17
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
  64. 101
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  65. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
  66. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java
  67. 16
      pom.xml
  68. 2
      tools/dependencies/known-dependencies.txt

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -100,7 +100,7 @@ public class ProcessInstanceController extends BaseController {
*/
@ApiOperation(value = "queryProcessInstanceListPaging", notes = "QUERY_PROCESS_INSTANCE_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefiniteCode", value = "PROCESS_DEFINITION_CODE", dataType = "Long", example = "100"),
@ApiImplicitParam(name = "processDefineCode", value = "PROCESS_DEFINITION_CODE", dataType = "Long", example = "100"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type = "String"),
@ApiImplicitParam(name = "executorName", value = "EXECUTOR_NAME", type = "String"),
@ApiImplicitParam(name = "stateType", value = "EXECUTION_STATUS", type = "ExecutionStatus"),

40
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -288,33 +288,45 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
List<Long> currentUpstreamList = upstreamList.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toList());
if (currentUpstreamList.contains(0L)) {
putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList");
return result;
}
List<Long> tmpPreTaskCodeList = Lists.newArrayList(preTaskCodeList);
tmpPreTaskCodeList.removeAll(currentUpstreamList);
if (!tmpPreTaskCodeList.isEmpty()) {
putMsg(result, Status.DATA_IS_NOT_VALID, StringUtils.join(preTaskCodeList, Constants.COMMA));
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
return result;
}
List<Long> remainCurrentUpstreamList = Lists.newArrayList(currentUpstreamList);
remainCurrentUpstreamList.removeAll(preTaskCodeList);
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> remainProcessTaskRelationList = Lists.newArrayList();
List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
if (preTaskCodeList.size() > 1) {
if (preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
preTaskCodeList.remove(processTaskRelation.getPreTaskCode());
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPostTaskCode() == taskCode
&& preTaskCodeList.contains(processTaskRelation.getPreTaskCode())) {
processTaskRelationWaitRemove.add(processTaskRelation);
}
} else {
if (processTaskRelation.getPostTaskCode() == taskCode) {
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPreTaskCode(0L);
}
remainProcessTaskRelationList.add(processTaskRelation);
}
if (preTaskCodeList.contains(processTaskRelation.getPostTaskCode())) {
processTaskRelationWaitRemove.add(processTaskRelation);
}
if (remainCurrentUpstreamList.isEmpty() && processTaskRelationWaitRemove.size() > 0) {
ProcessTaskRelation lastTaskRelation = processTaskRelationWaitRemove.get(0);
lastTaskRelation.setPreTaskVersion(0);
lastTaskRelation.setPreTaskCode(0L);
remainProcessTaskRelationList.add(lastTaskRelation);
}
processTaskRelationList.removeAll(processTaskRelationWaitRemove);
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList);
updateRelation(loginUser, result, processDefinition, remainProcessTaskRelationList);
return result;
}

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService;
@ -28,26 +29,16 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.*;
import java.util.stream.Collectors;
/**
* task instance service impl
@ -67,9 +58,6 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
ProcessInstanceService processInstanceService;
@Autowired
UsersService usersService;
@ -188,6 +176,16 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(task.getProcessInstanceId());
if (processInstance != null && processInstance.getState().typeIsFailure()) {
List<TaskInstance> validTaskList = processService.findValidTaskListByProcessId(processInstance.getId());
List<Integer> failTaskList = validTaskList.stream().filter(instance -> instance.getState().typeIsFailure())
.map(TaskInstance::getId).collect(Collectors.toList());
if (failTaskList.size() == 1 && failTaskList.contains(taskInstanceId)) {
processInstance.setState(ExecutionStatus.SUCCESS);
processService.updateProcessInstance(processInstance);
}
}
// change the state of the task instance
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java

@ -180,7 +180,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
@Override
@Transactional(rollbackFor = RuntimeException.class)
public User createUser(String userName,
String userPassword,
String email,

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java

@ -20,5 +20,7 @@ package org.apache.dolphinscheduler.common.enums;
public enum Event {
ACK,
RESULT,
ACTION_STOP;
ACTION_STOP,
WORKER_REJECT,
REALLOCATE
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java

@ -85,7 +85,7 @@ public class StateEvent {
public String toString() {
return "State Event :"
+ "key: " + key
+ " type: " + type.toString()
+ " type: " + type
+ " executeStatus: " + executionStatus
+ " task instance id: " + taskInstanceId
+ " process instance id: " + processInstanceId

16
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -57,6 +57,9 @@ public class OSUtils {
public static final double NEGATIVE_ONE = -1;
private static final HardwareAbstractionLayer hal = SI.getHardware();
private static long[] prevTicks = new long[CentralProcessor.TickType.values().length];
private static long prevTickTime = 0L;
private static double cpuUsage = 0.0D;
private OSUtils() {
throw new UnsupportedOperationException("Construct OSUtils");
@ -129,7 +132,7 @@ public class OSUtils {
loadAverage = osBean.getSystemLoadAverage();
} catch (Exception e) {
logger.error("get operation system load average exception, try another method ", e);
loadAverage = hal.getProcessor().getSystemLoadAverage();
loadAverage = hal.getProcessor().getSystemLoadAverage(1)[0];
if (Double.isNaN(loadAverage)) {
return NEGATIVE_ONE;
}
@ -146,7 +149,16 @@ public class OSUtils {
*/
public static double cpuUsage() {
CentralProcessor processor = hal.getProcessor();
double cpuUsage = processor.getSystemCpuLoad();
// Check if > ~ 0.95 seconds since last tick count.
long now = System.currentTimeMillis();
if (now - prevTickTime > 950) {
// Enough time has elapsed.
cpuUsage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
prevTickTime = System.currentTimeMillis();
prevTicks = processor.getSystemCpuLoadTicks();
}
if (Double.isNaN(cpuUsage)) {
return NEGATIVE_ONE;
}

19
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/os/OshiTest.java

@ -51,7 +51,7 @@ public class OshiTest {
logger.info("Checking CPU...");
printCpu(hal.getProcessor());
printCpu(si);
}
@ -64,12 +64,15 @@ public class OshiTest {
}
private static void printCpu(CentralProcessor processor) {
logger.info(String.format("CPU load: %.1f%% (OS MXBean)%n", processor.getSystemCpuLoad() * 100));//CPU load: 24.9% (OS MXBean)
logger.info("CPU load averages : {}", processor.getSystemLoadAverage());//CPU load averages : 1.5234375
private static void printCpu(SystemInfo si) {
CentralProcessor processor = si.getHardware().getProcessor();
long[] systemCpuLoadTicks = processor.getSystemCpuLoadTicks();
Util.sleep(1000);
logger.info(String.format("CPU load: %.1f%% (OS MXBean)%n", processor.getSystemCpuLoadBetweenTicks(systemCpuLoadTicks) * 100));//CPU load: 24.9% (OS MXBean)
logger.info("CPU load averages : {}", processor.getSystemLoadAverage(1)[0]);//CPU load averages : 1.5234375
logger.info("Uptime: " + FormatUtil.formatElapsedSecs(processor.getSystemUptime()));
logger.info("Uptime: " + FormatUtil.formatElapsedSecs(si.getOperatingSystem().getSystemUptime()));
logger.info("Context Switches/Interrupts: " + processor.getContextSwitches() + " / " + processor.getInterrupts());
@ -93,7 +96,7 @@ public class OshiTest {
"User: %.1f%% Nice: %.1f%% System: %.1f%% Idle: %.1f%% IOwait: %.1f%% IRQ: %.1f%% SoftIRQ: %.1f%% Steal: %.1f%%%n",
100d * user / totalCpu, 100d * nice / totalCpu, 100d * sys / totalCpu, 100d * idle / totalCpu,
100d * iowait / totalCpu, 100d * irq / totalCpu, 100d * softirq / totalCpu, 100d * steal / totalCpu));
logger.info(String.format("CPU load: %.1f%% (counting ticks)%n", processor.getSystemCpuLoadBetweenTicks() * 100));
logger.info(String.format("CPU load: %.1f%% (counting ticks)%n", processor.getSystemCpuLoadBetweenTicks(prevTicks) * 100));
@ -103,7 +106,9 @@ public class OshiTest {
+ (loadAverage[2] < 0 ? " N/A" : String.format(" %.2f", loadAverage[2])));
// per core CPU
StringBuilder procCpu = new StringBuilder("CPU load per processor:");
double[] load = processor.getProcessorCpuLoadBetweenTicks();
long[][] processorCpuLoadTicks = processor.getProcessorCpuLoadTicks();
Util.sleep(1000);
double[] load = processor.getProcessorCpuLoadBetweenTicks(processorCpuLoadTicks);
for (double avg : load) {
procCpu.append(String.format(" %.1f%%", avg * 100));
}

26
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -26,10 +26,9 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.ServerAlertContent;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
@ -111,19 +110,22 @@ public class AlertDao {
* process time out alert
*
* @param processInstance processInstance
* @param processDefinition processDefinition
* @param projectUser projectUser
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) {
int alertGroupId = processInstance.getWarningGroupId();
Alert alert = new Alert();
List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectCode(processDefinition.getProjectCode())
.projectName(processDefinition.getProjectName())
.owner(processDefinition.getUserName())
.projectCode(projectUser.getProjectCode())
.projectName(projectUser.getProjectName())
.owner(projectUser.getUserName())
.processId(processInstance.getId())
.processDefinitionCode(processInstance.getProcessDefinitionCode())
.processName(processInstance.getName())
.processType(processInstance.getCommandType())
.processState(processInstance.getState())
.runTimes(processInstance.getRunTimes())
.processStartTime(processInstance.getStartTime())
.processHost(processInstance.getHost())
.event(AlertEvent.TIME_OUT)
@ -148,15 +150,15 @@ public class AlertDao {
*
* @param processInstance processInstanceId
* @param taskInstance taskInstance
* @param taskDefinition taskDefinition
* @param projectUser projectUser
*/
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, TaskDefinition taskDefinition) {
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, ProjectUser projectUser) {
Alert alert = new Alert();
List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectCode(taskDefinition.getProjectCode())
.projectName(taskDefinition.getProjectName())
.owner(taskDefinition.getUserName())
.projectCode(projectUser.getProjectCode())
.projectName(projectUser.getProjectName())
.owner(projectUser.getUserName())
.processId(processInstance.getId())
.processDefinitionCode(processInstance.getProcessDefinitionCode())
.processName(processInstance.getName())

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProjectUser.java

@ -37,6 +37,12 @@ public class ProjectUser {
@TableField("project_id")
private int projectId;
/**
* project code
*/
@TableField(exist = false)
private long projectCode;
/**
* project name
*/
@ -124,12 +130,21 @@ public class ProjectUser {
this.perm = perm;
}
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
@Override
public String toString() {
return "ProjectUser{"
+ "id=" + id
+ ", userId=" + userId
+ ", projectId=" + projectId
+ ", projectCode=" + projectCode
+ ", projectName='" + projectName + '\''
+ ", userName='" + userName + '\''
+ ", perm=" + perm

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -546,7 +546,6 @@ public class TaskInstance implements Serializable {
}
public boolean isTaskComplete() {
return this.getState().typeIsPause()
|| this.getState().typeIsSuccess()
|| this.getState().typeIsCancel()
@ -590,7 +589,7 @@ public class TaskInstance implements Serializable {
return true;
} else {
return (this.getState().typeIsFailure()
&& this.getRetryTimes() < this.getMaxRetryTimes());
&& this.getRetryTimes() <= this.getMaxRetryTimes());
}
}
@ -601,12 +600,12 @@ public class TaskInstance implements Serializable {
*/
public boolean retryTaskIntervalOverTime() {
if (getState() != ExecutionStatus.FAILURE) {
return true;
return false;
}
if (getId() == 0
|| getMaxRetryTimes() == 0
|| getRetryInterval() == 0) {
return true;
return false;
}
Date now = new Date();
long failedTimeInterval = DateUtils.differSec(now, getEndTime());

25
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -17,18 +17,17 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.Set;
/**
* task instance mapper interface
@ -51,8 +50,7 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndName(@Param("processInstanceId") int processInstanceId,
@Param("name") String name);
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
@Param("taskCode") Long taskCode);
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId, @Param("taskCode") Long taskCode);
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);
@ -71,8 +69,19 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("states") int[] statusArray,
@Param("host") String host,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime
);
@Param("endTime") Date endTime);
int updateHostAndSubmitTimeById(@Param("id") int id, @Param("host") String host, @Param("submitTime") Date submitTime);
/**
* query last task instance
*
* @param taskCode taskCode
* @param startTime startTime
* @param endTime endTime
* @return task instance
*/
TaskInstance queryLastTaskInstance(@Param("taskCode") long taskCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime);
List<TaskInstance> queryLastTaskInstanceList(@Param("taskCodes") Set<Long> taskCodes, @Param("startTime") Date startTime, @Param("endTime") Date endTime);
}

1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml

@ -137,6 +137,7 @@
<select id="queryProjectWithUserByProcessInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.ProjectUser">
select
dp.id project_id,
dp.code project_code,
dp.name project_name,
u.user_name user_name
from t_ds_process_instance di

25
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -171,4 +171,29 @@
submit_time = #{submitTime}
where id = #{id}
</update>
<select id="queryLastTaskInstance" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where task_code=#{taskCode}
<if test="startTime!=null and endTime != null">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
order by end_time desc limit 1
</select>
<select id="queryLastTaskInstanceList" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
from t_ds_task_instance
where 1=1
<if test="taskCodes != null and taskCodes.size() != 0">
and task_code in
<foreach collection="taskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
<if test="startTime!=null and endTime != null">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
</select>
</mapper>

2
dolphinscheduler-dist/release-docs/LICENSE vendored

@ -462,7 +462,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
aspectjweaver 1.9.7:https://mvnrepository.com/artifact/org.aspectj/aspectjweaver/1.9.7, EPL 1.0
logback-classic 1.2.3: https://mvnrepository.com/artifact/ch.qos.logback/logback-classic/1.2.3, EPL 1.0 and LGPL 2.1
logback-core 1.2.3: https://mvnrepository.com/artifact/ch.qos.logback/logback-core/1.2.3, EPL 1.0 and LGPL 2.1
oshi-core 3.9.1: https://mvnrepository.com/artifact/com.github.oshi/oshi-core/3.9.1, EPL 1.0
oshi-core 6.1.1: https://mvnrepository.com/artifact/com.github.oshi/oshi-core/6.1.1, EPL 1.0
h2-1.4.200 https://github.com/h2database/h2database/blob/master/LICENSE.txt, MPL 2.0 or EPL 1.0
========================================================================

16
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java

@ -43,6 +43,11 @@ public class Command implements Serializable {
*/
private CommandType type;
/**
* gen command time millis
*/
private long genCommandTimeMillis;
/**
* request unique identification
*/
@ -66,6 +71,14 @@ public class Command implements Serializable {
this.type = type;
}
public long getGenCommandTimeMillis() {
return genCommandTimeMillis;
}
public void setGenCommandTimeMillis(long genCommandTimeMillis) {
this.genCommandTimeMillis = genCommandTimeMillis;
}
public long getOpaque() {
return opaque;
}
@ -115,7 +128,8 @@ public class Command implements Serializable {
@Override
public String toString() {
return "Command [type=" + type + ", opaque=" + opaque + ", bodyLen=" + (body == null ? 0 : body.length) + "]";
return "Command [type=" + type + ", opaque=" + opaque + ", genCommandTimeMillis=" + genCommandTimeMillis
+ ", bodyLen=" + (body == null ? 0 : body.length) + "]";
}
}

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -103,6 +103,16 @@ public enum CommandType {
*/
TASK_KILL_RESPONSE_ACK,
/**
* task recall
*/
TASK_RECALL,
/**
* task recall ack
*/
TASK_RECALL_ACK,
/**
* HEART_BEAT
*/

3
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/StateEventChangeCommand.java

@ -46,8 +46,7 @@ public class StateEventChangeCommand implements Serializable {
public StateEventChangeCommand(int sourceProcessInstanceId, int sourceTaskInstanceId,
ExecutionStatus sourceStatus,
int destProcessInstanceId,
int destTaskInstanceId
) {
int destTaskInstanceId) {
this.key = String.format("%d-%d-%d-%d",
sourceProcessInstanceId,
sourceTaskInstanceId,

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java

@ -121,6 +121,7 @@ public class TaskExecuteAckCommand implements Serializable {
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_ACK);
command.setGenCommandTimeMillis(System.currentTimeMillis());
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java

@ -54,6 +54,7 @@ public class TaskExecuteRequestCommand implements Serializable {
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
command.setGenCommandTimeMillis(System.currentTimeMillis());
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;

60
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -52,6 +52,26 @@ public class TaskExecuteResponseCommand implements Serializable {
*/
private int status;
/**
* startTime
*/
private Date startTime;
/**
* host
*/
private String host;
/**
* logPath
*/
private String logPath;
/**
* end time
* executePath
*/
private String executePath;
/**
* end time
@ -59,7 +79,6 @@ public class TaskExecuteResponseCommand implements Serializable {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
/**
* processId
*/
@ -123,6 +142,38 @@ public class TaskExecuteResponseCommand implements Serializable {
this.appIds = appIds;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
/**
* package response command
* @return command
@ -130,6 +181,7 @@ public class TaskExecuteResponseCommand implements Serializable {
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_RESPONSE);
command.setGenCommandTimeMillis(System.currentTimeMillis());
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
@ -139,10 +191,16 @@ public class TaskExecuteResponseCommand implements Serializable {
public String toString() {
return "TaskExecuteResponseCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", processInstanceId=" + processInstanceId
+ ", status=" + status
+ ", startTime=" + startTime
+ ", endTime=" + endTime
+ ", host=" + host
+ ", logPath=" + logPath
+ ", executePath=" + executePath
+ ", processId=" + processId
+ ", appIds='" + appIds + '\''
+ ", varPool=" + varPool
+ '}';
}

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java

@ -47,6 +47,7 @@ public class TaskKillRequestCommand implements Serializable {
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_KILL_REQUEST);
command.setGenCommandTimeMillis(System.currentTimeMillis());
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java

@ -114,6 +114,7 @@ public class TaskKillResponseCommand implements Serializable {
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_KILL_RESPONSE);
command.setGenCommandTimeMillis(System.currentTimeMillis());
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;

71
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallAckCommand.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
public class TaskRecallAckCommand implements Serializable {
private int taskInstanceId;
private int status;
public TaskRecallAckCommand() {
super();
}
public TaskRecallAckCommand(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
/**
* package response command
*
* @return command
*/
public Command convert2Command(long opaque) {
Command command = new Command(opaque);
command.setType(CommandType.TASK_RECALL_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskRecallAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
}
}

113
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
* kill task recall command
*/
public class TaskRecallCommand implements Serializable {
/**
* taskInstanceId
*/
private int taskInstanceId;
/**
* host
*/
private String host;
/**
* process instance id
*/
private int processInstanceId;
private Event event;
private int status;
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public Event getEvent() {
return event;
}
public void setEvent(Event event) {
this.event = event;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_RECALL);
command.setGenCommandTimeMillis(System.currentTimeMillis());
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
}
@Override
public String toString() {
return "TaskRecallCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", host='" + host + '\''
+ ", processInstanceId=" + processInstanceId
+ ", event=" + event
+ ", status=" + status
+ '}';
}
}

8
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.remote;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.Ping;
@ -27,14 +28,11 @@ import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import io.netty.channel.Channel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
/**
* netty remote client test

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/MasterLogFilter.java

@ -24,8 +24,7 @@ import ch.qos.logback.core.spi.FilterReply;
/**
* master log filter
*/
public class MasterLogFilter
extends Filter<ILoggingEvent> {
public class MasterLogFilter extends Filter<ILoggingEvent> {
/**
* log level
*/
@ -33,7 +32,6 @@ public class MasterLogFilter
/**
* Accept or reject based on thread name
*
* @param event event
* @return FilterReply
*/

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
@ -148,6 +149,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL, new TaskRecallProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor());
this.nettyRemotingServer.start();

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
@ -72,20 +74,16 @@ public class ExecutorDispatcher implements InitializingBean {
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
/**
* get executor manager
*/
// get executor manager
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if(executorManager == null){
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
/**
* host select
*/
// host select
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
+ "current task needs worker group %s to execute",
context.getCommand(),context.getWorkerGroup()));
@ -93,9 +91,7 @@ public class ExecutorDispatcher implements InitializingBean {
context.setHost(host);
executorManager.beforeExecute(context);
try {
/**
* task execute
*/
// task execute
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
@ -28,6 +29,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@ -81,6 +83,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, new TaskRecallProcessor());
}
/**
@ -91,25 +94,10 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
/**
* all nodes
*/
Set<String> allNodes = getAllNodes(context);
/**
* fail nodes
*/
Set<String> failNodeSet = new HashSet<>();
/**
* build command accord executeContext
*/
Command command = context.getCommand();
/**
* execute task host
*/
// execute task host
Host host = context.getHost();
boolean success = false;
while (!success) {
@ -163,7 +151,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
retryCount--;
ThreadUtils.sleep(100);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
} while (retryCount >= 0 && !success);

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -170,7 +170,7 @@ public class LowerWeightHostManager extends CommonHostManager {
return Optional.of(
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
heartBeat.getStartupTime()));
heartBeat.getWorkerWaitingTaskCount(), heartBeat.getStartupTime()));
}
}

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java

@ -37,10 +37,13 @@ public class HostWeight {
private double currentWeight;
public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, long startTime) {
private final int waitingTaskCount;
public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, int waitingTaskCount, long startTime) {
this.hostWorker = hostWorker;
this.weight = calculateWeight(cpu, memory, loadAverage, startTime);
this.currentWeight = this.weight;
this.waitingTaskCount = waitingTaskCount;
}
public double getWeight() {
@ -63,12 +66,17 @@ public class HostWeight {
return (Host)hostWorker;
}
public int getWaitingTaskCount() {
return waitingTaskCount;
}
@Override
public String toString() {
return "HostWeight{"
+ "hostWorker=" + hostWorker
+ ", weight=" + weight
+ ", currentWeight=" + currentWeight
+ ", waitingTaskCount=" + waitingTaskCount
+ '}';
}

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java

@ -18,6 +18,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
/**
* lower weight round robin
@ -35,7 +40,8 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
List<HostWeight> weights = canAssignTaskHost(sources);
for (HostWeight hostWeight : weights) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
@ -45,7 +51,21 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) {
List<HostWeight> zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList());
if (!zeroWaitingTask.isEmpty()) {
return zeroWaitingTask;
}
HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
List<HostWeight> equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount())
.collect(Collectors.toList());
if (!equalWaitingTask.isEmpty()) {
waitingTask.addAll(equalWaitingTask);
}
return waitingTask;
}
}

66
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task recall processor
*/
public class TaskRecallProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskRecallProcessor.class);
/**
* process service
*/
private final TaskResponseService taskResponseService;
public TaskRecallProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
}
/**
* task ack process
*
* @param channel channel channel
* @param command command TaskExecuteAckCommand
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
logger.info("taskRecallCommand: {}, opaque: {}", recallCommand, command.getOpaque());
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newRecall(ExecutionStatus.of(recallCommand.getStatus()), recallCommand.getEvent(),
recallCommand.getTaskInstanceId(), recallCommand.getProcessInstanceId(), channel, command.getOpaque());
taskResponseService.addResponse(taskResponseEvent);
}
}

29
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java

@ -95,6 +95,11 @@ public class TaskResponseEvent {
private int processInstanceId;
/**
* request unique identification
*/
private long opaque;
public static TaskResponseEvent newActionStop(ExecutionStatus state,
int taskInstanceId,
int processInstanceId) {
@ -148,6 +153,22 @@ public class TaskResponseEvent {
return event;
}
public static TaskResponseEvent newRecall(ExecutionStatus state,
Event event,
int taskInstanceId,
int processInstanceId,
Channel channel,
long opaque) {
TaskResponseEvent responseEvent = new TaskResponseEvent();
responseEvent.setEvent(event);
responseEvent.setState(state);
responseEvent.setTaskInstanceId(taskInstanceId);
responseEvent.setProcessInstanceId(processInstanceId);
responseEvent.setChannel(channel);
responseEvent.setOpaque(opaque);
return responseEvent;
}
public String getVarPool() {
return varPool;
}
@ -251,4 +272,12 @@ public class TaskResponseEvent {
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public long getOpaque() {
return opaque;
}
public void setOpaque(long opaque) {
this.opaque = opaque;
}
}

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java

@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
@ -163,14 +165,39 @@ public class TaskResponsePersistThread implements Runnable {
TaskKillAckCommand taskKillAckCommand = new TaskKillAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskKillAckCommand.convert2Command());
}
break;
case WORKER_REJECT:
try {
WorkflowExecuteThread executeThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (executeThread != null) {
ITaskProcessor taskProcessor = executeThread.getActiveTaskProcessorMaps().get(taskResponseEvent.getTaskInstanceId());
if (taskProcessor != null) {
taskProcessor.action(TaskAction.RESUBMIT);
logger.info("RESUBMIT: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
}
}
if (channel != null) {
TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskRecallAckCommand.convert2Command(taskResponseEvent.getOpaque()));
logger.info("taskRecallAckCommand send successfully, task instance id:{}, opaque:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getOpaque());
}
} catch (Exception e) {
result = false;
logger.error("worker reject error", e);
TaskRecallAckCommand taskRecallAckCommand = new TaskRecallAckCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskRecallAckCommand.convert2Command(taskResponseEvent.getOpaque()));
logger.info("taskRecallAckCommand send successfully, task instance id:{}, opaque:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getOpaque());
}
break;
case REALLOCATE:
logger.warn("Not yet supported");
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
if (workflowExecuteThread != null && taskResponseEvent.getState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -481,7 +481,11 @@ public class MasterRegistryClient {
.buildProcessInstanceRelatedInfo(processInstance)
.create();
if (masterConfig.getMasterKillYarnJobWhenHandleFailOver()) {
if (masterConfig.getMasterKillYarnJobWhenHandleFailOver()
&& !(taskInstance.isSubProcess()
|| taskInstance.isDependTask()
|| taskInstance.isConditionsTask()
|| taskInstance.isSwitchTask())) {
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
}

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -125,6 +125,11 @@ public class MasterSchedulerService extends Thread {
*/
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
/**
* dep task check list
*/
ConcurrentHashMap<Integer, TaskInstance> depStateCheckList = new ConcurrentHashMap<>();
private StateWheelExecuteThread stateWheelExecuteThread;
/**
@ -144,6 +149,7 @@ public class MasterSchedulerService extends Thread {
processTimeoutCheckList,
taskTimeoutCheckList,
taskRetryCheckList,
depStateCheckList,
this.processInstanceExecMaps,
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
}
@ -204,15 +210,9 @@ public class MasterSchedulerService extends Thread {
ProcessInstance processInstance = processService.handleCommand(logger, getLocalAddress(), command);
if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance
, taskResponseService
, processService
, nettyExecutorManager
, processAlertManager
, masterConfig
, taskTimeoutCheckList
, taskRetryCheckList);
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(processInstance,
taskResponseService, processService, nettyExecutorManager, processAlertManager,
masterConfig, taskTimeoutCheckList, taskRetryCheckList, depStateCheckList);
this.processInstanceExecMaps.put(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {

67
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -27,14 +27,12 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.hadoop.util.ThreadUtil;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
* 1. timeout check wheel
* 2. dependent task check wheel
@ -48,7 +46,7 @@ public class StateWheelExecuteThread extends Thread {
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
private ConcurrentHashMap<Integer, TaskInstance> depStateCheckList;
/**
* start process failed map
*/
@ -68,6 +66,7 @@ public class StateWheelExecuteThread extends Thread {
ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
ConcurrentHashMap<Integer, TaskInstance> depStateCheckList,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
int stateCheckIntervalSecs) {
this.masterExecService = masterExecService;
@ -78,6 +77,7 @@ public class StateWheelExecuteThread extends Thread {
this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
this.processInstanceExecMaps = processInstanceExecMaps;
this.stateCheckIntervalSecs = stateCheckIntervalSecs;
this.depStateCheckList = depStateCheckList;
}
@Override
@ -90,6 +90,7 @@ public class StateWheelExecuteThread extends Thread {
checkTask4Timeout();
checkTask4Retry();
checkProcess4Timeout();
checkDepTask();
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
}
@ -97,6 +98,31 @@ public class StateWheelExecuteThread extends Thread {
}
}
private void checkDepTask() {
if (depStateCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : depStateCheckList.values()) {
WorkflowExecuteThread workflowExecuteThread = processInstanceExecMaps.get(taskInstance.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskId:{}",
taskInstance.getProcessInstanceId(), taskInstance.getId());
depStateCheckList.remove(taskInstance.getId());
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
depStateCheckList.remove(taskInstance.getId());
break;
}
if (taskInstance.getState().typeIsFinished()) {
depStateCheckList.remove(taskInstance.getId());
continue;
}
addTaskStateChangeEvent(taskInstance);
}
}
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
this.processInstanceTimeoutCheckList.put(processInstance.getId(), processInstance);
}
@ -114,6 +140,18 @@ public class StateWheelExecuteThread extends Thread {
return;
}
for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) {
WorkflowExecuteThread workflowExecuteThread = processInstanceExecMaps.get(taskInstance.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskId:{}",
taskInstance.getProcessInstanceId(), taskInstance.getId());
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
break;
}
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
if (taskInstance.getStartTime() == null) {
TaskInstance newTaskInstance = processService.findTaskInstanceById(taskInstance.getId());
@ -132,11 +170,21 @@ public class StateWheelExecuteThread extends Thread {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) {
if (!taskInstance.getState().typeIsFinished() && (taskInstance.isSubProcess() || taskInstance.isDependTask())) {
addTaskStateChangeEvent(taskInstance);
} else if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
WorkflowExecuteThread workflowExecuteThread = processInstanceExecMaps.get(taskInstance.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskId:{}",
taskInstance.getProcessInstanceId(), taskInstance.getId());
taskInstanceRetryCheckList.remove(taskInstance.getId());
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
taskInstanceRetryCheckList.remove(taskInstance.getId());
break;
}
if (((taskInstance.getRetryTimes() <= taskInstance.getMaxRetryTimes() && taskInstance.isDependTask())
|| (taskInstance.getState().typeIsFinished() && taskInstance.taskCanRetry())) && taskInstance.retryTaskIntervalOverTime()) {
addTaskStateChangeEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstance.getId());
}
@ -148,7 +196,6 @@ public class StateWheelExecuteThread extends Thread {
return;
}
for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) {
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addProcessTimeoutEvent(processInstance);

177
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -202,6 +202,11 @@ public class WorkflowExecuteThread implements Runnable {
*/
private ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList;
/**
* dep task check list
*/
private ConcurrentHashMap<Integer, TaskInstance> depStateCheckList;
/**
* start flag, true: start nodes submit completely
*/
@ -214,14 +219,15 @@ public class WorkflowExecuteThread implements Runnable {
* @param processService processService
* @param nettyExecutorManager nettyExecutorManager
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, TaskResponseService taskResponseService
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
, ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList) {
public WorkflowExecuteThread(ProcessInstance processInstance,
TaskResponseService taskResponseService,
ProcessService processService,
NettyExecutorManager nettyExecutorManager,
ProcessAlertManager processAlertManager,
MasterConfig masterConfig,
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList,
ConcurrentHashMap<Integer, TaskInstance> depStateCheckList) {
this.processService = processService;
this.taskResponseService = taskResponseService;
this.processInstance = processInstance;
@ -230,6 +236,7 @@ public class WorkflowExecuteThread implements Runnable {
this.processAlertManager = processAlertManager;
this.taskTimeoutCheckList = taskTimeoutCheckList;
this.taskRetryCheckList = taskRetryCheckList;
this.depStateCheckList = depStateCheckList;
}
@Override
@ -328,8 +335,7 @@ public class WorkflowExecuteThread implements Runnable {
}
private boolean taskTimeout(StateEvent stateEvent) {
if (!taskInstanceHashMap.containsRow(stateEvent.getTaskInstanceId())) {
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}
@ -342,25 +348,59 @@ public class WorkflowExecuteThread implements Runnable {
return true;
}
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy) {
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy && !taskInstance.getState().typeIsFinished()) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
taskProcessor.action(TaskAction.TIMEOUT);
if (taskInstance.isDependTask()) {
TaskInstance task = processService.findTaskInstanceById(taskInstance.getId());
taskFinished(task);
}
} else {
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, taskInstance.getTaskDefine());
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
}
return true;
}
private boolean processTimeout() {
this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, this.processDefinition);
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
return true;
}
/**
* check if task instance exist by state event
*/
private boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) {
if (stateEvent.getTaskInstanceId() == 0) {
logger.error("task instance id null, state event:{}", stateEvent);
return false;
}
if (!taskInstanceHashMap.containsRow(stateEvent.getTaskInstanceId())) {
logger.error("mismatch task instance id, event:{}", stateEvent);
return false;
}
return true;
}
private boolean taskStateChangeHandler(StateEvent stateEvent) {
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}
TaskInstance task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
if (task.getState() == null) {
logger.error("task state is null, state handler error: {}", stateEvent);
return true;
}
if (task.getState().typeIsFinished()) {
if (completeTaskList.containsKey(Long.toString(task.getTaskCode())) && completeTaskList.get(Long.toString(task.getTaskCode())).getId() == task.getId()) {
return true;
}
taskFinished(task);
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
return true;
}
if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
iTaskProcessor.action(TaskAction.RUN);
@ -368,41 +408,54 @@ public class WorkflowExecuteThread implements Runnable {
task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
taskFinished(task);
}
} else {
logger.error("state handler error: {}", stateEvent.toString());
return true;
}
logger.error("state handler error: {}", stateEvent);
return true;
}
private void taskFinished(TaskInstance task) {
logger.info("work flow {} task {} state:{} ",
processInstance.getId(),
task.getId(),
task.getState());
if (task.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
logger.info("work flow {} task {} state:{} ", processInstance.getId(), task.getId(), task.getState());
if (task.isDependTask() && task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
logger.info("resubmit NEED_FAULT_TOLERANCE dependent task");
addTaskToStandByList(task);
if (!task.retryTaskIntervalOverTime()) {
submitStandByTask();
return;
}
if (task.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
if (task.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance id: {} state:{} retry times:{}/{}, interval:{}",
processInstance.getId(),
task.getId(),
task.getState(),
task.getRetryTimes(),
task.getMaxRetryTimes(),
task.getRetryInterval());
this.addTimeoutCheck(task);
this.addRetryCheck(task);
} else {
processInstance.getId(), task.getId(), task.getState(), task.getRetryTimes() + 1, task.getMaxRetryTimes(), task.getRetryInterval());
submitStandByTask();
taskTimeoutCheckList.remove(task.getId());
taskRetryCheckList.remove(task.getId());
if (task.taskCanRetry()) {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask.isDependTask()) {
retryTask.setRetryTimes(retryTask.getRetryTimes() + 1);
if (retryTask.taskCanRetry()) {
addTaskToStandByList(retryTask);
this.taskRetryCheckList.put(retryTask.getId(), retryTask);
}
} else {
addTaskToStandByList(retryTask);
this.taskRetryCheckList.put(retryTask.getId(), retryTask);
}
}
return;
} else {
task.setRetryTimes(task.getRetryTimes() + 1);
if (task.taskCanRetry()) {
addTaskToStandByList(task);
this.taskRetryCheckList.put(task.getId(), task);
return;
}
}
}
ProcessInstance processInstance = processService.findProcessInstanceById(this.processInstance.getId());
completeTaskList.put(Long.toString(task.getTaskCode()), task);
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
taskRetryCheckList.remove(task.getId());
depStateCheckList.remove(task.getId());
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
processService.saveProcessInstance(processInstance);
@ -424,9 +477,7 @@ public class WorkflowExecuteThread implements Runnable {
private boolean checkStateEvent(StateEvent stateEvent) {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.error("mismatch process instance id: {}, state event:{}, task instance id:{}",
this.processInstance.getId(),
stateEvent.toString(),
stateEvent.getTaskInstanceId());
this.processInstance.getId(), stateEvent, stateEvent.getTaskInstanceId());
return false;
}
return true;
@ -460,7 +511,7 @@ public class WorkflowExecuteThread implements Runnable {
return true;
}
private boolean processComplementData() throws Exception {
private boolean processComplementData() {
if (!needComplementProcess()) {
return false;
}
@ -665,7 +716,8 @@ public class WorkflowExecuteThread implements Runnable {
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
taskProcessor.action(TaskAction.RUN);
addTimeoutCheck(taskInstance);
addRetryCheck(taskInstance);
addDepTaskCheck(taskInstance);
// addRetryCheck(taskInstance);
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
@ -726,6 +778,15 @@ public class WorkflowExecuteThread implements Runnable {
}
}
private void addDepTaskCheck(TaskInstance taskInstance) {
if (taskInstance.isDependTask()) {
if (depStateCheckList.containsKey(taskInstance.getId())) {
return;
}
this.depStateCheckList.put(taskInstance.getId(), taskInstance);
}
}
private void addRetryCheck(TaskInstance taskInstance) {
if (taskRetryCheckList.containsKey(taskInstance.getId())) {
return;
@ -737,11 +798,7 @@ public class WorkflowExecuteThread implements Runnable {
);
taskInstance.setTaskDefine(taskDefinition);
}
if (taskInstance.taskCanRetry()) {
this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
if (taskInstance.getRetryTimes() <= taskInstance.getMaxRetryTimes() && taskInstance.isDependTask()) {
this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
}
}
@ -1047,14 +1104,21 @@ public class WorkflowExecuteThread implements Runnable {
* @return Boolean whether has failed task
*/
private boolean hasFailedTask() {
if (this.taskFailedSubmit) {
if (taskFailedSubmit) {
return true;
}
if (this.errorTaskList.size() > 0) {
if (errorTaskList.size() > 0) {
for (Map.Entry<String, TaskInstance> errorTaskMap : errorTaskList.entrySet()) {
TaskInstance taskInstance = processService.findTaskInstanceById(errorTaskMap.getValue().getId());
if (taskInstance == null || taskInstance.getState().typeIsSuccess()) {
errorTaskList.remove(errorTaskMap.getKey());
}
}
if (errorTaskList.size() > 0) {
return true;
}
return this.dependFailedTask.size() > 0;
}
return dependFailedTask.size() > 0;
}
/**
@ -1219,12 +1283,8 @@ public class WorkflowExecuteThread implements Runnable {
private void updateProcessInstanceState(StateEvent stateEvent) {
ExecutionStatus state = stateEvent.getExecutionStatus();
if (processInstance.getState() != state) {
logger.info(
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(),
processInstance.getState(), state,
processInstance.getCommandType());
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(), processInstance.getState(), state, processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
processInstance.setEndTime(new Date());
@ -1333,18 +1393,13 @@ public class WorkflowExecuteThread implements Runnable {
}
for (int taskId : activeTaskProcessorMaps.keySet()) {
if (taskRetryCheckList.containsKey(taskId)) {
taskRetryCheckList.remove(taskId);
logger.info("task id {} removed from taskRetryCheckList", taskId);
}
TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
if (taskInstance == null || taskInstance.getState().typeIsFinished()) {
continue;
}
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId);
taskProcessor.action(TaskAction.STOP);
if (taskProcessor != null && taskProcessor.taskState().typeIsFinished()) {
if (taskProcessor.taskState().typeIsFinished()) {
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(
taskProcessor.taskState(),
taskInstance.getId(),
@ -1353,6 +1408,8 @@ public class WorkflowExecuteThread implements Runnable {
}
}
this.taskRetryCheckList.clear();
this.depStateCheckList.clear();
this.addProcessStopEvent(processInstance);
}
@ -1391,7 +1448,6 @@ public class WorkflowExecuteThread implements Runnable {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
if (task.retryTaskIntervalOverTime()) {
int originalId = task.getId();
TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) {
@ -1402,7 +1458,6 @@ public class WorkflowExecuteThread implements Runnable {
activeTaskProcessorMaps.remove(originalId);
}
}
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(Long.toString(task.getTaskCode()), task);

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -143,6 +143,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean submitTask();
/*
* resubmit task
*/
protected abstract boolean resubmitTask();
/**
* run task
*/
@ -166,6 +171,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return submit();
case RUN:
return run();
case RESUBMIT:
return resubmit();
default:
logger.error("unknown task action: {}", taskAction.toString());
}
@ -175,6 +182,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return false;
}
protected boolean resubmit() {
return resubmitTask();
}
protected boolean submit() {
return submitTask();
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -68,6 +68,15 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return dispatchTask(taskInstance, processInstance);
}
@Override
protected boolean resubmitTask() {
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger();
return dispatchTask(taskInstance, processInstance);
}
@Override
public void setTaskExecutionLogger() {
threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@ -140,7 +149,9 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.info("task ready to submit: {}", taskInstance);
if (logger.isDebugEnabled()) {
logger.debug("task ready to submit: {}", taskInstance.getName());
}
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -76,6 +76,11 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean resubmitTask() {
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
@ -135,6 +140,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -76,9 +76,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
taskDefinition = processService.findTaskDefinition(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
setTaskExecutionLogger();
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
@ -92,6 +90,11 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean resubmitTask() {
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
@ -169,6 +172,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}
@ -210,9 +214,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
return result;
}
/**
*
*/
private void endTask() {
ExecutionStatus status;
status = (result == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;

56
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.utils.LogUtils;
@ -48,9 +49,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
@Override
public boolean submitTask() {
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
taskDefinition = processService.findTaskDefinition(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
@ -59,13 +58,16 @@ public class SubTaskProcessor extends BaseTaskProcessor {
setTaskExecutionLogger();
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId()));
return true;
}
@Override
protected boolean resubmitTask() {
return true;
}
@Override
public ExecutionStatus taskState() {
return this.taskInstance.getState();
@ -79,10 +81,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
updateTaskState();
}
} catch (Exception e) {
logger.error("work flow {} sub task {} exceptions",
this.processInstance.getId(),
this.taskInstance.getId(),
e);
logger.error("work flow {} sub task {} exceptions", this.processInstance.getId(), this.taskInstance.getId(), e);
} finally {
this.runLock.unlock();
}
@ -91,25 +90,19 @@ public class SubTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean taskTimeout() {
TaskTimeoutStrategy taskTimeoutStrategy =
taskDefinition.getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy
&& TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
TaskTimeoutStrategy taskTimeoutStrategy = taskDefinition.getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED != taskTimeoutStrategy && TaskTimeoutStrategy.WARNFAILED != taskTimeoutStrategy) {
return true;
}
logger.info("sub process task {} timeout, strategy {} ",
taskInstance.getId(), taskTimeoutStrategy.getDescp());
logger.info("sub process task {} timeout, strategy {} ", taskInstance.getId(), taskTimeoutStrategy.getDescp());
killTask();
return true;
}
private void updateTaskState() {
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
logger.info("work flow {} task {}, sub work flow: {} state: {}",
this.processInstance.getId(),
this.taskInstance.getId(),
subProcessInstance.getId(),
subProcessInstance.getState().getDescp());
logger.info("work flow {} task {}, sub work flow: {} state: {}", this.processInstance.getId(), this.taskInstance.getId(),
subProcessInstance.getId(), subProcessInstance.getState().getDescp());
if (subProcessInstance != null && subProcessInstance.getState().typeIsFinished()) {
taskInstance.setState(subProcessInstance.getState());
taskInstance.setEndTime(new Date());
@ -123,7 +116,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
case STOP:
return true;
default:
logger.error("unknown task action: {}", taskAction.toString());
logger.error("unknown task action: {}", taskAction);
}
return false;
}
@ -135,7 +128,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
private boolean pauseSubWorkFlow() {
ProcessInstance subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
return false;
}
@ -156,7 +149,11 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
return false;
}
TaskInstance instance = processService.findTaskInstanceById(taskInstance.getId());
if (instance.getState() == ExecutionStatus.RUNNING_EXECUTION) {
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return true;
}
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
processService.updateTaskInstance(taskInstance);
@ -165,25 +162,26 @@ public class SubTaskProcessor extends BaseTaskProcessor {
taskInstance.getId(),
taskInstance.getState());
return true;
}
@Override
protected boolean killTask() {
ProcessInstance subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());
if (subProcessInstance == null || taskInstance.getState().typeIsFinished()) {
return false;
}
subProcessInstance.setState(ExecutionStatus.READY_STOP);
processService.updateProcessInstance(subProcessInstance);
sendToSubProcess();
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}
private void sendToSubProcess() {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0
);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(processInstance.getId(),
taskInstance.getId(), subProcessInstance.getState(), subProcessInstance.getId(), 0);
String address = subProcessInstance.getHost().split(":")[0];
int port = Integer.parseInt(subProcessInstance.getHost().split(":")[1]);
this.stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -75,6 +75,11 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean resubmitTask() {
return true;
}
@Override
public boolean runTask() {
try {
@ -120,6 +125,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskAction.java

@ -25,5 +25,6 @@ public enum TaskAction {
STOP,
TIMEOUT,
SUBMIT,
RUN
RUN,
RESUBMIT
}

127
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -17,15 +17,17 @@
package org.apache.dolphinscheduler.server.utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -34,7 +36,9 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,7 +74,7 @@ public class DependentExecute {
/**
* logger
*/
private Logger logger = LoggerFactory.getLogger(DependentExecute.class);
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
/**
* constructor
@ -102,21 +106,25 @@ public class DependentExecute {
* @param dateIntervals date intervals
* @return dateIntervals
*/
private DependResult calculateResultForTasks(DependentItem dependentItem,
List<DateInterval> dateIntervals) {
private DependResult calculateResultForTasks(DependentItem dependentItem, List<DateInterval> dateIntervals) {
DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(), dateInterval);
if (processInstance == null) {
logger.info("Cannot find dependent processInstance, waiting for workflow to run, processDefiniteCode:{}, taskCode:{}",
dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode());
return DependResult.WAITING;
}
if (!processInstance.getState().typeIsFinished()) {
logger.info("Wait for the dependent workflow to complete, processDefiniteCode:{}, taskCode:{}, processInstanceId:{}, processInstance state:{}",
dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode(), processInstance.getId(), processInstance.getState());
return DependResult.WAITING;
}
// need to check workflow for updates, so get all task and check the task state
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
result = dependResultByProcessInstance(processInstance);
result = dependResultByProcessInstance(processInstance, dateInterval);
} else {
result = getDependTaskResult(dependentItem.getDepTaskCode(), processInstance);
result = getDependTaskResult(dependentItem.getDepTaskCode(), dateInterval);
}
if (result != DependResult.SUCCESS) {
break;
@ -127,14 +135,55 @@ public class DependentExecute {
/**
* depend type = depend_all
*
* @return
*/
private DependResult dependResultByProcessInstance(ProcessInstance processInstance) {
if (!processInstance.getState().typeIsFinished()) {
private DependResult dependResultByProcessInstance(ProcessInstance processInstance, DateInterval dateInterval) {
if (processInstance.getState().typeIsSuccess()) {
List<ProcessTaskRelation> taskRelations = processService.findRelationByCode(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (!taskRelations.isEmpty()) {
List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(taskRelations);
Map<Long, String> definiteTask = taskDefinitionLogs.stream().filter(log -> !log.getTaskType().equals(TaskType.SUB_PROCESS.getDesc())
|| !log.getTaskType().equals(TaskType.DEPENDENT.getDesc())
|| !log.getTaskType().equals(TaskType.CONDITIONS.getDesc()))
.collect(Collectors.toMap(TaskDefinition::getCode, TaskDefinitionLog::getName));
if (!definiteTask.isEmpty()) {
List<TaskInstance> taskInstanceList = processService.findLastTaskInstanceListInterval(definiteTask.keySet(), dateInterval);
if (taskInstanceList.isEmpty()) {
logger.warn("Cannot find the task instance: {}", JSONUtils.toJsonString(definiteTask));
return DependResult.FAILED;
}
Map<Long, TaskInstance> taskInstanceMap = new HashMap<>();
for (TaskInstance instance : taskInstanceList) {
taskInstanceMap.compute(instance.getTaskCode(), (k, v) -> {
if (v == null) {
v = instance;
} else {
if (v.getId() < instance.getId()) {
v = instance;
}
}
return v;
});
definiteTask.remove(instance.getTaskCode());
}
List<TaskInstance> instanceFail = taskInstanceMap.values().stream().filter(instance -> instance.getState().typeIsFailure()).collect(Collectors.toList());
if (!instanceFail.isEmpty()) {
List<String> log = instanceFail.stream().map(instance -> instance.getId() + "|" + instance.getTaskCode() + "|" + instance.getName()).collect(Collectors.toList());
logger.warn("The fail task: {}", StringUtils.join(log, Constants.COMMA));
return DependResult.FAILED;
}
List<TaskInstance> instanceRunning = taskInstanceMap.values().stream().filter(instance -> instance.getState().typeIsRunning()).collect(Collectors.toList());
if (!instanceRunning.isEmpty()) {
List<String> log = instanceRunning.stream().map(instance -> instance.getId() + "|" + instance.getTaskCode() + "|" + instance.getName()).collect(Collectors.toList());
logger.info("The running task: {}", StringUtils.join(log, Constants.COMMA));
return DependResult.WAITING;
}
if (processInstance.getState().typeIsSuccess()) {
if (!definiteTask.isEmpty()) {
logger.warn("Cannot find the task instance: {}", JSONUtils.toJsonString(definiteTask));
return DependResult.FAILED;
}
}
}
return DependResult.SUCCESS;
}
return DependResult.FAILED;
@ -142,35 +191,17 @@ public class DependentExecute {
/**
* get depend task result
*
* @param taskCode
* @param processInstance
* @return
*/
private DependResult getDependTaskResult(long taskCode, ProcessInstance processInstance) {
private DependResult getDependTaskResult(long taskCode, DateInterval dateInterval) {
TaskInstance taskInstance = processService.findLastTaskInstanceInterval(taskCode, dateInterval);
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : taskInstanceList) {
if (task.getTaskCode() == taskCode) {
taskInstance = task;
break;
}
}
if (taskInstance == null) {
// cannot find task in the process instance
// maybe because process instance is running or failed.
if (processInstance.getState().typeIsFinished()) {
logger.warn("Cannot find the task in the process instance when the ProcessInstance is finish, taskCode: {}", taskCode);
result = DependResult.FAILED;
} else {
return DependResult.WAITING;
}
} else {
logger.info("The running task, taskId:{}, taskCode:{}, taskName:{}", taskInstance.getId(), taskInstance.getTaskCode(), taskInstance.getName());
result = getDependResultByState(taskInstance.getState());
}
return result;
}
@ -211,7 +242,6 @@ public class DependentExecute {
* @return DependResult
*/
private DependResult getDependResultByState(ExecutionStatus state) {
if (!state.typeIsFinished()) {
return DependResult.WAITING;
} else if (state.typeIsSuccess()) {
@ -221,23 +251,6 @@ public class DependentExecute {
}
}
/**
* get dependent result by task instance state when task instance is null
*
* @param state state
* @return DependResult
*/
private DependResult getDependResultByProcessStateWhenTaskNull(ExecutionStatus state) {
if (state.typeIsRunning()
|| state == ExecutionStatus.SUBMITTED_SUCCESS
|| state == ExecutionStatus.WAITING_THREAD) {
return DependResult.WAITING;
} else {
return DependResult.FAILED;
}
}
/**
* judge depend item finished
*
@ -245,9 +258,9 @@ public class DependentExecute {
* @return boolean
*/
public boolean finish(Date currentTime) {
if (modelDependResult == DependResult.WAITING) {
if (modelDependResult == DependResult.WAITING || modelDependResult == DependResult.NON_EXEC) {
modelDependResult = getModelDependResult(currentTime);
return false;
return modelDependResult == DependResult.SUCCESS || modelDependResult == DependResult.FAILED;
}
return true;
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@ -148,6 +149,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK, new TaskKillAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, new TaskRecallAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java vendored

@ -24,7 +24,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Responce Cache : cache worker send master result
* Response Cache : cache worker send master result
*/
public class ResponceCache {
@ -38,7 +38,8 @@ public class ResponceCache {
private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
private final Map<Integer,Command> killResponseCache = new ConcurrentHashMap<>();
private Map<Integer,Command> killResponseCache = new ConcurrentHashMap<>();
private Map<Integer,Command> recallCache = new ConcurrentHashMap<>();
/**
@ -58,6 +59,10 @@ public class ResponceCache {
case ACTION_STOP:
killResponseCache.put(taskInstanceId,command);
break;
case WORKER_REJECT:
case REALLOCATE:
recallCache.put(taskInstanceId,command);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
@ -85,6 +90,19 @@ public class ResponceCache {
return killResponseCache;
}
/**
* recall response cache
*
* @param taskInstanceId taskInstanceId
*/
public void removeRecallCache(Integer taskInstanceId) {
recallCache.remove(taskInstanceId);
}
public Map<Integer, Command> getRecallCache() {
return recallCache;
}
/**
* remove reponse cache
* @param taskInstanceId taskInstanceId

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -128,6 +128,20 @@ public class TaskCallbackService {
REMOTE_CHANNELS.remove(taskInstanceId);
}
/**
* get opaque
*
* @param taskInstanceId taskInstanceId
*/
public static long getOpaque(int taskInstanceId) {
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
long opaque = 0L;
if (nettyRemoteChannel != null) {
opaque = nettyRemoteChannel.getOpaque();
}
return opaque;
}
/**
* send ack
*

108
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -19,21 +19,16 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
@ -45,9 +40,6 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.Date;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -112,7 +104,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteRequestCommand.class);
@ -132,90 +123,39 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
setTaskCache(taskExecutionContext);
// todo custom logger
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.getWorkerTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
} else {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecutionContext.setStartTime(new Date());
}
this.doAck(taskExecutionContext);
ResponceCache.get().removeRecallCache(taskExecutionContext.getTaskInstanceId());
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
// submit task to manager
if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager))) {
logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getDelayQueueSize());
}
}
private void doAck(TaskExecutionContext taskExecutionContext) {
// tell master that task is in executing
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getWaitSubmitQueueSize(), taskExecutionContext.getTaskInstanceId());
sendRecallCommand(taskExecutionContext, channel);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
/**
* build ack command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteAckCommand
*/
private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
if (TaskType.SQL.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType()) || TaskType.PROCEDURE.getDesc().equalsIgnoreCase(taskExecutionContext.getTaskType())) {
ackCommand.setExecutePath(null);
} else {
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
taskExecutionContext.setLogPath(ackCommand.getLogPath());
ackCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
return ackCommand;
private void sendRecallCommand(TaskExecutionContext taskExecutionContext, Channel channel) {
TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
Command command = taskRecallCommand.convert2Command();
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), command, Event.WORKER_REJECT);
taskCallbackService.changeRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque()));
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), command);
logger.info("send recall command successfully, taskId:{}, opaque:{}", taskExecutionContext.getTaskInstanceId(), command.getOpaque());
}
/**
* get execute local path
*
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/
private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskRecallCommand.setHost(taskExecutionContext.getHost());
taskRecallCommand.setEvent(Event.WORKER_REJECT);
taskRecallCommand.setStatus(ExecutionStatus.SUBMITTED_SUCCESS.getCode());
return taskRecallCommand;
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -161,7 +161,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* build TaskKillResponseCommand
*
* @param killCommand kill command
* @param taskRequest taskRequest
* @param result exe result
* @return build TaskKillResponseCommand
*/

63
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRecallAckProcessor.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
public class TaskRecallAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskRecallAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_RECALL_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskRecallAckCommand taskRecallAckCommand = JSONUtils.parseObject(
command.getBody(), TaskRecallAckCommand.class);
logger.info("taskRecallAckCommand:{}, opaque:{}", taskRecallAckCommand, command.getOpaque());
if (taskRecallAckCommand == null) {
return;
}
if (taskRecallAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
Command recallCommand = ResponceCache.get().getRecallCache().get(taskRecallAckCommand.getTaskInstanceId());
if (recallCommand != null && command.getOpaque() == recallCommand.getOpaque()) {
ResponceCache.get().removeRecallCache(taskRecallAckCommand.getTaskInstanceId());
logger.info("removeRecallCache: task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
}
if (command.getOpaque() == TaskCallbackService.getOpaque(taskRecallAckCommand.getTaskInstanceId())) {
TaskCallbackService.remove(taskRecallAckCommand.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", taskRecallAckCommand.getTaskInstanceId());
}
}
}
}

50
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
@ -24,14 +25,13 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Retry Report Task Status Thread
*/
@ -64,40 +64,28 @@ public class RetryReportTaskStatusThread implements Runnable {
@Override
public void run() {
ResponceCache responceCache = ResponceCache.get();
long interval = workerConfig.getRetryReportTaskStatusInterval() * Constants.SLEEP_TIME_MILLIS * 60L;
while (Stopper.isRunning()) {
// sleep 5 minutes
ThreadUtils.sleep(workerConfig.getRetryReportTaskStatusInterval() * 1000);
ThreadUtils.sleep(60 * Constants.SLEEP_TIME_MILLIS);
long nowTimeMillis = System.currentTimeMillis();
try {
if (!responceCache.getAckCache().isEmpty()){
Map<Integer,Command> ackCache = responceCache.getAckCache();
for (Map.Entry<Integer, Command> entry : ackCache.entrySet()){
Integer taskInstanceId = entry.getKey();
Command ackCommand = entry.getValue();
taskCallbackService.sendAck(taskInstanceId,ackCommand);
}
retrySendCommand(responceCache.getAckCache(), interval, nowTimeMillis);
retrySendCommand(responceCache.getResponseCache(), interval, nowTimeMillis);
retrySendCommand(responceCache.getKillResponseCache(), interval, nowTimeMillis);
retrySendCommand(responceCache.getRecallCache(), interval, nowTimeMillis);
} catch (Exception e) {
logger.warn("retry report task status error", e);
}
if (!responceCache.getResponseCache().isEmpty()){
Map<Integer,Command> responseCache = responceCache.getResponseCache();
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()){
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
taskCallbackService.sendResult(taskInstanceId,responseCommand);
}
}
if (!responceCache.getKillResponseCache().isEmpty()) {
Map<Integer, Command> killResponseCache = responceCache.getKillResponseCache();
for (Map.Entry<Integer, Command> entry : killResponseCache.entrySet()) {
private void retrySendCommand(Map<Integer, Command> cache, long interval, long nowTimeMillis) {
for (Map.Entry<Integer, Command> entry : cache.entrySet()) {
Command command = entry.getValue();
if (nowTimeMillis - command.getGenCommandTimeMillis() > interval) {
Integer taskInstanceId = entry.getKey();
Command killResponseCommand = entry.getValue();
taskCallbackService.sendResult(taskInstanceId, killResponseCommand);
}
}
}catch (Exception e){
logger.warn("retry report task status error", e);
taskCallbackService.sendResult(taskInstanceId, command);
logger.info("retry send command successfully, the command type {}, the task id:{}", command.getType(),taskInstanceId);
}
}
}

57
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -22,16 +22,11 @@ import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
@ -123,10 +118,16 @@ public class TaskExecuteThread implements Runnable, Delayed {
@Override
public void run() {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
FileUtils.createWorkDirIfAbsent(execLocalPath);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
@ -143,7 +144,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
changeTaskExecutionStatusToRunning();
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
sendTaskExecuteRunningCommand(taskExecutionContext);
int dryRun = taskExecutionContext.getDryRun();
// copy hdfs/minio file to local
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
@ -203,7 +205,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
@ -218,6 +219,40 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
/**
* get execute local path
*
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/
private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
private void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteAckCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
// add response cache
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.ACK);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
private TaskExecuteAckCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteAckCommand command = new TaskExecuteAckCommand();
command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
command.setLogPath(taskExecutionContext.getLogPath());
command.setHost(taskExecutionContext.getHost());
command.setStartTime(taskExecutionContext.getStartTime());
command.setExecutePath(taskExecutionContext.getExecutePath());
return command;
}
private void sendAlert(TaskAlertInfo taskAlertInfo) {
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent());
}

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
@ -31,6 +32,7 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
@ -49,7 +51,7 @@ public class WorkerManagerThread implements Runnable {
/**
* task queue
*/
private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
/**
* running task
@ -73,6 +75,7 @@ public class WorkerManagerThread implements Runnable {
public WorkerManagerThread() {
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.waitSubmitQueue = new DelayQueue<>();
this.workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()),
taskExecuteThreadMap
@ -85,12 +88,12 @@ public class WorkerManagerThread implements Runnable {
}
/**
* get delay queue size
* get wait submit queue size
*
* @return queue size
*/
public int getDelayQueueSize() {
return workerExecuteQueue.size();
public int getWaitSubmitQueueSize() {
return waitSubmitQueue.size();
}
/**
@ -107,9 +110,9 @@ public class WorkerManagerThread implements Runnable {
* then send Response to Master, update the execution status of task instance
*/
public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
workerExecuteQueue.stream()
waitSubmitQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
.forEach(workerExecuteQueue::remove);
.forEach(waitSubmitQueue::remove);
sendTaskKillResponse(taskInstanceId);
}
@ -135,7 +138,14 @@ public class WorkerManagerThread implements Runnable {
* @return submit result
*/
public boolean offer(TaskExecuteThread taskExecuteThread) {
return workerExecuteQueue.offer(taskExecuteThread);
if (waitSubmitQueue.size() > workerConfig.getWorkerExecThreads()) {
// if waitSubmitQueue is full, it will wait 1s, then try add
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
if (waitSubmitQueue.size() > workerConfig.getWorkerExecThreads()) {
return false;
}
}
return waitSubmitQueue.offer(taskExecuteThread);
}
public void start() {
@ -150,8 +160,14 @@ public class WorkerManagerThread implements Runnable {
TaskExecuteThread taskExecuteThread;
while (Stopper.isRunning()) {
try {
taskExecuteThread = workerExecuteQueue.take();
if (this.getThreadPoolQueueSize() <= workerConfig.getWorkerExecThreads()) {
taskExecuteThread = waitSubmitQueue.take();
workerExecService.submit(taskExecuteThread);
} else {
logger.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize());
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("An unexpected interrupt is happened, "
+ "the exception will be ignored and this thread will continue to run", e);

1
dolphinscheduler-server/src/main/resources/logback-master.xml

@ -62,6 +62,7 @@
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-master.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<totalSizeCap>2GB</totalSizeCap>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
<encoder>

1
dolphinscheduler-server/src/main/resources/logback-worker.xml

@ -62,6 +62,7 @@
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log.base}/dolphinscheduler-worker.%d{yyyy-MM-dd_HH}.%i.log</fileNamePattern>
<maxHistory>168</maxHistory>
<totalSizeCap>2GB</totalSizeCap>
<maxFileSize>200MB</maxFileSize>
</rollingPolicy>
<encoder>

2
dolphinscheduler-server/src/main/resources/worker.properties

@ -50,5 +50,5 @@ task.plugin.dir=lib/plugin/task
#task.plugin.binding config the task plugin need be load when development and run in IDE
#task.plugin.binding=./dolphinscheduler-task-plugin/dolphinscheduler-task-shell/pom.xml
# worker retry report task statues interval, default value 10
# worker retry report task statues interval, default value 10 min
#worker.retry.report.task.statues.interval=10

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -102,7 +102,10 @@ public class WorkflowExecuteThreadTest {
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, null,processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
ConcurrentHashMap<Integer, TaskInstance> depStateCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, null,
processService, null, null,
config, taskTimeoutCheckList, taskRetryCheckList, depStateCheckList));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
@ -112,7 +115,7 @@ public class WorkflowExecuteThreadTest {
@Test
public void testParseStartNodeName() throws ParseException {
public void testParseStartNodeName() {
try {
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_START_NODES, "1,2,3");

20
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java

@ -28,33 +28,33 @@ public class LowerWeightRoundRobinTest {
@Test
public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 1, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, 2, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, 1, System.currentTimeMillis() - 60 * 2 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
Assert.assertEquals("192.158.2.3", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
}
@Test
public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 3 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, 0, System.currentTimeMillis() - 60 * 11 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;

17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

@ -24,10 +24,8 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.ArrayList;
@ -104,7 +102,7 @@ public class ProcessAlertManager {
if (processInstance.getState().typeIsSuccess()) {
List<ProcessAlertContent> successTaskList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId())
.projectCode(projectUser.getProjectCode())
.projectName(projectUser.getProjectName())
.owner(projectUser.getUserName())
.processId(processInstance.getId())
@ -128,7 +126,7 @@ public class ProcessAlertManager {
continue;
}
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId())
.projectCode(projectUser.getProjectCode())
.projectName(projectUser.getProjectName())
.owner(projectUser.getUserName())
.processId(processInstance.getId())
@ -197,7 +195,6 @@ public class ProcessAlertManager {
} catch (Exception e) {
logger.error("send alert failed:{} ", e.getMessage());
}
}
/**
@ -263,13 +260,13 @@ public class ProcessAlertManager {
* send process timeout alert
*
* @param processInstance process instance
* @param processDefinition process definition
* @param projectUser projectUser
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {
alertDao.sendProcessTimeoutAlert(processInstance, processDefinition);
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProjectUser projectUser) {
alertDao.sendProcessTimeoutAlert(processInstance, projectUser);
}
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, TaskDefinition taskDefinition) {
alertDao.sendTaskTimeoutAlert(processInstance, taskInstance, taskDefinition);
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, ProjectUser projectUser ) {
alertDao.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
}
}

101
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -454,6 +454,8 @@ public class ProcessService {
/**
* recursive delete all task instance by process instance id
*
* @param processInstanceId
*/
public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
@ -989,6 +991,13 @@ public class ProcessService {
processInstanceMap.setProcessInstanceId(subProcessInstance.getId());
this.updateWorkProcessInstanceMap(processInstanceMap);
TaskInstance subTask = this.findTaskInstanceById(processInstanceMap.getParentTaskInstanceId());
if (subTask != null && subTask.isSubProcess() && subTask.getState() == ExecutionStatus.SUBMITTED_SUCCESS) {
subTask.setState(ExecutionStatus.RUNNING_EXECUTION);
subTask.setStartTime(new Date());
this.updateTaskInstance(subTask);
}
}
/**
@ -1308,33 +1317,15 @@ public class ProcessService {
*/
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus processInstanceState = processInstance.getState();
if (taskInstance.getState().typeIsFailure()) {
if (taskInstance.isSubProcess()) {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
} else {
if (processInstanceState != ExecutionStatus.READY_STOP
&& processInstanceState != ExecutionStatus.READY_PAUSE) {
// failure task set invalid
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
// crate new task instance
if (taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE) {
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1);
}
taskInstance.setSubmitTime(null);
taskInstance.setLogPath(null);
taskInstance.setExecutePath(null);
taskInstance.setStartTime(null);
taskInstance.setEndTime(null);
taskInstance.setFlag(Flag.YES);
taskInstance.setHost(null);
taskInstance.setId(0);
}
if (processInstanceState.typeIsFinished() || processInstanceState == ExecutionStatus.READY_STOP) {
logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState);
return null;
}
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
taskInstance.setState(ExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
@ -1747,6 +1738,7 @@ public class ProcessService {
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
* @return
*/
public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
@ -1968,6 +1960,28 @@ public class ProcessService {
return scheduleMapper.selectAllByProcessDefineArray(codes);
}
/**
* find last task instance in the date interval
*
* @param taskCode taskCode
* @param dateInterval dateInterval
* @return task instance
*/
public TaskInstance findLastTaskInstanceInterval(long taskCode, DateInterval dateInterval) {
return taskInstanceMapper.queryLastTaskInstance(taskCode, dateInterval.getStartTime(), dateInterval.getEndTime());
}
/**
* find last task instance list in the date interval
*
* @param taskCodes taskCode list
* @param dateInterval dateInterval
* @return task instance
*/
public List<TaskInstance> findLastTaskInstanceListInterval(Set<Long> taskCodes, DateInterval dateInterval) {
return taskInstanceMapper.queryLastTaskInstanceList(taskCodes, dateInterval.getStartTime(), dateInterval.getEndTime());
}
/**
* find last scheduler process instance in the date interval
*
@ -2262,6 +2276,23 @@ public class ProcessService {
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion != null) {
if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
taskDefinitionLog.setVersion(version + 1);
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
}
continue;
}
}
taskDefinitionLog.setUserId(operator.getId());
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setCreateTime(now);
if (taskDefinitionLog.getCode() == 0) {
try {
taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
@ -2270,27 +2301,7 @@ public class ProcessService {
return Constants.DEFINITION_FAILURE;
}
}
if (taskDefinitionLog.getVersion() == 0) {
// init first version
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
}
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion == null) {
taskDefinitionLog.setUserId(operator.getId());
taskDefinitionLog.setCreateTime(now);
newTaskDefinitionLogs.add(taskDefinitionLog);
continue;
}
if (taskDefinitionLog.equals(definitionCodeAndVersion)) {
// do nothing if equals
continue;
}
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
taskDefinitionLog.setVersion(version + 1);
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
}
int insertResult = 0;
int updateResult = 0;
@ -2308,7 +2319,7 @@ public class ProcessService {
}
}
}
if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs)) {
if (!newTaskDefinitionLogs.isEmpty()) {
insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
if (Boolean.TRUE.equals(syncDefine)) {
updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -18,17 +18,21 @@
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service;
/**
* A singleton of a task queue implemented using PriorityBlockingQueue
*/
@Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueImpl.class);
/**
* queue size
*/
@ -47,7 +51,12 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
*/
@Override
public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException {
if (!queue.contains(taskPriorityInfo)) {
queue.put(taskPriorityInfo);
} else {
logger.warn("the priorityBlockingQueue contain the task already, taskId: {}, processInstanceId: {}",
taskPriorityInfo.getTaskId(), taskPriorityInfo.getProcessInstanceId());
}
}
/**

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java

@ -61,6 +61,12 @@ public class TaskExecutionContext implements Serializable {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
/**
* endTime
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
/**
* task type
*/
@ -287,6 +293,14 @@ public class TaskExecutionContext implements Serializable {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public String getTaskType() {
return taskType;
}
@ -573,6 +587,7 @@ public class TaskExecutionContext implements Serializable {
+ ", currentExecutionStatus=" + currentExecutionStatus
+ ", firstSubmitTime=" + firstSubmitTime
+ ", startTime=" + startTime
+ ", endTime=" + endTime
+ ", taskType='" + taskType + '\''
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''

16
pom.xml

@ -92,7 +92,7 @@
<postgresql.version>42.2.5</postgresql.version>
<hive.jdbc.version>2.1.0</hive.jdbc.version>
<commons.io.version>2.4</commons.io.version>
<oshi.core.version>3.9.1</oshi.core.version>
<oshi.core.version>6.1.1</oshi.core.version>
<clickhouse.jdbc.version>0.1.52</clickhouse.jdbc.version>
<mssql.jdbc.version>6.1.0.jre8</mssql.jdbc.version>
<presto.jdbc.version>0.238.1</presto.jdbc.version>
@ -748,6 +748,20 @@
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
<version>${oshi.core.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

2
tools/dependencies/known-dependencies.txt

@ -163,7 +163,7 @@ mybatis-spring-2.0.2.jar
netty-3.6.2.Final.jar
netty-all-4.1.53.Final.jar
opencsv-2.3.jar
oshi-core-3.9.1.jar
oshi-core-6.1.1.jar
paranamer-2.3.jar
parquet-hadoop-bundle-1.8.1.jar
poi-4.1.2.jar

Loading…
Cancel
Save