Browse Source

Clean up unused util classes... (#6154)

2.0.7-release
kezhenxu94 3 years ago committed by GitHub
parent
commit
a632be73c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
  2. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java
  3. 38
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java
  4. 71
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java
  5. 39
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java
  6. 108
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
  7. 67
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java
  8. 39
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java
  9. 51
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
  10. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java
  11. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
  12. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  13. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
  14. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  15. 137
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java
  16. 138
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java
  17. 85
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java
  18. 157
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
  19. 133
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java
  20. 100
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
  21. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
  22. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
  23. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java
  24. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  25. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  26. 113
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java
  27. 136
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java
  28. 95
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java
  29. 261
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
  30. 133
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java

3
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.alert.processor;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -31,6 +30,8 @@ import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
public class AlertRequestProcessor implements NettyRequestProcessor {

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/EnvironmentControllerTest.java

@ -25,7 +25,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.junit.After;
import org.junit.Assert;
@ -39,6 +38,7 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
/**
* environment controller test

38
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/IOUtils.java

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.io.Closeable;
import java.io.IOException;
public class IOUtils {
private IOUtils() {
throw new UnsupportedOperationException("Construct IOUtils");
}
public static void closeQuietly(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException ignore) {
// nothing need to do
}
}
}
}

71
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
/**
* utility methods for validating input
*/
public final class Preconditions {
private Preconditions() {
throw new UnsupportedOperationException("Construct Preconditions");
}
/**
* if obj is null will throw NPE
*
* @param obj obj
* @param <T> T
* @return T
*/
public static <T> T checkNotNull(T obj) {
if (obj == null) {
throw new NullPointerException();
}
return obj;
}
/**
* if obj is null will throw NullPointerException with error message
*
* @param obj obj
* @param errorMsg error message
* @param <T> T
* @return T
*/
public static <T> T checkNotNull(T obj, String errorMsg) {
if (obj == null) {
throw new NullPointerException(errorMsg);
}
return obj;
}
/**
* if condition is false will throw an IllegalArgumentException with the given message
*
* @param condition condition
* @param errorMsg error message
* @throws IllegalArgumentException Thrown, if the condition is violated.
*/
public static void checkArgument(boolean condition, Object errorMsg) {
if (!condition) {
throw new IllegalArgumentException(String.valueOf(errorMsg));
}
}
}

39
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StreamUtils.java

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class StreamUtils {
private StreamUtils() {
throw new UnsupportedOperationException("Construct StreamUtils");
}
public static <T> Stream<T> asStream(Iterator<T> sourceIterator) {
return asStream(sourceIterator, false);
}
public static <T> Stream<T> asStream(Iterator<T> sourceIterator, boolean parallel) {
Iterable<T> iterable = () -> sourceIterator;
return StreamSupport.stream(iterable.spliterator(), parallel);
}
}

108
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Map;
public class VarPoolUtils {
private static final String LOCALPARAMS = "localParams";
private static final String PROP = "prop";
private static final String VALUE = "value";
/**
* setTaskNodeLocalParams
* @param taskNode taskNode
* @param propToValue propToValue
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) {
String taskParamsJson = taskNode.getParams();
Map<String,Object> taskParams = JSONUtils.toMap(taskParamsJson, String.class, Object.class);
Object localParamsObject = taskParams.get(LOCALPARAMS);
if (null != localParamsObject && null != propToValue && propToValue.size() > 0) {
ArrayList<Object> localParams = (ArrayList)localParamsObject;
for (int i = 0; i < localParams.size(); i++) {
Map<String,String> map = (Map)localParams.get(i);
String prop = map.get(PROP);
if (StringUtils.isNotEmpty(prop) && propToValue.containsKey(prop)) {
map.put(VALUE,(String)propToValue.get(prop));
}
}
taskParams.put(LOCALPARAMS,localParams);
}
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}
/**
* convertVarPoolToMap
* @param propToValue propToValue
* @param varPool varPool
* @throws ParseException ParseException
*/
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
if (propToValue == null || StringUtils.isEmpty(varPool)) {
return;
}
String[] splits = varPool.split("\\$VarPool\\$");
for (String kv : splits) {
String[] kvs = kv.split(",");
if (kvs.length == 2) {
propToValue.put(kvs[0], kvs[1]);
} else {
return;
}
}
}
/**
* convertPythonScriptPlaceholders
* @param rawScript rawScript
* @return String
* @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException
*/
public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException {
int len = "${setShareVar(${".length();
int scriptStart = 0;
while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) {
int start = -1;
int end = rawScript.indexOf('}', scriptStart + len);
String prop = rawScript.substring(scriptStart + len, end);
start = rawScript.indexOf(',', end);
end = rawScript.indexOf(')', start);
String value = rawScript.substring(start + 1, end);
start = rawScript.indexOf('}', start) + 1;
end = rawScript.length();
String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end);
scriptStart += replaceScript.length();
}
return rawScript;
}
}

67
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PreconditionsTest.java

@ -1,67 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
public class PreconditionsTest {
public static final Logger logger = LoggerFactory.getLogger(PreconditionsTest.class);
/**
* Test checkNotNull
*/
@Test
public void testCheckNotNull() throws Exception {
String testReference = "test object";
Assert.assertEquals(testReference, Preconditions.checkNotNull(testReference));
Assert.assertEquals(testReference,Preconditions.checkNotNull(testReference,"object is null"));
//test reference is null
try {
Preconditions.checkNotNull(null);
} catch (NullPointerException ex) {
assertNull(ex.getMessage());
}
try {
Preconditions.checkNotNull("");
} catch (NullPointerException ex) {
assertNull(ex.getMessage());
}
try {
Preconditions.checkNotNull(null,"object is null");
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), containsString("object is null"));
}
try {
Preconditions.checkNotNull("","object is null");
} catch (NullPointerException ex) {
assertThat(ex.getMessage(), containsString("object is null"));
}
}
}

39
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/StreamUtilsTest.java

@ -1,39 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.Assert.*;
public class StreamUtilsTest {
@Test
public void asStream() {
List<String> list = Arrays.asList("a", "b", "c");
List<String> ret = StreamUtils.asStream(list.iterator())
.filter(item -> item.equals("a"))
.collect(Collectors.toList());
Assert.assertEquals("a", ret.get(0));
}
}

51
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VarPoolUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
@Test
public void testConvertVarPoolToMap() throws Exception {
String varPool = "p1,66$VarPool$p2,69$VarPool$";
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
Assert.assertEquals((String) propToValue.get("p1"), "66");
Assert.assertEquals((String) propToValue.get("p2"), "69");
logger.info(propToValue.toString());
}
@Test
public void testConvertPythonScriptPlaceholders() throws Exception {
String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
Assert.assertEquals(rawScript, "print(${p1});\n"
+ "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
logger.info(rawScript);
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/HostUpdateResponseProcessor.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
@ -26,6 +25,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
public class HostUpdateResponseProcessor implements NettyRequestProcessor {

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
@ -35,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -37,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
@ -27,6 +26,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -36,6 +35,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**

137
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java

@ -1,137 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.DbType;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import com.alibaba.druid.sql.parser.SQLStatementParser;
public class DataxUtils {
public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader";
public static final String DATAX_READER_PLUGIN_POSTGRESQL = "postgresqlreader";
public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader";
public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader";
public static final String DATAX_READER_PLUGIN_CLICKHOUSE = "clickhousereader";
public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter";
public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter";
public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter";
public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter";
public static final String DATAX_WRITER_PLUGIN_CLICKHOUSE = "clickhousewriter";
public static String getReaderPluginName(DbType dbType) {
switch (dbType) {
case MYSQL:
return DATAX_READER_PLUGIN_MYSQL;
case POSTGRESQL:
return DATAX_READER_PLUGIN_POSTGRESQL;
case ORACLE:
return DATAX_READER_PLUGIN_ORACLE;
case SQLSERVER:
return DATAX_READER_PLUGIN_SQLSERVER;
case CLICKHOUSE:
return DATAX_READER_PLUGIN_CLICKHOUSE;
default:
return null;
}
}
public static String getWriterPluginName(DbType dbType) {
switch (dbType) {
case MYSQL:
return DATAX_WRITER_PLUGIN_MYSQL;
case POSTGRESQL:
return DATAX_WRITER_PLUGIN_POSTGRESQL;
case ORACLE:
return DATAX_WRITER_PLUGIN_ORACLE;
case SQLSERVER:
return DATAX_WRITER_PLUGIN_SQLSERVER;
case CLICKHOUSE:
return DATAX_WRITER_PLUGIN_CLICKHOUSE;
default:
return null;
}
}
public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) {
switch (dbType) {
case MYSQL:
return new MySqlStatementParser(sql);
case POSTGRESQL:
return new PGSQLStatementParser(sql);
case ORACLE:
return new OracleStatementParser(sql);
case SQLSERVER:
return new SQLServerStatementParser(sql);
default:
return null;
}
}
public static String[] convertKeywordsColumns(DbType dbType, String[] columns) {
if (columns == null) {
return null;
}
String[] toColumns = new String[columns.length];
for (int i = 0; i < columns.length; i++ ) {
toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]);
}
return toColumns;
}
public static String doConvertKeywordsColumn(DbType dbType, String column) {
if (column == null) {
return column;
}
column = column.trim();
column = column.replace("`", "");
column = column.replace("\"", "");
column = column.replace("'", "");
switch (dbType) {
case MYSQL:
return String.format("`%s`", column);
case POSTGRESQL:
return String.format("\"%s\"", column);
case ORACLE:
return String.format("\"%s\"", column);
case SQLSERVER:
return String.format("`%s`", column);
default:
return column;
}
}
}

