Browse Source

[SPI][Task]add SQL Task (#6237)

Add Sql Task And Fix Task Parse Err
2.0.7-release
Kirs 3 years ago committed by GitHub
parent
commit
b90152fc2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
  3. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
  4. 5
      dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
  5. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  6. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  7. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  8. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  9. 189
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
  10. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  11. 4
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java
  12. 6
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java
  13. 28
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java
  14. 51
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java
  15. 4
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java
  16. 5
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
  17. 2
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java
  18. 12
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
  19. 2
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java
  20. 39
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java
  21. 231
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java
  22. 55
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java
  23. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  24. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java
  25. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml
  26. 43
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java
  27. 294
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java
  28. 547
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
  29. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java
  30. 41
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java
  31. 31
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java
  32. 27
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java

6
dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml

@ -71,12 +71,6 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java

@ -204,7 +204,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
// verify udfFuncName is exist
if (!funcName.equals(udf.getFuncName())) {
if (checkUdfFuncNameExists(funcName)) {
logger.error("UdfFunc {} has exist, can't create again.", funcName);
logger.error("UdfFuncRequest {} has exist, can't create again.", funcName);
result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS);
result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg());
return result;

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java

@ -230,7 +230,7 @@ public class UdfFuncServiceTest {
}
/**
* get UdfFunc id
* get UdfFuncRequest id
*/
private UdfFunc getUdfFunc() {
UdfFunc udfFunc = new UdfFunc();

5
dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml vendored

@ -106,5 +106,10 @@
<unpack/>
</artifact>
</artifactSet>
<artifactSet to="lib/plugin/task/sql">
<artifact id="${project.groupId}:dolphinscheduler-task-sql:zip:${project.version}">
<unpack/>
</artifact>
</artifactSet>
</runtime>

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -25,11 +25,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
/**
* TaskExecutionContext builder

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -42,10 +42,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@ -55,6 +52,10 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
import org.apache.commons.lang.StringUtils;
@ -345,13 +346,14 @@ public class TaskPriorityQueueConsumer extends Thread {
}
List<UdfFunc> udfFuncList = processService.queryUdfFunListByIds(udfFunIdsArray);
Map<UdfFunc, String> udfFuncMap = new HashMap<>();
UdfFuncRequest udfFuncRequest;
Map<UdfFuncRequest, String> udfFuncRequestMap = new HashMap<>();
for (UdfFunc udfFunc : udfFuncList) {
udfFuncRequest = JSONUtils.parseObject(JSONUtils.toJsonString(udfFunc), UdfFuncRequest.class);
String tenantCode = processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
udfFuncMap.put(udfFunc, tenantCode);
udfFuncRequestMap.put(udfFuncRequest, tenantCode);
}
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncRequestMap);
}
}

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
@ -166,8 +167,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
if (null == taskChannel) {
throw new PluginNotFoundException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
}
//TODO Temporary operation, To be adjusted
TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class);
task = taskChannel.createTask(taskRequest);
@ -179,8 +178,9 @@ public class TaskExecuteThread implements Runnable, Delayed {
this.task.handle();
// task result process
this.task.after();
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}
responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
@ -203,6 +203,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
private void sendAlert(TaskAlertInfo taskAlertInfo) {
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent());
}
/**
* when task finish, clear execute path.
*/

189
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java

@ -1,189 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
public class SQLTaskExecutionContextTest {
/**
* test parse josn String to TaskExecutionContext
*/
@Test
public void testTaskExecutionContext() {
String contextJson = "{\n"
+ " \"taskInstanceId\":32,\n"
+ " \"taskName\":\"test-hive-func\",\n"
+ " \"startTime\":\"2020-07-19 16:45:46\",\n"
+ " \"taskType\":\"SQL\",\n"
+ " \"host\":null,\n"
+ " \"executePath\":\"/tmp/dolphinscheduler/exec/process/1/5/14/32\",\n"
+ " \"logPath\":null,\n"
+ " \"taskJson\":\"{\\\"id\\\":\\\"tasks-70999\\\",\\\"name\\\":\\\"test-hive-func\\\""
+ ",\\\"desc\\\":null,\\\"type\\\":\\\"SQL\\\",\\\"runFlag\\\":\\\"NORMAL\\\","
+ "\\\"loc\\\":null,\\\"maxRetryTimes\\\":0,\\\"retryInterval\\\":1,"
+ "\\\"params\\\":{\\\"type\\\":\\\"HIVE\\\",\\\"datasource\\\":2,"
+ "\\\"sql\\\":\\\"select mid_id, user_id,"
+ " version_code, version_name, lang, source, os, area, model, "
+ "brand, sdk_version, gmail, height_width, app_time, network,"
+ " lng, lat, dt,\\\\n Lower(model)\\\\nfrom dws_uv_detail_day limit 5;"
+ "\\\",\\\"udfs\\\":\\\"1\\\",\\\"sqlType\\\":\\\"0\\\",\\\"title\\\":\\\""
+ "test-hive-user-func\\\",\\\"receivers\\\":\\\"534634799@qq.com\\\","
+ "\\\"receiversCc\\\":\\\"\\\",\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[],"
+ "\\\"connParams\\\":\\\"\\\",\\\"preStatements\\\":[],\\\"postStatements\\\":[]},"
+ "\\\"preTasks\\\":[],\\\"extras\\\":null,\\\"depList\\\":[],\\\"dependence\\\":{},"
+ "\\\"conditionResult\\\":{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]},"
+ "\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"workerGroup\\\":\\\"default\\\","
+ "\\\"workerGroupId\\\":null,\\\"timeout\\\":{\\\"strategy\\\":\\\"\\\",\\\"interval\\\":null,"
+ "\\\"enable\\\":false},\\\"conditionsTask\\\":false,\\\"forbidden\\\":false,"
+ "\\\"taskTimeoutParameter\\\":{\\\"enable\\\":false,\\\"strategy\\\":null,"
+ "\\\"interval\\\":0}}\",\n"
+ " \"processId\":0,\n"
+ " \"appIds\":null,\n"
+ " \"processInstanceId\":14,\n"
+ " \"scheduleTime\":null,\n"
+ " \"globalParams\":null,\n"
+ " \"executorId\":2,\n"
+ " \"cmdTypeIfComplement\":2,\n"
+ " \"tenantCode\":\"sl\",\n"
+ " \"queue\":\"sl\",\n"
+ " \"processDefineId\":5,\n"
+ " \"projectId\":1,\n"
+ " \"taskParams\":null,\n"
+ " \"envFile\":null,\n"
+ " \"definedParams\":null,\n"
+ " \"taskAppId\":null,\n"
+ " \"taskTimeoutStrategy\":0,\n"
+ " \"taskTimeout\":0,\n"
+ " \"workerGroup\":\"default\",\n"
+ " \"resources\":{\n"
+ " },\n"
+ " \"sqlTaskExecutionContext\":{\n"
+ " \"warningGroupId\":0,\n"
+ " \"connectionParams\":\"{\\\"type\\\":null,\\\"address\\\":"
+ "\\\"jdbc:hive2://localhost:10000\\\",\\\"database\\\":\\\"gmall\\\","
+ "\\\"jdbcUrl\\\":\\\"jdbc:hive2://localhost:10000/gmall\\\","
+ "\\\"user\\\":\\\"sl-test\\\",\\\"password\\\":\\\"123456sl\\\"}\",\n"
+ " \"udfFuncTenantCodeMap\": null"
+ " },\n"
+ " \"dataxTaskExecutionContext\":{\n"
+ " \"dataSourceId\":0,\n"
+ " \"sourcetype\":0,\n"
+ " \"sourceConnectionParams\":null,\n"
+ " \"dataTargetId\":0,\n"
+ " \"targetType\":0,\n"
+ " \"targetConnectionParams\":null\n"
+ " },\n"
+ " \"dependenceTaskExecutionContext\":null,\n"
+ " \"sqoopTaskExecutionContext\":{\n"
+ " \"dataSourceId\":0,\n"
+ " \"sourcetype\":0,\n"
+ " \"sourceConnectionParams\":null,\n"
+ " \"dataTargetId\":0,\n"
+ " \"targetType\":0,\n"
+ " \"targetConnectionParams\":null\n"
+ " },\n"
+ " \"procedureTaskExecutionContext\":{\n"
+ " \"connectionParams\":null\n"
+ " }\n"
+ "}\n";
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
assertNotNull(taskExecutionContext);
}
@Test
public void testSqlTaskExecutionContext() {
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
sqlTaskExecutionContext.setWarningGroupId(0);
Map<UdfFunc, String> udfmap = new HashMap<>();
UdfFunc udfFunc = new UdfFunc();
udfFunc.setArgTypes("1");
udfFunc.setId(1);
udfFunc.setResourceName("name1");
udfmap.put(udfFunc, "map1");
UdfFunc udfFunc2 = new UdfFunc();
udfFunc2.setArgTypes("2");
udfFunc2.setId(2);
udfFunc2.setResourceName("name2");
udfmap.put(udfFunc2, "map2");
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfmap);
String contextJson = JSONUtils.toJsonString(sqlTaskExecutionContext);
SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class);
assertNotNull(parseSqlTask);
assertEquals(sqlTaskExecutionContext.getWarningGroupId(), parseSqlTask.getWarningGroupId());
assertEquals(sqlTaskExecutionContext.getUdfFuncTenantCodeMap().size(), parseSqlTask.getUdfFuncTenantCodeMap().size());
}
/**
* test the SQLTaskExecutionContext
*/
@Test
public void testSqlTaskExecutionContextParse() {
// SQLTaskExecutionContext.udfFuncTenantCodeMap is null
String contextJson = "{\n"
+ " \"warningGroupId\":0,\n"
+ " \"connectionParams\":null,\n"
+ " \"udfFuncTenantCodeMap\":null"
+ "}\n}";
SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class);
assertNotNull(parseSqlTask);
assertEquals(0,parseSqlTask.getWarningGroupId());
assertNull(parseSqlTask.getUdfFuncTenantCodeMap());
// SQLTaskExecutionContext.udfFuncTenantCodeMap is not null
contextJson = "{\"warningGroupId\":0,"
+ "\"connectionParams\":null,"
+ "\"udfFuncTenantCodeMap\":{\""
+ "{\\\"id\\\":2,\\\"userId\\\":0,"
+ "\\\"funcName\\\":null,\\\"className\\\":null,\\\"argTypes\\\":\\\"2\\\",\\\"database\\\":null,"
+ "\\\"description\\\":null,\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name2\\\",\\\"type\\\":null,"
+ "\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map2\","
+ "\"{\\\"id\\\":1,\\\"userId\\\":0,\\\"funcName\\\":null,"
+ "\\\"className\\\":null,\\\"argTypes\\\":\\\"1\\\","
+ "\\\"database\\\":null,\\\"description\\\":null,"
+ "\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name1\\\","
+ "\\\"type\\\":null,\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map1\"}}\n";
SQLTaskExecutionContext parseSqlTask2 = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class);
assertNotNull(parseSqlTask2);
assertEquals(0,parseSqlTask2.getWarningGroupId());
assertEquals(2,parseSqlTask2.getUdfFuncTenantCodeMap().size());
}
}

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -32,12 +32,12 @@ import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import java.util.ArrayList;
import java.util.Date;

