Browse Source
* Fix database update error doesn't rollback the task instance status
* Fix database error may cause workflow dead with running status
(cherry picked from commit f639a2eed4
)
3.0.0/version-upgrade
Wenjun Ruan
2 years ago
34 changed files with 1399 additions and 461 deletions
@ -0,0 +1,79 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.utils; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
public class TaskInstanceUtils { |
||||
|
||||
/** |
||||
* Copy the property of given source {@link TaskInstance} to target. |
||||
* |
||||
* @param source Given task instance, copy from. |
||||
* @param target Given task instance, copy to |
||||
* @return a soft copy of given task instance. |
||||
*/ |
||||
public static void copyTaskInstance(TaskInstance source, TaskInstance target) { |
||||
target.setId(source.getId()); |
||||
target.setName(source.getName()); |
||||
target.setTaskType(source.getTaskType()); |
||||
target.setProcessInstanceId(source.getProcessInstanceId()); |
||||
target.setTaskCode(source.getTaskCode()); |
||||
target.setTaskDefinitionVersion(source.getTaskDefinitionVersion()); |
||||
target.setProcessInstanceName(source.getProcessInstanceName()); |
||||
target.setTaskGroupPriority(source.getTaskGroupPriority()); |
||||
target.setState(source.getState()); |
||||
target.setFirstSubmitTime(source.getFirstSubmitTime()); |
||||
target.setSubmitTime(source.getSubmitTime()); |
||||
target.setStartTime(source.getStartTime()); |
||||
target.setEndTime(source.getEndTime()); |
||||
target.setHost(source.getHost()); |
||||
target.setExecutePath(source.getExecutePath()); |
||||
target.setLogPath(source.getLogPath()); |
||||
target.setRetryTimes(source.getRetryTimes()); |
||||
target.setAlertFlag(source.getAlertFlag()); |
||||
target.setProcessInstance(source.getProcessInstance()); |
||||
target.setProcessDefine(source.getProcessDefine()); |
||||
target.setTaskDefine(source.getTaskDefine()); |
||||
target.setPid(source.getPid()); |
||||
target.setAppLink(source.getAppLink()); |
||||
target.setFlag(source.getFlag()); |
||||
target.setDependency(source.getDependency()); |
||||
// todo: we need to cpoy the task params and then copy switchDependency, since the setSwitchDependency rely on task params, this is really a very bad practice.
|
||||
target.setTaskParams(source.getTaskParams()); |
||||
target.setSwitchDependency(source.getSwitchDependency()); |
||||
target.setDuration(source.getDuration()); |
||||
target.setMaxRetryTimes(source.getMaxRetryTimes()); |
||||
target.setRetryInterval(source.getRetryInterval()); |
||||
target.setTaskInstancePriority(source.getTaskInstancePriority()); |
||||
target.setDependentResult(source.getDependentResult()); |
||||
target.setWorkerGroup(source.getWorkerGroup()); |
||||
target.setEnvironmentCode(source.getEnvironmentCode()); |
||||
target.setEnvironmentConfig(source.getEnvironmentConfig()); |
||||
target.setExecutorId(source.getExecutorId()); |
||||
target.setVarPool(source.getVarPool()); |
||||
target.setExecutorName(source.getExecutorName()); |
||||
target.setResources(source.getResources()); |
||||
target.setDelayTime(source.getDelayTime()); |
||||
target.setDryRun(source.getDryRun()); |
||||
target.setTaskGroupId(source.getTaskGroupId()); |
||||
target.setCpuQuota(source.getCpuQuota()); |
||||
target.setMemoryMax(source.getMemoryMax()); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,43 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.dao.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import java.util.Date; |
||||
import java.util.HashMap; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
class TaskInstanceUtilsTest { |
||||
|
||||
@Test |
||||
void copyTaskInstance() { |
||||
TaskInstance source = new TaskInstance(); |
||||
source.setId(1); |
||||
source.setName("source"); |
||||
source.setSubmitTime(new Date()); |
||||
source.setTaskParams(JSONUtils.toJsonString(new HashMap<>())); |
||||
TaskInstance target = new TaskInstance(); |
||||
TaskInstanceUtils.copyTaskInstance(source, target); |
||||
Assertions.assertEquals(target.getId(), source.getId()); |
||||
Assertions.assertEquals(target.getName(), source.getName()); |
||||
} |
||||
} |
@ -0,0 +1,120 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.enums.TaskEventType; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Optional; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class TaskDelayEventHandler implements TaskEventHandler { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(TaskDelayEventHandler.class); |
||||
|
||||
@Autowired |
||||
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
@Autowired |
||||
private WorkflowExecuteThreadPool workflowExecuteThreadPool; |
||||
|
||||
@Override |
||||
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { |
||||
int taskInstanceId = taskEvent.getTaskInstanceId(); |
||||
int processInstanceId = taskEvent.getProcessInstanceId(); |
||||
|
||||
WorkflowExecuteRunnable workflowExecuteRunnable = |
||||
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); |
||||
if (workflowExecuteRunnable == null) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError("Cannot find related workflow instance from cache"); |
||||
} |
||||
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId); |
||||
if (!taskInstanceOptional.isPresent()) { |
||||
sendAckToWorker(taskEvent); |
||||
return; |
||||
} |
||||
TaskInstance taskInstance = taskInstanceOptional.get(); |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
logger.warn( |
||||
"The current task status is: {}, will not handle the running event, this event is delay, will discard this event: {}", |
||||
taskInstance.getState(), |
||||
taskEvent); |
||||
sendAckToWorker(taskEvent); |
||||
return; |
||||
} |
||||
|
||||
TaskInstance oldTaskInstance = new TaskInstance(); |
||||
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); |
||||
try { |
||||
taskInstance.setState(taskEvent.getState()); |
||||
taskInstance.setStartTime(taskEvent.getStartTime()); |
||||
taskInstance.setHost(taskEvent.getWorkerAddress()); |
||||
taskInstance.setLogPath(taskEvent.getLogPath()); |
||||
taskInstance.setExecutePath(taskEvent.getExecutePath()); |
||||
taskInstance.setPid(taskEvent.getProcessId()); |
||||
taskInstance.setAppLink(taskEvent.getAppIds()); |
||||
if (!processService.updateTaskInstance(taskInstance)) { |
||||
throw new TaskEventHandleError("Handle task delay event error, update taskInstance to db failed"); |
||||
} |
||||
sendAckToWorker(taskEvent); |
||||
} catch (Exception ex) { |
||||
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); |
||||
if (ex instanceof TaskEventHandleError) { |
||||
throw ex; |
||||
} |
||||
throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed", ex); |
||||
} |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); |
||||
stateEvent.setExecutionStatus(taskEvent.getState()); |
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); |
||||
workflowExecuteThreadPool.submitStateEvent(stateEvent); |
||||
|
||||
} |
||||
|
||||
private void sendAckToWorker(TaskEvent taskEvent) { |
||||
// If event handle success, send ack to worker to otherwise the worker will retry this event
|
||||
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = |
||||
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); |
||||
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); |
||||
} |
||||
|
||||
@Override |
||||
public TaskEventType getHandleEventType() { |
||||
return TaskEventType.DELAY; |
||||
} |
||||
} |
@ -0,0 +1,88 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskEventType; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class TaskDispatchEventHandler implements TaskEventHandler { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(TaskDispatchEventHandler.class); |
||||
|
||||
@Autowired |
||||
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
@Override |
||||
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { |
||||
int taskInstanceId = taskEvent.getTaskInstanceId(); |
||||
int processInstanceId = taskEvent.getProcessInstanceId(); |
||||
|
||||
WorkflowExecuteRunnable workflowExecuteRunnable = |
||||
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); |
||||
if (workflowExecuteRunnable == null) { |
||||
throw new TaskEventHandleError("Cannot find related workflow instance from cache"); |
||||
} |
||||
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId) |
||||
.orElseThrow(() -> new TaskEventHandleError("Cannot find related taskInstance from cache")); |
||||
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) { |
||||
logger.warn( |
||||
"The current taskInstance status is not SUBMITTED_SUCCESS, so the dispatch event will be discarded, the current is a delay event, event: {}", |
||||
taskEvent); |
||||
return; |
||||
} |
||||
|
||||
// todo: we need to just log the old status and rollback these two field, no need to copy all fields
|
||||
TaskInstance oldTaskInstance = new TaskInstance(); |
||||
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); |
||||
// update the taskInstance status
|
||||
taskInstance.setState(ExecutionStatus.DISPATCH); |
||||
taskInstance.setHost(taskEvent.getWorkerAddress()); |
||||
try { |
||||
if (!processService.updateTaskInstance(taskInstance)) { |
||||
throw new TaskEventHandleError("Handle task dispatch event error, update taskInstance to db failed"); |
||||
} |
||||
} catch (Exception ex) { |
||||
// rollback status
|
||||
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); |
||||
if (ex instanceof TaskEventHandleError) { |
||||
throw ex; |
||||
} |
||||
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed", ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public TaskEventType getHandleEventType() { |
||||
return TaskEventType.DISPATCH; |
||||
} |
||||
} |
@ -0,0 +1,29 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
/**
 * Checked exception raised by task event handlers when an event cannot be handled.
 * <p>
 * According to the contract documented on {@code TaskEventHandler#handleTaskEvent}, this exception means the
 * event will be discarded.
 */
