Browse Source

Task instance failure when worker group doesn't exist (#13448)

dependabot/maven/dolphinscheduler-bom/org.yaml-snakeyaml-2.0
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
ce34e21960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 126
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  3. 20
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java
  4. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java
  5. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java
  6. 14
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java
  7. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
  8. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java
  9. 32
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  10. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java
  11. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  12. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java
  13. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  14. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
  15. 8
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  16. 5
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java
  17. 5
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

126
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -19,14 +19,17 @@ package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -35,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@ -47,6 +51,7 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -171,8 +176,17 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
consumerThreadPoolExecutor.submit(() -> {
try {
boolean dispatchResult = this.dispatchTask(taskPriority);
if (!dispatchResult) {
try {
this.dispatchTask(taskPriority);
} catch (WorkerGroupNotFoundException e) {
// If the worker group not found, will not try to dispatch again.
// The task instance will be failed
// todo:
addDispatchFailedEvent(taskPriority);
} catch (ExecuteException e) {
failedDispatchTasks.add(taskPriority);
} catch (Exception e) {
logger.error("Dispatch task error, meet an unknown exception", e);
failedDispatchTasks.add(taskPriority);
}
} finally {
@ -193,60 +207,50 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
* @param taskPriority taskPriority
* @return dispatch result, return true if dispatch success, return false if dispatch failed.
*/
protected boolean dispatchTask(TaskPriority taskPriority) {
protected void dispatchTask(TaskPriority taskPriority) throws ExecuteException {
TaskMetrics.incTaskDispatch();
boolean result = false;
try {
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
if (workflowExecuteRunnable == null) {
logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
return true;
}
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
if (!taskInstanceOptional.isPresent()) {
logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
taskPriority);
// we return true, so that we will drop this task.
return true;
}
TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext =
new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(),
taskInstance);
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to dispatch anymore
logger.info("Task {} is already finished, no need to dispatch, task instance id: {}",
taskInstance.getName(), taskInstance.getId());
return true;
}
}
// check task is cache execution, and decide whether to dispatch
if (checkIsCacheExecution(taskInstance, context)) {
return true;
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
if (workflowExecuteRunnable == null) {
logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
return;
}
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
if (!taskInstanceOptional.isPresent()) {
logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
taskPriority);
// we return true, so that we will drop this task.
return;
}
TaskInstance taskInstance = taskInstanceOptional.get();
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
ExecutionContext executionContext = ExecutionContext.builder()
.taskInstance(taskInstance)
.workerGroup(context.getWorkerGroup())
.executorType(ExecutorType.WORKER)
.command(toCommand(context))
.build();
if (isTaskNeedToCheck(taskPriority)) {
if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
// when task finish, ignore this task, there is no need to dispatch anymore
logger.info("Task {} is already finished, no need to dispatch, task instance id: {}",
taskInstance.getName(), taskInstance.getId());
return;
}
}
result = dispatcher.dispatch(executionContext);
if (result) {
logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
addDispatchEvent(context, executionContext);
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
}
} catch (RuntimeException | ExecuteException e) {
logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
// check task is cache execution, and decide whether to dispatch
if (checkIsCacheExecution(taskInstance, context)) {
return;
}
return result;
dispatcher.dispatch(executionContext);
logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
taskPriority.getTaskId(),
executionContext.getHost());
addDispatchEvent(context, executionContext);
}
/**
@ -258,6 +262,24 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
taskEventService.addEvent(taskEvent);
}
private void addDispatchFailedEvent(TaskPriority taskPriority) {
TaskExecutionContext taskExecutionContext = taskPriority.getTaskExecutionContext();
TaskEvent taskEvent = TaskEvent.builder()
.processInstanceId(taskPriority.getProcessInstanceId())
.taskInstanceId(taskPriority.getTaskId())
.state(TaskExecutionStatus.FAILURE)
.logPath(taskExecutionContext.getLogPath())
.executePath(taskExecutionContext.getExecutePath())
.appIds(taskExecutionContext.getAppIds())
.processId(taskExecutionContext.getProcessId())
.varPool(taskExecutionContext.getVarPool())
.startTime(DateUtils.timeStampToDate(taskExecutionContext.getStartTime()))
.endTime(new Date())
.event(TaskEventType.RESULT)
.build();
taskEventService.addEvent(taskEvent);
}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
// todo: we didn't set the host here, since right now we didn't need to retry this message.
TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -74,7 +74,7 @@ public class ExecutorDispatcher implements InitializingBean {
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
public void dispatch(final ExecutionContext context) throws ExecuteException {
// get executor manager
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
@ -86,13 +86,13 @@ public class ExecutorDispatcher implements InitializingBean {
if (StringUtils.isEmpty(host.getAddress())) {
logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
context.getCommand(), context.getWorkerGroup());
return false;
throw new ExecuteException("no suitable worker");
}
context.setHost(host);
executorManager.beforeExecute(context);
try {
// task execute
return executorManager.execute(context);
executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}

20
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java

@ -24,12 +24,15 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* execution context
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ExecutionContext {
/**
@ -40,19 +43,16 @@ public class ExecutionContext {
/**
* command
*/
private final Command command;
private Command command;
private final TaskInstance taskInstance;
private TaskInstance taskInstance;
/**
* executor type : worker or client
*/
private final ExecutorType executorType;
private ExecutorType executorType;
/**
* worker group
*/
private final String workerGroup;
private String workerGroup;
public ExecutionContext(Command command, ExecutorType executorType, TaskInstance taskInstance) {
this(command, executorType, DEFAULT_WORKER_GROUP, taskInstance);

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/exceptions/WorkerGroupNotFoundException.java

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.exceptions;
public class WorkerGroupNotFoundException extends ExecuteException {
public WorkerGroupNotFoundException(String workerGroup) {
super("Cannot find worker group: " + workerGroup);
}
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java

@ -35,11 +35,12 @@ public interface ExecutorManager<T> {
/**
* execute task
*
* @param context context
* @return T
* @throws ExecuteException if error throws ExecuteException
*/
T execute(ExecutionContext context) throws ExecuteException;
void execute(ExecutionContext context) throws ExecuteException;
/**
* execute task directly without retry

14
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskRecallProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@ -92,7 +93,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
* @throws ExecuteException if error throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
public void execute(ExecutionContext context) throws ExecuteException {
// all nodes
Set<String> allNodes = getAllNodes(context);
// fail nodes
@ -101,23 +102,22 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
Command command = context.getCommand();
// execute task host
Host host = context.getHost();
boolean success = false;
while (!success) {
for (int i = 0; i < allNodes.size(); i++) {
try {
doExecute(host, command);
success = true;
context.setHost(host);
// We set the host to taskInstance to avoid when the worker down, this taskInstance may not be
// failovered, due to the taskInstance's host
// is not belongs to the down worker ISSUE-10842.
context.getTaskInstance().setHost(host.getAddress());
return;
} catch (ExecuteException ex) {
logger.error("Execute command {} error", command, ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
if (CollectionUtils.isNotEmpty(remained)) {
host = Host.of(remained.iterator().next());
logger.error("retry execute command : {} host : {}", command, host);
} else {
@ -128,8 +128,6 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
}
}
}
return success;
}
@Override
@ -171,7 +169,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
* @param context context
* @return nodes
*/
private Set<String> getAllNodes(ExecutionContext context) {
private Set<String> getAllNodes(ExecutionContext context) throws WorkerGroupNotFoundException {
Set<String> nodes = Collections.emptySet();
/**
* executor type

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
@ -48,9 +49,10 @@ public abstract class CommonHostManager implements HostManager {
*
* @param context context
* @return host
* @throws WorkerGroupNotFoundException If the worker group not found
*/
@Override
public Host select(ExecutionContext context) {
public Host select(ExecutionContext context) throws WorkerGroupNotFoundException {
List<HostWorker> candidates = null;
String workerGroup = context.getWorkerGroup();
ExecutorType executorType = context.getExecutorType();
@ -72,7 +74,7 @@ public abstract class CommonHostManager implements HostManager {
protected abstract HostWorker select(Collection<HostWorker> nodes);
protected List<HostWorker> getWorkerCandidates(String workerGroup) {
protected List<HostWorker> getWorkerCandidates(String workerGroup) throws WorkerGroupNotFoundException {
List<HostWorker> hostWorkers = new ArrayList<>();
Set<String> nodes = serverNodeManager.getWorkerGroupNodes(workerGroup);
if (CollectionUtils.isNotEmpty(nodes)) {

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
/**
* host manager
@ -26,10 +27,12 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
public interface HostManager {
/**
* select host
* select host
*
* @param context context
* @return host
* @throws WorkerGroupNotFoundException If the worker group does exist
*/
Host select(ExecutionContext context);
Host select(ExecutionContext context) throws WorkerGroupNotFoundException;
}

32
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
@ -35,8 +36,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.PostConstruct;
@ -60,16 +60,15 @@ public class LowerWeightHostManager extends CommonHostManager {
*/
private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeightsMap;
/**
* worker group host lock
*/
private Lock lock;
private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
@PostConstruct
public void init() {
this.selector = new LowerWeightRoundRobin();
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
serverNodeManager.addWorkerInfoChangeListener(new WorkerWeightListener());
}
@ -78,9 +77,10 @@ public class LowerWeightHostManager extends CommonHostManager {
*
* @param context context
* @return host
* @throws WorkerGroupNotFoundException If the worker group not found
*/
@Override
public Host select(ExecutionContext context) {
public Host select(ExecutionContext context) throws WorkerGroupNotFoundException {
Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
if (CollectionUtils.isNotEmpty(workerHostWeights)) {
return selector.select(workerHostWeights).getHost();
@ -130,12 +130,12 @@ public class LowerWeightHostManager extends CommonHostManager {
}
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
lock.lock();
workerGroupWriteLock.lock();
try {
workerHostWeightsMap.clear();
workerHostWeightsMap.putAll(workerHostWeights);
} finally {
lock.unlock();
workerGroupWriteLock.unlock();
}
}
}
@ -165,12 +165,16 @@ public class LowerWeightHostManager extends CommonHostManager {
heartBeat.getStartupTime()));
}
private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
lock.lock();
private Set<HostWeight> getWorkerHostWeights(String workerGroup) throws WorkerGroupNotFoundException {
workerGroupReadLock.lock();
try {
return workerHostWeightsMap.get(workerGroup);
Set<HostWeight> hostWeights = workerHostWeightsMap.get(workerGroup);
if (hostWeights == null) {
throw new WorkerGroupNotFoundException("Can not find worker group " + workerGroup);
}
return hostWeights;
} finally {
lock.unlock();
workerGroupReadLock.unlock();
}
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java

@ -36,6 +36,8 @@ import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import io.netty.channel.Channel;
@Component
public class TaskResultEventHandler implements TaskEventHandler {
@ -113,13 +115,17 @@ public class TaskResultEventHandler implements TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
Channel channel = taskEvent.getChannel();
if (channel == null) {
return;
}
// we didn't set the receiver address, since the ack doen's need to retry
TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(true,
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
System.currentTimeMillis());
taskEvent.getChannel().writeAndFlush(taskExecuteAckMessage.convert2Command());
channel.writeAndFlush(taskExecuteAckMessage.convert2Command());
}
@Override

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -26,13 +26,16 @@ import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import io.netty.channel.Channel;
/**
* task event
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskEvent {
/**

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteRunnable.java

@ -18,10 +18,10 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleError;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandleException;
import org.apache.dolphinscheduler.server.master.event.TaskEventHandler;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -53,7 +53,7 @@ public class TaskExecuteRunnable implements Runnable {
// we handle the task event belongs to one task serial, so if the event comes in wrong order,
TaskEvent event = this.events.peek();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
LogUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
logger.info("Handle task event begin: {}", event);
taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
events.remove(event);
@ -71,7 +71,7 @@ public class TaskExecuteRunnable implements Runnable {
event, unknownException);
events.remove(event);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.commons.collections4.CollectionUtils;
@ -334,13 +335,16 @@ public class ServerNodeManager implements InitializingBean {
* @param workerGroup workerGroup
* @return worker nodes
*/
public Set<String> getWorkerGroupNodes(String workerGroup) {
public Set<String> getWorkerGroupNodes(String workerGroup) throws WorkerGroupNotFoundException {
workerGroupReadLock.lock();
try {
if (StringUtils.isEmpty(workerGroup)) {
workerGroup = Constants.DEFAULT_WORKER_GROUP;
}
Set<String> nodes = workerGroupNodes.get(workerGroup);
if (nodes == null) {
throw new WorkerGroupNotFoundException(String.format("WorkerGroup: %s is invalidated", workerGroup));
}
if (CollectionUtils.isEmpty(nodes)) {
return Collections.emptySet();
}

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java

@ -165,7 +165,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskExecutionContext.getWorkerGroup(), taskInstance);
Boolean dispatchSuccess = false;
try {
dispatchSuccess = dispatcher.dispatch(executionContext);
dispatcher.dispatch(executionContext);
dispatchSuccess = true;
} catch (ExecuteException e) {
logger.error("Master dispatch task to worker error, taskInstanceId: {}, worker: {}",
taskInstance.getId(),

8
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
@ -310,7 +311,12 @@ public class TaskPriorityQueueConsumerTest {
TaskPriority taskPriority = new TaskPriority();
taskPriority.setTaskId(1);
boolean res = taskPriorityQueueConsumer.dispatchTask(taskPriority);
boolean res = false;
try {
taskPriorityQueueConsumer.dispatchTask(taskPriority);
} catch (ExecuteException e) {
throw new RuntimeException(e);
}
Assertions.assertFalse(res);
}

5
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

@ -72,13 +72,12 @@ public class NettyExecutorManagerTest {
.create();
ExecutionContext executionContext = new ExecutionContext(toCommand(context), ExecutorType.WORKER, taskInstance);
executionContext.setHost(Host.of(NetUtils.getAddr(serverConfig.getListenPort())));
Boolean execute = nettyExecutorManager.execute(executionContext);
Assertions.assertTrue(execute);
Assertions.assertDoesNotThrow(() -> nettyExecutorManager.execute(executionContext));
nettyRemotingServer.close();
}
@Test
public void testExecuteWithException() throws ExecuteException {
public void testExecuteWithException() {
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProcessDefinition processDefinition = Mockito.mock(ProcessDefinition.class);
ProcessInstance processInstance = new ProcessInstance();

5
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutionContextTestUtils;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import java.util.Optional;
@ -49,7 +50,7 @@ public class RoundRobinHostManagerTest {
RoundRobinHostManager roundRobinHostManager;
@Test
public void testSelectWithEmptyResult() {
public void testSelectWithEmptyResult() throws WorkerGroupNotFoundException {
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(null);
ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000);
Host emptyHost = roundRobinHostManager.select(context);
@ -57,7 +58,7 @@ public class RoundRobinHostManagerTest {
}
@Test
public void testSelectWithResult() {
public void testSelectWithResult() throws WorkerGroupNotFoundException {
Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22"));
Mockito.when(serverNodeManager.getWorkerNodeInfo("192.168.1.1:22"))
.thenReturn(Optional.of(new WorkerHeartBeat()));

Loading…
Cancel
Save