From 8f621ff98bba10b723014e8625d3cd9d10cb1170 Mon Sep 17 00:00:00 2001 From: WangJPLeo <103574007+WangJPLeo@users.noreply.github.com> Date: Thu, 30 Jun 2022 22:45:25 +0800 Subject: [PATCH] [Optimization] Calculate global parameter and local parameter at master. (#10704) * Global parameter and local parameter calculation external expansion. * k8s task ut fix. * TimePlaceholderUtils import DateUtils fix * follow the review comments to fix. * follow the review comments to fix. * e2e rerun --- .../impl/ProcessInstanceServiceImpl.java | 4 +- .../service/ProcessInstanceServiceTest.java | 6 +- .../dolphinscheduler/common/Constants.java | 2 + .../DolphinSchedulerCuringGlobalParams.java | 96 -------- .../placeholder/TimePlaceholderUtils.java | 2 +- .../common/utils/ParameterUtilsTest.java | 17 -- .../builder/TaskExecutionContextBuilder.java | 27 +++ .../master/runner/MasterSchedulerService.java | 4 +- .../runner/WorkflowExecuteRunnable.java | 11 +- .../master/runner/task/BaseTaskProcessor.java | 11 + .../master/WorkflowExecuteTaskTest.java | 7 +- .../service/expand/CuringGlobalParams.java | 228 ++++++++++++++++++ .../service/expand/CuringParamsService.java | 25 +- .../TimePlaceholderResolverExpandService.java | 2 +- ...ePlaceholderResolverExpandServiceImpl.java | 2 +- .../service/process/ProcessServiceImpl.java | 5 +- .../expand/CuringGlobalParamsServiceTest.java | 6 +- ...ePlaceholderResolverExpandServiceTest.java | 2 +- .../service/process/ProcessServiceTest.java | 4 +- .../plugin/task/api/TaskExecutionContext.java | 5 + .../plugin/task/api/parser/ParamUtils.java | 85 ------- .../task/api/parser/ParameterUtils.java | 56 ----- .../plugin/task/dq/DataQualityTask.java | 15 +- .../plugin/task/datax/DataxTask.java | 8 +- .../plugin/task/flink/FlinkTask.java | 8 +- .../plugin/task/http/HttpTask.java | 8 +- .../plugin/task/jupyter/JupyterTask.java | 8 +- .../plugin/task/k8s/K8sTask.java | 8 +- .../plugin/task/k8s/K8sTaskTest.java | 7 + .../plugin/task/mlflow/MlflowTask.java | 9 +- .../plugin/task/mr/MapReduceTask.java | 8 +- .../plugin/task/procedure/ProcedureTask.java | 2 +- .../plugin/task/python/PythonTask.java | 9 +- .../plugin/task/seatunnel/SeatunnelTask.java | 8 +- .../plugin/task/shell/ShellTask.java | 10 +- .../plugin/task/spark/SparkTask.java | 8 +- .../plugin/task/sql/SqlTask.java | 2 +- .../plugin/task/sqoop/SqoopTask.java | 9 +- .../worker/runner/TaskExecuteThread.java | 34 --- 39 files changed, 344 insertions(+), 424 deletions(-) delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParams.java rename dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java => dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java (69%) rename {dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common => dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service}/expand/TimePlaceholderResolverExpandService.java (95%) rename {dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common => dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service}/expand/TimePlaceholderResolverExpandServiceImpl.java (95%) rename {dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common => dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service}/expand/CuringGlobalParamsServiceTest.java (97%) rename {dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common => dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service}/expand/TimePlaceholderResolverExpandServiceTest.java (97%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 3b9e3f89c7..9b4c34ad16 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -51,7 +51,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -155,7 +155,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce private ScheduleMapper scheduleMapper; @Autowired - private CuringGlobalParamsService curingGlobalParamsService; + private CuringParamsService curingGlobalParamsService; /** * return top n SUCCESS process instance order by running time which started between startTime and endTime diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index fe8149536d..039da1e397 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -36,8 +36,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; -import org.apache.dolphinscheduler.common.expand.DolphinSchedulerCuringGlobalParams; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -72,7 +71,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.springframework.beans.factory.annotation.Autowired; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; @@ -131,7 +129,7 @@ public class ProcessInstanceServiceTest { ScheduleMapper scheduleMapper; @Mock - CuringGlobalParamsService curingGlobalParamsService; + CuringParamsService curingGlobalParamsService; private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 5bc1d7657f..a081570f4a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -831,4 +831,6 @@ public final class Constants { public static final int USER_PASSWORD_MAX_LENGTH = 20; public static final int USER_PASSWORD_MIN_LENGTH = 2; + + public static final String FUNCTION_START_WITH = "$"; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java deleted file mode 100644 index dde31af4fa..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.expand; - -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -@Component -public class DolphinSchedulerCuringGlobalParams implements CuringGlobalParamsService { - - @Autowired - private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; - - @Override - public String convertParameterPlaceholders(String val, Map allParamMap) { - return ParameterUtils.convertParameterPlaceholders(val, allParamMap); - } - - @Override - public boolean timeFunctionNeedExpand(String placeholderName) { - return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName); - } - - @Override - public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) { - return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName); - } - - @Override - public String curingGlobalParams(Integer processInstanceId, Map globalParamMap, List globalParamList, CommandType commandType, Date scheduleTime, String timezone) { - if (globalParamList == null || globalParamList.isEmpty()) { - return null; - } - Map globalMap = new HashMap<>(); - if (globalParamMap != null) { - globalMap.putAll(globalParamMap); - } - Map allParamMap = new HashMap<>(); - //If it is a complement, a complement time needs to be passed in, according to the task type - Map timeParams = BusinessTimeUtils. - getBusinessTime(commandType, scheduleTime, timezone); - - if (timeParams != null) { - allParamMap.putAll(timeParams); - } - allParamMap.putAll(globalMap); - Set> entries = allParamMap.entrySet(); - Map resolveMap = new HashMap<>(); - for (Map.Entry entry : entries) { - String val = entry.getValue(); - if (val.startsWith("$")) { - String str = ""; - if (timeFunctionNeedExpand(val)) { - str = timeFunctionExtension(processInstanceId, timezone, val); - } else { - str = convertParameterPlaceholders(val, allParamMap); - } - resolveMap.put(entry.getKey(), str); - } - } - globalMap.putAll(resolveMap); - for (Property property : globalParamList) { - String val = globalMap.get(property.getProp()); - if (val != null) { - property.setValue(val); - } - } - return JSONUtils.toJsonString(globalParamList); - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java index ee176da299..28d96a5969 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java @@ -512,7 +512,7 @@ public class TimePlaceholderUtils { if (Character.isDigit(expression.charAt(index + 1))) { String addMinuteExpr = expression.substring(index + 1); - Date targetDate = org.apache.commons.lang.time.DateUtils + Date targetDate = org.apache.commons.lang3.time.DateUtils .addMinutes(date, calcMinutes(addMinuteExpr)); String dateFormat = expression.substring(0, index); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java index 6f9bb89c0f..ba27032ad8 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java @@ -20,27 +20,16 @@ package org.apache.dolphinscheduler.common.utils; import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlaceholderUtils.replacePlaceholders; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandService; -import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandServiceImpl; import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils; -import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; -import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import java.text.ParseException; -import java.util.ArrayList; import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,12 +38,6 @@ import org.slf4j.LoggerFactory; public class ParameterUtilsTest { public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class); - @Mock - private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; - - @InjectMocks - private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl; - /** * Test convertParameterPlaceholders */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 6c7154b45d..962e42a98b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -29,7 +29,12 @@ import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; + +import java.util.Map; /** * TaskExecutionContext builder @@ -131,6 +136,28 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext); return this; } + + /** + * build global and local params + * @param propertyMap + * @return + */ + public TaskExecutionContextBuilder buildParamInfo(Map propertyMap) { + taskExecutionContext.setPrepareParamsMap(propertyMap); + return this; + } + + /** + * build business params + * @param businessParamsMap + * @return + */ + public TaskExecutionContextBuilder buildBusinessParamsMap(Map businessParamsMap) { + taskExecutionContext.setParamsMap(businessParamsMap); + return this; + } + + /** * create * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 5c43256623..3254d6c5b5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.SlotCheckState; -import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -95,7 +95,7 @@ public class MasterSchedulerService extends BaseDaemonThread { private StateWheelExecuteThread stateWheelExecuteThread; @Autowired - private CuringGlobalParamsService curingGlobalParamsService; + private CuringParamsService curingGlobalParamsService; private String masterAddress; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 42654665a2..25766d0a4c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -40,7 +40,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; -import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -49,7 +49,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -239,7 +238,7 @@ public class WorkflowExecuteRunnable implements Runnable { /** * curing global params service */ - private final CuringGlobalParamsService curingGlobalParamsService; + private final CuringParamsService curingParamsService; /** * @param processInstance processInstance @@ -255,14 +254,14 @@ public class WorkflowExecuteRunnable implements Runnable { , ProcessAlertManager processAlertManager , MasterConfig masterConfig , StateWheelExecuteThread stateWheelExecuteThread - , CuringGlobalParamsService curingGlobalParamsService) { + , CuringParamsService curingParamsService) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; - this.curingGlobalParamsService = curingGlobalParamsService; + this.curingParamsService = curingParamsService; TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); } @@ -1008,7 +1007,7 @@ public class WorkflowExecuteRunnable implements Runnable { if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { processInstance.setScheduleTime(complementListDate.get(0)); - String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), + String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(), processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 9af664e0e3..05348a2dd3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ConnectorType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType; import org.apache.dolphinscheduler.plugin.task.api.model.JdbcInfo; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; @@ -73,6 +74,7 @@ import org.apache.dolphinscheduler.plugin.task.k8s.K8sTaskParameters; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -123,6 +125,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected TaskPluginManager taskPluginManager; + protected CuringParamsService curingParamsService; + protected String threadLoggerInfoName; @Override @@ -130,6 +134,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { processService = SpringApplicationContext.getBean(ProcessService.class); masterConfig = SpringApplicationContext.getBean(MasterConfig.class); taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class); + curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class); this.taskInstance = taskInstance; this.processInstance = processInstance; this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); @@ -301,6 +306,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { setK8sTaskRelation(k8sTaskExecutionContext, taskInstance); } + Map businessParamsMap = curingParamsService.preBuildBusinessParams(processInstance); + + AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder().taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build()); + Map propertyMap = curingParamsService.paramParsingPreparation(taskInstance, baseParam, processInstance); return TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine()) @@ -309,6 +318,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { .buildResourceParametersInfo(resources) .buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext) .buildK8sTaskRelatedInfo(k8sTaskExecutionContext) + .buildBusinessParamsMap(businessParamsMap) + .buildParamInfo(propertyMap) .create(); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java index 8c7104ca30..097ba120cb 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java @@ -26,7 +26,7 @@ import static org.powermock.api.mockito.PowerMockito.mock; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; -import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; -import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -87,7 +86,7 @@ public class WorkflowExecuteTaskTest { private StateWheelExecuteThread stateWheelExecuteThread; - private CuringGlobalParamsService curingGlobalParamsService; + private CuringParamsService curingGlobalParamsService; @Before public void init() throws Exception { @@ -116,7 +115,7 @@ public class WorkflowExecuteTaskTest { Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); stateWheelExecuteThread = mock(StateWheelExecuteThread.class); - curingGlobalParamsService = mock(CuringGlobalParamsService.class); + curingGlobalParamsService = mock(CuringParamsService.class); workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService)); // prepareProcess init dag Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParams.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParams.java new file mode 100644 index 0000000000..9055efd2a2 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParams.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.expand; + +import lombok.NonNull; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID; + +@Component +public class CuringGlobalParams implements CuringParamsService { + + @Autowired + private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; + + @Override + public String convertParameterPlaceholders(String val, Map allParamMap) { + return ParameterUtils.convertParameterPlaceholders(val, allParamMap); + } + + @Override + public boolean timeFunctionNeedExpand(String placeholderName) { + return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName); + } + + @Override + public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) { + return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName); + } + + /** + * here it is judged whether external expansion calculation is required and the calculation result is obtained + * @param processInstanceId + * @param globalParamMap + * @param globalParamList + * @param commandType + * @param scheduleTime + * @param timezone + * @return + */ + @Override + public String curingGlobalParams(Integer processInstanceId, Map globalParamMap, List globalParamList, CommandType commandType, Date scheduleTime, String timezone) { + if (globalParamList == null || globalParamList.isEmpty()) { + return null; + } + Map globalMap = new HashMap<>(); + if (globalParamMap != null) { + globalMap.putAll(globalParamMap); + } + Map allParamMap = new HashMap<>(); + //If it is a complement, a complement time needs to be passed in, according to the task type + Map timeParams = BusinessTimeUtils. + getBusinessTime(commandType, scheduleTime, timezone); + + if (timeParams != null) { + allParamMap.putAll(timeParams); + } + allParamMap.putAll(globalMap); + Set> entries = allParamMap.entrySet(); + Map resolveMap = new HashMap<>(); + for (Map.Entry entry : entries) { + String val = entry.getValue(); + if (val.startsWith(Constants.FUNCTION_START_WITH)) { + String str = ""; + // whether external scaling calculation is required + if (timeFunctionNeedExpand(val)) { + str = timeFunctionExtension(processInstanceId, timezone, val); + } else { + str = convertParameterPlaceholders(val, allParamMap); + } + resolveMap.put(entry.getKey(), str); + } + } + globalMap.putAll(resolveMap); + for (Property property : globalParamList) { + String val = globalMap.get(property.getProp()); + if (val != null) { + property.setValue(val); + } + } + return JSONUtils.toJsonString(globalParamList); + } + + /** + * the global parameters and local parameters used in the worker will be prepared here. + * + * @param taskInstance + * @param parameters + * @param processInstance + * @return + */ + @Override + public Map paramParsingPreparation(@NonNull TaskInstance taskInstance, @NonNull AbstractParameters parameters, @NonNull ProcessInstance processInstance) { + // assign value to definedParams here + Map globalParamsMap = setGlobalParamsMap(processInstance); + Map globalParams = ParamUtils.getUserDefParamsMap(globalParamsMap); + CommandType commandType = processInstance.getCmdTypeIfComplement(); + Date scheduleTime = processInstance.getScheduleTime(); + + // combining local and global parameters + Map localParams = parameters.getInputLocalParametersMap(); + + //stream pass params + Map varParams = parameters.getVarPoolMap(); + + if (globalParams.isEmpty() && localParams.isEmpty() && varParams.isEmpty()) { + return null; + } + // if it is a complement, + // you need to pass in the task instance id to locate the time + // of the process instance complement + Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); + String timeZone = cmdParam.get(Constants.SCHEDULE_TIMEZONE); + Map params = BusinessTimeUtils.getBusinessTime(commandType, scheduleTime, timeZone); + + if (globalParamsMap != null) { + params.putAll(globalParamsMap); + } + + if (StringUtils.isNotBlank(taskInstance.getExecutePath())) { + params.put(PARAMETER_TASK_EXECUTE_PATH, taskInstance.getExecutePath()); + } + params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskInstance.getId())); + + if (varParams.size() != 0) { + globalParams.putAll(varParams); + } + if (localParams.size() != 0) { + globalParams.putAll(localParams); + } + + Iterator> iter = globalParams.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry en = iter.next(); + Property property = en.getValue(); + + if (StringUtils.isNotEmpty(property.getValue()) && property.getValue().startsWith(Constants.FUNCTION_START_WITH)) { + /** + * local parameter refers to global parameter with the same name + * note: the global parameters of the process instance here are solidified parameters, + * and there are no variables in them. + */ + String val = property.getValue(); + // whether external scaling calculation is required + if (timeFunctionNeedExpand(val)) { + val = timeFunctionExtension(taskInstance.getProcessInstanceId(), timeZone, val); + } else { + val = convertParameterPlaceholders(val, params); + } + property.setValue(val); + } + } + if (MapUtils.isEmpty(globalParams)) { + globalParams = new HashMap<>(); + } + // put schedule time param to params map + Map paramsMap = preBuildBusinessParams(processInstance); + if (MapUtils.isNotEmpty(paramsMap)) { + globalParams.putAll(paramsMap); + } + return globalParams; + } + + private Map setGlobalParamsMap(ProcessInstance processInstance) { + Map globalParamsMap = new HashMap<>(16); + + // global params string + String globalParamsStr = processInstance.getGlobalParams(); + if (globalParamsStr != null) { + List globalParamsList = JSONUtils.toList(globalParamsStr, Property.class); + globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue))); + } + return globalParamsMap; + } + + @Override + public Map preBuildBusinessParams(ProcessInstance processInstance) { + Map paramsMap = new HashMap<>(); + // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job + if (processInstance.getScheduleTime() != null) { + Date date = processInstance.getScheduleTime(); + String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME, null); + Property p = new Property(); + p.setValue(dateTime); + p.setProp(Constants.PARAMETER_DATETIME); + paramsMap.put(Constants.PARAMETER_DATETIME, p); + } + return paramsMap; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java similarity index 69% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java index 63ea658c9f..3342bd6bba 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java @@ -15,16 +15,21 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.common.expand; +package org.apache.dolphinscheduler.service.expand; +import lombok.NonNull; import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import java.util.Date; import java.util.List; import java.util.Map; -public interface CuringGlobalParamsService { +public interface CuringParamsService { /** * time function need expand @@ -61,4 +66,20 @@ public interface CuringGlobalParamsService { * @return */ String curingGlobalParams(Integer processInstanceId, Map globalParamMap, List globalParamList, CommandType commandType, Date scheduleTime, String timezone); + + /** + * param parsing preparation + * @param parameters + * @param taskInstance + * @param processInstance + * @return + */ + Map paramParsingPreparation(@NonNull TaskInstance taskInstance, @NonNull AbstractParameters parameters, @NonNull ProcessInstance processInstance); + + /** + * preBuildBusinessParams + * @param processInstance + * @return + */ + Map preBuildBusinessParams(ProcessInstance processInstance); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandService.java similarity index 95% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandService.java index bdd811522c..98a0bb2b00 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.common.expand; +package org.apache.dolphinscheduler.service.expand; public interface TimePlaceholderResolverExpandService { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceImpl.java similarity index 95% rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceImpl.java index b37fcf076b..2ac37b0887 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.common.expand; +package org.apache.dolphinscheduler.service.expand; import org.springframework.stereotype.Component; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 97d1266830..057d6e18a5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -42,7 +42,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -51,7 +51,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.DagData; import org.apache.dolphinscheduler.dao.entity.DataSource; @@ -273,7 +272,7 @@ public class ProcessServiceImpl implements ProcessService { private K8sMapper k8sMapper; @Autowired - private CuringGlobalParamsService curingGlobalParamsService; + private CuringParamsService curingGlobalParamsService; /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParamsServiceTest.java similarity index 97% rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParamsServiceTest.java index f6ea074aa0..e89748446c 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParamsServiceTest.java @@ -16,7 +16,7 @@ */ -package org.apache.dolphinscheduler.common.expand; +package org.apache.dolphinscheduler.service.expand; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.utils.DateUtils; @@ -45,10 +45,10 @@ public class CuringGlobalParamsServiceTest { private static final String placeHolderName = "$[yyyy-MM-dd-1]"; @Mock - private CuringGlobalParamsService curingGlobalParamsService; + private CuringParamsService curingGlobalParamsService; @InjectMocks - private DolphinSchedulerCuringGlobalParams dolphinSchedulerCuringGlobalParams; + private CuringGlobalParams dolphinSchedulerCuringGlobalParams; @Mock private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService; diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceTest.java similarity index 97% rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceTest.java index 542869530e..08d04be5c5 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.common.expand; +package org.apache.dolphinscheduler.service.expand; import org.apache.commons.lang3.StringUtils; import org.junit.Assert; diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 0dd9734bd1..2e65a0cb5b 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService; +import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -164,7 +164,7 @@ public class ProcessServiceTest { private ScheduleMapper scheduleMapper; @Mock - CuringGlobalParamsService curingGlobalParamsService; + CuringParamsService curingGlobalParamsService; @Test public void testCreateSubCommand() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 2ada887868..c1276d5806 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -182,6 +182,11 @@ public class TaskExecutionContext implements Serializable { */ private Map definedParams; + /** + * prepare params map + */ + private Map prepareParamsMap; + /** * task AppId */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java index a59e1bf28d..15a7c14b76 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java @@ -17,104 +17,19 @@ package org.apache.dolphinscheduler.plugin.task.api.parser; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID; - -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; -import org.apache.dolphinscheduler.spi.enums.CommandType; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; -import org.apache.dolphinscheduler.spi.utils.StringUtils; -import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import com.google.common.base.Preconditions; - /** * param utils */ public class ParamUtils { - /** - * parameter conversion - * Warning: - * When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified. - * But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current - * situation is wrong. So I cannot modify the original logic. - * - * @param taskExecutionContext the context of this task instance - * @param parameters the parameters - * @return global params - * - */ - public static Map convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) { - Preconditions.checkNotNull(taskExecutionContext); - Preconditions.checkNotNull(parameters); - Map globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams()); - Map globalParamsMap = taskExecutionContext.getDefinedParams(); - CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement()); - Date scheduleTime = taskExecutionContext.getScheduleTime(); - - // combining local and global parameters - Map localParams = parameters.getInputLocalParametersMap(); - - //stream pass params - Map varParams = parameters.getVarPoolMap(); - - if (globalParams.size() == 0 && localParams.size() == 0 && varParams.size() == 0) { - return null; - } - // if it is a complement, - // you need to pass in the task instance id to locate the time - // of the process instance complement - Map params = BusinessTimeUtils - .getBusinessTime(commandType, - scheduleTime); - - if (globalParamsMap != null) { - - params.putAll(globalParamsMap); - } - - if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) { - params.put(PARAMETER_TASK_EXECUTE_PATH, taskExecutionContext.getExecutePath()); - } - params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskExecutionContext.getTaskInstanceId())); - - if (varParams.size() != 0) { - globalParams.putAll(varParams); - } - if (localParams.size() != 0) { - globalParams.putAll(localParams); - } - - Iterator> iter = globalParams.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - Property property = en.getValue(); - - if (StringUtils.isNotEmpty(property.getValue()) - && property.getValue().startsWith("$")) { - /** - * local parameter refers to global parameter with the same name - * note: the global parameters of the process instance here are solidified parameters, - * and there are no variables in them. - */ - String val = property.getValue(); - - val = ParameterUtils.convertParameterPlaceholders(val, params); - property.setValue(val); - } - } - - return globalParams; - } - /** * format convert * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java index 383424f71a..8da11db459 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java @@ -22,19 +22,15 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETE import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_SHECDULE_TIME; import org.apache.dolphinscheduler.plugin.task.api.model.Property; -import org.apache.dolphinscheduler.spi.enums.CommandType; import org.apache.dolphinscheduler.plugin.task.api.enums.DataType; import org.apache.dolphinscheduler.spi.utils.DateUtils; -import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.sql.PreparedStatement; import java.util.Date; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -152,58 +148,6 @@ public class ParameterUtils { } } - /** - * curing user define parameters - * - * @param globalParamMap global param map - * @param globalParamList global param list - * @param commandType command type - * @param scheduleTime schedule time - * @return curing user define parameters - */ - public static String curingGlobalParams(Map globalParamMap, List globalParamList, - CommandType commandType, Date scheduleTime) { - - if (globalParamList == null || globalParamList.isEmpty()) { - return null; - } - - Map globalMap = new HashMap<>(); - if (globalParamMap != null) { - globalMap.putAll(globalParamMap); - } - Map allParamMap = new HashMap<>(); - //If it is a complement, a complement time needs to be passed in, according to the task type - Map timeParams = BusinessTimeUtils - .getBusinessTime(commandType, scheduleTime); - - if (timeParams != null) { - allParamMap.putAll(timeParams); - } - - allParamMap.putAll(globalMap); - - Set> entries = allParamMap.entrySet(); - - Map resolveMap = new HashMap<>(); - for (Map.Entry entry : entries) { - String val = entry.getValue(); - if (val.startsWith("$")) { - String str = ParameterUtils.convertParameterPlaceholders(val, allParamMap); - resolveMap.put(entry.getKey(), str); - } - } - globalMap.putAll(resolveMap); - - for (Property property : globalParamList) { - String val = globalMap.get(property.getProp()); - if (val != null) { - property.setValue(val); - } - } - return JSONUtils.toJsonString(globalParamList); - } - /** * $[yyyyMMdd] replace schedule time */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java index 039987da0e..578feec68d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java @@ -165,19 +165,8 @@ public class DataQualityTask extends AbstractYarnTask { args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters())); // replace placeholder - Map paramsMap = ParamUtils.convert(dqTaskExecutionContext,getParameters()); - - String command = null; - - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - - if (MapUtils.isNotEmpty(dqTaskExecutionContext.getParamsMap())) { - paramsMap.putAll(dqTaskExecutionContext.getParamsMap()); - } - - command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); + Map paramsMap = dqTaskExecutionContext.getPrepareParamsMap(); + String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); logger.info("data quality task command: {}", command); return command; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 10dc3dcf8c..49ec953725 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -150,13 +150,7 @@ public class DataxTask extends AbstractTaskExecutor { public void handle() throws Exception { try { // replace placeholder,and combine local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); // run datax procesDataSourceService.s String jsonFilePath = buildDataxJsonFile(paramsMap); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index aa8b26698b..f04282a144 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -160,13 +160,7 @@ public class FlinkTask extends AbstractYarnTask { String mainArgs = flinkParameters.getMainArgs(); if (StringUtils.isNotEmpty(mainArgs)) { // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap))); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java index c829ad0736..47bb7e9b21 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -124,13 +124,7 @@ public class HttpTask extends AbstractTaskExecutor { RequestBuilder builder = createRequestBuilder(); // replace placeholder,and combine local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); List httpPropertyList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java index 96f7882797..cec72c9601 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java @@ -127,13 +127,7 @@ public class JupyterTask extends AbstractTaskExecutor { args.addAll(populateJupyterOptions()); // replace placeholder, and combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java index a5ce9992f1..64b4f55a3d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java @@ -65,13 +65,7 @@ public class K8sTask extends AbstractK8sTask { @Override protected String buildCommand() { K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters(); - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); Map namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace()); String namespaceName = namespace.get(NAMESPACE_NAME); String clusterName = namespace.get(CLUSTER); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java index 60506999e4..84c1d895f4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java @@ -65,6 +65,13 @@ public class K8sTaskTest { Map paramsMap = new HashMap<>(); paramsMap.put(DAY,property); taskRequest.setParamsMap(paramsMap); + + Map prepareParamsMap = new HashMap<>(); + Property property1 = new Property(); + property1.setProp("day"); + property1.setValue("20220507"); + prepareParamsMap.put("day", property1); + taskRequest.setPrepareParamsMap(prepareParamsMap); k8sTask = new K8sTask(taskRequest); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java index 03fd4dce03..e1e13d17c5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java @@ -193,14 +193,7 @@ public class MlflowTask extends AbstractTaskExecutor { private Map getParamsMap() { // replace placeholder, and combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } - return paramsMap; + return taskExecutionContext.getPrepareParamsMap(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index 588dcda184..0a519dc65f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -79,13 +79,7 @@ public class MapReduceTask extends AbstractYarnTask { setMainJarName(); // replace placeholder,and combine local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); mapreduceParameters.setMainArgs(args); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 4f61ef1022..fc886983e5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -103,7 +103,7 @@ public class ProcedureTask extends AbstractTaskExecutor { // get jdbc connection connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam); Map sqlParamsMap = new HashMap<>(); - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); String proceduerSql = formatSql(sqlParamsMap, paramsMap); // call method stmt = connection.prepareCall(proceduerSql); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index 1e5f8f8af3..f7f94736dc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -217,14 +217,7 @@ public class PythonTask extends AbstractTaskExecutor { protected Map mergeParamsWithContext(AbstractParameters parameters) { // replace placeholder - Map paramsMap = ParamUtils.convert(taskRequest, parameters); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) { - paramsMap.putAll(taskRequest.getParamsMap()); - } - return paramsMap; + return taskRequest.getPrepareParamsMap(); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index b5270a7221..2ba758d5ad 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -158,13 +158,7 @@ public class SeatunnelTask extends AbstractTaskExecutor { private String parseScript(String script) { // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index 441deb495b..b5529bc8eb 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.SystemUtils; import java.io.File; @@ -41,7 +40,6 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; -import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -167,13 +165,7 @@ public class ShellTask extends AbstractTaskExecutor { private String parseScript(String script) { // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index 152cbb22ce..b30ac64ae6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -118,13 +118,7 @@ public class SparkTask extends AbstractYarnTask { args.addAll(populateSparkOptions()); // replace placeholder, and combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 419118803c..ef47c07133 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -405,7 +405,7 @@ public class SqlTask extends AbstractTaskExecutor { StringBuilder sqlBuilder = new StringBuilder(); // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); // spell SQL according to the final user-defined variable if (paramsMap == null) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java index d0e58911a2..fe4c428576 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java @@ -77,14 +77,7 @@ public class SqoopTask extends AbstractYarnTask { String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); - - if (MapUtils.isEmpty(paramsMap)) { - paramsMap = new HashMap<>(); - } - if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) { - paramsMap.putAll(taskExecutionContext.getParamsMap()); - } + Map paramsMap = taskExecutionContext.getPrepareParamsMap(); String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); logger.info("sqoop script: {}", resultScripts); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 7d49005395..2aa34a21fd 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -155,14 +155,11 @@ public class TaskExecuteThread implements Runnable, Delayed { } taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); - taskExecutionContext.setDefinedParams(getGlobalParamsMap()); taskExecutionContext.setTaskAppId(String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); - preBuildBusinessParams(); - TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); if (null == taskChannel) { throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType())); @@ -252,23 +249,6 @@ public class TaskExecuteThread implements Runnable, Delayed { } } - /** - * get global paras map - * - * @return map - */ - private Map getGlobalParamsMap() { - Map globalParamsMap = new HashMap<>(16); - - // global params string - String globalParamsStr = taskExecutionContext.getGlobalParams(); - if (globalParamsStr != null) { - List globalParamsList = JSONUtils.toList(globalParamsStr, Property.class); - globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue))); - } - return globalParamsMap; - } - /** * kill task */ @@ -359,18 +339,4 @@ public class TaskExecuteThread implements Runnable, Delayed { public AbstractTask getTask() { return task; } - - private void preBuildBusinessParams() { - Map paramsMap = new HashMap<>(); - // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job - if (taskExecutionContext.getScheduleTime() != null) { - Date date = taskExecutionContext.getScheduleTime(); - String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME, null); - Property p = new Property(); - p.setValue(dateTime); - p.setProp(Constants.PARAMETER_DATETIME); - paramsMap.put(Constants.PARAMETER_DATETIME, p); - } - taskExecutionContext.setParamsMap(paramsMap); - } }