Browse Source

[Feature] Time function analysis extension. (#10624)

* Time function analysis extension.

* param add.

* clear useless logs and update method notes

* permission omission fix.

* extending time functions to optimize static methods

* e2e rerun.
3.1.0-release
WangJPLeo 2 years ago committed by GitHub
parent
commit
b5184138fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  5. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  6. 11
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
  7. 64
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java
  8. 96
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java
  9. 35
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java
  10. 34
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java
  11. 52
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
  12. 151
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java
  13. 51
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java
  14. 84
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
  15. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  16. 15
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  17. 6
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
  18. 19
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  19. 9
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -153,6 +154,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired @Autowired
private ScheduleMapper scheduleMapper; private ScheduleMapper scheduleMapper;
@Autowired
private CuringGlobalParamsService curingGlobalParamsService;
/** /**
* return top n SUCCESS process instance order by running time which started between startTime and endTime * return top n SUCCESS process instance order by running time which started between startTime and endTime
*/ */
@ -561,7 +565,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.setScheduleTime(schedule); processInstance.setScheduleTime(schedule);
List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class); List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);
Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone); globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone);
processInstance.setTimeout(timeout); processInstance.setTimeout(timeout);
processInstance.setTenantCode(tenantCode); processInstance.setTenantCode(tenantCode);
processInstance.setGlobalParams(globalParams); processInstance.setGlobalParams(globalParams);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java

