Browse Source

[Improvement-#3735] Make task delayed execution more efficient (#4812)

* [Improvement-3735] improve implementation of delay task execution

* [Improvement][worker] delay task compatible with dev branch and fix test


Co-authored-by: vanilla111 <1115690319@qq.com>
pull/3/MERGE
guohaozhang 4 years ago committed by GitHub
parent
commit
ffcb1c22e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  2. 44
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  3. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  4. 46
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  5. 143
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  6. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  7. 195
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  8. 187
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
  9. 2
      pom.xml

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -84,6 +85,9 @@ public class WorkerServer {
@Autowired @Autowired
private RetryReportTaskStatusThread retryReportTaskStatusThread; private RetryReportTaskStatusThread retryReportTaskStatusThread;
@Autowired
private WorkerManagerThread workerManagerThread;
/** /**
* worker server startup * worker server startup
* *
@ -119,6 +123,9 @@ public class WorkerServer {
// worker registry // worker registry
this.workerRegistry.registry(); this.workerRegistry.registry();
// task execute manager
this.workerManagerThread.start();
// retry report task status // retry report task status
this.retryReportTaskStatusThread.start(); this.retryReportTaskStatusThread.start();

44
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@ -38,12 +38,12 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date; import java.util.Date;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -57,11 +57,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/**
* thread executor service
*/
private final ExecutorService workerExecService;
/** /**
* worker config * worker config
*/ */
@ -80,13 +75,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/** /**
* taskExecutionContextCacheManager * taskExecutionContextCacheManager
*/ */
private TaskExecutionContextCacheManager taskExecutionContextCacheManager; private final TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/*
* task execute manager
*/
private final WorkerManagerThread workerManager;
public TaskExecuteProcessor() { public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
} }
/** /**
@ -101,11 +101,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
} }
public TaskExecuteProcessor(AlertClientService alertClientService) { public TaskExecuteProcessor(AlertClientService alertClientService) {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this();
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.alertClientService = alertClientService; this.alertClientService = alertClientService;
} }
@ -140,9 +136,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getTaskInstanceId())); taskExecutionContext.getTaskInstanceId()));
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
// local execute path // local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext); String execLocalPath = getExecLocalPath(taskExecutionContext);
@ -163,10 +157,23 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque())); new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
} else {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecutionContext.setStartTime(new Date());
}
this.doAck(taskExecutionContext); this.doAck(taskExecutionContext);
// submit task // submit task to manager
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService)); if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) {
logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize());
}
} }
private void doAck(TaskExecutionContext taskExecutionContext) { private void doAck(TaskExecutionContext taskExecutionContext) {
@ -178,6 +185,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/** /**
* build ack command * build ack command
*
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
* @return TaskExecuteAckCommand * @return TaskExecuteAckCommand
*/ */

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
@ -69,10 +70,16 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/ */
private TaskExecutionContextCacheManager taskExecutionContextCacheManager; private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/*
* task execute manager
*/
private final WorkerManagerThread workerManager;
public TaskKillProcessor() { public TaskKillProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
} }
/** /**
@ -110,6 +117,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
Integer processId = taskExecutionContext.getProcessId(); Integer processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) { if (processId.equals(0)) {
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return Pair.of(true, appIds); return Pair.of(true, appIds);

46
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
@ -51,7 +50,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -62,7 +63,7 @@ import com.github.rholder.retry.RetryException;
/** /**
* task scheduler thread * task scheduler thread
*/ */
public class TaskExecuteThread implements Runnable { public class TaskExecuteThread implements Runnable, Delayed {
/** /**
* logger * logger
@ -132,7 +133,6 @@ public class TaskExecuteThread implements Runnable {
// task node // task node
TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
delayExecutionIfNeeded();
if (taskExecutionContext.getStartTime() == null) { if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setStartTime(new Date());
} }
@ -289,24 +289,6 @@ public class TaskExecuteThread implements Runnable {
} }
} }
/**
* delay execution if needed.
*/
private void delayExecutionIfNeeded() {
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime);
if (remainTime > 0) {
try {
Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}
}
/** /**
* send an ack to change the status of the task. * send an ack to change the status of the task.
*/ */
@ -343,4 +325,26 @@ public class TaskExecuteThread implements Runnable {
} }
return ackCommand; return ackCommand;
} }
/**
* get current TaskExecutionContext
* @return TaskExecutionContext
*/
public TaskExecutionContext getTaskExecutionContext() {
return this.taskExecutionContext;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == null) {
return 1;
}
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
} }

