diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index f3cdb4bc1c..ff30f28401 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -64,12 +64,6 @@ public class MasterSchedulerService extends Thread { @Autowired private ProcessService processService; - /** - * task processor factory - */ - @Autowired - private TaskProcessorFactory taskProcessorFactory; - /** * master config */ @@ -176,8 +170,7 @@ public class MasterSchedulerService extends Thread { , nettyExecutorManager , processAlertManager , masterConfig - , stateWheelExecuteThread - , taskProcessorFactory); + , stateWheelExecuteThread); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); if (processInstance.getTimeout() > 0) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index c20ddf791e..9c5eee7b5d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -133,11 +133,6 @@ public class WorkflowExecuteThread { */ private ProcessDefinition processDefinition; - /** - * task processor - */ - private TaskProcessorFactory taskProcessorFactory; - /** * the object of DAG */ @@ -227,22 +222,19 @@ public class WorkflowExecuteThread { * @param processAlertManager processAlertManager * @param masterConfig masterConfig * @param stateWheelExecuteThread stateWheelExecuteThread - * @param taskProcessorFactory taskProcessorFactory */ public WorkflowExecuteThread(ProcessInstance processInstance , ProcessService processService , NettyExecutorManager nettyExecutorManager , ProcessAlertManager processAlertManager , MasterConfig masterConfig - , StateWheelExecuteThread stateWheelExecuteThread - , TaskProcessorFactory taskProcessorFactory) { + , StateWheelExecuteThread stateWheelExecuteThread) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; - this.taskProcessorFactory = taskProcessorFactory; } /** @@ -805,7 +797,7 @@ public class WorkflowExecuteThread { */ private TaskInstance submitTaskExec(TaskInstance taskInstance) { try { - ITaskProcessor taskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); + ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index ceefa26d1b..0b3d96bfc5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.enums.ResourceType; @@ -61,7 +62,6 @@ import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import com.google.common.base.Enums; import com.google.common.base.Strings; @@ -80,8 +80,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected ProcessInstance processInstance; - @Autowired - protected ProcessService processService; + protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);; /** * pause task, common tasks donot need this. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java new file mode 100644 index 0000000000..488465063e --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +import org.apache.dolphinscheduler.common.Constants; + +import com.google.auto.service.AutoService; + +@AutoService(ITaskProcessFactory.class) +public class CommonTaskProcessFactory implements ITaskProcessFactory { + @Override + public String type() { + return Constants.COMMON_TASK_TYPE; + + } + + @Override + public ITaskProcessor create() { + return new CommonTaskProcessor(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index f7053ff884..f35127af6d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -38,20 +38,14 @@ import org.apache.commons.lang.StringUtils; import java.util.Date; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - /** * common task processor */ -@Service public class CommonTaskProcessor extends BaseTaskProcessor { - @Autowired private TaskPriorityQueue taskUpdateQueue; - @Autowired - NettyExecutorManager nettyExecutorManager; + private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval, boolean isTaskLogger) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java new file mode 100644 index 0000000000..3028c56535 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +import org.apache.dolphinscheduler.common.enums.TaskType; + +import com.google.auto.service.AutoService; + +@AutoService(ITaskProcessFactory.class) +public class ConditionTaskProcessFactory implements ITaskProcessFactory { + @Override + public String type() { + return TaskType.CONDITIONS.getDesc(); + } + + @Override + public ITaskProcessor create() { + return new ConditionTaskProcessor(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index bf2390a347..594fa29c23 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; @@ -39,13 +40,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - /** * condition task processor */ -@Service public class ConditionTaskProcessor extends BaseTaskProcessor { /** @@ -65,8 +62,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { */ private Map completeTaskList = new ConcurrentHashMap<>(); - @Autowired - private MasterConfig masterConfig; + private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);; private TaskDefinition taskDefinition; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java new file mode 100644 index 0000000000..3f885ed256 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +import org.apache.dolphinscheduler.common.enums.TaskType; + +import com.google.auto.service.AutoService; + +@AutoService(ITaskProcessFactory.class) +public class DependentTaskProcessFactory implements ITaskProcessFactory { + + @Override + public String type() { + return TaskType.DEPENDENT.getDesc(); + } + + @Override + public ITaskProcessor create() { + return new DependentTaskProcessor(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 6d311ee008..922ecca3c5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; @@ -40,15 +41,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - import com.fasterxml.jackson.annotation.JsonFormat; /** * dependent task processor */ -@Service public class DependentTaskProcessor extends BaseTaskProcessor { private DependentParameters dependentParameters; @@ -75,8 +72,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { ProcessInstance processInstance; TaskDefinition taskDefinition; - @Autowired - private MasterConfig masterConfig; + private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);; boolean allDependentItemFinished; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java new file mode 100644 index 0000000000..ffbbafb4ba --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessFactory.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +public interface ITaskProcessFactory { + + String type(); + + ITaskProcessor create(); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java new file mode 100644 index 0000000000..439d8e1ee9 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +import org.apache.dolphinscheduler.common.enums.TaskType; + +import com.google.auto.service.AutoService; + +@AutoService(ITaskProcessFactory.class) +public class SubTaskProcessFactory implements ITaskProcessFactory { + @Override + public String type() { + return TaskType.SUB_PROCESS.getDesc(); + } + + @Override + public ITaskProcessor create() { + return new SubTaskProcessor(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 0f8ccbc354..ed470633ab 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -25,18 +25,15 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - /** - * + * subtask processor */ -@Service public class SubTaskProcessor extends BaseTaskProcessor { private ProcessInstance processInstance; @@ -49,8 +46,7 @@ public class SubTaskProcessor extends BaseTaskProcessor { */ private final Lock runLock = new ReentrantLock(); - @Autowired - private StateEventCallbackService stateEventCallbackService; + private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);; @Override public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval, boolean isTaskLogger) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java new file mode 100644 index 0000000000..d536e65bb8 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task; + +import org.apache.dolphinscheduler.common.enums.TaskType; + +import com.google.auto.service.AutoService; + +@AutoService(ITaskProcessFactory.class) +public class SwitchTaskProcessFactory implements ITaskProcessFactory { + + @Override + public String type() { + return TaskType.SWITCH.getDesc(); + } + + @Override + public ITaskProcessor create() { + return new SwitchTaskProcessor(); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index a4259ce6b2..e162613f3d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.StringUtils; @@ -46,7 +47,9 @@ import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -@Service +/** + * switch task processor + */ public class SwitchTaskProcessor extends BaseTaskProcessor { protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; @@ -56,8 +59,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { private ProcessInstance processInstance; TaskDefinition taskDefinition; - @Autowired - private MasterConfig masterConfig; + private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);; /** * switch result diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 09e8bf270e..4b20848e22 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -21,39 +21,35 @@ import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; import org.apache.commons.lang3.StringUtils; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; /** * the factory to create task processor */ -@Service public class TaskProcessorFactory { - private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; + public static final Map PROCESS_FACTORY_MAP = new ConcurrentHashMap<>(); - private Map taskProcessorMap; + private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; - @Autowired - public TaskProcessorFactory(List taskProcessors) { - taskProcessorMap = taskProcessors.stream().collect(Collectors.toMap(ITaskProcessor::getType, Function.identity(), (v1, v2) -> v2)); + static { + for (ITaskProcessFactory iTaskProcessor : ServiceLoader.load(ITaskProcessFactory.class)) { + PROCESS_FACTORY_MAP.put(iTaskProcessor.type(), iTaskProcessor); + } } - public ITaskProcessor getTaskProcessor(String key) { - if (StringUtils.isEmpty(key)) { - key = DEFAULT_PROCESSOR; + public static ITaskProcessor getTaskProcessor(String type) { + if (StringUtils.isEmpty(type)) { + type = DEFAULT_PROCESSOR; } - ITaskProcessor taskProcessor = taskProcessorMap.get(key); - if (Objects.isNull(taskProcessor)) { - taskProcessor = taskProcessorMap.get(DEFAULT_PROCESSOR); + ITaskProcessFactory taskProcessFactory = PROCESS_FACTORY_MAP.get(type); + if (Objects.isNull(taskProcessFactory)) { + taskProcessFactory = PROCESS_FACTORY_MAP.get(DEFAULT_PROCESSOR); } - return taskProcessor; + return taskProcessFactory.create(); } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 8db1bd970e..8f2572d404 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -111,7 +111,7 @@ public class WorkflowExecuteThreadTest { Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); stateWheelExecuteThread = mock(StateWheelExecuteThread.class); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread, taskProcessorFactory)); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread)); // prepareProcess init dag Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java index ffef87c8bc..4114a7a0fe 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactoryTest.java @@ -26,14 +26,13 @@ import org.junit.Test; @Ignore public class TaskProcessorFactoryTest { - private TaskProcessorFactory taskProcessorFactory; @Test public void testFactory() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("shell"); - ITaskProcessor iTaskProcessor = taskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); + ITaskProcessor iTaskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType()); Assert.assertNotNull(iTaskProcessor); }