diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index d2b5952149..aa13eec513 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -567,6 +567,8 @@ public final class Constants { public static final String QUEUE_NAME = "queueName"; public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0; public static final int LOG_QUERY_LIMIT = 4096; + public static final String BLOCKING_CONDITION = "blockingCondition"; + public static final String ALERT_WHEN_BLOCKING = "alertWhenBlocking"; /** * master/worker server use for zk diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/BlockingOpportunity.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/BlockingOpportunity.java new file mode 100644 index 0000000000..c2d103e5d8 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/BlockingOpportunity.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +public enum BlockingOpportunity { + + BLOCKING_ON_SUCCESS("BlockingOnSuccess"), + BLOCKING_ON_FAILED("BlockingOnFailed"); + + private final String desc; + + BlockingOpportunity(String desc){ + this.desc = desc; + } + + public String getDesc() { + return desc; + } + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index 009eeb23be..7f0a9714cf 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -43,6 +43,8 @@ public enum ExecutionStatus { * 12 delay execution * 13 forced success * 14 serial wait + * 15 ready block + * 16 block */ SUBMITTED_SUCCESS(0, "submit success"), RUNNING_EXECUTION(1, "running"), @@ -58,7 +60,9 @@ public enum ExecutionStatus { WAITING_DEPEND(11, "waiting depend node complete"), DELAY_EXECUTION(12, "delay execution"), FORCED_SUCCESS(13, "forced success"), - SERIAL_WAIT(14, "serial wait"); + SERIAL_WAIT(14, "serial wait"), + READY_BLOCK(15, "ready block"), + BLOCK(16, "block"); ExecutionStatus(int code, String descp) { this.code = code; @@ -102,7 +106,7 @@ public enum ExecutionStatus { */ public boolean typeIsFinished() { return typeIsSuccess() || typeIsFailure() || typeIsCancel() || typeIsPause() - || typeIsStop(); + || typeIsStop() || typeIsBlock(); } /** @@ -141,6 +145,15 @@ public enum ExecutionStatus { return this == RUNNING_EXECUTION || this == WAITING_DEPEND || this == DELAY_EXECUTION; } + /** + * status is block + * + * @return status + */ + public boolean typeIsBlock() { + return this == BLOCK; + } + /** * status is cancel * diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java index c758bc7b4e..292202501c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java @@ -26,8 +26,8 @@ public enum StateEventType { PROCESS_TIMEOUT(2, "process timeout"), TASK_TIMEOUT(3, "task timeout"), WAIT_TASK_GROUP(4, "wait task group"), - TASK_RETRY(5, "task retry") - ; + TASK_RETRY(5, "task retry"), + PROCESS_BLOCKED(6, "process blocked"); StateEventType(int code, String descp) { this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index 393c607104..1462d1c489 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -42,6 +42,7 @@ public enum TaskType { * 15 PIGEON * 16 DATA_QUALITY * 17 EMR + * 18 BLOCKING */ SHELL(0, "SHELL"), SQL(1, "SQL"), @@ -61,6 +62,7 @@ public enum TaskType { PIGEON(15, "PIGEON"), DATA_QUALITY(16, "DATA_QUALITY"), EMR(17, "EMR"), + BLOCKING(18, "BLOCKING"); ; TaskType(int code, String desc) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index dfcc11614c..30ee7fb64b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -398,6 +398,10 @@ public class TaskNode { return preTaskNodeList; } + public boolean isBlockingTask() { + return TaskType.BLOCKING.getDesc().equalsIgnoreCase(this.getType()); + } + public void setPreTaskNodeList(List preTaskNodeList) { this.preTaskNodeList = preTaskNodeList; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/blocking/BlockingParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/blocking/BlockingParameters.java new file mode 100644 index 0000000000..ae89f37727 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/blocking/BlockingParameters.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task.blocking; + +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +public class BlockingParameters extends AbstractParameters { + // condition of blocking: BlockingOnFailed or BlockingOnSuccess + + private String blockingOpportunity; + + // if true, alert when blocking, otherwise do nothing + + private boolean isAlertWhenBlocking; + + @Override + public boolean checkParameters() { + return !StringUtils.isEmpty(blockingOpportunity); + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + public String getBlockingOpportunity() { + return blockingOpportunity; + } + + public void setBlockingCondition(String blockingOpportunity) { + this.blockingOpportunity = blockingOpportunity; + } + + public boolean isAlertWhenBlocking() { + return isAlertWhenBlocking; + } + + public void setAlertWhenBlocking(boolean alertWhenBlocking) { + isAlertWhenBlocking = alertWhenBlocking; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index d5407b2332..e47654411b 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; @@ -90,6 +91,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, DataQualityParameters.class); case "SWITCH": return JSONUtils.parseObject(parameter, SwitchParameters.class); + case "BLOCKING": + return JSONUtils.parseObject(parameter, BlockingParameters.class); case "PIGEON": return JSONUtils.parseObject(parameter, PigeonCommonParameters.class); case "EMR": diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index c60dca9fd9..41fde2456e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -249,6 +249,12 @@ public class ProcessInstance { */ private Date restartTime; + /** + * workflow block flag + */ + @TableField(exist = false) + private boolean isBlocked; + public ProcessInstance() { } @@ -623,6 +629,14 @@ public class ProcessInstance { this.processDefinitionVersion = processDefinitionVersion; } + public boolean isBlocked() { + return isBlocked; + } + + public void setBlocked(boolean blocked) { + isBlocked = blocked; + } + @Override public String toString() { return "ProcessInstance{" @@ -701,6 +715,8 @@ public class ProcessInstance { + ", restartTime='" + restartTime + '\'' + + ", isBlocked=" + + isBlocked + '}'; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 8b18195574..d48ad66d19 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -260,6 +260,7 @@ public class TaskInstance implements Serializable { */ private int delayTime; + /** * task params */ @@ -593,6 +594,10 @@ public class TaskInstance implements Serializable { return TaskType.SWITCH.getDesc().equalsIgnoreCase(this.taskType); } + public boolean isBlockingTask() { + return TaskType.BLOCKING.getDesc().equalsIgnoreCase(this.taskType); + } + /** * determine if a task instance can retry * if subProcess, diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java index e7404fe141..4c190413c4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java @@ -564,4 +564,23 @@ public class DagHelper { } return false; } + + /** + * is there have blocking node after the parent node + */ + public static boolean haveBlockingAfterNode(String parentNodeCode, + DAG dag) { + Set subsequentNodes = dag.getSubsequentNodes(parentNodeCode); + if (CollectionUtils.isEmpty(subsequentNodes)) { + return false; + } + for (String nodeName : subsequentNodes) { + TaskNode taskNode = dag.getNode(nodeName); + List preTaskList = JSONUtils.toList(taskNode.getPreTasks(),String.class); + if (preTaskList.contains(parentNodeCode) && taskNode.isBlockingTask()) { + return true; + } + } + return false; + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index f19dfbe447..6411198108 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.runner; -import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.StateEvent; @@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import org.apache.hadoop.util.ThreadUtil; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index bc0dd09815..418e5dddfe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -37,16 +37,19 @@ import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -330,6 +333,9 @@ public class WorkflowExecuteThread { case TASK_RETRY: result = taskRetryEventHandler(stateEvent); break; + case PROCESS_BLOCKED: + result = processBlockHandler(stateEvent); + break; default: break; } @@ -430,9 +436,9 @@ public class WorkflowExecuteThread { private void taskFinished(TaskInstance taskInstance) { logger.info("work flow {} task id:{} code:{} state:{} ", processInstance.getId(), - taskInstance.getId(), - taskInstance.getTaskCode(), - taskInstance.getState()); + taskInstance.getId(), + taskInstance.getTaskCode(), + taskInstance.getState()); activeTaskProcessorMaps.remove(taskInstance.getTaskCode()); stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); @@ -443,14 +449,17 @@ public class WorkflowExecuteThread { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); processInstance.setVarPool(taskInstance.getVarPool()); processService.saveProcessInstance(processInstance); - submitPostNode(Long.toString(taskInstance.getTaskCode())); + if (!processInstance.isBlocked()) { + submitPostNode(Long.toString(taskInstance.getTaskCode())); + } } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) { // retry task retryTaskInstance(taskInstance); } else if (taskInstance.getState().typeIsFailure()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); if (taskInstance.isConditionsTask() - || DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) { + || DagHelper.haveConditionsAfterNode(Long.toString(taskInstance.getTaskCode()), dag) + || DagHelper.haveBlockingAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) { submitPostNode(Long.toString(taskInstance.getTaskCode())); } else { errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); @@ -467,6 +476,7 @@ public class WorkflowExecuteThread { /** * release task group + * * @param taskInstance */ private void releaseTaskGroup(TaskInstance taskInstance) { @@ -482,7 +492,7 @@ public class WorkflowExecuteThread { } else { ProcessInstance processInstance = this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); this.processService.sendStartTask2Master(processInstance, nextTaskInstance.getId(), - org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); + org.apache.dolphinscheduler.remote.command.CommandType.TASK_WAKEUP_EVENT_REQUEST); } } } @@ -490,6 +500,7 @@ public class WorkflowExecuteThread { /** * crate new task instance to retry, different objects from the original + * * @param taskInstance */ private void retryTaskInstance(TaskInstance taskInstance) { @@ -504,12 +515,12 @@ public class WorkflowExecuteThread { waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); if (!taskInstance.retryTaskIntervalOverTime()) { logger.info("failure task will be submitted: process id: {}, task instance code: {} state:{} retry times:{} / {}, interval:{}", - processInstance.getId(), - newTaskInstance.getTaskCode(), - newTaskInstance.getState(), - newTaskInstance.getRetryTimes(), - newTaskInstance.getMaxRetryTimes(), - newTaskInstance.getRetryInterval()); + processInstance.getId(), + newTaskInstance.getTaskCode(), + newTaskInstance.getState(), + newTaskInstance.getRetryTimes(), + newTaskInstance.getMaxRetryTimes(), + newTaskInstance.getRetryInterval()); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance); stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance); } else { @@ -521,6 +532,7 @@ public class WorkflowExecuteThread { /** * handle task retry event + * * @param stateEvent * @return */ @@ -665,6 +677,26 @@ public class WorkflowExecuteThread { return true; } + private boolean processBlockHandler(StateEvent stateEvent) { + try { + TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId()); + if (!checkTaskInstanceByStateEvent(stateEvent)) { + logger.error("task {} is not a blocking task", task.getTaskCode()); + return false; + } + BlockingParameters parameters = (BlockingParameters) TaskParametersUtils.getParameters(TaskType.BLOCKING.getDesc(), + task.getTaskParams()); + if (parameters.isAlertWhenBlocking()) { + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); + processAlertManager.sendProcessBlockingAlert(processInstance, projectUser); + logger.info("processInstance {} block alert send successful!", processInstance.getId()); + } + } catch (Exception e) { + logger.error("sending blocking message error:", e); + } + return true; + } + private boolean processComplementData() throws Exception { if (!needComplementProcess()) { return false; @@ -961,12 +993,20 @@ public class WorkflowExecuteThread { stateWheelExecuteThread.addTask4StateCheck(processInstance, taskInstance); if (taskProcessor.taskInstance().getState().typeIsFinished()) { - StateEvent stateEvent = new StateEvent(); - stateEvent.setProcessInstanceId(this.processInstance.getId()); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setExecutionStatus(taskProcessor.taskInstance().getState()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - this.stateEvents.add(stateEvent); + if (processInstance.isBlocked()) { + StateEvent processBlockEvent = new StateEvent(); + processBlockEvent.setProcessInstanceId(this.processInstance.getId()); + processBlockEvent.setTaskInstanceId(taskInstance.getId()); + processBlockEvent.setExecutionStatus(taskProcessor.taskInstance().getState()); + processBlockEvent.setType(StateEventType.PROCESS_BLOCKED); + this.stateEvents.add(processBlockEvent); + } + StateEvent taskStateChangeEvent = new StateEvent(); + taskStateChangeEvent.setProcessInstanceId(this.processInstance.getId()); + taskStateChangeEvent.setTaskInstanceId(taskInstance.getId()); + taskStateChangeEvent.setExecutionStatus(taskProcessor.taskInstance().getState()); + taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE); + this.stateEvents.add(taskStateChangeEvent); } return taskInstance; } catch (Exception e) { @@ -1027,6 +1067,7 @@ public class WorkflowExecuteThread { /** * clone a new taskInstance for retry and reset some logic fields + * * @return */ public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { @@ -1035,7 +1076,7 @@ public class WorkflowExecuteThread { logger.error("taskNode is null, code:{}", taskInstance.getTaskCode()); return null; } - TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); + TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); newTaskInstance.setTaskDefine(taskInstance.getTaskDefine()); newTaskInstance.setProcessDefine(taskInstance.getProcessDefine()); newTaskInstance.setProcessInstance(processInstance); @@ -1048,6 +1089,7 @@ public class WorkflowExecuteThread { /** * clone a new taskInstance for tolerant and reset some logic fields + * * @return */ public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { @@ -1056,7 +1098,7 @@ public class WorkflowExecuteThread { logger.error("taskNode is null, code:{}", taskInstance.getTaskCode()); return null; } - TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); + TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); newTaskInstance.setTaskDefine(taskInstance.getTaskDefine()); newTaskInstance.setProcessDefine(taskInstance.getProcessDefine()); newTaskInstance.setProcessInstance(processInstance); @@ -1067,6 +1109,7 @@ public class WorkflowExecuteThread { /** * new a taskInstance + * * @param processInstance * @param taskNode * @return @@ -1286,8 +1329,8 @@ public class WorkflowExecuteThread { if (depTaskState.typeIsPause() || depTaskState.typeIsCancel()) { return DependResult.NON_EXEC; } - // ignore task state if current task is condition - if (taskNode.isConditionsTask()) { + // ignore task state if current task is condition and block + if (taskNode.isConditionsTask() || taskNode.isBlockingTask()) { continue; } if (!dependTaskSuccess(depsNode, taskCode)) { @@ -1366,6 +1409,7 @@ public class WorkflowExecuteThread { if (state == ExecutionStatus.READY_STOP || state == ExecutionStatus.READY_PAUSE || state == ExecutionStatus.WAITING_THREAD + || state == ExecutionStatus.READY_BLOCK || state == ExecutionStatus.DELAY_EXECUTION) { // if the running task is not completed, the state remains unchanged return state; @@ -1402,8 +1446,8 @@ public class WorkflowExecuteThread { } if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { return readyToSubmitTaskQueue.size() == 0 - && activeTaskProcessorMaps.size() == 0 - && waitToRetryTaskInstanceMap.size() == 0; + && activeTaskProcessorMaps.size() == 0 + && waitToRetryTaskInstanceMap.size() == 0; } } return false; @@ -1434,6 +1478,7 @@ public class WorkflowExecuteThread { List pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); if (CollectionUtils.isNotEmpty(pauseList) + || processInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) { return ExecutionStatus.PAUSE; @@ -1442,6 +1487,30 @@ public class WorkflowExecuteThread { } } + /** + * prepare for block + * if process has tasks still running, pause them + * if readyToSubmitTaskQueue is not empty, kill them + * else return block status directly + * + * @return ExecutionStatus + */ + private ExecutionStatus processReadyBlock() { + if (activeTaskProcessorMaps.size() > 0) { + for (ITaskProcessor taskProcessor : activeTaskProcessorMaps.values()) { + if (!TaskType.BLOCKING.getDesc().equals(taskProcessor.getType())) { + taskProcessor.action(TaskAction.PAUSE); + } + } + } + if (readyToSubmitTaskQueue.size() > 0) { + for (Iterator iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) { + iter.next().setState(ExecutionStatus.KILL); + } + } + return ExecutionStatus.BLOCK; + } + /** * generate the latest process instance status by the tasks state * @@ -1455,6 +1524,11 @@ public class WorkflowExecuteThread { return runningState(state); } + // block + if (state == ExecutionStatus.READY_BLOCK) { + return processReadyBlock(); + } + // waiting thread if (hasWaitingThreadTask()) { return ExecutionStatus.WAITING_THREAD; @@ -1598,7 +1672,7 @@ public class WorkflowExecuteThread { return; } logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", - taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); + taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); readyToSubmitTaskQueue.put(taskInstance); } catch (Exception e) { logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e); @@ -1845,6 +1919,7 @@ public class WorkflowExecuteThread { /** * check if had not fail task by taskCode and version + * * @param taskCode * @param version * @return diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 60ab1b8878..174d82f7dd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -238,6 +238,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return null; } + @Override public TaskInstance taskInstance() { return this.taskInstance; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java new file mode 100644 index 0000000000..ee52090d9b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.BlockingOpportunity; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.utils.DependentUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.utils.LogUtils; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.auto.service.AutoService; + +/** + * blocking task processor + */ +@AutoService(ITaskProcessor.class) +public class BlockingTaskProcessor extends BaseTaskProcessor { + + /** + * dependent parameters + */ + private DependentParameters dependentParameters; + + /** + * condition result + */ + private DependResult conditionResult = DependResult.WAITING; + + /** + * blocking parameters + */ + private BlockingParameters blockingParam; + + /** + * complete task map + */ + private Map completeTaskList = new ConcurrentHashMap<>(); + + private void initTaskParameters() { + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + this.taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); + this.taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + this.taskInstance.setStartTime(new Date()); + this.processService.saveTaskInstance(taskInstance); + this.dependentParameters = taskInstance.getDependency(); + this.blockingParam = (BlockingParameters) TaskParametersUtils.getParameters( + TaskType.BLOCKING.getDesc(), taskInstance.getTaskParams()); + } + + @Override + protected boolean pauseTask() { + taskInstance.setState(ExecutionStatus.PAUSE); + taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + logger.info("blocking task has been paused"); + return true; + } + + @Override + protected boolean killTask() { + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + logger.info("blocking task has been killed"); + return true; + } + + @Override + protected boolean taskTimeout() { + return true; + } + + @Override + protected boolean submitTask() { + this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); + if (this.taskInstance == null) { + return false; + } + this.setTaskExecutionLogger(); + initTaskParameters(); + logger.info("blocking task start"); + return true; + } + + @Override + protected boolean runTask() { + if (conditionResult.equals(DependResult.WAITING)) { + setConditionResult(); + endTask(); + } else { + endTask(); + } + return true; + } + + @Override + protected boolean dispatchTask() { + return false; + } + + @Override + public String getType() { + return TaskType.BLOCKING.getDesc(); + } + + /** + * depend result for depend item + */ + private DependResult getDependResultForItem(DependentItem item) { + + DependResult dependResult = DependResult.SUCCESS; + if (!completeTaskList.containsKey(item.getDepTaskCode())) { + logger.info("depend item: {} have not completed yet.", item.getDepTaskCode()); + dependResult = DependResult.FAILED; + return dependResult; + } + ExecutionStatus executionStatus = completeTaskList.get(item.getDepTaskCode()); + if (executionStatus != item.getStatus()) { + logger.info("depend item : {} expect status: {}, actual status: {}", item.getDepTaskCode(), item.getStatus(), executionStatus); + dependResult = DependResult.FAILED; + } + logger.info("dependent item complete {} {},{}", + Constants.DEPENDENT_SPLIT, item.getDepTaskCode(), dependResult); + return dependResult; + } + + private void setConditionResult() { + + List taskInstances = processService + .findValidTaskListByProcessId(taskInstance.getProcessInstanceId()); + for (TaskInstance task : taskInstances) { + completeTaskList.putIfAbsent(task.getTaskCode(), task.getState()); + } + + List tempResultList = new ArrayList<>(); + for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) { + List itemDependResult = new ArrayList<>(); + for (DependentItem item : dependentTaskModel.getDependItemList()) { + itemDependResult.add(getDependResultForItem(item)); + } + DependResult tempResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult); + tempResultList.add(tempResult); + } + conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), tempResultList); + logger.info("the blocking task depend result : {}", conditionResult); + } + + private void endTask() { + ExecutionStatus status = ExecutionStatus.SUCCESS; + DependResult expected = this.blockingParam.getBlockingOpportunity() + .equals(BlockingOpportunity.BLOCKING_ON_SUCCESS.getDesc()) + ? DependResult.SUCCESS : DependResult.FAILED; + boolean isBlocked = (expected == this.conditionResult); + logger.info("blocking opportunity: expected-->{}, actual-->{}", expected, this.conditionResult); + processInstance.setBlocked(isBlocked); + if (isBlocked) { + processInstance.setState(ExecutionStatus.READY_BLOCK); + } + taskInstance.setState(status); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + logger.info("blocking task execute complete, blocking:{}", isBlocked); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java new file mode 100644 index 0000000000..3ca640f494 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master; + +import org.apache.dolphinscheduler.common.enums.DependentRelation; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.blocking.BlockingParameters; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.task.BlockingTaskProcessor; +import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.context.ApplicationContext; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class BlockingTaskTest { + + /** + * TaskNode.runFlag : task can be run normally + */ + public static final String FLOW_NODE_RUN_FLAG_NORMAL = "NORMAL"; + + private ProcessService processService; + + private ProcessInstance processInstance; + + private MasterConfig config; + + + @Before + public void before() { + // init spring context + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + + // mock master + config = new MasterConfig(); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + config.setTaskCommitRetryTimes(3); + config.setTaskCommitInterval(1000); + + // mock process service + processService = Mockito.mock(ProcessService.class); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + // mock process instance + processInstance = getProcessInstance(); + Mockito.when(processService + .findProcessInstanceById(processInstance.getId())) + .thenReturn(processInstance); + + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN); + taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN); + taskDefinition.setTimeout(0); + Mockito.when(processService.findTaskDefinition(1L, 1)) + .thenReturn(taskDefinition); + } + + private ProcessInstance getProcessInstance() { + // mock process instance + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1000); + processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + processInstance.setProcessDefinitionCode(1L); + + return processInstance; + } + + private TaskInstance getTaskInstance(TaskNode taskNode, ProcessInstance processInstance) { + // wrap taskNode + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(100); + taskInstance.setName(taskNode.getName()); + taskInstance.setTaskType(taskNode.getType().toUpperCase()); + taskInstance.setTaskCode(taskNode.getCode()); + taskInstance.setTaskDefinitionVersion(taskNode.getVersion()); + taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setTaskParams(taskNode.getTaskParams()); + taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + taskInstance.setFirstSubmitTime(new Date()); + Mockito.when(processService + .submitTaskWithRetry(Mockito.any(ProcessInstance.class) + , Mockito.any(TaskInstance.class) + , Mockito.any(Integer.class), Mockito.any(Integer.class))) + .thenReturn(taskInstance); + return taskInstance; + } + + private TaskNode getTaskNode(String blockingCondition) { + // mock task nodes + // 1----\ + // 2-----4(Blocking Node) + // 3----/ + // blocking logic: 1-->SUCCESS 2-->SUCCESS 3-->SUCCESS + TaskNode taskNode = new TaskNode(); + taskNode.setId("tasks-1000"); + taskNode.setName("4"); + taskNode.setCode(1L); + taskNode.setVersion(1); + taskNode.setType(TaskType.BLOCKING.getDesc()); + taskNode.setRunFlag(FLOW_NODE_RUN_FLAG_NORMAL); + + DependentItem dependentItemA = new DependentItem(); + dependentItemA.setDepTaskCode(1L); + dependentItemA.setStatus(ExecutionStatus.SUCCESS); + + DependentItem dependentItemB = new DependentItem(); + dependentItemB.setDepTaskCode(2L); + dependentItemB.setStatus(ExecutionStatus.SUCCESS); + + DependentItem dependentItemC = new DependentItem(); + dependentItemC.setDepTaskCode(3L); + dependentItemC.setStatus(ExecutionStatus.SUCCESS); + + // build relation + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setDependItemList(Stream.of(dependentItemA, dependentItemB, dependentItemC) + .collect(Collectors.toList())); + dependentTaskModel.setRelation(DependentRelation.AND); + + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); + dependentParameters.setRelation(DependentRelation.AND); + + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + + // set blocking node params + BlockingParameters blockingParameters = new BlockingParameters(); + blockingParameters.setAlertWhenBlocking(false); + blockingParameters.setBlockingCondition(blockingCondition); + + taskNode.setParams(JSONUtils.toJsonString(blockingParameters)); + + return taskNode; + } + + private TaskInstance testBasicInit(String blockingCondition, ExecutionStatus... expectResults) { + + TaskInstance taskInstance = getTaskInstance(getTaskNode(blockingCondition), processInstance); + + Mockito.when(processService + .submitTask(processInstance, taskInstance)) + .thenReturn(taskInstance); + + Mockito.when(processService + .findTaskInstanceById(taskInstance.getId())) + .thenReturn(taskInstance); + + // for BlockingTaskExecThread.initTaskParameters + Mockito.when(processService + .saveTaskInstance(taskInstance)) + .thenReturn(true); + + // for BlockingTaskExecThread.updateTaskState + Mockito.when(processService + .updateTaskInstance(taskInstance)) + .thenReturn(true); + + // for BlockingTaskExecThread.waitTaskQuit + List conditions = getTaskInstanceForValidTaskList(expectResults); + Mockito.when(processService. + findValidTaskListByProcessId(processInstance.getId())) + .thenReturn(conditions); + return taskInstance; + } + + /** + * mock task instance and its execution result in front of blocking node + */ + private List getTaskInstanceForValidTaskList(ExecutionStatus... status) { + List taskInstanceList = new ArrayList<>(); + for (int i = 1; i <= status.length; i++) { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(i); + taskInstance.setName(String.valueOf(i)); + taskInstance.setState(status[i - 1]); + taskInstanceList.add(taskInstance); + } + return taskInstanceList; + } + + @Test + public void testBlockingTaskSubmit() { + TaskInstance taskInstance = testBasicInit("BlockingOnFailed", + ExecutionStatus.SUCCESS, ExecutionStatus.FAILURE, ExecutionStatus.SUCCESS); + BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor(); + blockingTaskProcessor.init(taskInstance, processInstance); + boolean res = blockingTaskProcessor.action(TaskAction.SUBMIT); + Assert.assertEquals(true, res); + } + + @Test + public void testPauseTask() { + TaskInstance taskInstance = testBasicInit("BlockingOnFailed", + ExecutionStatus.SUCCESS, ExecutionStatus.FAILURE, ExecutionStatus.SUCCESS); + BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor(); + blockingTaskProcessor.init(taskInstance, processInstance); + blockingTaskProcessor.action(TaskAction.SUBMIT); + blockingTaskProcessor.action(TaskAction.PAUSE); + ExecutionStatus status = taskInstance.getState(); + Assert.assertEquals(ExecutionStatus.PAUSE, status); + } + + @Test + public void testBlocking() { + TaskInstance taskInstance = testBasicInit("BlockingOnFailed", + ExecutionStatus.SUCCESS, ExecutionStatus.FAILURE, ExecutionStatus.SUCCESS); + BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor(); + blockingTaskProcessor.init(taskInstance, processInstance); + blockingTaskProcessor.action(TaskAction.SUBMIT); + blockingTaskProcessor.action(TaskAction.RUN); + ExecutionStatus status = processInstance.getState(); + Assert.assertEquals(ExecutionStatus.READY_BLOCK, status); + } + + @Test + public void testNoneBlocking() { + TaskInstance taskInstance = testBasicInit("BlockingOnSuccess", + ExecutionStatus.SUCCESS, ExecutionStatus.SUCCESS, ExecutionStatus.SUCCESS); + BlockingTaskProcessor blockingTaskProcessor = new BlockingTaskProcessor(); + blockingTaskProcessor.init(taskInstance, processInstance); + blockingTaskProcessor.action(TaskAction.SUBMIT); + blockingTaskProcessor.action(TaskAction.RUN); + ExecutionStatus status = processInstance.getState(); + Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, status); + } + + +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 5907a81368..c9ed066003 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -272,4 +272,4 @@ public class WorkflowExecuteThreadTest { return schedulerList; } -} +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java index d5ef11e8aa..1a8cb8b4a2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java @@ -360,4 +360,39 @@ public class ProcessAlertManager { public void sendTaskTimeoutAlert(ProcessInstance processInstance, TaskInstance taskInstance, TaskDefinition taskDefinition) { alertDao.sendTaskTimeoutAlert(processInstance, taskInstance, taskDefinition); } + + /** + * + * check node type and process blocking flag, then insert a block record into db + * + * @param processInstance process instance + * @param projectUser the project owner + */ + public void sendProcessBlockingAlert(ProcessInstance processInstance, + ProjectUser projectUser) { + Alert alert = new Alert(); + String cmdName = getCommandCnName(processInstance.getCommandType()); + List blockingNodeList = new ArrayList<>(1); + ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder() + .projectId(projectUser.getProjectId()) + .projectName(projectUser.getProjectName()) + .owner(projectUser.getUserName()) + .processId(processInstance.getId()) + .processName(processInstance.getName()) + .processType(processInstance.getCommandType()) + .processState(processInstance.getState()) + .runTimes(processInstance.getRunTimes()) + .processStartTime(processInstance.getStartTime()) + .processEndTime(processInstance.getEndTime()) + .processHost(processInstance.getHost()) + .build(); + blockingNodeList.add(processAlertContent); + String content = JSONUtils.toJsonString(blockingNodeList); + alert.setTitle(cmdName + " Blocked"); + alert.setContent(content); + alert.setAlertGroupId(processInstance.getWarningGroupId()); + alert.setCreateTime(new Date()); + alertDao.addAlert(alert); + logger.info("add alert to db, alert: {}",alert); + } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java index f6ba45e878..d2808e64f7 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManagerTest.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import java.util.ArrayList; +import java.util.Date; import java.util.List; import org.junit.Test; @@ -90,4 +91,25 @@ public class ProcessAlertManagerTest { processAlertManager.sendAlertProcessInstance(processInstance, taskInstanceList, projectUser); } + /** + * send blocking alert + */ + @Test + public void sendBlockingAlertTest() { + // process instance + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + processInstance.setName("test-process-01"); + processInstance.setCommandType(CommandType.START_PROCESS); + processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); + processInstance.setRunTimes(0); + processInstance.setStartTime(new Date()); + processInstance.setEndTime(new Date()); + processInstance.setHost("127.0.0.1"); + processInstance.setWarningGroupId(1); + + ProjectUser projectUser = new ProjectUser(); + + processAlertManager.sendProcessBlockingAlert(processInstance,projectUser); + } }