From 3ab9ee13fcffcfe69de90353ad43f8eb86c015a6 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 23 Jun 2022 11:45:06 +0800 Subject: [PATCH] Optimize master log, use MDC to inject workflow instance id and task instance id in log (#10516) * Optimize master log, add workflow instance id and task instance id in log * Use MDC to set the workflow info in log4j * Add workflowInstanceId and taskInstanceId in MDC (cherry picked from commit db595b3eff8f5e4728665b4b7b7082af710e8f9d) --- .../dolphinscheduler/alert/AlertServer.java | 23 +- .../dolphinscheduler/common/Constants.java | 36 +-- .../common/thread/Stopper.java | 34 ++- .../common/thread/ThreadUtils.java | 18 +- .../common/utils/LoggerUtils.java | 34 ++- .../server/master/MasterServer.java | 25 +- .../master/processor/StateEventProcessor.java | 12 +- .../master/processor/TaskEventProcessor.java | 14 +- .../TaskExecuteResponseProcessor.java | 12 +- .../processor/TaskKillResponseProcessor.java | 3 +- .../queue/StateEventResponseService.java | 23 +- .../processor/queue/TaskExecuteRunnable.java | 3 + .../queue/TaskExecuteThreadPool.java | 12 +- .../master/registry/MasterRegistryClient.java | 15 +- .../master/runner/EventExecuteService.java | 18 +- .../master/runner/FailoverExecuteThread.java | 5 +- .../master/runner/MasterSchedulerService.java | 60 +++-- .../runner/StateWheelExecuteThread.java | 222 +++++++++--------- .../runner/WorkflowExecuteRunnable.java | 24 +- .../runner/WorkflowExecuteThreadPool.java | 29 ++- .../runner/task/TaskProcessorFactory.java | 9 +- .../master/service/FailoverService.java | 24 +- .../src/main/resources/logback-spring.xml | 4 +- .../service/alert/AlertClientService.java | 21 +- .../service/task/TaskPluginManager.java | 5 +- .../src/main/resources/logback-spring.xml | 4 +- .../plugin/task/api/ProcessUtils.java | 15 +- .../plugin/task/api/TaskPluginException.java | 25 ++ .../server/worker/WorkerServer.java | 117 +++------ .../server/worker/prc/WorkerRpcServer.java | 97 ++++++++ 
.../processor/TaskExecuteProcessor.java | 169 +++++++------ .../TaskExecuteRunningAckProcessor.java | 17 +- .../runner/RetryReportTaskStatusThread.java | 4 +- .../worker/runner/TaskExecuteThread.java | 31 ++- .../worker/runner/WorkerManagerThread.java | 5 +- .../src/main/resources/logback-spring.xml | 4 +- .../processor/TaskExecuteProcessorTest.java | 42 ++-- .../worker/runner/TaskExecuteThreadTest.java | 27 ++- 38 files changed, 768 insertions(+), 474 deletions(-) create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java create mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index ee9d5b3f62..5f449fcab4 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -49,7 +49,10 @@ public class AlertServer implements Closeable { private final AlertConfig alertConfig; private NettyRemotingServer nettyRemotingServer; - public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) { + public AlertServer(PluginDao pluginDao, + AlertSenderService alertSenderService, + AlertRequestProcessor alertRequestProcessor, + AlertConfig alertConfig) { this.pluginDao = pluginDao; this.alertSenderService = alertSenderService; this.alertRequestProcessor = alertRequestProcessor; @@ -68,11 +71,12 @@ public class AlertServer implements Closeable { @EventListener public void run(ApplicationReadyEvent 
readyEvent) { - logger.info("alert server starting..."); + logger.info("Alert server is staring ..."); checkTable(); startServer(); alertSenderService.start(); + logger.info("Alert server is started ..."); } @Override @@ -89,24 +93,23 @@ public class AlertServer implements Closeable { public void destroy(String cause) { try { + // set stop signal is true // execute only once - if (Stopper.isStopped()) { + if (!Stopper.stop()) { + logger.warn("AlterServer is already stopped"); return; } - logger.info("alert server is stopping ..., cause : {}", cause); - - // set stop signal is true - Stopper.stop(); + logger.info("Alert server is stopping, cause: {}", cause); // thread sleep 3 seconds for thread quietly stop - ThreadUtils.sleep(3000L); + ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); // close this.nettyRemotingServer.close(); - + logger.info("Alter server stopped, cause: {}", cause); } catch (Exception e) { - logger.error("alert server stop exception ", e); + logger.error("Alert server stop failed, cause: {}", cause, e); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index b7714c7817..f39a0c94aa 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.SystemUtils; +import java.time.Duration; import java.util.regex.Pattern; /** @@ -376,6 +377,8 @@ public final class Constants { */ public static final long SLEEP_TIME_MILLIS_SHORT = 100L; + public static final Duration SERVER_CLOSE_WAIT_TIME = Duration.ofSeconds(3); + /** * one second mils */ @@ -636,28 +639,31 @@ public final class Constants { */ public 
static final String LOGIN_USER_KEY_TAB_PATH = "login.user.keytab.path"; + public static final String WORKFLOW_INSTANCE_ID_MDC_KEY = "workflowInstanceId"; + public static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId"; + /** * task log info format */ public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s"; - public static final int[] NOT_TERMINATED_STATES = new int[]{ - ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.DISPATCH.ordinal(), - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.DELAY_EXECUTION.ordinal(), - ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.READY_STOP.ordinal(), - ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), - ExecutionStatus.WAITING_THREAD.ordinal(), - ExecutionStatus.WAITING_DEPEND.ordinal() + public static final int[] NOT_TERMINATED_STATES = new int[] { + ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.DELAY_EXECUTION.ordinal(), + ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.READY_STOP.ordinal(), + ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), + ExecutionStatus.WAITING_THREAD.ordinal(), + ExecutionStatus.WAITING_DEPEND.ordinal() }; - public static final int[] RUNNING_PROCESS_STATE = new int[]{ - ExecutionStatus.RUNNING_EXECUTION.ordinal(), - ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), - ExecutionStatus.DISPATCH.ordinal(), - ExecutionStatus.SERIAL_WAIT.ordinal() + public static final int[] RUNNING_PROCESS_STATE = new int[] { + ExecutionStatus.RUNNING_EXECUTION.ordinal(), + ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), + ExecutionStatus.DISPATCH.ordinal(), + ExecutionStatus.SERIAL_WAIT.ordinal() }; /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java index 67abde7e7a..777203c0f3 100644 --- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java @@ -19,22 +19,40 @@ package org.apache.dolphinscheduler.common.thread; import java.util.concurrent.atomic.AtomicBoolean; +import lombok.experimental.UtilityClass; + /** - * if the process closes, a signal is placed as true, and all threads get this flag to stop working + * If the process closes, a signal is placed as true, and all threads get this flag to stop working. */ +@UtilityClass public class Stopper { - private static AtomicBoolean signal = new AtomicBoolean(false); + private static final AtomicBoolean stoppedSignal = new AtomicBoolean(false); - public static final boolean isStopped() { - return signal.get(); + /** + * Return the flag if the Server is stopped. + * + * @return True, if the server is stopped; False, the server is still running. + */ + public static boolean isStopped() { + return stoppedSignal.get(); } - public static final boolean isRunning() { - return !signal.get(); + /** + * Return the flag if the Server is stopped. + * + * @return True, if the server is running, False, the server is stopped. + */ + public static boolean isRunning() { + return !stoppedSignal.get(); } - public static final void stop() { - signal.set(true); + /** + * Stop the server + * + * @return True, if the server stopped success. False, if the server is already stopped. 
+ */ + public static boolean stop() { + return stoppedSignal.compareAndSet(false, true); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 75f624d15d..5c8020b7cd 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -23,24 +23,28 @@ import java.util.concurrent.ThreadFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; -/** - * thread utils - */ +import lombok.experimental.UtilityClass; + +@UtilityClass public class ThreadUtils { /** * Wrapper over newDaemonFixedThreadExecutor. + * * @param threadName threadName * @param threadsNum threadsNum * @return ExecutorService */ - public static ExecutorService newDaemonFixedThreadExecutor(String threadName,int threadsNum) { + public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) { ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat(threadName) - .build(); + .setDaemon(true) + .setNameFormat(threadName) + .build(); return Executors.newFixedThreadPool(threadsNum, threadFactory); } + /** + * Sleep in given mills, this is not accuracy. 
+ */ public static void sleep(final long millis) { try { Thread.sleep(millis); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index 8dd4969494..8c749c53fc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -32,16 +32,16 @@ import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import lombok.experimental.UtilityClass; /** * logger utils */ +@UtilityClass public class LoggerUtils { - private LoggerUtils() { - throw new UnsupportedOperationException("Construct LoggerUtils"); - } - private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class); /** @@ -109,4 +109,30 @@ public class LoggerUtils { } return ""; } + + public static void setWorkflowAndTaskInstanceIDMDC(int workflowInstanceId, int taskInstanceId) { + setWorkflowInstanceIdMDC(workflowInstanceId); + setTaskInstanceIdMDC(taskInstanceId); + } + + public static void setWorkflowInstanceIdMDC(int workflowInstanceId) { + MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); + } + + public static void setTaskInstanceIdMDC(int taskInstanceId) { + MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId)); + } + + public static void removeWorkflowAndTaskInstanceIdMDC() { + removeWorkflowInstanceIdMDC(); + removeTaskInstanceIdMDC(); + } + + public static void removeWorkflowInstanceIdMDC() { + MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY); + } + + public static void removeTaskInstanceIdMDC() { + MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index cb01e14504..95409a616b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; @@ -103,7 +104,7 @@ public class MasterServer implements IStoppable { Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (Stopper.isRunning()) { - close("shutdownHook"); + close("MasterServer shutdownHook"); } })); } @@ -116,23 +117,17 @@ public class MasterServer implements IStoppable { public void close(String cause) { try { + // set stop signal is true // execute only once - if (Stopper.isStopped()) { - logger.warn("MasterServer has been stopped ..., current cause: {}", cause); + if (!Stopper.stop()) { + logger.warn("MasterServer is already stopped, current cause: {}", cause); return; } - logger.info("master server is stopping ..., cause : {}", cause); - - // set stop signal is true - Stopper.stop(); + logger.info("Master server is stopping, current cause : {}", cause); - try { - // thread sleep 3 seconds for thread quietly stop - Thread.sleep(3000L); - } catch (Exception e) { - logger.warn("thread sleep exception ", e); - } + // thread sleep 3 seconds for thread quietly stop + ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); // close this.masterSchedulerService.close(); 
this.masterRPCServer.close(); @@ -141,9 +136,9 @@ public class MasterServer implements IStoppable { // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc springApplicationContext.close(); - logger.info("MasterServer stopped..."); + logger.info("MasterServer stopped, current cause: {}", cause); } catch (Exception e) { - logger.error("master server stop exception ", e); + logger.error("MasterServer stop failed, current cause: {}", cause, e); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index e132f285de..b2c47e4112 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -64,8 +65,15 @@ public class StateEventProcessor implements NettyRequestProcessor { StateEventType type = stateEvent.getTaskInstanceId() == 0 ? 
StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; stateEvent.setType(type); - logger.info("received command : {}", stateEvent); - stateEventResponseService.addResponse(stateEvent); + try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); + + logger.info("Received state event change command, event: {}", stateEvent); + stateEventResponseService.addResponse(stateEvent); + }finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java index ea2e4539c3..e597f1cae5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; @@ -49,8 +50,8 @@ public class TaskEventProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType() - || CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType() - , String.format("invalid command type: %s", command.getType())); + || CommandType.TASK_WAKEUP_EVENT_REQUEST == 
command.getType() + , String.format("invalid command type: %s", command.getType())); TaskEventChangeCommand taskEventChangeCommand = JSONUtils.parseObject(command.getBody(), TaskEventChangeCommand.class); StateEvent stateEvent = new StateEvent(); @@ -58,8 +59,13 @@ public class TaskEventProcessor implements NettyRequestProcessor { stateEvent.setProcessInstanceId(taskEventChangeCommand.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskEventChangeCommand.getTaskInstanceId()); stateEvent.setType(StateEventType.WAIT_TASK_GROUP); - logger.info("received command : {}", stateEvent); - stateEventResponseService.addEvent2WorkflowExecute(stateEvent); + try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); + logger.info("Received task event change command, event: {}", stateEvent); + stateEventResponseService.addEvent2WorkflowExecute(stateEvent); + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java index 264e42e3a5..b17def873c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -57,9 +58,14 @@ public class 
TaskExecuteResponseProcessor implements NettyRequestProcessor { Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class); - logger.info("received command : {}", taskExecuteResponseCommand); - TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel); - taskEventService.addEvent(taskResponseEvent); + try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId()); + logger.info("Received task execute response, event: {}", taskResponseEvent); + + taskEventService.addEvent(taskResponseEvent); + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index 135257c9a8..6079329b90 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -51,7 +51,8 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class); - logger.info("received task kill response command : {}", responseCommand); + logger.info("[TaskInstance-{}] Received task kill response command : {}", + responseCommand.getTaskInstanceId(), 
responseCommand); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index e34dedca67..4ca6b9eccb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; @@ -81,7 +82,13 @@ public class StateEventResponseService { List remainEvents = new ArrayList<>(eventQueue.size()); eventQueue.drainTo(remainEvents); for (StateEvent event : remainEvents) { - this.persist(event); + try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId()); + this.persist(event); + + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } } } } @@ -93,7 +100,7 @@ public class StateEventResponseService { try { eventQueue.put(stateEvent); } catch (InterruptedException e) { - logger.error("put state event : {} error :{}", stateEvent, e); + logger.error("Put state event : {} error", stateEvent, e); Thread.currentThread().interrupt(); } } @@ -109,18 +116,22 @@ public class StateEventResponseService { @Override public void run() { - + 
logger.info("State event loop service started"); while (Stopper.isRunning()) { try { // if not task , blocking here StateEvent stateEvent = eventQueue.take(); + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); persist(stateEvent); } catch (InterruptedException e) { + logger.warn("State event loop service interrupted, will stop this loop", e); Thread.currentThread().interrupt(); break; + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } - logger.info("StateEventResponseWorker stopped"); + logger.info("State event loop service stopped"); } } @@ -135,6 +146,8 @@ public class StateEventResponseService { private void persist(StateEvent stateEvent) { try { if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { + logger.warn("Persist event into workflow execute thread error, " + + "cannot find the workflow instance from cache manager, event: {}", stateEvent); writeResponse(stateEvent, ExecutionStatus.FAILURE); return; } @@ -152,7 +165,7 @@ public class StateEventResponseService { workflowExecuteThreadPool.submitStateEvent(stateEvent); writeResponse(stateEvent, ExecutionStatus.SUCCESS); } catch (Exception e) { - logger.error("persist event queue error, event: {}", stateEvent, e); + logger.error("Persist event queue error, event: {}", stateEvent, e); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java index 593567c6eb..3bf249ed3e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java @@ -20,6 +20,7 @@ package 
org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; @@ -71,11 +72,13 @@ public class TaskExecuteRunnable implements Runnable { while (!this.events.isEmpty()) { TaskEvent event = this.events.peek(); try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId()); persist(event); } catch (Exception e) { logger.error("persist error, event:{}, error: {}", event, e); } finally { this.events.remove(event); + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java index 323ea86411..68e4511c42 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java @@ -115,11 +115,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onFailure(Throwable ex) { Integer processInstanceId = taskExecuteThread.getProcessInstanceId(); - logger.error("persist event failed processInstanceId: {}", processInstanceId, ex); + logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex); if (!processInstanceExecCacheManager.contains(processInstanceId)) { 
taskExecuteThreadMap.remove(processInstanceId); - logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}", - processInstanceId); + logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap", + processInstanceId); } multiThreadFilterMap.remove(taskExecuteThread.getKey()); } @@ -127,11 +127,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onSuccess(Object result) { Integer processInstanceId = taskExecuteThread.getProcessInstanceId(); - logger.info("persist events succeeded, processInstanceId: {}", processInstanceId); + logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId); if (!processInstanceExecCacheManager.contains(processInstanceId)) { taskExecuteThreadMap.remove(processInstanceId); - logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}", - processInstanceId); + logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap", + processInstanceId); } multiThreadFilterMap.remove(taskExecuteThread.getKey()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 5cf1c945af..771bac4a7a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -99,7 +99,6 @@ public class MasterRegistryClient { registryClient)); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener()); } catch (Exception e) { - logger.error("master start up exception", e); throw new 
RegistryException("Master registry client start up error", e); } } @@ -186,7 +185,7 @@ public class MasterRegistryClient { * Registry the current master server itself to registry. */ void registry() { - logger.info("master node : {} registering to registry center...", masterAddress); + logger.info("Master node : {} registering to registry center", masterAddress); String localNodePath = getCurrentNodePath(); int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, @@ -201,7 +200,7 @@ public class MasterRegistryClient { registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) { - logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost()); + logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost()); ThreadUtils.sleep(SLEEP_TIME_MILLIS); } @@ -212,9 +211,7 @@ public class MasterRegistryClient { registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS); - logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", - masterAddress, - masterHeartbeatInterval); + logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval); } @@ -223,12 +220,12 @@ public class MasterRegistryClient { String address = getLocalAddress(); String localNodePath = getCurrentNodePath(); registryClient.remove(localNodePath); - logger.info("master node : {} unRegistry to register center.", address); + logger.info("Master node : {} unRegistry to register center.", address); heartBeatExecutor.shutdown(); - logger.info("heartbeat executor shutdown"); + logger.info("MasterServer heartbeat executor 
shutdown"); registryClient.close(); } catch (Exception e) { - logger.error("remove registry path exception ", e); + logger.error("MasterServer remove registry path exception ", e); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 97c67d8493..3ce7d4e240 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import java.util.concurrent.TimeUnit; @@ -49,25 +50,36 @@ public class EventExecuteService extends BaseDaemonThread { @Override public synchronized void start() { + logger.info("Master Event execute service starting"); super.start(); + logger.info("Master Event execute service started"); } @Override public void run() { - logger.info("Event service started"); while (Stopper.isRunning()) { try { eventHandler(); TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); + } catch (InterruptedException interruptedException) { + logger.warn("Master event service interrupted, will exit this loop", interruptedException); + Thread.currentThread().interrupt(); + break; } catch (Exception e) { - logger.error("Event service thread error", e); + logger.error("Master event execute service error", e); } } } private void eventHandler() { for (WorkflowExecuteRunnable workflowExecuteThread : 
this.processInstanceExecCacheManager.getAll()) { - workflowExecuteThreadPool.executeEvent(workflowExecuteThread); + try { + LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); + workflowExecuteThreadPool.executeEvent(workflowExecuteThread); + + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); + } } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index 1c6ea144e4..d6f3937f4b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -49,7 +49,9 @@ public class FailoverExecuteThread extends BaseDaemonThread { @Override public synchronized void start() { + logger.info("Master failover thread staring"); super.start(); + logger.info("Master failover thread stared"); } @Override @@ -57,14 +59,13 @@ public class FailoverExecuteThread extends BaseDaemonThread { // when startup, wait 10s for ready ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10); - logger.info("failover execute thread started"); while (Stopper.isRunning()) { try { // todo: DO we need to schedule a task to do this kind of check // This kind of check may only need to be executed when a master server start failoverService.checkMasterFailover(); } catch (Exception e) { - logger.error("failover execute error", e); + logger.error("Master failover thread execute error", e); } finally { ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 94991a50c3..023d871355 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.SlotCheckState; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.Command; @@ -111,20 +112,23 @@ public class MasterSchedulerService extends BaseDaemonThread { * constructor of MasterSchedulerService */ public void init() { - this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads()); + this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); } @Override public synchronized void start() { + logger.info("Master schedule service starting.."); this.stateWheelExecuteThread.start(); super.start(); + logger.info("Master schedule service started..."); } public void close() { + logger.info("Master schedule service stopping..."); nettyRemotingClient.close(); - logger.info("master schedule service stopped..."); + logger.info("Master schedule service stopped..."); } /** @@ -132,7 +136,6 @@ public class MasterSchedulerService extends BaseDaemonThread { */ 
@Override public void run() { - logger.info("master scheduler started"); while (Stopper.isRunning()) { try { boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()); @@ -142,8 +145,12 @@ public class MasterSchedulerService extends BaseDaemonThread { continue; } scheduleProcess(); + } catch (InterruptedException interruptedException) { + logger.warn("Master schedule service interrupted, close the loop", interruptedException); + Thread.currentThread().interrupt(); + break; } catch (Exception e) { - logger.error("master scheduler thread error", e); + logger.error("Master schedule service loop command error", e); } } } @@ -152,7 +159,7 @@ public class MasterSchedulerService extends BaseDaemonThread { * 1. get command by slot * 2. donot handle command if slot is empty */ - private void scheduleProcess() throws Exception { + private void scheduleProcess() throws InterruptedException { List commands = findCommands(); if (CollectionUtils.isEmpty(commands)) { //indicate that no command ,sleep for 1s @@ -167,8 +174,10 @@ public class MasterSchedulerService extends BaseDaemonThread { MasterServerMetrics.incMasterConsumeCommand(commands.size()); for (ProcessInstance processInstance : processInstances) { - - WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + logger.info("Master schedule service starting workflow instance"); + WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable( processInstance , processService , nettyExecutorManager @@ -176,15 +185,21 @@ public class MasterSchedulerService extends BaseDaemonThread { , masterConfig , stateWheelExecuteThread); - this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); - if (processInstance.getTimeout() > 0) { - stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); + 
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); + if (processInstance.getTimeout() > 0) { + stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); + } + workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable); + logger.info("Master schedule service started workflow instance"); + + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); } - workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable); } } - private List command2ProcessInstance(List commands) { + private List command2ProcessInstance(List commands) throws InterruptedException { + logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size()); List processInstances = Collections.synchronizedList(new ArrayList<>(commands.size())); CountDownLatch latch = new CountDownLatch(commands.size()); for (final Command command : commands) { @@ -193,7 +208,7 @@ public class MasterSchedulerService extends BaseDaemonThread { // slot check again SlotCheckState slotCheckState = slotCheck(command); if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) { - logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState); + logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState); return; } ProcessInstance processInstance = processService.handleCommand(logger, @@ -201,10 +216,10 @@ public class MasterSchedulerService extends BaseDaemonThread { command); if (processInstance != null) { processInstances.add(processInstance); - logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId()); + logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId()); } } catch (Exception e) { - logger.error("handle command {} error ", command.getId(), e); + logger.error("Master handle command {} error ", 
command.getId(), e); processService.moveToErrorCommand(command, e.toString()); } finally { latch.countDown(); @@ -212,13 +227,10 @@ public class MasterSchedulerService extends BaseDaemonThread { }); } - try { - // make sure to finish handling command each time before next scan - latch.await(); - } catch (InterruptedException e) { - logger.error("countDownLatch await error ", e); - } - + // make sure to finish handling command each time before next scan + latch.await(); + logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}", + commands.size(), processInstances.size()); return processInstances; } @@ -231,6 +243,10 @@ public class MasterSchedulerService extends BaseDaemonThread { int masterCount = ServerNodeManager.getMasterSize(); if (masterCount > 0) { result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot); + if (CollectionUtils.isNotEmpty(result)) { + logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}", + result.size(), thisMasterSlot, masterCount); + } } } return result; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 12d404406c..53acb4ce83 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DateUtils; +import 
org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import lombok.NonNull; + /** * Check thread * 1. timeout task check @@ -110,10 +113,16 @@ public class StateWheelExecuteThread extends BaseDaemonThread { public void addProcess4TimeoutCheck(ProcessInstance processInstance) { processInstanceTimeoutCheckList.add(processInstance.getId()); + logger.info("Success add workflow instance into timeout check list"); } public void removeProcess4TimeoutCheck(ProcessInstance processInstance) { - processInstanceTimeoutCheckList.remove(processInstance.getId()); + boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId()); + if (removeFlag) { + logger.info("Success remove workflow instance from timeout check list"); + } else { + logger.warn("Failed to remove workflow instance from timeout check list"); + } } private void checkProcess4Timeout() { @@ -121,106 +130,95 @@ public class StateWheelExecuteThread extends BaseDaemonThread { return; } for (Integer processInstanceId : processInstanceTimeoutCheckList) { - if (processInstanceId == null) { - continue; - } WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { - logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId); + logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list"); processInstanceTimeoutCheckList.remove(processInstanceId); continue; } ProcessInstance processInstance = 
workflowExecuteThread.getProcessInstance(); if (processInstance == null) { + logger.warn("Check workflow timeout failed, the workflowInstance is null"); continue; } long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); if (timeRemain < 0) { + logger.info("Workflow instance timeout, adding timeout event"); addProcessTimeoutEvent(processInstance); processInstanceTimeoutCheckList.remove(processInstance.getId()); + logger.info("Workflow instance timeout, added timeout event"); } } } - public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + public void addTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - if (taskInstanceKey == null) { - logger.error("taskInstanceKey is null"); - return; - } + logger.info("Adding task instance into timeout check list"); if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) { + logger.warn("Task instance is already in timeout check list"); return; } TaskDefinition taskDefinition = taskInstance.getTaskDefine(); if (taskDefinition == null) { - logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); + logger.error("Failed to add task instance into timeout check list, taskDefinition is null"); return; } if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { taskInstanceTimeoutCheckList.add(taskInstanceKey); + logger.info("Timeout flag is open, added task instance into timeout check list"); } if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { taskInstanceTimeoutCheckList.add(taskInstanceKey); + logger.info("task instance is dependTask orSubProcess, added task instance into timeout check list"); } } - public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + public void 
removeTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - if (taskInstanceKey == null) { - logger.error("taskInstanceKey is null"); - return; - } taskInstanceTimeoutCheckList.remove(taskInstanceKey); + logger.info("remove task instance from timeout check list"); } - public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + public void addTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { + logger.info("Adding task instance into retry check list"); TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - if (taskInstanceKey == null) { - logger.error("taskInstanceKey is null"); - return; - } if (taskInstanceRetryCheckList.contains(taskInstanceKey)) { + logger.warn("Task instance is already in retry check list"); return; } TaskDefinition taskDefinition = taskInstance.getTaskDefine(); if (taskDefinition == null) { - logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); + logger.error("Add task instance into retry check list error, taskDefinition is null"); return; } - logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId()); taskInstanceRetryCheckList.add(taskInstanceKey); + logger.info("[WorkflowInstance-{}][TaskInstance-{}] Added task instance into retry check list", + processInstance.getId(), taskInstance.getId()); } - public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - if (taskInstanceKey == null) { - logger.error("taskInstanceKey is 
null"); - return; - } taskInstanceRetryCheckList.remove(taskInstanceKey); + logger.info("remove task instance from retry check list"); } - public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { + logger.info("Adding task instance into state check list"); TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - if (taskInstanceKey == null) { - logger.error("taskInstanceKey is null"); - return; - } if (taskInstanceStateCheckList.contains(taskInstanceKey)) { + logger.warn("Task instance is already in state check list"); return; } if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { taskInstanceStateCheckList.add(taskInstanceKey); + logger.info("Added task instance into state check list"); } } - public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) { + public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance); - if (taskInstanceKey == null) { - logger.error("taskInstanceKey is null"); - return; - } taskInstanceStateCheckList.remove(taskInstanceKey); + logger.info("Removed task instance from state check list"); } private void checkTask4Timeout() { @@ -228,30 +226,35 @@ public class StateWheelExecuteThread extends BaseDaemonThread { return; } for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) { - int processInstanceId = taskInstanceKey.getProcessInstanceId(); - long taskCode = taskInstanceKey.getTaskCode(); + try { + int processInstanceId = taskInstanceKey.getProcessInstanceId(); + LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + long taskCode = taskInstanceKey.getTaskCode(); - WorkflowExecuteRunnable workflowExecuteThread = 
processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteThread == null) { - logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", - processInstanceId, taskCode); - taskInstanceTimeoutCheckList.remove(taskInstanceKey); - continue; - } - Optional taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); - if (!taskInstanceOptional.isPresent()) { - logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", - processInstanceId, taskCode); - taskInstanceTimeoutCheckList.remove(taskInstanceKey); - continue; - } - TaskInstance taskInstance = taskInstanceOptional.get(); - if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { - long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (timeRemain < 0) { - addTaskTimeoutEvent(taskInstance); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread == null) { + logger.warn("Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task"); + taskInstanceTimeoutCheckList.remove(taskInstanceKey); + continue; + } + Optional taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); + if (!taskInstanceOptional.isPresent()) { + logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}" + + ", will remove this check task", taskCode); taskInstanceTimeoutCheckList.remove(taskInstanceKey); + continue; + } + TaskInstance taskInstance = taskInstanceOptional.get(); + if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { + long timeRemain =
DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); + if (timeRemain < 0) { + logger.info("Task instance is timeout, adding task timeout event and remove the check"); + addTaskTimeoutEvent(taskInstance); + taskInstanceTimeoutCheckList.remove(taskInstanceKey); + } } + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); } } } @@ -264,41 +267,46 @@ public class StateWheelExecuteThread extends BaseDaemonThread { for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) { int processInstanceId = taskInstanceKey.getProcessInstanceId(); long taskCode = taskInstanceKey.getTaskCode(); + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); - WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteThread == null) { - logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", - processInstanceId, taskCode); - taskInstanceRetryCheckList.remove(taskInstanceKey); - continue; - } + if (workflowExecuteThread == null) { + logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, " + + "will remove this check task"); + taskInstanceRetryCheckList.remove(taskInstanceKey); + continue; + } - Optional taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode); - ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); + Optional taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode); + ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); - if (processInstance.getState() == ExecutionStatus.READY_STOP) { - addProcessStopEvent(processInstance); - 
taskInstanceRetryCheckList.remove(taskInstanceKey); - break; - } + if (processInstance.getState() == ExecutionStatus.READY_STOP) { + logger.warn("The process instance is ready to stop, will send process stop event and remove the check task"); + addProcessStopEvent(processInstance); + taskInstanceRetryCheckList.remove(taskInstanceKey); + break; + } - if (!taskInstanceOptional.isPresent()) { - logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", - processInstanceId, taskCode); - taskInstanceRetryCheckList.remove(taskInstanceKey); - continue; - } + if (!taskInstanceOptional.isPresent()) { + logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check"); + taskInstanceRetryCheckList.remove(taskInstanceKey); + continue; + } - TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.retryTaskIntervalOverTime()) { - // reset taskInstance endTime and state - // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance - taskInstance.setEndTime(null); - taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.retryTaskIntervalOverTime()) { + // reset taskInstance endTime and state + // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance + taskInstance.setEndTime(null); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - addTaskRetryEvent(taskInstance); - taskInstanceRetryCheckList.remove(taskInstanceKey); + addTaskRetryEvent(taskInstance); + taskInstanceRetryCheckList.remove(taskInstanceKey); + } + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); } } } @@ -311,25 +319,29 @@ public class StateWheelExecuteThread extends BaseDaemonThread { int processInstanceId = taskInstanceKey.getProcessInstanceId(); long taskCode = 
taskInstanceKey.getTaskCode(); - WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteThread == null) { - logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", - processInstanceId, taskCode); - taskInstanceStateCheckList.remove(taskInstanceKey); - continue; - } - Optional taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); - if (!taskInstanceOptional.isPresent()) { - logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}", - processInstanceId, taskCode); - taskInstanceStateCheckList.remove(taskInstanceKey); - continue; - } - TaskInstance taskInstance = taskInstanceOptional.get(); - if (taskInstance.getState().typeIsFinished()) { - continue; + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread == null) { + logger.warn("Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task"); + taskInstanceStateCheckList.remove(taskInstanceKey); + continue; + } + Optional taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); + if (!taskInstanceOptional.isPresent()) { + logger.warn( + "Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event"); + taskInstanceStateCheckList.remove(taskInstanceKey); + continue; + } + TaskInstance taskInstance = taskInstanceOptional.get(); + if (taskInstance.getState().typeIsFinished()) { + continue; + } + addTaskStateChangeEvent(taskInstance); + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); } - addTaskStateChangeEvent(taskInstance); } } diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 07e0f14c11..9c5f4062f7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.Command; @@ -271,12 +272,16 @@ public class WorkflowExecuteRunnable implements Runnable { while (!this.stateEvents.isEmpty()) { try { StateEvent stateEvent = this.stateEvents.peek(); + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); if (stateEventHandler(stateEvent)) { this.stateEvents.remove(stateEvent); } } catch (Exception e) { logger.error("state handle error:", e); + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } + } } @@ -775,7 +780,16 @@ public class WorkflowExecuteRunnable implements Runnable { if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) { cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); } - cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null)); + + if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { + 
cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, + cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST) + .substring(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).indexOf(COMMA) + 1)); + } + + if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) { + cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null)); + } command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setTaskDependType(processInstance.getTaskDependType()); command.setFailureStrategy(processInstance.getFailureStrategy()); @@ -962,8 +976,12 @@ public class WorkflowExecuteRunnable implements Runnable { // reset global params while there are start parameters setGlobalParamIfCommanded(processDefinition, cmdParam); - Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); - Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + Date start = null; + Date end = null; + if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){ + start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); + end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); + } List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode()); if (complementListDate.isEmpty() && needComplementProcess()) { complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 92ff7c04f1..031f8986ea 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -48,6 +49,8 @@ import org.springframework.util.concurrent.ListenableFutureCallback; import com.google.common.base.Strings; +import lombok.NonNull; + /** * Used to execute {@link WorkflowExecuteRunnable}, when */ @@ -79,7 +82,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @PostConstruct private void init() { this.setDaemon(true); - this.setThreadNamePrefix("Workflow-Execute-Thread-"); + this.setThreadNamePrefix("WorkflowExecuteThread-"); this.setMaxPoolSize(masterConfig.getExecThreads()); this.setCorePoolSize(masterConfig.getExecThreads()); } @@ -90,10 +93,11 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { public void submitStateEvent(StateEvent stateEvent) { WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); if (workflowExecuteThread == null) { - logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent); + logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}", stateEvent); return; } workflowExecuteThread.addStateEvent(stateEvent); + logger.info("Submit state event success, stateEvent: {}", stateEvent); } /** @@ -112,7 +116,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { return; } if 
(multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { - logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey()); + logger.warn("The workflow has been executed by another thread"); return; } multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); @@ -121,24 +125,31 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { - logger.error("handle events {} failed", processInstanceId, ex); - multiThreadFilterMap.remove(workflowExecuteThread.getKey()); + LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + try { + logger.error("Workflow instance events handle failed", ex); + multiThreadFilterMap.remove(workflowExecuteThread.getKey()); + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); + } } @Override public void onSuccess(Object result) { try { + LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); if (workflowExecuteThread.workFlowFinish()) { stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance()); processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); notifyProcessChanged(workflowExecuteThread.getProcessInstance()); - logger.info("process instance {} finished.", processInstanceId); + logger.info("Workflow instance is finished."); } } catch (Exception e) { - logger.error("handle events {} success, but notify changed error", processInstanceId, e); + logger.error("Workflow instance is finished, but notify changed error", e); } finally { // make sure the process has been removed from multiThreadFilterMap multiThreadFilterMap.remove(workflowExecuteThread.getKey()); + LoggerUtils.removeWorkflowInstanceIdMDC(); } } }); @@ -167,9 +178,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * notify myself */ - private void 
notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { - logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); + private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { if (!processInstanceExecCacheManager.contains(processInstance.getId())) { + logger.warn("The execute cache manager doesn't contains this workflow instance"); return; } StateEvent stateEvent = new StateEvent(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 41c2bd56d3..4c732a0325 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -30,9 +30,12 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import lombok.experimental.UtilityClass; + /** * the factory to create task processor */ +@UtilityClass public final class TaskProcessorFactory { private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class); @@ -46,7 +49,7 @@ public final class TaskProcessorFactory { try { PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor) iTaskProcessor.getClass().getConstructor()); } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("The task processor should has a no args constructor"); + throw new IllegalArgumentException("The task processor should has a no args constructor", e); } } } @@ -57,7 +60,6 @@ public final class TaskProcessorFactory { } Constructor iTaskProcessorConstructor = PROCESS_MAP.get(type); if (iTaskProcessorConstructor == null) { - logger.warn("ITaskProcessor could not 
found for taskType: {}", type); iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR); } @@ -74,7 +76,4 @@ public final class TaskProcessorFactory { return PROCESS_MAP.containsKey(type); } - private TaskProcessorFactory() { - throw new UnsupportedOperationException("TaskProcessorFactory cannot be instantiated"); - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java index 3cc92fb905..94f33ccd83 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java @@ -17,8 +17,7 @@ package org.apache.dolphinscheduler.server.master.service; -import io.micrometer.core.annotation.Counted; -import io.micrometer.core.annotation.Timed; +import static com.google.common.base.Preconditions.checkNotNull; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; @@ -55,6 +54,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import io.micrometer.core.annotation.Counted; +import io.micrometer.core.annotation.Timed; + /** * failover service */ @@ -66,12 +68,14 @@ public class FailoverService { private final ProcessService processService; private final WorkflowExecuteThreadPool workflowExecuteThreadPool; - public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService, + public FailoverService(RegistryClient registryClient, + MasterConfig masterConfig, + ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool) { - this.registryClient = registryClient; - this.masterConfig = masterConfig; - this.processService = processService; - 
this.workflowExecuteThreadPool = workflowExecuteThreadPool; + this.registryClient = checkNotNull(registryClient); + this.masterConfig = checkNotNull(masterConfig); + this.processService = checkNotNull(processService); + this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool); } /** @@ -84,7 +88,7 @@ public class FailoverService { if (CollectionUtils.isEmpty(hosts)) { return; } - LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts); + LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts); for (String host : hosts) { failoverMasterWithLock(host); @@ -274,7 +278,7 @@ public class FailoverService { while (iterator.hasNext()) { String host = iterator.next(); if (registryClient.checkNodeExists(host, NodeType.MASTER)) { - if (!host.equals(getLocalAddress())) { + if (!getLocalAddress().equals(host)) { iterator.remove(); } } @@ -294,7 +298,7 @@ public class FailoverService { boolean taskNeedFailover = true; if (taskInstance == null) { - LOGGER.error("failover task instance error, taskInstance is null"); + LOGGER.error("Master failover task instance error, taskInstance is null"); return false; } diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index 2e9cb45ada..f1a2c1d51b 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -21,7 +21,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n UTF-8 @@ -57,7 +57,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n UTF-8 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java index f5d7f935b3..ce6ad57035 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import java.util.concurrent.atomic.AtomicBoolean; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,7 @@ public class AlertClientService implements AutoCloseable { private final NettyRemotingClient client; - private volatile boolean isRunning; + private final AtomicBoolean isRunning; private String host; @@ -53,16 +55,14 @@ public class AlertClientService implements AutoCloseable { public AlertClientService() { this.clientConfig = new NettyClientConfig(); this.client = new NettyRemotingClient(clientConfig); - this.isRunning = true; + this.isRunning = new AtomicBoolean(true); } /** * alert client */ public AlertClientService(String host, int port) { - this.clientConfig = new NettyClientConfig(); - this.client = new NettyRemotingClient(clientConfig); - this.isRunning = true; + this(); this.host = host; this.port = port; } @@ -72,9 +72,14 @@ public class AlertClientService implements AutoCloseable { */ @Override public void close() { + if (!isRunning.compareAndSet(true, false)) { + logger.warn("Alert client is already closed"); + return; + } + + logger.info("Alter client closing"); this.client.close(); - this.isRunning = false; - logger.info("alter client closed"); + logger.info("Alter client closed"); } /** @@ -116,6 +121,6 @@ public class AlertClientService implements
AutoCloseable { } public boolean isRunning() { - return isRunning; + return isRunning.get(); } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java index 7dffe9c204..289e8ddbfb 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.PluginDefine; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; +import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; @@ -93,7 +94,7 @@ public class TaskPluginManager { logger.info("Registering task plugin: {}", name); if (!names.add(name)) { - throw new IllegalStateException(format("Duplicate task plugins named '%s'", name)); + throw new TaskPluginException(format("Duplicate task plugins named '%s'", name)); } loadTaskChannel(factory); @@ -106,7 +107,7 @@ public class TaskPluginManager { PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson); int count = pluginDao.addOrUpdatePluginDefine(pluginDefine); if (count <= 0) { - throw new RuntimeException("Failed to update task plugin: " + name); + throw new TaskPluginException("Failed to update task plugin: " + name); } }); } diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml 
index d6ef76fa9e..0cdeabde5e 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -22,7 +22,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n UTF-8 @@ -63,7 +63,7 @@ ${log.base}/${taskAppId}.log - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - %messsage%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %messsage%n UTF-8 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java index d052075988..528dfa4058 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java @@ -25,6 +25,8 @@ import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import lombok.NonNull; + public final class ProcessUtils { private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class); @@ -46,13 +48,13 @@ public final class ProcessUtils { /** * kill tasks according to different task types. 
*/ - public static void kill(TaskExecutionContext request) { + public static boolean kill(@NonNull TaskExecutionContext request) { try { + logger.info("Begin kill task instance, processId: {}", request.getProcessId()); int processId = request.getProcessId(); if (processId == 0) { - logger.error("process kill failed, process id :{}, task id:{}", - processId, request.getTaskInstanceId()); - return; + logger.error("Task instance kill failed, processId is not exist"); + return false; } String cmd = String.format("kill -9 %s", getPidsStr(processId)); @@ -60,8 +62,11 @@ public final class ProcessUtils { logger.info("process id:{}, cmd:{}", processId, cmd); OSUtils.exeCmd(cmd); + logger.info("Success kill task instance, processId: {}", request.getProcessId()); + return true; } catch (Exception e) { - logger.error("kill task failed", e); + logger.error("Kill task instance error, processId: {}", request.getProcessId(), e); + return false; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java new file mode 100644 index 0000000000..246835260b --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api; + +public class TaskPluginException extends RuntimeException { + + public TaskPluginException(String message) { + super(message); + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 8ac528dee4..93dc6309db 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -21,18 +21,11 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; -import org.apache.dolphinscheduler.remote.NettyRemotingServer; -import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import 
org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor; -import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; -import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor; -import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor; -import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; +import org.apache.dolphinscheduler.server.worker.prc.WorkerRpcServer; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; @@ -65,17 +58,6 @@ public class WorkerServer implements IStoppable { */ private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); - /** - * netty remote server - */ - private NettyRemotingServer nettyRemotingServer; - - /** - * worker config - */ - @Autowired - private WorkerConfig workerConfig; - /** * spring application context * only use it for initialization @@ -105,22 +87,7 @@ public class WorkerServer implements IStoppable { private TaskPluginManager taskPluginManager; @Autowired - private TaskExecuteProcessor taskExecuteProcessor; - - @Autowired - private TaskKillProcessor taskKillProcessor; - - @Autowired - private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor; - - @Autowired - private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor; - - @Autowired - private HostUpdateProcessor hostUpdateProcessor; - - @Autowired - private LoggerRequestProcessor loggerRequestProcessor; + private WorkerRpcServer workerRpcServer; /** * worker server startup, not use web service @@ -132,48 +99,19 @@ public class WorkerServer implements IStoppable { SpringApplication.run(WorkerServer.class); } - /** - * worker server run - */ @PostConstruct public void run() { - // init remoting server - NettyServerConfig 
serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(workerConfig.getListenPort()); - this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); - - // logger server - this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor); - - this.nettyRemotingServer.start(); - - // install task plugin - this.taskPluginManager.installPlugin(); + this.workerRpcServer.start(); - // worker registry - try { - this.workerRegistryClient.registry(); - this.workerRegistryClient.setRegistryStoppable(this); - Set workerZkPaths = this.workerRegistryClient.getWorkerZkPaths(); + this.taskPluginManager.installPlugin(); - this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new RuntimeException(e); - } + this.workerRegistryClient.registry(); + this.workerRegistryClient.setRegistryStoppable(this); + Set workerZkPaths = this.workerRegistryClient.getWorkerZkPaths(); + this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); - // task 
execute manager this.workerManagerThread.start(); - // retry report task status this.retryReportTaskStatusThread.start(); /* @@ -181,7 +119,7 @@ public class WorkerServer implements IStoppable { */ Runtime.getRuntime().addShutdownHook(new Thread(() -> { if (Stopper.isRunning()) { - close("shutdownHook"); + close("WorkerServer shutdown hook"); } })); } @@ -189,24 +127,23 @@ public class WorkerServer implements IStoppable { public void close(String cause) { try { // execute only once - if (Stopper.isStopped()) { + // set stop signal is true + if (!Stopper.stop()) { + logger.warn("WorkerServer is already stopped, current cause: {}", cause); return; } - logger.info("worker server is stopping ..., cause : {}", cause); - - // set stop signal is true - Stopper.stop(); + logger.info("Worker server is stopping, current cause : {}", cause); try { // thread sleep 3 seconds for thread quitely stop - Thread.sleep(3000L); + Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); } catch (Exception e) { - logger.warn("thread sleep exception", e); + logger.warn("Worker server close wait error", e); } // close - this.nettyRemotingServer.close(); + this.workerRpcServer.close(); this.workerRegistryClient.unRegistry(); this.alertClientService.close(); @@ -215,8 +152,9 @@ public class WorkerServer implements IStoppable { // close the application context this.springApplicationContext.close(); + logger.info("Worker server stopped, current cause: {}", cause); } catch (Exception e) { - logger.error("worker server stop exception ", e); + logger.error("Worker server stop failed, current cause: {}", cause, e); } } @@ -230,15 +168,22 @@ public class WorkerServer implements IStoppable { */ public void killAllRunningTasks() { Collection taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList(); - logger.info("ready to kill all cache job, job size:{}", taskRequests.size()); - if (CollectionUtils.isEmpty(taskRequests)) { return; } - + logger.info("Worker begin to kill all cache 
task, task size: {}", taskRequests.size()); + int killNumber = 0; for (TaskExecutionContext taskRequest : taskRequests) { // kill task when it's not finished yet - org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest); + try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), taskRequest.getTaskInstanceId()); + if (ProcessUtils.kill(taskRequest)) { + killNumber++; + } + } finally { + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); + } } + logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), killNumber); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java new file mode 100644 index 0000000000..acb0ac6508 --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.worker.prc; + +import org.apache.dolphinscheduler.remote.NettyRemotingServer; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; +import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor; + +import java.io.Closeable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class WorkerRpcServer implements Closeable { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class); + + @Autowired + private TaskExecuteProcessor taskExecuteProcessor; + + @Autowired + private TaskKillProcessor taskKillProcessor; + + @Autowired + private TaskRecallAckProcessor taskRecallAckProcessor; + + @Autowired + private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor; + + @Autowired + private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor; + + @Autowired + private HostUpdateProcessor hostUpdateProcessor; + + @Autowired + private LoggerRequestProcessor loggerRequestProcessor; + + @Autowired + private WorkerConfig workerConfig; + + private NettyRemotingServer nettyRemotingServer; + + public void start() { + LOGGER.info("Worker rpc server starting"); + NettyServerConfig serverConfig = 
new NettyServerConfig(); + serverConfig.setListenPort(workerConfig.getListenPort()); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor); + // logger server + this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor); + this.nettyRemotingServer.start(); + LOGGER.info("Worker rpc server started"); + } + + @Override + public void close() { + LOGGER.info("Worker rpc server closing"); + this.nettyRemotingServer.close(); + LOGGER.info("Worker rpc server closed"); + } + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 34382257d7..b355c6702b 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -18,10 +18,12 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -90,15 +92,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { @Autowired private WorkerManagerThread workerManager; - @Counted(value = "dolphinscheduler_task_execution_count", description = "task execute total count") - @Timed(value = "dolphinscheduler_task_execution_timer", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) + @Autowired(required = false) + private StorageOperate storageOperate; + + @Counted(value = "ds.task.execution.count", description = "task execute total count") + @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true) @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), - String.format("invalid command type : %s", command.getType())); + String.format("invalid command type : %s", command.getType())); - TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject( - command.getBody(), TaskExecuteRequestCommand.class); + TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(command.getBody(), + TaskExecuteRequestCommand.class); if (taskRequestCommand == null) 
{ logger.error("task execute request command is null"); @@ -113,70 +118,98 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.error("task execution context is null"); return; } - TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType()); - - // set cache, it will be used when kill task - TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); - - // todo custom logger - - taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); - taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); - - if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { - if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { - OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); - } - - // check if the OS user exists - if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) { - logger.error("tenantCode: {} does not exist, taskInstanceId: {}", - taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); - taskExecutionContext.setEndTime(new Date()); - taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); - return; - } - - // local execute path - String execLocalPath = getExecLocalPath(taskExecutionContext); - logger.info("task instance local execute path : {}", execLocalPath); - taskExecutionContext.setExecutePath(execLocalPath); - - try { - FileUtils.createWorkDirIfAbsent(execLocalPath); - } catch (Throwable ex) { - logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId()); - logger.error("create executeLocalPath fail", ex); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - 
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); - taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); - return; + try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); + + TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType()); + + // set cache, it will be used when kill task + TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + + // todo custom logger + + taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); + taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); + + if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { + boolean osUserExistFlag; + //if Using distributed is true and Currently supported systems are linux,Should not let it automatically + //create tenants,so TenantAutoCreate has no effect + if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) { + //use the id command to judge in linux + osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode()); + } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { + // if not exists this user, then create + OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); + osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); + } else { + osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode()); + } + if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { + if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) { + OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); + } + + // check if the OS user exists + if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) { + logger.error("tenantCode: {} does not exist, taskInstanceId: {}", + taskExecutionContext.getTenantCode(), + 
taskExecutionContext.getTaskInstanceId()); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskExecutionContext.setEndTime(new Date()); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + return; + } + + // local execute path + String execLocalPath = getExecLocalPath(taskExecutionContext); + logger.info("task instance local execute path : {}", execLocalPath); + taskExecutionContext.setExecutePath(execLocalPath); + + try { + FileUtils.createWorkDirIfAbsent(execLocalPath); + } catch (Throwable ex) { + logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", + execLocalPath, + taskExecutionContext.getTaskInstanceId()); + logger.error("create executeLocalPath fail", ex); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + return; + } + } + + taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), + new NettyRemoteChannel(channel, command.getOpaque())); + + // delay task process + long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), + taskExecutionContext.getDelayTime() * 60L); + if (remainTime > 0) { + logger.info("delay the execution of task instance {}, delay time: {} s", + taskExecutionContext.getTaskInstanceId(), + remainTime); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + taskExecutionContext.setStartTime(null); + taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext); + } + + // submit task to manager + boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, + taskCallbackService, + alertClientService, + taskPluginManager)); + if (!offer) { + logger.error("submit 
task to manager error, queue is full, queue size is {}, taskInstanceId: {}", + workerManager.getDelayQueueSize(), + taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); + taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); + } } - } - - taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); - - // delay task process - long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L); - if (remainTime > 0) { - logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); - taskExecutionContext.setStartTime(null); - taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext); - } - - // submit task to manager - boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager)); - if (!offer) { - logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", - workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId()); - taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); - taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); - } - } /** * get execute local path diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java index 9d74dc5dcc..421e92a530 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java +++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -44,19 +45,23 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(), - String.format("invalid command type : %s", command.getType())); + String.format("invalid command type : %s", command.getType())); TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject( - command.getBody(), TaskExecuteRunningAckCommand.class); - + command.getBody(), TaskExecuteRunningAckCommand.class); if (runningAckCommand == null) { logger.error("task execute running ack command is null"); return; } - logger.info("task execute running ack command : {}", runningAckCommand); + try { + LoggerUtils.setTaskInstanceIdMDC(runningAckCommand.getTaskInstanceId()); + logger.info("task execute running ack command : {}", runningAckCommand); - if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { - ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId()); + if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { + ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId()); + } + } finally { + LoggerUtils.removeTaskInstanceIdMDC(); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index fc737ca1de..0e48465a07 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -47,9 +47,11 @@ public class RetryReportTaskStatusThread implements Runnable { private TaskCallbackService taskCallbackService; public void start() { + logger.info("Retry report task status thread starting"); Thread thread = new Thread(this, "RetryReportTaskStatusThread"); thread.setDaemon(true); thread.start(); + logger.info("Retry report task status thread started"); } /** @@ -83,7 +85,7 @@ public class RetryReportTaskStatusThread implements Runnable { } } } catch (Exception e) { - logger.warn("retry report task status error", e); + logger.warn("Retry report task status error", e); } } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index e6ff2b5653..ae89e9e946 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -76,14 +76,6 @@ public class TaskExecuteThread implements Runnable, Delayed { */ private TaskExecutionContext taskExecutionContext; - public StorageOperate getStorageOperate() { - return storageOperate; - } - - public void setStorageOperate(StorageOperate storageOperate) { - this.storageOperate = storageOperate; - } - private StorageOperate storageOperate; /** @@ -107,24 +99,28 @@ public class TaskExecuteThread implements Runnable, Delayed { * constructor * * @param 
taskExecutionContext taskExecutionContext - * @param taskCallbackService taskCallbackService + * @param taskCallbackService taskCallbackService */ public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, - AlertClientService alertClientService) { + AlertClientService alertClientService, + StorageOperate storageOperate) { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; + this.storageOperate = storageOperate; } public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, AlertClientService alertClientService, - TaskPluginManager taskPluginManager) { + TaskPluginManager taskPluginManager, + StorageOperate storageOperate) { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; this.taskPluginManager = taskPluginManager; + this.storageOperate = storageOperate; } @Override @@ -139,6 +135,7 @@ public class TaskExecuteThread implements Runnable, Delayed { } try { + LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); logger.info("script path : {}", taskExecutionContext.getExecutePath()); if (taskExecutionContext.getStartTime() == null) { taskExecutionContext.setStartTime(new Date()); @@ -151,7 +148,7 @@ public class TaskExecuteThread implements Runnable, Delayed { // copy hdfs/minio file to local List> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources()); - if (!fileDownloads.isEmpty()){ + if (!fileDownloads.isEmpty()) { downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads); } @@ -211,6 +208,7 @@ public class TaskExecuteThread implements Runnable, Delayed { 
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); clearTaskExecPath(); + LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } } @@ -308,11 +306,12 @@ public class TaskExecuteThread implements Runnable, Delayed { /** * download resource check + * * @param execLocalPath * @param projectRes * @return */ - public List> downloadCheck(String execLocalPath, Map projectRes){ + public List> downloadCheck(String execLocalPath, Map projectRes) { if (MapUtils.isEmpty(projectRes)) { return Collections.emptyList(); } @@ -320,13 +319,13 @@ public class TaskExecuteThread implements Runnable, Delayed { projectRes.forEach((key, value) -> { File resFile = new File(execLocalPath, key); boolean notExist = !resFile.exists(); - if (notExist){ + if (notExist) { downloadFile.add(Pair.of(key, value)); - } else{ + } else { logger.info("file : {} exists ", resFile.getName()); } }); - if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){ + if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) { throw new StorageOperateNoConfiguredException("Storage service config does not exist!"); } return downloadFile; diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 2702d4af42..ae735f4338 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -24,13 +24,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import 
org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,9 +129,11 @@ public class WorkerManagerThread implements Runnable { } public void start() { + logger.info("Worker manager thread starting"); Thread thread = new Thread(this, this.getClass().getName()); thread.setDaemon(true); thread.start(); + logger.info("Worker manager thread started"); } @Override diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index c6d8ee415d..571a3addd2 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -22,7 +22,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n UTF-8 @@ -58,7 +58,7 @@ - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n UTF-8 diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java index 2bb23cad6a..13c533f1cf 100644 --- 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.processor; +import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -53,7 +54,7 @@ import org.slf4j.Logger; */ @RunWith(PowerMockRunner.class) @PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class, - JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) + JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class}) @Ignore public class TaskExecuteProcessorTest { @@ -63,6 +64,8 @@ public class TaskExecuteProcessorTest { private ExecutorService workerExecService; + private StorageOperate storageOperate; + private WorkerConfig workerConfig; private Command command; @@ -99,19 +102,23 @@ public class TaskExecuteProcessorTest { PowerMockito.mockStatic(SpringApplicationContext.class); PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)) - .thenReturn(taskCallbackService); + .thenReturn(taskCallbackService); PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) - .thenReturn(workerConfig); + .thenReturn(workerConfig); workerManager = PowerMockito.mock(WorkerManagerThread.class); - PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE); + + storageOperate = PowerMockito.mock(StorageOperate.class); + PowerMockito.when( + workerManager.offer(new TaskExecuteThread(taskExecutionContext, 
taskCallbackService, alertClientService, storageOperate))) + .thenReturn(Boolean.TRUE); PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)) - .thenReturn(workerManager); + .thenReturn(workerManager); PowerMockito.mockStatic(ThreadUtils.class); PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads())) - .thenReturn(workerExecService); + .thenReturn(workerExecService); PowerMockito.mockStatic(JsonSerializer.class); PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class)) @@ -125,16 +132,17 @@ public class TaskExecuteProcessorTest { PowerMockito.mockStatic(FileUtils.class); PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())) - .thenReturn(taskExecutionContext.getExecutePath()); + taskExecutionContext.getProcessDefineCode(), + taskExecutionContext.getProcessDefineVersion(), + taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId())) + .thenReturn(taskExecutionContext.getExecutePath()); PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath()); - SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService); + SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread( + null, null, null, alertClientService, storageOperate); PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments() - .thenReturn(simpleTaskExecuteThread); + .thenReturn(simpleTaskExecuteThread); } @Test @@ -172,8 +180,12 @@ public class TaskExecuteProcessorTest { private static class SimpleTaskExecuteThread extends TaskExecuteThread { - public SimpleTaskExecuteThread(TaskExecutionContext 
taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) { - super(taskExecutionContext, taskCallbackService, alertClientService); + public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, + TaskCallbackService taskCallbackService, + Logger taskLogger, + AlertClientService alertClientService, + StorageOperate storageOperate) { + super(taskExecutionContext, taskCallbackService, alertClientService, storageOperate); } @Override diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java index f847690a6c..90577111b9 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -17,12 +17,20 @@ package org.apache.dolphinscheduler.server.worker.runner; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; + +import org.apache.commons.lang3.tuple.Pair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -31,11 +39,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import 
java.util.HashMap; -import java.util.List; -import java.util.Map; - @RunWith(PowerMockRunner.class) public class TaskExecuteThreadTest { @@ -50,20 +53,24 @@ public class TaskExecuteThreadTest { @Mock private AlertClientService alertClientService; + @Mock + private StorageOperate storageOperate; + @Mock private TaskPluginManager taskPluginManager; @Test - public void checkTest(){ - TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager); + public void checkTest() { + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, + alertClientService, taskPluginManager, storageOperate); String path = "/"; Map projectRes = new HashMap<>(); projectRes.put("shell", "shell.sh"); List> downloads = new ArrayList<>(); - try{ + try { downloads = taskExecuteThread.downloadCheck(path, projectRes); - }catch (Exception e){ + } catch (Exception e) { Assert.assertNotNull(e); } downloads.add(Pair.of("shell", "shell.sh"));