Browse Source

Remove unused class in dolphinscheduler-service (#16846)

dev
Wenjun Ruan 2 weeks ago committed by GitHub
parent
commit
cbbd4cf74b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
  2. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  3. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
  4. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
  5. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
  6. 5
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  7. 36
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
  8. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
  9. 75
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
  10. 44
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
  11. 181
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java
  12. 162
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
  13. 63
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
  14. 84
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
  15. 42
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java
  16. 27
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
  17. 94
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java
  18. 47
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java
  19. 72
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java
  20. 81
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
  21. 162
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java
  22. 162
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java
  23. 25
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
  24. 37
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -255,8 +256,9 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
}
private List<WorkflowInstance> getWorkflowInstancesByTenant(Tenant tenant) {
return workflowInstanceMapper.queryByTenantCodeAndStatus(tenant.getTenantCode(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES);
return workflowInstanceMapper.queryByTenantCodeAndStatus(
tenant.getTenantCode(),
WorkflowExecutionStatus.getNotTerminalStatus());
}
/**

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@ -391,9 +392,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST);
return result;
}
List<WorkflowInstance> workflowInstances = workflowInstanceMapper
.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES);
List<WorkflowInstance> workflowInstances = workflowInstanceMapper.queryByWorkerGroupNameAndStatus(
workerGroup.getName(),
WorkflowExecutionStatus.getNotTerminalStatus());
if (CollectionUtils.isNotEmpty(workflowInstances)) {
List<Integer> workflowInstanceIds =
workflowInstances.stream().map(WorkflowInstance::getId).collect(Collectors.toList());

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java

@ -1060,9 +1060,8 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
}
// check workflow instances is already running
List<WorkflowInstance> workflowInstances = workflowInstanceService
.queryByWorkflowDefinitionCodeAndStatus(workflowDefinition.getCode(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES);
List<WorkflowInstance> workflowInstances = workflowInstanceService.queryByWorkflowDefinitionCodeAndStatus(
workflowDefinition.getCode(), WorkflowExecutionStatus.getNotTerminalStatus());
if (CollectionUtils.isNotEmpty(workflowInstances)) {
throw new ServiceException(Status.DELETE_WORKFLOW_DEFINITION_EXECUTING_FAIL, workflowInstances.size());
}

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java

@ -24,6 +24,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
@ -134,7 +135,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
workerGroup.setName("测试");
Mockito.when(workerGroupMapper.selectById(12)).thenReturn(workerGroup);
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus("测试",
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
WorkflowExecutionStatus.getNotTerminalStatus()))
.thenReturn(null);
Mockito.when(workerGroupMapper.deleteById(12)).thenReturn(1);
Mockito.when(workflowInstanceMapper.updateWorkflowInstanceByWorkerGroupName("测试", "")).thenReturn(1);

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.api.service.impl.TenantServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
@ -191,7 +192,7 @@ public class TenantServiceTest {
baseServiceLogger)).thenReturn(true);
when(tenantMapper.queryById(1)).thenReturn(getTenant());
when(workflowInstanceMapper.queryByTenantCodeAndStatus(tenantCode,
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
WorkflowExecutionStatus.getNotTerminalStatus()))
.thenReturn(getInstanceList());
when(scheduleMapper.queryScheduleListByTenant(tenantCode)).thenReturn(getScheduleList());
when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList());

5
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
@ -243,7 +244,7 @@ public class WorkerGroupServiceTest {
List<WorkflowInstance> workflowInstances = new ArrayList<WorkflowInstance>();
workflowInstances.add(workflowInstance);
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
WorkflowExecutionStatus.getNotTerminalStatus()))
.thenReturn(workflowInstances);
Map<String, Object> deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1);
@ -261,7 +262,7 @@ public class WorkerGroupServiceTest {
WorkerGroup workerGroup = getWorkerGroup(1);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null);
WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null);
Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);

36
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java vendored

@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.impl;
import java.lang.reflect.Method;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* custom cache key generator
*/
@Component
public class CacheKeyGenerator implements KeyGenerator {
@Override
public Object generate(Object target, Method method, Object... params) {
return StringUtils.arrayToDelimitedString(params, "_");
}
}

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java

@ -18,9 +18,6 @@
package org.apache.dolphinscheduler.service.command;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
/**
* Command Service
@ -50,16 +47,4 @@ public interface CommandService {
*/
boolean verifyIsNeedCreateCommand(Command command);
/**
* create sub work process command
* @param parentWorkflowInstance parent process instance
* @param childInstance child process instance
* @param instanceMap process instance map
* @param task task instance
* @return command
*/
Command createSubProcessCommand(WorkflowInstance parentWorkflowInstance,
WorkflowInstance childInstance,
WorkflowInstanceRelation instanceMap,
TaskInstance task);
}

