From 527ee472fbc22cee6cd564f2cb6e3ee008939d04 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 13 Jul 2022 10:51:20 +0800 Subject: [PATCH] Catch exception when check state in StateWheelExecuteThread (#10908) * Catch exception when check state (cherry picked from commit 2a67866718a74a6abc30fb615c0e16978511e3eb) --- .../runner/StateWheelExecuteThread.java | 68 ++++++++++++------- .../runner/WorkflowExecuteRunnable.java | 5 +- .../src/main/resources/application.yaml | 2 +- .../service/process/ProcessServiceImpl.java | 2 +- .../service/process/ProcessServiceTest.java | 7 +- .../src/main/resources/application.yaml | 2 +- 6 files changed, 57 insertions(+), 29 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 0d1b3a423a..c053cb238b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -134,23 +134,34 @@ public class StateWheelExecuteThread extends BaseDaemonThread { return; } for (Integer processInstanceId : processInstanceTimeoutCheckList) { - WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); - if (workflowExecuteThread == null) { - logger.warn("Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list"); - processInstanceTimeoutCheckList.remove(processInstanceId); - continue; - } - ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); - if (processInstance == null) { - logger.warn("Check workflow timeout failed, the workflowInstance is null"); - continue; - } - long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (timeRemain < 0) { - logger.info("Workflow instance timeout, adding timeout event"); - addProcessTimeoutEvent(processInstance); - processInstanceTimeoutCheckList.remove(processInstance.getId()); - logger.info("Workflow instance timeout, added timeout event"); + try { + LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId); + WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId( + processInstanceId); + if (workflowExecuteThread == null) { + logger.warn( + "Check workflow timeout failed, can not find workflowExecuteThread from cache manager, will remove this workflowInstance from check list"); + processInstanceTimeoutCheckList.remove(processInstanceId); + continue; + } + ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); + if (processInstance == null) { + logger.warn("Check workflow timeout failed, the workflowInstance is null"); + continue; + } + long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), + (long) processInstance.getTimeout() + * Constants.SEC_2_MINUTES_TIME_UNIT); + if (timeRemain < 0) { + logger.info("Workflow instance timeout, adding timeout event"); + addProcessTimeoutEvent(processInstance); + processInstanceTimeoutCheckList.remove(processInstance.getId()); + logger.info("Workflow instance timeout, added timeout event"); + } + } catch (Exception ex) { + logger.error("Check workflow instance timeout error"); + } finally { + LoggerUtils.removeWorkflowInstanceIdMDC(); } } } @@ -243,20 +254,26 @@ public class StateWheelExecuteThread extends BaseDaemonThread { } Optional taskInstanceOptional = workflowExecuteThread.getActiveTaskInstanceByTaskCode(taskCode); if (!taskInstanceOptional.isPresent()) { - logger.warn("Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}" - + "will remove this check task", taskCode); + logger.warn( + "Check task instance timeout failed, can not get taskInstance from workflowExecuteThread, taskCode: {}" + + "will remove this check task", + taskCode); taskInstanceTimeoutCheckList.remove(taskInstanceKey); continue; } TaskInstance taskInstance = taskInstanceOptional.get(); if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { - long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); + long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), + (long) taskInstance.getTaskDefine().getTimeout() + * Constants.SEC_2_MINUTES_TIME_UNIT); if (timeRemain < 0) { logger.info("Task instance is timeout, adding task timeout event and remove the check"); addTaskTimeoutEvent(taskInstance); taskInstanceTimeoutCheckList.remove(taskInstanceKey); } } + } catch (Exception ex) { + logger.error("Check task timeout error, taskInstanceKey: {}", taskInstanceKey, ex); } finally { LoggerUtils.removeWorkflowInstanceIdMDC(); } @@ -277,8 +294,9 @@ public class StateWheelExecuteThread extends BaseDaemonThread { WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); if (workflowExecuteThread == null) { - logger.warn("Task instance retry check failed, can not find workflowExecuteThread from cache manager, " - + "will remove this check task"); + logger.warn( + "Task instance retry check failed, can not find workflowExecuteThread from cache manager, " + + "will remove this check task"); taskInstanceRetryCheckList.remove(taskInstanceKey); continue; } @@ -308,13 +326,15 @@ public class StateWheelExecuteThread extends BaseDaemonThread { // reset taskInstance endTime and state // todo relative funtion: TaskInstance.retryTaskIntervalOverTime, WorkflowExecuteThread.cloneRetryTaskInstance logger.info("[TaskInstance-{}]The task instance can retry, will retry this task instance", - taskInstance.getId()); + taskInstance.getId()); taskInstance.setEndTime(null); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); addTaskRetryEvent(taskInstance); taskInstanceRetryCheckList.remove(taskInstanceKey); } + } catch (Exception ex) { + logger.error("Check task retry error, taskInstanceKey: {}", taskInstanceKey, ex); } finally { LoggerUtils.removeWorkflowInstanceIdMDC(); } @@ -349,6 +369,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread { continue; } addTaskStateChangeEvent(taskInstance); + } catch (Exception ex) { + logger.error("Task state check error, taskInstanceKey: {}", taskInstanceKey, ex); } finally { LoggerUtils.removeWorkflowInstanceIdMDC(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index a433981aca..0e94b0318d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -552,8 +552,9 @@ public class WorkflowExecuteRunnable implements Callable { } public Optional getActiveTaskInstanceByTaskCode(long taskCode) { - if (activeTaskProcessorMaps.containsKey(taskCode)) { - return Optional.ofNullable(activeTaskProcessorMaps.get(taskCode).taskInstance()); + Integer taskInstanceId = validTaskMap.get(taskCode); + if (taskInstanceId != null) { + return Optional.ofNullable(taskInstanceMap.get(taskInstanceId)); } return Optional.empty(); } diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index a277272026..ff29b19389 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -102,7 +102,7 @@ master: task-commit-retry-times: 5 # master commit task interval task-commit-interval: 1s - state-wheel-interval: 5 + state-wheel-interval: 5s # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index b12cd77de2..9cce9152f2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -909,7 +909,7 @@ public class ProcessServiceImpl implements ProcessService { command.getProcessDefinitionVersion()); if (processDefinition == null) { logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode()); - return null; + throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance"); } Map cmdParam = JSONUtils.toMap(command.getCommandParam()); int processInstanceId = command.getProcessInstanceId(); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index a98138dd03..82dc3f4673 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -292,7 +292,12 @@ public class ProcessServiceTest { + "\":\"111\",\"" + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); - Assert.assertNull(processService.handleCommand(host, command)); + try { + Assert.assertNull(processService.handleCommand(host, command)); + } catch (IllegalArgumentException illegalArgumentException) { + // assert throw illegalArgumentException here since the definition is null + Assert.assertTrue(true); + } int definitionVersion = 1; long definitionCode = 123; diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index cf5ce5ea88..9a940b6c0e 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -120,7 +120,7 @@ master: task-commit-retry-times: 5 # master commit task interval task-commit-interval: 1s - state-wheel-interval: 5 + state-wheel-interval: 5s # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G