From fbb8ff438ad0c0db1bfc78d33bbc73b2c7d40683 Mon Sep 17 00:00:00 2001 From: Wuv1Up Date: Sat, 30 May 2020 12:45:39 +0800 Subject: [PATCH 01/17] fix return value of created project (#2804) Co-authored-by: dailidong --- .../apache/dolphinscheduler/api/service/ProjectService.java | 6 ++---- .../api/controller/ProjectControllerTest.java | 5 +++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java index 8f9a62000a..6d3650b77f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java @@ -88,6 +88,8 @@ public class ProjectService extends BaseService{ project.setUpdateTime(now); if (projectMapper.insert(project) > 0) { + Project insertedProject = projectMapper.queryByName(name); + result.put(Constants.DATA_LIST, insertedProject); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.CREATE_PROJECT_ERROR); @@ -124,9 +126,7 @@ public class ProjectService extends BaseService{ * @return true if the login user have permission to see the project */ public Map checkProjectAndAuth(User loginUser, Project project, String projectName) { - Map result = new HashMap<>(5); - if (project == null) { putMsg(result, Status.PROJECT_NOT_FOUNT, projectName); } else if (!checkReadPermission(loginUser, project)) { @@ -135,8 +135,6 @@ public class ProjectService extends BaseService{ }else { putMsg(result, Status.SUCCESS); } - - return result; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProjectControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProjectControllerTest.java index 42cdd1705a..7eb0eb3202 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProjectControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProjectControllerTest.java @@ -46,7 +46,7 @@ public class ProjectControllerTest extends AbstractControllerTest{ MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("projectName","project_test1"); - paramsMap.add("desc","the test project"); + paramsMap.add("description","the test project"); MvcResult mvcResult = mockMvc.perform(post("/projects/create") .header(SESSION_ID, sessionId) @@ -56,7 +56,8 @@ public class ProjectControllerTest extends AbstractControllerTest{ .andReturn(); Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); - Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); + Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); + Assert.assertNotNull(result.getData()); logger.info(mvcResult.getResponse().getContentAsString()); } From 1c153454423f4b967f3fcd8fb906a4626799872f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A6=82=E6=A2=A6=E6=8A=80=E6=9C=AF?= <596392912@qq.com> Date: Sat, 30 May 2020 15:03:10 +0800 Subject: [PATCH 02/17] fix: Improve the security of datasource management (#2844) Closes 2638 Co-authored-by: dailidong --- .../api/service/DataSourceService.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java index afa13b7414..f4e846fbce 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java @@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; @@ -159,8 +160,18 @@ public class DataSourceService extends BaseService{ putMsg(result, Status.DATASOURCE_EXIST); return result; } + //check password: if the password is not updated, set it to the old password. + JSONObject paramObject = JSON.parseObject(parameter); + String password = paramObject.getString(Constants.PASSWORD); + if (StringUtils.isBlank(password)) { + String oldConnectionParams = dataSource.getConnectionParams(); + JSONObject oldParams = JSON.parseObject(oldConnectionParams); + paramObject.put(Constants.PASSWORD, oldParams.getString(Constants.PASSWORD)); + } + // connectionParams json + String connectionParams = paramObject.toJSONString(); - Boolean isConnection = checkConnection(type, parameter); + Boolean isConnection = checkConnection(type, connectionParams); if (!isConnection) { logger.info("connect failed, type:{}, parameter:{}", type, parameter); putMsg(result, Status.DATASOURCE_CONNECT_FAILED); @@ -172,7 +183,7 @@ public class DataSourceService extends BaseService{ dataSource.setNote(desc); dataSource.setUserName(loginUser.getUserName()); dataSource.setType(type); - dataSource.setConnectionParams(parameter); + dataSource.setConnectionParams(connectionParams); dataSource.setUpdateTime(now); dataSourceMapper.updateById(dataSource); putMsg(result, Status.SUCCESS); @@ -257,7 +268,6 @@ public class DataSourceService extends BaseService{ map.put(PRINCIPAL, datasourceForm.getPrincipal()); map.put(DATABASE, database); map.put(USER_NAME, datasourceForm.getUser()); - map.put(PASSWORD, datasourceForm.getPassword()); map.put(OTHER, otherMap); result.put(Constants.DATA_LIST, map); putMsg(result, Status.SUCCESS); From d00627ff7786d417282f16460c76a28232ad5e15 Mon Sep 17 00:00:00 2001 From: GabrielWithTina Date: Sat, 30 May 2020 17:07:22 +0800 Subject: [PATCH 03/17] Return ProcessDefinition ID when calling ProcessController.createProject. (#2849) Return Scheduler ID when calling SchedulerController.createScheduler. Clients need the created object's ID to bind the relationship between the client system and the DolphinScheduler system.
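As a minimal, self-contained illustration of the pattern this patch applies (the class and field names below are hypothetical; the actual changes use mapper.insert() followed by selectById() and expose the row under Constants.DATA_LIST), a create method can return the persisted object so the caller receives the generated ID rather than a bare success flag:

    import java.util.HashMap;
    import java.util.Map;

    public class CreateAndReturnIdSketch {
        // stands in for a database table with an auto-generated primary key
        private static final Map<Long, String> TABLE = new HashMap<>();
        private static long nextId = 1;

        // stands in for a create*() service method: insert first, then expose the row
        static Map<String, Object> create(String name) {
            Map<String, Object> result = new HashMap<>();
            long id = nextId++;                // mapper.insert(...) back-fills the key
            TABLE.put(id, name);
            result.put("data", TABLE.get(id)); // selectById(...) goes under DATA_LIST
            result.put("id", id);              // clients bind relationships with this ID
            result.put("msg", "success");
            return result;
        }

        public static void main(String[] args) {
            // prints e.g. {data=project_test1, id=1, msg=success} (map order may vary)
            System.out.println(create("project_test1"));
        }
    }

Looking the row back up after the insert also keeps the returned object consistent with any defaults the database filled in.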
--- .../dolphinscheduler/api/service/ProcessDefinitionService.java | 3 +++ .../apache/dolphinscheduler/api/service/SchedulerService.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 881e2fed1a..69213211cc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -159,6 +159,9 @@ public class ProcessDefinitionService extends BaseDAGService { processDefine.setUpdateTime(now); processDefine.setFlag(Flag.YES); processDefineMapper.insert(processDefine); + + // return processDefinition object with ID + result.put(Constants.DATA_LIST, processDefineMapper.selectById(processDefine.getId())); putMsg(result, Status.SUCCESS); result.put("processDefinitionId",processDefine.getId()); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index 9328fe0375..e7af56eaf4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -165,6 +165,9 @@ public class SchedulerService extends BaseService { processDefinition.setReceivers(receivers); processDefinition.setReceiversCc(receiversCc); processDefinitionMapper.updateById(processDefinition); + + // return scheduler object with ID + result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId())); putMsg(result, Status.SUCCESS); result.put("scheduleId", scheduleObj.getId()); From 1513aae3ceb6f70633f64cc3a290b8f5ec6ab042 Mon Sep 17 00:00:00 2001 From: itbasketplayer <825621876@qq.com> Date: Sat, 30 May 2020 23:07:17 +0800 Subject: [PATCH 04/17] add job history to judge application status/2625 (#2848) * use the job history status url to judge application status once the yarn application number threshold is reached (default 10000; it may have been set to 1000) Co-authored-by: yuhaibin@lizhi.fm <35716fc5847f6d154cf556296453ca91> Co-authored-by: dailidong --- .../dolphinscheduler/common/Constants.java | 5 ++ .../common/utils/HadoopUtils.java | 59 ++++++++++++------- .../src/main/resources/common.properties | 14 +++-- .../common/utils/HadoopUtilsTest.java | 6 ++ 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index effa4f0f8e..fc09960026 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -103,6 +103,11 @@ public final class Constants { */ public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address"; + /** + * yarn.job.history.status.address + */ + public static final String YARN_JOB_HISTORY_STATUS_ADDRESS =
"yarn.job.history.status.address"; + /** * hdfs configuration * hdfs.root.user diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 963aff5f31..1544b449ff 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -16,16 +16,16 @@ */ package org.apache.dolphinscheduler.common.utils; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONException; +import com.alibaba.fastjson.JSONObject; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import org.apache.commons.io.IOUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ResUploadType; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONException; -import com.alibaba.fastjson.JSONObject; -import org.apache.commons.io.IOUtils; import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; @@ -59,6 +59,7 @@ public class HadoopUtils implements Closeable { public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler"); public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS); public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS); + public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS); private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY"; @@ -114,11 +115,11 @@ public class HadoopUtils implements Closeable { String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE); ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType); - if (resUploadType == ResUploadType.HDFS){ - if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){ + if (resUploadType == ResUploadType.HDFS) { + if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) { System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF, PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH)); - configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos"); + configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); hdfsUser = ""; UserGroupInformation.setConfiguration(configuration); UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME), @@ -195,7 +196,7 @@ public class HadoopUtils implements Closeable { */ String appUrl = ""; //not use resourcemanager - if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){ + if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)) { yarnEnabled = false; logger.warn("should not step here"); @@ -212,6 +213,12 @@ public class HadoopUtils implements Closeable { return String.format(appUrl, applicationId); } + public String getJobHistoryUrl(String applicationId) { + //eg:application_1587475402360_712719 -> job_1587475402360_712719 + String jobId = applicationId.replace("application", "job"); + return String.format(jobHistoryAddress, 
jobId); + } + /** * cat file on hdfs * @@ -389,9 +396,10 @@ public class HadoopUtils implements Closeable { /** * hadoop resourcemanager enabled or not + * * @return result */ - public boolean isYarnEnabled() { + public boolean isYarnEnabled() { return yarnEnabled; } @@ -407,12 +415,22 @@ public class HadoopUtils implements Closeable { return null; } + String result = Constants.FAILED; String applicationUrl = getApplicationUrl(applicationId); + logger.info("applicationUrl={}", applicationUrl); String responseContent = HttpUtils.get(applicationUrl); - - JSONObject jsonObject = JSON.parseObject(responseContent); - String result = jsonObject.getJSONObject("app").getString("finalStatus"); + if (responseContent != null) { + JSONObject jsonObject = JSON.parseObject(responseContent); + result = jsonObject.getJSONObject("app").getString("finalStatus"); + } else { + //may be in job history + String jobHistoryUrl = getJobHistoryUrl(applicationId); + logger.info("jobHistoryUrl={}", jobHistoryUrl); + responseContent = HttpUtils.get(jobHistoryUrl); + JSONObject jsonObject = JSONObject.parseObject(responseContent); + result = jsonObject.getJSONObject("job").getString("state"); + } switch (result) { case Constants.ACCEPTED: @@ -435,6 +453,7 @@ public class HadoopUtils implements Closeable { /** * get data hdfs path + * * @return data hdfs path */ public static String getHdfsDataBasePath() { @@ -452,7 +471,7 @@ public class HadoopUtils implements Closeable { * @param tenantCode tenant code * @return hdfs resource dir */ - public static String getHdfsDir(ResourceType resourceType,String tenantCode) { + public static String getHdfsDir(ResourceType resourceType, String tenantCode) { String hdfsDir = ""; if (resourceType.equals(ResourceType.FILE)) { hdfsDir = getHdfsResDir(tenantCode); @@ -497,16 +516,16 @@ public class HadoopUtils implements Closeable { /** * get hdfs file name * - * @param resourceType resource type - * @param tenantCode tenant code - * @param fileName file name + * @param resourceType resource type + * @param tenantCode tenant code + * @param fileName file name * @return hdfs file name */ public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) { if (fileName.startsWith("/")) { - fileName = fileName.replaceFirst("/",""); + fileName = fileName.replaceFirst("/", ""); } - return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName); + return String.format("%s/%s", getHdfsDir(resourceType, tenantCode), fileName); } /** @@ -518,7 +537,7 @@ public class HadoopUtils implements Closeable { */ public static String getHdfsResourceFileName(String tenantCode, String fileName) { if (fileName.startsWith("/")) { - fileName = fileName.replaceFirst("/",""); + fileName = fileName.replaceFirst("/", ""); } return String.format("%s/%s", getHdfsResDir(tenantCode), fileName); } @@ -532,7 +551,7 @@ public class HadoopUtils implements Closeable { */ public static String getHdfsUdfFileName(String tenantCode, String fileName) { if (fileName.startsWith("/")) { - fileName = fileName.replaceFirst("/",""); + fileName = fileName.replaceFirst("/", ""); } return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName); } diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 3852c310b1..0cc118feb4 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -18,7 +18,7 @@ # resource storage 
type : HDFS,S3,NONE resource.storage.type=NONE -# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended +# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions."/dolphinscheduler" is recommended #resource.upload.path=/dolphinscheduler # user data local directory path, please make sure the directory exists and have read write permissions @@ -42,16 +42,16 @@ resource.storage.type=NONE # if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path hdfs.root.user=hdfs -# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir fs.defaultFS=hdfs://mycluster:8020 -# if resource.storage.type=S3,s3 endpoint +# if resource.storage.type=S3,s3 endpoint #fs.s3a.endpoint=http://192.168.199.91:9010 -# if resource.storage.type=S3,s3 access key +# if resource.storage.type=S3,s3 access key #fs.s3a.access.key=A3DXS30FO22544RE -# if resource.storage.type=S3,s3 secret key +# if resource.storage.type=S3,s3 secret key #fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK # if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty @@ -59,8 +59,10 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx # If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname. 
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s +# job history status url, used to judge application status once the yarn application number threshold is reached (default 10000; it may have been set to 1000) +yarn.job.history.status.address=http://ark1:19888/ws/v1/history/mapreduce/jobs/%s # system env path #dolphinscheduler.env.path=env/dolphinscheduler_env.sh development.state=false -kerberos.expire.time=7 \ No newline at end of file +kerberos.expire.time=7 diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java index e239fe7cb0..440f86395a 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java @@ -190,6 +190,12 @@ public class HadoopUtilsTest { logger.info(application_1516778421218_0042); } + @Test + public void getJobHistoryUrl(){ + String jobHistoryUrl = hadoopUtils.getJobHistoryUrl("application_1529051418016_0167"); + logger.info(jobHistoryUrl); + } + @Test public void catFileWithLimitTest() { List<String> stringList = new ArrayList<>(); From c8ab27acb0cd6d0b4e251a2d4cf7e4a7aa33ae89 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Sun, 31 May 2020 10:29:46 +0800 Subject: [PATCH 05/17] Optimization of enumerated values method (#2677) * The values() method of an enum returns a fresh copy of the backing array on every call; if the enum has many constants, doing lookups through values() is not recommended. See the JMH results for details. * beautiful misunderstanding Co-authored-by: dailidong --- .../common/enums/CommandType.java | 20 ++++++++++++++----- .../dolphinscheduler/common/enums/DbType.java | 16 +++++++++++---- .../common/enums/ExecutionStatus.java | 17 +++++++++++----- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java index 56fdd078d7..9682016d6f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java @@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.common.enums; import com.baomidou.mybatisplus.annotation.EnumValue; +import java.util.HashMap; +import java.util.Map; + /** * command types */ @@ -66,11 +69,18 @@ public enum CommandType { return descp; } - public static CommandType of(Integer status){ - for(CommandType cmdType : values()){ - if(cmdType.getCode() == status){ - return cmdType; - } + private static final Map<Integer, CommandType> COMMAND_TYPE_MAP = new HashMap<>(); + + static { + for (CommandType commandType : CommandType.values()) { + COMMAND_TYPE_MAP.put(commandType.code,commandType); + } + } + + + public static CommandType of(Integer status) { + if (COMMAND_TYPE_MAP.containsKey(status)) { + return COMMAND_TYPE_MAP.get(status); } throw new IllegalArgumentException("invalid status : " + status); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java index cc3a29565b..1d28a759c0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java +++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java @@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.common.enums; import com.baomidou.mybatisplus.annotation.EnumValue; +import java.util.HashMap; + /** * data base types */ @@ -59,11 +61,17 @@ public enum DbType { } + private static HashMap<Integer, DbType> DB_TYPE_MAP = new HashMap<>(); + + static { + for (DbType dbType:DbType.values()){ + DB_TYPE_MAP.put(dbType.getCode(),dbType); + } + } + public static DbType of(int type){ - for(DbType ty : values()){ - if(ty.getCode() == type){ - return ty; - } + if(DB_TYPE_MAP.containsKey(type)){ + return DB_TYPE_MAP.get(type); } throw new IllegalArgumentException("invalid type : " + type); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java index ce141d059a..0f8c9f5d82 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.common.enums; import com.baomidou.mybatisplus.annotation.EnumValue; +import java.util.HashMap; + /** * running status for workflow and task nodes * */ @@ -62,6 +64,13 @@ public enum ExecutionStatus { private final int code; private final String descp; + private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>(); + + static { + for (ExecutionStatus executionStatus:ExecutionStatus.values()){ + EXECUTION_STATUS_MAP.put(executionStatus.code,executionStatus); + } + } /** * status is success @@ -130,11 +139,9 @@ public enum ExecutionStatus { } public static ExecutionStatus of(int status){ - for(ExecutionStatus es : values()){ - if(es.getCode() == status){ - return es; - } - } + if(EXECUTION_STATUS_MAP.containsKey(status)){ + return EXECUTION_STATUS_MAP.get(status); + } throw new IllegalArgumentException("invalid status : " + status); } } From 8f8ddae03098b60f0c4f1637c1e3b5670df18b70 Mon Sep 17 00:00:00 2001 From: Han Gao Date: Sun, 31 May 2020 18:11:10 +0800 Subject: [PATCH 06/17] Using Jackson instead of Fastjson (#2850) * Using Jackson instead of Fastjson * Fix some json bugs Co-authored-by: dailidong --- .../alert/utils/JSONUtilsTest.java | 2 + .../api/service/ProcessDefinitionService.java | 405 +++++++++--------- .../api/service/ProcessInstanceService.java | 5 +- .../utils/exportprocess/DataSourceParam.java | 21 +- .../utils/exportprocess/DependentParam.java | 40 +- .../exportprocess/ProcessAddTaskParam.java | 6 +- .../service/ProcessDefinitionServiceTest.java | 8 +- .../exportprocess/DataSourceParamTest.java | 19 +- .../exportprocess/DependentParamTest.java | 34 +- .../common/utils/JSONUtils.java | 388 ++++++++--------- .../common/utils/ParameterUtils.java | 2 +- .../common/utils/StreamUtils.java | 36 ++ .../common/utils/JSONUtilsTest.java | 36 +- .../common/utils/ParameterUtilsTest.java | 8 +- .../common/utils/StreamUtilsTest.java | 39 ++ 15 files changed, 589 insertions(+), 460 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java
b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java index a151abc714..843bcf4083 100644 --- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java +++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/JSONUtilsTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.alert.utils; +import com.fasterxml.jackson.databind.JsonNode; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -109,4 +110,5 @@ public class JSONUtilsTest { } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 69213211cc..cbf432d8c0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -17,11 +17,12 @@ package org.apache.dolphinscheduler.api.service; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; @@ -73,6 +74,11 @@ public class ProcessDefinitionService extends BaseDAGService { private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionService.class); + private static final String PROCESSDEFINITIONID = "processDefinitionId"; + + private static final String RELEASESTATE = "releaseState"; + + private static final String TASKS = "tasks"; @Autowired private ProjectMapper projectMapper; @@ -99,13 +105,13 @@ public class ProcessDefinitionService extends BaseDAGService { /** * create process definition * - * @param loginUser login user - * @param projectName project name - * @param name process definition name + * @param loginUser login user + * @param projectName project name + * @param name process definition name * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes - * @param connects connects for nodes + * @param desc description + * @param locations locations for nodes + * @param connects connects for nodes * @return create result code * @throws JsonProcessingException JsonProcessingException */ @@ -163,29 +169,30 @@ public class ProcessDefinitionService extends BaseDAGService { // return processDefinition object with ID result.put(Constants.DATA_LIST, processDefineMapper.selectById(processDefine.getId())); putMsg(result, Status.SUCCESS); - result.put("processDefinitionId",processDefine.getId()); + result.put("processDefinitionId", processDefine.getId()); return result; } /** * get resource ids + * * @param processData process data * @return resource ids */ private String getResourceIds(ProcessData processData) { List tasks = processData.getTasks(); Set resourceIds = new HashSet<>(); - for(TaskNode taskNode : tasks){ + for (TaskNode taskNode : tasks) { String 
taskParameter = taskNode.getParams(); - AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(),taskParameter); + AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter); if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { - Set tempSet = params.getResourceFilesList().stream().map(t->t.getId()).collect(Collectors.toSet()); + Set tempSet = params.getResourceFilesList().stream().map(t -> t.getId()).collect(Collectors.toSet()); resourceIds.addAll(tempSet); } } StringBuilder sb = new StringBuilder(); - for(int i : resourceIds) { + for (int i : resourceIds) { if (sb.length() > 0) { sb.append(","); } @@ -198,7 +205,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * query process definition list * - * @param loginUser login user + * @param loginUser login user * @param projectName project name * @return definition list */ @@ -224,12 +231,12 @@ public class ProcessDefinitionService extends BaseDAGService { /** * query process definition list paging * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param searchVal search value - * @param pageNo page number - * @param pageSize page size - * @param userId user id + * @param searchVal search value + * @param pageNo page number + * @param pageSize page size + * @param userId user id * @return process definition page */ public Map queryProcessDefinitionListPaging(User loginUser, String projectName, String searchVal, Integer pageNo, Integer pageSize, Integer userId) { @@ -245,10 +252,10 @@ public class ProcessDefinitionService extends BaseDAGService { Page page = new Page(pageNo, pageSize); IPage processDefinitionIPage = processDefineMapper.queryDefineListPaging( - page, searchVal, userId, project.getId(),isAdmin(loginUser)); + page, searchVal, userId, project.getId(), isAdmin(loginUser)); PageInfo pageInfo = new PageInfo(pageNo, pageSize); - pageInfo.setTotalCount((int)processDefinitionIPage.getTotal()); + pageInfo.setTotalCount((int) processDefinitionIPage.getTotal()); pageInfo.setLists(processDefinitionIPage.getRecords()); result.put(Constants.DATA_LIST, pageInfo); putMsg(result, Status.SUCCESS); @@ -259,9 +266,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * query datail of process definition * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param processId process definition id + * @param processId process definition id * @return process definition detail */ public Map queryProcessDefinitionById(User loginUser, String projectName, Integer processId) { @@ -289,12 +296,12 @@ public class ProcessDefinitionService extends BaseDAGService { /** * copy process definition * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param processId process definition id + * @param processId process definition id * @return copy result code */ - public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException{ + public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -313,7 +320,7 @@ public class ProcessDefinitionService extends BaseDAGService { return createProcessDefinition( loginUser, projectName, - processDefinition.getName()+"_copy_"+System.currentTimeMillis(), + 
processDefinition.getName() + "_copy_" + System.currentTimeMillis(), processDefinition.getProcessDefinitionJson(), processDefinition.getDescription(), processDefinition.getLocations(), @@ -324,14 +331,14 @@ public class ProcessDefinitionService extends BaseDAGService { /** * update process definition * - * @param loginUser login user - * @param projectName project name - * @param name process definition name - * @param id process definition id + * @param loginUser login user + * @param projectName project name + * @param name process definition name + * @param id process definition id * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes - * @param connects connects for nodes + * @param desc description + * @param locations locations for nodes + * @param connects connects for nodes * @return update result code */ public Map updateProcessDefinition(User loginUser, String projectName, int id, String name, @@ -400,9 +407,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * verify process definition name unique * - * @param loginUser login user + * @param loginUser login user * @param projectName project name - * @param name name + * @param name name * @return true if process definition name not exists, otherwise false */ public Map verifyProcessDefinitionName(User loginUser, String projectName, String name) { @@ -427,8 +434,8 @@ public class ProcessDefinitionService extends BaseDAGService { /** * delete process definition by id * - * @param loginUser login user - * @param projectName project name + * @param loginUser login user + * @param projectName project name * @param processDefinitionId process definition id * @return delete result code */ @@ -459,22 +466,22 @@ public class ProcessDefinitionService extends BaseDAGService { // check process definition is already online if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE,processDefinitionId); + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId); return result; } // get the timing according to the process definition List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); if (!schedules.isEmpty() && schedules.size() > 1) { - logger.warn("scheduler num is {},Greater than 1",schedules.size()); + logger.warn("scheduler num is {},Greater than 1", schedules.size()); putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); return result; - }else if(schedules.size() == 1){ + } else if (schedules.size() == 1) { Schedule schedule = schedules.get(0); - if(schedule.getReleaseState() == ReleaseState.OFFLINE){ + if (schedule.getReleaseState() == ReleaseState.OFFLINE) { scheduleMapper.deleteById(schedule.getId()); - }else if(schedule.getReleaseState() == ReleaseState.ONLINE){ - putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE,schedule.getId()); + } else if (schedule.getReleaseState() == ReleaseState.ONLINE) { + putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); return result; } } @@ -492,9 +499,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * release process definition: online / offline * - * @param loginUser login user - * @param projectName project name - * @param id process definition id + * @param loginUser login user + * @param projectName project name + * @param id process definition id * @param releaseState release state * @return release result code */ @@ -513,7 +520,7 @@ public class 
ProcessDefinitionService extends BaseDAGService { // check state if (null == state) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } @@ -525,12 +532,12 @@ public class ProcessDefinitionService extends BaseDAGService { String resourceIds = processDefinition.getResourceIds(); if (StringUtils.isNotBlank(resourceIds)) { Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); - PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID,processService,resourceIdArray,loginUser.getId(),logger); + PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); try { permissionCheck.checkPermission(); } catch (Exception e) { - logger.error(e.getMessage(),e); - putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, "releaseState"); + logger.error(e.getMessage(), e); + putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, RELEASESTATE); return result; } } @@ -545,7 +552,7 @@ public class ProcessDefinitionService extends BaseDAGService { new int[]{processDefinition.getId()} ); - for(Schedule schedule:scheduleList){ + for (Schedule schedule : scheduleList) { logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id); // set status schedule.setReleaseState(ReleaseState.OFFLINE); @@ -554,7 +561,7 @@ public class ProcessDefinitionService extends BaseDAGService { } break; default: - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "releaseState"); + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); return result; } @@ -564,14 +571,15 @@ public class ProcessDefinitionService extends BaseDAGService { /** * batch export process definition by ids + * * @param loginUser * @param projectName * @param processDefinitionIds * @param response */ - public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response){ + public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response) { - if(StringUtils.isEmpty(processDefinitionIds)){ + if (StringUtils.isEmpty(processDefinitionIds)) { return; } @@ -582,24 +590,25 @@ public class ProcessDefinitionService extends BaseDAGService { Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if(resultStatus != Status.SUCCESS){ + if (resultStatus != Status.SUCCESS) { return; } List processDefinitionList = getProcessDefinitionList(processDefinitionIds); - if(CollectionUtils.isNotEmpty(processDefinitionList)){ + if (CollectionUtils.isNotEmpty(processDefinitionList)) { downloadProcessDefinitionFile(response, processDefinitionList); } } /** * get process definition list by ids + * * @param processDefinitionIds * @return */ - private List getProcessDefinitionList(String processDefinitionIds){ + private List getProcessDefinitionList(String processDefinitionIds) { List processDefinitionList = new ArrayList<>(); String[] processDefinitionIdArray = processDefinitionIds.split(","); for (String strProcessDefinitionId : processDefinitionIdArray) { @@ -616,6 +625,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * download the process 
definition file + * * @param response * @param processDefinitionList */ @@ -631,7 +641,7 @@ public class ProcessDefinitionService extends BaseDAGService { buff.close(); } catch (IOException e) { logger.warn("export process fail", e); - }finally { + } finally { if (null != buff) { try { buff.close(); @@ -651,19 +661,21 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get export process metadata string + * * @param processDefinitionId process definition id - * @param processDefinition process definition + * @param processDefinition process definition * @return export process metadata string */ public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { //create workflow json file - return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId,processDefinition)); + return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition)); } /** * get export process metadata string + * * @param processDefinitionId process definition id - * @param processDefinition process definition + * @param processDefinition process definition * @return export process metadata string */ public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { @@ -699,17 +711,18 @@ public class ProcessDefinitionService extends BaseDAGService { /** * correct task param which has datasource or dependent + * * @param processDefinitionJson processDefinitionJson * @return correct processDefinitionJson */ public String addExportTaskNodeSpecialParam(String processDefinitionJson) { - JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); - JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); + ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); for (int i = 0; i < jsonArray.size(); i++) { - JSONObject taskNode = jsonArray.getJSONObject(i); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + JsonNode taskNode = jsonArray.path(i); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { @@ -717,12 +730,13 @@ public class ProcessDefinitionService extends BaseDAGService { } } } - jsonObject.put("tasks", jsonArray); + jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * check task if has sub process + * * @param taskType task type * @return if task has sub process return true else false */ @@ -732,8 +746,9 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process definition - * @param loginUser login user - * @param file process metadata json file + * + * @param loginUser login user + * @param file process metadata json file * @param currentProjectName current project name * @return import process */ @@ -741,7 +756,7 @@ public class ProcessDefinitionService extends BaseDAGService { public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { Map result = new HashMap<>(5); String processMetaJson = FileUtils.file2String(file); - List processMetaList = JSON.parseArray(processMetaJson,ProcessMeta.class); + List processMetaList = JSON.parseArray(processMetaJson, ProcessMeta.class); //check file content if (CollectionUtils.isEmpty(processMetaList)) { @@ -749,9 +764,9 @@ public class 
ProcessDefinitionService extends BaseDAGService { return result; } - for(ProcessMeta processMeta:processMetaList){ + for (ProcessMeta processMeta : processMetaList) { - if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)){ + if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)) { return result; } } @@ -761,6 +776,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check and import process definition + * * @param loginUser * @param currentProjectName * @param result @@ -769,7 +785,7 @@ public class ProcessDefinitionService extends BaseDAGService { */ private boolean checkAndImportProcessDefinition(User loginUser, String currentProjectName, Map result, ProcessMeta processMeta) { - if(!checkImportanceParams(processMeta,result)){ + if (!checkImportanceParams(processMeta, result)) { return false; } @@ -777,7 +793,7 @@ public class ProcessDefinitionService extends BaseDAGService { String processDefinitionName = processMeta.getProcessDefinitionName(); //use currentProjectName to query Project targetProject = projectMapper.queryByName(currentProjectName); - if(null != targetProject){ + if (null != targetProject) { processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), processDefinitionName, 1); } @@ -801,14 +817,14 @@ public class ProcessDefinitionService extends BaseDAGService { processDefinitionName, addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject)); - if(createProcessResult == null){ + if (createProcessResult == null) { return false; } //create process definition Integer processDefinitionId = - Objects.isNull(createProcessResult.get("processDefinitionId"))? - null:Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); + Objects.isNull(createProcessResult.get(PROCESSDEFINITIONID)) ? 
+ null : Integer.parseInt(createProcessResult.get(PROCESSDEFINITIONID).toString()); //scheduler param return getImportProcessScheduleResult(loginUser, @@ -822,6 +838,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get create process result + * * @param loginUser * @param currentProjectName * @param result @@ -835,12 +852,12 @@ public class ProcessDefinitionService extends BaseDAGService { Map result, ProcessMeta processMeta, String processDefinitionName, - String importProcessParam){ + String importProcessParam) { Map createProcessResult = null; try { createProcessResult = createProcessDefinition(loginUser - ,currentProjectName, - processDefinitionName+"_import_"+System.currentTimeMillis(), + , currentProjectName, + processDefinitionName + "_import_" + System.currentTimeMillis(), importProcessParam, processMeta.getProcessDefinitionDescription(), processMeta.getProcessDefinitionLocations(), @@ -856,6 +873,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * get import process schedule result + * * @param loginUser * @param currentProjectName * @param result @@ -887,11 +905,12 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check importance params + * * @param processMeta * @param result * @return */ - private boolean checkImportanceParams(ProcessMeta processMeta,Map result){ + private boolean checkImportanceParams(ProcessMeta processMeta, Map result) { if (StringUtils.isEmpty(processMeta.getProjectName())) { putMsg(result, Status.DATA_IS_NULL, "projectName"); return false; @@ -910,18 +929,19 @@ public class ProcessDefinitionService extends BaseDAGService { /** * import process add special task param - * @param loginUser login user + * + * @param loginUser login user * @param processDefinitionJson process definition json - * @param targetProject target project + * @param targetProject target project * @return import process param */ private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { - JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); - JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); + ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS); //add sql and dependent param for (int i = 0; i < jsonArray.size(); i++) { - JSONObject taskNode = jsonArray.getJSONObject(i); - String taskType = taskNode.getString("type"); + JsonNode taskNode = jsonArray.path(i); + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); if (null != addTaskParam) { addTaskParam.addImportSpecialParam(taskNode); @@ -931,25 +951,26 @@ public class ProcessDefinitionService extends BaseDAGService { //recursive sub-process parameter correction map key for old process id value for new process id Map subProcessIdMap = new HashMap<>(20); - List subProcessList = jsonArray.stream() - .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type"))) + List subProcessList = StreamUtils.asStream(jsonArray.elements()) + .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(subProcessList)) { importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); } - jsonObject.put("tasks", jsonArray); + jsonObject.set(TASKS, jsonArray); return jsonObject.toString(); } /** * import process 
schedule - * @param loginUser login user - * @param currentProjectName current project name - * @param processMeta process meta data + * + * @param loginUser login user + * @param currentProjectName current project name + * @param processMeta process meta data * @param processDefinitionName process definition name - * @param processDefinitionId process definition id + * @param processDefinitionId process definition id * @return insert schedule flag */ public int importProcessSchedule(User loginUser, String currentProjectName, ProcessMeta processMeta, @@ -998,84 +1019,87 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check import process has sub process * recursion create sub process - * @param loginUser login user - * @param targetProject target project - * @param jsonArray process task array + * + * @param loginUser login user + * @param targetProject target project + * @param jsonArray process task array * @param subProcessIdMap correct sub process id map */ - public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map subProcessIdMap) { + public void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map subProcessIdMap) { for (int i = 0; i < jsonArray.size(); i++) { - JSONObject taskNode = jsonArray.getJSONObject(i); - String taskType = taskNode.getString("type"); - - if (checkTaskHasSubProcess(taskType)) { - //get sub process info - JSONObject subParams = JSONUtils.parseObject(taskNode.getString("params")); - Integer subProcessId = subParams.getInteger("processDefinitionId"); - ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); - //check is sub process exist in db - if (null != subProcess) { - String subProcessJson = subProcess.getProcessDefinitionJson(); - //check current project has sub process - ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); - - if (null == currentProjectSubProcess) { - JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks"); - - List subProcessList = subJsonArray.stream() - .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type"))) - .collect(Collectors.toList()); - - if (CollectionUtils.isNotEmpty(subProcessList)) { - importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); - //sub process processId correct - if (!subProcessIdMap.isEmpty()) { - - for (Map.Entry entry : subProcessIdMap.entrySet()) { - String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey(); - String newSubProcessId = "\"processDefinitionId\":" + entry.getValue(); - subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId); - } - - subProcessIdMap.clear(); - } - } + ObjectNode taskNode = (ObjectNode) jsonArray.path(i); + String taskType = taskNode.path("type").asText(); - //if sub-process recursion - Date now = new Date(); - //create sub process in target project - ProcessDefinition processDefine = new ProcessDefinition(); - processDefine.setName(subProcess.getName()); - processDefine.setVersion(subProcess.getVersion()); - processDefine.setReleaseState(subProcess.getReleaseState()); - processDefine.setProjectId(targetProject.getId()); - processDefine.setUserId(loginUser.getId()); - processDefine.setProcessDefinitionJson(subProcessJson); - processDefine.setDescription(subProcess.getDescription()); - processDefine.setLocations(subProcess.getLocations()); - 
processDefine.setConnects(subProcess.getConnects()); - processDefine.setTimeout(subProcess.getTimeout()); - processDefine.setTenantId(subProcess.getTenantId()); - processDefine.setGlobalParams(subProcess.getGlobalParams()); - processDefine.setCreateTime(now); - processDefine.setUpdateTime(now); - processDefine.setFlag(subProcess.getFlag()); - processDefine.setReceivers(subProcess.getReceivers()); - processDefine.setReceiversCc(subProcess.getReceiversCc()); - processDefineMapper.insert(processDefine); - - logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); - - //modify task node - ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName()); - - if (null != newSubProcessDefine) { - subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); - subParams.put("processDefinitionId", newSubProcessDefine.getId()); - taskNode.put("params", subParams); + if (!checkTaskHasSubProcess(taskType)) { + continue; + } + //get sub process info + ObjectNode subParams = (ObjectNode) taskNode.path("params"); + Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); + ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); + //check is sub process exist in db + if (null == subProcess) { + continue; + } + String subProcessJson = subProcess.getProcessDefinitionJson(); + //check current project has sub process + ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); + + if (null == currentProjectSubProcess) { + ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); + + List subProcessList = StreamUtils.asStream(subJsonArray.elements()) + .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) + .collect(Collectors.toList()); + + if (CollectionUtils.isNotEmpty(subProcessList)) { + importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); + //sub process processId correct + if (!subProcessIdMap.isEmpty()) { + + for (Map.Entry entry : subProcessIdMap.entrySet()) { + String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey(); + String newSubProcessId = "\"processDefinitionId\":" + entry.getValue(); + subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId); } + + subProcessIdMap.clear(); } } + + //if sub-process recursion + Date now = new Date(); + //create sub process in target project + ProcessDefinition processDefine = new ProcessDefinition(); + processDefine.setName(subProcess.getName()); + processDefine.setVersion(subProcess.getVersion()); + processDefine.setReleaseState(subProcess.getReleaseState()); + processDefine.setProjectId(targetProject.getId()); + processDefine.setUserId(loginUser.getId()); + processDefine.setProcessDefinitionJson(subProcessJson); + processDefine.setDescription(subProcess.getDescription()); + processDefine.setLocations(subProcess.getLocations()); + processDefine.setConnects(subProcess.getConnects()); + processDefine.setTimeout(subProcess.getTimeout()); + processDefine.setTenantId(subProcess.getTenantId()); + processDefine.setGlobalParams(subProcess.getGlobalParams()); + processDefine.setCreateTime(now); + processDefine.setUpdateTime(now); + processDefine.setFlag(subProcess.getFlag()); + processDefine.setReceivers(subProcess.getReceivers()); + 
processDefine.setReceiversCc(subProcess.getReceiversCc()); + processDefineMapper.insert(processDefine); + + logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); + + //modify task node + ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); + + if (null != newSubProcessDefine) { + subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); + subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId()); + taskNode.set("params", subParams); + } } } } @@ -1084,7 +1108,7 @@ public class ProcessDefinitionService extends BaseDAGService { /** * check the process definition node meets the specifications * - * @param processData process data + * @param processData process data * @param processDefinitionJson process definition json * @return check result code */ @@ -1094,7 +1118,7 @@ public class ProcessDefinitionService extends BaseDAGService { try { if (processData == null) { logger.error("process data is null"); - putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson); + putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); return result; } @@ -1125,7 +1149,7 @@ public class ProcessDefinitionService extends BaseDAGService { // check extra params CheckUtils.checkOtherParams(taskNode.getExtras()); } - putMsg(result,Status.SUCCESS); + putMsg(result, Status.SUCCESS); } catch (Exception e) { result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); result.put(Constants.MSG, e.getMessage()); @@ -1138,9 +1162,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineId define id * @return task node list - * @throws Exception exception */ - public Map getTaskNodeListByDefinitionId(Integer defineId) throws Exception { + public Map getTaskNodeListByDefinitionId(Integer defineId) { Map result = new HashMap<>(); ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); @@ -1158,7 +1181,7 @@ public class ProcessDefinitionService extends BaseDAGService { //process data check if (null == processData) { logger.error("process data is null"); - putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson); + putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); return result; } @@ -1176,15 +1199,14 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param defineIdList define id list * @return task node list - * @throws Exception exception */ - public Map getTaskNodeListByDefinitionIdList(String defineIdList) throws Exception { + public Map getTaskNodeListByDefinitionIdList(String defineIdList) { Map result = new HashMap<>(); Map> taskNodeMap = new HashMap<>(); String[] idList = defineIdList.split(","); List idIntList = new ArrayList<>(); - for(String definitionId : idList) { + for (String definitionId : idList) { idIntList.add(Integer.parseInt(definitionId)); } Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]); @@ -1195,7 +1217,7 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } - for(ProcessDefinition processDefinition : processDefinitionList){ + for (ProcessDefinition processDefinition : processDefinitionList) { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); List taskNodeList = (processData.getTasks() == null) ? 
new ArrayList<>() : processData.getTasks(); @@ -1231,7 +1253,7 @@ public class ProcessDefinitionService extends BaseDAGService { * Encapsulates the TreeView structure * * @param processId process definition id - * @param limit limit + * @param limit limit * @return tree view json data * @throws Exception exception */ @@ -1241,7 +1263,7 @@ public class ProcessDefinitionService extends BaseDAGService { ProcessDefinition processDefinition = processDefineMapper.selectById(processId); if (null == processDefinition) { logger.info("process define not exists"); - putMsg(result,Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); return result; } DAG dag = genDagGraph(processDefinition); @@ -1260,8 +1282,8 @@ public class ProcessDefinitionService extends BaseDAGService { */ List processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); - for(ProcessInstance processInstance:processInstanceList){ - processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(),processInstance.getEndTime())); + for (ProcessInstance processInstance : processInstanceList) { + processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); } if (limit > processInstanceList.size()) { @@ -1364,9 +1386,8 @@ public class ProcessDefinitionService extends BaseDAGService { * * @param processDefinition process definition * @return dag graph - * @throws Exception if exception happens */ - private DAG genDagGraph(ProcessDefinition processDefinition) throws Exception { + private DAG genDagGraph(ProcessDefinition processDefinition) { String processDefinitionJson = processDefinition.getProcessDefinitionJson(); @@ -1386,8 +1407,6 @@ public class ProcessDefinitionService extends BaseDAGService { } - - /** * whether the graph has a ring * @@ -1405,7 +1424,7 @@ public class ProcessDefinitionService extends BaseDAGService { // Fill edge relations for (TaskNode taskNodeResponse : taskNodeResponseList) { taskNodeResponse.getPreTasks(); - List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(),String.class); + List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class); if (CollectionUtils.isNotEmpty(preTasks)) { for (String preTask : preTasks) { if (!graph.addEdge(preTask, taskNodeResponse.getName())) { @@ -1418,19 +1437,19 @@ public class ProcessDefinitionService extends BaseDAGService { return graph.hasCycle(); } - private String recursionProcessDefinitionName(Integer projectId,String processDefinitionName,int num){ + private String recursionProcessDefinitionName(Integer projectId, String processDefinitionName, int num) { ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); if (processDefinition != null) { - if(num > 1){ - String str = processDefinitionName.substring(0,processDefinitionName.length() - 3); - processDefinitionName = str + "("+num+")"; - }else{ - processDefinitionName = processDefinition.getName() + "("+num+")"; + if (num > 1) { + String str = processDefinitionName.substring(0, processDefinitionName.length() - 3); + processDefinitionName = str + "(" + num + ")"; + } else { + processDefinitionName = processDefinition.getName() + "(" + num + ")"; } - }else{ + } else { return processDefinitionName; } - return recursionProcessDefinitionName(projectId,processDefinitionName,num + 1); + return recursionProcessDefinitionName(projectId, processDefinitionName, num + 1); } } diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index a5a341376e..7e8a2abeae 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -504,9 +504,8 @@ public class ProcessInstanceService extends BaseDAGService { * * @param processInstanceId process instance id * @return variables data - * @throws Exception exception */ - public Map viewVariables( Integer processInstanceId) throws Exception { + public Map viewVariables(Integer processInstanceId) { Map result = new HashMap<>(5); ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); @@ -537,7 +536,7 @@ public class ProcessInstanceService extends BaseDAGService { List taskNodeList = workflowData.getTasks(); // global param string - String globalParamStr = JSON.toJSONString(globalParams); + String globalParamStr = JSONUtils.toJson(globalParams); globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); globalParams = JSON.parseArray(globalParamStr, Property.class); for (Property property : globalParams) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java index 00d95779ed..f34554bb89 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java @@ -16,9 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.springframework.beans.factory.InitializingBean; @@ -33,6 +33,7 @@ import java.util.List; @Service public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { + private static final String PARAMS = "params"; @Autowired private DataSourceMapper dataSourceMapper; @@ -42,14 +43,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override - public JSONObject addExportSpecialParam(JSONObject taskNode) { + public JsonNode addExportSpecialParam(JsonNode taskNode) { // add sqlParameters - JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); - DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource")); + ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); + DataSource dataSource = dataSourceMapper.selectById(sqlParameters.get("datasource").asInt()); if (null != dataSource) { sqlParameters.put("datasourceName", dataSource.getName()); } - taskNode.put("params", sqlParameters); + ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } @@ -60,14 +61,14 @@ public class DataSourceParam implements ProcessAddTaskParam, InitializingBean 
{ * @return task node json object */ @Override - public JSONObject addImportSpecialParam(JSONObject taskNode) { - JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); - List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); + public JsonNode addImportSpecialParam(JsonNode taskNode) { + ObjectNode sqlParameters = (ObjectNode) taskNode.path(PARAMS); + List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.path("datasourceName").asText()); if (!dataSources.isEmpty()) { DataSource dataSource = dataSources.get(0); sqlParameters.put("datasource", dataSource.getId()); } - taskNode.put("params", sqlParameters); + ((ObjectNode)taskNode).set(PARAMS, sqlParameters); return taskNode; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java index b42b3b5a02..ce43d0ec01 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java @@ -16,8 +16,9 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -34,6 +35,7 @@ import org.springframework.stereotype.Service; @Service public class DependentParam implements ProcessAddTaskParam, InitializingBean { + private static final String DEPENDENCE = "dependence"; @Autowired ProcessDefinitionMapper processDefineMapper; @@ -47,18 +49,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return task node json object */ @Override - public JSONObject addExportSpecialParam(JSONObject taskNode) { + public JsonNode addExportSpecialParam(JsonNode taskNode) { // add dependent param - JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if (null != dependentParameters) { - JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + ArrayNode dependTaskList = (ArrayNode) dependentParameters.get("dependTaskList"); for (int j = 0; j < dependTaskList.size(); j++) { - JSONObject dependentTaskModel = dependTaskList.getJSONObject(j); - JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + JsonNode dependentTaskModel = dependTaskList.path(j); + ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { - JSONObject dependentItem = dependItemList.getJSONObject(k); - int definitionId = dependentItem.getInteger("definitionId"); + ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); + int definitionId = dependentItem.path("definitionId").asInt(); ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId); if (null != definition) { dependentItem.put("projectName", definition.getProjectName()); @@ -66,7 +68,7 @@ 
public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } - taskNode.put("dependence", dependentParameters); + ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; @@ -78,18 +80,18 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { * @return */ @Override - public JSONObject addImportSpecialParam(JSONObject taskNode) { - JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + public JsonNode addImportSpecialParam(JsonNode taskNode) { + ObjectNode dependentParameters = JSONUtils.parseObject(taskNode.path(DEPENDENCE).asText()); if(dependentParameters != null){ - JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + ArrayNode dependTaskList = (ArrayNode) dependentParameters.path("dependTaskList"); for (int h = 0; h < dependTaskList.size(); h++) { - JSONObject dependentTaskModel = dependTaskList.getJSONObject(h); - JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + ObjectNode dependentTaskModel = (ObjectNode) dependTaskList.path(h); + ArrayNode dependItemList = (ArrayNode) dependentTaskModel.get("dependItemList"); for (int k = 0; k < dependItemList.size(); k++) { - JSONObject dependentItem = dependItemList.getJSONObject(k); - Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName")); + ObjectNode dependentItem = (ObjectNode) dependItemList.path(k); + Project dependentItemProject = projectMapper.queryByName(dependentItem.path("projectName").asText()); if(dependentItemProject != null){ - ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName")); + ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.path("definitionName").asText()); if(definition != null){ dependentItem.put("projectId",dependentItemProject.getId()); dependentItem.put("definitionId",definition.getId()); @@ -97,7 +99,7 @@ public class DependentParam implements ProcessAddTaskParam, InitializingBean { } } } - taskNode.put("dependence", dependentParameters); + ((ObjectNode)taskNode).set(DEPENDENCE, dependentParameters); } return taskNode; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java index b30b777ca3..8e408556b0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/ProcessAddTaskParam.java @@ -16,7 +16,7 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; /** * ProcessAddTaskParam @@ -28,12 +28,12 @@ public interface ProcessAddTaskParam { * @param taskNode task node json object * @return task node json object */ - JSONObject addExportSpecialParam(JSONObject taskNode); + JsonNode addExportSpecialParam(JsonNode taskNode); /** * add task special param: sql task dependent task * @param taskNode task node json object * @return task node json object */ - JSONObject addImportSpecialParam(JSONObject taskNode); + JsonNode addImportSpecialParam(JsonNode taskNode); } diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8f69b94274..d1a051295b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -16,8 +16,8 @@ */ package org.apache.dolphinscheduler.api.service; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.enums.Status; @@ -621,8 +621,8 @@ public class ProcessDefinitionServiceTest { "\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; - JSONObject jsonObject = JSONUtils.parseObject(topProcessJson); - JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); + ObjectNode jsonObject = JSONUtils.parseObject(topProcessJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.path("tasks"); String originSubJson = jsonArray.toString(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java index b8fcd62333..4566d93af2 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java @@ -16,7 +16,8 @@ */ package org.apache.dolphinscheduler.api.utils.exportprocess; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -48,13 +49,13 @@ public class DataSourceParamTest { "\"preTasks\":[\"dependent\"]}"; - JSONObject taskNode = JSONUtils.parseObject(sqlJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(sqlJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject sql = addTaskParam.addExportSpecialParam(taskNode); + JsonNode sql = addTaskParam.addExportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); } @@ -72,13 +73,13 @@ public class DataSourceParamTest { "\"taskInstancePriority\":\"MEDIUM\",\"name\":\"mysql\",\"dependence\":{}," + "\"retryInterval\":\"1\",\"preTasks\":[\"dependent\"],\"id\":\"tasks-8745\"}"; - JSONObject taskNode = JSONUtils.parseObject(sqlJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(sqlJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = 
TaskNodeParamFactory.getByTaskType(taskType); - JSONObject sql = addTaskParam.addImportSpecialParam(taskNode); + JsonNode sql = addTaskParam.addImportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), sql.toString(), false); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java index d21b7be0e2..be61ab7559 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.api.utils.exportprocess; import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -43,13 +45,13 @@ public class DependentParamTest { "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}"; - JSONObject taskNode = JSONUtils.parseObject(dependentJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(dependentJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addExportSpecialParam(taskNode); + JsonNode dependent = addTaskParam.addExportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); } @@ -57,13 +59,13 @@ public class DependentParamTest { String dependentEmpty = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"}"; - JSONObject taskEmpty = JSONUtils.parseObject(dependentEmpty); - if (StringUtils.isNotEmpty(taskEmpty.getString("type"))) { - String taskType = taskEmpty.getString("type"); + ObjectNode taskEmpty = JSONUtils.parseObject(dependentEmpty); + if (StringUtils.isNotEmpty(taskEmpty.path("type").asText())) { + String taskType = taskEmpty.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addImportSpecialParam(taskEmpty); + JsonNode dependent = addTaskParam.addImportSpecialParam(taskEmpty); JSONAssert.assertEquals(taskEmpty.toString(), dependent.toString(), false); } @@ -81,13 +83,13 @@ public class DependentParamTest { "\"projectId\":1,\"cycle\":\"day\",\"definitionId\":7}],\"relation\":\"AND\"}]," + "\"relation\":\"AND\"},\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; - JSONObject taskNode = JSONUtils.parseObject(dependentJson); - if (StringUtils.isNotEmpty(taskNode.getString("type"))) { - String taskType = taskNode.getString("type"); + ObjectNode taskNode = JSONUtils.parseObject(dependentJson); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = 
addTaskParam.addImportSpecialParam(taskNode); + JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode); JSONAssert.assertEquals(taskNode.toString(), dependent.toString(), false); } @@ -97,13 +99,13 @@ public class DependentParamTest { "\"strategy\":\"\"},\"maxRetryTimes\":\"0\",\"taskInstancePriority\":\"MEDIUM\"" + ",\"name\":\"dependent\",\"retryInterval\":\"1\",\"preTasks\":[],\"id\":\"tasks-55485\"}"; - JSONObject taskNodeEmpty = JSONUtils.parseObject(dependentEmpty); - if (StringUtils.isNotEmpty(taskNodeEmpty.getString("type"))) { - String taskType = taskNodeEmpty.getString("type"); + JsonNode taskNodeEmpty = JSONUtils.parseObject(dependentEmpty); + if (StringUtils.isNotEmpty(taskNodeEmpty.path("type").asText())) { + String taskType = taskNodeEmpty.path("type").asText(); ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - JSONObject dependent = addTaskParam.addImportSpecialParam(taskNode); + JsonNode dependent = addTaskParam.addImportSpecialParam(taskNode); JSONAssert.assertEquals(taskNodeEmpty.toString(), dependent.toString(), false); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index f0aed91a0d..4f701490e5 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -16,13 +16,12 @@ */ package org.apache.dolphinscheduler.common.utils; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.TypeReference; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,230 +34,233 @@ import java.util.*; */ public class JSONUtils { - private static final Logger logger = LoggerFactory.getLogger(JSONUtils.class); - - /** - * can use static singleton, inject: just make sure to reuse! - */ - private static final ObjectMapper objectMapper = new ObjectMapper(); - - private JSONUtils() { - //Feature that determines whether encountering of unknown properties, false means not analyzer unknown properties - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).setTimeZone(TimeZone.getDefault()); - } - - /** - * json representation of object - * @param object object - * @return object to json string - */ - public static String toJson(Object object) { - try{ - return JSON.toJSONString(object,false); - } catch (Exception e) { - logger.error("object to json exception!",e); + private static final Logger logger = LoggerFactory.getLogger(JSONUtils.class); + + /** + * can use static singleton, inject: just make sure to reuse! + */ + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private JSONUtils() { } - return null; - } - - - /** - * - * This method deserializes the specified Json into an object of the specified class. 
It is not
-   * suitable to use if the specified class is a generic type since it will not have the generic
-   * type information because of the Type Erasure feature of Java. Therefore, this method should not
-   * be used if the desired type is a generic type. Note that this method works fine if the any of
-   * the fields of the specified object are generics, just the object itself should not be a
-   * generic type.
-   *
-   * @param json the string from which the object is to be deserialized
-   * @param clazz the class of T
-   * @param T
-   * @return an object of type T from the string
-   * classOfT
-   */
-  public static T parseObject(String json, Class clazz) {
-    if (StringUtils.isEmpty(json)) {
-      return null;
+    static {
+        //Feature that determines whether encountering unknown properties should fail; false means unknown properties are ignored
+        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).setTimeZone(TimeZone.getDefault());
+        objectMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true).setTimeZone(TimeZone.getDefault());
    }
-    try {
-      return JSON.parseObject(json, clazz);
-    } catch (Exception e) {
-      logger.error("parse object exception!",e);
+    /**
+     * json representation of object
+     *
+     * @param object object
+     * @return object to json string
+     */
+    public static String toJson(Object object) {
+        try {
+            return objectMapper.writeValueAsString(object);
+        } catch (Exception e) {
+            logger.error("object to json exception!", e);
+        }
+
+        return null;
    }
-    return null;
-  }
-
-
-  /**
-   * json to list
-   *
-   * @param json json string
-   * @param clazz class
-   * @param T
-   * @return list
-   */
-  public static List toList(String json, Class clazz) {
-    if (StringUtils.isEmpty(json)) {
-      return new ArrayList<>();
+
+
+    /**
+     * This method deserializes the specified Json into an object of the specified class. It is not
+     * suitable to use if the specified class is a generic type since it will not have the generic
+     * type information because of the Type Erasure feature of Java. Therefore, this method should not
+     * be used if the desired type is a generic type. Note that this method works fine if any of
+     * the fields of the specified object are generics, just the object itself should not be a
+     * generic type.
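+     *
+     * A short usage sketch (any plain POJO works the same way), mirroring a call made elsewhere in this patch:
+     * {@code ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);}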
+ * + * @param json the string from which the object is to be deserialized + * @param clazz the class of T + * @param T + * @return an object of type T from the string + * classOfT + */ + public static T parseObject(String json, Class clazz) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, clazz); + } catch (Exception e) { + logger.error("parse object exception!", e); + } + return null; } - try { - return JSONArray.parseArray(json, clazz); - } catch (Exception e) { - logger.error("JSONArray.parseArray exception!",e); + + + /** + * json to list + * + * @param json json string + * @param clazz class + * @param T + * @return list + */ + public static List toList(String json, Class clazz) { + if (StringUtils.isEmpty(json)) { + return new ArrayList<>(); + } + try { + return objectMapper.readValue(json, new TypeReference>() { + }); + } catch (Exception e) { + logger.error("JSONArray.parseArray exception!", e); + } + + return new ArrayList<>(); } - return new ArrayList<>(); - } + /** + * check json object valid + * + * @param json json + * @return true if valid + */ + public static boolean checkJsonValid(String json) { + if (StringUtils.isEmpty(json)) { + return false; + } - /** - * check json object valid - * - * @param json json - * @return true if valid - */ - public static boolean checkJsonValid(String json) { + try { + objectMapper.readTree(json); + return true; + } catch (IOException e) { + logger.error("check json object valid exception!", e); + } - if (StringUtils.isEmpty(json)) { - return false; + return false; } - try { - objectMapper.readTree(json); - return true; - } catch (IOException e) { - logger.error("check json object valid exception!",e); - } - return false; - } - - - /** - * Method for finding a JSON Object field with specified name in this - * node or its child nodes, and returning value it has. - * If no matching field is found in this node or its descendants, returns null. - * - * @param jsonNode json node - * @param fieldName Name of field to look for - * - * @return Value of first matching node found, if any; null if none - */ - public static String findValue(JsonNode jsonNode, String fieldName) { - JsonNode node = jsonNode.findValue(fieldName); - - if (node == null) { - return null; - } + /** + * Method for finding a JSON Object field with specified name in this + * node or its child nodes, and returning value it has. + * If no matching field is found in this node or its descendants, returns null. 
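+     *
+     * For example, {@code findValue(taskNode, "type")} returns the serialized value of the
+     * first {@code type} field found anywhere under {@code taskNode}, or null when absent.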
+ * + * @param jsonNode json node + * @param fieldName Name of field to look for + * @return Value of first matching node found, if any; null if none + */ + public static String findValue(JsonNode jsonNode, String fieldName) { + JsonNode node = jsonNode.findValue(fieldName); - return node.toString(); - } - - - /** - * json to map - * - * {@link #toMap(String, Class, Class)} - * - * @param json json - * @return json to map - */ - public static Map toMap(String json) { - if (StringUtils.isEmpty(json)) { - return null; - } + if (node == null) { + return null; + } - try { - return JSON.parseObject(json, new TypeReference>(){}); - } catch (Exception e) { - logger.error("json to map exception!",e); + return node.toString(); } - return null; - } - - /** - * - * json to map - * - * @param json json - * @param classK classK - * @param classV classV - * @param K - * @param V - * @return to map - */ - public static Map toMap(String json, Class classK, Class classV) { - if (StringUtils.isEmpty(json)) { - return null; + + /** + * json to map + *

+ * {@link #toMap(String, Class, Class)} + * + * @param json json + * @return json to map + */ + public static Map toMap(String json) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, new TypeReference>() {}); + } catch (Exception e) { + logger.error("json to map exception!", e); + } + + return null; } - try { - return JSON.parseObject(json, new TypeReference>() {}); - } catch (Exception e) { - logger.error("json to map exception!",e); + /** + * json to map + * + * @param json json + * @param classK classK + * @param classV classV + * @param K + * @param V + * @return to map + */ + public static Map toMap(String json, Class classK, Class classV) { + if (StringUtils.isEmpty(json)) { + return null; + } + + try { + return objectMapper.readValue(json, new TypeReference>() { + }); + } catch (Exception e) { + logger.error("json to map exception!", e); + } + + return null; } - return null; - } - - /** - * object to json string - * @param object object - * @return json string - */ - public static String toJsonString(Object object) { - try{ - return JSON.toJSONString(object,false); - } catch (Exception e) { - throw new RuntimeException("Object json deserialization exception.", e); + /** + * object to json string + * + * @param object object + * @return json string + */ + public static String toJsonString(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (Exception e) { + throw new RuntimeException("Object json deserialization exception.", e); + } } - } - public static JSONObject parseObject(String text) { - try{ - return JSON.parseObject(text); - } catch (Exception e) { - throw new RuntimeException("String json deserialization exception.", e); + public static ObjectNode parseObject(String text) { + try { + return (ObjectNode) objectMapper.readTree(text); + } catch (Exception e) { + throw new RuntimeException("String json deserialization exception.", e); + } } - } - public static JSONArray parseArray(String text) { - try{ - return JSON.parseArray(text); - } catch (Exception e) { - throw new RuntimeException("Json deserialization exception.", e); + public static ArrayNode parseArray(String text) { + try { + return (ArrayNode) objectMapper.readTree(text); + } catch (Exception e) { + throw new RuntimeException("Json deserialization exception.", e); + } } - } + /** + * json serializer + */ + public static class JsonDataSerializer extends JsonSerializer { - /** - * json serializer - */ - public static class JsonDataSerializer extends JsonSerializer { + @Override + public void serialize(String value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeRawValue(value); + } - @Override - public void serialize(String value, JsonGenerator gen, SerializerProvider provider) throws IOException { - gen.writeRawValue(value); } - } + /** + * json data deserializer + */ + public static class JsonDataDeserializer extends JsonDeserializer { - /** - * json data deserializer - */ - public static class JsonDataDeserializer extends JsonDeserializer { + @Override + public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + return node.toString(); + } - @Override - public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { - JsonNode node = p.getCodec().readTree(p); - return node.toString(); } - - } } diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java index 270e0c4696..f3efef2eca 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java @@ -196,7 +196,7 @@ public class ParameterUtils { property.setValue(val); } } - return JSON.toJSONString(globalParamList); + return JSONUtils.toJson(globalParamList); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java new file mode 100644 index 0000000000..f30638cda2 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.common.utils; + +import java.util.Iterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class StreamUtils { + + private StreamUtils() { } + + public static Stream asStream(Iterator sourceIterator) { + return asStream(sourceIterator, false); + } + + public static Stream asStream(Iterator sourceIterator, boolean parallel) { + Iterable iterable = () -> sourceIterator; + return StreamSupport.stream(iterable.spliterator(), parallel); + } + +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java index 8ce60349ed..1756078fe1 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.common.utils; import com.alibaba.fastjson.JSON; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; @@ -101,9 +103,6 @@ public class JSONUtilsTest { @Test public void testParseObject() { - Assert.assertEquals("{\"foo\":\"bar\"}", JSONUtils.parseObject( - "{\n" + "\"foo\": \"bar\",\n" + "}", String.class)); - Assert.assertNull(JSONUtils.parseObject("", null)); Assert.assertNull(JSONUtils.parseObject("foo", String.class)); } @@ -134,15 +133,19 @@ public class JSONUtilsTest { map.put("foo","bar"); Assert.assertTrue(map.equals(JSONUtils.toMap( - "{\n" + "\"foo\": \"bar\",\n" + "}"))); + "{\n" + "\"foo\": \"bar\"\n" + "}"))); Assert.assertFalse(map.equals(JSONUtils.toMap( - "{\n" + "\"bar\": \"foo\",\n" + "}"))); + "{\n" + "\"bar\": \"foo\"\n" + "}"))); Assert.assertNull(JSONUtils.toMap("3")); Assert.assertNull(JSONUtils.toMap(null)); Assert.assertNull(JSONUtils.toMap("3", null, null)); Assert.assertNull(JSONUtils.toMap(null, null, null)); + + String str = "{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}"; + Map m = JSONUtils.toMap(str); + Assert.assertNotNull(m); } @Test @@ -155,4 +158,27 @@ public class JSONUtilsTest { Assert.assertEquals(String.valueOf((Object) null), JSONUtils.toJsonString(null)); } + + @Test + public void parseObject() { + String str = "{\"color\":\"yellow\",\"type\":\"renault\"}"; + ObjectNode node = JSONUtils.parseObject(str); + + Assert.assertEquals("yellow", node.path("color").asText()); + + node.put("price", 100); + Assert.assertEquals(100, node.path("price").asInt()); + + node.put("color", "red"); + Assert.assertEquals("red", node.path("color").asText()); + } + + @Test + public void parseArray() { + String str = "[{\"color\":\"yellow\",\"type\":\"renault\"}]"; + ArrayNode node = JSONUtils.parseArray(str); + + Assert.assertEquals("yellow", node.path(0).path("color").asText()); + } + } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java index abdc15cc6e..3d40e7a4cf 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java +++ 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java @@ -91,13 +91,13 @@ public class ParameterUtilsTest { globalParamList.add(property); String result2 = ParameterUtils.curingGlobalParams(null,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,scheduleTime); - Assert.assertEquals(result2, JSON.toJSONString(globalParamList)); + Assert.assertEquals(result2, JSONUtils.toJson(globalParamList)); String result3 = ParameterUtils.curingGlobalParams(globalParamMap,globalParamList,CommandType.START_CURRENT_TASK_PROCESS,null); - Assert.assertEquals(result3, JSON.toJSONString(globalParamList)); + Assert.assertEquals(result3, JSONUtils.toJson(globalParamList)); String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); - Assert.assertEquals(result4, JSON.toJSONString(globalParamList)); + Assert.assertEquals(result4, JSONUtils.toJson(globalParamList)); //test var $ startsWith globalParamMap.put("bizDate","${system.biz.date}"); @@ -113,7 +113,7 @@ public class ParameterUtilsTest { globalParamList.add(property4); String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime); - Assert.assertEquals(result5,JSONUtils.toJsonString(globalParamList)); + Assert.assertEquals(result5, JSONUtils.toJson(globalParamList)); } /** diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java new file mode 100644 index 0000000000..5a04969dee --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.common.utils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.*; + +public class StreamUtilsTest { + + @Test + public void asStream() { + List list = Arrays.asList("a", "b", "c"); + List ret = StreamUtils.asStream(list.iterator()) + .filter(item -> item.equals("a")) + .collect(Collectors.toList()); + Assert.assertEquals("a", ret.get(0)); + } + +} \ No newline at end of file From 12c249ab47efb895ea04f14862f553f2555c9024 Mon Sep 17 00:00:00 2001 From: khadgarmage Date: Thu, 4 Jun 2020 23:13:43 +0800 Subject: [PATCH 07/17] fix ci log bug (#2901) --- .github/workflows/ci_e2e.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci_e2e.yml b/.github/workflows/ci_e2e.yml index 82c81ef4e1..68230e56fb 100644 --- a/.github/workflows/ci_e2e.yml +++ b/.github/workflows/ci_e2e.yml @@ -69,6 +69,6 @@ jobs: uses: actions/upload-artifact@v1 with: name: dslogs - path: /var/lib/docker/volumes/docker-swarm_dolphinscheduler-logs/_data + path: /var/lib/docker/volumes/dolphinscheduler-logs/_data From b21259d6005ee7e8d760ea32302d668c552463f7 Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Sat, 6 Jun 2020 17:11:20 +0800 Subject: [PATCH 08/17] add MicroBenchMark model (#2851) * add MicroBenchMark model * add log * fix propertyName * small change * add ut * format * add @Test * fix maven compile error * remove enum set method * exclusion microbench ut * small change * Use assembly instead of shade Co-authored-by: dailidong --- .github/workflows/ci_ut.yml | 2 +- dolphinscheduler-microbench/pom.xml | 101 ++++++++++++++ .../base/AbstractBaseBenchmark.java | 123 ++++++++++++++++++ .../microbench/common/EnumBenchMark.java | 112 ++++++++++++++++ pom.xml | 1 + 5 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 dolphinscheduler-microbench/pom.xml create mode 100644 dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/base/AbstractBaseBenchmark.java create mode 100644 dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/common/EnumBenchMark.java diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index 55f1259d2b..b301cf77bd 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -78,7 +78,7 @@ jobs: -Dsonar.core.codeCoveragePlugin=jacoco -Dsonar.projectKey=apache-dolphinscheduler -Dsonar.login=e4058004bc6be89decf558ac819aa1ecbee57682 - -Dsonar.exclusions=dolphinscheduler-ui/src/**/i18n/locale/*.js + -Dsonar.exclusions=dolphinscheduler-ui/src/**/i18n/locale/*.js,dolphinscheduler-microbench/src/**/* env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} diff --git a/dolphinscheduler-microbench/pom.xml b/dolphinscheduler-microbench/pom.xml new file mode 100644 index 0000000000..6080343ed0 --- /dev/null +++ b/dolphinscheduler-microbench/pom.xml @@ -0,0 +1,101 @@ + + + + + dolphinscheduler + org.apache.dolphinscheduler + 1.2.1-SNAPSHOT + + 4.0.0 + + dolphinscheduler-microbench + jar + ${project.artifactId} + + + UTF-8 + 1.21 + 1.8 + benchmarks + + + + + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + provided + + + + junit + junit + compile + + + + org.slf4j + slf4j-api + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven-compiler-plugin.version} + + ${javac.target} + ${javac.target} + 
${javac.target}
+                        false
+
+
+
+                org.apache.maven.plugins
+                maven-assembly-plugin
+                ${maven-assembly-plugin.version}
+
+
+
+                            org.openjdk.jmh.Main
+
+
+
+                        jar-with-dependencies
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/base/AbstractBaseBenchmark.java b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/base/AbstractBaseBenchmark.java
new file mode 100644
index 0000000000..25f0ae9114
--- /dev/null
+++ b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/base/AbstractBaseBenchmark.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.microbench.base;
+
+import org.junit.Test;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.results.format.ResultFormatType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * BaseBenchMark
+ * If you need to write a JMH benchmark, please extend this class first
+ */
+@Warmup(iterations = AbstractBaseBenchmark.DEFAULT_WARMUP_ITERATIONS)
+@Measurement(iterations = AbstractBaseBenchmark.DEFAULT_MEASURE_ITERATIONS)
+@State(Scope.Thread)
+@Fork(AbstractBaseBenchmark.DEFAULT_FORKS)
+public abstract class AbstractBaseBenchmark {
+
+    static final int DEFAULT_WARMUP_ITERATIONS = 10;
+
+    static final int DEFAULT_MEASURE_ITERATIONS = 10;
+
+    static final int DEFAULT_FORKS = 2;
+
+    private static Logger logger = LoggerFactory.getLogger(AbstractBaseBenchmark.class);
+
+
+    private ChainedOptionsBuilder newOptionsBuilder() {
+
+        String className = getClass().getSimpleName();
+
+        ChainedOptionsBuilder optBuilder = new OptionsBuilder()
+                // set benchmark ClassName
+                .include(className);
+
+        if (getWarmupIterations() > 0) {
+            optBuilder.warmupIterations(getWarmupIterations());
+        }
+
+        if (getMeasureIterations() > 0) {
+            optBuilder.measurementIterations(getMeasureIterations());
+        }
+
+        if (getForks() > 0) {
+            optBuilder.forks(getForks());
+        }
+
+        String output = getReportDir();
+        if (output != null) {
+            boolean writeFileStatus;
+            String filePath = getReportDir() + className + ".json";
+            File file = new File(filePath);
+
+            if (file.exists()) {
+                writeFileStatus = file.delete();
+
+
+            } else {
+                writeFileStatus = file.getParentFile().mkdirs();
+                try {
+                    writeFileStatus = file.createNewFile();
+                } catch (IOException e) {
+                    logger.warn("jmh test create file error", e);
+                }
+            }
+            if (writeFileStatus) {
+                optBuilder.resultFormat(ResultFormatType.JSON)
+                        .result(filePath);
+            }
+        }
+        return optBuilder;
+    }
+
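+    /**
+     * JUnit entry point: builds the JMH options assembled above and runs every
+     * benchmark method declared in the concrete subclass.
+     */
+    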
@Test + public void run() throws Exception { + new Runner(newOptionsBuilder().build()).run(); + } + + private int getWarmupIterations() { + + String value = System.getProperty("warmupIterations"); + return null != value ? Integer.parseInt(value) : -1; + } + + private int getMeasureIterations() { + String value = System.getProperty("measureIterations"); + return null != value ? Integer.parseInt(value) : -1; + } + + private static String getReportDir() { + return System.getProperty("perfReportDir"); + } + + private static int getForks() { + String value = System.getProperty("forkCount"); + return null != value ? Integer.parseInt(value) : -1; + } + + +} + diff --git a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/common/EnumBenchMark.java b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/common/EnumBenchMark.java new file mode 100644 index 0000000000..dcce5368e3 --- /dev/null +++ b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/common/EnumBenchMark.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.microbench.common; + + +import org.apache.dolphinscheduler.microbench.base.AbstractBaseBenchmark; +import org.openjdk.jmh.annotations.*; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + *Enum values JMH test + */ +@Warmup(iterations = 2, time = 1) +@Measurement(iterations = 4, time = 1) +@State(Scope.Benchmark) +public class EnumBenchMark extends AbstractBaseBenchmark { + + @Benchmark + public boolean simpleTest(){ + return Boolean.TRUE; + } + @Param({"101", "108", "103", "104", "105", "103"}) + private int testNum; + + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void enumValuesTest() { + TestTypeEnum.oldGetNameByType(testNum); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void enumStaticMapTest() { + TestTypeEnum.newGetNameByType(testNum); + } + + + public enum TestTypeEnum { + + TYPE_101(101, "TYPE101"), + TYPE_102(102, "TYPE102"), + TYPE_103(103, "TYPE103"), + TYPE_104(104, "TYPE104"), + TYPE_105(105, "TYPE105"), + TYPE_106(106, "TYPE106"), + TYPE_107(107, "TYPE107"), + TYPE_108(108, "TYPE108"); + + private int code; + private String name; + + public int getCode() { + return code; + } + + + public String getName() { + return name; + } + + + TestTypeEnum(int code, String name) { + this.code = code; + this.name = name; + } + + private static final Map TEST_TYPE_MAP = new HashMap<>(); + + static { + for (TestTypeEnum testTypeEnum : TestTypeEnum.values()) { + TEST_TYPE_MAP.put(testTypeEnum.code,testTypeEnum); + } + } + + public static void newGetNameByType(int code) { + if (TEST_TYPE_MAP.containsKey(code)) { + TEST_TYPE_MAP.get(code); + return; + } + throw new IllegalArgumentException("invalid code : " + code); + } + + public static void oldGetNameByType(int code) { + for (TestTypeEnum testTypeEnum : TestTypeEnum.values()) { + if (testTypeEnum.getCode() == code) { + return; + } + } + throw new IllegalArgumentException("invalid code : " + code); + } + } + +} diff --git a/pom.xml b/pom.xml index 5dca76ce4c..06121b1321 100644 --- a/pom.xml +++ b/pom.xml @@ -1010,5 +1010,6 @@ dolphinscheduler-remote dolphinscheduler-service dolphinscheduler-plugin-api + dolphinscheduler-microbench From c8f28ab2ba865a40063486ca0879ef183342086d Mon Sep 17 00:00:00 2001 From: eights Date: Tue, 9 Jun 2020 03:16:35 -0700 Subject: [PATCH 09/17] sqoop task optimization --- .../dolphinscheduler/api/enums/Status.java | 2 +- .../common/enums/SqoopJobType.java | 41 ++++++++ .../common/task/sqoop/SqoopParameters.java | 99 +++++++++++++++++-- .../consumer/TaskPriorityQueueConsumer.java | 32 +++--- .../task/sqoop/generator/CommonGenerator.java | 39 ++++++++ .../sources/MysqlSourceGenerator.java | 14 +-- 6 files changed, 198 insertions(+), 29 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index ae7300f54d..7ed6addcb5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -218,7 +218,7 @@ public enum Status { DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"), DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"), 
PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"), - PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"), + PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"), PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"), SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"), diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java new file mode 100644 index 0000000000..f1fde27928 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +public enum SqoopJobType { + CUSTOM(0, "CUSTOM"), + TEMPLATE(1, "TEMPLATE"); + + SqoopJobType(int code, String descp){ + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java index 7f02f42387..8b566a8472 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.common.task.sqoop; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -28,6 +30,23 @@ import java.util.List; */ public class SqoopParameters extends AbstractParameters { + /** + * sqoop job type: + * CUSTOM - custom sqoop job + * TEMPLATE - sqoop template job + */ + private String jobType; + + /** + * customJob eq 1, use customShell + */ + private String customShell; + + /** + * sqoop job name - map-reduce job name + */ + private String jobName; + /** * model type */ @@ -53,6 +72,16 @@ public class SqoopParameters extends 
      */
     private String targetParams;
 
+    /**
+     * hadoop custom params for the sqoop job
+     */
+    private List<Property> hadoopCustomParams;
+
+    /**
+     * sqoop advanced params
+     */
+    private List<Property> sqoopAdvancedParams;
+
     public String getModelType() {
         return modelType;
     }
@@ -101,18 +130,74 @@ public class SqoopParameters extends AbstractParameters {
         this.targetParams = targetParams;
     }
 
+    public String getJobType() {
+        return jobType;
+    }
+
+    public void setJobType(String jobType) {
+        this.jobType = jobType;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getCustomShell() {
+        return customShell;
+    }
+
+    public void setCustomShell(String customShell) {
+        this.customShell = customShell;
+    }
+
+    public List<Property> getHadoopCustomParams() {
+        return hadoopCustomParams;
+    }
+
+    public void setHadoopCustomParams(List<Property> hadoopCustomParams) {
+        this.hadoopCustomParams = hadoopCustomParams;
+    }
+
+    public List<Property> getSqoopAdvancedParams() {
+        return sqoopAdvancedParams;
+    }
+
+    public void setSqoopAdvancedParams(List<Property> sqoopAdvancedParams) {
+        this.sqoopAdvancedParams = sqoopAdvancedParams;
+    }
+
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(modelType)&&
-                concurrency != 0 &&
-                StringUtils.isNotEmpty(sourceType)&&
-                StringUtils.isNotEmpty(targetType)&&
-                StringUtils.isNotEmpty(sourceParams)&&
-                StringUtils.isNotEmpty(targetParams);
+
+        boolean sqoopParamsCheck = false;
+
+        if (StringUtils.isEmpty(jobType)) {
+            return sqoopParamsCheck;
+        }
+
+        if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) {
+            // template job: every template field is required and customShell must stay empty
+            sqoopParamsCheck = StringUtils.isEmpty(customShell) &&
+                    StringUtils.isNotEmpty(modelType) &&
+                    StringUtils.isNotEmpty(jobName) &&
+                    concurrency != 0 &&
+                    StringUtils.isNotEmpty(sourceType) &&
+                    StringUtils.isNotEmpty(targetType) &&
+                    StringUtils.isNotEmpty(sourceParams) &&
+                    StringUtils.isNotEmpty(targetParams);
+        } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) {
+            // custom job: only the user-supplied shell is required
+            sqoopParamsCheck = StringUtils.isNotEmpty(customShell) &&
+                    StringUtils.isEmpty(jobName);
+        }
+
+        return sqoopParamsCheck;
     }
 
     @Override
     public List<ResourceInfo> getResourceFilesList() {
         return new ArrayList<>();
     }
 }
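Before moving on to the consumer changes, here is a minimal sketch of the two modes checkParameters() accepts. It is illustrative only: setters for the pre-existing fields (modelType, concurrency, sourceType, ...) are assumed to follow the same POJO pattern as the getters and setters shown above, and all values are hypothetical.

    // Illustrative sketch, not part of the patch.
    import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;

    public class SqoopParametersExample {
        public static void main(String[] args) {
            // template mode: generated from structured fields, customShell must stay empty
            SqoopParameters template = new SqoopParameters();
            template.setJobType("TEMPLATE");                      // SqoopJobType.TEMPLATE.getDescp()
            template.setJobName("mysql_to_hive_demo");            // becomes -D mapred.job.name=...
            template.setModelType("import");                      // assumed setter
            template.setConcurrency(1);                           // assumed setter
            template.setSourceType("MYSQL");                      // assumed setter
            template.setTargetType("HIVE");                       // assumed setter
            template.setSourceParams("{\"srcDatasource\":1}");    // source-specific JSON, illustrative
            template.setTargetParams("{\"targetDatasource\":2}"); // target-specific JSON, illustrative
            System.out.println(template.checkParameters());       // true: all template fields present

            // custom mode: only the user-supplied shell matters, jobName must stay empty
            SqoopParameters custom = new SqoopParameters();
            custom.setJobType("CUSTOM");
            custom.setCustomShell("sqoop import --connect ...");  // illustrative script
            System.out.println(custom.checkParameters());         // true: customShell set, jobName empty
        }
    }
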
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 3314789fdb..da2363fc81 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
 
 import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -258,29 +259,32 @@
 
     /**
-     * set datax task relation
+     * set sqoop task relation
      * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
      * @param taskNode taskNode
      */
     private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
         SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class);
 
-        SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
-        TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
+        // only template sqoop jobs carry datasource ids that need to be resolved here
+        if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
+            SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
+            TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
 
-        DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
-        DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
+            DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
+            DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
 
-        if (dataSource != null){
-            sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
-            sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-        }
+            if (dataSource != null){
+                sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
+                sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+                sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+            }
 
-        if (dataTarget != null){
-            sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
-            sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            if (dataTarget != null){
+                sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
+                sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+                sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            }
+        }
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
index 4944bac5ba..ffca73544d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
@@ -16,10 +16,17 @@
  */
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
+
 /**
  * common script generator
  */
@@ -32,6 +39,38 @@ public class CommonGenerator {
 
         try{
             result.append("sqoop ")
                     .append(sqoopParameters.getModelType());
+
+            //set sqoop job name
+            result.append(" -D mapred.job.name")
+                    .append(Constants.EQUAL_SIGN)
+                    .append(sqoopParameters.getJobName());
+
+            //set hadoop custom param
+            List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
+            if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
+                for (Property hadoopCustomParam : hadoopCustomParams) {
+                    String hadoopCustomParamStr = " -D " + hadoopCustomParam.getProp()
+                            + Constants.EQUAL_SIGN + hadoopCustomParam.getValue();
+
+                    if (StringUtils.isNotEmpty(hadoopCustomParamStr)) {
+                        result.append(hadoopCustomParamStr);
+                    }
+                }
+            }
+
+            //set sqoop advanced custom param
+            List<Property> sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams();
+            if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) {
+
+                for (Property sqoopAdvancedParam : sqoopAdvancedParams) {
+                    String sqoopAdvancedParamStr = " " + sqoopAdvancedParam.getProp()
+                            + " " + sqoopAdvancedParam.getValue();
+
+                    if (StringUtils.isNotEmpty(sqoopAdvancedParamStr)) {
+                        result.append(sqoopAdvancedParamStr);
+                    }
+                }
+            }
+
         if(sqoopParameters.getConcurrency() >0){
             result.append(" -m ")
                     .append(sqoopParameters.getConcurrency());
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
index f8e3d57c7d..402f4f8ee0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
@@ -77,19 +77,19 @@ public class MysqlSourceGenerator implements ISourceGenerator {
             }else{
                 srcQuery += " WHERE $CONDITIONS";
             }
-            result.append(" --query \'"+srcQuery+"\'");
+            result.append(" --query \'").append(srcQuery).append("\'");
         }
 
         List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
 
         if(mapColumnHive != null && !mapColumnHive.isEmpty()){
-            String columnMap = "";
+            StringBuilder columnMap = new StringBuilder();
             for(Property item:mapColumnHive){
-                columnMap = item.getProp()+"="+ item.getValue()+",";
+                columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
             }
 
-            if(StringUtils.isNotEmpty(columnMap)){
+            if(StringUtils.isNotEmpty(columnMap.toString())){
                 result.append(" --map-column-hive ")
                         .append(columnMap.substring(0,columnMap.length()-1));
             }
@@ -98,12 +98,12 @@ public class MysqlSourceGenerator implements ISourceGenerator {
 
         List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
 
         if(mapColumnJava != null && !mapColumnJava.isEmpty()){
-            String columnMap = "";
+            StringBuilder columnMap = new StringBuilder();
             for(Property item:mapColumnJava){
-                columnMap = item.getProp()+"="+ item.getValue()+",";
+                columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
             }
 
-            if(StringUtils.isNotEmpty(columnMap)){
+            if(StringUtils.isNotEmpty(columnMap.toString())){
                 result.append(" --map-column-java ")
                         .append(columnMap.substring(0,columnMap.length()-1));
             }
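With these changes, every template job's generated script carries an explicit map-reduce job name plus any user-supplied Hadoop properties and raw sqoop flags, ahead of the parallelism setting. Using the illustrative values from the SqoopParameters sketch above (a hypothetical mapreduce.map.memory.mb property and a --direct advanced flag), the prefix assembled by CommonGenerator would look roughly like:

    sqoop import -D mapred.job.name=mysql_to_hive_demo -D mapreduce.map.memory.mb=4096 --direct -m 1 ...

with the source- and target-specific arguments appended afterwards by generators such as MysqlSourceGenerator. Hadoop's generic -D options must precede tool-specific arguments, and this ordering respects that.
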
From 50da6951fc09b436eabf3fc2ab6338f325c32fd5 Mon Sep 17 00:00:00 2001
From: eights
Date: Tue, 9 Jun 2020 03:39:29 -0700
Subject: [PATCH 10/17] sqoop front-end optimization

---
 .../formModel/tasks/_source/datasource.vue |   2 +-
 .../dag/_source/formModel/tasks/sqoop.vue  | 962 +++++++++++-------
 .../src/js/module/i18n/locale/en_US.js     |   7 +
 .../src/js/module/i18n/locale/zh_CN.js     |   7 +
 4 files changed, 612 insertions(+), 366 deletions(-)

diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
index a173139d15..05e248f518 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
@@ -131,7 +131,7 @@
     },
     created () {
       let supportType = this.supportType || []
-      this.typeList = _.cloneDeep(this.store.state.dag.dsTypeListS)
+      this.typeList = this.data.typeList || _.cloneDeep(this.store.state.dag.dsTypeListS)
       // Have a specified data source
       if (supportType.length) {
         let is = (type) => {
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
index 6594ffcf74..4bed92d27a 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
@@ -1,134 +1,194 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/