143
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* Manage tasks
*/
@Component
public class WorkerManagerThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
/**
* task queue
*/
private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
/**
* worker config
*/
private final WorkerConfig workerConfig;
/**
* thread executor service
*/
private final ExecutorService workerExecService;
/**
* taskExecutionContextCacheManager
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
/**
* task callback service
*/
private final TaskCallbackService taskCallbackService;
public WorkerManagerThread() {
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
}
/**
* get queue size
*
* @return queue size
*/
public int getQueueSize() {
return workerExecuteQueue.size();
}
/**
* Kill tasks that have not been executed, like delay task
* then send Response to Master, update the execution status of task instance
*/
public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
workerExecuteQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
.forEach(workerExecuteQueue::remove);
sendTaskKillResponse(taskInstanceId);
}
/**
* kill task before execute , like delay task
*/
private void sendTaskKillResponse(Integer taskInstanceId) {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
return;
}
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
responseCommand.setStatus(ExecutionStatus.KILL.getCode());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
/**
* submit task
*
* @param taskExecuteThread taskExecuteThread
* @return submit result
*/
public boolean offer(TaskExecuteThread taskExecuteThread) {
return workerExecuteQueue.offer(taskExecuteThread);
}
public void start() {
Thread thread = new Thread(this, this.getClass().getName());
thread.start();
}
@Override
public void run() {
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
TaskExecuteThread taskExecuteThread;
while (Stopper.isRunning()) {
try {
taskExecuteThread = workerExecuteQueue.take();
workerExecService.submit(taskExecuteThread);
} catch (Exception e) {
logger.error("An unexpected interrupt is happened, "
+ "the exception will be ignored and this thread will continue to run", e);
}
}
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
@ -79,7 +80,8 @@ import io.netty.channel.Channel;
TaskResponseProcessor.class, TaskResponseProcessor.class,
TaskExecuteProcessor.class, TaskExecuteProcessor.class,
CuratorZookeeperClient.class, CuratorZookeeperClient.class,
TaskExecutionContextCacheManagerImpl.class}) TaskExecutionContextCacheManagerImpl.class,
WorkerManagerThread.class})
public class TaskCallbackServiceTest { public class TaskCallbackServiceTest {
@Autowired @Autowired

195
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test task execute processor
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
public class TaskExecuteProcessorTest {
private TaskExecutionContext taskExecutionContext;
private TaskCallbackService taskCallbackService;
private ExecutorService workerExecService;
private WorkerConfig workerConfig;
private Command command;
private Command ackCommand;
private TaskExecuteRequestCommand taskRequestCommand;
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
private AlertClientService alertClientService;
private WorkerManagerThread workerManager;
@Before
public void before() throws Exception {
// init task execution context
taskExecutionContext = getTaskExecutionContext();
workerConfig = new WorkerConfig();
workerConfig.setWorkerExecThreads(1);
workerConfig.setListenPort(1234);
command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
ackCommand = new TaskExecuteAckCommand().convert2Command();
taskRequestCommand = new TaskExecuteRequestCommand();
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
.thenReturn(null);
PowerMockito.mockStatic(ChannelUtils.class);
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
.thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
.thenReturn(workerConfig);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(null);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
workerManager = PowerMockito.mock(WorkerManagerThread.class);
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))).thenReturn(Boolean.TRUE);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
.thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()))
.thenReturn(workerExecService);
PowerMockito.mockStatic(JsonSerializer.class);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
.thenReturn(taskExecutionContext);
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
.thenReturn(simpleTaskExecuteThread);
}
@Test
public void testNormalExecution() {
TaskExecuteProcessor processor = new TaskExecuteProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
}
@Test
public void testDelayExecution() {
taskExecutionContext.setDelayTime(1);
TaskExecuteProcessor processor = new TaskExecuteProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
}
public TaskExecutionContext getTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskType("sql");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
return taskExecutionContext;
}
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) {
super(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
}
@Override
public void run() {
//
}
}
}

