Browse Source

Fix insert command error due to the id is not null (#12092)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
fba5a8eaa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 141
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 135
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

141
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -177,7 +177,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Priority processInstancePriority, String workerGroup,
Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun, int testFlag, ComplementDependentMode complementDependentMode) {
int dryRun, int testFlag,
ComplementDependentMode complementDependentMode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
@ -201,7 +202,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
logger.error(
"There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
processDefinition.getCode(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
@ -224,15 +226,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
startNodeList,
cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
workerGroup,
environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode);
environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag,
complementDependentMode);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", processDefinition.getCode(), create);
logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.",
processDefinition.getCode(), create);
putMsg(result, Status.SUCCESS);
} else {
logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", processDefinition.getCode());
logger.error("Start process instance failed because create command error, processDefinitionCode:{}.",
processDefinition.getCode());
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
}
return result;
@ -295,15 +300,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, Object> result = new HashMap<>();
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists
logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefineCode);
logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode, version);
logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.",
ReleaseState.ONLINE.getDescp(), processDefineCode, version);
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
} else if (!checkSubProcessDefinitionValid(processDefinition)) {
// check sub process definition online
logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode);
logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefineCode);
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
@ -398,7 +406,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
if (!checkTenantSuitable(processDefinition)) {
logger.error("There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
logger.error(
"There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
@ -418,19 +427,23 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
switch (executeType) {
case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, processInstance.getTestFlag());
processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams,
processInstance.getTestFlag());
break;
case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, processInstance.getTestFlag());
processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams,
processInstance.getTestFlag());
break;
case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, processInstance.getTestFlag());
processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams,
processInstance.getTestFlag());
break;
case STOP:
if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
logger.warn("Process instance status is already {}, processInstanceName:{}.",
WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
@ -441,7 +454,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break;
case PAUSE:
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
logger.warn("Process instance status is already {}, processInstanceName:{}.",
WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
@ -450,7 +464,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
break;
default:
logger.warn("Unknown execute type for process instance, processInstanceId:{}.", processInstance.getId());
logger.warn("Unknown execute type for process instance, processInstanceId:{}.",
processInstance.getId());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
@ -465,7 +480,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
}
@ -558,7 +574,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal
if (update > 0) {
logger.info("Process instance state is updated to {} in database, processInstanceName:{}.", executionStatus.getDesc(), processInstance.getName());
logger.info("Process instance state is updated to {} in database, processInstanceName:{}.",
executionStatus.getDesc(), processInstance.getName());
// directly send the process instance state change event to target master, not guarantee the event send
// success
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
@ -607,7 +624,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return insert result code
*/
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
int processVersion, CommandType commandType, String startParams, int testFlag) {
int processVersion, CommandType commandType, String startParams,
int testFlag) {
Map<String, Object> result = new HashMap<>();
// To add startParams only when repeat running is needed
@ -626,7 +644,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessInstanceId(instanceId);
command.setTestFlag(testFlag);
if (!processService.verifyIsNeedCreateCommand(command)) {
logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
logger.warn(
"Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
processDefinitionCode, processVersion, instanceId);
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
return result;
@ -640,8 +659,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion);
putMsg(result, Status.SUCCESS);
} else {
logger.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, instanceId);
logger.error(
"Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion,
instanceId);
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
@ -676,7 +697,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* if there is no online process, exit directly
*/
if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) {
logger.warn("Subprocess definition {} of process definition {} is not {}.", processDefinitionTmp.getName(),
logger.warn("Subprocess definition {} of process definition {} is not {}.",
processDefinitionTmp.getName(),
processDefinition.getName(), ReleaseState.ONLINE.getDescp());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
return result;
@ -759,14 +781,16 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
if (schedule == null || StringUtils.isEmpty(schedule)) {
logger.error("Create {} type command error because parameter schedule is invalid.", command.getCommandType().getDescp());
logger.error("Create {} type command error because parameter schedule is invalid.",
command.getCommandType().getDescp());
return 0;
}
if (!isValidateScheduleTime(schedule)) {
return 0;
}
try {
logger.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), processDefineCode);
logger.info("Start to create {} command, processDefinitionCode:{}.",
command.getCommandType().getDescp(), processDefineCode);
return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber,
complementDependentMode);
} catch (CronParseException cronParseException) {
@ -811,16 +835,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
switch (runMode) {
case RUN_MODE_SERIAL: {
logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (StringUtils.isNotEmpty(dateList)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command);
if (createCount > 0)
logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
@ -829,37 +856,46 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command);
if (createCount > 0)
logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
// dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
logger.info(
"Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
} else {
logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
logger.info(
"Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
}
}
break;
}
case RUN_MODE_PARALLEL: {
logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (startDate != null && endDate != null) {
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
List<ZonedDateTime> listDate = new ArrayList<>(
CronUtils.getSelfFireDateList(DateUtils.stringToZoneDateTime(startDate),
DateUtils.stringToZoneDateTime(endDate), schedules));
List<ZonedDateTime> listDate = CronUtils.getSelfFireDateList(
DateUtils.stringToZoneDateTime(startDate),
DateUtils.stringToZoneDateTime(endDate),
schedules);
int listDateSize = listDate.size();
createCount = listDate.size();
if (!CollectionUtils.isEmpty(listDate)) {
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
createCount);
// Distribute the number of tasks equally to each command.
// The last command with insufficient quantity will be assigned to the remaining tasks.
@ -886,13 +922,19 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0)
logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
logger.info(
"Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
} else {
logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
logger.info(
"Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.",
command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
}
@ -906,15 +948,18 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.",
createCount);
for (List<String> stringDate : Lists.partition(listDate, createCount)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
logger.info("Creating command, commandInfo:{}.", command);
if (processService.createCommand(command) > 0)
logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.info("Create {} command complete, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
else
logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
logger.error("Create {} command error, processDefinitionCode:{}",
command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
}
}
@ -1036,7 +1081,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return false;
}
if (start.isAfter(end)) {
logger.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", start, end);
logger.error(
"Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.",
start, end);
return false;
}
} catch (Exception ex) {
@ -1078,7 +1125,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
org.apache.dolphinscheduler.remote.command.Command command =
stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
if (command == null) {
logger.error("Query executing process instance from master error, processInstanceId:{}.", processInstanceId);
logger.error("Query executing process instance from master error, processInstanceId:{}.",
processInstanceId);
return null;
}
WorkflowExecutingDataResponseCommand responseCommand =
@ -1126,7 +1174,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
logger.info("Send task execute start command complete, response is {}.", response);
putMsg(result, Status.SUCCESS);
} else {
logger.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
logger.error(
"Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
projectCode, taskDefinitionCode, taskDefinitionVersion);
putMsg(result, Status.START_TASK_INSTANCE_ERROR);
}

135
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -117,7 +117,19 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -299,7 +311,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.error("Save process definition error, processCode:{}.", processDefinition.getCode());
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
} else
logger.info("Save process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
logger.info("Save process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
@ -692,7 +705,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
logger.error("Update task definitions error, projectCode:{}, processCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode());
logger.error("Update task definitions error, projectCode:{}, processCode:{}.",
processDefinition.getProjectCode(), processDefinition.getCode());
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
@ -728,13 +742,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} else
logger.info("Update process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
logger.info("Update process definition complete, processCode:{}, processVersion:{}.",
processDefinition.getCode(), insertVersion);
taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
logger.info("Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
logger.info(
"Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@ -746,7 +762,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
} else {
logger.info("Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
logger.info(
"Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@ -763,7 +780,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return true if process definition name not exists, otherwise false
*/
@Override
public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name, long processDefinitionCode) {
public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name,
long processDefinitionCode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
@ -806,7 +824,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessInstance> processInstances = processInstanceService
.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
logger.warn("Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
logger.warn(
"Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
processInstances.size(), processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
}
@ -819,7 +838,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
.map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
task.getTaskName()))
.collect(Collectors.joining(Constants.COMMA));
logger.warn("Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
logger.warn(
"Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
taskDepDetail, processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
}
@ -852,7 +872,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// Determine if the login user is the owner of the process definition
if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.", loginUser.getId(), code);
logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.",
loginUser.getId(), code);
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@ -865,13 +886,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
int delete = scheduleMapper.deleteById(scheduleObj.getId());
if (delete == 0) {
logger.error("Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.", code, scheduleObj.getId());
logger.error(
"Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.",
code, scheduleObj.getId());
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
}
if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
logger.warn("Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
logger.warn(
"Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode(), scheduleObj.getId());
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
return result;
@ -885,7 +909,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (deleteRelation == 0) {
logger.warn("The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.", code);
logger.warn(
"The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.",
code);
}
deleteOtherRelation(project, result, processDefinition);
putMsg(result, Status.SUCCESS);
@ -936,24 +962,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode,
code);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
if (updateProcess > 0) {
logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.",
projectCode, code);
if (schedule != null) {
// set status
schedule.setReleaseState(releaseState);
int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) {
logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
logger.error(
"Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
} else
logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
}
@ -1321,7 +1352,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
logger.error("Save process definition error because generate process definition code error, projectCode:{}.", projectCode, e);
logger.error(
"Save process definition error because generate process definition code error, projectCode:{}.",
projectCode, e);
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
return false;
}
@ -1344,7 +1377,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskCodeMap.put(taskDefinitionLog.getCode(), code);
taskDefinitionLog.setCode(code);
} catch (CodeGenerateException e) {
logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode(), e);
logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}",
projectCode, processDefinition.getCode(), e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return false;
}
@ -1353,7 +1387,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
if ((logInsert & insert) == 0) {
logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode());
logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode,
processDefinition.getCode());
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
@ -1396,7 +1431,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(createDagResult, Status.SUCCESS);
} else {
result.putAll(createDagResult);
logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
@ -1409,13 +1445,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
schedule.setUpdateTime(now);
int scheduleInsert = scheduleMapper.insert(schedule);
if (0 == scheduleInsert) {
logger.error("Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
logger.error(
"Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.",
projectCode, processDefinition.getCode());
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
}
logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
return true;
}
@ -1992,7 +2031,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
processDefinition.setId(0);
processDefinition.setId(null);
processDefinition.setUserId(loginUser.getId());
processDefinition.setName(getNewName(processDefinition.getName(), COPY_SUFFIX));
final Date date = new Date();
@ -2026,7 +2065,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs,
otherParamsJson));
} catch (Exception e) {
logger.error("Copy process definition error, processDefinitionCode from {} to {}.", oldProcessDefinitionCode, processDefinition.getCode(), e);
logger.error("Copy process definition error, processDefinitionCode from {} to {}.",
oldProcessDefinitionCode, processDefinition.getCode(), e);
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
}
@ -2036,7 +2076,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null,
Lists.newArrayList(), otherParamsJson));
} catch (Exception e) {
logger.error("Move process definition error, processDefinitionCode:{}.", processDefinition.getCode(), e);
logger.error("Move process definition error, processDefinitionCode:{}.",
processDefinition.getCode(), e);
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
}
@ -2092,7 +2133,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) {
logger.error("Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
logger.error(
"Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.",
projectCode, code);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code);
return result;
}
@ -2100,18 +2143,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinitionLog processDefinitionLog =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
if (Objects.isNull(processDefinitionLog)) {
logger.error("Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
logger.error(
"Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR,
processDefinition.getCode(), version);
return result;
}
int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
if (switchVersion <= 0) {
logger.error("Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
logger.error(
"Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
}
logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SUCCESS);
return result;
}
@ -2130,16 +2178,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (!failedProcessList.isEmpty()) {
String failedProcess = String.join(",", failedProcessList);
if (isCopy) {
logger.error("Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
logger.error(
"Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
srcProjectCode, targetProjectCode, failedProcess);
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
} else {
logger.error("Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
logger.error(
"Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
srcProjectCode, targetProjectCode, failedProcess);
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
}
} else {
logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.", isCopy?"copy":"move", srcProjectCode, targetProjectCode);
logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.",
isCopy ? "copy" : "move", srcProjectCode, targetProjectCode);
putMsg(result, Status.SUCCESS);
}
}
@ -2207,7 +2258,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
if (processDefinition.getVersion() == version) {
logger.warn("Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
logger.warn(
"Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
@ -2215,12 +2267,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version);
int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version);
if (deleteLog == 0 || deleteRelationLog == 0) {
logger.error("Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
logger.error(
"Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
deleteOtherRelation(project, result, processDefinition);
logger.info("Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
logger.info(
"Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
putMsg(result, Status.SUCCESS);
}
return result;
@ -2337,7 +2393,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Date now = new Date();
scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) {
logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", processDefinition.getCode());
logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.",
processDefinition.getCode());
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
@ -2569,7 +2626,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
int updateSchedule = scheduleMapper.updateById(scheduleObj);
if (updateSchedule == 0) {
logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, scheduleObj.getId());
logger.error(
"Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, scheduleObj.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
}

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -416,6 +416,7 @@ public class ProcessServiceImpl implements ProcessService {
commandParams.put(Constants.SCHEDULE_TIMEZONE, schedule.getTimezoneId());
command.setCommandParam(JSONUtils.toJsonString(commandParams));
}
command.setId(null);
result = commandMapper.insert(command);
}
return result;

Loading…
Cancel
Save