diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 6e05da90d5..00eab7876c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -68,6 +69,8 @@ public class MasterSchedulerService extends Thread { */ @Autowired private ProcessService processService; + @Autowired + private TaskProcessorFactory taskProcessorFactory; /** * zookeeper master client @@ -205,7 +208,8 @@ public class MasterSchedulerService extends Thread { , nettyExecutorManager , processAlertManager , masterConfig - , taskTimeoutCheckList); + , taskTimeoutCheckList + , taskProcessorFactory); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); if (processInstance.getTimeout() > 0) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index fa047a082d..8a594093be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -130,6 +130,11 @@ public class WorkflowExecuteThread implements Runnable { */ private ProcessDefinition processDefinition; + /** + * task processor + */ + private TaskProcessorFactory taskProcessorFactory; + /** * the object of DAG */ @@ -216,13 +221,18 @@ public class WorkflowExecuteThread implements Runnable { * @param processInstance processInstance * @param processService processService * @param nettyExecutorManager nettyExecutorManager + * @param processAlertManager processAlertManager + * @param masterConfig masterConfig + * @param taskTimeoutCheckList taskTimeoutCheckList + * @param taskProcessorFactory taskProcessorFactory */ public WorkflowExecuteThread(ProcessInstance processInstance , ProcessService processService , NettyExecutorManager nettyExecutorManager , ProcessAlertManager processAlertManager , MasterConfig masterConfig - , ConcurrentHashMap taskTimeoutCheckList) { + , ConcurrentHashMap taskTimeoutCheckList + , TaskProcessorFactory taskProcessorFactory) { this.processService = processService; this.processInstance = processInstance; @@ -230,6 +240,7 @@ public class WorkflowExecuteThread implements Runnable { this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.taskTimeoutCheckList = taskTimeoutCheckList; + this.taskProcessorFactory = taskProcessorFactory; } @Override @@ -791,7 +802,7 @@ public class WorkflowExecuteThread implements Runnable { */ private TaskInstance submitTaskExec(TaskInstance taskInstance) { try { - ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); + ITaskProcessor taskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 07ae8124b8..b4951f449d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -40,7 +40,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.enums.ResourceType; @@ -62,6 +61,7 @@ import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import com.google.common.base.Enums; import com.google.common.base.Strings; @@ -80,7 +80,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected ProcessInstance processInstance; - protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + @Autowired + protected ProcessService processService; /** * pause task, common tasks donot need this. @@ -107,7 +108,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { public void run() { } - @Override public boolean action(TaskAction taskAction) { @@ -119,7 +119,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { case TIMEOUT: return timeout(); default: - logger.error("unknown task action: {}", taskAction.toString()); + logger.error("unknown task action: {}", taskAction); } return false; @@ -305,7 +305,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { } } - /** * set SQL task relation * diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java deleted file mode 100644 index 488465063e..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.Constants; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class CommonTaskProcessFactory implements ITaskProcessFactory { - @Override - public String type() { - return Constants.COMMON_TASK_TYPE; - - } - - @Override - public ITaskProcessor create() { - return new CommonTaskProcessor(); - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 60c7de7a79..d44315a6ba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; @@ -40,20 +39,19 @@ import org.apache.commons.lang.StringUtils; import java.util.Date; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * common task processor */ +@Service public class CommonTaskProcessor extends BaseTaskProcessor { @Autowired private TaskPriorityQueue taskUpdateQueue; @Autowired - MasterConfig masterConfig; - - @Autowired - NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); + NettyExecutorManager nettyExecutorManager; @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java deleted file mode 100644 index 3028c56535..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class ConditionTaskProcessFactory implements ITaskProcessFactory { - @Override - public String type() { - return TaskType.CONDITIONS.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new ConditionTaskProcessor(); - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 2412659513..c3c65b3fba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -26,14 +26,12 @@ import org.apache.dolphinscheduler.common.model.DependentItem; import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.utils.DependentUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; @@ -41,11 +39,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * condition task processor */ +@Service public class ConditionTaskProcessor extends BaseTaskProcessor { /** @@ -65,7 +65,8 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { */ private Map completeTaskList = new ConcurrentHashMap<>(); - MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + @Autowired + private MasterConfig masterConfig; private TaskDefinition taskDefinition; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java deleted file mode 100644 index 3f885ed256..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class DependentTaskProcessFactory implements ITaskProcessFactory { - - @Override - public String type() { - return TaskType.DEPENDENT.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new DependentTaskProcessor(); - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 0478f7ef80..28cd0e7ab7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; @@ -41,11 +40,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + import com.fasterxml.jackson.annotation.JsonFormat; /** * dependent task processor */ +@Service public class DependentTaskProcessor extends BaseTaskProcessor { private DependentParameters dependentParameters; @@ -72,7 +75,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor { ProcessInstance processInstance; TaskDefinition taskDefinition; - MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + @Autowired + private MasterConfig masterConfig; boolean allDependentItemFinished; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java deleted file mode 100644 index ffbbafb4ba..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -public interface ITaskProcessFactory { - - String type(); - - ITaskProcessor create(); -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java deleted file mode 100644 index 439d8e1ee9..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class SubTaskProcessFactory implements ITaskProcessFactory { - @Override - public String type() { - return TaskType.SUB_PROCESS.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new SubTaskProcessor(); - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 32cbce1919..10d1b28464 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -25,15 +25,18 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + /** * */ +@Service public class SubTaskProcessor extends BaseTaskProcessor { private ProcessInstance processInstance; @@ -46,7 +49,8 @@ public class SubTaskProcessor extends BaseTaskProcessor { */ private final Lock runLock = new ReentrantLock(); - private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); + @Autowired + private StateEventCallbackService stateEventCallbackService; @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java deleted file mode 100644 index d536e65bb8..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task; - -import org.apache.dolphinscheduler.common.enums.TaskType; - -import com.google.auto.service.AutoService; - -@AutoService(ITaskProcessFactory.class) -public class SwitchTaskProcessFactory implements ITaskProcessFactory { - - @Override - public String type() { - return TaskType.SWITCH.getDesc(); - } - - @Override - public ITaskProcessor create() { - return new SwitchTaskProcessor(); - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index b67dc47137..53786496a1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.commons.lang.StringUtils; @@ -43,6 +42,10 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service public class SwitchTaskProcessor extends BaseTaskProcessor { protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; @@ -52,7 +55,8 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { private ProcessInstance processInstance; TaskDefinition taskDefinition; - MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + @Autowired + private MasterConfig masterConfig; /** * switch result diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 61a8ba52b4..09e8bf270e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -17,37 +17,43 @@ package org.apache.dolphinscheduler.server.master.runner.task; -import org.apache.dolphinscheduler.common.Constants; +import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; +import org.apache.commons.lang3.StringUtils; + +import java.util.List; import java.util.Map; -import java.util.ServiceLoader; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; -import com.google.common.base.Strings; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * the factory to create task processor */ +@Service public class TaskProcessorFactory { - public static final Map PROCESS_FACTORY_MAP = new ConcurrentHashMap<>(); + private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; - private static final String DEFAULT_PROCESSOR = Constants.COMMON_TASK_TYPE; + private Map taskProcessorMap; - static { - for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) { - PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor); - } + @Autowired + public TaskProcessorFactory(List taskProcessors) { + taskProcessorMap = taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType, Function.identity(), (v1, v2) -> v2)); } - public static ITaskProcessor getTaskProcessor(String type) { - if (Strings.isNullOrEmpty(type)) { - return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create(); + public ITaskProcessor getTaskProcessor(String key) { + if (StringUtils.isEmpty(key)) { + key = DEFAULT_PROCESSOR; } - if (!PROCESS_FACTORY_MAP.containsKey(type)) { - return PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR).create(); + ITaskProcessor taskProcessor = taskProcessorMap.get(key); + if (Objects.isNull(taskProcessor)) { + taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR); } - return PROCESS_FACTORY_MAP.get(type).create(); - } + return taskProcessor; + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index fe9bddc01d..48c1f84a13 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.process.ProcessService; import java.lang.reflect.Field; @@ -81,9 +82,12 @@ public class WorkflowExecuteThreadTest { private ApplicationContext applicationContext; + private TaskProcessorFactory taskProcessorFactory; + @Before public void init() throws Exception { processService = mock(ProcessService.class); + taskProcessorFactory = mock(TaskProcessorFactory.class); applicationContext = mock(ApplicationContext.class); config = new MasterConfig(); @@ -104,7 +108,7 @@ public class WorkflowExecuteThreadTest { Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList)); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskProcessorFactory)); // prepareProcess init dag Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java index a49719a7f6..2dd349cfc2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java @@ -26,13 +26,14 @@ import org.junit.Test; @Ignore public class TaskProcessorFactoryTest { + private TaskProcessorFactory taskProcessorFactory; @Test public void testFactory() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("shell"); - ITaskProcessor iTaskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); + ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); Assert.assertNotNull(iTaskProcessor); }