From 35a10d092f566c07137da5fb67b21cee644cdc8f Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 4 Jul 2022 22:08:15 +0800 Subject: [PATCH] [Fix-10666] Workflow submit failed will still in memory and never retry (#10667) * Workflow submit failed will still in memory and never retry --- .../master/exception/MasterException.java | 29 +++ .../metrics/ProcessInstanceMetrics.java | 6 + .../master/runner/MasterSchedulerService.java | 161 ++++++++++----- .../runner/StateWheelExecuteThread.java | 18 +- .../runner/WorkflowExecuteRunnable.java | 184 ++++++++---------- .../master/runner/WorkflowSubmitStatue.java | 34 ++++ .../WorkflowExecuteRunnableTest.java} | 18 +- .../service/process/ProcessService.java | 2 +- 8 files changed, 288 insertions(+), 164 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java rename dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/{WorkflowExecuteTaskTest.java => runner/WorkflowExecuteRunnableTest.java} (93%) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java new file mode 100644 index 0000000000..eb5d5e8dfe --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/exception/MasterException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.exception; + +public class MasterException extends Exception { + + public MasterException(String message) { + super(message); + } + + public MasterException(String message, Throwable throwable) { + super(message, throwable); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java index ad0e479e5b..8edf3f0c86 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/ProcessInstanceMetrics.java @@ -90,6 +90,12 @@ public final class ProcessInstanceMetrics { .register(Metrics.globalRegistry); } + public static synchronized void registerProcessInstanceResubmitGauge(Supplier function) { + Gauge.builder("ds.workflow.instance.resubmit", function) + .description("The current process instance need to resubmit count") + .register(Metrics.globalRegistry); + } + public static void incProcessInstanceSubmit() { PROCESS_INSTANCE_SUBMIT_COUNTER.increment(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 3254d6c5b5..334df5baa3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -28,11 +28,10 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.exception.MasterException; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; @@ -44,7 +43,9 @@ import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; @@ -52,6 +53,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import lombok.NonNull; + /** * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed. */ @@ -72,8 +75,6 @@ public class MasterSchedulerService extends BaseDaemonThread { @Autowired private ProcessAlertManager processAlertManager; - private NettyRemotingClient nettyRemotingClient; - @Autowired private NettyExecutorManager nettyExecutorManager; @@ -97,6 +98,10 @@ public class MasterSchedulerService extends BaseDaemonThread { @Autowired private CuringParamsService curingGlobalParamsService; + private final LinkedBlockingQueue submitFailedProcessInstances = new LinkedBlockingQueue<>(); + + private Thread failedProcessInstanceResubmitThread; + private String masterAddress; protected MasterSchedulerService() { @@ -108,22 +113,23 @@ public class MasterSchedulerService extends BaseDaemonThread { */ public void init() { this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads()); - NettyClientConfig clientConfig = new NettyClientConfig(); - this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); + this.failedProcessInstanceResubmitThread = new FailedProcessInstanceResubmitThread(submitFailedProcessInstances); + ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size); } @Override public synchronized void start() { logger.info("Master schedule service starting.."); - this.stateWheelExecuteThread.start(); super.start(); + this.failedProcessInstanceResubmitThread.start(); logger.info("Master schedule service started..."); } public void close() { logger.info("Master schedule service stopping..."); - nettyRemotingClient.close(); + // these process instances will be failover, so we can safa clear here + submitFailedProcessInstances.clear(); logger.info("Master schedule service stopped..."); } @@ -146,7 +152,9 @@ public class MasterSchedulerService extends BaseDaemonThread { Thread.currentThread().interrupt(); break; } catch (Exception e) { - logger.error("Master schedule service loop command error", e); + logger.error("Master schedule workflow error", e); + // sleep for 1s here to avoid the database down cause the exception boom + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } } } @@ -154,7 +162,7 @@ public class MasterSchedulerService extends BaseDaemonThread { /** * Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool. */ - private void scheduleWorkflow() throws InterruptedException { + private void scheduleWorkflow() throws InterruptedException, MasterException { List commands = findCommands(); if (CollectionUtils.isEmpty(commands)) { // indicate that no command ,sleep for 1s @@ -164,38 +172,53 @@ public class MasterSchedulerService extends BaseDaemonThread { List processInstances = command2ProcessInstance(commands); if (CollectionUtils.isEmpty(processInstances)) { + // indicate that the command transform to processInstance error, sleep for 1s + Thread.sleep(Constants.SLEEP_TIME_MILLIS); return; } MasterServerMetrics.incMasterConsumeCommand(commands.size()); for (ProcessInstance processInstance : processInstances) { - try { - LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); - logger.info("Master schedule service starting workflow instance"); - final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( - processInstance - , processService - , nettyExecutorManager - , processAlertManager - , masterConfig - , stateWheelExecuteThread - , curingGlobalParamsService); - - this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); - if (processInstance.getTimeout() > 0) { - stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); - } - ProcessInstanceMetrics.incProcessInstanceSubmit(); - workflowExecuteThreadPool.submit(workflowExecuteRunnable); - logger.info("Master schedule service started workflow instance"); - - } catch (Exception ex) { - processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); - stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); - logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex); - } finally { - LoggerUtils.removeWorkflowInstanceIdMDC(); + submitProcessInstance(processInstance); + } + } + + private void submitProcessInstance(@NonNull ProcessInstance processInstance) { + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + logger.info("Master schedule service starting workflow instance"); + final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( + processInstance + , processService + , nettyExecutorManager + , processAlertManager + , masterConfig + , stateWheelExecuteThread + , curingGlobalParamsService); + + this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); + if (processInstance.getTimeout() > 0) { + stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); } + ProcessInstanceMetrics.incProcessInstanceSubmit(); + CompletableFuture workflowSubmitFuture = CompletableFuture.supplyAsync( + workflowExecuteRunnable::call, workflowExecuteThreadPool); + workflowSubmitFuture.thenAccept(workflowSubmitStatue -> { + if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) { + // submit failed + processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); + stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); + submitFailedProcessInstances.add(processInstance); + } + }); + logger.info("Master schedule service started workflow instance"); + + } catch (Exception ex) { + processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId()); + stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId()); + logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex); + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); } } @@ -237,23 +260,27 @@ public class MasterSchedulerService extends BaseDaemonThread { return processInstances; } - private List findCommands() { - long scheduleStartTime = System.currentTimeMillis(); - int thisMasterSlot = ServerNodeManager.getSlot(); - int masterCount = ServerNodeManager.getMasterSize(); - if (masterCount <= 0) { - logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); - return Collections.emptyList(); - } - int pageNumber = 0; - int pageSize = masterConfig.getFetchCommandNum(); - final List result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); - if (CollectionUtils.isNotEmpty(result)) { - logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}", - result.size(), thisMasterSlot, masterCount); + private List findCommands() throws MasterException { + try { + long scheduleStartTime = System.currentTimeMillis(); + int thisMasterSlot = ServerNodeManager.getSlot(); + int masterCount = ServerNodeManager.getMasterSize(); + if (masterCount <= 0) { + logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); + return Collections.emptyList(); + } + int pageNumber = 0; + int pageSize = masterConfig.getFetchCommandNum(); + final List result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); + if (CollectionUtils.isNotEmpty(result)) { + logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}", + result.size(), thisMasterSlot, masterCount); + } + ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime); + return result; + } catch (Exception ex) { + throw new MasterException("Master loop command from database error", ex); } - ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime); - return result; } private SlotCheckState slotCheck(Command command) { @@ -270,4 +297,34 @@ public class MasterSchedulerService extends BaseDaemonThread { return state; } + private class FailedProcessInstanceResubmitThread extends Thread { + + private final LinkedBlockingQueue submitFailedProcessInstances; + + public FailedProcessInstanceResubmitThread(LinkedBlockingQueue submitFailedProcessInstances) { + logger.info("Starting workflow resubmit thread"); + this.submitFailedProcessInstances = submitFailedProcessInstances; + this.setDaemon(true); + this.setName("SubmitFailedProcessInstanceHandleThread"); + logger.info("Started workflow resubmit thread"); + } + + @Override + public void run() { + while (Stopper.isRunning()) { + try { + ProcessInstance processInstance = submitFailedProcessInstances.take(); + submitProcessInstance(processInstance); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("SubmitFailedProcessInstanceHandleThread has been interrupted, will return"); + break; + } + + // avoid the failed-fast cause CPU higher + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + } + } + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 8b92696723..a56a7d8c5a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -33,12 +33,11 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; -import org.apache.commons.lang3.ThreadUtils; - -import java.time.Duration; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -91,9 +90,14 @@ public class StateWheelExecuteThread extends BaseDaemonThread { super("StateWheelExecuteThread"); } + @PostConstruct + public void startWheelThread() { + super.start(); + } + @Override public void run() { - Duration checkInterval = masterConfig.getStateWheelInterval(); + final long checkInterval = masterConfig.getStateWheelInterval().toMillis(); while (Stopper.isRunning()) { try { checkTask4Timeout(); @@ -104,9 +108,11 @@ public class StateWheelExecuteThread extends BaseDaemonThread { logger.error("state wheel thread check error:", e); } try { - ThreadUtils.sleep(checkInterval); + Thread.sleep(checkInterval); } catch (InterruptedException e) { - logger.error("state wheel thread sleep error", e); + logger.error("state wheel thread sleep error, will close the loop", e); + Thread.currentThread().interrupt(); + break; } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 25766d0a4c..7fd23ba42c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -81,12 +81,14 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -96,30 +98,29 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import lombok.NonNull; + /** * Workflow execute task, used to execute a workflow instance. */ -public class WorkflowExecuteRunnable implements Runnable { +public class WorkflowExecuteRunnable implements Callable { /** * logger of WorkflowExecuteThread */ private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class); - /** - * master config - */ - private final MasterConfig masterConfig; - /** * process service */ @@ -151,14 +152,14 @@ public class WorkflowExecuteRunnable implements Runnable { private DAG dag; /** - * key of workflow + * unique key of workflow */ private String key; /** * start flag, true: start nodes submit completely */ - private boolean isStart = false; + private volatile boolean isStart = false; /** * submit failure nodes @@ -240,6 +241,8 @@ public class WorkflowExecuteRunnable implements Runnable { */ private final CuringParamsService curingParamsService; + private final String masterAddress; + /** * @param processInstance processInstance * @param processService processService @@ -248,20 +251,21 @@ public class WorkflowExecuteRunnable implements Runnable { * @param masterConfig masterConfig * @param stateWheelExecuteThread stateWheelExecuteThread */ - public WorkflowExecuteRunnable(ProcessInstance processInstance - , ProcessService processService - , NettyExecutorManager nettyExecutorManager - , ProcessAlertManager processAlertManager - , MasterConfig masterConfig - , StateWheelExecuteThread stateWheelExecuteThread - , CuringParamsService curingParamsService) { + public WorkflowExecuteRunnable( + @NonNull ProcessInstance processInstance, + @NonNull ProcessService processService, + @NonNull NettyExecutorManager nettyExecutorManager, + @NonNull ProcessAlertManager processAlertManager, + @NonNull MasterConfig masterConfig, + @NonNull StateWheelExecuteThread stateWheelExecuteThread, + @NonNull CuringParamsService curingParamsService) { this.processService = processService; this.processInstance = processInstance; - this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; this.curingParamsService = curingParamsService; + this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); } @@ -287,6 +291,7 @@ public class WorkflowExecuteRunnable implements Runnable { this.stateEvents.remove(stateEvent); } } catch (Exception e) { + // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue. logger.error("state handle error:", e); } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); @@ -472,6 +477,7 @@ public class WorkflowExecuteRunnable implements Runnable { if (taskInstance.getState().typeIsSuccess()) { completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + // todo: merge the last taskInstance processInstance.setVarPool(taskInstance.getVarPool()); processService.saveProcessInstance(processInstance); if (!processInstance.isBlocked()) { @@ -828,18 +834,24 @@ public class WorkflowExecuteRunnable implements Runnable { * ProcessInstance start entrypoint. */ @Override - public void run() { + public WorkflowSubmitStatue call() { if (this.taskInstanceMap.size() > 0 || isStart) { logger.warn("The workflow has already been started"); - return; + return WorkflowSubmitStatue.DUPLICATED_SUBMITTED; } + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); buildFlowDag(); initTaskQueue(); submitPostNode(null); isStart = true; + return WorkflowSubmitStatue.SUCCESS; } catch (Exception e) { - logger.error("start process error, process instance id:{}", processInstance.getId(), e); + logger.error("Start workflow error", e); + return WorkflowSubmitStatue.FAILED; + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); } } @@ -893,7 +905,7 @@ public class WorkflowExecuteRunnable implements Runnable { } /** - * generate process dag + * Generate process dag * * @throws Exception exception */ @@ -905,7 +917,7 @@ public class WorkflowExecuteRunnable implements Runnable { processInstance.getProcessDefinitionVersion()); processInstance.setProcessDefinition(processDefinition); - List recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); + List recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam()); List processTaskRelations = processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); List taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations); @@ -996,10 +1008,10 @@ public class WorkflowExecuteRunnable implements Runnable { } List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); if (complementListDate.isEmpty() && needComplementProcess()) { - if(start != null && end != null){ + if (start != null && end != null) { complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); } - if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)){ + if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { complementListDate = CronUtils.getSelfScheduleDateList(cmdParam); } logger.info(" process definition code:{} complement data: {}", @@ -1096,7 +1108,7 @@ public class WorkflowExecuteRunnable implements Runnable { try { HostUpdateCommand hostUpdateCommand = new HostUpdateCommand(); - hostUpdateCommand.setProcessHost(NetUtils.getAddr(masterConfig.getListenPort())); + hostUpdateCommand.setProcessHost(masterAddress); hostUpdateCommand.setTaskInstanceId(taskInstance.getId()); Host host = new Host(taskInstance.getHost()); nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command()); @@ -1857,99 +1869,75 @@ public class WorkflowExecuteRunnable implements Runnable { * handling the list of tasks to be submitted */ private void submitStandByTask() { - try { - int length = readyToSubmitTaskQueue.size(); - for (int i = 0; i < length; i++) { - TaskInstance task = readyToSubmitTaskQueue.peek(); - if (task == null) { + int length = readyToSubmitTaskQueue.size(); + for (int i = 0; i < length; i++) { + TaskInstance task = readyToSubmitTaskQueue.peek(); + if (task == null) { + continue; + } + // stop tasks which is retrying if forced success happens + if (task.taskCanRetry()) { + TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); + if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { + task.setState(retryTask.getState()); + logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); + removeTaskFromStandbyList(task); + completeTaskMap.put(task.getTaskCode(), task.getId()); + taskInstanceMap.put(task.getId(), task); + submitPostNode(Long.toString(task.getTaskCode())); continue; } - // stop tasks which is retrying if forced success happens - if (task.taskCanRetry()) { - TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); - if (retryTask != null && retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) { - task.setState(retryTask.getState()); - logger.info("task: {} has been forced success, put it into complete task list and stop retrying", task.getName()); - removeTaskFromStandbyList(task); - completeTaskMap.put(task.getTaskCode(), task.getId()); - taskInstanceMap.put(task.getId(), task); - submitPostNode(Long.toString(task.getTaskCode())); - continue; - } - } - //init varPool only this task is the first time running - if (task.isFirstRun()) { - //get pre task ,get all the task varPool to this task - Set preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode())); - getPreVarPool(task, preTask); - } - DependResult dependResult = getDependResultForTask(task); - if (DependResult.SUCCESS == dependResult) { - Optional taskInstanceOptional = submitTaskExec(task); - if (!taskInstanceOptional.isPresent()) { - this.taskFailedSubmit = true; - // Remove and add to complete map and error map - removeTaskFromStandbyList(task); - completeTaskMap.put(task.getTaskCode(), task.getId()); - errorTaskMap.put(task.getTaskCode(), task.getId()); - logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId()); - } else { - removeTaskFromStandbyList(task); - } - } else if (DependResult.FAILED == dependResult) { - // if the dependency fails, the current node is not submitted and the state changes to failure. - dependFailedTaskMap.put(task.getTaskCode(), task.getId()); + } + //init varPool only this task is the first time running + if (task.isFirstRun()) { + //get pre task ,get all the task varPool to this task + Set preTask = dag.getPreviousNodes(Long.toString(task.getTaskCode())); + getPreVarPool(task, preTask); + } + DependResult dependResult = getDependResultForTask(task); + if (DependResult.SUCCESS == dependResult) { + Optional taskInstanceOptional = submitTaskExec(task); + if (!taskInstanceOptional.isPresent()) { + this.taskFailedSubmit = true; + // Remove and add to complete map and error map removeTaskFromStandbyList(task); - logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult); - } else if (DependResult.NON_EXEC == dependResult) { - // for some reasons(depend task pause/stop) this task would not be submit + completeTaskMap.put(task.getTaskCode(), task.getId()); + errorTaskMap.put(task.getTaskCode(), task.getId()); + logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId()); + } else { removeTaskFromStandbyList(task); - logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult); } + } else if (DependResult.FAILED == dependResult) { + // if the dependency fails, the current node is not submitted and the state changes to failure. + dependFailedTaskMap.put(task.getTaskCode(), task.getId()); + removeTaskFromStandbyList(task); + logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult); + } else if (DependResult.NON_EXEC == dependResult) { + // for some reasons(depend task pause/stop) this task would not be submit + removeTaskFromStandbyList(task); + logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult); } - } catch (Exception e) { - logger.error("submit standby task error", e); } } /** - * get recovery task instance list - * - * @param taskIdArray task id array - * @return recovery task instance list - */ - private List getRecoverTaskInstanceList(String[] taskIdArray) { - if (taskIdArray == null || taskIdArray.length == 0) { - return new ArrayList<>(); - } - List taskIdList = new ArrayList<>(taskIdArray.length); - for (String taskId : taskIdArray) { - try { - Integer id = Integer.valueOf(taskId); - taskIdList.add(id); - } catch (Exception e) { - logger.error("get recovery task instance failed ", e); - } - } - return processService.findTaskInstanceByIdList(taskIdList); - } - - /** - * get start task instance list + * Get start task instance list from recover * * @param cmdParam command param * @return task instance list */ - private List getStartTaskInstanceList(String cmdParam) { - - List instanceList = new ArrayList<>(); + protected List getRecoverTaskInstanceList(String cmdParam) { Map paramMap = JSONUtils.toMap(cmdParam); + // todo: Can we use a better way to set the recover taskInstanceId list? rather then use the cmdParam if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) { String[] idList = paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA); - instanceList = getRecoverTaskInstanceList(idList); + if (ArrayUtils.isNotEmpty(idList)) { + List taskInstanceIds = Arrays.stream(idList).map(Integer::valueOf).collect(Collectors.toList()); + return processService.findTaskInstanceByIdList(taskInstanceIds); + } } - return instanceList; + return Collections.emptyList(); } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java new file mode 100644 index 0000000000..b53a500c89 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatue.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +public enum WorkflowSubmitStatue { + /** + * Submit success + */ + SUCCESS, + /** + * Submit failed, this status should be retry + */ + FAILED, + /** + * Duplicated submitted, this status should never occur. + */ + DUPLICATED_SUBMITTED, + ; +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java similarity index 93% rename from dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java rename to dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 097ba120cb..51064bc514 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.master; +package org.apache.dolphinscheduler.server.master.runner; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES; + import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -36,8 +37,8 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -70,7 +71,7 @@ import org.springframework.context.ApplicationContext; */ @RunWith(PowerMockRunner.class) @PrepareForTest({WorkflowExecuteRunnable.class}) -public class WorkflowExecuteTaskTest { +public class WorkflowExecuteRunnableTest { private WorkflowExecuteRunnable workflowExecuteThread; @@ -116,7 +117,10 @@ public class WorkflowExecuteTaskTest { stateWheelExecuteThread = mock(StateWheelExecuteThread.class); curingGlobalParamsService = mock(CuringParamsService.class); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService)); + NettyExecutorManager nettyExecutorManager = mock(NettyExecutorManager.class); + ProcessAlertManager processAlertManager = mock(ProcessAlertManager.class); + workflowExecuteThread = + PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, nettyExecutorManager, processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService)); // prepareProcess init dag Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); dag.setAccessible(true); @@ -157,9 +161,9 @@ public class WorkflowExecuteTaskTest { Arrays.asList(taskInstance1.getId(), taskInstance2.getId(), taskInstance3.getId(), taskInstance4.getId())) ).thenReturn(Arrays.asList(taskInstance1, taskInstance2, taskInstance3, taskInstance4)); Class masterExecThreadClass = WorkflowExecuteRunnable.class; - Method method = masterExecThreadClass.getDeclaredMethod("getStartTaskInstanceList", String.class); + Method method = masterExecThreadClass.getDeclaredMethod("getRecoverTaskInstanceList", String.class); method.setAccessible(true); - List taskInstances = (List) method.invoke(workflowExecuteThread, JSONUtils.toJsonString(cmdParam)); + List taskInstances = workflowExecuteThread.getRecoverTaskInstanceList(JSONUtils.toJsonString(cmdParam)); Assert.assertEquals(4, taskInstances.size()); cmdParam.put(CMD_PARAM_RECOVERY_START_NODE_STRING, "1"); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 26afcc4408..6066fe525f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -81,7 +81,7 @@ public interface ProcessService { ProcessDefinition findProcessDefineById(int processDefinitionId); - ProcessDefinition findProcessDefinition(Long processDefinitionCode, int version); + ProcessDefinition findProcessDefinition(Long processDefinitionCode, int processDefinitionVersion); ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode);