Browse Source

Merge pull request #16 from apache/dev

update code from apache
pull/3/MERGE
BoYiZhang 4 years ago committed by GitHub
parent
commit
5480a6523b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 37
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 84
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  3. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  4. 78
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
  5. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
  6. 124
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
  7. 15
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java
  8. 73
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
  9. 181
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  10. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  11. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  12. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
  13. 46
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  14. 40
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
  15. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
  16. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  17. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
  18. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  19. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  20. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  21. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  22. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  23. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  24. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
  25. 47
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
  26. 10
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
  27. 52
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
  28. 43
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
  29. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  30. 2
      pom.xml
  31. 2
      sql/dolphinscheduler-postgre.sql
  32. 2
      sql/dolphinscheduler_mysql.sql
  33. 40
      sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
  34. 36
      sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
  35. 4
      tools/dependencies/check-LICENSE.sh

37
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -48,7 +48,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -77,6 +77,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -84,6 +85,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -232,25 +234,16 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
* @return resource ids * @return resource ids
*/ */
private String getResourceIds(ProcessData processData) { private String getResourceIds(ProcessData processData) {
List<TaskNode> tasks = processData.getTasks(); return Optional.ofNullable(processData.getTasks())
Set<Integer> resourceIds = new HashSet<>(); .orElse(Collections.emptyList())
for (TaskNode taskNode : tasks) { .stream()
String taskParameter = taskNode.getParams(); .map(taskNode -> TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams()))
AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter); .filter(Objects::nonNull)
if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { .flatMap(parameters -> parameters.getResourceFilesList().stream())
Set<Integer> tempSet = params.getResourceFilesList().stream().map(t -> t.getId()).collect(Collectors.toSet()); .map(ResourceInfo::getId)
resourceIds.addAll(tempSet); .distinct()
} .map(Objects::toString)
} .collect(Collectors.joining(","));
StringBuilder sb = new StringBuilder();
for (int i : resourceIds) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(i);
}
return sb.toString();
} }
/** /**
@ -845,7 +838,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
try { try {
createProcessResult = createProcessDefinition(loginUser createProcessResult = createProcessDefinition(loginUser
, currentProjectName, , currentProjectName,
processDefinitionName + "_import_" + System.currentTimeMillis(), processDefinitionName + "_import_" + DateUtils.getCurrentTimeStamp(),
importProcessParam, importProcessParam,
processMeta.getProcessDefinitionDescription(), processMeta.getProcessDefinitionDescription(),
processMeta.getProcessDefinitionLocations(), processMeta.getProcessDefinitionLocations(),
@ -1433,7 +1426,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
return createProcessDefinition( return createProcessDefinition(
loginUser, loginUser,
targetProject.getName(), targetProject.getName(),
processDefinition.getName() + "_copy_" + System.currentTimeMillis(), processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp(),
processDefinition.getProcessDefinitionJson(), processDefinition.getProcessDefinitionJson(),
processDefinition.getDescription(), processDefinition.getDescription(),
processDefinition.getLocations(), processDefinition.getLocations(),

84
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.ProcessMeta;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
@ -28,6 +30,9 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -51,8 +56,11 @@ import org.apache.http.entity.ContentType;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -66,6 +74,7 @@ import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockMultipartFile; import org.springframework.mock.web.MockMultipartFile;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
@ -984,6 +993,81 @@ public class ProcessDefinitionServiceTest {
loginUser, projectName, "1", null); loginUser, projectName, "1", null);
} }
@Test
public void testGetResourceIds() throws Exception {
// set up
Method testMethod = ReflectionUtils.findMethod(ProcessDefinitionServiceImpl.class, "getResourceIds", ProcessData.class);
assertThat(testMethod).isNotNull();
testMethod.setAccessible(true);
// when processData has empty task, then return empty string
ProcessData input1 = new ProcessData();
input1.setTasks(Collections.emptyList());
String output1 = (String) testMethod.invoke(processDefinitionService, input1);
assertThat(output1).isEmpty();
// when task is null, then return empty string
ProcessData input2 = new ProcessData();
input2.setTasks(null);
String output2 = (String) testMethod.invoke(processDefinitionService, input2);
assertThat(output2).isEmpty();
// when task type is incorrect mapping, then return empty string
ProcessData input3 = new ProcessData();
TaskNode taskNode3 = new TaskNode();
taskNode3.setType("notExistType");
input3.setTasks(Collections.singletonList(taskNode3));
String output3 = (String) testMethod.invoke(processDefinitionService, input3);
assertThat(output3).isEmpty();
// when task parameter list is null, then return empty string
ProcessData input4 = new ProcessData();
TaskNode taskNode4 = new TaskNode();
taskNode4.setType("SHELL");
taskNode4.setParams(null);
input4.setTasks(Collections.singletonList(taskNode4));
String output4 = (String) testMethod.invoke(processDefinitionService, input4);
assertThat(output4).isEmpty();
// when resource id list is 0 1, then return 0,1
ProcessData input5 = new ProcessData();
TaskNode taskNode5 = new TaskNode();
taskNode5.setType("SHELL");
ShellParameters shellParameters5 = new ShellParameters();
ResourceInfo resourceInfo5A = new ResourceInfo();
resourceInfo5A.setId(0);
ResourceInfo resourceInfo5B = new ResourceInfo();
resourceInfo5B.setId(1);
shellParameters5.setResourceList(Arrays.asList(resourceInfo5A, resourceInfo5B));
taskNode5.setParams(JSONUtils.toJsonString(shellParameters5));
input5.setTasks(Collections.singletonList(taskNode5));
String output5 = (String) testMethod.invoke(processDefinitionService, input5);
assertThat(output5.split(",")).hasSize(2)
.containsExactlyInAnyOrder("0", "1");
// when resource id list is 0 1 1 2, then return 0,1,2
ProcessData input6 = new ProcessData();
TaskNode taskNode6 = new TaskNode();
taskNode6.setType("SHELL");
ShellParameters shellParameters6 = new ShellParameters();
ResourceInfo resourceInfo6A = new ResourceInfo();
resourceInfo6A.setId(0);
ResourceInfo resourceInfo6B = new ResourceInfo();
resourceInfo6B.setId(1);
ResourceInfo resourceInfo6C = new ResourceInfo();
resourceInfo6C.setId(1);
ResourceInfo resourceInfo6D = new ResourceInfo();
resourceInfo6D.setId(2);
shellParameters6.setResourceList(Arrays.asList(resourceInfo6A, resourceInfo6B, resourceInfo6C, resourceInfo6D));
taskNode6.setParams(JSONUtils.toJsonString(shellParameters6));
input6.setTasks(Collections.singletonList(taskNode6));
String output6 = (String) testMethod.invoke(processDefinitionService, input6);
assertThat(output6.split(",")).hasSize(3)
.containsExactlyInAnyOrder("0", "1", "2");
}
/** /**
* get mock datasource * get mock datasource
* *

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -266,6 +266,10 @@ public final class Constants {
*/ */
public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss"; public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
/**
* date format of yyyyMMddHHmmssSSS
*/
public static final String YYYYMMDDHHMMSSSSS = "yyyyMMddHHmmssSSS";
/** /**
* http connect time out * http connect time out
*/ */

