Browse Source

Fix task instance log order might be wrong due to async print (#13660)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
ca9df868f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  2. 19
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
  3. 6
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  4. 40
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskInstanceLogHeader.java
  5. 15
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
  6. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
  7. 13
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

15
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -40,8 +40,10 @@ import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -96,6 +98,8 @@ public abstract class AbstractCommandExecutor {
*/
protected TaskExecutionContext taskRequest;
protected Future<?> taskOutputFuture;
public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest,
Logger logger) {
@ -237,6 +241,15 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
if (taskOutputFuture != null) {
try {
// Wait for the task log processing to finish.
taskOutputFuture.get();
} catch (ExecutionException e) {
logger.info("Handle task log error", e);
}
}
TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
@ -400,7 +413,7 @@ public abstract class AbstractCommandExecutor {
getOutputLogService.shutdown();
ExecutorService parseProcessOutputExecutorService = newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(() -> {
taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
try {
while (!logBuffer.isEmpty() || !logOutputIsSuccess) {
if (!logBuffer.isEmpty()) {

19
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java

@ -49,7 +49,6 @@ import lombok.NonNull;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@ -66,11 +65,11 @@ public class LogUtils {
public List<String> getAppIds(@NonNull String logPath, @NonNull String appInfoPath, String fetchWay) {
if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) {
log.info("Start finding appId in {}, fetch way: {} ", appInfoPath);
return getAppIdsFromAppInfoFile(appInfoPath, log);
log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay);
return getAppIdsFromAppInfoFile(appInfoPath);
} else {
log.info("Start finding appId in {}, fetch way: {} ", logPath);
return getAppIdsFromLogFile(logPath, log);
log.info("Start finding appId in {}, fetch way: {} ", logPath, fetchWay);
return getAppIdsFromLogFile(logPath);
}
}
@ -123,7 +122,7 @@ public class LogUtils {
processDefineCode, processDefineVersion, processInstId, taskId);
}
public List<String> getAppIdsFromAppInfoFile(@NonNull String appInfoPath, Logger logger) {
public List<String> getAppIdsFromAppInfoFile(@NonNull String appInfoPath) {
File appInfoFile = new File(appInfoPath);
if (!appInfoFile.exists() || !appInfoFile.isFile()) {
return Collections.emptyList();
@ -133,12 +132,12 @@ public class LogUtils {
stream.forEach(appIds::add);
return new ArrayList<>(appIds);
} catch (IOException e) {
logger.error("Get appId from appInfo file error, appInfoPath: {}", appInfoPath, e);
log.error("Get appId from appInfo file error, appInfoPath: {}", appInfoPath, e);
return Collections.emptyList();
}
}
public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
File logFile = new File(logPath);
if (!logFile.exists() || !logFile.isFile()) {
return Collections.emptyList();
@ -153,13 +152,13 @@ public class LogUtils {
if (matcher.find()) {
String appId = matcher.group();
if (appIds.add(appId)) {
logger.info("Find appId: {} from {}", appId, logPath);
log.info("Find appId: {} from {}", appId, logPath);
}
}
});
return new ArrayList<>(appIds);
} catch (IOException e) {
logger.error("Get appId from log file error, logPath: {}", logPath, e);
log.error("Get appId from log file error, logPath: {}", logPath, e);
return Collections.emptyList();
}
}

6
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -62,9 +62,6 @@ public class WorkerServer implements IStoppable {
@Autowired
private WorkerManagerThread workerManagerThread;
/**
* worker registry
*/
@Autowired
private WorkerRegistryClient workerRegistryClient;
@ -141,9 +138,6 @@ public class WorkerServer implements IStoppable {
close(cause);
}
/**
* kill all tasks which are running
*/
public void killAllRunningTasks() {
Collection<TaskExecutionContext> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
if (CollectionUtils.isEmpty(taskRequests)) {

40
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/log/TaskInstanceLogHeader.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.log;
/**
 * Banner texts used to separate the phases of a task instance in its log output.
 *
 * <p>Every banner is a three-row block: a border row of asterisks, a title row naming the phase,
 * and the same border row again. Each row, including the last one, ends with a line feed, so a
 * banner can be written to the log as a single message.
 *
 * <p>All public fields are compile-time constants. The class only holds constants and cannot be
 * instantiated or extended.
 */
public final class TaskInstanceLogHeader {

    /** Border row that frames every banner, including its trailing line feed. */
    private static final String BORDER =
            "***********************************************************************************************\n";

    /** Banner for the phase in which the task context is initialized. */
    public static final String INITIALIZE_TASK_CONTEXT_HEADER = BORDER
            + "********************************* Initialize task context ***********************************\n"
            + BORDER;

    /** Banner for the phase in which the task instance plugin is loaded. */
    public static final String LOAD_TASK_INSTANCE_PLUGIN_HEADER = BORDER
            + "********************************* Load task instance plugin *********************************\n"
            + BORDER;

    /** Banner for the phase in which the task instance is executed. */
    public static final String EXECUTE_TASK_HEADER = BORDER
            + "********************************* Execute task instance *************************************\n"
            + BORDER;

    /** Banner for the phase in which the task instance is finalized. */
    public static final String FINALIZE_TASK_HEADER = BORDER
            + "********************************* Finalize task instance ************************************\n"
            + BORDER;

    private TaskInstanceLogHeader() {
        // Constants holder: there is no state, so instances would be meaningless.
    }
}

15
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.worker.processor;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
@ -42,8 +44,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
@ -58,24 +58,15 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
@Autowired
private WorkerConfig workerConfig;
/**
* task callback service
*/
@Autowired
private WorkerMessageSender workerMessageSender;
/**
* alert client service
*/
@Autowired
private WorkerRpcClient workerRpcClient;
@Autowired
private TaskPluginManager taskPluginManager;
/**
* task execute manager
*/
@Autowired
private WorkerManagerThread workerManager;
@ -86,7 +77,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java

@ -69,7 +69,6 @@ public class TaskCallbackImpl implements TaskCallBack {
return;
}
log.info("send remote taskExecutionContext info {}", taskExecutionContext);
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_UPDATE_PID);
}

13
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.log.TaskInstanceLogHeader;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionCheckerUtils;
@ -154,8 +155,8 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
log.info("Begin to pulling task");
log.info("\n{}", TaskInstanceLogHeader.INITIALIZE_TASK_CONTEXT_HEADER);
initializeTask();
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
@ -169,12 +170,18 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
return;
}
log.info("\n{}", TaskInstanceLogHeader.LOAD_TASK_INSTANCE_PLUGIN_HEADER);
beforeExecute();
TaskCallBack taskCallBack = TaskCallbackImpl.builder().workerMessageSender(workerMessageSender)
.masterAddress(masterAddress).build();
TaskCallBack taskCallBack = TaskCallbackImpl.builder()
.workerMessageSender(workerMessageSender)
.masterAddress(masterAddress)
.build();
log.info("\n{}", TaskInstanceLogHeader.EXECUTE_TASK_HEADER);
executeTask(taskCallBack);
log.info("\n{}", TaskInstanceLogHeader.FINALIZE_TASK_HEADER);
afterExecute();
} catch (Throwable ex) {

Loading…
Cancel
Save