@ -475,7 +475,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
List<Project> projects = projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(), new ArrayList<>(projectIds)); List<Project> projects = projectMapper.selectBatchIds(projectIds);
result.put(Constants.DATA_LIST, projects); result.put(Constants.DATA_LIST, projects);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@ -41,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -104,6 +105,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
taskGroup.setCreateTime(new Date()); taskGroup.setCreateTime(new Date());
taskGroup.setUpdateTime(new Date()); taskGroup.setUpdateTime(new Date());
if (taskGroupMapper.insert(taskGroup) > 0) { if (taskGroupMapper.insert(taskGroup) > 0) {
permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(), Collections.singletonList(taskGroup.getId()),logger);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.CREATE_TASK_GROUP_ERROR); putMsg(result, Status.CREATE_TASK_GROUP_ERROR);

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -122,11 +122,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroupMapper.updateById(workerGroup); workerGroupMapper.updateById(workerGroup);
} else { } else {
workerGroupMapper.insert(workerGroup); workerGroupMapper.insert(workerGroup);
}
putMsg(result, Status.SUCCESS);
if (id != 0) {
permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger); permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger);
} }
putMsg(result, Status.SUCCESS);
return result; return result;
} }

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -36,6 +36,8 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.expand.DolphinSchedulerCuringGlobalParams;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -70,6 +72,8 @@ import org.mockito.InjectMocks;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.beans.factory.annotation.Autowired;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
@ -126,6 +130,9 @@ public class ProcessInstanceServiceTest {
@Mock @Mock
ScheduleMapper scheduleMapper; ScheduleMapper scheduleMapper;
@Mock
CuringGlobalParamsService curingGlobalParamsService;
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"

11
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java

@ -108,7 +108,6 @@ public class ProjectServiceTest {
public void testCheckProjectAndAuth() { public void testCheckProjectAndAuth() {
long projectCode = 1L; long projectCode = 1L;
// Mockito.when(projectUserMapper.queryProjectRelation(1, 1)).thenReturn(getProjectUser());
User loginUser = getLoginUser(); User loginUser = getLoginUser();
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, null, projectCode, PROJECT); Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, null, projectCode, PROJECT);
@ -142,7 +141,6 @@ public class ProjectServiceTest {
project1.setUserId(2); project1.setUserId(2);
loginUser.setUserType(UserType.GENERAL_USER); loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.PROJECTS, loginUser.getId(), PROJECT, baseServiceLogger)).thenReturn(true); Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.PROJECTS, loginUser.getId(), PROJECT, baseServiceLogger)).thenReturn(true);
// Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.PROJECTS, new Object[]{project.getId()}, 0, baseServiceLogger)).thenReturn(true);
result2 = projectService.checkProjectAndAuth(loginUser, project1, projectCode,PROJECT); result2 = projectService.checkProjectAndAuth(loginUser, project1, projectCode,PROJECT);
Assert.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM, result2.get(Constants.STATUS)); Assert.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM, result2.get(Constants.STATUS));
@ -329,9 +327,8 @@ public class ProjectServiceTest {
List<Integer> list = new ArrayList<>(1); List<Integer> list = new ArrayList<>(1);
list.add(1); list.add(1);
// not admin user // not admin user
// Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(1)).thenReturn(getList());
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set); Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set);
Mockito.when(projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(),list)).thenReturn(getList()); Mockito.when(projectMapper.selectBatchIds(set)).thenReturn(getList());
result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser); result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser);
List<Project> notAdminUserResult = (List<Project>) result.get(Constants.DATA_LIST); List<Project> notAdminUserResult = (List<Project>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(notAdminUserResult)); Assert.assertTrue(CollectionUtils.isNotEmpty(notAdminUserResult));
@ -339,8 +336,7 @@ public class ProjectServiceTest {
//admin user //admin user
loginUser.setUserType(UserType.ADMIN_USER); loginUser.setUserType(UserType.ADMIN_USER);
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set); Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set);
Mockito.when(projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(),list)).thenReturn(getList()); Mockito.when(projectMapper.selectBatchIds(set)).thenReturn(getList());
// Mockito.when(projectMapper.selectList(null)).thenReturn(getList());
result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser); result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser);
List<Project> projects = (List<Project>) result.get(Constants.DATA_LIST); List<Project> projects = (List<Project>) result.get(Constants.DATA_LIST);
@ -363,9 +359,6 @@ public class ProjectServiceTest {
@Test @Test
public void testQueryUnauthorizedProject() { public void testQueryUnauthorizedProject() {
// Mockito.when(projectMapper.queryProjectExceptUserId(2)).thenReturn(getList());
// Mockito.when(projectMapper.queryProjectCreatedByUser(2)).thenReturn(getList());
// Mockito.when(projectMapper.queryAuthedProjectListByUserId(2)).thenReturn(getSingleList());
Set<Integer> set = new HashSet(); Set<Integer> set = new HashSet();
set.add(1); set.add(1);
// test admin user // test admin user

64
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.expand;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import java.util.Date;
import java.util.List;
import java.util.Map;
public interface CuringGlobalParamsService {
/**
* time function need expand
* @param placeholderName
* @return
*/
boolean timeFunctionNeedExpand(String placeholderName);
/**
* time function extension
* @param processInstanceId
* @param timezone
* @param placeholderName
* @return
*/
String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName);
/**
* convert parameter placeholders
* @param val
* @param allParamMap
* @return
*/
String convertParameterPlaceholders(String val, Map<String, String> allParamMap);
/**
* curing global params
* @param processInstanceId
* @param globalParamMap
* @param globalParamList
* @param commandType
* @param scheduleTime
* @param timezone
* @return
*/
String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone);
}

96
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.expand;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Component
public class DolphinSchedulerCuringGlobalParams implements CuringGlobalParamsService {
@Autowired
private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
@Override
public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
}
@Override
public boolean timeFunctionNeedExpand(String placeholderName) {
return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
}
@Override
public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
}
@Override
public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
if (globalParamList == null || globalParamList.isEmpty()) {
return null;
}
Map<String, String> globalMap = new HashMap<>();
if (globalParamMap != null) {
globalMap.putAll(globalParamMap);
}
Map<String, String> allParamMap = new HashMap<>();
//If it is a complement, a complement time needs to be passed in, according to the task type
Map<String, String> timeParams = BusinessTimeUtils.
getBusinessTime(commandType, scheduleTime, timezone);
if (timeParams != null) {
allParamMap.putAll(timeParams);
}
allParamMap.putAll(globalMap);
Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
Map<String, String> resolveMap = new HashMap<>();
for (Map.Entry<String, String> entry : entries) {
String val = entry.getValue();
if (val.startsWith("$")) {
String str = "";
if (timeFunctionNeedExpand(val)) {
str = timeFunctionExtension(processInstanceId, timezone, val);
} else {
str = convertParameterPlaceholders(val, allParamMap);
}
resolveMap.put(entry.getKey(), str);
}
}
globalMap.putAll(resolveMap);
for (Property property : globalParamList) {
String val = globalMap.get(property.getProp());
if (val != null) {
property.setValue(val);
}
}
return JSONUtils.toJsonString(globalParamList);
}
}

