dailidong
4 years ago
114 changed files with 3549 additions and 1706 deletions
@ -0,0 +1,47 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.utils; |
||||
|
||||
import java.util.regex.Matcher; |
||||
import java.util.regex.Pattern; |
||||
|
||||
/** |
||||
* This is Regex expression utils. |
||||
*/ |
||||
public class RegexUtils { |
||||
|
||||
/** |
||||
* check number regex expression |
||||
*/ |
||||
private static final String CHECK_NUMBER = "^-?\\d+(\\.\\d+)?$"; |
||||
|
||||
private RegexUtils() { |
||||
} |
||||
|
||||
/** |
||||
* check if the input is number |
||||
* |
||||
* @param str input |
||||
* @return |
||||
*/ |
||||
public static boolean isNumeric(String str) { |
||||
Pattern pattern = Pattern.compile(CHECK_NUMBER); |
||||
Matcher isNum = pattern.matcher(str); |
||||
return isNum.matches(); |
||||
} |
||||
} |
@ -0,0 +1,39 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.utils; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
|
||||
/** |
||||
* RegexUtils test case |
||||
*/ |
||||
public class RegexUtilsTest { |
||||
|
||||
@Test |
||||
public void testIsNumeric() { |
||||
String num1 = "123467854678"; |
||||
boolean numeric = RegexUtils.isNumeric(num1); |
||||
Assert.assertTrue(numeric); |
||||
|
||||
String num2 = "0.0.01"; |
||||
boolean numeric2 = RegexUtils.isNumeric(num2); |
||||
Assert.assertFalse(numeric2); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,78 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.task; |
||||
|
||||
import java.util.Map; |
||||
|
||||
public class TaskParams { |
||||
|
||||
private String rawScript; |
||||
private Map<String, String>[] localParams; |
||||
|
||||
public void setRawScript(String rawScript) { |
||||
this.rawScript = rawScript; |
||||
} |
||||
|
||||
public void setLocalParams(Map<String, String>[] localParams) { |
||||
this.localParams = localParams; |
||||
} |
||||
|
||||
public String getRawScript() { |
||||
return rawScript; |
||||
} |
||||
|
||||
public void setLocalParamValue(String prop, Object value) { |
||||
if (localParams == null || value == null) { |
||||
return; |
||||
} |
||||
for (int i = 0; i < localParams.length; i++) { |
||||
if (localParams[i].get("prop").equals(prop)) { |
||||
localParams[i].put("value", (String)value); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public void setLocalParamValue(Map<String, Object> propToValue) { |
||||
if (localParams == null || propToValue == null) { |
||||
return; |
||||
} |
||||
for (int i = 0; i < localParams.length; i++) { |
||||
String prop = localParams[i].get("prop"); |
||||
if (propToValue.containsKey(prop)) { |
||||
localParams[i].put("value",(String)propToValue.get(prop)); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public String getLocalParamValue(String prop) { |
||||
if (localParams == null) { |
||||
return null; |
||||
} |
||||
for (int i = 0; i < localParams.length; i++) { |
||||
String tmpProp = localParams[i].get("prop"); |
||||
if (tmpProp.equals(prop)) { |
||||
return localParams[i].get("value"); |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
public Map<String, String>[] getLocalParams() { |
||||
return localParams; |
||||
} |
||||
} |
@ -0,0 +1,124 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.model.TaskNode; |
||||
import org.apache.dolphinscheduler.common.task.TaskParams; |
||||
|
||||
import java.text.ParseException; |
||||
import java.util.Map; |
||||
|
||||
public class VarPoolUtils { |
||||
/** |
||||
* getTaskNodeLocalParam |
||||
* @param taskNode taskNode |
||||
* @param prop prop |
||||
* @return localParamForProp |
||||
*/ |
||||
public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) { |
||||
String taskParamsJson = taskNode.getParams(); |
||||
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); |
||||
if (taskParams == null) { |
||||
return null; |
||||
} |
||||
return taskParams.getLocalParamValue(prop); |
||||
} |
||||
|
||||
/** |
||||
* setTaskNodeLocalParams |
||||
* @param taskNode taskNode |
||||
* @param prop LocalParamName |
||||
* @param value LocalParamValue |
||||
*/ |
||||
public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) { |
||||
String taskParamsJson = taskNode.getParams(); |
||||
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); |
||||
if (taskParams == null) { |
||||
return; |
||||
} |
||||
taskParams.setLocalParamValue(prop, value); |
||||
taskNode.setParams(JSONUtils.toJsonString(taskParams)); |
||||
} |
||||
|
||||
/** |
||||
* setTaskNodeLocalParams |
||||
* @param taskNode taskNode |
||||
* @param propToValue propToValue |
||||
*/ |
||||
public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) { |
||||
String taskParamsJson = taskNode.getParams(); |
||||
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); |
||||
if (taskParams == null) { |
||||
return; |
||||
} |
||||
taskParams.setLocalParamValue(propToValue); |
||||
taskNode.setParams(JSONUtils.toJsonString(taskParams)); |
||||
} |
||||
|
||||
/** |
||||
* convertVarPoolToMap |
||||
* @param propToValue propToValue |
||||
* @param varPool varPool |
||||
* @throws ParseException ParseException |
||||
*/ |
||||
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException { |
||||
if (varPool == null || propToValue == null) { |
||||
return; |
||||
} |
||||
String[] splits = varPool.split("\\$VarPool\\$"); |
||||
for (String kv : splits) { |
||||
String[] kvs = kv.split(","); |
||||
if (kvs.length == 2) { |
||||
propToValue.put(kvs[0], kvs[1]); |
||||
} else { |
||||
throw new ParseException(kv, 2); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* convertPythonScriptPlaceholders |
||||
* @param rawScript rawScript |
||||
* @return String |
||||
* @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException |
||||
*/ |
||||
public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException { |
||||
int len = "${setShareVar(${".length(); |
||||
int scriptStart = 0; |
||||
while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) { |
||||
int start = -1; |
||||
int end = rawScript.indexOf('}', scriptStart + len); |
||||
String prop = rawScript.substring(scriptStart + len, end); |
||||
|
||||
start = rawScript.indexOf(',', end); |
||||
end = rawScript.indexOf(')', start); |
||||
|
||||
String value = rawScript.substring(start + 1, end); |
||||
|
||||
start = rawScript.indexOf('}', start) + 1; |
||||
end = rawScript.length(); |
||||
|
||||
String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value); |
||||
|
||||
rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end); |
||||
|
||||
scriptStart += replaceScript.length(); |
||||
} |
||||
return rawScript; |
||||
} |
||||
} |
@ -0,0 +1,73 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.model.TaskNode; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
public class VarPoolUtilsTest { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class); |
||||
|
||||
@Test |
||||
public void testSetTaskNodeLocalParams() { |
||||
String taskJson = "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," |
||||
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\"," |
||||
+ "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is python task \\\\\\\",${p0})\\\"," |
||||
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}]," |
||||
+ "\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"," |
||||
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," |
||||
+ "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}"; |
||||
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); |
||||
|
||||
VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1"); |
||||
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test1"); |
||||
|
||||
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>(); |
||||
propToValue.put("p1", "test2"); |
||||
|
||||
VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue); |
||||
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test2"); |
||||
} |
||||
|
||||
@Test |
||||
public void testConvertVarPoolToMap() throws Exception { |
||||
String varPool = "p1,66$VarPool$p2,69$VarPool$"; |
||||
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>(); |
||||
VarPoolUtils.convertVarPoolToMap(propToValue, varPool); |
||||
Assert.assertEquals((String)propToValue.get("p1"), "66"); |
||||
Assert.assertEquals((String)propToValue.get("p2"), "69"); |
||||
logger.info(propToValue.toString()); |
||||
} |
||||
|
||||
@Test |
||||
public void testConvertPythonScriptPlaceholders() throws Exception { |
||||
String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};"; |
||||
rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript); |
||||
Assert.assertEquals(rawScript, "print(${p1});\n" |
||||
+ "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n" |
||||
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); |
||||
logger.info(rawScript); |
||||
} |
||||
} |
@ -0,0 +1,19 @@
|
||||
Copyright (c) 2012-2014 Chris Pettitt |
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy |
||||
of this software and associated documentation files (the "Software"), to deal |
||||
in the Software without restriction, including without limitation the rights |
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
copies of the Software, and to permit persons to whom the Software is |
||||
furnished to do so, subject to the following conditions: |
||||
|
||||
The above copyright notice and this permission notice shall be included in |
||||
all copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
||||
THE SOFTWARE. |
@ -1 +1,105 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
} |
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
public enum CommandType { |
||||
|
||||
/** |
||||
* remove task log request, |
||||
*/ |
||||
REMOVE_TAK_LOG_REQUEST, |
||||
|
||||
/** |
||||
* remove task log response |
||||
*/ |
||||
REMOVE_TAK_LOG_RESPONSE, |
||||
|
||||
/** |
||||
* roll view log request |
||||
*/ |
||||
ROLL_VIEW_LOG_REQUEST, |
||||
|
||||
/** |
||||
* roll view log response |
||||
*/ |
||||
ROLL_VIEW_LOG_RESPONSE, |
||||
|
||||
/** |
||||
* view whole log request |
||||
*/ |
||||
VIEW_WHOLE_LOG_REQUEST, |
||||
|
||||
/** |
||||
* view whole log response |
||||
*/ |
||||
VIEW_WHOLE_LOG_RESPONSE, |
||||
|
||||
/** |
||||
* get log bytes request |
||||
*/ |
||||
GET_LOG_BYTES_REQUEST, |
||||
|
||||
/** |
||||
* get log bytes response |
||||
*/ |
||||
GET_LOG_BYTES_RESPONSE, |
||||
|
||||
|
||||
WORKER_REQUEST, |
||||
MASTER_RESPONSE, |
||||
|
||||
/** |
||||
* execute task request |
||||
*/ |
||||
TASK_EXECUTE_REQUEST, |
||||
|
||||
/** |
||||
* execute task ack |
||||
*/ |
||||
TASK_EXECUTE_ACK, |
||||
|
||||
/** |
||||
* execute task response |
||||
*/ |
||||
TASK_EXECUTE_RESPONSE, |
||||
|
||||
/** |
||||
* kill task |
||||
*/ |
||||
TASK_KILL_REQUEST, |
||||
|
||||
/** |
||||
* kill task response |
||||
*/ |
||||
TASK_KILL_RESPONSE, |
||||
|
||||
/** |
||||
* HEART_BEAT |
||||
*/ |
||||
HEART_BEAT, |
||||
|
||||
/** |
||||
* ping |
||||
*/ |
||||
PING, |
||||
|
||||
/** |
||||
* pong |
||||
*/ |
||||
PONG; |
||||
} |
||||
|
@ -0,0 +1,43 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.utils; |
||||
|
||||
import org.apache.dolphinscheduler.remote.utils.Host; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
|
||||
/** |
||||
* host test |
||||
*/ |
||||
public class HostTest { |
||||
|
||||
@Test |
||||
public void testHostWarmUp() { |
||||
Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000))); |
||||
Assert.assertEquals(50, host.getWeight()); |
||||
host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000))); |
||||
Assert.assertEquals(100, host.getWeight()); |
||||
} |
||||
|
||||
@Test |
||||
public void testHost() { |
||||
Host host = Host.of("192.158.2.2:22"); |
||||
Assert.assertEquals(22, host.getPort()); |
||||
} |
||||
} |
@ -0,0 +1,114 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.worker.task; |
||||
|
||||
import org.apache.dolphinscheduler.common.utils.LoggerUtils; |
||||
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; |
||||
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; |
||||
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; |
||||
|
||||
import java.util.Date; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.powermock.api.mockito.PowerMockito; |
||||
import org.powermock.core.classloader.annotations.PrepareForTest; |
||||
import org.powermock.modules.junit4.PowerMockRunner; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
@RunWith(PowerMockRunner.class) |
||||
@PrepareForTest({SpringApplicationContext.class}) |
||||
public class TaskManagerTest { |
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(TaskManagerTest.class); |
||||
|
||||
private TaskExecutionContext taskExecutionContext; |
||||
|
||||
private Logger taskLogger; |
||||
|
||||
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; |
||||
|
||||
@Before |
||||
public void before() { |
||||
// init task execution context, logger
|
||||
taskExecutionContext = new TaskExecutionContext(); |
||||
taskExecutionContext.setProcessId(12345); |
||||
taskExecutionContext.setProcessDefineId(1); |
||||
taskExecutionContext.setProcessInstanceId(1); |
||||
taskExecutionContext.setTaskInstanceId(1); |
||||
taskExecutionContext.setTaskType(""); |
||||
taskExecutionContext.setFirstSubmitTime(new Date()); |
||||
taskExecutionContext.setDelayTime(0); |
||||
taskExecutionContext.setLogPath("/tmp/test.log"); |
||||
taskExecutionContext.setHost("localhost"); |
||||
taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4"); |
||||
|
||||
taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId( |
||||
LoggerUtils.TASK_LOGGER_INFO_PREFIX, |
||||
taskExecutionContext.getProcessDefineId(), |
||||
taskExecutionContext.getProcessInstanceId(), |
||||
taskExecutionContext.getTaskInstanceId() |
||||
)); |
||||
|
||||
taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl(); |
||||
taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); |
||||
|
||||
PowerMockito.mockStatic(SpringApplicationContext.class); |
||||
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) |
||||
.thenReturn(taskExecutionContextCacheManager); |
||||
} |
||||
|
||||
@Test |
||||
public void testNewTask() { |
||||
|
||||
taskExecutionContext.setTaskType("SHELL"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("WATERDROP"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("HTTP"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("MR"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("SPARK"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("FLINK"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("PYTHON"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("DATAX"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
taskExecutionContext.setTaskType("SQOOP"); |
||||
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); |
||||
|
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void testNewTaskIsNull() { |
||||
taskExecutionContext.setTaskType(null); |
||||
TaskManager.newTask(taskExecutionContext,taskLogger); |
||||
} |
||||
|
||||
@Test(expected = IllegalArgumentException.class) |
||||
public void testNewTaskIsNotExists() { |
||||
taskExecutionContext.setTaskType("XXX"); |
||||
TaskManager.newTask(taskExecutionContext,taskLogger); |
||||
} |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,116 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.process; |
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.CommandType; |
||||
import org.apache.dolphinscheduler.common.enums.WarningType; |
||||
import org.apache.dolphinscheduler.common.utils.DateUtils; |
||||
import org.apache.dolphinscheduler.common.utils.JSONUtils; |
||||
import org.apache.dolphinscheduler.dao.entity.Command; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; |
||||
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; |
||||
import org.apache.dolphinscheduler.dao.entity.TaskInstance; |
||||
|
||||
import java.util.Date; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode; |
||||
|
||||
/** |
||||
* process service test |
||||
*/ |
||||
public class ProcessServiceTest { |
||||
|
||||
@Test |
||||
public void testCreateSubCommand() { |
||||
ProcessService processService = new ProcessService(); |
||||
ProcessInstance parentInstance = new ProcessInstance(); |
||||
parentInstance.setProcessDefinitionId(1); |
||||
parentInstance.setWarningType(WarningType.SUCCESS); |
||||
parentInstance.setWarningGroupId(0); |
||||
|
||||
TaskInstance task = new TaskInstance(); |
||||
task.setTaskJson("{\"params\":{\"processDefinitionId\":100}}"); |
||||
task.setId(10); |
||||
|
||||
ProcessInstance childInstance = null; |
||||
ProcessInstanceMap instanceMap = new ProcessInstanceMap(); |
||||
instanceMap.setParentProcessInstanceId(1); |
||||
instanceMap.setParentTaskInstanceId(10); |
||||
Command command = null; |
||||
|
||||
//father history: start; child null == command type: start
|
||||
parentInstance.setHistoryCmd("START_PROCESS"); |
||||
parentInstance.setCommandType(CommandType.START_PROCESS); |
||||
command = processService.createSubProcessCommand( |
||||
parentInstance, childInstance, instanceMap, task |
||||
); |
||||
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); |
||||
|
||||
//father history: start,start failure; child null == command type: start
|
||||
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); |
||||
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); |
||||
command = processService.createSubProcessCommand( |
||||
parentInstance, childInstance, instanceMap, task |
||||
); |
||||
Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); |
||||
|
||||
//father history: scheduler,start failure; child null == command type: scheduler
|
||||
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); |
||||
parentInstance.setHistoryCmd("SCHEDULER,START_FAILURE_TASK_PROCESS"); |
||||
command = processService.createSubProcessCommand( |
||||
parentInstance, childInstance, instanceMap, task |
||||
); |
||||
Assert.assertEquals(CommandType.SCHEDULER, command.getCommandType()); |
||||
|
||||
//father history: complement,start failure; child null == command type: complement
|
||||
|
||||
String startString = "2020-01-01 00:00:00"; |
||||
String endString = "2020-01-10 00:00:00"; |
||||
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); |
||||
parentInstance.setHistoryCmd("COMPLEMENT_DATA,START_FAILURE_TASK_PROCESS"); |
||||
Map<String,String> complementMap = new HashMap<>(); |
||||
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE, startString); |
||||
complementMap.put(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE, endString); |
||||
parentInstance.setCommandParam(JSONUtils.toJsonString(complementMap)); |
||||
command = processService.createSubProcessCommand( |
||||
parentInstance, childInstance, instanceMap, task |
||||
); |
||||
Assert.assertEquals(CommandType.COMPLEMENT_DATA, command.getCommandType()); |
||||
|
||||
JsonNode complementDate = JSONUtils.parseObject(command.getCommandParam()); |
||||
Date start = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE).asText()); |
||||
Date end = DateUtils.stringToDate(complementDate.get(Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE).asText()); |
||||
Assert.assertEquals(startString, DateUtils.dateToString(start)); |
||||
Assert.assertEquals(endString, DateUtils.dateToString(end)); |
||||
|
||||
//father history: start,failure,start failure; child not null == command type: start failure
|
||||
childInstance = new ProcessInstance(); |
||||
parentInstance.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); |
||||
parentInstance.setHistoryCmd("START_PROCESS,START_FAILURE_TASK_PROCESS"); |
||||
command = processService.createSubProcessCommand( |
||||
parentInstance, childInstance, instanceMap, task |
||||
); |
||||
Assert.assertEquals(CommandType.START_FAILURE_TASK_PROCESS, command.getCommandType()); |
||||
} |
||||
} |
@ -0,0 +1,218 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
<template> |
||||
<div class="timeout-alarm-model"> |
||||
<div class="clearfix list"> |
||||
<div class="text-box"> |
||||
<span>{{$t('Timeout alarm')}}</span> |
||||
</div> |
||||
<div class="cont-box"> |
||||
<label class="label-box"> |
||||
<div style="padding-top: 5px;"> |
||||
<x-switch v-model="enable" @on-click="_onSwitch(0, $event)" :disabled="isDetails"></x-switch> |
||||
</div> |
||||
</label> |
||||
</div> |
||||
</div> |
||||
<div class="clearfix list" v-if="enable"> |
||||
<div class="text-box"> |
||||
<span>{{$t('Waiting Dependent start')}}</span> |
||||
</div> |
||||
<div class="cont-box"> |
||||
<label class="label-box"> |
||||
<div style="padding: 5px 0;"> |
||||
<x-switch v-model="waitStartTimeout.enable" @on-click="_onSwitch(1, $event)" :disabled="isDetails"></x-switch> |
||||
</div> |
||||
</label> |
||||
</div> |
||||
</div> |
||||
<div class="clearfix list" v-if="enable && waitStartTimeout.enable"> |
||||
<div class="cont-box"> |
||||
<label class="label-box"> |
||||
<span class="text-box"> |
||||
<span>{{$t('Timeout period')}}</span> |
||||
</span> |
||||
<x-input v-model="waitStartTimeout.interval" style="width: 100px;" :disabled="isDetails" maxlength="9"> |
||||
<span slot="append">{{$t('Minute')}}</span> |
||||
</x-input> |
||||
<span class="text-box"> |
||||
<span>{{$t('Check interval')}}</span> |
||||
</span> |
||||
<x-input v-model="waitStartTimeout.checkInterval" style="width: 100px;" :disabled="isDetails" maxlength="9"> |
||||
<span slot="append">{{$t('Minute')}}</span> |
||||
</x-input> |
||||
<span class="text-box"> |
||||
<span>{{$t('Timeout strategy')}}</span> |
||||
</span> |
||||
<div style="padding-top: 6px;"> |
||||
<x-checkbox-group v-model="waitStartTimeout.strategy"> |
||||
<x-checkbox label="FAILED" :disabled="true">{{$t('Timeout failure')}}</x-checkbox> |
||||
</x-checkbox-group> |
||||
</div> |
||||
</label> |
||||
</div> |
||||
</div> |
||||
<div class="clearfix list" v-if="enable"> |
||||
<div class="text-box"> |
||||
<span>{{$t('Waiting Dependent complete')}}</span> |
||||
</div> |
||||
<div class="cont-box"> |
||||
<label class="label-box"> |
||||
<div style="padding: 5px 0;"> |
||||
<x-switch v-model="waitCompleteTimeout.enable" @on-click="_onSwitch(2, $event)" :disabled="isDetails"></x-switch> |
||||
</div> |
||||
</label> |
||||
</div> |
||||
</div> |
||||
<div class="clearfix list" v-if="enable && waitCompleteTimeout.enable"> |
||||
<div class="cont-box"> |
||||
<label class="label-box"> |
||||
<span class="text-box"> |
||||
<span>{{$t('Timeout period')}}</span> |
||||
</span> |
||||
<x-input v-model="waitCompleteTimeout.interval" style="width: 100px;" :disabled="isDetails" maxlength="9"> |
||||
<span slot="append">{{$t('Minute')}}</span> |
||||
</x-input> |
||||
<span class="text-box"> |
||||
<span>{{$t('Timeout strategy')}}</span> |
||||
</span> |
||||
<div style="padding-top: 6px;"> |
||||
<x-checkbox-group v-model="waitCompleteTimeout.strategy"> |
||||
<x-checkbox label="WARN" :disabled="isDetails">{{$t('Timeout alarm')}}</x-checkbox> |
||||
<x-checkbox label="FAILED" :disabled="isDetails">{{$t('Timeout failure')}}</x-checkbox> |
||||
</x-checkbox-group> |
||||
</div> |
||||
</label> |
||||
</div> |
||||
</div> |
||||
</div> |
||||
</template> |
||||
<script> |
||||
import _ from 'lodash' |
||||
import disabledState from '@/module/mixin/disabledState' |
||||
|
||||
export default { |
||||
name: 'form-dependent-timeout', |
||||
data () { |
||||
return { |
||||
// Timeout display hiding |
||||
enable: false, |
||||
waitStartTimeout: { |
||||
enable: false, |
||||
// Timeout strategy |
||||
strategy: ['FAILED'], |
||||
// Timeout period |
||||
interval: null, |
||||
checkInterval: null |
||||
}, |
||||
waitCompleteTimeout: { |
||||
enable: false, |
||||
// Timeout strategy |
||||
strategy: [], |
||||
// Timeout period |
||||
interval: null |
||||
} |
||||
} |
||||
}, |
||||
mixins: [disabledState], |
||||
props: { |
||||
backfillItem: Object |
||||
}, |
||||
methods: { |
||||
_onSwitch (p, is) { |
||||
// reset timeout setting when switch timeout on/off. |
||||
// p = 0 for timeout switch; p = 1 for wait start timeout switch; p = 2 for wait complete timeout switch. |
||||
if (p === 1 || p === 0) { |
||||
this.waitStartTimeout.interval = is ? 30 : null |
||||
this.waitStartTimeout.checkInterval = is ? 1 : null |
||||
} |
||||
if (p === 2 || p === 0) { |
||||
this.waitCompleteTimeout.strategy = is ? ['WARN'] : [] |
||||
this.waitCompleteTimeout.interval = is ? 30 : null |
||||
} |
||||
}, |
||||
_verification () { |
||||
// Verification timeout policy |
||||
if (this.enable |
||||
&& (this.waitCompleteTimeout.enable && !this.waitCompleteTimeout.strategy.length) |
||||
|| (this.waitStartTimeout.enable && !this.waitStartTimeout.strategy.length)) { |
||||
this.$message.warning(`${this.$t('Timeout strategy must be selected')}`) |
||||
return false |
||||
} |
||||
// Verify timeout duration Non 0 positive integer |
||||
const reg = /^[1-9]\d*$/ |
||||
if (this.enable |
||||
&& (this.waitCompleteTimeout.enable && !reg.test(this.waitCompleteTimeout.interval)) |
||||
|| (this.waitStartTimeout.enable && (!reg.test(this.waitStartTimeout.interval || !reg.test(this.waitStartTimeout.checkInterval))))) { |
||||
this.$message.warning(`${this.$t('Timeout must be a positive integer')}`) |
||||
return false |
||||
} |
||||
// Verify timeout duration longer than check interval |
||||
if (this.enable && this.waitStartTimeout.enable && this.waitStartTimeout.checkInterval >= this.waitStartTimeout.interval) { |
||||
this.$message.warning(`${this.$t('Timeout must be longer than check interval')}`) |
||||
return false |
||||
} |
||||
this.$emit('on-timeout', { |
||||
waitStartTimeout: { |
||||
strategy: 'FAILED', |
||||
interval: parseInt(this.waitStartTimeout.interval), |
||||
checkInterval: parseInt(this.waitStartTimeout.checkInterval), |
||||
enable: this.waitStartTimeout.enable |
||||
}, |
||||
waitCompleteTimeout: { |
||||
strategy: (() => { |
||||
// Handling checkout sequence |
||||
let strategy = this.waitCompleteTimeout.strategy |
||||
if (strategy.length === 2 && strategy[0] === 'FAILED') { |
||||
return [strategy[1], strategy[0]].join(',') |
||||
} else { |
||||
return strategy.join(',') |
||||
} |
||||
})(), |
||||
interval: parseInt(this.waitCompleteTimeout.interval), |
||||
enable: this.waitCompleteTimeout.enable |
||||
} |
||||
}) |
||||
return true |
||||
} |
||||
}, |
||||
watch: { |
||||
}, |
||||
created () { |
||||
let o = this.backfillItem |
||||
// Non-null objects represent backfill |
||||
if (!_.isEmpty(o)) { |
||||
if (o.timeout) { |
||||
this.enable = true |
||||
this.waitCompleteTimeout.enable = o.timeout.enable || false |
||||
this.waitCompleteTimeout.strategy = _.split(o.timeout.strategy, ',') || ['WARN'] |
||||
this.waitCompleteTimeout.interval = o.timeout.interval || null |
||||
} |
||||
if (o.waitStartTimeout) { |
||||
this.enable = true |
||||
this.waitStartTimeout.enable = o.waitStartTimeout.enable || false |
||||
this.waitStartTimeout.strategy = ['FAILED'] |
||||
this.waitStartTimeout.interval = o.waitStartTimeout.interval || null |
||||
this.waitStartTimeout.checkInterval = o.waitStartTimeout.checkInterval || null |
||||
} |
||||
} |
||||
}, |
||||
mounted () { |
||||
}, |
||||
components: {} |
||||
} |
||||
</script> |
@ -0,0 +1,108 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
<template> |
||||
<div class="pre_tasks-model"> |
||||
<div class="clearfix list"> |
||||
<div class="text-box"> |
||||
<span>{{$t('Pre tasks')}}</span> |
||||
</div> |
||||
<div class="cont-box"> |
||||
<div class="label-box"> |
||||
<x-select |
||||
ref="preTasksSelector" |
||||
style="width: 100%;" |
||||
filterable |
||||
multiple |
||||
v-model="preTasks" |
||||
:disabled="isDetails" |
||||
:id="preTasksSelectorId"> |
||||
<x-option |
||||
v-for="task in preTaskList" |
||||
:key="task.id" |
||||
:value="task.id" |
||||
:label="task.name"> |
||||
</x-option> |
||||
</x-select> |
||||
</div> |
||||
</div> |
||||
</div> |
||||
</div> |
||||
</template> |
||||
<script> |
||||
import disabledState from '@/module/mixin/disabledState' |
||||
export default { |
||||
name: 'pre_tasks', |
||||
mixins: [disabledState], |
||||
props: { |
||||
backfillItem: Object |
||||
}, |
||||
data () { |
||||
return { |
||||
preTasksSelectorId: '_preTasksSelectorId', // Refresh target vue-component by changing id |
||||
preTasks: [], |
||||
preTasksOld: [], |
||||
} |
||||
}, |
||||
mounted () { |
||||
this.preTasks = this.backfillItem['preTasks'] || this.preTasks |
||||
this.preTasksOld = this.preTasks |
||||
|
||||
// Refresh target vue-component by changing id |
||||
this.$nextTick(() => { |
||||
this.preTasksSelectorId = 'preTasksSelectorId' |
||||
}) |
||||
}, |
||||
computed: { |
||||
preTaskList: function () { |
||||
let currentTaskId = this.backfillItem['id'] || this.id |
||||
let cacheTasks = Object.assign({}, this.store.state.dag.tasks) |
||||
let keys = Object.keys(cacheTasks) |
||||
for (let i = 0; i < keys.length; i++) { |
||||
let key = keys[i] |
||||
if ((!cacheTasks[key].id || !cacheTasks[key].name) || (currentTaskId && cacheTasks[key].id === currentTaskId)) { |
||||
// Clean undefined and current task data |
||||
delete cacheTasks[key] |
||||
} |
||||
} |
||||
|
||||
return cacheTasks |
||||
}, |
||||
// preTaskIds used to create new connection |
||||
preTasksToAdd: function () { |
||||
let toAddTasks = this.preTasks.filter(taskId => { |
||||
return (this.preTasksOld.indexOf(taskId) === -1) |
||||
}) |
||||
return toAddTasks |
||||
}, |
||||
// preTaskIds used to delete connection |
||||
preTasksToDelete: function () { |
||||
return this.preTasksOld.filter(taskId => this.preTasks.indexOf(taskId) === -1) |
||||
}, |
||||
}, |
||||
methods: { |
||||
// Pass data to parent-level to process dag |
||||
_verification () { |
||||
this.$emit('on-pre-tasks', { |
||||
preTasks: this.preTasks, |
||||
preTasksToAdd: this.preTasksToAdd, |
||||
preTasksToDelete: this.preTasksToDelete, |
||||
}) |
||||
return true |
||||
} |
||||
} |
||||
} |
||||
</script> |
@ -0,0 +1,45 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.util; |
||||
|
||||
import java.io.IOException; |
||||
import java.io.InputStream; |
||||
import java.util.HashMap; |
||||
|
||||
import org.ho.yaml.Yaml; |
||||
import org.springframework.core.io.DefaultResourceLoader; |
||||
import org.springframework.core.io.Resource; |
||||
|
||||
/** |
||||
* read yml file |
||||
*/ |
||||
public class YmlReader { |
||||
public static HashMap<String,HashMap<String, String>> map; |
||||
public String getDataYml(String filePath, String key1, String key2) { |
||||
Yaml yaml = new Yaml(); |
||||
Resource resource = new DefaultResourceLoader().getResource("classpath:" + filePath + ".yml"); |
||||
try { |
||||
InputStream inputStream = resource.getInputStream(); |
||||
map = yaml.loadType(inputStream, HashMap.class); |
||||
} catch (IOException e) { |
||||
e.printStackTrace(); |
||||
} |
||||
String data = map.get(key1).get(key2); |
||||
return data; |
||||
} |
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue