diff --git a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml index 079185cf0c..9181e0d63b 100644 --- a/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml +++ b/dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml @@ -71,12 +71,6 @@ provided - - com.fasterxml.jackson.core - jackson-databind - provided - - junit junit diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java index 62b69fec90..f156eadeb2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java @@ -204,7 +204,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic // verify udfFuncName is exist if (!funcName.equals(udf.getFuncName())) { if (checkUdfFuncNameExists(funcName)) { - logger.error("UdfFunc {} has exist, can't create again.", funcName); + logger.error("UdfFuncRequest {} has exist, can't create again.", funcName); result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS); result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg()); return result; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java index dddfb6de8e..23ac7b07ee 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java @@ -230,7 +230,7 @@ public class UdfFuncServiceTest { } /** - * get UdfFunc id + * get UdfFuncRequest id */ private UdfFunc getUdfFunc() { UdfFunc udfFunc = new UdfFunc(); diff --git a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml index 53e3afb220..bc2be45766 100644 --- a/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml +++ b/dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml @@ -106,5 +106,10 @@ + + + + + \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index f37af08e7d..1865d3ec54 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -25,11 +25,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext; /** * TaskExecutionContext builder diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index c32c38347f..52353bebd5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index bab83dbf2d..fc10a2943f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -42,10 +42,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; -import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; @@ -55,6 +52,10 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest; import org.apache.commons.lang.StringUtils; @@ -345,13 +346,14 @@ public class TaskPriorityQueueConsumer extends Thread { } List udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray); - Map udfFuncMap = new HashMap<>(); + UdfFuncRequest udfFuncRequest; + Map udfFuncRequestMap = new HashMap<>(); for (UdfFunc udfFunc : udfFuncList) { + udfFuncRequest = JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class); String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF); - udfFuncMap.put(udfFunc, tenantCode); + udfFuncRequestMap.put(udfFuncRequest, tenantCode); } - - sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap); + sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 5a164e8879..b0a5625341 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException; import org.apache.dolphinscheduler.spi.task.AbstractTask; +import org.apache.dolphinscheduler.spi.task.TaskAlertInfo; import org.apache.dolphinscheduler.spi.task.TaskChannel; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; @@ -166,8 +167,6 @@ public class TaskExecuteThread implements Runnable, Delayed { if (null == taskChannel) { throw new PluginNotFoundException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType())); } - - //TODO Temporary operation, To be adjusted TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class); task = taskChannel.createTask(taskRequest); @@ -179,8 +178,9 @@ public class TaskExecuteThread implements Runnable, Delayed { this.task.handle(); // task result process - this.task.after(); - + if (this.task.getNeedAlert()) { + sendAlert(this.task.getTaskAlertInfo()); + } responseCommand.setStatus(this.task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); responseCommand.setProcessId(this.task.getProcessId()); @@ -203,6 +203,10 @@ public class TaskExecuteThread implements Runnable, Delayed { } } + private void sendAlert(TaskAlertInfo taskAlertInfo) { + alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent()); + } + /** * when task finish, clear execute path. */ diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java deleted file mode 100644 index a2ca6be3cd..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.entity; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.UdfFunc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import java.util.HashMap; -import java.util.Map; -import org.junit.Test; - -public class SQLTaskExecutionContextTest { - - /** - * test parse josn String to TaskExecutionContext - */ - @Test - public void testTaskExecutionContext() { - String contextJson = "{\n" - + " \"taskInstanceId\":32,\n" - + " \"taskName\":\"test-hive-func\",\n" - + " \"startTime\":\"2020-07-19 16:45:46\",\n" - + " \"taskType\":\"SQL\",\n" - + " \"host\":null,\n" - + " \"executePath\":\"/tmp/dolphinscheduler/exec/process/1/5/14/32\",\n" - + " \"logPath\":null,\n" - + " \"taskJson\":\"{\\\"id\\\":\\\"tasks-70999\\\",\\\"name\\\":\\\"test-hive-func\\\"" - + ",\\\"desc\\\":null,\\\"type\\\":\\\"SQL\\\",\\\"runFlag\\\":\\\"NORMAL\\\"," - + "\\\"loc\\\":null,\\\"maxRetryTimes\\\":0,\\\"retryInterval\\\":1," - + "\\\"params\\\":{\\\"type\\\":\\\"HIVE\\\",\\\"datasource\\\":2," - + "\\\"sql\\\":\\\"select mid_id, user_id," - + " version_code, version_name, lang, source, os, area, model, " - + "brand, sdk_version, gmail, height_width, app_time, network," - + " lng, lat, dt,\\\\n Lower(model)\\\\nfrom dws_uv_detail_day limit 5;" - + "\\\",\\\"udfs\\\":\\\"1\\\",\\\"sqlType\\\":\\\"0\\\",\\\"title\\\":\\\"" - + "test-hive-user-func\\\",\\\"receivers\\\":\\\"534634799@qq.com\\\"," - + "\\\"receiversCc\\\":\\\"\\\",\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[]," - + "\\\"connParams\\\":\\\"\\\",\\\"preStatements\\\":[],\\\"postStatements\\\":[]}," - + "\\\"preTasks\\\":[],\\\"extras\\\":null,\\\"depList\\\":[],\\\"dependence\\\":{}," - + "\\\"conditionResult\\\":{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}," - + "\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"workerGroup\\\":\\\"default\\\"," - + "\\\"workerGroupId\\\":null,\\\"timeout\\\":{\\\"strategy\\\":\\\"\\\",\\\"interval\\\":null," - + "\\\"enable\\\":false},\\\"conditionsTask\\\":false,\\\"forbidden\\\":false," - + "\\\"taskTimeoutParameter\\\":{\\\"enable\\\":false,\\\"strategy\\\":null," - + "\\\"interval\\\":0}}\",\n" - + " \"processId\":0,\n" - + " \"appIds\":null,\n" - + " \"processInstanceId\":14,\n" - + " \"scheduleTime\":null,\n" - + " \"globalParams\":null,\n" - + " \"executorId\":2,\n" - + " \"cmdTypeIfComplement\":2,\n" - + " \"tenantCode\":\"sl\",\n" - + " \"queue\":\"sl\",\n" - + " \"processDefineId\":5,\n" - + " \"projectId\":1,\n" - + " \"taskParams\":null,\n" - + " \"envFile\":null,\n" - + " \"definedParams\":null,\n" - + " \"taskAppId\":null,\n" - + " \"taskTimeoutStrategy\":0,\n" - + " \"taskTimeout\":0,\n" - + " \"workerGroup\":\"default\",\n" - + " \"resources\":{\n" - + " },\n" - + " \"sqlTaskExecutionContext\":{\n" - + " \"warningGroupId\":0,\n" - + " \"connectionParams\":\"{\\\"type\\\":null,\\\"address\\\":" - + "\\\"jdbc:hive2://localhost:10000\\\",\\\"database\\\":\\\"gmall\\\"," - + "\\\"jdbcUrl\\\":\\\"jdbc:hive2://localhost:10000/gmall\\\"," - + "\\\"user\\\":\\\"sl-test\\\",\\\"password\\\":\\\"123456sl\\\"}\",\n" - + " \"udfFuncTenantCodeMap\": null" - + " },\n" - + " \"dataxTaskExecutionContext\":{\n" - + " \"dataSourceId\":0,\n" - + " \"sourcetype\":0,\n" - + " \"sourceConnectionParams\":null,\n" - + " \"dataTargetId\":0,\n" - + " \"targetType\":0,\n" - + " \"targetConnectionParams\":null\n" - + " },\n" - + " \"dependenceTaskExecutionContext\":null,\n" - + " \"sqoopTaskExecutionContext\":{\n" - + " \"dataSourceId\":0,\n" - + " \"sourcetype\":0,\n" - + " \"sourceConnectionParams\":null,\n" - + " \"dataTargetId\":0,\n" - + " \"targetType\":0,\n" - + " \"targetConnectionParams\":null\n" - + " },\n" - + " \"procedureTaskExecutionContext\":{\n" - + " \"connectionParams\":null\n" - + " }\n" - + "}\n"; - - TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); - - assertNotNull(taskExecutionContext); - } - - @Test - public void testSqlTaskExecutionContext() { - - SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); - sqlTaskExecutionContext.setWarningGroupId(0); - - Map udfmap = new HashMap<>(); - - UdfFunc udfFunc = new UdfFunc(); - udfFunc.setArgTypes("1"); - udfFunc.setId(1); - udfFunc.setResourceName("name1"); - udfmap.put(udfFunc, "map1"); - - UdfFunc udfFunc2 = new UdfFunc(); - udfFunc2.setArgTypes("2"); - udfFunc2.setId(2); - udfFunc2.setResourceName("name2"); - udfmap.put(udfFunc2, "map2"); - - sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfmap); - - String contextJson = JSONUtils.toJsonString(sqlTaskExecutionContext); - SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class); - - assertNotNull(parseSqlTask); - assertEquals(sqlTaskExecutionContext.getWarningGroupId(), parseSqlTask.getWarningGroupId()); - assertEquals(sqlTaskExecutionContext.getUdfFuncTenantCodeMap().size(), parseSqlTask.getUdfFuncTenantCodeMap().size()); - } - - /** - * test the SQLTaskExecutionContext - */ - @Test - public void testSqlTaskExecutionContextParse() { - - // SQLTaskExecutionContext.udfFuncTenantCodeMap is null - String contextJson = "{\n" - + " \"warningGroupId\":0,\n" - + " \"connectionParams\":null,\n" - + " \"udfFuncTenantCodeMap\":null" - + "}\n}"; - SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class); - - assertNotNull(parseSqlTask); - assertEquals(0,parseSqlTask.getWarningGroupId()); - assertNull(parseSqlTask.getUdfFuncTenantCodeMap()); - - // SQLTaskExecutionContext.udfFuncTenantCodeMap is not null - contextJson = "{\"warningGroupId\":0," - + "\"connectionParams\":null," - + "\"udfFuncTenantCodeMap\":{\"" - + "{\\\"id\\\":2,\\\"userId\\\":0," - + "\\\"funcName\\\":null,\\\"className\\\":null,\\\"argTypes\\\":\\\"2\\\",\\\"database\\\":null," - + "\\\"description\\\":null,\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name2\\\",\\\"type\\\":null," - + "\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map2\"," - + "\"{\\\"id\\\":1,\\\"userId\\\":0,\\\"funcName\\\":null," - + "\\\"className\\\":null,\\\"argTypes\\\":\\\"1\\\"," - + "\\\"database\\\":null,\\\"description\\\":null," - + "\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name1\\\"," - + "\\\"type\\\":null,\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map1\"}}\n"; - - SQLTaskExecutionContext parseSqlTask2 = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class); - - assertNotNull(parseSqlTask2); - assertEquals(0,parseSqlTask2.getWarningGroupId()); - assertEquals(2,parseSqlTask2.getUdfFuncTenantCodeMap().size()); - } - -} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 2d4a1fe5ce..976e058169 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -32,12 +32,12 @@ import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; +import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; import java.util.ArrayList; import java.util.Date; diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java index d0e822b80a..8b89215049 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java @@ -1,4 +1,4 @@ -package org.apache.dolphinscheduler.spi.common;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.common;/* * limitations under the License. */ +package org.apache.dolphinscheduler.spi.common; + import org.apache.dolphinscheduler.spi.params.base.PluginParams; import java.util.List; diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java index adb913675d..55f5203967 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java @@ -1,4 +1,4 @@ -package org.apache.dolphinscheduler.spi.task;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/* * limitations under the License. */ +package org.apache.dolphinscheduler.spi.task; + import org.apache.dolphinscheduler.spi.utils.CollectionUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -37,7 +39,7 @@ public abstract class AbstractParameters implements IParameters { /** * local parameters */ - private List localParams; + protected List localParams; /** * var pool diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java index b9c2632e5e..5a6cfd2c51 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java @@ -60,6 +60,10 @@ public abstract class AbstractTask { */ protected volatile int exitStatusCode = -1; + protected boolean needAlert = false; + + protected TaskAlertInfo taskAlertInfo; + /** * constructor * @@ -79,7 +83,6 @@ public abstract class AbstractTask { return null; } - /** * task handle * @@ -142,6 +145,22 @@ public abstract class AbstractTask { this.resultString = resultString; } + public boolean getNeedAlert() { + return needAlert; + } + + public void setNeedAlert(boolean needAlert) { + this.needAlert = needAlert; + } + + public TaskAlertInfo getTaskAlertInfo() { + return taskAlertInfo; + } + + public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) { + this.taskAlertInfo = taskAlertInfo; + } + /** * get task parameters * @@ -149,13 +168,6 @@ public abstract class AbstractTask { */ public abstract AbstractParameters getParameters(); - /** - * result processing maybe - */ - public void after() { - - } - /** * get exit status according to exitCode * diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java new file mode 100644 index 0000000000..75faad503f --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.task; + +public class TaskAlertInfo { + + private String title; + + private String content; + + private Integer alertGroupId; + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getContent() { + return content; + } + + public void setContent(String content) { + this.content = content; + } + + public Integer getAlertGroupId() { + return alertGroupId; + } + + public void setAlertGroupId(Integer alertGroupId) { + this.alertGroupId = alertGroupId; + } +} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java index 7ab3b17aeb..681970f243 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java @@ -1,4 +1,4 @@ -package org.apache.dolphinscheduler.spi.task;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/* * limitations under the License. */ +package org.apache.dolphinscheduler.spi.task; + import org.apache.dolphinscheduler.spi.task.request.TaskRequest; public interface TaskChannel { diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java index b7d5cbb01e..5b7ae95043 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java @@ -222,6 +222,11 @@ public class TaskConstants { public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0; public static final int LOG_QUERY_LIMIT = 4096; + /** + * default display rows + */ + public static final int DEFAULT_DISPLAY_ROWS = 10; + /** * jar */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java similarity index 98% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java rename to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java index dd8d64698f..276923d37c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.entity; +package org.apache.dolphinscheduler.spi.task.request; import java.io.Serializable; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java similarity index 84% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java rename to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java index fa670e2853..b712b50a7f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.entity; +package org.apache.dolphinscheduler.spi.task.request; + +import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer; -import org.apache.dolphinscheduler.dao.entity.UdfFunc; -import org.apache.dolphinscheduler.dao.entity.UdfFunc.UdfFuncDeserializer; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import java.io.Serializable; import java.util.Map; @@ -42,7 +42,7 @@ public class SQLTaskExecutionContext implements Serializable { * udf function tenant code map */ @JsonDeserialize(keyUsing = UdfFuncDeserializer.class) - private Map udfFuncTenantCodeMap; + private Map udfFuncTenantCodeMap; public int getWarningGroupId() { @@ -53,11 +53,11 @@ public class SQLTaskExecutionContext implements Serializable { this.warningGroupId = warningGroupId; } - public Map getUdfFuncTenantCodeMap() { + public Map getUdfFuncTenantCodeMap() { return udfFuncTenantCodeMap; } - public void setUdfFuncTenantCodeMap(Map udfFuncTenantCodeMap) { + public void setUdfFuncTenantCodeMap(Map udfFuncTenantCodeMap) { this.udfFuncTenantCodeMap = udfFuncTenantCodeMap; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java similarity index 98% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java rename to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java index c74414bb21..720892c93f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.entity; +package org.apache.dolphinscheduler.spi.task.request; import java.io.Serializable; diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java index 0de96e1734..3e131f25f5 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java @@ -188,6 +188,22 @@ public class TaskRequest { private Map paramsMap; + + /** + * sql TaskExecutionContext + */ + private SQLTaskExecutionContext sqlTaskExecutionContext; + + /** + * datax TaskExecutionContext + */ + private DataxTaskExecutionContext dataxTaskExecutionContext; + + /** + * sqoop TaskExecutionContext + */ + private SqoopTaskExecutionContext sqoopTaskExecutionContext; + public Map getResources() { return resources; } @@ -428,4 +444,27 @@ public class TaskRequest { this.delayTime = delayTime; } + public SQLTaskExecutionContext getSqlTaskExecutionContext() { + return sqlTaskExecutionContext; + } + + public void setSqlTaskExecutionContext(SQLTaskExecutionContext sqlTaskExecutionContext) { + this.sqlTaskExecutionContext = sqlTaskExecutionContext; + } + + public DataxTaskExecutionContext getDataxTaskExecutionContext() { + return dataxTaskExecutionContext; + } + + public void setDataxTaskExecutionContext(DataxTaskExecutionContext dataxTaskExecutionContext) { + this.dataxTaskExecutionContext = dataxTaskExecutionContext; + } + + public SqoopTaskExecutionContext getSqoopTaskExecutionContext() { + return sqoopTaskExecutionContext; + } + + public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext sqoopTaskExecutionContext) { + this.sqoopTaskExecutionContext = sqoopTaskExecutionContext; + } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java new file mode 100644 index 0000000000..cbbd9e2596 --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.task.request; + +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.io.IOException; +import java.util.Date; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.KeyDeserializer; + +/** + * udf function + */ +public class UdfFuncRequest { + /** + * id + */ + private int id; + /** + * user id + */ + private int userId; + + /** + * udf function name + */ + private String funcName; + + /** + * udf class name + */ + private String className; + + /** + * udf argument types + */ + private String argTypes; + + /** + * udf data base + */ + private String database; + + /** + * udf description + */ + private String description; + + /** + * resource id + */ + private int resourceId; + + /** + * resource name + */ + private String resourceName; + + /** + * udf function type: hive / spark + */ + private UdfType type; + + /** + * create time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + + /** + * update time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date updateTime; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public int getUserId() { + return userId; + } + + public void setUserId(int userId) { + this.userId = userId; + } + + public String getFuncName() { + return funcName; + } + + public void setFuncName(String funcName) { + this.funcName = funcName; + } + + public String getClassName() { + return className; + } + + public void setClassName(String className) { + this.className = className; + } + + public String getArgTypes() { + return argTypes; + } + + public void setArgTypes(String argTypes) { + this.argTypes = argTypes; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public int getResourceId() { + return resourceId; + } + + public void setResourceId(int resourceId) { + this.resourceId = resourceId; + } + + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + + public UdfType getType() { + return type; + } + + public void setType(UdfType type) { + this.type = type; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(Date updateTime) { + this.updateTime = updateTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + UdfFuncRequest udfFuncRequest = (UdfFuncRequest) o; + + if (id != udfFuncRequest.id) { + return false; + } + return !(funcName != null ? !funcName.equals(udfFuncRequest.funcName) : udfFuncRequest.funcName != null); + + } + + @Override + public int hashCode() { + int result = id; + result = 31 * result + (funcName != null ? funcName.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return JSONUtils.toJsonString(this); + } + + public static class UdfFuncDeserializer extends KeyDeserializer { + + @Override + public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException { + if (StringUtils.isBlank(key)) { + return null; + } + return JSONUtils.parseObject(key, UdfFuncRequest.class); + } + } +} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java new file mode 100644 index 0000000000..2b31087797 --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.task.request; + +/** + * UDF type + */ +public enum UdfType { + /** + * 0 hive; 1 spark + */ + HIVE(0, "hive"), + SPARK(1, "spark"); + + UdfType(int code, String descp) { + this.code = code; + this.descp = descp; + } + + private final int code; + private final String descp; + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } + + public static UdfType of(int type) { + for (UdfType ut : values()) { + if (ut.getCode() == type) { + return ut; + } + } + throw new IllegalArgumentException("invalid type : " + type); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index d5e3e0d4a8..d2c7c559f6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -65,8 +65,6 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor { public void cancelApplication(boolean status) throws Exception { cancel = true; // cancel process - - //todo 交给上层处理 shellCommandExecutor.cancelApplication(); // TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); // if (status && taskInstance != null){ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java index 01484c47cc..6f7806341b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java @@ -32,5 +32,4 @@ public class HttpTaskChannel implements TaskChannel { public AbstractTask createTask(TaskRequest taskRequest) { return new HttpTask(taskRequest); } - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml index 6f567441ee..09ddf41adf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml @@ -26,6 +26,7 @@ 4.0.0 dolphinscheduler-task-sql + dolphinscheduler-plugin @@ -39,8 +40,15 @@ ${project.version} - - + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + dolphinscheduler-task-sql-${project.version} + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java new file mode 100644 index 0000000000..a90c8db5e5 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +import org.apache.dolphinscheduler.spi.task.Property; + +import java.util.Map; + +/** + * Used to contains both prepared sql string and its to-be-bind parameters + */ +public class SqlBinds { + private final String sql; + private final Map paramsMap; + + public SqlBinds(String sql, Map paramsMap) { + this.sql = sql; + this.paramsMap = paramsMap; + } + + public String getSql() { + return sql; + } + + public Map getParamsMap() { + return paramsMap; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java new file mode 100644 index 0000000000..487c5bd481 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +import org.apache.dolphinscheduler.spi.enums.DataType; +import org.apache.dolphinscheduler.spi.task.AbstractParameters; +import org.apache.dolphinscheduler.spi.task.Property; +import org.apache.dolphinscheduler.spi.task.ResourceInfo; +import org.apache.dolphinscheduler.spi.utils.CollectionUtils; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Sql/Hql parameter + */ +public class SqlParameters extends AbstractParameters { + /** + * data source type,eg MYSQL, POSTGRES, HIVE ... + */ + private String type; + + /** + * datasource id + */ + private int datasource; + + /** + * sql + */ + private String sql; + + /** + * sql type + * 0 query + * 1 NON_QUERY + */ + private int sqlType; + + /** + * send email + */ + private Boolean sendEmail; + + /** + * display rows + */ + private int displayRows; + + /** + * udf list + */ + private String udfs; + /** + * show type + * 0 TABLE + * 1 TEXT + * 2 attachment + * 3 TABLE+attachment + */ + private String showType; + /** + * SQL connection parameters + */ + private String connParams; + /** + * Pre Statements + */ + private List preStatements; + /** + * Post Statements + */ + private List postStatements; + + /** + * groupId + */ + private int groupId; + /** + * title + */ + private String title; + + private int limit; + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public int getDatasource() { + return datasource; + } + + public void setDatasource(int datasource) { + this.datasource = datasource; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public String getUdfs() { + return udfs; + } + + public void setUdfs(String udfs) { + this.udfs = udfs; + } + + public int getSqlType() { + return sqlType; + } + + public void setSqlType(int sqlType) { + this.sqlType = sqlType; + } + + public Boolean getSendEmail() { + return sendEmail; + } + + public void setSendEmail(Boolean sendEmail) { + this.sendEmail = sendEmail; + } + + public int getDisplayRows() { + return displayRows; + } + + public void setDisplayRows(int displayRows) { + this.displayRows = displayRows; + } + + public String getShowType() { + return showType; + } + + public void setShowType(String showType) { + this.showType = showType; + } + + public String getConnParams() { + return connParams; + } + + public void setConnParams(String connParams) { + this.connParams = connParams; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public List getPreStatements() { + return preStatements; + } + + public void setPreStatements(List preStatements) { + this.preStatements = preStatements; + } + + public List getPostStatements() { + return postStatements; + } + + public void setPostStatements(List postStatements) { + this.postStatements = postStatements; + } + + public int getGroupId() { + return groupId; + } + + public void setGroupId(int groupId) { + this.groupId = groupId; + } + + @Override + public boolean checkParameters() { + return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql); + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + @Override + public void dealOutParam(String result) { + if (CollectionUtils.isEmpty(localParams)) { + return; + } + List outProperty = getOutProperty(localParams); + if (CollectionUtils.isEmpty(outProperty)) { + return; + } + if (StringUtils.isEmpty(result)) { + varPool.addAll(outProperty); + return; + } + List> sqlResult = getListMapByString(result); + if (CollectionUtils.isEmpty(sqlResult)) { + return; + } + //if sql return more than one line + if (sqlResult.size() > 1) { + Map> sqlResultFormat = new HashMap<>(); + //init sqlResultFormat + Set keySet = sqlResult.get(0).keySet(); + for (String key : keySet) { + sqlResultFormat.put(key, new ArrayList<>()); + } + for (Map info : sqlResult) { + for (String key : info.keySet()) { + sqlResultFormat.get(key).add(String.valueOf(info.get(key))); + } + } + for (Property info : outProperty) { + if (info.getType() == DataType.LIST) { + info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp()))); + varPool.add(info); + } + } + } else { + //result only one line + Map firstRow = sqlResult.get(0); + for (Property info : outProperty) { + info.setValue(String.valueOf(firstRow.get(info.getProp()))); + varPool.add(info); + } + } + + } + + @Override + public String toString() { + return "SqlParameters{" + + "type='" + type + '\'' + + ", datasource=" + datasource + + ", sql='" + sql + '\'' + + ", sqlType=" + sqlType + + ", sendEmail=" + sendEmail + + ", displayRows=" + displayRows + + ", limit=" + limit + + ", udfs='" + udfs + '\'' + + ", showType='" + showType + '\'' + + ", connParams='" + connParams + '\'' + + ", groupId='" + groupId + '\'' + + ", title='" + title + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java new file mode 100644 index 0000000000..6fd439746a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.datasource.BaseConnectionParam; +import org.apache.dolphinscheduler.plugin.task.datasource.DatasourceUtil; +import org.apache.dolphinscheduler.plugin.task.util.MapUtils; +import org.apache.dolphinscheduler.spi.enums.DbType; +import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.spi.task.AbstractParameters; +import org.apache.dolphinscheduler.spi.task.Direct; +import org.apache.dolphinscheduler.spi.task.Property; +import org.apache.dolphinscheduler.spi.task.TaskAlertInfo; +import org.apache.dolphinscheduler.spi.task.TaskConstants; +import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils; +import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils; +import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; +import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest; +import org.apache.dolphinscheduler.spi.utils.CollectionUtils; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.slf4j.Logger; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class SqlTask extends AbstractYarnTask { + + /** + * taskExecutionContext + */ + private TaskRequest taskExecutionContext; + + /** + * sql parameters + */ + private SqlParameters sqlParameters; + + /** + * base datasource + */ + private BaseConnectionParam baseConnectionParam; + + /** + * create function format + */ + private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''"; + + /** + * Abstract Yarn Task + * + * @param taskRequest taskRequest + */ + public SqlTask(TaskRequest taskRequest) { + super(taskRequest); + this.taskExecutionContext = taskRequest; + this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); + + assert sqlParameters != null; + if (!sqlParameters.checkParameters()) { + throw new RuntimeException("sql task params is not valid"); + } + } + + @Override + public AbstractParameters getParameters() { + return null; + } + + @Override + protected String buildCommand() { + return null; + } + + @Override + protected void setMainJarName() { + + } + + @Override + public void handle() throws Exception { + // set the name of the current thread + String threadLoggerInfoName = String.format(TaskConstants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); + Thread.currentThread().setName(threadLoggerInfoName); + + logger.info("Full sql parameters: {}", sqlParameters); + logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}", + sqlParameters.getType(), + sqlParameters.getDatasource(), + sqlParameters.getSql(), + sqlParameters.getLocalParams(), + sqlParameters.getUdfs(), + sqlParameters.getShowType(), + sqlParameters.getConnParams(), + sqlParameters.getVarPool(), + sqlParameters.getLimit()); + try { + SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext(); + + // get datasource + baseConnectionParam = (BaseConnectionParam) DatasourceUtil.buildConnectionParams( + DbType.valueOf(sqlParameters.getType()), + sqlTaskExecutionContext.getConnectionParams()); + + // ready to execute SQL and parameter entity Map + SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); + List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()) + .orElse(new ArrayList<>()) + .stream() + .map(this::getSqlAndSqlParamsMap) + .collect(Collectors.toList()); + List postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()) + .orElse(new ArrayList<>()) + .stream() + .map(this::getSqlAndSqlParamsMap) + .collect(Collectors.toList()); + + List createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(), + logger); + + // execute sql task + executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + + setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS); + + } catch (Exception e) { + setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + logger.error("sql task error: {}", e.toString()); + throw e; + } + } + + /** + * execute function and sql + * + * @param mainSqlBinds main sql binds + * @param preStatementsBinds pre statements binds + * @param postStatementsBinds post statements binds + * @param createFuncs create functions + */ + public void executeFuncAndSql(SqlBinds mainSqlBinds, + List preStatementsBinds, + List postStatementsBinds, + List createFuncs) throws Exception { + Connection connection = null; + PreparedStatement stmt = null; + ResultSet resultSet = null; + try { + + // create connection + connection = DatasourceUtil.getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam); + // create temp function + if (CollectionUtils.isNotEmpty(createFuncs)) { + createTempFunction(connection, createFuncs); + } + + // pre sql + preSql(connection, preStatementsBinds); + stmt = prepareStatementAndBind(connection, mainSqlBinds); + + String result = null; + // decide whether to executeQuery or executeUpdate based on sqlType + if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { + // query statements need to be convert to JsonArray and inserted into Alert to send + resultSet = stmt.executeQuery(); + result = resultProcess(resultSet); + + } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { + // non query statement + String updateResult = String.valueOf(stmt.executeUpdate()); + result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams()); + } + //deal out params + sqlParameters.dealOutParam(result); + postSql(connection, postStatementsBinds); + } catch (Exception e) { + logger.error("execute sql error: {}", e.getMessage()); + throw e; + } finally { + close(resultSet, stmt, connection); + } + } + + private String setNonQuerySqlReturn(String updateResult, List properties) { + String result = null; + for (Property info : properties) { + if (Direct.OUT == info.getDirect()) { + List> updateRL = new ArrayList<>(); + Map updateRM = new HashMap<>(); + updateRM.put(info.getProp(), updateResult); + updateRL.add(updateRM); + result = JSONUtils.toJsonString(updateRL); + break; + } + } + return result; + } + + /** + * result process + * + * @param resultSet resultSet + * @throws Exception Exception + */ + private String resultProcess(ResultSet resultSet) throws Exception { + ArrayNode resultJSONArray = JSONUtils.createArrayNode(); + if (resultSet != null) { + ResultSetMetaData md = resultSet.getMetaData(); + int num = md.getColumnCount(); + + int rowCount = 0; + + while (rowCount < sqlParameters.getLimit() && resultSet.next()) { + ObjectNode mapOfColValues = JSONUtils.createObjectNode(); + for (int i = 1; i <= num; i++) { + mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i))); + } + resultJSONArray.add(mapOfColValues); + rowCount++; + } + + int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS; + displayRows = Math.min(displayRows, resultJSONArray.size()); + logger.info("display sql result {} rows as follows:", displayRows); + for (int i = 0; i < displayRows; i++) { + String row = JSONUtils.toJsonString(resultJSONArray.get(i)); + logger.info("row {} : {}", i + 1, row); + } + } + String result = JSONUtils.toJsonString(resultJSONArray); + if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) { + sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) + ? sqlParameters.getTitle() + : taskExecutionContext.getTaskName() + " query result sets", result); + } + logger.debug("execute sql result : {}", result); + return result; + } + + /** + * send alert as an attachment + * + * @param title title + * @param content content + */ + private void sendAttachment(int groupId, String title, String content) { + setNeedAlert(Boolean.TRUE); + TaskAlertInfo taskAlertInfo = new TaskAlertInfo(); + taskAlertInfo.setAlertGroupId(groupId); + taskAlertInfo.setContent(content); + taskAlertInfo.setTitle(title); + } + + /** + * pre sql + * + * @param connection connection + * @param preStatementsBinds preStatementsBinds + */ + private void preSql(Connection connection, + List preStatementsBinds) throws Exception { + for (SqlBinds sqlBind : preStatementsBinds) { + try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) { + int result = pstmt.executeUpdate(); + logger.info("pre statement execute result: {}, for sql: {}", result, sqlBind.getSql()); + + } + } + } + + /** + * post sql + * + * @param connection connection + * @param postStatementsBinds postStatementsBinds + */ + private void postSql(Connection connection, + List postStatementsBinds) throws Exception { + for (SqlBinds sqlBind : postStatementsBinds) { + try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) { + int result = pstmt.executeUpdate(); + logger.info("post statement execute result: {},for sql: {}", result, sqlBind.getSql()); + } + } + } + + /** + * create temp function + * + * @param connection connection + * @param createFuncs createFuncs + */ + private void createTempFunction(Connection connection, + List createFuncs) throws Exception { + try (Statement funcStmt = connection.createStatement()) { + for (String createFunc : createFuncs) { + logger.info("hive create function sql: {}", createFunc); + funcStmt.execute(createFunc); + } + } + } + + /** + * close jdbc resource + * + * @param resultSet resultSet + * @param pstmt pstmt + * @param connection connection + */ + private void close(ResultSet resultSet, + PreparedStatement pstmt, + Connection connection) { + if (resultSet != null) { + try { + resultSet.close(); + } catch (SQLException e) { + logger.error("close result set error : {}", e.getMessage(), e); + } + } + + if (pstmt != null) { + try { + pstmt.close(); + } catch (SQLException e) { + logger.error("close prepared statement error : {}", e.getMessage(), e); + } + } + + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + logger.error("close connection error : {}", e.getMessage(), e); + } + } + } + + /** + * preparedStatement bind + * + * @param connection connection + * @param sqlBinds sqlBinds + * @return PreparedStatement + * @throws Exception Exception + */ + private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) { + // is the timeout set + boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED + || taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; + try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) { + if (timeoutFlag) { + stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout()); + } + Map params = sqlBinds.getParamsMap(); + if (params != null) { + for (Map.Entry entry : params.entrySet()) { + Property prop = entry.getValue(); + ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue()); + } + } + logger.info("prepare statement replace sql : {} ", stmt); + return stmt; + } catch (Exception exception) { + throw new TaskException("SQL task prepareStatementAndBind error", exception); + } + + } + + /** + * regular expressions match the contents between two specified strings + * + * @param content content + * @param rgex rgex + * @param sqlParamsMap sql params map + * @param paramsPropsMap params props map + */ + public void setSqlParamsMap(String content, String rgex, Map sqlParamsMap, Map paramsPropsMap) { + Pattern pattern = Pattern.compile(rgex); + Matcher m = pattern.matcher(content); + int index = 1; + while (m.find()) { + + String paramName = m.group(1); + Property prop = paramsPropsMap.get(paramName); + + if (prop == null) { + logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance" + + " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId()); + } else { + sqlParamsMap.put(index, prop); + index++; + logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content); + } + + } + } + + /** + * print replace sql + * + * @param content content + * @param formatSql format sql + * @param rgex rgex + * @param sqlParamsMap sql params map + */ + private void printReplacedSql(String content, String formatSql, String rgex, Map sqlParamsMap) { + //parameter print style + logger.info("after replace sql , preparing : {}", formatSql); + StringBuilder logPrint = new StringBuilder("replaced sql , parameters:"); + if (sqlParamsMap == null) { + logger.info("printReplacedSql: sqlParamsMap is null."); + } else { + for (int i = 1; i <= sqlParamsMap.size(); i++) { + logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")"); + } + } + logger.info("Sql Params are {}", logPrint); + } + + /** + * ready to execute SQL and parameter entity Map + * + * @return SqlBinds + */ + private SqlBinds getSqlAndSqlParamsMap(String sql) { + Map sqlParamsMap = new HashMap<>(); + StringBuilder sqlBuilder = new StringBuilder(); + + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); + + // spell SQL according to the final user-defined variable + if (paramsMap == null) { + sqlBuilder.append(sql); + return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); + } + + if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { + String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), + ParamUtils.convert(paramsMap)); + logger.info("SQL title : {}", title); + sqlParameters.setTitle(title); + } + + //new + //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job + sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime()); + // special characters need to be escaped, ${} needs to be escaped + String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; + setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap); + //Replace the original value in sql !{...} ,Does not participate in precompilation + String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*"; + sql = replaceOriginalValue(sql, rgexo, paramsMap); + // replace the ${} of the SQL statement with the Placeholder + String formatSql = sql.replaceAll(rgex, "?"); + sqlBuilder.append(formatSql); + + // print repalce sql + printReplacedSql(sql, formatSql, rgex, sqlParamsMap); + return new SqlBinds(sqlBuilder.toString(), sqlParamsMap); + } + + private String replaceOriginalValue(String content, String rgex, Map sqlParamsMap) { + Pattern pattern = Pattern.compile(rgex); + while (true) { + Matcher m = pattern.matcher(content); + if (!m.find()) { + break; + } + String paramName = m.group(1); + String paramValue = sqlParamsMap.get(paramName).getValue(); + content = m.replaceFirst(paramValue); + } + return content; + } + + /** + * create function list + * + * @param udfFuncTenantCodeMap key is udf function,value is tenant code + * @param logger logger + * @return create function list + */ + public static List createFuncs(Map udfFuncTenantCodeMap, Logger logger) { + + if (MapUtils.isEmpty(udfFuncTenantCodeMap)) { + logger.info("can't find udf function resource"); + return null; + } + List funcList = new ArrayList<>(); + // build temp function sql + buildTempFuncSql(funcList, new ArrayList<>(udfFuncTenantCodeMap.keySet())); + + return funcList; + } + + /** + * build temp function sql + * + * @param sqls sql list + * @param udfFuncRequests udf function list + */ + private static void buildTempFuncSql(List sqls, List udfFuncRequests) { + if (CollectionUtils.isNotEmpty(udfFuncRequests)) { + for (UdfFuncRequest udfFuncRequest : udfFuncRequests) { + sqls.add(MessageFormat + .format(CREATE_FUNCTION_FORMAT, udfFuncRequest.getFuncName(), udfFuncRequest.getClassName())); + } + } + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java new file mode 100644 index 0000000000..8da50f2a3b --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +import org.apache.dolphinscheduler.spi.task.AbstractTask; +import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; + +public class SqlTaskChannel implements TaskChannel { + @Override + public void cancelApplication(boolean status) { + + } + + @Override + public AbstractTask createTask(TaskRequest taskRequest) { + return new SqlTask(taskRequest); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java new file mode 100644 index 0000000000..47091ebb70 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +import org.apache.dolphinscheduler.spi.params.base.PluginParams; +import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +import java.util.List; + +public class SqlTaskChannelFactory implements TaskChannelFactory { + @Override + public String getName() { + return "SQL"; + } + + @Override + public List getParams() { + return null; + } + + @Override + public TaskChannel create() { + return new SqlTaskChannel(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java new file mode 100644 index 0000000000..65a3977bba --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +import com.google.common.collect.ImmutableList; + +public class SqlTaskPlugin implements DolphinSchedulerPlugin { + + @Override + public Iterable getTaskChannelFactorys() { + return ImmutableList.of(new SqlTaskChannelFactory()); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java new file mode 100644 index 0000000000..9db9268902 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.sql; + +public enum SqlType { + /** + * sql type + * 0 query + * 1 NON_QUERY + */ + QUERY, NON_QUERY +}