Browse Source

[Chore] Remove unused utils in common module (#16902)

dev
Wenjun Ruan 1 week ago committed by GitHub
parent
commit
789f7abe70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  2. 76
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java
  3. 51
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java
  4. 112
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java
  5. 28
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TriFunction.java
  6. 218
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java
  7. 54
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java
  8. 458
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
  9. 707
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -55,10 +54,8 @@ import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResp
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.Date; import java.util.Date;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -145,7 +142,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
Date start = checkAndParseDateParameters(startDate); Date start = checkAndParseDateParameters(startDate);
Date end = checkAndParseDateParameters(endDate); Date end = checkAndParseDateParameters(endDate);
Page<TaskInstance> page = new Page<>(pageNo, pageSize); Page<TaskInstance> page = new Page<>(pageNo, pageSize);
PageInfo<Map<String, Object>> pageInfo = new PageInfo<>(pageNo, pageSize); PageInfo<TaskInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
IPage<TaskInstance> taskInstanceIPage; IPage<TaskInstance> taskInstanceIPage;
if (taskExecuteType == TaskExecuteType.STREAM) { if (taskExecuteType == TaskExecuteType.STREAM) {
// stream task without workflow instance // stream task without workflow instance
@ -178,9 +175,6 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
start, start,
end); end);
} }
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);
exclusionSet.add("taskJson");
List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords(); List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
List<Integer> executorIds = List<Integer> executorIds =
taskInstanceList.stream().map(TaskInstance::getExecutorId).distinct().collect(Collectors.toList()); taskInstanceList.stream().map(TaskInstance::getExecutorId).distinct().collect(Collectors.toList());
@ -194,7 +188,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
} }
} }
pageInfo.setTotal((int) taskInstanceIPage.getTotal()); pageInfo.setTotal((int) taskInstanceIPage.getTotal());
pageInfo.setTotalList(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet)); pageInfo.setTotalList(taskInstanceList);
result.setData(pageInfo); result.setData(pageInfo);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;

