Browse Source

Fix workflow instance restart failed due to duplicate key in varpool (#16001)

3.2.2-release-bak
Wenjun Ruan 7 months ago committed by GitHub
parent
commit
61915a2d5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
  2. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  3. 4
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
  4. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  5. 196
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  6. 19
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
  7. 59
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java
  8. 40
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
  9. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
  10. 119
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
  11. 62
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
  12. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  13. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
  14. 22
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
@ -69,6 +70,7 @@ public class ApiApplicationServer {
log.info("Received spring application context ready event will load taskPlugin and write to DB");
// install task plugin
TaskPluginManager.loadPlugin();
DataSourceProcessorProvider.initialize();
for (Map.Entry<String, TaskChannelFactory> entry : TaskPluginManager.getTaskChannelFactoryMap().entrySet()) {
String taskPluginName = entry.getKey();
TaskChannelFactory taskChannelFactory = entry.getValue();

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -367,10 +367,15 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
}
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
if (StringUtils.isNotBlank(taskInstance.getLogPath())) {
try {
// Remove task instance log failed will not affect the deletion of task instance
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(),
ILogService.class);
iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
} catch (Exception ex) {
log.error("Remove task instance log error", ex);
}
}
}

4
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java

@ -37,6 +37,10 @@ public class DataSourceProcessorProvider {
private DataSourceProcessorProvider() {
}
public static void initialize() {
log.info("Initialize DataSourceProcessorProvider");
}
public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) {
return dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name());
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@ -108,6 +109,7 @@ public class MasterServer implements IStoppable {
// install task plugin
TaskPluginManager.loadPlugin();
DataSourceProcessorProvider.initialize();
this.masterSlotManager.start();

196
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -57,11 +57,10 @@ import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
@ -97,6 +96,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -376,7 +376,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
if (taskInstance.getState().isSuccess()) {
completeTaskSet.add(taskInstance.getTaskCode());
mergeTaskInstanceVarPool(taskInstance);
workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(
Lists.newArrayList(workflowInstance.getVarPool(), taskInstance.getVarPool())));
processInstanceDao.upsertProcessInstance(workflowInstance);
ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
@ -441,7 +442,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
/**
* crate new task instance to retry, different objects from the original
*
*/
private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
@ -532,16 +532,6 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
}
}
/**
* check if task instance exist by id
*/
public boolean checkTaskInstanceById(int taskInstanceId) {
if (taskInstanceMap.isEmpty()) {
return false;
}
return taskInstanceMap.containsKey(taskInstanceId);
}
/**
* get task instance from memory
*/
@ -1161,80 +1151,32 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
return taskInstance;
}
public void getPreVarPool(TaskInstance taskInstance, Set<Long> preTask) {
void initializeTaskInstanceVarPool(TaskInstance taskInstance) {
// get pre task ,get all the task varPool to this task
// Do not use dag.getPreviousNodes because of the dag may be miss the upstream node
String preTasks =
workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode()).getPreTasks();
Set<Long> preTaskList = new HashSet<>(JSONUtils.toList(preTasks, Long.class));
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
Map<String, Property> allProperty = new HashMap<>();
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (Long preTaskCode : preTask) {
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(preTaskCode);
if (!existTaskInstanceOptional.isPresent()) {
continue;
}
Integer taskId = existTaskInstanceOptional.get().getId();
if (taskId == null) {
continue;
}
TaskInstance preTaskInstance = taskInstanceMap.get(taskId);
if (preTaskInstance == null) {
continue;
}
String preVarPool = preTaskInstance.getVarPool();
if (StringUtils.isNotEmpty(preVarPool)) {
List<Property> properties = JSONUtils.toList(preVarPool, Property.class);
for (Property info : properties) {
setVarPoolValue(allProperty, allTaskInstance, preTaskInstance, info);
}
}
}
if (allProperty.size() > 0) {
taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
}
} else {
if (StringUtils.isNotEmpty(workflowInstance.getVarPool())) {
if (CollectionUtils.isEmpty(preTaskList)) {
taskInstance.setVarPool(workflowInstance.getVarPool());
return;
}
}
List<String> preTaskInstanceVarPools = preTaskList
.stream()
.map(taskCode -> getTaskInstance(taskCode).orElse(null))
.filter(Objects::nonNull)
.sorted(Comparator.comparing(TaskInstance::getEndTime))
.map(TaskInstance::getVarPool)
.collect(Collectors.toList());
taskInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(preTaskInstanceVarPools));
}
public Collection<TaskInstance> getAllTaskInstances() {
return taskInstanceMap.values();
}
private void setVarPoolValue(Map<String, Property> allProperty,
Map<String, TaskInstance> allTaskInstance,
TaskInstance preTaskInstance, Property thisProperty) {
// for this taskInstance all the param in this part is IN.
thisProperty.setDirect(Direct.IN);
// get the pre taskInstance Property's name
String proName = thisProperty.getProp();
// if the Previous nodes have the Property of same name
if (allProperty.containsKey(proName)) {
// comparison the value of two Property
Property otherPro = allProperty.get(proName);
// if this property'value of loop is empty,use the other,whether the other's value is empty or not
if (StringUtils.isEmpty(thisProperty.getValue())) {
allProperty.put(proName, otherPro);
// if property'value of loop is not empty,and the other's value is not empty too, use the earlier value
} else if (StringUtils.isNotEmpty(otherPro.getValue())) {
TaskInstance otherTask = allTaskInstance.get(proName);
if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) {
allProperty.put(proName, thisProperty);
allTaskInstance.put(proName, preTaskInstance);
} else {
allProperty.put(proName, otherPro);
}
} else {
allProperty.put(proName, thisProperty);
allTaskInstance.put(proName, preTaskInstance);
}
} else {
allProperty.put(proName, thisProperty);
allTaskInstance.put(proName, preTaskInstance);
}
}
/**
* get complete task instance map, taskCode as key
*/
@ -1311,45 +1253,10 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
}
// the end node of the branch of the dag
if (parentNodeCode != null && dag.getEndNode().contains(parentNodeCode)) {
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(parentNodeCode);
if (existTaskInstanceOptional.isPresent()) {
TaskInstance endTaskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId());
String taskInstanceVarPool = endTaskInstance.getVarPool();
if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
Set<Property> taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
String processInstanceVarPool = workflowInstance.getVarPool();
List<Property> processGlobalParams =
new ArrayList<>(JSONUtils.toList(workflowInstance.getGlobalParams(), Property.class));
Map<String, Direct> oldProcessGlobalParamsMap = processGlobalParams.stream()
.collect(Collectors.toMap(Property::getProp, Property::getDirect));
Set<Property> processVarPoolOut = taskProperties.stream()
.filter(property -> property.getDirect().equals(Direct.OUT)
&& oldProcessGlobalParamsMap.containsKey(property.getProp())
&& oldProcessGlobalParamsMap.get(property.getProp()).equals(Direct.OUT))
.collect(Collectors.toSet());
Set<Property> taskVarPoolIn =
taskProperties.stream().filter(property -> property.getDirect().equals(Direct.IN))
.collect(Collectors.toSet());
if (StringUtils.isNotEmpty(processInstanceVarPool)) {
Set<Property> properties =
new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
Set<String> newProcessVarPoolKeys =
taskProperties.stream().map(Property::getProp).collect(Collectors.toSet());
properties = properties.stream()
.filter(property -> !newProcessVarPoolKeys.contains(property.getProp()))
.collect(Collectors.toSet());
properties.addAll(processVarPoolOut);
properties.addAll(taskVarPoolIn);
workflowInstance.setVarPool(JSONUtils.toJsonString(properties));
} else {
Set<Property> varPool = new HashSet<>();
varPool.addAll(taskVarPoolIn);
varPool.addAll(processVarPoolOut);
workflowInstance.setVarPool(JSONUtils.toJsonString(varPool));
}
}
}
getTaskInstance(parentNodeCode)
.ifPresent(endTaskInstance -> workflowInstance.setVarPool(VarPoolUtils.mergeVarPoolJsonString(
Lists.newArrayList(workflowInstance.getVarPool(), endTaskInstance.getVarPool()))));
}
// if previous node success , post node submit
@ -1907,14 +1814,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
continue;
}
}
// init varPool only this task is the first time running
if (task.isFirstRun()) {
// get pre task ,get all the task varPool to this task
// Do not use dag.getPreviousNodes because of the dag may be miss the upstream node
String preTasks = workflowExecuteContext.getWorkflowGraph()
.getTaskNodeByCode(task.getTaskCode()).getPreTasks();
Set<Long> preTaskList = new HashSet<>(JSONUtils.toList(preTasks, Long.class));
getPreVarPool(task, preTaskList);
initializeTaskInstanceVarPool(task);
}
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
@ -2095,29 +1996,9 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
taskInstanceDao.updateById(taskInstance);
}
Set<String> removeSet = new HashSet<>();
for (TaskInstance taskInstance : removeTaskInstances) {
String taskVarPool = taskInstance.getVarPool();
if (StringUtils.isNotEmpty(taskVarPool)) {
List<Property> properties = JSONUtils.toList(taskVarPool, Property.class);
List<String> keys = properties.stream()
.filter(property -> property.getDirect().equals(Direct.OUT))
.map(property -> String.format("%s_%s", property.getProp(), property.getType()))
.collect(Collectors.toList());
removeSet.addAll(keys);
}
}
// remove varPool data and update process instance
// TODO: we can remove this snippet if : we get varPool from pre taskInstance instead of process instance when
// task can not get pre task from incomplete dag
List<Property> processProperties = JSONUtils.toList(workflowInstance.getVarPool(), Property.class);
processProperties = processProperties.stream()
.filter(property -> !(property.getDirect().equals(Direct.IN)
&& removeSet.contains(String.format("%s_%s", property.getProp(), property.getType()))))
.collect(Collectors.toList());
workflowInstance.setVarPool(JSONUtils.toJsonString(processProperties));
workflowInstance.setVarPool(
VarPoolUtils.subtractVarPoolJson(workflowInstance.getVarPool(),
removeTaskInstances.stream().map(TaskInstance::getVarPool).collect(Collectors.toList())));
processInstanceDao.updateById(workflowInstance);
// remove task instance from taskInstanceMap,taskCodeInstanceMap , completeTaskSet, validTaskMap, errorTaskMap
@ -2154,25 +2035,4 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
}
}
private void mergeTaskInstanceVarPool(TaskInstance taskInstance) {
String taskVarPoolJson = taskInstance.getVarPool();
if (StringUtils.isEmpty(taskVarPoolJson)) {
return;
}
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
String processVarPoolJson = workflowInstance.getVarPool();
if (StringUtils.isEmpty(processVarPoolJson)) {
workflowInstance.setVarPool(taskVarPoolJson);
return;
}
List<Property> processVarPool = new ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class));
List<Property> taskVarPool = JSONUtils.toList(taskVarPoolJson, Property.class);
Set<String> newProcessVarPoolKeys = taskVarPool.stream().map(Property::getProp).collect(Collectors.toSet());
processVarPool = processVarPool.stream().filter(property -> !newProcessVarPoolKeys.contains(property.getProp()))
.collect(Collectors.toList());
processVarPool.addAll(taskVarPool);
workflowInstance.setVarPool(JSONUtils.toJsonString(processVarPool));
}
}

