diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java index f0e3ec8214..0dcfbddaf4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java @@ -158,13 +158,12 @@ public class FileUtils { } /** - * create directory and user + * create directory if absent * * @param execLocalPath execute local path - * @param userName user name * @throws IOException errors */ - public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException { + public static void createWorkDirIfAbsent(String execLocalPath) throws IOException { //if work dir exists, first delete File execLocalPathFile = new File(execLocalPath); @@ -177,27 +176,6 @@ public class FileUtils { String mkdirLog = "create dir success " + execLocalPath; LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog); - - //if not exists this user,then create - OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get()); - try { - if (!OSUtils.getUserList().contains(userName)) { - boolean isSuccessCreateUser = OSUtils.createUser(userName); - - String infoLog; - if (isSuccessCreateUser) { - infoLog = String.format("create user name success %s", userName); - } else { - infoLog = String.format("create user name fail %s", userName); - } - LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog); - LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog); - } - } catch (Throwable e) { - LoggerUtils.logError(Optional.ofNullable(logger), e); - LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e); - } - OSUtils.taskLoggerThreadLocal.remove(); } /** diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java index f87628c950..a4a39ae252 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java @@ -67,9 +67,9 @@ public class FileUtilsTest { } @Test - public void testCreateWorkDirAndUserIfAbsent() { + public void testCreateWorkDirIfAbsent() { try { - FileUtils.createWorkDirAndUserIfAbsent("/tmp/createWorkDirAndUserIfAbsent", "test123"); + FileUtils.createWorkDirIfAbsent("/tmp/createWorkDirAndUserIfAbsent"); Assert.assertTrue(true); } catch (Exception e) { Assert.assertTrue(false); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 3fe3b6dc53..e43a913e8c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -116,6 +116,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.error("task execution context is null"); return; } + setTaskCache(taskExecutionContext); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, @@ -134,7 +135,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { FileUtils.taskLoggerThreadLocal.set(taskLogger); try { - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); + FileUtils.createWorkDirIfAbsent(execLocalPath); } catch (Throwable ex) { String errorLog = String.format("create execLocalPath : %s", execLocalPath); LoggerUtils.logError(Optional.of(logger), errorLog, ex); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 6baeae9b7b..c1d03e366e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.Constants; @@ -23,11 +24,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; @@ -57,7 +58,6 @@ import org.slf4j.LoggerFactory; import com.github.rholder.retry.RetryException; - /** * task scheduler thread */ @@ -113,6 +113,15 @@ public class TaskExecuteThread implements Runnable { TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()); try { logger.info("script path : {}", taskExecutionContext.getExecutePath()); + // check if the OS user exists + if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) { + String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode()); + taskLogger.error(errorLog); + responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); + responseCommand.setEndTime(new Date()); + return; + } + // task node TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); @@ -199,10 +208,10 @@ public class TaskExecuteThread implements Runnable { // the default timeout is the maximum value of the integer taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE); TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); - if (taskTimeoutParameter.getEnable()){ + if (taskTimeoutParameter.getEnable()) { // get timeout strategy taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode()); - switch (taskTimeoutParameter.getStrategy()){ + switch (taskTimeoutParameter.getStrategy()) { case WARN: break; case FAILED: @@ -223,21 +232,19 @@ public class TaskExecuteThread implements Runnable { } } - /** * kill task */ - public void kill(){ - if (task != null){ + public void kill() { + if (task != null) { try { task.cancelApplication(true); - }catch (Exception e){ + } catch (Exception e) { logger.error(e.getMessage(),e); } } } - /** * download resource file * @@ -248,7 +255,7 @@ public class TaskExecuteThread implements Runnable { private void downloadResource(String execLocalPath, Map projectRes, Logger logger) throws Exception { - if (MapUtils.isEmpty(projectRes)){ + if (MapUtils.isEmpty(projectRes)) { return; } @@ -265,7 +272,7 @@ public class TaskExecuteThread implements Runnable { logger.info("get resource file from hdfs :{}", resHdfsPath); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true); - }catch (Exception e){ + } catch (Exception e) { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java index f0c68d1088..86d3f8832c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -33,7 +34,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import org.junit.Assert; import org.junit.Before; @@ -49,7 +52,7 @@ import org.slf4j.LoggerFactory; * test task execute thread. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class}) +@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class, OSUtils.class}) public class TaskExecuteThreadTest { private TaskExecutionContext taskExecutionContext; @@ -110,6 +113,12 @@ public class TaskExecuteThreadTest { PowerMockito.mockStatic(CommonUtils.class); PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile"); + + List osUserList = new ArrayList() {{ + add("test"); + }}; + PowerMockito.mockStatic(OSUtils.class); + PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList); } @Test @@ -117,6 +126,7 @@ public class TaskExecuteThreadTest { taskExecutionContext.setTaskType("SQL"); taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + taskExecutionContext.setTenantCode("test"); TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); taskExecuteThread.run(); taskExecutionContext.getCurrentExecutionStatus(); @@ -132,6 +142,7 @@ public class TaskExecuteThreadTest { taskExecutionContext.setStartTime(null); taskExecutionContext.setDelayTime(1); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); + taskExecutionContext.setTenantCode("test"); TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); taskExecuteThread.run();