Browse Source

[Feature]Add Python task "task variable / result transfer" implementation (#3659)

* 增加Python Task的“任务变量/结果传递”实现
Signed-off-by: 古崟佑

* add two files license
Signed-off-by: 古崟佑

* fix 'server/worker/task/AbstractCommandExecutor.java' code style
Signed-off-by: 1941815847Cy4 <1941815847cy4@kuaishou.com>

* update DB
Signed-off-by: 古崟佑

* update DB -- 2
Signed-off-by: 古崟佑

* fix codeStyle
Signed-off-by: 古崟佑

* fix codestyle
Signed-off-by: 古崟佑

* fix codeStyle
Signed-off-by: 古崟佑

* fix codeStyle
Signed-off-by: 古崟佑

* fix codeStyle
Signed-off-by: 古崟佑

* add VarPoolUtils Test
Signed-off-by: 古崟佑

* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑

* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑

* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑

* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑

* fix VarPoolUtilsTest codeStyle
Signed-off-by: 古崟佑

* add test config for VarPoolUtilsTest
Signed-off-by: 古崟佑

* fix unit test
Signed-off-by: 古崟佑

* fix codeStyle
Signed-off-by: 古崟佑

* fix VarPoolUtilsTest.java
Signed-off-by: 古崟佑

* fix
Signed-off-by: 古崟佑

* change the test class path
Signed-off-by: 古崟佑

* fix
Signed-off-by: 古崟佑

* fix "print the error message"
Signed-off-by: 古崟佑

* fix bug
Signed-off-by: 古崟佑

* fix
Signed-off-by: 古崟佑

Co-authored-by: 1941815847Cy4 <1941815847cy4@kuaishou.com>
data_quality_design
guyinyou 4 years ago committed by GitHub
parent
commit
3b581455fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 78
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
  2. 124
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
  3. 73
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
  4. 13
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  5. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  6. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  7. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  8. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
  9. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  10. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  11. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  12. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  13. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  14. 189
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
  15. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  16. 1
      pom.xml
  17. 2
      sql/dolphinscheduler-postgre.sql
  18. 2
      sql/dolphinscheduler_mysql.sql
  19. 40
      sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
  20. 36
      sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql

78
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task;
import java.util.Map;
public class TaskParams {
private String rawScript;
private Map<String, String>[] localParams;
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
public void setLocalParams(Map<String, String>[] localParams) {
this.localParams = localParams;
}
public String getRawScript() {
return rawScript;
}
public void setLocalParamValue(String prop, Object value) {
if (localParams == null || value == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
if (localParams[i].get("prop").equals(prop)) {
localParams[i].put("value", (String)value);
}
}
}
public void setLocalParamValue(Map<String, Object> propToValue) {
if (localParams == null || propToValue == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
String prop = localParams[i].get("prop");
if (propToValue.containsKey(prop)) {
localParams[i].put("value",(String)propToValue.get(prop));
}
}
}
public String getLocalParamValue(String prop) {
if (localParams == null) {
return null;
}
for (int i = 0; i < localParams.length; i++) {
String tmpProp = localParams[i].get("prop");
if (tmpProp.equals(prop)) {
return localParams[i].get("value");
}
}
return null;
}
public Map<String, String>[] getLocalParams() {
return localParams;
}
}

124
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskParams;
import java.text.ParseException;
import java.util.Map;
public class VarPoolUtils {
/**
* getTaskNodeLocalParam
* @param taskNode taskNode
* @param prop prop
* @return localParamForProp
*/
public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return null;
}
return taskParams.getLocalParamValue(prop);
}
/**
* setTaskNodeLocalParams
* @param taskNode taskNode
* @param prop LocalParamName
* @param value LocalParamValue
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return;
}
taskParams.setLocalParamValue(prop, value);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}
/**
* setTaskNodeLocalParams
* @param taskNode taskNode
* @param propToValue propToValue
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return;
}
taskParams.setLocalParamValue(propToValue);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}
/**
* convertVarPoolToMap
* @param propToValue propToValue
* @param varPool varPool
* @throws ParseException ParseException
*/
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
if (varPool == null || propToValue == null) {
return;
}
String[] splits = varPool.split("\\$VarPool\\$");
for (String kv : splits) {
String[] kvs = kv.split(",");
if (kvs.length == 2) {
propToValue.put(kvs[0], kvs[1]);
} else {
throw new ParseException(kv, 2);
}
}
}
/**
* convertPythonScriptPlaceholders
* @param rawScript rawScript
* @return String
* @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException
*/
public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException {
int len = "${setShareVar(${".length();
int scriptStart = 0;
while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) {
int start = -1;
int end = rawScript.indexOf('}', scriptStart + len);
String prop = rawScript.substring(scriptStart + len, end);
start = rawScript.indexOf(',', end);
end = rawScript.indexOf(')', start);
String value = rawScript.substring(start + 1, end);
start = rawScript.indexOf('}', start) + 1;
end = rawScript.length();
String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end);
scriptStart += replaceScript.length();
}
return rawScript;
}
}