76
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.commons.beanutils.BeanMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Provides utility methods and decorators for {@link Collection} instances.
* <p>
* Various utility methods might put the input objects into a Set/Map/Bag. In case
* the input objects override {@link Object#equals(Object)}, it is mandatory that
* the general contract of the {@link Object#hashCode()} method is maintained.
* <p>
* NOTE: From 4.0, method parameters will take {@link Iterable} objects when possible.
*
* @version $Id: CollectionUtils.java 1686855 2015-06-22 13:00:27Z tn $
* @since 1.0
*/
public class CollectionUtils {
private CollectionUtils() {
throw new UnsupportedOperationException("Construct CollectionUtils");
}
/**
* Removes certain attributes of each object in the list
*
* @param originList origin list
* @param exclusionSet exclusion set
* @param <T> T
* @return removes certain attributes of each object in the list
*/
public static <T extends Object> List<Map<String, Object>> getListByExclusion(List<T> originList,
Set<String> exclusionSet) {
List<Map<String, Object>> instanceList = new ArrayList<>();
if (originList == null) {
return instanceList;
}
Map<String, Object> instanceMap;
for (T instance : originList) {
BeanMap beanMap = new BeanMap(instance);
instanceMap = new LinkedHashMap<>(16, 0.75f, true);
for (Map.Entry<Object, Object> entry : beanMap.entrySet()) {
if (exclusionSet != null && exclusionSet.contains(entry.getKey())) {
continue;
}
instanceMap.put((String) entry.getKey(), entry.getValue());
}
instanceList.add(instanceMap);
}
return instanceList;
}
}

51
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.util.Arrays;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ConnectionUtils {
private ConnectionUtils() {
throw new UnsupportedOperationException("Construct ConnectionUtils");
}
/**
* release resource
*
* @param resources resources
*/
public static void releaseResource(AutoCloseable... resources) {
if (resources == null || resources.length == 0) {
return;
}
Arrays.stream(resources).filter(Objects::nonNull)
.forEach(resource -> {
try {
resource.close();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
});
}
}

112
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java

@ -1,112 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
public class RetryerUtils {
private static Retryer<Boolean> defaultRetryerResultCheck;
private static Retryer<Boolean> defaultRetryerResultNoCheck;
private RetryerUtils() {
throw new UnsupportedOperationException("Construct RetryerUtils");
}
private static Retryer<Boolean> getDefaultRetryerResultNoCheck() {
if (defaultRetryerResultNoCheck == null) {
defaultRetryerResultNoCheck = RetryerBuilder
.<Boolean>newBuilder()
.retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
}
return defaultRetryerResultNoCheck;
}
/**
* Gets default retryer.
* the retryer will retry 3 times if exceptions throw
* and wait 1 second between each retry
*
* @param checkResult true means the callable must return true before retrying
* false means that retry callable only throw exceptions
* @return the default retryer
*/
public static Retryer<Boolean> getDefaultRetryer(boolean checkResult) {
return checkResult ? getDefaultRetryer() : getDefaultRetryerResultNoCheck();
}
/**
* Gets default retryer.
* the retryer will retry 3 times if exceptions throw
* and wait 1 second between each retry
*
* @return the default retryer
*/
public static Retryer<Boolean> getDefaultRetryer() {
if (defaultRetryerResultCheck == null) {
defaultRetryerResultCheck = RetryerBuilder
.<Boolean>newBuilder()
.retryIfResult(Boolean.FALSE::equals)
.retryIfException()
.withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
}
return defaultRetryerResultCheck;
}
/**
* Use RETRYER to invoke the Callable
*
* @param callable the callable
* @param checkResult true means that retry callable before returning true
* false means that retry callable only throw exceptions
* @return the final result of callable
* @throws ExecutionException the execution exception
* @throws RetryException the retry exception
*/
public static Boolean retryCall(final Callable<Boolean> callable,
boolean checkResult) throws ExecutionException, RetryException {
return getDefaultRetryer(checkResult).call(callable);
}
/**
* Use RETRYER to invoke the Callable before returning true
*
* @param callable the callable
* @return the boolean
* @throws ExecutionException the execution exception
* @throws RetryException the retry exception
*/
public static Boolean retryCall(final Callable<Boolean> callable) throws ExecutionException, RetryException {
return retryCall(callable, true);
}
}

28
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TriFunction.java

@ -1,28 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
/**
* tri function function interface
*/
@FunctionalInterface
public interface TriFunction<IN1, IN2, IN3, OUT1> {
OUT1 apply(IN1 in1, IN2 in2, IN3 in3);
}

218
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java

@ -1,218 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.util.concurrent.ExecutionException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
public class RetryerUtilsTest {
@Test
public void testDefaultRetryer() {
Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer();
Assertions.assertNotNull(retryer);
try {
boolean result = retryer.call(() -> true);
Assertions.assertTrue(result);
} catch (ExecutionException | RetryException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
Retryer<Boolean> retryer1 = RetryerUtils.getDefaultRetryer(true);
Assertions.assertEquals(retryer, retryer1);
}
@Test
public void testDefaultRetryerResultCheck() {
Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer();
Assertions.assertNotNull(retryer);
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = retryer.call(() -> {
execTime[0]++;
return execTime[0] == finalExecTarget;
});
Assertions.assertEquals(finalExecTarget, execTime[0]);
Assertions.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
retryer.call(() -> {
execTime[0]++;
return execTime[0] == 4;
});
Assertions.fail("Retry times not reached");
} catch (RetryException e) {
Assertions.assertEquals(3, e.getNumberOfFailedAttempts());
Assertions.assertEquals(3, execTime[0]);
} catch (ExecutionException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testDefaultRetryerResultNoCheck() {
Retryer<Boolean> retryer = RetryerUtils.getDefaultRetryer(false);
Assertions.assertNotNull(retryer);
try {
for (int execTarget = 1; execTarget <= 5; execTarget++) {
int[] execTime = {0};
boolean result = retryer.call(() -> {
execTime[0]++;
return execTime[0] > 1;
});
Assertions.assertEquals(1, execTime[0]);
Assertions.assertFalse(result);
}
} catch (ExecutionException | RetryException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRecallResultCheck() {
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == finalExecTarget;
});
Assertions.assertEquals(finalExecTarget, execTime[0]);
Assertions.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == 4;
});
Assertions.fail("Recall times not reached");
} catch (RetryException e) {
Assertions.assertEquals(3, e.getNumberOfFailedAttempts());
Assertions.assertEquals(3, execTime[0]);
} catch (ExecutionException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRecallResultCheckWithPara() {
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == finalExecTarget;
}, true);
Assertions.assertEquals(finalExecTarget, execTime[0]);
Assertions.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] == 4;
}, true);
Assertions.fail("Recall times not reached");
} catch (RetryException e) {
Assertions.assertEquals(3, e.getNumberOfFailedAttempts());
Assertions.assertEquals(3, execTime[0]);
} catch (ExecutionException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRecallResultNoCheck() {
try {
for (int execTarget = 1; execTarget <= 5; execTarget++) {
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
return execTime[0] > 1;
}, false);
Assertions.assertEquals(1, execTime[0]);
Assertions.assertFalse(result);
}
} catch (ExecutionException | RetryException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
}
private void testRetryExceptionWithPara(boolean checkResult) {
try {
for (int execTarget = 1; execTarget <= 3; execTarget++) {
int finalExecTarget = execTarget;
int[] execTime = {0};
boolean result = RetryerUtils.retryCall(() -> {
execTime[0]++;
if (execTime[0] != finalExecTarget) {
throw new IllegalArgumentException(String.valueOf(execTime[0]));
}
return true;
}, checkResult);
Assertions.assertEquals(finalExecTarget, execTime[0]);
Assertions.assertTrue(result);
}
} catch (ExecutionException | RetryException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
int[] execTime = {0};
try {
RetryerUtils.retryCall(() -> {
execTime[0]++;
if (execTime[0] != 4) {
throw new IllegalArgumentException(String.valueOf(execTime[0]));
}
return true;
}, checkResult);
Assertions.fail("Recall times not reached");
} catch (RetryException e) {
Assertions.assertEquals(3, e.getNumberOfFailedAttempts());
Assertions.assertEquals(3, execTime[0]);
Assertions.assertNotNull(e.getCause());
Assertions.assertEquals(3, Integer.parseInt(e.getCause().getMessage()));
} catch (ExecutionException e) {
Assertions.fail("Retry call failed " + e.getMessage());
}
}
@Test
public void testRetryException() {
testRetryExceptionWithPara(true);
testRetryExceptionWithPara(false);
}
}

54
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java

@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class StringTest {
@Test
public void stringCompareTest() {
for (int j = 0; j < 5; j++) {
long start = System.currentTimeMillis();
int size = 10000;
List<String> taskList = new ArrayList<>(size);
// init
for (int i = 0; i < size; i++) {
taskList.add(String.format("%d_%010d_%010d", 1, i, i + 1));
}
String origin = taskList.get(0);
for (int i = 1; i < taskList.size(); i++) {
String str = taskList.get(i);
int result = str.compareTo(origin);
if (result < 0) {
origin = str;
}
}
double during = (System.currentTimeMillis() - start) / 1000.0;
Assertions.assertEquals("1_0000000000_0000000001", origin);
}
}
}

