Browse Source

[Fix-10854] Fix database restart may lost task instance status (#10866)

* Fix database update error doesn't rollback the task instance status

* Fix database error may cause workflow dead with running status
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
f639a2eed4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
  2. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
  3. 79
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
  4. 43
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java
  5. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  6. 120
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java
  7. 88
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java
  8. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java
  9. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java
  10. 34
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java
  11. 77
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
  12. 117
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
  13. 118
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java
  14. 31
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java
  15. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java
  16. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java
  17. 31
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java
  18. 48
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java
  19. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java
  20. 86
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java
  21. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
  22. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
  23. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  24. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  25. 210
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
  26. 31
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
  27. 183
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  28. 108
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java
  29. 240
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  30. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java
  31. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  32. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java
  33. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  34. 14
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java

@ -21,7 +21,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
public enum StateEventType {
PROCESS_STATE_CHANGE(0, "process statechange"),
PROCESS_STATE_CHANGE(0, "process state change"),
TASK_STATE_CHANGE(1, "task state change"),
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"),

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java → dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.common.enums;
public enum Event {
public enum TaskEventType {
DISPATCH,
DELAY,
RUNNING,

79
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
public class TaskInstanceUtils {
/**
* Copy the property of given source {@link TaskInstance} to target.
*
* @param source Given task instance, copy from.
* @param target Given task instance, copy to
* @return a soft copy of given task instance.
*/
public static void copyTaskInstance(TaskInstance source, TaskInstance target) {
target.setId(source.getId());
target.setName(source.getName());
target.setTaskType(source.getTaskType());
target.setProcessInstanceId(source.getProcessInstanceId());
target.setTaskCode(source.getTaskCode());
target.setTaskDefinitionVersion(source.getTaskDefinitionVersion());
target.setProcessInstanceName(source.getProcessInstanceName());
target.setTaskGroupPriority(source.getTaskGroupPriority());
target.setState(source.getState());
target.setFirstSubmitTime(source.getFirstSubmitTime());
target.setSubmitTime(source.getSubmitTime());
target.setStartTime(source.getStartTime());
target.setEndTime(source.getEndTime());
target.setHost(source.getHost());
target.setExecutePath(source.getExecutePath());
target.setLogPath(source.getLogPath());
target.setRetryTimes(source.getRetryTimes());
target.setAlertFlag(source.getAlertFlag());
target.setProcessInstance(source.getProcessInstance());
target.setProcessDefine(source.getProcessDefine());
target.setTaskDefine(source.getTaskDefine());
target.setPid(source.getPid());
target.setAppLink(source.getAppLink());
target.setFlag(source.getFlag());
target.setDependency(source.getDependency());
// todo: we need to cpoy the task params and then copy switchDependency, since the setSwitchDependency rely on task params, this is really a very bad practice.
target.setTaskParams(source.getTaskParams());
target.setSwitchDependency(source.getSwitchDependency());
target.setDuration(source.getDuration());
target.setMaxRetryTimes(source.getMaxRetryTimes());
target.setRetryInterval(source.getRetryInterval());
target.setTaskInstancePriority(source.getTaskInstancePriority());
target.setDependentResult(source.getDependentResult());
target.setWorkerGroup(source.getWorkerGroup());
target.setEnvironmentCode(source.getEnvironmentCode());
target.setEnvironmentConfig(source.getEnvironmentConfig());
target.setExecutorId(source.getExecutorId());
target.setVarPool(source.getVarPool());
target.setExecutorName(source.getExecutorName());
target.setResources(source.getResources());
target.setDelayTime(source.getDelayTime());
target.setDryRun(source.getDryRun());
target.setTaskGroupId(source.getTaskGroupId());
target.setCpuQuota(source.getCpuQuota());
target.setMemoryMax(source.getMemoryMax());
}
}

43
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtilsTest.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.utils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Date;
import java.util.HashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class TaskInstanceUtilsTest {
@Test
void copyTaskInstance() {
TaskInstance source = new TaskInstance();
source.setId(1);
source.setName("source");
source.setSubmitTime(new Date());
source.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
TaskInstance target = new TaskInstance();
TaskInstanceUtils.copyTaskInstance(source, target);
Assertions.assertEquals(target.getId(), source.getId());
Assertions.assertEquals(target.getName(), source.getName());
}
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
@ -59,7 +59,7 @@ public class MasterServer implements IStoppable {
private TaskPluginManager taskPluginManager;
@Autowired
private MasterSchedulerService masterSchedulerService;
private MasterSchedulerBootstrap masterSchedulerBootstrap;
@Autowired
private SchedulerApi schedulerApi;
@ -94,8 +94,8 @@ public class MasterServer implements IStoppable {
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
this.masterSchedulerService.init();
this.masterSchedulerService.start();
this.masterSchedulerBootstrap.init();
this.masterSchedulerBootstrap.start();
this.eventExecuteService.start();
this.failoverExecuteThread.start();
@ -130,7 +130,7 @@ public class MasterServer implements IStoppable {
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
// close
this.schedulerApi.close();
this.masterSchedulerService.close();
this.masterSchedulerBootstrap.close();
this.masterRPCServer.close();
this.masterRegistryClient.closeRegistry();
// close spring Context and will invoke method with @PreDestroy annotation to destroy beans.

120
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskDelayEventHandler implements TaskEventHandler {
private final Logger logger = LoggerFactory.getLogger(TaskDelayEventHandler.class);
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private ProcessService processService;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError("Cannot find related workflow instance from cache");
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
sendAckToWorker(taskEvent);
return;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn(
"The current task status is: {}, will not handle the running event, this event is delay, will discard this event: {}",
taskInstance.getState(),
taskEvent);
sendAckToWorker(taskEvent);
return;
}
TaskInstance oldTaskInstance = new TaskInstance();
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
try {
taskInstance.setState(taskEvent.getState());
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
if (!processService.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed");
}
sendAckToWorker(taskEvent);
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
if (ex instanceof TaskEventHandleError) {
throw ex;
}
throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed", ex);
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.DELAY;
}
}

88
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDispatchEventHandler.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskDispatchEventHandler implements TaskEventHandler {
private final Logger logger = LoggerFactory.getLogger(TaskDispatchEventHandler.class);
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private ProcessService processService;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
throw new TaskEventHandleError("Cannot find related workflow instance from cache");
}
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId)
.orElseThrow(() -> new TaskEventHandleError("Cannot find related taskInstance from cache"));
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
logger.warn(
"The current taskInstance status is not SUBMITTED_SUCCESS, so the dispatch event will be discarded, the current is a delay event, event: {}",
taskEvent);
return;
}
// todo: we need to just log the old status and rollback these two field, no need to copy all fields
TaskInstance oldTaskInstance = new TaskInstance();
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
// update the taskInstance status
taskInstance.setState(ExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
try {
if (!processService.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed");
}
} catch (Exception ex) {
// rollback status
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
if (ex instanceof TaskEventHandleError) {
throw ex;
}
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed", ex);
}
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.DISPATCH;
}
}

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleError.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
public class TaskEventHandleError extends Exception {
public TaskEventHandleError(String message) {
super(message);
}
public TaskEventHandleError(String message, Throwable throwable) {
super(message, throwable);
}
}

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandleException.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
public class TaskEventHandleException extends Exception {
public TaskEventHandleException(String message) {
super(message);
}
public TaskEventHandleException(String message, Throwable throwable) {
super(message, throwable);
}
}

34
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskEventHandler.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
public interface TaskEventHandler {
/**
* Handle the task event
*
* @throws TaskEventHandleError this exception means we will discord this event.
* @throws TaskEventHandleException this exception means we need to retry this event
*/
void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException;
TaskEventType getHandleEventType();
}

77
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task reject event error, cannot find related workflow instance from cache, will discard this event");
}
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> {
sendAckToWorker(taskEvent);
return new TaskEventHandleError(
"Handle task reject event error, cannot find the taskInstance from cache, will discord this event");
});
try {
// todo: If the worker submit multiple reject response to master, the task instance may be dispatch multiple,
// we need to control the worker overload by master rather than worker
// if the task resubmit and the worker failover, this task may be dispatch twice?
// todo: we need to clear the taskInstance host and rollback the status to submit.
workflowExecuteRunnable.resubmit(taskInstance.getTaskCode());
sendAckToWorker(taskEvent);
} catch (Exception ex) {
throw new TaskEventHandleError("Handle task reject event error", ex);
}
}
public void sendAckToWorker(TaskEvent taskEvent) {
TaskRecallAckCommand taskRecallAckCommand =
new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command());
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.WORKER_REJECT;
}
}