138
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtils.java

@ -1,138 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* flink args utils
*/
public class FlinkArgsUtils {
private static final String LOCAL_DEPLOY_MODE = "local";
private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
private FlinkArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
* @param param flink parameters
* @return argument list
*/
public static List<String> buildArgs(FlinkParameters param) {
List<String> args = new ArrayList<>();
String deployMode = "cluster";
String tmpDeployMode = param.getDeployMode();
if (StringUtils.isNotEmpty(tmpDeployMode)) {
deployMode = tmpDeployMode;
}
String others = param.getOthers();
if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
args.add(Constants.FLINK_RUN_MODE); //-m
args.add(Constants.FLINK_YARN_CLUSTER); //yarn-cluster
int slot = param.getSlot();
if (slot > 0) {
args.add(Constants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); //-ys
}
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) { //-ynm
args.add(Constants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}
// judge flink version, the parameter -yn has removed from flink 1.10
String flinkVersion = param.getFlinkVersion();
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = param.getTaskManager();
if (taskManager > 0) { //-yn
args.add(Constants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = param.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(Constants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory); //-yjm
}
String taskManagerMemory = param.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(Constants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}
if (StringUtils.isEmpty(others) || !others.contains(Constants.FLINK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) { // -yqu
args.add(Constants.FLINK_QUEUE);
args.add(queue);
}
}
}
int parallelism = param.getParallelism();
if (parallelism > 0) {
args.add(Constants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism)); // -p
}
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
// The task status will be synchronized with the cluster job status
args.add(Constants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.FLINK_MAIN_CLASS); //-c
args.add(param.getMainClass()); //main class
}
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(mainJar.getRes());
}
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}
}

85
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtils.java

@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* mapreduce args utils
*/
public class MapReduceArgsUtils {
private MapReduceArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
*
* @param param param
* @return argument list
*/
public static List<String> buildArgs(MapReduceParameters param) {
List<String> args = new ArrayList<>();
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(Constants.JAR);
args.add(mainJar.getRes());
}
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(mainClass);
}
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(String.format("%s%s=%s", Constants.D, Constants.MR_NAME, ArgsUtils.escape(appName)));
}
String others = param.getOthers();
if (StringUtils.isEmpty(others) || !others.contains(Constants.MR_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(String.format("%s%s=%s", Constants.D, Constants.MR_QUEUE, queue));
}
}
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}
}

157
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java

