From 8ef043151c5f1a5cc0f87edd08d1614f12b8a421 Mon Sep 17 00:00:00 2001 From: Hsu Pu Date: Mon, 21 Sep 2020 16:49:25 +0800 Subject: [PATCH 1/6] done (#3752) --- tools/dependencies/check-LICENSE.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/dependencies/check-LICENSE.sh b/tools/dependencies/check-LICENSE.sh index d414bd40c6..0072554e29 100755 --- a/tools/dependencies/check-LICENSE.sh +++ b/tools/dependencies/check-LICENSE.sh @@ -25,7 +25,9 @@ tar -zxf dolphinscheduler-dist/target/apache-dolphinscheduler*-bin.tar.gz --stri # licenses echo '=== Self modules: ' && ./mvnw --batch-mode --quiet -Dexec.executable='echo' -Dexec.args='${project.artifactId}-${project.version}.jar' exec:exec | tee self-modules.txt -echo '=== Distributed dependencies: ' && ls dist/lib | tee all-dependencies.txt +echo '=== Distributed dependencies: ' && find dist/lib -name "*.jar" | tee all-dependencies.txt +# The prefix "dist/lib/" (9 chars) should be stripped to be ready to compare +sed -i 's/.\{9\}//' all-dependencies.txt # Exclude all self modules(jars) to generate all third-party dependencies echo '=== Third party dependencies: ' && grep -vf self-modules.txt all-dependencies.txt | tee third-party-dependencies.txt From 375ec125c5cc50a9634fac120a9936ccfc2807c7 Mon Sep 17 00:00:00 2001 From: BoYiZhang <39816903+BoYiZhang@users.noreply.github.com> Date: Tue, 22 Sep 2020 17:38:17 +0800 Subject: [PATCH 2/6] [Feature-3773][processInstance, taskInstance] Change title name : Replacement System.currentTimeMillis () (#3774) * fix bug Delete invalid field: executorcores Modify verification prompt * fix bug Delete invalid field: executorcores Modify verification prompt * fix bug Delete invalid field: executorcores Modify verification prompt * dag add close button * reset last version * reset last version * dag add close buttion dag add close buttion * update CLICK_SAVE_WORKFLOW_BUTTON xpath * updae CLICK_SAVE_WORKFLOW_BUTTON xpath * updae CLICK_SAVE_WORKFLOW_BUTTON xpath * updae CLICK_SAVE_WORKFLOW_BUTTON xpath * Update CreateWorkflowLocator.java modify submit workflow button * Update CreateWorkflowLocator.java * Update CreateWorkflowLocator.java modify CLICK_ADD_BUTTON * Update CreateWorkflowLocator.java delete print * Update CreateWorkflowLocator.java 1 * Update CreateWorkflowLocator.java 1 * Setting '-XX:+DisableExplicitGC ' causes netty memory leaks in addition update '- XX: largepagesizeinbytes = 128M' to '- XX: largepagesizeinbytes = 10M' * Update dag.vue * Update dag.vue * Update dag.vue * Update CreateWorkflowLocator.java * Revert "Setting '-XX:+DisableExplicitGC ' causes netty memory leaks" This reverts commit 3a2cba7a * Setting '-XX:+DisableExplicitGC ' causes netty memory leaks in addition update '- XX: largepagesizeinbytes = 128M' to '- XX: largepagesizeinbytes = 10M' * Update dolphinscheduler-daemon.sh * Replacement System.currentTimeMillis () to DateUtils.getCurrentTimeStamp() * update checkstyle * update checkstyle Co-authored-by: dailidong Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> --- .../impl/ProcessDefinitionServiceImpl.java | 4 +- .../dolphinscheduler/common/Constants.java | 4 + .../common/utils/DateUtils.java | 10 + .../common/utils/DateUtilsTest.java | 19 +- .../dao/entity/ProcessInstance.java | 200 +++++++++++------- 5 files changed, 155 insertions(+), 82 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index a5e297072c..f4ea1bf065 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -845,7 +845,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements try { createProcessResult = createProcessDefinition(loginUser , currentProjectName, - processDefinitionName + "_import_" + System.currentTimeMillis(), + processDefinitionName + "_import_" + DateUtils.getCurrentTimeStamp(), importProcessParam, processMeta.getProcessDefinitionDescription(), processMeta.getProcessDefinitionLocations(), @@ -1433,7 +1433,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return createProcessDefinition( loginUser, targetProject.getName(), - processDefinition.getName() + "_copy_" + System.currentTimeMillis(), + processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp(), processDefinition.getProcessDefinitionJson(), processDefinition.getDescription(), processDefinition.getLocations(), diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index ba8b0c4921..85066cc55a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -266,6 +266,10 @@ public final class Constants { */ public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss"; + /** + * date format of yyyyMMddHHmmssSSS + */ + public static final String YYYYMMDDHHMMSSSSS = "yyyyMMddHHmmssSSS"; /** * http connect time out */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java index 283b4e7f80..80c0ed411c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java @@ -444,4 +444,14 @@ public class DateUtils { long usedTime = (System.currentTimeMillis() - baseTime.getTime()) / 1000; return intervalSeconds - usedTime; } + + /** + * get current time stamp : yyyyMMddHHmmssSSS + * + * @return date string + */ + public static String getCurrentTimeStamp() { + return getCurrentTime(Constants.YYYYMMDDHHMMSSSSS); + } + } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java index 6800f6b542..fa16446cd8 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java @@ -14,14 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.utils; -import org.junit.Assert; -import org.junit.Test; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; +import org.junit.Assert; +import org.junit.Test; + public class DateUtilsTest { @Test public void format2Readable() throws ParseException { @@ -38,10 +40,8 @@ public class DateUtilsTest { Assert.assertEquals("01 09:23:08", readableDate); } - @Test - public void testWeek(){ - + public void testWeek() { Date curr = DateUtils.stringToDate("2019-02-01 00:00:00"); Date monday1 = DateUtils.stringToDate("2019-01-28 00:00:00"); Date sunday1 = DateUtils.stringToDate("2019-02-03 00:00:00"); @@ -54,7 +54,7 @@ public class DateUtilsTest { } @Test - public void diffHours(){ + public void diffHours() { Date d1 = DateUtils.stringToDate("2019-01-28 00:00:00"); Date d2 = DateUtils.stringToDate("2019-01-28 20:00:00"); Assert.assertEquals(DateUtils.diffHours(d1, d2), 20); @@ -150,4 +150,11 @@ public class DateUtilsTest { Date curr = DateUtils.getEndOfHour(d1); Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); } + + @Test + public void getCurrentTimeStamp() { + String timeStamp = DateUtils.getCurrentTimeStamp(); + Assert.assertNotNull(timeStamp); + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 28f1eba975..3d1a756d25 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -14,18 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.DateUtils; + +import java.util.Date; +import java.util.Objects; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.fasterxml.jackson.annotation.JsonFormat; -import org.apache.dolphinscheduler.common.enums.*; - -import java.util.Date; -import java.util.Objects; /** * process instance @@ -36,7 +45,7 @@ public class ProcessInstance { /** * id */ - @TableId(value="id", type=IdType.AUTO) + @TableId(value = "id", type = IdType.AUTO) private int id; /** * process definition id @@ -53,13 +62,13 @@ public class ProcessInstance { /** * start time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date startTime; /** * end time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date endTime; /** @@ -108,7 +117,7 @@ public class ProcessInstance { private FailureStrategy failureStrategy; /** - * warning type + * warning type */ private WarningType warningType; @@ -120,13 +129,13 @@ public class ProcessInstance { /** * schedule time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date scheduleTime; /** * command start time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date commandStartTime; /** @@ -189,6 +198,7 @@ public class ProcessInstance { /** * process duration + * * @return */ @TableField(exist = false) @@ -226,19 +236,24 @@ public class ProcessInstance { @TableField(exist = false) private String receiversCc; - public ProcessInstance(){ + public ProcessInstance() { } /** * set the process name with process define version and timestamp + * * @param processDefinition processDefinition */ - public ProcessInstance(ProcessDefinition processDefinition){ + public ProcessInstance(ProcessDefinition processDefinition) { this.processDefinition = processDefinition; - this.name = processDefinition.getName() + "-" + - processDefinition.getVersion() + "-" + - System.currentTimeMillis(); + this.name = processDefinition.getName() + + "-" + + + processDefinition.getVersion() + + "-" + + + DateUtils.getCurrentTimeStamp(); } public ProcessDefinition getProcessDefinition() { @@ -313,7 +328,6 @@ public class ProcessInstance { this.name = name; } - public String getHost() { return host; } @@ -322,7 +336,6 @@ public class ProcessInstance { this.host = host; } - public CommandType getCommandType() { return commandType; } @@ -347,7 +360,6 @@ public class ProcessInstance { this.taskDependType = taskDependType; } - public int getMaxTryTimes() { return maxTryTimes; } @@ -364,8 +376,7 @@ public class ProcessInstance { this.failureStrategy = failureStrategy; } - - public boolean isProcessInstanceStop(){ + public boolean isProcessInstanceStop() { return this.state.typeIsFinished(); } @@ -441,7 +452,6 @@ public class ProcessInstance { this.executorId = executorId; } - public Flag getIsSubProcess() { return isSubProcess; } @@ -457,6 +467,7 @@ public class ProcessInstance { public void setProcessInstancePriority(Priority processInstancePriority) { this.processInstancePriority = processInstancePriority; } + public String getLocations() { return locations; } @@ -477,6 +488,10 @@ public class ProcessInstance { return historyCmd; } + public void setHistoryCmd(String historyCmd) { + this.historyCmd = historyCmd; + } + public String getExecutorName() { return executorName; } @@ -485,28 +500,26 @@ public class ProcessInstance { this.executorName = executorName; } - public void setHistoryCmd(String historyCmd) { - this.historyCmd = historyCmd; - } - /** * add command to history + * * @param cmd cmd */ - public void addHistoryCmd(CommandType cmd){ - if(StringUtils.isNotEmpty(this.historyCmd)){ + public void addHistoryCmd(CommandType cmd) { + if (StringUtils.isNotEmpty(this.historyCmd)) { this.historyCmd = String.format("%s,%s", this.historyCmd, cmd.toString()); - }else{ + } else { this.historyCmd = cmd.toString(); } } /** * check this process is start complement data + * * @return whether complement data */ - public boolean isComplementData(){ - if(StringUtils.isEmpty(this.historyCmd)){ + public boolean isComplementData() { + if (StringUtils.isEmpty(this.historyCmd)) { return false; } return historyCmd.startsWith(CommandType.COMPLEMENT_DATA.toString()); @@ -515,10 +528,11 @@ public class ProcessInstance { /** * get current command type, * if start with complement data,return complement + * * @return CommandType */ - public CommandType getCmdTypeIfComplement(){ - if(isComplementData()){ + public CommandType getCmdTypeIfComplement() { + if (isComplementData()) { return CommandType.COMPLEMENT_DATA; } return commandType; @@ -556,15 +570,14 @@ public class ProcessInstance { this.timeout = timeout; } + public int getTenantId() { + return this.tenantId; + } public void setTenantId(int tenantId) { this.tenantId = tenantId; } - public int getTenantId() { - return this.tenantId ; - } - public String getReceivers() { return receivers; } @@ -583,44 +596,83 @@ public class ProcessInstance { @Override public String toString() { - return "ProcessInstance{" + - "id=" + id + - ", processDefinitionId=" + processDefinitionId + - ", state=" + state + - ", recovery=" + recovery + - ", startTime=" + startTime + - ", endTime=" + endTime + - ", runTimes=" + runTimes + - ", name='" + name + '\'' + - ", host='" + host + '\'' + - ", processDefinition=" + processDefinition + - ", commandType=" + commandType + - ", commandParam='" + commandParam + '\'' + - ", taskDependType=" + taskDependType + - ", maxTryTimes=" + maxTryTimes + - ", failureStrategy=" + failureStrategy + - ", warningType=" + warningType + - ", warningGroupId=" + warningGroupId + - ", scheduleTime=" + scheduleTime + - ", commandStartTime=" + commandStartTime + - ", globalParams='" + globalParams + '\'' + - ", processInstanceJson='" + processInstanceJson + '\'' + - ", executorId=" + executorId + - ", tenantCode='" + tenantCode + '\'' + - ", queue='" + queue + '\'' + - ", isSubProcess=" + isSubProcess + - ", locations='" + locations + '\'' + - ", connects='" + connects + '\'' + - ", historyCmd='" + historyCmd + '\'' + - ", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' + - ", duration=" + duration + - ", processInstancePriority=" + processInstancePriority + - ", workerGroup='" + workerGroup + '\'' + - ", timeout=" + timeout + - ", tenantId=" + tenantId + - ", receivers='" + receivers + '\'' + - ", receiversCc='" + receiversCc + '\'' + - '}'; + return "ProcessInstance{" + + "id=" + id + + ", processDefinitionId=" + processDefinitionId + + ", state=" + state + + ", recovery=" + recovery + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", runTimes=" + runTimes + + ", name='" + name + '\'' + + ", host='" + host + '\'' + + ", processDefinition=" + + processDefinition + + ", commandType=" + + commandType + + ", commandParam='" + + commandParam + + '\'' + + ", taskDependType=" + + taskDependType + + ", maxTryTimes=" + + maxTryTimes + + ", failureStrategy=" + + failureStrategy + + ", warningType=" + + warningType + + ", warningGroupId=" + + warningGroupId + + ", scheduleTime=" + + scheduleTime + + ", commandStartTime=" + + commandStartTime + + ", globalParams='" + + globalParams + + '\'' + + ", processInstanceJson='" + + processInstanceJson + + '\'' + + ", executorId=" + + executorId + + ", tenantCode='" + + tenantCode + + '\'' + + ", queue='" + + queue + + '\'' + + ", isSubProcess=" + + isSubProcess + + ", locations='" + + locations + + '\'' + + ", connects='" + + connects + + '\'' + + ", historyCmd='" + + historyCmd + + '\'' + + ", dependenceScheduleTimes='" + + dependenceScheduleTimes + + '\'' + + ", duration=" + + duration + + ", processInstancePriority=" + + processInstancePriority + + ", workerGroup='" + + workerGroup + + '\'' + + ", timeout=" + + timeout + + ", tenantId=" + + tenantId + + ", receivers='" + + receivers + + '\'' + + ", receiversCc='" + + receiversCc + + '\'' + + '}'; } @Override From db663a13a37c6bc4e47286ebbc8efb2f20efa57b Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 22 Sep 2020 17:39:39 +0800 Subject: [PATCH 3/6] [Improvement][remote]load balance warm up (#3770) * [Improvement][remote]load balance warm up * reformat code * reformat code * code smell * code smell * add test * add test * add test * add test * fix bug * fix bug * add docs * add host test * add host test * add host test * add docs * code reformat * code reformat --- .../remote/utils/Constants.java | 5 ++ .../dolphinscheduler/remote/utils/Host.java | 48 ++++++++++--- .../dispatch/host/assign/HostWeight.java | 35 +++++++--- .../worker/registry/WorkerRegistry.java | 44 +++++++----- .../assign/LowerWeightRoundRobinTest.java | 51 +++++++++++--- .../host/assign/RandomSelectorTest.java | 16 ++--- .../host/assign/RoundRobinSelectorTest.java | 70 ++++++++++++------- .../server/utils/HostTest.java | 43 ++++++++++++ pom.xml | 1 + 9 files changed, 232 insertions(+), 81 deletions(-) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index 370467f6ca..91d4ac245e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -53,4 +53,9 @@ public class Constants { */ public static final String OS_NAME = System.getProperty("os.name"); + /** + * warm up time + */ + public static final int WARM_UP_TIME = 10 * 60 * 1000; + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index b905a9fea8..c18d02f09a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.utils; import java.io.Serializable; @@ -44,6 +45,11 @@ public class Host implements Serializable { */ private int weight; + /** + * startTime + */ + private long startTime; + /** * workGroup */ @@ -58,19 +64,21 @@ public class Host implements Serializable { this.address = ip + ":" + port; } - public Host(String ip, int port, int weight) { + public Host(String ip, int port, int weight, long startTime) { this.ip = ip; this.port = port; this.address = ip + ":" + port; - this.weight = weight; + this.weight = getWarmUpWeight(weight, startTime); + this.startTime = startTime; } - public Host(String ip, int port, int weight,String workGroup) { + public Host(String ip, int port, int weight, long startTime, String workGroup) { this.ip = ip; this.port = port; this.address = ip + ":" + port; - this.weight = weight; - this.workGroup=workGroup; + this.weight = getWarmUpWeight(weight, startTime); + this.workGroup = workGroup; + this.startTime = startTime; } public String getAddress() { @@ -98,6 +106,14 @@ public class Host implements Serializable { this.weight = weight; } + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + public int getPort() { return port; } @@ -133,8 +149,8 @@ public class Host implements Serializable { if (parts.length == 2) { host = new Host(parts[0], Integer.parseInt(parts[1])); } - if (parts.length == 3) { - host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])); + if (parts.length == 4) { + host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3])); } return host; } @@ -169,8 +185,20 @@ public class Host implements Serializable { @Override public String toString() { - return "Host{" + - "address='" + address + '\'' + - '}'; + return "Host{" + + "address='" + address + '\'' + + '}'; + } + + /** + * warm up + */ + private int getWarmUpWeight(int weight, long startTime) { + long uptime = System.currentTimeMillis() - startTime; + //If the warm-up is not over, reduce the weight + if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { + return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME)); + } + return weight; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java index ebceea7b13..298a62a6d9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; +import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Host; /** @@ -37,9 +38,9 @@ public class HostWeight { private int currentWeight; public HostWeight(Host host, double cpu, double memory, double loadAverage) { - this.weight = calculateWeight(cpu, memory, loadAverage); - this.host = host ; - this.currentWeight = weight ; + this.weight = getWeight(cpu, memory, loadAverage, host); + this.host = host; + this.currentWeight = weight; } public int getCurrentWeight() { @@ -60,14 +61,28 @@ public class HostWeight { @Override public String toString() { - return "HostWeight{" + - "host=" + host + - ", weight=" + weight + - ", currentWeight=" + currentWeight + - '}'; + return "HostWeight{" + + "host=" + host + + ", weight=" + weight + + ", currentWeight=" + currentWeight + + '}'; } - private int calculateWeight(double cpu, double memory, double loadAverage){ - return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); + private int getWeight(double cpu, double memory, double loadAverage, Host host) { + int calculateWeight = (int) (cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); + return getWarmUpWeight(host, calculateWeight); + + } + + /** + * If the warm-up is not over, add the weight + */ + private int getWarmUpWeight(Host host, int weight) { + long startTime = host.getStartTime(); + long uptime = System.currentTimeMillis() - startTime; + if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { + return (int) ((weight * Constants.WARM_UP_TIME) / uptime); + } + return weight; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 36998fad63..904ea3a807 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -14,19 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.registry; -import java.util.Date; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import static org.apache.dolphinscheduler.common.Constants.COLON; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.SLASH; -import javax.annotation.PostConstruct; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -34,6 +28,19 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; + +import java.util.Date; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -41,8 +48,6 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; -import static org.apache.dolphinscheduler.common.Constants.*; - /** * worker registry @@ -111,10 +116,10 @@ public class WorkerRegistry { } HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, - this.workerConfig.getWorkerReservedMemory(), - this.workerConfig.getWorkerMaxCpuloadAvg(), - workerZkPaths, - this.zookeeperRegistryCenter); + this.workerConfig.getWorkerReservedMemory(), + this.workerConfig.getWorkerMaxCpuloadAvg(), + workerZkPaths, + this.zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); @@ -142,6 +147,7 @@ public class WorkerRegistry { String address = getLocalAddress(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); String weight = getWorkerWeight(); + String workerStartTime = COLON + System.currentTimeMillis(); for (String workGroup : this.workerGroups) { StringBuilder workerZkPathBuilder = new StringBuilder(100); @@ -153,6 +159,7 @@ public class WorkerRegistry { workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); workerZkPathBuilder.append(address); workerZkPathBuilder.append(weight); + workerZkPathBuilder.append(workerStartTime); workerZkPaths.add(workerZkPathBuilder.toString()); } return workerZkPaths; @@ -162,13 +169,14 @@ public class WorkerRegistry { * get local address */ private String getLocalAddress() { - return NetUtils.getHost() + ":" + workerConfig.getListenPort(); + return NetUtils.getHost() + COLON + workerConfig.getListenPort(); } /** * get Worker Weight */ private String getWorkerWeight() { - return ":" + workerConfig.getWeight(); + return COLON + workerConfig.getWeight(); } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java index fadaa84a69..fd5dda0873 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java @@ -14,9 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import org.apache.dolphinscheduler.remote.utils.Host; + +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -27,15 +30,47 @@ public class LowerWeightRoundRobinTest { @Test - public void testSelect(){ + public void testSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84)); - sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24)); - sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15)); - System.out.println(sources); + sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24)); + sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15)); + + LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); + HostWeight result; + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + } + + @Test + public void testWarmUpSelect() { + Collection sources = new ArrayList<>(); + sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84)); + LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); - for(int i = 0; i < 100; i ++){ - System.out.println(roundRobin.select(sources)); - } + HostWeight result; + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.4", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.4", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java index f25a227947..14aa7b8f1f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.commons.lang.ObjectUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; + import org.junit.Assert; import org.junit.Test; @@ -31,22 +31,22 @@ import java.util.Collections; public class RandomSelectorTest { @Test(expected = IllegalArgumentException.class) - public void testSelectWithIllegalArgumentException(){ + public void testSelectWithIllegalArgumentException() { RandomSelector selector = new RandomSelector(); - selector.select(Collections.EMPTY_LIST); + selector.select(null); } @Test - public void testSelect1(){ + public void testSelect1() { RandomSelector selector = new RandomSelector(); - Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20))); + Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis()))); Assert.assertNotNull(result); } @Test - public void testSelect(){ + public void testSelect() { RandomSelector selector = new RandomSelector(); - Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20))); + Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis()))); Assert.assertNotNull(result); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java index ed62caaa2c..9e41cd68bf 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java @@ -14,16 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; + import org.junit.Assert; import org.junit.Test; import java.util.Arrays; import java.util.Collections; -import java.util.List; /** * round robin selector @@ -33,43 +33,59 @@ public class RoundRobinSelectorTest { @Test(expected = IllegalArgumentException.class) public void testSelectWithIllegalArgumentException() { RoundRobinSelector selector = new RoundRobinSelector(); - selector.select(Collections.EMPTY_LIST); + selector.select(null); } @Test public void testSelect1() { RoundRobinSelector selector = new RoundRobinSelector(); - Host result = null; - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Host result; + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.2", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.1", result.getIp()); // add new host - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.3", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.2", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.3",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.2",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.3",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.3", result.getIp()); // remove host3 - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.2",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + + } + + @Test + public void testWarmUpRoundRobinSelector() { + RoundRobinSelector selector = new RoundRobinSelector(); + Host result; + result = selector.select( + Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java new file mode 100644 index 0000000000..6273569485 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.remote.utils.Host; + +import org.junit.Assert; +import org.junit.Test; + +/** + * host test + */ +public class HostTest { + + @Test + public void testHostWarmUp() { + Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000))); + Assert.assertEquals(50, host.getWeight()); + host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000))); + Assert.assertEquals(100, host.getWeight()); + } + + @Test + public void testHost() { + Host host = Host.of("192.158.2.2:22"); + Assert.assertEquals(22, host.getPort()); + } +} diff --git a/pom.xml b/pom.xml index 207518c936..c8cb5a9dea 100644 --- a/pom.xml +++ b/pom.xml @@ -832,6 +832,7 @@ **/server/register/ZookeeperNodeManagerTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java + **/server/utils/HostTest.java **/server/utils/LogUtilsTest.java **/server/utils/ParamUtilsTest.java From 3b581455fc572c01c93a9d5ecfb955a966a20263 Mon Sep 17 00:00:00 2001 From: guyinyou <36399867+guyinyou@users.noreply.github.com> Date: Fri, 25 Sep 2020 13:20:42 +0800 Subject: [PATCH 4/6] [Feature]Add Python task "task variable / result transfer" implementation (#3659) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 增加Python Task的“任务变量/结果传递”实现 Signed-off-by: 古崟佑 * add two files license Signed-off-by: 古崟佑 * fix 'server/worker/task/AbstractCommandExecutor.java' code style Signed-off-by: 1941815847Cy4 <1941815847cy4@kuaishou.com> * update DB Signed-off-by: 古崟佑 * update DB -- 2 Signed-off-by: 古崟佑 * fix codeStyle Signed-off-by: 古崟佑 * fix codestyle Signed-off-by: 古崟佑 * fix codeStyle Signed-off-by: 古崟佑 * fix codeStyle Signed-off-by: 古崟佑 * fix codeStyle Signed-off-by: 古崟佑 * add VarPoolUtils Test Signed-off-by: 古崟佑 * fix VarPoolUtilsTest codeStyle Signed-off-by: 古崟佑 * fix VarPoolUtilsTest codeStyle Signed-off-by: 古崟佑 * fix VarPoolUtilsTest codeStyle Signed-off-by: 古崟佑 * fix VarPoolUtilsTest codeStyle Signed-off-by: 古崟佑 * fix VarPoolUtilsTest codeStyle Signed-off-by: 古崟佑 * add test config for VarPoolUtilsTest Signed-off-by: 古崟佑 * fix unit test Signed-off-by: 古崟佑 * fix codeStyle Signed-off-by: 古崟佑 * fix VarPoolUtilsTest.java Signed-off-by: 古崟佑 * fix Signed-off-by: 古崟佑 * change the test class path Signed-off-by: 古崟佑 * fix Signed-off-by: 古崟佑 * fix "print the error message" Signed-off-by: 古崟佑 * fix bug Signed-off-by: 古崟佑 * fix Signed-off-by: 古崟佑 Co-authored-by: 1941815847Cy4 <1941815847cy4@kuaishou.com> --- .../common/task/TaskParams.java | 78 ++++++++ .../common/utils/VarPoolUtils.java | 124 ++++++++++++ .../common/utils/VarPoolUtilsTest.java | 73 +++++++ .../dao/entity/ProcessInstance.java | 13 ++ .../dao/entity/TaskInstance.java | 12 ++ .../command/TaskExecuteResponseCommand.java | 12 ++ .../processor/TaskResponseProcessor.java | 3 +- .../processor/queue/TaskResponseEvent.java | 18 +- .../processor/queue/TaskResponseService.java | 3 +- .../master/runner/MasterExecThread.java | 15 +- .../worker/runner/TaskExecuteThread.java | 1 + .../worker/task/AbstractCommandExecutor.java | 15 +- .../server/worker/task/AbstractTask.java | 13 ++ .../server/worker/task/python/PythonTask.java | 189 +++++++++--------- .../service/process/ProcessService.java | 5 +- pom.xml | 1 + sql/dolphinscheduler-postgre.sql | 2 + sql/dolphinscheduler_mysql.sql | 2 + .../mysql/dolphinscheduler_ddl.sql | 40 ++++ .../postgresql/dolphinscheduler_ddl.sql | 36 ++++ 20 files changed, 555 insertions(+), 100 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java new file mode 100644 index 0000000000..abea2d95b0 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.task; + +import java.util.Map; + +public class TaskParams { + + private String rawScript; + private Map[] localParams; + + public void setRawScript(String rawScript) { + this.rawScript = rawScript; + } + + public void setLocalParams(Map[] localParams) { + this.localParams = localParams; + } + + public String getRawScript() { + return rawScript; + } + + public void setLocalParamValue(String prop, Object value) { + if (localParams == null || value == null) { + return; + } + for (int i = 0; i < localParams.length; i++) { + if (localParams[i].get("prop").equals(prop)) { + localParams[i].put("value", (String)value); + } + } + } + + public void setLocalParamValue(Map propToValue) { + if (localParams == null || propToValue == null) { + return; + } + for (int i = 0; i < localParams.length; i++) { + String prop = localParams[i].get("prop"); + if (propToValue.containsKey(prop)) { + localParams[i].put("value",(String)propToValue.get(prop)); + } + } + } + + public String getLocalParamValue(String prop) { + if (localParams == null) { + return null; + } + for (int i = 0; i < localParams.length; i++) { + String tmpProp = localParams[i].get("prop"); + if (tmpProp.equals(prop)) { + return localParams[i].get("value"); + } + } + return null; + } + + public Map[] getLocalParams() { + return localParams; + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java new file mode 100644 index 0000000000..837e96f55f --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.task.TaskParams; + +import java.text.ParseException; +import java.util.Map; + +public class VarPoolUtils { + /** + * getTaskNodeLocalParam + * @param taskNode taskNode + * @param prop prop + * @return localParamForProp + */ + public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) { + String taskParamsJson = taskNode.getParams(); + TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); + if (taskParams == null) { + return null; + } + return taskParams.getLocalParamValue(prop); + } + + /** + * setTaskNodeLocalParams + * @param taskNode taskNode + * @param prop LocalParamName + * @param value LocalParamValue + */ + public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) { + String taskParamsJson = taskNode.getParams(); + TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); + if (taskParams == null) { + return; + } + taskParams.setLocalParamValue(prop, value); + taskNode.setParams(JSONUtils.toJsonString(taskParams)); + } + + /** + * setTaskNodeLocalParams + * @param taskNode taskNode + * @param propToValue propToValue + */ + public static void setTaskNodeLocalParams(TaskNode taskNode, Map propToValue) { + String taskParamsJson = taskNode.getParams(); + TaskParams taskParams = JSONUtils.parseObject(taskParamsJson, TaskParams.class); + if (taskParams == null) { + return; + } + taskParams.setLocalParamValue(propToValue); + taskNode.setParams(JSONUtils.toJsonString(taskParams)); + } + + /** + * convertVarPoolToMap + * @param propToValue propToValue + * @param varPool varPool + * @throws ParseException ParseException + */ + public static void convertVarPoolToMap(Map propToValue, String varPool) throws ParseException { + if (varPool == null || propToValue == null) { + return; + } + String[] splits = varPool.split("\\$VarPool\\$"); + for (String kv : splits) { + String[] kvs = kv.split(","); + if (kvs.length == 2) { + propToValue.put(kvs[0], kvs[1]); + } else { + throw new ParseException(kv, 2); + } + } + } + + /** + * convertPythonScriptPlaceholders + * @param rawScript rawScript + * @return String + * @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException + */ + public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException { + int len = "${setShareVar(${".length(); + int scriptStart = 0; + while ((scriptStart = rawScript.indexOf("${setShareVar(${", scriptStart)) != -1) { + int start = -1; + int end = rawScript.indexOf('}', scriptStart + len); + String prop = rawScript.substring(scriptStart + len, end); + + start = rawScript.indexOf(',', end); + end = rawScript.indexOf(')', start); + + String value = rawScript.substring(start + 1, end); + + start = rawScript.indexOf('}', start) + 1; + end = rawScript.length(); + + String replaceScript = String.format("print(\"${{setValue({},{})}}\".format(\"%s\",%s))", prop, value); + + rawScript = rawScript.substring(0, scriptStart) + replaceScript + rawScript.substring(start, end); + + scriptStart += replaceScript.length(); + } + return rawScript; + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java new file mode 100644 index 0000000000..e47203c225 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.model.TaskNode; + +import java.util.concurrent.ConcurrentHashMap; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VarPoolUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class); + + @Test + public void testSetTaskNodeLocalParams() { + String taskJson = "{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\"," + + "\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\"," + + "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is python task \\\\\\\",${p0})\\\"," + + "\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}]," + + "\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"," + + "\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\"," + + "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}"; + TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); + + VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1"); + Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test1"); + + ConcurrentHashMap propToValue = new ConcurrentHashMap(); + propToValue.put("p1", "test2"); + + VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue); + Assert.assertEquals(VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"), "test2"); + } + + @Test + public void testConvertVarPoolToMap() throws Exception { + String varPool = "p1,66$VarPool$p2,69$VarPool$"; + ConcurrentHashMap propToValue = new ConcurrentHashMap(); + VarPoolUtils.convertVarPoolToMap(propToValue, varPool); + Assert.assertEquals((String)propToValue.get("p1"), "66"); + Assert.assertEquals((String)propToValue.get("p2"), "69"); + logger.info(propToValue.toString()); + } + + @Test + public void testConvertPythonScriptPlaceholders() throws Exception { + String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};"; + rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript); + Assert.assertEquals(rawScript, "print(${p1});\n" + + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n" + + "print(\"${{setValue({},{})}}\".format(\"p2\",4));"); + logger.info(rawScript); + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 3d1a756d25..e3a3f11386 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -224,6 +224,11 @@ public class ProcessInstance { */ private int tenantId; + /** + * varPool string + */ + private String varPool; + /** * receivers for api */ @@ -256,6 +261,14 @@ public class ProcessInstance { DateUtils.getCurrentTimeStamp(); } + public String getVarPool() { + return varPool; + } + + public void setVarPool(String varPool) { + this.varPool = varPool; + } + public ProcessDefinition getProcessDefinition() { return processDefinition; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 9688200b2c..b13ca87e38 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -211,6 +211,11 @@ public class TaskInstance implements Serializable { */ private int executorId; + /** + * varPool string + */ + private String varPool; + /** * executor name */ @@ -232,7 +237,14 @@ public class TaskInstance implements Serializable { this.executePath = executePath; } + public String getVarPool() { + return varPool; + } + public void setVarPool(String varPool) { + this.varPool = varPool; + } + public ProcessInstance getProcessInstance() { return processInstance; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index e559334f48..7f6ee668a8 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -63,7 +63,19 @@ public class TaskExecuteResponseCommand implements Serializable { */ private String appIds; + /** + * varPool string + */ + private String varPool; + public void setVarPool(String varPool) { + this.varPool = varPool; + } + + public String getVarPool() { + return varPool; + } + public int getTaskInstanceId() { return taskInstanceId; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index b04b930fd4..2633ccd634 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -90,7 +90,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor { responseCommand.getEndTime(), responseCommand.getProcessId(), responseCommand.getAppIds(), - responseCommand.getTaskInstanceId()); + responseCommand.getTaskInstanceId(), + responseCommand.getVarPool()); taskResponseService.addResponse(taskResponseEvent); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 051cc388bf..ba07be50f3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -79,7 +79,12 @@ public class TaskResponseEvent { */ private Event event; - public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId){ + /** + * varPool + */ + private String varPool; + + public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, String workerAddress, String executePath, String logPath, int taskInstanceId) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setStartTime(startTime); @@ -91,7 +96,7 @@ public class TaskResponseEvent { return event; } - public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId){ + public static TaskResponseEvent newResult(ExecutionStatus state, Date endTime, int processId, String appIds, int taskInstanceId, String varPool) { TaskResponseEvent event = new TaskResponseEvent(); event.setState(state); event.setEndTime(endTime); @@ -99,9 +104,18 @@ public class TaskResponseEvent { event.setAppIds(appIds); event.setTaskInstanceId(taskInstanceId); event.setEvent(Event.RESULT); + event.setVarPool(varPool); return event; } + public String getVarPool() { + return varPool; + } + + public void setVarPool(String varPool) { + this.varPool = varPool; + } + public int getTaskInstanceId() { return taskInstanceId; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index ba07313a9a..6434db70e5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -139,7 +139,8 @@ public class TaskResponseService { taskResponseEvent.getEndTime(), taskResponseEvent.getProcessId(), taskResponseEvent.getAppIds(), - taskResponseEvent.getTaskInstanceId()); + taskResponseEvent.getTaskInstanceId(), + taskResponseEvent.getVarPool()); break; default: throw new IllegalArgumentException("invalid event type : " + event); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 788b30638e..3c28e16651 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.VarPoolUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -59,6 +60,7 @@ import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; +import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -651,14 +653,23 @@ public class MasterExecThread implements Runnable { * submit post node * @param parentNodeName parent node name */ + private Map propToValue = new ConcurrentHashMap(); private void submitPostNode(String parentNodeName){ List submitTaskNodeList = parsePostNodeList(parentNodeName); List taskInstances = new ArrayList<>(); for(String taskNode : submitTaskNodeList){ + try { + VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool()); + } catch (ParseException e) { + logger.error("parse {} exception", processInstance.getVarPool(), e); + throw new RuntimeException(); + } + TaskNode taskNodeObject = dag.getNode(taskNode); + VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue); taskInstances.add(createTaskInstance(processInstance, taskNode, - dag.getNode(taskNode))); + taskNodeObject)); } // if previous node success , post node submit @@ -999,6 +1010,8 @@ public class MasterExecThread implements Runnable { task.getName(), task.getId(), task.getState()); // node success , post node submit if(task.getState() == ExecutionStatus.SUCCESS){ + processInstance.setVarPool(task.getVarPool()); + processService.updateProcessInstance(processInstance); completeTaskList.put(task.getName(), task); submitPostNode(task.getName()); continue; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 3ba49451c7..58f743303c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -155,6 +155,7 @@ public class TaskExecuteThread implements Runnable { responseCommand.setEndTime(new Date()); responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); + responseCommand.setVarPool(task.getVarPool()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); } catch (Exception e) { logger.error("task scheduler failure", e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 3dedeced06..dddd1a64b7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -66,6 +66,7 @@ public abstract class AbstractCommandExecutor { */ protected static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); + protected StringBuilder varPool = new StringBuilder(); /** * process */ @@ -234,7 +235,10 @@ public abstract class AbstractCommandExecutor { return result; } - + public String getVarPool() { + return varPool.toString(); + } + /** * cancel application * @throws Exception exception @@ -347,8 +351,13 @@ public abstract class AbstractCommandExecutor { long lastFlushTime = System.currentTimeMillis(); while ((line = inReader.readLine()) != null) { - logBuffer.add(line); - lastFlushTime = flush(lastFlushTime); + if (line.startsWith("${setValue(")) { + varPool.append(line.substring("${setValue(".length(), line.length() - 2)); + varPool.append("$VarPool$"); + } else { + logBuffer.add(line); + lastFlushTime = flush(lastFlushTime); + } } } catch (Exception e) { logger.error(e.getMessage(),e); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index ae03932a52..1a66349817 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -47,6 +47,11 @@ import java.util.Map; */ public abstract class AbstractTask { + /** + * varPool string + */ + protected String varPool; + /** * taskExecutionContext **/ @@ -121,6 +126,14 @@ public abstract class AbstractTask { logger.info(" -> {}", String.join("\n\t", logs)); } + public void setVarPool(String varPool) { + this.varPool = varPool; + } + + public String getVarPool() { + return varPool; + } + /** * get exit status code * @return exit status code diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index 367da80a0c..6e561c1cab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -38,103 +38,110 @@ import java.util.Map; */ public class PythonTask extends AbstractTask { - /** - * python parameters - */ - private PythonParameters pythonParameters; - - /** - * task dir - */ - private String taskDir; - - /** - * python command executor - */ - private PythonCommandExecutor pythonCommandExecutor; - - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; - - /** - * constructor - * @param taskExecutionContext taskExecutionContext - * @param logger logger - */ - public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) { - super(taskExecutionContext, logger); - this.taskExecutionContext = taskExecutionContext; - - this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, - taskExecutionContext, - logger); - } - - @Override - public void init() { - logger.info("python task params {}", taskExecutionContext.getTaskParams()); - - pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class); - - if (!pythonParameters.checkParameters()) { - throw new RuntimeException("python task params is not valid"); + /** + * python parameters + */ + private PythonParameters pythonParameters; + + /** + * task dir + */ + private String taskDir; + + /** + * python command executor + */ + private PythonCommandExecutor pythonCommandExecutor; + + /** + * taskExecutionContext + */ + private TaskExecutionContext taskExecutionContext; + + /** + * constructor + * @param taskExecutionContext taskExecutionContext + * @param logger logger + */ + public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + this.taskExecutionContext = taskExecutionContext; + + this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, + taskExecutionContext, + logger); } - } - @Override - public void handle() throws Exception { - try { - // construct process - CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand()); + @Override + public void init() { + logger.info("python task params {}", taskExecutionContext.getTaskParams()); - setExitStatusCode(commandExecuteResult.getExitStatusCode()); - setAppIds(commandExecuteResult.getAppIds()); - setProcessId(commandExecuteResult.getProcessId()); - } - catch (Exception e) { - logger.error("python task failure", e); - setExitStatusCode(Constants.EXIT_CODE_FAILURE); - throw e; - } - } - - @Override - public void cancelApplication(boolean cancelApplication) throws Exception { - // cancel process - pythonCommandExecutor.cancelApplication(); - } - - /** - * build command - * @return raw python script - * @throws Exception exception - */ - private String buildCommand() throws Exception { - String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); - - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - pythonParameters.getLocalParametersMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); - if (paramsMap != null){ - rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); - } + pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class); - logger.info("raw python script : {}", pythonParameters.getRawScript()); - logger.info("task dir : {}", taskDir); - - return rawPythonScript; - } + if (!pythonParameters.checkParameters()) { + throw new RuntimeException("python task params is not valid"); + } + } - @Override - public AbstractParameters getParameters() { - return pythonParameters; - } + @Override + public void handle() throws Exception { + try { + // construct process + CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand()); + + setExitStatusCode(commandExecuteResult.getExitStatusCode()); + setAppIds(commandExecuteResult.getAppIds()); + setProcessId(commandExecuteResult.getProcessId()); + setVarPool(pythonCommandExecutor.getVarPool()); + } + catch (Exception e) { + logger.error("python task failure", e); + setExitStatusCode(Constants.EXIT_CODE_FAILURE); + throw e; + } + } + @Override + public void cancelApplication(boolean cancelApplication) throws Exception { + // cancel process + pythonCommandExecutor.cancelApplication(); + } + /** + * build command + * @return raw python script + * @throws Exception exception + */ + private String buildCommand() throws Exception { + String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); + + // replace placeholder + Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), + taskExecutionContext.getDefinedParams(), + pythonParameters.getLocalParametersMap(), + CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), + taskExecutionContext.getScheduleTime()); + + try { + rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript); + } + catch (StringIndexOutOfBoundsException e) { + logger.error("setShareVar field format error, raw python script : {}", rawPythonScript); + } + + if (paramsMap != null) { + rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); + } + + logger.info("raw python script : {}", pythonParameters.getRawScript()); + logger.info("task dir : {}", taskDir); + + return rawPythonScript; + } + @Override + public AbstractParameters getParameters() { + return pythonParameters; + } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 6f642672cd..7344cf13e5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1464,17 +1464,20 @@ public class ProcessService { * @param state state * @param endTime endTime * @param taskInstId taskInstId + * @param varPool varPool */ public void changeTaskState(ExecutionStatus state, Date endTime, int processId, String appIds, - int taskInstId) { + int taskInstId, + String varPool) { TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId); taskInstance.setPid(processId); taskInstance.setAppLink(appIds); taskInstance.setState(state); taskInstance.setEndTime(endTime); + taskInstance.setVarPool(varPool); saveTaskInstance(taskInstance); } diff --git a/pom.xml b/pom.xml index c8cb5a9dea..e895b01d89 100644 --- a/pom.xml +++ b/pom.xml @@ -785,6 +785,7 @@ **/common/utils/StringTest.java **/common/utils/StringUtilsTest.java **/common/utils/TaskParametersUtilsTest.java + **/common/utils/VarPoolUtilsTest.java **/common/utils/HadoopUtilsTest.java **/common/utils/HttpUtilsTest.java **/common/utils/KerberosHttpClientTest.java diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql index 5ae37e1be8..e2f5ebd91f 100644 --- a/sql/dolphinscheduler-postgre.sql +++ b/sql/dolphinscheduler-postgre.sql @@ -377,6 +377,7 @@ CREATE TABLE t_ds_process_instance ( worker_group varchar(64) , timeout int DEFAULT '0' , tenant_id int NOT NULL DEFAULT '-1' , + var_pool text , PRIMARY KEY (id) ) ; create index process_instance_index on t_ds_process_instance (process_definition_id,id); @@ -595,6 +596,7 @@ CREATE TABLE t_ds_task_instance ( executor_id int DEFAULT NULL , first_submit_time timestamp DEFAULT NULL , delay_time int DEFAULT '0' , + var_pool text , PRIMARY KEY (id) ) ; diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 61e697568a..9039a19084 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -487,6 +487,7 @@ CREATE TABLE `t_ds_process_instance` ( `worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id', `timeout` int(11) DEFAULT '0' COMMENT 'time out', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', + `var_pool` longtext COMMENT 'var_pool', PRIMARY KEY (`id`), KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE, KEY `start_time_index` (`start_time`) USING BTREE @@ -737,6 +738,7 @@ CREATE TABLE `t_ds_task_instance` ( `executor_id` int(11) DEFAULT NULL, `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time', `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time', + `var_pool` longtext COMMENT 'var_pool', PRIMARY KEY (`id`), KEY `process_instance_id` (`process_instance_id`) USING BTREE, KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE, diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql index 43488272e2..ae66da914f 100644 --- a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql @@ -56,6 +56,46 @@ delimiter ; CALL uc_dolphin_T_t_ds_task_instance_A_delay_time(); DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time; +-- uc_dolphin_T_t_ds_task_instance_A_var_pool +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_task_instance ADD `var_pool` longtext NULL; + END IF; + END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_task_instance_A_var_pool(); +DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool; + +-- uc_dolphin_T_t_ds_process_instance_A_var_pool +drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool; +delimiter d// +CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_process_instance ADD `var_pool` longtext NULL; + END IF; + END; + +d// + +delimiter ; +CALL uc_dolphin_T_t_ds_process_instance_A_var_pool(); +DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool; + -- uc_dolphin_T_t_ds_process_definition_A_modify_by drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version; delimiter d// diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql index e2767617df..3351cac88c 100644 --- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql @@ -51,6 +51,42 @@ delimiter ; SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time(); DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time(); +-- uc_dolphin_T_t_ds_process_instance_A_var_pool +delimiter d// +CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_instance_A_var_pool() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_process_instance ADD COLUMN var_pool text; + END IF; +END; +$$ LANGUAGE plpgsql; +d// + +delimiter ; +SELECT uc_dolphin_T_t_ds_process_instance_A_var_pool(); +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool(); + +-- uc_dolphin_T_t_ds_task_instance_A_var_pool +delimiter d// +CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_var_pool() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='var_pool') + THEN + ALTER TABLE t_ds_task_instance ADD COLUMN var_pool text; + END IF; +END; +$$ LANGUAGE plpgsql; +d// + +delimiter ; +SELECT uc_dolphin_T_t_ds_task_instance_A_var_pool(); +DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool(); + -- uc_dolphin_T_t_ds_process_definition_A_modify_by delimiter d// CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$ From eb597e67e35179ee627d8ba664c50e4f42854479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E7=BB=A7=E5=B3=B0?= Date: Sat, 26 Sep 2020 15:45:24 +0800 Subject: [PATCH 5/6] [Improvement][API] simplify with stream (#3764) * simplify with stream * add distinct * compatible tasks is null * add unit test --- .../impl/ProcessDefinitionServiceImpl.java | 33 +++----- .../service/ProcessDefinitionServiceTest.java | 84 +++++++++++++++++++ 2 files changed, 97 insertions(+), 20 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index f4ea1bf065..cade36a1d6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -48,7 +48,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; @@ -77,6 +77,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -84,6 +85,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -232,25 +234,16 @@ public class ProcessDefinitionServiceImpl extends BaseService implements * @return resource ids */ private String getResourceIds(ProcessData processData) { - List tasks = processData.getTasks(); - Set resourceIds = new HashSet<>(); - for (TaskNode taskNode : tasks) { - String taskParameter = taskNode.getParams(); - AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter); - if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { - Set tempSet = params.getResourceFilesList().stream().map(t -> t.getId()).collect(Collectors.toSet()); - resourceIds.addAll(tempSet); - } - } - - StringBuilder sb = new StringBuilder(); - for (int i : resourceIds) { - if (sb.length() > 0) { - sb.append(","); - } - sb.append(i); - } - return sb.toString(); + return Optional.ofNullable(processData.getTasks()) + .orElse(Collections.emptyList()) + .stream() + .map(taskNode -> TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams())) + .filter(Objects::nonNull) + .flatMap(parameters -> parameters.getResourceFilesList().stream()) + .map(ResourceInfo::getId) + .distinct() + .map(Objects::toString) + .collect(Collectors.joining(",")); } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 33032f54e4..f35ff9509c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.api.service; +import static org.assertj.core.api.Assertions.assertThat; + import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl; @@ -28,6 +30,9 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.process.ResourceInfo; +import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -51,8 +56,11 @@ import org.apache.http.entity.ContentType; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.lang.reflect.Method; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -66,6 +74,7 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.springframework.mock.web.MockMultipartFile; +import org.springframework.util.ReflectionUtils; import org.springframework.web.multipart.MultipartFile; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -984,6 +993,81 @@ public class ProcessDefinitionServiceTest { loginUser, projectName, "1", null); } + @Test + public void testGetResourceIds() throws Exception { + // set up + Method testMethod = ReflectionUtils.findMethod(ProcessDefinitionServiceImpl.class, "getResourceIds", ProcessData.class); + assertThat(testMethod).isNotNull(); + testMethod.setAccessible(true); + + // when processData has empty task, then return empty string + ProcessData input1 = new ProcessData(); + input1.setTasks(Collections.emptyList()); + String output1 = (String) testMethod.invoke(processDefinitionService, input1); + assertThat(output1).isEmpty(); + + // when task is null, then return empty string + ProcessData input2 = new ProcessData(); + input2.setTasks(null); + String output2 = (String) testMethod.invoke(processDefinitionService, input2); + assertThat(output2).isEmpty(); + + // when task type is incorrect mapping, then return empty string + ProcessData input3 = new ProcessData(); + TaskNode taskNode3 = new TaskNode(); + taskNode3.setType("notExistType"); + input3.setTasks(Collections.singletonList(taskNode3)); + String output3 = (String) testMethod.invoke(processDefinitionService, input3); + assertThat(output3).isEmpty(); + + // when task parameter list is null, then return empty string + ProcessData input4 = new ProcessData(); + TaskNode taskNode4 = new TaskNode(); + taskNode4.setType("SHELL"); + taskNode4.setParams(null); + input4.setTasks(Collections.singletonList(taskNode4)); + String output4 = (String) testMethod.invoke(processDefinitionService, input4); + assertThat(output4).isEmpty(); + + // when resource id list is 0 1, then return 0,1 + ProcessData input5 = new ProcessData(); + TaskNode taskNode5 = new TaskNode(); + taskNode5.setType("SHELL"); + ShellParameters shellParameters5 = new ShellParameters(); + ResourceInfo resourceInfo5A = new ResourceInfo(); + resourceInfo5A.setId(0); + ResourceInfo resourceInfo5B = new ResourceInfo(); + resourceInfo5B.setId(1); + shellParameters5.setResourceList(Arrays.asList(resourceInfo5A, resourceInfo5B)); + taskNode5.setParams(JSONUtils.toJsonString(shellParameters5)); + input5.setTasks(Collections.singletonList(taskNode5)); + String output5 = (String) testMethod.invoke(processDefinitionService, input5); + assertThat(output5.split(",")).hasSize(2) + .containsExactlyInAnyOrder("0", "1"); + + // when resource id list is 0 1 1 2, then return 0,1,2 + ProcessData input6 = new ProcessData(); + TaskNode taskNode6 = new TaskNode(); + taskNode6.setType("SHELL"); + ShellParameters shellParameters6 = new ShellParameters(); + ResourceInfo resourceInfo6A = new ResourceInfo(); + resourceInfo6A.setId(0); + ResourceInfo resourceInfo6B = new ResourceInfo(); + resourceInfo6B.setId(1); + ResourceInfo resourceInfo6C = new ResourceInfo(); + resourceInfo6C.setId(1); + ResourceInfo resourceInfo6D = new ResourceInfo(); + resourceInfo6D.setId(2); + shellParameters6.setResourceList(Arrays.asList(resourceInfo6A, resourceInfo6B, resourceInfo6C, resourceInfo6D)); + taskNode6.setParams(JSONUtils.toJsonString(shellParameters6)); + input6.setTasks(Collections.singletonList(taskNode6)); + + String output6 = (String) testMethod.invoke(processDefinitionService, input6); + + assertThat(output6.split(",")).hasSize(3) + .containsExactlyInAnyOrder("0", "1", "2"); + } + /** * get mock datasource * From 11d3c2cdaf95be9ea9ef8c9e7838320dd76312ba Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Mon, 28 Sep 2020 09:56:46 +0800 Subject: [PATCH 6/6] [Improvement][server]lower weight round robin support weight (#3768) * [Improvement][server]lower weight round robin support weight * delete code * resolve conflicts --- .../dispatch/host/assign/HostWeight.java | 19 +++++++++---------- .../host/assign/LowerWeightRoundRobin.java | 13 +++++++------ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java index 298a62a6d9..839ebc85c7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java @@ -33,9 +33,9 @@ public class HostWeight { private final Host host; - private final int weight; + private final double weight; - private int currentWeight; + private double currentWeight; public HostWeight(Host host, double cpu, double memory, double loadAverage) { this.weight = getWeight(cpu, memory, loadAverage, host); @@ -43,15 +43,15 @@ public class HostWeight { this.currentWeight = weight; } - public int getCurrentWeight() { + public double getCurrentWeight() { return currentWeight; } - public int getWeight() { + public double getWeight() { return weight; } - public void setCurrentWeight(int currentWeight) { + public void setCurrentWeight(double currentWeight) { this.currentWeight = currentWeight; } @@ -68,20 +68,19 @@ public class HostWeight { + '}'; } - private int getWeight(double cpu, double memory, double loadAverage, Host host) { - int calculateWeight = (int) (cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); + private double getWeight(double cpu, double memory, double loadAverage, Host host) { + double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR; return getWarmUpWeight(host, calculateWeight); - } /** * If the warm-up is not over, add the weight */ - private int getWarmUpWeight(Host host, int weight) { + private double getWarmUpWeight(Host host, double weight) { long startTime = host.getStartTime(); long uptime = System.currentTimeMillis() - startTime; if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { - return (int) ((weight * Constants.WARM_UP_TIME) / uptime); + return weight * Constants.WARM_UP_TIME / uptime; } return weight; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java index 843397e20c..ea55785182 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java @@ -20,24 +20,25 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import java.util.Collection; /** - * lower weight round robin + * lower weight round robin */ -public class LowerWeightRoundRobin extends AbstractSelector{ +public class LowerWeightRoundRobin extends AbstractSelector { /** * select + * * @param sources sources * @return HostWeight */ @Override - public HostWeight doSelect(Collection sources){ - int totalWeight = 0; - int lowWeight = 0; + public HostWeight doSelect(Collection sources) { + double totalWeight = 0; + double lowWeight = 0; HostWeight lowerNode = null; for (HostWeight hostWeight : sources) { totalWeight += hostWeight.getWeight(); hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); - if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight() ) { + if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) { lowerNode = hostWeight; lowWeight = hostWeight.getCurrentWeight(); }