Browse Source

refactor worker group (#2132)

* replace worker group id with worker group

* add worker group field in ddl

* remove worker group id
pull/2/head
lgcareer 5 years ago committed by GitHub
parent
commit
bbe2cd4864
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
  4. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java
  5. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  6. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  7. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  8. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
  9. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  10. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  11. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  12. 16
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  13. 24
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  14. 29
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  15. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java
  16. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  17. 3
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
  18. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  19. 30
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  20. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
  21. 64
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -64,7 +64,7 @@ public class ExecutorController extends BaseController {
* @param receiversCc receivers cc
* @param runMode run mode
* @param processInstancePriority process instance priority
* @param workerGroupId worker group id
* @param workerGroup worker group
* @param timeout timeout
* @return start process result code
*/
@ -82,7 +82,7 @@ public class ExecutorController extends BaseController {
@ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ),
@ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String",example = "default"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"),
})
@PostMapping(value = "start-process-instance")
@ -101,15 +101,15 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "timeout", required = false) Integer timeout) {
try {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "
+ "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
+ "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroupId: {}, timeout: {}",
+ "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}",
loginUser.getUserName(), projectName, processDefinitionId, scheduleTime,
failureStrategy, startNodeList, taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,
workerGroupId, timeout);
failureStrategy, startNodeList, taskDependType, warningType, workerGroup,receivers,receiversCc,runMode,processInstancePriority,
workerGroup, timeout);
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@ -117,7 +117,7 @@ public class ExecutorController extends BaseController {
Map<String, Object> result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType,
warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroupId, timeout);
warningGroupId,receivers,receiversCc, runMode,processInstancePriority, workerGroup, timeout);
return returnDataList(result);
} catch (Exception e) {
logger.error(Status.START_PROCESS_INSTANCE_ERROR.getMsg(),e);

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java

@ -70,7 +70,7 @@ public class SchedulerController extends BaseController {
* @param processInstancePriority process instance priority
* @param receivers receivers
* @param receiversCc receivers cc
* @param workerGroupId worker group id
* @param workerGroup worker group
* @return create result code
*/
@ApiOperation(value = "createSchedule", notes= "CREATE_SCHEDULE_NOTES")
@ -96,15 +96,15 @@ public class SchedulerController extends BaseController {
@RequestParam(value = "failureStrategy", required = false, defaultValue = DEFAULT_FAILURE_POLICY) FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {}, process name: {}, create schedule: {}, warning type: {}, warning group id: {}," +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {}, workGroupId:{}",
loginUser.getUserName(), projectName, processDefinitionId, schedule, warningType, warningGroupId,
failureStrategy, receivers, receiversCc, processInstancePriority, workerGroupId);
failureStrategy, receivers, receiversCc, processInstancePriority, workerGroup);
try {
Map<String, Object> result = schedulerService.insertSchedule(loginUser, projectName, processDefinitionId, schedule,
warningType, warningGroupId, failureStrategy, receivers, receiversCc, processInstancePriority, workerGroupId);
warningType, warningGroupId, failureStrategy, receivers, receiversCc, processInstancePriority, workerGroup);
return returnDataList(result);
} catch (Exception e) {
@ -124,7 +124,7 @@ public class SchedulerController extends BaseController {
* @param warningGroupId warning group id
* @param failureStrategy failure strategy
* @param receivers receivers
* @param workerGroupId worker group id
* @param workerGroup worker group
* @param processInstancePriority process instance priority
* @param receiversCc receivers cc
* @return update result code
@ -151,16 +151,16 @@ public class SchedulerController extends BaseController {
@RequestParam(value = "failureStrategy", required = false, defaultValue = "END") FailureStrategy failureStrategy,
@RequestParam(value = "receivers", required = false) String receivers,
@RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "workerGroupId", required = false, defaultValue = "-1") int workerGroupId,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority) {
logger.info("login user {}, project name: {},id: {}, updateProcessInstance schedule: {}, notify type: {}, notify mails: {}, " +
"failure policy: {},receivers : {},receiversCc : {},processInstancePriority : {},workerGroupId:{}",
loginUser.getUserName(), projectName, id, schedule, warningType, warningGroupId, failureStrategy,
receivers, receiversCc, processInstancePriority, workerGroupId);
receivers, receiversCc, processInstancePriority, workerGroup);
try {
Map<String, Object> result = schedulerService.updateSchedule(loginUser, projectName, id, schedule,
warningType, warningGroupId, failureStrategy, receivers, receiversCc, null, processInstancePriority, workerGroupId);
warningType, warningGroupId, failureStrategy, receivers, receiversCc, null, processInstancePriority, workerGroup);
return returnDataList(result);
} catch (Exception e) {

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java

@ -27,6 +27,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -34,6 +35,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
import java.util.List;
import java.util.Map;
/**
@ -46,7 +48,6 @@ public class WorkerGroupController extends BaseController{
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupController.class);
@Autowired
WorkerGroupService workerGroupService;
@ -135,6 +136,7 @@ public class WorkerGroupController extends BaseController{
loginUser.getUserName() );
try {
Map<String, Object> result = workerGroupService.queryAllGroup();
return returnDataList(result);
}catch (Exception e){

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java

@ -96,11 +96,6 @@ public class ProcessMeta {
*/
private String scheduleProcessInstancePriority;
/**
* worker group id
*/
private Integer scheduleWorkerGroupId;
/**
* worker group name
*/
@ -229,14 +224,6 @@ public class ProcessMeta {
this.scheduleProcessInstancePriority = scheduleProcessInstancePriority;
}
public Integer getScheduleWorkerGroupId() {
return scheduleWorkerGroupId;
}
public void setScheduleWorkerGroupId(int scheduleWorkerGroupId) {
this.scheduleWorkerGroupId = scheduleWorkerGroupId;
}
public String getScheduleWorkerGroupName() {
return scheduleWorkerGroupName;
}

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -85,7 +85,7 @@ public class ExecutorService extends BaseService{
* @param receivers receivers
* @param receiversCc receivers cc
* @param processInstancePriority process instance priority
* @param workerGroupId worker group id
* @param workerGroup worker group name
* @param runMode run mode
* @param timeout timeout
* @return execute process instance code
@ -96,7 +96,7 @@ public class ExecutorService extends BaseService{
FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId,
String receivers, String receiversCc, RunMode runMode,
Priority processInstancePriority, int workerGroupId, Integer timeout) throws ParseException {
Priority processInstancePriority, String workerGroup, Integer timeout) throws ParseException {
Map<String, Object> result = new HashMap<>(5);
// timeout is valid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@ -128,7 +128,7 @@ public class ExecutorService extends BaseService{
*/
int create = this.createCommand(commandType, processDefinitionId,
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode,processInstancePriority, workerGroupId);
warningGroupId, runMode,processInstancePriority, workerGroup);
if(create > 0 ){
/**
* according to the process definition ID updateProcessInstance and CC recipient
@ -452,11 +452,29 @@ public class ExecutorService extends BaseService{
* @return
* @throws ParseException
*/
/**
* create commonad
* @param commandType command type
* @param processDefineId process define id
* @param nodeDep node dependency
* @param failureStrategy failure strategy
* @param startNodeList start node list
* @param schedule schedule
* @param warningType warning type
* @param executorId executor id
* @param warningGroupId warning group id
* @param runMode run mode
* @param processInstancePriority process instance priority
* @param workerGroup worker group
* @return create command result
* @throws ParseException parse exception
*/
private int createCommand(CommandType commandType, int processDefineId,
TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType,
int excutorId, int warningGroupId,
RunMode runMode,Priority processInstancePriority, int workerGroupId) throws ParseException {
int executorId, int warningGroupId,
RunMode runMode,Priority processInstancePriority, String workerGroup) throws ParseException {
/**
* instantiate command schedule instance
@ -484,10 +502,10 @@ public class ExecutorService extends BaseService{
command.setWarningType(warningType);
}
command.setCommandParam(JSONUtils.toJson(cmdParam));
command.setExecutorId(excutorId);
command.setExecutorId(executorId);
command.setWarningGroupId(warningGroupId);
command.setProcessInstancePriority(processInstancePriority);
command.setWorkerGroupId(workerGroupId);
command.setWorkerGroup(workerGroup);
Date start = null;
Date end = null;

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -561,13 +561,13 @@ public class ProcessDefinitionService extends BaseDAGService {
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId());
/*WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId());
if (null == workerGroup && schedule.getWorkerGroupId() == -1) {
workerGroup = new WorkerGroup();
workerGroup.setId(-1);
workerGroup.setName("");
}
}*/
exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString());
exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId());
@ -577,11 +577,7 @@ public class ProcessDefinitionService extends BaseDAGService {
exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy()));
exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE));
exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority()));
if (null != workerGroup) {
exportProcessMeta.setScheduleWorkerGroupId(workerGroup.getId());
exportProcessMeta.setScheduleWorkerGroupName(workerGroup.getName());
}
exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup());
}
//create workflow json file
return JSONUtils.toJsonString(exportProcessMeta);
@ -780,15 +776,9 @@ public class ProcessDefinitionService extends BaseDAGService {
if (null != processMeta.getScheduleProcessInstancePriority()) {
scheduleObj.setProcessInstancePriority(Priority.valueOf(processMeta.getScheduleProcessInstancePriority()));
}
if (null != processMeta.getScheduleWorkerGroupId()) {
scheduleObj.setWorkerGroupId(processMeta.getScheduleWorkerGroupId());
} else {
if (null != processMeta.getScheduleWorkerGroupName()) {
List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(processMeta.getScheduleWorkerGroupName());
if(CollectionUtils.isNotEmpty(workerGroups)){
scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
}
}
if (null != processMeta.getScheduleWorkerGroupName()) {
scheduleObj.setWorkerGroup(processMeta.getScheduleWorkerGroupName());
}
return scheduleMapper.insert(scheduleObj);

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -112,9 +112,9 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
String workerGroupName = "";
if(processInstance.getWorkerGroupId() == -1){
workerGroupName = DEFAULT;
/*String workerGroupName = "";
if(StringUtils.isBlank(processInstance.getWorkerGroup())){
workerGroupName = ;
}else{
WorkerGroup workerGroup = workerGroupMapper.selectById(processInstance.getWorkerGroupId());
if(workerGroup != null){
@ -123,7 +123,7 @@ public class ProcessInstanceService extends BaseDAGService {
workerGroupName = DEFAULT;
}
}
processInstance.setWorkerGroupName(workerGroupName);
processInstance.setWorkerGroupName(workerGroupName);*/
ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId());
processInstance.setReceivers(processDefinition.getReceivers());
processInstance.setReceiversCc(processDefinition.getReceiversCc());

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java

@ -92,7 +92,7 @@ public class SchedulerService extends BaseService {
* @param processInstancePriority process instance priority
* @param receivers receivers
* @param receiversCc receivers cc
* @param workerGroupId worker group id
* @param workerGroup worker group
* @return create result code
* @throws IOException ioexception
*/
@ -106,7 +106,7 @@ public class SchedulerService extends BaseService {
String receivers,
String receiversCc,
Priority processInstancePriority,
int workerGroupId) throws IOException {
String workerGroup) throws IOException {
Map<String, Object> result = new HashMap<String, Object>(5);
@ -156,7 +156,7 @@ public class SchedulerService extends BaseService {
scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(processInstancePriority);
scheduleObj.setWorkerGroupId(workerGroupId);
scheduleObj.setWorkerGroup(workerGroup);
scheduleMapper.insert(scheduleObj);
/**
@ -182,7 +182,7 @@ public class SchedulerService extends BaseService {
* @param warningType warning type
* @param warningGroupId warning group id
* @param failureStrategy failure strategy
* @param workerGroupId worker group id
* @param workerGroup worker group
* @param processInstancePriority process instance priority
* @param receiversCc receiver cc
* @param receivers receivers
@ -202,7 +202,7 @@ public class SchedulerService extends BaseService {
String receiversCc,
ReleaseState scheduleStatus,
Priority processInstancePriority,
int workerGroupId) throws IOException {
String workerGroup) throws IOException {
Map<String, Object> result = new HashMap<String, Object>(5);
Project project = projectMapper.queryByName(projectName);
@ -266,7 +266,7 @@ public class SchedulerService extends BaseService {
if (scheduleStatus != null) {
schedule.setReleaseState(scheduleStatus);
}
schedule.setWorkerGroupId(workerGroupId);
schedule.setWorkerGroup(workerGroup);
schedule.setUpdateTime(now);
schedule.setProcessInstancePriority(processInstancePriority);
scheduleMapper.updateById(schedule);

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -50,6 +51,9 @@ public class WorkerGroupService extends BaseService {
@Autowired
ProcessInstanceMapper processInstanceMapper;
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator;
/**
* create or update a worker group
*
@ -181,7 +185,8 @@ public class WorkerGroupService extends BaseService {
*/
public Map<String,Object> queryAllGroup() {
Map<String, Object> result = new HashMap<>(5);
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
String WORKER_PATH = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(WORKER_PATH);
result.put(Constants.DATA_LIST, workerGroupList);
putMsg(result, Status.SUCCESS);
return result;

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -117,7 +117,7 @@ public class ExecutorService2Test {
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}catch (Exception e){
@ -138,7 +138,7 @@ public class ExecutorService2Test {
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
}catch (Exception e){
@ -159,7 +159,7 @@ public class ExecutorService2Test {
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, 0, 110);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
}catch (Exception e){
@ -180,7 +180,7 @@ public class ExecutorService2Test {
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
}catch (Exception e){
@ -201,7 +201,7 @@ public class ExecutorService2Test {
null, null,
null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, 0, 110);
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
}catch (Exception e){

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -803,7 +803,7 @@ public class ProcessDefinitionServiceTest {
schedule.setProcessInstancePriority(Priority.MEDIUM);
schedule.setWarningType(WarningType.NONE);
schedule.setWarningGroupId(1);
schedule.setWorkerGroupId(-1);
schedule.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
return schedule;
}
@ -822,7 +822,6 @@ public class ProcessDefinitionServiceTest {
processMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy()));
processMeta.setScheduleReleaseState(String.valueOf(schedule.getReleaseState()));
processMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority()));
processMeta.setScheduleWorkerGroupId(schedule.getWorkerGroupId());
processMeta.setScheduleWorkerGroupName("workgroup1");
return processMeta;
}

16
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -114,9 +114,9 @@ public class TaskNode {
private Priority taskInstancePriority;
/**
* worker group id
* worker group
*/
private int workerGroupId;
private String workerGroup;
/**
@ -230,7 +230,7 @@ public class TaskNode {
Objects.equals(extras, taskNode.extras) &&
Objects.equals(runFlag, taskNode.runFlag) &&
Objects.equals(dependence, taskNode.dependence) &&
Objects.equals(workerGroupId, taskNode.workerGroupId) &&
Objects.equals(workerGroup, taskNode.workerGroup) &&
CollectionUtils.equalLists(depList, taskNode.depList);
}
@ -310,15 +310,15 @@ public class TaskNode {
", dependence='" + dependence + '\'' +
", taskInstancePriority=" + taskInstancePriority +
", timeout='" + timeout + '\'' +
", workerGroupId='" + workerGroupId + '\'' +
", workerGroup='" + workerGroup + '\'' +
'}';
}
public int getWorkerGroupId() {
return workerGroupId;
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
}

24
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -108,13 +108,11 @@ public class Command {
@TableField("update_time")
private Date updateTime;
/**
*
* worker group
*/
@TableField("worker_group_id")
private int workerGroupId;
@TableField(exist = false)
private String workerGroup;
public Command() {
this.taskDependType = TaskDependType.TASK_POST;
@ -254,13 +252,12 @@ public class Command {
this.updateTime = updateTime;
}
public int getWorkerGroupId() {
return workerGroupId;
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
@Override
@ -283,7 +280,7 @@ public class Command {
if (executorId != command.executorId) {
return false;
}
if (workerGroupId != command.workerGroupId) {
if (workerGroup != null ? workerGroup.equals(command.workerGroup) : command.workerGroup == null) {
return false;
}
if (commandType != command.commandType) {
@ -332,10 +329,9 @@ public class Command {
result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
result = 31 * result + (processInstancePriority != null ? processInstancePriority.hashCode() : 0);
result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0);
result = 31 * result + workerGroupId;
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "Command{" +
@ -352,7 +348,7 @@ public class Command {
", startTime=" + startTime +
", processInstancePriority=" + processInstancePriority +
", updateTime=" + updateTime +
", workerGroupId=" + workerGroupId +
", workerGroup='" + workerGroup + '\'' +
'}';
}
}

29
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -189,9 +189,9 @@ public class ProcessInstance {
private Priority processInstancePriority;
/**
* worker group id
* worker group
*/
private int workerGroupId;
private String workerGroup;
/**
* process timeout for warning
@ -203,12 +203,6 @@ public class ProcessInstance {
*/
private int tenantId;
/**
* worker group name. for api.
*/
@TableField(exist = false)
private String workerGroupName;
/**
* receivers for api
*/
@ -527,12 +521,12 @@ public class ProcessInstance {
this.duration = duration;
}
public int getWorkerGroupId() {
return workerGroupId;
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public int getTimeout() {
@ -552,14 +546,6 @@ public class ProcessInstance {
return this.tenantId ;
}
public String getWorkerGroupName() {
return workerGroupName;
}
public void setWorkerGroupName(String workerGroupName) {
this.workerGroupName = workerGroupName;
}
public String getReceivers() {
return receivers;
}
@ -610,10 +596,9 @@ public class ProcessInstance {
", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' +
", duration=" + duration +
", processInstancePriority=" + processInstancePriority +
", workerGroupId=" + workerGroupId +
", workerGroup='" + workerGroup + '\'' +
", timeout=" + timeout +
", tenantId=" + tenantId +
", workerGroupName='" + workerGroupName + '\'' +
", receivers='" + receivers + '\'' +
", receiversCc='" + receiversCc + '\'' +
'}';

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Schedule.java

@ -122,9 +122,9 @@ public class Schedule {
private Priority processInstancePriority;
/**
* worker group id
* worker group
*/
private int workerGroupId;
private String workerGroup;
public int getWarningGroupId() {
return warningGroupId;
@ -265,13 +265,12 @@ public class Schedule {
this.processInstancePriority = processInstancePriority;
}
public int getWorkerGroupId() {
return workerGroupId;
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
@Override
@ -294,7 +293,7 @@ public class Schedule {
", releaseState=" + releaseState +
", warningGroupId=" + warningGroupId +
", processInstancePriority=" + processInstancePriority +
", workerGroupId=" + workerGroupId +
", workerGroup='" + workerGroup + '\'' +
'}';
}

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -186,15 +186,10 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private String dependentResult;
/**
* worker group id
*/
private int workerGroupId;
/**
* workerGroup
*/
@TableField(exist = false)
private String workerGroup;
public ProcessInstance getProcessInstance() {
@ -450,14 +445,6 @@ public class TaskInstance implements Serializable {
this.processInstancePriority = processInstancePriority;
}
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
public String getDependentResult() {
return dependentResult;
}
@ -505,7 +492,6 @@ public class TaskInstance implements Serializable {
", taskInstancePriority=" + taskInstancePriority +
", processInstancePriority=" + processInstancePriority +
", dependentResult='" + dependentResult + '\'' +
", workerGroupId=" + workerGroupId +
", workerGroup='" + workerGroup + '\'' +
'}';
}

3
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java

@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.CommandCount;
@ -265,7 +266,7 @@ public class CommandMapperTest {
command.setProcessInstancePriority(Priority.MEDIUM);
command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
command.setWorkerGroupId(-1);
command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
commandMapper.insert(command);
return command;

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -486,8 +486,13 @@ public class MasterExecThread implements Runnable {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
}
int workerGroupId = taskNode.getWorkerGroupId();
taskInstance.setWorkerGroupId(workerGroupId);
String processWorkerGroup = processInstance.getWorkerGroup();
String taskWorkerGroup = StringUtils.isBlank(taskNode.getWorkerGroup()) ? processWorkerGroup : taskNode.getWorkerGroup();
if (!processWorkerGroup.equals(DEFAULT_WORKER_GROUP) && taskWorkerGroup.equals(DEFAULT_WORKER_GROUP)) {
taskInstance.setWorkerGroup(processWorkerGroup);
}else {
taskInstance.setWorkerGroup(taskWorkerGroup);
}
}
return taskInstance;

30
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -427,8 +427,8 @@ public class ProcessService {
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId();
processInstance.setWorkerGroupId(workerGroupId);
String workerGroup = StringUtils.isBlank(command.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : command.getWorkerGroup();
processInstance.setWorkerGroup(workerGroup);
processInstance.setTimeout(processDefinition.getTimeout());
processInstance.setTenantId(processDefinition.getTenantId());
return processInstance;
@ -964,7 +964,7 @@ public class ProcessService {
*/
public String taskZkInfo(TaskInstance taskInstance) {
int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
String taskWorkerGroup = getTaskWorkerGroup(taskInstance);
ProcessInstance processInstance = this.findProcessInstanceById(taskInstance.getProcessInstanceId());
if(processInstance == null){
logger.error("process instance is null. please check the task info, task id: " + taskInstance.getId());
@ -976,9 +976,10 @@ public class ProcessService {
sb.append(processInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
.append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getId()).append(Constants.UNDERLINE);
.append(taskInstance.getId()).append(Constants.UNDERLINE)
.append(taskInstance.getWorkerGroup());
if(taskWorkerGroupId > 0){
/*if(StringUtils.isNotBlank(taskWorkerGroup)){
//not to find data from db
WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
@ -1012,8 +1013,7 @@ public class ProcessService {
sb.append(ipSb);
}else{
sb.append(Constants.DEFAULT_WORKER_ID);
}
}*/
return sb.toString();
}
@ -1689,24 +1689,24 @@ public class ProcessService {
}
/**
* get task worker group id
* get task worker group
* @param taskInstance taskInstance
* @return workerGroupId
*/
public int getTaskWorkerGroupId(TaskInstance taskInstance) {
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
public String getTaskWorkerGroup(TaskInstance taskInstance) {
String workerGroup = taskInstance.getWorkerGroup();
if(taskWorkerGroupId > 0){
return taskWorkerGroupId;
if(StringUtils.isNotBlank(workerGroup)){
return workerGroup;
}
int processInstanceId = taskInstance.getProcessInstanceId();
ProcessInstance processInstance = findProcessInstanceById(processInstanceId);
if(processInstance != null){
return processInstance.getWorkerGroupId();
return processInstance.getWorkerGroup();
}
logger.info("task : {} will use default worker group id", taskInstance.getId());
return Constants.DEFAULT_WORKER_ID;
logger.info("task : {} will use default worker group", taskInstance.getId());
return Constants.DEFAULT_WORKER_GROUP;
}
/**

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java

@ -98,7 +98,7 @@ public class ProcessScheduleJob implements Job {
command.setScheduleTime(scheduledFireTime);
command.setStartTime(fireTime);
command.setWarningGroupId(schedule.getWarningGroupId());
command.setWorkerGroupId(schedule.getWorkerGroupId());
command.setWorkerGroup(schedule.getWorkerGroup());
command.setWarningType(schedule.getWarningType());
command.setProcessInstancePriority(schedule.getProcessInstancePriority());

64
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -32,3 +32,67 @@ delimiter ;
SELECT uc_dolphin_T_t_ds_process_definition_A_modify_by();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_definition_A_modify_by();
-- ac_dolphin_T_t_ds_process_instance_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_process_instance_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_process_instance_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN worker_group varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_process_instance_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_process_instance_A_worker_group();
-- ac_dolphin_T_t_ds_task_instance_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_task_instance_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_task_instance_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN worker_group varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_task_instance_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_task_instance_A_worker_group();
-- ac_dolphin_T_t_ds_process_instance_A_worker_group
delimiter ;
DROP FUNCTION IF EXISTS ac_dolphin_T_t_ds_process_instance_A_worker_group();
delimiter d//
CREATE FUNCTION ac_dolphin_T_t_ds_schedules_A_worker_group() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_schedules'
AND COLUMN_NAME ='worker_group')
THEN
ALTER TABLE t_ds_schedules ADD COLUMN worker_group varchar(255) DEFAULT null;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select ac_dolphin_T_t_ds_schedules_A_worker_group();
DROP FUNCTION ac_dolphin_T_t_ds_schedules_A_worker_group();

Loading…
Cancel
Save