From 083ab2b5c9682e6afadefcbc4d5285622a4155d0 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 15 Jul 2022 20:07:18 +0800 Subject: [PATCH] Remove dao in worker (#10994) --- .../api/ApiApplicationServer.java | 32 ++++++++- .../server/master/MasterServer.java | 2 +- .../service/task/TaskPluginManager.java | 72 ++++++++----------- dolphinscheduler-worker/pom.xml | 6 ++ .../server/worker/WorkerServer.java | 4 +- 5 files changed, 68 insertions(+), 48 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 9e6aa94530..078506898a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -17,8 +17,20 @@ package org.apache.dolphinscheduler.api; +import org.apache.dolphinscheduler.common.enums.PluginType; +import org.apache.dolphinscheduler.dao.PluginDao; +import org.apache.dolphinscheduler.dao.entity.PluginDefine; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; +import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.service.task.TaskPluginManager; +import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; +import org.apache.dolphinscheduler.spi.params.base.PluginParams; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -32,16 +44,34 @@ import org.springframework.context.event.EventListener; @ComponentScan("org.apache.dolphinscheduler") public class ApiApplicationServer { + private final Logger logger = LoggerFactory.getLogger(ApiApplicationServer.class); + @Autowired private TaskPluginManager taskPluginManager; + @Autowired + private PluginDao pluginDao; + public static void main(String[] args) { SpringApplication.run(ApiApplicationServer.class); } @EventListener public void run(ApplicationReadyEvent readyEvent) { + logger.info("Received spring application context ready event will load taskPlugin and write to DB"); // install task plugin - taskPluginManager.installPlugin(); + taskPluginManager.loadPlugin(); + for (Map.Entry entry : taskPluginManager.getTaskChannelFactoryMap().entrySet()) { + String taskPluginName = entry.getKey(); + TaskChannelFactory taskChannelFactory = entry.getValue(); + List params = taskChannelFactory.getParams(); + String paramsJson = PluginParamsTransfer.transferParamsToJson(params); + + PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson); + int count = pluginDao.addOrUpdatePluginDefine(pluginDefine); + if (count <= 0) { + throw new TaskPluginException("Failed to update task plugin: " + taskPluginName); + } + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 0bf3c945e7..1415aaa840 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -87,7 +87,7 @@ public class MasterServer implements IStoppable { this.masterRPCServer.start(); // install task plugin - this.taskPluginManager.installPlugin(); + this.taskPluginManager.loadPlugin(); // self tolerant this.masterRegistryClient.init(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java index 289e8ddbfb..1289b5718a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java @@ -19,25 +19,18 @@ package org.apache.dolphinscheduler.service.task; import static java.lang.String.format; -import org.apache.dolphinscheduler.common.enums.PluginType; -import org.apache.dolphinscheduler.dao.PluginDao; -import org.apache.dolphinscheduler.dao.entity.PluginDefine; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; -import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; -import org.apache.dolphinscheduler.spi.params.base.PluginParams; import java.util.Collections; -import java.util.HashSet; -import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.ServiceLoader; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,23 +40,42 @@ import org.springframework.stereotype.Component; public class TaskPluginManager { private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class); - private final Map taskChannelMap = new ConcurrentHashMap<>(); + private final Map taskChannelFactoryMap = new HashMap<>(); + private final Map taskChannelMap = new HashMap<>(); - private final PluginDao pluginDao; + private final AtomicBoolean loadedFlag = new AtomicBoolean(false); - public TaskPluginManager(PluginDao pluginDao) { - this.pluginDao = pluginDao; - } + /** + * Load task plugins from classpath. + */ + public void loadPlugin() { + if (!loadedFlag.compareAndSet(false, true)) { + logger.warn("The task plugin has already been loaded"); + return; + } + ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> { + final String name = factory.getName(); - private void loadTaskChannel(TaskChannelFactory taskChannelFactory) { - TaskChannel taskChannel = taskChannelFactory.create(); - taskChannelMap.put(taskChannelFactory.getName(), taskChannel); + logger.info("Registering task plugin: {}", name); + + if (taskChannelFactoryMap.containsKey(name)) { + throw new TaskPluginException(format("Duplicate task plugins named '%s'", name)); + } + taskChannelFactoryMap.put(name, factory); + taskChannelMap.put(name, factory.create()); + + logger.info("Registered task plugin: {}", name); + }); } public Map getTaskChannelMap() { return Collections.unmodifiableMap(taskChannelMap); } + public Map getTaskChannelFactoryMap() { + return Collections.unmodifiableMap(taskChannelFactoryMap); + } + public TaskChannel getTaskChannel(String type) { return this.getTaskChannelMap().get(type); } @@ -85,30 +97,4 @@ public class TaskPluginManager { return taskChannel.parseParameters(parametersNode); } - public void installPlugin() { - final Set names = new HashSet<>(); - - ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> { - final String name = factory.getName(); - - logger.info("Registering task plugin: {}", name); - - if (!names.add(name)) { - throw new TaskPluginException(format("Duplicate task plugins named '%s'", name)); - } - - loadTaskChannel(factory); - - logger.info("Registered task plugin: {}", name); - - List params = factory.getParams(); - String paramsJson = PluginParamsTransfer.transferParamsToJson(params); - - PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson); - int count = pluginDao.addOrUpdatePluginDefine(pluginDefine); - if (count <= 0) { - throw new TaskPluginException("Failed to update task plugin: " + name); - } - }); - } } diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml index 45a9890738..adb5bfcde5 100644 --- a/dolphinscheduler-worker/pom.xml +++ b/dolphinscheduler-worker/pom.xml @@ -34,6 +34,12 @@ org.apache.dolphinscheduler dolphinscheduler-service + + + org.apache.dolphinscheduler + dolphinscheduler-dao + + org.apache.dolphinscheduler diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index d03a2a951b..6b4b27a46e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -88,8 +88,6 @@ public class WorkerServer implements IStoppable { @Autowired private WorkerRegistryClient workerRegistryClient; - // todo: Can we just load the task spi, and don't install into mysql? - // we don't need to rely the dao module in worker. @Autowired private TaskPluginManager taskPluginManager; @@ -116,7 +114,7 @@ public class WorkerServer implements IStoppable { public void run() { this.workerRpcServer.start(); this.workerRpcClient.start(); - this.taskPluginManager.installPlugin(); + this.taskPluginManager.loadPlugin(); this.workerRegistryClient.registry(); this.workerRegistryClient.setRegistryStoppable(this);