78
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task;
import java.util.Map;
public class TaskParams {
private String rawScript;
private Map<String, String>[] localParams;
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
public void setLocalParams(Map<String, String>[] localParams) {
this.localParams = localParams;
}
public String getRawScript() {
return rawScript;
}
public void setLocalParamValue(String prop, Object value) {
if (localParams == null || value == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
if (localParams[i].get("prop").equals(prop)) {
localParams[i].put("value", (String)value);
}
}
}
public void setLocalParamValue(Map<String, Object> propToValue) {
if (localParams == null || propToValue == null) {
return;
}
for (int i = 0; i < localParams.length; i++) {
String prop = localParams[i].get("prop");
if (propToValue.containsKey(prop)) {
localParams[i].put("value",(String)propToValue.get(prop));
}
}
}
public String getLocalParamValue(String prop) {
if (localParams == null) {
return null;
}
for (int i = 0; i < localParams.length; i++) {
String tmpProp = localParams[i].get("prop");
if (tmpProp.equals(prop)) {
return localParams[i].get("value");
}
}
return null;
}
public Map<String, String>[] getLocalParams() {
return localParams;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java

@ -444,4 +444,14 @@ public class DateUtils {
long usedTime = (System.currentTimeMillis() - baseTime.getTime()) / 1000; long usedTime = (System.currentTimeMillis() - baseTime.getTime()) / 1000;
return intervalSeconds - usedTime; return intervalSeconds - usedTime;
} }
/**
* get current time stamp : yyyyMMddHHmmssSSS
*
* @return date string
*/
public static String getCurrentTimeStamp() {
return getCurrentTime(Constants.YYYYMMDDHHMMSSSSS);
}
} }

124
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskParams;
import java.text.ParseException;
import java.util.Map;
public class VarPoolUtils {
/**
* getTaskNodeLocalParam
* @param taskNode taskNode
* @param prop prop
* @return localParamForProp
*/
public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return null;
}
return taskParams.getLocalParamValue(prop);
}
/**
* setTaskNodeLocalParams
* @param taskNode taskNode
* @param prop LocalParamName
* @param value LocalParamValue
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return;
}
taskParams.setLocalParamValue(prop, value);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}
/**
* setTaskNodeLocalParams
* @param taskNode taskNode
* @param propToValue propToValue
*/
public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) {
String taskParamsJson = taskNode.getParams();
TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class);
if (taskParams == null) {
return;
}
taskParams.setLocalParamValue(propToValue);
taskNode.setParams(JSONUtils.toJsonString(taskParams));
}
/**
* convertVarPoolToMap
* @param propToValue propToValue
* @param varPool varPool
* @throws ParseException ParseException
*/
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
if (varPool == null || propToValue == null) {
return;
}
String[] splits = varPool.split("\\$VarPool\\$");
for (String kv : splits) {
String[] kvs = kv.split(",");
if (kvs.length == 2) {
propToValue.put(kvs[0], kvs[1]);
} else {
throw new ParseException(kv, 2);
}
}
}
/**
* convertPythonScriptPlaceholders
* @param rawScript rawScript
* @return String
* @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException
*/
public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException {
int len = "${setShareVar(${".length();
int scriptStart = 0;
while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) {
int start = -1;
int end = rawScript.indexOf('}', scriptStart + len);
String prop = rawScript.substring(scriptStart + len, end);
start = rawScript.indexOf(',', end);
end = rawScript.indexOf(')', start);
String value = rawScript.substring(start + 1, end);
start = rawScript.indexOf('}', start) + 1;
end = rawScript.length();
String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value);
rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end);
scriptStart += replaceScript.length();
}
return rawScript;
}
}

15
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java