@ -1,157 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* param utils
*/
public class ParamUtils {
/**
* parameter conversion
* Warning:
* When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified.
* But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current
* situation is wrong. So I cannot modify the original logic.
*
* @param taskExecutionContext the context of this task instance
* @param parameters the parameters
* @return global params
*
*/
public static Map<String,Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {
Preconditions.checkNotNull(taskExecutionContext);
Preconditions.checkNotNull(parameters);
Map<String,Property> globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams());
Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
Date scheduleTime = taskExecutionContext.getScheduleTime();
// combining local and global parameters
Map<String,Property> localParams = parameters.getLocalParametersMap();
Map<String,Property> varParams = parameters.getVarPoolMap();
if (globalParams == null && localParams == null) {
return null;
}
// if it is a complement,
// you need to pass in the task instance id to locate the time
// of the process instance complement
Map<String,String> params = BusinessTimeUtils
.getBusinessTime(commandType,
scheduleTime);
if (globalParamsMap != null) {
params.putAll(globalParamsMap);
}
if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath());
}
params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId()));
if (globalParams != null && localParams != null) {
globalParams.putAll(localParams);
} else if (globalParams == null && localParams != null) {
globalParams = localParams;
}
if (varParams != null) {
varParams.putAll(globalParams);
globalParams = varParams;
}
Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
Property property = en.getValue();
if (StringUtils.isNotEmpty(property.getValue())
&& property.getValue().startsWith("$")) {
/**
* local parameter refers to global parameter with the same name
* note: the global parameters of the process instance here are solidified parameters,
* and there are no variables in them.
*/
String val = property.getValue();
val = ParameterUtils.convertParameterPlaceholders(val, params);
property.setValue(val);
}
}
return globalParams;
}
/**
* format convert
*
* @param paramsMap params map
* @return Map of converted
*/
public static Map<String,String> convert(Map<String,Property> paramsMap) {
if (paramsMap == null) {
return null;
}
Map<String, String> map = new HashMap<>();
Iterator<Map.Entry<String, Property>> iter = paramsMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Property> en = iter.next();
map.put(en.getKey(), en.getValue().getValue());
}
return map;
}
/**
* get parameters map
*
* @param definedParams definedParams
* @return parameters map
*/
public static Map<String, Property> getUserDefParamsMap(Map<String, String> definedParams) {
if (definedParams != null) {
Map<String, Property> userDefParamsMaps = new HashMap<>();
Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> en = iter.next();
Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR, en.getValue());
userDefParamsMaps.put(property.getProp(),property);
}
return userDefParamsMaps;
}
return null;
}
}

133
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtils.java

@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* spark args utils
*/
public class SparkArgsUtils {
private static final String SPARK_CLUSTER = "cluster";
private static final String SPARK_LOCAL = "local";
private static final String SPARK_ON_YARN = "yarn";
private SparkArgsUtils() {
throw new IllegalStateException("Utility class");
}
/**
* build args
*
* @param param param
* @return argument list
*/
public static List<String> buildArgs(SparkParameters param) {
List<String> args = new ArrayList<>();
args.add(Constants.MASTER);
String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER;
if (!SPARK_LOCAL.equals(deployMode)) {
args.add(SPARK_ON_YARN);
args.add(Constants.DEPLOY_MODE);
}
args.add(deployMode);
ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(Constants.MAIN_CLASS);
args.add(mainClass);
}
int driverCores = param.getDriverCores();
if (driverCores > 0) {
args.add(Constants.DRIVER_CORES);
args.add(String.format("%d", driverCores));
}
String driverMemory = param.getDriverMemory();
if (StringUtils.isNotEmpty(driverMemory)) {
args.add(Constants.DRIVER_MEMORY);
args.add(driverMemory);
}
int numExecutors = param.getNumExecutors();
if (numExecutors > 0) {
args.add(Constants.NUM_EXECUTORS);
args.add(String.format("%d", numExecutors));
}
int executorCores = param.getExecutorCores();
if (executorCores > 0) {
args.add(Constants.EXECUTOR_CORES);
args.add(String.format("%d", executorCores));
}
String executorMemory = param.getExecutorMemory();
if (StringUtils.isNotEmpty(executorMemory)) {
args.add(Constants.EXECUTOR_MEMORY);
args.add(executorMemory);
}
String appName = param.getAppName();
if (StringUtils.isNotEmpty(appName)) {
args.add(Constants.SPARK_NAME);
args.add(ArgsUtils.escape(appName));
}
String others = param.getOthers();
if (!SPARK_LOCAL.equals(deployMode)) {
if (StringUtils.isEmpty(others) || !others.contains(Constants.SPARK_QUEUE)) {
String queue = param.getQueue();
if (StringUtils.isNotEmpty(queue)) {
args.add(Constants.SPARK_QUEUE);
args.add(queue);
}
}
}
// --conf --files --jars --packages
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
ResourceInfo mainJar = param.getMainJar();
if (mainJar != null) {
args.add(mainJar.getRes());
}
String mainArgs = param.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}
}

100
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java

