Browse Source

Fix task exception might not exist in task instance log (#14085)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
05df0269f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java
  2. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java
  3. 45
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  4. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
  5. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
  6. 40
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  7. 16
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java

@ -61,11 +61,11 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor {
log.info("Receive task dispatch request, command: {}", taskDispatchRequest); log.info("Receive task dispatch request, command: {}", taskDispatchRequest);
TaskExecutionContext taskExecutionContext = taskDispatchRequest.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = taskDispatchRequest.getTaskExecutionContext();
taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
try ( try {
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( // Since we need to make sure remove MDC key after cache, so we use finally to remove MDC key
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 = taskExecutionContext.getTaskInstanceId());
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) { LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext); MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);
// todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task // todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task
long remainTime = long remainTime =
@ -78,7 +78,8 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor {
taskExecutionContext.getTaskName(), taskExecutionContext.getTaskName(),
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime); TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
masterMessageSenderManager.getMasterTaskExecuteResultMessageSender().sendMessage(taskExecutionContext); masterMessageSenderManager.getMasterTaskExecuteResultMessageSender()
.sendMessage(taskExecutionContext);
} }
MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = masterTaskExecuteRunnableFactoryBuilder MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = masterTaskExecuteRunnableFactoryBuilder
@ -99,6 +100,9 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor {
} catch (Exception ex) { } catch (Exception ex) {
log.error("Handle task dispatch request error, command: {}", taskDispatchRequest, ex); log.error("Handle task dispatch request error, command: {}", taskDispatchRequest, ex);
sendDispatchFailedResult(channel, message, taskExecutionContext, ex); sendDispatchFailedResult(channel, message, taskExecutionContext, ex);
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
} }
} }

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java

@ -52,9 +52,9 @@ public class MasterTaskPauseProcessor implements MasterRpcProcessor {
return; return;
} }
TaskExecutionContext taskExecutionContext = masterTaskExecuteRunnable.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = masterTaskExecuteRunnable.getTaskExecutionContext();
try ( try {
LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())) { taskExecutionContext.getTaskInstanceId());
masterTaskExecuteRunnable.pauseTask(); masterTaskExecuteRunnable.pauseTask();
} catch (MasterTaskExecuteException e) { } catch (MasterTaskExecuteException e) {
log.error("Pause MasterTaskExecuteRunnable failed", e); log.error("Pause MasterTaskExecuteRunnable failed", e);

45
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -119,7 +119,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -312,10 +311,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
if (stateEvent == null) { if (stateEvent == null) {
return; return;
} }
try ( try {
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
stateEvent.getTaskInstanceId())) {
// if state handle success then will remove this state, otherwise will retry this state next time. // if state handle success then will remove this state, otherwise will retry this state next time.
// The state should always handle success except database error. // The state should always handle success except database error.
checkProcessInstance(stateEvent); checkProcessInstance(stateEvent);
@ -329,7 +327,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
this.stateEvents.remove(stateEvent); this.stateEvents.remove(stateEvent);
} }
} catch (StateEventHandleError stateEventHandleError) { } catch (StateEventHandleError stateEventHandleError) {
log.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError); log.error("State event handle error, will remove this event: {}", stateEvent,
stateEventHandleError);
this.stateEvents.remove(stateEvent); this.stateEvents.remove(stateEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleException stateEventHandleException) { } catch (StateEventHandleException stateEventHandleException) {
@ -345,12 +344,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
this.stateEvents.offer(stateEvent); this.stateEvents.offer(stateEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) { } catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep // we catch the exception here, since if the state event handle failed, the state event will still
// keep
// in the stateEvents queue. // in the stateEvents queue.
log.error("State event handle error, get a unknown exception, will retry this event: {}", log.error("State event handle error, get a unknown exception, will retry this event: {}",
stateEvent, stateEvent,
e); e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
} }
} }
} }
@ -737,9 +739,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
@Override @Override
public WorkflowSubmitStatus call() { public WorkflowSubmitStatus call() {
try ( try {
LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowInstanceIdMDC(processInstance.getId());
LogUtils.setWorkflowInstanceIdMDC(processInstance.getId())) {
if (isStart()) { if (isStart()) {
// This case should not been happened // This case should not been happened
log.warn("The workflow has already been started, current state: {}", workflowRunnableStatus); log.warn("The workflow has already been started, current state: {}", workflowRunnableStatus);
@ -764,6 +765,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
} catch (Exception e) { } catch (Exception e) {
log.error("Start workflow error", e); log.error("Start workflow error", e);
return WorkflowSubmitStatus.FAILED; return WorkflowSubmitStatus.FAILED;
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
} }
} }
@ -1860,18 +1863,16 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
continue; continue;
} }
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = taskExecuteRunnableMap.get(taskCode); DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = taskExecuteRunnableMap.get(taskCode);
CompletableFuture.runAsync(defaultTaskExecuteRunnable::kill) defaultTaskExecuteRunnable.kill();
.thenRun(() -> { if (defaultTaskExecuteRunnable.getTaskInstance().getState().isFinished()) {
if (defaultTaskExecuteRunnable.getTaskInstance().getState().isFinished()) { TaskStateEvent taskStateEvent = TaskStateEvent.builder()
TaskStateEvent taskStateEvent = TaskStateEvent.builder() .processInstanceId(processInstance.getId())
.processInstanceId(processInstance.getId()) .taskInstanceId(taskInstance.getId())
.taskInstanceId(taskInstance.getId()) .status(defaultTaskExecuteRunnable.getTaskInstance().getState())
.status(defaultTaskExecuteRunnable.getTaskInstance().getState()) .type(StateEventType.TASK_STATE_CHANGE)
.type(StateEventType.TASK_STATE_CHANGE) .build();
.build(); this.addStateEvent(taskStateEvent);
this.addStateEvent(taskStateEvent); }
}
});
} }
} }
} }

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java