4
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/common/UiChannelFactory.java

@ -1,4 +1,4 @@
package org.apache.dolphinscheduler.spi.common;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.common;/*
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.common;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;

6
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java

@ -1,4 +1,4 @@
package org.apache.dolphinscheduler.spi.task;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/*
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -37,7 +39,7 @@ public abstract class AbstractParameters implements IParameters {
/**
* local parameters
*/
private List<Property> localParams;
protected List<Property> localParams;
/**
* var pool

28
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractTask.java

@ -60,6 +60,10 @@ public abstract class AbstractTask {
*/
protected volatile int exitStatusCode = -1;
protected boolean needAlert = false;
protected TaskAlertInfo taskAlertInfo;
/**
* constructor
*
@ -79,7 +83,6 @@ public abstract class AbstractTask {
return null;
}
/**
* task handle
*
@ -142,6 +145,22 @@ public abstract class AbstractTask {
this.resultString = resultString;
}
public boolean getNeedAlert() {
return needAlert;
}
public void setNeedAlert(boolean needAlert) {
this.needAlert = needAlert;
}
public TaskAlertInfo getTaskAlertInfo() {
return taskAlertInfo;
}
public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) {
this.taskAlertInfo = taskAlertInfo;
}
/**
* get task parameters
*
@ -149,13 +168,6 @@ public abstract class AbstractTask {
*/
public abstract AbstractParameters getParameters();
/**
* result processing maybe
*/
public void after() {
}
/**
* get exit status according to exitCode
*

51
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskAlertInfo.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
public class TaskAlertInfo {
private String title;
private String content;
private Integer alertGroupId;
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Integer getAlertGroupId() {
return alertGroupId;
}
public void setAlertGroupId(Integer alertGroupId) {
this.alertGroupId = alertGroupId;
}
}

4
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskChannel.java

@ -1,4 +1,4 @@
package org.apache.dolphinscheduler.spi.task;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -15,6 +15,8 @@ package org.apache.dolphinscheduler.spi.task;/*
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public interface TaskChannel {

5
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java

@ -222,6 +222,11 @@ public class TaskConstants {
public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0;
public static final int LOG_QUERY_LIMIT = 4096;
/**
* default display rows
*/
public static final int DEFAULT_DISPLAY_ROWS = 10;
/**
* jar
*/

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/DataxTaskExecutionContext.java → dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/DataxTaskExecutionContext.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
package org.apache.dolphinscheduler.spi.task.request;
import java.io.Serializable;

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java → dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java