@ -14,14 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.junit.Assert;
import org.junit.Test;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import org.junit.Assert;
import org.junit.Test;
public class DateUtilsTest { public class DateUtilsTest {
@Test @Test
public void format2Readable() throws ParseException { public void format2Readable() throws ParseException {
@ -38,10 +40,8 @@ public class DateUtilsTest {
Assert.assertEquals("01 09:23:08", readableDate); Assert.assertEquals("01 09:23:08", readableDate);
} }
@Test @Test
public void testWeek() { public void testWeek() {
Date curr = DateUtils.stringToDate("2019-02-01 00:00:00"); Date curr = DateUtils.stringToDate("2019-02-01 00:00:00");
Date monday1 = DateUtils.stringToDate("2019-01-28 00:00:00"); Date monday1 = DateUtils.stringToDate("2019-01-28 00:00:00");
Date sunday1 = DateUtils.stringToDate("2019-02-03 00:00:00"); Date sunday1 = DateUtils.stringToDate("2019-02-03 00:00:00");
@ -150,4 +150,11 @@ public class DateUtilsTest {
Date curr = DateUtils.getEndOfHour(d1); Date curr = DateUtils.getEndOfHour(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59");
} }
@Test
public void getCurrentTimeStamp() {
String timeStamp = DateUtils.getCurrentTimeStamp();
Assert.assertNotNull(timeStamp);
}
} }

73
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.model.TaskNode;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VarPoolUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
@Test
public void testSetTaskNodeLocalParams() {
String taskJson = "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+ "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\","
+ "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is python task \\\\\\\",${p0})\\\","
+ "\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}],"
+ "\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+ "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+ "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}";
TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1");
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test1");
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
propToValue.put("p1", "test2");
VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test2");
}
@Test
public void testConvertVarPoolToMap() throws Exception {
String varPool = "p1,66$VarPool$p2,69$VarPool$";
ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<String, Object>();
VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
Assert.assertEquals((String)propToValue.get("p1"), "66");
Assert.assertEquals((String)propToValue.get("p2"), "69");
logger.info(propToValue.toString());
}
@Test
public void testConvertPythonScriptPlaceholders() throws Exception {
String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
Assert.assertEquals(rawScript, "print(${p1});\n"
+ "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+ "print(\"${{setValue({},{})}}\".format(\"p2\",4));");
logger.info(rawScript);
}
}

181
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -14,18 +14,27 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.entity; package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import java.util.Date;
import java.util.Objects;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.*;
import java.util.Date;
import java.util.Objects;
/** /**
* process instance * process instance
@ -189,6 +198,7 @@ public class ProcessInstance {
/** /**
* process duration * process duration
*
* @return * @return
*/ */
@TableField(exist = false) @TableField(exist = false)
@ -214,6 +224,11 @@ public class ProcessInstance {
*/ */
private int tenantId; private int tenantId;
/**
* varPool string
*/
private String varPool;
/** /**
* receivers for api * receivers for api
*/ */
@ -232,13 +247,26 @@ public class ProcessInstance {
/** /**
* set the process name with process define version and timestamp * set the process name with process define version and timestamp
*
* @param processDefinition processDefinition * @param processDefinition processDefinition
*/ */
public ProcessInstance(ProcessDefinition processDefinition) { public ProcessInstance(ProcessDefinition processDefinition) {
this.processDefinition = processDefinition; this.processDefinition = processDefinition;
this.name = processDefinition.getName() + "-" + this.name = processDefinition.getName()
processDefinition.getVersion() + "-" + + "-"
System.currentTimeMillis(); +
processDefinition.getVersion()
+ "-"
+
DateUtils.getCurrentTimeStamp();
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
} }
public ProcessDefinition getProcessDefinition() { public ProcessDefinition getProcessDefinition() {
@ -313,7 +341,6 @@ public class ProcessInstance {
this.name = name; this.name = name;
} }
public String getHost() { public String getHost() {
return host; return host;
} }
@ -322,7 +349,6 @@ public class ProcessInstance {
this.host = host; this.host = host;
} }
public CommandType getCommandType() { public CommandType getCommandType() {
return commandType; return commandType;
} }
@ -347,7 +373,6 @@ public class ProcessInstance {
this.taskDependType = taskDependType; this.taskDependType = taskDependType;
} }
public int getMaxTryTimes() { public int getMaxTryTimes() {
return maxTryTimes; return maxTryTimes;
} }
@ -364,7 +389,6 @@ public class ProcessInstance {
this.failureStrategy = failureStrategy; this.failureStrategy = failureStrategy;
} }
public boolean isProcessInstanceStop() { public boolean isProcessInstanceStop() {
return this.state.typeIsFinished(); return this.state.typeIsFinished();
} }
@ -441,7 +465,6 @@ public class ProcessInstance {
this.executorId = executorId; this.executorId = executorId;
} }
public Flag getIsSubProcess() { public Flag getIsSubProcess() {
return isSubProcess; return isSubProcess;
} }
@ -457,6 +480,7 @@ public class ProcessInstance {
public void setProcessInstancePriority(Priority processInstancePriority) { public void setProcessInstancePriority(Priority processInstancePriority) {
this.processInstancePriority = processInstancePriority; this.processInstancePriority = processInstancePriority;
} }
public String getLocations() { public String getLocations() {
return locations; return locations;
} }
@ -477,6 +501,10 @@ public class ProcessInstance {
return historyCmd; return historyCmd;
} }
public void setHistoryCmd(String historyCmd) {
this.historyCmd = historyCmd;
}
public String getExecutorName() { public String getExecutorName() {
return executorName; return executorName;
} }
@ -485,12 +513,9 @@ public class ProcessInstance {
this.executorName = executorName; this.executorName = executorName;
} }
public void setHistoryCmd(String historyCmd) {
this.historyCmd = historyCmd;
}
/** /**
* add command to history * add command to history
*
* @param cmd cmd * @param cmd cmd
*/ */
public void addHistoryCmd(CommandType cmd) { public void addHistoryCmd(CommandType cmd) {
@ -503,6 +528,7 @@ public class ProcessInstance {
/** /**
* check this process is start complement data * check this process is start complement data
*
* @return whether complement data * @return whether complement data
*/ */
public boolean isComplementData() { public boolean isComplementData() {
@ -515,6 +541,7 @@ public class ProcessInstance {
/** /**
* get current command type, * get current command type,
* if start with complement data,return complement * if start with complement data,return complement
*
* @return CommandType * @return CommandType
*/ */
public CommandType getCmdTypeIfComplement() { public CommandType getCmdTypeIfComplement() {
@ -556,15 +583,14 @@ public class ProcessInstance {
this.timeout = timeout; this.timeout = timeout;
} }
public int getTenantId() {
return this.tenantId;
}
public void setTenantId(int tenantId) { public void setTenantId(int tenantId) {
this.tenantId = tenantId; this.tenantId = tenantId;
} }
public int getTenantId() {
return this.tenantId ;
}
public String getReceivers() { public String getReceivers() {
return receivers; return receivers;
} }
@ -583,44 +609,83 @@ public class ProcessInstance {
@Override @Override
public String toString() { public String toString() {
return "ProcessInstance{" + return "ProcessInstance{"
"id=" + id + + "id=" + id
", processDefinitionId=" + processDefinitionId + + ", processDefinitionId=" + processDefinitionId
", state=" + state + + ", state=" + state
", recovery=" + recovery + + ", recovery=" + recovery
", startTime=" + startTime + + ", startTime=" + startTime
", endTime=" + endTime + + ", endTime=" + endTime
", runTimes=" + runTimes + + ", runTimes=" + runTimes
", name='" + name + '\'' + + ", name='" + name + '\''
", host='" + host + '\'' + + ", host='" + host + '\''
", processDefinition=" + processDefinition + + ", processDefinition="
", commandType=" + commandType + + processDefinition
", commandParam='" + commandParam + '\'' + + ", commandType="
", taskDependType=" + taskDependType + + commandType
", maxTryTimes=" + maxTryTimes + + ", commandParam='"
", failureStrategy=" + failureStrategy + + commandParam
", warningType=" + warningType + + '\''
", warningGroupId=" + warningGroupId + + ", taskDependType="
", scheduleTime=" + scheduleTime + + taskDependType
", commandStartTime=" + commandStartTime + + ", maxTryTimes="
", globalParams='" + globalParams + '\'' + + maxTryTimes
", processInstanceJson='" + processInstanceJson + '\'' + + ", failureStrategy="
", executorId=" + executorId + + failureStrategy
", tenantCode='" + tenantCode + '\'' + + ", warningType="
", queue='" + queue + '\'' + + warningType
", isSubProcess=" + isSubProcess + + ", warningGroupId="
", locations='" + locations + '\'' + + warningGroupId
", connects='" + connects + '\'' + + ", scheduleTime="
", historyCmd='" + historyCmd + '\'' + + scheduleTime
", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' + + ", commandStartTime="
", duration=" + duration + + commandStartTime
", processInstancePriority=" + processInstancePriority + + ", globalParams='"
", workerGroup='" + workerGroup + '\'' + + globalParams
", timeout=" + timeout + + '\''
", tenantId=" + tenantId + + ", processInstanceJson='"
", receivers='" + receivers + '\'' + + processInstanceJson
", receiversCc='" + receiversCc + '\'' + + '\''
'}'; + ", executorId="
+ executorId
+ ", tenantCode='"
+ tenantCode
+ '\''
+ ", queue='"
+ queue
+ '\''
+ ", isSubProcess="
+ isSubProcess
+ ", locations='"
+ locations
+ '\''
+ ", connects='"
+ connects
+ '\''
+ ", historyCmd='"
+ historyCmd
+ '\''
+ ", dependenceScheduleTimes='"
+ dependenceScheduleTimes
+ '\''
+ ", duration="
+ duration
+ ", processInstancePriority="
+ processInstancePriority
+ ", workerGroup='"
+ workerGroup
+ '\''
+ ", timeout="
+ timeout
+ ", tenantId="
+ tenantId
+ ", receivers='"
+ receivers
+ '\''
+ ", receiversCc='"
+ receiversCc
+ '\''
+ '}';
} }
@Override @Override

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -211,6 +211,11 @@ public class TaskInstance implements Serializable {
*/ */
private int executorId; private int executorId;
/**
* varPool string
*/
private String varPool;
/** /**
* executor name * executor name
*/ */
@ -232,6 +237,13 @@ public class TaskInstance implements Serializable {
this.executePath = executePath; this.executePath = executePath;
} }
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public ProcessInstance getProcessInstance() { public ProcessInstance getProcessInstance() {
return processInstance; return processInstance;

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -63,6 +63,18 @@ public class TaskExecuteResponseCommand implements Serializable {
*/ */
private String appIds; private String appIds;
/**
* varPool string
*/
private String varPool;
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
public int getTaskInstanceId() { public int getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@ -53,4 +53,9 @@ public class Constants {
*/ */
public static final String OS_NAME = System.getProperty("os.name"); public static final String OS_NAME = System.getProperty("os.name");
/**
* warm up time
*/
public static final int WARM_UP_TIME = 10 * 60 * 1000;
} }

46
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
import java.io.Serializable; import java.io.Serializable;
@ -44,6 +45,11 @@ public class Host implements Serializable {
*/ */
private int weight; private int weight;
/**
* startTime
*/
private long startTime;
/** /**
* workGroup * workGroup
*/ */
@ -58,19 +64,21 @@ public class Host implements Serializable {
this.address = ip + ":" + port; this.address = ip + ":" + port;
} }
public Host(String ip, int port, int weight) { public Host(String ip, int port, int weight, long startTime) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + ":" + port;
this.weight = weight; this.weight = getWarmUpWeight(weight, startTime);
this.startTime = startTime;
} }
public Host(String ip, int port, int weight,String workGroup) { public Host(String ip, int port, int weight, long startTime, String workGroup) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + ":" + port;
this.weight = weight; this.weight = getWarmUpWeight(weight, startTime);
this.workGroup = workGroup; this.workGroup = workGroup;
this.startTime = startTime;
} }
public String getAddress() { public String getAddress() {
@ -98,6 +106,14 @@ public class Host implements Serializable {
this.weight = weight; this.weight = weight;
} }
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public int getPort() { public int getPort() {
return port; return port;
} }
@ -133,8 +149,8 @@ public class Host implements Serializable {
if (parts.length == 2) { if (parts.length == 2) {
host = new Host(parts[0], Integer.parseInt(parts[1])); host = new Host(parts[0], Integer.parseInt(parts[1]));
} }
if (parts.length == 3) { if (parts.length == 4) {
host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])); host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3]));
} }
return host; return host;
} }
@ -169,8 +185,20 @@ public class Host implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "Host{" + return "Host{"
"address='" + address + '\'' + + "address='" + address + '\''
'}'; + '}';
}
/**
* warm up
*/
private int getWarmUpWeight(int weight, long startTime) {
long uptime = System.currentTimeMillis() - startTime;
//If the warm-up is not over, reduce the weight
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME));
}
return weight;
} }
} }

