Browse Source

Fix task cannot use workflow's environment (#16199)

dev
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
b34fe46044
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java
  2. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java
  5. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  6. 29
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
  7. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  8. 44
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  9. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  10. 54
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java
  11. 45
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java
  12. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  13. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
  14. 72
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java
  15. 70
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java
  16. 4
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java
  17. 57
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
  18. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  19. 13
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
  20. 26
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  21. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java
  22. 7
      dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java
  23. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/task/TaskCreateRequest.java

@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.api.dto.task;
import static org.apache.dolphinscheduler.common.constants.Constants.VERSION_FIRST;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import java.util.Date;
@ -107,7 +107,7 @@ public class TaskCreateRequest {
taskDefinition.setProjectCode(this.projectCode);
taskDefinition.setTaskType(this.taskType);
taskDefinition.setTaskParams(this.taskParams);
taskDefinition.setWorkerGroup(this.workerGroup == null ? Constants.DEFAULT_WORKER_GROUP : this.workerGroup);
taskDefinition.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup));
taskDefinition.setEnvironmentCode(this.environmentCode);
taskDefinition.setFailRetryTimes(this.failRetryTimes);
taskDefinition.setFailRetryInterval(this.failRetryInterval);

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
@ -77,13 +76,6 @@ public interface WorkerGroupService {
*/
Map<String, Object> getWorkerAddressList();
/**
* Get task instance's worker group
* @param taskInstance task instance
* @return worker group
*/
String getTaskWorkerGroup(TaskInstance taskInstance);
/**
* Query worker group by process definition codes
* @param processDefinitionCodeList processDefinitionCodeList

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -34,7 +34,6 @@ import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_DEFINE_NOT_EX
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.constants.Constants.COPY_SUFFIX;
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.constants.Constants.IMPORT_SUFFIX;
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
@ -109,6 +108,7 @@ import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@ -1390,7 +1390,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskDefinition.setFailRetryTimes(0);
taskDefinition.setFailRetryInterval(0);
taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
taskDefinition.setWorkerGroup(DEFAULT_WORKER_GROUP);
taskDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
taskDefinition.setTaskPriority(Priority.MEDIUM);
taskDefinition.setEnvironmentCode(-1);
taskDefinition.setTimeout(0);

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectWorkerGroupRelationServiceImpl.java

@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
@ -118,7 +119,7 @@ public class ProjectWorkerGroupRelationServiceImpl extends BaseServiceImpl
workerGroupMapper.queryAllWorkerGroup().stream().map(WorkerGroup::getName).collect(
Collectors.toSet());
workerGroupNames.add(Constants.DEFAULT_WORKER_GROUP);
workerGroupNames.add(WorkerGroupUtils.getDefaultWorkerGroup());
Set<String> assignedWorkerGroupNames = new HashSet<>(workerGroups);

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
@ -41,6 +40,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -357,12 +357,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroups = workerGroupMapper.queryAllWorkerGroup();
}
boolean containDefaultWorkerGroups = workerGroups.stream()
.anyMatch(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName()));
.anyMatch(workerGroup -> WorkerGroupUtils.isWorkerGroupEmpty(workerGroup.getName()));
if (!containDefaultWorkerGroups) {
// there doesn't exist a default WorkerGroup, we will add all worker to the default worker group.
Set<String> activeWorkerNodes = registryClient.getServerNodeSet(RegistryNodeType.WORKER);
WorkerGroup defaultWorkerGroup = new WorkerGroup();
defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP);
defaultWorkerGroup.setName(WorkerGroupUtils.getDefaultWorkerGroup());
defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes));
defaultWorkerGroup.setCreateTime(new Date());
defaultWorkerGroup.setUpdateTime(new Date());
@ -431,27 +431,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
return result;
}
@Override
public String getTaskWorkerGroup(TaskInstance taskInstance) {
if (taskInstance == null) {
return null;
}
String workerGroup = taskInstance.getWorkerGroup();
if (StringUtils.isNotEmpty(workerGroup)) {
return workerGroup;
}
int processInstanceId = taskInstance.getProcessInstanceId();
ProcessInstance processInstance = processService.findProcessInstanceById(processInstanceId);
if (processInstance != null) {
return processInstance.getWorkerGroup();
}
log.info("task : {} will use default worker group", taskInstance.getId());
return Constants.DEFAULT_WORKER_GROUP;
}
@Override
public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList =

29
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java

@ -67,6 +67,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -271,7 +272,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 10, null, null,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 10, null, null,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@ -298,7 +299,7 @@ public class ExecuteFunctionServiceTest {
null, "123456789,987654321",
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, null,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@ -323,7 +324,7 @@ public class ExecuteFunctionServiceTest {
null, "1123456789,987654321",
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 0,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 0,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@ -354,14 +355,14 @@ public class ExecuteFunctionServiceTest {
dependentProcessDefinition.setProcessDefinitionCode(2);
dependentProcessDefinition.setProcessDefinitionVersion(1);
dependentProcessDefinition.setTaskDefinitionCode(1);
dependentProcessDefinition.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
dependentProcessDefinition.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
dependentProcessDefinition.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":2,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(processDefinitionCode))
.thenReturn(Lists.newArrayList(dependentProcessDefinition));
Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
processDefinitionWorkerGroupMap.put(1L, WorkerGroupUtils.getDefaultWorkerGroup());
Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
.thenReturn(processDefinitionWorkerGroupMap);
@ -370,7 +371,7 @@ public class ExecuteFunctionServiceTest {
command.setCommandType(CommandType.COMPLEMENT_DATA);
command.setCommandParam(
"{\"StartNodeList\":\"1\",\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}");
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
command.setProcessDefinitionCode(processDefinitionCode);
command.setExecutorId(1);
@ -383,7 +384,7 @@ public class ExecuteFunctionServiceTest {
childDependent.setProcessDefinitionCode(3);
childDependent.setProcessDefinitionVersion(1);
childDependent.setTaskDefinitionCode(4);
childDependent.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
childDependent.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
childDependent.setTaskParams(
"{\"localParams\":[],\"resourceList\":[],\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"depTaskCode\":3,\"status\":\"SUCCESS\"}]}]},\"conditionResult\":{\"successNode\":[1],\"failedNode\":[1]}}");
Mockito.when(processService.queryDependentProcessDefinitionByProcessDefinitionCode(
@ -409,7 +410,8 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 2,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
false,
@ -434,7 +436,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, null,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, null,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@ -460,7 +462,8 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 2,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
false,
@ -486,7 +489,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 15,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
@ -514,7 +517,7 @@ public class ExecuteFunctionServiceTest {
null,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW,
Constants.DEFAULT_WORKER_GROUP,
WorkerGroupUtils.getDefaultWorkerGroup(),
tenantCode,
100L,
110,
@ -553,7 +556,7 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode, 100L, 110, null, 15,
Priority.LOW, WorkerGroupUtils.getDefaultWorkerGroup(), tenantCode, 100L, 110, null, 15,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_YES,
ComplementDependentMode.OFF_MODE, null,

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -72,6 +72,7 @@ import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -1143,7 +1144,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
schedule.setProcessInstancePriority(Priority.MEDIUM);
schedule.setWarningType(WarningType.NONE);
schedule.setWarningGroupId(1);
schedule.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
schedule.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
return schedule;
}

44
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
@ -65,8 +64,6 @@ import org.slf4j.LoggerFactory;
@MockitoSettings(strictness = Strictness.LENIENT)
public class WorkerGroupServiceTest {
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class);
private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
private static final Logger serviceLogger = LoggerFactory.getLogger(WorkerGroupService.class);
@ -288,47 +285,6 @@ public class WorkerGroupServiceTest {
Assertions.assertEquals("default", workerGroups.toArray()[0]);
}
@Test
public void giveNull_whenGetTaskWorkerGroup_expectNull() {
String nullWorkerGroup = workerGroupService.getTaskWorkerGroup(null);
Assertions.assertNull(nullWorkerGroup);
}
@Test
public void giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setWorkerGroup("cluster1");
String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
Assertions.assertEquals("cluster1", workerGroup);
}
@Test
public void giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setProcessInstanceId(1);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setWorkerGroup("cluster1");
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(processInstance);
String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
Assertions.assertEquals("cluster1", workerGroup);
}
@Test
public void giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setProcessInstanceId(1);
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null);
String defaultWorkerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP, defaultWorkerGroup);
}
/**
* get Group
*/

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -508,14 +508,9 @@ public final class Constants {
* session timeout
*/
public static final int SESSION_TIME_OUT = 7200;
public static final int MAX_FILE_SIZE = 1024 * 1024 * 1024;
public static final String UDF = "UDF";
public static final String CLASS = "class";
/**
* default worker group
*/
public static final String DEFAULT_WORKER_GROUP = "default";
/**
* authorize writable perm
*/

54
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtils.java

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
public class EnvironmentUtils {
private static final long EMPTY_ENVIRONMENT_CODE = -1L;
/**
* Check if the environment code is empty (we should use null instead of -1, this is used to comply with the original code)
*
* @return true if the environment code is empty, false otherwise
*/
public static boolean isEnvironmentCodeEmpty(Long environmentCode) {
return environmentCode == null || environmentCode <= 0;
}
/**
* Get the empty environment code
*/
public static Long getDefaultEnvironmentCode() {
return EMPTY_ENVIRONMENT_CODE;
}
/**
* Get the environment code or the default environment code if the environment code is empty
*/
public static Long getEnvironmentCodeOrDefault(Long environmentCode) {
return getEnvironmentCodeOrDefault(environmentCode, getDefaultEnvironmentCode());
}
/**
* Get the environment code or the default environment code if the environment code is empty
*/
public static Long getEnvironmentCodeOrDefault(Long environmentCode, Long defaultEnvironmentCode) {
return isEnvironmentCodeEmpty(environmentCode) ? defaultEnvironmentCode : environmentCode;
}
}

45
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtils.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
import org.apache.commons.lang3.StringUtils;
public class WorkerGroupUtils {
private static final String DEFAULT_WORKER_GROUP = "default";
/**
* Check if the worker group is empty, if the worker group is default, it is considered empty
*/
public static boolean isWorkerGroupEmpty(String workerGroup) {
return StringUtils.isEmpty(workerGroup) || getDefaultWorkerGroup().equals(workerGroup);
}
public static String getWorkerGroupOrDefault(String workerGroup) {
return getWorkerGroupOrDefault(workerGroup, getDefaultWorkerGroup());
}
public static String getWorkerGroupOrDefault(String workerGroup, String defaultWorkerGroup) {
return isWorkerGroupEmpty(workerGroup) ? defaultWorkerGroup : workerGroup;
}
public static String getDefaultWorkerGroup() {
return DEFAULT_WORKER_GROUP;
}
}

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.mapper;
import static com.google.common.truth.Truth.assertThat;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import java.util.Date;
import java.util.HashMap;
@ -303,7 +303,7 @@ public class CommandMapperTest extends BaseDaoTest {
command.setProcessInstancePriority(Priority.MEDIUM);
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(0);
commandMapper.insert(command);

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.dao.repository.impl;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
@ -30,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.repository.CommandDao;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.commons.lang3.RandomUtils;
@ -94,7 +94,7 @@ class CommandDaoImplTest extends BaseDaoTest {
command.setProcessInstancePriority(Priority.MEDIUM);
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
command.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(0);
return command;

72
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/EnvironmentUtilsTest.java

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
import static com.google.common.truth.Truth.assertThat;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
class EnvironmentUtilsTest {
@ParameterizedTest
@ValueSource(longs = {0, -1})
void testIsEnvironmentCodeEmpty_emptyEnvironmentCode(Long environmentCode) {
assertThat(EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode)).isTrue();
}
@ParameterizedTest
@ValueSource(longs = {123})
void testIsEnvironmentCodeEmpty_nonEmptyEnvironmentCode(Long environmentCode) {
assertThat(EnvironmentUtils.isEnvironmentCodeEmpty(environmentCode)).isFalse();
}
@Test
void testGetDefaultEnvironmentCode() {
assertThat(EnvironmentUtils.getDefaultEnvironmentCode()).isEqualTo(-1L);
}
@ParameterizedTest
@ValueSource(longs = {0, -1})
void testGetEnvironmentCodeOrDefault_emptyEnvironmentCode(Long environmentCode) {
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode)).isEqualTo(-1L);
}
@ParameterizedTest
@ValueSource(longs = {123})
void testGetEnvironmentCodeOrDefault_nonEmptyEnvironmentCode(Long environmentCode) {
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode)).isEqualTo(environmentCode);
}
@ParameterizedTest
@CsvSource(value = {",123", "-1,123"})
void testGetEnvironmentCodeOrDefault_withDefaultValue_emptyEnvironmentCode(Long environmentCode,
Long defaultValue) {
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, defaultValue)).isEqualTo(defaultValue);
}
@ParameterizedTest
@CsvSource(value = {"1,123"})
void testGetEnvironmentCodeOrDefault_withDefaultValue_nonEmptyEnvironmentCode(Long environmentCode,
Long defaultValue) {
assertThat(EnvironmentUtils.getEnvironmentCodeOrDefault(environmentCode, defaultValue))
.isEqualTo(environmentCode);
}
}

