Browse Source

[FIX-#4172][server-worker] kill task NPE (#4182)

* [FIX-#4172][server-worker] kill task NPE

The cache task will be sent when the Process is generated. Before that, if a kill task appears, then NPE will appear
Modification method: write into the cache when the task is received, and mark it as preData
If the task is killed before the Process is generated, delete the cache directly at this time
It will be judged before the process is generated. If the task has been killed, it will not be executed.
After the new process is created, write it into the cache, and judge again, if kill, then kill the process.

this closes #4172

* Delete the commented out code
Add spring beans

* code smell

* add test

* add test

* fix error

* test

* test

* revert

* fix error
pull/3/MERGE
Kirs 4 years ago committed by GitHub
parent
commit
a13e737eb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  2. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java
  3. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java
  4. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  5. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  6. 57
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  7. 58
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java
  8. 61
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  9. 117
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -38,7 +38,6 @@ public class TaskExecutionContext implements Serializable {
*/
private int taskInstanceId;
/**
* task name
*/

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java vendored

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
@ -42,7 +41,16 @@ public interface TaskExecutionContextCacheManager {
/**
* remove taskInstance by taskInstanceId
*
* @param taskInstanceId taskInstanceId
*/
void removeByTaskInstanceId(Integer taskInstanceId);
/**
* If the value for the specified key is present and non-null,then perform the updateotherwise it will return false
*
* @param taskExecutionContext taskExecutionContext
* @return status
*/
boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext);
}

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java vendored

@ -19,11 +19,12 @@ package org.apache.dolphinscheduler.server.worker.cache.impl;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Service;
/**
* TaskExecutionContextCache
*/
@ -59,10 +60,17 @@ public class TaskExecutionContextCacheManagerImpl implements TaskExecutionContex
/**
* remove taskInstance by taskInstanceId
*
* @param taskInstanceId taskInstanceId
*/
@Override
public void removeByTaskInstanceId(Integer taskInstanceId) {
taskExecutionContextCache.remove(taskInstanceId);
}
@Override
public boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext) {
taskExecutionContextCache.computeIfPresent(taskExecutionContext.getTaskInstanceId(), (k, v) -> taskExecutionContext);
return taskExecutionContextCache.containsKey(taskExecutionContext.getTaskInstanceId());
}
}

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -21,13 +21,11 @@ import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -36,6 +34,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -47,7 +47,6 @@ import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/**
@ -55,7 +54,7 @@ import io.netty.channel.Channel;
*/
public class TaskExecuteProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/**
* thread executor service
@ -72,10 +71,27 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
*/
private final TaskCallbackService taskCallbackService;
/**
* taskExecutionContextCacheManager
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
/**
* Pre-cache task to avoid extreme situations when kill task. There is no such task in the cache
*
* @param taskExecutionContext task
*/
private void setTaskCache(TaskExecutionContext taskExecutionContext) {
TaskExecutionContext preTaskCache = new TaskExecutionContext();
preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
}
@Override
@ -100,6 +116,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.error("task execution context is null");
return;
}
setTaskCache(taskExecutionContext);
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
@ -120,8 +137,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
} catch (Throwable ex) {
String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex);
LoggerUtils.logError(Optional.of(logger), errorLog, ex);
LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex);
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
FileUtils.taskLoggerThreadLocal.remove();
@ -143,6 +161,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* build ack command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteAckCommand
*/
@ -164,6 +183,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* get execute local path
*
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -99,19 +99,20 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* do kill
*
* @param killCommand
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
List<String> appIds = Collections.EMPTY_LIST;
List<String> appIds = Collections.emptyList();
try {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
Integer processId = taskExecutionContext.getProcessId();
if (processId == null || processId.equals(0)) {
logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
return Pair.of(false, appIds);
if (processId.equals(0)) {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return Pair.of(true, appIds);
}
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()));

