Browse Source

[Fix-10785] Fix state event handle error will not retry (#10786)

* Fix state event handle error will not retry

* Use state event handler to deal with the event
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
67d14fb7b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java
  4. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java
  5. 33
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java
  6. 32
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java
  7. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java
  8. 41
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java
  9. 47
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java
  10. 115
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
  11. 64
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
  12. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
  13. 60
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java
  14. 95
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java
  15. 39
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java
  16. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
  17. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
  18. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  19. 17
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
  20. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
  21. 17
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
  22. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  23. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  24. 404
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  25. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  26. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java
  27. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  28. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java
  29. 29
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java
  30. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  31. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
  32. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
  33. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -121,7 +121,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private ProcessService processService; private ProcessService processService;
@Autowired @Autowired
StateEventCallbackService stateEventCallbackService; private StateEventCallbackService stateEventCallbackService;
@Autowired @Autowired
private TaskDefinitionMapper taskDefinitionMapper; private TaskDefinitionMapper taskDefinitionMapper;
@ -500,6 +500,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal // determine whether the process is normal
if (update > 0) { if (update > 0) {
// directly send the process instance state change event to target master, not guarantee the event send success
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0 processInstance.getId(), 0, processInstance.getState(), processInstance.getId(), 0
); );

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -116,7 +116,9 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@PostConstruct @PostConstruct
public void init() { public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber()); this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
logger.info("Task priority queue consume thread staring");
super.start(); super.start();
logger.info("Task priority queue consume thread started");
} }
@Override @Override

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/Selector.java

@ -27,7 +27,7 @@ public interface Selector<T> {
/** /**
* select * select
* @param source source * @param source source, the given source should not be empty.
* @return T * @return T
*/ */
T select(Collection<T> source); T select(Collection<T> source);

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEvent.java → dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEvent.java

@ -15,8 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.enums; package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import io.netty.channel.Channel; import io.netty.channel.Channel;

33
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleError.java

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
/**
* This exception represent the exception can not recover, this happens when the event is broken.
* And when we get this exception, we will drop the event.
*/
public class StateEventHandleError extends Exception {
public StateEventHandleError(String message) {
super(message);
}
public StateEventHandleError(String message, Throwable throwable) {
super(message, throwable);
}
}

32
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandleException.java

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
/**
* This exception represent the exception can be recovered, when we get this exception, we will retry the event.
*/
public class StateEventHandleException extends Exception {
public StateEventHandleException(String message) {
super(message);
}
public StateEventHandleException(String message, Throwable throwable) {
super(message, throwable);
}
}

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandler.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
public interface StateEventHandler {
/**
* Handle a event, if handle success will reture true, else return false
*
* @param stateEvent given state event.
* @throws StateEventHandleException this exception means it can be recovered.
* @throws StateEventHandleError this exception means it cannot be recovered, so the event need to drop.
*/
boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleException, StateEventHandleError;
StateEventType getEventType();
}

41
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
public class StateEventHandlerManager {
private static final Map<StateEventType, StateEventHandler> stateEventHandlerMap = new HashMap<>();
static {
ServiceLoader.load(StateEventHandler.class)
.forEach(stateEventHandler -> stateEventHandlerMap.put(stateEventHandler.getEventType(),
stateEventHandler));
}
public static Optional<StateEventHandler> getStateEventHandler(StateEventType stateEventType) {
return Optional.ofNullable(stateEventHandlerMap.get(stateEventType));
}
}

47
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRetryStateEventHandler.java

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Map;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class TaskRetryStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleException {
TaskMetrics.incTaskRetry();
Map<Long, TaskInstance> waitToRetryTaskInstanceMap = workflowExecuteRunnable.getWaitToRetryTaskInstanceMap();
TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
workflowExecuteRunnable.addTaskToStandByList(taskInstance);
workflowExecuteRunnable.submitStandByTask();
waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
return true;
}
@Override
public StateEventType getEventType() {
return StateEventType.TASK_RETRY;
}
}

