diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java index 82c09d549c..3dd8df0058 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java @@ -29,8 +29,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.logging.log4j.util.Strings; - import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -81,7 +79,7 @@ public class ParamUtils { params.putAll(globalParamsMap); } - if (Strings.isNotBlank(taskExecutionContext.getExecutePath())) { + if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) { params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath()); } params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 597529406d..c5fb466b32 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -37,9 +37,11 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; -import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.spi.task.AbstractTask; +import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.TaskRequest; import org.apache.commons.collections.MapUtils; @@ -173,12 +175,12 @@ public class TaskExecuteThread implements Runnable, Delayed { //TODO Temporary operation, To be adjusted TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class); + task = taskChannel.createTask(taskRequest, taskLogger); // task init this.task.init(); //init varPool - //TODO Temporary operation, To be adjusted -// this.task.getParameters().setVarPool(taskExecutionContext.getVarPool()); + this.task.getParameters().setVarPool(taskExecutionContext.getVarPool()); // task handle this.task.handle(); @@ -189,11 +191,10 @@ public class TaskExecuteThread implements Runnable, Delayed { responseCommand.setEndTime(new Date()); responseCommand.setProcessId(this.task.getProcessId()); responseCommand.setAppIds(this.task.getAppIds()); - //TODO Temporary operation, To be adjusted -// responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool())); + responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool())); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus()); } catch (Throwable e) { - e.printStackTrace(); + logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); @@ -253,7 +254,6 @@ public class TaskExecuteThread implements Runnable, Delayed { return globalParamsMap; } - /** * kill task */ diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml index 84771399f2..dbdebb98ea 100644 --- a/dolphinscheduler-service/pom.xml +++ b/dolphinscheduler-service/pom.xml @@ -43,7 +43,6 @@ dolphinscheduler-spi - org.quartz-scheduler quartz @@ -63,11 +62,6 @@ - - org.apache.logging.log4j - log4j-core - - org.quartz-scheduler quartz-jobs diff --git a/dolphinscheduler-spi/pom.xml b/dolphinscheduler-spi/pom.xml index 46115f8657..79b7a583b9 100644 --- a/dolphinscheduler-spi/pom.xml +++ b/dolphinscheduler-spi/pom.xml @@ -46,6 +46,17 @@ jackson-core + + org.apache.commons + commons-collections4 + provided + + + commons-beanutils + commons-beanutils + provided + + org.slf4j slf4j-api diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java index 3e709adfd4..c5706d2b89 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/params/PluginParamsTransfer.java @@ -62,7 +62,7 @@ public class PluginParamsTransfer { * @return return plugin params value */ public static List> generatePluginParams(String paramsJsonStr, String pluginParamsTemplate) { - Map paramsMap = JSONUtils.toMap(paramsJsonStr); + Map paramsMap = JSONUtils.toMap(paramsJsonStr, String.class, Object.class); return generatePluginParams(paramsMap, pluginParamsTemplate); } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java index e10b1e3c34..adb913675d 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/AbstractParameters.java @@ -15,10 +15,20 @@ package org.apache.dolphinscheduler.spi.task;/* * limitations under the License. */ +import org.apache.dolphinscheduler.spi.utils.CollectionUtils; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; + /** * job params related class */ @@ -29,6 +39,11 @@ public abstract class AbstractParameters implements IParameters { */ private List localParams; + /** + * var pool + */ + public List varPool; + /** * get local parameters list * @return Property list @@ -57,4 +72,95 @@ public abstract class AbstractParameters implements IParameters { return null; } + /** + * get varPool map + * + * @return parameters map + */ + public Map getVarPoolMap() { + if (varPool != null) { + Map varPoolMap = new LinkedHashMap<>(); + for (Property property : varPool) { + varPoolMap.put(property.getProp(), property); + } + return varPoolMap; + } + return null; + } + + public List getVarPool() { + return varPool; + } + + public void setVarPool(String varPool) { + if (StringUtils.isEmpty(varPool)) { + this.varPool = new ArrayList<>(); + } else { + this.varPool = JSONUtils.toList(varPool, Property.class); + } + } + + public void dealOutParam(String result) { + if (CollectionUtils.isEmpty(localParams)) { + return; + } + List outProperty = getOutProperty(localParams); + if (CollectionUtils.isEmpty(outProperty)) { + return; + } + if (StringUtils.isEmpty(result)) { + varPool.addAll(outProperty); + return; + } + Map taskResult = getMapByString(result); + if (taskResult == null || taskResult.size() == 0) { + return; + } + for (Property info : outProperty) { + info.setValue(taskResult.get(info.getProp())); + varPool.add(info); + } + } + + public List getOutProperty(List params) { + if (CollectionUtils.isEmpty(params)) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Property info : params) { + if (info.getDirect() == Direct.OUT) { + result.add(info); + } + } + return result; + } + + public List> getListMapByString(String json) { + List> allParams = new ArrayList<>(); + ArrayNode paramsByJson = JSONUtils.parseArray(json); + Iterator listIterator = paramsByJson.iterator(); + while (listIterator.hasNext()) { + Map param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class); + allParams.add(param); + } + return allParams; + } + + /** + * shell's result format is key=value$VarPool$key=value$VarPool$ + * @param result + * @return + */ + public static Map getMapByString(String result) { + String[] formatResult = result.split("\\$VarPool\\$"); + Map format = new HashMap<>(); + for (String info : formatResult) { + if (StringUtils.isNotEmpty(info) && info.contains("=")) { + String[] keyValue = info.split("="); + format.put(keyValue[0], keyValue[1]); + } + } + return format; + } + } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/CollectionUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/CollectionUtils.java new file mode 100644 index 0000000000..cf2498e0f2 --- /dev/null +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/CollectionUtils.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.spi.utils; + +import org.apache.commons.beanutils.BeanMap; + +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Provides utility methods and decorators for {@link Collection} instances. + *

+ * Various utility methods might put the input objects into a Set/Map/Bag. In case + * the input objects override {@link Object#equals(Object)}, it is mandatory that + * the general contract of the {@link Object#hashCode()} method is maintained. + *

+ * NOTE: From 4.0, method parameters will take {@link Iterable} objects when possible. + * + * @version $Id: CollectionUtils.java 1686855 2015-06-22 13:00:27Z tn $ + * @since 1.0 + */ +public class CollectionUtils { + + private CollectionUtils() { + throw new UnsupportedOperationException("Construct CollectionUtils"); + } + + /** + * The load factor used when none specified in constructor. + */ + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * Returns a new {@link Collection} containing a minus a subset of + * b. Only the elements of b that satisfy the predicate + * condition, p are subtracted from a. + * + *

The cardinality of each element e in the returned {@link Collection} + * that satisfies the predicate condition will be the cardinality of e in a + * minus the cardinality of e in b, or zero, whichever is greater.

+ *

The cardinality of each element e in the returned {@link Collection} that does not + * satisfy the predicate condition will be equal to the cardinality of e in a.

+ * + * @param a the collection to subtract from, must not be null + * @param b the collection to subtract, must not be null + * @param T + * @return a new collection with the results + * @see Collection#removeAll + */ + public static Collection subtract(Set a, Set b) { + return org.apache.commons.collections4.CollectionUtils.subtract(a, b); + } + + public static boolean isNotEmpty(Collection coll) { + return !isEmpty(coll); + } + + public static boolean isEmpty(Collection coll) { + return coll == null || coll.isEmpty(); + } + + /** + * String to map + * + * @param str string + * @param separator separator + * @return string to map + */ + public static Map stringToMap(String str, String separator) { + return stringToMap(str, separator, ""); + } + + /** + * String to map + * + * @param str string + * @param separator separator + * @param keyPrefix prefix + * @return string to map + */ + public static Map stringToMap(String str, String separator, String keyPrefix) { + + Map emptyMap = new HashMap<>(0); + if (StringUtils.isEmpty(str)) { + return emptyMap; + } + if (StringUtils.isEmpty(separator)) { + return emptyMap; + } + String[] strings = str.split(separator); + int initialCapacity = (int)(strings.length / DEFAULT_LOAD_FACTOR) + 1; + Map map = new HashMap<>(initialCapacity); + for (int i = 0; i < strings.length; i++) { + String[] strArray = strings[i].split("="); + if (strArray.length != 2) { + return emptyMap; + } + //strArray[0] KEY strArray[1] VALUE + if (StringUtils.isEmpty(keyPrefix)) { + map.put(strArray[0], strArray[1]); + } else { + map.put(keyPrefix + strArray[0], strArray[1]); + } + } + return map; + } + + /** + * Transform item in collection + * + * @param collection origin collection + * @param transformFunc transform function + * @param origin item type + * @param target type + * @return transform list + */ + public static List transformToList(Collection collection, Function transformFunc) { + if (isEmpty(collection)) { + return new ArrayList<>(); + } + return collection.stream().map(transformFunc).collect(Collectors.toList()); + } + + /** + * Collect collection to map + * + * @param collection origin collection + * @param keyTransformFunction key transform function + * @param target k type + * @param value + * @return map + */ + public static Map collectionToMap(Collection collection, Function keyTransformFunction) { + if (isEmpty(collection)) { + return new HashMap<>(); + } + return collection.stream().collect(Collectors.toMap(keyTransformFunction, Function.identity())); + } + + /** + * Helper class to easily access cardinality properties of two collections. + * + * @param the element type + */ + private static class CardinalityHelper { + + /** + * Contains the cardinality for each object in collection A. + */ + final Map cardinalityA; + + /** + * Contains the cardinality for each object in collection B. + */ + final Map cardinalityB; + + /** + * Create a new CardinalityHelper for two collections. + * + * @param a the first collection + * @param b the second collection + */ + public CardinalityHelper(final Iterable a, final Iterable b) { + cardinalityA = CollectionUtils.getCardinalityMap(a); + cardinalityB = CollectionUtils.getCardinalityMap(b); + } + + /** + * Returns the frequency of this object in collection A. + * + * @param obj the object + * @return the frequency of the object in collection A + */ + public int freqA(final Object obj) { + return getFreq(obj, cardinalityA); + } + + /** + * Returns the frequency of this object in collection B. + * + * @param obj the object + * @return the frequency of the object in collection B + */ + public int freqB(final Object obj) { + return getFreq(obj, cardinalityB); + } + + private int getFreq(final Object obj, final Map freqMap) { + final Integer count = freqMap.get(obj); + if (count != null) { + return count; + } + return 0; + } + } + + /** + * returns {@code true} iff the given {@link Collection}s contain + * exactly the same elements with exactly the same cardinalities. + * + * @param a the first collection + * @param b the second collection + * @return Returns true iff the given Collections contain exactly the same elements with exactly the same cardinalities. + * That is, iff the cardinality of e in a is equal to the cardinality of e in b, for each element e in a or b. + */ + public static boolean equalLists(Collection a, Collection b) { + if (a == null && b == null) { + return true; + } + + if (a == null || b == null) { + return false; + } + + return isEqualCollection(a, b); + } + + /** + * Returns {@code true} iff the given {@link Collection}s contain + * exactly the same elements with exactly the same cardinalities. + *

+ * That is, iff the cardinality of e in a is + * equal to the cardinality of e in b, + * for each element e in a or b. + * + * @param a the first collection, must not be null + * @param b the second collection, must not be null + * @return true iff the collections contain the same elements with the same cardinalities. + */ + public static boolean isEqualCollection(final Collection a, final Collection b) { + if (a.size() != b.size()) { + return false; + } + final CardinalityHelper helper = new CardinalityHelper<>(a, b); + if (helper.cardinalityA.size() != helper.cardinalityB.size()) { + return false; + } + for (final Object obj : helper.cardinalityA.keySet()) { + if (helper.freqA(obj) != helper.freqB(obj)) { + return false; + } + } + return true; + } + + /** + * Returns a {@link Map} mapping each unique element in the given + * {@link Collection} to an {@link Integer} representing the number + * of occurrences of that element in the {@link Collection}. + *

+ * Only those elements present in the collection will appear as + * keys in the map. + * + * @param the type of object in the returned {@link Map}. This is a super type of O + * @param coll the collection to get the cardinality map for, must not be null + * @return the populated cardinality map + */ + public static Map getCardinalityMap(final Iterable coll) { + final Map count = new HashMap<>(); + for (final O obj : coll) { + count.put(obj, count.getOrDefault(obj, 0) + 1); + } + return count; + } + + /** + * Removes certain attributes of each object in the list + * + * @param originList origin list + * @param exclusionSet exclusion set + * @param T + * @return removes certain attributes of each object in the list + */ + public static List> getListByExclusion(List originList, Set exclusionSet) { + List> instanceList = new ArrayList<>(); + if (exclusionSet == null) { + exclusionSet = new HashSet<>(); + } + if (originList == null) { + return instanceList; + } + Map instanceMap; + for (T instance : originList) { + BeanMap beanMap = new BeanMap(instance); + instanceMap = new LinkedHashMap<>(16, 0.75f, true); + for (Map.Entry entry : beanMap.entrySet()) { + if (exclusionSet.contains(entry.getKey())) { + continue; + } + instanceMap.put((String) entry.getKey(), entry.getValue()); + } + instanceList.add(instanceMap); + } + return instanceList; + } + +} diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java index e87e2102ee..ed53cb2625 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/JSONUtils.java @@ -17,10 +17,14 @@ package org.apache.dolphinscheduler.spi.utils; +import static java.nio.charset.StandardCharsets.UTF_8; + import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; +import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -30,12 +34,21 @@ import java.util.TimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.type.CollectionType; /** @@ -52,9 +65,23 @@ public class JSONUtils { .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) + .configure(REQUIRE_SETTERS_FOR_GETTERS, true) .setTimeZone(TimeZone.getDefault()); private JSONUtils() { + throw new UnsupportedOperationException("Construct JSONUtils"); + } + + public static ArrayNode createArrayNode() { + return objectMapper.createArrayNode(); + } + + public static ObjectNode createObjectNode() { + return objectMapper.createObjectNode(); + } + + public static JsonNode toJsonNode(Object obj) { + return objectMapper.valueToTree(obj); } /** @@ -102,6 +129,22 @@ public class JSONUtils { return null; } + /** + * deserialize + * + * @param src byte array + * @param clazz class + * @param deserialize type + * @return deserialize type + */ + public static T parseObject(byte[] src, Class clazz) { + if (src == null) { + return null; + } + String json = new String(src, UTF_8); + return parseObject(json, clazz); + } + /** * json to list * @@ -126,13 +169,84 @@ public class JSONUtils { return Collections.emptyList(); } + /** + * check json object valid + * + * @param json json + * @return true if valid + */ + public static boolean checkJsonValid(String json) { + + if (StringUtils.isEmpty(json)) { + return false; + } + + try { + objectMapper.readTree(json); + return true; + } catch (IOException e) { + logger.error("check json object valid exception!", e); + } + + return false; + } + + /** + * Method for finding a JSON Object field with specified name in this + * node or its child nodes, and returning value it has. + * If no matching field is found in this node or its descendants, returns null. + * + * @param jsonNode json node + * @param fieldName Name of field to look for + * @return Value of first matching node found, if any; null if none + */ + public static String findValue(JsonNode jsonNode, String fieldName) { + JsonNode node = jsonNode.findValue(fieldName); + + if (node == null) { + return null; + } + + return node.asText(); + } + /** * json to map + * {@link #toMap(String, Class, Class)} * * @param json json * @return json to map */ - public static Map toMap(String json) { + public static Map toMap(String json) { + return parseObject(json, new TypeReference>() {}); + } + + /** + * from the key-value generated json to get the str value no matter the real type of value + * @param json the json str + * @param nodeName key + * @return the str value of key + */ + public static String getNodeString(String json, String nodeName) { + try { + JsonNode rootNode = objectMapper.readTree(json); + return rootNode.has(nodeName) ? rootNode.get(nodeName).toString() : ""; + } catch (JsonProcessingException e) { + return ""; + } + } + + /** + * json to map + * + * @param json json + * @param classK classK + * @param classV classV + * @param K + * @param V + * @return to map + */ + public static Map toMap(String json, Class classK, Class classV) { return parseObject(json, new TypeReference>() {}); } @@ -172,9 +286,34 @@ public class JSONUtils { } } + /** + * serialize to json byte + * + * @param obj object + * @param object type + * @return byte array + */ + public static byte[] toJsonByteArray(T obj) { + if (obj == null) { + return null; + } + String json = ""; + try { + json = toJsonString(obj); + } catch (Exception e) { + logger.error("json serialize exception.", e); + } + + return json.getBytes(UTF_8); + } + public static ObjectNode parseObject(String text) { try { - return (ObjectNode) objectMapper.readTree(text); + if (text.isEmpty()) { + return parseObject(text, ObjectNode.class); + } else { + return (ObjectNode) objectMapper.readTree(text); + } } catch (Exception e) { throw new RuntimeException("String json deserialization exception.", e); } @@ -187,4 +326,33 @@ public class JSONUtils { throw new RuntimeException("Json deserialization exception.", e); } } + + /** + * json serializer + */ + public static class JsonDataSerializer extends JsonSerializer { + + @Override + public void serialize(String value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeRawValue(value); + } + + } + + /** + * json data deserializer + */ + public static class JsonDataDeserializer extends JsonDeserializer { + + @Override + public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + JsonNode node = p.getCodec().readTree(p); + if (node instanceof TextNode) { + return node.asText(); + } else { + return node.toString(); + } + } + + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml index 75f87cb8a7..6ec6b39295 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/pom.xml @@ -49,6 +49,7 @@ org.slf4j slf4j-api + provided \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml new file mode 100644 index 0000000000..99ac7cd23a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/pom.xml @@ -0,0 +1,46 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + 1.3.6-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-datax + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + + + + + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index 7a88aced58..8d9bb7f7e0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -31,7 +31,6 @@ import org.slf4j.Logger; public class FlinkTask extends AbstractYarnTask { - /** * flink command * usage: flink run [OPTIONS] diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java index 6ea5a66941..6d843c637a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskChannelFactory.java @@ -31,7 +31,7 @@ public class FlinkTaskChannelFactory implements TaskChannelFactory { @Override public String getName() { - return "Flink"; + return "FLINK"; } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/pom.xml new file mode 100644 index 0000000000..0aaea8d4bb --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/pom.xml @@ -0,0 +1,48 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + 1.3.6-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-http + dolphinscheduler-plugin + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + + + + dolphinscheduler-task-http-${project.version} + + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpCheckCondition.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpCheckCondition.java new file mode 100644 index 0000000000..b2f423b22f --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpCheckCondition.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.http; + +/** + * http check condition + */ +public enum HttpCheckCondition { + /** + * 0 status_code_default:200 + * 1 status_code_custom + * 2 body_contains + * 3 body_not_contains + */ + STATUS_CODE_DEFAULT,STATUS_CODE_CUSTOM, BODY_CONTAINS, BODY_NOT_CONTAINS + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpMethod.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpMethod.java new file mode 100644 index 0000000000..d949f8a783 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpMethod.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.http; + +/** + * http method + */ +public enum HttpMethod { + /** + * 0 get + * 1 post + * 2 head + * 3 put + * 4 delete + */ + GET, POST, HEAD, PUT, DELETE +} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParameters.java new file mode 100644 index 0000000000..efad7d3fca --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParameters.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.http; + +import org.apache.dolphinscheduler.spi.task.AbstractParameters; +import org.apache.dolphinscheduler.spi.task.ResourceInfo; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * http parameter + */ +public class HttpParameters extends AbstractParameters { + /** + * url + */ + private String url; + + /** + * httpMethod + */ + private HttpMethod httpMethod; + + /** + * http params + */ + private List httpParams; + + /** + * httpCheckCondition + */ + private HttpCheckCondition httpCheckCondition = HttpCheckCondition.STATUS_CODE_DEFAULT; + + /** + * condition + */ + private String condition; + + + /** + * Connect Timeout + * Unit: ms + */ + private int connectTimeout ; + + /** + * Socket Timeout + * Unit: ms + */ + private int socketTimeout ; + + @Override + public boolean checkParameters() { + return StringUtils.isNotEmpty(url); + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public HttpMethod getHttpMethod() { + return httpMethod; + } + + public void setHttpMethod(HttpMethod httpMethod) { + this.httpMethod = httpMethod; + } + + public List getHttpParams() { + return httpParams; + } + + public void setHttpParams(List httpParams) { + this.httpParams = httpParams; + } + + public HttpCheckCondition getHttpCheckCondition() { + return httpCheckCondition; + } + + public void setHttpCheckCondition(HttpCheckCondition httpCheckCondition) { + this.httpCheckCondition = httpCheckCondition; + } + + public String getCondition() { + return condition; + } + + public void setCondition(String condition) { + this.condition = condition; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParametersType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParametersType.java new file mode 100644 index 0000000000..83b6a74091 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpParametersType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.plugin.task.http; + +/** + * http parameters type + */ +public enum HttpParametersType { + /** + * 0 parameter; + * 1 body; + * 2 headers; + */ + PARAMETER,BODY,HEADERS +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpProperty.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpProperty.java new file mode 100644 index 0000000000..5aa44d1695 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpProperty.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.http; + +import java.util.Objects; + +public class HttpProperty { + /** + * key + */ + private String prop; + + /** + * httpParametersType + */ + private HttpParametersType httpParametersType; + + /** + * value + */ + private String value; + + public HttpProperty() { + } + + public HttpProperty(String prop, HttpParametersType httpParametersType, String value) { + this.prop = prop; + this.httpParametersType = httpParametersType; + this.value = value; + } + + /** + * getter method + * + * @return the prop + * @see HttpProperty#prop + */ + public String getProp() { + return prop; + } + + /** + * setter method + * + * @param prop the prop to set + * @see HttpProperty#prop + */ + public void setProp(String prop) { + this.prop = prop; + } + + /** + * getter method + * + * @return the value + * @see HttpProperty#value + */ + public String getValue() { + return value; + } + + /** + * setter method + * + * @param value the value to set + * @see HttpProperty#value + */ + public void setValue(String value) { + this.value = value; + } + + public HttpParametersType getHttpParametersType() { + return httpParametersType; + } + + public void setHttpParametersType(HttpParametersType httpParametersType) { + this.httpParametersType = httpParametersType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HttpProperty property = (HttpProperty) o; + return Objects.equals(prop, property.prop) && + Objects.equals(value, property.value); + } + + + @Override + public int hashCode() { + return Objects.hash(prop, value); + } + + @Override + public String toString() { + return "HttpProperty{" + + "prop='" + prop + '\'' + + ", httpParametersType=" + httpParametersType + + ", value='" + value + '\'' + + '}'; + } + + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java new file mode 100644 index 0000000000..f6273a642a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -0,0 +1,47 @@ +package org.apache.dolphinscheduler.plugin.task.http; + +import org.apache.dolphinscheduler.spi.task.AbstractParameters; +import org.apache.dolphinscheduler.spi.task.AbstractTask; +import org.apache.dolphinscheduler.spi.task.TaskRequest; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.slf4j.Logger; + +public class HttpTask extends AbstractTask { + + /** + * taskExecutionContext + */ + private TaskRequest taskExecutionContext; + + private HttpParameters httpParameters; + + /** + * constructor + * + * @param taskExecutionContext taskExecutionContext + * @param logger logger + */ + public HttpTask(TaskRequest taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + } + + @Override + public void init() { + logger.info("http task params {}", taskExecutionContext.getTaskParams()); + this.httpParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HttpParameters.class); + + if (!httpParameters.checkParameters()) { + throw new RuntimeException("http task params is not valid"); + } + } + + @Override + public void handle() throws Exception { + + } + + @Override + public AbstractParameters getParameters() { + return null; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java new file mode 100644 index 0000000000..632a50ffdc --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannel.java @@ -0,0 +1,4 @@ +package org.apache.dolphinscheduler.plugin.task.http; + +public class HttpTaskChannel { +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannelFactory.java new file mode 100644 index 0000000000..25ad810ff0 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskChannelFactory.java @@ -0,0 +1,24 @@ +package org.apache.dolphinscheduler.plugin.task.http; + +import org.apache.dolphinscheduler.spi.params.base.PluginParams; +import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +import java.util.List; + +public class HttpTaskChannelFactory implements TaskChannelFactory { + @Override + public String getName() { + return null; + } + + @Override + public List getParams() { + return null; + } + + @Override + public TaskChannel create() { + return null; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskPlugin.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskPlugin.java new file mode 100644 index 0000000000..0158dcf8e9 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTaskPlugin.java @@ -0,0 +1,13 @@ +package org.apache.dolphinscheduler.plugin.task.http; + +import com.google.common.collect.ImmutableList; +import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +public class HttpTaskPlugin implements DolphinSchedulerPlugin { + + @Override + public Iterable getTaskChannelFactorys() { + return ImmutableList.of(new HttpTaskChannelFactory()); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/pom.xml new file mode 100644 index 0000000000..6582a31f67 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/pom.xml @@ -0,0 +1,44 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + 1.3.6-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-mr + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + + + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/pom.xml new file mode 100644 index 0000000000..9c13b8c54a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/pom.xml @@ -0,0 +1,45 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + 1.3.6-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-procedure + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + + + + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java index 04c59cd73a..c41938b38c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTaskChannelFactory.java @@ -29,7 +29,7 @@ public class PythonTaskChannelFactory implements TaskChannelFactory { @Override public String getName() { - return "Python"; + return "PYTHON"; } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannel.java new file mode 100644 index 0000000000..f4230c5357 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.spark; + +import org.apache.dolphinscheduler.spi.task.AbstractTask; +import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.TaskRequest; + +import org.slf4j.Logger; + +public class SparkTaskChannel implements TaskChannel { + + @Override + public void cancelApplication(boolean status) { + + } + + @Override + public AbstractTask createTask(TaskRequest taskRequest, Logger logger) { + return new SparkTask(taskRequest, logger); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannelFanctory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannelFanctory.java new file mode 100644 index 0000000000..3169fac56f --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskChannelFanctory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.spark; + +import org.apache.dolphinscheduler.spi.params.base.PluginParams; +import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +import java.util.List; + +public class SparkTaskChannelFanctory implements TaskChannelFactory { + @Override + public String getName() { + return "SPARK"; + } + + @Override + public List getParams() { + return null; + } + + @Override + public TaskChannel create() { + return new SparkTaskChannel(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskPlugin.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskPlugin.java new file mode 100644 index 0000000000..aa18897ed7 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskPlugin.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.spark; + +import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin; +import org.apache.dolphinscheduler.spi.task.TaskChannelFactory; + +import com.google.common.collect.ImmutableList; + +public class SparkTaskPlugin implements DolphinSchedulerPlugin { + + @Override + public Iterable getTaskChannelFactorys() { + return ImmutableList.of(new SparkTaskChannelFanctory()); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml new file mode 100644 index 0000000000..6f567441ee --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/pom.xml @@ -0,0 +1,46 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + 1.3.6-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-sql + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + + + + + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/pom.xml new file mode 100644 index 0000000000..fb52e95eed --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/pom.xml @@ -0,0 +1,44 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + 1.3.6-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-sqoop + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + provided + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + ${project.version} + + + + + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index d6459fee88..e8ba5aaf71 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -34,7 +34,14 @@ dolphinscheduler-task-flink dolphinscheduler-task-python dolphinscheduler-task-spark + dolphinscheduler-task-http + dolphinscheduler-task-sql + dolphinscheduler-task-sqoop + dolphinscheduler-task-datax + dolphinscheduler-task-mr + dolphinscheduler-task-procedure dolphinscheduler-task-tis +