Browse Source

[Feature-#130] pass global param values when starting new process instance (#4372)

* [DS-130][feat] pass global param values when starting new process instance
    add optional param for start-process-instance api
    reuse command_param in command table for persistence
    overload curingGlobalParams function in ParameterUtils
    not adapt the UI code yet

* change import order

* support datetime expression

* print start params

* (fix) avoid npe when cmdParam is null

Change-Id: I3b4c4b5fa1df316ff221e27146e45d7d4d3a404e
pull/3/MERGE
Dean Wong 4 years ago committed by GitHub
parent
commit
83d53505de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  2. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  3. 16
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java
  4. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  5. 20
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
  6. 19
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  7. 15
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*; import io.swagger.annotations.*;
import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.enums.*;
@ -107,21 +108,26 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup, @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
@RequestParam(value = "timeout", required = false) Integer timeout) throws ParseException { @RequestParam(value = "timeout", required = false) Integer timeout,
@RequestParam(value = "startParams", required = false) String startParams) throws ParseException {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "
+ "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
+ "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}", + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroup: {}, timeout: {}, "
+ "startParams: {}",
loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, loginUser.getUserName(), projectName, processDefinitionId, scheduleTime,
failureStrategy, startNodeList, taskDependType, warningType, workerGroup, receivers, receiversCc, runMode, processInstancePriority, failureStrategy, startNodeList, taskDependType, warningType, workerGroup, receivers, receiversCc, runMode, processInstancePriority,
workerGroup, timeout); workerGroup, timeout, startParams);
if (timeout == null) { if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT; timeout = Constants.MAX_TASK_TIMEOUT;
} }
Map<String, String> startParamMap = null;
if (startParams != null) {
startParamMap = JSONUtils.toMap(startParams);
}
Map<String, Object> result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy, Map<String, Object> result = execService.execProcessInstance(loginUser, projectName, processDefinitionId, scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, startNodeList, taskDependType, warningType,
warningGroupId, receivers, receiversCc, runMode, processInstancePriority, workerGroup, timeout); warningGroupId, receivers, receiversCc, runMode, processInstancePriority, workerGroup, timeout, startParamMap);
return returnDataList(result); return returnDataList(result);
} }

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT; import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.ExecuteType;
@ -112,6 +113,7 @@ public class ExecutorService extends BaseService {
* @param workerGroup worker group name * @param workerGroup worker group name
* @param runMode run mode * @param runMode run mode
* @param timeout timeout * @param timeout timeout
* @param startParams the global param values which pass to new process instance
* @return execute process instance code * @return execute process instance code
* @throws ParseException Parse Exception * @throws ParseException Parse Exception
*/ */
@ -120,7 +122,8 @@ public class ExecutorService extends BaseService {
FailureStrategy failureStrategy, String startNodeList, FailureStrategy failureStrategy, String startNodeList,
TaskDependType taskDependType, WarningType warningType, int warningGroupId, TaskDependType taskDependType, WarningType warningType, int warningGroupId,
String receivers, String receiversCc, RunMode runMode, String receivers, String receiversCc, RunMode runMode,
Priority processInstancePriority, String workerGroup, Integer timeout) throws ParseException { Priority processInstancePriority, String workerGroup, Integer timeout,
Map<String, String> startParams) throws ParseException {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
// timeout is invalid // timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) { if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
@ -157,7 +160,7 @@ public class ExecutorService extends BaseService {
*/ */
int create = this.createCommand(commandType, processDefinitionId, int create = this.createCommand(commandType, processDefinitionId,
taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(), taskDependType, failureStrategy, startNodeList, cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority, workerGroup); warningGroupId, runMode, processInstancePriority, workerGroup, startParams);
if (create > 0) { if (create > 0) {
/** /**
* according to the process definition ID updateProcessInstance and CC recipient * according to the process definition ID updateProcessInstance and CC recipient
@ -502,7 +505,8 @@ public class ExecutorService extends BaseService {
TaskDependType nodeDep, FailureStrategy failureStrategy, TaskDependType nodeDep, FailureStrategy failureStrategy,
String startNodeList, String schedule, WarningType warningType, String startNodeList, String schedule, WarningType warningType,
int executorId, int warningGroupId, int executorId, int warningGroupId,
RunMode runMode, Priority processInstancePriority, String workerGroup) throws ParseException { RunMode runMode, Priority processInstancePriority, String workerGroup,
Map<String, String> startParams) throws ParseException {
/** /**
* instantiate command schedule instance * instantiate command schedule instance
@ -529,6 +533,9 @@ public class ExecutorService extends BaseService {
if (warningType != null) { if (warningType != null) {
command.setWarningType(warningType); command.setWarningType(warningType);
} }
if (startParams != null && startParams.size() > 0) {
cmdParam.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
}
command.setCommandParam(JSONUtils.toJsonString(cmdParam)); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setExecutorId(executorId); command.setExecutorId(executorId);
command.setWarningGroupId(warningGroupId); command.setWarningGroupId(warningGroupId);

16
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorService2Test.java

@ -149,7 +149,7 @@ public class ExecutorService2Test {
null, null, null, null,
null, null, 0, null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL, "", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class)); verify(processService, times(1)).createCommand(any(Command.class));
} catch (Exception e) { } catch (Exception e) {
@ -169,7 +169,7 @@ public class ExecutorService2Test {
null, "n1,n2", null, "n1,n2",
null, null, 0, null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL, "", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class)); verify(processService, times(1)).createCommand(any(Command.class));
} catch (Exception e) { } catch (Exception e) {
@ -190,7 +190,7 @@ public class ExecutorService2Test {
null, null, null, null,
null, null, 0, null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL, "", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class)); verify(processService, times(0)).createCommand(any(Command.class));
} catch (Exception e) { } catch (Exception e) {
@ -210,7 +210,7 @@ public class ExecutorService2Test {
null, null, null, null,
null, null, 0, null, null, 0,
"", "", RunMode.RUN_MODE_SERIAL, "", "", RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class)); verify(processService, times(1)).createCommand(any(Command.class));
} catch (Exception e) { } catch (Exception e) {
@ -230,7 +230,7 @@ public class ExecutorService2Test {
null, null, null, null,
null, null, 0, null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL, "", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class)); verify(processService, times(31)).createCommand(any(Command.class));
} catch (Exception e) { } catch (Exception e) {
@ -250,7 +250,7 @@ public class ExecutorService2Test {
null, null, null, null,
null, null, 0, null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL, "", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class)); verify(processService, times(15)).createCommand(any(Command.class));
} catch (Exception e) { } catch (Exception e) {
@ -260,14 +260,14 @@ public class ExecutorService2Test {
@Test @Test
public void testNoMsterServers() throws ParseException { public void testNoMsterServers() throws ParseException {
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList<Server>()); Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(new ArrayList<>());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName, Map<String, Object> result = executorService.execProcessInstance(loginUser, projectName,
processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA, processDefinitionId, cronTime, CommandType.COMPLEMENT_DATA,
null, null, null, null,
null, null, 0, null, null, 0,
"", "", RunMode.RUN_MODE_PARALLEL, "", "", RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110); Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 110, null);
Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
} }

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -458,6 +458,8 @@ public final class Constants {
public static final String CMD_PARAM_START_NODE_NAMES = "StartNodeNameList"; public static final String CMD_PARAM_START_NODE_NAMES = "StartNodeNameList";
public static final String CMD_PARAM_START_PARAMS = "StartParams";
/** /**
* complement data start date * complement data start date
*/ */

20
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java

@ -132,6 +132,26 @@ public class ParameterUtilsTest {
String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList)); Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
globalParamList.add(testStartParamProperty);
Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]");
globalParamList.add(testStartParam2Property);
globalParamMap.put("testStartParam", "");
globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
Map<String, String> startParamMap = new HashMap<>(2);
startParamMap.put("testStartParam", "$[yyyyMMdd]");
for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime);
Assert.assertTrue(result6.contains("20191220"));
} }
/** /**

19
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -558,6 +558,25 @@ public class ProcessService {
processInstance.setCommandStartTime(command.getStartTime()); processInstance.setCommandStartTime(command.getStartTime());
processInstance.setLocations(processDefinition.getLocations()); processInstance.setLocations(processDefinition.getLocations());
processInstance.setConnects(processDefinition.getConnects()); processInstance.setConnects(processDefinition.getConnects());
// get start params from command param
Map<String, String> startParamMap = null;
if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_START_PARAMS)) {
String startParamJson = cmdParam.get(Constants.CMD_PARAM_START_PARAMS);
startParamMap = JSONUtils.toMap(startParamJson);
}
// set start param into global params
if (startParamMap != null && startParamMap.size() > 0
&& processDefinition.getGlobalParamMap() != null) {
for (Map.Entry<String, String> param : processDefinition.getGlobalParamMap().entrySet()) {
String val = startParamMap.get(param.getKey());
if (val != null) {
param.setValue(val);
}
}
}
// curing global params // curing global params
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processDefinition.getGlobalParamMap(), processDefinition.getGlobalParamMap(),

15
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -234,13 +235,14 @@ public class ProcessServiceTest {
processDefinition.setId(123); processDefinition.setId(123);
processDefinition.setName("test"); processDefinition.setName("test");
processDefinition.setVersion(1); processDefinition.setVersion(1);
processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" processDefinition.setProcessDefinitionJson("{\"globalParams\":[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"tasks\":[{\"conditionResult\":"
+ "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}"
+ ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\""
+ ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}"
+ ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\""
+ ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\""
+ ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}");
processDefinition.setGlobalParams("[{\"prop\":\"startParam1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
ProcessInstance processInstance = new ProcessInstance(); ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(222); processInstance.setId(222);
Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition); Mockito.when(processDefineMapper.selectById(command1.getProcessDefinitionId())).thenReturn(processDefinition);
@ -265,6 +267,17 @@ public class ProcessServiceTest {
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}"); command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING); command4.setCommandType(CommandType.REPEAT_RUNNING);
Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4)); Assert.assertNotNull(processService.handleCommand(logger, host, validThreadNum, command4));
Command command5 = new Command();
command5.setProcessDefinitionId(123);
HashMap<String, String> startParams = new HashMap<>();
startParams.put("startParam1", "testStartParam1");
HashMap<String, String> commandParams = new HashMap<>();
commandParams.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams));
command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS);
ProcessInstance processInstance1 = processService.handleCommand(logger, host, validThreadNum, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
} }
@Test @Test

Loading…
Cancel
Save