115
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class TaskStateEventHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(TaskStateEventHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleException, StateEventHandleError {
measureTaskState(stateEvent);
workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError(
"Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId()));
if (task.getState() == null) {
throw new StateEventHandleError("Task state event handle error due to task state is null");
}
Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode())
&& completeTaskMap.get(task.getTaskCode()) == task.getId()) {
logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true;
}
workflowExecuteRunnable.taskFinished(task);
if (task.getTaskGroupId() > 0) {
workflowExecuteRunnable.releaseTaskGroup(task);
}
return true;
}
Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
if (activeTaskProcessMap.containsKey(task.getTaskCode())) {
ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode());
iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
if (iTaskProcessor.taskInstance().getState() != task.getState()) {
task.setState(iTaskProcessor.taskInstance().getState());
}
workflowExecuteRunnable.taskFinished(task);
}
return true;
}
throw new StateEventHandleException(
"Task state event handle error, due to the task is not in activeTaskProcessorMaps");
}
@Override
public StateEventType getEventType() {
return StateEventType.TASK_STATE_CHANGE;
}
private void measureTaskState(StateEvent taskStateEvent) {
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
// the event is broken
logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
return;
}
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
}
switch (taskStateEvent.getExecutionStatus()) {
case STOP:
TaskMetrics.incTaskStop();
break;
case SUCCESS:
TaskMetrics.incTaskSuccess();
break;
case FAILURE:
TaskMetrics.incTaskFailure();
break;
default:
break;
}
}
}

64
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import java.util.Map;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class TaskTimeoutStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleError {
TaskMetrics.incTaskTimeout();
workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId()).get();
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
}
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy
|| TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
ITaskProcessor taskProcessor = activeTaskProcessMap.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT);
}
if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
workflowExecuteRunnable.processTimeout();
workflowExecuteRunnable.taskTimeout(taskInstance);
}
return true;
}
@Override
public StateEventType getEventType() {
return StateEventType.TASK_TIMEOUT;
}
}

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent);
}
@Override
public StateEventType getEventType() {
return StateEventType.WAIT_TASK_GROUP;
}
}

60
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowBlockStateEventHandler.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class WorkflowBlockStateEventHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(WorkflowBlockStateEventHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleError {
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
if (!taskInstanceOptional.isPresent()) {
throw new StateEventHandleError("Cannot find taskInstance from taskMap by taskInstanceId: "
+ stateEvent.getTaskInstanceId());
}
TaskInstance task = taskInstanceOptional.get();
BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class);
if (parameters != null && parameters.isAlertWhenBlocking()) {
workflowExecuteRunnable.processBlock();
}
return true;
}
@Override
public StateEventType getEventType() {
return StateEventType.PROCESS_BLOCKED;
}
}

95
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class WorkflowStateEventHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(WorkflowStateEventHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleException {
measureProcessState(stateEvent);
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessDefinition processDefinition = processInstance.getProcessDefinition();
logger.info("process:{} state {} change to {}",
processInstance.getId(),
processInstance.getState(),
stateEvent.getExecutionStatus());
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType()
.typeIsSerialPriority()) {
workflowExecuteRunnable.endProcess();
return true;
}
workflowExecuteRunnable.updateProcessInstanceState(stateEvent);
return true;
}
if (workflowExecuteRunnable.processComplementData()) {
return true;
}
if (stateEvent.getExecutionStatus().typeIsFinished()) {
workflowExecuteRunnable.endProcess();
}
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
workflowExecuteRunnable.killAllTasks();
}
return true;
}
@Override
public StateEventType getEventType() {
return StateEventType.PROCESS_STATE_CHANGE;
}
private void measureProcessState(StateEvent processStateEvent) {
if (processStateEvent.getExecutionStatus().typeIsFinished()) {
ProcessInstanceMetrics.incProcessInstanceFinish();
}
switch (processStateEvent.getExecutionStatus()) {
case STOP:
ProcessInstanceMetrics.incProcessInstanceStop();
break;
case SUCCESS:
ProcessInstanceMetrics.incProcessInstanceSuccess();
break;
case FAILURE:
ProcessInstanceMetrics.incProcessInstanceFailure();
break;
default:
break;
}
}
}

