From 38fb2b4413ad73a4caffd46e3dacc770a07c29d8 Mon Sep 17 00:00:00 2001 From: Kirs Date: Thu, 21 Oct 2021 15:33:18 +0800 Subject: [PATCH] [fix-6570][task] add sql task udf resource load (#6570) (#6571) (#6579) * [fix-6570][task] add sql task udf resource load (#6570) -add sql task udf resource load -Fix UdfFuncRequest serialization * fix checkstyle * add defaultFS in sqlTaskContext * fix checkstyle Co-authored-by: mask <39329477+Narcasserun@users.noreply.github.com> --- .../api/controller/ResourcesController.java | 2 +- .../api/dto/resources/ResourceComponent.java | 6 ++-- .../api/service/ResourcesService.java | 2 +- .../service/impl/ResourcesServiceImpl.java | 2 +- .../api/service/impl/UsersServiceImpl.java | 2 +- .../controller/ResourcesControllerTest.java | 2 +- .../api/service/ResourcesServiceTest.java | 2 +- .../api/service/UsersServiceTest.java | 2 +- .../common/utils/HadoopUtils.java | 9 ++++- .../common/utils/HadoopUtilsTest.java | 2 +- .../dolphinscheduler/dao/entity/Resource.java | 9 ++--- .../dao/mapper/ResourceMapperTest.java | 2 +- .../master/runner/task/BaseTaskProcessor.java | 5 ++- .../runner/task/CommonTaskProcessorTest.java | 2 +- .../service/process/ProcessService.java | 2 +- dolphinscheduler-spi/pom.xml | 6 ++++ .../spi}/enums/ResourceType.java | 8 ++--- .../spi/task/TaskConstants.java | 9 ++++- .../task/request/SQLTaskExecutionContext.java | 27 ++++++++++---- .../plugin/task/util/CommonUtils.java | 35 +++++++++++++++++++ .../plugin/task/sql/SqlTask.java | 28 +++++++++++++-- 21 files changed, 131 insertions(+), 33 deletions(-) rename {dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common => dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi}/enums/ResourceType.java (91%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java index d8fc7c2932..0073ad0048 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java @@ -49,7 +49,7 @@ import org.apache.dolphinscheduler.api.service.UdfFuncService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.User; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java index 999fc88b04..c66172c2ec 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java @@ -14,14 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.dto.resources; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import java.util.ArrayList; import java.util.List; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + /** * resource component */ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java index f91da7756e..25a5af24b1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import java.io.IOException; import java.util.Map; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index f0cbcc9a7f..7835716551 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -33,7 +33,7 @@ import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java index 7e50a5416d..a45482e506 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java @@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.EncryptionUtils; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ResourcesControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ResourcesControllerTest.java index 81bde4d119..fd46f48c6f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ResourcesControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ResourcesControllerTest.java @@ -25,7 +25,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.utils.JSONUtils; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index 65a087286b..cca867c8ac 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java index 7252641602..429e849716 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.api.service.impl.UsersServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.EncryptionUtils; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 8b03913d82..8f2e2340a2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -22,7 +22,7 @@ import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ResUploadType; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.exception.BaseException; import org.apache.commons.io.IOUtils; @@ -182,6 +182,13 @@ public class HadoopUtils implements Closeable { return configuration; } + /** + * @return DefaultFS + */ + public String getDefaultFS() { + return getConfiguration().get(Constants.FS_DEFAULTFS); + } + /** * get application url * diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java index de7f9a4489..a349cc6d69 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.common.utils; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java index 214fbe83e4..fa711ffe2c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java @@ -14,16 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import java.util.Date; -import com.fasterxml.jackson.annotation.JsonFormat; -import org.apache.dolphinscheduler.common.enums.ResourceType; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; - -import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; @TableName("t_ds_resources") public class Resource { diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java index d32095d2a9..e2ba437c3a 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.dao.entity.Resource; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index e3a6c07ae4..4afa2cc0f4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner.task; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; @@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.EnumUtils; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; @@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; @@ -296,6 +297,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { DataSource datasource = processService.findDataSourceById(datasourceId); sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); + sqlTaskExecutionContext.setDefaultFS(HadoopUtils.getInstance().getDefaultFS()); + // whether udf type boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) && !StringUtils.isEmpty(sqlParameters.getUdfs()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java index c0aeb7273a..b4e41cb06a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.dao.entity.DataSource; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 988a763ed5..044c6a470f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -37,7 +37,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.enums.ResourceType; +import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; diff --git a/dolphinscheduler-spi/pom.xml b/dolphinscheduler-spi/pom.xml index 01d5ef0bdd..774c5cfde2 100644 --- a/dolphinscheduler-spi/pom.xml +++ b/dolphinscheduler-spi/pom.xml @@ -66,6 +66,12 @@ slf4j-api provided + + com.baomidou + mybatis-plus-annotation + ${mybatis-plus.version} + provided + junit diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/ResourceType.java similarity index 91% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceType.java rename to dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/ResourceType.java index 043402c2ae..8e80802b85 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceType.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/ResourceType.java @@ -14,22 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.common.enums; + +package org.apache.dolphinscheduler.spi.enums; import com.baomidou.mybatisplus.annotation.EnumValue; /** * resource type */ -public enum ResourceType { +public enum ResourceType { /** * 0 file, 1 udf */ FILE(0, "file"), UDF(1, "udf"); - - ResourceType(int code, String descp){ + ResourceType(int code, String descp) { this.code = code; this.descp = descp; } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java index 4e1002e725..65266bd5c3 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java @@ -324,4 +324,11 @@ public class TaskConstants { * Task Logger Thread's name */ public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; -} + + /** + * hdfs/s3 configuration + * resource.upload.path + */ + public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path"; + +} \ No newline at end of file diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java index b712b50a7f..c02cd4361c 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java @@ -17,12 +17,13 @@ package org.apache.dolphinscheduler.spi.task.request; -import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer; +import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest.UdfFuncDeserializer; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import java.io.Serializable; import java.util.Map; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + /** * SQL Task ExecutionContext */ @@ -44,6 +45,10 @@ public class SQLTaskExecutionContext implements Serializable { @JsonDeserialize(keyUsing = UdfFuncDeserializer.class) private Map udfFuncTenantCodeMap; + /** + * DefaultFS + */ + private String defaultFS; public int getWarningGroupId() { return warningGroupId; @@ -69,12 +74,20 @@ public class SQLTaskExecutionContext implements Serializable { this.connectionParams = connectionParams; } + public String getDefaultFS() { + return defaultFS; + } + + public void setDefaultFS(String defaultFS) { + this.defaultFS = defaultFS; + } + @Override public String toString() { - return "SQLTaskExecutionContext{" + - "warningGroupId=" + warningGroupId + - ", connectionParams='" + connectionParams + '\'' + - ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap + - '}'; + return "SQLTaskExecutionContext{" + + "warningGroupId=" + warningGroupId + + ", connectionParams='" + connectionParams + '\'' + + ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap + + ", defaultFS='" + defaultFS + '\'' + '}'; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/CommonUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/CommonUtils.java index 0ce90adc27..0c771389bd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/CommonUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/CommonUtils.java @@ -27,6 +27,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.LOGIN_USER_KEY_ import static org.apache.dolphinscheduler.spi.task.TaskConstants.RESOURCE_STORAGE_TYPE; import org.apache.dolphinscheduler.spi.enums.ResUploadType; +import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -40,6 +41,8 @@ import java.io.IOException; */ public class CommonUtils { + public static final String resourceUploadPath = PropertyUtils.getString(TaskConstants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler"); + private CommonUtils() { throw new UnsupportedOperationException("Construct CommonUtils"); } @@ -103,4 +106,36 @@ public class CommonUtils { return false; } + /** + * hdfs udf dir + * + * @param tenantCode tenant code + * @return get udf dir on hdfs + */ + public static String getHdfsUdfDir(String tenantCode) { + return String.format("%s/udfs", getHdfsTenantDir(tenantCode)); + } + + /** + * @param tenantCode tenant code + * @return file directory of tenants on hdfs + */ + public static String getHdfsTenantDir(String tenantCode) { + return String.format("%s/%s", getHdfsDataBasePath(), tenantCode); + } + + /** + * get data hdfs path + * + * @return data hdfs path + */ + public static String getHdfsDataBasePath() { + if ("/".equals(resourceUploadPath)) { + // if basepath is configured to /, the generated url may be //default/resources (with extra leading /) + return ""; + } else { + return resourceUploadPath; + } + } + } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index cc18d5e468..387b8be5f4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.plugin.task.datasource.DatasourceUtil; +import org.apache.dolphinscheduler.plugin.task.util.CommonUtils; import org.apache.dolphinscheduler.plugin.task.util.MapUtils; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy; @@ -49,7 +50,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -146,7 +149,7 @@ public class SqlTask extends AbstractTaskExecutor { .collect(Collectors.toList()); List createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(), - logger); + sqlTaskExecutionContext.getDefaultFS(), logger); // execute sql task executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); @@ -513,13 +516,17 @@ public class SqlTask extends AbstractTaskExecutor { * @param logger logger * @return create function list */ - public static List createFuncs(Map udfFuncTenantCodeMap, Logger logger) { + public static List createFuncs(Map udfFuncTenantCodeMap, String defaultFS, Logger logger) { if (MapUtils.isEmpty(udfFuncTenantCodeMap)) { logger.info("can't find udf function resource"); return null; } List funcList = new ArrayList<>(); + + // build jar sql + buildJarSql(funcList, udfFuncTenantCodeMap, defaultFS); + // build temp function sql buildTempFuncSql(funcList, new ArrayList<>(udfFuncTenantCodeMap.keySet())); @@ -541,4 +548,21 @@ public class SqlTask extends AbstractTaskExecutor { } } + /** + * build jar sql + * @param sqls sql list + * @param udfFuncTenantCodeMap key is udf function,value is tenant code + */ + private static void buildJarSql(List sqls, Map udfFuncTenantCodeMap, String defaultFS) { + String resourceFullName; + Set> entries = udfFuncTenantCodeMap.entrySet(); + for (Map.Entry entry : entries) { + String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS; + String uploadPath = CommonUtils.getHdfsUdfDir(entry.getValue()); + resourceFullName = entry.getKey().getResourceName(); + resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s", resourceFullName); + sqls.add(String.format("add jar %s%s%s", prefixPath, uploadPath, resourceFullName)); + } + } + }