Browse Source

Remove dao in worker (#10994)

3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
083ab2b5c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
  2. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  3. 70
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
  4. 6
      dolphinscheduler-worker/pom.xml
  5. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

@ -17,8 +17,20 @@
package org.apache.dolphinscheduler.api; package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -32,16 +44,34 @@ import org.springframework.context.event.EventListener;
@ComponentScan("org.apache.dolphinscheduler") @ComponentScan("org.apache.dolphinscheduler")
public class ApiApplicationServer { public class ApiApplicationServer {
private final Logger logger = LoggerFactory.getLogger(ApiApplicationServer.class);
@Autowired @Autowired
private TaskPluginManager taskPluginManager; private TaskPluginManager taskPluginManager;
@Autowired
private PluginDao pluginDao;
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(ApiApplicationServer.class); SpringApplication.run(ApiApplicationServer.class);
} }
@EventListener @EventListener
public void run(ApplicationReadyEvent readyEvent) { public void run(ApplicationReadyEvent readyEvent) {
logger.info("Received spring application context ready event will load taskPlugin and write to DB");
// install task plugin // install task plugin
taskPluginManager.installPlugin(); taskPluginManager.loadPlugin();
for (Map.Entry<String, TaskChannelFactory> entry : taskPluginManager.getTaskChannelFactoryMap().entrySet()) {
String taskPluginName = entry.getKey();
TaskChannelFactory taskChannelFactory = entry.getValue();
List<PluginParams> params = taskChannelFactory.getParams();
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
if (count <= 0) {
throw new TaskPluginException("Failed to update task plugin: " + taskPluginName);
}
}
} }
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -87,7 +87,7 @@ public class MasterServer implements IStoppable {
this.masterRPCServer.start(); this.masterRPCServer.start();
// install task plugin // install task plugin
this.taskPluginManager.installPlugin(); this.taskPluginManager.loadPlugin();
// self tolerant // self tolerant
this.masterRegistryClient.init(); this.masterRegistryClient.init();

70
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java

@ -19,25 +19,18 @@ package org.apache.dolphinscheduler.service.task;
import static java.lang.String.format; import static java.lang.String.format;
import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -47,23 +40,42 @@ import org.springframework.stereotype.Component;
public class TaskPluginManager { public class TaskPluginManager {
private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class); private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class);
private final Map<String, TaskChannel> taskChannelMap = new ConcurrentHashMap<>(); private final Map<String, TaskChannelFactory> taskChannelFactoryMap = new HashMap<>();
private final Map<String, TaskChannel> taskChannelMap = new HashMap<>();
private final PluginDao pluginDao; private final AtomicBoolean loadedFlag = new AtomicBoolean(false);
public TaskPluginManager(PluginDao pluginDao) { /**
this.pluginDao = pluginDao; * Load task plugins from classpath.
*/
public void loadPlugin() {
if (!loadedFlag.compareAndSet(false, true)) {
logger.warn("The task plugin has already been loaded");
return;
}
ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
final String name = factory.getName();
logger.info("Registering task plugin: {}", name);
if (taskChannelFactoryMap.containsKey(name)) {
throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
} }
taskChannelFactoryMap.put(name, factory);
taskChannelMap.put(name, factory.create());
private void loadTaskChannel(TaskChannelFactory taskChannelFactory) { logger.info("Registered task plugin: {}", name);
TaskChannel taskChannel = taskChannelFactory.create(); });
taskChannelMap.put(taskChannelFactory.getName(), taskChannel);
} }
public Map<String, TaskChannel> getTaskChannelMap() { public Map<String, TaskChannel> getTaskChannelMap() {
return Collections.unmodifiableMap(taskChannelMap); return Collections.unmodifiableMap(taskChannelMap);
} }
public Map<String, TaskChannelFactory> getTaskChannelFactoryMap() {
return Collections.unmodifiableMap(taskChannelFactoryMap);
}
public TaskChannel getTaskChannel(String type) { public TaskChannel getTaskChannel(String type) {
return this.getTaskChannelMap().get(type); return this.getTaskChannelMap().get(type);
} }
@ -85,30 +97,4 @@ public class TaskPluginManager {
return taskChannel.parseParameters(parametersNode); return taskChannel.parseParameters(parametersNode);
} }
public void installPlugin() {
final Set<String> names = new HashSet<>();
ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> {
final String name = factory.getName();
logger.info("Registering task plugin: {}", name);
if (!names.add(name)) {
throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
}
loadTaskChannel(factory);
logger.info("Registered task plugin: {}", name);
List<PluginParams> params = factory.getParams();
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
PluginDefine pluginDefine = new PluginDefine(name, PluginType.TASK.getDesc(), paramsJson);
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
if (count <= 0) {
throw new TaskPluginException("Failed to update task plugin: " + name);
}
});
}
} }

6
dolphinscheduler-worker/pom.xml

@ -34,6 +34,12 @@
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-service</artifactId> <artifactId>dolphinscheduler-service</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-dao</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -88,8 +88,6 @@ public class WorkerServer implements IStoppable {
@Autowired @Autowired
private WorkerRegistryClient workerRegistryClient; private WorkerRegistryClient workerRegistryClient;
// todo: Can we just load the task spi, and don't install into mysql?
// we don't need to rely the dao module in worker.
@Autowired @Autowired
private TaskPluginManager taskPluginManager; private TaskPluginManager taskPluginManager;
@ -116,7 +114,7 @@ public class WorkerServer implements IStoppable {
public void run() { public void run() {
this.workerRpcServer.start(); this.workerRpcServer.start();
this.workerRpcClient.start(); this.workerRpcClient.start();
this.taskPluginManager.installPlugin(); this.taskPluginManager.loadPlugin();
this.workerRegistryClient.registry(); this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this); this.workerRegistryClient.setRegistryStoppable(this);

Loading…
Cancel
Save