getApplicationIds() throws TaskException {
+ return LogUtils.getAppIdsFromLogFile(taskRequest.getLogPath(), logger);
+ }
+
+ /**
+ * find app id
+ *
+ * @param line line
+ * @return appid
+ */
+ protected String findAppId(String line) {
+ Matcher matcher = YARN_APPLICATION_REGEX.matcher(line);
+ if (matcher.find()) {
+ return matcher.group();
+ }
+ return null;
}
/**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
new file mode 100644
index 0000000000..71643d56cd
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskCallBack.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api;
+
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
+
+public interface TaskCallBack {
+
+ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo);
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
index a8d297cb0d..d4e9d6d01b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/enums/TaskExecutionStatus.java
@@ -17,11 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.api.enums;
-import com.baomidou.mybatisplus.annotation.EnumValue;
-
import java.util.HashMap;
import java.util.Map;
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
public enum TaskExecutionStatus {
SUBMITTED_SUCCESS(0, "submit success"),
@@ -80,7 +80,7 @@ public enum TaskExecutionStatus {
}
public boolean isFailure() {
- return this == TaskExecutionStatus.FAILURE;
+ return this == TaskExecutionStatus.FAILURE || this == NEED_FAULT_TOLERANCE;
}
public boolean isPause() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
index a82e5f6bbd..4567076405 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
@@ -17,13 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.api.k8s;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
-public abstract class AbstractK8sTask extends AbstractTaskExecutor {
+public abstract class AbstractK8sTask extends AbstractRemoteTask {
/**
* process task
*/
@@ -38,8 +40,9 @@ public abstract class AbstractK8sTask extends AbstractTaskExecutor {
this.abstractK8sTaskExecutor = new K8sTaskExecutor(logger,taskRequest);
}
+ // todo split handle to submit and track
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
@@ -50,15 +53,25 @@ public abstract class AbstractK8sTask extends AbstractTaskExecutor {
}
}
+ // todo
+ @Override
+ public void submitApplication() throws TaskException {
+
+ }
+
+ // todo
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+
+ }
+
/**
* cancel application
*
- * @param status status
* @throws Exception exception
*/
@Override
- public void cancelApplication(boolean status) throws Exception {
- cancel = true;
+ public void cancelApplication() throws TaskException {
// cancel process
abstractK8sTaskExecutor.cancelApplication(buildCommand());
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
index 3c4857f2d0..36fcef97c3 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
@@ -17,10 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.api.loop;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.RetryUtils;
import java.time.Duration;
@@ -34,7 +36,12 @@ import lombok.NonNull;
*
* The loop task type means, we will submit a task, and loop the task status until the task is finished.
*/
-public abstract class BaseLoopTaskExecutor extends AbstractTaskExecutor {
+public abstract class BaseLoopTaskExecutor extends AbstractRemoteTask {
+
+ /**
+ * cancel flag
+ */
+ protected volatile boolean cancel = false;
/**
* The task instance info will be set when task has submitted successful.
@@ -46,11 +53,13 @@ public abstract class BaseLoopTaskExecutor extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
final long loopInterval = getTaskInstanceStatusQueryInterval().toMillis();
loopTaskInstanceInfo = submitLoopTask();
- this.appIds = loopTaskInstanceInfo.getTaskInstanceId();
+ this.setAppIds(loopTaskInstanceInfo.getTaskInstanceId());
+ taskCallBack.updateRemoteApplicationInfo(taskRequest.getTaskInstanceId(), new ApplicationInfo(getAppIds()));
+
// loop the task status until the task is finished or task has been canceled.
// we use retry utils here to avoid the task status query failure due to network failure.
// the default retry policy is 3 times, and the interval is 1 second.
@@ -94,8 +103,7 @@ public abstract class BaseLoopTaskExecutor extends AbstractTaskExecutor {
/**
* Query the loop task status, if query failed, directly throw exception
*/
- public abstract @NonNull LoopTaskInstanceStatus queryTaskInstanceStatus(@NonNull LoopTaskInstanceInfo taskInstanceInfo)
- throws TaskException;
+ public abstract @NonNull LoopTaskInstanceStatus queryTaskInstanceStatus(@NonNull LoopTaskInstanceInfo taskInstanceInfo) throws TaskException;
/**
* Get the interval time to query the loop task status
@@ -110,8 +118,8 @@ public abstract class BaseLoopTaskExecutor extends AbstractTaskExecutor {
public abstract void cancelLoopTaskInstance(@Nullable LoopTaskInstanceInfo taskInstanceInfo) throws TaskException;
@Override
- public void cancelApplication(boolean status) throws Exception {
+ public void cancelApplication() throws TaskException {
+ this.cancel = true;
cancelLoopTaskInstance(loopTaskInstanceInfo);
- super.cancelApplication(status);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ApplicationInfo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ApplicationInfo.java
new file mode 100644
index 0000000000..4320cc40b0
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/ApplicationInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.api.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ApplicationInfo {
+
+ String appIds;
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/stream/StreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/stream/StreamTask.java
index d554278516..51ab17e156 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/stream/StreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/stream/StreamTask.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.api.stream;
-import java.io.IOException;
-
public interface StreamTask {
public void savePoint() throws Exception;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
index 220d2583d2..0df5c3a911 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
@@ -17,47 +17,46 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
-import lombok.NonNull;
-import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
-import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
+import lombok.NonNull;
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.slf4j.Logger;
+
@Slf4j
@UtilityClass
public class LogUtils {
private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
- public List getAppIdsFromLogFile(@NonNull String logPath) {
+ public Set getAppIdsFromLogFile(@NonNull String logPath) {
return getAppIdsFromLogFile(logPath, log);
}
- public List getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
+ public Set getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
File logFile = new File(logPath);
if (!logFile.exists() || !logFile.isFile()) {
- return Collections.emptyList();
+ return Collections.emptySet();
}
Set appIds = new HashSet<>();
try (Stream stream = Files.lines(Paths.get(logPath))) {
stream.filter(line -> {
- Matcher matcher = APPLICATION_REGEX.matcher(line);
- return matcher.find();
- }
- ).forEach(line -> {
+ Matcher matcher = APPLICATION_REGEX.matcher(line);
+ return matcher.find();
+ }).forEach(line -> {
Matcher matcher = APPLICATION_REGEX.matcher(line);
if (matcher.find()) {
String appId = matcher.group();
@@ -66,10 +65,10 @@ public class LogUtils {
}
}
});
- return new ArrayList<>(appIds);
+ return appIds;
} catch (IOException e) {
logger.error("Get appId from log file erro, logPath: {}", logPath, e);
- return Collections.emptyList();
+ return Collections.emptySet();
}
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
index feb10ad859..bd961d3137 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
@@ -17,11 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
-import com.google.common.collect.Lists;
+import java.util.Set;
+
import org.junit.Assert;
import org.junit.Test;
-import java.util.List;
+import com.google.common.collect.Sets;
public class LogUtilsTest {
@@ -30,7 +31,7 @@ public class LogUtilsTest {
@Test
public void getAppIdsFromLogFile() {
- List appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
- Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds);
+ Set appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
+ Assert.assertEquals(Sets.newHashSet("application_1548381669007_1234"), appIds);
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
index 48914ea2e1..a690dcd383 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
@@ -18,9 +18,11 @@
package org.apache.dolphinscheduler.plugin.task.chunjun;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.commons.lang.SystemUtils;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -41,6 +43,7 @@ import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,7 +54,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_
/**
* chunjun task
*/
-public class ChunJunTask extends AbstractTaskExecutor {
+public class ChunJunTask extends AbstractTask {
/**
* chunjun path
*/
@@ -104,7 +107,7 @@ public class ChunJunTask extends AbstractTaskExecutor {
* @throws TaskException exception
*/
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
Map paramsMap = taskExecutionContext.getPrepareParamsMap();
@@ -113,7 +116,9 @@ public class ChunJunTask extends AbstractTaskExecutor {
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
- setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
+
+ // todo get applicationId
+ setAppIds(String.join(TaskConstants.COMMA, Collections.emptySet()));
setProcessId(commandExecuteResult.getProcessId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -251,13 +256,16 @@ public class ChunJunTask extends AbstractTaskExecutor {
/**
* cancel ChunJun process
*
- * @param cancelApplication cancelApplication
* @throws Exception if error throws Exception
*/
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
+ public void cancel() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 37905d30fe..48a7f7f743 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -33,8 +33,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -75,7 +76,7 @@ import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUt
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
-public class DataxTask extends AbstractTaskExecutor {
+public class DataxTask extends AbstractTask {
/**
* jvm parameters
*/
@@ -147,7 +148,7 @@ public class DataxTask extends AbstractTaskExecutor {
* @throws TaskException if error throws Exception
*/
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// replace placeholder,and combine local and global parameters
Map paramsMap = taskExecutionContext.getPrepareParamsMap();
@@ -158,7 +159,6 @@ public class DataxTask extends AbstractTaskExecutor {
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
- setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -175,14 +175,16 @@ public class DataxTask extends AbstractTaskExecutor {
/**
* cancel DataX process
*
- * @param cancelApplication cancelApplication
- * @throws Exception if error throws Exception
+ * @throws TaskException if error throws Exception
*/
@Override
- public void cancelApplication(boolean cancelApplication)
- throws Exception {
+ public void cancel() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
/**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
index 987126b0a7..51751fe10d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
@@ -21,7 +21,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -40,12 +43,14 @@ import org.apache.http.util.EntityUtils;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-public class DinkyTask extends AbstractTaskExecutor {
+public class DinkyTask extends AbstractRemoteTask {
/**
* taskExecutionContext
@@ -67,6 +72,11 @@ public class DinkyTask extends AbstractTaskExecutor {
this.taskExecutionContext = taskExecutionContext;
}
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
@Override
public void init() {
final String taskParams = taskExecutionContext.getTaskParams();
@@ -77,8 +87,9 @@ public class DinkyTask extends AbstractTaskExecutor {
}
}
+ // todo split handle to submit and track
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
String address = this.dinkyParameters.getAddress();
@@ -130,6 +141,16 @@ public class DinkyTask extends AbstractTaskExecutor {
}
}
+ @Override
+ public void submitApplication() throws TaskException {
+
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+
+ }
+
/**
* map dinky task status to exitStatusCode
*
@@ -166,8 +187,7 @@ public class DinkyTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean status) throws Exception {
- super.cancelApplication(status);
+ public void cancelApplication() throws TaskException {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
logger.info("trying terminate dinky task, taskId: {}, address: {}, taskId: {}",
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
index 4fb9f7b8fa..d049f8360c 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
@@ -17,9 +17,11 @@
package org.apache.dolphinscheduler.plugin.task.dvc;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
@@ -29,12 +31,10 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-
/**
* shell task
*/
-public class DvcTask extends AbstractTaskExecutor {
+public class DvcTask extends AbstractTask {
/**
* dvc parameters
@@ -75,13 +75,12 @@ public class DvcTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
- setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
@@ -97,9 +96,13 @@ public class DvcTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
+ public void cancel() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
public String buildCommand() {
@@ -159,12 +162,9 @@ public class DvcTask extends AbstractTaskExecutor {
}
-
@Override
public AbstractParameters getParameters() {
return parameters;
}
-
}
-
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
index 329c3bc3d5..07ff171ba1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
@@ -22,7 +22,8 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKN
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -44,7 +45,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
*
* @since v3.1.0
*/
-public abstract class AbstractEmrTask extends AbstractTaskExecutor {
+public abstract class AbstractEmrTask extends AbstractRemoteTask {
final TaskExecutionContext taskExecutionContext;
EmrParameters emrParameters;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
index 6f4fc95e48..a746185805 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
@@ -30,11 +30,15 @@ import com.amazonaws.services.elasticmapreduce.model.StepState;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -62,7 +66,12 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
@Override
- public void handle() throws TaskException {
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public void submitApplication() throws TaskException {
StepStatus stepStatus = null;
try {
AddJobFlowStepsRequest addJobFlowStepsRequest = createAddJobFlowStepsRequest();
@@ -77,13 +86,27 @@ public class EmrAddStepsTask extends AbstractEmrTask {
stepStatus = getStepStatus();
+ } catch (EmrTaskException | SdkBaseException e) {
+ logger.error("emr task submit failed with error", e);
+ throw new TaskException("emr task submit fail", e);
+ } finally {
+ final int exitStatusCode = calculateExitStatusCode(stepStatus);
+ setExitStatusCode(exitStatusCode);
+ logger.info("emr task finished with step status : {}", stepStatus);
+ }
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+ StepStatus stepStatus = getStepStatus();
+
+ try {
while (waitingStateSet.contains(stepStatus.getState())) {
TimeUnit.SECONDS.sleep(10);
stepStatus = getStepStatus();
}
-
} catch (EmrTaskException | SdkBaseException e) {
- logger.error("emr task submit failed with error", e);
+ logger.error("emr task failed with error", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskException("Execute emr task failed", e);
@@ -154,8 +177,7 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
@Override
- public void cancelApplication(boolean status) throws Exception {
- super.cancelApplication(status);
+ public void cancelApplication() throws TaskException {
logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
index bc361f6538..770fb9f996 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
@@ -21,7 +21,9 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
@@ -41,10 +43,9 @@ import com.google.common.collect.Sets;
public class EmrJobFlowTask extends AbstractEmrTask {
private final HashSet waitingStateSet = Sets.newHashSet(
- ClusterState.STARTING.toString(),
- ClusterState.BOOTSTRAPPING.toString(),
- ClusterState.RUNNING.toString()
- );
+ ClusterState.STARTING.toString(),
+ ClusterState.BOOTSTRAPPING.toString(),
+ ClusterState.RUNNING.toString());
/**
* constructor
@@ -56,7 +57,12 @@ public class EmrJobFlowTask extends AbstractEmrTask {
}
@Override
- public void handle() throws TaskException {
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public void submitApplication() throws TaskException {
ClusterStatus clusterStatus = null;
try {
RunJobFlowRequest runJobFlowRequest = createRunJobFlowRequest();
@@ -65,18 +71,34 @@ public class EmrJobFlowTask extends AbstractEmrTask {
RunJobFlowResult result = emrClient.runJobFlow(runJobFlowRequest);
clusterId = result.getJobFlowId();
- // Failover on EMR Task type has not been implemented. In this time, DS only supports failover on yarn task type . Other task type, such as EMR task, k8s task not ready yet.
+ // Failover on EMR Task type has not been implemented. In this time, DS only supports failover on yarn task
+ // type . Other task type, such as EMR task, k8s task not ready yet.
setAppIds(clusterId);
clusterStatus = getClusterStatus();
+ } catch (EmrTaskException | SdkBaseException e) {
+ logger.error("emr task submit failed with error", e);
+ throw new TaskException("emr task submit failed", e);
+ } finally {
+ final int exitStatusCode = calculateExitStatusCode(clusterStatus);
+ setExitStatusCode(exitStatusCode);
+ logger.info("emr task finished with cluster status : {}", clusterStatus);
+ }
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+ ClusterStatus clusterStatus = null;
+ try {
+ clusterStatus = getClusterStatus();
+
while (waitingStateSet.contains(clusterStatus.getState())) {
TimeUnit.SECONDS.sleep(10);
clusterStatus = getClusterStatus();
}
-
} catch (EmrTaskException | SdkBaseException e) {
- logger.error("emr task submit failed with error", e);
+ logger.error("emr task failed with error", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskException("Execute emr task failed", e);
@@ -123,7 +145,8 @@ public class EmrJobFlowTask extends AbstractEmrTask {
case TERMINATED:
case TERMINATING:
String code = stateChangeReason.getCode();
- if (code != null && code.equalsIgnoreCase(ClusterStateChangeReasonCode.ALL_STEPS_COMPLETED.toString())) {
+ if (code != null
+ && code.equalsIgnoreCase(ClusterStateChangeReasonCode.ALL_STEPS_COMPLETED.toString())) {
return TaskConstants.EXIT_CODE_SUCCESS;
} else {
return TaskConstants.EXIT_CODE_KILL;
@@ -148,9 +171,9 @@ public class EmrJobFlowTask extends AbstractEmrTask {
}
@Override
- public void cancelApplication(boolean status) throws Exception {
- super.cancelApplication(status);
- logger.info("trying terminate job flow, taskId:{}, clusterId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId);
+ public void cancelApplication() throws TaskException {
+ logger.info("trying terminate job flow, taskId:{}, clusterId:{}", this.taskExecutionContext.getTaskInstanceId(),
+ clusterId);
TerminateJobFlowsRequest terminateJobFlowsRequest = new TerminateJobFlowsRequest().withJobFlowIds(clusterId);
TerminateJobFlowsResult terminateJobFlowsResult = emrClient.terminateJobFlows(terminateJobFlowsRequest);
logger.info("the result of terminate job flow is:{}", terminateJobFlowsResult);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
index d74d36fa82..48aee81648 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTaskTest.java
@@ -28,6 +28,8 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@@ -88,6 +90,9 @@ public class EmrAddStepsTaskTest {
private EmrAddStepsTask emrAddStepsTask;
private AmazonElasticMapReduce emrClient;
private Step step;
+ private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+
+ };
@Before
public void before() throws Exception {
@@ -116,15 +121,14 @@ public class EmrAddStepsTaskTest {
emrAddStepsTask.init();
}
- @Test
+ @Test(expected = TaskException.class)
public void testCanNotParseJson() throws Exception {
mockStatic(JSONUtils.class);
when(emrAddStepsTask, "createAddJobFlowStepsRequest").thenThrow(new EmrTaskException("can not parse AddJobFlowStepsRequest from json", new Exception("error")));
- emrAddStepsTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
+ emrAddStepsTask.handle(taskCallBack);
}
- @Test
+ @Test(expected = TaskException.class)
public void testDefineJsonStepNotOne() throws Exception {
// mock EmrParameters and EmrAddStepsTask
EmrParameters emrParameters = buildErrorEmrTaskParameters();
@@ -134,16 +138,14 @@ public class EmrAddStepsTaskTest {
emrAddStepsTask = spy(new EmrAddStepsTask(taskExecutionContext));
doReturn(emrClient).when(emrAddStepsTask, "createEmrClient");
emrAddStepsTask.init();
- emrAddStepsTask.handle();
-
- Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
+ emrAddStepsTask.handle(taskCallBack);
}
@Test
public void testHandle() throws Exception {
when(step.getStatus()).thenReturn(pendingState, runningState, completedState);
- emrAddStepsTask.handle();
+ emrAddStepsTask.handle(taskCallBack);
Assert.assertEquals(EXIT_CODE_SUCCESS, emrAddStepsTask.getExitStatusCode());
}
@@ -151,19 +153,18 @@ public class EmrAddStepsTaskTest {
public void testHandleUserRequestTerminate() throws Exception {
when(step.getStatus()).thenReturn(pendingState, runningState, cancelledState);
- emrAddStepsTask.handle();
+ emrAddStepsTask.handle(taskCallBack);
Assert.assertEquals(EXIT_CODE_KILL, emrAddStepsTask.getExitStatusCode());
}
- @Test
+ @Test(expected = TaskException.class)
public void testHandleError() throws Exception {
when(step.getStatus()).thenReturn(pendingState, runningState, failedState);
- emrAddStepsTask.handle();
+ emrAddStepsTask.handle(taskCallBack);
Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
when(emrClient.addJobFlowSteps(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException());
- emrAddStepsTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrAddStepsTask.getExitStatusCode());
+ emrAddStepsTask.handle(taskCallBack);
}
private EmrParameters buildEmrTaskParameters() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
index 65c6c0c239..71596ea53e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/test/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTaskTest.java
@@ -28,6 +28,8 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
@@ -117,6 +119,9 @@ public class EmrJobFlowTaskTest {
private EmrJobFlowTask emrJobFlowTask;
private AmazonElasticMapReduce emrClient;
private Cluster cluster;
+ private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+
+ };
@Before
public void before() throws Exception {
@@ -146,7 +151,7 @@ public class EmrJobFlowTaskTest {
when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatingStatus);
- emrJobFlowTask.handle();
+ emrJobFlowTask.handle(taskCallBack);
Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode());
}
@@ -155,7 +160,7 @@ public class EmrJobFlowTaskTest {
public void testHandleAliveWhenNoSteps() throws Exception {
when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, waitingStatus);
- emrJobFlowTask.handle();
+ emrJobFlowTask.handle(taskCallBack);
Assert.assertEquals(EXIT_CODE_SUCCESS, emrJobFlowTask.getExitStatusCode());
}
@@ -163,7 +168,7 @@ public class EmrJobFlowTaskTest {
public void testHandleUserRequestTerminate() throws Exception {
when(cluster.getStatus()).thenReturn(startingStatus, userRequestTerminateStatus);
- emrJobFlowTask.handle();
+ emrJobFlowTask.handle(taskCallBack);
Assert.assertEquals(EXIT_CODE_KILL, emrJobFlowTask.getExitStatusCode());
}
@@ -171,36 +176,28 @@ public class EmrJobFlowTaskTest {
public void testHandleTerminatedWithError() throws Exception {
when(cluster.getStatus()).thenReturn(startingStatus, softwareConfigStatus, runningStatus, terminatedWithErrorsStatus);
- emrJobFlowTask.handle();
+ emrJobFlowTask.handle(taskCallBack);
Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
}
- @Test
+ @Test(expected = TaskException.class)
public void testCanNotParseJson() throws Exception {
mockStatic(JSONUtils.class);
when(emrJobFlowTask, "createRunJobFlowRequest").thenThrow(new EmrTaskException("can not parse RunJobFlowRequest from json", new Exception("error")));
- emrJobFlowTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
+ emrJobFlowTask.handle(taskCallBack);
}
- @Test
+ @Test(expected = TaskException.class)
public void testClusterStatusNull() throws Exception {
when(emrClient.describeCluster(any())).thenReturn(null);
-
- emrJobFlowTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
+ emrJobFlowTask.handle(taskCallBack);
}
- @Test
+ @Test(expected = TaskException.class)
public void testRunJobFlowError() throws Exception {
-
when(emrClient.runJobFlow(any())).thenThrow(new AmazonElasticMapReduceException("error"), new EmrTaskException());
- emrJobFlowTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
- emrJobFlowTask.handle();
- Assert.assertEquals(EXIT_CODE_FAILURE, emrJobFlowTask.getExitStatusCode());
-
+ emrJobFlowTask.handle(taskCallBack);
}
private String buildEmrTaskParameters() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
index a6915ea182..f6d4e56815 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@@ -97,7 +98,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
}
@Override
- public void cancelApplication(boolean status) throws Exception {
+ public void cancelApplication() throws TaskException {
Set appIds = getApplicationIds();
if (CollectionUtils.isEmpty(appIds)) {
logger.error("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
@@ -110,8 +111,11 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(args);
- processBuilder.start();
- super.cancelApplication(status);
+ try {
+ processBuilder.start();
+ } catch (IOException e) {
+ throw new TaskException("cancel application error", e);
+ }
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index b7dc41fe01..a342b2aac3 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
@@ -32,6 +33,7 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
@@ -112,7 +114,7 @@ public class FlinkTask extends AbstractYarnTask {
}
@Override
- public Set getApplicationIds() throws IOException {
+ public Set getApplicationIds() throws TaskException {
Set appIds = new HashSet<>();
File file = new File(taskRequest.getLogPath());
@@ -131,6 +133,10 @@ public class FlinkTask extends AbstractYarnTask {
appIds.add(appId);
}
}
+ } catch (FileNotFoundException e) {
+ throw new TaskException("get application id error, file not found, path:" + taskRequest.getLogPath());
+ } catch (IOException e) {
+ throw new TaskException("get application id error, path:" + taskRequest.getLogPath(), e);
}
return appIds;
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
index 2af1bacb38..06ae0b953e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
@@ -19,8 +19,10 @@ package org.apache.dolphinscheduler.plugin.task.hivecli;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -34,10 +36,12 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
-public class HiveCliTask extends AbstractTaskExecutor {
+public class HiveCliTask extends AbstractRemoteTask {
private HiveCliParameters hiveCliParameters;
@@ -54,6 +58,11 @@ public class HiveCliTask extends AbstractTaskExecutor {
logger);
}
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
@Override
public void init() {
logger.info("hiveCli task params {}", taskExecutionContext.getTaskParams());
@@ -65,8 +74,9 @@ public class HiveCliTask extends AbstractTaskExecutor {
}
}
+ // todo split handle to submit and track
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand());
setExitStatusCode(taskResponse.getExitStatusCode());
@@ -85,6 +95,16 @@ public class HiveCliTask extends AbstractTaskExecutor {
}
}
+ @Override
+ public void submitApplication() throws TaskException {
+
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+
+ }
+
protected String buildCommand() {
final List args = new ArrayList<>();
@@ -126,8 +146,12 @@ public class HiveCliTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
- shellCommandExecutor.cancelApplication();
+ public void cancelApplication() throws TaskException {
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
index df17725cf2..2562d5dcc1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
@@ -19,14 +19,14 @@ package org.apache.dolphinscheduler.plugin.task.http;
import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -48,13 +48,12 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.node.ObjectNode;
-public class HttpTask extends AbstractTaskExecutor {
+public class HttpTask extends AbstractTask {
/**
* output
@@ -90,7 +89,7 @@ public class HttpTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
long startTime = System.currentTimeMillis();
String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
String statusCode = null;
@@ -114,6 +113,11 @@ public class HttpTask extends AbstractTaskExecutor {
}
+ @Override
+ public void cancel() throws TaskException {
+
+ }
+
/**
* send request
*
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/test/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/test/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskTest.java
index 23b8095f83..2a063e5ac8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/test/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/test/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskTest.java
@@ -76,11 +76,11 @@ public class HttpTaskTest {
HttpTask headHttpTask = generateHttpTask(HttpMethod.HEAD, HttpStatus.SC_OK);
HttpTask putHttpTask = generateHttpTask(HttpMethod.PUT, HttpStatus.SC_OK);
HttpTask deleteHttpTask = generateHttpTask(HttpMethod.DELETE, HttpStatus.SC_OK);
- getHttpTask.handle();
- postHttpTask.handle();
- headHttpTask.handle();
- putHttpTask.handle();
- deleteHttpTask.handle();
+ getHttpTask.handle(null);
+ postHttpTask.handle(null);
+ headHttpTask.handle(null);
+ putHttpTask.handle(null);
+ deleteHttpTask.handle(null);
Assert.assertEquals(EXIT_CODE_SUCCESS, getHttpTask.getExitStatusCode());
Assert.assertEquals(EXIT_CODE_SUCCESS, postHttpTask.getExitStatusCode());
Assert.assertEquals(EXIT_CODE_SUCCESS, headHttpTask.getExitStatusCode());
@@ -91,7 +91,7 @@ public class HttpTaskTest {
@Test
public void testHandleCheckCodeDefaultError() throws Exception {
HttpTask getHttpTask = generateHttpTask(HttpMethod.GET, HttpStatus.SC_BAD_REQUEST);
- getHttpTask.handle();
+ getHttpTask.handle(null);
Assert.assertEquals(EXIT_CODE_FAILURE, getHttpTask.getExitStatusCode());
}
@@ -102,8 +102,8 @@ public class HttpTaskTest {
condition, HttpStatus.SC_CREATED, "");
HttpTask httpErrorTask = generateHttpTask(HttpMethod.GET, HttpCheckCondition.STATUS_CODE_CUSTOM,
condition, HttpStatus.SC_OK, "");
- httpTask.handle();
- httpErrorTask.handle();
+ httpTask.handle(null);
+ httpErrorTask.handle(null);
Assert.assertEquals(EXIT_CODE_SUCCESS, httpTask.getExitStatusCode());
Assert.assertEquals(EXIT_CODE_FAILURE, httpErrorTask.getExitStatusCode());
}
@@ -114,8 +114,8 @@ public class HttpTaskTest {
"success", HttpStatus.SC_OK, "{\"status\": \"success\"}");
HttpTask httpErrorTask = generateHttpTask(HttpMethod.GET, HttpCheckCondition.BODY_CONTAINS,
"success", HttpStatus.SC_OK, "{\"status\": \"failed\"}");
- httpTask.handle();
- httpErrorTask.handle();
+ httpTask.handle(null);
+ httpErrorTask.handle(null);
Assert.assertEquals(EXIT_CODE_SUCCESS, httpTask.getExitStatusCode());
Assert.assertEquals(EXIT_CODE_FAILURE, httpErrorTask.getExitStatusCode());
}
@@ -126,8 +126,8 @@ public class HttpTaskTest {
"failed", HttpStatus.SC_OK, "{\"status\": \"success\"}");
HttpTask httpErrorTask = generateHttpTask(HttpMethod.GET, HttpCheckCondition.BODY_NOT_CONTAINS,
"failed", HttpStatus.SC_OK, "{\"status\": \"failed\"}");
- httpTask.handle();
- httpErrorTask.handle();
+ httpTask.handle(null);
+ httpErrorTask.handle(null);
Assert.assertEquals(EXIT_CODE_SUCCESS, httpTask.getExitStatusCode());
Assert.assertEquals(EXIT_CODE_FAILURE, httpErrorTask.getExitStatusCode());
}
@@ -148,7 +148,7 @@ public class HttpTaskTest {
HttpTask httpTask = generateHttpTask(MOCK_DISPATCH_PATH_REQ_BODY_TO_RES_BODY, HttpMethod.POST,
httpParams, prepareParamsMap, HttpCheckCondition.BODY_CONTAINS, "20220812",
HttpStatus.SC_OK, "");
- httpTask.handle();
+ httpTask.handle(null);
Assert.assertEquals(EXIT_CODE_SUCCESS, httpTask.getExitStatusCode());
}
@@ -168,7 +168,7 @@ public class HttpTaskTest {
HttpTask httpTask = generateHttpTask(MOCK_DISPATCH_PATH_REQ_PARAMS_TO_RES_BODY, HttpMethod.POST,
httpParams, prepareParamsMap, HttpCheckCondition.BODY_CONTAINS, "20220812",
HttpStatus.SC_OK, "");
- httpTask.handle();
+ httpTask.handle(null);
Assert.assertEquals(EXIT_CODE_SUCCESS, httpTask.getExitStatusCode());
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index ab135ac99e..6587fddc31 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -18,8 +18,11 @@
package org.apache.dolphinscheduler.plugin.task.jupyter;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -35,10 +38,12 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
-public class JupyterTask extends AbstractTaskExecutor {
+public class JupyterTask extends AbstractRemoteTask {
/**
* jupyter parameters
@@ -60,6 +65,11 @@ public class JupyterTask extends AbstractTaskExecutor {
logger);
}
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
@Override
public void init() {
logger.info("jupyter task params {}", taskExecutionContext.getTaskParams());
@@ -76,8 +86,9 @@ public class JupyterTask extends AbstractTaskExecutor {
}
}
+ // todo split handle to submit and track
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand());
@@ -96,6 +107,16 @@ public class JupyterTask extends AbstractTaskExecutor {
}
}
+ @Override
+ public void submitApplication() throws TaskException {
+
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+
+ }
+
/**
* create command
*
@@ -223,9 +244,13 @@ public class JupyterTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
+ public void cancelApplication() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
index 7265e9fda8..b405ae5f17 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -30,7 +30,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.K8sTaskParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
public class K8sTask extends AbstractK8sTask {
@@ -56,6 +58,11 @@ public class K8sTask extends AbstractK8sTask {
}
}
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
@Override
public AbstractParameters getParameters() {
return k8sTaskParameters;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 26e7e260a4..03eec517b5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -20,8 +20,9 @@ package org.apache.dolphinscheduler.plugin.task.mlflow;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -42,7 +43,7 @@ import java.util.regex.Pattern;
/**
* shell task
*/
-public class MlflowTask extends AbstractTaskExecutor {
+public class MlflowTask extends AbstractTask {
private static final Pattern GIT_CHECK_PATTERN = Pattern.compile("^(git@|https?://)");
/**
@@ -111,7 +112,7 @@ public class MlflowTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
String command = buildCommand();
@@ -123,7 +124,6 @@ public class MlflowTask extends AbstractTaskExecutor {
exitCode = commandExecuteResult.getExitStatusCode();
}
setExitStatusCode(exitCode);
- setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
@@ -139,9 +139,13 @@ public class MlflowTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
+ public void cancel() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
public String buildCommand() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
index e5771d588d..177122847e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
@@ -18,7 +18,10 @@
package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -27,6 +30,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@@ -36,18 +40,21 @@ import org.apache.http.util.EntityUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
+import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
/**
* TIS DataX Task
**/
-public class PigeonTask extends AbstractTaskExecutor {
+public class PigeonTask extends AbstractRemoteTask {
public static final String KEY_POOL_VAR_PIGEON_HOST = "p_host";
private final TaskExecutionContext taskExecutionContext;
@@ -62,6 +69,11 @@ public class PigeonTask extends AbstractTaskExecutor {
this.config = PigeonConfig.getInstance();
}
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
@Override
public void init() {
super.init();
@@ -72,8 +84,9 @@ public class PigeonTask extends AbstractTaskExecutor {
}
}
+ // todo split handle to submit and track
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
// Trigger PIGEON DataX pipeline
logger.info("start execute PIGEON task");
long startTime = System.currentTimeMillis();
@@ -153,13 +166,22 @@ public class PigeonTask extends AbstractTaskExecutor {
}
}
+ @Override
+ public void submitApplication() throws TaskException {
+
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+
+ }
+
private void addFormUrlencoded(HttpPost post) {
post.addHeader("content-type", "application/x-www-form-urlencoded");
}
@Override
- public void cancelApplication(boolean status) throws Exception {
- super.cancelApplication(status);
+ public void cancelApplication() throws TaskException {
logger.info("start to cancelApplication");
Objects.requireNonNull(triggerResult, "triggerResult can not be null");
logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId());
@@ -181,8 +203,12 @@ public class PigeonTask extends AbstractTaskExecutor {
if (CollectionUtils.isNotEmpty(errormsg)) {
errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(",")));
}
- throw new Exception("cancel PIGEON job faild taskId:" + triggerResult.getTaskId() + errs.toString());
+ throw new TaskException("cancel PIGEON job faild taskId:" + triggerResult.getTaskId() + errs);
}
+ } catch (ClientProtocolException e) {
+ throw new TaskException("client protocol error", e);
+ } catch (Exception e) {
+ throw new TaskException("pigeon execute error", e);
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java
index 07a624a3e6..f748315b65 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/test/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.java
@@ -114,7 +114,7 @@ public class PigeonTaskTest {
file("src/test/resources/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTaskTest.json"));
running(server, () -> {
- pigeonTask.handle();
+ pigeonTask.handle(null);
Assert.assertEquals("PIGEON execute be success", TaskExecutionStatus.SUCCESS, pigeonTask.getExitStatus());
});
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index df0fbae71e..19890dd90a 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -22,7 +22,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
@@ -49,7 +50,7 @@ import com.google.common.collect.Maps;
/**
* procedure task
*/
-public class ProcedureTask extends AbstractTaskExecutor {
+public class ProcedureTask extends AbstractTask {
/**
* procedure parameters
@@ -88,7 +89,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
procedureParameters.getType(),
procedureParameters.getDatasource(),
@@ -142,6 +143,11 @@ public class ProcedureTask extends AbstractTaskExecutor {
}
}
+ @Override
+ public void cancel() throws TaskException {
+
+ }
+
private String formatSql(Map sqlParamsMap, Map paramsMap) {
// combining local and global parameters
setSqlParamsMap(procedureParameters.getMethod(), rgex, sqlParamsMap, paramsMap,
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 6c18d206d9..8c1d26392e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.python;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -27,7 +28,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.io.FileUtils;
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.HashMap;
import java.util.Map;
import com.google.common.base.Preconditions;
@@ -45,7 +44,7 @@ import com.google.common.base.Preconditions;
/**
* python task
*/
-public class PythonTask extends AbstractTaskExecutor {
+public class PythonTask extends AbstractTask {
/**
* python parameters
@@ -100,7 +99,7 @@ public class PythonTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// generate the content of this python script
String pythonScriptContent = buildPythonScriptContent();
@@ -113,7 +112,6 @@ public class PythonTask extends AbstractTaskExecutor {
TaskResponse taskResponse = shellCommandExecutor.run(command);
setExitStatusCode(taskResponse.getExitStatusCode());
- setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
} catch (Exception e) {
@@ -124,9 +122,13 @@ public class PythonTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
+ public void cancel() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
index 3cd7c7c4f8..c362097e38 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
@@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.pytorch;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -33,7 +34,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class PytorchTask extends AbstractTaskExecutor {
+public class PytorchTask extends AbstractTask {
private final ShellCommandExecutor shellCommandExecutor;
protected PytorchParameters pytorchParameters;
@@ -65,12 +66,11 @@ public class PytorchTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
String command = buildPythonExecuteCommand();
TaskResponse taskResponse = shellCommandExecutor.run(command);
setExitStatusCode(taskResponse.getExitStatusCode());
- setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
@@ -84,6 +84,11 @@ public class PytorchTask extends AbstractTaskExecutor {
}
}
+ @Override
+ public void cancel() throws TaskException {
+
+ }
+
public String buildPythonExecuteCommand() throws Exception {
List args = new ArrayList<>();
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
index b5df5b9ea5..a70637e046 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/PipelineUtils.java
@@ -23,6 +23,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import java.util.Collections;
import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,75 +43,57 @@ import com.amazonaws.services.sagemaker.model.StopPipelineExecutionResult;
public class PipelineUtils {
+ protected final Logger logger =
+ LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
+ private static final String EXECUTING = "Executing";
+ private static final String SUCCEEDED = "Succeeded";
- protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
- private final AmazonSageMaker client;
- private String pipelineExecutionArn;
- private String clientRequestToken;
- private String pipelineStatus;
-
- public PipelineUtils(AmazonSageMaker client) {
- this.client = client;
- }
-
- public int startPipelineExecution(StartPipelineExecutionRequest request) {
- int exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
- try {
- StartPipelineExecutionResult result = client.startPipelineExecution(request);
- pipelineExecutionArn = result.getPipelineExecutionArn();
- clientRequestToken = request.getClientRequestToken();
- exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
- logger.info("Start pipeline: {} success", pipelineExecutionArn);
- } catch (Exception e) {
- logger.error("Start pipeline error: {}", e.getMessage());
- }
+ public PipelineId startPipelineExecution(AmazonSageMaker client, StartPipelineExecutionRequest request) {
+ StartPipelineExecutionResult result = client.startPipelineExecution(request);
+ String pipelineExecutionArn = result.getPipelineExecutionArn();
+ String clientRequestToken = request.getClientRequestToken();
+ logger.info("Start success, pipeline: {}, token: {}", pipelineExecutionArn, clientRequestToken);
- return exitStatusCode;
+ return new PipelineId(pipelineExecutionArn, clientRequestToken);
}
- public void stopPipelineExecution() {
+ public void stopPipelineExecution(AmazonSageMaker client, PipelineId pipelineId) {
StopPipelineExecutionRequest request = new StopPipelineExecutionRequest();
- request.setPipelineExecutionArn(pipelineExecutionArn);
- request.setClientRequestToken(clientRequestToken);
-
- try {
- StopPipelineExecutionResult result = client.stopPipelineExecution(request);
- logger.info("Stop pipeline: {} success", result.getPipelineExecutionArn());
- } catch (Exception e) {
- logger.error("Stop pipeline error: {}", e.getMessage());
- }
+ request.setPipelineExecutionArn(pipelineId.getPipelineExecutionArn());
+ request.setClientRequestToken(pipelineId.getClientRequestToken());
+ StopPipelineExecutionResult result = client.stopPipelineExecution(request);
+ logger.info("Stop pipeline: {} success", result.getPipelineExecutionArn());
}
- public int checkPipelineExecutionStatus() {
- describePipelineExecution();
- while (pipelineStatus.equals("Executing")) {
+ public int checkPipelineExecutionStatus(AmazonSageMaker client, PipelineId pipelineId) {
+ String pipelineStatus = describePipelineExecution(client, pipelineId);
+ while (EXECUTING.equals(pipelineStatus)) {
logger.info("check Pipeline Steps running");
- listPipelineExecutionSteps();
+ listPipelineExecutionSteps(client, pipelineId);
ThreadUtils.sleep(SagemakerConstants.CHECK_PIPELINE_EXECUTION_STATUS_INTERVAL);
- describePipelineExecution();
+ describePipelineExecution(client, pipelineId);
}
int exitStatusCode = TaskConstants.EXIT_CODE_FAILURE;
- if (pipelineStatus.equals("Succeeded")) {
+ if (SUCCEEDED.equals(pipelineStatus)) {
exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS;
}
- logger.info("exit : {}", exitStatusCode);
- logger.info("PipelineExecutionStatus : {}", pipelineStatus);
+ logger.info("PipelineExecutionStatus : {}, exitStatusCode: {}", pipelineStatus, exitStatusCode);
return exitStatusCode;
}
- private void describePipelineExecution() {
+ private String describePipelineExecution(AmazonSageMaker client, PipelineId pipelineId) {
DescribePipelineExecutionRequest request = new DescribePipelineExecutionRequest();
- request.setPipelineExecutionArn(pipelineExecutionArn);
+ request.setPipelineExecutionArn(pipelineId.getPipelineExecutionArn());
DescribePipelineExecutionResult result = client.describePipelineExecution(request);
- pipelineStatus = result.getPipelineExecutionStatus();
- logger.info("PipelineExecutionStatus: {}", pipelineStatus);
+ logger.info("PipelineExecutionStatus: {}", result.getPipelineExecutionStatus());
+ return result.getPipelineExecutionStatus();
}
- private void listPipelineExecutionSteps() {
+ private void listPipelineExecutionSteps(AmazonSageMaker client, PipelineId pipelineId) {
ListPipelineExecutionStepsRequest request = new ListPipelineExecutionStepsRequest();
- request.setPipelineExecutionArn(pipelineExecutionArn);
+ request.setPipelineExecutionArn(pipelineId.getPipelineExecutionArn());
request.setMaxResults(SagemakerConstants.PIPELINE_MAX_RESULTS);
ListPipelineExecutionStepsResult result = client.listPipelineExecutionSteps(request);
List steps = result.getPipelineExecutionSteps();
@@ -119,7 +105,12 @@ public class PipelineUtils {
}
}
- public String getPipelineExecutionArn() {
- return pipelineExecutionArn;
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class PipelineId {
+
+ private String pipelineExecutionArn;
+ private String clientRequestToken;
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
index 2543e60614..22c898d355 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
@@ -22,7 +22,7 @@ import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKN
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -31,8 +31,11 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
@@ -46,34 +49,43 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
/**
* SagemakerTask task, Used to start Sagemaker pipeline
*/
-public class SagemakerTask extends AbstractTaskExecutor {
+public class SagemakerTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper =
- new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false).configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true).configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
- .configure(REQUIRE_SETTERS_FOR_GETTERS, true).setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
- /**
- * taskExecutionContext
- */
- private final TaskExecutionContext taskExecutionContext;
+ new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+ .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+ .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
/**
* SageMaker parameters
*/
private SagemakerParameters parameters;
- private PipelineUtils utils;
+
+ private final AmazonSageMaker client;
+ private final PipelineUtils utils;
+ private PipelineUtils.PipelineId pipelineId;
public SagemakerTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
+ client = createClient();
+ utils = new PipelineUtils();
+ }
- this.taskExecutionContext = taskExecutionContext;
-
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
}
@Override
public void init() {
- logger.info("Sagemaker task params {}", taskExecutionContext.getTaskParams());
+ logger.info("Sagemaker task params {}", taskRequest.getTaskParams());
- parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SagemakerParameters.class);
+ parameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SagemakerParameters.class);
+ if (parameters == null) {
+ throw new SagemakerTaskException("Sagemaker task params is empty");
+ }
if (!parameters.checkParameters()) {
throw new SagemakerTaskException("Sagemaker task params is not valid");
}
@@ -81,41 +93,51 @@ public class SagemakerTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void submitApplication() throws TaskException {
try {
- int exitStatusCode = handleStartPipeline();
- setExitStatusCode(exitStatusCode);
+ StartPipelineExecutionRequest request = createStartPipelineRequest();
+
+ // Start pipeline
+ pipelineId = utils.startPipelineExecution(client, request);
+
+ // set AppId
+ setAppIds(JSONUtils.toJsonString(pipelineId));
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
- throw new TaskException("SageMaker task error", e);
+ throw new TaskException("SageMaker task submit error", e);
}
}
@Override
- public void cancelApplication(boolean cancelApplication) {
- // stop pipeline
- utils.stopPipelineExecution();
- }
-
- public int handleStartPipeline() {
- int exitStatusCode;
- StartPipelineExecutionRequest request = createStartPipelineRequest();
-
+ public void cancelApplication() {
+ initPipelineId();
try {
- AmazonSageMaker client = createClient();
- utils = new PipelineUtils(client);
- setAppIds(utils.getPipelineExecutionArn());
+ // stop pipeline
+ utils.stopPipelineExecution(client, pipelineId);
} catch (Exception e) {
- throw new SagemakerTaskException("can not connect aws ", e);
+ throw new TaskException("cancel application error", e);
}
+ }
- // Start pipeline
- exitStatusCode = utils.startPipelineExecution(request);
- if (exitStatusCode == TaskConstants.EXIT_CODE_SUCCESS) {
- // Keep checking the health status
- exitStatusCode = utils.checkPipelineExecutionStatus();
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+ initPipelineId();
+ // Keep checking the health status
+ exitStatusCode = utils.checkPipelineExecutionStatus(client, pipelineId);
+ }
+
+ /**
+ * init sagemaker applicationId if null
+ */
+ private void initPipelineId() {
+ if (pipelineId == null) {
+ if (StringUtils.isNotEmpty(getAppIds())) {
+ pipelineId = JSONUtils.parseObject(getAppIds(), PipelineUtils.PipelineId.class);
+ }
+ }
+ if (pipelineId == null) {
+ throw new TaskException("sagemaker applicationID is null");
}
- return exitStatusCode;
}
public StartPipelineExecutionRequest createStartPipelineRequest() throws SagemakerTaskException {
@@ -142,11 +164,11 @@ public class SagemakerTask extends AbstractTaskExecutor {
private String parseRequstJson(String requestJson) {
// combining local and global parameters
- Map paramsMap = taskExecutionContext.getPrepareParamsMap();
+ Map paramsMap = taskRequest.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(requestJson, ParamUtils.convert(paramsMap));
}
- private AmazonSageMaker createClient() {
+ protected AmazonSageMaker createClient() {
final String awsAccessKeyId = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);
final String awsSecretAccessKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY);
final String awsRegion = PropertyUtils.getString(TaskConstants.AWS_REGION);
@@ -154,9 +176,9 @@ public class SagemakerTask extends AbstractTaskExecutor {
final AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(basicAWSCredentials);
// create a SageMaker client
return AmazonSageMakerClientBuilder.standard()
- .withCredentials(awsCredentialsProvider)
- .withRegion(awsRegion)
- .build();
+ .withCredentials(awsCredentialsProvider)
+ .withRegion(awsRegion)
+ .build();
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
index a7dcdca7bd..01e93b382f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/test/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTaskTest.java
@@ -37,10 +37,8 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;
import com.amazonaws.services.sagemaker.AmazonSageMaker;
@@ -54,24 +52,23 @@ import com.amazonaws.services.sagemaker.model.StopPipelineExecutionResult;
@RunWith(PowerMockRunner.class)
@PrepareForTest({JSONUtils.class, PropertyUtils.class,})
@PowerMockIgnore({"javax.*"})
-@SuppressStaticInitializationFor("org.apache.dolphinscheduler.spi.utils.PropertyUtils")
public class SagemakerTaskTest {
private final String pipelineExecutionArn = "test-pipeline-arn";
+ private final String clientRequestToken = "test-pipeline-token";
private SagemakerTask sagemakerTask;
private AmazonSageMaker client;
- private PipelineUtils pipelineUtils;
+ private PipelineUtils pipelineUtils = new PipelineUtils();
@Before
public void before() {
- PowerMockito.mockStatic(PropertyUtils.class);
String parameters = buildParameters();
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+
+ client = mock(AmazonSageMaker.class);
sagemakerTask = new SagemakerTask(taskExecutionContext);
sagemakerTask.init();
- client = mock(AmazonSageMaker.class);
- pipelineUtils = new PipelineUtils(client);
StartPipelineExecutionResult startPipelineExecutionResult = mock(StartPipelineExecutionResult.class);
when(startPipelineExecutionResult.getPipelineExecutionArn()).thenReturn(pipelineExecutionArn);
@@ -82,7 +79,8 @@ public class SagemakerTaskTest {
DescribePipelineExecutionResult describePipelineExecutionResult = mock(DescribePipelineExecutionResult.class);
when(describePipelineExecutionResult.getPipelineExecutionStatus()).thenReturn("Executing", "Succeeded");
- ListPipelineExecutionStepsResult listPipelineExecutionStepsResult = mock(ListPipelineExecutionStepsResult.class);
+ ListPipelineExecutionStepsResult listPipelineExecutionStepsResult =
+ mock(ListPipelineExecutionStepsResult.class);
PipelineExecutionStep pipelineExecutionStep = mock(PipelineExecutionStep.class);
List pipelineExecutionSteps = new ArrayList<>();
pipelineExecutionSteps.add(pipelineExecutionStep);
@@ -110,10 +108,11 @@ public class SagemakerTaskTest {
@Test
public void testPipelineExecution() throws Exception {
- pipelineUtils.startPipelineExecution(sagemakerTask.createStartPipelineRequest());
- Assert.assertEquals(pipelineExecutionArn, pipelineUtils.getPipelineExecutionArn());
- Assert.assertEquals(0, pipelineUtils.checkPipelineExecutionStatus());
- pipelineUtils.stopPipelineExecution();
+ PipelineUtils.PipelineId pipelineId =
+ pipelineUtils.startPipelineExecution(client, sagemakerTask.createStartPipelineRequest());
+ Assert.assertEquals(pipelineExecutionArn, pipelineId.getPipelineExecutionArn());
+ Assert.assertEquals(0, pipelineUtils.checkPipelineExecutionStatus(client, pipelineId));
+ pipelineUtils.stopPipelineExecution(client, pipelineId);
}
private String buildParameters() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index cd3d8ebb22..b0e8aa5454 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -36,8 +39,10 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
@@ -45,7 +50,7 @@ import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG
/**
* seatunnel task
*/
-public class SeatunnelTask extends AbstractTaskExecutor {
+public class SeatunnelTask extends AbstractRemoteTask {
/**
* seatunnel parameters
@@ -76,6 +81,11 @@ public class SeatunnelTask extends AbstractTaskExecutor {
logger);
}
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
@Override
public void init() {
logger.info("SeaTunnel task params {}", taskExecutionContext.getTaskParams());
@@ -84,8 +94,9 @@ public class SeatunnelTask extends AbstractTaskExecutor {
}
}
+ // todo split handle to submit and track
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
String command = buildCommand();
@@ -107,9 +118,23 @@ public class SeatunnelTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
+ public void submitApplication() throws TaskException {
+
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
+
+ }
+
+ @Override
+ public void cancelApplication() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
private String buildCommand() throws Exception {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 7e20bed634..7c7c4b2497 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -18,8 +18,10 @@
package org.apache.dolphinscheduler.plugin.task.shell;
import org.apache.commons.lang3.SystemUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -47,7 +49,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_
/**
* shell task
*/
-public class ShellTask extends AbstractTaskExecutor {
+public class ShellTask extends AbstractTask {
/**
* shell parameters
@@ -90,13 +92,12 @@ public class ShellTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
- setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
@@ -112,9 +113,13 @@ public class ShellTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean cancelApplication) throws Exception {
+ public void cancel() throws TaskException {
// cancel process
- shellCommandExecutor.cancelApplication();
+ try {
+ shellCommandExecutor.cancelApplication();
+ } catch (Exception e) {
+ throw new TaskException("cancel application error", e);
+ }
}
/**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index d7fbfb87aa..d280c33db8 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -23,8 +23,9 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -60,7 +61,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-public class SqlTask extends AbstractTaskExecutor {
+public class SqlTask extends AbstractTask {
/**
* taskExecutionContext
@@ -114,7 +115,7 @@ public class SqlTask extends AbstractTaskExecutor {
}
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
sqlParameters.getType(),
@@ -164,6 +165,11 @@ public class SqlTask extends AbstractTaskExecutor {
}
}
+ @Override
+ public void cancel() throws TaskException {
+
+ }
+
/**
* execute function and sql
*
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
index f51b352d9c..75b6155587 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
@@ -19,7 +19,10 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin;
import com.fasterxml.jackson.databind.ObjectMapper;
import kong.unirest.Unirest;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+
+import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
+import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -32,11 +35,13 @@ import org.apache.zeppelin.client.ParagraphResult;
import org.apache.zeppelin.client.Status;
import org.apache.zeppelin.client.ZeppelinClient;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
-public class ZeppelinTask extends AbstractTaskExecutor {
+public class ZeppelinTask extends AbstractRemoteTask {
/**
* taskExecutionContext
@@ -74,8 +79,9 @@ public class ZeppelinTask extends AbstractTaskExecutor {
this.zClient = getZeppelinClient();
}
+ // todo split handle to submit and track
@Override
- public void handle() throws TaskException {
+ public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
final String paragraphId = this.zeppelinParameters.getParagraphId();
final String productionNoteDirectory = this.zeppelinParameters.getProductionNoteDirectory();
@@ -142,6 +148,15 @@ public class ZeppelinTask extends AbstractTaskExecutor {
logger.error("zeppelin task submit failed with error", e);
throw new TaskException("Execute ZeppelinTask exception");
}
+ }
+
+ @Override
+ public void submitApplication() throws TaskException {
+
+ }
+
+ @Override
+ public void trackApplicationStatus() throws TaskException {
}
@@ -188,9 +203,8 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
@Override
- public void cancelApplication(boolean status) throws Exception {
+ public void cancelApplication() throws TaskException {
final String restEndpoint = this.zeppelinParameters.getRestEndpoint();
- super.cancelApplication(status);
final String noteId = this.zeppelinParameters.getNoteId();
final String paragraphId = this.zeppelinParameters.getParagraphId();
if (paragraphId == null) {
@@ -207,7 +221,11 @@ public class ZeppelinTask extends AbstractTaskExecutor {
this.taskExecutionContext.getTaskInstanceId(),
noteId,
paragraphId);
- this.zClient.cancelParagraph(noteId, paragraphId);
+ try {
+ this.zClient.cancelParagraph(noteId, paragraphId);
+ } catch (Exception e) {
+ throw new TaskException("cancel paragraph error", e);
+ }
logger.info("zeppelin task terminated, taskId: {}, noteId: {}, paragraphId: {}",
this.taskExecutionContext.getTaskInstanceId(),
noteId,
@@ -216,4 +234,9 @@ public class ZeppelinTask extends AbstractTaskExecutor {
}
+ @Override
+ public Set getApplicationIds() throws TaskException {
+ return Collections.emptySet();
+ }
+
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
index ff70ba5b45..70a0945a94 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/test/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTaskTest.java
@@ -28,6 +28,8 @@ import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
@@ -73,6 +75,9 @@ public class ZeppelinTaskTest {
private ZeppelinTask zeppelinTask;
private ParagraphResult paragraphResult;
private NoteResult noteResult;
+ private TaskCallBack taskCallBack = (taskInstanceId, appIds) -> {
+
+ };
@Before
public void before() throws Exception {
@@ -95,7 +100,7 @@ public class ZeppelinTaskTest {
@Test
public void testHandleWithParagraphExecutionSuccess() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
- this.zeppelinTask.handle();
+ this.zeppelinTask.handle(taskCallBack);
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map) mapper.readValue(MOCK_PARAMETERS, Map.class));
@@ -107,7 +112,7 @@ public class ZeppelinTaskTest {
@Test
public void testHandleWithParagraphExecutionAborted() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.ABORT);
- this.zeppelinTask.handle();
+ this.zeppelinTask.handle(taskCallBack);
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map) mapper.readValue(MOCK_PARAMETERS, Map.class));
@@ -119,7 +124,7 @@ public class ZeppelinTaskTest {
@Test
public void testHandleWithParagraphExecutionError() throws Exception {
when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
- this.zeppelinTask.handle();
+ this.zeppelinTask.handle(taskCallBack);
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map) mapper.readValue(MOCK_PARAMETERS, Map.class));
@@ -133,7 +138,7 @@ public class ZeppelinTaskTest {
when(this.zClient.executeParagraph(any(), any(), any(Map.class))).
thenThrow(new TaskException("Something wrong happens from zeppelin side"));
// when(this.paragraphResult.getStatus()).thenReturn(Status.ERROR);
- this.zeppelinTask.handle();
+ this.zeppelinTask.handle(taskCallBack);
Mockito.verify(this.zClient).executeParagraph(MOCK_NOTE_ID,
MOCK_PARAGRAPH_ID,
(Map) mapper.readValue(MOCK_PARAMETERS, Map.class));
@@ -159,7 +164,7 @@ public class ZeppelinTaskTest {
when(paragraphResult.getResultInText()).thenReturn("mock-zeppelin-paragraph-execution-result");
this.zeppelinTask.init();
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
- this.zeppelinTask.handle();
+ this.zeppelinTask.handle(taskCallBack);
Mockito.verify(this.zClient).executeNote(MOCK_NOTE_ID,
(Map) mapper.readValue(MOCK_PARAMETERS, Map.class));
Mockito.verify(this.noteResult).getParagraphResultList();
@@ -186,7 +191,7 @@ public class ZeppelinTaskTest {
this.zeppelinTask.init();
when(this.paragraphResult.getStatus()).thenReturn(Status.FINISHED);
when(DateUtils.getTimestampString()).thenReturn("123456789");
- this.zeppelinTask.handle();
+ this.zeppelinTask.handle(taskCallBack);
Mockito.verify(this.zClient).cloneNote(
MOCK_NOTE_ID,
String.format("%s%s_%s", MOCK_PRODUCTION_DIRECTORY, MOCK_NOTE_ID, "123456789"));
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
index 7ba75efba1..5b874118e5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteRunningMessageSender.java
@@ -25,11 +25,11 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
+import lombok.NonNull;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import lombok.NonNull;
-
@Component
public class TaskExecuteRunningMessageSender implements MessageSender {
@@ -46,10 +46,10 @@ public class TaskExecuteRunningMessageSender implements MessageSender projectRes = taskExecutionContext.getResources();
if (MapUtils.isEmpty(projectRes)) {
@@ -113,7 +118,8 @@ public class TaskExecutionCheckerUtils {
String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
logger.info("get resource file from path:{}", resPath);
long resourceDownloadStartTime = System.currentTimeMillis();
- storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true);
+ storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false,
+ true);
WorkerServerMetrics
.recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
WorkerServerMetrics.recordWorkerResourceDownloadSize(
diff --git a/pom.xml b/pom.xml
index 6860c0d96d..ee83f78503 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,14 +63,15 @@
UTF-8
2.6.1
1.8
- 4.12
+ 5.9.0
+ 3.9.0
3.1.12
3.3
3.3.0
2.5.3
2.10.3
2.4
- 2.22.1
+ 3.0.0-M6
3.1.1
3.2.1
2.2.0
@@ -308,6 +309,13 @@
dolphinscheduler-tools
${project.version}