75
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java

@ -18,28 +18,18 @@
package org.apache.dolphinscheduler.service.command;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_WORKFLOW_ID_STRING;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ErrorCommand;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ErrorCommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.service.utils.ParamUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.EnumMap;
@ -134,69 +124,4 @@ public class CommandServiceImpl implements CommandService {
return isNeedCreate;
}
@Override
public Command createSubProcessCommand(WorkflowInstance parentWorkflowInstance, WorkflowInstance childInstance,
WorkflowInstanceRelation instanceMap, TaskInstance task) {
CommandType commandType = getSubCommandType(parentWorkflowInstance, childInstance);
Map<String, Object> subProcessParam = JSONUtils.toMap(task.getTaskParams(), String.class, Object.class);
long childDefineCode = 0L;
if (subProcessParam.containsKey(CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE)) {
try {
childDefineCode =
Long.parseLong(
String.valueOf(subProcessParam.get(CMD_PARAM_SUB_WORKFLOW_DEFINITION_CODE)));
} catch (NumberFormatException nfe) {
log.error("processDefinitionCode is not a number", nfe);
return null;
}
}
WorkflowDefinition subWorkflowDefinition = processDefineMapper.queryByCode(childDefineCode);
Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS);
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
Map<String, String> globalMap = ParamUtils.getGlobalParamMap(task.getVarPool());
Map<String, String> fatherParams = new HashMap<>();
if (CollectionUtils.isNotEmpty(allParam)) {
for (Property info : allParam) {
if (Direct.OUT == info.getDirect()) {
continue;
}
fatherParams.put(info.getProp(), globalMap.get(info.getProp()));
}
}
String processParam = ParamUtils.getSubWorkFlowParam(instanceMap, parentWorkflowInstance, fatherParams);
int subProcessInstanceId =
childInstance == null ? 0 : (childInstance.getId() == null ? 0 : childInstance.getId());
return new Command(
commandType,
TaskDependType.TASK_POST,
parentWorkflowInstance.getFailureStrategy(),
parentWorkflowInstance.getExecutorId(),
subWorkflowDefinition.getCode(),
processParam,
parentWorkflowInstance.getWarningType(),
parentWorkflowInstance.getWarningGroupId(),
parentWorkflowInstance.getScheduleTime(),
task.getWorkerGroup(),
task.getEnvironmentCode(),
parentWorkflowInstance.getWorkflowInstancePriority(),
parentWorkflowInstance.getDryRun(),
subProcessInstanceId,
subWorkflowDefinition.getVersion(),
parentWorkflowInstance.getTestFlag());
}
/**
* get sub work flow command type
* child instance exist: child command = fatherCommand
* child instance not exists: child command = fatherCommand[0]
*/
private CommandType getSubCommandType(WorkflowInstance parentWorkflowInstance, WorkflowInstance childInstance) {
CommandType commandType = parentWorkflowInstance.getCommandType();
if (childInstance == null) {
String fatherHistoryCommand = parentWorkflowInstance.getHistoryCmd();
commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
}
return commandType;
}
}