70
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/WorkerGroupUtilsTest.java

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
import static com.google.common.truth.Truth.assertThat;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
class WorkerGroupUtilsTest {
@ParameterizedTest
@ValueSource(strings = {"", "default"})
void testIsWorkerGroupEmpty_emptyWorkerGroup(String workerGroup) {
assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isTrue();
}
@ParameterizedTest
@ValueSource(strings = {"123", "default1"})
void testIsWorkerGroupEmpty_nonEmptyWorkerGroup(String workerGroup) {
assertThat(WorkerGroupUtils.isWorkerGroupEmpty(workerGroup)).isFalse();
}
@ParameterizedTest
@ValueSource(strings = {"", "default"})
void testGetWorkerGroupOrDefault_emptyWorkerGroup(String workerGroup) {
assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup))
.isEqualTo(WorkerGroupUtils.getDefaultWorkerGroup());
}
@ParameterizedTest
@ValueSource(strings = {"test"})
void testGetWorkerGroupOrDefault_nonEmptyWorkerGroup(String workerGroup) {
assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)).isEqualTo(workerGroup);
}
@ParameterizedTest
@CsvSource(value = {",test", "default,test"})
void testGetWorkerGroupOrDefault_withDefaultValue_emptyWorkerGroup(String workerGroup, String defaultValue) {
assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup, defaultValue)).isEqualTo(defaultValue);
}
@ParameterizedTest
@CsvSource(value = {"test1,test"})
void testGetWorkerGroupOrDefault_withDefaultValue_nonEmptyWorkerGroup(String workerGroup, String defaultValue) {
assertThat(WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup)).isEqualTo(workerGroup);
}
@Test
void getDefaultWorkerGroup() {
assertThat(WorkerGroupUtils.getDefaultWorkerGroup()).isEqualTo("default");
}
}

