Browse Source

[Feature][Worker] Add a configuration item to set whether the tenant is automatically created on Worker (#5007)

pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
f94cfc620d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      docker/build/conf/dolphinscheduler/worker.properties.tpl
  2. 34
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  3. 16
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
  4. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  5. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  6. 3
      dolphinscheduler-server/src/main/resources/worker.properties

3
docker/build/conf/dolphinscheduler/worker.properties.tpl

@ -21,6 +21,9 @@ worker.exec.threads=${WORKER_EXEC_THREADS}
# worker heartbeat interval # worker heartbeat interval
worker.heartbeat.interval=${WORKER_HEARTBEAT_INTERVAL} worker.heartbeat.interval=${WORKER_HEARTBEAT_INTERVAL}
# worker tenant auto create
worker.tenant.auto.create=true
# only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2 # only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2
worker.max.cpuload.avg=${WORKER_MAX_CPULOAD_AVG} worker.max.cpuload.avg=${WORKER_MAX_CPULOAD_AVG}

34
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -257,6 +257,23 @@ public class OSUtils {
return users; return users;
} }
/**
* create user
*
* @param userName user name
*/
public static void createUserIfAbsent(String userName) {
// if not exists this user, then create
taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
if (!getUserList().contains(userName)) {
boolean isSuccess = createUser(userName);
String infoLog = String.format("create user %s %s", userName, isSuccess ? "success" : "fail");
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
}
taskLoggerThreadLocal.remove();
}
/** /**
* create user * create user
* *
@ -265,7 +282,7 @@ public class OSUtils {
*/ */
public static boolean createUser(String userName) { public static boolean createUser(String userName) {
try { try {
String userGroup = OSUtils.getGroup(); String userGroup = getGroup();
if (StringUtils.isEmpty(userGroup)) { if (StringUtils.isEmpty(userGroup)) {
String errorLog = String.format("%s group does not exist for this operating system.", userGroup); String errorLog = String.format("%s group does not exist for this operating system.", userGroup);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog); LoggerUtils.logError(Optional.ofNullable(logger), errorLog);
@ -304,7 +321,7 @@ public class OSUtils {
String infoLog2 = String.format("execute cmd : %s", cmd); String infoLog2 = String.format("execute cmd : %s", cmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(cmd); exeCmd(cmd);
} }
/** /**
@ -315,7 +332,6 @@ public class OSUtils {
* @throws IOException in case of an I/O error * @throws IOException in case of an I/O error
*/ */
private static void createMacUser(String userName, String userGroup) throws IOException { private static void createMacUser(String userName, String userGroup) throws IOException {
Optional<Logger> optionalLogger = Optional.ofNullable(logger); Optional<Logger> optionalLogger = Optional.ofNullable(logger);
Optional<Logger> optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get()); Optional<Logger> optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get());
@ -327,13 +343,13 @@ public class OSUtils {
String infoLog2 = String.format("create user command : %s", createUserCmd); String infoLog2 = String.format("create user command : %s", createUserCmd);
LoggerUtils.logInfo(optionalLogger, infoLog2); LoggerUtils.logInfo(optionalLogger, infoLog2);
LoggerUtils.logInfo(optionalTaskLogger, infoLog2); LoggerUtils.logInfo(optionalTaskLogger, infoLog2);
OSUtils.exeCmd(createUserCmd); exeCmd(createUserCmd);
String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup); String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
String infoLog3 = String.format("append user to group : %s", appendGroupCmd); String infoLog3 = String.format("append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(optionalLogger, infoLog3); LoggerUtils.logInfo(optionalLogger, infoLog3);
LoggerUtils.logInfo(optionalTaskLogger, infoLog3); LoggerUtils.logInfo(optionalTaskLogger, infoLog3);
OSUtils.exeCmd(appendGroupCmd); exeCmd(appendGroupCmd);
} }
/** /**
@ -352,13 +368,13 @@ public class OSUtils {
String infoLog2 = String.format("execute create user command : %s", userCreateCmd); String infoLog2 = String.format("execute create user command : %s", userCreateCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(userCreateCmd); exeCmd(userCreateCmd);
String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName); String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd); String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3); LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3);
OSUtils.exeCmd(appendGroupCmd); exeCmd(appendGroupCmd);
} }
/** /**
@ -472,9 +488,9 @@ public class OSUtils {
*/ */
public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory) { public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory) {
// system load average // system load average
double loadAverage = OSUtils.loadAverage(); double loadAverage = loadAverage();
// system available physical memory // system available physical memory
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double availablePhysicalMemorySize = availablePhysicalMemorySize();
if (loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory) { if (loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory) {
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage); logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage);

16
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java

@ -16,17 +16,19 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.dolphinscheduler.common.Constants;
import java.io.IOException;
import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
public class OSUtilsTest { public class OSUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(OSUtilsTest.class); private static final Logger logger = LoggerFactory.getLogger(OSUtilsTest.class);
@ -75,6 +77,12 @@ public class OSUtilsTest {
} }
} }
@Test
public void createUserIfAbsent() {
OSUtils.createUserIfAbsent("test123");
Assert.assertTrue("create user test123 success", true);
}
@Test @Test
public void testGetSudoCmd() { public void testGetSudoCmd() {
String cmd = "kill -9 1234"; String cmd = "kill -9 1234";

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -35,8 +35,8 @@ public class WorkerConfig {
@Value("${worker.heartbeat.interval:10}") @Value("${worker.heartbeat.interval:10}")
private int workerHeartbeatInterval; private int workerHeartbeatInterval;
@Value("${worker.fetch.task.num:3}") @Value("${worker.tenant.auto.create:false}")
private int workerFetchTaskNum; private boolean workerTenantAutoCreate;
@Value("${worker.max.cpuload.avg:-1}") @Value("${worker.max.cpuload.avg:-1}")
private int workerMaxCpuloadAvg; private int workerMaxCpuloadAvg;
@ -88,12 +88,12 @@ public class WorkerConfig {
this.workerHeartbeatInterval = workerHeartbeatInterval; this.workerHeartbeatInterval = workerHeartbeatInterval;
} }
public int getWorkerFetchTaskNum() { public boolean getWorkerTenantAutoCreate() {
return workerFetchTaskNum; return workerTenantAutoCreate;
} }
public void setWorkerFetchTaskNum(int workerFetchTaskNum) { public void setWorkerTenantAutoCreate(boolean workerTenantAutoCreate) {
this.workerFetchTaskNum = workerFetchTaskNum; this.workerTenantAutoCreate = workerTenantAutoCreate;
} }
public double getWorkerReservedMemory() { public double getWorkerReservedMemory() {

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -146,6 +147,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
FileUtils.taskLoggerThreadLocal.set(taskLogger); FileUtils.taskLoggerThreadLocal.set(taskLogger);
try { try {
FileUtils.createWorkDirIfAbsent(execLocalPath); FileUtils.createWorkDirIfAbsent(execLocalPath);
if (workerConfig.getWorkerTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
} catch (Throwable ex) { } catch (Throwable ex) {
String errorLog = String.format("create execLocalPath : %s", execLocalPath); String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.of(logger), errorLog, ex); LoggerUtils.logError(Optional.of(logger), errorLog, ex);

3
dolphinscheduler-server/src/main/resources/worker.properties

@ -21,6 +21,9 @@
# worker heartbeat interval # worker heartbeat interval
#worker.heartbeat.interval=10 #worker.heartbeat.interval=10
# worker tenant auto create
#worker.tenant.auto.create=false
# only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2 # only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2
#worker.max.cpuload.avg=-1 #worker.max.cpuload.avg=-1

Loading…
Cancel
Save