From fba5a8eaa0945f399733275341dfdcec11ab6b3d Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 22 Sep 2022 09:52:32 +0800 Subject: [PATCH] Fix insert command error due to the id is not null (#12092) --- .../api/service/impl/ExecutorServiceImpl.java | 141 ++++++++++++------ .../impl/ProcessDefinitionServiceImpl.java | 135 ++++++++++++----- .../service/process/ProcessServiceImpl.java | 1 + 3 files changed, 193 insertions(+), 84 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 7524a0c15f..32de874b1d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -177,7 +177,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber, - int dryRun, int testFlag, ComplementDependentMode complementDependentMode) { + int dryRun, int testFlag, + ComplementDependentMode complementDependentMode) { Project project = projectMapper.queryByCode(projectCode); // check user access for project Map result = @@ -201,7 +202,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } if (!checkTenantSuitable(processDefinition)) { - logger.error("There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.", + logger.error( + "There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.", processDefinition.getCode(), processDefinition.getName()); putMsg(result, Status.TENANT_NOT_SUITABLE); return result; @@ -224,15 +226,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, - environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode); + environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, + complementDependentMode); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); processDefinitionMapper.updateById(processDefinition); - logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", processDefinition.getCode(), create); + logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", + processDefinition.getCode(), create); putMsg(result, Status.SUCCESS); } else { - logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", processDefinition.getCode()); + logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", + processDefinition.getCode()); putMsg(result, Status.START_PROCESS_INSTANCE_ERROR); } return result; @@ -295,15 +300,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Map result = new HashMap<>(); if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { // check process definition exists - logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefineCode); + logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, + processDefineCode); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode)); } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { // check process definition online - logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode, version); + logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.", + ReleaseState.ONLINE.getDescp(), processDefineCode, version); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version); } else if (!checkSubProcessDefinitionValid(processDefinition)) { // check sub process definition online - logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode); + logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.", + ReleaseState.ONLINE.getDescp(), processDefineCode); putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE); } else { result.put(Constants.STATUS, Status.SUCCESS); @@ -398,7 +406,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } if (!checkTenantSuitable(processDefinition)) { - logger.error("There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ", + logger.error( + "There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ", processDefinition.getId(), processDefinition.getName()); putMsg(result, Status.TENANT_NOT_SUITABLE); } @@ -418,19 +427,23 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ switch (executeType) { case REPEAT_RUNNING: result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, processInstance.getTestFlag()); + processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, + processInstance.getTestFlag()); break; case RECOVER_SUSPENDED_PROCESS: result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, processInstance.getTestFlag()); + processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, + processInstance.getTestFlag()); break; case START_FAILURE_TASK_PROCESS: result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, processInstance.getTestFlag()); + processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, + processInstance.getTestFlag()); break; case STOP: if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) { - logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName()); + logger.warn("Process instance status is already {}, processInstanceName:{}.", + WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName()); putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); } else { @@ -441,7 +454,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ break; case PAUSE: if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) { - logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName()); + logger.warn("Process instance status is already {}, processInstanceName:{}.", + WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName()); putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState()); } else { @@ -450,7 +464,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } break; default: - logger.warn("Unknown execute type for process instance, processInstanceId:{}.", processInstance.getId()); + logger.warn("Unknown execute type for process instance, processInstanceId:{}.", + processInstance.getId()); putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type"); break; @@ -465,7 +480,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // check process instance exist ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId()); if (processInstance == null) { - logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId()); + logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", + taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId()); putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()); return result; } @@ -558,7 +574,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // determine whether the process is normal if (update > 0) { - logger.info("Process instance state is updated to {} in database, processInstanceName:{}.", executionStatus.getDesc(), processInstance.getName()); + logger.info("Process instance state is updated to {} in database, processInstanceName:{}.", + executionStatus.getDesc(), processInstance.getName()); // directly send the process instance state change event to target master, not guarantee the event send // success WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand( @@ -607,7 +624,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @return insert result code */ private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, - int processVersion, CommandType commandType, String startParams, int testFlag) { + int processVersion, CommandType commandType, String startParams, + int testFlag) { Map result = new HashMap<>(); // To add startParams only when repeat running is needed @@ -626,7 +644,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setProcessInstanceId(instanceId); command.setTestFlag(testFlag); if (!processService.verifyIsNeedCreateCommand(command)) { - logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", + logger.warn( + "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", processDefinitionCode, processVersion, instanceId); putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode)); return result; @@ -640,8 +659,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion); putMsg(result, Status.SUCCESS); } else { - logger.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", - command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, instanceId); + logger.error( + "Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", + command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, + instanceId); putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR); } @@ -676,7 +697,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * if there is no online process, exit directly */ if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) { - logger.warn("Subprocess definition {} of process definition {} is not {}.", processDefinitionTmp.getName(), + logger.warn("Subprocess definition {} of process definition {} is not {}.", + processDefinitionTmp.getName(), processDefinition.getName(), ReleaseState.ONLINE.getDescp()); putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName()); return result; @@ -759,14 +781,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // determine whether to complement if (commandType == CommandType.COMPLEMENT_DATA) { if (schedule == null || StringUtils.isEmpty(schedule)) { - logger.error("Create {} type command error because parameter schedule is invalid.", command.getCommandType().getDescp()); + logger.error("Create {} type command error because parameter schedule is invalid.", + command.getCommandType().getDescp()); return 0; } if (!isValidateScheduleTime(schedule)) { return 0; } try { - logger.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), processDefineCode); + logger.info("Start to create {} command, processDefinitionCode:{}.", + command.getCommandType().getDescp(), processDefineCode); return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber, complementDependentMode); } catch (CronParseException cronParseException) { @@ -811,16 +835,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ } switch (runMode) { case RUN_MODE_SERIAL: { - logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); if (StringUtils.isNotEmpty(dateList)) { cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); createCount = processService.createCommand(command); if (createCount > 0) - logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.info("Create {} command complete, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); else - logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.error("Create {} command error, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); } if (startDate != null && endDate != null) { cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate); @@ -829,37 +856,46 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ logger.info("Creating command, commandInfo:{}.", command); createCount = processService.createCommand(command); if (createCount > 0) - logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.info("Create {} command complete, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); else - logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.error("Create {} command error, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); // dependent process definition List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( command.getProcessDefinitionCode()); if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { - logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode()); + logger.info( + "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", + command.getProcessDefinitionCode()); } else { - logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode()); + logger.info( + "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", + command.getProcessDefinitionCode()); dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); } } break; } case RUN_MODE_PARALLEL: { - logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); if (startDate != null && endDate != null) { List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( command.getProcessDefinitionCode()); - List listDate = new ArrayList<>( - CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate), - DateUtils.stringToZoneDateTime(endDate), schedules)); + List listDate = CronUtils.getSelfFireDateList( + DateUtils.stringToZoneDateTime(startDate), + DateUtils.stringToZoneDateTime(endDate), + schedules); int listDateSize = listDate.size(); createCount = listDate.size(); if (!CollectionUtils.isEmpty(listDate)) { if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { createCount = Math.min(createCount, expectedParallelismNumber); } - logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount); + logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", + createCount); // Distribute the number of tasks equally to each command. // The last command with insufficient quantity will be assigned to the remaining tasks. @@ -886,13 +922,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); if (processService.createCommand(command) > 0) - logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.info("Create {} command complete, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); else - logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.error("Create {} command error, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) { - logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode()); + logger.info( + "Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", + command.getProcessDefinitionCode()); } else { - logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode()); + logger.info( + "Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", + command.getProcessDefinitionCode()); dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command); } @@ -906,15 +948,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ if (expectedParallelismNumber != null && expectedParallelismNumber != 0) { createCount = Math.min(createCount, expectedParallelismNumber); } - logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount); + logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", + createCount); for (List stringDate : Lists.partition(listDate, createCount)) { cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate)); command.setCommandParam(JSONUtils.toJsonString(cmdParam)); logger.info("Creating command, commandInfo:{}.", command); if (processService.createCommand(command) > 0) - logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.info("Create {} command complete, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); else - logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode()); + logger.error("Create {} command error, processDefinitionCode:{}", + command.getCommandType().getDescp(), command.getProcessDefinitionCode()); } } } @@ -1036,7 +1081,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return false; } if (start.isAfter(end)) { - logger.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", start, end); + logger.error( + "Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", + start, end); return false; } } catch (Exception ex) { @@ -1078,7 +1125,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ org.apache.dolphinscheduler.remote.command.Command command = stateEventCallbackService.sendSync(host, requestCommand.convert2Command()); if (command == null) { - logger.error("Query executing process instance from master error, processInstanceId:{}.", processInstanceId); + logger.error("Query executing process instance from master error, processInstanceId:{}.", + processInstanceId); return null; } WorkflowExecutingDataResponseCommand responseCommand = @@ -1126,7 +1174,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ logger.info("Send task execute start command complete, response is {}.", response); putMsg(result, Status.SUCCESS); } else { - logger.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.", + logger.error( + "Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.", projectCode, taskDefinitionCode, taskDefinitionVersion); putMsg(result, Status.START_TASK_INSTANCE_ERROR); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 6a172d4936..c3569a5e00 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -117,7 +117,19 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -299,7 +311,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro logger.error("Save process definition error, processCode:{}.", processDefinition.getCode()); throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); } else - logger.info("Save process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion); + logger.info("Save process definition complete, processCode:{}, processVersion:{}.", + processDefinition.getCode(), insertVersion); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); @@ -692,7 +705,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro logger.info("The task has not changed, so skip"); } if (saveTaskResult == Constants.DEFINITION_FAILURE) { - logger.error("Update task definitions error, projectCode:{}, processCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode()); + logger.error("Update task definitions error, projectCode:{}, processCode:{}.", + processDefinition.getProjectCode(), processDefinition.getCode()); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } @@ -728,13 +742,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } else - logger.info("Update process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion); + logger.info("Update process definition complete, processCode:{}, processVersion:{}.", + processDefinition.getCode(), insertVersion); taskUsedInOtherTaskValid(processDefinition, taskRelationList); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); if (insertResult == Constants.EXIT_CODE_SUCCESS) { - logger.info("Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", + logger.info( + "Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.", processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -746,7 +762,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } saveOtherRelation(loginUser, processDefinition, result, otherParamsJson); } else { - logger.info("Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.", + logger.info( + "Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.", processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion()); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -763,7 +780,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @return true if process definition name not exists, otherwise false */ @Override - public Map verifyProcessDefinitionName(User loginUser, long projectCode, String name, long processDefinitionCode) { + public Map verifyProcessDefinitionName(User loginUser, long projectCode, String name, + long processDefinitionCode) { Project project = projectMapper.queryByCode(projectCode); // check user access for project Map result = @@ -806,7 +824,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List processInstances = processInstanceService .queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES); if (CollectionUtils.isNotEmpty(processInstances)) { - logger.warn("Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}", + logger.warn( + "Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}", processInstances.size(), processDefinition.getCode()); throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size()); } @@ -819,7 +838,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro .map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(), task.getTaskName())) .collect(Collectors.joining(Constants.COMMA)); - logger.warn("Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}", + logger.warn( + "Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}", taskDepDetail, processDefinition.getCode()); throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail); } @@ -852,7 +872,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro // Determine if the login user is the owner of the process definition if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) { - logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.", loginUser.getId(), code); + logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.", + loginUser.getId(), code); putMsg(result, Status.USER_NO_OPERATION_PERM); return result; } @@ -865,13 +886,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) { int delete = scheduleMapper.deleteById(scheduleObj.getId()); if (delete == 0) { - logger.error("Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.", code, scheduleObj.getId()); + logger.error( + "Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.", + code, scheduleObj.getId()); putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR); throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR); } } if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) { - logger.warn("Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.", + logger.warn( + "Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.", ReleaseState.ONLINE.getDescp(), processDefinition.getCode(), scheduleObj.getId()); putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId()); return result; @@ -885,7 +909,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); if (deleteRelation == 0) { - logger.warn("The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.", code); + logger.warn( + "The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.", + code); } deleteOtherRelation(project, result, processDefinition); putMsg(result, Status.SUCCESS); @@ -936,24 +962,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } processDefinition.setReleaseState(releaseState); processDefinitionMapper.updateById(processDefinition); - logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, code); + logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, + code); break; case OFFLINE: processDefinition.setReleaseState(releaseState); int updateProcess = processDefinitionMapper.updateById(processDefinition); Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code); if (updateProcess > 0) { - logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", projectCode, code); + logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", + projectCode, code); if (schedule != null) { // set status schedule.setReleaseState(releaseState); int updateSchedule = scheduleMapper.updateById(schedule); if (updateSchedule == 0) { - logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId()); + logger.error( + "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", + projectCode, code, schedule.getId()); putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); } else - logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId()); + logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", + projectCode, code, schedule.getId()); schedulerService.deleteSchedule(project.getId(), schedule.getId()); } } @@ -1321,7 +1352,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro try { processDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); } catch (CodeGenerateException e) { - logger.error("Save process definition error because generate process definition code error, projectCode:{}.", projectCode, e); + logger.error( + "Save process definition error because generate process definition code error, projectCode:{}.", + projectCode, e); putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); return false; } @@ -1344,7 +1377,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro taskCodeMap.put(taskDefinitionLog.getCode(), code); taskDefinitionLog.setCode(code); } catch (CodeGenerateException e) { - logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode(), e); + logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}", + projectCode, processDefinition.getCode(), e); putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); return false; } @@ -1353,7 +1387,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList); int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList); if ((logInsert & insert) == 0) { - logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode()); + logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode, + processDefinition.getCode()); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } @@ -1396,7 +1431,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(createDagResult, Status.SUCCESS); } else { result.putAll(createDagResult); - logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode()); + logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode, + processDefinition.getCode()); throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR); } @@ -1409,13 +1445,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro schedule.setUpdateTime(now); int scheduleInsert = scheduleMapper.insert(schedule); if (0 == scheduleInsert) { - logger.error("Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode()); + logger.error( + "Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.", + projectCode, processDefinition.getCode()); putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR); } } - logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode()); + logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, + processDefinition.getCode()); return true; } @@ -1992,7 +2031,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS); } - processDefinition.setId(0); + processDefinition.setId(null); processDefinition.setUserId(loginUser.getId()); processDefinition.setName(getNewName(processDefinition.getName(), COPY_SUFFIX)); final Date date = new Date(); @@ -2026,7 +2065,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, otherParamsJson)); } catch (Exception e) { - logger.error("Copy process definition error, processDefinitionCode from {} to {}.", oldProcessDefinitionCode, processDefinition.getCode(), e); + logger.error("Copy process definition error, processDefinitionCode from {} to {}.", + oldProcessDefinitionCode, processDefinition.getCode(), e); putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR); } @@ -2036,7 +2076,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null, Lists.newArrayList(), otherParamsJson)); } catch (Exception e) { - logger.error("Move process definition error, processDefinitionCode:{}.", processDefinition.getCode(), e); + logger.error("Move process definition error, processDefinitionCode:{}.", + processDefinition.getCode(), e); putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR); } @@ -2092,7 +2133,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) { - logger.error("Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, code); + logger.error( + "Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.", + projectCode, code); putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code); return result; } @@ -2100,18 +2143,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version); if (Objects.isNull(processDefinitionLog)) { - logger.error("Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); + logger.error( + "Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.", + projectCode, code, version); putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR, processDefinition.getCode(), version); return result; } int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog); if (switchVersion <= 0) { - logger.error("Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); + logger.error( + "Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", + projectCode, code, version); putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); } - logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); + logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", + projectCode, code, version); putMsg(result, Status.SUCCESS); return result; } @@ -2130,16 +2178,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (!failedProcessList.isEmpty()) { String failedProcess = String.join(",", failedProcessList); if (isCopy) { - logger.error("Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.", + logger.error( + "Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.", srcProjectCode, targetProjectCode, failedProcess); putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess); } else { - logger.error("Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.", + logger.error( + "Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.", srcProjectCode, targetProjectCode, failedProcess); putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess); } } else { - logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.", isCopy?"copy":"move", srcProjectCode, targetProjectCode); + logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.", + isCopy ? "copy" : "move", srcProjectCode, targetProjectCode); putMsg(result, Status.SUCCESS); } } @@ -2207,7 +2258,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); } else { if (processDefinition.getVersion() == version) { - logger.warn("Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.", + logger.warn( + "Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); putMsg(result, Status.MAIN_TABLE_USING_VERSION); return result; @@ -2215,12 +2267,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version); int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version); if (deleteLog == 0 || deleteRelationLog == 0) { - logger.error("Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); + logger.error( + "Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", + projectCode, code, version); putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); } deleteOtherRelation(project, result, processDefinition); - logger.info("Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); + logger.info( + "Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", + projectCode, code, version); putMsg(result, Status.SUCCESS); } return result; @@ -2337,7 +2393,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Date now = new Date(); scheduleObj.setProcessDefinitionCode(processDefinition.getCode()); if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) { - logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", processDefinition.getCode()); + logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", + processDefinition.getCode()); putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME); return result; } @@ -2569,7 +2626,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro scheduleObj.setReleaseState(ReleaseState.OFFLINE); int updateSchedule = scheduleMapper.updateById(scheduleObj); if (updateSchedule == 0) { - logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, scheduleObj.getId()); + logger.error( + "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", + projectCode, code, scheduleObj.getId()); putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 09ccbd7d10..6bb38004c1 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -416,6 +416,7 @@ public class ProcessServiceImpl implements ProcessService { commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId()); command.setCommandParam(JSONUtils.toJsonString(commandParams)); } + command.setId(null); result = commandMapper.insert(command); } return result;