40
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
/** /**
@ -32,25 +33,25 @@ public class HostWeight {
private final Host host; private final Host host;
private final int weight; private final double weight;
private int currentWeight; private double currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) { public HostWeight(Host host, double cpu, double memory, double loadAverage) {
this.weight = calculateWeight(cpu, memory, loadAverage); this.weight = getWeight(cpu, memory, loadAverage, host);
this.host = host; this.host = host;
this.currentWeight = weight; this.currentWeight = weight;
} }
public int getCurrentWeight() { public double getCurrentWeight() {
return currentWeight; return currentWeight;
} }
public int getWeight() { public double getWeight() {
return weight; return weight;
} }
public void setCurrentWeight(int currentWeight) { public void setCurrentWeight(double currentWeight) {
this.currentWeight = currentWeight; this.currentWeight = currentWeight;
} }
@ -60,14 +61,27 @@ public class HostWeight {
@Override @Override
public String toString() { public String toString() {
return "HostWeight{" + return "HostWeight{"
"host=" + host + + "host=" + host
", weight=" + weight + + ", weight=" + weight
", currentWeight=" + currentWeight + + ", currentWeight=" + currentWeight
'}'; + '}';
} }
private int calculateWeight(double cpu, double memory, double loadAverage){ private double getWeight(double cpu, double memory, double loadAverage, Host host) {
return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
return getWarmUpWeight(host, calculateWeight);
}
/**
* If the warm-up is not over, add the weight
*/
private double getWarmUpWeight(Host host, double weight) {
long startTime = host.getStartTime();
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return weight * Constants.WARM_UP_TIME / uptime;
}
return weight;
} }
} }

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java