44
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.exceptions;
/**
* task priority queue exception
*/
public class TaskPriorityQueueException extends Exception {
/**
* Construct a new runtime exception with the detail message
*
* @param message message
*/
public TaskPriorityQueueException(String message) {
super(message);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param message message
* @param cause cause
*/
public TaskPriorityQueueException(String message, Throwable cause) {
super(message, cause);
}
}

181
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java

@ -1,181 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
/**
* Task instances priority queue implementation
* All the task instances are in the same process instance.
*/
public class StandByTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
/**
* queue
*/
private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(new TaskInstancePriorityComparator());
private final Set<String> taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>());
/**
* put task instance to priority queue
*
* @param taskInstance taskInstance
*/
@Override
public void put(TaskInstance taskInstance) {
Preconditions.checkNotNull(taskInstance);
queue.add(taskInstance);
taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance));
}
/**
* take task info
*
* @return task instance
* @throws TaskPriorityQueueException
*/
@Override
public TaskInstance take() throws TaskPriorityQueueException {
TaskInstance taskInstance = queue.poll();
if (taskInstance != null) {
taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
}
return taskInstance;
}
/**
* poll task info with timeout
* <p>
* WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit)
* because this method of override interface used without considering accuracy of timeout
*
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
@Override
public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException {
throw new TaskPriorityQueueException(
"This operation is not currently supported and suggest to use PriorityBlockingQueue if you want!");
}
/**
* peek taskInfo
*
* @return task instance
*/
public TaskInstance peek() {
return queue.peek();
}
/**
* queue size
*
* @return size
*/
@Override
public int size() {
return queue.size();
}
/**
* clear task
*
*/
public void clear() {
queue.clear();
taskInstanceIdentifySet.clear();
}
/**
* whether contains the task instance
*
* @param taskInstance task instance
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
Preconditions.checkNotNull(taskInstance);
return taskInstanceIdentifySet.contains(getTaskInstanceIdentify(taskInstance));
}
/**
* remove task
*
* @param taskInstance task instance
* @return true if remove success
*/
public boolean remove(TaskInstance taskInstance) {
Preconditions.checkNotNull(taskInstance);
taskInstanceIdentifySet.remove(getTaskInstanceIdentify(taskInstance));
return queue.remove(taskInstance);
}
/**
* get iterator
*
* @return Iterator
*/
public Iterator<TaskInstance> iterator() {
return queue.iterator();
}
// since the task instance will not contain taskInstanceId until insert into database
// So we use processInstanceId + taskCode + version to identify a taskInstance.
private String getTaskInstanceIdentify(TaskInstance taskInstance) {
return String.join(
String.valueOf(taskInstance.getWorkflowInstanceId()),
String.valueOf(taskInstance.getTaskCode()),
String.valueOf(taskInstance.getTaskDefinitionVersion()), "-");
}
/**
* This comparator is used to sort task instances in the standby queue.
* If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup.
* Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow.
*/
private static class TaskInstancePriorityComparator implements Comparator<TaskInstance> {
@Override
public int compare(TaskInstance o1, TaskInstance o2) {
int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
int taskInstancePriorityInWorkflow =
Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode());
if (o1.getTaskGroupId() == o2.getTaskGroupId()) {
// If at the same taskGroup
if (taskPriorityInTaskGroup != 0) {
return taskPriorityInTaskGroup;
}
}
return taskInstancePriorityInWorkflow;
}
}
}

162
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java

@ -1,162 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
import java.util.Objects;
import lombok.Data;
@Data
public class TaskPriority implements Comparable<TaskPriority> {
private int workflowInstancePriority;
private int workflowInstanceId;
private int taskInstancePriority;
private int taskId;
private TaskExecutionContext taskExecutionContext;
private String groupName;
private Map<String, String> context;
private long checkpoint;
private int taskGroupPriority;
public TaskPriority() {
this.checkpoint = System.currentTimeMillis();
}
public TaskPriority(int workflowInstancePriority,
int workflowInstanceId,
int taskInstancePriority,
int taskId,
int taskGroupPriority, String groupName) {
this.workflowInstancePriority = workflowInstancePriority;
this.workflowInstanceId = workflowInstanceId;
this.taskInstancePriority = taskInstancePriority;
this.taskId = taskId;
this.taskGroupPriority = taskGroupPriority;
this.groupName = groupName;
this.checkpoint = System.currentTimeMillis();
}
@Override
public int compareTo(TaskPriority other) {
if (this.getWorkflowInstancePriority() > other.getWorkflowInstancePriority()) {
return 1;
}
if (this.getWorkflowInstancePriority() < other.getWorkflowInstancePriority()) {
return -1;
}
if (this.getWorkflowInstanceId() > other.getWorkflowInstanceId()) {
return 1;
}
if (this.getWorkflowInstanceId() < other.getWorkflowInstanceId()) {
return -1;
}
if (this.getTaskInstancePriority() > other.getTaskInstancePriority()) {
return 1;
}
if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
return -1;
}
if (this.getTaskGroupPriority() != other.getTaskGroupPriority()) {
// larger number, higher priority
return Constants.OPPOSITE_VALUE
* Integer.compare(this.getTaskGroupPriority(), other.getTaskGroupPriority());
}
if (this.getTaskId() > other.getTaskId()) {
return 1;
}
if (this.getTaskId() < other.getTaskId()) {
return -1;
}
String thisGroupName =
StringUtils.isNotBlank(this.getGroupName()) ? this.getGroupName() : Constants.EMPTY_STRING;
String otherGroupName =
StringUtils.isNotBlank(other.getGroupName()) ? other.getGroupName() : Constants.EMPTY_STRING;
if (!thisGroupName.equals(otherGroupName)) {
return thisGroupName.compareTo(otherGroupName);
}
return Long.compare(this.getCheckpoint(), other.getCheckpoint());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskPriority that = (TaskPriority) o;
return workflowInstancePriority == that.workflowInstancePriority
&& workflowInstanceId == that.workflowInstanceId
&& taskInstancePriority == that.taskInstancePriority
&& taskId == that.taskId
&& taskGroupPriority == that.taskGroupPriority
&& Objects.equals(groupName, that.groupName);
}
@Override
public int hashCode() {
return Objects.hash(workflowInstancePriority,
workflowInstanceId,
taskInstancePriority,
taskId,
taskGroupPriority,
groupName);
}
@Override
public String toString() {
return "TaskPriority{"
+ "workflowInstancePriority="
+ workflowInstancePriority
+ ", workflowInstanceId="
+ workflowInstanceId
+ ", taskInstancePriority="
+ taskInstancePriority
+ ", taskId="
+ taskId
+ ", taskExecutionContext="
+ taskExecutionContext
+ ", groupName='"
+ groupName
+ '\''
+ ", context="
+ context
+ ", checkpoint="
+ checkpoint
+ ", taskGroupPriority="
+ taskGroupPriority
+ '}';
}
}

