Browse Source

When running a task, the resource file is lost, which results in an error

pull/3/MERGE
BoYiZhang 4 years ago
parent
commit
aace551782
  1. 94
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
  2. 50
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
  3. 43
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

94
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import java.util.List;
import java.util.Map;
public class TaskParams {
private String rawScript;
private Map<String, String>[] localParams;
/**
* resource list
*/
private List<ResourceInfo> resourceList;
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
public void setLocalParams(Map<String, String>[] localParams) {
this.localParams = localParams;
}
public String getRawScript() {
return rawScript;
}
public void setLocalParamValue(String prop, Object value) {
if (localParams == null || value == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
if (localParams[i].get("prop").equals(prop)) {
localParams[i].put("value", (String)value);
}
}
}
public void setLocalParamValue(Map<String, Object> propToValue) {
if (localParams == null || propToValue == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
String prop = localParams[i].get("prop");
if (propToValue.containsKey(prop)) {
localParams[i].put("value",(String)propToValue.get(prop));
}
}
}
public String getLocalParamValue(String prop) {
if (localParams == null) {
return null;
}
for (int i = 0; i < localParams.length; i++) {
String tmpProp = localParams[i].get("prop");
if (tmpProp.equals(prop)) {
return localParams[i].get("value");
}
}
return null;
}
public Map<String, String>[] getLocalParams() {
return localParams;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
}

50
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java

@ -18,42 +18,13 @@
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskParams;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
public class VarPoolUtils {
/**
* getTaskNodeLocalParam
* @param taskNode taskNode
* @param prop prop
* @return localParamForProp
*/
public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return null;
}
return taskParams.getLocalParamValue(prop);
}
/**
* setTaskNodeLocalParams
* @param taskNode taskNode
* @param prop LocalParamName
* @param value LocalParamValue
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return;
}
taskParams.setLocalParamValue(prop, value);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}
/**
* setTaskNodeLocalParams
@ -62,11 +33,20 @@ public class VarPoolUtils {
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return;
Map<String,Object> taskParams = JSONUtils.parseObject(taskParamsJson, HashMap.class);
Object localParamsObject = taskParams.get("localParams");
if (null != localParamsObject && propToValue.size() >0) {
ArrayList<Object> localParams = (ArrayList)localParamsObject;
for (int i = 0; i < localParams.size(); i++) {
Map<String,String> map = (Map)localParams.get(i);
String prop = map.get("prop");
if (StringUtils.isNotEmpty(prop) && propToValue.containsKey(prop)) {
map.put("value",(String)propToValue.get(prop));
}
}
taskParams.put("localParams",localParams);
}
taskParams.setLocalParamValue(propToValue);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}

43
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
@ -29,28 +31,7 @@ import org.slf4j.LoggerFactory;
public class VarPoolUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
@Test
public void testSetTaskNodeLocalParams() {
String taskJson = "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is python task \\\\\\\",${p0})\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}],"
+ "\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}";
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1");
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test1");
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
propToValue.put("p1", "test2");
VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test2");
}
@Test
public void testConvertVarPoolToMap() throws Exception {
String varPool = "p1,66$VarPool$p2,69$VarPool$";
@ -70,4 +51,22 @@ public class VarPoolUtilsTest {
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
logger.info(rawScript);
}
@Test
public void testSetTaskNodeLocalParams() throws Exception {
String taskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\"localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v1\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v2\"},{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\"resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
String changeTaskJson = "{\"id\":\"tasks-66199\",\"name\":\"file-shell\",\"desc\":null,\"type\":\"SHELL\",\"runFlag\":\"NORMAL\",\"loc\":null,\"maxRetryTimes\":0,\"retryInterval\":1,\"params\":{\"rawScript\":\"sh n-1/n-1-1/run.sh\",\"localParams\":[{\"prop\":\"k1\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k1-value-change\"},{\"prop\":\"k2\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"k2-value-change\"},{\"prop\":\"k3\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"v3\"}],\"resourceList\":[{\"id\":\"dolphinschedule-code\",\"res\":\"n-1/n-1-1/dolphinscheduler-api-server.log\"},{\"id\":\"mr-code\",\"res\":\"n-1/n-1-1/hadoop-mapreduce-examples-2.7.4.jar\"},{\"id\":\"run\",\"res\":\"n-1/n-1-1/run.sh\"}]},\"preTasks\":[],\"extras\":null,\"depList\":[],\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"taskInstancePriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"workerGroupId\":null,\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"delayTime\":0}";
Map<String, Object> propToValue = new HashMap<String, Object>();
propToValue.put("k1","k1-value-change");
propToValue.put("k2","k2-value-change");
TaskNode taskNode = JSONUtils.parseObject(taskJson,TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode,propToValue);
Assert.assertEquals(changeTaskJson,JSONUtils.toJsonString(taskNode));
}
}

Loading…
Cancel
Save