Browse Source

Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe (#10479)

* Fix TaskProcessorFactory#getTaskProcessor get common processor is not thread safe
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
ad2646ff1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
  2. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  3. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  5. 106
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
  6. 22
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java
  7. 66
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  8. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  9. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  10. 23
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
  11. 4
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java
  12. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
  13. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  14. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  15. 35
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  16. 9
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
  17. 4
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  18. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
  19. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  20. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  21. 28
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

29
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@ -17,14 +17,25 @@
package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXES_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.Constants.UTF_8;
import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import static org.apache.dolphinscheduler.common.Constants.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* file utils
@ -112,7 +123,15 @@ public class FileUtils {
File execLocalPathFile = new File(execLocalPath);
if (execLocalPathFile.exists()) {
org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
try {
org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
} catch (Exception ex) {
if (ex instanceof NoSuchFileException || ex.getCause() instanceof NoSuchFileException) {
// this file is already be deleted.
} else {
throw ex;
}
}
}
//create work dir

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -265,13 +265,11 @@ public class ProcessInstance {
*/
public ProcessInstance(ProcessDefinition processDefinition) {
this.processDefinition = processDefinition;
this.name = processDefinition.getName()
+ "-"
+
processDefinition.getVersion()
+ "-"
+
DateUtils.getCurrentTimeStamp();
// todo: the name is not unique
this.name = String.join("-",
processDefinition.getName(),
String.valueOf(processDefinition.getVersion()),
DateUtils.getCurrentTimeStamp());
}
public String getVarPool() {

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -190,16 +190,18 @@ public class TaskPriorityQueueConsumer extends Thread {
return true;
}
}
result = dispatcher.dispatch(executionContext);
if (result) {
logger.info("Master success dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
addDispatchEvent(context, executionContext);
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
}
} catch (RuntimeException e) {
logger.error("dispatch error: ", e);
logger.error("Master dispatch task to worker error: ", e);
} catch (ExecuteException e) {
logger.error("dispatch error: {}", e.getMessage());
logger.error("Master dispatch task to worker error: {}", e);
}
return result;
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;

106
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

@ -27,10 +27,12 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
import io.netty.channel.Channel;
import lombok.Data;
/**
* task event
*/
@Data
public class TaskEvent {
/**
@ -144,108 +146,4 @@ public class TaskEvent {
event.setEvent(Event.WORKER_REJECT);
return event;
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getWorkerAddress() {
return workerAddress;
}
public void setWorkerAddress(String workerAddress) {
this.workerAddress = workerAddress;
}
public ExecutionStatus getState() {
return state;
}
public void setState(ExecutionStatus state) {
this.state = state;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public Event getEvent() {
return event;
}
public void setEvent(Event event) {
this.event = event;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
}

22
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskExecuteThreadPool.java

@ -79,7 +79,7 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
public void submitTaskEvent(TaskEvent taskEvent) {
if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
logger.warn("workflowExecuteThread is null, event: {}", taskEvent);
logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
return;
}
if (!taskExecuteThreadMap.containsKey(taskEvent.getProcessInstanceId())) {
@ -114,20 +114,24 @@ public class TaskExecuteThreadPool extends ThreadPoolTaskExecutor {
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error("handle event {} failed: {}", taskExecuteThread.getProcessInstanceId(), ex);
if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.error("persist event failed processInstanceId: {}", processInstanceId, ex);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}
@Override
public void onSuccess(Object result) {
logger.info("persist events {} succeeded.", taskExecuteThread.getProcessInstanceId());
if (!processInstanceExecCacheManager.contains(taskExecuteThread.getProcessInstanceId())) {
taskExecuteThreadMap.remove(taskExecuteThread.getProcessInstanceId());
logger.info("remove process instance: {}", taskExecuteThread.getProcessInstanceId());
Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
logger.info("persist events succeeded, processInstanceId: {}", processInstanceId);
if (!processInstanceExecCacheManager.contains(processInstanceId)) {
taskExecuteThreadMap.remove(processInstanceId);
logger.info("Cannot find processInstance from cacheManager, remove process instance from threadMap: {}",
processInstanceId);
}
multiThreadFilterMap.remove(taskExecuteThread.getKey());
}

66
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -17,10 +17,19 @@
package org.apache.dolphinscheduler.server.master.runner;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@ -69,8 +78,10 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.corn.CronUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
@ -88,18 +99,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_BLOCKING;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
/**
* Workflow execute task, used to execute a workflow instance.
@ -1001,7 +1004,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* @param taskInstance task instance
* @return TaskInstance
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
try {
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@ -1019,7 +1022,7 @@ public class WorkflowExecuteRunnable implements Runnable {
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
processInstance.getId(), processInstance.getName(),
taskInstance.getId(), taskInstance.getName());
return null;
return Optional.empty();
}
// in a dag, only one taskInstance is valid per taskCode, so need to set the old taskInstance invalid
@ -1058,10 +1061,10 @@ public class WorkflowExecuteRunnable implements Runnable {
taskStateChangeEvent.setType(StateEventType.TASK_STATE_CHANGE);
this.stateEvents.add(taskStateChangeEvent);
}
return taskInstance;
return Optional.of(taskInstance);
} catch (Exception e) {
logger.error("submit standby task error", e);
return null;
return Optional.empty();
}
}
@ -1360,6 +1363,7 @@ public class WorkflowExecuteRunnable implements Runnable {
for (TaskInstance task : taskInstances) {
if (readyToSubmitTaskQueue.contains(task)) {
logger.warn("Task is already at submit queue, taskInstanceId: {}", task.getId());
continue;
}
@ -1764,9 +1768,6 @@ public class WorkflowExecuteRunnable implements Runnable {
* @param taskInstance task instance
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
logger.info("remove task from stand by list, id: {} name:{}",
taskInstance.getId(),
taskInstance.getName());
try {
readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
@ -1859,14 +1860,14 @@ public class WorkflowExecuteRunnable implements Runnable {
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
TaskInstance taskInstance = submitTaskExec(task);
if (taskInstance == null) {
Optional<TaskInstance> taskInstanceOptional = submitTaskExec(task);
if (!taskInstanceOptional.isPresent()) {
this.taskFailedSubmit = true;
// Remove and add to complete map and error map
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
errorTaskMap.put(task.getTaskCode(), task.getId());
logger.error("process {}, task {}, code:{} submit task failed.", task.getProcessInstanceId(), task.getName(), task.getTaskCode());
logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
} else {
removeTaskFromStandbyList(task);
}
@ -1874,11 +1875,11 @@ public class WorkflowExecuteRunnable implements Runnable {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTaskMap.put(task.getTaskCode(), task.getId());
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
logger.info("Task dependent result is failed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
} else if (DependResult.NON_EXEC == dependResult) {
// for some reasons(depend task pause/stop) this task would not be submit
removeTaskFromStandbyList(task);
logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
logger.info("Remove task due to depend result not executed, taskInstanceId:{} depend result : {}", task.getId(), dependResult);
}
}
} catch (Exception e) {
@ -2067,6 +2068,11 @@ public class WorkflowExecuteRunnable implements Runnable {
}
private void measureTaskState(StateEvent taskStateEvent) {
if (taskStateEvent == null || taskStateEvent.getExecutionStatus() == null) {
// the event is broken
logger.warn("The task event is broken..., taskEvent: {}", taskStateEvent);
return;
}
if (taskStateEvent.getExecutionStatus().typeIsFinished()) {
TaskMetrics.incTaskFinish();
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -105,7 +105,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
/**
* Handle the events belong to the given workflow.
*/
public void executeEvent(WorkflowExecuteRunnable workflowExecuteThread) {
public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -119,7 +119,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
logger.info("submit task, but the status of the task {} is already running or delayed.", taskInstance.getName());
return true;
}
logger.debug("task ready to submit: {}", taskInstance.getName());
logger.info("task ready to submit: taskInstanceId: {}", taskInstance.getId());
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
@ -134,7 +134,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
logger.info("master submit success, task : {}", taskInstance.getName());
logger.info("Master submit task to priority queue success, taskInstanceId : {}", taskInstance.getId());
return true;
} catch (Exception e) {
logger.error("submit task error", e);

23
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java

@ -21,8 +21,9 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
@ -36,27 +37,31 @@ public final class TaskProcessorFactory {
private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
public static final Map<String, ITaskProcessor> PROCESS_MAP = new ConcurrentHashMap<>();
public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static {
for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) {
PROCESS_MAP.put(iTaskProcessor.getType(), iTaskProcessor);
try {
PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("The task processor should has a no args constructor");
}
}
}
public static ITaskProcessor getTaskProcessor(String type) throws InstantiationException, IllegalAccessException {
public static ITaskProcessor getTaskProcessor(String type) throws InvocationTargetException, InstantiationException, IllegalAccessException {
if (StringUtils.isEmpty(type)) {
type = DEFAULT_PROCESSOR;
}
ITaskProcessor iTaskProcessor = PROCESS_MAP.get(type);
if (Objects.isNull(iTaskProcessor)) {
logger.warn("task processor not found for type: {}", type);
return PROCESS_MAP.get(DEFAULT_PROCESSOR);
Constructor<ITaskProcessor> iTaskProcessorConstructor = PROCESS_MAP.get(type);
if (iTaskProcessorConstructor == null) {
logger.warn("ITaskProcessor could not found for taskType: {}", type);
iTaskProcessorConstructor = PROCESS_MAP.get(DEFAULT_PROCESSOR);
}
return iTaskProcessor.getClass().newInstance();
return iTaskProcessorConstructor.newInstance();
}
/**

4
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.lang.reflect.InvocationTargetException;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@ -27,7 +29,7 @@ import org.junit.Test;
public class TaskProcessorFactoryTest {
@Test
public void testFactory() throws InstantiationException, IllegalAccessException {
public void testFactory() throws InvocationTargetException, InstantiationException, IllegalAccessException {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("shell");

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java

@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/**
@ -28,6 +31,8 @@ import io.netty.channel.Channel;
*/
public class ChannelUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelUtils.class);
private ChannelUtils() {
throw new IllegalStateException(ChannelUtils.class.getName());
}
@ -49,7 +54,7 @@ public class ChannelUtils {
* @return remote address
*/
public static String getRemoteAddress(Channel channel) {
return NetUtils.getHost(((InetSocketAddress) channel.remoteAddress()).getAddress());
return toAddress(channel).getAddress();
}
/**
@ -60,6 +65,11 @@ public class ChannelUtils {
*/
public static Host toAddress(Channel channel) {
InetSocketAddress socketAddress = ((InetSocketAddress) channel.remoteAddress());
if (socketAddress == null) {
// the remote channel already closed
LOGGER.warn("The channel is already closed");
return Host.EMPTY;
}
return new Host(NetUtils.getHost(socketAddress.getAddress()), socketAddress.getPort());
}

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -27,6 +27,8 @@ import java.util.Objects;
*/
public class Host implements Serializable {
public static final Host EMPTY = new Host();
/**
* address
*/

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -766,7 +766,8 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
processInstance.setRecovery(Flag.NO);
processInstance.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime());
// the new process instance restart time is null.
processInstance.setRestartTime(null);
processInstance.setRunTimes(1);
processInstance.setMaxTryTimes(0);
processInstance.setCommandParam(command.getCommandParam());
@ -1285,7 +1286,7 @@ public class ProcessServiceImpl implements ProcessService {
@Override
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("start submit task : {}, instance id:{}, state: {}",
logger.info("start submit task : {}, processInstance id:{}, state: {}",
taskInstance.getName(), taskInstance.getProcessInstanceId(), processInstance.getState());
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);

35
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -21,11 +21,13 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* Task instances priority queue implementation
@ -40,12 +42,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
/**
* queue
*/
private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
/**
* Lock used for all public operations
*/
private final ReentrantLock lock = new ReentrantLock(true);
private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
private final Set<Integer> taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>());
/**
* put task instance to priority queue
@ -56,6 +54,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
@Override
public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
queue.add(taskInstance);
taskInstanceIdSet.add(taskInstance.getId());
}
/**
@ -66,7 +65,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
@Override
public TaskInstance take() throws TaskPriorityQueueException {
return queue.poll();
TaskInstance taskInstance = queue.poll();
if (taskInstance != null) {
taskInstanceIdSet.remove(taskInstance.getId());
}
return taskInstance;
}
/**
@ -111,6 +114,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
public void clear() {
queue.clear();
taskInstanceIdSet.clear();
}
/**
@ -120,20 +124,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
return this.contains(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
return this.contains(taskInstance.getId());
}
public boolean contains(long taskCode, int taskVersion) {
Iterator<TaskInstance> iterator = this.queue.iterator();
while (iterator.hasNext()) {
TaskInstance taskInstance = iterator.next();
if (taskCode == taskInstance.getTaskCode()
&& taskVersion == taskInstance.getTaskDefinitionVersion()) {
return true;
}
}
return false;
public boolean contains(int taskInstanceId) {
return taskInstanceIdSet.contains(taskInstanceId);
}
/**

9
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@ -46,14 +46,11 @@ public class PeerTaskInstancePriorityQueueTest {
Assert.assertTrue(queue.size() < peekBeforeLength);
}
@Test
@Test(expected = TaskPriorityQueueException.class)
public void poll() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
try {
queue.poll(1000, TimeUnit.MILLISECONDS);
} catch (TaskPriorityQueueException e) {
e.printStackTrace();
}
queue.poll(1000, TimeUnit.MILLISECONDS);
}
@Test

4
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -109,7 +109,7 @@ master:
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
exec-threads: 10
# master dispatch task number per batch
dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
@ -134,7 +134,7 @@ worker:
# worker listener port
listen-port: 1234
# worker execute thread number to limit task instances in parallel
exec-threads: 100
exec-threads: 10
# worker heartbeat interval, the unit is second
heartbeat-interval: 10
# worker host weight to dispatch tasks, default value 100

7
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -34,6 +34,7 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
@ -147,7 +148,11 @@ public class ShellTask extends AbstractTaskExecutor {
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
Files.createFile(path, attr);
try {
Files.createFile(path, attr);
} catch (FileAlreadyExistsException ex) {
// this is expected
}
}
Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -21,9 +21,9 @@ import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.context.annotation.Configuration;
@Component
@Configuration
@EnableConfigurationProperties
@ConfigurationProperties("worker")
public class WorkerConfig {

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -163,6 +163,8 @@ public class TaskCallbackService {
}
}
});
} else {
logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
}
}

28
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -19,12 +19,15 @@ package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -39,17 +42,26 @@ import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
/**
* task scheduler thread
*/
@ -232,7 +244,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
logger.error("delete exec dir failed : {}", e.getMessage(), e);
if (e instanceof NoSuchFileException) {
// this is expected
} else {
logger.error("Delete exec dir failed.", e);
}
}
}
}
@ -263,7 +279,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
task.cancelApplication(true);
ProcessUtils.killYarnJob(taskExecutionContext);
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("Kill task failed", e);
}
}
}

Loading…
Cancel
Save