117
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskResultEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private DataQualityResultOperator dataQualityResultOperator;
@Autowired
private ProcessService processService;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, cannot find related workflow instance from cache, will discard this event");
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, cannot find the taskInstance from cache, will discord this event");
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, the task instance is already finished, will discord this event");
}
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
TaskInstance oldTaskInstance = new TaskInstance();
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
try {
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
taskInstance.setState(taskEvent.getState());
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.updateTaskInstance(taskInstance);
sendAckToWorker(taskEvent);
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex);
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
public void sendAckToWorker(TaskEvent taskEvent) {
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.RESULT;
}
}

118
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TaskRunningEventHandler implements TaskEventHandler {
private final Logger logger = LoggerFactory.getLogger(TaskRunningEventHandler.class);
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private ProcessService processService;
@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task running event error, cannot find related workflow instance from cache, will discard this event");
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle running event error, cannot find the taskInstance from cache, will discord this event");
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task running event error, this task instance is already finished, this event is delay, will discard this event");
}
TaskInstance oldTaskInstance = new TaskInstance();
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance);
try {
taskInstance.setState(taskEvent.getState());
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
if (!processService.updateTaskInstance(taskInstance)) {
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed");
}
sendAckToWorker(taskEvent);
} catch (Exception ex) {
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance);
if (ex instanceof TaskEventHandleError) {
throw ex;
}
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed", ex);
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
@Override
public TaskEventType getHandleEventType() {
return TaskEventType.RUNNING;
}
}

31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEvent.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class WorkflowEvent {
private WorkflowEventType workflowEventType;
private int workflowInstanceId;
}

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleError.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
public class WorkflowEventHandleError extends Exception {
public WorkflowEventHandleError(String message) {
super(message);
}
public WorkflowEventHandleError(String message, Throwable throwable) {
super(message, throwable);
}
}

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandleException.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
public class WorkflowEventHandleException extends Exception {
public WorkflowEventHandleException(String message) {
super(message);
}
public WorkflowEventHandleException(String message, Throwable throwable) {
super(message, throwable);
}
}

