From cbbd4cf74b84598fb4dd797679970f32be0e51ea Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 27 Nov 2024 17:00:37 +0800 Subject: [PATCH] Remove unused class in dolphinscheduler-service (#16846) --- .../api/service/impl/TenantServiceImpl.java | 6 +- .../service/impl/WorkerGroupServiceImpl.java | 7 +- .../impl/WorkflowDefinitionServiceImpl.java | 5 +- .../controller/WorkerGroupControllerTest.java | 3 +- .../api/service/TenantServiceTest.java | 3 +- .../api/service/WorkerGroupServiceTest.java | 5 +- .../service/cache/impl/CacheKeyGenerator.java | 36 ---- .../service/command/CommandService.java | 15 -- .../service/command/CommandServiceImpl.java | 75 -------- .../TaskPriorityQueueException.java | 44 ----- .../StandByTaskInstancePriorityQueue.java | 181 ------------------ .../service/queue/TaskPriority.java | 162 ---------------- .../service/queue/TaskPriorityQueue.java | 63 ------ .../service/queue/TaskPriorityQueueImpl.java | 84 -------- .../service/utils/Constants.java | 42 ---- .../service/utils/DagHelper.java | 27 --- .../service/utils/ParamUtils.java | 94 --------- .../service/utils/ProcessData.java | 47 ----- .../service/utils/ProcessUtils.java | 72 ------- .../command/MessageServiceImplTest.java | 81 -------- .../StandByTaskInstancePriorityQueueTest.java | 162 ---------------- .../queue/TaskPriorityQueueImplTest.java | 162 ---------------- .../service/utils/DagHelperTest.java | 25 +++ .../service/utils/ParamUtilsTest.java | 37 ---- 24 files changed, 42 insertions(+), 1396 deletions(-) delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index 02141c3fad..d318892a4d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -255,8 +256,9 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService } private List getWorkflowInstancesByTenant(Tenant tenant) { - return workflowInstanceMapper.queryByTenantCodeAndStatus(tenant.getTenantCode(), - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES); + return workflowInstanceMapper.queryByTenantCodeAndStatus( + tenant.getTenantCode(), + WorkflowExecutionStatus.getNotTerminalStatus()); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 639c41e601..c55b868797 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; import org.apache.dolphinscheduler.dao.entity.Schedule; @@ -391,9 +392,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST); return result; } - List workflowInstances = workflowInstanceMapper - .queryByWorkerGroupNameAndStatus(workerGroup.getName(), - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES); + List workflowInstances = workflowInstanceMapper.queryByWorkerGroupNameAndStatus( + workerGroup.getName(), + WorkflowExecutionStatus.getNotTerminalStatus()); if (CollectionUtils.isNotEmpty(workflowInstances)) { List workflowInstanceIds = workflowInstances.stream().map(WorkflowInstance::getId).collect(Collectors.toList()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java index fb297167a4..2bae5e12b9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java @@ -1060,9 +1060,8 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo } // check workflow instances is already running - List workflowInstances = workflowInstanceService - .queryByWorkflowDefinitionCodeAndStatus(workflowDefinition.getCode(), - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES); + List workflowInstances = workflowInstanceService.queryByWorkflowDefinitionCodeAndStatus( + workflowDefinition.getCode(), WorkflowExecutionStatus.getNotTerminalStatus()); if (CollectionUtils.isNotEmpty(workflowInstances)) { throw new ServiceException(Status.DELETE_WORKFLOW_DEFINITION_EXECUTING_FAIL, workflowInstances.size()); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index 691157fd16..6234088ce5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -24,6 +24,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; @@ -134,7 +135,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest { workerGroup.setName("测试"); Mockito.when(workerGroupMapper.selectById(12)).thenReturn(workerGroup); Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus("测试", - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)) + WorkflowExecutionStatus.getNotTerminalStatus())) .thenReturn(null); Mockito.when(workerGroupMapper.deleteById(12)).thenReturn(1); Mockito.when(workflowInstanceMapper.updateWorkflowInstanceByWorkerGroupName("测试", "")).thenReturn(1); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java index 2612082a4d..c595415b0b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.api.service.impl.TenantServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -191,7 +192,7 @@ public class TenantServiceTest { baseServiceLogger)).thenReturn(true); when(tenantMapper.queryById(1)).thenReturn(getTenant()); when(workflowInstanceMapper.queryByTenantCodeAndStatus(tenantCode, - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)) + WorkflowExecutionStatus.getNotTerminalStatus())) .thenReturn(getInstanceList()); when(scheduleMapper.queryScheduleListByTenant(tenantCode)).thenReturn(getScheduleList()); when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index e92a0fabdd..5cce297d8a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; @@ -243,7 +244,7 @@ public class WorkerGroupServiceTest { List workflowInstances = new ArrayList(); workflowInstances.add(workflowInstance); Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)) + WorkflowExecutionStatus.getNotTerminalStatus())) .thenReturn(workflowInstances); Map deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1); @@ -261,7 +262,7 @@ public class WorkerGroupServiceTest { WorkerGroup workerGroup = getWorkerGroup(1); Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup); Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null); + WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null); Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java deleted file mode 100644 index 2a036542a4..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.impl; - -import java.lang.reflect.Method; - -import org.springframework.cache.interceptor.KeyGenerator; -import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; - -/** - * custom cache key generator - */ -@Component -public class CacheKeyGenerator implements KeyGenerator { - - @Override - public Object generate(Object target, Method method, Object... params) { - return StringUtils.arrayToDelimitedString(params, "_"); - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java index 4328072054..df78dfc6e4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java @@ -18,9 +18,6 @@ package org.apache.dolphinscheduler.service.command; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation; /** * Command Service @@ -50,16 +47,4 @@ public interface CommandService { */ boolean verifyIsNeedCreateCommand(Command command); - /** - * create sub work process command - * @param parentWorkflowInstance parent process instance - * @param childInstance child process instance - * @param instanceMap process instance map - * @param task task instance - * @return command - */ - Command createSubProcessCommand(WorkflowInstance parentWorkflowInstance, - WorkflowInstance childInstance, - WorkflowInstanceRelation instanceMap, - TaskInstance task); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java index 06518ae9ec..fa7a0a4f14 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java @@ -18,28 +18,18 @@ package org.apache.dolphinscheduler.service.command; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_WORKFLOW_ID_STRING; -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ErrorCommand; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation; import org.apache.dolphinscheduler.dao.mapper.CommandMapper; import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper; -import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.service.utils.ParamUtils; -import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.util.EnumMap; @@ -134,69 +124,4 @@ public class CommandServiceImpl implements CommandService { return isNeedCreate; } - @Override - public Command createSubProcessCommand(WorkflowInstance parentWorkflowInstance, WorkflowInstance childInstance, - WorkflowInstanceRelation instanceMap, TaskInstance task) { - CommandType commandType = getSubCommandType(parentWorkflowInstance, childInstance); - Map subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class); - long childDefineCode = 0L; - if (subProcessParam.containsKey(CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE)) { - try { - childDefineCode = - Long.parseLong( - String.valueOf(subProcessParam.get(CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE))); - } catch (NumberFormatException nfe) { - log.error("processDefinitionCode is not a number", nfe); - return null; - } - } - WorkflowDefinition subWorkflowDefinition = processDefineMapper.queryByCode(childDefineCode); - - Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); - List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); - Map globalMap = ParamUtils.getGlobalParamMap(task.getVarPool()); - Map fatherParams = new HashMap<>(); - if (CollectionUtils.isNotEmpty(allParam)) { - for (Property info : allParam) { - if (Direct.OUT == info.getDirect()) { - continue; - } - fatherParams.put(info.getProp(), globalMap.get(info.getProp())); - } - } - String processParam = ParamUtils.getSubWorkFlowParam(instanceMap, parentWorkflowInstance, fatherParams); - int subProcessInstanceId = - childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId()); - return new Command( - commandType, - TaskDependType.TASK_POST, - parentWorkflowInstance.getFailureStrategy(), - parentWorkflowInstance.getExecutorId(), - subWorkflowDefinition.getCode(), - processParam, - parentWorkflowInstance.getWarningType(), - parentWorkflowInstance.getWarningGroupId(), - parentWorkflowInstance.getScheduleTime(), - task.getWorkerGroup(), - task.getEnvironmentCode(), - parentWorkflowInstance.getWorkflowInstancePriority(), - parentWorkflowInstance.getDryRun(), - subProcessInstanceId, - subWorkflowDefinition.getVersion(), - parentWorkflowInstance.getTestFlag()); - } - - /** - * get sub work flow command type - * child instance exist: child command = fatherCommand - * child instance not exists: child command = fatherCommand[0] - */ - private CommandType getSubCommandType(WorkflowInstance parentWorkflowInstance, WorkflowInstance childInstance) { - CommandType commandType = parentWorkflowInstance.getCommandType(); - if (childInstance == null) { - String fatherHistoryCommand = parentWorkflowInstance.getHistoryCmd(); - commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); - } - return commandType; - } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java deleted file mode 100644 index 30a72144bb..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.exceptions; - -/** - * task priority queue exception - */ -public class TaskPriorityQueueException extends Exception { - - /** - * Construct a new runtime exception with the detail message - * - * @param message message - */ - public TaskPriorityQueueException(String message) { - super(message); - } - - /** - * Construct a new runtime exception with the detail message and cause - * - * @param message message - * @param cause cause - */ - public TaskPriorityQueueException(String message, Throwable cause) { - super(message, cause); - } - -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java deleted file mode 100644 index 8cde9c1497..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.queue; - -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; - -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Preconditions; - -/** - * Task instances priority queue implementation - * All the task instances are in the same process instance. - */ -public class StandByTaskInstancePriorityQueue implements TaskPriorityQueue { - - /** - * queue - */ - private final PriorityQueue queue = new PriorityQueue<>(new TaskInstancePriorityComparator()); - private final Set taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>()); - - /** - * put task instance to priority queue - * - * @param taskInstance taskInstance - */ - @Override - public void put(TaskInstance taskInstance) { - Preconditions.checkNotNull(taskInstance); - queue.add(taskInstance); - taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance)); - } - - /** - * take task info - * - * @return task instance - * @throws TaskPriorityQueueException - */ - @Override - public TaskInstance take() throws TaskPriorityQueueException { - TaskInstance taskInstance = queue.poll(); - if (taskInstance != null) { - taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance)); - } - return taskInstance; - } - - /** - * poll task info with timeout - *

- * WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit) - * because this method of override interface used without considering accuracy of timeout - * - * @param timeout - * @param unit - * @return - * @throws TaskPriorityQueueException - * @throws InterruptedException - */ - @Override - public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException { - throw new TaskPriorityQueueException( - "This operation is not currently supported and suggest to use PriorityBlockingQueue if you want!"); - } - - /** - * peek taskInfo - * - * @return task instance - */ - public TaskInstance peek() { - return queue.peek(); - } - - /** - * queue size - * - * @return size - */ - @Override - public int size() { - return queue.size(); - } - - /** - * clear task - * - */ - public void clear() { - queue.clear(); - taskInstanceIdentifySet.clear(); - } - - /** - * whether contains the task instance - * - * @param taskInstance task instance - * @return true is contains - */ - public boolean contains(TaskInstance taskInstance) { - Preconditions.checkNotNull(taskInstance); - return taskInstanceIdentifySet.contains(getTaskInstanceIdentify(taskInstance)); - } - - /** - * remove task - * - * @param taskInstance task instance - * @return true if remove success - */ - public boolean remove(TaskInstance taskInstance) { - Preconditions.checkNotNull(taskInstance); - taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance)); - return queue.remove(taskInstance); - } - - /** - * get iterator - * - * @return Iterator - */ - public Iterator iterator() { - return queue.iterator(); - } - - // since the task instance will not contain taskInstanceId until insert into database - // So we use processInstanceId + taskCode + version to identify a taskInstance. - private String getTaskInstanceIdentify(TaskInstance taskInstance) { - return String.join( - String.valueOf(taskInstance.getWorkflowInstanceId()), - String.valueOf(taskInstance.getTaskCode()), - String.valueOf(taskInstance.getTaskDefinitionVersion()), "-"); - } - - /** - * This comparator is used to sort task instances in the standby queue. - * If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup. - * Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow. - */ - private static class TaskInstancePriorityComparator implements Comparator { - - @Override - public int compare(TaskInstance o1, TaskInstance o2) { - int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority()); - int taskInstancePriorityInWorkflow = - Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode()); - - if (o1.getTaskGroupId() == o2.getTaskGroupId()) { - // If at the same taskGroup - if (taskPriorityInTaskGroup != 0) { - return taskPriorityInTaskGroup; - } - } - return taskInstancePriorityInWorkflow; - } - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java deleted file mode 100644 index 2989e09852..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.queue; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; - -import org.apache.commons.lang3.StringUtils; - -import java.util.Map; -import java.util.Objects; - -import lombok.Data; - -@Data -public class TaskPriority implements Comparable { - - private int workflowInstancePriority; - - private int workflowInstanceId; - - private int taskInstancePriority; - - private int taskId; - - private TaskExecutionContext taskExecutionContext; - - private String groupName; - - private Map context; - private long checkpoint; - - private int taskGroupPriority; - - public TaskPriority() { - this.checkpoint = System.currentTimeMillis(); - } - - public TaskPriority(int workflowInstancePriority, - int workflowInstanceId, - int taskInstancePriority, - int taskId, - int taskGroupPriority, String groupName) { - this.workflowInstancePriority = workflowInstancePriority; - this.workflowInstanceId = workflowInstanceId; - this.taskInstancePriority = taskInstancePriority; - this.taskId = taskId; - this.taskGroupPriority = taskGroupPriority; - this.groupName = groupName; - this.checkpoint = System.currentTimeMillis(); - } - - @Override - public int compareTo(TaskPriority other) { - if (this.getWorkflowInstancePriority() > other.getWorkflowInstancePriority()) { - return 1; - } - if (this.getWorkflowInstancePriority() < other.getWorkflowInstancePriority()) { - return -1; - } - - if (this.getWorkflowInstanceId() > other.getWorkflowInstanceId()) { - return 1; - } - if (this.getWorkflowInstanceId() < other.getWorkflowInstanceId()) { - return -1; - } - - if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) { - return 1; - } - if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) { - return -1; - } - if (this.getTaskGroupPriority() != other.getTaskGroupPriority()) { - // larger number, higher priority - return Constants.OPPOSITE_VALUE - * Integer.compare(this.getTaskGroupPriority(), other.getTaskGroupPriority()); - } - if (this.getTaskId() > other.getTaskId()) { - return 1; - } - if (this.getTaskId() < other.getTaskId()) { - return -1; - } - String thisGroupName = - StringUtils.isNotBlank(this.getGroupName()) ? this.getGroupName() : Constants.EMPTY_STRING; - String otherGroupName = - StringUtils.isNotBlank(other.getGroupName()) ? other.getGroupName() : Constants.EMPTY_STRING; - if (!thisGroupName.equals(otherGroupName)) { - return thisGroupName.compareTo(otherGroupName); - } - return Long.compare(this.getCheckpoint(), other.getCheckpoint()); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TaskPriority that = (TaskPriority) o; - return workflowInstancePriority == that.workflowInstancePriority - && workflowInstanceId == that.workflowInstanceId - && taskInstancePriority == that.taskInstancePriority - && taskId == that.taskId - && taskGroupPriority == that.taskGroupPriority - && Objects.equals(groupName, that.groupName); - } - - @Override - public int hashCode() { - return Objects.hash(workflowInstancePriority, - workflowInstanceId, - taskInstancePriority, - taskId, - taskGroupPriority, - groupName); - } - - @Override - public String toString() { - return "TaskPriority{" - + "workflowInstancePriority=" - + workflowInstancePriority - + ", workflowInstanceId=" - + workflowInstanceId - + ", taskInstancePriority=" - + taskInstancePriority - + ", taskId=" - + taskId - + ", taskExecutionContext=" - + taskExecutionContext - + ", groupName='" - + groupName - + '\'' - + ", context=" - + context - + ", checkpoint=" - + checkpoint - + ", taskGroupPriority=" - + taskGroupPriority - + '}'; - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java deleted file mode 100644 index 736e117fe5..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.queue; - -import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; - -import java.util.concurrent.TimeUnit; - -/** - * task priority queue - * @param - */ -public interface TaskPriorityQueue { - - /** - * put task info - * - * @param taskInfo taskInfo - * @throws TaskPriorityQueueException - */ - void put(T taskInfo); - - /** - * take taskInfo - * - * @return taskInfo - * @throws TaskPriorityQueueException - */ - T take() throws TaskPriorityQueueException, InterruptedException; - - /** - * poll taskInfo with timeout - * @param timeout - * @param unit - * @return - * @throws TaskPriorityQueueException - * @throws InterruptedException - */ - T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException; - - /** - * size - * - * @return size - * @throws TaskPriorityQueueException - */ - int size() throws TaskPriorityQueueException; -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java deleted file mode 100644 index d9578b0757..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.queue; - -import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; - -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.springframework.stereotype.Service; - -/** - * A singleton of a task queue implemented using PriorityBlockingQueue - */ -@Service -public class TaskPriorityQueueImpl implements TaskPriorityQueue { - - /** - * Task queue, this queue is unbounded, this means it will cause OutOfMemoryError. - * The master will stop to generate the task if memory is too high. - */ - private final PriorityBlockingQueue queue = new PriorityBlockingQueue<>(3000); - - /** - * put task takePriorityInfo - * - * @param taskPriorityInfo takePriorityInfo - */ - @Override - public void put(TaskPriority taskPriorityInfo) { - queue.put(taskPriorityInfo); - } - - /** - * take taskInfo - * - * @return taskInfo - * @throws TaskPriorityQueueException - */ - @Override - public TaskPriority take() throws TaskPriorityQueueException, InterruptedException { - return queue.take(); - } - - /** - * poll taskInfo with timeout - * - * @param timeout - * @param unit - * @return - * @throws TaskPriorityQueueException - * @throws InterruptedException - */ - @Override - public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException { - return queue.poll(timeout, unit); - } - - /** - * queue size - * - * @return size - * @throws TaskPriorityQueueException - */ - @Override - public int size() throws TaskPriorityQueueException { - return queue.size(); - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java deleted file mode 100644 index 9ed22d6d6b..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; - -import lombok.experimental.UtilityClass; - -@UtilityClass -public final class Constants { - - public static final int[] NOT_TERMINATED_STATES = new int[]{ - TaskExecutionStatus.DISPATCH.getCode(), - WorkflowExecutionStatus.RUNNING_EXECUTION.getCode(), - WorkflowExecutionStatus.READY_PAUSE.getCode(), - WorkflowExecutionStatus.READY_STOP.getCode(), - TaskExecutionStatus.NEED_FAULT_TOLERANCE.getCode(), - }; - - public static final int[] RUNNING_PROCESS_STATE = new int[]{ - TaskExecutionStatus.RUNNING_EXECUTION.getCode(), - TaskExecutionStatus.SUBMITTED_SUCCESS.getCode(), - TaskExecutionStatus.DISPATCH.getCode(), - WorkflowExecutionStatus.SERIAL_WAIT.getCode() - }; -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java index cc4e9b7100..414e9970ec 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java @@ -207,33 +207,6 @@ public class DagHelper { return resultList; } - /** - * generate dag by start nodes and recovery nodes - * - * @param totalTaskNodeList totalTaskNodeList - * @param startNodeNameList startNodeNameList - * @param recoveryNodeCodeList recoveryNodeCodeList - * @param depNodeType depNodeType - * @return workflow dag - * @throws Exception if error throws Exception - */ - public static WorkflowDag generateFlowDag(List totalTaskNodeList, - List startNodeNameList, - List recoveryNodeCodeList, - TaskDependType depNodeType) throws Exception { - - List destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, - recoveryNodeCodeList, depNodeType); - if (destTaskNodeList.isEmpty()) { - return null; - } - List taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList); - WorkflowDag workflowDag = new WorkflowDag(); - workflowDag.setEdges(taskNodeRelations); - workflowDag.setNodes(destTaskNodeList); - return workflowDag; - } - /** * find node by node code * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java deleted file mode 100644 index 54349af38b..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_FATHER_PARAMS; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; - -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.StringUtils; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import com.google.common.base.Strings; - -/** - * Param Utility class - */ -public class ParamUtils { - - /** - * convert globalParams string to global parameter map - * @param globalParams globalParams - * @return parameter map - */ - public static Map getGlobalParamMap(String globalParams) { - List propList; - Map globalParamMap = new HashMap<>(); - if (!Strings.isNullOrEmpty(globalParams)) { - propList = JSONUtils.toList(globalParams, Property.class); - globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - } - return globalParamMap; - } - - /** - * Get sub workflow parameters - * @param instanceMap workflow instance map - * @param parentWorkflowInstance parent workflow instance - * @param fatherParams fatherParams - * @return sub workflow parameters - */ - public static String getSubWorkFlowParam(WorkflowInstanceRelation instanceMap, - WorkflowInstance parentWorkflowInstance, - Map fatherParams) { - // set sub work workflow command - String workflowMapStr = JSONUtils.toJsonString(instanceMap); - Map cmdParam = JSONUtils.toMap(workflowMapStr); - if (parentWorkflowInstance.isComplementData()) { - Map parentParam = JSONUtils.toMap(parentWorkflowInstance.getCommandParam()); - String endTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE); - String startTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE); - String scheduleTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST); - if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) { - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endTime); - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startTime); - } - if (StringUtils.isNotEmpty(scheduleTime)) { - cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime); - } - workflowMapStr = JSONUtils.toJsonString(cmdParam); - } - if (MapUtils.isNotEmpty(fatherParams)) { - cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams)); - workflowMapStr = JSONUtils.toJsonString(cmdParam); - } - return workflowMapStr; - } - -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java deleted file mode 100644 index ae12269acf..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.service.model.TaskNode; - -import java.util.List; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; - -@Data -@NoArgsConstructor -public class ProcessData { - - @EqualsAndHashCode.Include - private List tasks; - - @EqualsAndHashCode.Include - private List globalParams; - - private int timeout; - - private int tenantId; - - public ProcessData(List tasks, List globalParams) { - this.tasks = tasks; - this.globalParams = globalParams; - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java deleted file mode 100644 index f0c8c59ed3..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; - -import java.nio.file.Paths; -import java.util.Collections; -import java.util.List; - -import javax.annotation.Nullable; - -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; - -/** - * mainly used to get the start command line of a process. - */ -@Slf4j -public class ProcessUtils { - - /** - * find logs and kill yarn tasks. - * - * @param taskExecutionContext taskExecutionContext - * @return yarn application ids - */ - public static @Nullable List killApplication(@NonNull List appIds, - @NonNull TaskExecutionContext taskExecutionContext) { - try { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - if (CollectionUtils.isNotEmpty(appIds)) { - taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds)); - if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) { - taskExecutionContext - .setExecutePath(FileUtils - .getTaskInstanceWorkingDirectory(taskExecutionContext.getTaskInstanceId())); - } - FileUtils.createDirectoryWith755(Paths.get(taskExecutionContext.getExecutePath())); - org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(taskExecutionContext); - return appIds; - } else { - log.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}", - taskExecutionContext.getTaskInstanceId()); - } - } catch (Exception e) { - log.error("Kill yarn job failure, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId(), e); - } - return Collections.emptyList(); - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java index dc4547dcb5..3f246abbf8 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java @@ -17,28 +17,16 @@ package org.apache.dolphinscheduler.service.command; -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_WORKFLOW_ID_STRING; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation; import org.apache.dolphinscheduler.dao.mapper.CommandMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper; import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -50,8 +38,6 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import com.fasterxml.jackson.databind.JsonNode; - @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) class MessageServiceImplTest { @@ -68,73 +54,6 @@ class MessageServiceImplTest { @Mock private ScheduleMapper scheduleMapper; - @Test - public void testCreateSubCommand() { - WorkflowInstance parentInstance = new WorkflowInstance(); - parentInstance.setWarningType(WarningType.SUCCESS); - parentInstance.setWarningGroupId(0); - - TaskInstance task = new TaskInstance(); - task.setTaskParams("{\"processDefinitionCode\":10}}"); - task.setId(10); - task.setTaskCode(1L); - task.setTaskDefinitionVersion(1); - - WorkflowInstance childInstance = null; - WorkflowInstanceRelation instanceMap = new WorkflowInstanceRelation(); - instanceMap.setParentWorkflowInstanceId(1); - instanceMap.setParentTaskInstanceId(10); - Command command; - - // father history: start; child null == command type: start - parentInstance.setHistoryCmd("START_PROCESS"); - parentInstance.setCommandType(CommandType.START_PROCESS); - WorkflowDefinition workflowDefinition = new WorkflowDefinition(); - workflowDefinition.setCode(10L); - Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(workflowDefinition); - Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(workflowDefinition); - command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType()); - - // father history: start,start failure; child null == command type: start - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); - command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType()); - - // father history: scheduler,start failure; child null == command type: scheduler - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS"); - command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType()); - - // father history: complement,start failure; child null == command type: complement - - String startString = "2020-01-01 00:00:00"; - String endString = "2020-01-10 00:00:00"; - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS"); - Map complementMap = new HashMap<>(); - complementMap.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startString); - complementMap.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endString); - parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap)); - command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType()); - - JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam()); - Date start = DateUtils.stringToDate(complementDate.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE).asText()); - Date end = DateUtils.stringToDate(complementDate.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE).asText()); - Assertions.assertEquals(startString, DateUtils.dateToString(start)); - Assertions.assertEquals(endString, DateUtils.dateToString(end)); - - // father history: start,failure,start failure; child not null == command type: start failure - childInstance = new WorkflowInstance(); - parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); - parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); - command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); - Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType()); - } - @Test public void testVerifyIsNeedCreateCommand() { diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java deleted file mode 100644 index 6c22944b50..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.queue; - -import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; - -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class StandByTaskInstancePriorityQueueTest { - - @Test - public void put() throws TaskPriorityQueueException { - StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); - TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); - queue.put(taskInstanceHigPriority); - queue.put(taskInstanceMediumPriority); - Assertions.assertEquals(2, queue.size()); - Assertions.assertTrue(queue.contains(taskInstanceHigPriority)); - Assertions.assertTrue(queue.contains(taskInstanceMediumPriority)); - } - - @Test - public void take() throws Exception { - StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - int peekBeforeLength = queue.size(); - queue.take(); - Assertions.assertTrue(queue.size() < peekBeforeLength); - } - - @Test - public void poll() throws Exception { - StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - Assertions.assertThrows(TaskPriorityQueueException.class, () -> { - queue.poll(1000, TimeUnit.MILLISECONDS); - }); - } - - @Test - public void peek() throws Exception { - StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - int peekBeforeLength = queue.size(); - Assertions.assertEquals(peekBeforeLength, queue.size()); - } - - @Test - public void peekTaskGroupPriority() throws Exception { - StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); - - TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1); - queue.put(taskInstanceMediumPriority); - queue.put(taskInstanceHigPriority); - TaskInstance taskInstance = queue.peek(); - queue.clear(); - Assertions.assertEquals(taskInstance.getName(), "high"); - - taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); - taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 2); - queue.put(taskInstanceMediumPriority); - queue.put(taskInstanceHigPriority); - taskInstance = queue.peek(); - queue.clear(); - Assertions.assertEquals("medium", taskInstance.getName()); - - taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); - taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2); - queue.put(taskInstanceMediumPriority); - queue.put(taskInstanceHigPriority); - taskInstance = queue.peek(); - queue.clear(); - Assertions.assertEquals("medium", taskInstance.getName()); - - taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); - taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); - queue.put(taskInstanceMediumPriority); - queue.put(taskInstanceHigPriority); - taskInstance = queue.peek(); - queue.clear(); - Assertions.assertEquals("high", taskInstance.getName()); - - } - - @Test - public void size() throws Exception { - Assertions.assertEquals(2, getPeerTaskInstancePriorityQueue().size()); - } - - @Test - public void contains() throws Exception { - StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); - queue.put(taskInstanceMediumPriority); - Assertions.assertTrue(queue.contains(taskInstanceMediumPriority)); - TaskInstance taskInstance2 = createTaskInstance("medium2", Priority.MEDIUM, 1); - taskInstance2.setWorkflowInstanceId(2); - Assertions.assertFalse(queue.contains(taskInstance2)); - } - - @Test - public void remove() { - StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); - queue.put(taskInstanceMediumPriority); - int peekBeforeLength = queue.size(); - queue.remove(taskInstanceMediumPriority); - Assertions.assertNotEquals(peekBeforeLength, queue.size()); - Assertions.assertFalse(queue.contains(taskInstanceMediumPriority)); - } - - /** - * get queue - * - * @return queue - * @throws Exception - */ - private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { - StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); - TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); - taskInstanceHigPriority.setTaskGroupPriority(3); - taskInstanceMediumPriority.setTaskGroupPriority(2); - queue.put(taskInstanceMediumPriority); - queue.put(taskInstanceHigPriority); - return queue; - } - - /** - * create task instance - * - * @param name name - * @param priority priority - * @return - */ - private TaskInstance createTaskInstance(String name, Priority priority, int taskGroupPriority) { - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setName(name); - taskInstance.setTaskInstancePriority(priority); - taskInstance.setTaskGroupPriority(taskGroupPriority); - return taskInstance; - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java deleted file mode 100644 index aeaf079c9e..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.queue; - -import org.apache.dolphinscheduler.common.enums.Priority; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class TaskPriorityQueueImplTest { - - @Test - public void testSort() { - TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, 1, "default"); - TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, 1, "default"); - TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, 1, "default"); - List taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPriorities); - - priorityOne = new TaskPriority(0, 1, 0, 0, 1, "default"); - priorityTwo = new TaskPriority(0, 2, 0, 0, 1, "default"); - priorityThree = new TaskPriority(0, 3, 0, 0, 1, "default"); - taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPriorities); - - priorityOne = new TaskPriority(0, 0, 1, 0, 1, "default"); - priorityTwo = new TaskPriority(0, 0, 2, 0, 1, "default"); - priorityThree = new TaskPriority(0, 0, 3, 0, 1, "default"); - taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPriorities); - - priorityOne = new TaskPriority(0, 0, 0, 1, 1, "default"); - priorityTwo = new TaskPriority(0, 0, 0, 2, 1, "default"); - priorityThree = new TaskPriority(0, 0, 0, 3, 1, "default"); - taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPriorities); - - priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1"); - priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2"); - priorityThree = new TaskPriority(0, 0, 0, 0, 1, "default_3"); - taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPriorities); - - priorityOne = new TaskPriority(0, 0, 0, 0, 2, "default_1"); - priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2"); - priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3"); - taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityThree, priorityOne, priorityTwo), - taskPriorities); - - priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1"); - priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2"); - priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3"); - taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityThree, priorityOne, priorityTwo), - taskPriorities); - - priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_1"); - priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1"); - priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_1"); - taskPriorities = Arrays.asList(priorityTwo, priorityOne, priorityThree); - Collections.sort(taskPriorities); - Assertions.assertEquals( - Arrays.asList(priorityThree, priorityTwo, priorityOne), - taskPriorities); - } - - @Test - public void put() throws Exception { - TaskPriorityQueue queue = getPriorityQueue(); - Assertions.assertEquals(2, queue.size()); - } - - @Test - public void take() throws Exception { - TaskPriorityQueue queue = getPriorityQueue(); - int peekBeforeLength = queue.size(); - queue.take(); - Assertions.assertTrue(queue.size() < peekBeforeLength); - } - - @Test - public void poll() throws Exception { - TaskPriorityQueue queue = getPriorityQueue(); - int peekBeforeLength = queue.size(); - queue.poll(1000, TimeUnit.MILLISECONDS); - queue.poll(1000, TimeUnit.MILLISECONDS); - Assertions.assertEquals(0, queue.size()); - queue.poll(1000, TimeUnit.MILLISECONDS); - } - - @Test - public void size() throws Exception { - Assertions.assertEquals(2, getPriorityQueue().size()); - } - - /** - * get queue - * - * @return queue - * @throws Exception - */ - private TaskPriorityQueue getPriorityQueue() throws Exception { - TaskPriorityQueue queue = new TaskPriorityQueueImpl(); - TaskPriority taskInstanceHigPriority = createTaskPriority(Priority.HIGH.getCode(), 1); - TaskPriority taskInstanceMediumPriority = createTaskPriority(Priority.MEDIUM.getCode(), 2); - queue.put(taskInstanceHigPriority); - queue.put(taskInstanceMediumPriority); - return queue; - } - - /** - * create task priority - * - * @param priority - * @param processInstanceId - * @return - */ - private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) { - TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, 1, "default"); - return priorityOne; - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java index f5d8b3674b..5489b9de8a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; @@ -40,6 +41,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -679,4 +684,24 @@ public class DagHelperTest { Assertions.assertNotNull(dag); } + @Data + @NoArgsConstructor + private static class ProcessData { + + @EqualsAndHashCode.Include + private List tasks; + + @EqualsAndHashCode.Include + private List globalParams; + + private int timeout; + + private int tenantId; + + public ProcessData(List tasks, List globalParams) { + this.tasks = tasks; + this.globalParams = globalParams; + } + } + } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java deleted file mode 100644 index cdd0d761a4..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import java.util.Map; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class ParamUtilsTest { - - @Test - public void testGetGlobalParamMap() { - String globalParam = "[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]"; - Map globalParamMap = ParamUtils.getGlobalParamMap(globalParam); - Assertions.assertEquals(globalParamMap.size(), 1); - Assertions.assertEquals(globalParamMap.get("startParam1"), ""); - - Map emptyParamMap = ParamUtils.getGlobalParamMap(null); - Assertions.assertEquals(emptyParamMap.size(), 0); - } -}