From 65841a5709aa681e7b3df6d8cdc10b575fdb9d7b Mon Sep 17 00:00:00 2001 From: simon824 Date: Fri, 12 Jun 2020 09:31:04 +0800 Subject: [PATCH] 1. change windows new line 'CR' to linux new line 'LF' 2. solve conflict --- .../command/TaskKillRequestCommand.java | 62 ++++++++- .../command/TaskKillResponseCommand.java | 120 +++++++++++++++++- .../worker/task/sqoop/SqoopTaskTest.java | 12 +- pom.xml | 1 + 4 files changed, 187 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java index 092e462a1f..4c0830b7cf 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java @@ -1 +1,61 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; /** * kill task request command */ public class TaskKillRequestCommand implements Serializable { /** * task id */ private int taskInstanceId; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskInstanceId=" + taskInstanceId + '}'; } } \ No newline at end of file +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; + +import java.io.Serializable; + +/** + * kill task request command + */ +public class TaskKillRequestCommand implements Serializable { + + /** + * task id + */ + private int taskInstanceId; + + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + /** + * package request command + * + * @return command + */ + public Command convert2Command(){ + Command command = new Command(); + command.setType(CommandType.TASK_KILL_REQUEST); + byte[] body = JsonSerializer.serialize(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "TaskKillRequestCommand{" + + "taskInstanceId=" + taskInstanceId + + '}'; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java index 9dc0c541d7..4b48c1ef4f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java @@ -1 +1,119 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; import java.util.List; /** * kill task response command */ public class TaskKillResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List getAppIds() { return appIds; } public void setAppIds(List appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; } } \ No newline at end of file +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; + +import java.io.Serializable; +import java.util.List; + +/** + * kill task response command + */ +public class TaskKillResponseCommand implements Serializable { + + /** + * taskInstanceId + */ + private int taskInstanceId; + + /** + * host + */ + private String host; + + /** + * status + */ + private int status; + + + /** + * processId + */ + private int processId; + + /** + * other resource manager appId , for example : YARN etc + */ + protected List appIds; + + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public int getProcessId() { + return processId; + } + + public void setProcessId(int processId) { + this.processId = processId; + } + + public List getAppIds() { + return appIds; + } + + public void setAppIds(List appIds) { + this.appIds = appIds; + } + + /** + * package request command + * + * @return command + */ + public Command convert2Command(){ + Command command = new Command(); + command.setType(CommandType.TASK_KILL_RESPONSE); + byte[] body = JsonSerializer.serialize(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "TaskKillResponseCommand{" + + "taskInstanceId=" + taskInstanceId + + ", host='" + host + '\'' + + ", status=" + status + + ", processId=" + processId + + ", appIds=" + appIds + + '}'; + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 0f18b4e2ac..2d0e39aa69 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -16,8 +16,8 @@ */ package org.apache.dolphinscheduler.server.worker.task.sqoop; -import com.alibaba.fastjson.JSON; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; @@ -86,7 +86,7 @@ public class SqoopTaskTest { //import mysql to HDFS with hadoo String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," + "\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters mysqlToHdfsParams = JSON.parseObject(mysqlToHdfs,SqoopParameters.class); + SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs,SqoopParameters.class); SqoopJobGenerator generator = new SqoopJobGenerator(); String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams,mysqlTaskExecutionContext); String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; @@ -96,28 +96,28 @@ public class SqoopTaskTest { String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters hdfsToMysqlParams = JSON.parseObject(hdfsToMysql,SqoopParameters.class); + SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql,SqoopParameters.class); String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams,mysqlTaskExecutionContext); String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript); //export hive to mysql String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters hiveToMysqlParams = JSON.parseObject(hiveToMysql,SqoopParameters.class); + SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql,SqoopParameters.class); String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams,mysqlTaskExecutionContext); String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'"; Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript); //import mysql to hive String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; - SqoopParameters mysqlToHiveParams = JSON.parseObject(mysqlToHive,SqoopParameters.class); + SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive,SqoopParameters.class); String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams,mysqlTaskExecutionContext); String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript); //sqoop CUSTOM job String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}"; - SqoopParameters sqoopCustomParams = JSON.parseObject(sqoopCustomString, SqoopParameters.class); + SqoopParameters sqoopCustomParams = JSONUtils.parseObject(sqoopCustomString, SqoopParameters.class); String sqoopCustomScript = generator.generateSqoopJob(sqoopCustomParams, new TaskExecutionContext()); String sqoopCustomExpected = "sqoop import"; Assert.assertEquals(sqoopCustomExpected, sqoopCustomScript); diff --git a/pom.xml b/pom.xml index 0e625ea9a7..dacc4280c0 100644 --- a/pom.xml +++ b/pom.xml @@ -815,6 +815,7 @@ **/server/worker/task/spark/SparkTaskTest.java **/server/worker/task/EnvFileTest.java **/server/worker/task/spark/SparkTaskTest.java + **/server/worker/task/datax/DataxTaskTest.java **/server/worker/task/sqoop/SqoopTaskTest.java **/server/worker/EnvFileTest.java **/service/quartz/cron/CronUtilsTest.java