31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventHandler.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
public interface WorkflowEventHandler {
/**
* Handle a workflow event,
*
* @throws WorkflowEventHandleError if this exception happen, means the event is broken, need to drop this event.
* @throws WorkflowEventHandleException if this exception happen, means we need to retry this event.
*/
void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError, WorkflowEventHandleException;
WorkflowEventType getHandleWorkflowEventType();
}

48
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventQueue.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public class WorkflowEventQueue {
private final Logger logger = LoggerFactory.getLogger(WorkflowEventQueue.class);
private static final LinkedBlockingQueue<WorkflowEvent> workflowEventQueue = new LinkedBlockingQueue<>();
/**
* Add a workflow event.
*/
public void addEvent(WorkflowEvent workflowEvent) {
workflowEventQueue.add(workflowEvent);
logger.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent);
}
/**
* Pool the head of the workflow event queue and wait an workflow event.
*/
public WorkflowEvent poolEvent() throws InterruptedException {
return workflowEventQueue.take();
}
}

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowEventType.java

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
public enum WorkflowEventType {
START_WORKFLOW,
;
}

86
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class WorkflowStartEventHandler implements WorkflowEventHandler {
private final Logger logger = LoggerFactory.getLogger(WorkflowStartEventHandler.class);
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private WorkflowEventQueue workflowEventQueue;
@Override
public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
if (workflowExecuteRunnable == null) {
throw new WorkflowEventHandleError(
"The workflow start event is invalid, cannot find the workflow instance from cache");
}
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessInstanceMetrics.incProcessInstanceSubmit();
CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
// submit failed will resend the event to workflow event queue
logger.info("Success submit the workflow instance");
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
} else {
logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
workflowEvent);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
}
});
}
@Override
public WorkflowEventType getHandleWorkflowEventType() {
return WorkflowEventType.START_WORKFLOW;
}
}

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java