35
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.expand;
public interface TimePlaceholderResolverExpandService {
/**
* check is need expand function
* @param placeholderName
* @return
*/
boolean timeFunctionNeedExpand(String placeholderName);
/**
* time function extension
* @param placeholderName
* @return
*/
String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName);
}

34
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.expand;
import org.springframework.stereotype.Component;
@Component
public class TimePlaceholderResolverExpandServiceImpl implements TimePlaceholderResolverExpandService {
@Override
public boolean timeFunctionNeedExpand(String placeholderName) {
return false;
}
@Override
public String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName) {
return null;
}
}

52
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java

@ -77,58 +77,6 @@ public class ParameterUtils {
return parameterString; return parameterString;
} }
/**
* curing user define parameters
*
* @param globalParamMap global param map
* @param globalParamList global param list
* @param commandType command type
* @param scheduleTime schedule time
* @return curing user define parameters
*/
public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,
CommandType commandType, Date scheduleTime, String timezone) {
if (globalParamList == null || globalParamList.isEmpty()) {
return null;
}
Map<String, String> globalMap = new HashMap<>();
if (globalParamMap != null) {
globalMap.putAll(globalParamMap);
}
Map<String, String> allParamMap = new HashMap<>();
//If it is a complement, a complement time needs to be passed in, according to the task type
Map<String, String> timeParams = BusinessTimeUtils.
getBusinessTime(commandType, scheduleTime, timezone);
if (timeParams != null) {
allParamMap.putAll(timeParams);
}
allParamMap.putAll(globalMap);
Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
Map<String, String> resolveMap = new HashMap<>();
for (Map.Entry<String, String> entry : entries) {
String val = entry.getValue();
if (val.startsWith("$")) {
String str = ParameterUtils.convertParameterPlaceholders(val, allParamMap);
resolveMap.put(entry.getKey(), str);
}
}
globalMap.putAll(resolveMap);
for (Property property : globalParamList) {
String val = globalMap.get(property.getProp());
if (val != null) {
property.setValue(val);
}
}
return JSONUtils.toJsonString(globalParamList);
}
/** /**
* handle escapes * handle escapes
* *

151
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.expand;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(MockitoJUnitRunner.class)
public class CuringGlobalParamsServiceTest {
private static final String placeHolderName = "$[yyyy-MM-dd-1]";
@Mock
private CuringGlobalParamsService curingGlobalParamsService;
@InjectMocks
private DolphinSchedulerCuringGlobalParams dolphinSchedulerCuringGlobalParams;
@Mock
private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
@InjectMocks
private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
private final Map<String, String> globalParamMap = new HashMap<>();
@Before
public void init() {
globalParamMap.put("globalParams1", "Params1");
}
@Test
public void testConvertParameterPlaceholders() {
Mockito.when(curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap)).thenReturn("2022-06-26");
String result = curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap);
Assert.assertNotNull(result);
}
@Test
public void testTimeFunctionNeedExpand() {
boolean result = curingGlobalParamsService.timeFunctionNeedExpand(placeHolderName);
Assert.assertFalse(result);
}
@Test
public void testTimeFunctionExtension() {
String result = curingGlobalParamsService.timeFunctionExtension(1, "", placeHolderName);
Assert.assertNull(result);
}
@Test
public void testCuringGlobalParams() {
//define globalMap
Map<String, String> globalParamMap = new HashMap<>();
globalParamMap.put("globalParams1", "Params1");
//define globalParamList
List<Property> globalParamList = new ArrayList<>();
//define scheduleTime
Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
//test globalParamList is null
String result = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertNull(result);
Assert.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null));
Assert.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
//test globalParamList is not null
Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam");
globalParamList.add(property);
String result2 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList));
String result3 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null);
Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList));
String result4 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList));
//test var $ startsWith
globalParamMap.put("bizDate", "${system.biz.date}");
globalParamMap.put("b1zCurdate", "${system.biz.curdate}");
Property property2 = new Property("testParamList1", Direct.IN, DataType.VARCHAR, "testParamList");
Property property3 = new Property("testParamList2", Direct.IN, DataType.VARCHAR, "{testParamList1}");
Property property4 = new Property("testParamList3", Direct.IN, DataType.VARCHAR, "${b1zCurdate}");
globalParamList.add(property2);
globalParamList.add(property3);
globalParamList.add(property4);
String result5 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
globalParamList.add(testStartParamProperty);
Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]");
globalParamList.add(testStartParam2Property);
globalParamMap.put("testStartParam", "");
globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
Map<String, String> startParamMap = new HashMap<>(2);
startParamMap.put("testStartParam", "$[yyyyMMdd]");
for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
String result6 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertTrue(result6.contains("20191220"));
}
}

51
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.expand;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TimePlaceholderResolverExpandServiceTest {
@Mock
private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
@InjectMocks
private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
private static final String placeHolderName = "$[yyyy-MM-dd-1]";
@Test
public void testTimePlaceholderResolverExpandService() {
boolean checkResult = timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeHolderName);
Assert.assertFalse(checkResult);
String resultString = timePlaceholderResolverExpandService.timeFunctionExtension(1, "", placeHolderName);
Assert.assertTrue(StringUtils.isEmpty(resultString));
boolean implCheckResult = timePlaceholderResolverExpandServiceImpl.timeFunctionNeedExpand(placeHolderName);
Assert.assertFalse(implCheckResult);
String implResultString = timePlaceholderResolverExpandServiceImpl.timeFunctionExtension(1, "", placeHolderName);
Assert.assertTrue(StringUtils.isEmpty(implResultString));
}
}

84
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java

@ -21,6 +21,8 @@ import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlacehold
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandService;
import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandServiceImpl;
import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils; import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@ -34,13 +36,25 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@RunWith(MockitoJUnitRunner.class)
public class ParameterUtilsTest { public class ParameterUtilsTest {
public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class); public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class);
@Mock
private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
@InjectMocks
private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
/** /**
* Test convertParameterPlaceholders * Test convertParameterPlaceholders
*/ */
@ -84,76 +98,6 @@ public class ParameterUtilsTest {
parameterString); parameterString);
} }
/**
* Test curingGlobalParams
*/
@Test
public void testCuringGlobalParams() {
//define globalMap
Map<String, String> globalParamMap = new HashMap<>();
globalParamMap.put("globalParams1", "Params1");
//define globalParamList
List<Property> globalParamList = new ArrayList<>();
//define scheduleTime
Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
//test globalParamList is null
String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertNull(result);
Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null));
Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
//test globalParamList is not null
Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam");
globalParamList.add(property);
String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList));
String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null);
Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList));
String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList));
//test var $ startsWith
globalParamMap.put("bizDate", "${system.biz.date}");
globalParamMap.put("b1zCurdate", "${system.biz.curdate}");
Property property2 = new Property("testParamList1", Direct.IN, DataType.VARCHAR, "testParamList");
Property property3 = new Property("testParamList2", Direct.IN, DataType.VARCHAR, "{testParamList1}");
Property property4 = new Property("testParamList3", Direct.IN, DataType.VARCHAR, "${b1zCurdate}");
globalParamList.add(property2);
globalParamList.add(property3);
globalParamList.add(property4);
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
globalParamList.add(testStartParamProperty);
Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]");
globalParamList.add(testStartParam2Property);
globalParamMap.put("testStartParam", "");
globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
Map<String, String> startParamMap = new HashMap<>(2);
startParamMap.put("testStartParam", "$[yyyyMMdd]");
for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
Assert.assertTrue(result6.contains("20191220"));
}
/** /**
* Test handleEscapes * Test handleEscapes
*/ */

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState; import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -104,6 +105,9 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired @Autowired
private StateWheelExecuteThread stateWheelExecuteThread; private StateWheelExecuteThread stateWheelExecuteThread;
@Autowired
private CuringGlobalParamsService curingGlobalParamsService;
protected MasterSchedulerService() { protected MasterSchedulerService() {
super("MasterCommandLoopThread"); super("MasterCommandLoopThread");
} }
@ -183,7 +187,8 @@ public class MasterSchedulerService extends BaseDaemonThread {
, nettyExecutorManager , nettyExecutorManager
, processAlertManager , processAlertManager
, masterConfig , masterConfig
, stateWheelExecuteThread); , stateWheelExecuteThread
, curingGlobalParamsService);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) { if (processInstance.getTimeout() > 0) {

15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -235,6 +236,11 @@ public class WorkflowExecuteRunnable implements Runnable {
*/ */
private final StateWheelExecuteThread stateWheelExecuteThread; private final StateWheelExecuteThread stateWheelExecuteThread;
/**
* curing global params service
*/
private final CuringGlobalParamsService curingGlobalParamsService;
/** /**
* @param processInstance processInstance * @param processInstance processInstance
* @param processService processService * @param processService processService
@ -248,13 +254,15 @@ public class WorkflowExecuteRunnable implements Runnable {
, NettyExecutorManager nettyExecutorManager , NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager , ProcessAlertManager processAlertManager
, MasterConfig masterConfig , MasterConfig masterConfig
, StateWheelExecuteThread stateWheelExecuteThread) { , StateWheelExecuteThread stateWheelExecuteThread
, CuringGlobalParamsService curingGlobalParamsService) {
this.processService = processService; this.processService = processService;
this.processInstance = processInstance; this.processInstance = processInstance;
this.masterConfig = masterConfig; this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager; this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager; this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread; this.stateWheelExecuteThread = stateWheelExecuteThread;
this.curingGlobalParamsService = curingGlobalParamsService;
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
} }
@ -999,10 +1007,11 @@ public class WorkflowExecuteRunnable implements Runnable {
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0)); processInstance.setScheduleTime(complementListDate.get(0));
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(), processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE))); CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));
processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
} }
} }

6
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java

@ -26,6 +26,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -86,6 +87,8 @@ public class WorkflowExecuteTaskTest {
private StateWheelExecuteThread stateWheelExecuteThread; private StateWheelExecuteThread stateWheelExecuteThread;
private CuringGlobalParamsService curingGlobalParamsService;
@Before @Before
public void init() throws Exception { public void init() throws Exception {
applicationContext = mock(ApplicationContext.class); applicationContext = mock(ApplicationContext.class);
@ -113,7 +116,8 @@ public class WorkflowExecuteTaskTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
stateWheelExecuteThread = mock(StateWheelExecuteThread.class); stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread)); curingGlobalParamsService = mock(CuringGlobalParamsService.class);
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService));
// prepareProcess init dag // prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true); dag.setAccessible(true);

19
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -276,6 +277,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired @Autowired
private K8sMapper k8sMapper; private K8sMapper k8sMapper;
@Autowired
private CuringGlobalParamsService curingGlobalParamsService;
/** /**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction * handle Command (construct ProcessInstance from Command) , wrapped in transaction
* *
@ -802,11 +806,12 @@ public class ProcessServiceImpl implements ProcessService {
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE); timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
} }
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(), processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command), getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime(), timezoneId)); processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
// set process instance priority // set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@ -953,11 +958,12 @@ public class ProcessServiceImpl implements ProcessService {
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE); String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
// Recalculate global parameters after rerun. // Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(), processDefinition.getGlobalParamList(),
commandTypeIfComplement, commandTypeIfComplement,
processInstance.getScheduleTime(), timezoneId)); processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
processInstance.setProcessDefinition(processDefinition); processInstance.setProcessDefinition(processDefinition);
} }
//reset command parameter //reset command parameter
@ -1139,10 +1145,11 @@ public class ProcessServiceImpl implements ProcessService {
// time zone // time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE); String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(), processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId)); CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
} }
/** /**

9
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -159,6 +160,9 @@ public class ProcessServiceTest {
@Mock @Mock
private ScheduleMapper scheduleMapper; private ScheduleMapper scheduleMapper;
@Mock
CuringGlobalParamsService curingGlobalParamsService;
@Test @Test
public void testCreateSubCommand() { public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance(); ProcessInstance parentInstance = new ProcessInstance();
@ -365,6 +369,11 @@ public class ProcessServiceTest {
command5.setCommandType(CommandType.START_PROCESS); command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO); command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
Mockito.when(commandMapper.deleteById(5)).thenReturn(1); Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
Mockito.when(curingGlobalParamsService.curingGlobalParams(0,
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.START_PROCESS,
processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\"");
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5); ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));

Loading…
Cancel
Save