diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index 80db6c86af..b6533ad25c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.User; import io.swagger.annotations.*; -import org.apache.dolphinscheduler.service.queue.ITaskQueue; import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,8 +239,7 @@ public class ProcessInstanceController extends BaseController{ logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}", loginUser.getUserName(), projectName, processInstanceId); // task queue - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); - Map result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); + Map result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId); return returnDataList(result); }catch (Exception e){ logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e); @@ -370,7 +368,6 @@ public class ProcessInstanceController extends BaseController{ logger.info("delete process instance by ids, login user:{}, project name:{}, process instance ids :{}", loginUser.getUserName(), projectName, processInstanceIds); // task queue - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); Map result = new HashMap<>(5); List deleteFailedIdList = new ArrayList<>(); if(StringUtils.isNotEmpty(processInstanceIds)){ @@ -379,7 +376,7 @@ public class ProcessInstanceController extends BaseController{ for (String strProcessInstanceId:processInstanceIdArray) { int processInstanceId = Integer.parseInt(strProcessInstanceId); try { - Map deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue); + Map deleteResult = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId); if(!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))){ deleteFailedIdList.add(strProcessInstanceId); logger.error((String)deleteResult.get(Constants.MSG)); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java index bafe833fab..0c93e00a80 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataAnalysisService.java @@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.ITaskQueue; -import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -318,9 +316,8 @@ public class DataAnalysisService extends BaseService{ return result; } - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); - List tasksQueueList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); - List tasksKillList = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL); + List tasksQueueList = new ArrayList<>(); + List tasksKillList = new ArrayList<>(); Map dataMap = new HashMap<>(); if (loginUser.getUserType() == UserType.ADMIN_USER){ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 2b1f04e6ce..4b809a8d01 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -38,7 +38,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.ITaskQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -404,8 +403,6 @@ public class ProcessInstanceService extends BaseDAGService { processInstance.setProcessInstanceJson(processInstanceJson); processInstance.setGlobalParams(globalParams); } -// int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson, -// globalParams, schedule, flag, locations, connects); int update = processService.updateProcessInstance(processInstance); int updateDefine = 1; if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) { @@ -472,11 +469,10 @@ public class ProcessInstanceService extends BaseDAGService { * @param loginUser login user * @param projectName project name * @param processInstanceId process instance id - * @param tasksQueue task queue * @return delete result code */ @Transactional(rollbackFor = Exception.class) - public Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId, ITaskQueue tasksQueue) { + public Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -494,52 +490,6 @@ public class ProcessInstanceService extends BaseDAGService { return result; } - //process instance priority - int processInstancePriority = processInstance.getProcessInstancePriority().ordinal(); - // delete zk queue - if (CollectionUtils.isNotEmpty(taskInstanceList)){ - for (TaskInstance taskInstance : taskInstanceList){ - // task instance priority - int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal(); - - StringBuilder nodeValueSb = new StringBuilder(100); - nodeValueSb.append(processInstancePriority) - .append(UNDERLINE) - .append(processInstanceId) - .append(UNDERLINE) - .append(taskInstancePriority) - .append(UNDERLINE) - .append(taskInstance.getId()) - .append(UNDERLINE); - - int taskWorkerGroupId = processService.getTaskWorkerGroupId(taskInstance); - WorkerGroup workerGroup = workerGroupMapper.selectById(taskWorkerGroupId); - - if(workerGroup == null){ - nodeValueSb.append(DEFAULT_WORKER_ID); - }else { - - String ips = workerGroup.getIpList(); - StringBuilder ipSb = new StringBuilder(100); - String[] ipArray = ips.split(COMMA); - - for (String ip : ipArray) { - long ipLong = IpUtils.ipToLong(ip); - ipSb.append(ipLong).append(COMMA); - } - - if(ipSb.length() > 0) { - ipSb.deleteCharAt(ipSb.length() - 1); - } - nodeValueSb.append(ipSb); - } - - logger.info("delete task queue node : {}",nodeValueSb.toString()); - tasksQueue.removeNode(org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE, nodeValueSb.toString()); - - } - } - // delete database cascade int delete = processService.deleteWorkProcessInstanceById(processInstanceId); processService.deleteAllSubWorkProcessByParentId(processInstanceId); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java index 6f308e7b17..10220e2d31 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java @@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.ITaskQueue; import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.junit.After; import org.junit.Assert; @@ -74,8 +73,7 @@ public class DataAnalysisServiceTest { @Mock TaskInstanceMapper taskInstanceMapper; - @Mock - ITaskQueue taskQueue; + @Mock ProcessService processService; @@ -183,30 +181,6 @@ public class DataAnalysisServiceTest { } - @Test - public void testCountQueueState(){ - - PowerMockito.mockStatic(TaskQueueFactory.class); - List taskQueueList = new ArrayList<>(1); - taskQueueList.add("1_0_1_1_-1"); - List taskKillList = new ArrayList<>(1); - taskKillList.add("1-0"); - PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE)).thenReturn(taskQueueList); - PowerMockito.when(taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_KILL)).thenReturn(taskKillList); - PowerMockito.when(TaskQueueFactory.getTaskQueueInstance()).thenReturn(taskQueue); - //checkProject false - Map result = dataAnalysisService.countQueueState(user,2); - Assert.assertTrue(result.isEmpty()); - - result = dataAnalysisService.countQueueState(user,1); - Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); - //admin - user.setUserType(UserType.ADMIN_USER); - result = dataAnalysisService.countQueueState(user,1); - Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); - - } - /** * get list * @return diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index dc463fe764..3fc40ca5a8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -194,6 +194,7 @@ public class TaskInstance implements Serializable { /** * workerGroup */ + @TableField(exist = false) private String workerGroup; public ProcessInstance getProcessInstance() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java index 7db5f45ad2..991eeed493 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskPriority.java @@ -132,7 +132,8 @@ public class TaskPriority { */ public static TaskPriority of(String taskPriorityInfo){ String[] parts = taskPriorityInfo.split(UNDERLINE); - if (parts.length != 4) { + + if (parts.length != 5) { throw new IllegalArgumentException(String.format("TaskPriority : %s illegal.", taskPriorityInfo)); } TaskPriority taskPriority = new TaskPriority( diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 6923cd02e1..12fe25b30d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.consumer.TaskUpdateQueueConsumer; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; @@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; +import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,7 +168,8 @@ public class MasterServer implements IStoppable { logger.error("start Quartz failed", e); } - + TaskUpdateQueueConsumer taskUpdateQueueConsumer = SpringApplicationContext.getBean(TaskUpdateQueueConsumer.class); + taskUpdateQueueConsumer.start(); /** * register hooks, which are called before the process exits */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index c8b7b0eb1b..b0fd6322c3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements Callable { this.cancel = false; this.taskInstance = taskInstance; this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class); - this.taskUpdateQueue = new TaskUpdateQueueImpl(); + this.taskUpdateQueue = SpringApplicationContext.getBean(TaskUpdateQueueImpl.class); } /** @@ -180,8 +180,7 @@ public class MasterBaseTaskExecThread implements Callable { processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), taskInstance.getId(), - taskInstance.getWorkerGroup()); - + org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); taskUpdateQueue.put(taskPriorityInfo); logger.info(String.format("master submit success, task : %s", taskInstance.getName()) ); return true;