39
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.event;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import com.google.auto.service.AutoService;
@AutoService(StateEventHandler.class)
public class WorkflowTimeoutStateEventHandler implements StateEventHandler {
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
ProcessInstanceMetrics.incProcessInstanceTimeout();
workflowExecuteRunnable.processTimeout();
return true;
}
@Override
public StateEventType getEventType() {
return StateEventType.PROCESS_TIMEOUT;
}
}

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor; package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -62,7 +62,8 @@ public class StateEventProcessor implements NettyRequestProcessor {
} }
stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId()); stateEvent.setProcessInstanceId(stateEventChangeCommand.getDestProcessInstanceId());
stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId()); stateEvent.setTaskInstanceId(stateEventChangeCommand.getDestTaskInstanceId());
StateEventType type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE; StateEventType
type = stateEvent.getTaskInstanceId() == 0 ? StateEventType.PROCESS_STATE_CHANGE : StateEventType.TASK_STATE_CHANGE;
stateEvent.setType(type); stateEvent.setType(type);
try { try {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor; package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue; package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -98,6 +98,7 @@ public class StateEventResponseService {
*/ */
public void addResponse(StateEvent stateEvent) { public void addResponse(StateEvent stateEvent) {
try { try {
// check the event is validated
eventQueue.put(stateEvent); eventQueue.put(stateEvent);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("Put state event : {} error", stateEvent, e); logger.error("Put state event : {} error", stateEvent, e);

17
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java

@ -63,11 +63,15 @@ public class TaskEventService {
@PostConstruct @PostConstruct
public void start() { public void start() {
this.taskEventThread = new TaskEventThread(); this.taskEventThread = new TaskEventDispatchThread();
logger.info("TaskEvent dispatch thread starting");
this.taskEventThread.start(); this.taskEventThread.start();
logger.info("TaskEvent dispatch thread started");
this.taskEventHandlerThread = new TaskEventHandlerThread(); this.taskEventHandlerThread = new TaskEventHandlerThread();
logger.info("TaskEvent handle thread staring");
this.taskEventHandlerThread.start(); this.taskEventHandlerThread.start();
logger.info("TaskEvent handle thread started");
} }
@PreDestroy @PreDestroy
@ -94,14 +98,14 @@ public class TaskEventService {
* @param taskEvent taskEvent * @param taskEvent taskEvent
*/ */
public void addEvent(TaskEvent taskEvent) { public void addEvent(TaskEvent taskEvent) {
taskExecuteThreadPool.submitTaskEvent(taskEvent); eventQueue.add(taskEvent);
} }
/** /**
* task worker thread * Dispatch event to target task runnable.
*/ */
class TaskEventThread extends BaseDaemonThread { class TaskEventDispatchThread extends BaseDaemonThread {
protected TaskEventThread() { protected TaskEventDispatchThread() {
super("TaskEventLoopThread"); super("TaskEventLoopThread");
} }
@ -109,7 +113,7 @@ public class TaskEventService {
public void run() { public void run() {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
// if not task , blocking here // if not task event, blocking here
TaskEvent taskEvent = eventQueue.take(); TaskEvent taskEvent = eventQueue.take();
taskExecuteThreadPool.submitTaskEvent(taskEvent); taskExecuteThreadPool.submitTaskEvent(taskEvent);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -141,6 +145,7 @@ public class TaskEventService {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
logger.warn("TaskEvent handle thread interrupted, will return this loop");
break; break;
} catch (Exception e) { } catch (Exception e) {
logger.error("event handler thread error", e); logger.error("event handler thread error", e);

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue; package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -193,6 +193,7 @@ public class TaskExecuteRunnable implements Runnable {
} }
} }
// if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success // if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
// send ack to worker
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
} catch (Exception e) { } catch (Exception e) {

17
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@ -82,19 +82,14 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent); logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
return; return;
} }
if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) { TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
TaskExecuteRunnable taskExecuteThread = new TaskExecuteRunnable( (processInstanceId) -> new TaskExecuteRunnable(processInstanceId,
taskEvent.getProcessInstanceId(), processService,
processService, workflowExecuteThreadPool, workflowExecuteThreadPool,
processInstanceExecCacheManager, processInstanceExecCacheManager,
dataQualityResultOperator); dataQualityResultOperator));
taskExecuteThreadMap.put(taskEvent.getProcessInstanceId(), taskExecuteThread);
}
TaskExecuteRunnable taskExecuteRunnable= taskExecuteThreadMap.get(taskEvent.getProcessInstanceId());
if (taskExecuteRunnable != null) {
taskExecuteRunnable.addEvent(taskEvent); taskExecuteRunnable.addEvent(taskEvent);
} }
}
public void eventHandler() { public void eventHandler() {
for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) { for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) {
@ -103,7 +98,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
} }
public void executeEvent(TaskExecuteRunnable taskExecuteThread) { public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
if (taskExecuteThread.eventSize() == 0) { if (taskExecuteThread.isEmpty()) {
return; return;
} }
if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) { if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState; import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -36,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;

404
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -35,16 +35,14 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -63,20 +61,23 @@ import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.BlockingParameters;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
import org.apache.dolphinscheduler.server.master.event.StateEventHandler;
import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils; import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
@ -283,16 +284,37 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (!isStart) { if (!isStart) {
return; return;
} }
StateEvent stateEvent = null;
while (!this.stateEvents.isEmpty()) { while (!this.stateEvents.isEmpty()) {
try { try {
StateEvent stateEvent = this.stateEvents.peek(); stateEvent = this.stateEvents.peek();
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
if (stateEventHandler(stateEvent)) { stateEvent.getTaskInstanceId());
// if state handle success then will remove this state, otherwise will retry this state next time.
// The state should always handle success except database error.
checkProcessInstance(stateEvent);
StateEventHandler stateEventHandler =
StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
.orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
if (stateEventHandler.handleStateEvent(this, stateEvent)) {
this.stateEvents.remove(stateEvent); this.stateEvents.remove(stateEvent);
} }
} catch (StateEventHandleError stateEventHandleError) {
logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
this.stateEvents.remove(stateEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleException stateEventHandleException) {
logger.error("State event handle error, will retry this event: {}",
stateEvent,
stateEventHandleException);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) { } catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue. // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
logger.error("state handle error:", e); logger.error("State event handle error, get a unknown exception, will retry this event: {}",
stateEvent,
e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally { } finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
} }
@ -330,58 +352,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return this.processInstance; return this.processInstance;
} }
private boolean stateEventHandler(StateEvent stateEvent) { public boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
logger.info("process event: {}", stateEvent);
if (!checkProcessInstance(stateEvent)) {
return false;
}
boolean result = false;
switch (stateEvent.getType()) {
case PROCESS_STATE_CHANGE:
measureProcessState(stateEvent);
result = processStateChangeHandler(stateEvent);
break;
case TASK_STATE_CHANGE:
measureTaskState(stateEvent);
result = taskStateChangeHandler(stateEvent);
break;
case PROCESS_TIMEOUT:
ProcessInstanceMetrics.incProcessInstanceTimeout();
result = processTimeout();
break;
case TASK_TIMEOUT:
TaskMetrics.incTaskTimeout();
result = taskTimeout(stateEvent);
break;
case WAIT_TASK_GROUP:
result = checkForceStartAndWakeUp(stateEvent);
break;
case TASK_RETRY:
TaskMetrics.incTaskRetry();
result = taskRetryEventHandler(stateEvent);
break;
case PROCESS_BLOCKED:
result = processBlockHandler(stateEvent);
break;
default:
break;
}
if (result) {
this.stateEvents.remove(stateEvent);
}
return result;
}
private boolean checkForceStartAndWakeUp(StateEvent stateEvent) {
TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId());
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.DISPATCH); taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
return true; return true;
} }
if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) {
@ -396,76 +374,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return false; return false;
} }
private boolean taskTimeout(StateEvent stateEvent) { public void processTimeout() {
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}
TaskInstance taskInstance = taskInstanceMap.get(stateEvent.getTaskInstanceId());
if (TimeoutFlag.CLOSE == taskInstance.getTaskDefine().getTimeoutFlag()) {
return true;
}
TaskTimeoutStrategy taskTimeoutStrategy = taskInstance.getTaskDefine().getTimeoutNotifyStrategy();
if (TaskTimeoutStrategy.FAILED == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode());
taskProcessor.action(TaskAction.TIMEOUT);
}
if (TaskTimeoutStrategy.WARN == taskTimeoutStrategy || TaskTimeoutStrategy.WARNFAILED == taskTimeoutStrategy) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
}
return true;
}
private boolean processTimeout() {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser); this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser);
return true;
}
private boolean taskStateChangeHandler(StateEvent stateEvent) {
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}
Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
TaskInstance task = taskInstanceOptional.orElseThrow(
() -> new RuntimeException("Cannot find task instance by task instance id: " + stateEvent.getTaskInstanceId()));
if (task.getState() == null) {
logger.error("task state is null, state handler error: {}", stateEvent);
return true;
}
if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true;
}
taskFinished(task);
if (task.getTaskGroupId() > 0) {
releaseTaskGroup(task);
} }
return true;
}
if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode());
iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskInstance().getState().typeIsFinished()) { public void taskTimeout(TaskInstance taskInstance) {
if (iTaskProcessor.taskInstance().getState() != task.getState()) { ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
task.setState(iTaskProcessor.taskInstance().getState()); processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser);
}
taskFinished(task);
}
return true;
}
logger.error("state handler error: {}", stateEvent);
return true;
} }
private void taskFinished(TaskInstance taskInstance) { public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
logger.info("TaskInstance finished task code:{} state:{} ", logger.info("TaskInstance finished task code:{} state:{} ",
taskInstance.getTaskCode(), taskInstance.getTaskCode(),
taskInstance.getState()); taskInstance.getState());
@ -512,7 +431,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* *
* @param taskInstance * @param taskInstance
*/ */
private void releaseTaskGroup(TaskInstance taskInstance) { public void releaseTaskGroup(TaskInstance taskInstance) {
if (taskInstance.getTaskGroupId() > 0) { if (taskInstance.getTaskGroupId() > 0) {
TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
if (nextTaskInstance != null) { if (nextTaskInstance != null) {
@ -536,13 +455,15 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* *
* @param taskInstance * @param taskInstance
*/ */
private void retryTaskInstance(TaskInstance taskInstance) { private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
if (!taskInstance.taskCanRetry()) { if (!taskInstance.taskCanRetry()) {
return; return;
} }
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance); TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) { if (newTaskInstance == null) {
logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId()); logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}",
taskInstance.getTaskCode(),
taskInstance.getId());
return; return;
} }
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance); waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
@ -563,20 +484,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
} }
/**
* handle task retry event
*
* @param stateEvent
* @return
*/
private boolean taskRetryEventHandler(StateEvent stateEvent) {
TaskInstance taskInstance = waitToRetryTaskInstanceMap.get(stateEvent.getTaskCode());
addTaskToStandByList(taskInstance);
submitStandByTask();
waitToRetryTaskInstanceMap.remove(stateEvent.getTaskCode());
return true;
}
/** /**
* update process instance * update process instance
*/ */
@ -610,45 +517,23 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* check process instance by state event * check process instance by state event
*/ */
public boolean checkProcessInstance(StateEvent stateEvent) { public void checkProcessInstance(StateEvent stateEvent) throws StateEventHandleError {
if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.error("mismatch process instance id: {}, state event:{}", throw new StateEventHandleError("The event doesn't contains process instance id");
this.processInstance.getId(),
stateEvent);
return false;
} }
return true;
} }
/** /**
* check if task instance exist by state event * check if task instance exist by state event
*/ */
public boolean checkTaskInstanceByStateEvent(StateEvent stateEvent) { public void checkTaskInstanceByStateEvent(StateEvent stateEvent) throws StateEventHandleError {
if (stateEvent.getTaskInstanceId() == 0) { if (stateEvent.getTaskInstanceId() == 0) {
logger.error("task instance id null, state event:{}", stateEvent); throw new StateEventHandleError("The taskInstanceId is 0");
return false;
} }
if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) { if (!taskInstanceMap.containsKey(stateEvent.getTaskInstanceId())) {
logger.error("mismatch task instance id, event:{}", stateEvent); throw new StateEventHandleError("Cannot find the taskInstance from taskInstanceMap");
return false;
}
return true;
}
/**
* check if task instance exist by task code
*/
public boolean checkTaskInstanceByCode(long taskCode) {
if (taskInstanceMap.isEmpty()) {
return false;
}
for (TaskInstance taskInstance : taskInstanceMap.values()) {
if (taskInstance.getTaskCode() == taskCode) {
return true;
}
} }
return false;
} }
/** /**
@ -697,58 +582,13 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return Optional.empty(); return Optional.empty();
} }
private boolean processStateChangeHandler(StateEvent stateEvent) { public void processBlock() {
try {
logger.info("process:{} state {} change to {}", processInstance.getId(), processInstance.getState(), stateEvent.getExecutionStatus());
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
endProcess();
return true;
}
this.updateProcessInstanceState(stateEvent);
return true;
}
if (processComplementData()) {
return true;
}
if (stateEvent.getExecutionStatus().typeIsFinished()) {
endProcess();
}
if (processInstance.getState() == ExecutionStatus.READY_STOP) {
killAllTasks();
}
return true;
} catch (Exception e) {
logger.error("process state change error:", e);
}
return true;
}
private boolean processBlockHandler(StateEvent stateEvent) {
try {
Optional<TaskInstance> taskInstanceOptional = getTaskInstance(stateEvent.getTaskInstanceId());
TaskInstance task = taskInstanceOptional.orElseThrow(
() -> new RuntimeException("Cannot find taskInstance by taskInstanceId:" + stateEvent.getTaskInstanceId()));
if (!checkTaskInstanceByStateEvent(stateEvent)) {
logger.error("task {} is not a blocking task", task.getTaskCode());
return false;
}
BlockingParameters parameters = JSONUtils.parseObject(task.getTaskParams(), BlockingParameters.class);
if (parameters.isAlertWhenBlocking()) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendProcessBlockingAlert(processInstance, projectUser); processAlertManager.sendProcessBlockingAlert(processInstance, projectUser);
logger.info("processInstance {} block alert send successful!", processInstance.getId()); logger.info("processInstance {} block alert send successful!", processInstance.getId());
} }
} catch (Exception e) {
logger.error("sending blocking message error:", e);
}
return true;
}
private boolean processComplementData() throws Exception { public boolean processComplementData() {
if (!needComplementProcess()) { if (!needComplementProcess()) {
return false; return false;
} }
@ -946,7 +786,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* init task queue * init task queue
*/ */
private void initTaskQueue() { private void initTaskQueue() throws StateEventHandleException {
taskFailedSubmit = false; taskFailedSubmit = false;
activeTaskProcessorMaps.clear(); activeTaskProcessorMaps.clear();
@ -955,7 +795,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
errorTaskMap.clear(); errorTaskMap.clear();
if (!isNewProcessInstance()) { if (!isNewProcessInstance()) {
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) { for (TaskInstance task : validTaskInstanceList) {
if (validTaskMap.containsKey(task.getTaskCode())) { if (validTaskMap.containsKey(task.getTaskCode())) {
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
@ -965,7 +806,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processService.updateTaskInstance(task); processService.updateTaskInstance(task);
continue; continue;
} }
logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}", task.getTaskCode()); logger.warn("have same taskCode taskInstance when init task queue, taskCode:{}",
task.getTaskCode());
} }
validTaskMap.put(task.getTaskCode(), task.getId()); validTaskMap.put(task.getTaskCode(), task.getId());
@ -1113,6 +955,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
Host host = new Host(taskInstance.getHost()); Host host = new Host(taskInstance.getHost());
nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command()); nettyExecutorManager.doExecute(host, hostUpdateCommand.convert2Command());
} catch (Exception e) { } catch (Exception e) {
// Do we need to catch this exception?
logger.error("notify process host update", e); logger.error("notify process host update", e);
} }
} }
@ -1366,8 +1209,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return validTaskInstanceList; return validTaskInstanceList;
} }
private void submitPostNode(String parentNodeCode) { private void submitPostNode(String parentNodeCode) throws StateEventHandleException {
Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); Set<String> submitTaskNodeList =
DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
List<TaskInstance> taskInstances = new ArrayList<>(); List<TaskInstance> taskInstances = new ArrayList<>();
for (String taskNode : submitTaskNodeList) { for (String taskNode : submitTaskNodeList) {
TaskNode taskNodeObject = dag.getNode(taskNode); TaskNode taskNodeObject = dag.getNode(taskNode);
@ -1710,34 +1554,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return true; return true;
} }
try {
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE));
return processInstance.getScheduleTime().equals(endTime); return processInstance.getScheduleTime().equals(endTime);
} catch (Exception e) {
logger.error("complement end failed ", e);
return false;
}
} }
/** /**
* updateProcessInstance process instance state * updateProcessInstance process instance state
* after each batch of tasks is executed, the status of the process instance is updated * after each batch of tasks is executed, the status of the process instance is updated
*/ */
private void updateProcessInstanceState() { private void updateProcessInstanceState() throws StateEventHandleException {
ExecutionStatus state = getProcessInstanceState(processInstance); ExecutionStatus state = getProcessInstanceState(processInstance);
if (processInstance.getState() != state) { if (processInstance.getState() != state) {
logger.info( updateWorkflowInstanceStatesToDB(state);
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(),
processInstance.getState(), state,
processInstance.getCommandType());
processInstance.setState(state);
if (state.typeIsFinished()) {
processInstance.setEndTime(new Date());
}
processService.updateProcessInstance(processInstance);
StateEvent stateEvent = new StateEvent(); StateEvent stateEvent = new StateEvent();
stateEvent.setExecutionStatus(processInstance.getState()); stateEvent.setExecutionStatus(processInstance.getState());
@ -1752,20 +1581,33 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* stateEvent's execution status as process instance state * stateEvent's execution status as process instance state
*/ */
private void updateProcessInstanceState(StateEvent stateEvent) { public void updateProcessInstanceState(StateEvent stateEvent) throws StateEventHandleException {
ExecutionStatus state = stateEvent.getExecutionStatus(); ExecutionStatus state = stateEvent.getExecutionStatus();
if (processInstance.getState() != state) { updateWorkflowInstanceStatesToDB(state);
logger.info( }
"work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(), processInstance.getName(), private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException {
processInstance.getState(), state, ExecutionStatus originStates = processInstance.getState();
if (originStates != newStates) {
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
processInstance.getId(),
processInstance.getName(),
originStates,
newStates,
processInstance.getCommandType()); processInstance.getCommandType());
processInstance.setState(state); processInstance.setState(newStates);
if (state.typeIsFinished()) { if (newStates.typeIsFinished()) {
processInstance.setEndTime(new Date()); processInstance.setEndTime(new Date());
} }
try {
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
} catch (Exception ex) {
// recover the status
processInstance.setState(originStates);
processInstance.setEndTime(null);
throw new StateEventHandleException("Update process instance status to DB error", ex);
}
} }
} }
@ -1784,19 +1626,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* *
* @param taskInstance task instance * @param taskInstance task instance
*/ */
private void addTaskToStandByList(TaskInstance taskInstance) { public void addTaskToStandByList(TaskInstance taskInstance) {
try {
if (readyToSubmitTaskQueue.contains(taskInstance)) { if (readyToSubmitTaskQueue.contains(taskInstance)) {
logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode());
return; return;
} }
logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}",
taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); taskInstance.getName(),
taskInstance.getId(),
taskInstance.getTaskCode());
TaskMetrics.incTaskSubmit(); TaskMetrics.incTaskSubmit();
readyToSubmitTaskQueue.put(taskInstance); readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e);
}
} }
/** /**
@ -1805,13 +1645,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
* @param taskInstance task instance * @param taskInstance task instance
*/ */
private void removeTaskFromStandbyList(TaskInstance taskInstance) { private void removeTaskFromStandbyList(TaskInstance taskInstance) {
try {
readyToSubmitTaskQueue.remove(taskInstance); readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
logger.error("remove task instance from readyToSubmitTaskQueue error, task id:{}, Name: {}",
taskInstance.getId(),
taskInstance.getName(), e);
}
} }
/** /**
@ -1831,8 +1665,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* close the on going tasks * close the on going tasks
*/ */
private void killAllTasks() { public void killAllTasks() {
logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), logger.info("kill called on process instance id: {}, num: {}",
processInstance.getId(),
activeTaskProcessorMaps.size()); activeTaskProcessorMaps.size());
if (readyToSubmitTaskQueue.size() > 0) { if (readyToSubmitTaskQueue.size() > 0) {
@ -1868,7 +1703,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
/** /**
* handling the list of tasks to be submitted * handling the list of tasks to be submitted
*/ */
private void submitStandByTask() { public void submitStandByTask() throws StateEventHandleException {
int length = readyToSubmitTaskQueue.size(); int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek(); TaskInstance task = readyToSubmitTaskQueue.peek();
@ -2027,6 +1862,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
throw new Exception("resubmit error, taskProcessor is null, task code: " + taskCode); throw new Exception("resubmit error, taskProcessor is null, task code: " + taskCode);
} }
} }
public Map<Long, Integer> getCompleteTaskMap() {
return completeTaskMap;
}
public Map<Long, ITaskProcessor> getActiveTaskProcessMap() {
return activeTaskProcessorMaps;
}
public Map<Long, TaskInstance> getWaitToRetryTaskInstanceMap() {
return waitToRetryTaskInstanceMap;
}
private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) { private void setGlobalParamIfCommanded(ProcessDefinition processDefinition, Map<String, String> cmdParam) {
// get start params from command param // get start params from command param
Map<String, String> startParamMap = new HashMap<>(); Map<String, String> startParamMap = new HashMap<>();
@ -2061,46 +1909,4 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} }
} }
private void measureProcessState(StateEvent processStateEvent) {
if (processStateEvent.getExecutionStatus().typeIsFinished()) {
ProcessInstanceMetrics.incProcessInstanceFinish();
}
switch (processStateEvent.getExecutionStatus()) {
case STOP:
ProcessInstanceMetrics.incProcessInstanceStop();
break;
case SUCCESS:
ProcessInstanceMetrics.incProcessInstanceSuccess();
break;
case FAILURE:
ProcessInstanceMetrics.incProcessInstanceFailure();
break;
default:
break;
}
}
private void measureTaskState(StateEvent taskStateEvent) {
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
// the event is broken
logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
return;
}
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
}
switch (taskStateEvent.getExecutionStatus()) {
case STOP:
TaskMetrics.incTaskStop();
break;
case SUCCESS:
TaskMetrics.incTaskSuccess();
break;
case FAILURE:
TaskMetrics.incTaskFailure();
break;
default:
break;
}
}
} }

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Map; import java.util.Map;

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/FailoverService.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.service;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -26,7 +26,7 @@ import static org.mockito.Mockito.doNothing;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRemoteChannel.java

