Browse Source

[Feature-5752][server]Add blocking task (#6272)

* fix python task execution error

* modify code style

* delete AbstractTask#setCommand

* migrate blocking task alert feature and common attribute

* blocking task migrate(complete base features)

* migrate blocking task test

* resolve TaskNode.java conflict

* fix package problem in BlockingParameters

* resolve duplicate code

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* resolve code style

* optimise code based on pr#6283 in ds master repo

* add log

* modify log

* resolve conflicts

* change hardcode to variable

* Add blocking task UI

* refactor old code to compatable with the newest branch

* add override annotation

* remove old code

* remove never used packages

* add missing packages declaration

* Refine the i18n description

* add UT

* resolve some code smell problems

* add fetch-depth

* enable unit test in sonar analysis

* disable unit test

* remove unused packages

* resolve duplicate codes

* add logs when task status changed and rename some variables

* add BLOCK & READY_BLOCK status and corresponding processing code

* revoke UI

* revert unit-test

Co-authored-by: Jave-Chen <baicai.chen@gmail.com>
3.0.0/version-upgrade
Martin Huang 3 years ago committed by GitHub
parent
commit
b0fc6e7a69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 35
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/BlockingOpportunity.java
  3. 17
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  4. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
  5. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  6. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  7. 61
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/blocking/BlockingParameters.java
  8. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
  9. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  10. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  11. 19
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  12. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  13. 125
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  14. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  15. 197
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
  16. 270
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
  17. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  18. 35
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
  19. 22
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -567,6 +567,8 @@ public final class Constants {
public static final String QUEUE_NAME = "queueName";
public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0;
public static final int LOG_QUERY_LIMIT = 4096;
public static final String BLOCKING_CONDITION = "blockingCondition";
public static final String ALERT_WHEN_BLOCKING = "alertWhenBlocking";
/**
* master/worker server use for zk

35
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/BlockingOpportunity.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
public enum BlockingOpportunity {
BLOCKING_ON_SUCCESS("BlockingOnSuccess"),
BLOCKING_ON_FAILED("BlockingOnFailed");
private final String desc;
BlockingOpportunity(String desc){
this.desc = desc;
}
public String getDesc() {
return desc;
}
}

17
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -43,6 +43,8 @@ public enum ExecutionStatus {
* 12 delay execution
* 13 forced success
* 14 serial wait
* 15 ready block
* 16 block
*/
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
@ -58,7 +60,9 @@ public enum ExecutionStatus {
WAITING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success"),
SERIAL_WAIT(14, "serial wait");
SERIAL_WAIT(14, "serial wait"),
READY_BLOCK(15, "ready block"),
BLOCK(16, "block");
ExecutionStatus(int code, String descp) {
this.code = code;
@ -102,7 +106,7 @@ public enum ExecutionStatus {
*/
public boolean typeIsFinished() {
return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause()
|| typeIsStop();
|| typeIsStop() || typeIsBlock();
}
/**
@ -141,6 +145,15 @@ public enum ExecutionStatus {
return this == RUNNING_EXECUTION || this == WAITING_DEPEND || this == DELAY_EXECUTION;
}
/**
* status is block
*
* @return status
*/
public boolean typeIsBlock() {
return this == BLOCK;
}
/**
* status is cancel
*

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java

@ -26,8 +26,8 @@ public enum StateEventType {
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"),
WAIT_TASK_GROUP(4, "wait task group"),
TASK_RETRY(5, "task retry")
;
TASK_RETRY(5, "task retry"),
PROCESS_BLOCKED(6, "process blocked");
StateEventType(int code, String descp) {
this.code = code;

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -42,6 +42,7 @@ public enum TaskType {
* 15 PIGEON
* 16 DATA_QUALITY
* 17 EMR
* 18 BLOCKING
*/
SHELL(0, "SHELL"),
SQL(1, "SQL"),
@ -61,6 +62,7 @@ public enum TaskType {
PIGEON(15, "PIGEON"),
DATA_QUALITY(16, "DATA_QUALITY"),
EMR(17, "EMR"),
BLOCKING(18, "BLOCKING");
;
TaskType(int code, String desc) {

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -398,6 +398,10 @@ public class TaskNode {
return preTaskNodeList;
}
public boolean isBlockingTask() {
return TaskType.BLOCKING.getDesc().equalsIgnoreCase(this.getType());
}
public void setPreTaskNodeList(List<PreviousTaskNode> preTaskNodeList) {
this.preTaskNodeList = preTaskNodeList;
}

61
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/blocking/BlockingParameters.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.blocking;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
public class BlockingParameters extends AbstractParameters {
// condition of blocking: BlockingOnFailed or BlockingOnSuccess
private String blockingOpportunity;
// if true, alert when blocking, otherwise do nothing
private boolean isAlertWhenBlocking;
@Override
public boolean checkParameters() {
return !StringUtils.isEmpty(blockingOpportunity);
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
public String getBlockingOpportunity() {
return blockingOpportunity;
}
public void setBlockingCondition(String blockingOpportunity) {
this.blockingOpportunity = blockingOpportunity;
}
public boolean isAlertWhenBlocking() {
return isAlertWhenBlocking;
}
public void setAlertWhenBlocking(boolean alertWhenBlocking) {
isAlertWhenBlocking = alertWhenBlocking;
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
@ -90,6 +91,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, DataQualityParameters.class);
case "SWITCH":
return JSONUtils.parseObject(parameter, SwitchParameters.class);
case "BLOCKING":
return JSONUtils.parseObject(parameter, BlockingParameters.class);
case "PIGEON":
return JSONUtils.parseObject(parameter, PigeonCommonParameters.class);
case "EMR":

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -249,6 +249,12 @@ public class ProcessInstance {
*/
private Date restartTime;
/**
* workflow block flag
*/
@TableField(exist = false)
private boolean isBlocked;
public ProcessInstance() {
}
@ -623,6 +629,14 @@ public class ProcessInstance {
this.processDefinitionVersion = processDefinitionVersion;
}
public boolean isBlocked() {
return isBlocked;
}
public void setBlocked(boolean blocked) {
isBlocked = blocked;
}
@Override
public String toString() {
return "ProcessInstance{"
@ -701,6 +715,8 @@ public class ProcessInstance {
+ ", restartTime='"
+ restartTime
+ '\''
+ ", isBlocked="
+ isBlocked
+ '}';
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -260,6 +260,7 @@ public class TaskInstance implements Serializable {
*/
private int delayTime;
/**
* task params
*/
@ -593,6 +594,10 @@ public class TaskInstance implements Serializable {
return TaskType.SWITCH.getDesc().equalsIgnoreCase(this.taskType);
}
public boolean isBlockingTask() {
return TaskType.BLOCKING.getDesc().equalsIgnoreCase(this.taskType);
}
/**
* determine if a task instance can retry
* if subProcess,

19
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -564,4 +564,23 @@ public class DagHelper {
}
return false;
}
/**
* is there have blocking node after the parent node
*/
public static boolean haveBlockingAfterNode(String parentNodeCode,
DAG<String,TaskNode,TaskNodeRelation> dag) {
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
if (CollectionUtils.isEmpty(subsequentNodes)) {
return false;
}
for (String nodeName : subsequentNodes) {
TaskNode taskNode = dag.getNode(nodeName);
List<String> preTaskList = JSONUtils.toList(taskNode.getPreTasks(),String.class);
if (preTaskList.contains(parentNodeCode) && taskNode.isBlockingTask()) {
return true;
}
}
return false;
}
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.hadoop.util.ThreadUtil;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;

125
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -37,16 +37,19 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -330,6 +333,9 @@ public class WorkflowExecuteThread {
case TASK_RETRY:
result = taskRetryEventHandler(stateEvent);
break;
case PROCESS_BLOCKED:
result = processBlockHandler(stateEvent);
break;
default:
break;
}
@ -430,9 +436,9 @@ public class WorkflowExecuteThread {
private void taskFinished(TaskInstance taskInstance) {
logger.info("work flow {} task id:{} code:{} state:{} ",
processInstance.getId(),
taskInstance.getId(),
taskInstance.getTaskCode(),
taskInstance.getState());
taskInstance.getId(),
taskInstance.getTaskCode(),
taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
@ -443,14 +449,17 @@ public class WorkflowExecuteThread {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
submitPostNode(Long.toString(taskInstance.getTaskCode()));
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (taskInstance.isConditionsTask()
|| DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
|| DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
|| DagHelper.haveBlockingAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
@ -467,6 +476,7 @@ public class WorkflowExecuteThread {
/**
* release task group
*
* @param taskInstance
*/
private void releaseTaskGroup(TaskInstance taskInstance) {
@ -482,7 +492,7 @@ public class WorkflowExecuteThread {
} else {
ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST);
}
}
}
@ -490,6 +500,7 @@ public class WorkflowExecuteThread {
/**
* crate new task instance to retry, different objects from the original
*
* @param taskInstance
*/
private void retryTaskInstance(TaskInstance taskInstance) {
@ -504,12 +515,12 @@ public class WorkflowExecuteThread {
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
if (!taskInstance.retryTaskIntervalOverTime()) {
logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}",
processInstance.getId(),
newTaskInstance.getTaskCode(),
newTaskInstance.getState(),
newTaskInstance.getRetryTimes(),
newTaskInstance.getMaxRetryTimes(),
newTaskInstance.getRetryInterval());
processInstance.getId(),
newTaskInstance.getTaskCode(),
newTaskInstance.getState(),
newTaskInstance.getRetryTimes(),
newTaskInstance.getMaxRetryTimes(),
newTaskInstance.getRetryInterval());
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
} else {
@ -521,6 +532,7 @@ public class WorkflowExecuteThread {
/**
* handle task retry event
*
* @param stateEvent
* @return
*/
@ -665,6 +677,26 @@ public class WorkflowExecuteThread {
return true;
}
private boolean processBlockHandler(StateEvent stateEvent) {
try {
TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
if (!checkTaskInstanceByStateEvent(stateEvent)) {
logger.error("task {} is not a blocking task", task.getTaskCode());
return false;
}
BlockingParameters parameters = (BlockingParameters) TaskParametersUtils.getParameters(TaskType.BLOCKING.getDesc(),
task.getTaskParams());
if (parameters.isAlertWhenBlocking()) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendProcessBlockingAlert(processInstance, projectUser);
logger.info("processInstance {} block alert send successful!", processInstance.getId());
}
} catch (Exception e) {
logger.error("sending blocking message error:", e);
}
return true;
}
private boolean processComplementData() throws Exception {
if (!needComplementProcess()) {
return false;
@ -961,12 +993,20 @@ public class WorkflowExecuteThread {
stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance);
if (taskProcessor.taskInstance().getState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(stateEvent);
if (processInstance.isBlocked()) {
StateEvent processBlockEvent = new StateEvent();
processBlockEvent.setProcessInstanceId(this.processInstance.getId());
processBlockEvent.setTaskInstanceId(taskInstance.getId());
processBlockEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
processBlockEvent.setType(StateEventType.PROCESS_BLOCKED);
this.stateEvents.add(processBlockEvent);
}
StateEvent taskStateChangeEvent = new StateEvent();
taskStateChangeEvent.setProcessInstanceId(this.processInstance.getId());
taskStateChangeEvent.setTaskInstanceId(taskInstance.getId());
taskStateChangeEvent.setExecutionStatus(taskProcessor.taskInstance().getState());
taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(taskStateChangeEvent);
}
return taskInstance;
} catch (Exception e) {
@ -1027,6 +1067,7 @@ public class WorkflowExecuteThread {
/**
* clone a new taskInstance for retry and reset some logic fields
*
* @return
*/
public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) {
@ -1035,7 +1076,7 @@ public class WorkflowExecuteThread {
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
newTaskInstance.setTaskDefine(taskInstance.getTaskDefine());
newTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
newTaskInstance.setProcessInstance(processInstance);
@ -1048,6 +1089,7 @@ public class WorkflowExecuteThread {
/**
* clone a new taskInstance for tolerant and reset some logic fields
*
* @return
*/
public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) {
@ -1056,7 +1098,7 @@ public class WorkflowExecuteThread {
logger.error("taskNode is null, code:{}", taskInstance.getTaskCode());
return null;
}
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode);
newTaskInstance.setTaskDefine(taskInstance.getTaskDefine());
newTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
newTaskInstance.setProcessInstance(processInstance);
@ -1067,6 +1109,7 @@ public class WorkflowExecuteThread {
/**
* new a taskInstance
*
* @param processInstance
* @param taskNode
* @return
@ -1286,8 +1329,8 @@ public class WorkflowExecuteThread {
if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) {
return DependResult.NON_EXEC;
}
// ignore task state if current task is condition
if (taskNode.isConditionsTask()) {
// ignore task state if current task is condition and block
if (taskNode.isConditionsTask() || taskNode.isBlockingTask()) {
continue;
}
if (!dependTaskSuccess(depsNode, taskCode)) {
@ -1366,6 +1409,7 @@ public class WorkflowExecuteThread {
if (state == ExecutionStatus.READY_STOP
|| state == ExecutionStatus.READY_PAUSE
|| state == ExecutionStatus.WAITING_THREAD
|| state == ExecutionStatus.READY_BLOCK
|| state == ExecutionStatus.DELAY_EXECUTION) {
// if the running task is not completed, the state remains unchanged
return state;
@ -1402,8 +1446,8 @@ public class WorkflowExecuteThread {
}
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
return readyToSubmitTaskQueue.size() == 0
&& activeTaskProcessorMaps.size() == 0
&& waitToRetryTaskInstanceMap.size() == 0;
&& activeTaskProcessorMaps.size() == 0
&& waitToRetryTaskInstanceMap.size() == 0;
}
}
return false;
@ -1434,6 +1478,7 @@ public class WorkflowExecuteThread {
List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList)
|| processInstance.isBlocked()
|| !isComplementEnd()
|| readyToSubmitTaskQueue.size() > 0) {
return ExecutionStatus.PAUSE;
@ -1442,6 +1487,30 @@ public class WorkflowExecuteThread {
}
}
/**
* prepare for block
* if process has tasks still running, pause them
* if readyToSubmitTaskQueue is not empty, kill them
* else return block status directly
*
* @return ExecutionStatus
*/
private ExecutionStatus processReadyBlock() {
if (activeTaskProcessorMaps.size() > 0) {
for (ITaskProcessor taskProcessor : activeTaskProcessorMaps.values()) {
if (!TaskType.BLOCKING.getDesc().equals(taskProcessor.getType())) {
taskProcessor.action(TaskAction.PAUSE);
}
}
}
if (readyToSubmitTaskQueue.size() > 0) {
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) {
iter.next().setState(ExecutionStatus.KILL);
}
}
return ExecutionStatus.BLOCK;
}
/**
* generate the latest process instance status by the tasks state
*
@ -1455,6 +1524,11 @@ public class WorkflowExecuteThread {
return runningState(state);
}
// block
if (state == ExecutionStatus.READY_BLOCK) {
return processReadyBlock();
}
// waiting thread
if (hasWaitingThreadTask()) {
return ExecutionStatus.WAITING_THREAD;
@ -1598,7 +1672,7 @@ public class WorkflowExecuteThread {
return;
}
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode());
readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
@ -1845,6 +1919,7 @@ public class WorkflowExecuteThread {
/**
* check if had not fail task by taskCode and version
*
* @param taskCode
* @param version
* @return

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -238,6 +238,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return null;
}
@Override
public TaskInstance taskInstance() {
return this.taskInstance;
}

197
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.BlockingOpportunity;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.google.auto.service.AutoService;
/**
* blocking task processor
*/
@AutoService(ITaskProcessor.class)
public class BlockingTaskProcessor extends BaseTaskProcessor {
/**
* dependent parameters
*/
private DependentParameters dependentParameters;
/**
* condition result
*/
private DependResult conditionResult = DependResult.WAITING;
/**
* blocking parameters
*/
private BlockingParameters blockingParam;
/**
* complete task map
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
private void initTaskParameters() {
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
this.taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);
this.dependentParameters = taskInstance.getDependency();
this.blockingParam = (BlockingParameters) TaskParametersUtils.getParameters(
TaskType.BLOCKING.getDesc(), taskInstance.getTaskParams());
}
@Override
protected boolean pauseTask() {
taskInstance.setState(ExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
logger.info("blocking task has been paused");
return true;
}
@Override
protected boolean killTask() {
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
logger.info("blocking task has been killed");
return true;
}
@Override
protected boolean taskTimeout() {
return true;
}
@Override
protected boolean submitTask() {
this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
this.setTaskExecutionLogger();
initTaskParameters();
logger.info("blocking task start");
return true;
}
@Override
protected boolean runTask() {
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
return true;
}
@Override
protected boolean dispatchTask() {
return false;
}
@Override
public String getType() {
return TaskType.BLOCKING.getDesc();
}
/**
* depend result for depend item
*/
private DependResult getDependResultForItem(DependentItem item) {
DependResult dependResult = DependResult.SUCCESS;
if (!completeTaskList.containsKey(item.getDepTaskCode())) {
logger.info("depend item: {} have not completed yet.", item.getDepTaskCode());
dependResult = DependResult.FAILED;
return dependResult;
}
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTaskCode());
if (executionStatus != item.getStatus()) {
logger.info("depend item : {} expect status: {}, actual status: {}", item.getDepTaskCode(), item.getStatus(), executionStatus);
dependResult = DependResult.FAILED;
}
logger.info("dependent item complete {} {},{}",
Constants.DEPENDENT_SPLIT, item.getDepTaskCode(), dependResult);
return dependResult;
}
private void setConditionResult() {
List<TaskInstance> taskInstances = processService
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
}
List<DependResult> tempResultList = new ArrayList<>();
for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) {
List<DependResult> itemDependResult = new ArrayList<>();
for (DependentItem item : dependentTaskModel.getDependItemList()) {
itemDependResult.add(getDependResultForItem(item));
}
DependResult tempResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
tempResultList.add(tempResult);
}
conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), tempResultList);
logger.info("the blocking task depend result : {}", conditionResult);
}
private void endTask() {
ExecutionStatus status = ExecutionStatus.SUCCESS;
DependResult expected = this.blockingParam.getBlockingOpportunity()
.equals(BlockingOpportunity.BLOCKING_ON_SUCCESS.getDesc())
? DependResult.SUCCESS : DependResult.FAILED;
boolean isBlocked = (expected == this.conditionResult);
logger.info("blocking opportunity: expected-->{}, actual-->{}", expected, this.conditionResult);
processInstance.setBlocked(isBlocked);
if (isBlocked) {
processInstance.setState(ExecutionStatus.READY_BLOCK);
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
logger.info("blocking task execute complete, blocking:{}", isBlocked);
}
}

270
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.task.BlockingTaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ApplicationContext;
@RunWith(MockitoJUnitRunner.Silent.class)
public class BlockingTaskTest {
/**
* TaskNode.runFlag : task can be run normally
*/
public static final String FLOW_NODE_RUN_FLAG_NORMAL = "NORMAL";
private ProcessService processService;
private ProcessInstance processInstance;
private MasterConfig config;
@Before
public void before() {
// init spring context
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
// mock master
config = new MasterConfig();
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
config.setTaskCommitRetryTimes(3);
config.setTaskCommitInterval(1000);
// mock process service
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
// mock process instance
processInstance = getProcessInstance();
Mockito.when(processService
.findProcessInstanceById(processInstance.getId()))
.thenReturn(processInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
private ProcessInstance getProcessInstance() {
// mock process instance
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1000);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setProcessDefinitionCode(1L);
return processInstance;
}
private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) {
// wrap taskNode
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(100);
taskInstance.setName(taskNode.getName());
taskInstance.setTaskType(taskNode.getType().toUpperCase());
taskInstance.setTaskCode(taskNode.getCode());
taskInstance.setTaskDefinitionVersion(taskNode.getVersion());
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setTaskParams(taskNode.getTaskParams());
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setFirstSubmitTime(new Date());
Mockito.when(processService
.submitTaskWithRetry(Mockito.any(ProcessInstance.class)
, Mockito.any(TaskInstance.class)
, Mockito.any(Integer.class), Mockito.any(Integer.class)))
.thenReturn(taskInstance);
return taskInstance;
}
private TaskNode getTaskNode(String blockingCondition) {
// mock task nodes
// 1----\
// 2-----4(Blocking Node)
// 3----/
// blocking logic: 1-->SUCCESS 2-->SUCCESS 3-->SUCCESS
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-1000");
taskNode.setName("4");
taskNode.setCode(1L);
taskNode.setVersion(1);
taskNode.setType(TaskType.BLOCKING.getDesc());
taskNode.setRunFlag(FLOW_NODE_RUN_FLAG_NORMAL);
DependentItem dependentItemA = new DependentItem();
dependentItemA.setDepTaskCode(1L);
dependentItemA.setStatus(ExecutionStatus.SUCCESS);
DependentItem dependentItemB = new DependentItem();
dependentItemB.setDepTaskCode(2L);
dependentItemB.setStatus(ExecutionStatus.SUCCESS);
DependentItem dependentItemC = new DependentItem();
dependentItemC.setDepTaskCode(3L);
dependentItemC.setStatus(ExecutionStatus.SUCCESS);
// build relation
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setDependItemList(Stream.of(dependentItemA, dependentItemB, dependentItemC)
.collect(Collectors.toList()));
dependentTaskModel.setRelation(DependentRelation.AND);
DependentParameters dependentParameters = new DependentParameters();
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
dependentParameters.setRelation(DependentRelation.AND);
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
// set blocking node params
BlockingParameters blockingParameters = new BlockingParameters();
blockingParameters.setAlertWhenBlocking(false);
blockingParameters.setBlockingCondition(blockingCondition);
taskNode.setParams(JSONUtils.toJsonString(blockingParameters));
return taskNode;
}
private TaskInstance testBasicInit(String blockingCondition, ExecutionStatus... expectResults) {
TaskInstance taskInstance = getTaskInstance(getTaskNode(blockingCondition), processInstance);
Mockito.when(processService
.submitTask(processInstance, taskInstance))
.thenReturn(taskInstance);
Mockito.when(processService
.findTaskInstanceById(taskInstance.getId()))
.thenReturn(taskInstance);
// for BlockingTaskExecThread.initTaskParameters
Mockito.when(processService
.saveTaskInstance(taskInstance))
.thenReturn(true);
// for BlockingTaskExecThread.updateTaskState
Mockito.when(processService
.updateTaskInstance(taskInstance))
.thenReturn(true);
// for BlockingTaskExecThread.waitTaskQuit
List<TaskInstance> conditions = getTaskInstanceForValidTaskList(expectResults);
Mockito.when(processService.
findValidTaskListByProcessId(processInstance.getId()))
.thenReturn(conditions);
return taskInstance;
}
/**
* mock task instance and its execution result in front of blocking node
*/
private List<TaskInstance> getTaskInstanceForValidTaskList(ExecutionStatus... status) {
List<TaskInstance> taskInstanceList = new ArrayList<>();
for (int i = 1; i <= status.length; i++) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(i);
taskInstance.setName(String.valueOf(i));
taskInstance.setState(status[i - 1]);
taskInstanceList.add(taskInstance);
}
return taskInstanceList;
}
@Test
public void testBlockingTaskSubmit() {
TaskInstance taskInstance = testBasicInit("BlockingOnFailed",
ExecutionStatus.SUCCESS, ExecutionStatus.FAILURE, ExecutionStatus.SUCCESS);
BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor();
blockingTaskProcessor.init(taskInstance, processInstance);
boolean res = blockingTaskProcessor.action(TaskAction.SUBMIT);
Assert.assertEquals(true, res);
}
@Test
public void testPauseTask() {
TaskInstance taskInstance = testBasicInit("BlockingOnFailed",
ExecutionStatus.SUCCESS, ExecutionStatus.FAILURE, ExecutionStatus.SUCCESS);
BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor();
blockingTaskProcessor.init(taskInstance, processInstance);
blockingTaskProcessor.action(TaskAction.SUBMIT);
blockingTaskProcessor.action(TaskAction.PAUSE);
ExecutionStatus status = taskInstance.getState();
Assert.assertEquals(ExecutionStatus.PAUSE, status);
}
@Test
public void testBlocking() {
TaskInstance taskInstance = testBasicInit("BlockingOnFailed",
ExecutionStatus.SUCCESS, ExecutionStatus.FAILURE, ExecutionStatus.SUCCESS);
BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor();
blockingTaskProcessor.init(taskInstance, processInstance);
blockingTaskProcessor.action(TaskAction.SUBMIT);
blockingTaskProcessor.action(TaskAction.RUN);
ExecutionStatus status = processInstance.getState();
Assert.assertEquals(ExecutionStatus.READY_BLOCK, status);
}
@Test
public void testNoneBlocking() {
TaskInstance taskInstance = testBasicInit("BlockingOnSuccess",
ExecutionStatus.SUCCESS, ExecutionStatus.SUCCESS, ExecutionStatus.SUCCESS);
BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor();
blockingTaskProcessor.init(taskInstance, processInstance);
blockingTaskProcessor.action(TaskAction.SUBMIT);
blockingTaskProcessor.action(TaskAction.RUN);
ExecutionStatus status = processInstance.getState();
Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, status);
}
}

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -272,4 +272,4 @@ public class WorkflowExecuteThreadTest {
return schedulerList;
}
}
}

35
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

@ -360,4 +360,39 @@ public class ProcessAlertManager {
public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, TaskDefinition taskDefinition) {
alertDao.sendTaskTimeoutAlert(processInstance, taskInstance, taskDefinition);
}
/**
*
* check node type and process blocking flag, then insert a block record into db
*
* @param processInstance process instance
* @param projectUser the project owner
*/
public void sendProcessBlockingAlert(ProcessInstance processInstance,
ProjectUser projectUser) {
Alert alert = new Alert();
String cmdName = getCommandCnName(processInstance.getCommandType());
List<ProcessAlertContent> blockingNodeList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.projectId(projectUser.getProjectId())
.projectName(projectUser.getProjectName())
.owner(projectUser.getUserName())
.processId(processInstance.getId())
.processName(processInstance.getName())
.processType(processInstance.getCommandType())
.processState(processInstance.getState())
.runTimes(processInstance.getRunTimes())
.processStartTime(processInstance.getStartTime())
.processEndTime(processInstance.getEndTime())
.processHost(processInstance.getHost())
.build();
blockingNodeList.add(processAlertContent);
String content = JSONUtils.toJsonString(blockingNodeList);
alert.setTitle(cmdName + " Blocked");
alert.setContent(content);
alert.setAlertGroupId(processInstance.getWarningGroupId());
alert.setCreateTime(new Date());
alertDao.addAlert(alert);
logger.info("add alert to db, alert: {}",alert);
}
}

22
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.junit.Test;
@ -90,4 +91,25 @@ public class ProcessAlertManagerTest {
processAlertManager.sendAlertProcessInstance(processInstance, taskInstanceList, projectUser);
}
/**
* send blocking alert
*/
@Test
public void sendBlockingAlertTest() {
// process instance
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test-process-01");
processInstance.setCommandType(CommandType.START_PROCESS);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRunTimes(0);
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstance.setHost("127.0.0.1");
processInstance.setWarningGroupId(1);
ProjectUser projectUser = new ProjectUser();
processAlertManager.sendProcessBlockingAlert(processInstance,projectUser);
}
}

Loading…
Cancel
Save