Browse Source

[fix-6570][task] add sql task udf resource load (#6570) (#6571)

* [fix-6570][task] add sql task udf resource load (#6570)

-add sql task udf resource load
-Fix UdfFuncRequest serialization

* fix checkstyle

* add defaultFS in sqlTaskContext

* fix checkstyle
3.0.0/version-upgrade
mask 3 years ago committed by GitHub
parent
commit
9b1984c244
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  5. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
  6. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ResourcesControllerTest.java
  7. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  8. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
  9. 9
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  10. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
  11. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java
  12. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
  13. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  14. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
  15. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  16. 6
      dolphinscheduler-spi/pom.xml
  17. 4
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/ResourceType.java
  18. 7
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
  19. 27
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java
  20. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/CommonUtils.java
  21. 28
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ResourcesController.java

@ -49,7 +49,7 @@ import org.apache.dolphinscheduler.api.service.UdfFuncService;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/resources/ResourceComponent.java

@ -14,14 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.dto.resources; package org.apache.dolphinscheduler.api.dto.resources;
import com.fasterxml.jackson.annotation.JsonPropertyOrder; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
/** /**
* resource component * resource component
*/ */

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -33,7 +33,7 @@ import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils;

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java

@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils; import org.apache.dolphinscheduler.common.utils.EncryptionUtils;

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ResourcesControllerTest.java

@ -25,7 +25,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java

@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.api.service.impl.UsersServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils; import org.apache.dolphinscheduler.common.utils.EncryptionUtils;

9
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -22,7 +22,7 @@ import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.exception.BaseException; import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -182,6 +182,13 @@ public class HadoopUtils implements Closeable {
return configuration; return configuration;
} }
/**
* @return DefaultFS
*/
public String getDefaultFS() {
return getConfiguration().get(Constants.FS_DEFAULTFS);
}
/** /**
* get application url * get application url
* *

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java

@ -16,7 +16,7 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Resource.java

@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.entity; package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import java.util.Date;
@TableName("t_ds_resources") @TableName("t_ds_resources")
public class Resource { public class Resource {

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java

@ -24,7 +24,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Resource;

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.runner.task; package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.enums.UdfType;
@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
@ -296,6 +297,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
DataSource datasource = processService.findDataSourceById(datasourceId); DataSource datasource = processService.findDataSourceById(datasourceId);
sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams()); sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
sqlTaskExecutionContext.setDefaultFS(HadoopUtils.getInstance().getDefaultFS());
// whether udf type // whether udf type
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
&& !StringUtils.isEmpty(sqlParameters.getUdfs()); && !StringUtils.isEmpty(sqlParameters.getUdfs());

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java

@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -37,7 +37,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;

6
dolphinscheduler-spi/pom.xml

@ -66,6 +66,12 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-annotation</artifactId>
<version>${mybatis-plus.version}</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceType.java → dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/ResourceType.java

@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.enums;
package org.apache.dolphinscheduler.spi.enums;
import com.baomidou.mybatisplus.annotation.EnumValue; import com.baomidou.mybatisplus.annotation.EnumValue;
@ -28,7 +29,6 @@ public enum ResourceType {
FILE(0, "file"), FILE(0, "file"),
UDF(1, "udf"); UDF(1, "udf");
ResourceType(int code, String descp) { ResourceType(int code, String descp) {
this.code = code; this.code = code;
this.descp = descp; this.descp = descp;

7
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java

@ -324,4 +324,11 @@ public class TaskConstants {
* Task Logger Thread's name * Task Logger Thread's name
*/ */
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/**
* hdfs/s3 configuration
* resource.upload.path
*/
public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path";
} }

27
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/request/SQLTaskExecutionContext.java

@ -17,12 +17,13 @@
package org.apache.dolphinscheduler.spi.task.request; package org.apache.dolphinscheduler.spi.task.request;
import org.apache.dolphinscheduler.spi.task.UdfFuncBean.UdfFuncDeserializer; import org.apache.dolphinscheduler.spi.task.request.UdfFuncRequest.UdfFuncDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.Map;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
/** /**
* SQL Task ExecutionContext * SQL Task ExecutionContext
*/ */
@ -44,6 +45,10 @@ public class SQLTaskExecutionContext implements Serializable {
@JsonDeserialize(keyUsing = UdfFuncDeserializer.class) @JsonDeserialize(keyUsing = UdfFuncDeserializer.class)
private Map<UdfFuncRequest,String> udfFuncTenantCodeMap; private Map<UdfFuncRequest,String> udfFuncTenantCodeMap;
/**
* DefaultFS
*/
private String defaultFS;
public int getWarningGroupId() { public int getWarningGroupId() {
return warningGroupId; return warningGroupId;
@ -69,12 +74,20 @@ public class SQLTaskExecutionContext implements Serializable {
this.connectionParams = connectionParams; this.connectionParams = connectionParams;
} }
public String getDefaultFS() {
return defaultFS;
}
public void setDefaultFS(String defaultFS) {
this.defaultFS = defaultFS;
}
@Override @Override
public String toString() { public String toString() {
return "SQLTaskExecutionContext{" + return "SQLTaskExecutionContext{"
"warningGroupId=" + warningGroupId + + "warningGroupId=" + warningGroupId
", connectionParams='" + connectionParams + '\'' + + ", connectionParams='" + connectionParams + '\''
", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap + + ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap
'}'; + ", defaultFS='" + defaultFS + '\'' + '}';
} }
} }