@ -1,100 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.slf4j.Logger;
import java.text.MessageFormat;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty;
/**
* udf utils
*/
public class UDFUtils {
/**
* create function format
*/
private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''";
/**
* create function list
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
* @param logger logger
* @return create function list
*/
public static List<String> createFuncs(Map<UdfFunc,String> udfFuncTenantCodeMap, Logger logger){
if (MapUtils.isEmpty(udfFuncTenantCodeMap)){
logger.info("can't find udf function resource");
return null;
}
List<String> funcList = new ArrayList<>();
// build jar sql
buildJarSql(funcList, udfFuncTenantCodeMap);
// build temp function sql
buildTempFuncSql(funcList, udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList()));
return funcList;
}
/**
* build jar sql
* @param sqls sql list
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
*/
private static void buildJarSql(List<String> sqls, Map<UdfFunc,String> udfFuncTenantCodeMap) {
String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
String resourceFullName;
Set<Map.Entry<UdfFunc,String>> entries = udfFuncTenantCodeMap.entrySet();
for (Map.Entry<UdfFunc,String> entry:entries){
String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS;
String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
resourceFullName = entry.getKey().getResourceName();
resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s",resourceFullName);
sqls.add(String.format("add jar %s%s%s", prefixPath, uploadPath, resourceFullName));
}
}
/**
* build temp function sql
* @param sqls sql list
* @param udfFuncs udf function list
*/
private static void buildTempFuncSql(List<String> sqls, List<UdfFunc> udfFuncs) {
if (isNotEmpty(udfFuncs)) {
for (UdfFunc udfFunc : udfFuncs) {
sqls.add(MessageFormat
.format(CREATE_FUNCTION_FORMAT, udfFunc.getFuncName(), udfFunc.getClassName()));
}
}
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java

@ -20,13 +20,14 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.*;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* db task ack processor
*/

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
@ -29,6 +28,8 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
/**
* db task response processor
*/

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/HostUpdateProcessor.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
@ -29,6 +28,8 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -52,6 +51,8 @@ import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -47,6 +46,8 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**

113
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java

@ -1,113 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.junit.Test;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
/**
* DataxUtils Tester.
*/
public class DataxUtilsTest {
/**
*
* Method: getReaderPluginName(DbType dbType)
*
*/
@Test
public void testGetReaderPluginName() {
assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, DataxUtils.getReaderPluginName(DbType.MYSQL));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_CLICKHOUSE, DataxUtils.getReaderPluginName(DbType.CLICKHOUSE));
assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null);
}
/**
*
* Method: getWriterPluginName(DbType dbType)
*
*/
@Test
public void testGetWriterPluginName() {
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, DataxUtils.getWriterPluginName(DbType.MYSQL));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_CLICKHOUSE, DataxUtils.getWriterPluginName(DbType.CLICKHOUSE));
assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null);
}
/**
*
* Method: getSqlStatementParser(DbType dbType, String sql)
*
*/
@Test
public void testGetSqlStatementParser() throws Exception {
assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1") instanceof MySqlStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select 1") instanceof PGSQLStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1") instanceof OracleStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select 1") instanceof SQLServerStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null);
}
/**
*
* Method: convertKeywordsColumns(DbType dbType, String[] columns)
*
*/
@Test
public void testConvertKeywordsColumns() throws Exception {
String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "};
String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"};
String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns);
assertTrue(fromColumns.length == toColumns.length);
for (int i = 0; i < toColumns.length; i++) {
assertEquals(targetColumns[i], toColumns[i]);
}
}
/**
*
* Method: doConvertKeywordsColumn(DbType dbType, String column)
*
*/
@Test
public void testDoConvertKeywordsColumn() throws Exception {
assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" "));
assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" "));
assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" "));
assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" "));
assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" "));
}
}

136
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/FlinkArgsUtilsTest.java

