Browse Source

Merge pull request #3957 from BoYiZhang/pr_3956

[Fix-#3956][Master] When running a task, the resource file is lost
pull/3/MERGE
Kirs 4 years ago committed by GitHub
parent
commit
05b248f8e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 78
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
  2. 54
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
  3. 59
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

78
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java

@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task;
import java.util.Map;
public class TaskParams {
private String rawScript;
private Map<String, String>[] localParams;
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
public void setLocalParams(Map<String, String>[] localParams) {
this.localParams = localParams;
}
public String getRawScript() {
return rawScript;
}
public void setLocalParamValue(String prop, Object value) {
if (localParams == null || value == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
if (localParams[i].get("prop").equals(prop)) {
localParams[i].put("value", (String)value);
}
}
}
public void setLocalParamValue(Map<String, Object> propToValue) {
if (localParams == null || propToValue == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
String prop = localParams[i].get("prop");
if (propToValue.containsKey(prop)) {
localParams[i].put("value",(String)propToValue.get(prop));
}
}
}
public String getLocalParamValue(String prop) {
if (localParams == null) {
return null;
}
for (int i = 0; i < localParams.length; i++) {
String tmpProp = localParams[i].get("prop");
if (tmpProp.equals(prop)) {
return localParams[i].get("value");
}
}
return null;
}
public Map<String, String>[] getLocalParams() {
return localParams;
}
}

54
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java

@ -18,42 +18,19 @@
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskParams;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class VarPoolUtils { public class VarPoolUtils {
/**
* getTaskNodeLocalParam
* @param taskNode taskNode
* @param prop prop
* @return localParamForProp
*/
public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return null;
}
return taskParams.getLocalParamValue(prop);
}
/** private static final String LOCALPARAMS = "localParams";
* setTaskNodeLocalParams
* @param taskNode taskNode private static final String PROP = "prop";
* @param prop LocalParamName
* @param value LocalParamValue private static final String VALUE = "value";
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return;
}
taskParams.setLocalParamValue(prop, value);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}
/** /**
* setTaskNodeLocalParams * setTaskNodeLocalParams
@ -62,11 +39,20 @@ public class VarPoolUtils {
*/ */
public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) { public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) {
String taskParamsJson = taskNode.getParams(); String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); Map<String,Object> taskParams = JSONUtils.parseObject(taskParamsJson, HashMap.class);
if (taskParams == null) {
return; Object localParamsObject = taskParams.get(LOCALPARAMS);
if (null != localParamsObject && null != propToValue && propToValue.size() > 0) {
ArrayList<Object> localParams = (ArrayList)localParamsObject;
for (int i = 0; i < localParams.size(); i++) {
Map<String,String> map = (Map)localParams.get(i);
String prop = map.get(PROP);
if (StringUtils.isNotEmpty(prop) && propToValue.containsKey(prop)) {
map.put(VALUE,(String)propToValue.get(prop));
}
}
taskParams.put(LOCALPARAMS,localParams);
} }
taskParams.setLocalParamValue(propToValue);
taskNode.setParams(JSONUtils.toJsonString(taskParams)); taskNode.setParams(JSONUtils.toJsonString(taskParams));
} }

59
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert; import org.junit.Assert;
@ -30,27 +32,6 @@ public class VarPoolUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class); private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
@Test
public void testSetTaskNodeLocalParams() {
String taskJson = "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is python task \\\\\\\",${p0})\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}],"
+ "\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}";
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1");
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test1");
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
propToValue.put("p1", "test2");
VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test2");
}
@Test @Test
public void testConvertVarPoolToMap() throws Exception { public void testConvertVarPoolToMap() throws Exception {
String varPool = "p1,66$VarPool$p2,69$VarPool$"; String varPool = "p1,66$VarPool$p2,69$VarPool$";
@ -70,4 +51,40 @@ public class VarPoolUtilsTest {
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); + "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
logger.info(rawScript); logger.info(rawScript);
} }
@Test
public void testSetTaskNodeLocalParams() throws Exception {
String taskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\","
+ "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\""
+ "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\""
+ "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"},"
+ "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\""
+ "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},"
+ "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},"
+ "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\""
+ "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\""
+ "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
String changeTaskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\","
+ "\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\""
+ "params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\""
+ "localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k1-value-change\"},"
+ "{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k2-value-change\"},"
+ "{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\""
+ "resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},"
+ "{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},"
+ "{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\""
+ "dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\""
+ "workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
Map<String, Object> propToValue = new HashMap<String, Object>();
propToValue.put("k1","k1-value-change");
propToValue.put("k2","k2-value-change");
TaskNode taskNode = JSONUtils.parseObject(taskJson,TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode,propToValue);
Assert.assertEquals(changeTaskJson,JSONUtils.toJsonString(taskNode));
}
} }

Loading…
Cancel
Save