Browse Source

[FixBug][JsonSplit] dag show bugs (#5226)

* fix: dag show bugs

* fix: code format

* fix: code format

* fix: code format

Co-authored-by: wen-hemin <wenhemin@apache.com>
pull/3/MERGE
wen-hemin 4 years ago committed by GitHub
parent
commit
cc82a98f50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  4. 79
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java
  5. 100
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java
  6. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  7. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1108,7 +1108,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check whether the process definition json is normal
for (TaskNode taskNode : taskNodes) {
if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getType())) {
if (!CheckUtils.checkTaskNodeParameters(taskNode)) {
logger.error("task node {} parameter invalid", taskNode.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
return result;

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -205,8 +205,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
ProcessData processData = processService.genProcessData(processDefinition);
processInstance.setProcessInstanceJson(JSONUtils.toJsonString(processData));
result.put(DATA_LIST, processInstance);
putMsg(result, Status.SUCCESS);
}
return result;
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -237,7 +237,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return;
}
if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getName())) {
if (!CheckUtils.checkTaskNodeParameters(taskNode)) {
logger.error("task node {} parameter invalid", taskNode.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
}

79
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/CheckUtils.java

@ -16,19 +16,20 @@
*/
package org.apache.dolphinscheduler.api.utils;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
* check utils
@ -38,10 +39,12 @@ public class CheckUtils {
private CheckUtils() {
throw new IllegalStateException("CheckUtils class");
}
/**
* check username
*
* @param userName user name
* @param userName
* user name
* @return true if user name regex valid,otherwise return false
*/
public static boolean checkUserName(String userName) {
@ -51,7 +54,8 @@ public class CheckUtils {
/**
* check email
*
* @param email email
* @param email
* email
* @return true if email regex valid, otherwise return false
*/
public static boolean checkEmail(String email) {
@ -65,14 +69,16 @@ public class CheckUtils {
/**
* check project description
*
* @param desc desc
* @param desc
* desc
* @return true if description regex valid, otherwise return false
*/
public static Map<String, Object> checkDesc(String desc) {
Map<String, Object> result = new HashMap<>();
if (StringUtils.isNotEmpty(desc) && desc.length() > 200) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG, MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
result.put(Constants.MSG,
MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
} else {
result.put(Constants.STATUS, Status.SUCCESS);
}
@ -82,7 +88,8 @@ public class CheckUtils {
/**
* check extra info
*
* @param otherParams other parames
* @param otherParams
* other parames
* @return true if other parameters are valid, otherwise return false
*/
public static boolean checkOtherParams(String otherParams) {
@ -92,7 +99,8 @@ public class CheckUtils {
/**
* check password
*
* @param password password
* @param password
* password
* @return true if password regex valid, otherwise return false
*/
public static boolean checkPassword(String password) {
@ -100,25 +108,31 @@ public class CheckUtils {
}
/**
* check phone
* phone can be empty.
* @param phone phone
* check phone phone can be empty.
*
* @param phone
* phone
* @return true if phone regex valid, otherwise return false
*/
public static boolean checkPhone(String phone) {
return StringUtils.isEmpty(phone) || phone.length() == 11;
}
/**
* check task node parameter
*
* @param parameter parameter
* @param taskType task type
* @param taskNode
* TaskNode
* @return true if task node parameters are valid, otherwise return false
*/
public static boolean checkTaskNodeParameters(String parameter, String taskType) {
AbstractParameters abstractParameters = TaskParametersUtils.getParameters(taskType, parameter);
public static boolean checkTaskNodeParameters(TaskNode taskNode) {
AbstractParameters abstractParameters;
if (TaskType.of(taskNode.getType()) == TaskType.DEPENDENT) {
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getDependence());
} else {
abstractParameters = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
}
if (abstractParameters != null) {
return abstractParameters.checkParameters();
@ -129,24 +143,29 @@ public class CheckUtils {
/**
* check params
* @param userName user name
* @param password password
* @param email email
* @param phone phone
*
* @param userName
* user name
* @param password
* password
* @param email
* email
* @param phone
* phone
* @return true if user parameters are valid, other return false
*/
public static boolean checkUserParams(String userName, String password, String email, String phone) {
return CheckUtils.checkUserName(userName) &&
CheckUtils.checkEmail(email) &&
CheckUtils.checkPassword(password) &&
CheckUtils.checkPhone(phone);
return CheckUtils.checkUserName(userName) && CheckUtils.checkEmail(email) && CheckUtils.checkPassword(password)
&& CheckUtils.checkPhone(phone);
}
/**
* regex check
*
* @param str input string
* @param pattern regex pattern
* @param str
* input string
* @param pattern
* regex pattern
* @return true if regex pattern is right, otherwise return false
*/
private static boolean regexChecks(String str, Pattern pattern) {

100
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
@ -134,93 +135,140 @@ public class CheckUtilsTest {
}
@Test
public void testCheckTaskNodeParameters() {
TaskNode taskNode = new TaskNode();
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
assertFalse(CheckUtils.checkTaskNodeParameters(null,null));
assertFalse(CheckUtils.checkTaskNodeParameters(null,"unKnown"));
assertFalse(CheckUtils.checkTaskNodeParameters("unKnown","unKnown"));
assertFalse(CheckUtils.checkTaskNodeParameters("unKnown",null));
taskNode.setType("unKnown");
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
taskNode.setParams("unKnown");
taskNode.setType("unKnown");
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
taskNode.setParams("unKnown");
taskNode.setType(null);
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
// sub SubProcessParameters
SubProcessParameters subProcessParameters = new SubProcessParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(subProcessParameters), TaskType.SUB_PROCESS.toString()));
taskNode.setParams(JSONUtils.toJsonString(subProcessParameters));
taskNode.setType(TaskType.SUB_PROCESS.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
subProcessParameters.setProcessDefinitionId(1234);
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(subProcessParameters), TaskType.SUB_PROCESS.toString()));
taskNode.setParams(JSONUtils.toJsonString(subProcessParameters));
taskNode.setType(TaskType.SUB_PROCESS.toString());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// ShellParameters
ShellParameters shellParameters = new ShellParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(shellParameters), TaskType.SHELL.toString()));
taskNode.setParams(JSONUtils.toJsonString(shellParameters));
taskNode.setType(TaskType.SHELL.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
shellParameters.setRawScript("");
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(shellParameters), TaskType.SHELL.toString()));
taskNode.setParams(JSONUtils.toJsonString(shellParameters));
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
shellParameters.setRawScript("sss");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(shellParameters), TaskType.SHELL.toString()));
taskNode.setParams(JSONUtils.toJsonString(shellParameters));
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// ProcedureParameters
ProcedureParameters procedureParameters = new ProcedureParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(procedureParameters), TaskType.PROCEDURE.toString()));
taskNode.setParams(JSONUtils.toJsonString(procedureParameters));
taskNode.setType(TaskType.PROCEDURE.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
procedureParameters.setDatasource(1);
procedureParameters.setType("xx");
procedureParameters.setMethod("yy");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(procedureParameters), TaskType.PROCEDURE.toString()));
taskNode.setParams(JSONUtils.toJsonString(procedureParameters));
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// SqlParameters
SqlParameters sqlParameters = new SqlParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString()));
taskNode.setParams(JSONUtils.toJsonString(sqlParameters));
taskNode.setType(TaskType.SQL.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
sqlParameters.setDatasource(1);
sqlParameters.setType("xx");
sqlParameters.setSql("yy");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sqlParameters), TaskType.SQL.toString()));
taskNode.setParams(JSONUtils.toJsonString(sqlParameters));
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// MapReduceParameters
MapReduceParameters mapreduceParameters = new MapReduceParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
taskNode.setParams(JSONUtils.toJsonString(mapreduceParameters));
taskNode.setType(TaskType.MR.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
ResourceInfo resourceInfoMapreduce = new ResourceInfo();
resourceInfoMapreduce.setId(1);
resourceInfoMapreduce.setRes("");
mapreduceParameters.setMainJar(resourceInfoMapreduce);
mapreduceParameters.setProgramType(ProgramType.JAVA);
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(mapreduceParameters), TaskType.MR.toString()));
taskNode.setParams(JSONUtils.toJsonString(mapreduceParameters));
taskNode.setType(TaskType.MR.toString());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// SparkParameters
SparkParameters sparkParameters = new SparkParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sparkParameters), TaskType.SPARK.toString()));
taskNode.setParams(JSONUtils.toJsonString(sparkParameters));
taskNode.setType(TaskType.SPARK.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
sparkParameters.setMainJar(new ResourceInfo());
sparkParameters.setProgramType(ProgramType.SCALA);
sparkParameters.setSparkVersion("1.1.1");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(sparkParameters), TaskType.SPARK.toString()));
taskNode.setParams(JSONUtils.toJsonString(sparkParameters));
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// PythonParameters
PythonParameters pythonParameters = new PythonParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(pythonParameters), TaskType.PYTHON.toString()));
taskNode.setParams(JSONUtils.toJsonString(pythonParameters));
taskNode.setType(TaskType.PYTHON.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
pythonParameters.setRawScript("ss");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(pythonParameters), TaskType.PYTHON.toString()));
taskNode.setParams(JSONUtils.toJsonString(pythonParameters));
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// DependentParameters
DependentParameters dependentParameters = new DependentParameters();
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dependentParameters), TaskType.DEPENDENT.toString()));
taskNode.setParams(JSONUtils.toJsonString(dependentParameters));
taskNode.setType(TaskType.DEPENDENT.toString());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// FlinkParameters
FlinkParameters flinkParameters = new FlinkParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(flinkParameters), TaskType.FLINK.toString()));
taskNode.setParams(JSONUtils.toJsonString(flinkParameters));
taskNode.setType(TaskType.FLINK.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
flinkParameters.setMainJar(new ResourceInfo());
flinkParameters.setProgramType(ProgramType.JAVA);
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(flinkParameters), TaskType.FLINK.toString()));
taskNode.setParams(JSONUtils.toJsonString(flinkParameters));
taskNode.setType(TaskType.FLINK.toString());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// HTTP
HttpParameters httpParameters = new HttpParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(httpParameters), TaskType.HTTP.toString()));
taskNode.setParams(JSONUtils.toJsonString(httpParameters));
taskNode.setType(TaskType.HTTP.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
httpParameters.setUrl("httpUrl");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(httpParameters), TaskType.HTTP.toString()));
taskNode.setParams(JSONUtils.toJsonString(httpParameters));
taskNode.setType(TaskType.HTTP.toString());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
// DataxParameters
DataxParameters dataxParameters = new DataxParameters();
assertFalse(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString()));
taskNode.setParams(JSONUtils.toJsonString(dataxParameters));
taskNode.setType(TaskType.DATAX.toString());
assertFalse(CheckUtils.checkTaskNodeParameters(taskNode));
dataxParameters.setCustomConfig(0);
dataxParameters.setDataSource(111);
dataxParameters.setDataTarget(333);
dataxParameters.setSql("sql");
dataxParameters.setTargetTable("tar");
assertTrue(CheckUtils.checkTaskNodeParameters(JSONUtils.toJsonString(dataxParameters), TaskType.DATAX.toString()));
taskNode.setParams(JSONUtils.toJsonString(dataxParameters));
taskNode.setType(TaskType.DATAX.toString());
assertTrue(CheckUtils.checkTaskNodeParameters(taskNode));
}
}

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -35,6 +35,7 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.JsonNode;
/**
* task definition
@ -250,10 +251,12 @@ public class TaskDefinition {
}
public List<Property> getTaskParamList() {
List<Property> propList = JSONUtils.toList(JSONUtils.parseObject(taskParams).findValue("localParams").toString(),
Property.class);
return taskParamList = propList;
JsonNode localParams = JSONUtils.parseObject(taskParams).findValue("localParams");
if (localParams != null) {
taskParamList = JSONUtils.toList(localParams.toString(), Property.class);
}
return taskParamList;
}
public void setTaskParamList(List<Property> taskParamList) {
@ -266,10 +269,12 @@ public class TaskDefinition {
public Map<String, String> getTaskParamMap() {
if (taskParamMap == null && StringUtils.isNotEmpty(taskParams)) {
List<Property> propList = JSONUtils.toList(JSONUtils.parseObject(taskParams).findValue("localParams").toString(),
Property.class);
JsonNode localParams = JSONUtils.parseObject(taskParams).findValue("localParams");
if (localParams != null) {
List<Property> propList = JSONUtils.toList(localParams.toString(), Property.class);
taskParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}
}
return taskParamMap;
}

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2533,7 +2533,7 @@ public class ProcessService {
v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_NORMAL : Constants.FLOWNODE_RUN_FLAG_FORBIDDEN);
v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
v.setParams(taskDefinitionLog.getTaskType() == TaskType.DEPENDENT ? "" : taskDefinitionLog.getTaskParams());
v.setParams(taskDefinitionLog.getTaskType() == TaskType.DEPENDENT ? null : taskDefinitionLog.getTaskParams());
v.setDependence(taskDefinitionLog.getTaskType() == TaskType.DEPENDENT ? taskDefinitionLog.getTaskParams() : null);
v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());

Loading…
Cancel
Save