@ -1,136 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test FlinkArgsUtils
*/
public class FlinkArgsUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(FlinkArgsUtilsTest.class);
public String mode = "cluster";
public int slot = 2;
public int parallelism = 3;
public String appName = "testFlink";
public int taskManager = 4;
public String taskManagerMemory = "2G";
public String jobManagerMemory = "4G";
public ProgramType programType = ProgramType.JAVA;
public String mainClass = "com.test";
public ResourceInfo mainJar = null;
public String mainArgs = "testArgs --input file:///home";
public String queue = "queue1";
public String others = "-s hdfs:///flink/savepoint-1537";
public String flinkVersion = "<1.10";
@Before
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testflink-1.0.0-SNAPSHOT.jar");
mainJar = main;
}
/**
* Test buildArgs
*/
@Test
public void testBuildArgs() {
//Define params
FlinkParameters param = new FlinkParameters();
param.setDeployMode(mode);
param.setMainClass(mainClass);
param.setAppName(appName);
param.setSlot(slot);
param.setParallelism(parallelism);
param.setTaskManager(taskManager);
param.setJobManagerMemory(jobManagerMemory);
param.setTaskManagerMemory(taskManagerMemory);
param.setMainJar(mainJar);
param.setProgramType(programType);
param.setMainArgs(mainArgs);
param.setQueue(queue);
param.setOthers(others);
param.setFlinkVersion(flinkVersion);
//Invoke buildArgs
List<String> result = FlinkArgsUtils.buildArgs(param);
for (String s : result) {
logger.info(s);
}
//Expected values and order
assertEquals(22, result.size());
assertEquals("-m", result.get(0));
assertEquals("yarn-cluster", result.get(1));
assertEquals("-ys", result.get(2));
assertSame(slot, Integer.valueOf(result.get(3)));
assertEquals("-ynm", result.get(4));
assertEquals(appName, result.get(5));
assertEquals("-yn", result.get(6));
assertSame(taskManager, Integer.valueOf(result.get(7)));
assertEquals("-yjm", result.get(8));
assertEquals(jobManagerMemory, result.get(9));
assertEquals("-ytm", result.get(10));
assertEquals(taskManagerMemory, result.get(11));
assertEquals("-yqu", result.get(12));
assertEquals(queue, result.get(13));
assertEquals("-p", result.get(14));
assertSame(parallelism, Integer.valueOf(result.get(15)));
assertEquals("-sae", result.get(16));
assertEquals(others, result.get(17));
assertEquals("-c", result.get(18));
assertEquals(mainClass, result.get(19));
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without -yqu
FlinkParameters param1 = new FlinkParameters();
param1.setQueue(queue);
param1.setDeployMode(mode);
result = FlinkArgsUtils.buildArgs(param1);
assertEquals(5, result.size());
}
}

95
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/MapReduceArgsUtilsTest.java

@ -1,95 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.mr.MapReduceParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test MapReduceArgsUtils
*/
public class MapReduceArgsUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(MapReduceArgsUtilsTest.class);
public String mainClass = "com.examples.WordCount";
public ResourceInfo mainJar = null;
public String mainArgs = "/user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt";
public ProgramType programType = ProgramType.JAVA;
public String others = "-files cachefile.txt -libjars mylib.jar -archives myarchive.zip -Dwordcount.case.sensitive=false";
public String appName = "mapreduce test";
public String queue = "queue1";
@Before
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
}
/**
* Test buildArgs
*/
@Test
public void testBuildArgs() {
//Define params
MapReduceParameters param = new MapReduceParameters();
param.setMainClass(mainClass);
param.setMainJar(mainJar);
param.setMainArgs(mainArgs);
param.setProgramType(programType);
param.setOthers(others);
param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
List<String> result = MapReduceArgsUtils.buildArgs(param);
for (String s : result) {
logger.info(s);
}
//Expected values and order
assertEquals(7, result.size());
assertEquals("jar", result.get(0));
assertEquals(mainJar.getRes(), result.get(1));
assertEquals(mainClass, result.get(2));
assertEquals(String.format("-D%s=%s", Constants.MR_NAME, ArgsUtils.escape(appName)), result.get(3));
assertEquals(String.format("-D%s=%s", Constants.MR_QUEUE, queue), result.get(4));
assertEquals(others, result.get(5));
assertEquals(mainArgs, result.get(6));
//Others param without --queue
param.setOthers("-files xxx/hive-site.xml");
param.setQueue(null);
result = MapReduceArgsUtils.buildArgs(param);
assertEquals(6, result.size());
}
}

261
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java