public class TaskEventHandleError extends Exception {

    /**
     * Create an error that carries only a description.
     *
     * @param message description of what went wrong
     */
    public TaskEventHandleError(String message) {
        super(message);
    }

    /**
     * Create an error that carries a description and the underlying cause.
     *
     * @param message   description of what went wrong
     * @param throwable the original failure, kept as the cause
     */
    public TaskEventHandleError(String message, Throwable throwable) {
        super(message, throwable);
    }
}
@ -0,0 +1,29 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
/**
 * Checked exception raised by task event handlers when handling an event fails.
 * <p>
 * According to the contract documented on {@code TaskEventHandler#handleTaskEvent}, this exception means the
 * event needs to be retried.
 */
public class TaskEventHandleException extends Exception {

    /**
     * Create an exception that carries only a description.
     *
     * @param message description of what went wrong
     */
    public TaskEventHandleException(String message) {
        super(message);
    }

    /**
     * Create an exception that carries a description and the underlying cause.
     *
     * @param message   description of what went wrong
     * @param throwable the original failure, kept as the cause
     */
    public TaskEventHandleException(String message, Throwable throwable) {
        super(message, throwable);
    }
}
@ -0,0 +1,34 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskEventType; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; |
||||
|
||||
public interface TaskEventHandler { |
||||
|
||||
/** |
||||
* Handle the task event |
||||
* |
||||
* @throws TaskEventHandleError this exception means we will discord this event. |
||||
* @throws TaskEventHandleException this exception means we need to retry this event |
||||
*/ |
||||
void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException; |
||||
|
||||
TaskEventType getHandleEventType(); |
||||
} |
@ -0,0 +1,77 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.TaskEventType; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class TaskRejectByWorkerEventHandler implements TaskEventHandler { |
||||
|
||||
@Autowired |
||||
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
@Override |
||||
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { |
||||
int taskInstanceId = taskEvent.getTaskInstanceId(); |
||||
int processInstanceId = taskEvent.getProcessInstanceId(); |
||||
|
||||
WorkflowExecuteRunnable workflowExecuteRunnable = |
||||
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); |
||||
if (workflowExecuteRunnable == null) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError( |
||||
"Handle task reject event error, cannot find related workflow instance from cache, will discard this event"); |
||||
} |
||||
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> { |
||||
sendAckToWorker(taskEvent); |
||||
return new TaskEventHandleError( |
||||
"Handle task reject event error, cannot find the taskInstance from cache, will discord this event"); |
||||
}); |
||||
try { |
||||
// todo: If the worker submit multiple reject response to master, the task instance may be dispatch multiple,
|
||||
// we need to control the worker overload by master rather than worker
|
||||
// if the task resubmit and the worker failover, this task may be dispatch twice?
|
||||
// todo: we need to clear the taskInstance host and rollback the status to submit.
|
||||
workflowExecuteRunnable.resubmit(taskInstance.getTaskCode()); |
||||
sendAckToWorker(taskEvent); |
||||
} catch (Exception ex) { |
||||
throw new TaskEventHandleError("Handle task reject event error", ex); |
||||
} |
||||
|
||||
} |
||||
|
||||
public void sendAckToWorker(TaskEvent taskEvent) { |
||||
TaskRecallAckCommand taskRecallAckCommand = |
||||
new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); |
||||
taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command()); |
||||
} |
||||
|
||||
@Override |
||||
public TaskEventType getHandleEventType() { |
||||
return TaskEventType.WORKER_REJECT; |
||||
} |
||||
} |
@ -0,0 +1,117 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.enums.TaskEventType; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; |
||||
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Optional; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class TaskResultEventHandler implements TaskEventHandler { |
||||
|
||||
@Autowired |
||||
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
@Autowired |
||||
private WorkflowExecuteThreadPool workflowExecuteThreadPool; |
||||
|
||||
@Autowired |
||||
private DataQualityResultOperator dataQualityResultOperator; |
||||
|
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
@Override |
||||
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException { |
||||
int taskInstanceId = taskEvent.getTaskInstanceId(); |
||||
int processInstanceId = taskEvent.getProcessInstanceId(); |
||||
|
||||
WorkflowExecuteRunnable workflowExecuteRunnable = |
||||
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); |
||||
if (workflowExecuteRunnable == null) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError( |
||||
"Handle task result event error, cannot find related workflow instance from cache, will discard this event"); |
||||
} |
||||
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId); |
||||
if (!taskInstanceOptional.isPresent()) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError( |
||||
"Handle task result event error, cannot find the taskInstance from cache, will discord this event"); |
||||
} |
||||
TaskInstance taskInstance = taskInstanceOptional.get(); |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError( |
||||
"Handle task result event error, the task instance is already finished, will discord this event"); |
||||
} |
||||
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance); |
||||
|
||||
TaskInstance oldTaskInstance = new TaskInstance(); |
||||
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); |
||||
try { |
||||
taskInstance.setStartTime(taskEvent.getStartTime()); |
||||
taskInstance.setHost(taskEvent.getWorkerAddress()); |
||||
taskInstance.setLogPath(taskEvent.getLogPath()); |
||||
taskInstance.setExecutePath(taskEvent.getExecutePath()); |
||||
taskInstance.setPid(taskEvent.getProcessId()); |
||||
taskInstance.setAppLink(taskEvent.getAppIds()); |
||||
taskInstance.setState(taskEvent.getState()); |
||||
taskInstance.setEndTime(taskEvent.getEndTime()); |
||||
taskInstance.setVarPool(taskEvent.getVarPool()); |
||||
processService.changeOutParam(taskInstance); |
||||
processService.updateTaskInstance(taskInstance); |
||||
sendAckToWorker(taskEvent); |
||||
} catch (Exception ex) { |
||||
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); |
||||
throw new TaskEventHandleError("Handle task result event error, save taskInstance to db error", ex); |
||||
} |
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); |
||||
stateEvent.setExecutionStatus(taskEvent.getState()); |
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); |
||||
workflowExecuteThreadPool.submitStateEvent(stateEvent); |
||||
|
||||
} |
||||
|
||||
public void sendAckToWorker(TaskEvent taskEvent) { |
||||
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = |
||||
new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); |
||||
taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command()); |
||||
} |
||||
|
||||
@Override |
||||
public TaskEventType getHandleEventType() { |
||||
return TaskEventType.RESULT; |
||||
} |
||||
} |
@ -0,0 +1,118 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.StateEventType; |
||||
import org.apache.dolphinscheduler.common.enums.TaskEventType; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; |
||||
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; |
||||
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; |
||||
import org.apache.dolphinscheduler.service.process.ProcessService; |
||||
|
||||
import java.util.Optional; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class TaskRunningEventHandler implements TaskEventHandler { |
||||
private final Logger logger = LoggerFactory.getLogger(TaskRunningEventHandler.class); |
||||
|
||||
@Autowired |
||||
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
@Autowired |
||||
private WorkflowExecuteThreadPool workflowExecuteThreadPool; |
||||
|
||||
@Autowired |
||||
private ProcessService processService; |
||||
|
||||
@Override |
||||
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { |
||||
int taskInstanceId = taskEvent.getTaskInstanceId(); |
||||
int processInstanceId = taskEvent.getProcessInstanceId(); |
||||
|
||||
WorkflowExecuteRunnable workflowExecuteRunnable = |
||||
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); |
||||
if (workflowExecuteRunnable == null) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError( |
||||
"Handle task running event error, cannot find related workflow instance from cache, will discard this event"); |
||||
} |
||||
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId); |
||||
if (!taskInstanceOptional.isPresent()) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError( |
||||
"Handle running event error, cannot find the taskInstance from cache, will discord this event"); |
||||
} |
||||
TaskInstance taskInstance = taskInstanceOptional.get(); |
||||
if (taskInstance.getState().typeIsFinished()) { |
||||
sendAckToWorker(taskEvent); |
||||
throw new TaskEventHandleError( |
||||
"Handle task running event error, this task instance is already finished, this event is delay, will discard this event"); |
||||
} |
||||
|
||||
TaskInstance oldTaskInstance = new TaskInstance(); |
||||
TaskInstanceUtils.copyTaskInstance(taskInstance, oldTaskInstance); |
||||
try { |
||||
taskInstance.setState(taskEvent.getState()); |
||||
taskInstance.setStartTime(taskEvent.getStartTime()); |
||||
taskInstance.setHost(taskEvent.getWorkerAddress()); |
||||
taskInstance.setLogPath(taskEvent.getLogPath()); |
||||
taskInstance.setExecutePath(taskEvent.getExecutePath()); |
||||
taskInstance.setPid(taskEvent.getProcessId()); |
||||
taskInstance.setAppLink(taskEvent.getAppIds()); |
||||
if (!processService.updateTaskInstance(taskInstance)) { |
||||
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed"); |
||||
} |
||||
sendAckToWorker(taskEvent); |
||||
} catch (Exception ex) { |
||||
TaskInstanceUtils.copyTaskInstance(oldTaskInstance, taskInstance); |
||||
if (ex instanceof TaskEventHandleError) { |
||||
throw ex; |
||||
} |
||||
throw new TaskEventHandleError("Handle task running event error, update taskInstance to db failed", ex); |
||||
} |
||||
|
||||
StateEvent stateEvent = new StateEvent(); |
||||
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId()); |
||||
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId()); |
||||
stateEvent.setExecutionStatus(taskEvent.getState()); |
||||
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); |
||||
workflowExecuteThreadPool.submitStateEvent(stateEvent); |
||||
} |
||||
|
||||
private void sendAckToWorker(TaskEvent taskEvent) { |
||||
// If event handle success, send ack to worker to otherwise the worker will retry this event
|
||||
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = |
||||
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId()); |
||||
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command()); |
||||
} |
||||
|
||||
@Override |
||||
public TaskEventType getHandleEventType() { |
||||
return TaskEventType.RUNNING; |
||||
} |
||||
} |
@ -0,0 +1,31 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Data; |
||||
|
||||
@Data |
||||
@AllArgsConstructor |
||||
public class WorkflowEvent { |
||||
|
||||
private WorkflowEventType workflowEventType; |
||||
|
||||
private int workflowInstanceId; |
||||
|
||||
} |
@ -0,0 +1,29 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
/**
 * Checked exception raised while handling a workflow event.
 */