63
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java

@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.TimeUnit;
/**
* task priority queue
* @param <T>
*/
public interface TaskPriorityQueue<T> {
/**
* put task info
*
* @param taskInfo taskInfo
* @throws TaskPriorityQueueException
*/
void put(T taskInfo);
/**
* take taskInfo
*
* @return taskInfo
* @throws TaskPriorityQueueException
*/
T take() throws TaskPriorityQueueException, InterruptedException;
/**
* poll taskInfo with timeout
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException;
/**
* size
*
* @return size
* @throws TaskPriorityQueueException
*/
int size() throws TaskPriorityQueueException;
}

84
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service;
/**
* A singleton of a task queue implemented using PriorityBlockingQueue
*/
@Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
/**
* Task queue, this queue is unbounded, this means it will cause OutOfMemoryError.
* The master will stop to generate the task if memory is too high.
*/
private final PriorityBlockingQueue<TaskPriority> queue = new PriorityBlockingQueue<>(3000);
/**
* put task takePriorityInfo
*
* @param taskPriorityInfo takePriorityInfo
*/
@Override
public void put(TaskPriority taskPriorityInfo) {
queue.put(taskPriorityInfo);
}
/**
* take taskInfo
*
* @return taskInfo
* @throws TaskPriorityQueueException
*/
@Override
public TaskPriority take() throws TaskPriorityQueueException, InterruptedException {
return queue.take();
}
/**
* poll taskInfo with timeout
*
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
@Override
public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException {
return queue.poll(timeout, unit);
}
/**
* queue size
*
* @return size
* @throws TaskPriorityQueueException
*/
@Override
public int size() throws TaskPriorityQueueException {
return queue.size();
}
}

42
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/Constants.java

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import lombok.experimental.UtilityClass;
@UtilityClass
public final class Constants {
public static final int[] NOT_TERMINATED_STATES = new int[]{
TaskExecutionStatus.DISPATCH.getCode(),
WorkflowExecutionStatus.RUNNING_EXECUTION.getCode(),
WorkflowExecutionStatus.READY_PAUSE.getCode(),
WorkflowExecutionStatus.READY_STOP.getCode(),
TaskExecutionStatus.NEED_FAULT_TOLERANCE.getCode(),
};
public static final int[] RUNNING_PROCESS_STATE = new int[]{
TaskExecutionStatus.RUNNING_EXECUTION.getCode(),
TaskExecutionStatus.SUBMITTED_SUCCESS.getCode(),
TaskExecutionStatus.DISPATCH.getCode(),
WorkflowExecutionStatus.SERIAL_WAIT.getCode()
};
}

27
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java