187
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* test worker manager thread.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({
Stopper.class,
TaskManager.class,
JSONUtils.class,
CommonUtils.class,
SpringApplicationContext.class,
OSUtils.class})
public class WorkerManagerThreadTest {
private TaskCallbackService taskCallbackService;
private WorkerManagerThread workerManager;
private TaskExecutionContext taskExecutionContext;
private AlertClientService alertClientService;
private Logger taskLogger;
@Before
public void before() {
// init task execution context, logger
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setProcessId(12345);
taskExecutionContext.setProcessDefineId(1);
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTenantCode("test");
taskExecutionContext.setTaskType("");
taskExecutionContext.setFirstSubmitTime(new Date());
taskExecutionContext.setDelayTime(0);
taskExecutionContext.setLogPath("/tmp/test.log");
taskExecutionContext.setHost("localhost");
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
Command ackCommand = new TaskExecuteAckCommand().convert2Command();
Command responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command();
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()
));
TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
alertClientService = PowerMockito.mock(AlertClientService.class);
WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
.thenReturn(workerConfig);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
.thenReturn(taskCallbackService);
PowerMockito.when(workerConfig.getWorkerExecThreads()).thenReturn(5);
workerManager = new WorkerManagerThread();
PowerMockito.mockStatic(TaskManager.class);
PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService))
.thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class))
.thenReturn(new TaskNode());
PowerMockito.mockStatic(CommonUtils.class);
PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
List<String> osUserList = Collections.singletonList("test");
PowerMockito.mockStatic(OSUtils.class);
PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList);
PowerMockito.mockStatic(Stopper.class);
PowerMockito.when(Stopper.isRunning()).thenReturn(true, false);
}
@Test
public void testSendTaskKillResponse() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
workerManager.offer(taskExecuteThread);
Assert.assertEquals(1, workerManager.getQueueSize());
workerManager.killTaskBeforeExecuteByInstanceId(1);
Assert.assertEquals(0, workerManager.getQueueSize());
}
@Test
public void testRun() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
workerManager.offer(taskExecuteThread);
Assert.assertEquals(1, workerManager.getQueueSize());
workerManager.run();
Assert.assertEquals(0, workerManager.getQueueSize());
}
private static class SimpleTask extends AbstractTask {
protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
// pid
this.processId = taskExecutionContext.getProcessId();
}
@Override
public AbstractParameters getParameters() {
return null;
}
@Override
public void init() {
}
@Override
public void handle() {
}
@Override
public void after() {
}
@Override
public ExecutionStatus getExitStatus() {
return ExecutionStatus.SUCCESS;
}
}
}

2
pom.xml

@ -913,6 +913,7 @@
<include>**/server/utils/ProcessUtilsTest.java</include> <include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include> <include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include> <include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
<include>**/server/worker/processor/TaskExecuteProcessorTest.java</include>
<include>**/server/worker/registry/WorkerRegistryTest.java</include> <include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include> <include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include> <include>**/server/worker/sql/SqlExecutorTest.java</include>
@ -926,6 +927,7 @@
<include>**/server/worker/task/TaskManagerTest.java</include> <include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/EnvFileTest.java</include> <include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include> <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/server/worker/runner/WorkerManagerThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include> <include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/process/ProcessServiceTest.java</include> <include>**/service/process/ProcessServiceTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include> <include>**/service/zk/DefaultEnsembleProviderTest.java</include>

Loading…
Cancel
Save