Browse Source

[Feature-5992] Enhance using experience of DataX in DS by integeration with TIS (#6015)

2.0.7-release
百岁 3 years ago committed by GitHub
parent
commit
b04bceb03c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-dist/release-docs/LICENSE
  2. 97
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml
  3. 0
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/readme.md
  4. 62
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParameters.java
  5. 27
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParamsConstants.java
  6. 345
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java
  7. 34
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskChannel.java
  8. 51
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskChannelFactory.java
  9. 142
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.java
  10. 60
      dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/resources/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.json
  11. 1
      dolphinscheduler-task-plugin/pom.xml
  12. 4
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js
  13. 3
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss
  14. 9
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  15. 225
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/tis.vue
  16. 46
      dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_TIS.svg
  17. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  18. 1
      pom.xml
  19. 2
      tools/dependencies/known-dependencies.txt

2
dolphinscheduler-dist/release-docs/LICENSE vendored

@ -416,7 +416,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
protostuff-runtime 1.7.2: https://github.com/protostuff/protostuff/protostuff-core Apache-2.0
protostuff-api 1.7.2: https://github.com/protostuff/protostuff/protostuff-api Apache-2.0
protostuff-collectionschema 1.7.2: https://github.com/protostuff/protostuff/protostuff-collectionschema Apache-2.0
async-http-client 2.12.3: https://mvnrepository.com/artifact/org.asynchttpclient/async-http-client Apache-2.0
========================================================================
BSD licenses
========================================================================

97
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/pom.xml

@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-tis</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!--https://github.com/dreamhead/moco/blob/master/moco-doc/usage.md#socket-->
<dependency>
<groupId>com.github.dreamhead</groupId>
<artifactId>moco-core</artifactId>
<version>1.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.dreamhead</groupId>
<artifactId>moco-runner</artifactId>
<version>1.2.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-server</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
</dependency>
</dependencies>
</project>

0
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/readme.md

62
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParameters.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.tis;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TIS parameter
*/
public class TISParameters extends AbstractParameters {
private static final Logger logger = LoggerFactory.getLogger(TISParameters.class);
/**
* TIS target job name
*/
private String targetJobName;
public String getTargetJobName() {
return targetJobName;
}
public void setTargetJobName(String targetJobName) {
this.targetJobName = targetJobName;
}
@Override
public boolean checkParameters() {
if (StringUtils.isBlank(this.targetJobName)) {
logger.error("checkParameters faild targetJobName can not be null");
return false;
}
return true;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return Collections.emptyList();
}
}

27
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISParamsConstants.java

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.tis;
public class TISParamsConstants {
public static String NAME_TARGET_JOB_NAME = "targetJobName";
public static String TARGET_JOB_NAME = NAME_TARGET_JOB_NAME;
private TISParamsConstants() {
}
}

345
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTask.java

@ -0,0 +1,345 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.tis;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ws.WebSocket;
import org.asynchttpclient.ws.WebSocketListener;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;
import org.slf4j.Logger;
/**
* TIS DataX Task
**/
public class TISTask extends AbstractTask {
public static final String WS_REQUEST_PATH = "/tjs/download/logfeedback";
public static final String KEY_POOL_VAR_TIS_HOST = "tisHost";
private final TaskRequest taskExecutionContext;
private TISParameters tisParameters;
public TISTask(TaskRequest taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
}
@Override
public void init() {
super.init();
logger.info("tis task params {}", taskExecutionContext.getTaskParams());
tisParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), TISParameters.class);
if (!tisParameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");
}
}
@Override
public void handle() throws Exception {
// Trigger TIS DataX pipeline
logger.info("start execute TIS task");
long startTime = System.currentTimeMillis();
String targetJobName = this.tisParameters.getTargetJobName();
final String tisHost = taskExecutionContext.getDefinedParams().get(KEY_POOL_VAR_TIS_HOST);
if (StringUtils.isEmpty(tisHost)) {
throw new IllegalStateException("global var '" + KEY_POOL_VAR_TIS_HOST + "' can not be empty");
}
try {
final String triggerUrl = String.format("http://%s/tjs/coredefine/coredefine.ajax", tisHost);
final String getStatusUrl = String.format("http://%s/tjs/config/config.ajax?action=collection_action&emethod=get_task_status", tisHost);
HttpPost post = new HttpPost(triggerUrl);
post.addHeader("appname", targetJobName);
addFormUrlencoded(post);
StringEntity entity = new StringEntity("action=datax_action&emethod=trigger_fullbuild_task", StandardCharsets.UTF_8);
post.setEntity(entity);
BizResult ajaxResult = null;
ExecResult execState = null;
int taskId;
WebSocket webSocket = null;
try (CloseableHttpClient client = HttpClients.createDefault();
// trigger to start TIS dataX task
CloseableHttpResponse response = client.execute(post)) {
ajaxResult = processResponse(triggerUrl, response, BizResult.class);
if (!ajaxResult.isSuccess()) {
List<String> errormsg = ajaxResult.getErrormsg();
StringBuffer errs = new StringBuffer();
if (CollectionUtils.isNotEmpty(errormsg)) {
errs.append(",errs:").append(errormsg.stream().collect(Collectors.joining(",")));
}
throw new Exception("trigger TIS job faild taskName:" + targetJobName + errs.toString());
}
taskId = ajaxResult.getBizresult().getTaskid();
webSocket = receiveRealtimeLog(tisHost, targetJobName, taskId);
setAppIds(String.valueOf(taskId));
CloseableHttpResponse status = null;
while (true) {
try {
post = new HttpPost(getStatusUrl);
entity = new StringEntity("{\n taskid: " + taskId + "\n, log: false }", StandardCharsets.UTF_8);
post.setEntity(entity);
status = client.execute(post);
StatusResult execStatus = processResponse(getStatusUrl, status, StatusResult.class);
Map bizresult = execStatus.getBizresult();
Map s = (Map) bizresult.get("status");
execState = ExecResult.parse((Integer) s.get("state"));
if (execState == ExecResult.SUCCESS || execState == ExecResult.FAILD) {
break;
}
Thread.sleep(3000);
} finally {
status.close();
}
}
} finally {
try {
webSocket.sendCloseFrame();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
long costTime = System.currentTimeMillis() - startTime;
logger.info("TIS task: {},taskId:{} costTime : {} milliseconds, statusCode : {}",
targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'");
setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE);
} catch (Exception e) {
logger.error("execute TIS dataX faild,TIS task name:" + targetJobName, e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
}
}
private void addFormUrlencoded(HttpPost post) {
post.addHeader("content-type", "application/x-www-form-urlencoded");
}
@Override
public void cancelApplication(boolean status) throws Exception {
super.cancelApplication(status);
}
private WebSocket receiveRealtimeLog(final String tisHost, String dataXName, int taskId) throws InterruptedException, java.util.concurrent.ExecutionException {
WebSocketUpgradeHandler.Builder upgradeHandlerBuilder
= new WebSocketUpgradeHandler.Builder();
WebSocketUpgradeHandler wsHandler = upgradeHandlerBuilder
.addWebSocketListener(new WebSocketListener() {
@Override
public void onOpen(WebSocket websocket) {
// WebSocket connection opened
}
@Override
public void onClose(WebSocket websocket, int code, String reason) {
// WebSocket connection closed
}
public void onTextFrame(String payload, boolean finalFragment, int rsv) {
ExecLog execLog = JSONUtils.parseObject(payload, ExecLog.class);
logger.info(execLog.getMsg());
}
@Override
public void onError(Throwable t) {
// WebSocket connection error
logger.error(t.getMessage(), t);
}
}).build();
WebSocket webSocketClient = Dsl.asyncHttpClient()
.prepareGet(String.format("ws://%s" + WS_REQUEST_PATH, tisHost))
// .addHeader("header_name", "header_value")
.addQueryParam("logtype", "full")
.addQueryParam("collection", dataXName)
.addQueryParam("taskid", String.valueOf(taskId))
.setRequestTimeout(5000)
.execute(wsHandler)
.get();
return webSocketClient;
}
private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response, Class<T> clazz) throws Exception {
StatusLine resStatus = response.getStatusLine();
if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) {
throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase());
}
HttpEntity entity = response.getEntity();
String resp = EntityUtils.toString(entity, StandardCharsets.UTF_8);
T result = JSONUtils.parseObject(resp, clazz);
return result;
}
@Override
public AbstractParameters getParameters() {
Objects.requireNonNull(this.tisParameters, "tisParameters can not be null");
return this.tisParameters;
}
private static class BizResult extends AjaxResult<TriggerBuildResult> {
private TriggerBuildResult bizresult;
public TriggerBuildResult getBizresult() {
return this.bizresult;
}
public void setBizresult(TriggerBuildResult bizresult) {
this.bizresult = bizresult;
}
}
private static class StatusResult extends AjaxResult<Map> {
private Map bizresult;
public Map getBizresult() {
return this.bizresult;
}
public void setBizresult(Map bizresult) {
this.bizresult = bizresult;
}
}
private abstract static class AjaxResult<T> {
private boolean success;
private List<String> errormsg;
private List<String> msg;
public abstract T getBizresult();
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public List<String> getErrormsg() {
return this.errormsg;
}
public void setErrormsg(List<String> errormsg) {
this.errormsg = errormsg;
}
public List<String> getMsg() {
return this.msg;
}
public void setMsg(List<String> msg) {
this.msg = msg;
}
}
private static class TriggerBuildResult {
private int taskid;
public int getTaskid() {
return taskid;
}
public void setTaskid(int taskid) {
this.taskid = taskid;
}
}
private enum ExecResult {
SUCCESS(1), FAILD(-1), DOING(2), ASYN_DOING(22), CANCEL(3);
private final int value;
public static ExecResult parse(int value) {
for (ExecResult r : values()) {
if (r.value == value) {
return r;
}
}
throw new IllegalStateException("vale:" + value + " is illegal");
}
private ExecResult(int value) {
this.value = value;
}
public int getValue() {
return this.value;
}
}
private static class ExecLog {
private String logType;
private String msg;
private int taskId;
public String getLogType() {
return logType;
}
public void setLogType(String logType) {
this.logType = logType;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
}
}

