Browse Source

Fix insert command error due to the id is not null (#12092) (#12098)

(cherry picked from commit fba5a8eaa0)
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
1a63f8672a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 111
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 126
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

111
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -198,8 +198,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
processDefinition.getId(), processDefinition.getName());
logger.error(
"There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
processDefinition.getCode(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
}
@ -226,8 +227,12 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.",
processDefinition.getCode(), create);
putMsg(result, Status.SUCCESS);
} else {
logger.error("Start process instance failed because create command error, processDefinitionCode:{}.",
processDefinition.getCode());
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
}
return result;
@ -288,12 +293,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, Object> result = new HashMap<>();
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists
logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.",
ReleaseState.ONLINE.getDescp(), processDefineCode, version);
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
} else if (!checkSubProcessDefinitionValid(processDefinition)) {
// check sub process definition online
logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefineCode);
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
@ -391,7 +402,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
logger.error(
"There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
@ -433,6 +445,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break;
case PAUSE:
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
logger.warn("Process instance status is already {}, processInstanceName:{}.",
WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
@ -441,7 +455,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
break;
default:
logger.error("unknown execute type : {}", executeType);
logger.warn("Unknown execute type for process instance, processInstanceId:{}.",
processInstance.getId());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
@ -456,6 +471,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
}
@ -548,6 +565,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal
if (update > 0) {
logger.info("Process instance state is updated to {} in database, processInstanceName:{}.",
executionStatus.getDesc(), processInstance.getName());
// directly send the process instance state change event to target master, not guarantee the event send
// success
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
@ -612,6 +631,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessInstanceId(instanceId);
if (!processService.verifyIsNeedCreateCommand(command)) {
logger.warn(
"Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
processDefinitionCode, processVersion, instanceId);
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
return result;
}
@ -621,6 +643,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (create > 0) {
putMsg(result, Status.SUCCESS);
} else {
logger.error(
"Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion,
instanceId);
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
@ -655,6 +681,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* if there is no online process, exit directly
*/
if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) {
logger.warn("Subprocess definition {} of process definition {} is not {}.",
processDefinitionTmp.getName(),
processDefinition.getName(), ReleaseState.ONLINE.getDescp());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
logger.info("not release process definition id: {} , name : {}", processDefinitionTmp.getId(),
processDefinitionTmp.getName());
@ -736,12 +765,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
if (schedule == null || StringUtils.isEmpty(schedule)) {
logger.error("Create {} type command error because parameter schedule is invalid.",
command.getCommandType().getDescp());
return 0;
}
if (!isValidateScheduleTime(schedule)) {
return 0;
}
try {
logger.info("Start to create {} command, processDefinitionCode:{}.",
command.getCommandType().getDescp(), processDefineCode);
return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber,
complementDependentMode);
} catch (CronParseException cronParseException) {
@ -785,44 +818,65 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
switch (runMode) {
case RUN_MODE_SERIAL: {
logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (StringUtils.isNotEmpty(dateList)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command);
if (createCount > 0)
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
createCount = processService.createCommand(command);
if (createCount > 0)
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
// dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
+ "dependent complement data", command.getProcessDefinitionCode());
logger.info(
"Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
} else {
logger.info(
"Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
}
}
break;
}
case RUN_MODE_PARALLEL: {
logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (startDate != null && endDate != null) {
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
List<ZonedDateTime> listDate = new ArrayList<>(
CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate),
DateUtils.stringToZoneDateTime(endDate), schedules));
List<ZonedDateTime> listDate = CronUtils.getSelfFireDateList(
DateUtils.stringToZoneDateTime(startDate),
DateUtils.stringToZoneDateTime(endDate),
schedules);
int listDateSize = listDate.size();
createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) {
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
createCount);
// Distribute the number of tasks equally to each command.
// The last command with insufficient quantity will be assigned to the remaining tasks.
@ -847,14 +901,21 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0)
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info(
"process code: {} complement dependent in off mode or schedule's size is 0, skip "
+ "dependent complement data",
"Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
} else {
logger.info(
"Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
}
@ -868,11 +929,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
createCount);
for (List<String> stringDate : Lists.partition(listDate, createCount)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processService.createCommand(command);
logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0)
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
}
}
@ -993,7 +1061,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return false;
}
if (start.isAfter(end)) {
logger.error("complement data error, wrong date start:{} and end date:{} ", start, end);
logger.error(
"Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.",
start, end);
return false;
}
} catch (Exception ex) {
@ -1034,6 +1104,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
org.apache.dolphinscheduler.remote.command.Command command =
stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
if (command == null) {
logger.error("Query executing process instance from master error, processInstanceId:{}.",
processInstanceId);
return null;
}
WorkflowExecutingDataResponseCommand responseCommand =
@ -1080,6 +1152,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (response != null) {
putMsg(result, Status.SUCCESS);
} else {
logger.error(
"Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
projectCode, taskDefinitionCode, taskDefinitionVersion);
putMsg(result, Status.START_TASK_INSTANCE_ERROR);
}
return result;

126
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -303,7 +303,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
}
} else
logger.info("Save process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
@ -679,6 +681,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
logger.error("Update task definitions error, projectCode:{}, processCode:{}.",
processDefinition.getProjectCode(), processDefinition.getCode());
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
@ -710,12 +714,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (insertVersion <= 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
} else
logger.info("Update process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
logger.info(
"Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
@ -724,6 +733,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
} else {
logger.info(
"Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
}
@ -779,6 +791,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessInstance> processInstances = processInstanceService
.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
logger.warn(
"Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
processInstances.size(), processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
}
@ -790,6 +805,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
.map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
task.getTaskName()))
.collect(Collectors.joining(Constants.COMMA));
logger.warn(
"Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
taskDepDetail, processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
}
}
@ -820,6 +838,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// Determine if the login user is the owner of the process definition
if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.",
loginUser.getId(), code);
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@ -832,11 +852,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
int delete = scheduleMapper.deleteById(scheduleObj.getId());
if (delete == 0) {
logger.error(
"Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.",
code, scheduleObj.getId());
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
}
if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
logger.warn(
"Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode(), scheduleObj.getId());
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
return result;
}
@ -848,7 +874,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (deleteRelation == 0) {
logger.warn("The process definition has not relation, it will be delete successfully");
logger.warn(
"The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.",
code);
}
deleteOtherRelation(project, result, processDefinition);
putMsg(result, Status.SUCCESS);
@ -897,22 +925,31 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode,
code);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
if (updateProcess > 0 && schedule != null) {
logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}",
projectCode, schedule.getId(), code);
// set status
schedule.setReleaseState(releaseState);
int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) {
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
if (updateProcess > 0) {
logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.",
projectCode, code);
if (schedule != null) {
// set status
schedule.setReleaseState(releaseState);
int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) {
logger.error(
"Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
} else
logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
break;
default:
@ -1272,6 +1309,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
logger.error(
"Save process definition error because generate process definition code error, projectCode:{}.",
projectCode, e);
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
return false;
}
@ -1294,7 +1334,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskCodeMap.put(taskDefinitionLog.getCode(), code);
taskDefinitionLog.setCode(code);
} catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}",
projectCode, processDefinition.getCode(), e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return false;
}
@ -1303,6 +1344,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
if ((logInsert & insert) == 0) {
logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode,
processDefinition.getCode());
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
@ -1345,6 +1388,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(createDagResult, Status.SUCCESS);
} else {
result.putAll(createDagResult);
logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
@ -1357,10 +1402,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
schedule.setUpdateTime(now);
int scheduleInsert = scheduleMapper.insert(schedule);
if (0 == scheduleInsert) {
logger.error(
"Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.",
projectCode, processDefinition.getCode());
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
}
logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
return true;
}
@ -1934,7 +1985,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
processDefinition.setId(0);
processDefinition.setId(null);
processDefinition.setUserId(loginUser.getId());
processDefinition.setName(getNewName(processDefinition.getName(), COPY_SUFFIX));
final Date date = new Date();
@ -1967,6 +2018,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs,
otherParamsJson));
} catch (Exception e) {
logger.error("Copy process definition error, processDefinitionCode from {} to {}.",
oldProcessDefinitionCode, processDefinition.getCode(), e);
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
}
@ -1975,6 +2028,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null,
Lists.newArrayList(), otherParamsJson));
} catch (Exception e) {
logger.error("Move process definition error, processDefinitionCode:{}.",
processDefinition.getCode(), e);
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
}
@ -2030,6 +2085,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) {
logger.error(
"Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.",
projectCode, code);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code);
return result;
}
@ -2037,15 +2095,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinitionLog processDefinitionLog =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
if (Objects.isNull(processDefinitionLog)) {
logger.error(
"Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR,
processDefinition.getCode(), version);
return result;
}
int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
if (switchVersion <= 0) {
logger.error(
"Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
}
logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SUCCESS);
return result;
}
@ -2062,14 +2128,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
if (!failedProcessList.isEmpty()) {
String failedProcess = String.join(",", failedProcessList);
if (isCopy) {
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode,
String.join(",", failedProcessList));
logger.error(
"Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
srcProjectCode, targetProjectCode, failedProcess);
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
} else {
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode,
String.join(",", failedProcessList));
logger.error(
"Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
srcProjectCode, targetProjectCode, failedProcess);
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
}
} else {
logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.",
isCopy ? "copy" : "move", srcProjectCode, targetProjectCode);
putMsg(result, Status.SUCCESS);
}
}
@ -2136,16 +2209,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
if (processDefinition.getVersion() == version) {
logger.warn(
"Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
}
int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version);
int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version);
if (deleteLog == 0 || deleteRelationLog == 0) {
logger.error(
"Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
deleteOtherRelation(project, result, processDefinition);
logger.info(
"Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SUCCESS);
}
return result;
@ -2255,7 +2337,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Date now = new Date();
scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.",
processDefinition.getCode());
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
@ -2476,6 +2559,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
int updateSchedule = scheduleMapper.updateById(scheduleObj);
if (updateSchedule == 0) {
logger.error(
"Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, scheduleObj.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
}

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -409,6 +409,7 @@ public class ProcessServiceImpl implements ProcessService {
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
command.setCommandParam(JSONUtils.toJsonString(commandParams));
}
command.setId(null);
result = commandMapper.insert(command);
}
return result;

Loading…
Cancel
Save