Browse Source

Fix sudo enable is false will still check tenant exist (#16071)

(cherry picked from commit c5baa75ef93243c6bf88194eb3ad94a48173a003)
upstream-dev
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
af55f94cc0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      deploy/kubernetes/dolphinscheduler/README.md
  2. 2
      deploy/kubernetes/dolphinscheduler/values.yaml
  3. 1
      docs/docs/en/architecture/configuration.md
  4. 1
      docs/docs/zh/architecture/configuration.md
  5. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/TenantConstants.java
  6. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/TenantConfig.java
  7. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
  8. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
  9. 42
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java
  10. 95
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TenantUtils.java
  11. 2
      dolphinscheduler-worker/src/main/resources/application.yaml

1
deploy/kubernetes/dolphinscheduler/README.md

@ -308,7 +308,6 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst
| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max system cpu usage, when the worker's system cpu usage is smaller then this value, worker server can be dispatched tasks. | | worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_CPU_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max system cpu usage, when the worker's system cpu usage is smaller then this value, worker server can be dispatched tasks. |
| worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max memory usage , when the worker's memory usage is smaller then this value, worker server can be dispatched tasks. | | worker.env.WORKER_SERVER_LOAD_PROTECTION_MAX_SYSTEM_MEMORY_USAGE_PERCENTAGE_THRESHOLDS | float | `0.7` | Worker max memory usage , when the worker's memory usage is smaller then this value, worker server can be dispatched tasks. |
| worker.env.WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED | bool | `true` | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. | | worker.env.WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED | bool | `true` | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. |
| worker.env.WORKER_TENANT_CONFIG_DISTRIBUTED_TENANT | bool | `false` | Scenes to be used for distributed users. For example, users created by FreeIpa are stored in LDAP. This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants. |
| worker.keda.advanced | object | `{}` | Specify HPA related options | | worker.keda.advanced | object | `{}` | Specify HPA related options |
| worker.keda.cooldownPeriod | int | `30` | How many seconds KEDA will wait before scaling to zero. Note that HPA has a separate cooldown period for scale-downs | | worker.keda.cooldownPeriod | int | `30` | How many seconds KEDA will wait before scaling to zero. Note that HPA has a separate cooldown period for scale-downs |
| worker.keda.enabled | bool | `false` | Enable or disable the Keda component | | worker.keda.enabled | bool | `false` | Enable or disable the Keda component |

2
deploy/kubernetes/dolphinscheduler/values.yaml

@ -654,8 +654,6 @@ worker:
WORKER_HOST_WEIGHT: "100" WORKER_HOST_WEIGHT: "100"
# -- tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. # -- tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED: true WORKER_TENANT_CONFIG_AUTO_CREATE_TENANT_ENABLED: true
# -- Scenes to be used for distributed users. For example, users created by FreeIpa are stored in LDAP. This parameter only applies to Linux, When this parameter is true, worker.tenant.auto.create has no effect and will not automatically create tenants.
WORKER_TENANT_CONFIG_DISTRIBUTED_TENANT: false
# -- If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`; # -- If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`;
DEFAULT_TENANT_ENABLED: false DEFAULT_TENANT_ENABLED: false

1
docs/docs/en/architecture/configuration.md

@ -328,7 +328,6 @@ Location: `worker-server/conf/application.yaml`
| worker.registry-disconnect-strategy.max-waiting-time | 100s | Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will wait infinitely | | worker.registry-disconnect-strategy.max-waiting-time | 100s | Used when the worker disconnect from registry, and the disconnect strategy is waiting, this config means the worker will waiting to reconnect to registry in given times, and after the waiting times, if the worker still cannot connect to registry, will stop itself, if the value is 0s, will wait infinitely |
| worker.task-execute-threads-full-policy | REJECT | If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution | | worker.task-execute-threads-full-policy | REJECT | If REJECT, when the task waiting in the worker reaches exec-threads, it will reject the received task and the Master will redispatch it; If CONTINUE, it will put the task into the worker's execution queue and wait for a free thread to start execution |
| worker.tenant-config.auto-create-tenant-enabled | true | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. | | worker.tenant-config.auto-create-tenant-enabled | true | tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. |
| worker.tenant-config.distributed-tenant-enabled | false | When this parameter is true, auto-create-tenant-enabled has no effect and will not automatically create tenants |
| worker.tenant-config.default-tenant-enabled | false | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`. | | worker.tenant-config.default-tenant-enabled | false | If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`. |
### Alert Server related configuration ### Alert Server related configuration

1
docs/docs/zh/architecture/configuration.md

@ -331,7 +331,6 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
| worker.registry-disconnect-strategy.max-waiting-time | 100s | 当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 | | worker.registry-disconnect-strategy.max-waiting-time | 100s | 当Worker与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 该值表示当Worker与注册中心失联时会在给定时间之内进行重连, 在给定时间之内重连失败将会停止自己,在重连时,Worker会丢弃kill正在执行的任务。值为0表示会无限期等待 |
| worker.task-execute-threads-full-policy | REJECT | 如果是 REJECT, 当Worker中等待队列中的任务数达到exec-threads时, Worker将会拒绝接下来新接收的任务,Master将会重新分发该任务; 如果是 CONTINUE, Worker将会接收任务,放入等待队列中等待空闲线程去执行该任务 | | worker.task-execute-threads-full-policy | REJECT | 如果是 REJECT, 当Worker中等待队列中的任务数达到exec-threads时, Worker将会拒绝接下来新接收的任务,Master将会重新分发该任务; 如果是 CONTINUE, Worker将会接收任务,放入等待队列中等待空闲线程去执行该任务 |
| worker.tenant-config.auto-create-tenant-enabled | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 | | worker.tenant-config.auto-create-tenant-enabled | true | 租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。 |
| worker.tenant-config.distributed-tenant-enabled | false | 如果设置为true, auto-create-tenant-enabled 将会不起作用。 |
| worker.tenant-config.default-tenant-enabled | false | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。 | | worker.tenant-config.default-tenant-enabled | false | 如果设置为true, 将会使用worker服务启动用户作为 `default` 租户。 |
## Alert Server相关配置 ## Alert Server相关配置

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/TenantConstants.java

@ -21,5 +21,5 @@ public class TenantConstants {
public static final String DEFAULT_TENANT_CODE = "default"; public static final String DEFAULT_TENANT_CODE = "default";
public static final String BOOTSTRAPT_SYSTEM_USER = System.getProperty("user.name"); public static final String BOOTSTRAP_SYSTEM_USER = System.getProperty("user.name");
} }

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/TenantConfig.java

