diff --git a/docs/docs/en/contribute/backend/spi/alert.md b/docs/docs/en/contribute/backend/spi/alert.md index e2629a87a8..9b6c45e547 100644 --- a/docs/docs/en/contribute/backend/spi/alert.md +++ b/docs/docs/en/contribute/backend/spi/alert.md @@ -6,7 +6,9 @@ DolphinScheduler is undergoing a microkernel + plug-in architecture change. All For alarm-related codes, please refer to the `dolphinscheduler-alert-api` module. This module defines the extension interface of the alarm plug-in and some basic codes. When we need to realize the plug-inization of related functions, it is recommended to read the code of this block first. Of course, it is recommended that you read the document. This will reduce a lot of time, but the document There is a certain degree of lag. When the document is missing, it is recommended to take the source code as the standard (if you are interested, we also welcome you to submit related documents). In addition, we will hardly make changes to the extended interface (excluding new additions) , Unless there is a major structural adjustment, there is an incompatible upgrade version, so the existing documents can generally be satisfied. -We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple. +We use the native JAVA-SPI, when you need to extend, in fact, you only need to pay attention to the extension of the `org.apache.dolphinscheduler.alert.api.AlertChannelFactory` interface, the underlying logic such as plug-in loading, and other kernels have been implemented, Which makes our development more focused and simple. + +In additional, the `AlertChannelFactory` extends from `PrioritySPI`, this means you can set the plugin priority, when you have two plugin has the same name, you can customize the priority by override the `getIdentify` method. The high priority plugin will be load, but if you have two plugin with the same name and same priority, the server will throw `IllegalArgumentException` when load the plugin. By the way, we have adopted an excellent front-end component form-create, which supports the generation of front-end UI components based on JSON. If plug-in development involves the front-end, we will use JSON to generate related front-end UI components, org.apache.dolphinscheduler. The parameters of the plug-in are encapsulated in spi.params, which will convert all the relevant parameters into the corresponding JSON, which means that you can complete the drawing of the front-end components by way of Java code (here is mainly the form, we only care Data exchanged between the front and back ends). diff --git a/docs/docs/en/contribute/backend/spi/datasource.md b/docs/docs/en/contribute/backend/spi/datasource.md index 5772b4357c..9738e07330 100644 --- a/docs/docs/en/contribute/backend/spi/datasource.md +++ b/docs/docs/en/contribute/backend/spi/datasource.md @@ -18,6 +18,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient We provide APIs for external access of all data sources in the dolphin scheduler data source API module +In additional, the `DataSourceChannelFactory` extends from `PrioritySPI`, this means you can set the plugin priority, when you have two plugin has the same name, you can customize the priority by override the `getIdentify` method. The high priority plugin will be load, but if you have two plugin with the same name and same priority, the server will throw `IllegalArgumentException` when load the plugin. + #### **Future plan** Support data sources such as kafka, http, files, sparkSQL, FlinkSQL, etc. \ No newline at end of file diff --git a/docs/docs/en/contribute/backend/spi/task.md b/docs/docs/en/contribute/backend/spi/task.md index 70b01d48ff..f909d42fa8 100644 --- a/docs/docs/en/contribute/backend/spi/task.md +++ b/docs/docs/en/contribute/backend/spi/task.md @@ -8,6 +8,8 @@ The plug-in can implement the above interface. It mainly includes creating tasks We provide APIs for external access to all tasks in the dolphinscheduler-task-api module, while the dolphinscheduler-spi module is the spi general code library, which defines all the plug-in modules, such as the alarm module, the registry module, etc., you can read and view in detail . +In additional, the `TaskChannelFactory` extends from `PrioritySPI`, this means you can set the plugin priority, when you have two plugin has the same name, you can customize the priority by override the `getIdentify` method. The high priority plugin will be load, but if you have two plugin with the same name and same priority, the server will throw `IllegalArgumentException` when load the plugin. + *NOTICE* Since the task plug-in involves the front-end page, the front-end SPI has not yet been implemented, so you need to implement the front-end page corresponding to the plug-in separately. diff --git a/docs/docs/zh/contribute/backend/spi/alert.md b/docs/docs/zh/contribute/backend/spi/alert.md index 709802e782..21ea651967 100644 --- a/docs/docs/zh/contribute/backend/spi/alert.md +++ b/docs/docs/zh/contribute/backend/spi/alert.md @@ -8,6 +8,8 @@ DolphinScheduler 正在处于微内核 + 插件化的架构更改之中,所有 我们采用了原生的 JAVA-SPI,当你需要扩展的时候,事实上你只需要关注扩展`org.apache.dolphinscheduler.alert.api.AlertChannelFactory`接口即可,底层相关逻辑如插件加载等内核已经实现,这让我们的开发更加专注且简单。 +另外,`AlertChannelFactory` 继承自 `PrioritySPI`,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 `getIdentify` 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`。 + 顺便提一句,我们采用了一款优秀的前端组件 form-create,它支持基于 json 生成前端 ui 组件,如果插件开发牵扯到前端,我们会通过 json 来生成相关前端 UI 组件,org.apache.dolphinscheduler.spi.params 里面对插件的参数做了封装,它会将相关参数全部全部转化为对应的 json,这意味这你完全可以通过 Java 代码的方式完成前端组件的绘制(这里主要是表单,我们只关心前后端交互的数据)。 本文主要着重讲解 Alert 告警相关设计以及开发。 diff --git a/docs/docs/zh/contribute/backend/spi/datasource.md b/docs/docs/zh/contribute/backend/spi/datasource.md index 1868c86d9e..a2fc4b59de 100644 --- a/docs/docs/zh/contribute/backend/spi/datasource.md +++ b/docs/docs/zh/contribute/backend/spi/datasource.md @@ -17,6 +17,8 @@ org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient 我们在 dolphinscheduler-datasource-api 模块提供了所有数据源对外访问的 API +另外,DataSourceChannelFactory 继承自PrioritySPI,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写getIdentify 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`。 + #### **未来计划** 支持kafka、http、文件、sparkSQL、FlinkSQL等数据源 diff --git a/docs/docs/zh/contribute/backend/spi/task.md b/docs/docs/zh/contribute/backend/spi/task.md index b2ee5242b5..fb0fe88fbe 100644 --- a/docs/docs/zh/contribute/backend/spi/task.md +++ b/docs/docs/zh/contribute/backend/spi/task.md @@ -8,6 +8,8 @@ org.apache.dolphinscheduler.spi.task.TaskChannel 我们在 dolphinscheduler-task-api 模块提供了所有任务对外访问的 API,而 dolphinscheduler-spi 模块则是 spi 通用代码库,定义了所有的插件模块,比如告警模块,注册中心模块等,你可以详细阅读查看。 +另外,`TaskChannelFactory` 继承自 `PrioritySPI`,这意味着你可以设置插件的优先级,当你有两个插件同名时,你可以通过重写 `getIdentify` 方法来自定义优先级。高优先级的插件会被加载,但是如果你有两个同名且优先级相同的插件,加载插件时服务器会抛出 `IllegalArgumentException`。 + *NOTICE* 由于任务插件涉及到前端页面,目前前端的SPI还没有实现,因此你需要单独实现插件对应的前端页面。 diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java index 2fa328e213..0f9878485c 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertChannelFactory.java @@ -20,15 +20,18 @@ package org.apache.dolphinscheduler.alert.api; import org.apache.dolphinscheduler.spi.params.base.PluginParams; +import org.apache.dolphinscheduler.spi.plugin.SPIIdentify; +import org.apache.dolphinscheduler.spi.plugin.PrioritySPI; import java.util.List; /** * alert channel factory */ -public interface AlertChannelFactory { +public interface AlertChannelFactory extends PrioritySPI { /** * Returns the name of the alert channel + * * @return the name of the alert channel */ String name(); @@ -44,4 +47,8 @@ public interface AlertChannelFactory { * Returns the configurable parameters that this plugin needs to display on the web ui */ List params(); + + default SPIIdentify getIdentify() { + return SPIIdentify.builder().name(name()).build(); + } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java index cc84769304..782bf0aa85 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.alert; -import static java.lang.String.format; - import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannelFactory; import org.apache.dolphinscheduler.alert.api.AlertConstants; @@ -31,21 +29,20 @@ import org.apache.dolphinscheduler.spi.params.base.ParamsOptions; import org.apache.dolphinscheduler.spi.params.base.PluginParams; import org.apache.dolphinscheduler.spi.params.base.Validate; import org.apache.dolphinscheduler.spi.params.radio.RadioParam; +import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.ServiceLoader; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; +import static java.lang.String.format; @Component public final class AlertPluginManager { @@ -74,20 +71,17 @@ public final class AlertPluginManager { @EventListener public void installPlugin(ApplicationReadyEvent readyEvent) { - final Set names = new HashSet<>(); - - ServiceLoader.load(AlertChannelFactory.class).forEach(factory -> { - final String name = factory.name(); - logger.info("Registering alert plugin: {}", name); + PrioritySPIFactory prioritySPIFactory = new PrioritySPIFactory<>(AlertChannelFactory.class); + for (Map.Entry entry : prioritySPIFactory.getSPIMap().entrySet()) { + String name = entry.getKey(); + AlertChannelFactory factory = entry.getValue(); - if (!names.add(name)) { - throw new IllegalStateException(format("Duplicate alert plugins named '%s'", name)); - } + logger.info("Registering alert plugin: {} - {}", name, factory.getClass()); final AlertChannel alertChannel = factory.create(); - logger.info("Registered alert plugin: {}", name); + logger.info("Registered alert plugin: {} - {}", name, factory.getClass()); final List params = new ArrayList<>(factory.params()); params.add(0, warningTypeParams); @@ -98,7 +92,7 @@ public final class AlertPluginManager { final int id = pluginDao.addOrUpdatePluginDefine(pluginDefine); channelKeyedById.put(id, alertChannel); - }); + } } public Optional getAlertChannel(int id) { diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java index fd31579a7c..0610ed6d7b 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java @@ -17,18 +17,18 @@ package org.apache.dolphinscheduler.plugin.datasource.api.plugin; -import static java.lang.String.format; - import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory; +import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Map; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static java.lang.String.format; public class DataSourcePluginManager { private static final Logger logger = LoggerFactory.getLogger(DataSourcePluginManager.class); @@ -41,8 +41,10 @@ public class DataSourcePluginManager { public void installPlugin() { - ServiceLoader.load(DataSourceChannelFactory.class).forEach(factory -> { - final String name = factory.getName(); + PrioritySPIFactory prioritySPIFactory = new PrioritySPIFactory<>(DataSourceChannelFactory.class); + for (Map.Entry entry : prioritySPIFactory.getSPIMap().entrySet()) { + final DataSourceChannelFactory factory = entry.getValue(); + final String name = entry.getKey(); logger.info("Registering datasource plugin: {}", name); @@ -53,7 +55,7 @@ public class DataSourcePluginManager { loadDatasourceClient(factory); logger.info("Registered datasource plugin: {}", name); - }); + } } private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 6419a97365..06d2c3ceb0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap; +import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.quartz.SchedulerException; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 7b5f7c66f7..8651dc018a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -64,6 +64,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.ResourceType; +import org.apache.dolphinscheduler.spi.plugin.SPIIdentify; import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -261,6 +262,11 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { throw new UnsupportedOperationException("This abstract class doesn's has type"); } + @Override + public SPIIdentify getIdentify() { + return SPIIdentify.builder().name(getType()).build(); + } + @Override public TaskInstance taskInstance() { return this.taskInstance; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index d7d241b7f6..de0a0a7c23 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -19,11 +19,12 @@ package org.apache.dolphinscheduler.server.master.runner.task; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.spi.plugin.PrioritySPI; /** * interface of task processor in master */ -public interface ITaskProcessor { +public interface ITaskProcessor extends PrioritySPI { void init(TaskInstance taskInstance, ProcessInstance processInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java index 9595c46dd8..f585c94bbf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/TaskProcessorFactory.java @@ -17,24 +17,18 @@ package org.apache.dolphinscheduler.server.master.runner.task; -import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_STREAM; - +import lombok.experimental.UtilityClass; import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.Map; -import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.experimental.UtilityClass; - -import org.apache.dolphinscheduler.common.enums.TaskExecuteType; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; /** * the factory to create task processor @@ -44,16 +38,19 @@ public final class TaskProcessorFactory { private static final Logger logger = LoggerFactory.getLogger(TaskProcessorFactory.class); - public static final Map> PROCESS_MAP = new ConcurrentHashMap<>(); + private static final Map> PROCESS_MAP = new ConcurrentHashMap<>(); private static final String DEFAULT_PROCESSOR = COMMON_TASK_TYPE; static { - for (ITaskProcessor iTaskProcessor : ServiceLoader.load(ITaskProcessor.class)) { + PrioritySPIFactory prioritySPIFactory = new PrioritySPIFactory<>(ITaskProcessor.class); + for (Map.Entry entry : prioritySPIFactory.getSPIMap().entrySet()) { try { - PROCESS_MAP.put(iTaskProcessor.getType(), (Constructor) iTaskProcessor.getClass().getConstructor()); + logger.info("Registering task processor: {} - {}", entry.getKey(), entry.getValue().getClass()); + PROCESS_MAP.put(entry.getKey(), (Constructor) entry.getValue().getClass().getConstructor()); + logger.info("Registered task processor: {} - {}", entry.getKey(), entry.getValue().getClass()); } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("The task processor should has a no args constructor", e); + throw new IllegalArgumentException(String.format("The task processor: %s should has a no args constructor", entry.getKey())); } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java index 1289b5718a..e3776e09ba 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/task/TaskPluginManager.java @@ -17,25 +17,21 @@ package org.apache.dolphinscheduler.service.task; -import static java.lang.String.format; - import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; +import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.ServiceLoader; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - @Component public class TaskPluginManager { private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class); @@ -53,19 +49,19 @@ public class TaskPluginManager { logger.warn("The task plugin has already been loaded"); return; } - ServiceLoader.load(TaskChannelFactory.class).forEach(factory -> { - final String name = factory.getName(); + PrioritySPIFactory prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class); + for (Map.Entry entry : prioritySPIFactory.getSPIMap().entrySet()) { + String factoryName = entry.getKey(); + TaskChannelFactory factory = entry.getValue(); - logger.info("Registering task plugin: {}", name); + logger.info("Registering task plugin: {} - {}", factoryName, factory.getClass()); - if (taskChannelFactoryMap.containsKey(name)) { - throw new TaskPluginException(format("Duplicate task plugins named '%s'", name)); - } - taskChannelFactoryMap.put(name, factory); - taskChannelMap.put(name, factory.create()); + taskChannelFactoryMap.put(factoryName, factory); + taskChannelMap.put(factoryName, factory.create()); + + logger.info("Registered task plugin: {} - {}", factoryName, factory.getClass()); + } - logger.info("Registered task plugin: {}", name); - }); } public Map getTaskChannelMap() { diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java index c947c3a647..eceb39fda9 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceChannelFactory.java @@ -17,7 +17,10 @@ package org.apache.dolphinscheduler.spi.datasource; -public interface DataSourceChannelFactory { +import org.apache.dolphinscheduler.spi.plugin.PrioritySPI; +import org.apache.dolphinscheduler.spi.plugin.SPIIdentify; + +public interface DataSourceChannelFactory extends PrioritySPI { /** * get datasource client */ @@ -27,4 +30,9 @@ public interface DataSourceChannelFactory { * get registry component name */ String getName(); + + @Override + default SPIIdentify getIdentify() { + return SPIIdentify.builder().name(getName()).build(); + } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java new file mode 100644 index 0000000000..094b39ce64 --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPI.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.plugin; + +public interface PrioritySPI extends Comparable { + + /** + * The SPI identify, if the two plugin has the same name, will load the high priority. + * If the priority and name is all same, will throw IllegalArgumentException + * @return + */ + SPIIdentify getIdentify(); + + @Override + default int compareTo(Integer o) { + return Integer.compare(getIdentify().getPriority(), o); + } + +} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java new file mode 100644 index 0000000000..8b1921b700 --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.plugin; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +@Slf4j +public class PrioritySPIFactory { + + private final Map map = new HashMap<>(); + + public PrioritySPIFactory(Class spiClass) { + for (T t : ServiceLoader.load(spiClass)) { + if (map.containsKey(t.getIdentify().getName())) { + resolveConflict(t); + } else { + map.put(t.getIdentify().getName(), t); + } + } + } + + public Map getSPIMap() { + return Collections.unmodifiableMap(map); + } + + private void resolveConflict(T newSPI) { + SPIIdentify identify = newSPI.getIdentify(); + T oldSPI = map.get(identify.getName()); + + if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) == 0) { + throw new IllegalArgumentException(String.format("These two spi plugins has conflict identify name with the same priority: %s, %s", + oldSPI.getIdentify(), newSPI.getIdentify())); + } else if (newSPI.compareTo(oldSPI.getIdentify().getPriority()) > 0) { + log.info("The {} plugin has high priority, will override {}", newSPI.getIdentify(), oldSPI); + map.put(identify.getName(), newSPI); + } else { + log.info("The low plugin {} will be skipped", newSPI); + } + } +} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java new file mode 100644 index 0000000000..e55de1c466 --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/plugin/SPIIdentify.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.plugin; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +@AllArgsConstructor +public class SPIIdentify { + + private static final int DEFAULT_PRIORITY = 0; + + private String name; + + @Builder.Default + private int priority = DEFAULT_PRIORITY; + +} diff --git a/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java b/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java new file mode 100644 index 0000000000..4ed3519a6d --- /dev/null +++ b/dolphinscheduler-spi/src/test/java/org/apache/dolphinscheduler/spi/plugin/PrioritySPIFactoryTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.plugin; + +import com.google.auto.service.AutoService; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class PrioritySPIFactoryTest { + + @Test + public void loadHighPriority() { + PrioritySPIFactory factory = new PrioritySPIFactory<>(LoadHighPriorityConflictTestSPI.class); + Map spiMap = factory.getSPIMap(); + Assert.assertEquals(1, spiMap.get("A").getIdentify().getPriority()); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWhenPriorityIsSame() { + PrioritySPIFactory factory = new PrioritySPIFactory<>(ThrowExceptionConflictTestSPI.class); + Map spiMap = factory.getSPIMap(); + Assert.assertEquals(0, spiMap.get("B").getIdentify().getPriority()); + } + + + public interface LoadHighPriorityConflictTestSPI extends PrioritySPI { + + } + + @AutoService(LoadHighPriorityConflictTestSPI.class) + public static class SPIA implements LoadHighPriorityConflictTestSPI { + + @Override + public SPIIdentify getIdentify() { + return SPIIdentify.builder().name("A").priority(0).build(); + } + } + + @AutoService(LoadHighPriorityConflictTestSPI.class) + public static class SPIAA implements LoadHighPriorityConflictTestSPI { + + @Override + public SPIIdentify getIdentify() { + return SPIIdentify.builder().name("A").priority(1).build(); + } + } + + public interface ThrowExceptionConflictTestSPI extends PrioritySPI { + + } + + @AutoService(ThrowExceptionConflictTestSPI.class) + public static class SPIB implements ThrowExceptionConflictTestSPI { + + @Override + public SPIIdentify getIdentify() { + return SPIIdentify.builder().name("B").priority(0).build(); + } + } + + @AutoService(ThrowExceptionConflictTestSPI.class) + public static class SPIBB implements ThrowExceptionConflictTestSPI { + + @Override + public SPIIdentify getIdentify() { + return SPIIdentify.builder().name("B").priority(0).build(); + } + } + + +} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java index 643c4cdb70..51bf1957e4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskChannelFactory.java @@ -18,8 +18,15 @@ package org.apache.dolphinscheduler.plugin.task.api; import org.apache.dolphinscheduler.spi.common.UiChannelFactory; +import org.apache.dolphinscheduler.spi.plugin.SPIIdentify; +import org.apache.dolphinscheduler.spi.plugin.PrioritySPI; -public interface TaskChannelFactory extends UiChannelFactory { +public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI { TaskChannel create(); + + default SPIIdentify getIdentify() { + return SPIIdentify.builder().name(getName()).build(); + } + }