Browse Source

Add pause to task instance status (#11390)

3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
5fa3e7b1ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
  2. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
  4. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  5. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  6. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  7. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
  8. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java
  9. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  10. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java

@ -17,25 +17,24 @@
package org.apache.dolphinscheduler.server.master.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import com.facebook.presto.jdbc.internal.guava.collect.ImmutableSet;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import lombok.experimental.UtilityClass;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@UtilityClass
public class TaskMetrics {
private final Map<String, Counter> taskInstanceCounters = new HashMap<>();
private final Set<String> taskInstanceStates = ImmutableSet.of(
"submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "fail", "stop");
"submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "kill", "fail", "stop");
static {
for (final String state : taskInstanceStates) {
@ -44,8 +43,7 @@ public class TaskMetrics {
Counter.builder("ds.task.instance.count")
.tags("state", state)
.description(String.format("Process instance %s total count", state))
.register(Metrics.globalRegistry)
);
.register(Metrics.globalRegistry));
}
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -17,19 +17,11 @@
package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import com.google.common.collect.Lists;
import lombok.NonNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@ -68,7 +60,13 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.event.*;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
@ -79,10 +77,9 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.ArrayList;
import java.util.Arrays;
@ -103,13 +100,18 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import com.google.common.collect.Lists;
import lombok.NonNull;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
/**
* Workflow execute task, used to execute a workflow instance.
@ -1509,7 +1511,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return WorkflowExecutionStatus.FAILURE;
}
if (processInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) {
List<TaskInstance> pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList) || processInstance.isBlocked() || !isComplementEnd()
|| readyToSubmitTaskQueue.size() > 0) {
return WorkflowExecutionStatus.PAUSE;
} else {
return WorkflowExecutionStatus.SUCCESS;
@ -1534,7 +1538,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
if (readyToSubmitTaskQueue.size() > 0) {
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
iter.next().setState(TaskExecutionStatus.KILL);
iter.next().setState(TaskExecutionStatus.PAUSE);
}
}
return WorkflowExecutionStatus.BLOCK;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java

@ -85,7 +85,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
// todo: task cannot be pause
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setState(TaskExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
logger.info("blocking task has been paused");

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -95,7 +95,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -140,7 +140,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -104,7 +104,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean pauseTask() {
this.taskInstance.setState(TaskExecutionStatus.KILL);
this.taskInstance.setState(TaskExecutionStatus.PAUSE);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java

@ -239,7 +239,7 @@ public class BlockingTaskTest {
blockingTaskProcessor.action(TaskAction.SUBMIT);
blockingTaskProcessor.action(TaskAction.PAUSE);
TaskExecutionStatus status = taskInstance.getState();
Assert.assertEquals(TaskExecutionStatus.KILL, status);
Assert.assertEquals(TaskExecutionStatus.PAUSE, status);
}
@Test

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/WorkflowStateEventChangeCommand.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -27,6 +28,7 @@ import java.io.Serializable;
* db task final result response command
*/
@Data
@NoArgsConstructor
public class WorkflowStateEventChangeCommand implements Serializable {
private String key;

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1570,7 +1570,7 @@ public class ProcessServiceImpl implements ProcessService {
return null;
}
if (processInstanceState == WorkflowExecutionStatus.READY_PAUSE) {
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setState(TaskExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));
@ -1614,7 +1614,7 @@ public class ProcessServiceImpl implements ProcessService {
// return pasue /stop if process instance state is ready pause / stop
// or return submit success
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
state = TaskExecutionStatus.KILL;
state = TaskExecutionStatus.PAUSE;
} else if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP
|| !checkProcessStrategy(taskInstance, processInstance)) {
state = TaskExecutionStatus.KILL;

7
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java

@ -26,6 +26,7 @@ public enum TaskExecutionStatus {
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
PAUSE(3, "pause"),
FAILURE(6, "failure"),
SUCCESS(7, "success"),
NEED_FAULT_TOLERANCE(8, "need fault tolerance"),
@ -82,8 +83,12 @@ public enum TaskExecutionStatus {
return this == TaskExecutionStatus.FAILURE;
}
public boolean isPause() {
return this == TaskExecutionStatus.PAUSE;
}
public boolean isFinished() {
return isSuccess() || isKill() || isFailure();
return isSuccess() || isKill() || isFailure() || isPause();
}
public boolean isNeedFaultTolerance() {

Loading…
Cancel
Save