@ -207,33 +207,6 @@ public class DagHelper {
return resultList;
}
/**
* generate dag by start nodes and recovery nodes
*
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param depNodeType depNodeType
* @return workflow dag
* @throws Exception if error throws Exception
*/
public static WorkflowDag generateFlowDag(List<TaskNode> totalTaskNodeList,
List<Long> startNodeNameList,
List<Long> recoveryNodeCodeList,
TaskDependType depNodeType) throws Exception {
List<TaskNode> destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList,
recoveryNodeCodeList, depNodeType);
if (destTaskNodeList.isEmpty()) {
return null;
}
List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);
WorkflowDag workflowDag = new WorkflowDag();
workflowDag.setEdges(taskNodeRelations);
workflowDag.setNodes(destTaskNodeList);
return workflowDag;
}
/**
* find node by node code
*

94
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ParamUtils.java

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_FATHER_PARAMS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.base.Strings;
/**
* Param Utility class
*/
public class ParamUtils {
/**
* convert globalParams string to global parameter map
* @param globalParams globalParams
* @return parameter map
*/
public static Map<String, String> getGlobalParamMap(String globalParams) {
List<Property> propList;
Map<String, String> globalParamMap = new HashMap<>();
if (!Strings.isNullOrEmpty(globalParams)) {
propList = JSONUtils.toList(globalParams, Property.class);
globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}
return globalParamMap;
}
/**
* Get sub workflow parameters
* @param instanceMap workflow instance map
* @param parentWorkflowInstance parent workflow instance
* @param fatherParams fatherParams
* @return sub workflow parameters
*/
public static String getSubWorkFlowParam(WorkflowInstanceRelation instanceMap,
WorkflowInstance parentWorkflowInstance,
Map<String, String> fatherParams) {
// set sub work workflow command
String workflowMapStr = JSONUtils.toJsonString(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(workflowMapStr);
if (parentWorkflowInstance.isComplementData()) {
Map<String, String> parentParam = JSONUtils.toMap(parentWorkflowInstance.getCommandParam());
String endTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
String startTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
String scheduleTime = parentParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)) {
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endTime);
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startTime);
}
if (StringUtils.isNotEmpty(scheduleTime)) {
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, scheduleTime);
}
workflowMapStr = JSONUtils.toJsonString(cmdParam);
}
if (MapUtils.isNotEmpty(fatherParams)) {
cmdParam.put(CMD_PARAM_FATHER_PARAMS, JSONUtils.toJsonString(fatherParams));
workflowMapStr = JSONUtils.toJsonString(cmdParam);
}
return workflowMapStr;
}
}

47
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessData.java

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.service.model.TaskNode;
import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class ProcessData {
@EqualsAndHashCode.Include
private List<TaskNode> tasks;
@EqualsAndHashCode.Include
private List<Property> globalParams;
private int timeout;
private int tenantId;
public ProcessData(List<TaskNode> tasks, List<Property> globalParams) {
this.tasks = tasks;
this.globalParams = globalParams;
}
}

