diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 791c0bb558..c675ad55bd 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -439,7 +439,7 @@ public final class Constants { /** * default master commit retry interval */ - public static final int defaultMasterCommitRetryInterval = 100; + public static final int defaultMasterCommitRetryInterval = 3000; /** * time unit secong to minutes diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java index 054e25dd3a..6e937f0c3e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java @@ -45,7 +45,7 @@ public interface ITaskQueue { * @param key queue name * @param value */ - void add(String key, String value); + boolean add(String key, String value); /** * an element pops out of the queue diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index 76d88868f2..3fd012dd30 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -118,14 +118,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override - public void add(String key, String value) { + public boolean add(String key, String value){ try { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); + return true; } catch (Exception e) { logger.error("add task to tasks queue exception",e); + return false; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index 3080efa234..eb97ad75b2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -758,7 +758,7 @@ public class ProcessDao { } /** - * submit task to mysql and task queue + * submit task to db * submit sub process to command * @param taskInstance taskInstance * @param processInstance processInstance @@ -769,21 +769,18 @@ public class ProcessDao { logger.info("start submit task : {}, instance id:{}, state: {}, ", taskInstance.getName(), processInstance.getId(), processInstance.getState() ); processInstance = this.findProcessInstanceDetailById(processInstance.getId()); - //submit to mysql - TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance); - if(task.isSubProcess() && !task.getState().typeIsFinished()){ - ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task); - - TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); - Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); - createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task); - }else if(!task.getState().typeIsFinished()){ - //submit to task queue - task.setProcessInstancePriority(processInstance.getProcessInstancePriority()); - submitTaskToQueue(task); - } - logger.info("submit task :{} state:{} complete, instance id:{} state: {} ", + //submit to db + TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); + if(task == null){ + logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", + taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + return task; + } + if(!task.getState().typeIsFinished()){ + createSubWorkProcessCommand(processInstance, task); + } + + logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } @@ -845,13 +842,18 @@ public class ProcessDao { /** * create sub work process command * @param parentProcessInstance parentProcessInstance - * @param instanceMap instanceMap - * @param childDefineId instanceMap * @param task task */ private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance, - ProcessInstanceMap instanceMap, - Integer childDefineId, TaskInstance task){ + TaskInstance task){ + if(!task.isSubProcess()){ + return; + } + ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task); + TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); + Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); + Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); + ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId()); CommandType fatherType = parentProcessInstance.getCommandType(); @@ -921,7 +923,7 @@ public class ProcessDao { * @param processInstance processInstance * @return task instance */ - public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){ + public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance){ ExecutionStatus processInstanceState = processInstance.getState(); if(taskInstance.getState().typeIsFailure()){ @@ -949,7 +951,10 @@ public class ProcessDao { taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState)); taskInstance.setSubmitTime(new Date()); - saveTaskInstance(taskInstance); + boolean saveResult = saveTaskInstance(taskInstance); + if(!saveResult){ + return null; + } return taskInstance; } @@ -961,6 +966,10 @@ public class ProcessDao { public Boolean submitTaskToQueue(TaskInstance taskInstance) { try{ + if(taskInstance.getState().typeIsFinished()){ + logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); + return true; + } // task cannot submit when running if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){ logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName())); @@ -971,14 +980,13 @@ public class ProcessDao { return true; } logger.info("task ready to queue: {}" , taskInstance); - taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); + boolean insertQueueResult = taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) ); - return true; + return insertQueueResult; }catch (Exception e){ logger.error("submit task to queue Exception: ", e); logger.error("task queue error : %s", JSONUtils.toJson(taskInstance)); return false; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 4934df1978..9bb5c555fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -114,21 +114,37 @@ public class MasterBaseTaskExecThread implements Callable { Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); int retryTimes = 1; - - while (retryTimes <= commitRetryTimes){ + boolean taskDBFlag = false; + boolean taskQueueFlag = false; + TaskInstance task = null; + while (true){ try { - TaskInstance task = processDao.submitTask(taskInstance, processInstance); - if(task != null){ + if(!taskDBFlag){ + // submit task to db + task = processDao.submitTask(taskInstance, processInstance); + if(task != null && task.getId() != 0){ + taskDBFlag = true; + } + } + if(taskDBFlag && !taskQueueFlag){ + // submit task to queue + taskQueueFlag = processDao.submitTaskToQueue(task); + } + if(taskDBFlag && taskQueueFlag){ return task; } - logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes); + if(!taskDBFlag){ + logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes); + }else if(!taskQueueFlag){ + logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes); + + } Thread.sleep(commitRetryInterval); } catch (Exception e) { logger.error("task commit to mysql and queue failed : " + e.getMessage(),e); } retryTimes += 1; } - return null; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index f617d5f74d..e91deca511 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -91,6 +91,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { public Boolean waitTaskQuit(){ // query new state taskInstance = processDao.findTaskInstanceById(taskInstance.getId()); + logger.info("wait task: process id: {}, task id:{}, task name:{} complete", + this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); // task time out Boolean checkTimeout = false; TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter();