Browse Source

Optimize master log, use MDC to inject workflow instance id and task instance id in log (#10516)

* Optimize master log, add workflow instance id and task instance id in log

* Use MDC to set the workflow info in log4j

* Add workflowInstanceId and taskInstanceId in MDC

(cherry picked from commit db595b3eff)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
3ab9ee13fc
  1. 23
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  2. 36
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 34
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java
  4. 18
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
  5. 34
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
  6. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  7. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
  8. 14
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
  9. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java
  10. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
  11. 23
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  12. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
  13. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
  14. 15
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  15. 18
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  16. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  17. 60
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  18. 222
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  19. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  20. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  21. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
  22. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
  23. 4
      dolphinscheduler-master/src/main/resources/logback-spring.xml
  24. 21
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
  25. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
  26. 4
      dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml
  27. 15
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java
  28. 25
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java
  29. 117
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  30. 97
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java
  31. 169
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  32. 17
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java
  33. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
  34. 31
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  35. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  36. 4
      dolphinscheduler-worker/src/main/resources/logback-spring.xml
  37. 42
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  38. 27
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

23
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -49,7 +49,10 @@ public class AlertServer implements Closeable {
private final AlertConfig alertConfig;
private NettyRemotingServer nettyRemotingServer;
public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
public AlertServer(PluginDao pluginDao,
AlertSenderService alertSenderService,
AlertRequestProcessor alertRequestProcessor,
AlertConfig alertConfig) {
this.pluginDao = pluginDao;
this.alertSenderService = alertSenderService;
this.alertRequestProcessor = alertRequestProcessor;
@ -68,11 +71,12 @@ public class AlertServer implements Closeable {
@EventListener
public void run(ApplicationReadyEvent readyEvent) {
logger.info("alert server starting...");
logger.info("Alert server is staring ...");
checkTable();
startServer();
alertSenderService.start();
logger.info("Alert server is started ...");
}
@Override
@ -89,24 +93,23 @@ public class AlertServer implements Closeable {
public void destroy(String cause) {
try {
// set stop signal is true
// execute only once
if (Stopper.isStopped()) {
if (!Stopper.stop()) {
logger.warn("AlterServer is already stopped");
return;
}
logger.info("alert server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
logger.info("Alert server is stopping, cause: {}", cause);
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(3000L);
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.nettyRemotingServer.close();
logger.info("Alter server stopped, cause: {}", cause);
} catch (Exception e) {
logger.error("alert server stop exception ", e);
logger.error("Alert server stop failed, cause: {}", cause, e);
}
}

36
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import java.time.Duration;
import java.util.regex.Pattern;
/**
@ -376,6 +377,8 @@ public final class Constants {
*/
public static final long SLEEP_TIME_MILLIS_SHORT = 100L;
public static final Duration SERVER_CLOSE_WAIT_TIME = Duration.ofSeconds(3);
/**
* one second mils
*/
@ -636,28 +639,31 @@ public final class Constants {
*/
public static final String LOGIN_USER_KEY_TAB_PATH = "login.user.keytab.path";
public static final String WORKFLOW_INSTANCE_ID_MDC_KEY = "workflowInstanceId";
public static final String TASK_INSTANCE_ID_MDC_KEY = "taskInstanceId";
/**
* task log info format
*/
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
public static final int[] NOT_TERMINATED_STATES = new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITING_THREAD.ordinal(),
ExecutionStatus.WAITING_DEPEND.ordinal()
public static final int[] NOT_TERMINATED_STATES = new int[] {
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITING_THREAD.ordinal(),
ExecutionStatus.WAITING_DEPEND.ordinal()
};
public static final int[] RUNNING_PROCESS_STATE = new int[]{
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.SERIAL_WAIT.ordinal()
public static final int[] RUNNING_PROCESS_STATE = new int[] {
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.SERIAL_WAIT.ordinal()
};
/**

34
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/Stopper.java

@ -19,22 +19,40 @@ package org.apache.dolphinscheduler.common.thread;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.experimental.UtilityClass;
/**
* if the process closes, a signal is placed as true, and all threads get this flag to stop working
* If the process closes, a signal is placed as true, and all threads get this flag to stop working.
*/
@UtilityClass
public class Stopper {
private static AtomicBoolean signal = new AtomicBoolean(false);
private static final AtomicBoolean stoppedSignal = new AtomicBoolean(false);
public static final boolean isStopped() {
return signal.get();
/**
* Return the flag if the Server is stopped.
*
* @return True, if the server is stopped; False, the server is still running.
*/
public static boolean isStopped() {
return stoppedSignal.get();
}
public static final boolean isRunning() {
return !signal.get();
/**
* Return the flag if the Server is stopped.
*
* @return True, if the server is running, False, the server is stopped.
*/
public static boolean isRunning() {
return !stoppedSignal.get();
}
public static final void stop() {
signal.set(true);
/**
* Stop the server
*
* @return True, if the server stopped success. False, if the server is already stopped.
*/
public static boolean stop() {
return stoppedSignal.compareAndSet(false, true);
}
}

18
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@ -23,24 +23,28 @@ import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* thread utils
*/
import lombok.experimental.UtilityClass;
@UtilityClass
public class ThreadUtils {
/**
* Wrapper over newDaemonFixedThreadExecutor.
*
* @param threadName threadName
* @param threadsNum threadsNum
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String threadName,int threadsNum) {
public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.build();
.setDaemon(true)
.setNameFormat(threadName)
.build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
/**
* Sleep in given mills, this is not accuracy.
*/
public static void sleep(final long millis) {
try {
Thread.sleep(millis);

34
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@ -32,16 +32,16 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import lombok.experimental.UtilityClass;
/**
* logger utils
*/
@UtilityClass
public class LoggerUtils {
private LoggerUtils() {
throw new UnsupportedOperationException("Construct LoggerUtils");
}
private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
/**
@ -109,4 +109,30 @@ public class LoggerUtils {
}
return "";
}
public static void setWorkflowAndTaskInstanceIDMDC(int workflowInstanceId, int taskInstanceId) {
setWorkflowInstanceIdMDC(workflowInstanceId);
setTaskInstanceIdMDC(taskInstanceId);
}
public static void setWorkflowInstanceIdMDC(int workflowInstanceId) {
MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
}
public static void setTaskInstanceIdMDC(int taskInstanceId) {
MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
}
public static void removeWorkflowAndTaskInstanceIdMDC() {
removeWorkflowInstanceIdMDC();
removeTaskInstanceIdMDC();
}
public static void removeWorkflowInstanceIdMDC() {
MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY);
}
public static void removeTaskInstanceIdMDC() {
MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY);
}
}

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
@ -103,7 +104,7 @@ public class MasterServer implements IStoppable {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
close("MasterServer shutdownHook");
}
}));
}
@ -116,23 +117,17 @@ public class MasterServer implements IStoppable {
public void close(String cause) {
try {
// set stop signal is true
// execute only once
if (Stopper.isStopped()) {
logger.warn("MasterServer has been stopped ..., current cause: {}", cause);
if (!Stopper.stop()) {
logger.warn("MasterServer is already stopped, current cause: {}", cause);
return;
}
logger.info("master server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
logger.info("Master server is stopping, current cause : {}", cause);
try {
// thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.masterSchedulerService.close();
this.masterRPCServer.close();
@ -141,9 +136,9 @@ public class MasterServer implements IStoppable {
// like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
logger.info("MasterServer stopped...");
logger.info("MasterServer stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("master server stop exception ", e);
logger.error("MasterServer stop failed, current cause: {}", cause, e);
}
}

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -64,8 +65,15 @@ public class StateEventProcessor implements NettyRequestProcessor {
StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
stateEvent.setType(type);
logger.info("received command : {}", stateEvent);
stateEventResponseService.addResponse(stateEvent);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
logger.info("Received state event change command, event: {}", stateEvent);
stateEventResponseService.addResponse(stateEvent);
}finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}

14
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
@ -49,8 +50,8 @@ public class TaskEventProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_FORCE_STATE_EVENT_REQUEST == command.getType()
|| CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType()
, String.format("invalid command type: %s", command.getType()));
|| CommandType.TASK_WAKEUP_EVENT_REQUEST == command.getType()
, String.format("invalid command type: %s", command.getType()));
TaskEventChangeCommand taskEventChangeCommand = JSONUtils.parseObject(command.getBody(), TaskEventChangeCommand.class);
StateEvent stateEvent = new StateEvent();
@ -58,8 +59,13 @@ public class TaskEventProcessor implements NettyRequestProcessor {
stateEvent.setProcessInstanceId(taskEventChangeCommand.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEventChangeCommand.getTaskInstanceId());
stateEvent.setType(StateEventType.WAIT_TASK_GROUP);
logger.info("received command : {}", stateEvent);
stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
logger.info("Received task event change command, event: {}", stateEvent);
stateEventResponseService.addEvent2WorkflowExecute(stateEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -57,9 +58,14 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", taskExecuteResponseCommand);
TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
taskEventService.addEvent(taskResponseEvent);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId());
logger.info("Received task execute response, event: {}", taskResponseEvent);
taskEventService.addEvent(taskResponseEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java

@ -51,7 +51,8 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
logger.info("received task kill response command : {}", responseCommand);
logger.info("[TaskInstance-{}] Received task kill response command : {}",
responseCommand.getTaskInstanceId(), responseCommand);
}
}

23
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -81,7 +82,13 @@ public class StateEventResponseService {
List<StateEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
for (StateEvent event : remainEvents) {
this.persist(event);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
this.persist(event);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
}
@ -93,7 +100,7 @@ public class StateEventResponseService {
try {
eventQueue.put(stateEvent);
} catch (InterruptedException e) {
logger.error("put state event : {} error :{}", stateEvent, e);
logger.error("Put state event : {} error", stateEvent, e);
Thread.currentThread().interrupt();
}
}
@ -109,18 +116,22 @@ public class StateEventResponseService {
@Override
public void run() {
logger.info("State event loop service started");
while (Stopper.isRunning()) {
try {
// if not task , blocking here
StateEvent stateEvent = eventQueue.take();
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
persist(stateEvent);
} catch (InterruptedException e) {
logger.warn("State event loop service interrupted, will stop this loop", e);
Thread.currentThread().interrupt();
break;
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
logger.info("StateEventResponseWorker stopped");
logger.info("State event loop service stopped");
}
}
@ -135,6 +146,8 @@ public class StateEventResponseService {
private void persist(StateEvent stateEvent) {
try {
if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
logger.warn("Persist event into workflow execute thread error, "
+ "cannot find the workflow instance from cache manager, event: {}", stateEvent);
writeResponse(stateEvent, ExecutionStatus.FAILURE);
return;
}
@ -152,7 +165,7 @@ public class StateEventResponseService {
workflowExecuteThreadPool.submitStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS);
} catch (Exception e) {
logger.error("persist event queue error, event: {}", stateEvent, e);
logger.error("Persist event queue error, event: {}", stateEvent, e);
}
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
@ -71,11 +72,13 @@ public class TaskExecuteRunnable implements Runnable {
while (!this.events.isEmpty()) {
TaskEvent event = this.events.peek();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
persist(event);
} catch (Exception e) {
logger.error("persist error, event:{}, error: {}", event, e);
} finally {
this.events.remove(event);
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@ -115,11 +115,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onFailure(Throwable ex) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.error("persist event failed processInstanceId: {}", processInstanceId, ex);
logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
processInstanceId);
logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
@ -127,11 +127,11 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Override
public void onSuccess(Object result) {
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.info("persist events succeeded, processInstanceId: {}", processInstanceId);
logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
processInstanceId);
logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}

15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -99,7 +99,6 @@ public class MasterRegistryClient {
registryClient));
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
logger.error("master start up exception", e);
throw new RegistryException("Master registry client start up error", e);
}
}
@ -186,7 +185,7 @@ public class MasterRegistryClient {
* Registry the current master server itself to registry.
*/
void registry() {
logger.info("master node : {} registering to registry center...", masterAddress);
logger.info("Master node : {} registering to registry center", masterAddress);
String localNodePath = getCurrentNodePath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
@ -201,7 +200,7 @@ public class MasterRegistryClient {
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
logger.warn("The current master server node:{} cannot find in registry....", NetUtils.getHost());
logger.warn("The current master server node:{} cannot find in registry", NetUtils.getHost());
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
@ -212,9 +211,7 @@ public class MasterRegistryClient {
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0L, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s",
masterAddress,
masterHeartbeatInterval);
logger.info("Master node : {} registered to registry center successfully with heartBeatInterval : {}s", masterAddress, masterHeartbeatInterval);
}
@ -223,12 +220,12 @@ public class MasterRegistryClient {
String address = getLocalAddress();
String localNodePath = getCurrentNodePath();
registryClient.remove(localNodePath);
logger.info("master node : {} unRegistry to register center.", address);
logger.info("Master node : {} unRegistry to register center.", address);
heartBeatExecutor.shutdown();
logger.info("heartbeat executor shutdown");
logger.info("MasterServer heartbeat executor shutdown");
registryClient.close();
} catch (Exception e) {
logger.error("remove registry path exception ", e);
logger.error("MasterServer remove registry path exception ", e);
}
}

18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import java.util.concurrent.TimeUnit;
@ -49,25 +50,36 @@ public class EventExecuteService extends BaseDaemonThread {
@Override
public synchronized void start() {
logger.info("Master Event execute service starting");
super.start();
logger.info("Master Event execute service started");
}
@Override
public void run() {
logger.info("Event service started");
while (Stopper.isRunning()) {
try {
eventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (InterruptedException interruptedException) {
logger.warn("Master event service interrupted, will exit this loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Event service thread error", e);
logger.error("Master event execute service error", e);
}
}
}
private void eventHandler() {
for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
}

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -49,7 +49,9 @@ public class FailoverExecuteThread extends BaseDaemonThread {
@Override
public synchronized void start() {
logger.info("Master failover thread staring");
super.start();
logger.info("Master failover thread stared");
}
@Override
@ -57,14 +59,13 @@ public class FailoverExecuteThread extends BaseDaemonThread {
// when startup, wait 10s for ready
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
logger.info("failover execute thread started");
while (Stopper.isRunning()) {
try {
// todo: DO we need to schedule a task to do this kind of check
// This kind of check may only need to be executed when a master server start
failoverService.checkMasterFailover();
} catch (Exception e) {
logger.error("failover execute error", e);
logger.error("Master failover thread execute error", e);
} finally {
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * masterConfig.getFailoverInterval() * 60);
}

60
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@ -111,20 +112,23 @@ public class MasterSchedulerService extends BaseDaemonThread {
* constructor of MasterSchedulerService
*/
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads());
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
@Override
public synchronized void start() {
logger.info("Master schedule service starting..");
this.stateWheelExecuteThread.start();
super.start();
logger.info("Master schedule service started...");
}
public void close() {
logger.info("Master schedule service stopping...");
nettyRemotingClient.close();
logger.info("master schedule service stopped...");
logger.info("Master schedule service stopped...");
}
/**
@ -132,7 +136,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
*/
@Override
public void run() {
logger.info("master scheduler started");
while (Stopper.isRunning()) {
try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
@ -142,8 +145,12 @@ public class MasterSchedulerService extends BaseDaemonThread {
continue;
}
scheduleProcess();
} catch (InterruptedException interruptedException) {
logger.warn("Master schedule service interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("master scheduler thread error", e);
logger.error("Master schedule service loop command error", e);
}
}
}
@ -152,7 +159,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
* 1. get command by slot
* 2. donot handle command if slot is empty
*/
private void scheduleProcess() throws Exception {
private void scheduleProcess() throws InterruptedException {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
//indicate that no command ,sleep for 1s
@ -167,8 +174,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) {
WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
logger.info("Master schedule service starting workflow instance");
WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
@ -176,15 +185,21 @@ public class MasterSchedulerService extends BaseDaemonThread {
, masterConfig
, stateWheelExecuteThread);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
logger.info("Master schedule service started workflow instance");
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
}
}
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
@ -193,7 +208,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
// slot check again
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
return;
}
ProcessInstance processInstance = processService.handleCommand(logger,
@ -201,10 +216,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
command);
if (processInstance != null) {
processInstances.add(processInstance);
logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
}
} catch (Exception e) {
logger.error("handle command {} error ", command.getId(), e);
logger.error("Master handle command {} error ", command.getId(), e);
processService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
@ -212,13 +227,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
});
}
try {
// make sure to finish handling command each time before next scan
latch.await();
} catch (InterruptedException e) {
logger.error("countDownLatch await error ", e);
}
// make sure to finish handling command each time before next scan
latch.await();
logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
return processInstances;
}
@ -231,6 +243,10 @@ public class MasterSchedulerService extends BaseDaemonThread {
int masterCount = ServerNodeManager.getMasterSize();
if (masterCount > 0) {
result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
}
}
return result;

222
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import lombok.NonNull;
/**
* Check thread
* 1. timeout task check
@ -110,10 +113,16 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
processInstanceTimeoutCheckList.add(processInstance.getId());
logger.info("Success add workflow instance into timeout check list");
}
public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
processInstanceTimeoutCheckList.remove(processInstance.getId());
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId());
if (removeFlag) {
logger.info("Success remove workflow instance from timeout check list");
} else {
logger.warn("Failed to remove workflow instance from timeout check list");
}
}
private void checkProcess4Timeout() {
@ -121,106 +130,95 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return;
}
for (Integer processInstanceId : processInstanceTimeoutCheckList) {
if (processInstanceId == null) {
continue;
}
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
processInstanceTimeoutCheckList.remove(processInstanceId);
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance == null) {
logger.warn("Check workflow timeout failed, the workflowInstance is null");
continue;
}
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
logger.info("Workflow instance timeout, adding timeout event");
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
logger.info("Workflow instance timeout, added timeout event");
}
}
}
public void addTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
public void addTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
logger.info("Adding task instance into timeout check list");
if (taskInstanceTimeoutCheckList.contains(taskInstanceKey)) {
logger.warn("Task instance is already in timeout check list");
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
logger.error("Failed to add task instance into timeout check list, taskDefinition is null");
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
taskInstanceTimeoutCheckList.add(taskInstanceKey);
logger.info("Timeout flag is open, added task instance into timeout check list");
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceTimeoutCheckList.add(taskInstanceKey);
logger.info("task instance is dependTask orSubProcess, added task instance into timeout check list");
}
}
public void removeTask4TimeoutCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
public void removeTask4TimeoutCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
logger.info("remove task instance from timeout check list");
}
public void addTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
public void addTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
logger.info("Adding task instance into retry check list");
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
if (taskInstanceRetryCheckList.contains(taskInstanceKey)) {
logger.warn("Task instance is already in retry check list");
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
logger.error("Add task instance into retry check list error, taskDefinition is null");
return;
}
logger.debug("addTask4RetryCheck, taskCode:{}, processInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getProcessInstanceId());
taskInstanceRetryCheckList.add(taskInstanceKey);
logger.info("[WorkflowInstance-{}][TaskInstance-{}] Added task instance into retry check list",
processInstance.getId(), taskInstance.getId());
}
public void removeTask4RetryCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
taskInstanceRetryCheckList.remove(taskInstanceKey);
logger.info("remove task instance from retry check list");
}
public void addTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
public void addTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
logger.info("Adding task instance into state check list");
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
if (taskInstanceStateCheckList.contains(taskInstanceKey)) {
logger.warn("Task instance is already in state check list");
return;
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceStateCheckList.add(taskInstanceKey);
logger.info("Added task instance into state check list");
}
}
public void removeTask4StateCheck(ProcessInstance processInstance, TaskInstance taskInstance) {
public void removeTask4StateCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
TaskInstanceKey taskInstanceKey = TaskInstanceKey.getTaskInstanceKey(processInstance, taskInstance);
if (taskInstanceKey == null) {
logger.error("taskInstanceKey is null");
return;
}
taskInstanceStateCheckList.remove(taskInstanceKey);
logger.info("Removed task instance from state check list");
}
private void checkTask4Timeout() {
@ -228,30 +226,35 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return;
}
for (TaskInstanceKey taskInstanceKey : taskInstanceTimeoutCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
try {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addTaskTimeoutEvent(taskInstance);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("Check task instance timeout failed, can not find workflowExecuteThread from cache manager, will remove this check task");
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
+ "will remove this check task", taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
logger.info("Task instance is timeout, adding task timeout event and remove the check");
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstanceKey);
}
}
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
@ -264,41 +267,46 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
for (TaskInstanceKey taskInstanceKey : taskInstanceRetryCheckList) {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
if (workflowExecuteThread == null) {
logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+ "will remove this check task");
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode);
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
addProcessStopEvent(processInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
break;
}
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
logger.warn("The process instance is ready to stop, will send process stop event and remove the check task");
addProcessStopEvent(processInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
break;
}
if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
if (!taskInstanceOptional.isPresent()) {
logger.warn("Task instance retry check failed, can not find taskInstance from workflowExecuteThread, will remove this check");
taskInstanceRetryCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.retryTaskIntervalOverTime()) {
// reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
addTaskRetryEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
addTaskRetryEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey);
}
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
@ -311,25 +319,29 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
int processInstanceId = taskInstanceKey.getProcessInstanceId();
long taskCode = taskInstanceKey.getTaskCode();
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
logger.warn("can not find taskInstance from workflowExecuteThread, this check event will remove, processInstanceId:{}, taskCode:{}",
processInstanceId, taskCode);
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
continue;
try {
LoggerUtils.setTaskInstanceIdMDC(processInstanceId);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("Task instance state check failed, can not find workflowExecuteThread from cache manager, will remove this check task");
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) {
logger.warn(
"Task instance state check failed, can not find taskInstance from workflowExecuteThread, will remove this check event");
taskInstanceStateCheckList.remove(taskInstanceKey);
continue;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
continue;
}
addTaskStateChangeEvent(taskInstance);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
addTaskStateChangeEvent(taskInstance);
}
}

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@ -271,12 +272,16 @@ public class WorkflowExecuteRunnable implements Runnable {
while (!this.stateEvents.isEmpty()) {
try {
StateEvent stateEvent = this.stateEvents.peek();
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
if (stateEventHandler(stateEvent)) {
this.stateEvents.remove(stateEvent);
}
} catch (Exception e) {
logger.error("state handle error:", e);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}
@ -775,7 +780,16 @@ public class WorkflowExecuteRunnable implements Runnable {
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
}
cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss", null));
if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,
cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)
.substring(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).indexOf(COMMA) + 1));
}
if (cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null));
}
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setTaskDependType(processInstance.getTaskDependType());
command.setFailureStrategy(processInstance.getFailureStrategy());
@ -962,8 +976,12 @@ public class WorkflowExecuteRunnable implements Runnable {
// reset global params while there are start parameters
setGlobalParamIfCommanded(processDefinition, cmdParam);
Date start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
Date start = null;
Date end = null;
if(cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE) && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_END_DATE)){
start = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
end = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
}
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(processInstance.getProcessDefinitionCode());
if (complementListDate.isEmpty() && needComplementProcess()) {
complementListDate = CronUtils.getSelfFireDateList(start, end, schedules);

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -48,6 +49,8 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings;
import lombok.NonNull;
/**
* Used to execute {@link WorkflowExecuteRunnable}, when
*/
@ -79,7 +82,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
@PostConstruct
private void init() {
this.setDaemon(true);
this.setThreadNamePrefix("Workflow-Execute-Thread-");
this.setThreadNamePrefix("WorkflowExecuteThread-");
this.setMaxPoolSize(masterConfig.getExecThreads());
this.setCorePoolSize(masterConfig.getExecThreads());
}
@ -90,10 +93,11 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
public void submitStateEvent(StateEvent stateEvent) {
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("workflowExecuteThread is null, stateEvent:{}", stateEvent);
logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}", stateEvent);
return;
}
workflowExecuteThread.addStateEvent(stateEvent);
logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
/**
@ -112,7 +116,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
return;
}
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
logger.warn("The workflow:{} has been executed by another thread", workflowExecuteThread.getKey());
logger.warn("The workflow has been executed by another thread");
return;
}
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
@ -121,24 +125,31 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error("handle events {} failed", processInstanceId, ex);
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
try {
logger.error("Workflow instance events handle failed", ex);
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
@Override
public void onSuccess(Object result) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("process instance {} finished.", processInstanceId);
logger.info("Workflow instance is finished.");
}
} catch (Exception e) {
logger.error("handle events {} success, but notify changed error", processInstanceId, e);
logger.error("Workflow instance is finished, but notify changed error", e);
} finally {
// make sure the process has been removed from multiThreadFilterMap
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
});
@ -167,9 +178,9 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* notify myself
*/
private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
private void notifyMyself(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
logger.warn("The execute cache manager doesn't contains this workflow instance");
return;
}
StateEvent stateEvent = new StateEvent();

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java

@ -30,9 +30,12 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.experimental.UtilityClass;
/**
* the factory to create task processor
*/
@UtilityClass
public final class TaskProcessorFactory {
private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
@ -46,7 +49,7 @@ public final class TaskProcessorFactory {
try {
PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("The task processor should has a no args constructor");
throw new IllegalArgumentException("The task processor should has a no args constructor", e);
}
}
}
@ -57,7 +60,6 @@ public final class TaskProcessorFactory {
}
Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type);
if (iTaskProcessorConstructor == null) {
logger.warn("ITaskProcessor could not found for taskType: {}", type);
iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
@ -74,7 +76,4 @@ public final class TaskProcessorFactory {
return PROCESS_MAP.containsKey(type);
}
private TaskProcessorFactory() {
throw new UnsupportedOperationException("TaskProcessorFactory cannot be instantiated");
}
}

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java

@ -17,8 +17,7 @@
package org.apache.dolphinscheduler.server.master.service;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
@ -55,6 +54,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
/**
* failover service
*/
@ -66,12 +68,14 @@ public class FailoverService {
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
public FailoverService(RegistryClient registryClient, MasterConfig masterConfig, ProcessService processService,
public FailoverService(RegistryClient registryClient,
MasterConfig masterConfig,
ProcessService processService,
WorkflowExecuteThreadPool workflowExecuteThreadPool) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.registryClient = checkNotNull(registryClient);
this.masterConfig = checkNotNull(masterConfig);
this.processService = checkNotNull(processService);
this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool);
}
/**
@ -84,7 +88,7 @@ public class FailoverService {
if (CollectionUtils.isEmpty(hosts)) {
return;
}
LOGGER.info("{} begin to failover hosts:{}", getLocalAddress(), hosts);
LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts);
for (String host : hosts) {
failoverMasterWithLock(host);
@ -274,7 +278,7 @@ public class FailoverService {
while (iterator.hasNext()) {
String host = iterator.next();
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
if (!host.equals(getLocalAddress())) {
if (!getLocalAddress().equals(host)) {
iterator.remove();
}
}
@ -294,7 +298,7 @@ public class FailoverService {
boolean taskNeedFailover = true;
if (taskInstance == null) {
LOGGER.error("failover task instance error, taskInstance is null");
LOGGER.error("Master failover task instance error, taskInstance is null");
return false;
}

4
dolphinscheduler-master/src/main/resources/logback-spring.xml

@ -21,7 +21,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
@ -57,7 +57,7 @@
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>

21
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java

@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,7 +38,7 @@ public class AlertClientService implements AutoCloseable {
private final NettyRemotingClient client;
private volatile boolean isRunning;
private final AtomicBoolean isRunning;
private String host;
@ -53,16 +55,14 @@ public class AlertClientService implements AutoCloseable {
public AlertClientService() {
this.clientConfig = new NettyClientConfig();
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
this.isRunning = new AtomicBoolean(true);
}
/**
* alert client
*/
public AlertClientService(String host, int port) {
this.clientConfig = new NettyClientConfig();
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
this();
this.host = host;
this.port = port;
}
@ -72,9 +72,14 @@ public class AlertClientService implements AutoCloseable {
*/
@Override
public void close() {
if (isRunning.compareAndSet(true, false)) {
logger.warn("Alert client is already closed");
return;
}
logger.info("Alter client closing");
this.client.close();
this.isRunning = false;
logger.info("alter client closed");
logger.info("Alter client closed");
}
/**
@ -116,6 +121,6 @@ public class AlertClientService implements AutoCloseable {
}
public boolean isRunning() {
return isRunning;
return isRunning.get();
}
}

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
@ -93,7 +94,7 @@ public class TaskPluginManager {
logger.info("Registering task plugin: {}", name);
if (!names.add(name)) {
throw new IllegalStateException(format("Duplicate task plugins named '%s'", name));
throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
}
loadTaskChannel(factory);
@ -106,7 +107,7 @@ public class TaskPluginManager {
PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson);
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
if (count <= 0) {
throw new RuntimeException("Failed to update task plugin: " + name);
throw new TaskPluginException("Failed to update task plugin: " + name);
}
});
}

4
dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml

@ -22,7 +22,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
@ -63,7 +63,7 @@
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - %messsage%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} [%thread] %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>

15
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ProcessUtils.java

@ -25,6 +25,8 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.NonNull;
public final class ProcessUtils {
private static final Logger logger = LoggerFactory.getLogger(ProcessUtils.class);
@ -46,13 +48,13 @@ public final class ProcessUtils {
/**
* kill tasks according to different task types.
*/
public static void kill(TaskExecutionContext request) {
public static boolean kill(@NonNull TaskExecutionContext request) {
try {
logger.info("Begin kill task instance, processId: {}", request.getProcessId());
int processId = request.getProcessId();
if (processId == 0) {
logger.error("process kill failed, process id :{}, task id:{}",
processId, request.getTaskInstanceId());
return;
logger.error("Task instance kill failed, processId is not exist");
return false;
}
String cmd = String.format("kill -9 %s", getPidsStr(processId));
@ -60,8 +62,11 @@ public final class ProcessUtils {
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
logger.info("Success kill task instance, processId: {}", request.getProcessId());
return true;
} catch (Exception e) {
logger.error("kill task failed", e);
logger.error("Kill task instance error, processId: {}", request.getProcessId(), e);
return false;
}
}

25
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskPluginException.java

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
public class TaskPluginException extends RuntimeException {
public TaskPluginException(String message) {
super(message);
}
}

117
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -21,18 +21,11 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.prc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
@ -65,17 +58,6 @@ public class WorkerServer implements IStoppable {
*/
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* netty remote server
*/
private NettyRemotingServer nettyRemotingServer;
/**
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
/**
* spring application context
* only use it for initialization
@ -105,22 +87,7 @@ public class WorkerServer implements IStoppable {
private TaskPluginManager taskPluginManager;
@Autowired
private TaskExecuteProcessor taskExecuteProcessor;
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
private WorkerRpcServer workerRpcServer;
/**
* worker server startup, not use web service
@ -132,48 +99,19 @@ public class WorkerServer implements IStoppable {
SpringApplication.run(WorkerServer.class);
}
/**
* worker server run
*/
@PostConstruct
public void run() {
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
// install task plugin
this.taskPluginManager.installPlugin();
this.workerRpcServer.start();
// worker registry
try {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.taskPluginManager.installPlugin();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
// task execute manager
this.workerManagerThread.start();
// retry report task status
this.retryReportTaskStatusThread.start();
/*
@ -181,7 +119,7 @@ public class WorkerServer implements IStoppable {
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
close("WorkerServer shutdown hook");
}
}));
}
@ -189,24 +127,23 @@ public class WorkerServer implements IStoppable {
public void close(String cause) {
try {
// execute only once
if (Stopper.isStopped()) {
// set stop signal is true
if (!Stopper.stop()) {
logger.warn("WorkerServer is already stopped, current cause: {}", cause);
return;
}
logger.info("worker server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
logger.info("Worker server is stopping, current cause : {}", cause);
try {
// thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
} catch (Exception e) {
logger.warn("thread sleep exception", e);
logger.warn("Worker server close wait error", e);
}
// close
this.nettyRemotingServer.close();
this.workerRpcServer.close();
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
@ -215,8 +152,9 @@ public class WorkerServer implements IStoppable {
// close the application context
this.springApplicationContext.close();
logger.info("Worker server stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("worker server stop exception ", e);
logger.error("Worker server stop failed, current cause: {}", cause, e);
}
}
@ -230,15 +168,22 @@ public class WorkerServer implements IStoppable {
*/
public void killAllRunningTasks() {
Collection<TaskExecutionContext> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
logger.info("ready to kill all cache job, job size:{}", taskRequests.size());
if (CollectionUtils.isEmpty(taskRequests)) {
return;
}
logger.info("Worker begin to kill all cache task, task size: {}", taskRequests.size());
int killNumber = 0;
for (TaskExecutionContext taskRequest : taskRequests) {
// kill task when it's not finished yet
org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), taskRequest.getTaskInstanceId());
if (ProcessUtils.kill(taskRequest)) {
killNumber++;
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
logger.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), killNumber);
}
}

97
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/prc/WorkerRpcServer.java

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.prc;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskRecallAckProcessor;
import java.io.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class WorkerRpcServer implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRpcServer.class);
@Autowired
private TaskExecuteProcessor taskExecuteProcessor;
@Autowired
private TaskKillProcessor taskKillProcessor;
@Autowired
private TaskRecallAckProcessor taskRecallAckProcessor;
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
@Autowired
private LoggerRequestProcessor loggerRequestProcessor;
@Autowired
private WorkerConfig workerConfig;
private NettyRemotingServer nettyRemotingServer;
public void start() {
LOGGER.info("Worker rpc server starting");
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_RECALL_ACK, taskRecallAckProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
this.nettyRemotingServer.start();
LOGGER.info("Worker rpc server started");
}
@Override
public void close() {
LOGGER.info("Worker rpc server closing");
this.nettyRemotingServer.close();
LOGGER.info("Worker rpc server closed");
}
}

169
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -18,10 +18,12 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -90,15 +92,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
@Autowired
private WorkerManagerThread workerManager;
@Counted(value = "dolphinscheduler_task_execution_count", description = "task execute total count")
@Timed(value = "dolphinscheduler_task_execution_timer", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Autowired(required = false)
private StorageOperate storageOperate;
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteRequestCommand.class);
TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(command.getBody(),
TaskExecuteRequestCommand.class);
if (taskRequestCommand == null) {
logger.error("task execute request command is null");
@ -113,70 +118,98 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.error("task execution context is null");
return;
}
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
// todo custom logger
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
logger.error("create executeLocalPath fail", ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
// todo custom logger
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
boolean osUserExistFlag;
//if Using distributed is true and Currently supported systems are linux,Should not let it automatically
//create tenants,so TenantAutoCreate has no effect
if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
//use the id command to judge in linux
osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
} else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
// if not exists this user, then create
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
} else {
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
}
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(),
taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
execLocalPath,
taskExecutionContext.getTaskInstanceId());
logger.error("create executeLocalPath fail", ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
}
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s",
taskExecutionContext.getTaskInstanceId(),
remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
}
// submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
taskCallbackService,
alertClientService,
taskPluginManager));
if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
}
}
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
}
// submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
}
/**
* get execute local path

17
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteRunningAckProcessor.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -44,19 +45,23 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteRunningAckCommand.class);
command.getBody(), TaskExecuteRunningAckCommand.class);
if (runningAckCommand == null) {
logger.error("task execute running ack command is null");
return;
}
logger.info("task execute running ack command : {}", runningAckCommand);
try {
LoggerUtils.setTaskInstanceIdMDC(runningAckCommand.getTaskInstanceId());
logger.info("task execute running ack command : {}", runningAckCommand);
if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
}
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
}
}

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

@ -47,9 +47,11 @@ public class RetryReportTaskStatusThread implements Runnable {
private TaskCallbackService taskCallbackService;
public void start() {
logger.info("Retry report task status thread starting");
Thread thread = new Thread(this, "RetryReportTaskStatusThread");
thread.setDaemon(true);
thread.start();
logger.info("Retry report task status thread started");
}
/**
@ -83,7 +85,7 @@ public class RetryReportTaskStatusThread implements Runnable {
}
}
} catch (Exception e) {
logger.warn("retry report task status error", e);
logger.warn("Retry report task status error", e);
}
}
}

31
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -76,14 +76,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
*/
private TaskExecutionContext taskExecutionContext;
public StorageOperate getStorageOperate() {
return storageOperate;
}
public void setStorageOperate(StorageOperate storageOperate) {
this.storageOperate = storageOperate;
}
private StorageOperate storageOperate;
/**
@ -107,24 +99,28 @@ public class TaskExecuteThread implements Runnable, Delayed {
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param taskCallbackService taskCallbackService
* @param taskCallbackService taskCallbackService
*/
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
AlertClientService alertClientService) {
AlertClientService alertClientService,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService;
this.storageOperate = storageOperate;
}
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
AlertClientService alertClientService,
TaskPluginManager taskPluginManager) {
TaskPluginManager taskPluginManager,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
}
@Override
@ -139,6 +135,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
@ -151,7 +148,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
// copy hdfs/minio file to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()){
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
@ -211,6 +208,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@ -308,11 +306,12 @@ public class TaskExecuteThread implements Runnable, Delayed {
/**
* download resource check
*
* @param execLocalPath
* @param projectRes
* @return
*/
public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes){
public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes) {
if (MapUtils.isEmpty(projectRes)) {
return Collections.emptyList();
}
@ -320,13 +319,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
projectRes.forEach((key, value) -> {
File resFile = new File(execLocalPath, key);
boolean notExist = !resFile.exists();
if (notExist){
if (notExist) {
downloadFile.add(Pair.of(key, value));
} else{
} else {
logger.info("file : {} exists ", resFile.getName());
}
});
if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){
if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
}
return downloadFile;

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -24,13 +24,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -132,9 +129,11 @@ public class WorkerManagerThread implements Runnable {
}
public void start() {
logger.info("Worker manager thread starting");
Thread thread = new Thread(this, this.getClass().getName());
thread.setDaemon(true);
thread.start();
logger.info("Worker manager thread started");
}
@Override

4
dolphinscheduler-worker/src/main/resources/logback-spring.xml

@ -22,7 +22,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>
@ -58,7 +58,7 @@
</rollingPolicy>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - %msg%n
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} %logger{96}:[%line] - [WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}] - %msg%n
</pattern>
<charset>UTF-8</charset>
</encoder>

42
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -53,7 +54,7 @@ import org.slf4j.Logger;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
public class TaskExecuteProcessorTest {
@ -63,6 +64,8 @@ public class TaskExecuteProcessorTest {
private ExecutorService workerExecService;
private StorageOperate storageOperate;
private WorkerConfig workerConfig;
private Command command;
@ -99,19 +102,23 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
.thenReturn(taskCallbackService);
.thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
.thenReturn(workerConfig);
.thenReturn(workerConfig);
workerManager = PowerMockito.mock(WorkerManagerThread.class);
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE);
storageOperate = PowerMockito.mock(StorageOperate.class);
PowerMockito.when(
workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, storageOperate)))
.thenReturn(Boolean.TRUE);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
.thenReturn(workerManager);
.thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()))
.thenReturn(workerExecService);
.thenReturn(workerExecService);
PowerMockito.mockStatic(JsonSerializer.class);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
@ -125,16 +132,17 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath());
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(
null, null, null, alertClientService, storageOperate);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
.thenReturn(simpleTaskExecuteThread);
.thenReturn(simpleTaskExecuteThread);
}
@Test
@ -172,8 +180,12 @@ public class TaskExecuteProcessorTest {
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) {
super(taskExecutionContext, taskCallbackService, alertClientService);
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
Logger taskLogger,
AlertClientService alertClientService,
StorageOperate storageOperate) {
super(taskExecutionContext, taskCallbackService, alertClientService, storageOperate);
}
@Override

27
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -17,12 +17,20 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -31,11 +39,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(PowerMockRunner.class)
public class TaskExecuteThreadTest {
@ -50,20 +53,24 @@ public class TaskExecuteThreadTest {
@Mock
private AlertClientService alertClientService;
@Mock
private StorageOperate storageOperate;
@Mock
private TaskPluginManager taskPluginManager;
@Test
public void checkTest(){
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager);
public void checkTest() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService,
alertClientService, taskPluginManager, storageOperate);
String path = "/";
Map<String, String> projectRes = new HashMap<>();
projectRes.put("shell", "shell.sh");
List<Pair<String, String>> downloads = new ArrayList<>();
try{
try {
downloads = taskExecuteThread.downloadCheck(path, projectRes);
}catch (Exception e){
} catch (Exception e) {
Assert.assertNotNull(e);
}
downloads.add(Pair.of("shell", "shell.sh"));

Loading…
Cancel
Save