public class WorkflowEventHandleError extends Exception {

    /**
     * Creates the error with a message and an underlying cause.
     *
     * @param message description of the failure
     * @param cause   the exception that triggered this error
     */
    public WorkflowEventHandleError(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates the error with a message only.
     *
     * @param message description of the failure
     */
    public WorkflowEventHandleError(String message) {
        super(message);
    }
}
@ -0,0 +1,29 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
/**
 * Checked exception raised while handling a workflow event.
 */
public class WorkflowEventHandleException extends Exception {

    /**
     * Creates the exception with a message and an underlying cause.
     *
     * @param message description of the failure
     * @param cause   the exception that triggered this one
     */
    public WorkflowEventHandleException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates the exception with a message only.
     *
     * @param message description of the failure
     */
    public WorkflowEventHandleException(String message) {
        super(message);
    }
}
@ -0,0 +1,31 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
public interface WorkflowEventHandler { |
||||
|
||||
/** |
||||
* Handle a workflow event, |
||||
* |
||||
* @throws WorkflowEventHandleError if this exception happen, means the event is broken, need to drop this event. |
||||
* @throws WorkflowEventHandleException if this exception happen, means we need to retry this event. |
||||
*/ |
||||
void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError, WorkflowEventHandleException; |
||||
|
||||
WorkflowEventType getHandleWorkflowEventType(); |
||||
} |
@ -0,0 +1,48 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class WorkflowEventQueue { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(WorkflowEventQueue.class); |
||||
|
||||
private static final LinkedBlockingQueue<WorkflowEvent> workflowEventQueue = new LinkedBlockingQueue<>(); |
||||
|
||||
/** |
||||
* Add a workflow event. |
||||
*/ |
||||
public void addEvent(WorkflowEvent workflowEvent) { |
||||
workflowEventQueue.add(workflowEvent); |
||||
logger.info("Added workflow event to workflowEvent queue, event: {}", workflowEvent); |
||||
} |
||||
|
||||
/** |
||||
* Pool the head of the workflow event queue and wait an workflow event. |
||||
*/ |
||||
public WorkflowEvent poolEvent() throws InterruptedException { |
||||
return workflowEventQueue.take(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,25 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
/**
 * The kinds of workflow events.
 */
public enum WorkflowEventType {

    /** Event type for starting a workflow. */
    START_WORKFLOW
}
@ -0,0 +1,86 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.event; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; |
||||
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; |
||||
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; |
||||
import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue; |
||||
|
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class WorkflowStartEventHandler implements WorkflowEventHandler { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(WorkflowStartEventHandler.class); |
||||
|
||||
@Autowired |
||||
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; |
||||
|
||||
@Autowired |
||||
private StateWheelExecuteThread stateWheelExecuteThread; |
||||
|
||||
@Autowired |
||||
private WorkflowExecuteThreadPool workflowExecuteThreadPool; |
||||
|
||||
@Autowired |
||||
private WorkflowEventQueue workflowEventQueue; |
||||
|
||||
@Override |
||||
public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError { |
||||
logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent); |
||||
WorkflowExecuteRunnable workflowExecuteRunnable = |
||||
processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId()); |
||||
if (workflowExecuteRunnable == null) { |
||||
throw new WorkflowEventHandleError( |
||||
"The workflow start event is invalid, cannot find the workflow instance from cache"); |
||||
} |
||||
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); |
||||
|
||||
ProcessInstanceMetrics.incProcessInstanceSubmit(); |
||||
CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture = |
||||
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool); |
||||
workflowSubmitFuture.thenAccept(workflowSubmitStatue -> { |
||||
if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) { |
||||
// submit failed will resend the event to workflow event queue
|
||||
logger.info("Success submit the workflow instance"); |
||||
if (processInstance.getTimeout() > 0) { |
||||
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); |
||||
} |
||||
} else { |
||||
logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}", |
||||
workflowEvent); |
||||
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, |
||||
processInstance.getId())); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@Override |
||||
public WorkflowEventType getHandleWorkflowEventType() { |
||||
return WorkflowEventType.START_WORKFLOW; |
||||
} |
||||
} |
@ -0,0 +1,108 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.runner; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; |
||||
import org.apache.dolphinscheduler.common.thread.Stopper; |
||||
import org.apache.dolphinscheduler.common.thread.ThreadUtils; |
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils; |
||||
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent; |
||||
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError; |
||||
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleException; |
||||
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler; |
||||
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue; |
||||
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType; |
||||
|
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import javax.annotation.PostConstruct; |
||||
|
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class WorkflowEventLooper extends BaseDaemonThread { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(WorkflowEventLooper.class); |
||||
|
||||
@Autowired |
||||
private WorkflowEventQueue workflowEventQueue; |
||||
|
||||
@Autowired |
||||
private List<WorkflowEventHandler> workflowEventHandlerList; |
||||
|
||||
private final Map<WorkflowEventType, WorkflowEventHandler> workflowEventHandlerMap = new HashMap<>(); |
||||
|
||||
protected WorkflowEventLooper() { |
||||
super("WorkflowEventLooper"); |
||||
} |
||||
|
||||
@PostConstruct |
||||
public void init() { |
||||
workflowEventHandlerList.forEach(workflowEventHandler -> workflowEventHandlerMap.put(workflowEventHandler.getHandleWorkflowEventType(), |
||||
workflowEventHandler)); |
||||
} |
||||
|
||||
@Override |
||||
public synchronized void start() { |
||||
logger.info("WorkflowEventLooper thread starting"); |
||||
super.start(); |
||||
logger.info("WorkflowEventLooper thread started"); |
||||
} |
||||
|
||||
public void run() { |
||||
WorkflowEvent workflowEvent = null; |
||||
while (Stopper.isRunning()) { |
||||
try { |
||||
workflowEvent = workflowEventQueue.poolEvent(); |
||||
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId()); |
||||
logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent); |
||||
WorkflowEventHandler workflowEventHandler = |
||||
workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType()); |
||||
workflowEventHandler.handleWorkflowEvent(workflowEvent); |
||||
} catch (InterruptedException e) { |
||||
logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e); |
||||
Thread.currentThread().interrupt(); |
||||
break; |
||||
} catch (WorkflowEventHandleException workflowEventHandleException) { |
||||
logger.error("Handle workflow event failed, will add this event to event queue again, event: {}", |
||||
workflowEvent, workflowEventHandleException); |
||||
workflowEventQueue.addEvent(workflowEvent); |
||||
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} catch (WorkflowEventHandleError workflowEventHandleError) { |
||||
logger.error("Handle workflow event error, will drop this event, event: {}", |
||||
workflowEvent, |
||||
workflowEventHandleError); |
||||
} catch (Exception unknownException) { |
||||
logger.error( |
||||
"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}", |
||||
workflowEvent, unknownException); |
||||
workflowEventQueue.addEvent(workflowEvent); |
||||
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); |
||||
} finally { |
||||
LoggerUtils.removeWorkflowInstanceIdMDC(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue