Browse Source

[Improvement-4069][server] When the tenant does not exist, the task execution should throw an exception (#4108)

* when  the tenant does not exist, the task execution should throw an exception

* remote method createWorkDirAndUserIfAbsent

* set the task status failed when the tenant code does not exist.

* add taskLog.

* update check os user exists

* update TaskExecuteThreadTest test method.

* solving sonar fail.
pull/3/MERGE
zhuangchong 4 years ago committed by GitHub
parent
commit
3bcd3fbcd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
  2. 4
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
  3. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  4. 29
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  5. 13
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

26
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java

@ -158,13 +158,12 @@ public class FileUtils {
} }
/** /**
* create directory and user * create directory if absent
* *
* @param execLocalPath execute local path * @param execLocalPath execute local path
* @param userName user name
* @throws IOException errors * @throws IOException errors
*/ */
public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException { public static void createWorkDirIfAbsent(String execLocalPath) throws IOException {
//if work dir exists, first delete //if work dir exists, first delete
File execLocalPathFile = new File(execLocalPath); File execLocalPathFile = new File(execLocalPath);
@ -177,27 +176,6 @@ public class FileUtils {
String mkdirLog = "create dir success " + execLocalPath; String mkdirLog = "create dir success " + execLocalPath;
LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog); LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog); LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog);
//if not exists this user,then create
OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
try {
if (!OSUtils.getUserList().contains(userName)) {
boolean isSuccessCreateUser = OSUtils.createUser(userName);
String infoLog;
if (isSuccessCreateUser) {
infoLog = String.format("create user name success %s", userName);
} else {
infoLog = String.format("create user name fail %s", userName);
}
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
}
} catch (Throwable e) {
LoggerUtils.logError(Optional.ofNullable(logger), e);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
}
OSUtils.taskLoggerThreadLocal.remove();
} }
/** /**

4
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java

@ -67,9 +67,9 @@ public class FileUtilsTest {
} }
@Test @Test
public void testCreateWorkDirAndUserIfAbsent() { public void testCreateWorkDirIfAbsent() {
try { try {
FileUtils.createWorkDirAndUserIfAbsent("/tmp/createWorkDirAndUserIfAbsent", "test123"); FileUtils.createWorkDirIfAbsent("/tmp/createWorkDirAndUserIfAbsent");
Assert.assertTrue(true); Assert.assertTrue(true);
} catch (Exception e) { } catch (Exception e) {
Assert.assertTrue(false); Assert.assertTrue(false);

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -116,6 +116,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.error("task execution context is null"); logger.error("task execution context is null");
return; return;
} }
setTaskCache(taskExecutionContext); setTaskCache(taskExecutionContext);
// custom logger // custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@ -134,7 +135,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
FileUtils.taskLoggerThreadLocal.set(taskLogger); FileUtils.taskLoggerThreadLocal.set(taskLogger);
try { try {
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode()); FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) { } catch (Throwable ex) {
String errorLog = String.format("create execLocalPath : %s", execLocalPath); String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.of(logger), errorLog, ex); LoggerUtils.logError(Optional.of(logger), errorLog, ex);

29
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -23,11 +24,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils; import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -57,7 +58,6 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException; import com.github.rholder.retry.RetryException;
/** /**
* task scheduler thread * task scheduler thread
*/ */
@ -113,6 +113,15 @@ public class TaskExecuteThread implements Runnable {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()); TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
try { try {
logger.info("script path : {}", taskExecutionContext.getExecutePath()); logger.info("script path : {}", taskExecutionContext.getExecutePath());
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
taskLogger.error(errorLog);
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
return;
}
// task node // task node
TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
@ -199,10 +208,10 @@ public class TaskExecuteThread implements Runnable {
// the default timeout is the maximum value of the integer // the default timeout is the maximum value of the integer
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE); taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
if (taskTimeoutParameter.getEnable()){ if (taskTimeoutParameter.getEnable()) {
// get timeout strategy // get timeout strategy
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode()); taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
switch (taskTimeoutParameter.getStrategy()){ switch (taskTimeoutParameter.getStrategy()) {
case WARN: case WARN:
break; break;
case FAILED: case FAILED:
@ -223,21 +232,19 @@ public class TaskExecuteThread implements Runnable {
} }
} }
/** /**
* kill task * kill task
*/ */
public void kill(){ public void kill() {
if (task != null){ if (task != null) {
try { try {
task.cancelApplication(true); task.cancelApplication(true);
}catch (Exception e){ } catch (Exception e) {
logger.error(e.getMessage(),e); logger.error(e.getMessage(),e);
} }
} }
} }
/** /**
* download resource file * download resource file
* *
@ -248,7 +255,7 @@ public class TaskExecuteThread implements Runnable {
private void downloadResource(String execLocalPath, private void downloadResource(String execLocalPath,
Map<String,String> projectRes, Map<String,String> projectRes,
Logger logger) throws Exception { Logger logger) throws Exception {
if (MapUtils.isEmpty(projectRes)){ if (MapUtils.isEmpty(projectRes)) {
return; return;
} }
@ -265,7 +272,7 @@ public class TaskExecuteThread implements Runnable {
logger.info("get resource file from hdfs :{}", resHdfsPath); logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
}catch (Exception e){ } catch (Exception e) {
logger.error(e.getMessage(),e); logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage()); throw new RuntimeException(e.getMessage());
} }

13
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -33,7 +34,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -49,7 +52,7 @@ import org.slf4j.LoggerFactory;
* test task execute thread. * test task execute thread.
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class}) @PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class, OSUtils.class})
public class TaskExecuteThreadTest { public class TaskExecuteThreadTest {
private TaskExecutionContext taskExecutionContext; private TaskExecutionContext taskExecutionContext;
@ -110,6 +113,12 @@ public class TaskExecuteThreadTest {
PowerMockito.mockStatic(CommonUtils.class); PowerMockito.mockStatic(CommonUtils.class);
PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile"); PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
List<String> osUserList = new ArrayList<String>() {{
add("test");
}};
PowerMockito.mockStatic(OSUtils.class);
PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList);
} }
@Test @Test
@ -117,6 +126,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setTaskType("SQL"); taskExecutionContext.setTaskType("SQL");
taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecutionContext.setTenantCode("test");
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
taskExecuteThread.run(); taskExecuteThread.run();
taskExecutionContext.getCurrentExecutionStatus(); taskExecutionContext.getCurrentExecutionStatus();
@ -132,6 +142,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setStartTime(null); taskExecutionContext.setStartTime(null);
taskExecutionContext.setDelayTime(1); taskExecutionContext.setDelayTime(1);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setTenantCode("test");
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
taskExecuteThread.run(); taskExecuteThread.run();

Loading…
Cancel
Save