@ -1,261 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
* Test ParamUtils
*/
public class ParamUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ParamUtilsTest.class);
//Define global variables
public Map<String, Property> globalParams = new HashMap<>();
public Map<String, String> globalParamsMap = new HashMap<>();
public Map<String, Property> localParams = new HashMap<>();
public Map<String, Property> varPoolParams = new HashMap<>();
/**
* Init params
*
* @throws Exception
*/
@Before
public void setUp() throws Exception {
Property property = new Property();
property.setProp("global_param");
property.setDirect(Direct.IN);
property.setType(DataType.VARCHAR);
property.setValue("${system.biz.date}");
globalParams.put("global_param", property);
globalParamsMap.put("global_param", "${system.biz.date}");
Property localProperty = new Property();
localProperty.setProp("local_param");
localProperty.setDirect(Direct.IN);
localProperty.setType(DataType.VARCHAR);
localProperty.setValue("${global_param}");
localParams.put("local_param", localProperty);
Property varProperty = new Property();
varProperty.setProp("varPool");
varProperty.setDirect(Direct.IN);
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
}
/**
* This is basic test case for ParamUtils.convert.
* Warning:
* As you can see,this case invokes the function of convert in different situations. When you first invoke the function of convert,
* the variables of localParams and varPool in the ShellParameters will be modified. But in the whole system the variables of localParams
* and varPool have been used in other functions. I'm not sure if this current situation is wrong. So I cannot modify the original logic.
*/
@Test
public void testConvert() {
//The expected value
String expected = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//The expected value when globalParams is null but localParams is not null
String expected1 = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+ "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
//Define expected date , the month is 0-base
Calendar calendar = Calendar.getInstance();
calendar.set(2019, 11, 30);
Date date = calendar.getTime();
List<Property> globalParamList = globalParams.values().stream().collect(Collectors.toList());
List<Property> localParamList = localParams.values().stream().collect(Collectors.toList());
List<Property> varPoolParamList = varPoolParams.values().stream().collect(Collectors.toList());
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("params test");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp/test");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(0);
taskExecutionContext.setScheduleTime(date);
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
taskExecutionContext.setDefinedParams(globalParamsMap);
taskExecutionContext.setVarPool("[{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${global_param}\"}]");
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\","
+ "\"localParams\":"
+ "[],\"resourceList\":[]}");
ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
shellParameters.setLocalParams(localParamList);
String varPoolParamsJson = JSONUtils.toJsonString(varPoolParams,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
shellParameters.setVarPool(taskExecutionContext.getVarPool());
shellParameters.dealOutParam(varPoolParamsJson);
//Invoke convert
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters);
String result = JSONUtils.toJsonString(paramsMap);
assertEquals(expected, result);
//Invoke convert with null globalParams
taskExecutionContext.setDefinedParams(null);
Map<String, Property> paramsMap1 = ParamUtils.convert(taskExecutionContext, shellParameters);
String result1 = JSONUtils.toJsonString(paramsMap1);
assertEquals(expected1, result1);
// Null check, invoke convert with null globalParams and null localParams
shellParameters.setLocalParams(null);
Map<String, Property> paramsMap2 = ParamUtils.convert(taskExecutionContext, shellParameters);
assertNull(paramsMap2);
}
/**
* Test some new params related to task
*/
@Test
public void testConvertForParamsRelatedTask() throws Exception {
// start to form some test data for new paramters
Map<String,Property> globalParams = new HashMap<>();
Map<String,String> globalParamsMap = new HashMap<>();
Property taskInstanceIdProperty = new Property();
String propName = "task_execution_id";
String paramValue = String.format("${%s}", Constants.PARAMETER_TASK_INSTANCE_ID);
taskInstanceIdProperty.setProp(propName);
taskInstanceIdProperty.setDirect(Direct.IN);
taskInstanceIdProperty.setType(DataType.VARCHAR);
taskInstanceIdProperty.setValue(paramValue);
globalParams.put(propName,taskInstanceIdProperty);
globalParamsMap.put(propName,paramValue);
Property taskExecutionPathProperty = new Property();
propName = "task_execution_path";
paramValue = String.format("${%s}", Constants.PARAMETER_TASK_EXECUTE_PATH);
taskExecutionPathProperty.setProp(propName);
taskExecutionPathProperty.setDirect(Direct.IN);
taskExecutionPathProperty.setType(DataType.VARCHAR);
taskExecutionPathProperty.setValue(paramValue);
globalParams.put(propName,taskExecutionPathProperty);
globalParamsMap.put(propName,paramValue);
Calendar calendar = Calendar.getInstance();
calendar.set(2019,11,30);
Date date = calendar.getTime();
List<Property> globalParamList = globalParams.values().stream().collect(Collectors.toList());
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(1);
taskExecutionContext.setTaskName("params test");
taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
taskExecutionContext.setHost("127.0.0.1:1234");
taskExecutionContext.setExecutePath("/tmp/test");
taskExecutionContext.setLogPath("/log");
taskExecutionContext.setProcessInstanceId(1);
taskExecutionContext.setExecutorId(1);
taskExecutionContext.setCmdTypeIfComplement(0);
taskExecutionContext.setScheduleTime(date);
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
taskExecutionContext.setDefinedParams(globalParamsMap);
taskExecutionContext.setTaskParams(
"{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\","
+ "\"localParams\":"
+ "[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"},"
+ "{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ "\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}");
ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
//The expected value
String expected = "{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"},"
+ "\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}";
//The expected value when globalParams is null but localParams is not null
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters);
String result = JSONUtils.toJsonString(paramsMap);
Map<String,String> resultMap = JSONUtils.parseObject(result,Map.class);
Map<String,String> expectedMap = JSONUtils.parseObject(expected,Map.class);
result = JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
expected = JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
assertEquals(expected, result);
}
/**
* Test the overload method of convert
*/
@Test
public void testConvert1() {
//The expected value
String expected = "{\"global_param\":\"${system.biz.date}\"}";
//Invoke convert
Map<String, String> paramsMap = ParamUtils.convert(globalParams);
String result = JSONUtils.toJsonString(paramsMap);
assertEquals(expected, result);
logger.info(result);
//Null check
Map<String, String> paramsMap1 = ParamUtils.convert(null);
assertNull(paramsMap1);
}
}

