Browse Source

to #7065: fix ExecutorService and schedulerService (#7072)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
3.0.0/version-upgrade
zwZjut 3 years ago committed by GitHub
parent
commit
d09d68e109
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  2. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  3. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -68,11 +68,12 @@ public interface ExecutorService {
/**
* check whether the process definition can be executed
*
* @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
* @return check result code
*/
Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode);
Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode);
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -153,7 +153,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// check process define release state
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
result = checkProcessDefinitionValid(processDefinition, processDefinitionCode);
result = checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -208,14 +208,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* check whether the process definition can be executed
*
* @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
* @return check result code
*/
@Override
public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode) {
public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode) {
Map<String, Object> result = new HashMap<>();
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
@ -259,7 +260,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionCode());
result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -146,7 +146,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check work flow define release state
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
result = executorService.checkProcessDefinitionValid(processDefinition, processDefineCode);
result = executorService.checkProcessDefinitionValid(projectCode,processDefinition, processDefineCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -247,7 +247,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode());
return result;
}
@ -296,7 +296,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
return result;
}
@ -396,7 +396,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
return result;
}
@ -606,7 +606,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}

Loading…
Cancel
Save