|
|
|
@ -81,12 +81,14 @@ import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
|
|
|
|
|
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
|
import org.apache.commons.lang3.ArrayUtils; |
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
|
import org.apache.commons.lang3.math.NumberUtils; |
|
|
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
|
import java.util.Arrays; |
|
|
|
|
import java.util.Collection; |
|
|
|
|
import java.util.Collections; |
|
|
|
|
import java.util.Date; |
|
|
|
|
import java.util.HashMap; |
|
|
|
|
import java.util.HashSet; |
|
|
|
@ -96,30 +98,29 @@ import java.util.Map;
|
|
|
|
|
import java.util.Objects; |
|
|
|
|
import java.util.Optional; |
|
|
|
|
import java.util.Set; |
|
|
|
|
import java.util.concurrent.Callable; |
|
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue; |
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
import com.google.common.collect.Lists; |
|
|
|
|
|
|
|
|
|
import lombok.NonNull; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Workflow execute task, used to execute a workflow instance. |
|
|
|
|
*/ |
|
|
|
|
public class WorkflowExecuteRunnable implements Runnable { |
|
|
|
|
public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> { |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* logger of WorkflowExecuteThread |
|
|
|
|
*/ |
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class); |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* master config |
|
|
|
|
*/ |
|
|
|
|
private final MasterConfig masterConfig; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* process service |
|
|
|
|
*/ |
|
|
|
@ -151,14 +152,14 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
private DAG<String, TaskNode, TaskNodeRelation> dag; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* key of workflow |
|
|
|
|
* unique key of workflow |
|
|
|
|
*/ |
|
|
|
|
private String key; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* start flag, true: start nodes submit completely |
|
|
|
|
*/ |
|
|
|
|
private boolean isStart = false; |
|
|
|
|
private volatile boolean isStart = false; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* submit failure nodes |
|
|
|
@ -235,6 +236,8 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
*/ |
|
|
|
|
private final StateWheelExecuteThread stateWheelExecuteThread; |
|
|
|
|
|
|
|
|
|
private final String masterAddress; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* @param processInstance processInstance |
|
|
|
|
* @param processService processService |
|
|
|
@ -243,19 +246,19 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
* @param masterConfig masterConfig |
|
|
|
|
* @param stateWheelExecuteThread stateWheelExecuteThread |
|
|
|
|
*/ |
|
|
|
|
public WorkflowExecuteRunnable(ProcessInstance processInstance |
|
|
|
|
, ProcessService processService |
|
|
|
|
, NettyExecutorManager nettyExecutorManager |
|
|
|
|
, ProcessAlertManager processAlertManager |
|
|
|
|
, MasterConfig masterConfig |
|
|
|
|
, StateWheelExecuteThread stateWheelExecuteThread) { |
|
|
|
|
public WorkflowExecuteRunnable(@NonNull ProcessInstance processInstance, |
|
|
|
|
@NonNull ProcessService processService, |
|
|
|
|
@NonNull NettyExecutorManager nettyExecutorManager, |
|
|
|
|
@NonNull ProcessAlertManager processAlertManager, |
|
|
|
|
@NonNull MasterConfig masterConfig, |
|
|
|
|
@NonNull StateWheelExecuteThread stateWheelExecuteThread) { |
|
|
|
|
this.processService = processService; |
|
|
|
|
this.processInstance = processInstance; |
|
|
|
|
this.masterConfig = masterConfig; |
|
|
|
|
this.nettyExecutorManager = nettyExecutorManager; |
|
|
|
|
this.processAlertManager = processAlertManager; |
|
|
|
|
this.stateWheelExecuteThread = stateWheelExecuteThread; |
|
|
|
|
TaskMetrics.registerTaskRunning(readyToSubmitTaskQueue::size); |
|
|
|
|
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); |
|
|
|
|
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
@ -280,6 +283,7 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
this.stateEvents.remove(stateEvent); |
|
|
|
|
} |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
// we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
|
|
|
|
|
logger.error("state handle error:", e); |
|
|
|
|
} finally { |
|
|
|
|
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); |
|
|
|
@ -464,6 +468,7 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
|
|
|
|
|
if (taskInstance.getState().typeIsSuccess()) { |
|
|
|
|
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); |
|
|
|
|
// todo: merge the last taskInstance
|
|
|
|
|
processInstance.setVarPool(taskInstance.getVarPool()); |
|
|
|
|
processService.saveProcessInstance(processInstance); |
|
|
|
|
if (!processInstance.isBlocked()) { |
|
|
|
@ -822,18 +827,24 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
* ProcessInstance start entrypoint. |
|
|
|
|
*/ |
|
|
|
|
@Override |
|
|
|
|
public void run() { |
|
|
|
|
public WorkflowSubmitStatue call() { |
|
|
|
|
if (this.taskInstanceMap.size() > 0 || isStart) { |
|
|
|
|
logger.warn("The workflow has already been started"); |
|
|
|
|
return; |
|
|
|
|
return WorkflowSubmitStatue.DUPLICATED_SUBMITTED; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); |
|
|
|
|
buildFlowDag(); |
|
|
|
|
initTaskQueue(); |
|
|
|
|
submitPostNode(null); |
|
|
|
|
isStart = true; |
|
|
|
|
return WorkflowSubmitStatue.SUCCESS; |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("start process error, process instance id:{}", processInstance.getId(), e); |
|
|
|
|
logger.error("Start workflow error", e); |
|
|
|
|
return WorkflowSubmitStatue.FAILED; |
|
|
|
|
} finally { |
|
|
|
|
LoggerUtils.removeWorkflowInstanceIdMDC(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -883,7 +894,7 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* generate process dag |
|
|
|
|
* Generate process dag |
|
|
|
|
* |
|
|
|
|
* @throws Exception exception |
|
|
|
|
*/ |
|
|
|
@ -895,7 +906,7 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
processInstance.getProcessDefinitionVersion()); |
|
|
|
|
processInstance.setProcessDefinition(processDefinition); |
|
|
|
|
|
|
|
|
|
List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); |
|
|
|
|
List<TaskInstance> recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam()); |
|
|
|
|
|
|
|
|
|
List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); |
|
|
|
|
List<TaskDefinitionLog> taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations); |
|
|
|
@ -990,7 +1001,8 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
if (complementListDate.isEmpty() && needComplementProcess()) { |
|
|
|
|
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); |
|
|
|
|
logger.info(" process definition code:{} complement data: {}", |
|
|
|
|
processInstance.getProcessDefinitionCode(), complementListDate.toString()); |
|
|
|
|
processInstance.getProcessDefinitionCode(), |
|
|
|
|
complementListDate.toString()); |
|
|
|
|
|
|
|
|
|
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { |
|
|
|
|
processInstance.setScheduleTime(complementListDate.get(0)); |
|
|
|
@ -1082,7 +1094,7 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
HostUpdateCommand hostUpdateCommand = new HostUpdateCommand(); |
|
|
|
|
hostUpdateCommand.setProcessHost(NetUtils.getAddr(masterConfig.getListenPort())); |
|
|
|
|
hostUpdateCommand.setProcessHost(masterAddress); |
|
|
|
|
hostUpdateCommand.setTaskInstanceId(taskInstance.getId()); |
|
|
|
|
Host host = new Host(taskInstance.getHost()); |
|
|
|
|
nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command()); |
|
|
|
@ -1843,7 +1855,6 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
* handling the list of tasks to be submitted |
|
|
|
|
*/ |
|
|
|
|
private void submitStandByTask() { |
|
|
|
|
try { |
|
|
|
|
int length = readyToSubmitTaskQueue.size(); |
|
|
|
|
for (int i = 0; i < length; i++) { |
|
|
|
|
TaskInstance task = readyToSubmitTaskQueue.peek(); |
|
|
|
@ -1855,7 +1866,8 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); |
|
|
|
|
if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { |
|
|
|
|
task.setState(retryTask.getState()); |
|
|
|
|
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); |
|
|
|
|
logger.info("task: {} has been forced success, put it into complete task list and stop retrying", |
|
|
|
|
task.getName()); |
|
|
|
|
removeTaskFromStandbyList(task); |
|
|
|
|
completeTaskMap.put(task.getTaskCode(), task.getId()); |
|
|
|
|
taskInstanceMap.put(task.getId(), task); |
|
|
|
@ -1899,49 +1911,28 @@ public class WorkflowExecuteRunnable implements Runnable {
|
|
|
|
|
dependResult); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("submit standby task error", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* get recovery task instance list |
|
|
|
|
* |
|
|
|
|
* @param taskIdArray task id array |
|
|
|
|
* @return recovery task instance list |
|
|
|
|
*/ |
|
|
|
|
private List<TaskInstance> getRecoverTaskInstanceList(String[] taskIdArray) { |
|
|
|
|
if (taskIdArray == null || taskIdArray.length == 0) { |
|
|
|
|
return new ArrayList<>(); |
|
|
|
|
} |
|
|
|
|
List<Integer> taskIdList = new ArrayList<>(taskIdArray.length); |
|
|
|
|
for (String taskId : taskIdArray) { |
|
|
|
|
try { |
|
|
|
|
Integer id = Integer.valueOf(taskId); |
|
|
|
|
taskIdList.add(id); |
|
|
|
|
} catch (Exception e) { |
|
|
|
|
logger.error("get recovery task instance failed ", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return processService.findTaskInstanceByIdList(taskIdList); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* get start task instance list |
|
|
|
|
* Get start task instance list from recover |
|
|
|
|
* |
|
|
|
|
* @param cmdParam command param |
|
|
|
|
* @return task instance list |
|
|
|
|
*/ |
|
|
|
|
private List<TaskInstance> getStartTaskInstanceList(String cmdParam) { |
|
|
|
|
|
|
|
|
|
List<TaskInstance> instanceList = new ArrayList<>(); |
|
|
|
|
protected List<TaskInstance> getRecoverTaskInstanceList(String cmdParam) { |
|
|
|
|
Map<String, String> paramMap = JSONUtils.toMap(cmdParam); |
|
|
|
|
|
|
|
|
|
// todo: Can we use a better way to set the recover taskInstanceId list? rather then use the cmdParam
|
|
|
|
|
if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) { |
|
|
|
|
String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(COMMA); |
|
|
|
|
instanceList = getRecoverTaskInstanceList(idList); |
|
|
|
|
String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA); |
|
|
|
|
if (ArrayUtils.isNotEmpty(idList)) { |
|
|
|
|
List<Integer> taskInstanceIds = Arrays.stream(idList) |
|
|
|
|
.map(Integer::valueOf) |
|
|
|
|
.collect(Collectors.toList()); |
|
|
|
|
return processService.findTaskInstanceByIdList(taskInstanceIds); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return instanceList; |
|
|
|
|
return Collections.emptyList(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|