133
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/SparkArgsUtilsTest.java

@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test SparkArgsUtils
*/
public class SparkArgsUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(SparkArgsUtilsTest.class);
public String mode = "cluster";
public String mainClass = "com.test";
public ResourceInfo mainJar = null;
public String mainArgs = "partitions=2";
public String driverMemory = "2G";
public String executorMemory = "4G";
public ProgramType programType = ProgramType.JAVA;
public int driverCores = 2;
public int executorCores = 6;
public String sparkVersion = "SPARK1";
public int numExecutors = 4;
public String appName = "spark test";
public String queue = "queue1";
@Before
public void setUp() {
ResourceInfo main = new ResourceInfo();
main.setRes("testspark-1.0.0-SNAPSHOT.jar");
mainJar = main;
}
/**
* Test buildArgs
*/
@Test
public void testBuildArgs() {
//Define params
SparkParameters param = new SparkParameters();
param.setDeployMode(mode);
param.setMainClass(mainClass);
param.setDriverCores(driverCores);
param.setDriverMemory(driverMemory);
param.setExecutorCores(executorCores);
param.setExecutorMemory(executorMemory);
param.setMainJar(mainJar);
param.setNumExecutors(numExecutors);
param.setProgramType(programType);
param.setSparkVersion(sparkVersion);
param.setMainArgs(mainArgs);
param.setAppName(appName);
param.setQueue(queue);
//Invoke buildArgs
List<String> result = SparkArgsUtils.buildArgs(param);
for (String s : result) {
logger.info(s);
}
//Expected values and order
assertEquals(22, result.size());
assertEquals("--master", result.get(0));
assertEquals("yarn", result.get(1));
assertEquals("--deploy-mode", result.get(2));
assertEquals(mode, result.get(3));
assertEquals("--class", result.get(4));
assertEquals(mainClass, result.get(5));
assertEquals("--driver-cores", result.get(6));
assertSame(driverCores, Integer.valueOf(result.get(7)));
assertEquals("--driver-memory", result.get(8));
assertEquals(driverMemory, result.get(9));
assertEquals("--num-executors", result.get(10));
assertSame(numExecutors, Integer.valueOf(result.get(11)));
assertEquals("--executor-cores", result.get(12));
assertSame(executorCores, Integer.valueOf(result.get(13)));
assertEquals("--executor-memory", result.get(14));
assertEquals(executorMemory, result.get(15));
assertEquals("--name", result.get(16));
assertEquals(ArgsUtils.escape(appName), result.get(17));
assertEquals("--queue", result.get(18));
assertEquals(queue, result.get(19));
assertEquals(mainJar.getRes(), result.get(20));
assertEquals(mainArgs, result.get(21));
//Others param without --queue
SparkParameters param1 = new SparkParameters();
param1.setOthers("--files xxx/hive-site.xml");
param1.setQueue(queue);
result = SparkArgsUtils.buildArgs(param1);
assertEquals(7, result.size());
}
}
Loading…
Cancel
Save