@ -17,12 +17,13 @@
package org.apache.dolphinscheduler.remote.processor; package org.apache.dolphinscheduler.remote.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
/** /**
* callback channel * callback channel
*/ */

29
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/StateEventCallbackService.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -71,19 +72,19 @@ public class StateEventCallbackService {
* @param host * @param host
* @return callback channel * @return callback channel
*/ */
private NettyRemoteChannel newRemoteChannel(Host host) { private Optional<NettyRemoteChannel> newRemoteChannel(Host host) {
Channel newChannel; Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress()); NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(host.getAddress());
if (nettyRemoteChannel != null) { if (nettyRemoteChannel != null) {
if (nettyRemoteChannel.isActive()) { if (nettyRemoteChannel.isActive()) {
return nettyRemoteChannel; return Optional.of(nettyRemoteChannel);
} }
} }
newChannel = nettyRemotingClient.getChannel(host); newChannel = nettyRemotingClient.getChannel(host);
if (newChannel != null) { if (newChannel != null) {
return newRemoteChannel(newChannel, host.getAddress()); return Optional.of(newRemoteChannel(newChannel, host.getAddress()));
} }
return null; return Optional.empty();
} }
public long pause(int ntries) { public long pause(int ntries) {
@ -110,24 +111,26 @@ public class StateEventCallbackService {
} }
/** /**
* send result * Send the command to target address, this method doesn't guarantee the command send success.
* *
* @param command command * @param command command need tp send
*/ */
public void sendResult(String address, int port, Command command) { public void sendResult(String address, int port, Command command) {
logger.info("send result, host:{}, command:{}", address, command.toString()); logger.info("send result, host:{}, command:{}", address, command.toString());
Host host = new Host(address, port); Host host = new Host(address, port);
NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host); sendResult(host, command);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command);
}
} }
/**
* Send the command to target host, this method doesn't guarantee the command send success.
*
* @param host target host
* @param command command need to send
*/
public void sendResult(Host host, Command command) { public void sendResult(Host host, Command command) {
logger.info("send result, host:{}, command:{}", host.getAddress(), command.toString()); logger.info("send result, host:{}, command:{}", host.getAddress(), command.toString());
NettyRemoteChannel nettyRemoteChannel = newRemoteChannel(host); newRemoteChannel(host).ifPresent(nettyRemoteChannel -> {
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command); nettyRemoteChannel.writeAndFlush(command);
} });
} }
} }

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -51,10 +51,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* put task instance to priority queue * put task instance to priority queue
* *
* @param taskInstance taskInstance * @param taskInstance taskInstance
* @throws TaskPriorityQueueException
*/ */
@Override @Override
public void put(TaskInstance taskInstance) throws TaskPriorityQueueException { public void put(TaskInstance taskInstance) {
Preconditions.checkNotNull(taskInstance); Preconditions.checkNotNull(taskInstance);
queue.add(taskInstance); queue.add(taskInstance);
taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance)); taskInstanceIdentifySet.add(getTaskInstanceIdentify(taskInstance));

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java