@ -92,14 +92,15 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements
asyncTaskExecutionContext.getAsyncTaskExecuteFunction(); asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction = final AsyncTaskCallbackFunction asyncTaskCallbackFunction =
asyncTaskExecutionContext.getAsyncTaskCallbackFunction(); asyncTaskExecutionContext.getAsyncTaskCallbackFunction();
try ( try {
LogUtils.MDCAutoClosableContext mdcAutoClosableContext2 = LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) { LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
AsyncTaskExecuteFunction.AsyncTaskExecutionStatus asyncTaskExecutionStatus = AsyncTaskExecuteFunction.AsyncTaskExecutionStatus asyncTaskExecutionStatus =
asyncTaskExecuteFunction.getAsyncTaskExecutionStatus(); asyncTaskExecuteFunction.getAsyncTaskExecutionStatus();
switch (asyncTaskExecutionStatus) { switch (asyncTaskExecutionStatus) {
case RUNNING: case RUNNING:
// If the task status is running, means the task real status is not finished. We will // If the task status is running, means the task real status is not finished. We
// will
// put it back to the queue to get the status again. // put it back to the queue to get the status again.
asyncMasterTaskDelayQueue.addAsyncTask(asyncTaskExecutionContext); asyncMasterTaskDelayQueue.addAsyncTask(asyncTaskExecutionContext);
break; break;
@ -112,6 +113,9 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements
} }
} catch (Exception ex) { } catch (Exception ex) {
asyncTaskCallbackFunction.executeThrowing(ex); asyncTaskCallbackFunction.executeThrowing(ex);
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
LogUtils.removeTaskInstanceIdMDC();
} }
}); });
} }

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java

@ -90,11 +90,11 @@ public abstract class MasterTaskExecuteRunnable implements Runnable {
@Override @Override
public void run() { public void run() {
try ( try {
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 = LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) {
TaskInstanceLogHeader.printInitializeTaskContextHeader(); TaskInstanceLogHeader.printInitializeTaskContextHeader();
initializeTask(); initializeTask();
@ -109,6 +109,9 @@ public abstract class MasterTaskExecuteRunnable implements Runnable {
log.error("Task execute failed, due to meet an exception", ex); log.error("Task execute failed, due to meet an exception", ex);
afterThrowing(ex); afterThrowing(ex);
closeLogAppender(); closeLogAppender();
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
} }
} }

40
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -123,26 +123,28 @@ public class WorkerFailoverService {
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = final LogUtils.MDCAutoClosableContext mdcAutoClosableContext =
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), LogUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(),
taskInstance.getId())) { taskInstance.getId())) {
ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent( try {
taskInstance.getProcessInstanceId(), k -> { ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId( taskInstance.getProcessInstanceId(), k -> {
taskInstance.getProcessInstanceId()); WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
if (workflowExecuteRunnable == null) { taskInstance.getProcessInstanceId());
return null; if (workflowExecuteRunnable == null) {
} return null;
return workflowExecuteRunnable.getProcessInstance(); }
}); return workflowExecuteRunnable.getProcessInstance();
if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) { });
log.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost); if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
continue; log.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
continue;
}
log.info(
"Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
workerHost);
failoverTaskInstance(processInstance, taskInstance);
log.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
} catch (Exception ex) {
log.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
} }
log.info(
"Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
workerHost);
failoverTaskInstance(processInstance, taskInstance);
log.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
} catch (Exception ex) {
log.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
} }
} }
failoverTimeCost.stop(); failoverTimeCost.stop();

16
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -142,11 +142,11 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
@Override @Override
public void run() { public void run() {
try ( try {
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = LogUtils.setWorkflowAndTaskInstanceIDMDC( LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
final LogUtils.MDCAutoClosableContext mdcAutoClosableContext1 = LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath())) {
TaskInstanceLogHeader.printInitializeTaskContextHeader(); TaskInstanceLogHeader.printInitializeTaskContextHeader();
initializeTask(); initializeTask();
@ -154,7 +154,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis()); taskExecutionContext.setEndTime(System.currentTimeMillis());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, MessageType.TASK_EXECUTE_RESULT_MESSAGE); workerMessageSender.sendMessageWithRetry(taskExecutionContext,
MessageType.TASK_EXECUTE_RESULT_MESSAGE);
log.info( log.info(
"The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success"); "The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
return; return;
@ -177,6 +178,9 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
log.error("Task execute failed, due to meet an exception", ex); log.error("Task execute failed, due to meet an exception", ex);
afterThrowing(ex); afterThrowing(ex);
closeLogAppender(); closeLogAppender();
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
} }
} }

Loading…
Cancel
Save