diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
index 7882bac404..d5338cad4c 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
@@ -37,6 +37,7 @@ public enum TaskType {
* 11 CONDITIONS
* 12 SQOOP
* 13 WATERDROP
+ * 15 PIGEON
*/
SHELL(0, "SHELL"),
SQL(1, "SQL"),
@@ -53,8 +54,7 @@ public enum TaskType {
SQOOP(12, "SQOOP"),
WATERDROP(13, "WATERDROP"),
SWITCH(14, "SWITCH"),
- TIS(15, "TIS"),
- ;
+ PIGEON(15, "PIGEON");
TaskType(int code, String desc) {
this.code = code;
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/TISCommonParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/PigeonCommonParameters.java
similarity index 94%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/TISCommonParameters.java
rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/PigeonCommonParameters.java
index aebbdd0746..6606cc13e9 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/TISCommonParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/tis/PigeonCommonParameters.java
@@ -30,9 +30,9 @@ import org.slf4j.LoggerFactory;
/**
* TIS parameter
*/
-public class TISCommonParameters extends AbstractParameters {
+public class PigeonCommonParameters extends AbstractParameters {
- private static final Logger logger = LoggerFactory.getLogger(TISCommonParameters.class);
+ private static final Logger logger = LoggerFactory.getLogger(PigeonCommonParameters.class);
/**
* TIS target job name
*/
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
index e4f990d052..781b83b828 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
-import org.apache.dolphinscheduler.common.task.tis.TISCommonParameters;
+import org.apache.dolphinscheduler.common.task.tis.PigeonCommonParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,8 +86,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, SqoopParameters.class);
case "SWITCH":
return JSONUtils.parseObject(parameter, SwitchParameters.class);
- case "TIS":
- return JSONUtils.parseObject(parameter, TISCommonParameters.class);
+ case "PIGEON":
+ return JSONUtils.parseObject(parameter, PigeonCommonParameters.class);
default:
logger.error("not support task type: {}", taskType);
return null;
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java
index 47fe1acfa5..ad9f5f2342 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtilsTest.java
@@ -41,6 +41,6 @@ public class TaskParametersUtilsTest {
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.DEPENDENT.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.FLINK.getDesc(), "{}"));
Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.HTTP.getDesc(), "{}"));
- Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.TIS.getDesc(), "{}"));
+ Assert.assertNotNull(TaskParametersUtils.getParameters(TaskType.PIGEON.getDesc(), "{}"));
}
}
diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
index fc480acef1..2ed6a3fc58 100644
--- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
+++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
@@ -106,8 +106,8 @@
-
-
+
+
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 3ccb827b5c..7b18e2bc71 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -137,7 +137,7 @@ public class TaskPriorityQueueConsumer extends Thread {
result = dispatcher.dispatch(executionContext);
}
} catch (ExecuteException e) {
- logger.error("dispatch error: {}", e.getMessage());
+ logger.error("dispatch error: {}", e.getMessage(),e);
}
return result;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/pom.xml
similarity index 96%
rename from dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/pom.xml
index 71fc55294d..4a77c5a91e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/pom.xml
@@ -26,7 +26,7 @@
4.0.0
- dolphinscheduler-task-tis
+ dolphinscheduler-task-pigeon
dolphinscheduler-plugin
@@ -107,7 +107,7 @@
- dolphinscheduler-task-tis-${project.version}
+ dolphinscheduler-task-pigeon-${project.version}
\ No newline at end of file
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/readme.md b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/readme.md
similarity index 100%
rename from dolphinscheduler-task-plugin/dolphinscheduler-task-tis/readme.md
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/readme.md
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISConfig.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonConfig.java
similarity index 78%
rename from dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISConfig.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonConfig.java
index e6a00f0453..a8a70ce0a5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISConfig.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonConfig.java
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.tis;
+package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ResourceBundle;
-public class TISConfig {
+public class PigeonConfig {
- private static TISConfig cfg;
+ private static PigeonConfig cfg;
private final String jobTriggerUrl;
private final String jobTriggerPostBody;
@@ -33,15 +33,15 @@ public class TISConfig {
private final String jobLogsFetchUrl;
private final String jobCancelPostBody;
- public static synchronized TISConfig getInstance() {
+ public static synchronized PigeonConfig getInstance() {
if (cfg == null) {
- cfg = new TISConfig();
+ cfg = new PigeonConfig();
}
return cfg;
}
- private TISConfig() {
- ResourceBundle bundle = ResourceBundle.getBundle(TISConfig.class.getPackage().getName().replace(".", "/") + "/config");
+ private PigeonConfig() {
+ ResourceBundle bundle = ResourceBundle.getBundle(PigeonConfig.class.getPackage().getName().replace(".", "/") + "/config");
this.jobTriggerUrl = bundle.getString("job.trigger.url");
this.jobStatusUrl = bundle.getString("job.status.url");
this.jobTriggerPostBody = bundle.getString("job.trigger.post.body");
@@ -55,7 +55,7 @@ public class TISConfig {
}
public String getJobTriggerUrl(String tisHost) {
- checkTisHost(tisHost);
+ checkHost(tisHost);
return String.format(this.jobTriggerUrl, tisHost);
}
@@ -67,17 +67,17 @@ public class TISConfig {
return String.format(jobStatusPostBody, taskId);
}
- public String getJobLogsFetchUrl(String tisHost, String jobName, int taskId) {
- checkTisHost(tisHost);
- return String.format(jobLogsFetchUrl, tisHost, jobName, taskId);
+ public String getJobLogsFetchUrl(String host, String jobName, int taskId) {
+ checkHost(host);
+ return String.format(jobLogsFetchUrl, host, jobName, taskId);
}
public String getJobStatusUrl(String tisHost) {
- checkTisHost(tisHost);
+ checkHost(tisHost);
return String.format(this.jobStatusUrl, tisHost);
}
- private static void checkTisHost(String tisHost) {
+ private static void checkHost(String tisHost) {
if (StringUtils.isBlank(tisHost)) {
throw new IllegalArgumentException("param tisHost can not be null");
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonParameters.java
similarity index 91%
rename from dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParameters.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonParameters.java
index f19c33ed43..94a544f138 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonParameters.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.tis;
+package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
@@ -30,11 +30,11 @@ import org.slf4j.LoggerFactory;
/**
* TIS parameter
*/
-public class TISParameters extends AbstractParameters {
+public class PigeonParameters extends AbstractParameters {
- private static final Logger logger = LoggerFactory.getLogger(TISParameters.class);
+ private static final Logger logger = LoggerFactory.getLogger(PigeonParameters.class);
/**
- * TIS target job name
+ * Pigeon target job name
*/
private String targetJobName;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParamsConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonParamsConstants.java
similarity index 87%
rename from dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParamsConstants.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonParamsConstants.java
index 54dfcebb7a..e50755a728 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParamsConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonParamsConstants.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.tis;
+package org.apache.dolphinscheduler.plugin.task.pigeon;
-public class TISParamsConstants {
+public class PigeonParamsConstants {
public static String NAME_TARGET_JOB_NAME = "targetJobName";
public static String TARGET_JOB_NAME = NAME_TARGET_JOB_NAME;
- private TISParamsConstants() {
+ private PigeonParamsConstants() {
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
similarity index 84%
rename from dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java
rename to dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index aca7a5b04b..2ed7deef68 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.plugin.task.tis;
+package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
@@ -48,51 +48,51 @@ import org.java_websocket.handshake.ServerHandshake;
/**
* TIS DataX Task
**/
-public class TISTask extends AbstractTaskExecutor {
+public class PigeonTask extends AbstractTaskExecutor {
- public static final String KEY_POOL_VAR_TIS_HOST = "tisHost";
+ public static final String KEY_POOL_VAR_PIGEON_HOST = "p_host";
private final TaskRequest taskExecutionContext;
- private TISParameters tisParameters;
+ private PigeonParameters parameters;
private BizResult triggerResult;
- private final TISConfig tisConfig;
+ private final PigeonConfig config;
- public TISTask(TaskRequest taskExecutionContext) {
+ public PigeonTask(TaskRequest taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
- this.tisConfig = TISConfig.getInstance();
+ this.config = PigeonConfig.getInstance();
}
@Override
public void init() {
super.init();
- logger.info("tis task params {}", taskExecutionContext.getTaskParams());
- tisParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), TISParameters.class);
- if (!tisParameters.checkParameters()) {
+ logger.info("PIGEON task params {}", taskExecutionContext.getTaskParams());
+ parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PigeonParameters.class);
+ if (!parameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");
}
}
@Override
public void handle() throws Exception {
- // Trigger TIS DataX pipeline
- logger.info("start execute TIS task");
+ // Trigger PIGEON DataX pipeline
+ logger.info("start execute PIGEON task");
long startTime = System.currentTimeMillis();
- String targetJobName = this.tisParameters.getTargetJobName();
- String tisHost = getTisHost();
+ String targetJobName = this.parameters.getTargetJobName();
+ String host = getHost();
try {
final String triggerUrl = getTriggerUrl();
- final String getStatusUrl = tisConfig.getJobStatusUrl(tisHost);
+ final String getStatusUrl = config.getJobStatusUrl(host);
HttpPost post = new HttpPost(triggerUrl);
post.addHeader("appname", targetJobName);
addFormUrlencoded(post);
- StringEntity entity = new StringEntity(tisConfig.getJobTriggerPostBody(), StandardCharsets.UTF_8);
+ StringEntity entity = new StringEntity(config.getJobTriggerPostBody(), StandardCharsets.UTF_8);
post.setEntity(entity);
ExecResult execState = null;
int taskId;
WebSocketClient webSocket = null;
try (CloseableHttpClient client = HttpClients.createDefault();
- // trigger to start TIS dataX task
+ // trigger to start PIGEON dataX task
CloseableHttpResponse response = client.execute(post)) {
triggerResult = processResponse(triggerUrl, response, BizResult.class);
if (!triggerResult.isSuccess()) {
@@ -101,11 +101,11 @@ public class TISTask extends AbstractTaskExecutor {
if (CollectionUtils.isNotEmpty(errormsg)) {
errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(",")));
}
- throw new Exception("trigger TIS job faild taskName:" + targetJobName + errs.toString());
+ throw new Exception("trigger PIGEON job faild taskName:" + targetJobName + errs.toString());
}
taskId = triggerResult.getBizresult().getTaskid();
- webSocket = receiveRealtimeLog(tisHost, targetJobName, taskId);
+ webSocket = receiveRealtimeLog(host, targetJobName, taskId);
setAppIds(String.valueOf(taskId));
@@ -141,11 +141,11 @@ public class TISTask extends AbstractTaskExecutor {
}
long costTime = System.currentTimeMillis() - startTime;
- logger.info("TIS task: {},taskId:{} costTime : {} milliseconds, statusCode : {}",
+ logger.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}",
targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'");
setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE);
} catch (Exception e) {
- logger.error("execute TIS dataX faild,TIS task name:" + targetJobName, e);
+ logger.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
@@ -165,7 +165,7 @@ public class TISTask extends AbstractTaskExecutor {
logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId());
final String triggerUrl = getTriggerUrl();
- StringEntity entity = new StringEntity(tisConfig.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
+ StringEntity entity = new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
CancelResult cancelResult = null;
HttpPost post = new HttpPost(triggerUrl);
@@ -181,26 +181,26 @@ public class TISTask extends AbstractTaskExecutor {
if (org.apache.dolphinscheduler.spi.utils.CollectionUtils.isNotEmpty(errormsg)) {
errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(",")));
}
- throw new Exception("cancel TIS job faild taskId:" + triggerResult.getTaskId() + errs.toString());
+ throw new Exception("cancel PIGEON job faild taskId:" + triggerResult.getTaskId() + errs.toString());
}
}
}
private String getTriggerUrl() {
- final String tisHost = getTisHost();
- return tisConfig.getJobTriggerUrl(tisHost);
+ final String tisHost = getHost();
+ return config.getJobTriggerUrl(tisHost);
}
- private String getTisHost() {
- final String tisHost = taskExecutionContext.getDefinedParams().get(KEY_POOL_VAR_TIS_HOST);
- if (StringUtils.isEmpty(tisHost)) {
- throw new IllegalStateException("global var '" + KEY_POOL_VAR_TIS_HOST + "' can not be empty");
+ private String getHost() {
+ final String host = taskExecutionContext.getDefinedParams().get(KEY_POOL_VAR_PIGEON_HOST);
+ if (StringUtils.isEmpty(host)) {
+ throw new IllegalStateException("global var '" + KEY_POOL_VAR_PIGEON_HOST + "' can not be empty");
}
- return tisHost;
+ return host;
}
private WebSocketClient receiveRealtimeLog(final String tisHost, String dataXName, int taskId) throws Exception {
- final String applyURI = tisConfig.getJobLogsFetchUrl(tisHost, dataXName, taskId);
+ final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId);
logger.info("apply ws connection,uri:{}", applyURI);
WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) {
@Override
@@ -241,8 +241,8 @@ public class TISTask extends AbstractTaskExecutor {
@Override
public AbstractParameters getParameters() {
- Objects.requireNonNull(this.tisParameters, "tisParameters can not be null");
- return this.tisParameters;
+ Objects.requireNonNull(this.parameters, "tisParameters can not be null");
+ return this.parameters;
}
private static class CancelResult extends AjaxResult