diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js index 55a56d685f..e92a68c8c1 100644 --- a/docs/configs/docsdev.js +++ b/docs/configs/docsdev.js @@ -169,6 +169,10 @@ export default { title: 'DVC', link: '/en-us/docs/dev/user_doc/guide/task/dvc.html', }, + { + title: 'Dinky', + link: '/en-us/docs/dev/user_doc/guide/task/dinky.html', + }, ], }, { @@ -553,6 +557,10 @@ export default { title: 'DVC', link: '/zh-cn/docs/dev/user_doc/guide/task/dvc.html', }, + { + title: 'Dinky', + link: '/zh-cn/docs/dev/user_doc/guide/task/dinky.html', + }, ], }, { diff --git a/docs/docs/en/guide/task/dinky.md b/docs/docs/en/guide/task/dinky.md new file mode 100644 index 0000000000..be44e29a69 --- /dev/null +++ b/docs/docs/en/guide/task/dinky.md @@ -0,0 +1,40 @@ +# Dinky + +## Overview + +Use `Dinky Task` to create a dinky-type task and support one-stop development, debugging, operation and maintenance of FlinkSql, Flink jar and SQL. When the worker executes `Dinky Task`, +it will call `Dinky API` to trigger dinky task. Click [here](http://www.dlink.top/) for details about `Dinky`. + +## Create Task + +- Click Project Management-Project Name-Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page. +- Drag from the toolbar to the canvas. + +## Task Parameter + +| **Parameter** | **Description** | +| ------- | ---------- | +| Node Name | Set the name of the task. Node names within a workflow definition are unique. | +| Run flag | Indicates whether the node can be scheduled normally. If it is not necessary to execute, you can turn on the prohibiting execution switch. | +| Description | Describes the function of this node. | +| Task priority | When the number of worker threads is insufficient, they are executed in order from high to low according to the priority, and they are executed according to the first-in, first-out principle when the priority is the same. | +| Worker group | The task is assigned to the machines in the worker group for execution. If Default is selected, a worker machine will be randomly selected for execution. | +| Task group name | The group in Resources, if not configured, it will not be used. | +| Environment Name | Configure the environment in which to run the script. | +| Number of failed retries | The number of times the task is resubmitted after failure. It supports drop-down and manual filling. | +| Failure Retry Interval | The time interval for resubmitting the task if the task fails. It supports drop-down and manual filling. | +| Timeout alarm | Check Timeout Alarm and Timeout Failure. When the task exceeds the "timeout duration", an alarm email will be sent and the task execution will fail. | +| Dinky Address | The url for a dinky server. | +| Dinky Task ID | The unique task id for a dinky task. | +| Online Task | Specify whether the current dinky job is online. If yes, the submitted job can only be submitted successfully when it is published and there is no corresponding Flink job instance running. | + +## Task Example + +### Dinky Task Example + +This example illustrates how to create a dinky task node. + +![demo-dinky](../../../../img/tasks/demo/dinky.png) + +![demo-get-dinky-task-id](../../../../img/tasks/demo/dinky_task_id.png) + diff --git a/docs/docs/zh/guide/task/dinky.md b/docs/docs/zh/guide/task/dinky.md new file mode 100644 index 0000000000..e4741296b8 --- /dev/null +++ b/docs/docs/zh/guide/task/dinky.md @@ -0,0 +1,40 @@ +# Dinky + +## Overview + +`Dinky`任务类型,用于创建并执行`Dinky`类型任务以支撑一站式的开发、调试、运维 FlinkSQL、Flink Jar、SQL。worker 执行该任务的时候,会通过`Dinky API`触发`Dinky 的作业`。 +点击[这里](http://www.dlink.top/) 获取更多关于`Dinky`的信息。 + +## Create Task + +- 点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。 +- 工具栏中拖动 到画板中,即可完成创建。 + +## Task Parameter + +| **参数** | **描述** | +|-------------|--------------------------------------------------------------------| +| 任务名称 | 设置任务的名称。一个工作流定义中的节点名称是唯一的。 | +| 运行标志 | 标识这个节点是否可以正常调度。如果不需要执行,可以打开禁止执行开关。 | +| 描述 | 描述该节点的功能。 | +| 任务优先级 | worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。 | +| Worker 分组 | 任务分配给 worker 组的机器机执行,选择 Default,会随机选择一台 worker 机执行。 | +| 任务组名 | 资源中的组,如果未配置,将不会使用。 | +| 环境名称 | 配置运行脚本的环境。 | +| 失败重试次数 | 任务失败重新提交的次数,支持下拉和手填。 | +| 失败重试间隔 | 任务失败重新提交任务的时间间隔,支持下拉和手填。 | +| 超时告警 | 勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败. | +| Dinky 地址 | Dinky 服务的 url。 | +| Dinky 任务 ID | Dinky 作业对应的唯一ID。 | +| 上线作业 | 指定当前 Dinky 作业是否上线,如果是,则该被提交的作业只能处于已发布且当前无对应的 Flink Job 实例在运行才可提交成功。 | + +## Task Example + +### Dinky Task Example + +这个示例展示了如何创建 Dinky 任务节点: + +![demo-dinky](../../../../img/tasks/demo/dinky.png) + +![demo-get-dinky-task-id](../../../../img/tasks/demo/dinky_task_id.png) + diff --git a/docs/img/tasks/demo/dinky.png b/docs/img/tasks/demo/dinky.png new file mode 100644 index 0000000000..a2027faeab Binary files /dev/null and b/docs/img/tasks/demo/dinky.png differ diff --git a/docs/img/tasks/demo/dinky_task_id.png b/docs/img/tasks/demo/dinky_task_id.png new file mode 100644 index 0000000000..280da5f30f Binary files /dev/null and b/docs/img/tasks/demo/dinky_task_id.png differ diff --git a/docs/img/tasks/icons/dinky.png b/docs/img/tasks/icons/dinky.png new file mode 100644 index 0000000000..7f4ad3997b Binary files /dev/null and b/docs/img/tasks/icons/dinky.png differ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml index 7a9d8d91ae..759b94505e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml @@ -177,6 +177,12 @@ dolphinscheduler-task-dvc ${project.version} + + + org.apache.dolphinscheduler + dolphinscheduler-task-dinky + ${project.version} + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/pom.xml new file mode 100644 index 0000000000..ec3259c0bc --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/pom.xml @@ -0,0 +1,50 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + dev-SNAPSHOT + + 4.0.0 + jar + + dolphinscheduler-task-dinky + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyParameters.java new file mode 100644 index 0000000000..2b0d6c1d32 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyParameters.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dinky; + +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.Collections; +import java.util.List; + +public class DinkyParameters extends AbstractParameters { + + /** + * parameters for dinky Open API + * + * @see Dinky_Open_API + */ + private String address; + private String taskId; + private boolean online = false; + + @Override + public boolean checkParameters() { + return StringUtils.isNotEmpty(this.address) && StringUtils.isNotEmpty(this.taskId); + } + + @Override + public List getResourceFilesList() { + return Collections.emptyList(); + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public boolean isOnline() { + return online; + } + + public void setOnline(boolean online) { + this.online = online; + } + + @Override + public String toString() { + return "DinkyParameters{" + + "address='" + address + '\'' + + ", taskId='" + taskId + '\'' + + ", online='" + online + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java new file mode 100644 index 0000000000..622a674780 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dinky; + +import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.MissingNode; + +public class DinkyTask extends AbstractTaskExecutor { + + /** + * taskExecutionContext + */ + private final TaskExecutionContext taskExecutionContext; + + /** + * dinky parameters + */ + private DinkyParameters dinkyParameters; + + /** + * constructor + * + * @param taskExecutionContext taskExecutionContext + */ + protected DinkyTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + this.taskExecutionContext = taskExecutionContext; + } + + @Override + public void init() { + final String taskParams = taskExecutionContext.getTaskParams(); + logger.info("dinky task params:{}", taskParams); + this.dinkyParameters = JSONUtils.parseObject(taskParams, DinkyParameters.class); + if (this.dinkyParameters == null || !this.dinkyParameters.checkParameters()) { + throw new DinkyTaskException("dinky task params is not valid"); + } + } + + @Override + public void handle() throws Exception { + String address = this.dinkyParameters.getAddress(); + String taskId = this.dinkyParameters.getTaskId(); + boolean isOnline = this.dinkyParameters.isOnline(); + JsonNode result; + if (isOnline) { + // Online dinky task, and only one job is allowed to execute + result = onlineTask(address, taskId); + } else { + // Submit dinky task + result = submitTask(address, taskId); + } + if (checkResult(result)) { + boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean(); + String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText(); + boolean finishFlag = false; + while (!finishFlag) { + JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId); + if (!checkResult(jobInstanceInfoResult)) { + break; + } + String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText(); + switch (jobInstanceStatus) { + case DinkyTaskConstants.STATUS_FINISHED: + final int exitStatusCode = mapStatusToExitCode(status); + // Use address-taskId as app id + setAppIds(String.format("%s-%s", address, taskId)); + setExitStatusCode(exitStatusCode); + logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS)); + finishFlag = true; + break; + case DinkyTaskConstants.STATUS_FAILED: + case DinkyTaskConstants.STATUS_CANCELED: + case DinkyTaskConstants.STATUS_UNKNOWN: + errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText()); + finishFlag = true; + break; + default: + Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS); + } + } + } + } + + /** + * map dinky task status to exitStatusCode + * + * @param status dinky job status + * @return exitStatusCode + */ + private int mapStatusToExitCode(boolean status) { + if (status) { + return TaskConstants.EXIT_CODE_SUCCESS; + } else { + return TaskConstants.EXIT_CODE_FAILURE; + } + } + + private boolean checkResult(JsonNode result) { + if (result instanceof MissingNode || result == null) { + errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS); + return false; + } else if (result.get("code").asInt() == DinkyTaskConstants.API_ERROR) { + errorHandle(result.get("msg")); + return false; + } + return true; + } + + private void errorHandle(Object msg) { + setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + logger.error("dinky task submit failed with error: {}", msg); + } + + @Override + public AbstractParameters getParameters() { + return dinkyParameters; + } + + @Override + public void cancelApplication(boolean status) throws Exception { + super.cancelApplication(status); + String address = this.dinkyParameters.getAddress(); + String taskId = this.dinkyParameters.getTaskId(); + logger.info("trying terminate dinky task, taskId: {}, address: {}, taskId: {}", + this.taskExecutionContext.getTaskInstanceId(), + address, + taskId); + cancelTask(address, taskId); + logger.warn("dinky task terminated, taskId: {}, address: {}, taskId: {}", + this.taskExecutionContext.getTaskInstanceId(), + address, + taskId); + } + + private JsonNode submitTask(String address, String taskId) { + Map params = new HashMap<>(); + params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId); + return parse(doGet(address + DinkyTaskConstants.SUBMIT_TASK, params)); + } + + private JsonNode onlineTask(String address, String taskId) { + Map params = new HashMap<>(); + params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId); + return parse(doGet(address + DinkyTaskConstants.ONLINE_TASK, params)); + } + + private JsonNode cancelTask(String address, String taskId) { + Map params = new HashMap<>(); + params.put(DinkyTaskConstants.PARAM_JSON_TASK_ID, taskId); + params.put(DinkyTaskConstants.PARAM_SAVEPOINT_TYPE, DinkyTaskConstants.SAVEPOINT_CANCEL); + return parse(sendJsonStr(address + DinkyTaskConstants.SAVEPOINT_TASK, JSONUtils.toJsonString(params))); + } + + private JsonNode getJobInstanceInfo(String address, String taskId) { + Map params = new HashMap<>(); + params.put(DinkyTaskConstants.PARAM_JOB_INSTANCE_ID, taskId); + return parse(doGet(address + DinkyTaskConstants.GET_JOB_INFO, params)); + } + + private JsonNode parse(String res) { + ObjectMapper mapper = new ObjectMapper(); + JsonNode result = null; + try { + result = mapper.readTree(res); + } catch (JsonProcessingException e) { + logger.error("dinky task submit failed with error", e); + } + return result; + } + + private String doGet(String url, Map params) { + String result = ""; + HttpClient httpClient = HttpClientBuilder.create().build(); + HttpGet httpGet = null; + try { + URIBuilder uriBuilder = new URIBuilder(url); + if (null != params && !params.isEmpty()) { + for (Map.Entry entry : params.entrySet()) { + uriBuilder.addParameter(entry.getKey(), entry.getValue()); + } + } + URI uri = uriBuilder.build(); + httpGet = new HttpGet(uri); + logger.info("access url: {}", uri); + HttpResponse response = httpClient.execute(httpGet); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + result = EntityUtils.toString(response.getEntity()); + logger.info("dinky task succeed with results: {}", result); + } else { + logger.error("dinky task terminated,response: {}", response); + } + } catch (IllegalArgumentException ie) { + logger.error("dinky task terminated: {}", ie.getMessage()); + } catch (Exception e) { + logger.error("dinky task terminated: ", e); + } finally { + if (null != httpGet) { + httpGet.releaseConnection(); + } + } + return result; + } + + private String sendJsonStr(String url, String params) { + String result = ""; + HttpClient httpClient = HttpClientBuilder.create().build(); + HttpPost httpPost = new HttpPost(url); + try { + httpPost.addHeader("Content-type", "application/json; charset=utf-8"); + httpPost.setHeader("Accept", "application/json"); + if (StringUtils.isNotBlank(params)) { + httpPost.setEntity(new StringEntity(params, StandardCharsets.UTF_8)); + } + HttpResponse response = httpClient.execute(httpPost); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + result = EntityUtils.toString(response.getEntity()); + logger.info("dinky task succeed with results: {}", result); + } else { + logger.error("dinky task terminated,response: {}", response); + } + } catch (IllegalArgumentException ie) { + logger.error("dinky task terminated: {}", ie.getMessage()); + } catch (Exception he) { + logger.error("dinky task terminated: ", he); + } + return result; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskChannel.java new file mode 100644 index 0000000000..6d419b3dbb --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskChannel.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dinky; + +import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +public class DinkyTaskChannel implements TaskChannel { + + @Override + public void cancelApplication(boolean status) { + // nothing to do + } + + @Override + public AbstractTask createTask(TaskExecutionContext taskRequest) { + return new DinkyTask(taskRequest); + } + + @Override + public AbstractParameters parseParameters(ParametersNode parametersNode) { + return JSONUtils.parseObject(parametersNode.getTaskParams(), DinkyParameters.class); + } + + @Override + public ResourceParametersHelper getResources(String parameters) { + return null; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskChannelFactory.java new file mode 100644 index 0000000000..6c80a9769d --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskChannelFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dinky; + +import com.google.auto.service.AutoService; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; +import org.apache.dolphinscheduler.spi.params.base.PluginParams; + +import java.util.ArrayList; +import java.util.List; + +@AutoService(TaskChannelFactory.class) +public class DinkyTaskChannelFactory implements TaskChannelFactory { + @Override + public String getName() { + return "DINKY"; + } + + @Override + public List getParams() { + return new ArrayList<>(); + } + + @Override + public TaskChannel create() { + return new DinkyTaskChannel(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java new file mode 100644 index 0000000000..847d4e3a31 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskConstants.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dinky; + +/** + * Custom DinkyTaskConstants + */ +public class DinkyTaskConstants { + + private DinkyTaskConstants() { + throw new IllegalStateException("Utility class"); + } + + private static final String API_ROUTE = "/openapi/"; + public static final String SUBMIT_TASK = API_ROUTE + "submitTask"; + public static final String ONLINE_TASK = API_ROUTE + "onLineTask"; + public static final String SAVEPOINT_TASK = API_ROUTE + "savepointTask"; + public static final String GET_JOB_INFO = API_ROUTE + "getJobInstance"; + public static final int API_ERROR = 1; + public static final String API_VERSION_ERROR_TIPS = "Please check that the dinky version is greater than or equal to 0.6.5"; + public static final String API_RESULT_DATAS = "datas"; + + public static final String SAVEPOINT_CANCEL = "cancel"; + + public static final String PARAM_TASK_ID = "id"; + public static final String PARAM_JSON_TASK_ID = "taskId"; + public static final String PARAM_SAVEPOINT_TYPE = "type"; + public static final String PARAM_JOB_INSTANCE_ID = "id"; + + public static final String STATUS_FINISHED = "FINISHED"; + public static final String STATUS_CANCELED = "CANCELED"; + public static final String STATUS_FAILED = "FAILED"; + public static final String STATUS_UNKNOWN = "UNKNOWN"; + + public static final long SLEEP_MILLIS = 3000; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskException.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskException.java new file mode 100644 index 0000000000..86393bf3a6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTaskException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.dinky; + +/** + * Custom DinkyTaskException + */ +public class DinkyTaskException extends RuntimeException { + + public DinkyTaskException() { + super(); + } + + public DinkyTaskException(String message) { + super(message); + } + + public DinkyTaskException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index f40e882d47..49f874d904 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -56,5 +56,6 @@ dolphinscheduler-task-mlflow dolphinscheduler-task-openmldb dolphinscheduler-task-dvc + dolphinscheduler-task-dinky \ No newline at end of file diff --git a/dolphinscheduler-ui/public/images/task-icons/dinky.png b/dolphinscheduler-ui/public/images/task-icons/dinky.png new file mode 100644 index 0000000000..7f4ad3997b Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dinky.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/dinky_hover.png b/dolphinscheduler-ui/public/images/task-icons/dinky_hover.png new file mode 100644 index 0000000000..1db1980ce9 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/dinky_hover.png differ diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 93e643690f..b5514c8a75 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -706,6 +706,11 @@ export default { 'Please select column, only single column is supported', please_enter_threshold_number_is_needed: 'Please enter threshold number is needed', - please_enter_comparison_title: 'please select comparison title' + please_enter_comparison_title: 'please select comparison title', + dinky_address: 'Dinky address', + dinky_address_tips: 'Please enter the url of your dinky', + dinky_task_id: 'Dinky task id', + dinky_task_id_tips: 'Please enter the task id of your dinky', + dinky_online: 'Online task' } } diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index baed96432e..1def17bf06 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -692,6 +692,11 @@ export default { please_enter_filter_expression: '请输入源表过滤条件', please_enter_column_only_single_column_is_supported: '请选择源表检测列', please_enter_threshold_number_is_needed: '请输入阈值', - please_enter_comparison_title: '请选择期望值类型' + please_enter_comparison_title: '请选择期望值类型', + dinky_address: 'dinky 地址', + dinky_address_tips: '请输入 Dinky 地址', + dinky_task_id: 'dinky 作业ID', + dinky_task_id_tips: '请输入作业 ID', + dinky_online: '是否上线作业' } } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts index 971fd0447b..3054129214 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts @@ -71,3 +71,4 @@ export { useMlflowProjects } from './use-mlflow-projects' export { useMlflowModels } from './use-mlflow-models' export { useOpenmldb } from './use-openmldb' export { useDvc } from './use-dvc' +export { useDinky } from './use-dinky' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dinky.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dinky.ts new file mode 100644 index 0000000000..d043b2209d --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-dinky.ts @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { useI18n } from 'vue-i18n' +import { useCustomParams } from '.' +import type { IJsonItem } from '../types' + +export function useDinky(model: { [field: string]: any }): IJsonItem[] { + const { t } = useI18n() + + return [ + { + type: 'input', + field: 'address', + name: t('project.node.dinky_address'), + props: { + placeholder: t('project.node.dinky_address_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(_validate: any, value: string) { + if (!value) { + return new Error(t('project.node.dinky_address_tips')) + } + } + } + }, + { + type: 'input', + field: 'taskId', + name: t('project.node.dinky_task_id'), + props: { + placeholder: t('project.node.dinky_task_id_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(_validate: any, value: string) { + if (!value) { + return new Error(t('project.node.dinky_task_id_tips')) + } + } + } + }, + { + type: 'switch', + field: 'online', + name: t('project.node.dinky_online') + }, + ...useCustomParams({ model, field: 'localParams', isSimple: false }) + ] +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index ae5a2efbae..aea0097295 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -360,7 +360,6 @@ export function formatParams(data: INodeData): { } if (data.taskType === 'DVC') { - taskParams.dvcTaskType = data.dvcTaskType taskParams.dvcRepository = data.dvcRepository taskParams.dvcVersion = data.dvcVersion @@ -370,6 +369,12 @@ export function formatParams(data: INodeData): { taskParams.dvcStoreUrl = data.dvcStoreUrl } + if (data.taskType === 'DINKY') { + taskParams.address = data.address + taskParams.taskId = data.taskId + taskParams.online = data.online + } + if (data.taskType === 'OPENMLDB') { taskParams.zk = data.zk taskParams.zkPath = data.zkPath diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts index d2a5a6dd01..4240892eff 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts @@ -39,6 +39,7 @@ import { useJupyter } from './use-jupyter' import { useMlflow } from './use-mlflow' import { useOpenmldb } from './use-openmldb' import { useDvc } from './use-dvc' +import { useDinky } from './use-dinky' export default { SHELL: useShell, @@ -64,5 +65,6 @@ export default { JUPYTER: useJupyter, MLFLOW: useMlflow, OPENMLDB: useOpenmldb, - DVC: useDvc + DVC: useDvc, + DINKY: useDinky } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts new file mode 100644 index 0000000000..5e7a0aa0cb --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { reactive } from 'vue' +import * as Fields from '../fields/index' +import type { IJsonItem, INodeData, ITaskData } from '../types' + +export function useDinky({ + projectCode, + from = 0, + readonly, + data +}: { + projectCode: number + from?: number + readonly?: boolean + data?: ITaskData +}) { + const model = reactive({ + name: '', + taskType: 'DINKY', + flag: 'YES', + description: '', + timeoutFlag: false, + localParams: [], + environmentCode: null, + failRetryInterval: 1, + failRetryTimes: 0, + workerGroup: 'default', + delayTime: 0, + timeout: 30, + timeoutNotifyStrategy: ['WARN'] + } as INodeData) + + let extra: IJsonItem[] = [] + if (from === 1) { + extra = [ + Fields.useTaskType(model, readonly), + Fields.useProcessName({ + model, + projectCode, + isCreate: !data?.id, + from, + processName: data?.processName + }) + ] + } + + return { + json: [ + Fields.useName(from), + ...extra, + Fields.useRunFlag(), + Fields.useDescription(), + Fields.useTaskPriority(), + Fields.useWorkerGroup(), + Fields.useEnvironmentName(model, !model.id), + ...Fields.useTaskGroup(model, projectCode), + ...Fields.useFailed(), + Fields.useDelayTime(model), + ...Fields.useTimeoutAlarm(model), + ...Fields.useDinky(model), + Fields.usePreTasks() + ] as IJsonItem[], + model + } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 7ceca70723..06a5c4adfe 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -348,6 +348,9 @@ interface ITaskParams { dvcMessage?: string dvcLoadSaveDataPath?: string dvcStoreUrl?: string + address?: string + taskId?: string + online?: boolean } interface INodeData diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts index ebd3f741e1..1cc0e00c5b 100644 --- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts @@ -39,6 +39,7 @@ export type TaskType = | 'MLFLOW' | 'OPENMLDB' | 'DVC' + | 'DINKY' export const TASK_TYPES_MAP = { SHELL: { @@ -123,5 +124,9 @@ export const TASK_TYPES_MAP = { DVC: { alias: 'DVC', helperLinkDisable: true + }, + DINKY: { + alias: 'DINKY', + helperLinkDisable: true } } as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss index 656e5ae890..90615483fd 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss @@ -173,6 +173,9 @@ $bgLight: #ffffff; &.icon-dvc { background-image: url('/images/task-icons/dvc.png'); } + &.icon-dinky { + background-image: url('/images/task-icons/dinky.png'); + } } &:hover { @@ -249,6 +252,9 @@ $bgLight: #ffffff; &.icon-dvc { background-image: url('/images/task-icons/dvc_hover.png'); } + &.icon-dinky { + background-image: url('/images/task-icons/dinky_hover.png'); + } } } } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/definition/tree/index.tsx b/dolphinscheduler-ui/src/views/projects/workflow/definition/tree/index.tsx index 0856c60953..2edf0c8626 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/definition/tree/index.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/definition/tree/index.tsx @@ -133,6 +133,11 @@ export default defineComponent({ color: '#8c8c8f', image: `${import.meta.env.BASE_URL}images/task-icons/seatunnel.png` }, + { + taskType: 'DINKY', + color: '#d69f5b', + image: `${import.meta.env.BASE_URL}images/task-icons/dinky.png` + }, { taskType: 'DAG', color: '#bbdde9' } ])