34
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskChannel.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.tis;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
public class TISTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public AbstractTask createTask(TaskRequest taskRequest, org.slf4j.Logger logger) {
return new TISTask(taskRequest, logger);
}
}

51
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/main/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskChannelFactory.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.tis;
import org.apache.dolphinscheduler.spi.params.InputParam;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.Arrays;
import java.util.List;
/**
* TIS endpoint
**/
public class TISTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new TISTaskChannel();
}
@Override
public String getName() {
return "TIS";
}
@Override
public List<PluginParams> getParams() {
InputParam webHookParam = InputParam.newBuilder(TISParamsConstants.NAME_TARGET_JOB_NAME, TISParamsConstants.TARGET_JOB_NAME)
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
return Arrays.asList(webHookParam);
}
}

142
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/java/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.java

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.tis;
import static com.github.dreamhead.moco.Moco.pathResource;
import static com.github.dreamhead.moco.MocoJsonRunner.jsonHttpServer;
import static com.github.dreamhead.moco.Runner.running;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.spi.task.ExecutionStatus;
import org.apache.dolphinscheduler.spi.task.TaskRequest;
import org.apache.commons.io.IOUtils;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.dreamhead.moco.HttpServer;
public class TISTaskTest {
private static final Logger logger = LoggerFactory.getLogger(TISTaskTest.class);
private TISTask tisTask;
private TaskRequest taskExecutionContext;
@Before
public void before() throws Exception {
TaskProps props = new TaskProps();
props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstanceId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams("{\"targetJobName\":\"mysql_elastic\"}");
taskExecutionContext = Mockito.mock(TaskRequest.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
Mockito.when(taskExecutionContext.getTaskAppId()).thenReturn(UUID.randomUUID().toString());
Mockito.when(taskExecutionContext.getTenantCode()).thenReturn("root");
Mockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
Mockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
Mockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
// Mockito.when(taskExecutionContext.getVarPool())
// .thenReturn("[{\"direct\":\"IN\",\"prop\":\"" + TISTask.KEY_POOL_VAR_TIS_HOST + "\",\"type\":\"VARCHAR\",\"value\":\"127.0.0.1:8080\"}]");
Map<String, String> gloabParams = Collections.singletonMap(TISTask.KEY_POOL_VAR_TIS_HOST, "127.0.0.1:8080");
Mockito.when(taskExecutionContext.getDefinedParams()).thenReturn(gloabParams);
tisTask = PowerMockito.spy(new TISTask(taskExecutionContext, logger));
tisTask.init();
}
/**
* Method: DataxTask()
*/
@Test
public void testDataxTask()
throws Exception {
TaskProps props = new TaskProps();
props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstanceId(1);
props.setTenantCode("1");
Assert.assertNotNull(new TISTask(null, logger));
}
@Test
public void testInit()
throws Exception {
try {
tisTask.init();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
@Test
public void testHandle()
throws Exception {
HttpServer server = jsonHttpServer(8080, pathResource("org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.json"));
running(server, () -> {
tisTask.handle();
Assert.assertEquals("TIS execute be success", ExecutionStatus.SUCCESS, tisTask.getExitStatus());
});
}
private String loadResContent(String resName) {
try (InputStream i = this.getClass().getResourceAsStream(resName)) {
Objects.requireNonNull(i, "resource " + resName + " relevant stream content can not be null");
String content = IOUtils.toString(i, StandardCharsets.UTF_8);
return content;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Test
public void testCancelApplication()
throws Exception {
try {
tisTask.cancelApplication(true);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}

60
dolphinscheduler-task-plugin/dolphinscheduler-task-tis/src/test/resources/org/apache/dolphinscheduler/plugin/task/tis/TISTaskTest.json

@ -0,0 +1,60 @@
[
{
"description": "trigger TIS task execute",
"request": {
"uri": "/tjs/coredefine/coredefine.ajax",
"method": "post",
"headers": {
"Content-Type": "application/x-www-form-urlencoded",
"appname": "mysql_elastic"
},
"text": "action=datax_action&emethod=trigger_fullbuild_task"
},
"response": {
"text": "{\n \"success\":true,\n \"errormsg\":[],\n \"msg\":[],\n \"bizresult\":{\"taskid\": \"1087\"}\n}"
}
},
{
"description": "Get TIS task execute status",
"request": {
"uri": "/tjs/config/config.ajax",
"method": "post",
"headers": {
"Content-Type": "text/plain; charset=UTF-8"
},
"queries": {
"action": "collection_action",
"emethod": "get_task_status"
},
"text": "{\n taskid: 1087\n, log: false }"
},
"response": {
"seq": [
{
"text": "{\n \"success\": true,\n \"errormsg\": [\n \"err1\"\n ],\n \"bizresult\": {\n \"status\": {\n \"state\": 2\n }\n }\n}"
},
{
"text": "{\n \"success\": true,\n \"errormsg\": [\n \"err1\"\n ],\n \"bizresult\": {\n \"status\": {\n \"state\": 1\n }\n }\n}"
}
]
}
},
{
"websocket": {
"uri": "/tjs/download/logfeedback",
"connected": "connected",
"sessions": [
{
"request": {
"text": "logtype=full&collection=mysql_elastic&taskid=1087"
},
"response": {
"broadcast": {
"content": "{\n \"logType\": \"FULL\",\n \"msg\": \"message 1\",\n \"taskId\": \"1087\"\n}"
}
}
}
]
}
}
]

1
dolphinscheduler-task-plugin/pom.xml

@ -34,6 +34,7 @@
<module>dolphinscheduler-task-flink</module>
<module>dolphinscheduler-task-python</module>
<module>dolphinscheduler-task-spark</module>
<module>dolphinscheduler-task-tis</module>
</modules>

4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js

@ -301,6 +301,10 @@ const tasksType = {
desc: 'DataX',
color: '#1fc747'
},
TIS: {
desc: 'TIS',
color: '#1fc747'
},
SQOOP: {
desc: 'SQOOP',
color: '#E46F13'

3
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss

@ -107,6 +107,9 @@
.icos-DATAX {
background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%;
}
.icos-TIS {
background: url("../img/toolbar_TIS.svg") no-repeat 50% 50%;
}
.icos-SQOOP {
background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%;
}

9
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -243,6 +243,13 @@
ref="DATAX"
:backfill-item="backfillItem">
</m-datax>
<m-tis
v-if="nodeData.taskType === 'TIS'"
@on-params="_onParams"
@on-cache-params="_onCacheParams"
:backfill-item="backfillItem"
ref="TIS">
</m-tis>
<m-sqoop
v-if="nodeData.taskType === 'SQOOP'"
@on-params="_onParams"
@ -292,6 +299,7 @@
import mDependent from './tasks/dependent'
import mHttp from './tasks/http'
import mDatax from './tasks/datax'
import mTis from './tasks/tis'
import mConditions from './tasks/conditions'
import mSqoop from './tasks/sqoop'
import mSubProcess from './tasks/sub_process'
@ -799,6 +807,7 @@
mDependent,
mHttp,
mDatax,
mTis,
mSqoop,
mConditions,
mSelectInput,

225
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/tis.vue

@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<div class="datax-model">
<m-list-box>
<div slot="text">{{$t('TargetJobName')}}</div>
<div slot="content">
<el-input
type="input"
size="small"
v-model="targetJobName"
:placeholder="$t('Please enter TIS DataX job name')">
</el-input>
</div>
</m-list-box>
</div>
</template>
<script>
import _ from 'lodash'
import mListBox from './_source/listBox'
import disabledState from '@/module/mixin/disabledState'
export default {
name: 'tis',
data () {
return {
// target table
targetJobName: ''
}
},
mixins: [disabledState],
props: {
backfillItem: Object,
createNodeId: Number
},
methods: {
setEditorVal () {
// this.item = editor.getValue()
// this.scriptBoxDialog = true
},
getSriptBoxValue (val) {
},
/**
* return pre statements
*/
_onPreStatements (a) {
this.preStatements = a
},
/**
* return post statements
*/
_onPostStatements (a) {
this.postStatements = a
},
/**
* return localParams
*/
_onLocalParams (a) {
this.localParams = a
},
/**
* verification
*/
_verification () {
// storage
this.$emit('on-params', {
targetJobName: this.targetJobName
})
return true
},
/**
* Processing code highlighting
*/
_handlerEditor () {
// this._destroyEditor()
// editor
// editor = codemirror('code-sql-mirror', {
// mode: 'sql',
// readOnly: this.isDetails
// })
//
// this.keypress = () => {
// if (!editor.getOption('readOnly')) {
// editor.showHint({
// completeSingle: false
// })
// }
// }
//
// // Monitor keyboard
// editor.on('keypress', this.keypress)
//
// editor.on('changes', () => {
// this._cacheParams()
// })
//
// editor.setValue(this.sql)
//
// return editor
},
// _handlerJsonEditor () {
// this._destroyJsonEditor()
//
// // jsonEditor
// jsonEditor = codemirror('code-json-mirror', {
// mode: 'json',
// readOnly: this.isDetails
// })
//
// this.keypress = () => {
// if (!jsonEditor.getOption('readOnly')) {
// jsonEditor.showHint({
// completeSingle: false
// })
// }
// }
//
// // Monitor keyboard
// jsonEditor.on('keypress', this.keypress)
//
// jsonEditor.on('changes', () => {
// // this._cacheParams()
// })
//
// jsonEditor.setValue(this.json)
//
// return jsonEditor
// },
_cacheParams () {
this.$emit('on-cache-params', {
// dsType: this.dsType,
// dataSource: this.rtDatasource,
// dtType: this.dtType,
// dataTarget: this.rtDatatarget,
// sql: editor ? editor.getValue() : '',
targetJobName: this.targetJobName
// jobSpeedByte: this.jobSpeedByte * 1024,
// jobSpeedRecord: this.jobSpeedRecord,
// preStatements: this.preStatements,
// postStatements: this.postStatements,
// xms: +this.xms,
// xmx: +this.xmx
})
}
// _destroyEditor () {
// if (editor) {
// editor.toTextArea() // Uninstall
// editor.off($('.code-sql-mirror'), 'keypress', this.keypress)
// editor.off($('.code-sql-mirror'), 'changes', this.changes)
// }
// },
// _destroyJsonEditor () {
// if (jsonEditor) {
// jsonEditor.toTextArea() // Uninstall
// jsonEditor.off($('.code-json-mirror'), 'keypress', this.keypress)
// jsonEditor.off($('.code-json-mirror'), 'changes', this.changes)
// }
// }
},
created () {
let o = this.backfillItem
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
// backfill
this.targetJobName = o.params.targetJobName || ''
}
},
mounted () {
// if (this.customConfig) {
// setTimeout(() => {
// this._handlerJsonEditor()
// }, 200)
// } else {
// setTimeout(() => {
// this._handlerEditor()
// }, 200)
// }
},
destroyed () {
// /**
// * Destroy the editor instance
// */
// if (editor) {
// editor.toTextArea() // Uninstall
// editor.off($('.code-sql-mirror'), 'keypress', this.keypress)
// }
// if (jsonEditor) {
// jsonEditor.toTextArea() // Uninstall
// jsonEditor.off($('.code-json-mirror'), 'keypress', this.keypress)
// }
},
watch: {
// Watch the cacheParams
cacheParams (val) {
this._cacheParams()
}
},
computed: {
cacheParams () {
return {
targetJobName: this.targetJobName
}
}
},
components: { mListBox }
}
</script>

46
dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_TIS.svg

@ -0,0 +1,46 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
~
-->
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 204.85 127.79">
<defs>
<style>
.cls-1{fill:#f11d07;}.cls-2{fill:#ffde00;}.cls-2,.cls-3{stroke:#fff;stroke-miterlimit:10;stroke-width:2px;}.cls-3{fill:url(#xxxxxx);}
</style>
<linearGradient id="xxxxxx" x1="103.68" y1="127.03" x2="103.68" y2="1" gradientUnits="userSpaceOnUse">
<stop offset="0" stop-color="#fff33b"/>
<stop offset="0.1" stop-color="#fee72e"/>
<stop offset="0.28" stop-color="#fed51b"/>
<stop offset="0.47" stop-color="#fdca10"/>
<stop offset="0.67" stop-color="#fdc70c"/>
<stop offset="0.95" stop-color="#f3903f"/>
</linearGradient>
</defs>
<g >
<g >
<path class="cls-1"
d="M94.32,10.92a5.7,5.7,0,0,1,7.26,0L152,52.46,193.8,86.88A5.71,5.71,0,0,1,190.17,97H5.72A5.71,5.71,0,0,1,2.09,86.88L43.88,52.46Z"/>
<polygon class="cls-2"
points="34 36.23 87.53 32.38 76.92 50.38 64 50.38 34 97.45 13.7 97.45 40.46 51.3 26.16 51.3 34 36.23"/>
<polygon class="cls-3"
points="122.6 1 154.91 1 106.28 55.45 134.14 55.45 52.46 127.03 95.38 72.06 75.53 72.99 122.6 1"/>
<polygon class="cls-2"
points="202.75 31.93 157.21 36.99 134.6 75.3 160.44 75.3 155.91 83.14 129.52 83.14 121.22 97.45 165.13 97.45 183.59 61.09 158.13 61.09 163.67 51.3 189.05 51.3 202.75 31.93"/>
</g>
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.2 KiB

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -586,6 +586,8 @@ export default {
'Spark Version': 'Spark版本',
TargetDataBase: '目标库',
TargetTable: '目标表',
TargetJobName: 'TIS目标任务名',
'Please enter TIS DataX job name': '请输入TIS DataX任务名',
'Please enter the table of target': '请输入目标表名',
'Please enter a Target Table(required)': '请输入目标表(必填)',
SpeedByte: '限流(字节数)',

1
pom.xml

@ -1093,6 +1093,7 @@
<include>**/alert/processor/AlertRequestProcessorTest.java</include>
<include>**/alert/runner/AlertSenderTest.java</include>
<include>**/alert/AlertServerTest.java</include>
<include>**/plugin/task/tis/TISTaskTest.java</include>
</includes>
<!-- <skip>true</skip> -->
</configuration>

2
tools/dependencies/known-dependencies.txt

@ -16,7 +16,7 @@ api-util-1.0.0-M20.jar
asm-3.1.jar
asm-6.2.1.jar
aspectjweaver-1.9.6.jar
async-http-client-1.6.5.jar
async-http-client-2.12.3.jar
audience-annotations-0.5.0.jar
avro-1.7.4.jar
aws-java-sdk-1.7.4.jar

Loading…
Cancel
Save