@ -26,13 +26,14 @@ public class LowerWeightRoundRobin extends AbstractSelector<HostWeight>{
/** /**
* select * select
*
* @param sources sources * @param sources sources
* @return HostWeight * @return HostWeight
*/ */
@Override @Override
public HostWeight doSelect(Collection<HostWeight> sources) { public HostWeight doSelect(Collection<HostWeight> sources) {
int totalWeight = 0; double totalWeight = 0;
int lowWeight = 0; double lowWeight = 0;
HostWeight lowerNode = null; HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) { for (HostWeight hostWeight : sources) {
totalWeight += hostWeight.getWeight(); totalWeight += hostWeight.getWeight();

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -90,7 +90,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
responseCommand.getEndTime(), responseCommand.getEndTime(),
responseCommand.getProcessId(), responseCommand.getProcessId(),
responseCommand.getAppIds(), responseCommand.getAppIds(),
responseCommand.getTaskInstanceId()); responseCommand.getTaskInstanceId(),
responseCommand.getVarPool());
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java

@ -79,6 +79,11 @@ public class TaskResponseEvent {
*/ */
private Event event; private Event event;
/**
* varPool
*/
private String varPool;
public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId) { public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId) {
TaskResponseEvent event = new TaskResponseEvent(); TaskResponseEvent event = new TaskResponseEvent();
event.setState(state); event.setState(state);
@ -91,7 +96,7 @@ public class TaskResponseEvent {
return event; return event;
} }
public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){ public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId, String varPool) {
TaskResponseEvent event = new TaskResponseEvent(); TaskResponseEvent event = new TaskResponseEvent();
event.setState(state); event.setState(state);
event.setEndTime(endTime); event.setEndTime(endTime);
@ -99,9 +104,18 @@ public class TaskResponseEvent {
event.setAppIds(appIds); event.setAppIds(appIds);
event.setTaskInstanceId(taskInstanceId); event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.RESULT); event.setEvent(Event.RESULT);
event.setVarPool(varPool);
return event; return event;
} }
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getTaskInstanceId() { public int getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;
} }

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -139,7 +139,8 @@ public class TaskResponseService {
taskResponseEvent.getEndTime(), taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(), taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(), taskResponseEvent.getAppIds(),
taskResponseEvent.getTaskInstanceId()); taskResponseEvent.getTaskInstanceId(),
taskResponseEvent.getVarPool());
break; break;
default: default:
throw new IllegalArgumentException("invalid event type : " + event); throw new IllegalArgumentException("invalid event type : " + event);

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -59,6 +60,7 @@ import org.apache.commons.io.FileUtils;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -651,14 +653,23 @@ public class MasterExecThread implements Runnable {
* submit post node * submit post node
* @param parentNodeName parent node name * @param parentNodeName parent node name
*/ */
private Map<String,Object> propToValue = new ConcurrentHashMap<String, Object>();
private void submitPostNode(String parentNodeName){ private void submitPostNode(String parentNodeName){
List<String> submitTaskNodeList = parsePostNodeList(parentNodeName); List<String> submitTaskNodeList = parsePostNodeList(parentNodeName);
List<TaskInstance> taskInstances = new ArrayList<>(); List<TaskInstance> taskInstances = new ArrayList<>();
for(String taskNode : submitTaskNodeList){ for(String taskNode : submitTaskNodeList){
try {
VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool());
} catch (ParseException e) {
logger.error("parse {} exception", processInstance.getVarPool(), e);
throw new RuntimeException();
}
TaskNode taskNodeObject = dag.getNode(taskNode);
VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
taskInstances.add(createTaskInstance(processInstance, taskNode, taskInstances.add(createTaskInstance(processInstance, taskNode,
dag.getNode(taskNode))); taskNodeObject));
} }
// if previous node success , post node submit // if previous node success , post node submit
@ -999,6 +1010,8 @@ public class MasterExecThread implements Runnable {
task.getName(), task.getId(), task.getState()); task.getName(), task.getId(), task.getState());
// node success , post node submit // node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){ if(task.getState() == ExecutionStatus.SUCCESS){
processInstance.setVarPool(task.getVarPool());
processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task);
submitPostNode(task.getName()); submitPostNode(task.getName());
continue; continue;

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -14,19 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.registry; package org.apache.dolphinscheduler.server.worker.registry;
import java.util.Date; import static org.apache.dolphinscheduler.common.Constants.COLON;
import java.util.Set; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import java.util.concurrent.Executors; import static org.apache.dolphinscheduler.common.Constants.SLASH;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -34,6 +28,19 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -41,8 +48,6 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* worker registry * worker registry
@ -142,6 +147,7 @@ public class WorkerRegistry {
String address = getLocalAddress(); String address = getLocalAddress();
String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
String weight = getWorkerWeight(); String weight = getWorkerWeight();
String workerStartTime = COLON + System.currentTimeMillis();
for (String workGroup : this.workerGroups) { for (String workGroup : this.workerGroups) {
StringBuilder workerZkPathBuilder = new StringBuilder(100); StringBuilder workerZkPathBuilder = new StringBuilder(100);
@ -153,6 +159,7 @@ public class WorkerRegistry {
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
workerZkPathBuilder.append(address); workerZkPathBuilder.append(address);
workerZkPathBuilder.append(weight); workerZkPathBuilder.append(weight);
workerZkPathBuilder.append(workerStartTime);
workerZkPaths.add(workerZkPathBuilder.toString()); workerZkPaths.add(workerZkPathBuilder.toString());
} }
return workerZkPaths; return workerZkPaths;
@ -162,13 +169,14 @@ public class WorkerRegistry {
* get local address * get local address
*/ */
private String getLocalAddress() { private String getLocalAddress() {
return NetUtils.getHost() + ":" + workerConfig.getListenPort(); return NetUtils.getHost() + COLON + workerConfig.getListenPort();
} }
/** /**
* get Worker Weight * get Worker Weight
*/ */
private String getWorkerWeight() { private String getWorkerWeight() {
return ":" + workerConfig.getWeight(); return COLON + workerConfig.getWeight();
} }
} }

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -155,6 +155,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
responseCommand.setProcessId(task.getProcessId()); responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds()); responseCommand.setAppIds(task.getAppIds());
responseCommand.setVarPool(task.getVarPool());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
} catch (Exception e) { } catch (Exception e) {
logger.error("task scheduler failure", e); logger.error("task scheduler failure", e);

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -66,6 +66,7 @@ public abstract class AbstractCommandExecutor {
*/ */
protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX);
protected StringBuilder varPool = new StringBuilder();
/** /**
* process * process
*/ */
@ -234,6 +235,9 @@ public abstract class AbstractCommandExecutor {
return result; return result;
} }
public String getVarPool() {
return varPool.toString();
}
/** /**
* cancel application * cancel application
@ -347,9 +351,14 @@ public abstract class AbstractCommandExecutor {
long lastFlushTime = System.currentTimeMillis(); long lastFlushTime = System.currentTimeMillis();
while ((line = inReader.readLine()) != null) { while ((line = inReader.readLine()) != null) {
if (line.startsWith("${setValue(")) {
varPool.append(line.substring("${setValue(".length(), line.length() - 2));
varPool.append("$VarPool$");
} else {
logBuffer.add(line); logBuffer.add(line);
lastFlushTime = flush(lastFlushTime); lastFlushTime = flush(lastFlushTime);
} }
}
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(),e); logger.error(e.getMessage(),e);
} finally { } finally {

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -47,6 +47,11 @@ import java.util.Map;
*/ */
public abstract class AbstractTask { public abstract class AbstractTask {
/**
* varPool string
*/
protected String varPool;
/** /**
* taskExecutionContext * taskExecutionContext
**/ **/
@ -121,6 +126,14 @@ public abstract class AbstractTask {
logger.info(" -> {}", String.join("\n\t", logs)); logger.info(" -> {}", String.join("\n\t", logs));
} }
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
/** /**
* get exit status code * get exit status code
* @return exit status code * @return exit status code

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java

@ -92,6 +92,7 @@ public class PythonTask extends AbstractTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
setVarPool(pythonCommandExecutor.getVarPool());
} }
catch (Exception e) { catch (Exception e) {
logger.error("python task failure", e); logger.error("python task failure", e);
@ -120,6 +121,14 @@ public class PythonTask extends AbstractTask {
pythonParameters.getLocalParametersMap(), pythonParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime()); taskExecutionContext.getScheduleTime());
try {
rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
}
catch (StringIndexOutOfBoundsException e) {
logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
}
if (paramsMap != null) { if (paramsMap != null) {
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
} }
@ -135,6 +144,4 @@ public class PythonTask extends AbstractTask {
return pythonParameters; return pythonParameters;
} }
} }

47
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java

@ -14,9 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
@ -29,13 +32,45 @@ public class LowerWeightRoundRobinTest {
@Test @Test
public void testSelect() { public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>(); Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84)); sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24)); sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24));
sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15)); sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15));
System.out.println(sources);
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
for(int i = 0; i < 100; i ++){ HostWeight result;
System.out.println(roundRobin.select(sources)); result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
} }
@Test
public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.4", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.4", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
} }
} }

10
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java

@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.commons.lang.ObjectUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -33,20 +33,20 @@ public class RandomSelectorTest {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testSelectWithIllegalArgumentException() { public void testSelectWithIllegalArgumentException() {
RandomSelector selector = new RandomSelector(); RandomSelector selector = new RandomSelector();
selector.select(Collections.EMPTY_LIST); selector.select(null);
} }
@Test @Test
public void testSelect1() { public void testSelect1() {
RandomSelector selector = new RandomSelector(); RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20))); Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis())));
Assert.assertNotNull(result); Assert.assertNotNull(result);
} }
@Test @Test
public void testSelect() { public void testSelect() {
RandomSelector selector = new RandomSelector(); RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20))); Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis())));
Assert.assertNotNull(result); Assert.assertNotNull(result);
} }

52
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java

@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
/** /**
* round robin selector * round robin selector
@ -33,44 +33,60 @@ public class RoundRobinSelectorTest {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testSelectWithIllegalArgumentException() { public void testSelectWithIllegalArgumentException() {
RoundRobinSelector selector = new RoundRobinSelector(); RoundRobinSelector selector = new RoundRobinSelector();
selector.select(Collections.EMPTY_LIST); selector.select(null);
} }
@Test @Test
public void testSelect1() { public void testSelect1() {
RoundRobinSelector selector = new RoundRobinSelector(); RoundRobinSelector selector = new RoundRobinSelector();
Host result = null; Host result;
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
// add new host // add new host
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.3", result.getIp()); Assert.assertEquals("192.168.1.3", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.3", result.getIp()); Assert.assertEquals("192.168.1.3", result.getIp());
// remove host3 // remove host3
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
} }
@Test
public void testWarmUpRoundRobinSelector() {
RoundRobinSelector selector = new RoundRobinSelector();
Host result;
result = selector.select(
Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
}
} }

43
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
/**
* host test
*/
public class HostTest {
@Test
public void testHostWarmUp() {
Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)));
Assert.assertEquals(50, host.getWeight());
host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000)));
Assert.assertEquals(100, host.getWeight());
}
@Test
public void testHost() {
Host host = Host.of("192.158.2.2:22");
Assert.assertEquals(22, host.getPort());
}
}

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1464,17 +1464,20 @@ public class ProcessService {
* @param state state * @param state state
* @param endTime endTime * @param endTime endTime
* @param taskInstId taskInstId * @param taskInstId taskInstId
* @param varPool varPool
*/ */
public void changeTaskState(ExecutionStatus state, public void changeTaskState(ExecutionStatus state,
Date endTime, Date endTime,
int processId, int processId,
String appIds, String appIds,
int taskInstId) { int taskInstId,
String varPool) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
taskInstance.setPid(processId); taskInstance.setPid(processId);
taskInstance.setAppLink(appIds); taskInstance.setAppLink(appIds);
taskInstance.setState(state); taskInstance.setState(state);
taskInstance.setEndTime(endTime); taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
saveTaskInstance(taskInstance); saveTaskInstance(taskInstance);
} }