@ -15,10 +15,10 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
package org.apache.dolphinscheduler.spi.task.request;
import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.UdfFunc.UdfFuncDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.io.Serializable;
import java.util.Map;
@ -42,7 +42,7 @@ public class SQLTaskExecutionContext implements Serializable {
* udf function tenant code map
*/
@JsonDeserialize(keyUsing = UdfFuncDeserializer.class)
private Map<UdfFunc,String> udfFuncTenantCodeMap;
private Map<UdfFuncRequest,String> udfFuncTenantCodeMap;
public int getWarningGroupId() {
@ -53,11 +53,11 @@ public class SQLTaskExecutionContext implements Serializable {
this.warningGroupId = warningGroupId;
}
public Map<UdfFunc, String> getUdfFuncTenantCodeMap() {
public Map<UdfFuncRequest, String> getUdfFuncTenantCodeMap() {
return udfFuncTenantCodeMap;
}
public void setUdfFuncTenantCodeMap(Map<UdfFunc, String> udfFuncTenantCodeMap) {
public void setUdfFuncTenantCodeMap(Map<UdfFuncRequest, String> udfFuncTenantCodeMap) {
this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java → dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SqoopTaskExecutionContext.java

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
package org.apache.dolphinscheduler.spi.task.request;
import java.io.Serializable;

39
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/TaskRequest.java

@ -188,6 +188,22 @@ public class TaskRequest {
private Map<String, Property> paramsMap;
/**
* sql TaskExecutionContext
*/
private SQLTaskExecutionContext sqlTaskExecutionContext;
/**
* datax TaskExecutionContext
*/
private DataxTaskExecutionContext dataxTaskExecutionContext;
/**
* sqoop TaskExecutionContext
*/
private SqoopTaskExecutionContext sqoopTaskExecutionContext;
public Map<String, String> getResources() {
return resources;
}
@ -428,4 +444,27 @@ public class TaskRequest {
this.delayTime = delayTime;
}
public SQLTaskExecutionContext getSqlTaskExecutionContext() {
return sqlTaskExecutionContext;
}
public void setSqlTaskExecutionContext(SQLTaskExecutionContext sqlTaskExecutionContext) {
this.sqlTaskExecutionContext = sqlTaskExecutionContext;
}
public DataxTaskExecutionContext getDataxTaskExecutionContext() {
return dataxTaskExecutionContext;
}
public void setDataxTaskExecutionContext(DataxTaskExecutionContext dataxTaskExecutionContext) {
this.dataxTaskExecutionContext = dataxTaskExecutionContext;
}
public SqoopTaskExecutionContext getSqoopTaskExecutionContext() {
return sqoopTaskExecutionContext;
}
public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext sqoopTaskExecutionContext) {
this.sqoopTaskExecutionContext = sqoopTaskExecutionContext;
}
}

231
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfFuncRequest.java

@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task.request;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.IOException;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.KeyDeserializer;
/**
* udf function
*/
public class UdfFuncRequest {
/**
* id
*/
private int id;
/**
* user id
*/
private int userId;
/**
* udf function name
*/
private String funcName;
/**
* udf class name
*/
private String className;
/**
* udf argument types
*/
private String argTypes;
/**
* udf data base
*/
private String database;
/**
* udf description
*/
private String description;
/**
* resource id
*/
private int resourceId;
/**
* resource name
*/
private String resourceName;
/**
* udf function type: hive / spark
*/
private UdfType type;
/**
* create time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date createTime;
/**
* update time
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public String getFuncName() {
return funcName;
}
public void setFuncName(String funcName) {
this.funcName = funcName;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getArgTypes() {
return argTypes;
}
public void setArgTypes(String argTypes) {
this.argTypes = argTypes;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public int getResourceId() {
return resourceId;
}
public void setResourceId(int resourceId) {
this.resourceId = resourceId;
}
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
public UdfType getType() {
return type;
}
public void setType(UdfType type) {
this.type = type;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UdfFuncRequest udfFuncRequest = (UdfFuncRequest) o;
if (id != udfFuncRequest.id) {
return false;
}
return !(funcName != null ? !funcName.equals(udfFuncRequest.funcName) : udfFuncRequest.funcName != null);
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + (funcName != null ? funcName.hashCode() : 0);
return result;
}
@Override
public String toString() {
return JSONUtils.toJsonString(this);
}
public static class UdfFuncDeserializer extends KeyDeserializer {
@Override
public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
if (StringUtils.isBlank(key)) {
return null;
}
return JSONUtils.parseObject(key, UdfFuncRequest.class);
}
}
}

55
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/UdfType.java

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.spi.task.request;
/**
* UDF type
*/
public enum UdfType {
/**
* 0 hive; 1 spark
*/
HIVE(0, "hive"),
SPARK(1, "spark");
UdfType(int code, String descp) {
this.code = code;
this.descp = descp;
}
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
public static UdfType of(int type) {
for (UdfType ut : values()) {
if (ut.getCode() == type) {
return ut;
}
}
throw new IllegalArgumentException("invalid type : " + type);
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -65,8 +65,6 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
public void cancelApplication(boolean status) throws Exception {
cancel = true;
// cancel process
//todo 交给上层处理
shellCommandExecutor.cancelApplication();
// TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());
// if (status && taskInstance != null){

1
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java

@ -32,5 +32,4 @@ public class HttpTaskChannel implements TaskChannel {
public AbstractTask createTask(TaskRequest taskRequest) {
return new HttpTask(taskRequest);
}
}

12
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml

@ -26,6 +26,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-sql</artifactId>
<packaging>dolphinscheduler-plugin</packaging>
<dependencies>
<dependency>
@ -39,8 +40,15 @@
<version>${project.version}</version>
</dependency>
</dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>dolphinscheduler-task-sql-${project.version}</finalName>
</build>
</project>

43
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlBinds.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.spi.task.Property;
import java.util.Map;
/**
* Used to contains both prepared sql string and its to-be-bind parameters
*/
public class SqlBinds {
private final String sql;
private final Map<Integer, Property> paramsMap;
public SqlBinds(String sql, Map<Integer, Property> paramsMap) {
this.sql = sql;
this.paramsMap = paramsMap;
}
public String getSql() {
return sql;
}
public Map<Integer, Property> getParamsMap() {
return paramsMap;
}
}

294
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlParameters.java

@ -0,0 +1,294 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.spi.enums.DataType;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Sql/Hql parameter
*/
public class SqlParameters extends AbstractParameters {
/**
* data source typeeg MYSQL, POSTGRES, HIVE ...
*/
private String type;
/**
* datasource id
*/
private int datasource;
/**
* sql
*/
private String sql;
/**
* sql type
* 0 query
* 1 NON_QUERY
*/
private int sqlType;
/**
* send email
*/
private Boolean sendEmail;
/**
* display rows
*/
private int displayRows;
/**
* udf list
*/
private String udfs;
/**
* show type
* 0 TABLE
* 1 TEXT
* 2 attachment
* 3 TABLE+attachment
*/
private String showType;
/**
* SQL connection parameters
*/
private String connParams;
/**
* Pre Statements
*/
private List<String> preStatements;
/**
* Post Statements
*/
private List<String> postStatements;
/**
* groupId
*/
private int groupId;
/**
* title
*/
private String title;
private int limit;
public int getLimit() {
return limit;
}
public void setLimit(int limit) {
this.limit = limit;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public int getDatasource() {
return datasource;
}
public void setDatasource(int datasource) {
this.datasource = datasource;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public String getUdfs() {
return udfs;
}
public void setUdfs(String udfs) {
this.udfs = udfs;
}
public int getSqlType() {
return sqlType;
}
public void setSqlType(int sqlType) {
this.sqlType = sqlType;
}
public Boolean getSendEmail() {
return sendEmail;
}
public void setSendEmail(Boolean sendEmail) {
this.sendEmail = sendEmail;
}
public int getDisplayRows() {
return displayRows;
}
public void setDisplayRows(int displayRows) {
this.displayRows = displayRows;
}
public String getShowType() {
return showType;
}
public void setShowType(String showType) {
this.showType = showType;
}
public String getConnParams() {
return connParams;
}
public void setConnParams(String connParams) {
this.connParams = connParams;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public List<String> getPreStatements() {
return preStatements;
}
public void setPreStatements(List<String> preStatements) {
this.preStatements = preStatements;
}
public List<String> getPostStatements() {
return postStatements;
}
public void setPostStatements(List<String> postStatements) {
this.postStatements = postStatements;
}
public int getGroupId() {
return groupId;
}
public void setGroupId(int groupId) {
this.groupId = groupId;
}
@Override
public boolean checkParameters() {
return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql);
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
@Override
public void dealOutParam(String result) {
if (CollectionUtils.isEmpty(localParams)) {
return;
}
List<Property> outProperty = getOutProperty(localParams);
if (CollectionUtils.isEmpty(outProperty)) {
return;
}
if (StringUtils.isEmpty(result)) {
varPool.addAll(outProperty);
return;
}
List<Map<String, String>> sqlResult = getListMapByString(result);
if (CollectionUtils.isEmpty(sqlResult)) {
return;
}
//if sql return more than one line
if (sqlResult.size() > 1) {
Map<String, List<String>> sqlResultFormat = new HashMap<>();
//init sqlResultFormat
Set<String> keySet = sqlResult.get(0).keySet();
for (String key : keySet) {
sqlResultFormat.put(key, new ArrayList<>());
}
for (Map<String, String> info : sqlResult) {
for (String key : info.keySet()) {
sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
}
}
for (Property info : outProperty) {
if (info.getType() == DataType.LIST) {
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
varPool.add(info);
}
}
} else {
//result only one line
Map<String, String> firstRow = sqlResult.get(0);
for (Property info : outProperty) {
info.setValue(String.valueOf(firstRow.get(info.getProp())));
varPool.add(info);
}
}
}
@Override
public String toString() {
return "SqlParameters{"
+ "type='" + type + '\''
+ ", datasource=" + datasource
+ ", sql='" + sql + '\''
+ ", sqlType=" + sqlType
+ ", sendEmail=" + sendEmail
+ ", displayRows=" + displayRows
+ ", limit=" + limit
+ ", udfs='" + udfs + '\''
+ ", showType='" + showType + '\''
+ ", connParams='" + connParams + '\''
+ ", groupId='" + groupId + '\''
+ ", title='" + title + '\''
+ ", preStatements=" + preStatements
+ ", postStatements=" + postStatements
+ '}';
}
}

547
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -0,0 +1,547 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.plugin.task.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Direct;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest;
import org.apache.dolphinscheduler.spi.utils.CollectionUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class SqlTask extends AbstractYarnTask {
/**
* taskExecutionContext
*/
private TaskRequest taskExecutionContext;
/**
* sql parameters
*/
private SqlParameters sqlParameters;
/**
* base datasource
*/
private BaseConnectionParam baseConnectionParam;
/**
* create function format
*/
private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''";
/**
* Abstract Yarn Task
*
* @param taskRequest taskRequest
*/
public SqlTask(TaskRequest taskRequest) {
super(taskRequest);
this.taskExecutionContext = taskRequest;
this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
assert sqlParameters != null;
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
}
@Override
public AbstractParameters getParameters() {
return null;
}
@Override
protected String buildCommand() {
return null;
}
@Override
protected void setMainJarName() {
}
@Override
public void handle() throws Exception {
// set the name of the current thread
String threadLoggerInfoName = String.format(TaskConstants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
sqlParameters.getType(),
sqlParameters.getDatasource(),
sqlParameters.getSql(),
sqlParameters.getLocalParams(),
sqlParameters.getUdfs(),
sqlParameters.getShowType(),
sqlParameters.getConnParams(),
sqlParameters.getVarPool(),
sqlParameters.getLimit());
try {
SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
// get datasource
baseConnectionParam = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
DbType.valueOf(sqlParameters.getType()),
sqlTaskExecutionContext.getConnectionParams());
// ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
.orElse(new ArrayList<>())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
.orElse(new ArrayList<>())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
logger);
// execute sql task
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("sql task error: {}", e.toString());
throw e;
}
}
/**
* execute function and sql
*
* @param mainSqlBinds main sql binds
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
*/
public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs) throws Exception {
Connection connection = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
try {
// create connection
connection = DatasourceUtil.getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection, createFuncs);
}
// pre sql
preSql(connection, preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds);
String result = null;
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = stmt.executeQuery();
result = resultProcess(resultSet);
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement
String updateResult = String.valueOf(stmt.executeUpdate());
result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
}
//deal out params
sqlParameters.dealOutParam(result);
postSql(connection, postStatementsBinds);
} catch (Exception e) {
logger.error("execute sql error: {}", e.getMessage());
throw e;
} finally {
close(resultSet, stmt, connection);
}
}
private String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
String result = null;
for (Property info : properties) {
if (Direct.OUT == info.getDirect()) {
List<Map<String, String>> updateRL = new ArrayList<>();
Map<String, String> updateRM = new HashMap<>();
updateRM.put(info.getProp(), updateResult);
updateRL.add(updateRM);
result = JSONUtils.toJsonString(updateRL);
break;
}
}
return result;
}
/**
* result process
*
* @param resultSet resultSet
* @throws Exception Exception
*/
private String resultProcess(ResultSet resultSet) throws Exception {
ArrayNode resultJSONArray = JSONUtils.createArrayNode();
if (resultSet != null) {
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
int rowCount = 0;
while (rowCount < sqlParameters.getLimit() && resultSet.next()) {
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
}
resultJSONArray.add(mapOfColValues);
rowCount++;
}
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS;
displayRows = Math.min(displayRows, resultJSONArray.size());
logger.info("display sql result {} rows as follows:", displayRows);
for (int i = 0; i < displayRows; i++) {
String row = JSONUtils.toJsonString(resultJSONArray.get(i));
logger.info("row {} : {}", i + 1, row);
}
}
String result = JSONUtils.toJsonString(resultJSONArray);
if (sqlParameters.getSendEmail() == null || sqlParameters.getSendEmail()) {
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle())
? sqlParameters.getTitle()
: taskExecutionContext.getTaskName() + " query result sets", result);
}
logger.debug("execute sql result : {}", result);
return result;
}
/**
* send alert as an attachment
*
* @param title title
* @param content content
*/
private void sendAttachment(int groupId, String title, String content) {
setNeedAlert(Boolean.TRUE);
TaskAlertInfo taskAlertInfo = new TaskAlertInfo();
taskAlertInfo.setAlertGroupId(groupId);
taskAlertInfo.setContent(content);
taskAlertInfo.setTitle(title);
}
/**
* pre sql
*
* @param connection connection
* @param preStatementsBinds preStatementsBinds
*/
private void preSql(Connection connection,
List<SqlBinds> preStatementsBinds) throws Exception {
for (SqlBinds sqlBind : preStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
int result = pstmt.executeUpdate();
logger.info("pre statement execute result: {}, for sql: {}", result, sqlBind.getSql());
}
}
}
/**
* post sql
*
* @param connection connection
* @param postStatementsBinds postStatementsBinds
*/
private void postSql(Connection connection,
List<SqlBinds> postStatementsBinds) throws Exception {
for (SqlBinds sqlBind : postStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)) {
int result = pstmt.executeUpdate();
logger.info("post statement execute result: {},for sql: {}", result, sqlBind.getSql());
}
}
}
/**
* create temp function
*
* @param connection connection
* @param createFuncs createFuncs
*/
private void createTempFunction(Connection connection,
List<String> createFuncs) throws Exception {
try (Statement funcStmt = connection.createStatement()) {
for (String createFunc : createFuncs) {
logger.info("hive create function sql: {}", createFunc);
funcStmt.execute(createFunc);
}
}
}
/**
* close jdbc resource
*
* @param resultSet resultSet
* @param pstmt pstmt
* @param connection connection
*/
private void close(ResultSet resultSet,
PreparedStatement pstmt,
Connection connection) {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
logger.error("close result set error : {}", e.getMessage(), e);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
logger.error("close prepared statement error : {}", e.getMessage(), e);
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
logger.error("close connection error : {}", e.getMessage(), e);
}
}
}
/**
* preparedStatement bind
*
* @param connection connection
* @param sqlBinds sqlBinds
* @return PreparedStatement
* @throws Exception Exception
*/
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) {
// is the timeout set
boolean timeoutFlag = taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED
|| taskExecutionContext.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
try (PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql())) {
if (timeoutFlag) {
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if (params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
}
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
} catch (Exception exception) {
throw new TaskException("SQL task prepareStatementAndBind error", exception);
}
}
/**
* regular expressions match the contents between two specified strings
*
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsPropsMap) {
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
if (prop == null) {
logger.error("setSqlParamsMap: No Property with paramName: {} is found in paramsPropsMap of task instance"
+ " with id: {}. So couldn't put Property in sqlParamsMap.", paramName, taskExecutionContext.getTaskInstanceId());
} else {
sqlParamsMap.put(index, prop);
index++;
logger.info("setSqlParamsMap: Property with paramName: {} put in sqlParamsMap of content {} successfully.", paramName, content);
}
}
}
/**
* print replace sql
*
* @param content content
* @param formatSql format sql
* @param rgex rgex
* @param sqlParamsMap sql params map
*/
private void printReplacedSql(String content, String formatSql, String rgex, Map<Integer, Property> sqlParamsMap) {
//parameter print style
logger.info("after replace sql , preparing : {}", formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
if (sqlParamsMap == null) {
logger.info("printReplacedSql: sqlParamsMap is null.");
} else {
for (int i = 1; i <= sqlParamsMap.size(); i++) {
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")");
}
}
logger.info("Sql Params are {}", logPrint);
}
/**
* ready to execute SQL and parameter entity Map
*
* @return SqlBinds
*/
private SqlBinds getSqlAndSqlParamsMap(String sql) {
Map<Integer, Property> sqlParamsMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder();
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
// spell SQL according to the final user-defined variable
if (paramsMap == null) {
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
ParamUtils.convert(paramsMap));
logger.info("SQL title : {}", title);
sqlParameters.setTitle(title);
}
//new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
// special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
//Replace the original value in sql !{...} ,Does not participate in precompilation
String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
sql = replaceOriginalValue(sql, rgexo, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
String formatSql = sql.replaceAll(rgex, "?");
sqlBuilder.append(formatSql);
// print repalce sql
printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
private String replaceOriginalValue(String content, String rgex, Map<String, Property> sqlParamsMap) {
Pattern pattern = Pattern.compile(rgex);
while (true) {
Matcher m = pattern.matcher(content);
if (!m.find()) {
break;
}
String paramName = m.group(1);
String paramValue = sqlParamsMap.get(paramName).getValue();
content = m.replaceFirst(paramValue);
}
return content;
}
/**
* create function list
*
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
* @param logger logger
* @return create function list
*/
public static List<String> createFuncs(Map<UdfFuncRequest, String> udfFuncTenantCodeMap, Logger logger) {
if (MapUtils.isEmpty(udfFuncTenantCodeMap)) {
logger.info("can't find udf function resource");
return null;
}
List<String> funcList = new ArrayList<>();
// build temp function sql
buildTempFuncSql(funcList, new ArrayList<>(udfFuncTenantCodeMap.keySet()));
return funcList;
}
/**
* build temp function sql
*
* @param sqls sql list
* @param udfFuncRequests udf function list
*/
private static void buildTempFuncSql(List<String> sqls, List<UdfFuncRequest> udfFuncRequests) {
if (CollectionUtils.isNotEmpty(udfFuncRequests)) {
for (UdfFuncRequest udfFuncRequest : udfFuncRequests) {
sqls.add(MessageFormat
.format(CREATE_FUNCTION_FORMAT, udfFuncRequest.getFuncName(), udfFuncRequest.getClassName()));
}
}
}
}

35
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannel.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public class SqlTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public AbstractTask createTask(TaskRequest taskRequest) {
return new SqlTask(taskRequest);
}
}

41
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskChannelFactory.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
public class SqlTaskChannelFactory implements TaskChannelFactory {
@Override
public String getName() {
return "SQL";
}
@Override
public List<PluginParams> getParams() {
return null;
}
@Override
public TaskChannel create() {
return new SqlTaskChannel();
}
}

31
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskPlugin.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import com.google.common.collect.ImmutableList;
public class SqlTaskPlugin implements DolphinSchedulerPlugin {
@Override
public Iterable<TaskChannelFactory> getTaskChannelFactorys() {
return ImmutableList.of(new SqlTaskChannelFactory());
}
}

27
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlType.java

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.sql;
public enum SqlType {
/**
* sql type
* 0 query
* 1 NON_QUERY
*/
QUERY, NON_QUERY
}
Loading…
Cancel
Save