Browse Source

[Feature-3805][mater-worker-ui]fix function of passing variables between tasks bug (#4811)

* global initParam and set Param

* fix dataFormat error

* fix deal outParams bug

* fix code style

* fix code style

* fix code style

* fix code style

* fix code style

* fix code style

* fix code style

* add test

* fix code style (variable name)

* fix reset globalParams bug

* fix format bug

* fix SonarCloud Code Analysis

* add test

* fix code style

* fix code style

* modify test

* fix test Coverage

* fix code style

* 4 test

* add test pom

* add header

Co-authored-by: wangxj <wangxj31>
pull/3/MERGE
wangxj3 4 years ago committed by GitHub
parent
commit
874ca07e6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  2. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  3. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  4. 42
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  5. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  6. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
  7. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  8. 53
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java
  9. 14
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
  10. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
  11. 18
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  12. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  13. 2
      pom.xml

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -221,7 +221,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("processDefinitionId") int processDefinitionId,
@Param("states") int[] states);
int updateGlobalParamById(
int updateGlobalParamsById(
@Param("globalParams") String globalParams,
@Param("id") int id);
}

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -219,7 +219,7 @@
</foreach>
order by id asc
</select>
<update id="updateGlobalParamById">
<update id="updateGlobalParamsById">
update t_ds_process_instance
set global_params = #{globalParams}
where id = #{id}

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -72,6 +72,7 @@ public class TaskExecuteResponseCommand implements Serializable {
* task return result
*/
private String result;
public void setVarPool(String varPool) {
this.varPool = varPool;
}

42
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -558,27 +558,31 @@ public class MasterExecThread implements Runnable {
String globalParams = this.processInstance.getGlobalParams();
if (StringUtils.isNotEmpty(globalParams)) {
Map<String, String> globalMap = getGlobalParamMap(globalParams);
if (globalMap != null) {
// the param save in localParams
Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Object localParams = result.get(LOCAL_PARAMS);
if (localParams != null) {
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
if (info.getDirect().equals(Direct.IN)) {
String paramName = info.getProp();
String value = globalMap.get(paramName);
if (StringUtils.isNotEmpty(value)) {
info.setValue(value);
}
}
if (globalMap != null && globalMap.size() != 0) {
setGlobalMapToTask(taskNode, taskInstance, globalMap);
}
}
}
private void setGlobalMapToTask(TaskNode taskNode, TaskInstance taskInstance, Map<String, String> globalMap) {
// the param save in localParams
Map<String, Object> result = JSONUtils.toMap(taskNode.getParams(), String.class, Object.class);
Object localParams = result.get(LOCAL_PARAMS);
if (localParams != null) {
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
if (info.getDirect().equals(Direct.IN)) {
String paramName = info.getProp();
String value = globalMap.get(paramName);
if (StringUtils.isNotEmpty(value)) {
info.setValue(value);
}
result.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(result));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
}
}
result.put(LOCAL_PARAMS, allParam);
taskNode.setParams(JSONUtils.toJsonString(result));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
}
}
@ -997,8 +1001,8 @@ public class MasterExecThread implements Runnable {
task.getName(), task.getId(), task.getState());
// node success , post node submit
if (task.getState() == ExecutionStatus.SUCCESS) {
processInstance.setVarPool(task.getVarPool());
processInstance = processService.findProcessInstanceById(processInstance.getId());
processInstance.setVarPool(task.getVarPool());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -88,7 +88,7 @@ public abstract class AbstractCommandExecutor {
/**
* SHELL result string
*/
protected String resultString;
protected String taskResultString;
/**
* taskExecutionContext
@ -110,6 +110,10 @@ public abstract class AbstractCommandExecutor {
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
protected AbstractCommandExecutor(List<String> logBuffer) {
this.logBuffer = logBuffer;
}
/**
* build process
*
@ -229,13 +233,6 @@ public abstract class AbstractCommandExecutor {
return varPool.toString();
}
public String getResultString() {
return resultString;
}
public void setResultString(String result) {
this.resultString = result;
}
/**
* cancel application
@ -369,7 +366,7 @@ public abstract class AbstractCommandExecutor {
varPool.append("$VarPool$");
} else {
logBuffer.add(line);
resultString = line;
taskResultString = line;
lastFlushTime = flush(lastFlushTime);
}
}
@ -576,4 +573,12 @@ public abstract class AbstractCommandExecutor {
protected abstract String commandInterpreter();
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
public String getTaskResultString() {
return taskResultString;
}
public void setTaskResultString(String taskResultString) {
this.taskResultString = taskResultString;
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java

@ -56,6 +56,9 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
super(logHandler,taskExecutionContext,logger);
}
public ShellCommandExecutor(List<String> logBuffer) {
super(logBuffer);
}
@Override
protected String buildCommandFilePath() {

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -105,7 +105,7 @@ public class ShellTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
setResult(shellCommandExecutor.getResultString());
setResult(shellCommandExecutor.getTaskResultString());
} catch (Exception e) {
logger.error("shell task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);

53
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class AbstractCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class);
private ShellCommandExecutor shellCommandExecutor;
@Before
public void before() throws Exception {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = new ShellCommandExecutor(null);
}
@Test
public void testSetTaskResultString() {
shellCommandExecutor.setTaskResultString("shellReturn");
}
@Test
public void testGetTaskResultString() {
logger.info(shellCommandExecutor.getTaskResultString());
}
}

14
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskReturnTest.java → dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java

@ -15,13 +15,15 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.shell;
package org.apache.dolphinscheduler.server.worker.task;
import static org.mockito.ArgumentMatchers.anyString;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTaskTest;
import java.nio.file.Files;
import java.nio.file.Paths;
@ -112,4 +114,14 @@ public class ShellTaskReturnTest {
shellTask.setResult("shell return string");
logger.info("shell return string:{}", shellTask.getResultString());
}
@Test
public void testSetTaskResultString() {
shellCommandExecutor.setTaskResultString("shellReturn");
}
@Test
public void testGetTaskResultString() {
logger.info(shellCommandExecutor.getTaskResultString());
}
}

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java

@ -60,7 +60,7 @@ public class ShellTaskTest {
System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
shellCommandExecutor.setResultString("shellReturn");
shellCommandExecutor.setTaskResultString("shellReturn");
taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("kris test");
@ -123,4 +123,5 @@ public class ShellTaskTest {
String r = "return";
shellTask.setResult(r);
}
}

18
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -166,10 +166,10 @@ public class ProcessService {
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
* @param logger logger
* @param host host
* @param logger logger
* @param host host
* @param validThreadNum validThreadNum
* @param command found command
* @param command found command
* @return process instance
*/
@Transactional(rollbackFor = Exception.class)
@ -209,7 +209,7 @@ public class ProcessService {
/**
* set process waiting thread
*
* @param command command
* @param command command
* @param processInstance processInstance
* @return process instance
*/
@ -227,7 +227,7 @@ public class ProcessService {
/**
* check thread num
*
* @param command command
* @param command command
* @param validThreadNum validThreadNum
* @return if thread is enough
*/
@ -1572,8 +1572,8 @@ public class ProcessService {
return;
}
ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
List<Property> params4Process = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
Map<String, Property> allParamMap = params4Process.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
List<Property> params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
Map<String, Property> allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
for (Property info : allParam) {
@ -1594,8 +1594,8 @@ public class ProcessService {
taskNode.setParams(JSONUtils.toJsonString(taskParams));
// task instance node json
taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
String params4ProcessString = JSONUtils.toJsonString(params4Process);
int updateCount = this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId());
String params4ProcessString = JSONUtils.toJsonString(params4Property);
int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId());
logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
}

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -460,7 +460,7 @@ public class ProcessServiceTest {
String params4ProcessString = "[{\"prop\":\"sql2\",\"direct\":null,\"type\":null,\"value\":\"\"},{\"prop\":\"out\",\"direct\":null,\"type\":null,\"value\":\"\"},"
+ "{\"prop\":\"d\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20210203\"}]";
Mockito.when(processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
Mockito.when(this.processInstanceMapper.updateGlobalParamById(params4ProcessString, processInstance.getId())).thenReturn(1);
Mockito.when(this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId())).thenReturn(1);
processService.changeOutParam(result,taskInstance);
}

2
pom.xml

@ -923,6 +923,8 @@
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include>
<include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
<include>**/server/worker/task/ShellTaskReturnTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>

Loading…
Cancel
Save