2
pom.xml

@ -785,6 +785,7 @@
<include>**/common/utils/StringTest.java</include> <include>**/common/utils/StringTest.java</include>
<include>**/common/utils/StringUtilsTest.java</include> <include>**/common/utils/StringUtilsTest.java</include>
<include>**/common/utils/TaskParametersUtilsTest.java</include> <include>**/common/utils/TaskParametersUtilsTest.java</include>
<include>**/common/utils/VarPoolUtilsTest.java</include>
<include>**/common/utils/HadoopUtilsTest.java</include> <include>**/common/utils/HadoopUtilsTest.java</include>
<include>**/common/utils/HttpUtilsTest.java</include> <include>**/common/utils/HttpUtilsTest.java</include>
<include>**/common/utils/KerberosHttpClientTest.java</include> <include>**/common/utils/KerberosHttpClientTest.java</include>
@ -832,6 +833,7 @@
<include>**/server/register/ZookeeperNodeManagerTest.java</include> <include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include> <include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include> <include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/HostTest.java</include>
<!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>--> <!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
<include>**/server/utils/LogUtilsTest.java</include> <include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include> <include>**/server/utils/ParamUtilsTest.java</include>

2
sql/dolphinscheduler-postgre.sql

@ -377,6 +377,7 @@ CREATE TABLE t_ds_process_instance (
worker_group varchar(64) , worker_group varchar(64) ,
timeout int DEFAULT '0' , timeout int DEFAULT '0' ,
tenant_id int NOT NULL DEFAULT '-1' , tenant_id int NOT NULL DEFAULT '-1' ,
var_pool text ,
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;
create index process_instance_index on t_ds_process_instance (process_definition_id,id); create index process_instance_index on t_ds_process_instance (process_definition_id,id);
@ -595,6 +596,7 @@ CREATE TABLE t_ds_task_instance (
executor_id int DEFAULT NULL , executor_id int DEFAULT NULL ,
first_submit_time timestamp DEFAULT NULL , first_submit_time timestamp DEFAULT NULL ,
delay_time int DEFAULT '0' , delay_time int DEFAULT '0' ,
var_pool text ,
PRIMARY KEY (id) PRIMARY KEY (id)
) ; ) ;

