From 789f7abe703de7c44a53192d6f039be73e4a3ff3 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 19 Dec 2024 11:09:05 +0800 Subject: [PATCH] [Chore] Remove unused utils in common module (#16902) --- .../service/impl/TaskInstanceServiceImpl.java | 10 +- .../common/utils/CollectionUtils.java | 76 -- .../common/utils/ConnectionUtils.java | 51 -- .../common/utils/RetryerUtils.java | 112 --- .../common/utils/TriFunction.java | 28 - .../common/utils/RetryerUtilsTest.java | 218 ------ .../common/utils/StringTest.java | 54 -- .../service/utils/DagHelper.java | 458 ------------ .../service/utils/DagHelperTest.java | 707 ------------------ 9 files changed, 2 insertions(+), 1712 deletions(-) delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TriFunction.java delete mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java delete mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java delete mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index b1518e3cc7..d7f6faea53 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -55,10 +54,8 @@ import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResp import org.apache.commons.lang3.StringUtils; import java.util.Date; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -145,7 +142,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst Date start = checkAndParseDateParameters(startDate); Date end = checkAndParseDateParameters(endDate); Page page = new Page<>(pageNo, pageSize); - PageInfo> pageInfo = new PageInfo<>(pageNo, pageSize); + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); IPage taskInstanceIPage; if (taskExecuteType == TaskExecuteType.STREAM) { // stream task without workflow instance @@ -178,9 +175,6 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst start, end); } - Set exclusionSet = new HashSet<>(); - exclusionSet.add(Constants.CLASS); - exclusionSet.add("taskJson"); List taskInstanceList = taskInstanceIPage.getRecords(); List executorIds = taskInstanceList.stream().map(TaskInstance::getExecutorId).distinct().collect(Collectors.toList()); @@ -194,7 +188,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst } } pageInfo.setTotal((int) taskInstanceIPage.getTotal()); - pageInfo.setTotalList(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet)); + pageInfo.setTotalList(taskInstanceList); result.setData(pageInfo); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java deleted file mode 100644 index db1044f05f..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import org.apache.commons.beanutils.BeanMap; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Provides utility methods and decorators for {@link Collection} instances. - *

- * Various utility methods might put the input objects into a Set/Map/Bag. In case - * the input objects override {@link Object#equals(Object)}, it is mandatory that - * the general contract of the {@link Object#hashCode()} method is maintained. - *

- * NOTE: From 4.0, method parameters will take {@link Iterable} objects when possible. - * - * @version $Id: CollectionUtils.java 1686855 2015-06-22 13:00:27Z tn $ - * @since 1.0 - */ -public class CollectionUtils { - - private CollectionUtils() { - throw new UnsupportedOperationException("Construct CollectionUtils"); - } - - /** - * Removes certain attributes of each object in the list - * - * @param originList origin list - * @param exclusionSet exclusion set - * @param T - * @return removes certain attributes of each object in the list - */ - public static List> getListByExclusion(List originList, - Set exclusionSet) { - List> instanceList = new ArrayList<>(); - if (originList == null) { - return instanceList; - } - Map instanceMap; - for (T instance : originList) { - BeanMap beanMap = new BeanMap(instance); - instanceMap = new LinkedHashMap<>(16, 0.75f, true); - for (Map.Entry entry : beanMap.entrySet()) { - if (exclusionSet != null && exclusionSet.contains(entry.getKey())) { - continue; - } - instanceMap.put((String) entry.getKey(), entry.getValue()); - } - instanceList.add(instanceMap); - } - return instanceList; - } - -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java deleted file mode 100644 index 1c1f464c1d..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import java.util.Arrays; -import java.util.Objects; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class ConnectionUtils { - - private ConnectionUtils() { - throw new UnsupportedOperationException("Construct ConnectionUtils"); - } - - /** - * release resource - * - * @param resources resources - */ - public static void releaseResource(AutoCloseable... resources) { - - if (resources == null || resources.length == 0) { - return; - } - Arrays.stream(resources).filter(Objects::nonNull) - .forEach(resource -> { - try { - resource.close(); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - }); - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java deleted file mode 100644 index 1764ba1e0a..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/RetryerUtils.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import org.apache.dolphinscheduler.common.constants.Constants; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import com.github.rholder.retry.RetryException; -import com.github.rholder.retry.Retryer; -import com.github.rholder.retry.RetryerBuilder; -import com.github.rholder.retry.StopStrategies; -import com.github.rholder.retry.WaitStrategies; - -public class RetryerUtils { - - private static Retryer defaultRetryerResultCheck; - private static Retryer defaultRetryerResultNoCheck; - - private RetryerUtils() { - throw new UnsupportedOperationException("Construct RetryerUtils"); - } - - private static Retryer getDefaultRetryerResultNoCheck() { - if (defaultRetryerResultNoCheck == null) { - defaultRetryerResultNoCheck = RetryerBuilder - .newBuilder() - .retryIfException() - .withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS)) - .withStopStrategy(StopStrategies.stopAfterAttempt(3)) - .build(); - } - return defaultRetryerResultNoCheck; - } - - /** - * Gets default retryer. - * the retryer will retry 3 times if exceptions throw - * and wait 1 second between each retry - * - * @param checkResult true means the callable must return true before retrying - * false means that retry callable only throw exceptions - * @return the default retryer - */ - public static Retryer getDefaultRetryer(boolean checkResult) { - return checkResult ? getDefaultRetryer() : getDefaultRetryerResultNoCheck(); - } - - /** - * Gets default retryer. - * the retryer will retry 3 times if exceptions throw - * and wait 1 second between each retry - * - * @return the default retryer - */ - public static Retryer getDefaultRetryer() { - if (defaultRetryerResultCheck == null) { - defaultRetryerResultCheck = RetryerBuilder - .newBuilder() - .retryIfResult(Boolean.FALSE::equals) - .retryIfException() - .withWaitStrategy(WaitStrategies.fixedWait(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS)) - .withStopStrategy(StopStrategies.stopAfterAttempt(3)) - .build(); - } - return defaultRetryerResultCheck; - } - - /** - * Use RETRYER to invoke the Callable - * - * @param callable the callable - * @param checkResult true means that retry callable before returning true - * false means that retry callable only throw exceptions - * @return the final result of callable - * @throws ExecutionException the execution exception - * @throws RetryException the retry exception - */ - public static Boolean retryCall(final Callable callable, - boolean checkResult) throws ExecutionException, RetryException { - return getDefaultRetryer(checkResult).call(callable); - } - - /** - * Use RETRYER to invoke the Callable before returning true - * - * @param callable the callable - * @return the boolean - * @throws ExecutionException the execution exception - * @throws RetryException the retry exception - */ - public static Boolean retryCall(final Callable callable) throws ExecutionException, RetryException { - return retryCall(callable, true); - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TriFunction.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TriFunction.java deleted file mode 100644 index fae03af6b1..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TriFunction.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -/** - * tri function function interface - */ -@FunctionalInterface -public interface TriFunction { - - OUT1 apply(IN1 in1, IN2 in2, IN3 in3); - -} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java deleted file mode 100644 index bcf52adf26..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/RetryerUtilsTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import java.util.concurrent.ExecutionException; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import com.github.rholder.retry.RetryException; -import com.github.rholder.retry.Retryer; - -public class RetryerUtilsTest { - - @Test - public void testDefaultRetryer() { - Retryer retryer = RetryerUtils.getDefaultRetryer(); - Assertions.assertNotNull(retryer); - try { - boolean result = retryer.call(() -> true); - Assertions.assertTrue(result); - } catch (ExecutionException | RetryException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - Retryer retryer1 = RetryerUtils.getDefaultRetryer(true); - Assertions.assertEquals(retryer, retryer1); - } - - @Test - public void testDefaultRetryerResultCheck() { - Retryer retryer = RetryerUtils.getDefaultRetryer(); - Assertions.assertNotNull(retryer); - try { - for (int execTarget = 1; execTarget <= 3; execTarget++) { - int finalExecTarget = execTarget; - int[] execTime = {0}; - boolean result = retryer.call(() -> { - execTime[0]++; - return execTime[0] == finalExecTarget; - }); - Assertions.assertEquals(finalExecTarget, execTime[0]); - Assertions.assertTrue(result); - } - } catch (ExecutionException | RetryException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - int[] execTime = {0}; - try { - retryer.call(() -> { - execTime[0]++; - return execTime[0] == 4; - }); - Assertions.fail("Retry times not reached"); - } catch (RetryException e) { - Assertions.assertEquals(3, e.getNumberOfFailedAttempts()); - Assertions.assertEquals(3, execTime[0]); - } catch (ExecutionException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - } - - @Test - public void testDefaultRetryerResultNoCheck() { - Retryer retryer = RetryerUtils.getDefaultRetryer(false); - Assertions.assertNotNull(retryer); - try { - for (int execTarget = 1; execTarget <= 5; execTarget++) { - int[] execTime = {0}; - boolean result = retryer.call(() -> { - execTime[0]++; - return execTime[0] > 1; - }); - Assertions.assertEquals(1, execTime[0]); - Assertions.assertFalse(result); - } - } catch (ExecutionException | RetryException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - } - - @Test - public void testRecallResultCheck() { - try { - for (int execTarget = 1; execTarget <= 3; execTarget++) { - int finalExecTarget = execTarget; - int[] execTime = {0}; - boolean result = RetryerUtils.retryCall(() -> { - execTime[0]++; - return execTime[0] == finalExecTarget; - }); - Assertions.assertEquals(finalExecTarget, execTime[0]); - Assertions.assertTrue(result); - } - } catch (ExecutionException | RetryException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - int[] execTime = {0}; - try { - RetryerUtils.retryCall(() -> { - execTime[0]++; - return execTime[0] == 4; - }); - Assertions.fail("Recall times not reached"); - } catch (RetryException e) { - Assertions.assertEquals(3, e.getNumberOfFailedAttempts()); - Assertions.assertEquals(3, execTime[0]); - } catch (ExecutionException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - } - - @Test - public void testRecallResultCheckWithPara() { - try { - for (int execTarget = 1; execTarget <= 3; execTarget++) { - int finalExecTarget = execTarget; - int[] execTime = {0}; - boolean result = RetryerUtils.retryCall(() -> { - execTime[0]++; - return execTime[0] == finalExecTarget; - }, true); - Assertions.assertEquals(finalExecTarget, execTime[0]); - Assertions.assertTrue(result); - } - } catch (ExecutionException | RetryException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - int[] execTime = {0}; - try { - RetryerUtils.retryCall(() -> { - execTime[0]++; - return execTime[0] == 4; - }, true); - Assertions.fail("Recall times not reached"); - } catch (RetryException e) { - Assertions.assertEquals(3, e.getNumberOfFailedAttempts()); - Assertions.assertEquals(3, execTime[0]); - } catch (ExecutionException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - } - - @Test - public void testRecallResultNoCheck() { - try { - for (int execTarget = 1; execTarget <= 5; execTarget++) { - int[] execTime = {0}; - boolean result = RetryerUtils.retryCall(() -> { - execTime[0]++; - return execTime[0] > 1; - }, false); - Assertions.assertEquals(1, execTime[0]); - Assertions.assertFalse(result); - } - } catch (ExecutionException | RetryException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - } - - private void testRetryExceptionWithPara(boolean checkResult) { - try { - for (int execTarget = 1; execTarget <= 3; execTarget++) { - int finalExecTarget = execTarget; - int[] execTime = {0}; - boolean result = RetryerUtils.retryCall(() -> { - execTime[0]++; - if (execTime[0] != finalExecTarget) { - throw new IllegalArgumentException(String.valueOf(execTime[0])); - } - return true; - }, checkResult); - Assertions.assertEquals(finalExecTarget, execTime[0]); - Assertions.assertTrue(result); - } - } catch (ExecutionException | RetryException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - int[] execTime = {0}; - try { - RetryerUtils.retryCall(() -> { - execTime[0]++; - if (execTime[0] != 4) { - throw new IllegalArgumentException(String.valueOf(execTime[0])); - } - return true; - }, checkResult); - Assertions.fail("Recall times not reached"); - } catch (RetryException e) { - Assertions.assertEquals(3, e.getNumberOfFailedAttempts()); - Assertions.assertEquals(3, execTime[0]); - Assertions.assertNotNull(e.getCause()); - Assertions.assertEquals(3, Integer.parseInt(e.getCause().getMessage())); - } catch (ExecutionException e) { - Assertions.fail("Retry call failed " + e.getMessage()); - } - } - - @Test - public void testRetryException() { - testRetryExceptionWithPara(true); - testRetryExceptionWithPara(false); - } -} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java deleted file mode 100644 index 8c685c736e..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StringTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class StringTest { - - @Test - public void stringCompareTest() { - - for (int j = 0; j < 5; j++) { - long start = System.currentTimeMillis(); - int size = 10000; - - List taskList = new ArrayList<>(size); - - // init - for (int i = 0; i < size; i++) { - taskList.add(String.format("%d_%010d_%010d", 1, i, i + 1)); - } - - String origin = taskList.get(0); - for (int i = 1; i < taskList.size(); i++) { - String str = taskList.get(i); - int result = str.compareTo(origin); - if (result < 0) { - origin = str; - } - } - double during = (System.currentTimeMillis() - start) / 1000.0; - Assertions.assertEquals("1_0000000000_0000000001", origin); - } - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java index 414e9970ec..50dc2a3f49 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java @@ -17,421 +17,27 @@ package org.apache.dolphinscheduler.service.utils; -import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowTaskRelation; -import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; -import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; -import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; -import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory; -import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory; -import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.process.WorkflowDag; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; import lombok.extern.slf4j.Slf4j; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.Lists; - /** * dag tools */ @Slf4j public class DagHelper { - /** - * generate flow node relation list by task node list; - * Edges that are not in the task Node List will not be added to the result - * - * @param taskNodeList taskNodeList - * @return task node relation list - */ - public static List generateRelationListByFlowNodes(List taskNodeList) { - List nodeRelationList = new ArrayList<>(); - for (TaskNode taskNode : taskNodeList) { - String preTasks = taskNode.getPreTasks(); - List preTaskList = JSONUtils.toList(preTasks, Long.class); - if (preTaskList != null) { - for (Long depNodeCode : preTaskList) { - if (null != findNodeByCode(taskNodeList, depNodeCode)) { - nodeRelationList.add(new TaskNodeRelation(depNodeCode, taskNode.getCode())); - } - } - } - } - return nodeRelationList; - } - - /** - * generate task nodes needed by dag - * - * @param taskNodeList taskNodeList - * @param startNodeNameList startNodeNameList - * @param recoveryNodeCodeList recoveryNodeCodeList - * @param taskDependType taskDependType - * @return task node list - */ - public static List generateFlowNodeListByStartNode(List taskNodeList, - List startNodeNameList, - List recoveryNodeCodeList, - TaskDependType taskDependType) { - List destFlowNodeList = new ArrayList<>(); - List startNodeList = startNodeNameList; - - if (taskDependType != TaskDependType.TASK_POST && CollectionUtils.isEmpty(startNodeList)) { - log.error("start node list is empty! cannot continue run the workflow "); - return destFlowNodeList; - } - - List destTaskNodeList = new ArrayList<>(); - List tmpTaskNodeList = new ArrayList<>(); - - if (taskDependType == TaskDependType.TASK_POST - && CollectionUtils.isNotEmpty(recoveryNodeCodeList)) { - startNodeList = recoveryNodeCodeList; - } - if (CollectionUtils.isEmpty(startNodeList)) { - // no special designation start nodes - tmpTaskNodeList = taskNodeList; - } else { - // specified start nodes or resume execution - for (Long startNodeCode : startNodeList) { - TaskNode startNode = findNodeByCode(taskNodeList, startNodeCode); - List childNodeList = new ArrayList<>(); - if (startNode == null) { - log.error("start node name [{}] is not in task node list [{}] ", - startNodeCode, - taskNodeList); - continue; - } else if (TaskDependType.TASK_POST == taskDependType) { - List visitedNodeCodeList = new ArrayList<>(); - childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeCodeList); - } else if (TaskDependType.TASK_PRE == taskDependType) { - List visitedNodeCodeList = new ArrayList<>(); - childNodeList = - getFlowNodeListPre(startNode, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList); - } else { - childNodeList.add(startNode); - } - tmpTaskNodeList.addAll(childNodeList); - } - } - - for (TaskNode taskNode : tmpTaskNodeList) { - if (null == findNodeByCode(destTaskNodeList, taskNode.getCode())) { - destTaskNodeList.add(taskNode); - } - } - return destTaskNodeList; - } - - /** - * find all the nodes that depended on the start node - * - * @param startNode startNode - * @param taskNodeList taskNodeList - * @return task node list - */ - private static List getFlowNodeListPost(TaskNode startNode, - List taskNodeList, - List visitedNodeCodeList) { - List resultList = new ArrayList<>(); - for (TaskNode taskNode : taskNodeList) { - List depList = taskNode.getDepList(); - if (null != depList && null != startNode && depList.contains(startNode.getCode()) - && !visitedNodeCodeList.contains(taskNode.getCode())) { - resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeCodeList)); - } - } - // why add (startNode != null) condition? for SonarCloud Quality Gate passed - if (null != startNode) { - visitedNodeCodeList.add(startNode.getCode()); - } - - resultList.add(startNode); - return resultList; - } - - /** - * find all nodes that start nodes depend on. - * - * @param startNode startNode - * @param recoveryNodeCodeList recoveryNodeCodeList - * @param taskNodeList taskNodeList - * @return task node list - */ - private static List getFlowNodeListPre(TaskNode startNode, - List recoveryNodeCodeList, - List taskNodeList, - List visitedNodeCodeList) { - - List resultList = new ArrayList<>(); - - List depList = new ArrayList<>(); - if (null != startNode) { - depList = startNode.getDepList(); - resultList.add(startNode); - } - if (CollectionUtils.isEmpty(depList)) { - return resultList; - } - for (Long depNodeCode : depList) { - TaskNode start = findNodeByCode(taskNodeList, depNodeCode); - if (recoveryNodeCodeList.contains(depNodeCode)) { - resultList.add(start); - } else if (!visitedNodeCodeList.contains(depNodeCode)) { - resultList.addAll(getFlowNodeListPre(start, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList)); - } - } - // why add (startNode != null) condition? for SonarCloud Quality Gate passed - if (null != startNode) { - visitedNodeCodeList.add(startNode.getCode()); - } - return resultList; - } - - /** - * find node by node code - * - * @param nodeDetails nodeDetails - * @param nodeCode nodeCode - * @return task node - */ - public static TaskNode findNodeByCode(List nodeDetails, Long nodeCode) { - for (TaskNode taskNode : nodeDetails) { - if (taskNode.getCode() == nodeCode) { - return taskNode; - } - } - return null; - } - - /** - * the task can be submit when all the depends nodes are forbidden or complete - * - * @param taskNode taskNode - * @param dag dag - * @param completeTaskList completeTaskList - * @return can submit - */ - public static boolean allDependsForbiddenOrEnd(TaskNode taskNode, - DAG dag, - Map skipTaskNodeList, - Map completeTaskList) { - List dependList = taskNode.getDepList(); - if (dependList == null) { - return true; - } - for (Long dependNodeCode : dependList) { - TaskNode dependNode = dag.getNode(dependNodeCode); - if (dependNode == null || completeTaskList.containsKey(dependNodeCode) - || dependNode.isForbidden() - || skipTaskNodeList.containsKey(dependNodeCode)) { - continue; - } else { - return false; - } - } - return true; - } - - /** - * parse the successor nodes of previous node. - * this function parse the condition node to find the right branch. - * also check all the depends nodes forbidden or complete - * - * @return successor nodes - */ - public static Set parsePostNodes(Long preNodeCode, - Map skipTaskNodeList, - DAG dag, - Map completeTaskList) { - Set postNodeList = new HashSet<>(); - Collection startVertexes = new ArrayList<>(); - - if (preNodeCode == null) { - startVertexes = dag.getBeginNode(); - } else if (TaskTypeUtils.isConditionTask(dag.getNode(preNodeCode).getType())) { - List conditionTaskList = parseConditionTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); - startVertexes.addAll(conditionTaskList); - } else if (TaskTypeUtils.isSwitchTask(dag.getNode(preNodeCode).getType())) { - List conditionTaskList = parseSwitchTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); - startVertexes.addAll(conditionTaskList); - } else { - startVertexes = dag.getSubsequentNodes(preNodeCode); - } - for (Long subsequent : startVertexes) { - TaskNode taskNode = dag.getNode(subsequent); - if (taskNode == null) { - log.error("taskNode {} is null, please check dag", subsequent); - continue; - } - if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) { - setTaskNodeSkip(subsequent, dag, skipTaskNodeList); - continue; - } - if (!DagHelper.allDependsForbiddenOrEnd(taskNode, dag, skipTaskNodeList, completeTaskList)) { - continue; - } - if (taskNode.isForbidden() || completeTaskList.containsKey(subsequent)) { - postNodeList.addAll(parsePostNodes(subsequent, skipTaskNodeList, dag, completeTaskList)); - continue; - } - postNodeList.add(subsequent); - } - return postNodeList; - } - - /** - * if all of the task dependence are skipped, skip it too. - */ - private static boolean isTaskNodeNeedSkip(TaskNode taskNode, - Map skipTaskNodeList) { - if (CollectionUtils.isEmpty(taskNode.getDepList())) { - return false; - } - for (Long depNode : taskNode.getDepList()) { - if (!skipTaskNodeList.containsKey(depNode)) { - return false; - } - } - return true; - } - - /** - * parse condition task find the branch workflow - * set skip flag for another one. - */ - public static List parseConditionTask(Long nodeCode, - Map skipTaskNodeList, - DAG dag, - Map completeTaskList) { - List conditionTaskList = new ArrayList<>(); - TaskNode taskNode = dag.getNode(nodeCode); - if (!TaskTypeUtils.isConditionTask(taskNode.getType())) { - return conditionTaskList; - } - if (!completeTaskList.containsKey(nodeCode)) { - return conditionTaskList; - } - TaskInstance taskInstance = completeTaskList.get(nodeCode); - ConditionsParameters conditionsParameters = - JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference() { - }); - ConditionsParameters.ConditionResult conditionResult = conditionsParameters.getConditionResult(); - - List skipNodeList = new ArrayList<>(); - if (conditionResult.isConditionSuccess()) { - conditionTaskList = conditionResult.getSuccessNode(); - skipNodeList = conditionResult.getFailedNode(); - } else { - conditionTaskList = conditionResult.getFailedNode(); - skipNodeList = conditionResult.getSuccessNode(); - } - - if (CollectionUtils.isNotEmpty(skipNodeList)) { - skipNodeList.forEach(skipNode -> setTaskNodeSkip(skipNode, dag, skipTaskNodeList)); - } - // the conditionTaskList maybe null if no next task - conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new ArrayList<>()); - return conditionTaskList; - } - - /** - * parse condition task find the branch workflow - * set skip flag for another one. - * - * @param nodeCode - * @return - */ - public static List parseSwitchTask(Long nodeCode, - Map skipTaskNodeList, - DAG dag, - Map completeTaskList) { - List conditionTaskList = new ArrayList<>(); - TaskNode taskNode = dag.getNode(nodeCode); - if (!SwitchLogicTaskChannelFactory.NAME.equals(taskNode.getType())) { - return conditionTaskList; - } - if (!completeTaskList.containsKey(nodeCode)) { - return conditionTaskList; - } - conditionTaskList = skipTaskNode4Switch(skipTaskNodeList, completeTaskList.get(nodeCode), dag); - return conditionTaskList; - } - - public static List skipTaskNode4Switch(Map skipTaskNodeList, - TaskInstance taskInstance, - DAG dag) { - SwitchParameters switchParameters = - JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference() { - }); - - SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult(); - Long nextBranch = switchParameters.getNextBranch(); - if (switchResult == null) { - log.error("switchResult is null, please check the switch task configuration"); - return Collections.emptyList(); - } - if (nextBranch == null) { - log.error("switchParameters.getNextBranch() is null, please check the switch task configuration"); - return Collections.emptyList(); - } - - Set allNextBranches = new HashSet<>(); - if (switchResult.getNextNode() != null) { - allNextBranches.add(switchResult.getNextNode()); - } - if (CollectionUtils.isNotEmpty(switchResult.getDependTaskList())) { - for (SwitchResultVo switchResultVo : switchResult.getDependTaskList()) { - allNextBranches.add(switchResultVo.getNextNode()); - } - } - - allNextBranches.remove(nextBranch); - - for (Long branch : allNextBranches) { - setTaskNodeSkip(branch, dag, skipTaskNodeList); - } - return Lists.newArrayList(nextBranch); - } - - /** - * set task node and the post nodes skip flag - */ - private static void setTaskNodeSkip(Long skipNodeCode, - DAG dag, - Map skipTaskNodeList) { - if (!dag.containsNode(skipNodeCode)) { - return; - } - skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode)); - Collection postNodeList = dag.getSubsequentNodes(skipNodeCode); - for (Long post : postNodeList) { - TaskNode postNode = dag.getNode(post); - if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) { - setTaskNodeSkip(post, dag, skipTaskNodeList); - } - } - } - /*** * build dag graph * @param workflowDag workflowDag @@ -457,34 +63,6 @@ public class DagHelper { return dag; } - /** - * get workflow dag - * - * @param taskNodeList task node list - * @return workflow dag - */ - public static WorkflowDag getWorkflowDag(List taskNodeList) { - List taskNodeRelations = new ArrayList<>(); - - // Traverse node information and build relationships - for (TaskNode taskNode : taskNodeList) { - String preTasks = taskNode.getPreTasks(); - List preTasksList = JSONUtils.toList(preTasks, Long.class); - - // If the dependency is not empty - if (preTasksList != null) { - for (Long depNode : preTasksList) { - taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getCode())); - } - } - } - - WorkflowDag workflowDag = new WorkflowDag(); - workflowDag.setEdges(taskNodeRelations); - workflowDag.setNodes(taskNodeList); - return workflowDag; - } - /** * get workflow dag * @@ -518,40 +96,4 @@ public class DagHelper { return workflowDag; } - /** - * is there have conditions after the parent node - */ - public static boolean haveConditionsAfterNode(Long parentNodeCode, - DAG dag) { - return haveSubAfterNode(parentNodeCode, dag, ConditionsLogicTaskChannelFactory.NAME); - } - - /** - * is there have all node after the parent node - */ - public static boolean haveAllNodeAfterNode(Long parentNodeCode, - DAG dag) { - return haveSubAfterNode(parentNodeCode, dag, null); - } - - /** - * Whether there is a specified type of child node after the parent node - */ - public static boolean haveSubAfterNode(Long parentNodeCode, - DAG dag, String filterNodeType) { - Set subsequentNodes = dag.getSubsequentNodes(parentNodeCode); - if (CollectionUtils.isEmpty(subsequentNodes)) { - return false; - } - if (StringUtils.isBlank(filterNodeType)) { - return true; - } - for (Long nodeName : subsequentNodes) { - TaskNode taskNode = dag.getNode(nodeName); - if (taskNode.getType().equalsIgnoreCase(filterNodeType)) { - return true; - } - } - return false; - } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java deleted file mode 100644 index 5489b9de8a..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java +++ /dev/null @@ -1,707 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.utils; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.TaskDependType; -import org.apache.dolphinscheduler.common.graph.DAG; -import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; -import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; -import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; -import org.apache.dolphinscheduler.plugin.task.api.task.ConditionsLogicTaskChannelFactory; -import org.apache.dolphinscheduler.plugin.task.api.task.DependentLogicTaskChannelFactory; -import org.apache.dolphinscheduler.plugin.task.api.task.SwitchLogicTaskChannelFactory; -import org.apache.dolphinscheduler.service.model.TaskNode; -import org.apache.dolphinscheduler.service.process.WorkflowDag; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.collect.Lists; -import com.google.common.truth.Truth; - -public class DagHelperTest { - - @Test - public void testHaveSubAfterNode() { - Long parentNodeCode = 5293789969856L; - List taskNodeRelations = new ArrayList<>(); - TaskNodeRelation relation = new TaskNodeRelation(); - relation.setStartNode(5293789969856L); - relation.setEndNode(5293789969857L); - taskNodeRelations.add(relation); - - TaskNodeRelation relationNext = new TaskNodeRelation(); - relationNext.setStartNode(5293789969856L); - relationNext.setEndNode(5293789969858L); - taskNodeRelations.add(relationNext); - - List taskNodes = new ArrayList<>(); - TaskNode node = new TaskNode(); - node.setCode(5293789969856L); - node.setType("SHELL"); - - TaskNode subNode = new TaskNode(); - subNode.setCode(5293789969857L); - subNode.setType("BLOCKING"); - subNode.setPreTasks("[5293789969856]"); - - TaskNode subNextNode = new TaskNode(); - subNextNode.setCode(5293789969858L); - subNextNode.setType("CONDITIONS"); - subNextNode.setPreTasks("[5293789969856]"); - - taskNodes.add(node); - taskNodes.add(subNode); - taskNodes.add(subNextNode); - - WorkflowDag workflowDag = new WorkflowDag(); - workflowDag.setEdges(taskNodeRelations); - workflowDag.setNodes(taskNodes); - DAG dag = DagHelper.buildDagGraph(workflowDag); - boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag); - Assertions.assertTrue(canSubmit); - - boolean haveConditions = DagHelper.haveConditionsAfterNode(parentNodeCode, dag); - Assertions.assertTrue(haveConditions); - - boolean dependent = DagHelper.haveSubAfterNode(parentNodeCode, dag, DependentLogicTaskChannelFactory.NAME); - Assertions.assertFalse(dependent); - } - - @Test - public void testTaskNodeCanSubmit() { - List taskNodeList = new ArrayList<>(); - TaskNode node1 = new TaskNode(); - node1.setId("1"); - node1.setName("1"); - node1.setCode(1); - node1.setType("SHELL"); - taskNodeList.add(node1); - - TaskNode node2 = new TaskNode(); - node2.setId("2"); - node2.setName("2"); - node2.setCode(2); - node2.setType("SHELL"); - List dep2 = new ArrayList<>(); - dep2.add("1"); - node2.setPreTasks(JSONUtils.toJsonString(dep2)); - taskNodeList.add(node2); - - TaskNode node4 = new TaskNode(); - node4.setId("4"); - node4.setName("4"); - node4.setCode(4); - node4.setType("SHELL"); - taskNodeList.add(node4); - - TaskNode node3 = new TaskNode(); - node3.setId("3"); - node3.setName("3"); - node3.setCode(3); - node3.setType("SHELL"); - List dep3 = new ArrayList<>(); - dep3.add("2"); - dep3.add("4"); - node3.setPreTasks(JSONUtils.toJsonString(dep3)); - taskNodeList.add(node3); - - TaskNode node5 = new TaskNode(); - node5.setId("5"); - node5.setName("5"); - node5.setCode(5); - node5.setType("SHELL"); - List dep5 = new ArrayList<>(); - dep5.add("3"); - dep5.add("8"); - node5.setPreTasks(JSONUtils.toJsonString(dep5)); - taskNodeList.add(node5); - - TaskNode node6 = new TaskNode(); - node6.setId("6"); - node6.setName("6"); - node6.setCode(6); - node6.setType("SHELL"); - List dep6 = new ArrayList<>(); - dep6.add("3"); - node6.setPreTasks(JSONUtils.toJsonString(dep6)); - taskNodeList.add(node6); - - TaskNode node7 = new TaskNode(); - node7.setId("7"); - node7.setName("7"); - node7.setCode(7); - node7.setType("SHELL"); - List dep7 = new ArrayList<>(); - dep7.add("5"); - node7.setPreTasks(JSONUtils.toJsonString(dep7)); - taskNodeList.add(node7); - - TaskNode node8 = new TaskNode(); - node8.setId("8"); - node8.setName("8"); - node8.setCode(8); - node8.setType("SHELL"); - List dep8 = new ArrayList<>(); - dep8.add("2"); - node8.setPreTasks(JSONUtils.toJsonString(dep8)); - taskNodeList.add(node8); - - List startNodes = new ArrayList<>(); - List recoveryNodes = new ArrayList<>(); - List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, - startNodes, recoveryNodes, TaskDependType.TASK_POST); - List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); - WorkflowDag workflowDag = new WorkflowDag(); - workflowDag.setEdges(taskNodeRelations); - workflowDag.setNodes(destTaskNodeList); - - // 1->2->3->5->7 - // 4->3->6 - // 1->2->8->5->7 - DAG dag = DagHelper.buildDagGraph(workflowDag); - TaskNode taskNode3 = dag.getNode(3L); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); - completeTaskList.putIfAbsent(1L, new TaskInstance()); - Boolean canSubmit = false; - - // 2/4 are forbidden submit 3 - node2 = dag.getNode(2L); - node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - TaskNode nodex = dag.getNode(4L); - nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList); - Assertions.assertEquals(canSubmit, true); - - // 2forbidden, 3 cannot be submit - completeTaskList.putIfAbsent(2L, new TaskInstance()); - TaskNode nodey = dag.getNode(4L); - nodey.setRunFlag(""); - canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList); - Assertions.assertEquals(canSubmit, false); - - // 2/3 forbidden submit 5 - node3 = dag.getNode(3L); - node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - node8 = dag.getNode(8L); - node8.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - node5 = dag.getNode(5L); - canSubmit = DagHelper.allDependsForbiddenOrEnd(node5, dag, skipNodeList, completeTaskList); - Assertions.assertEquals(canSubmit, true); - } - - @Test - public void testParsePostNodeList() { - List taskNodeList = new ArrayList<>(); - TaskNode node1 = new TaskNode(); - node1.setId("1"); - node1.setName("1"); - node1.setCode(1); - node1.setType("SHELL"); - taskNodeList.add(node1); - - TaskNode node2 = new TaskNode(); - node2.setId("2"); - node2.setName("2"); - node2.setCode(2); - node2.setType("SHELL"); - List dep2 = new ArrayList<>(); - dep2.add("1"); - node2.setPreTasks(JSONUtils.toJsonString(dep2)); - taskNodeList.add(node2); - - TaskNode node4 = new TaskNode(); - node4.setId("4"); - node4.setName("4"); - node4.setCode(4); - node4.setType("SHELL"); - taskNodeList.add(node4); - - TaskNode node3 = new TaskNode(); - node3.setId("3"); - node3.setName("3"); - node3.setCode(3); - node3.setType("SHELL"); - List dep3 = new ArrayList<>(); - dep3.add("2"); - dep3.add("4"); - node3.setPreTasks(JSONUtils.toJsonString(dep3)); - taskNodeList.add(node3); - - TaskNode node5 = new TaskNode(); - node5.setId("5"); - node5.setName("5"); - node5.setCode(5); - node5.setType("SHELL"); - List dep5 = new ArrayList<>(); - dep5.add("3"); - dep5.add("8"); - node5.setPreTasks(JSONUtils.toJsonString(dep5)); - taskNodeList.add(node5); - - TaskNode node6 = new TaskNode(); - node6.setId("6"); - node6.setName("6"); - node6.setCode(6); - node6.setType("SHELL"); - List dep6 = new ArrayList<>(); - dep6.add("3"); - node6.setPreTasks(JSONUtils.toJsonString(dep6)); - taskNodeList.add(node6); - - TaskNode node7 = new TaskNode(); - node7.setId("7"); - node7.setName("7"); - node7.setCode(7); - node7.setType("SHELL"); - List dep7 = new ArrayList<>(); - dep7.add("5"); - node7.setPreTasks(JSONUtils.toJsonString(dep7)); - taskNodeList.add(node7); - - TaskNode node8 = new TaskNode(); - node8.setId("8"); - node8.setName("8"); - node8.setCode(8); - node8.setType("SHELL"); - List dep8 = new ArrayList<>(); - dep8.add("2"); - node8.setPreTasks(JSONUtils.toJsonString(dep8)); - taskNodeList.add(node8); - - List startNodes = new ArrayList<>(); - List recoveryNodes = new ArrayList<>(); - List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, - startNodes, recoveryNodes, TaskDependType.TASK_POST); - List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); - WorkflowDag workflowDag = new WorkflowDag(); - workflowDag.setEdges(taskNodeRelations); - workflowDag.setNodes(destTaskNodeList); - - // 1->2->3->5->7 - // 4->3->6 - // 1->2->8->5->7 - DAG dag = DagHelper.buildDagGraph(workflowDag); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); - - Set postNodes = null; - // complete : null - // expect post: 1/4 - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(1L)); - Assertions.assertTrue(postNodes.contains(4L)); - - // complete : 1 - // expect post: 2/4 - completeTaskList.put(1L, new TaskInstance()); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(2L)); - Assertions.assertTrue(postNodes.contains(4L)); - - // complete : 1/2 - // expect post: 4 - completeTaskList.put(2L, new TaskInstance()); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(4L)); - Assertions.assertTrue(postNodes.contains(8L)); - - // complete : 1/2/4 - // expect post: 3 - completeTaskList.put(4L, new TaskInstance()); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(3L)); - Assertions.assertTrue(postNodes.contains(8L)); - - // complete : 1/2/4/3 - // expect post: 8/6 - completeTaskList.put(3L, new TaskInstance()); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(8L)); - Assertions.assertTrue(postNodes.contains(6L)); - - // complete : 1/2/4/3/8 - // expect post: 6/5 - completeTaskList.put(8L, new TaskInstance()); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(5L)); - Assertions.assertTrue(postNodes.contains(6L)); - // complete : 1/2/4/3/5/6/8 - // expect post: 7 - completeTaskList.put(6L, new TaskInstance()); - completeTaskList.put(5L, new TaskInstance()); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains(7L)); - } - - @Test - public void testForbiddenPostNode() throws IOException { - DAG dag = generateDag(); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); - Set postNodes = null; - // dag: 1-2-3-5-7 4-3-6 2-8-5-7 - // forbid:2 complete:1 post:4/8 - completeTaskList.put(1L, new TaskInstance()); - TaskNode node2 = dag.getNode(2L); - node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(4L)); - Assertions.assertTrue(postNodes.contains(8L)); - - // forbid:2/4 complete:1 post:3/8 - TaskNode node4 = dag.getNode(4L); - node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains(3L)); - Assertions.assertTrue(postNodes.contains(8L)); - - // forbid:2/4/5 complete:1/8 post:3 - completeTaskList.put(8L, new TaskInstance()); - TaskNode node5 = dag.getNode(5L); - node5.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains(3L)); - } - - @Test - public void testConditionPostNode() throws IOException { - DAG dag = generateDag(); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); - Set postNodes = null; - // dag: 1-2-3-5-7 4-3-6 2-8-5-7 - // 3-if - completeTaskList.put(1L, new TaskInstance()); - completeTaskList.put(2L, new TaskInstance()); - completeTaskList.put(4L, new TaskInstance()); - - TaskInstance taskInstance3 = new TaskInstance(); - taskInstance3.setTaskType(ConditionsLogicTaskChannelFactory.NAME); - ConditionsParameters.ConditionResult conditionResult = ConditionsParameters.ConditionResult.builder() - .conditionSuccess(true) - .successNode(Lists.newArrayList(5L)) - .failedNode(Lists.newArrayList(6L)) - .build(); - ConditionsParameters conditionsParameters = new ConditionsParameters(); - conditionsParameters.setConditionResult(conditionResult); - taskInstance3.setTaskParams(JSONUtils.toJsonString(conditionsParameters)); - taskInstance3.setState(TaskExecutionStatus.SUCCESS); - TaskNode node3 = dag.getNode(3L); - node3.setType(ConditionsLogicTaskChannelFactory.NAME); - // complete 1/2/3/4 expect:8 - completeTaskList.put(3L, taskInstance3); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains(8L)); - - // 2.complete 1/2/3/4/8 expect:5 skip:6 - completeTaskList.put(8L, new TaskInstance()); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertTrue(postNodes.contains(5L)); - Assertions.assertEquals(1, skipNodeList.size()); - Assertions.assertTrue(skipNodeList.containsKey(6L)); - - // 3.complete 1/2/3/4/5/8 expect post:7 skip:6 - skipNodeList.clear(); - TaskInstance taskInstance1 = new TaskInstance(); - completeTaskList.put(5L, taskInstance1); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains(7L)); - Assertions.assertEquals(1, skipNodeList.size()); - Assertions.assertTrue(skipNodeList.containsKey(6L)); - - } - - @Test - public void testSwitchPostNode() { - List taskNodeList = new ArrayList<>(); - - TaskNode node = new TaskNode(); - node.setId("0"); - node.setName("0"); - node.setCode(0); - node.setType("SHELL"); - taskNodeList.add(node); - - TaskNode node1 = new TaskNode(); - node1.setId("1"); - node1.setName("1"); - node1.setCode(1); - node1.setType(SwitchLogicTaskChannelFactory.NAME); - SwitchParameters switchParameters = new SwitchParameters(); - node1.setParams(JSONUtils.toJsonString(switchParameters)); - taskNodeList.add(node1); - - TaskNode node2 = new TaskNode(); - node2.setId("2"); - node2.setName("2"); - node2.setCode(2); - node2.setType("SHELL"); - List dep2 = new ArrayList<>(); - dep2.add("1"); - node2.setPreTasks(JSONUtils.toJsonString(dep2)); - taskNodeList.add(node2); - - TaskNode node4 = new TaskNode(); - node4.setId("4"); - node4.setName("4"); - node4.setCode(4); - node4.setType("SHELL"); - List dep4 = new ArrayList<>(); - dep4.add("1"); - node4.setPreTasks(JSONUtils.toJsonString(dep4)); - taskNodeList.add(node4); - - TaskNode node5 = new TaskNode(); - node5.setId("5"); - node5.setName("5"); - node5.setCode(5); - node5.setType("SHELL"); - List dep5 = new ArrayList<>(); - dep5.add(1L); - node5.setPreTasks(JSONUtils.toJsonString(dep5)); - taskNodeList.add(node5); - - TaskNode node6 = new TaskNode(); - node5.setId("6"); - node5.setName("6"); - node5.setCode(6); - node5.setType("SHELL"); - List dep6 = new ArrayList<>(); - dep5.add(2L); - dep5.add(4L); - node5.setPreTasks(JSONUtils.toJsonString(dep6)); - taskNodeList.add(node6); - - List startNodes = new ArrayList<>(); - List recoveryNodes = new ArrayList<>(); - - // 0 - // 1->2->6 - // 1->4->6 - // 1->5 - List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, - startNodes, recoveryNodes, TaskDependType.TASK_POST); - List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); - WorkflowDag workflowDag = new WorkflowDag(); - workflowDag.setEdges(taskNodeRelations); - workflowDag.setNodes(destTaskNodeList); - - DAG dag = DagHelper.buildDagGraph(workflowDag); - Map skipTaskNodeList = new HashMap<>(); - Map completeTaskList = new HashMap<>(); - completeTaskList.put(0L, new TaskInstance()); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setState(TaskExecutionStatus.SUCCESS); - taskInstance.setTaskCode(1L); - taskInstance.setTaskType(SwitchLogicTaskChannelFactory.NAME); - switchParameters = SwitchParameters.builder() - .nextBranch(5L) - .switchResult(SwitchParameters.SwitchResult.builder() - .dependTaskList(Lists.newArrayList( - new SwitchResultVo("", 2L), - new SwitchResultVo("", 4L))) - .nextNode(5L) - .build()) - .build(); - taskInstance.setTaskParams(JSONUtils.toJsonString(switchParameters)); - completeTaskList.put(1l, taskInstance); - List nextBranch = DagHelper.skipTaskNode4Switch(skipTaskNodeList, taskInstance, dag); - Assertions.assertNotNull(skipTaskNodeList.get(2L)); - Assertions.assertNotNull(skipTaskNodeList.get(4L)); - Assertions.assertEquals(2, skipTaskNodeList.size()); - Truth.assertThat(nextBranch).containsExactly(5L); - } - - /** - * process: - * 1->2->3->5->7 - * 4->3->6 - * 1->2->8->5->7 - * DAG graph: - * 4 -> -> 6 - * \ / - * 1 -> 2 -> 3 -> 5 -> 7 - * \ / - * -> 8 -> - * - * @return dag - * @throws JsonProcessingException if error throws JsonProcessingException - */ - private DAG generateDag() throws IOException { - List taskNodeList = new ArrayList<>(); - TaskNode node1 = new TaskNode(); - node1.setId("1"); - node1.setName("1"); - node1.setCode(1); - node1.setType("SHELL"); - taskNodeList.add(node1); - - TaskNode node2 = new TaskNode(); - node2.setId("2"); - node2.setName("2"); - node2.setCode(2); - node2.setType("SHELL"); - List dep2 = new ArrayList<>(); - dep2.add("1"); - node2.setPreTasks(JSONUtils.toJsonString(dep2)); - taskNodeList.add(node2); - - TaskNode node4 = new TaskNode(); - node4.setId("4"); - node4.setName("4"); - node4.setCode(4); - node4.setType("SHELL"); - taskNodeList.add(node4); - - TaskNode node3 = new TaskNode(); - node3.setId("3"); - node3.setName("3"); - node3.setCode(3); - node3.setType("SHELL"); - List dep3 = new ArrayList<>(); - dep3.add("2"); - dep3.add("4"); - node3.setPreTasks(JSONUtils.toJsonString(dep3)); - taskNodeList.add(node3); - - TaskNode node5 = new TaskNode(); - node5.setId("5"); - node5.setName("5"); - node5.setCode(5); - node5.setType("SHELL"); - List dep5 = new ArrayList<>(); - dep5.add("3"); - dep5.add("8"); - node5.setPreTasks(JSONUtils.toJsonString(dep5)); - taskNodeList.add(node5); - - TaskNode node6 = new TaskNode(); - node6.setId("6"); - node6.setName("6"); - node6.setCode(6); - node6.setType("SHELL"); - List dep6 = new ArrayList<>(); - dep6.add("3"); - node6.setPreTasks(JSONUtils.toJsonString(dep6)); - taskNodeList.add(node6); - - TaskNode node7 = new TaskNode(); - node7.setId("7"); - node7.setName("7"); - node7.setCode(7); - node7.setType("SHELL"); - List dep7 = new ArrayList<>(); - dep7.add("5"); - node7.setPreTasks(JSONUtils.toJsonString(dep7)); - taskNodeList.add(node7); - - TaskNode node8 = new TaskNode(); - node8.setId("8"); - node8.setName("8"); - node8.setCode(8); - node8.setType("SHELL"); - List dep8 = new ArrayList<>(); - dep8.add("2"); - node8.setPreTasks(JSONUtils.toJsonString(dep8)); - taskNodeList.add(node8); - - List startNodes = new ArrayList<>(); - List recoveryNodes = new ArrayList<>(); - List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, - startNodes, recoveryNodes, TaskDependType.TASK_POST); - List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); - WorkflowDag workflowDag = new WorkflowDag(); - workflowDag.setEdges(taskNodeRelations); - workflowDag.setNodes(destTaskNodeList); - return DagHelper.buildDagGraph(workflowDag); - } - - @Test - public void testBuildDagGraph() { - String shellJson = - "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-9527\",\"name\":\"shell-1\"," - + - "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," - + - "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," - + - "\"timeout\":{\"strategy\":\"\",\"interval\":1,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," - + - "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; - - ProcessData processData = JSONUtils.parseObject(shellJson, ProcessData.class); - assert processData != null; - List taskNodeList = processData.getTasks(); - WorkflowDag workflowDag = DagHelper.getWorkflowDag(taskNodeList); - DAG dag = DagHelper.buildDagGraph(workflowDag); - Assertions.assertNotNull(dag); - } - - @Data - @NoArgsConstructor - private static class ProcessData { - - @EqualsAndHashCode.Include - private List tasks; - - @EqualsAndHashCode.Include - private List globalParams; - - private int timeout; - - private int tenantId; - - public ProcessData(List tasks, List globalParams) { - this.tasks = tasks; - this.globalParams = globalParams; - } - } - -}