Browse Source

Fix quoted bug about processDefineId of processInstance (#5263)

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
645847c096
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  2. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  5. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  6. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  7. 1
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  8. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  9. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  10. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  11. 20
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  12. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
  13. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  14. 42
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  15. 1
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -66,10 +66,10 @@ public interface ExecutorService {
* check whether the process definition can be executed
*
* @param processDefinition process definition
* @param processDefineId process definition id
* @param processDefineCode process definition code
* @return check result code
*/
Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId);
Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode);
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -195,18 +195,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* check whether the process definition can be executed
*
* @param processDefinition process definition
* @param processDefineId process definition id
* @param processDefineCode process definition code
* @return check result code
*/
@Override
public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, int processDefineId) {
public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode) {
Map<String, Object> result = new HashMap<>();
if (processDefinition == null) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineId);
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefineCode);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
@ -246,7 +246,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionId());
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionCode());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -204,11 +204,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
ProcessData processData = processService.genProcessData(processDefinition);
processInstance.setProcessInstanceJson(JSONUtils.toJsonString(processData));
result.put(DATA_LIST, processInstance);

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -117,7 +117,6 @@ public class ExecutorService2Test {
// processInstance
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -241,14 +241,14 @@ public class ProcessInstanceServiceTest {
//project auth success
ProcessInstance processInstance = getProcessInstance();
processInstance.setProcessDefinitionId(46);
putMsg(result, Status.SUCCESS, projectName);
Project project = getProject(projectName);
ProcessDefinition processDefinition = getProcessDefinition();
when(projectMapper.queryByName(projectName)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
when(processService.findProcessDefineById(processInstance.getProcessDefinitionId())).thenReturn(processDefinition);
when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition);
Map<String, Object> successRes = processInstanceService.queryProcessInstanceById(loginUser, projectName, 1);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@ -395,7 +395,6 @@ public class ProcessInstanceServiceTest {
Tenant tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("test_tenant");
when(processService.findProcessDefineById(processInstance.getProcessDefinitionId())).thenReturn(processDefinition);
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result);
@ -555,6 +554,8 @@ public class ProcessInstanceServiceTest {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_process_instance");
processInstance.setProcessDefinitionCode(46L);
processInstance.setProcessDefinitionVersion(1);
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
return processInstance;
@ -568,6 +569,7 @@ public class ProcessInstanceServiceTest {
private ProcessDefinition getProcessDefinition() {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(46L);
processDefinition.setVersion(1);
processDefinition.setId(46);
processDefinition.setName("test_pdf");
processDefinition.setProjectId(2);

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -48,13 +48,6 @@ public class ProcessInstance {
@TableId(value = "id", type = IdType.AUTO)
private int id;
/**
* process definition id
* TODO delete
*/
@TableField(exist = false)
private int processDefinitionId;
/**
* process definition code
*/
@ -290,14 +283,6 @@ public class ProcessInstance {
this.id = id;
}
public int getProcessDefinitionId() {
return processDefinitionId;
}
public void setProcessDefinitionId(int processDefinitionId) {
this.processDefinitionId = processDefinitionId;
}
public ExecutionStatus getState() {
return state;
}
@ -616,7 +601,6 @@ public class ProcessInstance {
public String toString() {
return "ProcessInstance{"
+ "id=" + id
+ ", processDefinitionId=" + processDefinitionId
+ ", state=" + state
+ ", recovery=" + recovery
+ ", startTime=" + startTime

1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -275,7 +275,6 @@ public class ProcessInstanceMapperTest {
processDefinitionMapper.insert(processDefinition);
ProcessInstance processInstance = insertOne();
processInstance.setProcessDefinitionId(processDefinition.getId());
int update = processInstanceMapper.updateById(processInstance);
Long[] projectCodes = new Long[]{processDefinition.getProjectCode()};

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -258,7 +258,7 @@ public class MasterExecThread implements Runnable {
processService.saveProcessInstance(processInstance);
// get schedules
int processDefinitionId = processInstance.getProcessDefinitionId();
int processDefinitionId = processInstance.getProcessDefinition().getId();
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionId(processDefinitionId);
List<Date> listDate = Lists.newLinkedList();
if (!CollectionUtils.isEmpty(schedules)) {
@ -268,7 +268,7 @@ public class MasterExecThread implements Runnable {
}
// get first fire date
Iterator<Date> iterator = null;
Date scheduleDate = null;
Date scheduleDate;
if (!CollectionUtils.isEmpty(listDate)) {
iterator = listDate.iterator();
scheduleDate = iterator.next();
@ -282,9 +282,7 @@ public class MasterExecThread implements Runnable {
}
while (Stopper.isRunning()) {
logger.info("process {} start to complement {} data",
processInstance.getId(), DateUtils.dateToString(scheduleDate));
logger.info("process {} start to complement {} data", processInstance.getId(), DateUtils.dateToString(scheduleDate));
// prepare dag and other info
prepareProcess();
@ -302,8 +300,7 @@ public class MasterExecThread implements Runnable {
endProcess();
// process instance failure ,no more complements
if (!processInstance.getState().typeIsSuccess()) {
logger.info("process {} state {}, complement not completely!",
processInstance.getId(), processInstance.getState());
logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState());
break;
}
// current process instance success ,next execute

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -347,8 +347,7 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size());
//updateProcessInstance host is null and insert into command
for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
logger.info("failover process instance id: {} host:{}",
processInstance.getId(), processInstance.getHost());
logger.info("failover process instance id: {} host:{}", processInstance.getId(), processInstance.getHost());
if (Constants.NULL.equals(processInstance.getHost())) {
continue;
}

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -162,7 +162,6 @@ public class ConditionsTaskTest {
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1000);
processInstance.setProcessDefinitionId(1000);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return processInstance;

20
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -145,7 +145,7 @@ public class DependentTaskTest {
public void testBasicSuccess() throws Exception {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
@ -168,7 +168,7 @@ public class DependentTaskTest {
public void testBasicFailure() throws Exception {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS);
getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
@ -219,9 +219,9 @@ public class DependentTaskTest {
setupTaskInstance(taskNode);
ProcessInstance processInstance200 =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE);
ProcessInstance processInstance300 =
getProcessInstanceForFindLastRunningProcess(300, 3, ExecutionStatus.SUCCESS);
getProcessInstanceForFindLastRunningProcess(300, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
@ -276,7 +276,7 @@ public class DependentTaskTest {
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS));
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.SUCCESS));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
@ -289,7 +289,7 @@ public class DependentTaskTest {
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE));
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.FAILURE));
DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance);
dependentTask.call();
@ -323,7 +323,7 @@ public class DependentTaskTest {
setupTaskInstance(taskNode);
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION);
getProcessInstanceForFindLastRunningProcess(200, ExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
@ -349,7 +349,6 @@ public class DependentTaskTest {
private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return processInstance;
}
@ -403,12 +402,9 @@ public class DependentTaskTest {
return dependentItem;
}
private ProcessInstance getProcessInstanceForFindLastRunningProcess(
int processInstanceId, int processDefinitionId, ExecutionStatus state
) {
private ProcessInstance getProcessInstanceForFindLastRunningProcess(int processInstanceId, ExecutionStatus state) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(state);
return processInstance;
}

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java

@ -88,7 +88,6 @@ public class MasterExecThreadTest {
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processInstance = mock(ProcessInstance.class);
Mockito.when(processInstance.getProcessDefinitionId()).thenReturn(processDefinitionId);
Mockito.when(processInstance.getState()).thenReturn(ExecutionStatus.SUCCESS);
Mockito.when(processInstance.getHistoryCmd()).thenReturn(CommandType.COMPLEMENT_DATA.toString());
Mockito.when(processInstance.getIsSubProcess()).thenReturn(Flag.NO);

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -130,7 +130,6 @@ public class SubProcessTaskTest {
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(100);
processInstance.setProcessDefinitionId(1);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return processInstance;
@ -139,7 +138,6 @@ public class SubProcessTaskTest {
private ProcessInstance getSubProcessInstance(ExecutionStatus executionStatus) {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(102);
processInstance.setProcessDefinitionId(2);
processInstance.setState(executionStatus);
return processInstance;

42
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -236,7 +236,7 @@ public class ProcessService {
processInstance.addHistoryCmd(command.getCommandType());
saveProcessInstance(processInstance);
this.setSubProcessParam(processInstance);
delCommandById(command.getId());
this.commandMapper.deleteById(command.getId());
return processInstance;
}
@ -250,7 +250,7 @@ public class ProcessService {
public void moveToErrorCommand(Command command, String message) {
ErrorCommand errorCommand = new ErrorCommand(command, message);
this.errorCommandMapper.insert(errorCommand);
delCommandById(command.getId());
this.commandMapper.deleteById(command.getId());
}
/**
@ -538,7 +538,7 @@ public class ProcessService {
processInstance.getTaskDependType(),
processInstance.getFailureStrategy(),
processInstance.getExecutorId(),
processInstance.getProcessDefinitionId(),
processInstance.getProcessDefinition().getId(),
JSONUtils.toJsonString(cmdParam),
processInstance.getWarningType(),
processInstance.getWarningGroupId(),
@ -600,7 +600,7 @@ public class ProcessService {
processInstance.setStartTime(new Date());
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setProcessDefinitionId(command.getProcessDefinitionId());
//processInstance.setProcessDefinitionId(command.getProcessDefinitionId());
processInstance.setCommandParam(command.getCommandParam());
processInstance.setCommandType(command.getCommandType());
processInstance.setIsSubProcess(Flag.NO);
@ -719,7 +719,6 @@ public class ProcessService {
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String host) {
ProcessInstance processInstance;
CommandType commandType = command.getCommandType();
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
@ -764,16 +763,13 @@ public class ProcessService {
}
// Recalculate global parameters after rerun.
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime()));
processInstance.setProcessDefinition(processDefinition);
}
processDefinition = processDefineMapper.selectById(processInstance.getProcessDefinitionId());
processInstance.setProcessDefinition(processDefinition);
//reset command parameter
if (processInstance.getCommandParam() != null) {
Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
@ -1345,25 +1341,12 @@ public class ProcessService {
return true;
}
/**
* create a new process instance
*
* @param processInstance processInstance
*/
public void createProcessInstance(ProcessInstance processInstance) {
if (processInstance != null) {
processInstanceMapper.insert(processInstance);
}
}
/**
* insert or update work process instance to data base
*
* @param processInstance processInstance
*/
public void saveProcessInstance(ProcessInstance processInstance) {
if (processInstance == null) {
logger.error("save error, process instance is null!");
return;
@ -1371,7 +1354,7 @@ public class ProcessService {
if (processInstance.getId() != 0) {
processInstanceMapper.updateById(processInstance);
} else {
createProcessInstance(processInstance);
processInstanceMapper.insert(processInstance);
}
}
@ -1425,15 +1408,6 @@ public class ProcessService {
return count > 0;
}
/**
* delete a command by id
*
* @param id id
*/
public void delCommandById(int id) {
commandMapper.deleteById(id);
}
/**
* find task instance by id
*
@ -1772,9 +1746,11 @@ public class ProcessService {
processInstance.setHost(Constants.NULL);
processInstanceMapper.updateById(processInstance);
ProcessDefinition processDefinition = findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
//2 insert into recover command
Command cmd = new Command();
cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());
cmd.setProcessDefinitionId(processDefinition.getId());
cmd.setCommandParam(String.format("{\"%s\":%d}", Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstance.getId()));
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);

1
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -109,7 +109,6 @@ public class ProcessServiceTest {
public void testCreateSubCommand() {
ProcessService processService = new ProcessService();
ProcessInstance parentInstance = new ProcessInstance();
parentInstance.setProcessDefinitionId(1);
parentInstance.setWarningType(WarningType.SUCCESS);
parentInstance.setWarningGroupId(0);

Loading…
Cancel
Save