2
sql/dolphinscheduler_mysql.sql

@ -487,6 +487,7 @@ CREATE TABLE `t_ds_process_instance` (
`worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id', `worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
`timeout` int(11) DEFAULT '0' COMMENT 'time out', `timeout` int(11) DEFAULT '0' COMMENT 'time out',
`tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
`var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE, KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`) USING BTREE KEY `start_time_index` (`start_time`) USING BTREE
@ -737,6 +738,7 @@ CREATE TABLE `t_ds_task_instance` (
`executor_id` int(11) DEFAULT NULL, `executor_id` int(11) DEFAULT NULL,
`first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time', `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
`delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time', `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
`var_pool` longtext COMMENT 'var_pool',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE, KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE, KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE,

40
sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql

@ -56,6 +56,46 @@ delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_delay_time(); CALL uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time; DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time;
-- uc_dolphin_T_t_ds_task_instance_A_var_pool
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_task_instance ADD `var_pool` longtext NULL;
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_var_pool();
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool;
-- uc_dolphin_T_t_ds_process_instance_A_var_pool
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_process_instance ADD `var_pool` longtext NULL;
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_A_var_pool();
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool;
-- uc_dolphin_T_t_ds_process_definition_A_modify_by -- uc_dolphin_T_t_ds_process_definition_A_modify_by
drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version; drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version;
delimiter d// delimiter d//

36
sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql

@ -51,6 +51,42 @@ delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time(); SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time(); DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time();
-- uc_dolphin_T_t_ds_process_instance_A_var_pool
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_instance_A_var_pool() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_process_instance ADD COLUMN var_pool text;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_process_instance_A_var_pool();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool();
-- uc_dolphin_T_t_ds_task_instance_A_var_pool
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_var_pool() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='var_pool')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN var_pool text;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_var_pool();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool();
-- uc_dolphin_T_t_ds_process_definition_A_modify_by -- uc_dolphin_T_t_ds_process_definition_A_modify_by
delimiter d// delimiter d//
CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$ CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$