4
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/WorkflowJavaTaskE2ETest.java

@ -92,8 +92,8 @@ public class WorkflowJavaTaskE2ETest {
.goToNav(SecurityPage.class)
.goToTab(UserPage.class);
new WebDriverWait(userPage.driver(), Duration.ofSeconds(20)).until(ExpectedConditions.visibilityOfElementLocated(
new By.ByClassName("name")));
new WebDriverWait(userPage.driver(), Duration.ofSeconds(20))
.until(ExpectedConditions.visibilityOfElementLocated(new By.ByClassName("name")));
userPage.update(user, user, email, phone, tenant)
.goToNav(ProjectPage.class)

57
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.context;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ExecutionContext {
private Host host;
private TaskInstance taskInstance;
private ExecutorType executorType;
/**
* worker group
*/
private String workerGroup;
public ExecutionContext(ExecutorType executorType, TaskInstance taskInstance) {
this(executorType, DEFAULT_WORKER_GROUP, taskInstance);
}
public ExecutionContext(ExecutorType executorType, String workerGroup, TaskInstance taskInstance) {
this.executorType = executorType;
this.workerGroup = workerGroup;
this.taskInstance = taskInstance;
}
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Event.Type;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
@ -36,7 +37,6 @@ import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
@ -273,8 +273,8 @@ public class ServerNodeManager implements InitializingBean {
.filter(workerNodeInfo::containsKey).collect(Collectors.toSet());
tmpWorkerGroupMappings.put(workerGroupName, activeWorkerNodes);
}
if (!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) {
tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP, workerNodeInfo.keySet());
if (!tmpWorkerGroupMappings.containsKey(WorkerGroupUtils.getDefaultWorkerGroup())) {
tmpWorkerGroupMappings.put(WorkerGroupUtils.getDefaultWorkerGroup(), workerNodeInfo.keySet());
}
} finally {
workerNodeInfoReadLock.unlock();
@ -307,9 +307,7 @@ public class ServerNodeManager implements InitializingBean {
public Set<String> getWorkerGroupNodes(String workerGroup) throws WorkerGroupNotFoundException {
workerGroupReadLock.lock();
try {
if (StringUtils.isEmpty(workerGroup)) {
workerGroup = Constants.DEFAULT_WORKER_GROUP;
}
workerGroup = WorkerGroupUtils.getWorkerGroupOrDefault(workerGroup);
Set<String> nodes = workerGroupNodes.get(workerGroup);
if (nodes == null) {
throw new WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated", workerGroup));

13
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
@ -31,6 +29,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener;
@ -270,12 +270,11 @@ public class StreamTaskExecuteRunnable implements Runnable {
// task dry run flag
taskInstance.setDryRun(taskExecuteStartMessage.getDryRun());
taskInstance.setWorkerGroup(StringUtils.isBlank(taskDefinition.getWorkerGroup()) ? DEFAULT_WORKER_GROUP
: taskDefinition.getWorkerGroup());
taskInstance.setEnvironmentCode(
taskDefinition.getEnvironmentCode() == 0 ? -1 : taskDefinition.getEnvironmentCode());
taskInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(taskDefinition.getWorkerGroup()));
taskInstance
.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(taskDefinition.getEnvironmentCode()));
if (!taskInstance.getEnvironmentCode().equals(-1L)) {
if (!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) {
Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
taskInstance.setEnvironmentConfig(environment.getConfig());

26
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -25,8 +25,9 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMA;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYY_MM_DD_HH_MM_SS;
import static org.apache.dolphinscheduler.dao.utils.EnvironmentUtils.getEnvironmentCodeOrDefault;
import static org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils.getWorkerGroupOrDefault;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import org.apache.dolphinscheduler.common.constants.Constants;
@ -51,7 +52,9 @@ import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
@ -1112,25 +1115,22 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
}
String processWorkerGroup = processInstance.getWorkerGroup();
processWorkerGroup = StringUtils.isBlank(processWorkerGroup) ? DEFAULT_WORKER_GROUP : processWorkerGroup;
String taskWorkerGroup =
StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
String processWorkerGroup = getWorkerGroupOrDefault(processInstance.getWorkerGroup());
String taskWorkerGroup = getWorkerGroupOrDefault(taskNode.getWorkerGroup());
Long processEnvironmentCode =
Objects.isNull(processInstance.getEnvironmentCode()) ? -1 : processInstance.getEnvironmentCode();
Long taskEnvironmentCode =
Objects.isNull(taskNode.getEnvironmentCode()) ? processEnvironmentCode : taskNode.getEnvironmentCode();
Long processEnvironmentCode = getEnvironmentCodeOrDefault(processInstance.getEnvironmentCode());
Long taskEnvironmentCode = getEnvironmentCodeOrDefault(taskNode.getEnvironmentCode());
if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
if (WorkerGroupUtils.isWorkerGroupEmpty(taskWorkerGroup)) {
// If the task workerGroup is empty, then use the workflow workerGroup/environment
taskInstance.setWorkerGroup(processWorkerGroup);
taskInstance.setEnvironmentCode(processEnvironmentCode);
taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode, processEnvironmentCode));
} else {
taskInstance.setWorkerGroup(taskWorkerGroup);
taskInstance.setEnvironmentCode(taskEnvironmentCode);
taskInstance.setEnvironmentCode(getEnvironmentCodeOrDefault(taskEnvironmentCode, processEnvironmentCode));
}
if (!taskInstance.getEnvironmentCode().equals(-1L)) {
if (!EnvironmentUtils.isEnvironmentCodeEmpty(taskInstance.getEnvironmentCode())) {
Environment environment = processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
if (Objects.nonNull(environment) && StringUtils.isNotEmpty(environment.getConfig())) {
taskInstance.setEnvironmentConfig(environment.getConfig());

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/utils/WorkflowInstanceUtilsTest.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import java.sql.Date;
@ -52,7 +53,7 @@ public class WorkflowInstanceUtilsTest {
workflowInstance.setDryRun(0);
workflowInstance.setTenantCode("default");
workflowInstance.setRestartTime(Date.valueOf("2023-08-01"));
workflowInstance.setWorkerGroup("default");
workflowInstance.setWorkerGroup(WorkerGroupUtils.getDefaultWorkerGroup());
workflowInstance.setStartTime(Date.valueOf("2023-08-01"));
workflowInstance.setEndTime(Date.valueOf("2023-08-01"));
Assertions.assertEquals("\n"

7
dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java

@ -17,17 +17,15 @@
package org.apache.dolphinscheduler.scheduler.quartz;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
@ -93,8 +91,7 @@ public class ProcessScheduleTask extends QuartzJobBean {
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP
: schedule.getWorkerGroup();
String workerGroup = WorkerGroupUtils.getWorkerGroupOrDefault(schedule.getWorkerGroup());
command.setWorkerGroup(workerGroup);
command.setTenantCode(schedule.getTenantCode());
command.setEnvironmentCode(schedule.getEnvironmentCode());

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -107,6 +107,8 @@ import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.DqRuleUtils;
import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
@ -573,10 +575,8 @@ public class ProcessServiceImpl implements ProcessService {
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
String workerGroup = StringUtils.defaultIfEmpty(command.getWorkerGroup(), Constants.DEFAULT_WORKER_GROUP);
processInstance.setWorkerGroup(workerGroup);
processInstance
.setEnvironmentCode(Objects.isNull(command.getEnvironmentCode()) ? -1 : command.getEnvironmentCode());
processInstance.setWorkerGroup(WorkerGroupUtils.getWorkerGroupOrDefault(command.getWorkerGroup()));
processInstance.setEnvironmentCode(EnvironmentUtils.getEnvironmentCodeOrDefault(command.getEnvironmentCode()));
processInstance.setTimeout(processDefinition.getTimeout());
processInstance.setTenantCode(command.getTenantCode());
return processInstance;

Loading…
Cancel
Save