458
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java

@ -17,421 +17,27 @@
package org.apache.dolphinscheduler.service.utils; package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation; import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.WorkflowDag; import org.apache.dolphinscheduler.service.process.WorkflowDag;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
/** /**
* dag tools * dag tools
*/ */
@Slf4j @Slf4j
public class DagHelper { public class DagHelper {
/**
* generate flow node relation list by task node list;
* Edges that are not in the task Node List will not be added to the result
*
* @param taskNodeList taskNodeList
* @return task node relation list
*/
public static List<TaskNodeRelation> generateRelationListByFlowNodes(List<TaskNode> taskNodeList) {
List<TaskNodeRelation> nodeRelationList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<Long> preTaskList = JSONUtils.toList(preTasks, Long.class);
if (preTaskList != null) {
for (Long depNodeCode : preTaskList) {
if (null != findNodeByCode(taskNodeList, depNodeCode)) {
nodeRelationList.add(new TaskNodeRelation(depNodeCode, taskNode.getCode()));
}
}
}
}
return nodeRelationList;
}
/**
* generate task nodes needed by dag
*
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskDependType taskDependType
* @return task node list
*/
public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList,
List<Long> startNodeNameList,
List<Long> recoveryNodeCodeList,
TaskDependType taskDependType) {
List<TaskNode> destFlowNodeList = new ArrayList<>();
List<Long> startNodeList = startNodeNameList;
if (taskDependType != TaskDependType.TASK_POST && CollectionUtils.isEmpty(startNodeList)) {
log.error("start node list is empty! cannot continue run the workflow ");
return destFlowNodeList;
}
List<TaskNode> destTaskNodeList = new ArrayList<>();
List<TaskNode> tmpTaskNodeList = new ArrayList<>();
if (taskDependType == TaskDependType.TASK_POST
&& CollectionUtils.isNotEmpty(recoveryNodeCodeList)) {
startNodeList = recoveryNodeCodeList;
}
if (CollectionUtils.isEmpty(startNodeList)) {
// no special designation start nodes
tmpTaskNodeList = taskNodeList;
} else {
// specified start nodes or resume execution
for (Long startNodeCode : startNodeList) {
TaskNode startNode = findNodeByCode(taskNodeList, startNodeCode);
List<TaskNode> childNodeList = new ArrayList<>();
if (startNode == null) {
log.error("start node name [{}] is not in task node list [{}] ",
startNodeCode,
taskNodeList);
continue;
} else if (TaskDependType.TASK_POST == taskDependType) {
List<Long> visitedNodeCodeList = new ArrayList<>();
childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeCodeList);
} else if (TaskDependType.TASK_PRE == taskDependType) {
List<Long> visitedNodeCodeList = new ArrayList<>();
childNodeList =
getFlowNodeListPre(startNode, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList);
} else {
childNodeList.add(startNode);
}
tmpTaskNodeList.addAll(childNodeList);
}
}
for (TaskNode taskNode : tmpTaskNodeList) {
if (null == findNodeByCode(destTaskNodeList, taskNode.getCode())) {
destTaskNodeList.add(taskNode);
}
}
return destTaskNodeList;
}
/**
* find all the nodes that depended on the start node
*
* @param startNode startNode
* @param taskNodeList taskNodeList
* @return task node list
*/
private static List<TaskNode> getFlowNodeListPost(TaskNode startNode,
List<TaskNode> taskNodeList,
List<Long> visitedNodeCodeList) {
List<TaskNode> resultList = new ArrayList<>();
for (TaskNode taskNode : taskNodeList) {
List<Long> depList = taskNode.getDepList();
if (null != depList && null != startNode && depList.contains(startNode.getCode())
&& !visitedNodeCodeList.contains(taskNode.getCode())) {
resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeCodeList));
}
}
// why add (startNode != null) condition? for SonarCloud Quality Gate passed
if (null != startNode) {
visitedNodeCodeList.add(startNode.getCode());
}
resultList.add(startNode);
return resultList;
}
/**
* find all nodes that start nodes depend on.
*
* @param startNode startNode
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskNodeList taskNodeList
* @return task node list
*/
private static List<TaskNode> getFlowNodeListPre(TaskNode startNode,
List<Long> recoveryNodeCodeList,
List<TaskNode> taskNodeList,
List<Long> visitedNodeCodeList) {
List<TaskNode> resultList = new ArrayList<>();
List<Long> depList = new ArrayList<>();
if (null != startNode) {
depList = startNode.getDepList();
resultList.add(startNode);
}
if (CollectionUtils.isEmpty(depList)) {
return resultList;
}
for (Long depNodeCode : depList) {
TaskNode start = findNodeByCode(taskNodeList, depNodeCode);
if (recoveryNodeCodeList.contains(depNodeCode)) {
resultList.add(start);
} else if (!visitedNodeCodeList.contains(depNodeCode)) {
resultList.addAll(getFlowNodeListPre(start, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList));
}
}
// why add (startNode != null) condition? for SonarCloud Quality Gate passed
if (null != startNode) {
visitedNodeCodeList.add(startNode.getCode());
}
return resultList;
}
/**
* find node by node code
*
* @param nodeDetails nodeDetails
* @param nodeCode nodeCode
* @return task node
*/
public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, Long nodeCode) {
for (TaskNode taskNode : nodeDetails) {
if (taskNode.getCode() == nodeCode) {
return taskNode;
}
}
return null;
}
/**
* the task can be submit when all the depends nodes are forbidden or complete
*
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
*/
public static boolean allDependsForbiddenOrEnd(TaskNode taskNode,
DAG<Long, TaskNode, TaskNodeRelation> dag,
Map<Long, TaskNode> skipTaskNodeList,
Map<Long, TaskInstance> completeTaskList) {
List<Long> dependList = taskNode.getDepList();
if (dependList == null) {
return true;
}
for (Long dependNodeCode : dependList) {
TaskNode dependNode = dag.getNode(dependNodeCode);
if (dependNode == null || completeTaskList.containsKey(dependNodeCode)
|| dependNode.isForbidden()
|| skipTaskNodeList.containsKey(dependNodeCode)) {
continue;
} else {
return false;
}
}
return true;
}
/**
* parse the successor nodes of previous node.
* this function parse the condition node to find the right branch.
* also check all the depends nodes forbidden or complete
*
* @return successor nodes
*/
public static Set<Long> parsePostNodes(Long preNodeCode,
Map<Long, TaskNode> skipTaskNodeList,
DAG<Long, TaskNode, TaskNodeRelation> dag,
Map<Long, TaskInstance> completeTaskList) {
Set<Long> postNodeList = new HashSet<>();
Collection<Long> startVertexes = new ArrayList<>();
if (preNodeCode == null) {
startVertexes = dag.getBeginNode();
} else if (TaskTypeUtils.isConditionTask(dag.getNode(preNodeCode).getType())) {
List<Long> conditionTaskList = parseConditionTask(preNodeCode, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else if (TaskTypeUtils.isSwitchTask(dag.getNode(preNodeCode).getType())) {
List<Long> conditionTaskList = parseSwitchTask(preNodeCode, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else {
startVertexes = dag.getSubsequentNodes(preNodeCode);
}
for (Long subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
if (taskNode == null) {
log.error("taskNode {} is null, please check dag", subsequent);
continue;
}
if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
setTaskNodeSkip(subsequent, dag, skipTaskNodeList);
continue;
}
if (!DagHelper.allDependsForbiddenOrEnd(taskNode, dag, skipTaskNodeList, completeTaskList)) {
continue;
}
if (taskNode.isForbidden() || completeTaskList.containsKey(subsequent)) {
postNodeList.addAll(parsePostNodes(subsequent, skipTaskNodeList, dag, completeTaskList));
continue;
}
postNodeList.add(subsequent);
}
return postNodeList;
}
/**
* if all of the task dependence are skipped, skip it too.
*/
private static boolean isTaskNodeNeedSkip(TaskNode taskNode,
Map<Long, TaskNode> skipTaskNodeList) {
if (CollectionUtils.isEmpty(taskNode.getDepList())) {
return false;
}
for (Long depNode : taskNode.getDepList()) {
if (!skipTaskNodeList.containsKey(depNode)) {
return false;
}
}
return true;
}
/**
* parse condition task find the branch workflow
* set skip flag for another one.
*/
public static List<Long> parseConditionTask(Long nodeCode,
Map<Long, TaskNode> skipTaskNodeList,
DAG<Long, TaskNode, TaskNodeRelation> dag,
Map<Long, TaskInstance> completeTaskList) {
List<Long> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeCode);
if (!TaskTypeUtils.isConditionTask(taskNode.getType())) {
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeCode)) {
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeCode);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<ConditionsParameters>() {
});
ConditionsParameters.ConditionResult conditionResult = conditionsParameters.getConditionResult();
List<Long> skipNodeList = new ArrayList<>();
if (conditionResult.isConditionSuccess()) {
conditionTaskList = conditionResult.getSuccessNode();
skipNodeList = conditionResult.getFailedNode();
} else {
conditionTaskList = conditionResult.getFailedNode();
skipNodeList = conditionResult.getSuccessNode();
}
if (CollectionUtils.isNotEmpty(skipNodeList)) {
skipNodeList.forEach(skipNode -> setTaskNodeSkip(skipNode, dag, skipTaskNodeList));
}
// the conditionTaskList maybe null if no next task
conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new ArrayList<>());
return conditionTaskList;
}
/**
* parse condition task find the branch workflow
* set skip flag for another one.
*
* @param nodeCode
* @return
*/
public static List<Long> parseSwitchTask(Long nodeCode,
Map<Long, TaskNode> skipTaskNodeList,
DAG<Long, TaskNode, TaskNodeRelation> dag,
Map<Long, TaskInstance> completeTaskList) {
List<Long> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeCode);
if (!SwitchLogicTaskChannelFactory.NAME.equals(taskNode.getType())) {
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeCode)) {
return conditionTaskList;
}
conditionTaskList = skipTaskNode4Switch(skipTaskNodeList, completeTaskList.get(nodeCode), dag);
return conditionTaskList;
}
public static List<Long> skipTaskNode4Switch(Map<Long, TaskNode> skipTaskNodeList,
TaskInstance taskInstance,
DAG<Long, TaskNode, TaskNodeRelation> dag) {
SwitchParameters switchParameters =
JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<SwitchParameters>() {
});
SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
Long nextBranch = switchParameters.getNextBranch();
if (switchResult == null) {
log.error("switchResult is null, please check the switch task configuration");
return Collections.emptyList();
}
if (nextBranch == null) {
log.error("switchParameters.getNextBranch() is null, please check the switch task configuration");
return Collections.emptyList();
}
Set<Long> allNextBranches = new HashSet<>();
if (switchResult.getNextNode() != null) {
allNextBranches.add(switchResult.getNextNode());
}
if (CollectionUtils.isNotEmpty(switchResult.getDependTaskList())) {
for (SwitchResultVo switchResultVo : switchResult.getDependTaskList()) {
allNextBranches.add(switchResultVo.getNextNode());
}
}
allNextBranches.remove(nextBranch);
for (Long branch : allNextBranches) {
setTaskNodeSkip(branch, dag, skipTaskNodeList);
}
return Lists.newArrayList(nextBranch);
}
/**
* set task node and the post nodes skip flag
*/
private static void setTaskNodeSkip(Long skipNodeCode,
DAG<Long, TaskNode, TaskNodeRelation> dag,
Map<Long, TaskNode> skipTaskNodeList) {
if (!dag.containsNode(skipNodeCode)) {
return;
}
skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode));
Collection<Long> postNodeList = dag.getSubsequentNodes(skipNodeCode);
for (Long post : postNodeList) {
TaskNode postNode = dag.getNode(post);
if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) {
setTaskNodeSkip(post, dag, skipTaskNodeList);
}
}
}
/*** /***
* build dag graph * build dag graph
* @param workflowDag workflowDag * @param workflowDag workflowDag
@ -457,34 +63,6 @@ public class DagHelper {
return dag; return dag;
} }
/**
* get workflow dag
*
* @param taskNodeList task node list
* @return workflow dag
*/
public static WorkflowDag getWorkflowDag(List<TaskNode> taskNodeList) {
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<Long> preTasksList = JSONUtils.toList(preTasks, Long.class);
// If the dependency is not empty
if (preTasksList != null) {
for (Long depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getCode()));
}
}
}
WorkflowDag workflowDag = new WorkflowDag();
workflowDag.setEdges(taskNodeRelations);
workflowDag.setNodes(taskNodeList);
return workflowDag;
}
/** /**
* get workflow dag * get workflow dag
* *
@ -518,40 +96,4 @@ public class DagHelper {
return workflowDag; return workflowDag;
} }
/**
* is there have conditions after the parent node
*/
public static boolean haveConditionsAfterNode(Long parentNodeCode,
DAG<Long, TaskNode, TaskNodeRelation> dag) {
return haveSubAfterNode(parentNodeCode, dag, ConditionsLogicTaskChannelFactory.NAME);
}
/**
* is there have all node after the parent node
*/
public static boolean haveAllNodeAfterNode(Long parentNodeCode,
DAG<Long, TaskNode, TaskNodeRelation> dag) {
return haveSubAfterNode(parentNodeCode, dag, null);
}
/**
* Whether there is a specified type of child node after the parent node
*/
public static boolean haveSubAfterNode(Long parentNodeCode,
DAG<Long, TaskNode, TaskNodeRelation> dag, String filterNodeType) {
Set<Long> subsequentNodes = dag.getSubsequentNodes(parentNodeCode);
if (CollectionUtils.isEmpty(subsequentNodes)) {
return false;
}
if (StringUtils.isBlank(filterNodeType)) {
return true;
}
for (Long nodeName : subsequentNodes) {
TaskNode taskNode = dag.getNode(nodeName);
if (taskNode.getType().equalsIgnoreCase(filterNodeType)) {
return true;
}
}
return false;
}
} }

