diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java index 34c8a15d52..009a684e92 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java @@ -61,11 +61,11 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor { log.info("Receive task dispatch request, command: {}", taskDispatchRequest); TaskExecutionContext taskExecutionContext = taskDispatchRequest.getTaskExecutionContext(); taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); - try ( - final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( - taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); - final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 = - LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) { + try { + // Since we need to make sure remove MDC key after cache, so we use finally to remove MDC key + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext); // todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task long remainTime = @@ -78,7 +78,8 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor { taskExecutionContext.getTaskName(), TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); - masterMessageSenderManager.getMasterTaskExecuteResultMessageSender().sendMessage(taskExecutionContext); + masterMessageSenderManager.getMasterTaskExecuteResultMessageSender() + .sendMessage(taskExecutionContext); } MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = masterTaskExecuteRunnableFactoryBuilder @@ -99,6 +100,9 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor { } catch (Exception ex) { log.error("Handle task dispatch request error, command: {}", taskDispatchRequest, ex); sendDispatchFailedResult(channel, message, taskExecutionContext, ex); + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceLogFullPathMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java index 8e53b26396..5b7176c18f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java @@ -52,9 +52,9 @@ public class MasterTaskPauseProcessor implements MasterRpcProcessor { return; } TaskExecutionContext taskExecutionContext = masterTaskExecuteRunnable.getTaskExecutionContext(); - try ( - LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( - taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); masterTaskExecuteRunnable.pauseTask(); } catch (MasterTaskExecuteException e) { log.error("Pause MasterTaskExecuteRunnable failed", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 2837404102..7f77a5c99f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -119,7 +119,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -312,10 +311,9 @@ public class WorkflowExecuteRunnable implements Callable { if (stateEvent == null) { return; } - try ( - final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = - LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), - stateEvent.getTaskInstanceId())) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), + stateEvent.getTaskInstanceId()); // if state handle success then will remove this state, otherwise will retry this state next time. // The state should always handle success except database error. checkProcessInstance(stateEvent); @@ -329,7 +327,8 @@ public class WorkflowExecuteRunnable implements Callable { this.stateEvents.remove(stateEvent); } } catch (StateEventHandleError stateEventHandleError) { - log.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError); + log.error("State event handle error, will remove this event: {}", stateEvent, + stateEventHandleError); this.stateEvents.remove(stateEvent); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (StateEventHandleException stateEventHandleException) { @@ -345,12 +344,15 @@ public class WorkflowExecuteRunnable implements Callable { this.stateEvents.offer(stateEvent); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { - // we catch the exception here, since if the state event handle failed, the state event will still keep + // we catch the exception here, since if the state event handle failed, the state event will still + // keep // in the stateEvents queue. log.error("State event handle error, get a unknown exception, will retry this event: {}", stateEvent, e); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } @@ -737,9 +739,8 @@ public class WorkflowExecuteRunnable implements Callable { @Override public WorkflowSubmitStatus call() { - try ( - LogUtils.MDCAutoClosableContext mdcAutoClosableContext = - LogUtils.setWorkflowInstanceIdMDC(processInstance.getId())) { + try { + LogUtils.setWorkflowInstanceIdMDC(processInstance.getId()); if (isStart()) { // This case should not been happened log.warn("The workflow has already been started, current state: {}", workflowRunnableStatus); @@ -764,6 +765,8 @@ public class WorkflowExecuteRunnable implements Callable { } catch (Exception e) { log.error("Start workflow error", e); return WorkflowSubmitStatus.FAILED; + } finally { + LogUtils.removeWorkflowInstanceIdMDC(); } } @@ -1860,18 +1863,16 @@ public class WorkflowExecuteRunnable implements Callable { continue; } DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = taskExecuteRunnableMap.get(taskCode); - CompletableFuture.runAsync(defaultTaskExecuteRunnable::kill) - .thenRun(() -> { - if (defaultTaskExecuteRunnable.getTaskInstance().getState().isFinished()) { - TaskStateEvent taskStateEvent = TaskStateEvent.builder() - .processInstanceId(processInstance.getId()) - .taskInstanceId(taskInstance.getId()) - .status(defaultTaskExecuteRunnable.getTaskInstance().getState()) - .type(StateEventType.TASK_STATE_CHANGE) - .build(); - this.addStateEvent(taskStateEvent); - } - }); + defaultTaskExecuteRunnable.kill(); + if (defaultTaskExecuteRunnable.getTaskInstance().getState().isFinished()) { + TaskStateEvent taskStateEvent = TaskStateEvent.builder() + .processInstanceId(processInstance.getId()) + .taskInstanceId(taskInstance.getId()) + .status(defaultTaskExecuteRunnable.getTaskInstance().getState()) + .type(StateEventType.TASK_STATE_CHANGE) + .build(); + this.addStateEvent(taskStateEvent); + } } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java index d71a116f91..afdab35eb1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java @@ -92,14 +92,15 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements asyncTaskExecutionContext.getAsyncTaskExecuteFunction(); final AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction(); - try ( - LogUtils.MDCAutoClosableContext mdcAutoClosableContext2 = - LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) { + try { + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); + LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId()); AsyncTaskExecuteFunction.AsyncTaskExecutionStatus asyncTaskExecutionStatus = asyncTaskExecuteFunction.getAsyncTaskExecutionStatus(); switch (asyncTaskExecutionStatus) { case RUNNING: - // If the task status is running, means the task real status is not finished. We will + // If the task status is running, means the task real status is not finished. We + // will // put it back to the queue to get the status again. asyncMasterTaskDelayQueue.addAsyncTask(asyncTaskExecutionContext); break; @@ -112,6 +113,9 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements } } catch (Exception ex) { asyncTaskCallbackFunction.executeThrowing(ex); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeTaskInstanceIdMDC(); } }); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java index 4dbdcfce9d..973baf978e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java @@ -90,11 +90,11 @@ public abstract class MasterTaskExecuteRunnable implements Runnable { @Override public void run() { - try ( - final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( - taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); - final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 = - LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); + TaskInstanceLogHeader.printInitializeTaskContextHeader(); initializeTask(); @@ -109,6 +109,9 @@ public abstract class MasterTaskExecuteRunnable implements Runnable { log.error("Task execute failed, due to meet an exception", ex); afterThrowing(ex); closeLogAppender(); + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceLogFullPathMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index e75d19d7a8..144eaa3d26 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -123,26 +123,28 @@ public class WorkerFailoverService { final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId())) { - ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent( - taskInstance.getProcessInstanceId(), k -> { - WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId( - taskInstance.getProcessInstanceId()); - if (workflowExecuteRunnable == null) { - return null; - } - return workflowExecuteRunnable.getProcessInstance(); - }); - if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) { - log.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost); - continue; + try { + ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent( + taskInstance.getProcessInstanceId(), k -> { + WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId( + taskInstance.getProcessInstanceId()); + if (workflowExecuteRunnable == null) { + return null; + } + return workflowExecuteRunnable.getProcessInstance(); + }); + if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) { + log.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost); + continue; + } + log.info( + "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE", + workerHost); + failoverTaskInstance(processInstance, taskInstance); + log.info("Worker[{}] failover: Finish failover taskInstance", workerHost); + } catch (Exception ex) { + log.info("Worker[{}] failover taskInstance occur exception", workerHost, ex); } - log.info( - "Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE", - workerHost); - failoverTaskInstance(processInstance, taskInstance); - log.info("Worker[{}] failover: Finish failover taskInstance", workerHost); - } catch (Exception ex) { - log.info("Worker[{}] failover taskInstance occur exception", workerHost, ex); } } failoverTimeCost.stop(); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index 598fc6cfc9..46629f3634 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -142,11 +142,11 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { @Override public void run() { - try ( - final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( - taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); - final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 = - LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) { + try { + LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), + taskExecutionContext.getTaskInstanceId()); + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); + TaskInstanceLogHeader.printInitializeTaskContextHeader(); initializeTask(); @@ -154,7 +154,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); taskExecutionContext.setEndTime(System.currentTimeMillis()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - workerMessageSender.sendMessageWithRetry(taskExecutionContext, MessageType.TASK_EXECUTE_RESULT_MESSAGE); + workerMessageSender.sendMessageWithRetry(taskExecutionContext, + MessageType.TASK_EXECUTE_RESULT_MESSAGE); log.info( "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success"); return; @@ -177,6 +178,9 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { log.error("Task execute failed, due to meet an exception", ex); afterThrowing(ex); closeLogAppender(); + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceLogFullPathMDC(); } }