diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 6f7d8f43d2..f8b705179a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler import org.apache.dolphinscheduler.dao.DaoConfiguration; import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.entity.PluginDefine; +import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider; import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration; import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; @@ -69,6 +70,7 @@ public class ApiApplicationServer { log.info("Received spring application context ready event will load taskPlugin and write to DB"); // install task plugin TaskPluginManager.loadPlugin(); + DataSourceProcessorProvider.initialize(); for (Map.Entry entry : TaskPluginManager.getTaskChannelFactoryMap().entrySet()) { String taskPluginName = entry.getKey(); TaskChannelFactory taskChannelFactory = entry.getValue(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 7469d8db13..49e42da561 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -367,10 +367,15 @@ public class 
TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst } for (TaskInstance taskInstance : needToDeleteTaskInstances) { if (StringUtils.isNotBlank(taskInstance.getLogPath())) { - ILogService iLogService = - SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), - ILogService.class); - iLogService.removeTaskInstanceLog(taskInstance.getLogPath()); + try { + // Remove task instance log failed will not affect the deletion of task instance + ILogService iLogService = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), + ILogService.class); + iLogService.removeTaskInstanceLog(taskInstance.getLogPath()); + } catch (Exception ex) { + log.error("Remove task instance log error", ex); + } } } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java index 751ac1ba08..973421615f 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java @@ -37,6 +37,10 @@ public class DataSourceProcessorProvider { private DataSourceProcessorProvider() { } + public static void initialize() { + log.info("Initialize DataSourceProcessorProvider"); + } + public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) { return dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name()); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7988570135..9b507500b9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.dao.DaoConfiguration; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; +import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider; import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.registry.api.RegistryConfiguration; @@ -108,6 +109,7 @@ public class MasterServer implements IStoppable { // install task plugin TaskPluginManager.loadPlugin(); + DataSourceProcessorProvider.initialize(); this.masterSlotManager.start(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 725b7e000a..72c52922d7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -57,11 +57,10 @@ import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; 
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.event.StateEventHandleError; @@ -97,6 +96,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -376,7 +376,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { if (taskInstance.getState().isSuccess()) { completeTaskSet.add(taskInstance.getTaskCode()); - mergeTaskInstanceVarPool(taskInstance); + workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString( + Lists.newArrayList(workflowInstance.getVarPool(), taskInstance.getVarPool()))); processInstanceDao.upsertProcessInstance(workflowInstance); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); @@ -441,7 +442,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { /** * crate new task instance to retry, different objects from the original - * */ private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); @@ -532,16 +532,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } } - /** - * check if task instance exist by id - */ - public boolean checkTaskInstanceById(int taskInstanceId) { - if 
(taskInstanceMap.isEmpty()) { - return false; - } - return taskInstanceMap.containsKey(taskInstanceId); - } - /** * get task instance from memory */ @@ -1070,7 +1060,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { * new a taskInstance * * @param processInstance process instance - * @param taskNode task node + * @param taskNode task node * @return task instance */ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode taskNode) { @@ -1161,80 +1151,32 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return taskInstance; } - public void getPreVarPool(TaskInstance taskInstance, Set preTask) { + void initializeTaskInstanceVarPool(TaskInstance taskInstance) { + // get pre task ,get all the task varPool to this task + // Do not use dag.getPreviousNodes because of the dag may be miss the upstream node + String preTasks = + workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode()).getPreTasks(); + Set preTaskList = new HashSet<>(JSONUtils.toList(preTasks, Long.class)); ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); - Map allProperty = new HashMap<>(); - Map allTaskInstance = new HashMap<>(); - if (CollectionUtils.isNotEmpty(preTask)) { - for (Long preTaskCode : preTask) { - Optional existTaskInstanceOptional = getTaskInstance(preTaskCode); - if (!existTaskInstanceOptional.isPresent()) { - continue; - } - Integer taskId = existTaskInstanceOptional.get().getId(); - if (taskId == null) { - continue; - } - TaskInstance preTaskInstance = taskInstanceMap.get(taskId); - if (preTaskInstance == null) { - continue; - } - String preVarPool = preTaskInstance.getVarPool(); - if (StringUtils.isNotEmpty(preVarPool)) { - List properties = JSONUtils.toList(preVarPool, Property.class); - for (Property info : properties) { - setVarPoolValue(allProperty, allTaskInstance, preTaskInstance, info); - } - } - } - if (allProperty.size() > 0) { - 
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values())); - } - } else { - if (StringUtils.isNotEmpty(workflowInstance.getVarPool())) { - taskInstance.setVarPool(workflowInstance.getVarPool()); - } + if (CollectionUtils.isEmpty(preTaskList)) { + taskInstance.setVarPool(workflowInstance.getVarPool()); + return; } + List preTaskInstanceVarPools = preTaskList + .stream() + .map(taskCode -> getTaskInstance(taskCode).orElse(null)) + .filter(Objects::nonNull) + .sorted(Comparator.comparing(TaskInstance::getEndTime)) + .map(TaskInstance::getVarPool) + .collect(Collectors.toList()); + taskInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(preTaskInstanceVarPools)); } public Collection getAllTaskInstances() { return taskInstanceMap.values(); } - private void setVarPoolValue(Map allProperty, - Map allTaskInstance, - TaskInstance preTaskInstance, Property thisProperty) { - // for this taskInstance all the param in this part is IN. - thisProperty.setDirect(Direct.IN); - // get the pre taskInstance Property's name - String proName = thisProperty.getProp(); - // if the Previous nodes have the Property of same name - if (allProperty.containsKey(proName)) { - // comparison the value of two Property - Property otherPro = allProperty.get(proName); - // if this property'value of loop is empty,use the other,whether the other's value is empty or not - if (StringUtils.isEmpty(thisProperty.getValue())) { - allProperty.put(proName, otherPro); - // if property'value of loop is not empty,and the other's value is not empty too, use the earlier value - } else if (StringUtils.isNotEmpty(otherPro.getValue())) { - TaskInstance otherTask = allTaskInstance.get(proName); - if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) { - allProperty.put(proName, thisProperty); - allTaskInstance.put(proName, preTaskInstance); - } else { - allProperty.put(proName, otherPro); - } - } else { - allProperty.put(proName, thisProperty); - allTaskInstance.put(proName, 
preTaskInstance); - } - } else { - allProperty.put(proName, thisProperty); - allTaskInstance.put(proName, preTaskInstance); - } - } - /** * get complete task instance map, taskCode as key */ @@ -1311,45 +1253,10 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } // the end node of the branch of the dag if (parentNodeCode != null && dag.getEndNode().contains(parentNodeCode)) { - Optional existTaskInstanceOptional = getTaskInstance(parentNodeCode); - if (existTaskInstanceOptional.isPresent()) { - TaskInstance endTaskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); - String taskInstanceVarPool = endTaskInstance.getVarPool(); - if (StringUtils.isNotEmpty(taskInstanceVarPool)) { - Set taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class)); - String processInstanceVarPool = workflowInstance.getVarPool(); - List processGlobalParams = - new ArrayList<>(JSONUtils.toList(workflowInstance.getGlobalParams(), Property.class)); - Map oldProcessGlobalParamsMap = processGlobalParams.stream() - .collect(Collectors.toMap(Property::getProp, Property::getDirect)); - Set processVarPoolOut = taskProperties.stream() - .filter(property -> property.getDirect().equals(Direct.OUT) - && oldProcessGlobalParamsMap.containsKey(property.getProp()) - && oldProcessGlobalParamsMap.get(property.getProp()).equals(Direct.OUT)) - .collect(Collectors.toSet()); - Set taskVarPoolIn = - taskProperties.stream().filter(property -> property.getDirect().equals(Direct.IN)) - .collect(Collectors.toSet()); - if (StringUtils.isNotEmpty(processInstanceVarPool)) { - Set properties = - new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class)); - Set newProcessVarPoolKeys = - taskProperties.stream().map(Property::getProp).collect(Collectors.toSet()); - properties = properties.stream() - .filter(property -> !newProcessVarPoolKeys.contains(property.getProp())) - .collect(Collectors.toSet()); - 
properties.addAll(processVarPoolOut); - properties.addAll(taskVarPoolIn); - - workflowInstance.setVarPool(JSONUtils.toJsonString(properties)); - } else { - Set varPool = new HashSet<>(); - varPool.addAll(taskVarPoolIn); - varPool.addAll(processVarPoolOut); - workflowInstance.setVarPool(JSONUtils.toJsonString(varPool)); - } - } - } + getTaskInstance(parentNodeCode) + .ifPresent(endTaskInstance -> workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString( + Lists.newArrayList(workflowInstance.getVarPool(), endTaskInstance.getVarPool())))); + } // if previous node success , post node submit @@ -1907,14 +1814,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { continue; } } - // init varPool only this task is the first time running if (task.isFirstRun()) { - // get pre task ,get all the task varPool to this task - // Do not use dag.getPreviousNodes because of the dag may be miss the upstream node - String preTasks = workflowExecuteContext.getWorkflowGraph() - .getTaskNodeByCode(task.getTaskCode()).getPreTasks(); - Set preTaskList = new HashSet<>(JSONUtils.toList(preTasks, Long.class)); - getPreVarPool(task, preTaskList); + initializeTaskInstanceVarPool(task); } DependResult dependResult = getDependResultForTask(task); if (DependResult.SUCCESS == dependResult) { @@ -2095,29 +1996,9 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { taskInstanceDao.updateById(taskInstance); } - Set removeSet = new HashSet<>(); - for (TaskInstance taskInstance : removeTaskInstances) { - String taskVarPool = taskInstance.getVarPool(); - if (StringUtils.isNotEmpty(taskVarPool)) { - List properties = JSONUtils.toList(taskVarPool, Property.class); - List keys = properties.stream() - .filter(property -> property.getDirect().equals(Direct.OUT)) - .map(property -> String.format("%s_%s", property.getProp(), property.getType())) - .collect(Collectors.toList()); - removeSet.addAll(keys); - } - } - - // remove varPool data and update 
process instance - // TODO: we can remove this snippet if : we get varPool from pre taskInstance instead of process instance when - // task can not get pre task from incomplete dag - List processProperties = JSONUtils.toList(workflowInstance.getVarPool(), Property.class); - processProperties = processProperties.stream() - .filter(property -> !(property.getDirect().equals(Direct.IN) - && removeSet.contains(String.format("%s_%s", property.getProp(), property.getType())))) - .collect(Collectors.toList()); - - workflowInstance.setVarPool(JSONUtils.toJsonString(processProperties)); + workflowInstance.setVarPool( + VarPoolUtils.subtractVarPoolJson(workflowInstance.getVarPool(), + removeTaskInstances.stream().map(TaskInstance::getVarPool).collect(Collectors.toList()))); processInstanceDao.updateById(workflowInstance); // remove task instance from taskInstanceMap,taskCodeInstanceMap , completeTaskSet, validTaskMap, errorTaskMap @@ -2154,25 +2035,4 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } } - private void mergeTaskInstanceVarPool(TaskInstance taskInstance) { - String taskVarPoolJson = taskInstance.getVarPool(); - if (StringUtils.isEmpty(taskVarPoolJson)) { - return; - } - ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); - String processVarPoolJson = workflowInstance.getVarPool(); - if (StringUtils.isEmpty(processVarPoolJson)) { - workflowInstance.setVarPool(taskVarPoolJson); - return; - } - List processVarPool = new ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class)); - List taskVarPool = JSONUtils.toList(taskVarPoolJson, Property.class); - Set newProcessVarPoolKeys = taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet()); - processVarPool = processVarPool.stream().filter(property -> !newProcessVarPoolKeys.contains(property.getProp())) - .collect(Collectors.toList()); - - processVarPool.addAll(taskVarPool); - - 
workflowInstance.setVarPool(JSONUtils.toJsonString(processVarPool)); - } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index c08fb206f4..409f1b7691 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -71,6 +71,7 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.springframework.context.ApplicationContext; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; @ExtendWith(MockitoExtension.class) @@ -106,6 +107,8 @@ public class WorkflowExecuteRunnableTest { private TaskGroupCoordinator taskGroupCoordinator; + private WorkflowExecuteContext workflowExecuteContext; + @BeforeEach public void init() throws Exception { applicationContext = Mockito.mock(ApplicationContext.class); @@ -134,7 +137,7 @@ public class WorkflowExecuteRunnableTest { stateWheelExecuteThread = Mockito.mock(StateWheelExecuteThread.class); curingGlobalParamsService = Mockito.mock(CuringParamsService.class); ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class); - WorkflowExecuteContext workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class); + workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class); Mockito.when(workflowExecuteContext.getWorkflowInstance()).thenReturn(processInstance); IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class); Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph); @@ -209,11 +212,13 @@ public class WorkflowExecuteRunnableTest { } @Test - public void testGetPreVarPool() { + public void 
testInitializeTaskInstanceVarPool() { try { - Set preTaskName = new HashSet<>(); - preTaskName.add(1L); - preTaskName.add(2L); + IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class); + Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph); + TaskNode taskNode = Mockito.mock(TaskNode.class); + Mockito.when(workflowGraph.getTaskNodeByCode(Mockito.anyLong())).thenReturn(taskNode); + Mockito.when(taskNode.getPreTasks()).thenReturn(JSONUtils.toJsonString(Lists.newArrayList(1L, 2L))); TaskInstance taskInstance = new TaskInstance(); @@ -255,7 +260,7 @@ public class WorkflowExecuteRunnableTest { taskCodeInstanceMapField.setAccessible(true); taskCodeInstanceMapField.set(workflowExecuteThread, taskCodeInstanceMap); - workflowExecuteThread.getPreVarPool(taskInstance, preTaskName); + workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance); Assertions.assertNotNull(taskInstance.getVarPool()); taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]"); @@ -266,7 +271,7 @@ public class WorkflowExecuteRunnableTest { taskInstanceMapField.setAccessible(true); taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); - workflowExecuteThread.getPreVarPool(taskInstance, preTaskName); + workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance); Assertions.assertNotNull(taskInstance.getVarPool()); } catch (Exception e) { Assertions.fail(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java index bac4e651df..f2da8ac40a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java +++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java @@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import java.io.Serializable; import java.util.Objects; +import lombok.Data; + +@Data public class Property implements Serializable { private static final long serialVersionUID = -4045513703397452451L; @@ -56,62 +59,6 @@ public class Property implements Serializable { this.value = value; } - /** - * getter method - * - * @return the prop - * @see Property#prop - */ - public String getProp() { - return prop; - } - - /** - * setter method - * - * @param prop the prop to set - * @see Property#prop - */ - public void setProp(String prop) { - this.prop = prop; - } - - /** - * getter method - * - * @return the value - * @see Property#value - */ - public String getValue() { - return value; - } - - /** - * setter method - * - * @param value the value to set - * @see Property#value - */ - public void setValue(String value) { - this.value = value; - } - - public Direct getDirect() { - return direct; - } - - public void setDirect(Direct direct) { - this.direct = direct; - } - - public DataType getType() { - return type; - } - - public void setType(DataType type) { - this.type = type; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java index f11a83bc54..f99578d743 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java +++ 
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -35,6 +36,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import lombok.Getter; import lombok.Setter; @@ -42,6 +44,7 @@ import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.google.common.collect.Lists; @Getter @Slf4j @@ -82,6 +85,7 @@ public abstract class AbstractParameters implements IParameters { /** * get input local parameters map if the param direct is IN + * * @return parameters map */ public Map getInputLocalParametersMap() { @@ -121,44 +125,30 @@ public abstract class AbstractParameters implements IParameters { } public void dealOutParam(Map taskOutputParams) { - if (CollectionUtils.isEmpty(localParams)) { - return; - } List outProperty = getOutProperty(localParams); if (CollectionUtils.isEmpty(outProperty)) { return; } - if (MapUtils.isEmpty(taskOutputParams)) { - outProperty.forEach(this::addPropertyToValPool); - return; - } - - for (Property info : outProperty) { - String propValue = taskOutputParams.get(info.getProp()); - if (StringUtils.isNotEmpty(propValue)) { - info.setValue(propValue); - addPropertyToValPool(info); - continue; - } - addPropertyToValPool(info); - if 
(StringUtils.isEmpty(info.getValue())) { - log.warn("The output parameter {} value is empty and cannot find the out parameter from task output", - info); + if (CollectionUtils.isNotEmpty(outProperty) && MapUtils.isNotEmpty(taskOutputParams)) { + // Inject the value + for (Property info : outProperty) { + String value = taskOutputParams.get(info.getProp()); + if (value != null) { + info.setValue(value); + } } } + + varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty)); } - public List getOutProperty(List params) { + protected List getOutProperty(List params) { if (CollectionUtils.isEmpty(params)) { return new ArrayList<>(); } - List result = new ArrayList<>(); - for (Property info : params) { - if (info.getDirect() == Direct.OUT) { - result.add(info); - } - } - return result; + return params.stream() + .filter(info -> info.getDirect() == Direct.OUT) + .collect(Collectors.toList()); } public List> getListMapByString(String json) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java index 0f1a893a30..75ebe6c9cd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters; +import 
org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -40,6 +41,7 @@ import java.util.stream.Collectors; import com.google.common.base.Enums; import com.google.common.base.Strings; +import com.google.common.collect.Lists; /** * Sql/Hql parameter @@ -245,7 +247,7 @@ public class SqlParameters extends AbstractParameters { return; } if (StringUtils.isEmpty(result)) { - varPool.addAll(outProperty); + varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty)); return; } List> sqlResult = getListMapByString(result); @@ -268,7 +270,6 @@ public class SqlParameters extends AbstractParameters { for (Property info : outProperty) { if (info.getType() == DataType.LIST) { info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp()))); - varPool.add(info); } } } else { @@ -276,9 +277,9 @@ public class SqlParameters extends AbstractParameters { Map firstRow = sqlResult.get(0); for (Property info : outProperty) { info.setValue(String.valueOf(firstRow.get(info.getProp()))); - varPool.add(info); } } + varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty)); } @@ -322,6 +323,7 @@ public class SqlParameters extends AbstractParameters { /** * TODO SQLTaskExecutionContext needs to be optimized + * * @param parametersHelper * @return */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java new file mode 100644 index 0000000000..7c24eb9a21 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.api.utils;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;

import org.apache.commons.collections4.CollectionUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

/**
 * Utility methods for (de)serializing, merging and subtracting task variable pools
 * ("varpools"), where a varpool is a list of {@link Property}.
 * <p>
 * Lombok's {@code @UtilityClass} makes every member static and hides the constructor.
 */
@Slf4j
@UtilityClass
public class VarPoolUtils {

    /**
     * Deserialize a varpool JSON string into a list of {@link Property}.
     *
     * @param varPoolJson JSON array of properties; may be null/blank (JSONUtils handles it)
     * @return the deserialized property list
     */
    public List<Property> deserializeVarPool(String varPoolJson) {
        return JSONUtils.toList(varPoolJson, Property.class);
    }

    /**
     * Deserialize each JSON string, merge the resulting varpools and serialize the result.
     *
     * @param varPoolJsons varpool JSON strings, in merge order (later entries win)
     * @return merged varpool as JSON, or null if the input is null/empty
     * @see #mergeVarPool(List)
     */
    public String mergeVarPoolJsonString(List<String> varPoolJsons) {
        if (CollectionUtils.isEmpty(varPoolJsons)) {
            return null;
        }
        List<List<Property>> varPools = varPoolJsons.stream()
                .map(VarPoolUtils::deserializeVarPool)
                .collect(Collectors.toList());
        List<Property> finalVarPool = mergeVarPool(varPools);
        return JSONUtils.toJsonString(finalVarPool);
    }

    /**
     * Merge the given varpools and return the merged varpool.
     * <p>
     * If two varpools contain a property with the same {@link Property#getProp()},
     * the value from the later varpool in the list wins. Only properties whose
     * direct is {@link Direct#OUT} are merged; others are logged and skipped.
     * // todo: we may need to consider the datatype of the property
     *
     * @param varPools varpools to merge, in priority order (later overrides earlier)
     * @return merged varpool, or null if the input is null/empty
     */
    public List<Property> mergeVarPool(List<List<Property>> varPools) {
        if (CollectionUtils.isEmpty(varPools)) {
            // callers (and the unit test) rely on null for empty input
            return null;
        }
        if (varPools.size() == 1) {
            return varPools.get(0);
        }
        Map<String, Property> result = new HashMap<>();
        for (List<Property> varPool : varPools) {
            if (CollectionUtils.isEmpty(varPool)) {
                continue;
            }
            for (Property property : varPool) {
                if (!Direct.OUT.equals(property.getDirect())) {
                    log.info("The direct should be OUT in varPool, but got {}", property.getDirect());
                    continue;
                }
                // later varpools override earlier ones for the same prop name
                result.put(property.getProp(), property);
            }
        }
        return new ArrayList<>(result.values());
    }

    /**
     * JSON-string variant of {@link #subtractVarPool(List, List)}.
     *
     * @param varPool          varpool JSON to subtract from
     * @param subtractVarPool  varpool JSON strings whose properties should be removed
     * @return remaining varpool serialized as JSON
     */
    public String subtractVarPoolJson(String varPool, List<String> subtractVarPool) {
        List<Property> varPoolList = deserializeVarPool(varPool);
        List<List<Property>> subtractVarPoolList = subtractVarPool.stream()
                .map(VarPoolUtils::deserializeVarPool)
                .collect(Collectors.toList());
        List<Property> finalVarPool = subtractVarPool(varPoolList, subtractVarPoolList);
        return JSONUtils.toJsonString(finalVarPool);
    }

    /**
     * Return the subtracted varpool: properties whose key is in {@code varPool}
     * but not in any of the {@code subtractVarPool} lists (matched by prop name only).
     *
     * @param varPool          varpool to subtract from; if null/empty, returns null
     * @param subtractVarPool  varpools whose property names should be removed
     * @return the remaining properties, or null if {@code varPool} is null/empty
     */
    public List<Property> subtractVarPool(List<Property> varPool, List<List<Property>> subtractVarPool) {
        if (CollectionUtils.isEmpty(varPool)) {
            return null;
        }
        if (CollectionUtils.isEmpty(subtractVarPool)) {
            return varPool;
        }
        // index the properties to remove by prop name for O(1) lookups
        Map<String, Property> subtractVarPoolMap = new HashMap<>();
        for (List<Property> properties : subtractVarPool) {
            for (Property property : properties) {
                subtractVarPoolMap.put(property.getProp(), property);
            }
        }
        List<Property> result = new ArrayList<>();
        for (Property property : varPool) {
            if (!subtractVarPoolMap.containsKey(property.getProp())) {
                result.add(property);
            }
        }
        return result;
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.plugin.task.api.utils;

import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;

import java.util.List;

import org.junit.jupiter.api.Test;

import com.google.common.collect.Lists;
import com.google.common.truth.Truth;

/**
 * Unit tests for {@link VarPoolUtils} merge and subtract semantics.
 */
class VarPoolUtilsTest {

    @Test
    void mergeVarPool() {
        // Null/empty input yields null by contract.
        Truth.assertThat(VarPoolUtils.mergeVarPool(null)).isNull();

        // Override the value of the same property:
        // later varpools win for a duplicated prop name.
        // Merge the property with different key.
        List<Property> varpool1 = Lists.newArrayList(new Property("name", Direct.OUT, DataType.VARCHAR, "tom"));
        List<Property> varpool2 = Lists.newArrayList(
                new Property("name", Direct.OUT, DataType.VARCHAR, "tim"),
                new Property("age", Direct.OUT, DataType.INTEGER, "10"));

        Truth.assertThat(VarPoolUtils.mergeVarPool(Lists.newArrayList(varpool1, varpool2)))
                .containsExactly(
                        new Property("name", Direct.OUT, DataType.VARCHAR, "tim"),
                        new Property("age", Direct.OUT, DataType.INTEGER, "10"));

    }

    @Test
    void subtractVarPool() {
        // Null input yields null by contract.
        Truth.assertThat(VarPoolUtils.subtractVarPool(null, null)).isNull();

        // Only "age" survives: "name" is removed by varpool2; "location" never existed in varpool1.
        List<Property> varpool1 = Lists.newArrayList(new Property("name", Direct.OUT, DataType.VARCHAR, "tom"),
                new Property("age", Direct.OUT, DataType.INTEGER, "10"));
        List<Property> varpool2 = Lists.newArrayList(new Property("name", Direct.OUT, DataType.VARCHAR, "tom"));
        List<Property> varpool3 = Lists.newArrayList(new Property("location", Direct.OUT, DataType.VARCHAR, "china"));

        Truth.assertThat(VarPoolUtils.subtractVarPool(varpool1, Lists.newArrayList(varpool2, varpool3)))
                .containsExactly(new Property("age", Direct.OUT, DataType.INTEGER, "10"));
    }
}
8dd842a8ed..797d8f78a3 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.meter.metrics.MetricsProvider; import org.apache.dolphinscheduler.meter.metrics.SystemMetrics; +import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider; import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; @@ -86,6 +87,7 @@ public class WorkerServer implements IStoppable { public void run() { this.workerRpcServer.start(); TaskPluginManager.loadPlugin(); + DataSourceProcessorProvider.initialize(); this.workerRegistryClient.setRegistryStoppable(this); this.workerRegistryClient.start(); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java index 41cef62028..e64a421001 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java @@ -283,6 +283,7 @@ public abstract class WorkerTaskExecutor implements Runnable { // upload out files and modify the "OUT FILE" property in VarPool TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate); + log.info("Upload output files: {} successfully", TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT)); diff 
--git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java index 87438c614b..f060c0a17d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java @@ -70,17 +70,18 @@ public class TaskFilesTransferUtils { */ public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) throws TaskException { - List varPools = getVarPools(taskExecutionContext); - // get map of varPools for quick search - Map varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x)); - // get OUTPUT FILE parameters List localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT); - if (localParamsProperty.isEmpty()) { return; } + List varPools = getVarPools(taskExecutionContext); + // get map of varPools for quick search + Map varPoolsMap = varPools.stream() + .filter(property -> Direct.OUT.equals(property.getDirect())) + .collect(Collectors.toMap(Property::getProp, x -> x)); + log.info("Upload output files ..."); for (Property property : localParamsProperty) { // get local file path @@ -137,10 +138,6 @@ public class TaskFilesTransferUtils { * @throws TaskException task exception */ public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) { - List varPools = getVarPools(taskExecutionContext); - // get map of varPools for quick search - Map varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x)); - // get "IN FILE" parameters List localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN); @@ -148,6 +145,13 @@ public class TaskFilesTransferUtils { 
return; } + List varPools = getVarPools(taskExecutionContext); + // get map of varPools for quick search + Map varPoolsMap = varPools + .stream() + .filter(property -> Direct.IN.equals(property.getDirect())) + .collect(Collectors.toMap(Property::getProp, x -> x)); + String executePath = taskExecutionContext.getExecutePath(); // data path to download packaged data String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);