707
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java

@ -1,707 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.utils;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.WorkflowDag;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.google.common.truth.Truth;
public class DagHelperTest {
@Test
public void testHaveSubAfterNode() {
Long parentNodeCode = 5293789969856L;
List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
TaskNodeRelation relation = new TaskNodeRelation();
relation.setStartNode(5293789969856L);
relation.setEndNode(5293789969857L);
taskNodeRelations.add(relation);
TaskNodeRelation relationNext = new TaskNodeRelation();
relationNext.setStartNode(5293789969856L);
relationNext.setEndNode(5293789969858L);
taskNodeRelations.add(relationNext);
List<TaskNode> taskNodes = new ArrayList<>();
TaskNode node = new TaskNode();
node.setCode(5293789969856L);
node.setType("SHELL");
TaskNode subNode = new TaskNode();
subNode.setCode(5293789969857L);
subNode.setType("BLOCKING");
subNode.setPreTasks("[5293789969856]");
TaskNode subNextNode = new TaskNode();
subNextNode.setCode(5293789969858L);
subNextNode.setType("CONDITIONS");
subNextNode.setPreTasks("[5293789969856]");
taskNodes.add(node);
taskNodes.add(subNode);
taskNodes.add(subNextNode);
WorkflowDag workflowDag = new WorkflowDag();
workflowDag.setEdges(taskNodeRelations);
workflowDag.setNodes(taskNodes);
DAG<Long, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(workflowDag);
boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag);
Assertions.assertTrue(canSubmit);
boolean haveConditions = DagHelper.haveConditionsAfterNode(parentNodeCode, dag);
Assertions.assertTrue(haveConditions);
boolean dependent = DagHelper.haveSubAfterNode(parentNodeCode, dag, DependentLogicTaskChannelFactory.NAME);
Assertions.assertFalse(dependent);
}
@Test
public void testTaskNodeCanSubmit() {
List<TaskNode> taskNodeList = new ArrayList<>();
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setCode(1);
node1.setType("SHELL");
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setCode(2);
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
node2.setPreTasks(JSONUtils.toJsonString(dep2));
taskNodeList.add(node2);
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setCode(4);
node4.setType("SHELL");
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
node3.setCode(3);
node3.setType("SHELL");
List<String> dep3 = new ArrayList<>();
dep3.add("2");
dep3.add("4");
node3.setPreTasks(JSONUtils.toJsonString(dep3));
taskNodeList.add(node3);
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
node5.setCode(5);
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("3");
dep5.add("8");
node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5);
TaskNode node6 = new TaskNode();
node6.setId("6");
node6.setName("6");
node6.setCode(6);
node6.setType("SHELL");
List<String> dep6 = new ArrayList<>();
dep6.add("3");
node6.setPreTasks(JSONUtils.toJsonString(dep6));
taskNodeList.add(node6);
TaskNode node7 = new TaskNode();
node7.setId("7");
node7.setName("7");
node7.setCode(7);
node7.setType("SHELL");
List<String> dep7 = new ArrayList<>();
dep7.add("5");
node7.setPreTasks(JSONUtils.toJsonString(dep7));
taskNodeList.add(node7);
TaskNode node8 = new TaskNode();
node8.setId("8");
node8.setName("8");
node8.setCode(8);
node8.setType("SHELL");
List<String> dep8 = new ArrayList<>();
dep8.add("2");
node8.setPreTasks(JSONUtils.toJsonString(dep8));
taskNodeList.add(node8);
List<Long> startNodes = new ArrayList<>();
List<Long> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
startNodes, recoveryNodes, TaskDependType.TASK_POST);
List<TaskNodeRelation> taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
WorkflowDag workflowDag = new WorkflowDag();
workflowDag.setEdges(taskNodeRelations);
workflowDag.setNodes(destTaskNodeList);
// 1->2->3->5->7
// 4->3->6
// 1->2->8->5->7
DAG<Long, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(workflowDag);
TaskNode taskNode3 = dag.getNode(3L);
Map<Long, TaskInstance> completeTaskList = new HashMap<>();
Map<Long, TaskNode> skipNodeList = new HashMap<>();
completeTaskList.putIfAbsent(1L, new TaskInstance());
Boolean canSubmit = false;
// 2/4 are forbidden submit 3
node2 = dag.getNode(2L);
node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
TaskNode nodex = dag.getNode(4L);
nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList);
Assertions.assertEquals(canSubmit, true);
// 2forbidden, 3 cannot be submit
completeTaskList.putIfAbsent(2L, new TaskInstance());
TaskNode nodey = dag.getNode(4L);
nodey.setRunFlag("");
canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList);
Assertions.assertEquals(canSubmit, false);
// 2/3 forbidden submit 5
node3 = dag.getNode(3L);
node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
node8 = dag.getNode(8L);
node8.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
node5 = dag.getNode(5L);
canSubmit = DagHelper.allDependsForbiddenOrEnd(node5, dag, skipNodeList, completeTaskList);
Assertions.assertEquals(canSubmit, true);
}
@Test
public void testParsePostNodeList() {
List<TaskNode> taskNodeList = new ArrayList<>();
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setCode(1);
node1.setType("SHELL");
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setCode(2);
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
node2.setPreTasks(JSONUtils.toJsonString(dep2));
taskNodeList.add(node2);
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setCode(4);
node4.setType("SHELL");
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
node3.setCode(3);
node3.setType("SHELL");
List<String> dep3 = new ArrayList<>();
dep3.add("2");
dep3.add("4");
node3.setPreTasks(JSONUtils.toJsonString(dep3));
taskNodeList.add(node3);
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
node5.setCode(5);
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("3");
dep5.add("8");
node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5);
TaskNode node6 = new TaskNode();
node6.setId("6");
node6.setName("6");
node6.setCode(6);
node6.setType("SHELL");
List<String> dep6 = new ArrayList<>();
dep6.add("3");
node6.setPreTasks(JSONUtils.toJsonString(dep6));
taskNodeList.add(node6);
TaskNode node7 = new TaskNode();
node7.setId("7");
node7.setName("7");
node7.setCode(7);
node7.setType("SHELL");
List<String> dep7 = new ArrayList<>();
dep7.add("5");
node7.setPreTasks(JSONUtils.toJsonString(dep7));
taskNodeList.add(node7);
TaskNode node8 = new TaskNode();
node8.setId("8");
node8.setName("8");
node8.setCode(8);
node8.setType("SHELL");
List<String> dep8 = new ArrayList<>();
dep8.add("2");
node8.setPreTasks(JSONUtils.toJsonString(dep8));
taskNodeList.add(node8);
List<Long> startNodes = new ArrayList<>();
List<Long> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
startNodes, recoveryNodes, TaskDependType.TASK_POST);
List<TaskNodeRelation> taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
WorkflowDag workflowDag = new WorkflowDag();
workflowDag.setEdges(taskNodeRelations);
workflowDag.setNodes(destTaskNodeList);
// 1->2->3->5->7
// 4->3->6
// 1->2->8->5->7
DAG<Long, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(workflowDag);
Map<Long, TaskInstance> completeTaskList = new HashMap<>();
Map<Long, TaskNode> skipNodeList = new HashMap<>();
Set<Long> postNodes = null;
// complete : null
// expect post: 1/4
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(1L));
Assertions.assertTrue(postNodes.contains(4L));
// complete : 1
// expect post: 2/4
completeTaskList.put(1L, new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(2L));
Assertions.assertTrue(postNodes.contains(4L));
// complete : 1/2
// expect post: 4
completeTaskList.put(2L, new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(4L));
Assertions.assertTrue(postNodes.contains(8L));
// complete : 1/2/4
// expect post: 3
completeTaskList.put(4L, new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(3L));
Assertions.assertTrue(postNodes.contains(8L));
// complete : 1/2/4/3
// expect post: 8/6
completeTaskList.put(3L, new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(8L));
Assertions.assertTrue(postNodes.contains(6L));
// complete : 1/2/4/3/8
// expect post: 6/5
completeTaskList.put(8L, new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(5L));
Assertions.assertTrue(postNodes.contains(6L));
// complete : 1/2/4/3/5/6/8
// expect post: 7
completeTaskList.put(6L, new TaskInstance());
completeTaskList.put(5L, new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(7L));
}
@Test
public void testForbiddenPostNode() throws IOException {
DAG<Long, TaskNode, TaskNodeRelation> dag = generateDag();
Map<Long, TaskInstance> completeTaskList = new HashMap<>();
Map<Long, TaskNode> skipNodeList = new HashMap<>();
Set<Long> postNodes = null;
// dag: 1-2-3-5-7 4-3-6 2-8-5-7
// forbid:2 complete:1 post:4/8
completeTaskList.put(1L, new TaskInstance());
TaskNode node2 = dag.getNode(2L);
node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(4L));
Assertions.assertTrue(postNodes.contains(8L));
// forbid:2/4 complete:1 post:3/8
TaskNode node4 = dag.getNode(4L);
node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(2, postNodes.size());
Assertions.assertTrue(postNodes.contains(3L));
Assertions.assertTrue(postNodes.contains(8L));
// forbid:2/4/5 complete:1/8 post:3
completeTaskList.put(8L, new TaskInstance());
TaskNode node5 = dag.getNode(5L);
node5.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(3L));
}
@Test
public void testConditionPostNode() throws IOException {
DAG<Long, TaskNode, TaskNodeRelation> dag = generateDag();
Map<Long, TaskInstance> completeTaskList = new HashMap<>();
Map<Long, TaskNode> skipNodeList = new HashMap<>();
Set<Long> postNodes = null;
// dag: 1-2-3-5-7 4-3-6 2-8-5-7
// 3-if
completeTaskList.put(1L, new TaskInstance());
completeTaskList.put(2L, new TaskInstance());
completeTaskList.put(4L, new TaskInstance());
TaskInstance taskInstance3 = new TaskInstance();
taskInstance3.setTaskType(ConditionsLogicTaskChannelFactory.NAME);
ConditionsParameters.ConditionResult conditionResult = ConditionsParameters.ConditionResult.builder()
.conditionSuccess(true)
.successNode(Lists.newArrayList(5L))
.failedNode(Lists.newArrayList(6L))
.build();
ConditionsParameters conditionsParameters = new ConditionsParameters();
conditionsParameters.setConditionResult(conditionResult);
taskInstance3.setTaskParams(JSONUtils.toJsonString(conditionsParameters));
taskInstance3.setState(TaskExecutionStatus.SUCCESS);
TaskNode node3 = dag.getNode(3L);
node3.setType(ConditionsLogicTaskChannelFactory.NAME);
// complete 1/2/3/4 expect:8
completeTaskList.put(3L, taskInstance3);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(8L));
// 2.complete 1/2/3/4/8 expect:5 skip:6
completeTaskList.put(8L, new TaskInstance());
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertTrue(postNodes.contains(5L));
Assertions.assertEquals(1, skipNodeList.size());
Assertions.assertTrue(skipNodeList.containsKey(6L));
// 3.complete 1/2/3/4/5/8 expect post:7 skip:6
skipNodeList.clear();
TaskInstance taskInstance1 = new TaskInstance();
completeTaskList.put(5L, taskInstance1);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(7L));
Assertions.assertEquals(1, skipNodeList.size());
Assertions.assertTrue(skipNodeList.containsKey(6L));
}
@Test
public void testSwitchPostNode() {
List<TaskNode> taskNodeList = new ArrayList<>();
TaskNode node = new TaskNode();
node.setId("0");
node.setName("0");
node.setCode(0);
node.setType("SHELL");
taskNodeList.add(node);
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setCode(1);
node1.setType(SwitchLogicTaskChannelFactory.NAME);
SwitchParameters switchParameters = new SwitchParameters();
node1.setParams(JSONUtils.toJsonString(switchParameters));
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setCode(2);
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
node2.setPreTasks(JSONUtils.toJsonString(dep2));
taskNodeList.add(node2);
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setCode(4);
node4.setType("SHELL");
List<String> dep4 = new ArrayList<>();
dep4.add("1");
node4.setPreTasks(JSONUtils.toJsonString(dep4));
taskNodeList.add(node4);
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
node5.setCode(5);
node5.setType("SHELL");
List<Long> dep5 = new ArrayList<>();
dep5.add(1L);
node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5);
TaskNode node6 = new TaskNode();
node5.setId("6");
node5.setName("6");
node5.setCode(6);
node5.setType("SHELL");
List<Long> dep6 = new ArrayList<>();
dep5.add(2L);
dep5.add(4L);
node5.setPreTasks(JSONUtils.toJsonString(dep6));
taskNodeList.add(node6);
List<Long> startNodes = new ArrayList<>();
List<Long> recoveryNodes = new ArrayList<>();
// 0
// 1->2->6
// 1->4->6
// 1->5
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
startNodes, recoveryNodes, TaskDependType.TASK_POST);
List<TaskNodeRelation> taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
WorkflowDag workflowDag = new WorkflowDag();
workflowDag.setEdges(taskNodeRelations);
workflowDag.setNodes(destTaskNodeList);
DAG<Long, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(workflowDag);
Map<Long, TaskNode> skipTaskNodeList = new HashMap<>();
Map<Long, TaskInstance> completeTaskList = new HashMap<>();
completeTaskList.put(0L, new TaskInstance());
TaskInstance taskInstance = new TaskInstance();
taskInstance.setState(TaskExecutionStatus.SUCCESS);
taskInstance.setTaskCode(1L);
taskInstance.setTaskType(SwitchLogicTaskChannelFactory.NAME);
switchParameters = SwitchParameters.builder()
.nextBranch(5L)
.switchResult(SwitchParameters.SwitchResult.builder()
.dependTaskList(Lists.newArrayList(
new SwitchResultVo("", 2L),
new SwitchResultVo("", 4L)))
.nextNode(5L)
.build())
.build();
taskInstance.setTaskParams(JSONUtils.toJsonString(switchParameters));
completeTaskList.put(1l, taskInstance);
List<Long> nextBranch = DagHelper.skipTaskNode4Switch(skipTaskNodeList, taskInstance, dag);
Assertions.assertNotNull(skipTaskNodeList.get(2L));
Assertions.assertNotNull(skipTaskNodeList.get(4L));
Assertions.assertEquals(2, skipTaskNodeList.size());
Truth.assertThat(nextBranch).containsExactly(5L);
}
/**
* process:
* 1->2->3->5->7
* 4->3->6
* 1->2->8->5->7
* DAG graph:
* 4 -> -> 6
* \ /
* 1 -> 2 -> 3 -> 5 -> 7
* \ /
* -> 8 ->
*
* @return dag
* @throws JsonProcessingException if error throws JsonProcessingException
*/
private DAG<Long, TaskNode, TaskNodeRelation> generateDag() throws IOException {
List<TaskNode> taskNodeList = new ArrayList<>();
TaskNode node1 = new TaskNode();
node1.setId("1");
node1.setName("1");
node1.setCode(1);
node1.setType("SHELL");
taskNodeList.add(node1);
TaskNode node2 = new TaskNode();
node2.setId("2");
node2.setName("2");
node2.setCode(2);
node2.setType("SHELL");
List<String> dep2 = new ArrayList<>();
dep2.add("1");
node2.setPreTasks(JSONUtils.toJsonString(dep2));
taskNodeList.add(node2);
TaskNode node4 = new TaskNode();
node4.setId("4");
node4.setName("4");
node4.setCode(4);
node4.setType("SHELL");
taskNodeList.add(node4);
TaskNode node3 = new TaskNode();
node3.setId("3");
node3.setName("3");
node3.setCode(3);
node3.setType("SHELL");
List<String> dep3 = new ArrayList<>();
dep3.add("2");
dep3.add("4");
node3.setPreTasks(JSONUtils.toJsonString(dep3));
taskNodeList.add(node3);
TaskNode node5 = new TaskNode();
node5.setId("5");
node5.setName("5");
node5.setCode(5);
node5.setType("SHELL");
List<String> dep5 = new ArrayList<>();
dep5.add("3");
dep5.add("8");
node5.setPreTasks(JSONUtils.toJsonString(dep5));
taskNodeList.add(node5);
TaskNode node6 = new TaskNode();
node6.setId("6");
node6.setName("6");
node6.setCode(6);
node6.setType("SHELL");
List<String> dep6 = new ArrayList<>();
dep6.add("3");
node6.setPreTasks(JSONUtils.toJsonString(dep6));
taskNodeList.add(node6);
TaskNode node7 = new TaskNode();
node7.setId("7");
node7.setName("7");
node7.setCode(7);
node7.setType("SHELL");
List<String> dep7 = new ArrayList<>();
dep7.add("5");
node7.setPreTasks(JSONUtils.toJsonString(dep7));
taskNodeList.add(node7);
TaskNode node8 = new TaskNode();
node8.setId("8");
node8.setName("8");
node8.setCode(8);
node8.setType("SHELL");
List<String> dep8 = new ArrayList<>();
dep8.add("2");
node8.setPreTasks(JSONUtils.toJsonString(dep8));
taskNodeList.add(node8);
List<Long> startNodes = new ArrayList<>();
List<Long> recoveryNodes = new ArrayList<>();
List<TaskNode> destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList,
startNodes, recoveryNodes, TaskDependType.TASK_POST);
List<TaskNodeRelation> taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList);
WorkflowDag workflowDag = new WorkflowDag();
workflowDag.setEdges(taskNodeRelations);
workflowDag.setNodes(destTaskNodeList);
return DagHelper.buildDagGraph(workflowDag);
}
@Test
public void testBuildDagGraph() {
String shellJson =
"{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\","
+
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"},"
+
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\","
+
"\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\","
+
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
ProcessData processData = JSONUtils.parseObject(shellJson, ProcessData.class);
assert processData != null;
List<TaskNode> taskNodeList = processData.getTasks();
WorkflowDag workflowDag = DagHelper.getWorkflowDag(taskNodeList);
DAG<Long, TaskNode, TaskNodeRelation> dag = DagHelper.buildDagGraph(workflowDag);
Assertions.assertNotNull(dag);
}
@Data
@NoArgsConstructor
private static class ProcessData {
@EqualsAndHashCode.Include
private List<TaskNode> tasks;
@EqualsAndHashCode.Include
private List<Property> globalParams;
private int timeout;
private int tenantId;
public ProcessData(List<TaskNode> tasks, List<Property> globalParams) {
this.tasks = tasks;
this.globalParams = globalParams;
}
}
}
Loading…
Cancel
Save