Browse Source

delay execution of tasks and improve some designs (#3427)

pull/3/MERGE
vanilla111 4 years ago committed by GitHub
parent
commit
a4ee351a3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java
  2. 17
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 109
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  4. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java
  5. 50
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  6. 444
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
  7. 32
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java
  8. 127
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  9. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java
  10. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  11. 169
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  12. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  13. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  14. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  15. 54
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  16. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  17. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  18. 98
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  19. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  20. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  21. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java
  22. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  23. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java
  24. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
  25. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
  26. 171
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
  27. 98
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  28. 2
      pom.xml
  29. 4
      sql/dolphinscheduler-postgre.sql
  30. 2
      sql/dolphinscheduler_mysql.sql
  31. 1
      sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_ddl.sql
  32. 58
      sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
  33. 16
      sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_dml.sql
  34. 52
      sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
  35. 16
      sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_dml.sql

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/TaskCountDto.java

@ -42,9 +42,10 @@ public class TaskCountDto {
countTaskDtos(taskInstanceStateCounts);
}
private void countTaskDtos(List<ExecuteStatusCount> taskInstanceStateCounts){
private void countTaskDtos(List<ExecuteStatusCount> taskInstanceStateCounts) {
int submittedSuccess = 0;
int runningExeution = 0;
int runningExecution = 0;
int delayExecution = 0;
int readyPause = 0;
int pause = 0;
int readyStop = 0;
@ -55,15 +56,18 @@ public class TaskCountDto {
int kill = 0;
int waittingThread = 0;
for(ExecuteStatusCount taskInstanceStateCount : taskInstanceStateCounts){
for (ExecuteStatusCount taskInstanceStateCount : taskInstanceStateCounts) {
ExecutionStatus status = taskInstanceStateCount.getExecutionStatus();
totalCount += taskInstanceStateCount.getCount();
switch (status){
switch (status) {
case SUBMITTED_SUCCESS:
submittedSuccess += taskInstanceStateCount.getCount();
break;
case RUNNING_EXECUTION:
runningExeution += taskInstanceStateCount.getCount();
runningExecution += taskInstanceStateCount.getCount();
break;
case DELAY_EXECUTION:
delayExecution += taskInstanceStateCount.getCount();
break;
case READY_PAUSE:
readyPause += taskInstanceStateCount.getCount();
@ -93,13 +97,14 @@ public class TaskCountDto {
waittingThread += taskInstanceStateCount.getCount();
break;
default:
break;
default:
break;
}
}
this.taskCountDtos = new ArrayList<>();
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.SUBMITTED_SUCCESS, submittedSuccess));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXECUTION, runningExeution));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.RUNNING_EXECUTION, runningExecution));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.DELAY_EXECUTION, delayExecution));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_PAUSE, readyPause));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.PAUSE, pause));
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.READY_STOP, readyStop));
@ -111,8 +116,7 @@ public class TaskCountDto {
this.taskCountDtos.add(new TaskStateCount(ExecutionStatus.WAITTING_THREAD, waittingThread));
}
public List<TaskStateCount> getTaskCountDtos(){
public List<TaskStateCount> getTaskCountDtos() {
return taskCountDtos;
}

17
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -138,7 +138,7 @@ public final class Constants {
/**
* python home
*/
public static final String PYTHON_HOME="PYTHON_HOME";
public static final String PYTHON_HOME = "PYTHON_HOME";
/**
* resource.view.suffixs
@ -366,7 +366,6 @@ public final class Constants {
public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10;
/**
* default log cache rows num,output when reach the number
*/
@ -752,7 +751,7 @@ public final class Constants {
/**
* preview schedule execute count
* preview schedule execute count
*/
public static final int PREVIEW_SCHEDULE_EXECUTE_COUNT = 5;
@ -832,6 +831,7 @@ public final class Constants {
public static final int[] NOT_TERMINATED_STATES = new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
@ -852,18 +852,17 @@ public final class Constants {
/**
* data total
*/
public static final String COUNT = "count";
public static final String COUNT = "count";
/**
* page size
*/
public static final String PAGE_SIZE = "pageSize";
public static final String PAGE_SIZE = "pageSize";
/**
* current page no
*/
public static final String PAGE_NUMBER = "pageNo";
public static final String PAGE_NUMBER = "pageNo";
/**
@ -966,11 +965,11 @@ public final class Constants {
/**
* authorize writable perm
*/
public static final int AUTHORIZE_WRITABLE_PERM=7;
public static final int AUTHORIZE_WRITABLE_PERM = 7;
/**
* authorize readable perm
*/
public static final int AUTHORIZE_READABLE_PERM=4;
public static final int AUTHORIZE_READABLE_PERM = 4;
/**

109
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -14,16 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import java.util.HashMap;
import com.baomidou.mybatisplus.annotation.EnumValue;
import java.util.HashMap;
/**
* running status for workflow and task nodes
*
*/
public enum ExecutionStatus {
@ -41,6 +40,7 @@ public enum ExecutionStatus {
* 9 kill
* 10 waiting thread
* 11 waiting depend node complete
* 12 delay execution
*/
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
@ -53,9 +53,10 @@ public enum ExecutionStatus {
NEED_FAULT_TOLERANCE(8, "need fault tolerance"),
KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"),
WAITTING_DEPEND(11, "waiting depend node complete");
WAITTING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution");
ExecutionStatus(int code, String descp){
ExecutionStatus(int code, String descp) {
this.code = code;
this.descp = descp;
}
@ -64,77 +65,85 @@ public enum ExecutionStatus {
private final int code;
private final String descp;
private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP=new HashMap<>();
private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>();
static {
for (ExecutionStatus executionStatus:ExecutionStatus.values()){
EXECUTION_STATUS_MAP.put(executionStatus.code,executionStatus);
}
for (ExecutionStatus executionStatus : ExecutionStatus.values()) {
EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus);
}
}
/**
* status is success
* @return status
*/
public boolean typeIsSuccess(){
return this == SUCCESS;
}
/**
* status is failure
* @return status
*/
public boolean typeIsFailure(){
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
}
/**
* status is finished
* @return status
*/
public boolean typeIsFinished(){
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
|| typeIsStop();
}
/**
* status is success
*
* @return status
*/
public boolean typeIsSuccess() {
return this == SUCCESS;
}
/**
* status is failure
*
* @return status
*/
public boolean typeIsFailure() {
return this == FAILURE || this == NEED_FAULT_TOLERANCE || this == KILL;
}
/**
* status is finished
*
* @return status
*/
public boolean typeIsFinished() {
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
|| typeIsStop();
}
/**
* status is waiting thread
*
* @return status
*/
public boolean typeIsWaitingThread(){
return this == WAITTING_THREAD;
}
public boolean typeIsWaitingThread() {
return this == WAITTING_THREAD;
}
/**
* status is pause
*
* @return status
*/
public boolean typeIsPause(){
return this == PAUSE;
}
public boolean typeIsPause() {
return this == PAUSE;
}
/**
* status is pause
*
* @return status
*/
public boolean typeIsStop(){
public boolean typeIsStop() {
return this == STOP;
}
/**
* status is running
*
* @return status
*/
public boolean typeIsRunning(){
return this == RUNNING_EXECUTION || this == WAITTING_DEPEND;
}
public boolean typeIsRunning() {
return this == RUNNING_EXECUTION || this == WAITTING_DEPEND || this == DELAY_EXECUTION;
}
/**
* status is cancel
*
* @return status
*/
public boolean typeIsCancel(){
return this == KILL || this == STOP ;
public boolean typeIsCancel() {
return this == KILL || this == STOP;
}
public int getCode() {
@ -145,10 +154,10 @@ public enum ExecutionStatus {
return descp;
}
public static ExecutionStatus of(int status){
if(EXECUTION_STATUS_MAP.containsKey(status)){
return EXECUTION_STATUS_MAP.get(status);
}
public static ExecutionStatus of(int status) {
if (EXECUTION_STATUS_MAP.containsKey(status)) {
return EXECUTION_STATUS_MAP.get(status);
}
throw new IllegalArgumentException("invalid status : " + status);
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskStateType.java

@ -31,12 +31,13 @@ public enum TaskStateType {
/**
* convert task state to execute status integer array ;
*
* @param taskStateType task state type
* @return result of execution status
*/
public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType){
public static int[] convert2ExecutStatusIntArray(TaskStateType taskStateType) {
switch (taskStateType){
switch (taskStateType) {
case SUCCESS:
return new int[]{ExecutionStatus.SUCCESS.ordinal()};
case FAILED:
@ -51,14 +52,15 @@ public enum TaskStateType {
case RUNNING:
return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
case WAITTING:
return new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal()
};
default:
break;
default:
break;
}
return new int[0];
}

50
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -136,6 +136,11 @@ public class TaskNode {
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String timeout;
/**
* delay execution time.
*/
private int delayTime;
public String getId() {
return id;
}
@ -310,24 +315,25 @@ public class TaskNode {
@Override
public String toString() {
return "TaskNode{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", desc='" + desc + '\'' +
", type='" + type + '\'' +
", runFlag='" + runFlag + '\'' +
", loc='" + loc + '\'' +
", maxRetryTimes=" + maxRetryTimes +
", retryInterval=" + retryInterval +
", params='" + params + '\'' +
", preTasks='" + preTasks + '\'' +
", extras='" + extras + '\'' +
", depList=" + depList +
", dependence='" + dependence + '\'' +
", taskInstancePriority=" + taskInstancePriority +
", timeout='" + timeout + '\'' +
", workerGroup='" + workerGroup + '\'' +
'}';
return "TaskNode{"
+ "id='" + id + '\''
+ ", name='" + name + '\''
+ ", desc='" + desc + '\''
+ ", type='" + type + '\''
+ ", runFlag='" + runFlag + '\''
+ ", loc='" + loc + '\''
+ ", maxRetryTimes=" + maxRetryTimes
+ ", retryInterval=" + retryInterval
+ ", params='" + params + '\''
+ ", preTasks='" + preTasks + '\''
+ ", extras='" + extras + '\''
+ ", depList=" + depList
+ ", dependence='" + dependence + '\''
+ ", taskInstancePriority=" + taskInstancePriority
+ ", timeout='" + timeout + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", delayTime=" + delayTime
+ '}';
}
public String getWorkerGroup() {
@ -353,4 +359,12 @@ public class TaskNode {
public void setWorkerGroupId(Integer workerGroupId) {
this.workerGroupId = workerGroupId;
}
public int getDelayTime() {
return delayTime;
}
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
}

444
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java

File diff suppressed because one or more lines are too long

32
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/enums/ExecutionStatusTest.java

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import junit.framework.TestCase;
/**
* execution status test.
*/
public class ExecutionStatusTest extends TestCase {
public void testTypeIsRunning() {
assertTrue(ExecutionStatus.RUNNING_EXECUTION.typeIsRunning());
assertTrue(ExecutionStatus.WAITTING_DEPEND.typeIsRunning());
assertTrue(ExecutionStatus.DELAY_EXECUTION.typeIsRunning());
}
}

127
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -42,7 +42,7 @@ public class TaskInstance implements Serializable {
/**
* id
*/
@TableId(value="id", type=IdType.AUTO)
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
@ -51,7 +51,6 @@ public class TaskInstance implements Serializable {
private String name;
/**
* task type
*/
@ -83,22 +82,28 @@ public class TaskInstance implements Serializable {
*/
private ExecutionStatus state;
/**
* task first submit time.
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date firstSubmitTime;
/**
* task submit time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date submitTime;
/**
* task start time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
/**
* task end time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime;
/**
@ -214,11 +219,14 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private Map<String,String> resources;
private Map<String, String> resources;
/**
* delay execution time.
*/
private int delayTime;
public void init(String host,Date startTime,String executePath){
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;
this.executePath = executePath;
@ -297,6 +305,14 @@ public class TaskInstance implements Serializable {
this.state = state;
}
public Date getFirstSubmitTime() {
return firstSubmitTime;
}
public void setFirstSubmitTime(Date firstSubmitTime) {
this.firstSubmitTime = firstSubmitTime;
}
public Date getSubmitTime() {
return submitTime;
}
@ -361,7 +377,7 @@ public class TaskInstance implements Serializable {
this.retryTimes = retryTimes;
}
public Boolean isTaskSuccess(){
public Boolean isTaskSuccess() {
return this.state == ExecutionStatus.SUCCESS;
}
@ -400,6 +416,7 @@ public class TaskInstance implements Serializable {
public void setFlag(Flag flag) {
this.flag = flag;
}
public String getProcessInstanceName() {
return processInstanceName;
}
@ -464,33 +481,33 @@ public class TaskInstance implements Serializable {
this.resources = resources;
}
public boolean isSubProcess(){
public boolean isSubProcess() {
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
}
public boolean isDependTask(){
public boolean isDependTask() {
return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType));
}
public boolean isConditionsTask(){
public boolean isConditionsTask() {
return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
}
/**
* determine if you can try again
*
* @return can try result
*/
public boolean taskCanRetry() {
if(this.isSubProcess()){
if (this.isSubProcess()) {
return false;
}
if(this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){
if (this.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
return true;
}else {
} else {
return (this.getState().typeIsFailure()
&& this.getRetryTimes() < this.getMaxRetryTimes());
&& this.getRetryTimes() < this.getMaxRetryTimes());
}
}
@ -526,40 +543,50 @@ public class TaskInstance implements Serializable {
this.dependentResult = dependentResult;
}
public int getDelayTime() {
return delayTime;
}
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
@Override
public String toString() {
return "TaskInstance{" +
"id=" + id +
", name='" + name + '\'' +
", taskType='" + taskType + '\'' +
", processDefinitionId=" + processDefinitionId +
", processInstanceId=" + processInstanceId +
", processInstanceName='" + processInstanceName + '\'' +
", taskJson='" + taskJson + '\'' +
", state=" + state +
", submitTime=" + submitTime +
", startTime=" + startTime +
", endTime=" + endTime +
", host='" + host + '\'' +
", executePath='" + executePath + '\'' +
", logPath='" + logPath + '\'' +
", retryTimes=" + retryTimes +
", alertFlag=" + alertFlag +
", processInstance=" + processInstance +
", processDefine=" + processDefine +
", pid=" + pid +
", appLink='" + appLink + '\'' +
", flag=" + flag +
", dependency='" + dependency + '\'' +
", duration=" + duration +
", maxRetryTimes=" + maxRetryTimes +
", retryInterval=" + retryInterval +
", taskInstancePriority=" + taskInstancePriority +
", processInstancePriority=" + processInstancePriority +
", dependentResult='" + dependentResult + '\'' +
", workerGroup='" + workerGroup + '\'' +
", executorId=" + executorId +
", executorName='" + executorName + '\'' +
'}';
return "TaskInstance{"
+ "id=" + id
+ ", name='" + name + '\''
+ ", taskType='" + taskType + '\''
+ ", processDefinitionId=" + processDefinitionId
+ ", processInstanceId=" + processInstanceId
+ ", processInstanceName='" + processInstanceName + '\''
+ ", taskJson='" + taskJson + '\''
+ ", state=" + state
+ ", firstSubmitTime=" + firstSubmitTime
+ ", submitTime=" + submitTime
+ ", startTime=" + startTime
+ ", endTime=" + endTime
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''
+ ", logPath='" + logPath + '\''
+ ", retryTimes=" + retryTimes
+ ", alertFlag=" + alertFlag
+ ", processInstance=" + processInstance
+ ", processDefine=" + processDefine
+ ", pid=" + pid
+ ", appLink='" + appLink + '\''
+ ", flag=" + flag
+ ", dependency='" + dependency + '\''
+ ", duration=" + duration
+ ", maxRetryTimes=" + maxRetryTimes
+ ", retryInterval=" + retryInterval
+ ", taskInstancePriority=" + taskInstancePriority
+ ", processInstancePriority=" + processInstancePriority
+ ", dependentResult='" + dependentResult + '\''
+ ", workerGroup='" + workerGroup + '\''
+ ", executorId=" + executorId
+ ", executorName='" + executorName + '\''
+ ", delayTime=" + delayTime
+ '}';
}
}

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/codec/NettyDecoder.java

@ -75,6 +75,7 @@ public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> {
out.add(packet);
//
checkpoint(State.MAGIC);
break;
default:
logger.warn("unknown decoder state {}", state());
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -40,6 +40,7 @@ public class TaskExecutionContextBuilder {
public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
taskExecutionContext.setTaskInstanceId(taskInstance.getId());
taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime());
taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setLogPath(taskInstance.getLogPath());
@ -48,6 +49,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setResources(taskInstance.getResources());
taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
return this;
}

169
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
@ -26,30 +26,38 @@ import java.io.Serializable;
import java.util.Date;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
* master/worker task transport
* master/worker task transport
*/
public class TaskExecutionContext implements Serializable{
public class TaskExecutionContext implements Serializable {
/**
* task id
* task id
*/
private int taskInstanceId;
/**
* task name
* task name
*/
private String taskName;
/**
* task start time
* task first submit time.
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date firstSubmitTime;
/**
* task start time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTime;
/**
* task type
* task type
*/
private String taskType;
@ -59,7 +67,7 @@ public class TaskExecutionContext implements Serializable{
private String host;
/**
* task execute path
* task execute path
*/
private String executePath;
@ -69,7 +77,7 @@ public class TaskExecutionContext implements Serializable{
private String logPath;
/**
* task json
* task json
*/
private String taskJson;
@ -84,53 +92,53 @@ public class TaskExecutionContext implements Serializable{
private String appIds;
/**
* process instance id
* process instance id
*/
private int processInstanceId;
/**
* process instance schedule time
* process instance schedule time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date scheduleTime;
/**
* process instance global parameters
* process instance global parameters
*/
private String globalParams;
/**
* execute user id
* execute user id
*/
private int executorId;
/**
* command type if complement
* command type if complement
*/
private int cmdTypeIfComplement;
/**
* tenant code
* tenant code
*/
private String tenantCode;
/**
* task queue
* task queue
*/
private String queue;
/**
* process define id
* process define id
*/
private int processDefineId;
/**
* project id
* project id
*/
private int projectId;
@ -140,12 +148,12 @@ public class TaskExecutionContext implements Serializable{
private String taskParams;
/**
* envFile
* envFile
*/
private String envFile;
/**
* definedParams
* definedParams
*/
private Map<String, String> definedParams;
@ -155,7 +163,7 @@ public class TaskExecutionContext implements Serializable{
private String taskAppId;
/**
* task timeout strategy
* task timeout strategy
*/
private int taskTimeoutStrategy;
@ -169,18 +177,28 @@ public class TaskExecutionContext implements Serializable{
*/
private String workerGroup;
/**
* delay execution time.
*/
private int delayTime;
/**
* current execution status
*/
private ExecutionStatus currentExecutionStatus;
/**
* resources full name and tenant code
*/
private Map<String,String> resources;
private Map<String, String> resources;
/**
* sql TaskExecutionContext
* sql TaskExecutionContext
*/
private SQLTaskExecutionContext sqlTaskExecutionContext;
/**
* datax TaskExecutionContext
* datax TaskExecutionContext
*/
private DataxTaskExecutionContext dataxTaskExecutionContext;
@ -195,7 +213,7 @@ public class TaskExecutionContext implements Serializable{
private SqoopTaskExecutionContext sqoopTaskExecutionContext;
/**
* procedure TaskExecutionContext
* procedure TaskExecutionContext
*/
private ProcedureTaskExecutionContext procedureTaskExecutionContext;
@ -215,6 +233,14 @@ public class TaskExecutionContext implements Serializable{
this.taskName = taskName;
}
public Date getFirstSubmitTime() {
return firstSubmitTime;
}
public void setFirstSubmitTime(Date firstSubmitTime) {
this.firstSubmitTime = firstSubmitTime;
}
public Date getStartTime() {
return startTime;
}
@ -407,6 +433,22 @@ public class TaskExecutionContext implements Serializable{
this.workerGroup = workerGroup;
}
public int getDelayTime() {
return delayTime;
}
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
public ExecutionStatus getCurrentExecutionStatus() {
return currentExecutionStatus;
}
public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
this.currentExecutionStatus = currentExecutionStatus;
}
public SQLTaskExecutionContext getSqlTaskExecutionContext() {
return sqlTaskExecutionContext;
}
@ -431,7 +473,7 @@ public class TaskExecutionContext implements Serializable{
this.procedureTaskExecutionContext = procedureTaskExecutionContext;
}
public Command toCommand(){
public Command toCommand() {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JsonSerializer.serializeToString(this));
return requestCommand.convert2Command();
@ -463,39 +505,42 @@ public class TaskExecutionContext implements Serializable{
@Override
public String toString() {
return "TaskExecutionContext{" +
"taskInstanceId=" + taskInstanceId +
", taskName='" + taskName + '\'' +
", startTime=" + startTime +
", taskType='" + taskType + '\'' +
", host='" + host + '\'' +
", executePath='" + executePath + '\'' +
", logPath='" + logPath + '\'' +
", taskJson='" + taskJson + '\'' +
", processId=" + processId +
", appIds='" + appIds + '\'' +
", processInstanceId=" + processInstanceId +
", scheduleTime=" + scheduleTime +
", globalParams='" + globalParams + '\'' +
", executorId=" + executorId +
", cmdTypeIfComplement=" + cmdTypeIfComplement +
", tenantCode='" + tenantCode + '\'' +
", queue='" + queue + '\'' +
", processDefineId=" + processDefineId +
", projectId=" + projectId +
", taskParams='" + taskParams + '\'' +
", envFile='" + envFile + '\'' +
", definedParams=" + definedParams +
", taskAppId='" + taskAppId + '\'' +
", taskTimeoutStrategy=" + taskTimeoutStrategy +
", taskTimeout=" + taskTimeout +
", workerGroup='" + workerGroup + '\'' +
", resources=" + resources +
", sqlTaskExecutionContext=" + sqlTaskExecutionContext +
", dataxTaskExecutionContext=" + dataxTaskExecutionContext +
", dependenceTaskExecutionContext=" + dependenceTaskExecutionContext +
", sqoopTaskExecutionContext=" + sqoopTaskExecutionContext +
", procedureTaskExecutionContext=" + procedureTaskExecutionContext +
'}';
return "TaskExecutionContext{"
+ "taskInstanceId=" + taskInstanceId
+ ", taskName='" + taskName + '\''
+ ", currentExecutionStatus=" + currentExecutionStatus
+ ", firstSubmitTime=" + firstSubmitTime
+ ", startTime=" + startTime
+ ", taskType='" + taskType + '\''
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''
+ ", logPath='" + logPath + '\''
+ ", taskJson='" + taskJson + '\''
+ ", processId=" + processId
+ ", appIds='" + appIds + '\''
+ ", processInstanceId=" + processInstanceId
+ ", scheduleTime=" + scheduleTime
+ ", globalParams='" + globalParams + '\''
+ ", executorId=" + executorId
+ ", cmdTypeIfComplement=" + cmdTypeIfComplement
+ ", tenantCode='" + tenantCode + '\''
+ ", queue='" + queue + '\''
+ ", processDefineId=" + processDefineId
+ ", projectId=" + projectId
+ ", taskParams='" + taskParams + '\''
+ ", envFile='" + envFile + '\''
+ ", definedParams=" + definedParams
+ ", taskAppId='" + taskAppId + '\''
+ ", taskTimeoutStrategy=" + taskTimeoutStrategy
+ ", taskTimeout=" + taskTimeout
+ ", workerGroup='" + workerGroup + '\''
+ ", delayTime=" + delayTime
+ ", resources=" + resources
+ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+ ", dataxTaskExecutionContext=" + dataxTaskExecutionContext
+ ", dependenceTaskExecutionContext=" + dependenceTaskExecutionContext
+ ", sqoopTaskExecutionContext=" + sqoopTaskExecutionContext
+ ", procedureTaskExecutionContext=" + procedureTaskExecutionContext
+ '}';
}
}

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -60,6 +60,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
*/
public ConditionsTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
taskInstance.setStartTime(new Date());
}
@Override

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -64,6 +64,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
*/
public DependentTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
taskInstance.setStartTime(new Date());
}

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -16,11 +16,11 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -30,11 +30,14 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
import java.util.concurrent.Callable;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
/**
@ -171,9 +174,10 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
}
// task cannot submit when running
if(taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION){
logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
// task cannot be submitted because its execution state is RUNNING or DELAY.
if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION
|| taskInstance.getState() == ExecutionStatus.DELAY_EXECUTION) {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.info("task ready to submit: {}", taskInstance);
@ -272,5 +276,4 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return logPath;
}
}

54
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -16,10 +16,21 @@
*/
package org.apache.dolphinscheduler.server.master.runner;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -27,7 +38,13 @@ import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -37,17 +54,26 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import static org.apache.dolphinscheduler.common.Constants.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
* master exec thread,split dag
@ -470,9 +496,6 @@ public class MasterExecThread implements Runnable {
// task instance whether alert
taskInstance.setAlertFlag(Flag.NO);
// task instance start time
taskInstance.setStartTime(new Date());
// task instance flag
taskInstance.setFlag(Flag.YES);
@ -501,6 +524,8 @@ public class MasterExecThread implements Runnable {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
// delay execution time
taskInstance.setDelayTime(taskNode.getDelayTime());
}
return taskInstance;
}
@ -719,9 +744,10 @@ public class MasterExecThread implements Runnable {
* @return ExecutionStatus
*/
private ExecutionStatus runningState(ExecutionStatus state){
if(state == ExecutionStatus.READY_STOP ||
state == ExecutionStatus.READY_PAUSE ||
state == ExecutionStatus.WAITTING_THREAD){
if (state == ExecutionStatus.READY_STOP
|| state == ExecutionStatus.READY_PAUSE
|| state == ExecutionStatus.WAITTING_THREAD
|| state == ExecutionStatus.DELAY_EXECUTION) {
// if the running task is not completed, the state remains unchanged
return state;
}else{

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -39,7 +41,6 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.Set;
import org.apache.dolphinscheduler.common.utils.*;
/**
@ -150,7 +151,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
break;
}
if(checkTimeout){
long remainTime = getRemaintime(taskTimeoutParameter.getInterval() * 60L);
long remainTime = DateUtils.getRemainTime(taskInstance.getStartTime(), taskTimeoutParameter.getInterval() * 60L);
if (remainTime < 0) {
logger.warn("task id: {} execution time out",taskInstance.getId());
// process define
@ -256,16 +257,4 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
return taskNode.getTaskTimeoutParameter();
}
/**
* get remain time?s?
*
* @return remain time
*/
private long getRemaintime(long timeoutSeconds) {
Date startTime = taskInstance.getStartTime();
long usedTime = (System.currentTimeMillis() - startTime.getTime()) / 1000;
return timeoutSeconds - usedTime;
}
}

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -18,15 +18,17 @@
package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.github.rholder.retry.RetryException;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -38,14 +40,21 @@ import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel;
/**
* worker request processor
*/
@ -97,7 +106,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
return;
}
taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort());
taskExecutionContext.setHost(NetUtils.getHost() + ":" + workerConfig.getListenPort());
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@ -122,7 +131,15 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// tell master that task is in executing
if (DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L) > 0) {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
} else {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecutionContext.setStartTime(new Date());
}
// tell master the status of this task (RUNNING_EXECUTION or DELAY_EXECUTION)
final Command ackCommand = buildAckCommand(taskExecutionContext).convert2Command();
try {
@ -167,10 +184,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(new Date());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{

98
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -16,25 +16,20 @@
*/
package org.apache.dolphinscheduler.server.worker.runner;
import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@ -43,9 +38,23 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections.MapUtils;
import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
/**
* task scheduler thread
@ -105,6 +114,15 @@ public class TaskExecuteThread implements Runnable {
// task node
TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
delayExecutionIfNeeded();
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
if (taskExecutionContext.getCurrentExecutionStatus() != ExecutionStatus.RUNNING_EXECUTION) {
changeTaskExecutionStatusToRunning();
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
// copy hdfs/minio file to local
downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(),
@ -138,7 +156,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
}catch (Exception e){
} catch (Exception e) {
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
@ -147,9 +165,10 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setAppIds(task.getAppIds());
} finally {
try {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(responseCommand.getStatus()));
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}catch (Exception e){
} catch (Exception e) {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
@ -256,4 +275,59 @@ public class TaskExecuteThread implements Runnable {
}
}
}
/**
* delay execution if needed.
*/
private void delayExecutionIfNeeded() {
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime);
if (remainTime > 0) {
try {
Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}
}
/**
* send an ack to change the status of the task.
*/
private void changeTaskExecutionStatusToRunning() {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
Command ackCommand = buildAckCommand().convert2Command();
try {
RetryerUtils.retryCall(() -> {
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
return Boolean.TRUE;
});
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage(), e);
}
}
/**
* build ack command.
*
* @return TaskExecuteAckCommand
*/
private TaskExecuteAckCommand buildAckCommand() {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
ackCommand.setLogPath(taskExecutionContext.getLogPath());
ackCommand.setHost(taskExecutionContext.getHost());
if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name())
|| taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
ackCommand.setExecutePath(null);
} else {
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
return ackCommand;
}
}

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -16,12 +16,14 @@
*/
package org.apache.dolphinscheduler.server.worker.task;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinNT;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@ -32,9 +34,12 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -47,8 +52,10 @@ import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;
import org.slf4j.Logger;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinNT;
/**
* abstract command executor
@ -474,8 +481,7 @@ public abstract class AbstractCommandExecutor {
* @return remain time
*/
private long getRemaintime() {
long usedTime = (System.currentTimeMillis() - taskExecutionContext.getStartTime().getTime()) / 1000;
long remainTime = taskExecutionContext.getTaskTimeout() - usedTime;
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getStartTime(), taskExecutionContext.getTaskTimeout());
if (remainTime < 0) {
throw new RuntimeException("task execution time out");

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -55,7 +55,8 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class})
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class,
CuratorZookeeperClient.class})
public class TaskPriorityQueueConsumerTest {

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test;
@ -46,7 +47,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, WorkerRegistry.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, WorkerConfig.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class})
public class ExecutorDispatcherTest {
@Autowired

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
@ -52,7 +53,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class,
ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class,
ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, CuratorZookeeperClient.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringApplicationContext.class, NettyExecutorManager.class})
public class NettyExecutorManagerTest {

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
@ -43,7 +44,7 @@ import org.springframework.test.context.junit4.SpringRunner;
*/
@RunWith(SpringRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, WorkerConfig.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class})
public class RoundRobinHostManagerTest {

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.server.registry.DependencyConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
@ -35,7 +36,8 @@ import java.util.Date;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class})
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class,
CuratorZookeeperClient.class})
public class TaskResponseServiceTest {
@Autowired

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
@ -38,7 +39,8 @@ import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEP
* master registry test
*/
@RunWith(SpringRunner.class)
@ContextConfiguration(classes={SpringZKServer.class, MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
@ContextConfiguration(classes = {SpringZKServer.class, MasterRegistry.class, ZookeeperRegistryCenter.class,
MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class})
public class MasterRegistryTest {
@Autowired

171
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test task execute thread.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class})
public class TaskExecuteThreadTest {
private TaskExecutionContext taskExecutionContext;
private TaskCallbackService taskCallbackService;
private Command ackCommand;
private Command responseCommand;
private Logger taskLogger;
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
@Before
public void before() {
// init task execution context, logger
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
ackCommand = new TaskExecuteAckCommand().convert2Command();
responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command();
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));
taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
PowerMockito.mockStatic(TaskManager.class);
PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger))
.thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class))
.thenReturn(new TaskNode());
PowerMockito.mockStatic(CommonUtils.class);
PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
}
@Test
public void testNormalExecution() {
taskExecutionContext.setTaskType("SQL");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
taskExecuteThread.run();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
}
@Test
public void testDelayExecution() {
taskExecutionContext.setTaskType("PYTHON");
taskExecutionContext.setStartTime(null);
taskExecutionContext.setDelayTime(1);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
taskExecuteThread.run();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
}
private class SimpleTask extends AbstractTask {
protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
// pid
this.processId = taskExecutionContext.getProcessId();
}
@Override
public AbstractParameters getParameters() {
return null;
}
@Override
public void init() {
}
@Override
public void handle() throws Exception {
}
@Override
public void after() {
}
@Override
public ExecutionStatus getExitStatus() {
return ExecutionStatus.SUCCESS;
}
}
}

98
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -16,31 +16,89 @@
*/
package org.apache.dolphinscheduler.service.process;
import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang.ArrayUtils;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CycleDependency;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.*;
import com.cronutils.model.Cron;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* process relative dao that some mappers in this.
@ -52,6 +110,7 @@ public class ProcessService {
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
@ -1001,8 +1060,9 @@ public class ProcessService {
if(taskInstance.getState() != ExecutionStatus.NEED_FAULT_TOLERANCE){
taskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1 );
}
taskInstance.setSubmitTime(null);
taskInstance.setStartTime(null);
taskInstance.setEndTime(null);
taskInstance.setStartTime(new Date());
taskInstance.setFlag(Flag.YES);
taskInstance.setHost(null);
taskInstance.setId(0);
@ -1012,7 +1072,12 @@ public class ProcessService {
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
taskInstance.setSubmitTime(new Date());
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
}
if (taskInstance.getFirstSubmitTime() == null) {
taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
}
boolean saveResult = saveTaskInstance(taskInstance);
if(!saveResult){
return null;
@ -1062,10 +1127,11 @@ public class ProcessService {
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){
ExecutionStatus state = taskInstance.getState();
if(
// running or killed
// running, delayed or killed
// the task already exists in task queue
// return state
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
|| checkTaskExistsInTaskQueue(taskInstance)
){
@ -1588,7 +1654,7 @@ public class ProcessService {
*/
public List<CycleDependency> getCycleDependencies(int masterId,int[] ids,Date scheduledFireTime) throws Exception {
List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
if(ArrayUtils.isEmpty(ids)){
if (ids == null || ids.length == 0) {
logger.warn("ids[] is empty!is invalid!");
return cycleDependencyList;
}
@ -1772,7 +1838,7 @@ public class ProcessService {
public <T> List<T> listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){
List<T> resultList = new ArrayList<T>();
if (!ArrayUtils.isEmpty(needChecks)) {
if (Objects.nonNull(needChecks) && needChecks.length > 0) {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
switch (authorizationType){

2
pom.xml

@ -792,6 +792,7 @@
<include>**/common/utils/RetryerUtilsTest.java</include>
<include>**/common/plugin/FilePluginManagerTest</include>
<include>**/common/plugin/PluginClassLoaderTest</include>
<include>**/common/enums/ExecutionStatusTest</include>
<include>**/dao/mapper/AccessTokenMapperTest.java</include>
<include>**/dao/mapper/AlertGroupMapperTest.java</include>
<include>**/dao/mapper/CommandMapperTest.java</include>
@ -842,6 +843,7 @@
<!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include>

4
sql/dolphinscheduler-postgre.sql

@ -566,8 +566,10 @@ CREATE TABLE t_ds_task_instance (
retry_interval int DEFAULT NULL ,
max_retry_times int DEFAULT NULL ,
task_instance_priority int DEFAULT NULL ,
worker_group varchar(64),
worker_group varchar(64),
executor_id int DEFAULT NULL ,
first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' ,
PRIMARY KEY (id)
) ;

2
sql/dolphinscheduler_mysql.sql

@ -707,6 +707,8 @@ CREATE TABLE `t_ds_task_instance` (
`task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`executor_id` int(11) DEFAULT NULL,
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE,

1
sql/upgrade/1.3.0_schema/mysql/dolphinscheduler_ddl.sql

@ -297,4 +297,3 @@ delimiter ;
CALL ac_dolphin_T_t_ds_user_A_state;
DROP PROCEDURE ac_dolphin_T_t_ds_user_A_state;

58
sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
-- uc_dolphin_T_t_ds_task_instance_A_first_submit_time
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_first_submit_time;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_first_submit_time()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='first_submit_time')
THEN
ALTER TABLE t_ds_task_instance ADD `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_first_submit_time();
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_first_submit_time;
-- uc_dolphin_T_t_ds_task_instance_A_delay_time
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='delay_time')
THEN
ALTER TABLE t_ds_task_instance ADD `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time;

16
sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

52
sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- uc_dolphin_T_t_ds_task_instance_A_first_submit_time
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_first_submit_time() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='first_submit_time')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN first_submit_time timestamp DEFAULT NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_first_submit_time();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_first_submit_time();
-- uc_dolphin_T_t_ds_task_instance_A_delay_time
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_delay_time() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='delay_time')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN delay_time int DEFAULT '0';
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time();

16
sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Loading…
Cancel
Save