Browse Source

[optimization]:prevent repeated database updates (#2396)

* sqlTask failed to run

* prevent repeated database updates

* prevent repeated database updates

* prevent repeated database updates

Co-authored-by: songqh <songquanhe@foxmail.com>
Co-authored-by: dailidong <dailidong66@gmail.com>
pull/3/MERGE
songgg 5 years ago committed by gaojun2048
parent
commit
b06c118af6
  1. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -225,20 +225,14 @@ public class ExecutorService extends BaseService{
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
processInstance.setCommandType(CommandType.STOP);
processInstance.addHistoryCmd(CommandType.STOP);
processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_STOP);
result = updateProcessInstancePrepare(processInstance, CommandType.STOP, ExecutionStatus.READY_STOP);
}
break;
case PAUSE:
if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(), processInstance.getState());
} else {
processInstance.setCommandType(CommandType.PAUSE);
processInstance.addHistoryCmd(CommandType.PAUSE);
processService.updateProcessInstance(processInstance);
result = updateProcessInstanceState(processInstanceId, ExecutionStatus.READY_PAUSE);
result = updateProcessInstancePrepare(processInstance, CommandType.PAUSE, ExecutionStatus.READY_PAUSE);
}
break;
default:
@ -308,22 +302,27 @@ public class ExecutorService extends BaseService{
}
/**
* update process instance state
* prepare to update process instance command type and status
*
* @param processInstanceId process instance id
* @param processInstance process instance
* @param commandType command type
* @param executionStatus execute status
* @return update result
*/
private Map<String, Object> updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
private Map<String, Object> updateProcessInstancePrepare(ProcessInstance processInstance, CommandType commandType, ExecutionStatus executionStatus) {
Map<String, Object> result = new HashMap<>(5);
int update = processService.updateProcessInstanceState(processInstanceId, executionStatus);
processInstance.setCommandType(commandType);
processInstance.addHistoryCmd(commandType);
processInstance.setState(executionStatus);
int update = processService.updateProcessInstance(processInstance);
// determine whether the process is normal
if (update > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -881,7 +881,7 @@ public class MasterExecThread implements Runnable {
processInstance.getId(), processInstance.getName(),
processInstance.getState(), state,
processInstance.getCommandType());
processInstance.setState(state);
ProcessInstance instance = processService.findProcessInstanceById(processInstance.getId());
instance.setState(state);
instance.setProcessDefinition(processInstance.getProcessDefinition());

Loading…
Cancel
Save