
[Fix-10666] Workflow submit failed will still in memory and never retry (#10667)

* Workflow submit failed will still in memory and never retry
Branch: 3.1.0-release. Authored by Wenjun Ruan 2 years ago, committed by GitHub. Parent commit: 35a10d092f.
8 changed files (lines changed in parentheses):
  1. dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java (29)
  2. dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java (6)
  3. dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java (161)
  4. dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java (18)
  5. dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java (184)
  6. dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java (34)
  7. dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java (18)
  8. dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java (2)
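In short, the commit stops losing a workflow instance when its submission to the execute thread pool fails: the submission result is now observed through a CompletableFuture, failed instances are pushed onto an in-memory queue, and a dedicated daemon thread keeps retrying them. Below is a minimal, self-contained sketch of that pattern; the class name, the trySubmit placeholder and the sleep interval are illustrative assumptions for the example, not the actual DolphinScheduler API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: submission failures are parked in a queue and
// retried by a daemon thread instead of being lost.
public class SubmitRetrySketch {

    enum SubmitStatus { SUCCESS, FAILED }

    private final ExecutorService workerPool = Executors.newFixedThreadPool(4);
    private final LinkedBlockingQueue<Integer> submitFailedInstances = new LinkedBlockingQueue<>();

    // Submit asynchronously; when the result is FAILED the instance id goes
    // into the retry queue rather than being dropped.
    void submit(int processInstanceId) {
        CompletableFuture
                .supplyAsync(() -> trySubmit(processInstanceId), workerPool)
                .thenAccept(status -> {
                    if (status == SubmitStatus.FAILED) {
                        submitFailedInstances.add(processInstanceId);
                    }
                });
    }

    // Daemon thread that drains the queue and re-submits, sleeping between
    // attempts so a persistent failure does not spin the CPU.
    void startResubmitThread() {
        Thread resubmitThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Integer id = submitFailedInstances.take();
                    submit(id);
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "SubmitFailedProcessInstanceHandleThread");
        resubmitThread.setDaemon(true);
        resubmitThread.start();
    }

    // Placeholder for the real work of building the DAG and task queue.
    private SubmitStatus trySubmit(int processInstanceId) {
        return SubmitStatus.SUCCESS;
    }

    public static void main(String[] args) throws InterruptedException {
        SubmitRetrySketch sketch = new SubmitRetrySketch();
        sketch.startResubmitThread();
        sketch.submit(1);
        Thread.sleep(500L);
        sketch.workerPool.shutdown();
    }
}

The real implementation additionally evicts the failed instance from the execution cache and the timeout check wheel before queueing it, as shown in the MasterSchedulerService diff below.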

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java (29 lines changed)

@@ -0,0 +1,29 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.server.master.exception;

public class MasterException extends Exception {

    public MasterException(String message) {
        super(message);
    }

    public MasterException(String message, Throwable throwable) {
        super(message, throwable);
    }
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java (6 lines changed)

@@ -90,6 +90,12 @@ public final class ProcessInstanceMetrics {
                 .register(Metrics.globalRegistry);
     }

+    public static synchronized void registerProcessInstanceResubmitGauge(Supplier<Number> function) {
+        Gauge.builder("ds.workflow.instance.resubmit", function)
+                .description("The current process instance need to resubmit count")
+                .register(Metrics.globalRegistry);
+    }
+
     public static void incProcessInstanceSubmit() {
         PROCESS_INSTANCE_SUBMIT_COUNTER.increment();
     }
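For reference, here is a standalone sketch of how such a queue-depth gauge behaves in Micrometer. It registers against a local SimpleMeterRegistry instead of Metrics.globalRegistry purely so the example runs in isolation, and the queue contents are placeholders.

import java.util.concurrent.LinkedBlockingQueue;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class ResubmitGaugeSketch {

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        LinkedBlockingQueue<String> submitFailedQueue = new LinkedBlockingQueue<>();

        // The gauge samples the supplier whenever it is read, so it always
        // reflects the current queue depth without manual updates.
        Gauge.builder("ds.workflow.instance.resubmit", submitFailedQueue::size)
                .description("The current process instance need to resubmit count")
                .register(registry);

        submitFailedQueue.add("workflow-1");
        submitFailedQueue.add("workflow-2");

        // Prints 2.0: the gauge tracks the queue size at read time.
        System.out.println(registry.get("ds.workflow.instance.resubmit").gauge().value());
    }
}

Because the gauge re-reads the supplier on every scrape, the resubmit count needs no explicit updates as instances are added to or taken from the queue.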

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java (161 lines changed)

@@ -28,11 +28,10 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
-import org.apache.dolphinscheduler.remote.NettyRemotingClient;
-import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.server.master.exception.MasterException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
 import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
 import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@@ -44,7 +43,9 @@ import org.apache.commons.collections4.CollectionUtils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;

 import org.slf4j.Logger;
@@ -52,6 +53,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;

+import lombok.NonNull;
+
 /**
  * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
  */
@@ -72,8 +75,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
     @Autowired
     private ProcessAlertManager processAlertManager;

-    private NettyRemotingClient nettyRemotingClient;
-
     @Autowired
     private NettyExecutorManager nettyExecutorManager;
@@ -97,6 +98,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
     @Autowired
     private CuringParamsService curingGlobalParamsService;

+    private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances = new LinkedBlockingQueue<>();
+
+    private Thread failedProcessInstanceResubmitThread;
+
     private String masterAddress;

     protected MasterSchedulerService() {
@@ -108,22 +113,23 @@ public class MasterSchedulerService extends BaseDaemonThread {
      */
     public void init() {
         this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
-        NettyClientConfig clientConfig = new NettyClientConfig();
-        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
         this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
+        this.failedProcessInstanceResubmitThread = new FailedProcessInstanceResubmitThread(submitFailedProcessInstances);
+        ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size);
     }

     @Override
     public synchronized void start() {
         logger.info("Master schedule service starting..");
-        this.stateWheelExecuteThread.start();
         super.start();
+        this.failedProcessInstanceResubmitThread.start();
         logger.info("Master schedule service started...");
     }

     public void close() {
         logger.info("Master schedule service stopping...");
-        nettyRemotingClient.close();
+        // these process instances will be failover, so we can safa clear here
+        submitFailedProcessInstances.clear();
         logger.info("Master schedule service stopped...");
     }
@@ -146,7 +152,9 @@ public class MasterSchedulerService extends BaseDaemonThread {
                 Thread.currentThread().interrupt();
                 break;
             } catch (Exception e) {
-                logger.error("Master schedule service loop command error", e);
+                logger.error("Master schedule workflow error", e);
+                // sleep for 1s here to avoid the database down cause the exception boom
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
             }
         }
     }
@@ -154,7 +162,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
     /**
      * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
      */
-    private void scheduleWorkflow() throws InterruptedException {
+    private void scheduleWorkflow() throws InterruptedException, MasterException {
         List<Command> commands = findCommands();
         if (CollectionUtils.isEmpty(commands)) {
             // indicate that no command ,sleep for 1s
@@ -164,38 +172,53 @@ public class MasterSchedulerService extends BaseDaemonThread {
         List<ProcessInstance> processInstances = command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
+            // indicate that the command transform to processInstance error, sleep for 1s
+            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
             return;
         }
         MasterServerMetrics.incMasterConsumeCommand(commands.size());

         for (ProcessInstance processInstance : processInstances) {
-            try {
-                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
-                logger.info("Master schedule service starting workflow instance");
-                final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
-                    processInstance
-                    , processService
-                    , nettyExecutorManager
-                    , processAlertManager
-                    , masterConfig
-                    , stateWheelExecuteThread
-                    , curingGlobalParamsService);
-
-                this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
-                if (processInstance.getTimeout() > 0) {
-                    stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
-                }
-                ProcessInstanceMetrics.incProcessInstanceSubmit();
-                workflowExecuteThreadPool.submit(workflowExecuteRunnable);
-                logger.info("Master schedule service started workflow instance");
-            } catch (Exception ex) {
-                processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
-                stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
-                logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
-            } finally {
-                LoggerUtils.removeWorkflowInstanceIdMDC();
-            }
+            submitProcessInstance(processInstance);
+        }
+    }
+
+    private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
+        try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
+            logger.info("Master schedule service starting workflow instance");
+            final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
+                processInstance
+                , processService
+                , nettyExecutorManager
+                , processAlertManager
+                , masterConfig
+                , stateWheelExecuteThread
+                , curingGlobalParamsService);
+
+            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
+            if (processInstance.getTimeout() > 0) {
+                stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
+            }
+            ProcessInstanceMetrics.incProcessInstanceSubmit();
+            CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
+                workflowExecuteRunnable::call, workflowExecuteThreadPool);
+            workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
+                if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
+                    // submit failed
+                    processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+                    stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+                    submitFailedProcessInstances.add(processInstance);
+                }
+            });
+            logger.info("Master schedule service started workflow instance");
+        } catch (Exception ex) {
+            processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
+            stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
+            logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
         }
     }
@@ -237,23 +260,27 @@ public class MasterSchedulerService extends BaseDaemonThread {
         return processInstances;
     }

-    private List<Command> findCommands() {
-        long scheduleStartTime = System.currentTimeMillis();
-        int thisMasterSlot = ServerNodeManager.getSlot();
-        int masterCount = ServerNodeManager.getMasterSize();
-        if (masterCount <= 0) {
-            logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
-            return Collections.emptyList();
-        }
-        int pageNumber = 0;
-        int pageSize = masterConfig.getFetchCommandNum();
-        final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
-        if (CollectionUtils.isNotEmpty(result)) {
-            logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
-                result.size(), thisMasterSlot, masterCount);
+    private List<Command> findCommands() throws MasterException {
+        try {
+            long scheduleStartTime = System.currentTimeMillis();
+            int thisMasterSlot = ServerNodeManager.getSlot();
+            int masterCount = ServerNodeManager.getMasterSize();
+            if (masterCount <= 0) {
+                logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
+                return Collections.emptyList();
+            }
+            int pageNumber = 0;
+            int pageSize = masterConfig.getFetchCommandNum();
+            final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
+            if (CollectionUtils.isNotEmpty(result)) {
+                logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
+                    result.size(), thisMasterSlot, masterCount);
+            }
+            ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
+            return result;
+        } catch (Exception ex) {
+            throw new MasterException("Master loop command from database error", ex);
         }
-        ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
-        return result;
     }

     private SlotCheckState slotCheck(Command command) {
@@ -270,4 +297,34 @@ public class MasterSchedulerService extends BaseDaemonThread {
         return state;
     }
+
+    private class FailedProcessInstanceResubmitThread extends Thread {
+
+        private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances;
+
+        public FailedProcessInstanceResubmitThread(LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances) {
+            logger.info("Starting workflow resubmit thread");
+            this.submitFailedProcessInstances = submitFailedProcessInstances;
+            this.setDaemon(true);
+            this.setName("SubmitFailedProcessInstanceHandleThread");
+            logger.info("Started workflow resubmit thread");
+        }
+
+        @Override
+        public void run() {
+            while (Stopper.isRunning()) {
+                try {
+                    ProcessInstance processInstance = submitFailedProcessInstances.take();
+                    submitProcessInstance(processInstance);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    logger.warn("SubmitFailedProcessInstanceHandleThread has been interrupted, will return");
+                    break;
+                }
+                // avoid the failed-fast cause CPU higher
+                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
+            }
+        }
+    }
 }

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java (18 lines changed)

@@ -33,12 +33,11 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;

-import org.apache.commons.lang3.ThreadUtils;
-
-import java.time.Duration;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;

+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -91,9 +90,14 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
         super("StateWheelExecuteThread");
     }

+    @PostConstruct
+    public void startWheelThread() {
+        super.start();
+    }
+
     @Override
     public void run() {
-        Duration checkInterval = masterConfig.getStateWheelInterval();
+        final long checkInterval = masterConfig.getStateWheelInterval().toMillis();
         while (Stopper.isRunning()) {
             try {
                 checkTask4Timeout();
@@ -104,9 +108,11 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
                 logger.error("state wheel thread check error:", e);
             }
             try {
-                ThreadUtils.sleep(checkInterval);
+                Thread.sleep(checkInterval);
             } catch (InterruptedException e) {
-                logger.error("state wheel thread sleep error", e);
+                logger.error("state wheel thread sleep error, will close the loop", e);
+                Thread.currentThread().interrupt();
+                break;
             }
         }
     }
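The wheel thread now starts itself once Spring has constructed the bean, instead of being started by MasterSchedulerService. A hedged, self-contained illustration of that @PostConstruct start pattern with the same interrupt handling follows; the bean and thread names are invented for the example, and on newer stacks the annotation lives in jakarta.annotation rather than javax.annotation.

import javax.annotation.PostConstruct;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.stereotype.Component;

// A daemon thread that starts itself as soon as Spring has built the bean,
// rather than relying on another service to call start() explicitly.
@Component
public class SelfStartingWheelThread extends Thread {

    public SelfStartingWheelThread() {
        super("SelfStartingWheelThread");
        setDaemon(true);
    }

    @PostConstruct
    public void startWheelThread() {
        super.start();
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // periodic check work would go here
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                // restore the interrupt flag and leave the loop instead of spinning forever
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public static void main(String[] args) {
        // registering the component is enough: @PostConstruct starts the thread
        try (AnnotationConfigApplicationContext ctx =
                     new AnnotationConfigApplicationContext(SelfStartingWheelThread.class)) {
            System.out.println(ctx.getBean(SelfStartingWheelThread.class).isAlive()); // true
        }
    }
}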

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java (184 lines changed)

@@ -81,12 +81,14 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;

 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;

 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -96,30 +98,29 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import com.google.common.collect.Lists;

+import lombok.NonNull;
+
 /**
  * Workflow execute task, used to execute a workflow instance.
  */
-public class WorkflowExecuteRunnable implements Runnable {
+public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {

     /**
      * logger of WorkflowExecuteThread
      */
     private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);

-    /**
-     * master config
-     */
-    private final MasterConfig masterConfig;
-
     /**
      * process service
      */
@@ -151,14 +152,14 @@ public class WorkflowExecuteRunnable implements Runnable {
     private DAG<String, TaskNode, TaskNodeRelation> dag;

     /**
-     * key of workflow
+     * unique key of workflow
      */
     private String key;

     /**
      * start flag, true: start nodes submit completely
      */
-    private boolean isStart = false;
+    private volatile boolean isStart = false;

     /**
      * submit failure nodes
@@ -240,6 +241,8 @@
      */
     private final CuringParamsService curingParamsService;

+    private final String masterAddress;
+
     /**
      * @param processInstance processInstance
      * @param processService processService
@@ -248,20 +251,21 @@
      * @param masterConfig masterConfig
      * @param stateWheelExecuteThread stateWheelExecuteThread
      */
-    public WorkflowExecuteRunnable(ProcessInstance processInstance
-        , ProcessService processService
-        , NettyExecutorManager nettyExecutorManager
-        , ProcessAlertManager processAlertManager
-        , MasterConfig masterConfig
-        , StateWheelExecuteThread stateWheelExecuteThread
-        , CuringParamsService curingParamsService) {
+    public WorkflowExecuteRunnable(
+        @NonNull ProcessInstance processInstance,
+        @NonNull ProcessService processService,
+        @NonNull NettyExecutorManager nettyExecutorManager,
+        @NonNull ProcessAlertManager processAlertManager,
+        @NonNull MasterConfig masterConfig,
+        @NonNull StateWheelExecuteThread stateWheelExecuteThread,
+        @NonNull CuringParamsService curingParamsService) {
         this.processService = processService;
         this.processInstance = processInstance;
-        this.masterConfig = masterConfig;
         this.nettyExecutorManager = nettyExecutorManager;
         this.processAlertManager = processAlertManager;
         this.stateWheelExecuteThread = stateWheelExecuteThread;
         this.curingParamsService = curingParamsService;
+        this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
         TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
     }
@@ -287,6 +291,7 @@
                     this.stateEvents.remove(stateEvent);
                 }
             } catch (Exception e) {
+                // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
                 logger.error("state handle error:", e);
             } finally {
                 LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
@@ -472,6 +477,7 @@
         if (taskInstance.getState().typeIsSuccess()) {
             completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
+            // todo: merge the last taskInstance
             processInstance.setVarPool(taskInstance.getVarPool());
             processService.saveProcessInstance(processInstance);
             if (!processInstance.isBlocked()) {
@@ -828,18 +834,24 @@
      * ProcessInstance start entrypoint.
      */
     @Override
-    public void run() {
+    public WorkflowSubmitStatue call() {
         if (this.taskInstanceMap.size() > 0 || isStart) {
             logger.warn("The workflow has already been started");
-            return;
+            return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
         }
         try {
+            LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
             buildFlowDag();
             initTaskQueue();
             submitPostNode(null);
             isStart = true;
+            return WorkflowSubmitStatue.SUCCESS;
         } catch (Exception e) {
-            logger.error("start process error, process instance id:{}", processInstance.getId(), e);
+            logger.error("Start workflow error", e);
+            return WorkflowSubmitStatue.FAILED;
+        } finally {
+            LoggerUtils.removeWorkflowInstanceIdMDC();
         }
     }
@@ -893,7 +905,7 @@
     }

     /**
-     * generate process dag
+     * Generate process dag
      *
      * @throws Exception exception
      */
@@ -905,7 +917,7 @@
             processInstance.getProcessDefinitionVersion());
         processInstance.setProcessDefinition(processDefinition);

-        List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
+        List<TaskInstance> recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam());

         List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion());
         List<TaskDefinitionLog> taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations);
@@ -996,10 +1008,10 @@
         }
         List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
         if (complementListDate.isEmpty() && needComplementProcess()) {
-            if(start != null && end != null){
+            if (start != null && end != null) {
                 complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);
             }
-            if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){
+            if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
                 complementListDate = CronUtils.getSelfScheduleDateList(cmdParam);
             }
             logger.info(" process definition code:{} complement data: {}",
@@ -1096,7 +1108,7 @@
         try {
             HostUpdateCommand hostUpdateCommand = new HostUpdateCommand();
-            hostUpdateCommand.setProcessHost(NetUtils.getAddr(masterConfig.getListenPort()));
+            hostUpdateCommand.setProcessHost(masterAddress);
             hostUpdateCommand.setTaskInstanceId(taskInstance.getId());
             Host host = new Host(taskInstance.getHost());
             nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command());
@@ -1857,99 +1869,75 @@
     /**
      * handling the list of tasks to be submitted
      */
     private void submitStandByTask() {
-        try {
         int length = readyToSubmitTaskQueue.size();
         for (int i = 0; i < length; i++) {
             TaskInstance task = readyToSubmitTaskQueue.peek();
             if (task == null) {
                 continue;
             }
             // stop tasks which is retrying if forced success happens
             if (task.taskCanRetry()) {
                 TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
                 if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
                     task.setState(retryTask.getState());
                     logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName());
                     removeTaskFromStandbyList(task);
                     completeTaskMap.put(task.getTaskCode(), task.getId());
                     taskInstanceMap.put(task.getId(), task);
                     submitPostNode(Long.toString(task.getTaskCode()));
                     continue;
                 }
             }
             //init varPool only this task is the first time running
             if (task.isFirstRun()) {
                 //get pre task ,get all the task varPool to this task
                 Set<String> preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode()));
                 getPreVarPool(task, preTask);
             }
             DependResult dependResult = getDependResultForTask(task);
             if (DependResult.SUCCESS == dependResult) {
                 Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
                 if (!taskInstanceOptional.isPresent()) {
                     this.taskFailedSubmit = true;
                     // Remove and add to complete map and error map
                     removeTaskFromStandbyList(task);
                     completeTaskMap.put(task.getTaskCode(), task.getId());
                     errorTaskMap.put(task.getTaskCode(), task.getId());
                     logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
                 } else {
                     removeTaskFromStandbyList(task);
                 }
             } else if (DependResult.FAILED == dependResult) {
                 // if the dependency fails, the current node is not submitted and the state changes to failure.
                 dependFailedTaskMap.put(task.getTaskCode(), task.getId());
                 removeTaskFromStandbyList(task);
                 logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
             } else if (DependResult.NON_EXEC == dependResult) {
                 // for some reasons(depend task pause/stop) this task would not be submit
                 removeTaskFromStandbyList(task);
+                logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
             }
         }
-        } catch (Exception e) {
-            logger.error("submit standby task error", e);
-        }
     }

     /**
-     * get recovery task instance list
-     *
-     * @param taskIdArray task id array
-     * @return recovery task instance list
-     */
-    private List<TaskInstance> getRecoverTaskInstanceList(String[] taskIdArray) {
-        if (taskIdArray == null || taskIdArray.length == 0) {
-            return new ArrayList<>();
-        }
-        List<Integer> taskIdList = new ArrayList<>(taskIdArray.length);
-        for (String taskId : taskIdArray) {
-            try {
-                Integer id = Integer.valueOf(taskId);
-                taskIdList.add(id);
-            } catch (Exception e) {
-                logger.error("get recovery task instance failed ", e);
-            }
-        }
-        return processService.findTaskInstanceByIdList(taskIdList);
-    }
-
-    /**
-     * get start task instance list
+     * Get start task instance list from recover
      *
      * @param cmdParam command param
      * @return task instance list
      */
-    private List<TaskInstance> getStartTaskInstanceList(String cmdParam) {
-        List<TaskInstance> instanceList = new ArrayList<>();
+    protected List<TaskInstance> getRecoverTaskInstanceList(String cmdParam) {
         Map<String, String> paramMap = JSONUtils.toMap(cmdParam);
+        // todo: Can we use a better way to set the recover taskInstanceId list? rather then use the cmdParam
         if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) {
             String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA);
-            instanceList = getRecoverTaskInstanceList(idList);
+            if (ArrayUtils.isNotEmpty(idList)) {
+                List<Integer> taskInstanceIds = Arrays.stream(idList).map(Integer::valueOf).collect(Collectors.toList());
+                return processService.findTaskInstanceByIdList(taskInstanceIds);
+            }
         }
-        return instanceList;
+        return Collections.emptyList();
     }

     /**
/** /**

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java (34 lines changed)

@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.server.master.runner;

public enum WorkflowSubmitStatue {

    /**
     * Submit success
     */
    SUCCESS,

    /**
     * Submit failed, this status should be retry
     */
    FAILED,

    /**
     * Duplicated submitted, this status should never occur.
     */
    DUPLICATED_SUBMITTED,
    ;
}

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java → dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java (18 lines changed)

@@ -15,12 +15,13 @@
  * limitations under the License.
  */

-package org.apache.dolphinscheduler.server.master;
+package org.apache.dolphinscheduler.server.master.runner;

 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
 import static org.powermock.api.mockito.PowerMockito.mock;

 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -36,8 +37,8 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
+import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -70,7 +71,7 @@
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({WorkflowExecuteRunnable.class})
-public class WorkflowExecuteTaskTest {
+public class WorkflowExecuteRunnableTest {

     private WorkflowExecuteRunnable workflowExecuteThread;
@@ -116,7 +117,10 @@
         stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
         curingGlobalParamsService = mock(CuringParamsService.class);
-        workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService));
+        NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class);
+        ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class);
+        workflowExecuteThread =
+            PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService));
         // prepareProcess init dag
         Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
         dag.setAccessible(true);
@@ -157,9 +161,9 @@
             Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId()))
         ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4));
         Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
-        Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class);
+        Method method = masterExecThreadClass.getDeclaredMethod("getRecoverTaskInstanceList", String.class);
         method.setAccessible(true);
-        List<TaskInstance> taskInstances = (List<TaskInstance>) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam));
+        List<TaskInstance> taskInstances = workflowExecuteThread.getRecoverTaskInstanceList(JSONUtils.toJsonString(cmdParam));
         Assert.assertEquals(4, taskInstances.size());

         cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1");

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java (2 lines changed)

@@ -81,7 +81,7 @@
     ProcessDefinition findProcessDefineById(int processDefinitionId);

-    ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version);
+    ProcessDefinition findProcessDefinition(Long processDefinitionCode, int processDefinitionVersion);

     ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode);