72
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
/**
* mainly used to get the start command line of a process.
*/
@Slf4j
public class ProcessUtils {
/**
* find logs and kill yarn tasks.
*
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
public static @Nullable List<String> killApplication(@NonNull List<String> appIds,
@NonNull TaskExecutionContext taskExecutionContext) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
if (CollectionUtils.isNotEmpty(appIds)) {
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext
.setExecutePath(FileUtils
.getTaskInstanceWorkingDirectory(taskExecutionContext.getTaskInstanceId()));
}
FileUtils.createDirectoryWith755(Paths.get(taskExecutionContext.getExecutePath()));
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils.cancelApplication(taskExecutionContext);
return appIds;
} else {
log.info("The current appId is empty, don't need to kill the yarn job, taskInstanceId: {}",
taskExecutionContext.getTaskInstanceId());
}
} catch (Exception e) {
log.error("Kill yarn job failure, taskInstanceId: {}", taskExecutionContext.getTaskInstanceId(), e);
}
return Collections.emptyList();
}
}

81
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java

@ -17,28 +17,16 @@
package org.apache.dolphinscheduler.service.command;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVER_WORKFLOW_ID_STRING;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -50,8 +38,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import com.fasterxml.jackson.databind.JsonNode;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class MessageServiceImplTest {
@ -68,73 +54,6 @@ class MessageServiceImplTest {
@Mock
private ScheduleMapper scheduleMapper;
@Test
public void testCreateSubCommand() {
WorkflowInstance parentInstance = new WorkflowInstance();
parentInstance.setWarningType(WarningType.SUCCESS);
parentInstance.setWarningGroupId(0);
TaskInstance task = new TaskInstance();
task.setTaskParams("{\"processDefinitionCode\":10}}");
task.setId(10);
task.setTaskCode(1L);
task.setTaskDefinitionVersion(1);
WorkflowInstance childInstance = null;
WorkflowInstanceRelation instanceMap = new WorkflowInstanceRelation();
instanceMap.setParentWorkflowInstanceId(1);
instanceMap.setParentTaskInstanceId(10);
Command command;
// father history: start; child null == command type: start
parentInstance.setHistoryCmd("START_PROCESS");
parentInstance.setCommandType(CommandType.START_PROCESS);
WorkflowDefinition workflowDefinition = new WorkflowDefinition();
workflowDefinition.setCode(10L);
Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(workflowDefinition);
Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(workflowDefinition);
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
// father history: start,start failure; child null == command type: start
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_PROCESS, command.getCommandType());
// father history: scheduler,start failure; child null == command type: scheduler
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS");
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.SCHEDULER, command.getCommandType());
// father history: complement,start failure; child null == command type: complement
String startString = "2020-01-01 00:00:00";
String endString = "2020-01-10 00:00:00";
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS");
Map<String, String> complementMap = new HashMap<>();
complementMap.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, startString);
complementMap.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endString);
parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap));
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType());
JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam());
Date start = DateUtils.stringToDate(complementDate.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE).asText());
Date end = DateUtils.stringToDate(complementDate.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE).asText());
Assertions.assertEquals(startString, DateUtils.dateToString(start));
Assertions.assertEquals(endString, DateUtils.dateToString(end));
// father history: start,failure,start failure; child not null == command type: start failure
childInstance = new WorkflowInstance();
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS");
command = commandService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task);
Assertions.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType());
}
@Test
public void testVerifyIsNeedCreateCommand() {

162
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java

@ -1,162 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class StandByTaskInstancePriorityQueueTest {
@Test
public void put() throws TaskPriorityQueueException {
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
Assertions.assertEquals(2, queue.size());
Assertions.assertTrue(queue.contains(taskInstanceHigPriority));
Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
}
@Test
public void take() throws Exception {
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.take();
Assertions.assertTrue(queue.size() < peekBeforeLength);
}
@Test
public void poll() throws Exception {
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
Assertions.assertThrows(TaskPriorityQueueException.class, () -> {
queue.poll(1000, TimeUnit.MILLISECONDS);
});
}
@Test
public void peek() throws Exception {
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
Assertions.assertEquals(peekBeforeLength, queue.size());
}
@Test
public void peekTaskGroupPriority() throws Exception {
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
TaskInstance taskInstance = queue.peek();
queue.clear();
Assertions.assertEquals(taskInstance.getName(), "high");
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 2);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assertions.assertEquals("medium", taskInstance.getName());
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assertions.assertEquals("medium", taskInstance.getName());
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assertions.assertEquals("high", taskInstance.getName());
}
@Test
public void size() throws Exception {
Assertions.assertEquals(2, getPeerTaskInstancePriorityQueue().size());
}
@Test
public void contains() throws Exception {
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
TaskInstance taskInstance2 = createTaskInstance("medium2", Priority.MEDIUM, 1);
taskInstance2.setWorkflowInstanceId(2);
Assertions.assertFalse(queue.contains(taskInstance2));
}
@Test
public void remove() {
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
int peekBeforeLength = queue.size();
queue.remove(taskInstanceMediumPriority);
Assertions.assertNotEquals(peekBeforeLength, queue.size());
Assertions.assertFalse(queue.contains(taskInstanceMediumPriority));
}
/**
* get queue
*
* @return queue
* @throws Exception
*/
private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
taskInstanceHigPriority.setTaskGroupPriority(3);
taskInstanceMediumPriority.setTaskGroupPriority(2);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
return queue;
}
/**
* create task instance
*
* @param name name
* @param priority priority
* @return
*/
private TaskInstance createTaskInstance(String name, Priority priority, int taskGroupPriority) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName(name);
taskInstance.setTaskInstancePriority(priority);
taskInstance.setTaskGroupPriority(taskGroupPriority);
return taskInstance;
}
}

