diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java new file mode 100644 index 0000000000..abea2d95b0 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task; + +import java.util.Map; + +public class TaskParams { + + private String rawScript; + private Map[] localParams; + + public void setRawScript(String rawScript) { + this.rawScript = rawScript; + } + + public void setLocalParams(Map[] localParams) { + this.localParams = localParams; + } + + public String getRawScript() { + return rawScript; + } + + public void setLocalParamValue(String prop, Object value) { + if (localParams == null || value == null) { + return; + } + for (int i = 0; i < localParams.length; i++) { + if (localParams[i].get("prop").equals(prop)) { + localParams[i].put("value", (String)value); + } + } + } + + public void setLocalParamValue(Map propToValue) { + if (localParams == null || propToValue == null) { + return; + } + for (int i = 0; i < localParams.length; i++) { + String prop = localParams[i].get("prop"); + if (propToValue.containsKey(prop)) { + localParams[i].put("value",(String)propToValue.get(prop)); + } + } + } + + public String getLocalParamValue(String prop) { + if (localParams == null) { + return null; + } + for (int i = 0; i < localParams.length; i++) { + String tmpProp = localParams[i].get("prop"); + if (tmpProp.equals(prop)) { + return localParams[i].get("value"); + } + } + return null; + } + + public Map[] getLocalParams() { + return localParams; + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java new file mode 100644 index 0000000000..837e96f55f --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.TaskParams; + +import java.text.ParseException; +import java.util.Map; + +public class VarPoolUtils { + /** + * getTaskNodeLocalParam + * @param taskNode taskNode + * @param prop prop + * @return localParamForProp + */ + public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) { + String taskParamsJson = taskNode.getParams(); + TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); + if (taskParams == null) { + return null; + } + return taskParams.getLocalParamValue(prop); + } + + /** + * setTaskNodeLocalParams + * @param taskNode taskNode + * @param prop LocalParamName + * @param value LocalParamValue + */ + public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) { + String taskParamsJson = taskNode.getParams(); + TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); + if (taskParams == null) { + return; + } + taskParams.setLocalParamValue(prop, value); + taskNode.setParams(JSONUtils.toJsonString(taskParams)); + } + + /** + * setTaskNodeLocalParams + * @param taskNode taskNode + * @param propToValue propToValue + */ + public static void setTaskNodeLocalParams(TaskNode taskNode, Map propToValue) { + String taskParamsJson = taskNode.getParams(); + TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); + if (taskParams == null) { + return; + } + taskParams.setLocalParamValue(propToValue); + taskNode.setParams(JSONUtils.toJsonString(taskParams)); + } + + /** + * convertVarPoolToMap + * @param propToValue propToValue + * @param varPool varPool + * @throws ParseException ParseException + */ + public static void convertVarPoolToMap(Map propToValue, String varPool) throws ParseException { + if (varPool == null || propToValue == null) { + return; + } + String[] splits = varPool.split("\\$VarPool\\$"); + for (String kv : splits) { + String[] kvs = kv.split(","); + if (kvs.length == 2) { + propToValue.put(kvs[0], kvs[1]); + } else { + throw new ParseException(kv, 2); + } + } + } + + /** + * convertPythonScriptPlaceholders + * @param rawScript rawScript + * @return String + * @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException + */ + public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException { + int len = "${setShareVar(${".length(); + int scriptStart = 0; + while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) { + int start = -1; + int end = rawScript.indexOf('}', scriptStart + len); + String prop = rawScript.substring(scriptStart + len, end); + + start = rawScript.indexOf(',', end); + end = rawScript.indexOf(')', start); + + String value = rawScript.substring(start + 1, end); + + start = rawScript.indexOf('}', start) + 1; + end = rawScript.length(); + + String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value); + + rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end); + + scriptStart += replaceScript.length(); + } + return rawScript; + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java new file mode 100644 index 0000000000..e47203c225 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.model.TaskNode; + +import java.util.concurrent.ConcurrentHashMap; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VarPoolUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class); + + @Test + public void testSetTaskNodeLocalParams() { + String taskJson = "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is python task \\\\\\\",${p0})\\\"," + + "\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}]," + + "\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}"; + TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); + + VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1"); + Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test1"); + + ConcurrentHashMap propToValue = new ConcurrentHashMap(); + propToValue.put("p1", "test2"); + + VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue); + Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test2"); + } + + @Test + public void testConvertVarPoolToMap() throws Exception { + String varPool = "p1,66$VarPool$p2,69$VarPool$"; + ConcurrentHashMap propToValue = new ConcurrentHashMap(); + VarPoolUtils.convertVarPoolToMap(propToValue, varPool); + Assert.assertEquals((String)propToValue.get("p1"), "66"); + Assert.assertEquals((String)propToValue.get("p2"), "69"); + logger.info(propToValue.toString()); + } + + @Test + public void testConvertPythonScriptPlaceholders() throws Exception { + String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};"; + rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript); + Assert.assertEquals(rawScript, "print(${p1});\n" + + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n" + + "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); + logger.info(rawScript); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 3d1a756d25..e3a3f11386 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -224,6 +224,11 @@ public class ProcessInstance { */ private int tenantId; + /** + * varPool string + */ + private String varPool; + /** * receivers for api */ @@ -256,6 +261,14 @@ public class ProcessInstance { DateUtils.getCurrentTimeStamp(); } + public String getVarPool() { + return varPool; + } + + public void setVarPool(String varPool) { + this.varPool = varPool; + } + public ProcessDefinition getProcessDefinition() { return processDefinition; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 9688200b2c..b13ca87e38 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -211,6 +211,11 @@ public class TaskInstance implements Serializable { */ private int executorId; + /** + * varPool string + */ + private String varPool; + /** * executor name */ @@ -232,7 +237,14 @@ public class TaskInstance implements Serializable { this.executePath = executePath; } + public String getVarPool() { + return varPool; + } + public void setVarPool(String varPool) { + this.varPool = varPool; + } + public ProcessInstance getProcessInstance() { return processInstance; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index e559334f48..7f6ee668a8 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -63,7 +63,19 @@ public class TaskExecuteResponseCommand implements Serializable { */ private String appIds; + /** + * varPool string + */ + private String varPool; + public void setVarPool(String varPool) { + this.varPool = varPool; + } + + public String getVarPool() { + return varPool; + } + public int getTaskInstanceId() { return taskInstanceId; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index b04b930fd4..2633ccd634 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -90,7 +90,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), - responseCommand.getTaskInstanceId()); + responseCommand.getTaskInstanceId(), + responseCommand.getVarPool()); taskResponseService.addResponse(taskResponseEvent); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 051cc388bf..ba07be50f3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -79,7 +79,12 @@ public class TaskResponseEvent { */ private Event event; - public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){ + /** + * varPool + */ + private String varPool; + + public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setStartTime(startTime); @@ -91,7 +96,7 @@ public class TaskResponseEvent { return event; } - public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){ + public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId, String varPool) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setEndTime(endTime); @@ -99,9 +104,18 @@ public class TaskResponseEvent { event.setAppIds(appIds); event.setTaskInstanceId(taskInstanceId); event.setEvent(Event.RESULT); + event.setVarPool(varPool); return event; } + public String getVarPool() { + return varPool; + } + + public void setVarPool(String varPool) { + this.varPool = varPool; + } + public int getTaskInstanceId() { return taskInstanceId; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index ba07313a9a..6434db70e5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -139,7 +139,8 @@ public class TaskResponseService { taskResponseEvent.getEndTime(), taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId()); + taskResponseEvent.getTaskInstanceId(), + taskResponseEvent.getVarPool()); break; default: throw new IllegalArgumentException("invalid event type : " + event); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 788b30638e..3c28e16651 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.VarPoolUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -59,6 +60,7 @@ import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; +import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -651,14 +653,23 @@ public class MasterExecThread implements Runnable { * submit post node * @param parentNodeName parent node name */ + private Map propToValue = new ConcurrentHashMap(); private void submitPostNode(String parentNodeName){ List submitTaskNodeList = parsePostNodeList(parentNodeName); List taskInstances = new ArrayList<>(); for(String taskNode : submitTaskNodeList){ + try { + VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool()); + } catch (ParseException e) { + logger.error("parse {} exception", processInstance.getVarPool(), e); + throw new RuntimeException(); + } + TaskNode taskNodeObject = dag.getNode(taskNode); + VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue); taskInstances.add(createTaskInstance(processInstance, taskNode, - dag.getNode(taskNode))); + taskNodeObject)); } // if previous node success , post node submit @@ -999,6 +1010,8 @@ public class MasterExecThread implements Runnable { task.getName(), task.getId(), task.getState()); // node success , post node submit if(task.getState() == ExecutionStatus.SUCCESS){ + processInstance.setVarPool(task.getVarPool()); + processService.updateProcessInstance(processInstance); completeTaskList.put(task.getName(), task); submitPostNode(task.getName()); continue; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 3ba49451c7..58f743303c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -155,6 +155,7 @@ public class TaskExecuteThread implements Runnable { responseCommand.setEndTime(new Date()); responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); + responseCommand.setVarPool(task.getVarPool()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); } catch (Exception e) { logger.error("task scheduler failure", e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 3dedeced06..dddd1a64b7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -66,6 +66,7 @@ public abstract class AbstractCommandExecutor { */ protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); + protected StringBuilder varPool = new StringBuilder(); /** * process */ @@ -234,7 +235,10 @@ public abstract class AbstractCommandExecutor { return result; } - + public String getVarPool() { + return varPool.toString(); + } + /** * cancel application * @throws Exception exception @@ -347,8 +351,13 @@ public abstract class AbstractCommandExecutor { long lastFlushTime = System.currentTimeMillis(); while ((line = inReader.readLine()) != null) { - logBuffer.add(line); - lastFlushTime = flush(lastFlushTime); + if (line.startsWith("${setValue(")) { + varPool.append(line.substring("${setValue(".length(), line.length() - 2)); + varPool.append("$VarPool$"); + } else { + logBuffer.add(line); + lastFlushTime = flush(lastFlushTime); + } } } catch (Exception e) { logger.error(e.getMessage(),e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index ae03932a52..1a66349817 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -47,6 +47,11 @@ import java.util.Map; */ public abstract class AbstractTask { + /** + * varPool string + */ + protected String varPool; + /** * taskExecutionContext **/ @@ -121,6 +126,14 @@ public abstract class AbstractTask { logger.info(" -> {}", String.join("\n\t", logs)); } + public void setVarPool(String varPool) { + this.varPool = varPool; + } + + public String getVarPool() { + return varPool; + } + /** * get exit status code * @return exit status code diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index 367da80a0c..6e561c1cab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -38,103 +38,110 @@ import java.util.Map; */ public class PythonTask extends AbstractTask { - /** - * python parameters - */ - private PythonParameters pythonParameters; - - /** - * task dir - */ - private String taskDir; - - /** - * python command executor - */ - private PythonCommandExecutor pythonCommandExecutor; - - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; - - /** - * constructor - * @param taskExecutionContext taskExecutionContext - * @param logger logger - */ - public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) { - super(taskExecutionContext, logger); - this.taskExecutionContext = taskExecutionContext; - - this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, - taskExecutionContext, - logger); - } - - @Override - public void init() { - logger.info("python task params {}", taskExecutionContext.getTaskParams()); - - pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class); - - if (!pythonParameters.checkParameters()) { - throw new RuntimeException("python task params is not valid"); + /** + * python parameters + */ + private PythonParameters pythonParameters; + + /** + * task dir + */ + private String taskDir; + + /** + * python command executor + */ + private PythonCommandExecutor pythonCommandExecutor; + + /** + * taskExecutionContext + */ + private TaskExecutionContext taskExecutionContext; + + /** + * constructor + * @param taskExecutionContext taskExecutionContext + * @param logger logger + */ + public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + this.taskExecutionContext = taskExecutionContext; + + this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, + taskExecutionContext, + logger); } - } - @Override - public void handle() throws Exception { - try { - // construct process - CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand()); + @Override + public void init() { + logger.info("python task params {}", taskExecutionContext.getTaskParams()); - setExitStatusCode(commandExecuteResult.getExitStatusCode()); - setAppIds(commandExecuteResult.getAppIds()); - setProcessId(commandExecuteResult.getProcessId()); - } - catch (Exception e) { - logger.error("python task failure", e); - setExitStatusCode(Constants.EXIT_CODE_FAILURE); - throw e; - } - } - - @Override - public void cancelApplication(boolean cancelApplication) throws Exception { - // cancel process - pythonCommandExecutor.cancelApplication(); - } - - /** - * build command - * @return raw python script - * @throws Exception exception - */ - private String buildCommand() throws Exception { - String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); - - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - pythonParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); - if (paramsMap != null){ - rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); - } + pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class); - logger.info("raw python script : {}", pythonParameters.getRawScript()); - logger.info("task dir : {}", taskDir); - - return rawPythonScript; - } + if (!pythonParameters.checkParameters()) { + throw new RuntimeException("python task params is not valid"); + } + } - @Override - public AbstractParameters getParameters() { - return pythonParameters; - } + @Override + public void handle() throws Exception { + try { + // construct process + CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand()); + + setExitStatusCode(commandExecuteResult.getExitStatusCode()); + setAppIds(commandExecuteResult.getAppIds()); + setProcessId(commandExecuteResult.getProcessId()); + setVarPool(pythonCommandExecutor.getVarPool()); + } + catch (Exception e) { + logger.error("python task failure", e); + setExitStatusCode(Constants.EXIT_CODE_FAILURE); + throw e; + } + } + @Override + public void cancelApplication(boolean cancelApplication) throws Exception { + // cancel process + pythonCommandExecutor.cancelApplication(); + } + /** + * build command + * @return raw python script + * @throws Exception exception + */ + private String buildCommand() throws Exception { + String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); + + // replace placeholder + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), + pythonParameters.getLocalParametersMap(), + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); + + try { + rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript); + } + catch (StringIndexOutOfBoundsException e) { + logger.error("setShareVar field format error, raw python script : {}", rawPythonScript); + } + + if (paramsMap != null) { + rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); + } + + logger.info("raw python script : {}", pythonParameters.getRawScript()); + logger.info("task dir : {}", taskDir); + + return rawPythonScript; + } + @Override + public AbstractParameters getParameters() { + return pythonParameters; + } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 6f642672cd..7344cf13e5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1464,17 +1464,20 @@ public class ProcessService { * @param state state * @param endTime endTime * @param taskInstId taskInstId + * @param varPool varPool */ public void changeTaskState(ExecutionStatus state, Date endTime, int processId, String appIds, - int taskInstId) { + int taskInstId, + String varPool) { TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); taskInstance.setPid(processId); taskInstance.setAppLink(appIds); taskInstance.setState(state); taskInstance.setEndTime(endTime); + taskInstance.setVarPool(varPool); saveTaskInstance(taskInstance); } diff --git a/pom.xml b/pom.xml index c8cb5a9dea..e895b01d89 100644 --- a/pom.xml +++ b/pom.xml @@ -785,6 +785,7 @@ **/common/utils/StringTest.java **/common/utils/StringUtilsTest.java **/common/utils/TaskParametersUtilsTest.java + **/common/utils/VarPoolUtilsTest.java **/common/utils/HadoopUtilsTest.java **/common/utils/HttpUtilsTest.java **/common/utils/KerberosHttpClientTest.java diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql index 5ae37e1be8..e2f5ebd91f 100644 --- a/sql/dolphinscheduler-postgre.sql +++ b/sql/dolphinscheduler-postgre.sql @@ -377,6 +377,7 @@ CREATE TABLE t_ds_process_instance ( worker_group varchar(64) , timeout int DEFAULT '0' , tenant_id int NOT NULL DEFAULT '-1' , + var_pool text , PRIMARY KEY (id) ) ; create index process_instance_index on t_ds_process_instance (process_definition_id,id); @@ -595,6 +596,7 @@ CREATE TABLE t_ds_task_instance ( executor_id int DEFAULT NULL , first_submit_time timestamp DEFAULT NULL , delay_time int DEFAULT '0' , + var_pool text , PRIMARY KEY (id) ) ; diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 61e697568a..9039a19084 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -487,6 +487,7 @@ CREATE TABLE `t_ds_process_instance` ( `worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id', `timeout` int(11) DEFAULT '0' COMMENT 'time out', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', + `var_pool` longtext COMMENT 'var_pool', PRIMARY KEY (`id`), KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE, KEY `start_time_index` (`start_time`) USING BTREE @@ -737,6 +738,7 @@ CREATE TABLE `t_ds_task_instance` ( `executor_id` int(11) DEFAULT NULL, `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time', `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time', + `var_pool` longtext COMMENT 'var_pool', PRIMARY KEY (`id`), KEY `process_instance_id` (`process_instance_id`) USING BTREE, KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE, diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql index 43488272e2..ae66da914f 100644 --- a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql @@ -56,6 +56,46 @@ delimiter ; CALL uc_dolphin_T_t_ds_task_instance_A_delay_time(); DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time; +-- uc_dolphin_T_t_ds_task_instance_A_var_pool +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_task_instance ADD `var_pool` longtext NULL; + END IF; + END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_task_instance_A_var_pool(); +DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool; + +-- uc_dolphin_T_t_ds_process_instance_A_var_pool +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_process_instance ADD `var_pool` longtext NULL; + END IF; + END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_process_instance_A_var_pool(); +DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool; + -- uc_dolphin_T_t_ds_process_definition_A_modify_by drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version; delimiter d// diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql index e2767617df..3351cac88c 100644 --- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql @@ -51,6 +51,42 @@ delimiter ; SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time(); DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time(); +-- uc_dolphin_T_t_ds_process_instance_A_var_pool +delimiter d// +CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_instance_A_var_pool() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_process_instance ADD COLUMN var_pool text; + END IF; +END; +$$ LANGUAGE plpgsql; +d// + +delimiter ; +SELECT uc_dolphin_T_t_ds_process_instance_A_var_pool(); +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool(); + +-- uc_dolphin_T_t_ds_task_instance_A_var_pool +delimiter d// +CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_var_pool() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_task_instance ADD COLUMN var_pool text; + END IF; +END; +$$ LANGUAGE plpgsql; +d// + +delimiter ; +SELECT uc_dolphin_T_t_ds_task_instance_A_var_pool(); +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool(); + -- uc_dolphin_T_t_ds_process_definition_A_modify_by delimiter d// CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$