Browse Source

Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)

* Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe

(cherry picked from commit ad2646ff1f)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
9a4c7f876a
  1. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  2. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -2050,6 +2050,11 @@ public class WorkflowExecuteRunnable implements Runnable {
}
private void measureTaskState(StateEvent taskStateEvent) {
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
// the event is broken
logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
return;
}
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -107,7 +107,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* Handle the events belong to the given workflow.
*/
public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -125,7 +125,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
logger.info("master submit success, task : {}", taskInstance.getName());
logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
return true;
} catch (Exception e) {
logger.error("submit task error", e);

Loading…
Cancel
Save