@ -23,6 +23,5 @@ import lombok.Data;
public class TenantConfig { public class TenantConfig {
private boolean autoCreateTenantEnabled = true; private boolean autoCreateTenantEnabled = true;
private boolean distributedTenantEnabled = false;
private boolean defaultTenantEnabled = false; private boolean defaultTenantEnabled = false;
} }

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java

@ -53,6 +53,7 @@ import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils; import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils;
import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils; import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import org.apache.dolphinscheduler.server.worker.utils.TenantUtils;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -209,8 +210,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
// In most of case the origin tenant is the same as the current tenant // In most of case the origin tenant is the same as the current tenant
// Except `default` tenant. The originTenant is used to download the resources // Except `default` tenant. The originTenant is used to download the resources
String originTenant = taskExecutionContext.getTenantCode(); String originTenant = taskExecutionContext.getTenantCode();
String tenant = TaskExecutionContextUtils.getOrCreateTenant(workerConfig, taskExecutionContext); taskExecutionContext.setTenantCode(TenantUtils.getOrCreateActualTenant(workerConfig, taskExecutionContext));
taskExecutionContext.setTenantCode(tenant);
log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode()); log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode());
TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext); TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java

@ -79,7 +79,6 @@ public class TaskInstanceDispatchOperationFunction
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
.createWorkerTaskExecutorFactory(taskExecutionContext).createWorkerTaskExecutor(); .createWorkerTaskExecutorFactory(taskExecutionContext).createWorkerTaskExecutor();
// todo: hold the workerTaskExecutor
if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) { if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName()); log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(), return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),

42
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionContextUtils.java