@ -33,7 +33,7 @@ public interface TaskPriorityQueue<T> {
* @param taskInfo taskInfo * @param taskInfo taskInfo
* @throws TaskPriorityQueueException * @throws TaskPriorityQueueException
*/ */
void put(T taskInfo) throws TaskPriorityQueueException; void put(T taskInfo);
/** /**
* take taskInfo * take taskInfo

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -40,10 +40,9 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
* put task takePriorityInfo * put task takePriorityInfo
* *
* @param taskPriorityInfo takePriorityInfo * @param taskPriorityInfo takePriorityInfo
* @throws TaskPriorityQueueException
*/ */
@Override @Override
public void put(TaskPriority taskPriorityInfo) throws TaskPriorityQueueException { public void put(TaskPriority taskPriorityInfo) {
queue.put(taskPriorityInfo); queue.put(taskPriorityInfo);
} }

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResponseAckProcessor.java

@ -58,7 +58,13 @@ public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId()); ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId()); TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId()); logger.debug("remove REMOTE_CHANNELS, task instance id:{}",
taskExecuteResponseAckCommand.getTaskInstanceId());
} else if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.FAILURE.getCode()) {
// master handle worker response error, will still retry
} else {
throw new IllegalArgumentException("Invalid task execute response ack status: "
+ taskExecuteResponseAckCommand.getStatus());
} }
} }

Loading…
Cancel
Save