35
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/CommonUtils.java

@ -27,6 +27,7 @@ import static org.apache.dolphinscheduler.spi.task.TaskConstants.LOGIN_USER_KEY_
import static org.apache.dolphinscheduler.spi.task.TaskConstants.RESOURCE_STORAGE_TYPE; import static org.apache.dolphinscheduler.spi.task.TaskConstants.RESOURCE_STORAGE_TYPE;
import org.apache.dolphinscheduler.spi.enums.ResUploadType; import org.apache.dolphinscheduler.spi.enums.ResUploadType;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -40,6 +41,8 @@ import java.io.IOException;
*/ */
public class CommonUtils { public class CommonUtils {
public static final String resourceUploadPath = PropertyUtils.getString(TaskConstants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
private CommonUtils() { private CommonUtils() {
throw new UnsupportedOperationException("Construct CommonUtils"); throw new UnsupportedOperationException("Construct CommonUtils");
} }
@ -103,4 +106,36 @@ public class CommonUtils {
return false; return false;
} }
/**
* hdfs udf dir
*
* @param tenantCode tenant code
* @return get udf dir on hdfs
*/
public static String getHdfsUdfDir(String tenantCode) {
return String.format("%s/udfs", getHdfsTenantDir(tenantCode));
}
/**
* @param tenantCode tenant code
* @return file directory of tenants on hdfs
*/
public static String getHdfsTenantDir(String tenantCode) {
return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
}
/**
* get data hdfs path
*
* @return data hdfs path
*/
public static String getHdfsDataBasePath() {
if ("/".equals(resourceUploadPath)) {
// if basepath is configured to /, the generated url may be //default/resources (with extra leading /)
return "";
} else {
return resourceUploadPath;
}
}
} }

28
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.plugin.task.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.plugin.task.datasource.DatasourceUtil; import org.apache.dolphinscheduler.plugin.task.datasource.DatasourceUtil;
import org.apache.dolphinscheduler.plugin.task.util.CommonUtils;
import org.apache.dolphinscheduler.plugin.task.util.MapUtils; import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.spi.enums.TaskTimeoutStrategy;
@ -49,7 +50,9 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -146,7 +149,7 @@ public class SqlTask extends AbstractTaskExecutor {
.collect(Collectors.toList()); .collect(Collectors.toList());
List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(), List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
logger); sqlTaskExecutionContext.getDefaultFS(), logger);
// execute sql task // execute sql task
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
@ -517,13 +520,17 @@ public class SqlTask extends AbstractTaskExecutor {
* @param logger logger * @param logger logger
* @return create function list * @return create function list
*/ */
public static List<String> createFuncs(Map<UdfFuncRequest, String> udfFuncTenantCodeMap, Logger logger) { public static List<String> createFuncs(Map<UdfFuncRequest, String> udfFuncTenantCodeMap, String defaultFS, Logger logger) {
if (MapUtils.isEmpty(udfFuncTenantCodeMap)) { if (MapUtils.isEmpty(udfFuncTenantCodeMap)) {
logger.info("can't find udf function resource"); logger.info("can't find udf function resource");
return null; return null;
} }
List<String> funcList = new ArrayList<>(); List<String> funcList = new ArrayList<>();
// build jar sql
buildJarSql(funcList, udfFuncTenantCodeMap, defaultFS);
// build temp function sql // build temp function sql
buildTempFuncSql(funcList, new ArrayList<>(udfFuncTenantCodeMap.keySet())); buildTempFuncSql(funcList, new ArrayList<>(udfFuncTenantCodeMap.keySet()));
@ -545,4 +552,21 @@ public class SqlTask extends AbstractTaskExecutor {
} }
} }
/**
* build jar sql
* @param sqls sql list
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
*/
private static void buildJarSql(List<String> sqls, Map<UdfFuncRequest,String> udfFuncTenantCodeMap, String defaultFS) {
String resourceFullName;
Set<Entry<UdfFuncRequest, String>> entries = udfFuncTenantCodeMap.entrySet();
for (Map.Entry<UdfFuncRequest, String> entry : entries) {
String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS;
String uploadPath = CommonUtils.getHdfsUdfDir(entry.getValue());
resourceFullName = entry.getKey().getResourceName();
resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s", resourceFullName);
sqls.add(String.format("add jar %s%s%s", prefixPath, uploadPath, resourceFullName));
}
}
} }

Loading…
Cancel
Save