@ -17,9 +17,7 @@
package org.apache.dolphinscheduler.server.worker.utils; package org.apache.dolphinscheduler.server.worker.utils;
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -28,12 +26,9 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File; import java.io.File;
import java.nio.file.Files; import java.nio.file.Files;
@ -45,43 +40,6 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public class TaskExecutionContextUtils { public class TaskExecutionContextUtils {
public static String getOrCreateTenant(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
try {
TenantConfig tenantConfig = workerConfig.getTenantConfig();
String tenantCode = taskExecutionContext.getTenantCode();
if (TenantConstants.DEFAULT_TENANT_CODE.equals(tenantCode) && tenantConfig.isDefaultTenantEnabled()) {
log.info("Current tenant is default tenant, will use bootstrap user: {} to execute the task",
TenantConstants.BOOTSTRAPT_SYSTEM_USER);
return TenantConstants.BOOTSTRAPT_SYSTEM_USER;
}
boolean osUserExistFlag;
// if Using distributed is true and Currently supported systems are linux,Should not let it
// automatically
// create tenants,so TenantAutoCreate has no effect
if (tenantConfig.isDistributedTenantEnabled() && SystemUtils.IS_OS_LINUX) {
// use the id command to judge in linux
osUserExistFlag = OSUtils.existTenantCodeInLinux(tenantCode);
} else if (OSUtils.isSudoEnable() && tenantConfig.isAutoCreateTenantEnabled()) {
// if not exists this user, then create
OSUtils.createUserIfAbsent(tenantCode);
osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
} else {
osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
}
if (!osUserExistFlag) {
throw new TaskException(
String.format("TenantCode: %s doesn't exist", tenantCode));
}
return tenantCode;
} catch (TaskException ex) {
throw ex;
} catch (Exception ex) {
throw new TaskException(
String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()), ex);
}
}
public static void createTaskInstanceWorkingDirectory(TaskExecutionContext taskExecutionContext) throws TaskException { public static void createTaskInstanceWorkingDirectory(TaskExecutionContext taskExecutionContext) throws TaskException {
// local execute path // local execute path
String taskInstanceWorkingDirectory = FileUtils.getTaskInstanceWorkingDirectory( String taskInstanceWorkingDirectory = FileUtils.getTaskInstanceWorkingDirectory(

95
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TenantUtils.java

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.utils;
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.TenantConfig;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@UtilityClass
public class TenantUtils {
public static boolean isTenantEnable() {
// todo: add tenantEnable in workerConfig, the tenantEnable shouldn't judged by sudoEnable, these should be two
// config
return OSUtils.isSudoEnable();
}
/**
* Get the the actual tenant code or create the tenant if it doesn't exist.
* <p>
* If sudo is enabled then will check tenant.
* If the tenant code is the default tenant code and the {@link TenantConfig#isDefaultTenantEnabled()} is enabled, will return the bootstrap user.
* If the tenant code is not the default tenant code, will check if the tenant exist, if the tenant is not exist and {@link TenantConfig#isAutoCreateTenantEnabled()} is true will create the tenant.
* <p>
* If sudo is not enabled, will not check the tenant code.
*/
public static String getOrCreateActualTenant(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
TenantConfig tenantConfig = workerConfig.getTenantConfig();
if (!isTenantEnable()) {
log.info("Tenant is not enabled, will use the bootstrap: {} user as tenant", getBootstrapTenant());
return getBootstrapTenant();
}
String tenantCode = taskExecutionContext.getTenantCode();
if (isDefaultTenant(tenantCode)) {
if (tenantConfig.isDefaultTenantEnabled()) {
log.info("Current tenant is default tenant, will use bootstrap user: {} to execute the task",
getBootstrapTenant());
return getBootstrapTenant();
} else {
throw new TaskException(
"The tenantCode is " + tenantCode + ", please enable TenantConfig#isDefaultTenantEnabled");
}
}
if (tenantConfig.isAutoCreateTenantEnabled()) {
OSUtils.createUserIfAbsent(tenantCode);
}
if (!tenantExists(tenantCode)) {
throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode));
}
return tenantCode;
}
public static boolean isDefaultTenant(String tenantCode) {
return TenantConstants.DEFAULT_TENANT_CODE.equals(tenantCode);
}
public static String getBootstrapTenant() {
return TenantConstants.BOOTSTRAP_SYSTEM_USER;
}
public static boolean isBootstrapTenant(String tenantCode) {
return TenantConstants.BOOTSTRAP_SYSTEM_USER.equals(tenantCode);
}
public static boolean tenantExists(String tenantCode) {
return OSUtils.getUserList().contains(tenantCode);
}
}

2
dolphinscheduler-worker/src/main/resources/application.yaml

@ -67,8 +67,6 @@ worker:
tenant-config: tenant-config:
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.
auto-create-tenant-enabled: true auto-create-tenant-enabled: true
# Scenes to be used for distributed users. For example, users created by FreeIpa are stored in LDAP. This parameter only applies to Linux, When this parameter is true, auto-create-tenant-enabled has no effect and will not automatically create tenants.
distributed-tenant-enabled: false
# If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`. # If set true, will use worker bootstrap user as the tenant to execute task when the tenant is `default`.
default-tenant-enabled: false default-tenant-enabled: false

Loading…
Cancel
Save