Browse Source

move verifyTaskInstanceIsNull after taskInstance (#1047)

* fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread

* 1. move verifyTaskInstanceIsNull after taskInstance
2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable
pull/2/head
Tboy 5 years ago committed by dailidong
parent
commit
4ef2ef6121
  1. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

@ -175,17 +175,25 @@ public class FetchTaskThread implements Runnable{
}
// get task instance id
taskInstId = getTaskInstanceId(taskQueueStr);
// get task instance relation
taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId);
// verify task instance is null
if (verifyTaskInstanceIsNull(taskInstance)) {
logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
continue;
}
Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
taskInstance.getProcessDefine().getUserId());
// verify tenant is null
if (verifyTenantIsNull(taskQueueStr, tenant)) {
if (verifyTenantIsNull(tenant)) {
logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
continue;
}
@ -199,11 +207,6 @@ public class FetchTaskThread implements Runnable{
// mainly to wait for the master insert task to succeed
waitForMasterEnterQueue();
// verify task instance is null
if (verifyTaskInstanceIsNull(taskQueueStr)) {
continue;
}
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
@ -240,13 +243,11 @@ public class FetchTaskThread implements Runnable{
/**
* verify task instance is null
* @param taskQueueStr
* @return
*/
private boolean verifyTaskInstanceIsNull(String taskQueueStr) {
private boolean verifyTaskInstanceIsNull(TaskInstance taskInstance) {
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskInstId);
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
return true;
}
return false;
@ -254,17 +255,15 @@ public class FetchTaskThread implements Runnable{
/**
* verify tenant is null
* @param taskQueueStr
* @param tenant
* @return
*/
private boolean verifyTenantIsNull(String taskQueueStr, Tenant tenant) {
private boolean verifyTenantIsNull(Tenant tenant) {
if(tenant == null){
logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
taskQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskQueueStr);
return true;
}
return false;

Loading…
Cancel
Save