From a632be73c094263e8005fbf702db1bf13dfbc80e Mon Sep 17 00:00:00 2001 From: kezhenxu94 Date: Fri, 10 Sep 2021 00:35:39 +0800 Subject: [PATCH] Clean up unused util classes... (#6154) --- .../processor/AlertRequestProcessor.java | 3 +- .../controller/EnvironmentControllerTest.java | 2 +- .../common/utils/IOUtils.java | 38 --- .../common/utils/Preconditions.java | 71 ----- .../common/utils/StreamUtils.java | 39 --- .../common/utils/VarPoolUtils.java | 108 -------- .../common/utils/PreconditionsTest.java | 67 ----- .../common/utils/StreamUtilsTest.java | 39 --- .../common/utils/VarPoolUtilsTest.java | 51 ---- .../HostUpdateResponseProcessor.java | 3 +- .../master/processor/StateEventProcessor.java | 3 +- .../master/processor/TaskAckProcessor.java | 3 +- .../processor/TaskKillResponseProcessor.java | 3 +- .../processor/TaskResponseProcessor.java | 3 +- .../server/utils/DataxUtils.java | 137 --------- .../server/utils/FlinkArgsUtils.java | 138 --------- .../server/utils/MapReduceArgsUtils.java | 85 ------ .../server/utils/ParamUtils.java | 157 ----------- .../server/utils/SparkArgsUtils.java | 133 --------- .../server/utils/UDFUtils.java | 100 ------- .../worker/processor/DBTaskAckProcessor.java | 3 +- .../processor/DBTaskResponseProcessor.java | 3 +- .../worker/processor/HostUpdateProcessor.java | 3 +- .../processor/TaskExecuteProcessor.java | 3 +- .../worker/processor/TaskKillProcessor.java | 3 +- .../server/utils/DataxUtilsTest.java | 113 -------- .../server/utils/FlinkArgsUtilsTest.java | 136 --------- .../server/utils/MapReduceArgsUtilsTest.java | 95 ------- .../server/utils/ParamUtilsTest.java | 261 ------------------ .../server/utils/SparkArgsUtilsTest.java | 133 --------- 30 files changed, 23 insertions(+), 1913 deletions(-) delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java delete mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java delete mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java delete mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java delete mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java delete mode 100755 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java delete mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java index e576d00491..9421a97546 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.alert.processor; import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.alert.runner.AlertSender; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -31,6 +30,8 @@ import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; public class AlertRequestProcessor implements NettyRequestProcessor { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java index 7ba51ae785..0b7233be92 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java @@ -25,7 +25,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.junit.After; import org.junit.Assert; @@ -39,6 +38,7 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; /** * environment controller test diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java deleted file mode 100644 index 96366d539f..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import java.io.Closeable; -import java.io.IOException; - -public class IOUtils { - - private IOUtils() { - throw new UnsupportedOperationException("Construct IOUtils"); - } - - public static void closeQuietly(Closeable closeable) { - if (closeable != null) { - try { - closeable.close(); - } catch (IOException ignore) { - // nothing need to do - } - } - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java deleted file mode 100644 index 9db2852644..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -/** - * utility methods for validating input - */ -public final class Preconditions { - - private Preconditions() { - throw new UnsupportedOperationException("Construct Preconditions"); - } - - /** - * if obj is null will throw NPE - * - * @param obj obj - * @param T - * @return T - */ - public static T checkNotNull(T obj) { - if (obj == null) { - throw new NullPointerException(); - } - return obj; - } - - /** - * if obj is null will throw NullPointerException with error message - * - * @param obj obj - * @param errorMsg error message - * @param T - * @return T - */ - public static T checkNotNull(T obj, String errorMsg) { - if (obj == null) { - throw new NullPointerException(errorMsg); - } - return obj; - } - - /** - * if condition is false will throw an IllegalArgumentException with the given message - * - * @param condition condition - * @param errorMsg error message - * @throws IllegalArgumentException Thrown, if the condition is violated. - */ - public static void checkArgument(boolean condition, Object errorMsg) { - if (!condition) { - throw new IllegalArgumentException(String.valueOf(errorMsg)); - } - } - -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java deleted file mode 100644 index fb4941a95d..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import java.util.Iterator; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -public class StreamUtils { - - private StreamUtils() { - throw new UnsupportedOperationException("Construct StreamUtils"); - } - - public static Stream asStream(Iterator sourceIterator) { - return asStream(sourceIterator, false); - } - - public static Stream asStream(Iterator sourceIterator, boolean parallel) { - Iterable iterable = () -> sourceIterator; - return StreamSupport.stream(iterable.spliterator(), parallel); - } - -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java deleted file mode 100644 index f286300d0d..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import org.apache.dolphinscheduler.common.model.TaskNode; - -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Map; - -public class VarPoolUtils { - - private static final String LOCALPARAMS = "localParams"; - - private static final String PROP = "prop"; - - private static final String VALUE = "value"; - - /** - * setTaskNodeLocalParams - * @param taskNode taskNode - * @param propToValue propToValue - */ - public static void setTaskNodeLocalParams(TaskNode taskNode, Map propToValue) { - String taskParamsJson = taskNode.getParams(); - Map taskParams = JSONUtils.toMap(taskParamsJson, String.class, Object.class); - Object localParamsObject = taskParams.get(LOCALPARAMS); - if (null != localParamsObject && null != propToValue && propToValue.size() > 0) { - ArrayList localParams = (ArrayList)localParamsObject; - for (int i = 0; i < localParams.size(); i++) { - Map map = (Map)localParams.get(i); - String prop = map.get(PROP); - if (StringUtils.isNotEmpty(prop) && propToValue.containsKey(prop)) { - map.put(VALUE,(String)propToValue.get(prop)); - } - } - taskParams.put(LOCALPARAMS,localParams); - } - taskNode.setParams(JSONUtils.toJsonString(taskParams)); - } - - /** - * convertVarPoolToMap - * @param propToValue propToValue - * @param varPool varPool - * @throws ParseException ParseException - */ - public static void convertVarPoolToMap(Map propToValue, String varPool) throws ParseException { - if (propToValue == null || StringUtils.isEmpty(varPool)) { - return; - } - String[] splits = varPool.split("\\$VarPool\\$"); - for (String kv : splits) { - String[] kvs = kv.split(","); - if (kvs.length == 2) { - propToValue.put(kvs[0], kvs[1]); - } else { - return; - } - } - } - - /** - * convertPythonScriptPlaceholders - * @param rawScript rawScript - * @return String - * @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException - */ - public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException { - int len = "${setShareVar(${".length(); - int scriptStart = 0; - while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) { - int start = -1; - int end = rawScript.indexOf('}', scriptStart + len); - String prop = rawScript.substring(scriptStart + len, end); - - start = rawScript.indexOf(',', end); - end = rawScript.indexOf(')', start); - - String value = rawScript.substring(start + 1, end); - - start = rawScript.indexOf('}', start) + 1; - end = rawScript.length(); - - String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value); - - rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end); - - scriptStart += replaceScript.length(); - } - return rawScript; - } -} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java deleted file mode 100644 index 3bf13aa9f3..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.common.utils; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static org.hamcrest.core.StringContains.containsString; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - - -public class PreconditionsTest { - public static final Logger logger = LoggerFactory.getLogger(PreconditionsTest.class); - - /** - * Test checkNotNull - */ - @Test - public void testCheckNotNull() throws Exception { - String testReference = "test object"; - Assert.assertEquals(testReference, Preconditions.checkNotNull(testReference)); - Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"object is null")); - - //test reference is null - try { - Preconditions.checkNotNull(null); - } catch (NullPointerException ex) { - assertNull(ex.getMessage()); - } - - try { - Preconditions.checkNotNull(""); - } catch (NullPointerException ex) { - assertNull(ex.getMessage()); - } - - try { - Preconditions.checkNotNull(null,"object is null"); - } catch (NullPointerException ex) { - assertThat(ex.getMessage(), containsString("object is null")); - } - - try { - Preconditions.checkNotNull("","object is null"); - } catch (NullPointerException ex) { - assertThat(ex.getMessage(), containsString("object is null")); - } - - } - -} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java deleted file mode 100644 index 5a04969dee..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.common.utils; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -import static org.junit.Assert.*; - -public class StreamUtilsTest { - - @Test - public void asStream() { - List list = Arrays.asList("a", "b", "c"); - List ret = StreamUtils.asStream(list.iterator()) - .filter(item -> item.equals("a")) - .collect(Collectors.toList()); - Assert.assertEquals("a", ret.get(0)); - } - -} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java deleted file mode 100644 index 3fbd2288ad..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import java.util.concurrent.ConcurrentHashMap; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VarPoolUtilsTest { - - private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class); - - @Test - public void testConvertVarPoolToMap() throws Exception { - String varPool = "p1,66$VarPool$p2,69$VarPool$"; - ConcurrentHashMap propToValue = new ConcurrentHashMap(); - VarPoolUtils.convertVarPoolToMap(propToValue, varPool); - Assert.assertEquals((String) propToValue.get("p1"), "66"); - Assert.assertEquals((String) propToValue.get("p2"), "69"); - logger.info(propToValue.toString()); - } - - @Test - public void testConvertPythonScriptPlaceholders() throws Exception { - String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};"; - rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript); - Assert.assertEquals(rawScript, "print(${p1});\n" - + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n" - + "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); - logger.info(rawScript); - } - -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java index 2717175b4e..322870b2c1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; @@ -26,6 +25,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; public class HostUpdateResponseProcessor implements NettyRequestProcessor { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java index f544400a67..824bff2f90 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; @@ -35,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index ae8455d3a2..15f97c17a5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -37,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index afd0577d87..28f18fe961 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; @@ -27,6 +26,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 07d2fdf116..5c6ade7fcc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -36,6 +35,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java deleted file mode 100755 index 9eba4f977e..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.utils; - - -import org.apache.dolphinscheduler.common.enums.DbType; - -import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; -import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; -import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; -import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; -import com.alibaba.druid.sql.parser.SQLStatementParser; - - -public class DataxUtils { - - public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader"; - - public static final String DATAX_READER_PLUGIN_POSTGRESQL = "postgresqlreader"; - - public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader"; - - public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader"; - - public static final String DATAX_READER_PLUGIN_CLICKHOUSE = "clickhousereader"; - - public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter"; - - public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter"; - - public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter"; - - public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter"; - - public static final String DATAX_WRITER_PLUGIN_CLICKHOUSE = "clickhousewriter"; - - public static String getReaderPluginName(DbType dbType) { - switch (dbType) { - case MYSQL: - return DATAX_READER_PLUGIN_MYSQL; - case POSTGRESQL: - return DATAX_READER_PLUGIN_POSTGRESQL; - case ORACLE: - return DATAX_READER_PLUGIN_ORACLE; - case SQLSERVER: - return DATAX_READER_PLUGIN_SQLSERVER; - case CLICKHOUSE: - return DATAX_READER_PLUGIN_CLICKHOUSE; - default: - return null; - } - } - - public static String getWriterPluginName(DbType dbType) { - switch (dbType) { - case MYSQL: - return DATAX_WRITER_PLUGIN_MYSQL; - case POSTGRESQL: - return DATAX_WRITER_PLUGIN_POSTGRESQL; - case ORACLE: - return DATAX_WRITER_PLUGIN_ORACLE; - case SQLSERVER: - return DATAX_WRITER_PLUGIN_SQLSERVER; - case CLICKHOUSE: - return DATAX_WRITER_PLUGIN_CLICKHOUSE; - default: - return null; - } - } - - public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) { - switch (dbType) { - case MYSQL: - return new MySqlStatementParser(sql); - case POSTGRESQL: - return new PGSQLStatementParser(sql); - case ORACLE: - return new OracleStatementParser(sql); - case SQLSERVER: - return new SQLServerStatementParser(sql); - default: - return null; - } - } - - public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { - if (columns == null) { - return null; - } - - String[] toColumns = new String[columns.length]; - for (int i = 0; i < columns.length; i++ ) { - toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]); - } - - return toColumns; - } - - public static String doConvertKeywordsColumn(DbType dbType, String column) { - if (column == null) { - return column; - } - - column = column.trim(); - column = column.replace("`", ""); - column = column.replace("\"", ""); - column = column.replace("'", ""); - - switch (dbType) { - case MYSQL: - return String.format("`%s`", column); - case POSTGRESQL: - return String.format("\"%s\"", column); - case ORACLE: - return String.format("\"%s\"", column); - case SQLSERVER: - return String.format("`%s`", column); - default: - return column; - } - } - -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java deleted file mode 100644 index dbd92e020f..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; -import org.apache.dolphinscheduler.common.utils.StringUtils; - -import java.util.ArrayList; -import java.util.List; - -/** - * flink args utils - */ -public class FlinkArgsUtils { - - private static final String LOCAL_DEPLOY_MODE = "local"; - - private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10"; - - private FlinkArgsUtils() { - throw new IllegalStateException("Utility class"); - } - - /** - * build args - * @param param flink parameters - * @return argument list - */ - public static List buildArgs(FlinkParameters param) { - List args = new ArrayList<>(); - - String deployMode = "cluster"; - String tmpDeployMode = param.getDeployMode(); - if (StringUtils.isNotEmpty(tmpDeployMode)) { - deployMode = tmpDeployMode; - } - String others = param.getOthers(); - if (!LOCAL_DEPLOY_MODE.equals(deployMode)) { - args.add(Constants.FLINK_RUN_MODE); //-m - - args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster - - int slot = param.getSlot(); - if (slot > 0) { - args.add(Constants.FLINK_YARN_SLOT); - args.add(String.format("%d", slot)); //-ys - } - - String appName = param.getAppName(); - if (StringUtils.isNotEmpty(appName)) { //-ynm - args.add(Constants.FLINK_APP_NAME); - args.add(ArgsUtils.escape(appName)); - } - - // judge flink version, the parameter -yn has removed from flink 1.10 - String flinkVersion = param.getFlinkVersion(); - if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) { - int taskManager = param.getTaskManager(); - if (taskManager > 0) { //-yn - args.add(Constants.FLINK_TASK_MANAGE); - args.add(String.format("%d", taskManager)); - } - } - String jobManagerMemory = param.getJobManagerMemory(); - if (StringUtils.isNotEmpty(jobManagerMemory)) { - args.add(Constants.FLINK_JOB_MANAGE_MEM); - args.add(jobManagerMemory); //-yjm - } - - String taskManagerMemory = param.getTaskManagerMemory(); - if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm - args.add(Constants.FLINK_TASK_MANAGE_MEM); - args.add(taskManagerMemory); - } - - if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) { - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(queue)) { // -yqu - args.add(Constants.FLINK_QUEUE); - args.add(queue); - } - } - } - - int parallelism = param.getParallelism(); - if (parallelism > 0) { - args.add(Constants.FLINK_PARALLELISM); - args.add(String.format("%d", parallelism)); // -p - } - - // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly - // The task status will be synchronized with the cluster job status - args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae - - // -s -yqu -yat -yD -D - if (StringUtils.isNotEmpty(others)) { - args.add(others); - } - - ProgramType programType = param.getProgramType(); - String mainClass = param.getMainClass(); - if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { - args.add(Constants.FLINK_MAIN_CLASS); //-c - args.add(param.getMainClass()); //main class - } - - ResourceInfo mainJar = param.getMainJar(); - if (mainJar != null) { - args.add(mainJar.getRes()); - } - - String mainArgs = param.getMainArgs(); - if (StringUtils.isNotEmpty(mainArgs)) { - args.add(mainArgs); - } - - return args; - } - -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java deleted file mode 100644 index 31e182b650..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; -import org.apache.dolphinscheduler.common.utils.StringUtils; - -import java.util.ArrayList; -import java.util.List; - -/** - * mapreduce args utils - */ -public class MapReduceArgsUtils { - - private MapReduceArgsUtils() { - throw new IllegalStateException("Utility class"); - } - - /** - * build args - * - * @param param param - * @return argument list - */ - public static List buildArgs(MapReduceParameters param) { - List args = new ArrayList<>(); - - ResourceInfo mainJar = param.getMainJar(); - if (mainJar != null) { - args.add(Constants.JAR); - args.add(mainJar.getRes()); - } - - ProgramType programType = param.getProgramType(); - String mainClass = param.getMainClass(); - if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { - args.add(mainClass); - } - - String appName = param.getAppName(); - if (StringUtils.isNotEmpty(appName)) { - args.add(String.format("%s%s=%s", Constants.D, Constants.MR_NAME, ArgsUtils.escape(appName))); - } - - String others = param.getOthers(); - if (StringUtils.isEmpty(others) || !others.contains(Constants.MR_QUEUE)) { - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - args.add(String.format("%s%s=%s", Constants.D, Constants.MR_QUEUE, queue)); - } - } - - // -conf -archives -files -libjars -D - if (StringUtils.isNotEmpty(others)) { - args.add(others); - } - - String mainArgs = param.getMainArgs(); - if (StringUtils.isNotEmpty(mainArgs)) { - args.add(mainArgs); - } - - return args; - } - -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java deleted file mode 100644 index 3dd8df0058..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.DataType; -import org.apache.dolphinscheduler.common.enums.Direct; -import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; - -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -/** - * param utils - */ -public class ParamUtils { - - /** - * parameter conversion - * Warning: - * When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified. - * But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current - * situation is wrong. So I cannot modify the original logic. - * - * @param taskExecutionContext the context of this task instance - * @param parameters the parameters - * @return global params - * - */ - public static Map convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) { - Preconditions.checkNotNull(taskExecutionContext); - Preconditions.checkNotNull(parameters); - Map globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams()); - Map globalParamsMap = taskExecutionContext.getDefinedParams(); - CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement()); - Date scheduleTime = taskExecutionContext.getScheduleTime(); - - // combining local and global parameters - Map localParams = parameters.getLocalParametersMap(); - - Map varParams = parameters.getVarPoolMap(); - - if (globalParams == null && localParams == null) { - return null; - } - // if it is a complement, - // you need to pass in the task instance id to locate the time - // of the process instance complement - Map params = BusinessTimeUtils - .getBusinessTime(commandType, - scheduleTime); - - if (globalParamsMap != null) { - - params.putAll(globalParamsMap); - } - - if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) { - params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath()); - } - params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId())); - - if (globalParams != null && localParams != null) { - globalParams.putAll(localParams); - } else if (globalParams == null && localParams != null) { - globalParams = localParams; - } - if (varParams != null) { - varParams.putAll(globalParams); - globalParams = varParams; - } - Iterator> iter = globalParams.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - Property property = en.getValue(); - - if (StringUtils.isNotEmpty(property.getValue()) - && property.getValue().startsWith("$")) { - /** - * local parameter refers to global parameter with the same name - * note: the global parameters of the process instance here are solidified parameters, - * and there are no variables in them. - */ - String val = property.getValue(); - - val = ParameterUtils.convertParameterPlaceholders(val, params); - property.setValue(val); - } - } - - return globalParams; - } - - /** - * format convert - * - * @param paramsMap params map - * @return Map of converted - */ - public static Map convert(Map paramsMap) { - if (paramsMap == null) { - return null; - } - - Map map = new HashMap<>(); - Iterator> iter = paramsMap.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - map.put(en.getKey(), en.getValue().getValue()); - } - return map; - } - - /** - * get parameters map - * - * @param definedParams definedParams - * @return parameters map - */ - public static Map getUserDefParamsMap(Map definedParams) { - if (definedParams != null) { - Map userDefParamsMaps = new HashMap<>(); - Iterator> iter = definedParams.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR, en.getValue()); - userDefParamsMaps.put(property.getProp(),property); - } - return userDefParamsMaps; - } - return null; - } -} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java deleted file mode 100644 index 4d0fb2a5b6..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.spark.SparkParameters; -import org.apache.dolphinscheduler.common.utils.StringUtils; - -import java.util.ArrayList; -import java.util.List; - -/** - * spark args utils - */ -public class SparkArgsUtils { - - private static final String SPARK_CLUSTER = "cluster"; - - private static final String SPARK_LOCAL = "local"; - - private static final String SPARK_ON_YARN = "yarn"; - - private SparkArgsUtils() { - throw new IllegalStateException("Utility class"); - } - - /** - * build args - * - * @param param param - * @return argument list - */ - public static List buildArgs(SparkParameters param) { - List args = new ArrayList<>(); - args.add(Constants.MASTER); - - String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER; - if (!SPARK_LOCAL.equals(deployMode)) { - args.add(SPARK_ON_YARN); - args.add(Constants.DEPLOY_MODE); - } - args.add(deployMode); - - ProgramType programType = param.getProgramType(); - String mainClass = param.getMainClass(); - if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) { - args.add(Constants.MAIN_CLASS); - args.add(mainClass); - } - - int driverCores = param.getDriverCores(); - if (driverCores > 0) { - args.add(Constants.DRIVER_CORES); - args.add(String.format("%d", driverCores)); - } - - String driverMemory = param.getDriverMemory(); - if (StringUtils.isNotEmpty(driverMemory)) { - args.add(Constants.DRIVER_MEMORY); - args.add(driverMemory); - } - - int numExecutors = param.getNumExecutors(); - if (numExecutors > 0) { - args.add(Constants.NUM_EXECUTORS); - args.add(String.format("%d", numExecutors)); - } - - int executorCores = param.getExecutorCores(); - if (executorCores > 0) { - args.add(Constants.EXECUTOR_CORES); - args.add(String.format("%d", executorCores)); - } - - String executorMemory = param.getExecutorMemory(); - if (StringUtils.isNotEmpty(executorMemory)) { - args.add(Constants.EXECUTOR_MEMORY); - args.add(executorMemory); - } - - String appName = param.getAppName(); - if (StringUtils.isNotEmpty(appName)) { - args.add(Constants.SPARK_NAME); - args.add(ArgsUtils.escape(appName)); - } - - String others = param.getOthers(); - if (!SPARK_LOCAL.equals(deployMode)) { - if (StringUtils.isEmpty(others) || !others.contains(Constants.SPARK_QUEUE)) { - String queue = param.getQueue(); - if (StringUtils.isNotEmpty(queue)) { - args.add(Constants.SPARK_QUEUE); - args.add(queue); - } - } - } - - // --conf --files --jars --packages - if (StringUtils.isNotEmpty(others)) { - args.add(others); - } - - ResourceInfo mainJar = param.getMainJar(); - if (mainJar != null) { - args.add(mainJar.getRes()); - } - - String mainArgs = param.getMainArgs(); - if (StringUtils.isNotEmpty(mainArgs)) { - args.add(mainArgs); - } - - return args; - } - -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java deleted file mode 100644 index 71234f5539..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.server.utils; - -import org.apache.commons.collections.MapUtils; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.UdfFunc; -import org.slf4j.Logger; - -import java.text.MessageFormat; -import java.util.*; -import java.util.stream.Collectors; - -import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty; - -/** - * udf utils - */ -public class UDFUtils { - - /** - * create function format - */ - private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''"; - - /** - * create function list - * @param udfFuncTenantCodeMap key is udf function,value is tenant code - * @param logger logger - * @return create function list - */ - public static List createFuncs(Map udfFuncTenantCodeMap, Logger logger){ - - if (MapUtils.isEmpty(udfFuncTenantCodeMap)){ - logger.info("can't find udf function resource"); - return null; - } - List funcList = new ArrayList<>(); - - // build jar sql - buildJarSql(funcList, udfFuncTenantCodeMap); - - // build temp function sql - buildTempFuncSql(funcList, udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList())); - - return funcList; - } - - /** - * build jar sql - * @param sqls sql list - * @param udfFuncTenantCodeMap key is udf function,value is tenant code - */ - private static void buildJarSql(List sqls, Map udfFuncTenantCodeMap) { - String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS); - String resourceFullName; - Set> entries = udfFuncTenantCodeMap.entrySet(); - for (Map.Entry entry:entries){ - String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS; - String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue()); - resourceFullName = entry.getKey().getResourceName(); - resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s",resourceFullName); - sqls.add(String.format("add jar %s%s%s", prefixPath, uploadPath, resourceFullName)); - } - - } - - /** - * build temp function sql - * @param sqls sql list - * @param udfFuncs udf function list - */ - private static void buildTempFuncSql(List sqls, List udfFuncs) { - if (isNotEmpty(udfFuncs)) { - for (UdfFunc udfFunc : udfFuncs) { - sqls.add(MessageFormat - .format(CREATE_FUNCTION_FORMAT, udfFunc.getFuncName(), udfFunc.getClassName())); - } - } - } - - -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java index 551661722f..a340ad704e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java @@ -20,13 +20,14 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.*; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * db task ack processor */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java index 40b5b2e90c..97a9cf527a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; @@ -29,6 +28,8 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * db task response processor */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java index 439b59b86d..5be3276134 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; @@ -29,6 +28,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index aa751063be..3466326e74 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -52,6 +51,8 @@ import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 20fbc9ba3a..7ca312e155 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -47,6 +46,8 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import io.netty.channel.Channel; /** diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java deleted file mode 100644 index 3c20ba793b..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.dolphinscheduler.common.enums.DbType; - -import org.junit.Test; - -import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; -import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; -import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; -import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; - -/** - * DataxUtils Tester. - */ -public class DataxUtilsTest { - - /** - * - * Method: getReaderPluginName(DbType dbType) - * - */ - @Test - public void testGetReaderPluginName() { - assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, DataxUtils.getReaderPluginName(DbType.MYSQL)); - assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL)); - assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER)); - assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE)); - assertEquals(DataxUtils.DATAX_READER_PLUGIN_CLICKHOUSE, DataxUtils.getReaderPluginName(DbType.CLICKHOUSE)); - assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null); - } - - /** - * - * Method: getWriterPluginName(DbType dbType) - * - */ - @Test - public void testGetWriterPluginName() { - assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, DataxUtils.getWriterPluginName(DbType.MYSQL)); - assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL)); - assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER)); - assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE)); - assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_CLICKHOUSE, DataxUtils.getWriterPluginName(DbType.CLICKHOUSE)); - assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null); - } - - /** - * - * Method: getSqlStatementParser(DbType dbType, String sql) - * - */ - @Test - public void testGetSqlStatementParser() throws Exception { - assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1") instanceof MySqlStatementParser); - assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select 1") instanceof PGSQLStatementParser); - assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1") instanceof OracleStatementParser); - assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select 1") instanceof SQLServerStatementParser); - assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null); - } - - /** - * - * Method: convertKeywordsColumns(DbType dbType, String[] columns) - * - */ - @Test - public void testConvertKeywordsColumns() throws Exception { - String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "}; - String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"}; - - String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns); - - assertTrue(fromColumns.length == toColumns.length); - - for (int i = 0; i < toColumns.length; i++) { - assertEquals(targetColumns[i], toColumns[i]); - } - } - - /** - * - * Method: doConvertKeywordsColumn(DbType dbType, String column) - * - */ - @Test - public void testDoConvertKeywordsColumn() throws Exception { - assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" ")); - assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" ")); - assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" ")); - assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" ")); - assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" ")); - } -} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java deleted file mode 100644 index 88437a1102..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - -import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; - -import java.util.List; - -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test FlinkArgsUtils - */ -public class FlinkArgsUtilsTest { - - private static final Logger logger = LoggerFactory.getLogger(FlinkArgsUtilsTest.class); - - public String mode = "cluster"; - public int slot = 2; - public int parallelism = 3; - public String appName = "testFlink"; - public int taskManager = 4; - public String taskManagerMemory = "2G"; - public String jobManagerMemory = "4G"; - public ProgramType programType = ProgramType.JAVA; - public String mainClass = "com.test"; - public ResourceInfo mainJar = null; - public String mainArgs = "testArgs --input file:///home"; - public String queue = "queue1"; - public String others = "-s hdfs:///flink/savepoint-1537"; - public String flinkVersion = "<1.10"; - - @Before - public void setUp() { - ResourceInfo main = new ResourceInfo(); - main.setRes("testflink-1.0.0-SNAPSHOT.jar"); - mainJar = main; - } - - /** - * Test buildArgs - */ - @Test - public void testBuildArgs() { - //Define params - FlinkParameters param = new FlinkParameters(); - param.setDeployMode(mode); - param.setMainClass(mainClass); - param.setAppName(appName); - param.setSlot(slot); - param.setParallelism(parallelism); - param.setTaskManager(taskManager); - param.setJobManagerMemory(jobManagerMemory); - param.setTaskManagerMemory(taskManagerMemory); - param.setMainJar(mainJar); - param.setProgramType(programType); - param.setMainArgs(mainArgs); - param.setQueue(queue); - param.setOthers(others); - param.setFlinkVersion(flinkVersion); - - //Invoke buildArgs - List result = FlinkArgsUtils.buildArgs(param); - for (String s : result) { - logger.info(s); - } - - //Expected values and order - assertEquals(22, result.size()); - - assertEquals("-m", result.get(0)); - assertEquals("yarn-cluster", result.get(1)); - - assertEquals("-ys", result.get(2)); - assertSame(slot, Integer.valueOf(result.get(3))); - - assertEquals("-ynm", result.get(4)); - assertEquals(appName, result.get(5)); - - assertEquals("-yn", result.get(6)); - assertSame(taskManager, Integer.valueOf(result.get(7))); - - assertEquals("-yjm", result.get(8)); - assertEquals(jobManagerMemory, result.get(9)); - - assertEquals("-ytm", result.get(10)); - assertEquals(taskManagerMemory, result.get(11)); - - assertEquals("-yqu", result.get(12)); - assertEquals(queue, result.get(13)); - - assertEquals("-p", result.get(14)); - assertSame(parallelism, Integer.valueOf(result.get(15))); - - assertEquals("-sae", result.get(16)); - - assertEquals(others, result.get(17)); - - assertEquals("-c", result.get(18)); - assertEquals(mainClass, result.get(19)); - - assertEquals(mainJar.getRes(), result.get(20)); - assertEquals(mainArgs, result.get(21)); - - //Others param without -yqu - FlinkParameters param1 = new FlinkParameters(); - param1.setQueue(queue); - param1.setDeployMode(mode); - result = FlinkArgsUtils.buildArgs(param1); - assertEquals(5, result.size()); - } - -} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java deleted file mode 100644 index eb68672e07..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import static org.junit.Assert.assertEquals; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters; - -import java.util.List; - -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test MapReduceArgsUtils - */ -public class MapReduceArgsUtilsTest { - - private static final Logger logger = LoggerFactory.getLogger(MapReduceArgsUtilsTest.class); - - public String mainClass = "com.examples.WordCount"; - public ResourceInfo mainJar = null; - public String mainArgs = "/user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt"; - public ProgramType programType = ProgramType.JAVA; - public String others = "-files cachefile.txt -libjars mylib.jar -archives myarchive.zip -Dwordcount.case.sensitive=false"; - public String appName = "mapreduce test"; - public String queue = "queue1"; - - @Before - public void setUp() { - ResourceInfo main = new ResourceInfo(); - main.setRes("testspark-1.0.0-SNAPSHOT.jar"); - mainJar = main; - } - - /** - * Test buildArgs - */ - @Test - public void testBuildArgs() { - //Define params - MapReduceParameters param = new MapReduceParameters(); - param.setMainClass(mainClass); - param.setMainJar(mainJar); - param.setMainArgs(mainArgs); - param.setProgramType(programType); - param.setOthers(others); - param.setAppName(appName); - param.setQueue(queue); - - //Invoke buildArgs - List result = MapReduceArgsUtils.buildArgs(param); - for (String s : result) { - logger.info(s); - } - - //Expected values and order - assertEquals(7, result.size()); - - assertEquals("jar", result.get(0)); - assertEquals(mainJar.getRes(), result.get(1)); - assertEquals(mainClass, result.get(2)); - assertEquals(String.format("-D%s=%s", Constants.MR_NAME, ArgsUtils.escape(appName)), result.get(3)); - assertEquals(String.format("-D%s=%s", Constants.MR_QUEUE, queue), result.get(4)); - assertEquals(others, result.get(5)); - assertEquals(mainArgs, result.get(6)); - - //Others param without --queue - param.setOthers("-files xxx/hive-site.xml"); - param.setQueue(null); - result = MapReduceArgsUtils.buildArgs(param); - assertEquals(6, result.size()); - } - -} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java deleted file mode 100644 index 4d7bc93b41..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.DataType; -import org.apache.dolphinscheduler.common.enums.Direct; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.task.shell.ShellParameters; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; - -import java.util.Calendar; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.SerializationFeature; - -/** - * Test ParamUtils - */ -public class ParamUtilsTest { - - private static final Logger logger = LoggerFactory.getLogger(ParamUtilsTest.class); - - //Define global variables - public Map globalParams = new HashMap<>(); - - public Map globalParamsMap = new HashMap<>(); - - public Map localParams = new HashMap<>(); - - public Map varPoolParams = new HashMap<>(); - - /** - * Init params - * - * @throws Exception - */ - @Before - public void setUp() throws Exception { - - Property property = new Property(); - property.setProp("global_param"); - property.setDirect(Direct.IN); - property.setType(DataType.VARCHAR); - property.setValue("${system.biz.date}"); - globalParams.put("global_param", property); - - globalParamsMap.put("global_param", "${system.biz.date}"); - - Property localProperty = new Property(); - localProperty.setProp("local_param"); - localProperty.setDirect(Direct.IN); - localProperty.setType(DataType.VARCHAR); - localProperty.setValue("${global_param}"); - localParams.put("local_param", localProperty); - - Property varProperty = new Property(); - varProperty.setProp("varPool"); - varProperty.setDirect(Direct.IN); - varProperty.setType(DataType.VARCHAR); - varProperty.setValue("${global_param}"); - varPoolParams.put("varPool", varProperty); - } - - /** - * This is basic test case for ParamUtils.convert. - * Warning: - * As you can see,this case invokes the function of convert in different situations. When you first invoke the function of convert, - * the variables of localParams and varPool in the ShellParameters will be modified. But in the whole system the variables of localParams - * and varPool have been used in other functions. I'm not sure if this current situation is wrong. So I cannot modify the original logic. - */ - @Test - public void testConvert() { - //The expected value - String expected = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," - + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," - + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; - - //The expected value when globalParams is null but localParams is not null - String expected1 = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," - + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; - //Define expected date , the month is 0-base - Calendar calendar = Calendar.getInstance(); - calendar.set(2019, 11, 30); - Date date = calendar.getTime(); - - List globalParamList = globalParams.values().stream().collect(Collectors.toList()); - List localParamList = localParams.values().stream().collect(Collectors.toList()); - List varPoolParamList = varPoolParams.values().stream().collect(Collectors.toList()); - - TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); - taskExecutionContext.setTaskInstanceId(1); - taskExecutionContext.setTaskName("params test"); - taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); - taskExecutionContext.setHost("127.0.0.1:1234"); - taskExecutionContext.setExecutePath("/tmp/test"); - taskExecutionContext.setLogPath("/log"); - taskExecutionContext.setProcessInstanceId(1); - taskExecutionContext.setExecutorId(1); - taskExecutionContext.setCmdTypeIfComplement(0); - taskExecutionContext.setScheduleTime(date); - taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList)); - taskExecutionContext.setDefinedParams(globalParamsMap); - taskExecutionContext.setVarPool("[{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${global_param}\"}]"); - taskExecutionContext.setTaskParams( - "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\"," - + "\"localParams\":" - + "[],\"resourceList\":[]}"); - - ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class); - shellParameters.setLocalParams(localParamList); - - String varPoolParamsJson = JSONUtils.toJsonString(varPoolParams,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); - shellParameters.setVarPool(taskExecutionContext.getVarPool()); - shellParameters.dealOutParam(varPoolParamsJson); - - //Invoke convert - Map paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters); - String result = JSONUtils.toJsonString(paramsMap); - assertEquals(expected, result); - - //Invoke convert with null globalParams - taskExecutionContext.setDefinedParams(null); - Map paramsMap1 = ParamUtils.convert(taskExecutionContext, shellParameters); - - String result1 = JSONUtils.toJsonString(paramsMap1); - assertEquals(expected1, result1); - - // Null check, invoke convert with null globalParams and null localParams - shellParameters.setLocalParams(null); - Map paramsMap2 = ParamUtils.convert(taskExecutionContext, shellParameters); - assertNull(paramsMap2); - } - - /** - * Test some new params related to task - */ - @Test - public void testConvertForParamsRelatedTask() throws Exception { - // start to form some test data for new paramters - Map globalParams = new HashMap<>(); - Map globalParamsMap = new HashMap<>(); - - Property taskInstanceIdProperty = new Property(); - String propName = "task_execution_id"; - String paramValue = String.format("${%s}", Constants.PARAMETER_TASK_INSTANCE_ID); - taskInstanceIdProperty.setProp(propName); - taskInstanceIdProperty.setDirect(Direct.IN); - taskInstanceIdProperty.setType(DataType.VARCHAR); - taskInstanceIdProperty.setValue(paramValue); - globalParams.put(propName,taskInstanceIdProperty); - globalParamsMap.put(propName,paramValue); - - Property taskExecutionPathProperty = new Property(); - propName = "task_execution_path"; - paramValue = String.format("${%s}", Constants.PARAMETER_TASK_EXECUTE_PATH); - taskExecutionPathProperty.setProp(propName); - taskExecutionPathProperty.setDirect(Direct.IN); - taskExecutionPathProperty.setType(DataType.VARCHAR); - taskExecutionPathProperty.setValue(paramValue); - - globalParams.put(propName,taskExecutionPathProperty); - globalParamsMap.put(propName,paramValue); - - Calendar calendar = Calendar.getInstance(); - calendar.set(2019,11,30); - Date date = calendar.getTime(); - - List globalParamList = globalParams.values().stream().collect(Collectors.toList()); - - TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); - taskExecutionContext.setTaskInstanceId(1); - taskExecutionContext.setTaskName("params test"); - taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); - taskExecutionContext.setHost("127.0.0.1:1234"); - taskExecutionContext.setExecutePath("/tmp/test"); - taskExecutionContext.setLogPath("/log"); - taskExecutionContext.setProcessInstanceId(1); - taskExecutionContext.setExecutorId(1); - taskExecutionContext.setCmdTypeIfComplement(0); - taskExecutionContext.setScheduleTime(date); - taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList)); - taskExecutionContext.setDefinedParams(globalParamsMap); - taskExecutionContext.setTaskParams( - "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\"," - + "\"localParams\":" - + "[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"}," - + "{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR" - + "\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}"); - - ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class); - - //The expected value - String expected = "{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"}," - + "\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}"; - - //The expected value when globalParams is null but localParams is not null - Map paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters); - - String result = JSONUtils.toJsonString(paramsMap); - - Map resultMap = JSONUtils.parseObject(result,Map.class); - Map expectedMap = JSONUtils.parseObject(expected,Map.class); - - result = JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); - expected = JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); - - assertEquals(expected, result); - - } - - /** - * Test the overload method of convert - */ - @Test - public void testConvert1() { - - //The expected value - String expected = "{\"global_param\":\"${system.biz.date}\"}"; - - //Invoke convert - Map paramsMap = ParamUtils.convert(globalParams); - String result = JSONUtils.toJsonString(paramsMap); - assertEquals(expected, result); - - logger.info(result); - - //Null check - Map paramsMap1 = ParamUtils.convert(null); - assertNull(paramsMap1); - } -} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java deleted file mode 100644 index 7e05cec30b..0000000000 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.utils; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - -import org.apache.dolphinscheduler.common.enums.ProgramType; -import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.task.spark.SparkParameters; - -import java.util.List; - -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test SparkArgsUtils - */ -public class SparkArgsUtilsTest { - - private static final Logger logger = LoggerFactory.getLogger(SparkArgsUtilsTest.class); - - public String mode = "cluster"; - public String mainClass = "com.test"; - public ResourceInfo mainJar = null; - public String mainArgs = "partitions=2"; - public String driverMemory = "2G"; - public String executorMemory = "4G"; - public ProgramType programType = ProgramType.JAVA; - public int driverCores = 2; - public int executorCores = 6; - public String sparkVersion = "SPARK1"; - public int numExecutors = 4; - public String appName = "spark test"; - public String queue = "queue1"; - - @Before - public void setUp() { - ResourceInfo main = new ResourceInfo(); - main.setRes("testspark-1.0.0-SNAPSHOT.jar"); - mainJar = main; - } - - /** - * Test buildArgs - */ - @Test - public void testBuildArgs() { - //Define params - SparkParameters param = new SparkParameters(); - param.setDeployMode(mode); - param.setMainClass(mainClass); - param.setDriverCores(driverCores); - param.setDriverMemory(driverMemory); - param.setExecutorCores(executorCores); - param.setExecutorMemory(executorMemory); - param.setMainJar(mainJar); - param.setNumExecutors(numExecutors); - param.setProgramType(programType); - param.setSparkVersion(sparkVersion); - param.setMainArgs(mainArgs); - param.setAppName(appName); - param.setQueue(queue); - - //Invoke buildArgs - List result = SparkArgsUtils.buildArgs(param); - for (String s : result) { - logger.info(s); - } - - //Expected values and order - assertEquals(22, result.size()); - - assertEquals("--master", result.get(0)); - assertEquals("yarn", result.get(1)); - - assertEquals("--deploy-mode", result.get(2)); - assertEquals(mode, result.get(3)); - - assertEquals("--class", result.get(4)); - assertEquals(mainClass, result.get(5)); - - assertEquals("--driver-cores", result.get(6)); - assertSame(driverCores, Integer.valueOf(result.get(7))); - - assertEquals("--driver-memory", result.get(8)); - assertEquals(driverMemory, result.get(9)); - - assertEquals("--num-executors", result.get(10)); - assertSame(numExecutors, Integer.valueOf(result.get(11))); - - assertEquals("--executor-cores", result.get(12)); - assertSame(executorCores, Integer.valueOf(result.get(13))); - - assertEquals("--executor-memory", result.get(14)); - assertEquals(executorMemory, result.get(15)); - - assertEquals("--name", result.get(16)); - assertEquals(ArgsUtils.escape(appName), result.get(17)); - - assertEquals("--queue", result.get(18)); - assertEquals(queue, result.get(19)); - - assertEquals(mainJar.getRes(), result.get(20)); - assertEquals(mainArgs, result.get(21)); - - //Others param without --queue - SparkParameters param1 = new SparkParameters(); - param1.setOthers("--files xxx/hive-site.xml"); - param1.setQueue(queue); - result = SparkArgsUtils.buildArgs(param1); - assertEquals(7, result.size()); - } - -} \ No newline at end of file