4
tools/dependencies/check-LICENSE.sh

@ -25,7 +25,9 @@ tar -zxf dolphinscheduler-dist/target/apache-dolphinscheduler*-bin.tar.gz --stri
# licenses # licenses
echo '=== Self modules: ' && ./mvnw --batch-mode --quiet -Dexec.executable='echo' -Dexec.args='${project.artifactId}-${project.version}.jar' exec:exec | tee self-modules.txt echo '=== Self modules: ' && ./mvnw --batch-mode --quiet -Dexec.executable='echo' -Dexec.args='${project.artifactId}-${project.version}.jar' exec:exec | tee self-modules.txt
echo '=== Distributed dependencies: ' && ls dist/lib | tee all-dependencies.txt echo '=== Distributed dependencies: ' && find dist/lib -name "*.jar" | tee all-dependencies.txt
# The prefix "dist/lib/" (9 chars) should be stripped to be ready to compare
sed -i 's/.\{9\}//' all-dependencies.txt
# Exclude all self modules(jars) to generate all third-party dependencies # Exclude all self modules(jars) to generate all third-party dependencies
echo '=== Third party dependencies: ' && grep -vf self-modules.txt all-dependencies.txt | tee third-party-dependencies.txt echo '=== Third party dependencies: ' && grep -vf self-modules.txt all-dependencies.txt | tee third-party-dependencies.txt

Loading…
Cancel
Save