Browse Source

[Improvement-#11613] Add spi priority factory (#11614)

* Add spi priority factory

* Add doc

* Add override log

* Use lombok

* Add comment
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
b96d69701a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/docs/en/contribute/backend/spi/alert.md
  2. 2
      docs/docs/en/contribute/backend/spi/datasource.md
  3. 2
      docs/docs/en/contribute/backend/spi/task.md
  4. 2
      docs/docs/zh/contribute/backend/spi/alert.md
  5. 2
      docs/docs/zh/contribute/backend/spi/datasource.md
  6. 2
      docs/docs/zh/contribute/backend/spi/task.md
  7. 9
      dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java
  8. 34
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
  9. 16
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
  10. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  11. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  12. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java
  13. 27
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java
  14. 30
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java
  15. 10
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java
  16. 34
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java
  17. 60
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java
  18. 36
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java
  19. 88
      dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java
  20. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java

2
docs/docs/en/contribute/backend/spi/alert.md

@ -8,6 +8,8 @@ For alarm-related codes, please refer to the `dolphinscheduler-alert-api` module
We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple. We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple.
In additional, the `AlertChannelFactory` extends from `PrioritySPI`, this means you can set the plugin priority, when you have two plugin has the same name, you can customize the priority by override the `getIdentify` method. The high priority plugin will be load, but if you have two plugin with the same name and same priority, the server will throw `IllegalArgumentException` when load the plugin.
By the way, we have adopted an excellent front-end component form-create, which supports the generation of front-end UI components based on JSON. If plug-in development involves the front-end, we will use JSON to generate related front-end UI components, org.apache.dolphinscheduler. The parameters of the plug-in are encapsulated in spi.params, which will convert all the relevant parameters into the corresponding JSON, which means that you can complete the drawing of the front-end components by way of Java code (here is mainly the form, we only care Data exchanged between the front and back ends). By the way, we have adopted an excellent front-end component form-create, which supports the generation of front-end UI components based on JSON. If plug-in development involves the front-end, we will use JSON to generate related front-end UI components, org.apache.dolphinscheduler. The parameters of the plug-in are encapsulated in spi.params, which will convert all the relevant parameters into the corresponding JSON, which means that you can complete the drawing of the front-end components by way of Java code (here is mainly the form, we only care Data exchanged between the front and back ends).
This article mainly focuses on the design and development of Alert. This article mainly focuses on the design and development of Alert.

2
docs/docs/en/contribute/backend/spi/datasource.md

@ -18,6 +18,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
We provide APIs for external access of all data sources in the dolphin scheduler data source API module We provide APIs for external access of all data sources in the dolphin scheduler data source API module
In additional, the `DataSourceChannelFactory` extends from `PrioritySPI`, this means you can set the plugin priority, when you have two plugin has the same name, you can customize the priority by override the `getIdentify` method. The high priority plugin will be load, but if you have two plugin with the same name and same priority, the server will throw `IllegalArgumentException` when load the plugin.
#### **Future plan** #### **Future plan**
Support data sources such as kafka, http, files, sparkSQL, FlinkSQL, etc. Support data sources such as kafka, http, files, sparkSQL, FlinkSQL, etc.

2
docs/docs/en/contribute/backend/spi/task.md

@ -8,6 +8,8 @@ The plug-in can implement the above interface. It mainly includes creating tasks
We provide APIs for external access to all tasks in the dolphinscheduler-task-api module, while the dolphinscheduler-spi module is the spi general code library, which defines all the plug-in modules, such as the alarm module, the registry module, etc., you can read and view in detail . We provide APIs for external access to all tasks in the dolphinscheduler-task-api module, while the dolphinscheduler-spi module is the spi general code library, which defines all the plug-in modules, such as the alarm module, the registry module, etc., you can read and view in detail .
In additional, the `TaskChannelFactory` extends from `PrioritySPI`, this means you can set the plugin priority, when you have two plugin has the same name, you can customize the priority by override the `getIdentify` method. The high priority plugin will be load, but if you have two plugin with the same name and same priority, the server will throw `IllegalArgumentException` when load the plugin.
*NOTICE* *NOTICE*
Since the task plug-in involves the front-end page, the front-end SPI has not yet been implemented, so you need to implement the front-end page corresponding to the plug-in separately. Since the task plug-in involves the front-end page, the front-end SPI has not yet been implemented, so you need to implement the front-end page corresponding to the plug-in separately.

2
docs/docs/zh/contribute/backend/spi/alert.md

@ -8,6 +8,8 @@ DolphinScheduler 正在处于微内核 + 插件化的架构更改之中,所有
我们采用了原生的 JAVA-SPI,当你需要扩展的时候,事实上你只需要关注扩展`org.apache.dolphinscheduler.alert.api.AlertChannelFactory`接口即可,底层相关逻辑如插件加载等内核已经实现,这让我们的开发更加专注且简单。 我们采用了原生的 JAVA-SPI,当你需要扩展的时候,事实上你只需要关注扩展`org.apache.dolphinscheduler.alert.api.AlertChannelFactory`接口即可,底层相关逻辑如插件加载等内核已经实现,这让我们的开发更加专注且简单。
另外,`AlertChannelFactory` 继承自 `PrioritySPI`,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 `getIdentify` 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`
顺便提一句,我们采用了一款优秀的前端组件 form-create,它支持基于 json 生成前端 ui 组件,如果插件开发牵扯到前端,我们会通过 json 来生成相关前端 UI 组件,org.apache.dolphinscheduler.spi.params 里面对插件的参数做了封装,它会将相关参数全部全部转化为对应的 json,这意味这你完全可以通过 Java 代码的方式完成前端组件的绘制(这里主要是表单,我们只关心前后端交互的数据)。 顺便提一句,我们采用了一款优秀的前端组件 form-create,它支持基于 json 生成前端 ui 组件,如果插件开发牵扯到前端,我们会通过 json 来生成相关前端 UI 组件,org.apache.dolphinscheduler.spi.params 里面对插件的参数做了封装,它会将相关参数全部全部转化为对应的 json,这意味这你完全可以通过 Java 代码的方式完成前端组件的绘制(这里主要是表单,我们只关心前后端交互的数据)。
本文主要着重讲解 Alert 告警相关设计以及开发。 本文主要着重讲解 Alert 告警相关设计以及开发。

2
docs/docs/zh/contribute/backend/spi/datasource.md

@ -17,6 +17,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient
我们在 dolphinscheduler-datasource-api 模块提供了所有数据源对外访问的 API 我们在 dolphinscheduler-datasource-api 模块提供了所有数据源对外访问的 API
另外,DataSourceChannelFactory 继承自PrioritySPI,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写getIdentify 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`
#### **未来计划** #### **未来计划**
支持kafka、http、文件、sparkSQL、FlinkSQL等数据源 支持kafka、http、文件、sparkSQL、FlinkSQL等数据源

2
docs/docs/zh/contribute/backend/spi/task.md

@ -8,6 +8,8 @@ org.apache.dolphinscheduler.spi.task.TaskChannel
我们在 dolphinscheduler-task-api 模块提供了所有任务对外访问的 API,而 dolphinscheduler-spi 模块则是 spi 通用代码库,定义了所有的插件模块,比如告警模块,注册中心模块等,你可以详细阅读查看。 我们在 dolphinscheduler-task-api 模块提供了所有任务对外访问的 API,而 dolphinscheduler-spi 模块则是 spi 通用代码库,定义了所有的插件模块,比如告警模块,注册中心模块等,你可以详细阅读查看。
另外,`TaskChannelFactory` 继承自 `PrioritySPI`,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 `getIdentify` 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`
*NOTICE* *NOTICE*
由于任务插件涉及到前端页面,目前前端的SPI还没有实现,因此你需要单独实现插件对应的前端页面。 由于任务插件涉及到前端页面,目前前端的SPI还没有实现,因此你需要单独实现插件对应的前端页面。

9
dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java

@ -20,15 +20,18 @@
package org.apache.dolphinscheduler.alert.api; package org.apache.dolphinscheduler.alert.api;
import org.apache.dolphinscheduler.spi.params.base.PluginParams; import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
import java.util.List; import java.util.List;
/** /**
* alert channel factory * alert channel factory
*/ */
public interface AlertChannelFactory { public interface AlertChannelFactory extends PrioritySPI {
/** /**
* Returns the name of the alert channel * Returns the name of the alert channel
*
* @return the name of the alert channel * @return the name of the alert channel
*/ */
String name(); String name();
@ -44,4 +47,8 @@ public interface AlertChannelFactory {
* Returns the configurable parameters that this plugin needs to display on the web ui * Returns the configurable parameters that this plugin needs to display on the web ui
*/ */
List<PluginParams> params(); List<PluginParams> params();
default SPIIdentify getIdentify() {
return SPIIdentify.builder().name(name()).build();
}
} }

34
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.alert; package org.apache.dolphinscheduler.alert;
import static java.lang.String.format;
import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertChannelFactory; import org.apache.dolphinscheduler.alert.api.AlertChannelFactory;
import org.apache.dolphinscheduler.alert.api.AlertConstants; import org.apache.dolphinscheduler.alert.api.AlertConstants;
@ -31,21 +29,20 @@ import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams; import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate; import org.apache.dolphinscheduler.spi.params.base.Validate;
import org.apache.dolphinscheduler.spi.params.radio.RadioParam; import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import org.slf4j.Logger; import static java.lang.String.format;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component @Component
public final class AlertPluginManager { public final class AlertPluginManager {
@ -74,20 +71,17 @@ public final class AlertPluginManager {
@EventListener @EventListener
public void installPlugin(ApplicationReadyEvent readyEvent) { public void installPlugin(ApplicationReadyEvent readyEvent) {
final Set<String> names = new HashSet<>();
ServiceLoader.load(AlertChannelFactory.class).forEach(factory -> {
final String name = factory.name();
logger.info("Registering alert plugin: {}", name); PrioritySPIFactory<AlertChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(AlertChannelFactory.class);
for (Map.Entry<String, AlertChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
String name = entry.getKey();
AlertChannelFactory factory = entry.getValue();
if (!names.add(name)) { logger.info("Registering alert plugin: {} - {}", name, factory.getClass());
throw new IllegalStateException(format("Duplicate alert plugins named '%s'", name));
}
final AlertChannel alertChannel = factory.create(); final AlertChannel alertChannel = factory.create();
logger.info("Registered alert plugin: {}", name); logger.info("Registered alert plugin: {} - {}", name, factory.getClass());
final List<PluginParams> params = new ArrayList<>(factory.params()); final List<PluginParams> params = new ArrayList<>(factory.params());
params.add(0, warningTypeParams); params.add(0, warningTypeParams);
@ -98,7 +92,7 @@ public final class AlertPluginManager {
final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine); final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine);
channelKeyedById.put(id, alertChannel); channelKeyedById.put(id, alertChannel);
}); }
} }
public Optional<AlertChannel> getAlertChannel(int id) { public Optional<AlertChannel> getAlertChannel(int id) {

16
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java

@ -17,18 +17,18 @@
package org.apache.dolphinscheduler.plugin.datasource.api.plugin; package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
import static java.lang.String.format;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory; import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import static java.lang.String.format;
import org.slf4j.LoggerFactory;
public class DataSourcePluginManager { public class DataSourcePluginManager {
private static final Logger logger = LoggerFactory.getLogger(DataSourcePluginManager.class); private static final Logger logger = LoggerFactory.getLogger(DataSourcePluginManager.class);
@ -41,8 +41,10 @@ public class DataSourcePluginManager {
public void installPlugin() { public void installPlugin() {
ServiceLoader.load(DataSourceChannelFactory.class).forEach(factory -> { PrioritySPIFactory<DataSourceChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(DataSourceChannelFactory.class);
final String name = factory.getName(); for (Map.Entry<String, DataSourceChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
final DataSourceChannelFactory factory = entry.getValue();
final String name = entry.getKey();
logger.info("Registering datasource plugin: {}", name); logger.info("Registering datasource plugin: {}", name);
@ -53,7 +55,7 @@ public class DataSourcePluginManager {
loadDatasourceClient(factory); loadDatasourceClient(factory);
logger.info("Registered datasource plugin: {}", name); logger.info("Registered datasource plugin: {}", name);
}); }
} }
private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) { private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) {

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -261,6 +262,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
throw new UnsupportedOperationException("This abstract class doesn's has type"); throw new UnsupportedOperationException("This abstract class doesn's has type");
} }
@Override
public SPIIdentify getIdentify() {
return SPIIdentify.builder().name(getType()).build();
}
@Override @Override
public TaskInstance taskInstance() { public TaskInstance taskInstance() {
return this.taskInstance; return this.taskInstance;

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java

@ -19,11 +19,12 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
/** /**
* interface of task processor in master * interface of task processor in master
*/ */
public interface ITaskProcessor { public interface ITaskProcessor extends PrioritySPI {
void init(TaskInstance taskInstance, ProcessInstance processInstance); void init(TaskInstance taskInstance, ProcessInstance processInstance);

27
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java

@ -17,24 +17,18 @@
package org.apache.dolphinscheduler.server.master.runner.task; package org.apache.dolphinscheduler.server.master.runner.task;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; import lombok.experimental.UtilityClass;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_STREAM;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.Map; import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import org.slf4j.LoggerFactory;
import lombok.experimental.UtilityClass;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
/** /**
* the factory to create task processor * the factory to create task processor
@ -44,16 +38,19 @@ public final class TaskProcessorFactory {
private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class); private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class);
public static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>(); private static final Map<String, Constructor<ITaskProcessor>> PROCESS_MAP = new ConcurrentHashMap<>();
private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE;
static { static {
for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) { PrioritySPIFactory<ITaskProcessor> prioritySPIFactory = new PrioritySPIFactory<>(ITaskProcessor.class);
for (Map.Entry<String, ITaskProcessor> entry : prioritySPIFactory.getSPIMap().entrySet()) {
try { try {
PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor<ITaskProcessor>) iTaskProcessor.getClass().getConstructor()); logger.info("Registering task processor: {} - {}", entry.getKey(), entry.getValue().getClass());
PROCESS_MAP.put(entry.getKey(), (Constructor<ITaskProcessor>) entry.getValue().getClass().getConstructor());
logger.info("Registered task processor: {} - {}", entry.getKey(), entry.getValue().getClass());
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
throw new IllegalArgumentException("The task processor should has a no args constructor", e); throw new IllegalArgumentException(String.format("The task processor: %s should has a no args constructor", entry.getKey()));
} }
} }
} }

30
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java

@ -17,25 +17,21 @@
package org.apache.dolphinscheduler.service.task; package org.apache.dolphinscheduler.service.task;
import static java.lang.String.format;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component @Component
public class TaskPluginManager { public class TaskPluginManager {
private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class); private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class);
@ -53,19 +49,19 @@ public class TaskPluginManager {
logger.warn("The task plugin has already been loaded"); logger.warn("The task plugin has already been loaded");
return; return;
} }
ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> { PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
final String name = factory.getName(); for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
String factoryName = entry.getKey();
TaskChannelFactory factory = entry.getValue();
logger.info("Registering task plugin: {} - {}", factoryName, factory.getClass());
logger.info("Registering task plugin: {}", name); taskChannelFactoryMap.put(factoryName, factory);
taskChannelMap.put(factoryName, factory.create());
if (taskChannelFactoryMap.containsKey(name)) { logger.info("Registered task plugin: {} - {}", factoryName, factory.getClass());
throw new TaskPluginException(format("Duplicate task plugins named '%s'", name));
} }
taskChannelFactoryMap.put(name, factory);
taskChannelMap.put(name, factory.create());
logger.info("Registered task plugin: {}", name);
});
} }
public Map<String, TaskChannel> getTaskChannelMap() { public Map<String, TaskChannel> getTaskChannelMap() {

10
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java

@ -17,7 +17,10 @@
package org.apache.dolphinscheduler.spi.datasource; package org.apache.dolphinscheduler.spi.datasource;
public interface DataSourceChannelFactory { import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
public interface DataSourceChannelFactory extends PrioritySPI {
/** /**
* get datasource client * get datasource client
*/ */
@ -27,4 +30,9 @@ public interface DataSourceChannelFactory {
* get registry component name * get registry component name
*/ */
String getName(); String getName();
@Override
default SPIIdentify getIdentify() {
return SPIIdentify.builder().name(getName()).build();
}
} }

34
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.plugin;
public interface PrioritySPI extends Comparable<Integer> {
/**
* The SPI identify, if the two plugin has the same name, will load the high priority.
* If the priority and name is all same, will throw <code>IllegalArgumentException</code>
* @return
*/
SPIIdentify getIdentify();
@Override
default int compareTo(Integer o) {
return Integer.compare(getIdentify().getPriority(), o);
}
}

60
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.plugin;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
@Slf4j
public class PrioritySPIFactory<T extends PrioritySPI> {
private final Map<String, T> map = new HashMap<>();
public PrioritySPIFactory(Class<T> spiClass) {
for (T t : ServiceLoader.load(spiClass)) {
if (map.containsKey(t.getIdentify().getName())) {
resolveConflict(t);
} else {
map.put(t.getIdentify().getName(), t);
}
}
}
public Map<String, T> getSPIMap() {
return Collections.unmodifiableMap(map);
}
private void resolveConflict(T newSPI) {
SPIIdentify identify = newSPI.getIdentify();
T oldSPI = map.get(identify.getName());
if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) == 0) {
throw new IllegalArgumentException(String.format("These two spi plugins has conflict identify name with the same priority: %s, %s",
oldSPI.getIdentify(), newSPI.getIdentify()));
} else if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) > 0) {
log.info("The {} plugin has high priority, will override {}", newSPI.getIdentify(), oldSPI);
map.put(identify.getName(), newSPI);
} else {
log.info("The low plugin {} will be skipped", newSPI);
}
}
}

36
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.plugin;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
@AllArgsConstructor
public class SPIIdentify {
private static final int DEFAULT_PRIORITY = 0;
private String name;
@Builder.Default
private int priority = DEFAULT_PRIORITY;
}

88
dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.plugin;
import com.google.auto.service.AutoService;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class PrioritySPIFactoryTest {
@Test
public void loadHighPriority() {
PrioritySPIFactory<LoadHighPriorityConflictTestSPI> factory = new PrioritySPIFactory<>(LoadHighPriorityConflictTestSPI.class);
Map<String, LoadHighPriorityConflictTestSPI> spiMap = factory.getSPIMap();
Assert.assertEquals(1, spiMap.get("A").getIdentify().getPriority());
}
@Test(expected = IllegalArgumentException.class)
public void throwExceptionWhenPriorityIsSame() {
PrioritySPIFactory<ThrowExceptionConflictTestSPI> factory = new PrioritySPIFactory<>(ThrowExceptionConflictTestSPI.class);
Map<String, ThrowExceptionConflictTestSPI> spiMap = factory.getSPIMap();
Assert.assertEquals(0, spiMap.get("B").getIdentify().getPriority());
}
public interface LoadHighPriorityConflictTestSPI extends PrioritySPI {
}
@AutoService(LoadHighPriorityConflictTestSPI.class)
public static class SPIA implements LoadHighPriorityConflictTestSPI {
@Override
public SPIIdentify getIdentify() {
return SPIIdentify.builder().name("A").priority(0).build();
}
}
@AutoService(LoadHighPriorityConflictTestSPI.class)
public static class SPIAA implements LoadHighPriorityConflictTestSPI {
@Override
public SPIIdentify getIdentify() {
return SPIIdentify.builder().name("A").priority(1).build();
}
}
public interface ThrowExceptionConflictTestSPI extends PrioritySPI {
}
@AutoService(ThrowExceptionConflictTestSPI.class)
public static class SPIB implements ThrowExceptionConflictTestSPI {
@Override
public SPIIdentify getIdentify() {
return SPIIdentify.builder().name("B").priority(0).build();
}
}
@AutoService(ThrowExceptionConflictTestSPI.class)
public static class SPIBB implements ThrowExceptionConflictTestSPI {
@Override
public SPIIdentify getIdentify() {
return SPIIdentify.builder().name("B").priority(0).build();
}
}
}

9
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java

@ -18,8 +18,15 @@
package org.apache.dolphinscheduler.plugin.task.api; package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.common.UiChannelFactory; import org.apache.dolphinscheduler.spi.common.UiChannelFactory;
import org.apache.dolphinscheduler.spi.plugin.SPIIdentify;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPI;
public interface TaskChannelFactory extends UiChannelFactory { public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI {
TaskChannel create(); TaskChannel create();
default SPIIdentify getIdentify() {
return SPIIdentify.builder().name(getName()).build();
}
} }

Loading…
Cancel
Save