57
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -14,36 +14,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.io.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS;
import org.slf4j.Logger;
/**
* abstract command executor
@ -139,8 +147,14 @@ public abstract class AbstractCommandExecutor {
CommandExecuteResult result = new CommandExecuteResult();
int taskInstanceId = taskExecutionContext.getTaskInstanceId();
// If the task has been killed, then the task in the cache is null
if (null == taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(execCommand)) {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}
@ -155,14 +169,18 @@ public abstract class AbstractCommandExecutor {
// parse process output
parseProcessOutput(process);
Integer processId = getProcessId(process);
result.setProcessId(processId);
// cache processId
taskExecutionContext.setProcessId(processId);
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
boolean updateTaskExecutionContextStatus = taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
// print process id
logger.info("process start, process id is: {}", processId);
@ -173,7 +191,6 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
taskExecutionContext.getExecutePath(),
processId
@ -198,7 +215,6 @@ public abstract class AbstractCommandExecutor {
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
return result;
}
@ -208,6 +224,7 @@ public abstract class AbstractCommandExecutor {
/**
* cancel application
*
* @throws Exception exception
*/
public void cancelApplication() throws Exception {
@ -238,6 +255,7 @@ public abstract class AbstractCommandExecutor {
/**
* soft kill
*
* @param processId process id
* @return process is alive
* @throws InterruptedException interrupted exception
@ -262,6 +280,7 @@ public abstract class AbstractCommandExecutor {
/**
* hard kill
*
* @param processId process id
*/
private void hardKill(int processId) {
@ -280,6 +299,7 @@ public abstract class AbstractCommandExecutor {
/**
* print command
*
* @param commands process builder
*/
private void printCommand(List<String> commands) {
@ -311,6 +331,7 @@ public abstract class AbstractCommandExecutor {
/**
* get the standard output of the process
*
* @param process process
*/
private void parseProcessOutput(Process process) {
@ -360,8 +381,8 @@ public abstract class AbstractCommandExecutor {
while (Stopper.isRunning()) {
ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
logger.info("appId:{}, final state:{}", appId, applicationStatus.name());
if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
applicationStatus.equals(ExecutionStatus.KILL)) {
if (applicationStatus.equals(ExecutionStatus.FAILURE)
|| applicationStatus.equals(ExecutionStatus.KILL)) {
return false;
}
@ -408,6 +429,7 @@ public abstract class AbstractCommandExecutor {
/**
* convert file to list
*
* @param filename file name
* @return line list
*/
@ -443,6 +465,7 @@ public abstract class AbstractCommandExecutor {
/**
* find app id
*
* @param line line
* @return appid
*/
@ -454,7 +477,6 @@ public abstract class AbstractCommandExecutor {
return null;
}
/**
* get remain times
*
@ -532,7 +554,10 @@ public abstract class AbstractCommandExecutor {
protected List<String> commandOptions() {
return Collections.emptyList();
}
protected abstract String buildCommandFilePath();
protected abstract String commandInterpreter();
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
}

58
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManagerTest.java vendored

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* TaskExecutionContextCacheManagerTest
*/
public class TaskExecutionContextCacheManagerTest {
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
private TaskExecutionContext taskExecutionContext;
@Before
public void before() {
taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
}
@Test
public void testGetByTaskInstanceId() {
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(2);
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
Assert.assertEquals(2, taskExecutionContextCacheManager.getByTaskInstanceId(2).getTaskInstanceId());
}
@Test
public void updateTaskExecutionContext() {
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
Assert.assertTrue(taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext));
taskExecutionContextCacheManager.removeByTaskInstanceId(1);
Assert.assertFalse(taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext));
}
}

61
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import java.util.Date;
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -37,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseSer
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
@ -44,10 +44,12 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.Date;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@ -56,6 +58,7 @@ import io.netty.channel.Channel;
/**
* test task call back service
* todo refactor it in the form of mock
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {
@ -75,7 +78,8 @@ import io.netty.channel.Channel;
TaskAckProcessor.class,
TaskResponseProcessor.class,
TaskExecuteProcessor.class,
CuratorZookeeperClient.class})
CuratorZookeeperClient.class,
TaskExecutionContextCacheManagerImpl.class})
public class TaskCallbackServiceTest {
@Autowired
@ -95,6 +99,7 @@ public class TaskCallbackServiceTest {
/**
* send ack test
*
* @throws Exception
*/
@Test
@ -122,6 +127,7 @@ public class TaskCallbackServiceTest {
/**
* send result test
*
* @throws Exception
*/
@Test
@ -152,16 +158,9 @@ public class TaskCallbackServiceTest {
nettyRemotingClient.close();
}
// @Test(expected = IllegalArgumentException.class)
// public void testSendAckWithIllegalArgumentException(){
// TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class);
// taskCallbackService.sendAck(1, ackCommand.convert2Command());
// Stopper.stop();
// }
@Test
public void testPause() {
Assert.assertEquals(5000, taskCallbackService.pause(3));;
Assert.assertEquals(5000, taskCallbackService.pause(3));
}
@Test
@ -185,7 +184,7 @@ public class TaskCallbackServiceTest {
taskCallbackService.sendAck(1, ackCommand.convert2Command());
Assert.assertEquals(true, channel.isOpen());
Assert.assertTrue(channel.isOpen());
Stopper.stop();
@ -223,40 +222,4 @@ public class TaskCallbackServiceTest {
nettyRemotingClient.close();
}
// @Test(expected = IllegalStateException.class)
// public void testSendAckWithIllegalStateException2(){
// masterRegistry.registry();
// final NettyServerConfig serverConfig = new NettyServerConfig();
// serverConfig.setListenPort(30000);
// NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
// nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
// nettyRemotingServer.start();
//
// final NettyClientConfig clientConfig = new NettyClientConfig();
// NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
// Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
// taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
// channel.close();
// TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
// ackCommand.setTaskInstanceId(1);
// ackCommand.setStartTime(new Date());
//
// nettyRemotingServer.close();
//
// taskCallbackService.sendAck(1, ackCommand.convert2Command());
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//
// Stopper.stop();
//
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
}

117
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
/**
* TaskKillProcessorTest
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskKillProcessor.class, OSUtils.class, ProcessUtils.class, LoggerUtils.class})
public class TaskKillProcessorTest {
private TaskKillProcessor taskKillProcessor;
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
private Channel channel;
private Command command;
private TaskExecutionContext taskExecutionContext;
@Before
public void before() throws Exception {
TaskCallbackService taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class);
channel = PowerMockito.mock(Channel.class);
command = new Command();
command.setType(CommandType.TASK_KILL_REQUEST);
TaskKillRequestCommand taskKillRequestCommand = new TaskKillRequestCommand();
taskKillRequestCommand.setTaskInstanceId(1);
command.setBody(JSONUtils.toJsonString(taskKillRequestCommand).getBytes());
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
LogClientService logClient = PowerMockito.mock(LogClientService.class);
NettyRemoteChannel nettyRemoteChannel = PowerMockito.mock(NettyRemoteChannel.class);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.mockStatic(OSUtils.class);
PowerMockito.mockStatic(ProcessUtils.class);
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager);
PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(), any());
PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null);
PowerMockito.when(OSUtils.exeCmd(any())).thenReturn(null);
PowerMockito.when(ProcessUtils.getPidsStr(102)).thenReturn("123");
PowerMockito.whenNew(LogClientService.class).withAnyArguments().thenReturn(logClient);
PowerMockito.when(logClient.viewLog(any(), anyInt(), any())).thenReturn("test");
PowerMockito.when(LoggerUtils.getAppIds(any(), any())).thenReturn(Collections.singletonList("id"));
Command viewLogResponseCommand = new Command();
viewLogResponseCommand.setBody("success".getBytes());
taskKillProcessor = new TaskKillProcessor();
}
@Test
public void testProcess() {
PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext);
taskKillProcessor.process(channel, command);
taskExecutionContext.setProcessId(101);
taskExecutionContext.setHost("127.0.0.1:22");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setExecutePath("/path");
taskExecutionContext.setTenantCode("ten");
taskKillProcessor.process(channel, command);
}
}
Loading…
Cancel
Save