19
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@ -71,6 +71,7 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ExtendWith(MockitoExtension.class)
@ -106,6 +107,8 @@ public class WorkflowExecuteRunnableTest {
private TaskGroupCoordinator taskGroupCoordinator;
private WorkflowExecuteContext workflowExecuteContext;
@BeforeEach
public void init() throws Exception {
applicationContext = Mockito.mock(ApplicationContext.class);
@ -134,7 +137,7 @@ public class WorkflowExecuteRunnableTest {
stateWheelExecuteThread = Mockito.mock(StateWheelExecuteThread.class);
curingGlobalParamsService = Mockito.mock(CuringParamsService.class);
ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class);
WorkflowExecuteContext workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class);
workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class);
Mockito.when(workflowExecuteContext.getWorkflowInstance()).thenReturn(processInstance);
IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class);
Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph);
@ -209,11 +212,13 @@ public class WorkflowExecuteRunnableTest {
}
@Test
public void testGetPreVarPool() {
public void testInitializeTaskInstanceVarPool() {
try {
Set<Long> preTaskName = new HashSet<>();
preTaskName.add(1L);
preTaskName.add(2L);
IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class);
Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph);
TaskNode taskNode = Mockito.mock(TaskNode.class);
Mockito.when(workflowGraph.getTaskNodeByCode(Mockito.anyLong())).thenReturn(taskNode);
Mockito.when(taskNode.getPreTasks()).thenReturn(JSONUtils.toJsonString(Lists.newArrayList(1L, 2L)));
TaskInstance taskInstance = new TaskInstance();
@ -255,7 +260,7 @@ public class WorkflowExecuteRunnableTest {
taskCodeInstanceMapField.setAccessible(true);
taskCodeInstanceMapField.set(workflowExecuteThread, taskCodeInstanceMap);
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance);
Assertions.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
@ -266,7 +271,7 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
workflowExecuteThread.initializeTaskInstanceVarPool(taskInstance);
Assertions.assertNotNull(taskInstance.getVarPool());
} catch (Exception e) {
Assertions.fail();

59
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/Property.java

@ -23,6 +23,9 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import java.io.Serializable;
import java.util.Objects;
import lombok.Data;
@Data
public class Property implements Serializable {
private static final long serialVersionUID = -4045513703397452451L;
@ -56,62 +59,6 @@ public class Property implements Serializable {
this.value = value;
}
/**
* getter method
*
* @return the prop
* @see Property#prop
*/
public String getProp() {
return prop;
}
/**
* setter method
*
* @param prop the prop to set
* @see Property#prop
*/
public void setProp(String prop) {
this.prop = prop;
}
/**
* getter method
*
* @return the value
* @see Property#value
*/
public String getValue() {
return value;
}
/**
* setter method
*
* @param value the value to set
* @see Property#value
*/
public void setValue(String value) {
this.value = value;
}
public Direct getDirect() {
return direct;
}
public void setDirect(Direct direct) {
this.direct = direct;
}
public DataType getType() {
return type;
}
public void setType(DataType type) {
this.type = type;
}
@Override
public boolean equals(Object o) {
if (this == o) {

40
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@ -35,6 +36,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
@ -42,6 +44,7 @@ import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Lists;
@Getter
@Slf4j
@ -82,6 +85,7 @@ public abstract class AbstractParameters implements IParameters {
/**
* get input local parameters map if the param direct is IN
*
* @return parameters map
*/
public Map<String, Property> getInputLocalParametersMap() {
@ -121,44 +125,30 @@ public abstract class AbstractParameters implements IParameters {
}
public void dealOutParam(Map<String, String> taskOutputParams) {
if (CollectionUtils.isEmpty(localParams)) {
return;
}
List<Property> outProperty = getOutProperty(localParams);
if (CollectionUtils.isEmpty(outProperty)) {
return;
}
if (MapUtils.isEmpty(taskOutputParams)) {
outProperty.forEach(this::addPropertyToValPool);
return;
}
if (CollectionUtils.isNotEmpty(outProperty) && MapUtils.isNotEmpty(taskOutputParams)) {
// Inject the value
for (Property info : outProperty) {
String propValue = taskOutputParams.get(info.getProp());
if (StringUtils.isNotEmpty(propValue)) {
info.setValue(propValue);
addPropertyToValPool(info);
continue;
String value = taskOutputParams.get(info.getProp());
if (value != null) {
info.setValue(value);
}
addPropertyToValPool(info);
if (StringUtils.isEmpty(info.getValue())) {
log.warn("The output parameter {} value is empty and cannot find the out parameter from task output",
info);
}
}
varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
}
public List<Property> getOutProperty(List<Property> params) {
protected List<Property> getOutProperty(List<Property> params) {
if (CollectionUtils.isEmpty(params)) {
return new ArrayList<>();
}
List<Property> result = new ArrayList<>();
for (Property info : params) {
if (info.getDirect() == Direct.OUT) {
result.add(info);
}
}
return result;
return params.stream()
.filter(info -> info.getDirect() == Direct.OUT)
.collect(Collectors.toList());
}
public List<Map<String, String>> getListMapByString(String json) {

8
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.UdfFuncParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -40,6 +41,7 @@ import java.util.stream.Collectors;
import com.google.common.base.Enums;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
/**
* Sql/Hql parameter
@ -245,7 +247,7 @@ public class SqlParameters extends AbstractParameters {
return;
}
if (StringUtils.isEmpty(result)) {
varPool.addAll(outProperty);
varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
return;
}
List<Map<String, String>> sqlResult = getListMapByString(result);
@ -268,7 +270,6 @@ public class SqlParameters extends AbstractParameters {
for (Property info : outProperty) {
if (info.getType() == DataType.LIST) {
info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
varPool.add(info);
}
}
} else {
@ -276,9 +277,9 @@ public class SqlParameters extends AbstractParameters {
Map<String, String> firstRow = sqlResult.get(0);
for (Property info : outProperty) {
info.setValue(String.valueOf(firstRow.get(info.getProp())));
varPool.add(info);
}
}
varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
}
@ -322,6 +323,7 @@ public class SqlParameters extends AbstractParameters {
/**
* TODO SQLTaskExecutionContext needs to be optimized
*
* @param parametersHelper
* @return
*/

119
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@UtilityClass
public class VarPoolUtils {
public List<Property> deserializeVarPool(String varPoolJson) {
return JSONUtils.toList(varPoolJson, Property.class);
}
/**
* @see #mergeVarPool(List)
*/
public String mergeVarPoolJsonString(List<String> varPoolJsons) {
if (CollectionUtils.isEmpty(varPoolJsons)) {
return null;
}
List<List<Property>> varPools = varPoolJsons.stream()
.map(VarPoolUtils::deserializeVarPool)
.collect(Collectors.toList());
List<Property> finalVarPool = mergeVarPool(varPools);
return JSONUtils.toJsonString(finalVarPool);
}
/**
* Merge the given two varpools, and return the merged varpool.
* If the two varpools have the same property({@link Property#getProp()} and {@link Property#getDirect()} is same), the value of the property in varpool2 will be used.
* // todo: we may need to consider the datatype of the property
*/
public List<Property> mergeVarPool(List<List<Property>> varPools) {
if (CollectionUtils.isEmpty(varPools)) {
return null;
}
if (varPools.size() == 1) {
return varPools.get(0);
}
Map<String, Property> result = new HashMap<>();
for (List<Property> varPool : varPools) {
if (CollectionUtils.isEmpty(varPool)) {
continue;
}
for (Property property : varPool) {
if (!Direct.OUT.equals(property.getDirect())) {
log.info("The direct should be OUT in varPool, but got {}", property.getDirect());
continue;
}
result.put(property.getProp(), property);
}
}
return new ArrayList<>(result.values());
}
public String subtractVarPoolJson(String varPool, List<String> subtractVarPool) {
List<Property> varPoolList = deserializeVarPool(varPool);
List<List<Property>> subtractVarPoolList = subtractVarPool.stream()
.map(VarPoolUtils::deserializeVarPool)
.collect(Collectors.toList());
List<Property> finalVarPool = subtractVarPool(varPoolList, subtractVarPoolList);
return JSONUtils.toJsonString(finalVarPool);
}
/**
* Return the subtracted varpool, which key is in varPool but not in subtractVarPool.
*/
public List<Property> subtractVarPool(List<Property> varPool, List<List<Property>> subtractVarPool) {
if (CollectionUtils.isEmpty(varPool)) {
return null;
}
if (CollectionUtils.isEmpty(subtractVarPool)) {
return varPool;
}
Map<String, Property> subtractVarPoolMap = new HashMap<>();
for (List<Property> properties : subtractVarPool) {
for (Property property : properties) {
subtractVarPoolMap.put(property.getProp(), property);
}
}
List<Property> result = new ArrayList<>();
for (Property property : varPool) {
if (!subtractVarPoolMap.containsKey(property.getProp())) {
result.add(property);
}
}
return result;
}
}

62
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import java.util.List;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Lists;
import com.google.common.truth.Truth;
class VarPoolUtilsTest {
@Test
void mergeVarPool() {
Truth.assertThat(VarPoolUtils.mergeVarPool(null)).isNull();
// Override the value of the same property
// Merge the property with different key.
List<Property> varpool1 = Lists.newArrayList(new Property("name", Direct.OUT, DataType.VARCHAR, "tom"));
List<Property> varpool2 = Lists.newArrayList(
new Property("name", Direct.OUT, DataType.VARCHAR, "tim"),
new Property("age", Direct.OUT, DataType.INTEGER, "10"));
Truth.assertThat(VarPoolUtils.mergeVarPool(Lists.newArrayList(varpool1, varpool2)))
.containsExactly(
new Property("name", Direct.OUT, DataType.VARCHAR, "tim"),
new Property("age", Direct.OUT, DataType.INTEGER, "10"));
}
@Test
void subtractVarPool() {
Truth.assertThat(VarPoolUtils.subtractVarPool(null, null)).isNull();
List<Property> varpool1 = Lists.newArrayList(new Property("name", Direct.OUT, DataType.VARCHAR, "tom"),
new Property("age", Direct.OUT, DataType.INTEGER, "10"));
List<Property> varpool2 = Lists.newArrayList(new Property("name", Direct.OUT, DataType.VARCHAR, "tom"));
List<Property> varpool3 = Lists.newArrayList(new Property("location", Direct.OUT, DataType.VARCHAR, "china"));
Truth.assertThat(VarPoolUtils.subtractVarPool(varpool1, Lists.newArrayList(varpool2, varpool3)))
.containsExactly(new Property("age", Direct.OUT, DataType.INTEGER, "10"));
}
}

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
@ -86,6 +87,7 @@ public class WorkerServer implements IStoppable {
public void run() {
this.workerRpcServer.start();
TaskPluginManager.loadPlugin();
DataSourceProcessorProvider.initialize();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java

@ -283,6 +283,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
// upload out files and modify the "OUT FILE" property in VarPool
TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);
log.info("Upload output files: {} successfully",
TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));

22
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java

@ -70,17 +70,18 @@ public class TaskFilesTransferUtils {
*/
public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
StorageOperate storageOperate) throws TaskException {
List<Property> varPools = getVarPools(taskExecutionContext);
// get map of varPools for quick search
Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
// get OUTPUT FILE parameters
List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);
if (localParamsProperty.isEmpty()) {
return;
}
List<Property> varPools = getVarPools(taskExecutionContext);
// get map of varPools for quick search
Map<String, Property> varPoolsMap = varPools.stream()
.filter(property -> Direct.OUT.equals(property.getDirect()))
.collect(Collectors.toMap(Property::getProp, x -> x));
log.info("Upload output files ...");
for (Property property : localParamsProperty) {
// get local file path
@ -137,10 +138,6 @@ public class TaskFilesTransferUtils {
* @throws TaskException task exception
*/
public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
List<Property> varPools = getVarPools(taskExecutionContext);
// get map of varPools for quick search
Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));
// get "IN FILE" parameters
List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);
@ -148,6 +145,13 @@ public class TaskFilesTransferUtils {
return;
}
List<Property> varPools = getVarPools(taskExecutionContext);
// get map of varPools for quick search
Map<String, Property> varPoolsMap = varPools
.stream()
.filter(property -> Direct.IN.equals(property.getDirect()))
.collect(Collectors.toMap(Property::getProp, x -> x));
String executePath = taskExecutionContext.getExecutePath();
// data path to download packaged data
String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);

Loading…
Cancel
Save