73
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VarPoolUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
@Test
public void testSetTaskNodeLocalParams() {
String taskJson = "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is python task \\\\\\\",${p0})\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}],"
+ "\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}";
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1");
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test1");
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
propToValue.put("p1", "test2");
VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test2");
}
@Test
public void testConvertVarPoolToMap() throws Exception {
String varPool = "p1,66$VarPool$p2,69$VarPool$";
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
Assert.assertEquals((String)propToValue.get("p1"), "66");
Assert.assertEquals((String)propToValue.get("p2"), "69");
logger.info(propToValue.toString());
}
@Test
public void testConvertPythonScriptPlaceholders() throws Exception {
String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
Assert.assertEquals(rawScript, "print(${p1});\n"
+ "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
logger.info(rawScript);
}
}

13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -224,6 +224,11 @@ public class ProcessInstance {
*/
private int tenantId;
/**
* varPool string
*/
private String varPool;
/**
* receivers for api
*/
@ -256,6 +261,14 @@ public class ProcessInstance {
DateUtils.getCurrentTimeStamp();
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public ProcessDefinition getProcessDefinition() {
return processDefinition;
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -211,6 +211,11 @@ public class TaskInstance implements Serializable {
*/
private int executorId;
/**
* varPool string
*/
private String varPool;
/**
* executor name
*/
@ -232,7 +237,14 @@ public class TaskInstance implements Serializable {
this.executePath = executePath;
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public ProcessInstance getProcessInstance() {
return processInstance;
}

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -63,7 +63,19 @@ public class TaskExecuteResponseCommand implements Serializable {
*/
private String appIds;
/**
* varPool string
*/
private String varPool;
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
public int getTaskInstanceId() {
return taskInstanceId;
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -90,7 +90,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId());
responseCommand.getTaskInstanceId(),
responseCommand.getVarPool());
taskResponseService.addResponse(taskResponseEvent);

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java

@ -79,7 +79,12 @@ public class TaskResponseEvent {
*/
private Event event;
public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){
/**
* varPool
*/
private String varPool;
public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setStartTime(startTime);
@ -91,7 +96,7 @@ public class TaskResponseEvent {
return event;
}
public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){
public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId, String varPool) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
@ -99,9 +104,18 @@ public class TaskResponseEvent {
event.setAppIds(appIds);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.RESULT);
event.setVarPool(varPool);
return event;
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getTaskInstanceId() {
return taskInstanceId;
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -139,7 +139,8 @@ public class TaskResponseService {
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId());
taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool());
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -59,6 +60,7 @@ import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -651,14 +653,23 @@ public class MasterExecThread implements Runnable {
* submit post node
* @param parentNodeName parent node name
*/
private Map<String,Object> propToValue = new ConcurrentHashMap<String, Object>();
private void submitPostNode(String parentNodeName){
List<String> submitTaskNodeList = parsePostNodeList(parentNodeName);
List<TaskInstance> taskInstances = new ArrayList<>();
for(String taskNode : submitTaskNodeList){
try {
VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool());
} catch (ParseException e) {
logger.error("parse {} exception", processInstance.getVarPool(), e);
throw new RuntimeException();
}
TaskNode taskNodeObject = dag.getNode(taskNode);
VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
taskInstances.add(createTaskInstance(processInstance, taskNode,
dag.getNode(taskNode)));
taskNodeObject));
}
// if previous node success , post node submit
@ -999,6 +1010,8 @@ public class MasterExecThread implements Runnable {
task.getName(), task.getId(), task.getState());
// node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){
processInstance.setVarPool(task.getVarPool());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task);
submitPostNode(task.getName());
continue;

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -155,6 +155,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
responseCommand.setVarPool(task.getVarPool());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
} catch (Exception e) {
logger.error("task scheduler failure", e);

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -66,6 +66,7 @@ public abstract class AbstractCommandExecutor {
*/
protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX);
protected StringBuilder varPool = new StringBuilder();
/**
* process
*/
@ -234,7 +235,10 @@ public abstract class AbstractCommandExecutor {
return result;
}
public String getVarPool() {
return varPool.toString();
}
/**
* cancel application
* @throws Exception exception
@ -347,8 +351,13 @@ public abstract class AbstractCommandExecutor {
long lastFlushTime = System.currentTimeMillis();
while ((line = inReader.readLine()) != null) {
logBuffer.add(line);
lastFlushTime = flush(lastFlushTime);
if (line.startsWith("${setValue(")) {
varPool.append(line.substring("${setValue(".length(), line.length() - 2));
varPool.append("$VarPool$");
} else {
logBuffer.add(line);
lastFlushTime = flush(lastFlushTime);
}
}
} catch (Exception e) {
logger.error(e.getMessage(),e);

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -47,6 +47,11 @@ import java.util.Map;
*/
public abstract class AbstractTask {
/**
* varPool string
*/
protected String varPool;
/**
* taskExecutionContext
**/
@ -121,6 +126,14 @@ public abstract class AbstractTask {
logger.info(" -> {}", String.join("\n\t", logs));
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
/**
* get exit status code
* @return exit status code

189
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java

@ -38,103 +38,110 @@ import java.util.Map;
*/
public class PythonTask extends AbstractTask {
/**
* python parameters
*/
private PythonParameters pythonParameters;
/**
* task dir
*/
private String taskDir;
/**
* python command executor
*/
private PythonCommandExecutor pythonCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
taskExecutionContext,
logger);
}
@Override
public void init() {
logger.info("python task params {}", taskExecutionContext.getTaskParams());
pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class);
if (!pythonParameters.checkParameters()) {
throw new RuntimeException("python task params is not valid");
/**
* python parameters
*/
private PythonParameters pythonParameters;
/**
* task dir
*/
private String taskDir;
/**
* python command executor
*/
private PythonCommandExecutor pythonCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
taskExecutionContext,
logger);
}
}
@Override
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand());
@Override
public void init() {
logger.info("python task params {}", taskExecutionContext.getTaskParams());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
}
catch (Exception e) {
logger.error("python task failure", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
pythonCommandExecutor.cancelApplication();
}
/**
* build command
* @return raw python script
* @throws Exception exception
*/
private String buildCommand() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
pythonParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null){
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class);
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
return rawPythonScript;
}
if (!pythonParameters.checkParameters()) {
throw new RuntimeException("python task params is not valid");
}
}
@Override
public AbstractParameters getParameters() {
return pythonParameters;
}
@Override
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
setVarPool(pythonCommandExecutor.getVarPool());
}
catch (Exception e) {
logger.error("python task failure", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
pythonCommandExecutor.cancelApplication();
}
/**
* build command
* @return raw python script
* @throws Exception exception
*/
private String buildCommand() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
pythonParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
try {
rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
}
catch (StringIndexOutOfBoundsException e) {
logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
}
if (paramsMap != null) {
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
return rawPythonScript;
}
@Override
public AbstractParameters getParameters() {
return pythonParameters;
}
}

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1464,17 +1464,20 @@ public class ProcessService {
* @param state state
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
*/
public void changeTaskState(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstId) {
int taskInstId,
String varPool) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
saveTaskInstance(taskInstance);
}

