diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 7116dc4c90..c110df0c9f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -221,7 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("processDefinitionId") int processDefinitionId, @Param("states") int[] states); - int updateGlobalParamById( + int updateGlobalParamsById( @Param("globalParams") String globalParams, @Param("id") int id); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 793b58e567..d4cf947b8c 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -219,7 +219,7 @@ order by id asc - + update t_ds_process_instance set global_params = #{globalParams} where id = #{id} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index 21fe47198c..93cc3eab12 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -72,6 +72,7 @@ public class TaskExecuteResponseCommand implements Serializable { * task return result */ private String result; + public void setVarPool(String varPool) { this.varPool = varPool; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 0e6c0d8e21..80bd3f7a9d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -558,27 +558,31 @@ public class MasterExecThread implements Runnable { String globalParams = this.processInstance.getGlobalParams(); if (StringUtils.isNotEmpty(globalParams)) { Map globalMap = getGlobalParamMap(globalParams); - if (globalMap != null) { - // the param save in localParams - Map result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); - Object localParams = result.get(LOCAL_PARAMS); - if (localParams != null) { - List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); - for (Property info : allParam) { - if (info.getDirect().equals(Direct.IN)) { - String paramName = info.getProp(); - String value = globalMap.get(paramName); - if (StringUtils.isNotEmpty(value)) { - info.setValue(value); - } - } + if (globalMap != null && globalMap.size() != 0) { + setGlobalMapToTask(taskNode, taskInstance, globalMap); + } + } + } + + private void setGlobalMapToTask(TaskNode taskNode, TaskInstance taskInstance, Map globalMap) { + // the param save in localParams + Map result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class); + Object localParams = result.get(LOCAL_PARAMS); + if (localParams != null) { + List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); + for (Property info : allParam) { + if (info.getDirect().equals(Direct.IN)) { + String paramName = info.getProp(); + String value = globalMap.get(paramName); + if (StringUtils.isNotEmpty(value)) { + info.setValue(value); } - result.put(LOCAL_PARAMS, allParam); - taskNode.setParams(JSONUtils.toJsonString(result)); - // task instance node json - taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); } } + result.put(LOCAL_PARAMS, allParam); + taskNode.setParams(JSONUtils.toJsonString(result)); + // task instance node json + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); } } @@ -997,8 +1001,8 @@ public class MasterExecThread implements Runnable { task.getName(), task.getId(), task.getState()); // node success , post node submit if (task.getState() == ExecutionStatus.SUCCESS) { - processInstance.setVarPool(task.getVarPool()); processInstance = processService.findProcessInstanceById(processInstance.getId()); + processInstance.setVarPool(task.getVarPool()); processService.updateProcessInstance(processInstance); completeTaskList.put(task.getName(), task); submitPostNode(task.getName()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 52f7363c66..037bde6c73 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -88,7 +88,7 @@ public abstract class AbstractCommandExecutor { /** * SHELL result string */ - protected String resultString; + protected String taskResultString; /** * taskExecutionContext @@ -110,6 +110,10 @@ public abstract class AbstractCommandExecutor { this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } + protected AbstractCommandExecutor(List logBuffer) { + this.logBuffer = logBuffer; + } + /** * build process * @@ -229,13 +233,6 @@ public abstract class AbstractCommandExecutor { return varPool.toString(); } - public String getResultString() { - return resultString; - } - - public void setResultString(String result) { - this.resultString = result; - } /** * cancel application @@ -369,7 +366,7 @@ public abstract class AbstractCommandExecutor { varPool.append("$VarPool$"); } else { logBuffer.add(line); - resultString = line; + taskResultString = line; lastFlushTime = flush(lastFlushTime); } } @@ -576,4 +573,12 @@ public abstract class AbstractCommandExecutor { protected abstract String commandInterpreter(); protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; + + public String getTaskResultString() { + return taskResultString; + } + + public void setTaskResultString(String taskResultString) { + this.taskResultString = taskResultString; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index 5e297abbf0..8f3da4537d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -56,6 +56,9 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { super(logHandler,taskExecutionContext,logger); } + public ShellCommandExecutor(List logBuffer) { + super(logBuffer); + } @Override protected String buildCommandFilePath() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index 5e61f89131..fb0a76cff2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -105,7 +105,7 @@ public class ShellTask extends AbstractTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(commandExecuteResult.getAppIds()); setProcessId(commandExecuteResult.getProcessId()); - setResult(shellCommandExecutor.getResultString()); + setResult(shellCommandExecutor.getTaskResultString()); } catch (Exception e) { logger.error("shell task error", e); setExitStatusCode(Constants.EXIT_CODE_FAILURE); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java new file mode 100644 index 0000000000..348775cf67 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task; + +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) +public class AbstractCommandExecutorTest { + + private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class); + + private ShellCommandExecutor shellCommandExecutor; + + @Before + public void before() throws Exception { + System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); + shellCommandExecutor = new ShellCommandExecutor(null); + } + + @Test + public void testSetTaskResultString() { + shellCommandExecutor.setTaskResultString("shellReturn"); + } + + @Test + public void testGetTaskResultString() { + logger.info(shellCommandExecutor.getTaskResultString()); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java similarity index 92% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java rename to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java index 1af0a384f0..e5bf3bfc40 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java @@ -15,13 +15,15 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.shell; +package org.apache.dolphinscheduler.server.worker.task; import static org.mockito.ArgumentMatchers.anyString; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; +import org.apache.dolphinscheduler.server.worker.task.shell.ShellTaskTest; import java.nio.file.Files; import java.nio.file.Paths; @@ -112,4 +114,14 @@ public class ShellTaskReturnTest { shellTask.setResult("shell return string"); logger.info("shell return string:{}", shellTask.getResultString()); } + + @Test + public void testSetTaskResultString() { + shellCommandExecutor.setTaskResultString("shellReturn"); + } + + @Test + public void testGetTaskResultString() { + logger.info(shellCommandExecutor.getTaskResultString()); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java index a8e9e70ded..8c734af2ce 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java @@ -60,7 +60,7 @@ public class ShellTaskTest { System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString()); shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class); PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor); - shellCommandExecutor.setResultString("shellReturn"); + shellCommandExecutor.setTaskResultString("shellReturn"); taskExecutionContext = new TaskExecutionContext(); taskExecutionContext.setTaskInstanceId(1); taskExecutionContext.setTaskName("kris test"); @@ -123,4 +123,5 @@ public class ShellTaskTest { String r = "return"; shellTask.setResult(r); } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index b40aa90021..c5f5e3a02d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -166,10 +166,10 @@ public class ProcessService { /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * - * @param logger logger - * @param host host + * @param logger logger + * @param host host * @param validThreadNum validThreadNum - * @param command found command + * @param command found command * @return process instance */ @Transactional(rollbackFor = Exception.class) @@ -209,7 +209,7 @@ public class ProcessService { /** * set process waiting thread * - * @param command command + * @param command command * @param processInstance processInstance * @return process instance */ @@ -227,7 +227,7 @@ public class ProcessService { /** * check thread num * - * @param command command + * @param command command * @param validThreadNum validThreadNum * @return if thread is enough */ @@ -1572,8 +1572,8 @@ public class ProcessService { return; } ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId()); - List params4Process = JSONUtils.toList(processInstance.getGlobalParams(), Property.class); - Map allParamMap = params4Process.stream().collect(Collectors.toMap(Property::getProp, Property -> Property)); + List params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class); + Map allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property)); List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); for (Property info : allParam) { @@ -1594,8 +1594,8 @@ public class ProcessService { taskNode.setParams(JSONUtils.toJsonString(taskParams)); // task instance node json taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); - String params4ProcessString = JSONUtils.toJsonString(params4Process); - int updateCount = this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId()); + String params4ProcessString = JSONUtils.toJsonString(params4Property); + int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId()); logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId()); } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 7eec3669b4..999ca46a4e 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -460,7 +460,7 @@ public class ProcessServiceTest { String params4ProcessString = "[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"}," + "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]"; Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance); - Mockito.when(this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId())).thenReturn(1); + Mockito.when(this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId())).thenReturn(1); processService.changeOutParam(result,taskInstance); } diff --git a/pom.xml b/pom.xml index 48ab95aa0a..da7b68e013 100644 --- a/pom.xml +++ b/pom.xml @@ -923,6 +923,8 @@ **/server/worker/task/sqoop/SqoopTaskTest.java **/server/worker/task/shell/ShellTaskTest.java **/server/worker/task/TaskManagerTest.java + **/server/worker/task/AbstractCommandExecutorTest.java + **/server/worker/task/ShellTaskReturnTest.java **/server/worker/EnvFileTest.java **/server/worker/runner/TaskExecuteThreadTest.java **/service/quartz/cron/CronUtilsTest.java