Browse Source

fix NPE while retry task (#12903)

3.1.2-release
Kerwin 2 years ago committed by zhuangchong
parent
commit
6ffac27dbc
  1. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
  2. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEvent.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEvent.java
  4. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  5. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  6. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java

@ -31,7 +31,7 @@ public interface StateEvent {
int getProcessInstanceId(); int getProcessInstanceId();
int getTaskInstanceId(); Integer getTaskInstanceId();
@NonNull @NonNull
StateEventType getType(); StateEventType getType();

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEvent.java

@ -35,7 +35,7 @@ public class TaskStateEvent implements StateEvent {
// todo: use wrapper type // todo: use wrapper type
private int processInstanceId; private int processInstanceId;
private int taskInstanceId; private Integer taskInstanceId;
private long taskCode; private long taskCode;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEvent.java

@ -34,7 +34,7 @@ public class WorkflowStateEvent implements StateEvent {
/** /**
* Some event may contains taskInstanceId * Some event may contains taskInstanceId
*/ */
private int taskInstanceId; private Integer taskInstanceId;
private WorkflowExecutionStatus status; private WorkflowExecutionStatus status;

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -208,8 +208,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return; return;
} }
taskInstanceRetryCheckList.add(taskInstanceKey); taskInstanceRetryCheckList.add(taskInstanceKey);
logger.info("[WorkflowInstance-{}][TaskInstance-{}] Added task instance into retry check list", logger.info("[WorkflowInstance-{}][TaskInstanceKey-{}:{}] Added task instance into retry check list",
processInstance.getId(), taskInstance.getId()); processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
} }
public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) { public void removeTask4RetryCheck(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
@ -341,8 +341,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
// reset taskInstance endTime and state // reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, // todo relative funtion: TaskInstance.retryTaskIntervalOverTime,
// WorkflowExecuteThread.cloneRetryTaskInstance // WorkflowExecuteThread.cloneRetryTaskInstance
logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance", logger.info("[TaskInstanceKey-{}:{}]The task instance can retry, will retry this task instance",
taskInstance.getId()); taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
taskInstance.setEndTime(null); taskInstance.setEndTime(null);
taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -523,7 +523,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* check if task instance exist by state event * check if task instance exist by state event
*/ */
public void checkTaskInstanceByStateEvent(TaskStateEvent stateEvent) throws StateEventHandleError { public void checkTaskInstanceByStateEvent(TaskStateEvent stateEvent) throws StateEventHandleError {
if (stateEvent.getTaskInstanceId() == 0) { if (stateEvent.getTaskInstanceId() == null || stateEvent.getTaskInstanceId() == 0) {
throw new StateEventHandleError("The taskInstanceId is 0"); throw new StateEventHandleError("The taskInstanceId is 0");
} }
@ -546,10 +546,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* get task instance from memory * get task instance from memory
*/ */
public Optional<TaskInstance> getTaskInstance(int taskInstanceId) { public Optional<TaskInstance> getTaskInstance(int taskInstanceId) {
if (taskInstanceMap.containsKey(taskInstanceId)) { return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
}
return Optional.empty();
} }
public Optional<TaskInstance> getTaskInstance(long taskCode) { public Optional<TaskInstance> getTaskInstance(long taskCode) {

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/LoggerUtils.java

@ -79,16 +79,16 @@ public class LoggerUtils {
return ""; return "";
} }
public static void setWorkflowAndTaskInstanceIDMDC(int workflowInstanceId, int taskInstanceId) { public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) {
setWorkflowInstanceIdMDC(workflowInstanceId); setWorkflowInstanceIdMDC(workflowInstanceId);
setTaskInstanceIdMDC(taskInstanceId); setTaskInstanceIdMDC(taskInstanceId);
} }
public static void setWorkflowInstanceIdMDC(int workflowInstanceId) { public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) {
MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId));
} }
public static void setTaskInstanceIdMDC(int taskInstanceId) { public static void setTaskInstanceIdMDC(Integer taskInstanceId) {
MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId)); MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId));
} }

Loading…
Cancel
Save