Browse Source

Catch exception when check state in StateWheelExecuteThread (#10908)

* Catch exception when check state

(cherry picked from commit 2a67866718)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
527ee472fb
  1. 68
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  2. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  3. 2
      dolphinscheduler-master/src/main/resources/application.yaml
  4. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  5. 7
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  6. 2
      dolphinscheduler-standalone-server/src/main/resources/application.yaml

68
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -134,23 +134,34 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
return; return;
} }
for (Integer processInstanceId : processInstanceTimeoutCheckList) { for (Integer processInstanceId : processInstanceTimeoutCheckList) {
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); try {
if (workflowExecuteThread == null) { LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list"); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceTimeoutCheckList.remove(processInstanceId); processInstanceId);
continue; if (workflowExecuteThread == null) {
} logger.warn(
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); "Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list");
if (processInstance == null) { processInstanceTimeoutCheckList.remove(processInstanceId);
logger.warn("Check workflow timeout failed, the workflowInstance is null"); continue;
continue; }
} ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); if (processInstance == null) {
if (timeRemain < 0) { logger.warn("Check workflow timeout failed, the workflowInstance is null");
logger.info("Workflow instance timeout, adding timeout event"); continue;
addProcessTimeoutEvent(processInstance); }
processInstanceTimeoutCheckList.remove(processInstance.getId()); long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(),
logger.info("Workflow instance timeout, added timeout event"); (long) processInstance.getTimeout()
* Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
logger.info("Workflow instance timeout, adding timeout event");
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
logger.info("Workflow instance timeout, added timeout event");
}
} catch (Exception ex) {
logger.error("Check workflow instance timeout error");
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
} }
} }
} }
@ -243,20 +254,26 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
} }
Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); Optional<TaskInstance> taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode);
if (!taskInstanceOptional.isPresent()) { if (!taskInstanceOptional.isPresent()) {
logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}" logger.warn(
+ "will remove this check task", taskCode); "Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}"
+ "will remove this check task",
taskCode);
taskInstanceTimeoutCheckList.remove(taskInstanceKey); taskInstanceTimeoutCheckList.remove(taskInstanceKey);
continue; continue;
} }
TaskInstance taskInstance = taskInstanceOptional.get(); TaskInstance taskInstance = taskInstanceOptional.get();
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(),
(long) taskInstance.getTaskDefine().getTimeout()
* Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) { if (timeRemain < 0) {
logger.info("Task instance is timeout, adding task timeout event and remove the check"); logger.info("Task instance is timeout, adding task timeout event and remove the check");
addTaskTimeoutEvent(taskInstance); addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstanceKey); taskInstanceTimeoutCheckList.remove(taskInstanceKey);
} }
} }
} catch (Exception ex) {
logger.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally { } finally {
LoggerUtils.removeWorkflowInstanceIdMDC(); LoggerUtils.removeWorkflowInstanceIdMDC();
} }
@ -277,8 +294,9 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) { if (workflowExecuteThread == null) {
logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, " logger.warn(
+ "will remove this check task"); "Task instance retry check failed, can not find workflowExecuteThread from cache manager, "
+ "will remove this check task");
taskInstanceRetryCheckList.remove(taskInstanceKey); taskInstanceRetryCheckList.remove(taskInstanceKey);
continue; continue;
} }
@ -308,13 +326,15 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
// reset taskInstance endTime and state // reset taskInstance endTime and state
// todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance
logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance", logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance",
taskInstance.getId()); taskInstance.getId());
taskInstance.setEndTime(null); taskInstance.setEndTime(null);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
addTaskRetryEvent(taskInstance); addTaskRetryEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstanceKey); taskInstanceRetryCheckList.remove(taskInstanceKey);
} }
} catch (Exception ex) {
logger.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally { } finally {
LoggerUtils.removeWorkflowInstanceIdMDC(); LoggerUtils.removeWorkflowInstanceIdMDC();
} }
@ -349,6 +369,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
continue; continue;
} }
addTaskStateChangeEvent(taskInstance); addTaskStateChangeEvent(taskInstance);
} catch (Exception ex) {
logger.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex);
} finally { } finally {
LoggerUtils.removeWorkflowInstanceIdMDC(); LoggerUtils.removeWorkflowInstanceIdMDC();
} }

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -552,8 +552,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) { public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
if (activeTaskProcessorMaps.containsKey(taskCode)) { Integer taskInstanceId = validTaskMap.get(taskCode);
return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance()); if (taskInstanceId != null) {
return Optional.ofNullable(taskInstanceMap.get(taskInstanceId));
} }
return Optional.empty(); return Optional.empty();
} }

2
dolphinscheduler-master/src/main/resources/application.yaml

@ -102,7 +102,7 @@ master:
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval # master commit task interval
task-commit-interval: 1s task-commit-interval: 1s
state-wheel-interval: 5 state-wheel-interval: 5s
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -909,7 +909,7 @@ public class ProcessServiceImpl implements ProcessService {
command.getProcessDefinitionVersion()); command.getProcessDefinitionVersion());
if (processDefinition == null) { if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode()); logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
return null; throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
} }
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
int processInstanceId = command.getProcessInstanceId(); int processInstanceId = command.getProcessInstanceId();

7
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -292,7 +292,12 @@ public class ProcessServiceTest {
+ "\":\"111\",\"" + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE + CMD_PARAM_SUB_PROCESS_DEFINE_CODE
+ "\":\"222\"}"); + "\":\"222\"}");
Assert.assertNull(processService.handleCommand(host, command)); try {
Assert.assertNull(processService.handleCommand(host, command));
} catch (IllegalArgumentException illegalArgumentException) {
// assert throw illegalArgumentException here since the definition is null
Assert.assertTrue(true);
}
int definitionVersion = 1; int definitionVersion = 1;
long definitionCode = 123; long definitionCode = 123;

2
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -120,7 +120,7 @@ master:
task-commit-retry-times: 5 task-commit-retry-times: 5
# master commit task interval # master commit task interval
task-commit-interval: 1s task-commit-interval: 1s
state-wheel-interval: 5 state-wheel-interval: 5s
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1 max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G

Loading…
Cancel
Save