From f94cfc620dfd0c51010a49134a073e3848c0bd7e Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Thu, 18 Mar 2021 18:34:42 +0800 Subject: [PATCH] [Feature][Worker] Add a configuration item to set whether the tenant is automatically created on Worker (#5007) --- .../dolphinscheduler/worker.properties.tpl | 3 ++ .../common/utils/OSUtils.java | 34 ++++++++++++++----- .../common/utils/OSUtilsTest.java | 16 ++++++--- .../server/worker/config/WorkerConfig.java | 12 +++---- .../processor/TaskExecuteProcessor.java | 4 +++ .../src/main/resources/worker.properties | 3 ++ 6 files changed, 53 insertions(+), 19 deletions(-) diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl b/docker/build/conf/dolphinscheduler/worker.properties.tpl index ec0c4abb49..8f5907ede6 100644 --- a/docker/build/conf/dolphinscheduler/worker.properties.tpl +++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl @@ -21,6 +21,9 @@ worker.exec.threads=${WORKER_EXEC_THREADS} # worker heartbeat interval worker.heartbeat.interval=${WORKER_HEARTBEAT_INTERVAL} +# worker tenant auto create +worker.tenant.auto.create=true + # only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2 worker.max.cpuload.avg=${WORKER_MAX_CPULOAD_AVG} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java index 768c0f654a..92a06fe568 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java @@ -257,6 +257,23 @@ public class OSUtils { return users; } + /** + * create user + * + * @param userName user name + */ + public static void createUserIfAbsent(String userName) { + // if not exists this user, then create + taskLoggerThreadLocal.set(taskLoggerThreadLocal.get()); + if (!getUserList().contains(userName)) { + boolean isSuccess = createUser(userName); + String infoLog = String.format("create user %s %s", userName, isSuccess ? "success" : "fail"); + LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog); + LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog); + } + taskLoggerThreadLocal.remove(); + } + /** * create user * @@ -265,7 +282,7 @@ public class OSUtils { */ public static boolean createUser(String userName) { try { - String userGroup = OSUtils.getGroup(); + String userGroup = getGroup(); if (StringUtils.isEmpty(userGroup)) { String errorLog = String.format("%s group does not exist for this operating system.", userGroup); LoggerUtils.logError(Optional.ofNullable(logger), errorLog); @@ -304,7 +321,7 @@ public class OSUtils { String infoLog2 = String.format("execute cmd : %s", cmd); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); - OSUtils.exeCmd(cmd); + exeCmd(cmd); } /** @@ -315,7 +332,6 @@ public class OSUtils { * @throws IOException in case of an I/O error */ private static void createMacUser(String userName, String userGroup) throws IOException { - Optional optionalLogger = Optional.ofNullable(logger); Optional optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get()); @@ -327,13 +343,13 @@ public class OSUtils { String infoLog2 = String.format("create user command : %s", createUserCmd); LoggerUtils.logInfo(optionalLogger, infoLog2); LoggerUtils.logInfo(optionalTaskLogger, infoLog2); - OSUtils.exeCmd(createUserCmd); + exeCmd(createUserCmd); String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup); String infoLog3 = String.format("append user to group : %s", appendGroupCmd); LoggerUtils.logInfo(optionalLogger, infoLog3); LoggerUtils.logInfo(optionalTaskLogger, infoLog3); - OSUtils.exeCmd(appendGroupCmd); + exeCmd(appendGroupCmd); } /** @@ -352,13 +368,13 @@ public class OSUtils { String infoLog2 = String.format("execute create user command : %s", userCreateCmd); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); - OSUtils.exeCmd(userCreateCmd); + exeCmd(userCreateCmd); String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName); String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3); - OSUtils.exeCmd(appendGroupCmd); + exeCmd(appendGroupCmd); } /** @@ -472,9 +488,9 @@ public class OSUtils { */ public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory) { // system load average - double loadAverage = OSUtils.loadAverage(); + double loadAverage = loadAverage(); // system available physical memory - double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); + double availablePhysicalMemorySize = availablePhysicalMemorySize(); if (loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory) { logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java index 83cacb758b..86da2684f7 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java @@ -16,17 +16,19 @@ */ package org.apache.dolphinscheduler.common.utils; +import org.apache.dolphinscheduler.common.Constants; + import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.dolphinscheduler.common.Constants; + +import java.io.IOException; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; - public class OSUtilsTest { private static final Logger logger = LoggerFactory.getLogger(OSUtilsTest.class); @@ -75,6 +77,12 @@ public class OSUtilsTest { } } + @Test + public void createUserIfAbsent() { + OSUtils.createUserIfAbsent("test123"); + Assert.assertTrue("create user test123 success", true); + } + @Test public void testGetSudoCmd() { String cmd = "kill -9 1234"; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 0bd84f5e6d..1492899e84 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -35,8 +35,8 @@ public class WorkerConfig { @Value("${worker.heartbeat.interval:10}") private int workerHeartbeatInterval; - @Value("${worker.fetch.task.num:3}") - private int workerFetchTaskNum; + @Value("${worker.tenant.auto.create:false}") + private boolean workerTenantAutoCreate; @Value("${worker.max.cpuload.avg:-1}") private int workerMaxCpuloadAvg; @@ -88,12 +88,12 @@ public class WorkerConfig { this.workerHeartbeatInterval = workerHeartbeatInterval; } - public int getWorkerFetchTaskNum() { - return workerFetchTaskNum; + public boolean getWorkerTenantAutoCreate() { + return workerTenantAutoCreate; } - public void setWorkerFetchTaskNum(int workerFetchTaskNum) { - this.workerFetchTaskNum = workerFetchTaskNum; + public void setWorkerTenantAutoCreate(boolean workerTenantAutoCreate) { + this.workerTenantAutoCreate = workerTenantAutoCreate; } public double getWorkerReservedMemory() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index aafd7a1524..cfd2c3fe2f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -146,6 +147,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { FileUtils.taskLoggerThreadLocal.set(taskLogger); try { FileUtils.createWorkDirIfAbsent(execLocalPath); + if (workerConfig.getWorkerTenantAutoCreate()) { + OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); + } } catch (Throwable ex) { String errorLog = String.format("create execLocalPath : %s", execLocalPath); LoggerUtils.logError(Optional.of(logger), errorLog, ex); diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index 7fd11fe6c2..88c15a6d5d 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -21,6 +21,9 @@ # worker heartbeat interval #worker.heartbeat.interval=10 +# worker tenant auto create +#worker.tenant.auto.create=false + # only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2 #worker.max.cpuload.avg=-1