1
pom.xml

@ -785,6 +785,7 @@
<include>**/common/utils/StringTest.java</include>
<include>**/common/utils/StringUtilsTest.java</include>
<include>**/common/utils/TaskParametersUtilsTest.java</include>
<include>**/common/utils/VarPoolUtilsTest.java</include>
<include>**/common/utils/HadoopUtilsTest.java</include>
<include>**/common/utils/HttpUtilsTest.java</include>
<include>**/common/utils/KerberosHttpClientTest.java</include>

2
sql/dolphinscheduler-postgre.sql

@ -377,6 +377,7 @@ CREATE TABLE t_ds_process_instance (
worker_group varchar(64) ,
timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' ,
var_pool text ,
PRIMARY KEY (id)
) ;
create index process_instance_index on t_ds_process_instance (process_definition_id,id);
@ -595,6 +596,7 @@ CREATE TABLE t_ds_task_instance (
executor_id int DEFAULT NULL ,
first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' ,
var_pool text ,
PRIMARY KEY (id)
) ;

2
sql/dolphinscheduler_mysql.sql

@ -487,6 +487,7 @@ CREATE TABLE `t_ds_process_instance` (
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE
@ -737,6 +738,7 @@ CREATE TABLE `t_ds_task_instance` (
`executor_id` int(11) DEFAULT NULL,
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
`var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE,

40
sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql

@ -56,6 +56,46 @@ delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time;
-- uc_dolphin_T_t_ds_task_instance_A_var_pool
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_task_instance ADD `var_pool` longtext NULL;
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_var_pool();
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool;
-- uc_dolphin_T_t_ds_process_instance_A_var_pool
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_process_instance ADD `var_pool` longtext NULL;
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_A_var_pool();
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool;
-- uc_dolphin_T_t_ds_process_definition_A_modify_by
drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version;
delimiter d//

36
sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql

@ -51,6 +51,42 @@ delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time();
-- uc_dolphin_T_t_ds_process_instance_A_var_pool
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_instance_A_var_pool() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN var_pool text;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_process_instance_A_var_pool();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool();
-- uc_dolphin_T_t_ds_task_instance_A_var_pool
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_var_pool() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN var_pool text;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_var_pool();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool();
-- uc_dolphin_T_t_ds_process_definition_A_modify_by
delimiter d//
CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$

Loading…
Cancel
Save