@ -41,15 +41,14 @@ public class WorkflowStateEventHandler implements StateEventHandler {
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessDefinition processDefinition = processInstance.getProcessDefinition();
logger.info("process:{} state {} change to {}",
processInstance.getId(),
processInstance.getState(),
stateEvent.getExecutionStatus());
logger.info(
"Handle workflow instance state event, the current workflow instance state {} will be changed to {}",
processInstance.getState(), stateEvent.getExecutionStatus());
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType()
.typeIsSerialPriority()) {
.typeIsSerialPriority()) {
workflowExecuteRunnable.endProcess();
return true;
}

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -26,6 +25,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.slf4j.Logger;
@ -67,11 +67,12 @@ public class StateEventProcessor implements NettyRequestProcessor {
stateEvent.setType(type);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
stateEvent.getTaskInstanceId());
logger.info("Received state event change command, event: {}", stateEvent);
stateEventResponseService.addResponse(stateEvent);
}finally {
logger.info("Received state change command, event: {}", stateEvent);
stateEventResponseService.addStateChangeEvent(stateEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -42,9 +42,6 @@ import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
/**
* task manager
*/
@Component
public class StateEventResponseService {
@ -96,7 +93,7 @@ public class StateEventResponseService {
/**
* put task to attemptQueue
*/
public void addResponse(StateEvent stateEvent) {
public void addStateChangeEvent(StateEvent stateEvent) {
try {
// check the event is validated
eventQueue.put(stateEvent);
@ -154,6 +151,7 @@ public class StateEventResponseService {
}
WorkflowExecuteRunnable workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
// We will refresh the task instance status first, if the refresh failed the event will not be removed
switch (stateEvent.getType()) {
case TASK_STATE_CHANGE:
workflowExecuteThread.refreshTaskInstance(stateEvent.getTaskInstanceId());

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
@ -83,7 +83,7 @@ public class TaskEvent {
/**
* ack / response
*/
private Event event;
private TaskEventType event;
/**
* varPool
@ -102,7 +102,7 @@ public class TaskEvent {
event.setProcessInstanceId(processInstanceId);
event.setTaskInstanceId(taskInstanceId);
event.setWorkerAddress(workerAddress);
event.setEvent(Event.DISPATCH);
event.setEvent(TaskEventType.DISPATCH);
return event;
}
@ -116,7 +116,7 @@ public class TaskEvent {
event.setLogPath(command.getLogPath());
event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
event.setEvent(Event.RUNNING);
event.setEvent(TaskEventType.RUNNING);
return event;
}
@ -134,7 +134,7 @@ public class TaskEvent {
event.setVarPool(command.getVarPool());
event.setChannel(channel);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
event.setEvent(Event.RESULT);
event.setEvent(TaskEventType.RESULT);
return event;
}
@ -143,7 +143,7 @@ public class TaskEvent {
event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId());
event.setChannel(channel);
event.setEvent(Event.WORKER_REJECT);
event.setEvent(TaskEventType.WORKER_REJECT);
return event;
}
}

210
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java

@ -17,29 +17,18 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
import java.util.Optional;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/**
* task execute thread
*/
@ -51,34 +40,37 @@ public class TaskExecuteRunnable implements Runnable {
private final ConcurrentLinkedQueue<TaskEvent> events = new ConcurrentLinkedQueue<>();
private ProcessService processService;
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private DataQualityResultOperator dataQualityResultOperator;
private final Map<TaskEventType, TaskEventHandler> taskEventHandlerMap;
public TaskExecuteRunnable(int processInstanceId, ProcessService processService, WorkflowExecuteThreadPool workflowExecuteThreadPool,
ProcessInstanceExecCacheManager processInstanceExecCacheManager, DataQualityResultOperator dataQualityResultOperator) {
public TaskExecuteRunnable(int processInstanceId, Map<TaskEventType, TaskEventHandler> taskEventHandlerMap) {
this.processInstanceId = processInstanceId;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.dataQualityResultOperator = dataQualityResultOperator;
this.taskEventHandlerMap = taskEventHandlerMap;
}
@Override
public void run() {
while (!this.events.isEmpty()) {
// we handle the task event belongs to one task serial, so if the event comes in wrong order,
TaskEvent event = this.events.peek();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
persist(event);
} catch (Exception e) {
logger.error("persist task event error, event:{}", event, e);
logger.info("Handle task event begin: {}", event);
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
events.remove(event);
logger.info("Handle task event finished: {}", event);
} catch (TaskEventHandleException taskEventHandleException) {
// we don't need to resubmit this event, since the worker will resubmit this event
logger.error("Handle task event failed, this event will be retry later, event: {}", event,
taskEventHandleException);
} catch (TaskEventHandleError taskEventHandleError) {
logger.error("Handle task event error, this event will be removed, event: {}", event,
taskEventHandleError);
events.remove(event);
} catch (Exception unknownException) {
logger.error("Handle task event error, get a unknown exception, this event will be removed, event: {}",
event, unknownException);
events.remove(event);
} finally {
this.events.remove(event);
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
@ -109,156 +101,4 @@ public class TaskExecuteRunnable implements Runnable {
return this.events.add(event);
}
/**
* persist task event
*
* @param taskEvent taskEvent
*/
private void persist(TaskEvent taskEvent) throws Exception {
Event event = taskEvent.getEvent();
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
Optional<TaskInstance> taskInstance;
WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable != null && workflowExecuteRunnable.checkTaskInstanceById(taskInstanceId)) {
taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
} else {
taskInstance = Optional.ofNullable(processService.findTaskInstanceById(taskInstanceId));
}
boolean needToSendEvent = true;
switch (event) {
case DISPATCH:
needToSendEvent = handleDispatchEvent(taskEvent, taskInstance);
// dispatch event do not need to submit state event
break;
case DELAY:
case RUNNING:
needToSendEvent = handleRunningEvent(taskEvent, taskInstance);
break;
case RESULT:
needToSendEvent = handleResultEvent(taskEvent, taskInstance);
break;
case WORKER_REJECT:
needToSendEvent =
handleWorkerRejectEvent(taskEvent.getChannel(), taskInstance, workflowExecuteRunnable);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
if (!needToSendEvent) {
logger.info("Handle task event: {} success, there is no need to send a StateEvent", taskEvent);
return;
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
/**
* handle dispatch event
*/
private boolean handleDispatchEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
if (!taskInstanceOptional.isPresent()) {
logger.error("taskInstance is null");
return false;
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
return false;
}
taskInstance.setState(ExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
processService.saveTaskInstance(taskInstance);
return true;
}
/**
* handle running event
*/
private boolean handleRunningEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}",
taskInstance.getId(),
taskInstance.getState());
return false;
} else {
taskInstance.setState(taskEvent.getState());
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
processService.saveTaskInstance(taskInstance);
}
}
// if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
// send ack to worker
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
return true;
}
/**
* handle result event
*/
private boolean handleResultEvent(TaskEvent taskEvent, Optional<TaskInstance> taskInstanceOptional) {
Channel channel = taskEvent.getChannel();
if (taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().typeIsFinished()) {
logger.warn("The current taskInstance has already been finished, taskEvent: {}", taskEvent);
return false;
}
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
taskInstance.setState(taskEvent.getState());
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.saveTaskInstance(taskInstance);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
return true;
}
/**
* handle result event
*/
private boolean handleWorkerRejectEvent(Channel channel,
Optional<TaskInstance> taskInstanceOptional,
WorkflowExecuteRunnable executeThread) throws Exception {
TaskInstance taskInstance =
taskInstanceOptional.orElseThrow(() -> new RuntimeException("taskInstance is null"));
if (executeThread != null) {
executeThread.resubmit(taskInstance.getTaskCode());
}
if (channel != null) {
TaskRecallAckCommand taskRecallAckCommand =
new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskInstance.getId());
channel.writeAndFlush(taskRecallAckCommand.convert2Command());
}
return true;
}
}

31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
@ -48,21 +50,10 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
/**
* process service
*/
@Autowired
private ProcessService processService;
/**
* data quality result operator
*/
@Autowired
private DataQualityResultOperator dataQualityResultOperator;
private List<TaskEventHandler> taskEventHandlerList;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
private Map<TaskEventType, TaskEventHandler> taskEventHandlerMap = new HashMap<>();
/**
* task event thread map
@ -75,6 +66,8 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
this.setThreadNamePrefix("Task-Execute-Thread-");
this.setMaxPoolSize(masterConfig.getExecThreads());
this.setCorePoolSize(masterConfig.getExecThreads());
taskEventHandlerList.forEach(
taskEventHandler -> taskEventHandlerMap.put(taskEventHandler.getHandleEventType(), taskEventHandler));
}
public void submitTaskEvent(TaskEvent taskEvent) {
@ -83,11 +76,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
return;
}
TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
(processInstanceId) -> new TaskExecuteRunnable(processInstanceId,
processService,
workflowExecuteThreadPool,
processInstanceExecCacheManager,
dataQualityResultOperator));
(processInstanceId) -> new TaskExecuteRunnable(processInstanceId, taskEventHandlerMap));
taskExecuteRunnable.addEvent(taskEvent);
}

183
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java → dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
@ -43,9 +46,7 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
@ -53,18 +54,13 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import lombok.NonNull;
/**
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@Service
public class MasterSchedulerService extends BaseDaemonThread {
public class MasterSchedulerBootstrap extends BaseDaemonThread {
/**
* logger of MasterSchedulerService
*/
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
@Autowired
private ProcessService processService;
@ -83,12 +79,6 @@ public class MasterSchedulerService extends BaseDaemonThread {
*/
private ThreadPoolExecutor masterPrepareExecService;
/**
* workflow exec service
*/
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@ -98,13 +88,15 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired
private CuringParamsService curingGlobalParamsService;
private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances = new LinkedBlockingQueue<>();
@Autowired
private WorkflowEventQueue workflowEventQueue;
private Thread failedProcessInstanceResubmitThread;
@Autowired
private WorkflowEventLooper workflowEventLooper;
private String masterAddress;
protected MasterSchedulerService() {
protected MasterSchedulerBootstrap() {
super("MasterCommandLoopThread");
}
@ -114,23 +106,19 @@ public class MasterSchedulerService extends BaseDaemonThread {
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.failedProcessInstanceResubmitThread = new FailedProcessInstanceResubmitThread(submitFailedProcessInstances);
ProcessInstanceMetrics.registerProcessInstanceResubmitGauge(submitFailedProcessInstances::size);
}
@Override
public synchronized void start() {
logger.info("Master schedule service starting..");
logger.info("Master schedule bootstrap starting..");
super.start();
this.failedProcessInstanceResubmitThread.start();
logger.info("Master schedule service started...");
workflowEventLooper.start();
logger.info("Master schedule bootstrap started...");
}
public void close() {
logger.info("Master schedule service stopping...");
// these process instances will be failover, so we can safa clear here
submitFailedProcessInstances.clear();
logger.info("Master schedule service stopped...");
logger.info("Master schedule bootstrap stopping...");
logger.info("Master schedule bootstrap stopped...");
}
/**
@ -140,15 +128,51 @@ public class MasterSchedulerService extends BaseDaemonThread {
public void run() {
while (Stopper.isRunning()) {
try {
boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
// todo: if the workflow event queue is much, we need to handle the back pressure
boolean isOverload =
OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
scheduleWorkflow();
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
// indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
// indicate that the command transform to processInstance error, sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
MasterServerMetrics.incMasterConsumeCommand(commands.size());
processInstances.forEach(processInstance -> {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
if (processInstanceExecCacheManager.contains(processInstance.getId())) {
logger.error("The workflow instance is already been cached, this case shouldn't be happened");
}
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
processService,
nettyExecutorManager,
processAlertManager,
masterConfig,
stateWheelExecuteThread,
curingGlobalParamsService);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
});
} catch (InterruptedException interruptedException) {
logger.warn("Master schedule service interrupted, close the loop", interruptedException);
logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
@ -159,72 +183,9 @@ public class MasterSchedulerService extends BaseDaemonThread {
}
}
/**
* Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
*/
private void scheduleWorkflow() throws InterruptedException, MasterException {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
// indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
// indicate that the command transform to processInstance error, sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
MasterServerMetrics.incMasterConsumeCommand(commands.size());
for (ProcessInstance processInstance : processInstances) {
submitProcessInstance(processInstance);
}
}
private void submitProcessInstance(@NonNull ProcessInstance processInstance) {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
logger.info("Master schedule service starting workflow instance");
final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
, processAlertManager
, masterConfig
, stateWheelExecuteThread
, curingGlobalParamsService);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
ProcessInstanceMetrics.incProcessInstanceSubmit();
CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = CompletableFuture.supplyAsync(
workflowExecuteRunnable::call, workflowExecuteThreadPool);
workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
if (WorkflowSubmitStatue.FAILED == workflowSubmitStatue) {
// submit failed
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
submitFailedProcessInstances.add(processInstance);
}
});
logger.info("Master schedule service started workflow instance");
} catch (Exception ex) {
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
long commandTransformStartTime = System.currentTimeMillis();
logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
logger.info("Master schedule bootstrap transforming command to ProcessInstance, commandSize: {}", commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
@ -254,7 +215,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
// make sure to finish handling command each time before next scan
latch.await();
logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
logger.info("Master schedule bootstrap transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
return processInstances;
@ -273,7 +234,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
int pageSize = masterConfig.getFetchCommandNum();
final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
logger.info("Master schedule bootstrap loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
@ -297,34 +258,4 @@ public class MasterSchedulerService extends BaseDaemonThread {
return state;
}
private class FailedProcessInstanceResubmitThread extends Thread {
private final LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances;
public FailedProcessInstanceResubmitThread(LinkedBlockingQueue<ProcessInstance> submitFailedProcessInstances) {
logger.info("Starting workflow resubmit thread");
this.submitFailedProcessInstances = submitFailedProcessInstances;
this.setDaemon(true);
this.setName("SubmitFailedProcessInstanceHandleThread");
logger.info("Started workflow resubmit thread");
}
@Override
public void run() {
while (Stopper.isRunning()) {
try {
ProcessInstance processInstance = submitFailedProcessInstances.take();
submitProcessInstance(processInstance);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("SubmitFailedProcessInstanceHandleThread has been interrupted, will return");
break;
}
// avoid the failed-fast cause CPU higher
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}
}

108
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowEventLooper.java

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleException;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class WorkflowEventLooper extends BaseDaemonThread {
private final Logger logger = LoggerFactory.getLogger(WorkflowEventLooper.class);
@Autowired
private WorkflowEventQueue workflowEventQueue;
@Autowired
private List<WorkflowEventHandler> workflowEventHandlerList;
private final Map<WorkflowEventType, WorkflowEventHandler> workflowEventHandlerMap = new HashMap<>();
protected WorkflowEventLooper() {
super("WorkflowEventLooper");
}
@PostConstruct
public void init() {
workflowEventHandlerList.forEach(workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(),
workflowEventHandler));
}
@Override
public synchronized void start() {
logger.info("WorkflowEventLooper thread starting");
super.start();
logger.info("WorkflowEventLooper thread started");
}
public void run() {
WorkflowEvent workflowEvent = null;
while (Stopper.isRunning()) {
try {
workflowEvent = workflowEventQueue.poolEvent();
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
WorkflowEventHandler workflowEventHandler =
workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());
workflowEventHandler.handleWorkflowEvent(workflowEvent);
} catch (InterruptedException e) {
logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
Thread.currentThread().interrupt();
break;
} catch (WorkflowEventHandleException workflowEventHandleException) {
logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
workflowEvent, workflowEventHandleException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (WorkflowEventHandleError workflowEventHandleError) {
logger.error("Handle workflow event error, will drop this event, event: {}",
workflowEvent,
workflowEventHandleError);
} catch (Exception unknownException) {
logger.error(
"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
workflowEvent, unknownException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
}

240
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -119,39 +119,18 @@ import lombok.NonNull;
*/
public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/**
* logger of WorkflowExecuteThread
*/
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
/**
* process service
*/
private final ProcessService processService;
/**
* alert manager
*/
private final ProcessAlertManager processAlertManager;
/**
* netty executor manager
*/
private final NettyExecutorManager nettyExecutorManager;
/**
* process instance
*/
private final ProcessInstance processInstance;
/**
* process definition
*/
private ProcessDefinition processDefinition;
/**
* the object of DAG
*/
private DAG<String, TaskNode, TaskNodeRelation> dag;
/**
@ -159,10 +138,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
private String key;
/**
* start flag, true: start nodes submit completely
*/
private volatile boolean isStart = false;
private WorkflowRunnableStatus workflowRunnableStatus = WorkflowRunnableStatus.CREATED;
/**
* submit failure nodes
@ -224,7 +201,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
/**
* ready to submit task queue
* The StandBy task list, will be executed, need to know, the taskInstance in this queue may doesn't have id.
*/
private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
@ -234,14 +211,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
private final Map<Long, TaskInstance> waitToRetryTaskInstanceMap = new ConcurrentHashMap<>();
/**
* state wheel execute thread
*/
private final StateWheelExecuteThread stateWheelExecuteThread;
/**
* curing global params service
*/
private final CuringParamsService curingParamsService;
private final String masterAddress;
@ -276,14 +247,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* the process start nodes are submitted completely.
*/
public boolean isStart() {
return this.isStart;
return WorkflowRunnableStatus.STARTED == workflowRunnableStatus;
}
/**
* handle event
*/
public void handleEvents() {
if (!isStart) {
if (!isStart()) {
logger.info(
"The workflow instance is not started, will not handle its state event, current state event size: {}",
stateEvents);
return;
}
StateEvent stateEvent = null;
@ -387,45 +361,53 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
logger.info("TaskInstance finished task code:{} state:{} ",
taskInstance.getTaskCode(),
taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// todo: merge the last taskInstance
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE
&& DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
logger.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState());
try {
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
if (taskInstance.getState().typeIsSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// todo: merge the last taskInstance
processInstance.setVarPool(taskInstance.getVarPool());
processService.saveProcessInstance(processInstance);
if (!processInstance.isBlocked()) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
}
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
Long.toString(taskInstance.getTaskCode()),
dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
}
} else if (taskInstance.getState().typeIsFinished()) {
// todo: when the task instance type is pause, then it should not in completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
} else if (taskInstance.getState().typeIsFinished()) {
// todo: when the task instance type is pause, then it should not in completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
taskInstance.getTaskCode(),
taskInstance.getState());
this.updateProcessInstanceState();
} catch (Exception ex) {
logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
// remove the task from complete map, so that we can finish in the next time.
completeTaskMap.remove(taskInstance.getTaskCode());
throw ex;
}
this.updateProcessInstanceState();
}
/**
@ -672,17 +654,29 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
@Override
public WorkflowSubmitStatue call() {
if (this.taskInstanceMap.size() > 0 || isStart) {
logger.warn("The workflow has already been started");
if (isStart()) {
// This case should not been happened
logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
}
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
buildFlowDag();
initTaskQueue();
submitPostNode(null);
isStart = true;
if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
buildFlowDag();
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
initTaskQueue();
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
submitPostNode(null);
workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
}
return WorkflowSubmitStatue.SUCCESS;
} catch (Exception e) {
logger.error("Start workflow error", e);
@ -749,9 +743,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* @throws Exception exception
*/
private void buildFlowDag() throws Exception {
if (this.dag != null) {
return;
}
processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
@ -886,9 +877,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
dependFailedTaskMap,
completeTaskMap,
errorTaskMap);
dependFailedTaskMap,
completeTaskMap,
errorTaskMap);
}
/**
@ -911,7 +902,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (!submit) {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(),
processInstance.getName(),
taskInstance.getId(),
taskInstance.getName());
return Optional.empty();
}
@ -962,7 +957,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
return Optional.of(taskInstance);
} catch (Exception e) {
logger.error("submit standby task error", e);
logger.error("submit standby task error, taskCode: {}, taskInstanceId: {}",
taskInstance.getTaskCode(),
taskInstance.getId(),
e);
return Optional.empty();
}
}
@ -1215,9 +1213,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
for (Integer taskInstanceId : completeTaskMap.values()) {
for (Map.Entry<Long, Integer> entry : completeTaskMap.entrySet()) {
Long taskConde = entry.getKey();
Integer taskInstanceId = entry.getValue();
TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
if (taskInstance == null) {
logger.warn("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}, taskConde: {}",
taskInstanceId,
taskConde);
// This case will happen when we submit to db failed, then the taskInstanceId is 0
continue;
}
completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance);
}
return completeTaskInstanceMap;
}
@ -1429,6 +1437,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*/
private boolean processFailed() {
if (hasFailedTask()) {
logger.info("The current process has failed task, the current process failed");
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
return true;
}
@ -1504,22 +1513,29 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (activeTaskProcessorMaps.size() > 0 || hasRetryTaskInStandBy()) {
// active task and retry task exists
return runningState(state);
ExecutionStatus executionStatus = runningState(state);
logger.info("The workflowInstance has task running, the workflowInstance status is {}", executionStatus);
return executionStatus;
}
// block
if (state == ExecutionStatus.READY_BLOCK) {
return processReadyBlock();
ExecutionStatus executionStatus = processReadyBlock();
logger.info("The workflowInstance is ready to block, the workflowInstance status is {}", executionStatus);
}
// waiting thread
if (hasWaitingThreadTask()) {
logger.info("The workflowInstance has waiting thread task, the workflow status is {}",
ExecutionStatus.WAITING_THREAD);
return ExecutionStatus.WAITING_THREAD;
}
// pause
if (state == ExecutionStatus.READY_PAUSE) {
return processReadyPause();
ExecutionStatus executionStatus = processReadyPause();
logger.info("The workflowInstance is ready to pause, the workflow status is {}", executionStatus);
return executionStatus;
}
// stop
@ -1527,15 +1543,18 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
List<TaskInstance> failList = getCompleteTaskByState(ExecutionStatus.FAILURE);
ExecutionStatus executionStatus;
if (CollectionUtils.isNotEmpty(stopList) || CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) || !isComplementEnd()) {
return ExecutionStatus.STOP;
executionStatus = ExecutionStatus.STOP;
} else {
return ExecutionStatus.SUCCESS;
executionStatus = ExecutionStatus.SUCCESS;
}
logger.info("The workflowInstance is ready to stop, the workflow status is {}", executionStatus);
}
// process failure
if (processFailed()) {
logger.info("The workflowInstance is failed, the workflow status is {}", ExecutionStatus.FAILURE);
return ExecutionStatus.FAILURE;
}
@ -1579,15 +1598,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private void updateProcessInstanceState() throws StateEventHandleException {
ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) {
logger.info("Update workflowInstance states, origin state: {}, target state: {}",
processInstance.getState(),
state);
updateWorkflowInstanceStatesToDB(state);
StateEvent stateEvent = new StateEvent();
stateEvent.setExecutionStatus(processInstance.getState());
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
// this.processStateChangeHandler(stateEvent);
// replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks
this.stateEvents.add(stateEvent);
} else {
logger.info("There is no need to update the workflow instance state, origin state: {}, target state: {}",
processInstance.getState(),
state);
}
}
@ -1602,12 +1627,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException {
ExecutionStatus originStates = processInstance.getState();
if (originStates != newStates) {
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(),
processInstance.getName(),
originStates,
newStates,
processInstance.getCommandType());
logger.info("Begin to update workflow instance state , state will change from {} to {}",
originStates,
newStates);
processInstance.setState(newStates);
if (newStates.typeIsFinished()) {
@ -1657,8 +1679,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
*
* @param taskInstance task instance
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
readyToSubmitTaskQueue.remove(taskInstance);
private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
return readyToSubmitTaskQueue.remove(taskInstance);
}
/**
@ -1748,10 +1770,20 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
// Remove and add to complete map and error map
removeTaskFromStandbyList(task);
if (!removeTaskFromStandbyList(task)) {
logger.error(
"Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
processInstance.getId(),
task.getTaskCode());
}
completeTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task);
errorTaskMap.put(task.getTaskCode(), task.getId());
logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
activeTaskProcessorMaps.remove(task.getTaskCode());
logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}",
task.getProcessInstanceId(),
task.getId(),
task.getTaskCode());
} else {
removeTaskFromStandbyList(task);
}
@ -1927,4 +1959,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
}
}
private enum WorkflowRunnableStatus {
CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
;
}
}

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRecallCommand.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1284,8 +1284,8 @@ public class ProcessServiceImpl implements ProcessService {
break;
}
logger.error(
"task commit to db failed , taskId {} has already retry {} times, please check the database",
taskInstance.getId(),
"task commit to db failed , taskCode: {} has already retry {} times, please check the database",
taskInstance.getTaskCode(),
retryTimes);
Thread.sleep(commitInterval);
} catch (Exception e) {
@ -1298,6 +1298,7 @@ public class ProcessServiceImpl implements ProcessService {
}
/**
* // todo: This method need to refactor, we find when the db down, but the taskInstanceId is not 0. It's better to change to void, rather than return TaskInstance
* submit task to db
* submit sub process to command
*
@ -1316,9 +1317,9 @@ public class ProcessServiceImpl implements ProcessService {
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(),
taskInstance.getProcessInstance(),
processInstance.getState());
taskInstance.getName(),
taskInstance.getProcessInstance().getId(),
processInstance.getState());
return null;
}
@ -1328,8 +1329,8 @@ public class ProcessServiceImpl implements ProcessService {
logger.info(
"End save taskInstance to db successfully:{}, taskInstanceName: {}, taskInstance state:{}, processInstanceId:{}, processInstanceState: {}",
taskInstance.getId(),
taskInstance.getName(),
task.getId(),
task.getName(),
task.getState(),
processInstance.getId(),
processInstance.getState());
@ -1564,7 +1565,10 @@ public class ProcessServiceImpl implements ProcessService {
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) {
ExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.typeIsFinished() || processInstanceState == ExecutionStatus.READY_STOP) {
logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState);
logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
processInstance.getId(),
processInstanceState,
taskInstance.getTaskCode());
return null;
}
if (processInstanceState == ExecutionStatus.READY_PAUSE) {

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponseCache.java vendored

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.Map;
@ -48,7 +48,7 @@ public class ResponseCache {
* @param command command
* @param event event ACK/RESULT
*/
public void cache(Integer taskInstanceId, Command command, Event event) {
public void cache(Integer taskInstanceId, Command command, TaskEventType event) {
switch (event) {
case RUNNING:
runningCache.put(taskInstanceId, command);

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
@ -236,7 +236,7 @@ public class TaskCallbackService {
public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
// add response cache
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RUNNING);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
@ -256,7 +256,7 @@ public class TaskCallbackService {
public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
// add response cache
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RESULT);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RESULT);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
@ -270,7 +270,7 @@ public class TaskCallbackService {
*/
public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), Event.WORKER_REJECT);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
send(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command());
}
}

14
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java

@ -84,6 +84,7 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread {
private void retryRunningCommand(ResponseCache instance) {
if (!instance.getRunningCache().isEmpty()) {
Map<Integer, Command> runningCache = instance.getRunningCache();
logger.info("Send task running retry command starting, waiting to retry size: {}", runningCache.size());
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command runningCommand = entry.getValue();
@ -93,12 +94,14 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread {
logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}", taskInstanceId, runningCommand);
}
}
logger.info("Send task running retry command finished, waiting to retry size: {}", runningCache.size());
}
}
private void retryResponseCommand(ResponseCache instance) {
if (!instance.getResponseCache().isEmpty()) {
Map<Integer, Command> responseCache = instance.getResponseCache();
Map<Integer, Command> responseCache = instance.getResponseCache();
if (!responseCache.isEmpty()) {
logger.info("Send task response retry command starting, waiting to retry size: {}", responseCache.size());
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
@ -108,12 +111,14 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread {
logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
}
logger.info("Send task response retry command finished, waiting to retry size: {}", responseCache.size());
}
}
private void retryRecallCommand(ResponseCache instance) {
if (!instance.getRecallCache().isEmpty()) {
Map<Integer, Command> recallCache = instance.getRecallCache();
Map<Integer, Command> recallCache = instance.getRecallCache();
if (!recallCache.isEmpty()) {
logger.info("Send task recall retry command starting, waiting to retry size: {}", recallCache.size());
for (Map.Entry<Integer, Command> entry : recallCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
@ -123,6 +128,7 @@ public class RetryReportTaskStatusThread extends BaseDaemonThread {
logger.error("Retry send recall command to master error, taskInstanceId: {}, command: {}", taskInstanceId, responseCommand);
}
}
logger.info("Send task recall retry command finished, waiting to retry size: {}", recallCache.size());
}
}
}

Loading…
Cancel
Save