diff --git a/README.md b/README.md
index 7bccebc56d..9582619611 100644
--- a/README.md
+++ b/README.md
@@ -80,7 +80,7 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-${latest.release.version}-s
## Thanks
-DolphinScheduler is based on a lot of excellent open-source projects, such as Google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
+DolphinScheduler is based on a lot of excellent open-source projects, such as Google guava, guice, grpc, netty, quartz, and many open-source projects of Apache and so on.
We would like to express our deep gratitude to all the open-source projects used in Dolphin Scheduler. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we hope everyone who have the same enthusiasm and passion for open source could join in and contribute to the open-source community!
## Get Help
@@ -92,7 +92,7 @@ We would like to express our deep gratitude to all the open-source projects used
You are very welcome to communicate with the developers and users of Dolphin Scheduler. There are two ways to find them:
1. Join the Slack channel by [this invitation link](https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-omtdhuio-_JISsxYhiVsltmC5h38yfw).
-2. Follow the [Twitter account of Dolphin Scheduler](https://twitter.com/dolphinschedule) and get the latest news on time.
+2. Follow the [Twitter account of DolphinScheduler](https://twitter.com/dolphinschedule) and get the latest news on time.
### Contributor over time
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactory.java
index b0f07b6f88..61518b6cbf 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactory.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactory.java
@@ -24,9 +24,9 @@ import static org.apache.dolphinscheduler.spi.utils.Constants.STRING_YES;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
-import org.apache.dolphinscheduler.spi.params.RadioParam;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java
index 41395fc01f..2eefb4932f 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelFactory.java
@@ -26,9 +26,9 @@ import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.ShowType;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
-import org.apache.dolphinscheduler.spi.params.RadioParam;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java
index 2ddd42ebde..9a87709e9a 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/test/java/org/apache/dolphinscheduler/plugin/alert/email/EmailAlertChannelTest.java
@@ -22,11 +22,10 @@ import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.alert.ShowType;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
-import org.apache.dolphinscheduler.spi.params.RadioParam;
-import org.apache.dolphinscheduler.spi.params.base.DataType;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
@@ -142,7 +141,7 @@ public class EmailAlertChannelTest {
emailShowTypeList.add(new ParamsOptions(ShowType.ATTACHMENT.getDescp(), ShowType.ATTACHMENT.getDescp(), false));
emailShowTypeList.add(new ParamsOptions(ShowType.TABLEATTACHMENT.getDescp(), ShowType.TABLEATTACHMENT.getDescp(), false));
RadioParam showType = RadioParam.newBuilder(AlertConstants.SHOW_TYPE, "showType")
- .setParamsOptionsList(emailShowTypeList)
+ .setOptions(emailShowTypeList)
.setValue(ShowType.TABLE.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactory.java
index d85b4233a5..6eb3376a4d 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactory.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactory.java
@@ -24,9 +24,9 @@ import static org.apache.dolphinscheduler.spi.utils.Constants.STRING_YES;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
-import org.apache.dolphinscheduler.spi.params.RadioParam;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/main/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/main/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelFactory.java
index 6b8dd305fd..d54885eb7f 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/main/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelFactory.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/main/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelFactory.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.plugin.alert.http;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/test/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelTest.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/test/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelTest.java
index 4d385e8a54..2d29407e71 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/test/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelTest.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-http/src/test/java/org/apache/dolphinscheduler/plugin/alert/http/HttpAlertChannelTest.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.alert.http;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannelFactory.java
index a81cb4f12e..67cfc3931f 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannelFactory.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptAlertChannelFactory.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.alert.script;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
-import org.apache.dolphinscheduler.spi.params.InputParam;
-import org.apache.dolphinscheduler.spi.params.RadioParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-slack/src/main/java/org/apache/dolphinscheduler/plugin/alert/slack/SlackAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-slack/src/main/java/org/apache/dolphinscheduler/plugin/alert/slack/SlackAlertChannelFactory.java
index f3f135ebec..d56976400a 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-slack/src/main/java/org/apache/dolphinscheduler/plugin/alert/slack/SlackAlertChannelFactory.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-slack/src/main/java/org/apache/dolphinscheduler/plugin/alert/slack/SlackAlertChannelFactory.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.plugin.alert.slack;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactory.java b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactory.java
index 636c571322..25b1318f42 100644
--- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactory.java
+++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactory.java
@@ -21,8 +21,8 @@ import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.ShowType;
-import org.apache.dolphinscheduler.spi.params.InputParam;
-import org.apache.dolphinscheduler.spi.params.RadioParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
index 6d1727f1e4..5c31225edc 100644
--- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
@@ -32,10 +32,10 @@ import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.ShowType;
-import org.apache.dolphinscheduler.spi.params.InputParam;
+import org.apache.dolphinscheduler.spi.params.input.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
-import org.apache.dolphinscheduler.spi.params.RadioParam;
+import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.DataType;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
@@ -217,7 +217,7 @@ public class EmailAlertPluginTest {
emailShowTypeList.add(new ParamsOptions(ShowType.ATTACHMENT.getDescp(), ShowType.ATTACHMENT.getDescp(), false));
emailShowTypeList.add(new ParamsOptions(ShowType.TABLEATTACHMENT.getDescp(), ShowType.TABLEATTACHMENT.getDescp(), false));
RadioParam showType = RadioParam.newBuilder(AlertConstants.SHOW_TYPE, "showType")
- .setParamsOptionsList(emailShowTypeList)
+ .setOptions(emailShowTypeList)
.setValue(ShowType.TABLE.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
index a16480fd6a..fec3342cf1 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
@@ -80,7 +80,9 @@
left join t_ds_user tu on td.user_id = tu.id
where td.project_code = #{projectCode}
- and td.name like concat('%', #{searchVal}, '%')
+ AND (td.name like concat('%', #{searchVal}, '%')
+ OR td.description like concat('%', #{searchVal}, '%')
+ )
and td.user_id = #{userId}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
index 59a24731fe..1b44c22158 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
@@ -88,7 +88,9 @@
)
- and p.name like concat('%', #{searchName}, '%')
+ AND (p.name LIKE concat('%', #{searchName}, '%')
+ OR p.description LIKE concat('%', #{searchName}, '%')
+ )
order by p.create_time desc
diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
index e5689b8bb2..557a993da6 100644
--- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
+++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
@@ -74,4 +74,10 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 03544ad713..7c59fb6955 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -86,6 +86,7 @@
org.springframework
spring-test
+ test
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 91566b11a8..cd744a1998 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -17,14 +17,17 @@
package org.apache.dolphinscheduler.server.worker;
+import com.google.common.collect.ImmutableList;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
@@ -39,6 +42,9 @@ import java.util.Set;
import javax.annotation.PostConstruct;
+import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
+import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -101,6 +107,8 @@ public class WorkerServer implements IStoppable {
@Autowired
private WorkerManagerThread workerManagerThread;
+ private TaskPluginManager taskPluginManager;
+
/**
* worker server startup, not use web service
*
@@ -119,11 +127,13 @@ public class WorkerServer implements IStoppable {
// alert-server client registry
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), Constants.ALERT_RPC_PORT);
+ // init task plugin
+ initTaskPlugin();
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService));
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
@@ -157,6 +167,26 @@ public class WorkerServer implements IStoppable {
}));
}
+ private void initTaskPlugin() {
+ taskPluginManager = new TaskPluginManager();
+ DolphinPluginManagerConfig taskPluginManagerConfig = new DolphinPluginManagerConfig();
+ taskPluginManagerConfig.setPlugins(workerConfig.getTaskPluginBinding());
+ if (StringUtils.isNotBlank(workerConfig.getTaskPluginDir())) {
+ taskPluginManagerConfig.setInstalledPluginsDir(workerConfig.getTaskPluginDir().trim());
+ }
+
+ if (StringUtils.isNotBlank(workerConfig.getMavenLocalRepository())) {
+ taskPluginManagerConfig.setMavenLocalRepository(workerConfig.getMavenLocalRepository().trim());
+ }
+
+ DolphinPluginLoader alertPluginLoader = new DolphinPluginLoader(taskPluginManagerConfig, ImmutableList.of(taskPluginManager));
+ try {
+ alertPluginLoader.loadPlugins();
+ } catch (Exception e) {
+ throw new RuntimeException("Load Task Plugin Failed !", e);
+ }
+ }
+
public void close(String cause) {
try {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 2c71a97f30..a3feb7777d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -56,6 +56,15 @@ public class WorkerConfig {
@Value("${alert.listen.host:localhost}")
private String alertListenHost;
+ @Value("${task.plugin.dir:}")
+ private String taskPluginDir;
+
+ @Value("${maven.local.repository:}")
+ private String mavenLocalRepository;
+
+ @Value("${task.plugin.binding:}")
+ private String taskPluginBinding;
+
public int getListenPort() {
return listenPort;
}
@@ -130,4 +139,28 @@ public class WorkerConfig {
public void setAlertListenHost(String alertListenHost) {
this.alertListenHost = alertListenHost;
}
+
+ public String getTaskPluginDir() {
+ return taskPluginDir;
+ }
+
+ public void setTaskPluginDir(String taskPluginDir) {
+ this.taskPluginDir = taskPluginDir;
+ }
+
+ public String getMavenLocalRepository() {
+ return mavenLocalRepository;
+ }
+
+ public void setMavenLocalRepository(String mavenLocalRepository) {
+ this.mavenLocalRepository = mavenLocalRepository;
+ }
+
+ public String getTaskPluginBinding() {
+ return taskPluginBinding;
+ }
+
+ public void setTaskPluginBinding(String taskPluginBinding) {
+ this.taskPluginBinding = taskPluginBinding;
+ }
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/plugin/TaskPluginManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/plugin/TaskPluginManager.java
index 7f51d8daea..a76a1bb661 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/plugin/TaskPluginManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/plugin/TaskPluginManager.java
@@ -83,6 +83,10 @@ public class TaskPluginManager extends AbstractDolphinPluginManager {
private PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
+ public Map getTaskChannelMap() {
+ return taskChannelMap;
+ }
+
@Override
public void installPlugin(DolphinSchedulerPlugin dolphinSchedulerPlugin) {
for (TaskChannelFactory taskChannelFactory : dolphinSchedulerPlugin.getTaskChannelFactorys()) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 047dc6d9ed..718c9490ba 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
@@ -74,6 +75,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
*/
private AlertClientService alertClientService;
+ private TaskPluginManager taskPluginManager;
+
/**
* taskExecutionContextCacheManager
*/
@@ -102,9 +105,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache);
}
- public TaskExecuteProcessor(AlertClientService alertClientService) {
+ public TaskExecuteProcessor(AlertClientService alertClientService, TaskPluginManager taskPluginManager) {
this();
this.alertClientService = alertClientService;
+ this.taskPluginManager = taskPluginManager;
}
@Override
@@ -177,7 +181,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
this.doAck(taskExecutionContext);
// submit task to manager
- if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) {
+ if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService, taskPluginManager))) {
logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize());
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 50847f7e13..597529406d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -35,8 +35,8 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
-import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -55,6 +55,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.dolphinscheduler.spi.task.AbstractTask;
+import org.apache.dolphinscheduler.spi.task.TaskChannel;
+import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,14 +103,17 @@ public class TaskExecuteThread implements Runnable, Delayed {
*/
private AlertClientService alertClientService;
+ private TaskPluginManager taskPluginManager;
+
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param taskCallbackService taskCallbackService
*/
- public TaskExecuteThread(TaskExecutionContext taskExecutionContext
- , TaskCallbackService taskCallbackService
- , Logger taskLogger, AlertClientService alertClientService) {
+ public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
+ TaskCallbackService taskCallbackService,
+ Logger taskLogger,
+ AlertClientService alertClientService) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
@@ -115,6 +121,19 @@ public class TaskExecuteThread implements Runnable, Delayed {
this.alertClientService = alertClientService;
}
+ public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
+ TaskCallbackService taskCallbackService,
+ Logger taskLogger,
+ AlertClientService alertClientService,
+ TaskPluginManager taskPluginManager) {
+ this.taskExecutionContext = taskExecutionContext;
+ this.taskCallbackService = taskCallbackService;
+ this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+ this.taskLogger = taskLogger;
+ this.alertClientService = alertClientService;
+ this.taskPluginManager = taskPluginManager;
+ }
+
@Override
public void run() {
@@ -150,24 +169,31 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
- task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
+ TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
+
+ //TODO Temporary operation, To be adjusted
+ TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class);
+ task = taskChannel.createTask(taskRequest, taskLogger);
// task init
- task.init();
+ this.task.init();
//init varPool
- task.getParameters().setVarPool(taskExecutionContext.getVarPool());
+ //TODO Temporary operation, To be adjusted
+// this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
- task.handle();
+ this.task.handle();
// task result process
- task.after();
+ this.task.after();
- responseCommand.setStatus(task.getExitStatus().getCode());
+ responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
- responseCommand.setProcessId(task.getProcessId());
- responseCommand.setAppIds(task.getAppIds());
- responseCommand.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
- logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
- } catch (Exception e) {
+ responseCommand.setProcessId(this.task.getProcessId());
+ responseCommand.setAppIds(this.task.getAppIds());
+ //TODO Temporary operation, To be adjusted
+// responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
+ logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
+ } catch (Throwable e) {
+ e.printStackTrace();
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 9dd8b516ed..3c4b3ab273 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -42,6 +42,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.commons.collections.MapUtils;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -271,11 +273,11 @@ public class SqlTask extends AbstractTask {
public String setNonQuerySqlReturn(String updateResult, List properties) {
String result = null;
- for (Property info :properties) {
+ for (Property info : properties) {
if (Direct.OUT == info.getDirect()) {
- List