diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index 62cd1a07fe..943b1c9f82 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -175,17 +175,25 @@ public class FetchTaskThread implements Runnable{ } // get task instance id - taskInstId = getTaskInstanceId(taskQueueStr); // get task instance relation taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId); + // verify task instance is null + if (verifyTaskInstanceIsNull(taskInstance)) { + logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr); + taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr); + continue; + } + Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), taskInstance.getProcessDefine().getUserId()); // verify tenant is null - if (verifyTenantIsNull(taskQueueStr, tenant)) { + if (verifyTenantIsNull(tenant)) { + logger.warn("remove task queue : {} due to tenant is null", taskQueueStr); + taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr); continue; } @@ -199,11 +207,6 @@ public class FetchTaskThread implements Runnable{ // mainly to wait for the master insert task to succeed waitForMasterEnterQueue(); - // verify task instance is null - if (verifyTaskInstanceIsNull(taskQueueStr)) { - continue; - } - if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ continue; } @@ -240,13 +243,11 @@ public class FetchTaskThread implements Runnable{ /** * verify task instance is null - * @param taskQueueStr * @return */ - private boolean verifyTaskInstanceIsNull(String taskQueueStr) { + private boolean verifyTaskInstanceIsNull(TaskInstance taskInstance) { if (taskInstance == null ) { logger.error("task instance is null. task id : {} ", taskInstId); - taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr); return true; } return false; @@ -254,17 +255,15 @@ public class FetchTaskThread implements Runnable{ /** * verify tenant is null - * @param taskQueueStr * @param tenant * @return */ - private boolean verifyTenantIsNull(String taskQueueStr, Tenant tenant) { + private boolean verifyTenantIsNull(Tenant tenant) { if(tenant == null){ logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}", taskInstance.getProcessDefine().getId(), taskInstance.getProcessInstance().getId(), taskInstance.getId()); - taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr); return true; } return false;