From b5184138fab7c43b38e23e899b5b23d606fe7e9d Mon Sep 17 00:00:00 2001 From: WangJPLeo <103574007+WangJPLeo@users.noreply.github.com> Date: Tue, 28 Jun 2022 11:24:51 +0800 Subject: [PATCH] [Feature] Time function analysis extension. (#10624) * Time function analysis extension. * param add. * clear useless logs and update method notes * permission omission fix. * extending time functions to optimize static methods * e2e rerun. --- .../impl/ProcessInstanceServiceImpl.java | 6 +- .../api/service/impl/ProjectServiceImpl.java | 2 +- .../service/impl/TaskGroupServiceImpl.java | 2 + .../service/impl/WorkerGroupServiceImpl.java | 4 +- .../service/ProcessInstanceServiceTest.java | 7 + .../api/service/ProjectServiceTest.java | 11 +- .../expand/CuringGlobalParamsService.java | 64 ++++++++ .../DolphinSchedulerCuringGlobalParams.java | 96 +++++++++++ .../TimePlaceholderResolverExpandService.java | 35 ++++ ...ePlaceholderResolverExpandServiceImpl.java | 34 ++++ .../common/utils/ParameterUtils.java | 52 ------ .../expand/CuringGlobalParamsServiceTest.java | 151 ++++++++++++++++++ ...ePlaceholderResolverExpandServiceTest.java | 51 ++++++ .../common/utils/ParameterUtilsTest.java | 84 ++-------- .../master/runner/MasterSchedulerService.java | 7 +- .../runner/WorkflowExecuteRunnable.java | 15 +- .../master/WorkflowExecuteTaskTest.java | 6 +- .../service/process/ProcessServiceImpl.java | 35 ++-- .../service/process/ProcessServiceTest.java | 9 ++ 19 files changed, 516 insertions(+), 155 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index ca137aaf0d..3b9e3f89c7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -153,6 +154,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired private ScheduleMapper scheduleMapper; + @Autowired + private CuringGlobalParamsService curingGlobalParamsService; + /** * return top n SUCCESS process instance order by running time which started between startTime and endTime */ @@ -561,7 +565,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.setScheduleTime(schedule); List globalParamList = JSONUtils.toList(globalParams, Property.class); Map globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone); + globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone); processInstance.setTimeout(timeout); processInstance.setTenantCode(tenantCode); processInstance.setGlobalParams(globalParams); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java index 17575f5dab..b10ee78392 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java @@ -475,7 +475,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic putMsg(result, Status.SUCCESS); return result; } - List projects = projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(), new ArrayList<>(projectIds)); + List projects = projectMapper.selectBatchIds(projectIds); result.put(Constants.DATA_LIST, projects); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java index 32ea9ad39f..e02fbca658 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -41,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -104,6 +105,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe taskGroup.setCreateTime(new Date()); taskGroup.setUpdateTime(new Date()); if (taskGroupMapper.insert(taskGroup) > 0) { + permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(), Collections.singletonList(taskGroup.getId()),logger); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.CREATE_TASK_GROUP_ERROR); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index d371d32c18..c083a8a52f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -122,11 +122,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro workerGroupMapper.updateById(workerGroup); } else { workerGroupMapper.insert(workerGroup); - } - putMsg(result, Status.SUCCESS); - if (id != 0) { permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger); } + putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index fe64eab94d..fe8149536d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -36,6 +36,8 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; +import org.apache.dolphinscheduler.common.expand.DolphinSchedulerCuringGlobalParams; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -70,6 +72,8 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.beans.factory.annotation.Autowired; + import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; @@ -126,6 +130,9 @@ public class ProcessInstanceServiceTest { @Mock ScheduleMapper scheduleMapper; + @Mock + CuringGlobalParamsService curingGlobalParamsService; + private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java index b1e81e10ba..8eb675ea4f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java @@ -108,7 +108,6 @@ public class ProjectServiceTest { public void testCheckProjectAndAuth() { long projectCode = 1L; -// Mockito.when(projectUserMapper.queryProjectRelation(1, 1)).thenReturn(getProjectUser()); User loginUser = getLoginUser(); Map result = projectService.checkProjectAndAuth(loginUser, null, projectCode, PROJECT); @@ -142,7 +141,6 @@ public class ProjectServiceTest { project1.setUserId(2); loginUser.setUserType(UserType.GENERAL_USER); Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.PROJECTS, loginUser.getId(), PROJECT, baseServiceLogger)).thenReturn(true); -// Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.PROJECTS, new Object[]{project.getId()}, 0, baseServiceLogger)).thenReturn(true); result2 = projectService.checkProjectAndAuth(loginUser, project1, projectCode,PROJECT); Assert.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM, result2.get(Constants.STATUS)); @@ -329,9 +327,8 @@ public class ProjectServiceTest { List list = new ArrayList<>(1); list.add(1); // not admin user - // Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(1)).thenReturn(getList()); Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set); - Mockito.when(projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(),list)).thenReturn(getList()); + Mockito.when(projectMapper.selectBatchIds(set)).thenReturn(getList()); result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser); List notAdminUserResult = (List) result.get(Constants.DATA_LIST); Assert.assertTrue(CollectionUtils.isNotEmpty(notAdminUserResult)); @@ -339,8 +336,7 @@ public class ProjectServiceTest { //admin user loginUser.setUserType(UserType.ADMIN_USER); Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set); - Mockito.when(projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(),list)).thenReturn(getList()); -// Mockito.when(projectMapper.selectList(null)).thenReturn(getList()); + Mockito.when(projectMapper.selectBatchIds(set)).thenReturn(getList()); result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser); List projects = (List) result.get(Constants.DATA_LIST); @@ -363,9 +359,6 @@ public class ProjectServiceTest { @Test public void testQueryUnauthorizedProject() { -// Mockito.when(projectMapper.queryProjectExceptUserId(2)).thenReturn(getList()); - // Mockito.when(projectMapper.queryProjectCreatedByUser(2)).thenReturn(getList()); -// Mockito.when(projectMapper.queryAuthedProjectListByUserId(2)).thenReturn(getSingleList()); Set set = new HashSet(); set.add(1); // test admin user diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java new file mode 100644 index 0000000000..63ea658c9f --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.expand; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import java.util.Date; +import java.util.List; +import java.util.Map; + +public interface CuringGlobalParamsService { + + /** + * time function need expand + * @param placeholderName + * @return + */ + boolean timeFunctionNeedExpand(String placeholderName); + + /** + * time function extension + * @param processInstanceId + * @param timezone + * @param placeholderName + * @return + */ + String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName); + + /** + * convert parameter placeholders + * @param val + * @param allParamMap + * @return + */ + String convertParameterPlaceholders(String val, Map allParamMap); + + /** + * curing global params + * @param processInstanceId + * @param globalParamMap + * @param globalParamList + * @param commandType + * @param scheduleTime + * @param timezone + * @return + */ + String curingGlobalParams(Integer processInstanceId, Map globalParamMap, List globalParamList, CommandType commandType, Date scheduleTime, String timezone); +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java new file mode 100644 index 0000000000..dde31af4fa --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.expand; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Component +public class DolphinSchedulerCuringGlobalParams implements CuringGlobalParamsService { + + @Autowired + private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; + + @Override + public String convertParameterPlaceholders(String val, Map allParamMap) { + return ParameterUtils.convertParameterPlaceholders(val, allParamMap); + } + + @Override + public boolean timeFunctionNeedExpand(String placeholderName) { + return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName); + } + + @Override + public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) { + return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName); + } + + @Override + public String curingGlobalParams(Integer processInstanceId, Map globalParamMap, List globalParamList, CommandType commandType, Date scheduleTime, String timezone) { + if (globalParamList == null || globalParamList.isEmpty()) { + return null; + } + Map globalMap = new HashMap<>(); + if (globalParamMap != null) { + globalMap.putAll(globalParamMap); + } + Map allParamMap = new HashMap<>(); + //If it is a complement, a complement time needs to be passed in, according to the task type + Map timeParams = BusinessTimeUtils. + getBusinessTime(commandType, scheduleTime, timezone); + + if (timeParams != null) { + allParamMap.putAll(timeParams); + } + allParamMap.putAll(globalMap); + Set> entries = allParamMap.entrySet(); + Map resolveMap = new HashMap<>(); + for (Map.Entry entry : entries) { + String val = entry.getValue(); + if (val.startsWith("$")) { + String str = ""; + if (timeFunctionNeedExpand(val)) { + str = timeFunctionExtension(processInstanceId, timezone, val); + } else { + str = convertParameterPlaceholders(val, allParamMap); + } + resolveMap.put(entry.getKey(), str); + } + } + globalMap.putAll(resolveMap); + for (Property property : globalParamList) { + String val = globalMap.get(property.getProp()); + if (val != null) { + property.setValue(val); + } + } + return JSONUtils.toJsonString(globalParamList); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java new file mode 100644 index 0000000000..bdd811522c --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.expand; + +public interface TimePlaceholderResolverExpandService { + + /** + * check is need expand function + * @param placeholderName + * @return + */ + boolean timeFunctionNeedExpand(String placeholderName); + + /** + * time function extension + * @param placeholderName + * @return + */ + String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName); +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java new file mode 100644 index 0000000000..b37fcf076b --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.expand; + +import org.springframework.stereotype.Component; + +@Component +public class TimePlaceholderResolverExpandServiceImpl implements TimePlaceholderResolverExpandService { + + @Override + public boolean timeFunctionNeedExpand(String placeholderName) { + return false; + } + + @Override + public String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName) { + return null; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java index feaae67297..c4edc5d1b7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java @@ -77,58 +77,6 @@ public class ParameterUtils { return parameterString; } - /** - * curing user define parameters - * - * @param globalParamMap global param map - * @param globalParamList global param list - * @param commandType command type - * @param scheduleTime schedule time - * @return curing user define parameters - */ - public static String curingGlobalParams(Map globalParamMap, List globalParamList, - CommandType commandType, Date scheduleTime, String timezone) { - - if (globalParamList == null || globalParamList.isEmpty()) { - return null; - } - - Map globalMap = new HashMap<>(); - if (globalParamMap != null) { - globalMap.putAll(globalParamMap); - } - Map allParamMap = new HashMap<>(); - //If it is a complement, a complement time needs to be passed in, according to the task type - Map timeParams = BusinessTimeUtils. - getBusinessTime(commandType, scheduleTime, timezone); - - if (timeParams != null) { - allParamMap.putAll(timeParams); - } - - allParamMap.putAll(globalMap); - - Set> entries = allParamMap.entrySet(); - - Map resolveMap = new HashMap<>(); - for (Map.Entry entry : entries) { - String val = entry.getValue(); - if (val.startsWith("$")) { - String str = ParameterUtils.convertParameterPlaceholders(val, allParamMap); - resolveMap.put(entry.getKey(), str); - } - } - globalMap.putAll(resolveMap); - - for (Property property : globalParamList) { - String val = globalMap.get(property.getProp()); - if (val != null) { - property.setValue(val); - } - } - return JSONUtils.toJsonString(globalParamList); - } - /** * handle escapes * diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java new file mode 100644 index 0000000000..f6ea074aa0 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.dolphinscheduler.common.expand; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; +import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@RunWith(MockitoJUnitRunner.class) +public class CuringGlobalParamsServiceTest { + + private static final String placeHolderName = "$[yyyy-MM-dd-1]"; + + @Mock + private CuringGlobalParamsService curingGlobalParamsService; + + @InjectMocks + private DolphinSchedulerCuringGlobalParams dolphinSchedulerCuringGlobalParams; + + @Mock + private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; + + @InjectMocks + private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl; + + private final Map globalParamMap = new HashMap<>(); + + @Before + public void init() { + globalParamMap.put("globalParams1", "Params1"); + } + + @Test + public void testConvertParameterPlaceholders() { + Mockito.when(curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap)).thenReturn("2022-06-26"); + String result = curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap); + Assert.assertNotNull(result); + } + + @Test + public void testTimeFunctionNeedExpand() { + boolean result = curingGlobalParamsService.timeFunctionNeedExpand(placeHolderName); + Assert.assertFalse(result); + } + + @Test + public void testTimeFunctionExtension() { + String result = curingGlobalParamsService.timeFunctionExtension(1, "", placeHolderName); + Assert.assertNull(result); + } + + @Test + public void testCuringGlobalParams() { + //define globalMap + Map globalParamMap = new HashMap<>(); + globalParamMap.put("globalParams1", "Params1"); + + //define globalParamList + List globalParamList = new ArrayList<>(); + + //define scheduleTime + Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00"); + + //test globalParamList is null + String result = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); + Assert.assertNull(result); + Assert.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null)); + Assert.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null)); + + //test globalParamList is not null + Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam"); + globalParamList.add(property); + + String result2 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); + Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList)); + + String result3 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null); + Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList)); + + String result4 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); + Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList)); + + //test var $ startsWith + globalParamMap.put("bizDate", "${system.biz.date}"); + globalParamMap.put("b1zCurdate", "${system.biz.curdate}"); + + Property property2 = new Property("testParamList1", Direct.IN, DataType.VARCHAR, "testParamList"); + Property property3 = new Property("testParamList2", Direct.IN, DataType.VARCHAR, "{testParamList1}"); + Property property4 = new Property("testParamList3", Direct.IN, DataType.VARCHAR, "${b1zCurdate}"); + + globalParamList.add(property2); + globalParamList.add(property3); + globalParamList.add(property4); + + String result5 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); + Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList)); + + Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, ""); + globalParamList.add(testStartParamProperty); + Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]"); + globalParamList.add(testStartParam2Property); + globalParamMap.put("testStartParam", ""); + globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]"); + + Map startParamMap = new HashMap<>(2); + startParamMap.put("testStartParam", "$[yyyyMMdd]"); + + for (Map.Entry param : globalParamMap.entrySet()) { + String val = startParamMap.get(param.getKey()); + if (val != null) { + param.setValue(val); + } + } + + String result6 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); + Assert.assertTrue(result6.contains("20191220")); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java new file mode 100644 index 0000000000..542869530e --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.expand; + +import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TimePlaceholderResolverExpandServiceTest { + + @Mock + private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; + + @InjectMocks + private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl; + + private static final String placeHolderName = "$[yyyy-MM-dd-1]"; + + @Test + public void testTimePlaceholderResolverExpandService() { + boolean checkResult = timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeHolderName); + Assert.assertFalse(checkResult); + String resultString = timePlaceholderResolverExpandService.timeFunctionExtension(1, "", placeHolderName); + Assert.assertTrue(StringUtils.isEmpty(resultString)); + + boolean implCheckResult = timePlaceholderResolverExpandServiceImpl.timeFunctionNeedExpand(placeHolderName); + Assert.assertFalse(implCheckResult); + String implResultString = timePlaceholderResolverExpandServiceImpl.timeFunctionExtension(1, "", placeHolderName); + Assert.assertTrue(StringUtils.isEmpty(implResultString)); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java index 01a1eab9a7..6f9bb89c0f 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java @@ -21,6 +21,8 @@ import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlacehold import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandService; +import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandServiceImpl; import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; @@ -34,13 +36,25 @@ import java.util.List; import java.util.Map; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(MockitoJUnitRunner.class) public class ParameterUtilsTest { public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class); + @Mock + private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; + + @InjectMocks + private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl; + /** * Test convertParameterPlaceholders */ @@ -84,76 +98,6 @@ public class ParameterUtilsTest { parameterString); } - /** - * Test curingGlobalParams - */ - @Test - public void testCuringGlobalParams() { - //define globalMap - Map globalParamMap = new HashMap<>(); - globalParamMap.put("globalParams1", "Params1"); - - //define globalParamList - List globalParamList = new ArrayList<>(); - - //define scheduleTime - Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00"); - - //test globalParamList is null - String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); - Assert.assertNull(result); - Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null)); - Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null)); - - //test globalParamList is not null - Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam"); - globalParamList.add(property); - - String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); - Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList)); - - String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null); - Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList)); - - String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); - Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList)); - - //test var $ startsWith - globalParamMap.put("bizDate", "${system.biz.date}"); - globalParamMap.put("b1zCurdate", "${system.biz.curdate}"); - - Property property2 = new Property("testParamList1", Direct.IN, DataType.VARCHAR, "testParamList"); - Property property3 = new Property("testParamList2", Direct.IN, DataType.VARCHAR, "{testParamList1}"); - Property property4 = new Property("testParamList3", Direct.IN, DataType.VARCHAR, "${b1zCurdate}"); - - globalParamList.add(property2); - globalParamList.add(property3); - globalParamList.add(property4); - - String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); - Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList)); - - Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, ""); - globalParamList.add(testStartParamProperty); - Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]"); - globalParamList.add(testStartParam2Property); - globalParamMap.put("testStartParam", ""); - globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]"); - - Map startParamMap = new HashMap<>(2); - startParamMap.put("testStartParam", "$[yyyyMMdd]"); - - for (Map.Entry param : globalParamMap.entrySet()) { - String val = startParamMap.get(param.getKey()); - if (val != null) { - param.setValue(val); - } - } - - String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null); - Assert.assertTrue(result6.contains("20191220")); - } - /** * Test handleEscapes */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 023d871355..3f5a715d7c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.SlotCheckState; +import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -104,6 +105,9 @@ public class MasterSchedulerService extends BaseDaemonThread { @Autowired private StateWheelExecuteThread stateWheelExecuteThread; + @Autowired + private CuringGlobalParamsService curingGlobalParamsService; + protected MasterSchedulerService() { super("MasterCommandLoopThread"); } @@ -183,7 +187,8 @@ public class MasterSchedulerService extends BaseDaemonThread { , nettyExecutorManager , processAlertManager , masterConfig - , stateWheelExecuteThread); + , stateWheelExecuteThread + , curingGlobalParamsService); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); if (processInstance.getTimeout() > 0) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 9f7643832b..9fa452c66e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -235,6 +236,11 @@ public class WorkflowExecuteRunnable implements Runnable { */ private final StateWheelExecuteThread stateWheelExecuteThread; + /** + * curing global params service + */ + private final CuringGlobalParamsService curingGlobalParamsService; + /** * @param processInstance processInstance * @param processService processService @@ -248,13 +254,15 @@ public class WorkflowExecuteRunnable implements Runnable { , NettyExecutorManager nettyExecutorManager , ProcessAlertManager processAlertManager , MasterConfig masterConfig - , StateWheelExecuteThread stateWheelExecuteThread) { + , StateWheelExecuteThread stateWheelExecuteThread + , CuringGlobalParamsService curingGlobalParamsService) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; + this.curingGlobalParamsService = curingGlobalParamsService; TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); } @@ -999,10 +1007,11 @@ public class WorkflowExecuteRunnable implements Runnable { if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementListDate.get(0)); - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( + String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE))); + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)); + processInstance.setGlobalParams(globalParams); processService.updateProcessInstance(processInstance); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java index f2d263f699..8c7104ca30 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java @@ -26,6 +26,7 @@ import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; +import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -86,6 +87,8 @@ public class WorkflowExecuteTaskTest { private StateWheelExecuteThread stateWheelExecuteThread; + private CuringGlobalParamsService curingGlobalParamsService; + @Before public void init() throws Exception { applicationContext = mock(ApplicationContext.class); @@ -113,7 +116,8 @@ public class WorkflowExecuteTaskTest { Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); stateWheelExecuteThread = mock(StateWheelExecuteThread.class); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread)); + curingGlobalParamsService = mock(CuringGlobalParamsService.class); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService)); // prepareProcess init dag Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); dag.setAccessible(true); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index d03d165071..a8ae3f33a9 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -276,6 +277,9 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private K8sMapper k8sMapper; + @Autowired + private CuringGlobalParamsService curingGlobalParamsService; + /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @@ -802,11 +806,12 @@ public class ProcessServiceImpl implements ProcessService { timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE); } - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - getCommandTypeIfComplement(processInstance, command), - processInstance.getScheduleTime(), timezoneId)); + String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + getCommandTypeIfComplement(processInstance, command), + processInstance.getScheduleTime(), timezoneId); + processInstance.setGlobalParams(globalParams); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); @@ -953,11 +958,12 @@ public class ProcessServiceImpl implements ProcessService { String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE); // Recalculate global parameters after rerun. - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - commandTypeIfComplement, - processInstance.getScheduleTime(), timezoneId)); + String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + commandTypeIfComplement, + processInstance.getScheduleTime(), timezoneId); + processInstance.setGlobalParams(globalParams); processInstance.setProcessDefinition(processDefinition); } //reset command parameter @@ -1139,10 +1145,11 @@ public class ProcessServiceImpl implements ProcessService { // time zone String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE); - processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), - CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId)); + String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId); + processInstance.setGlobalParams(globalParams); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 09ba19e911..0a859235ac 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -159,6 +160,9 @@ public class ProcessServiceTest { @Mock private ScheduleMapper scheduleMapper; + @Mock + CuringGlobalParamsService curingGlobalParamsService; + @Test public void testCreateSubCommand() { ProcessInstance parentInstance = new ProcessInstance(); @@ -365,6 +369,11 @@ public class ProcessServiceTest { command5.setCommandType(CommandType.START_PROCESS); command5.setDryRun(Constants.DRY_RUN_FLAG_NO); Mockito.when(commandMapper.deleteById(5)).thenReturn(1); + Mockito.when(curingGlobalParamsService.curingGlobalParams(0, + processDefinition.getGlobalParamMap(), + processDefinition.getGlobalParamList(), + CommandType.START_PROCESS, + processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\""); ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));