162
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java

@ -1,162 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.enums.Priority;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TaskPriorityQueueImplTest {
@Test
public void testSort() {
TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, 1, "default");
TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, 1, "default");
TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, 1, "default");
List<TaskPriority> taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPriorities);
priorityOne = new TaskPriority(0, 1, 0, 0, 1, "default");
priorityTwo = new TaskPriority(0, 2, 0, 0, 1, "default");
priorityThree = new TaskPriority(0, 3, 0, 0, 1, "default");
taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPriorities);
priorityOne = new TaskPriority(0, 0, 1, 0, 1, "default");
priorityTwo = new TaskPriority(0, 0, 2, 0, 1, "default");
priorityThree = new TaskPriority(0, 0, 3, 0, 1, "default");
taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPriorities);
priorityOne = new TaskPriority(0, 0, 0, 1, 1, "default");
priorityTwo = new TaskPriority(0, 0, 0, 2, 1, "default");
priorityThree = new TaskPriority(0, 0, 0, 3, 1, "default");
taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPriorities);
priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, 1, "default_3");
taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPriorities);
priorityOne = new TaskPriority(0, 0, 0, 0, 2, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityThree, priorityOne, priorityTwo),
taskPriorities);
priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
taskPriorities = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityThree, priorityOne, priorityTwo),
taskPriorities);
priorityTwo = new TaskPriority(0, 0, 0, 0, 1, "default_1");
priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_1");
taskPriorities = Arrays.asList(priorityTwo, priorityOne, priorityThree);
Collections.sort(taskPriorities);
Assertions.assertEquals(
Arrays.asList(priorityThree, priorityTwo, priorityOne),
taskPriorities);
}
@Test
public void put() throws Exception {
TaskPriorityQueue queue = getPriorityQueue();
Assertions.assertEquals(2, queue.size());
}
@Test
public void take() throws Exception {
TaskPriorityQueue queue = getPriorityQueue();
int peekBeforeLength = queue.size();
queue.take();
Assertions.assertTrue(queue.size() < peekBeforeLength);
}
@Test
public void poll() throws Exception {
TaskPriorityQueue queue = getPriorityQueue();
int peekBeforeLength = queue.size();
queue.poll(1000, TimeUnit.MILLISECONDS);
queue.poll(1000, TimeUnit.MILLISECONDS);
Assertions.assertEquals(0, queue.size());
queue.poll(1000, TimeUnit.MILLISECONDS);
}
@Test
public void size() throws Exception {
Assertions.assertEquals(2, getPriorityQueue().size());
}
/**
* get queue
*
* @return queue
* @throws Exception
*/
private TaskPriorityQueue getPriorityQueue() throws Exception {
TaskPriorityQueue queue = new TaskPriorityQueueImpl();
TaskPriority taskInstanceHigPriority = createTaskPriority(Priority.HIGH.getCode(), 1);
TaskPriority taskInstanceMediumPriority = createTaskPriority(Priority.MEDIUM.getCode(), 2);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
return queue;
}
/**
* create task priority
*
* @param priority
* @param processInstanceId
* @return
*/
private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) {
TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, 1, "default");
return priorityOne;
}
}

25
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
@ -40,6 +41,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -679,4 +684,24 @@ public class DagHelperTest {
Assertions.assertNotNull(dag);
}
@Data
@NoArgsConstructor
private static class ProcessData {
@EqualsAndHashCode.Include
private List<TaskNode> tasks;
@EqualsAndHashCode.Include
private List<Property> globalParams;
private int timeout;
private int tenantId;
public ProcessData(List<TaskNode> tasks, List<Property> globalParams) {
this.tasks = tasks;
this.globalParams = globalParams;
}
}
}

37
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/ParamUtilsTest.java

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class ParamUtilsTest {
@Test
public void testGetGlobalParamMap() {
String globalParam = "[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]";
Map<String, String> globalParamMap = ParamUtils.getGlobalParamMap(globalParam);
Assertions.assertEquals(globalParamMap.size(), 1);
Assertions.assertEquals(globalParamMap.get("startParam1"), "");
Map<String, String> emptyParamMap = ParamUtils.getGlobalParamMap(null);
Assertions.assertEquals(emptyParamMap.size(), 0);
}
}
Loading…
Cancel
Save