From f3277277f07c9dc8ea674b79ae8b6b16c688b29b Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Wed, 26 Oct 2022 16:20:13 +0800 Subject: [PATCH] cherry-pick Refactor LogServiceClient Singleton to avoid repeat creation of NettyClient #11777 (#12542) Co-authored-by: Wenjun Ruan --- .../api/exceptions/ServiceException.java | 36 ++-- .../api/service/impl/ExecutorServiceImpl.java | 10 +- .../api/service/impl/LoggerServiceImpl.java | 19 +-- .../impl/ProcessInstanceServiceImpl.java | 107 +++++------- .../service/impl/SchedulerServiceImpl.java | 10 +- .../api/exceptions/ServiceExceptionTest.java | 14 +- .../api/service/ExecutorServiceTest.java | 3 +- .../api/service/LoggerServiceTest.java | 49 +++--- .../service/ProcessDefinitionServiceTest.java | 2 +- .../service/ProcessInstanceServiceTest.java | 38 +++-- .../api/service/TaskInstanceServiceTest.java | 3 +- .../server/log/LoggerRequestProcessor.java | 6 +- .../master/service/MasterFailoverService.java | 12 +- .../master/service/WorkerFailoverService.java | 31 ++-- .../utils/DataQualityResultOperator.java | 3 +- .../registry/MasterRegistryClientTest.java | 17 +- .../master/service/FailoverServiceTest.java | 18 +- .../remote/NettyRemotingClient.java | 104 +++--------- .../remote/config/NettyClientConfig.java | 64 ++------ .../server/utils/ProcessUtils.java | 13 +- .../{LogClientService.java => LogClient.java} | 155 +++++++++--------- .../service/log/LogPromise.java | 108 ------------ .../service/process/ProcessService.java | 3 +- .../service/process/ProcessServiceImpl.java | 104 ++++++------ ...entServiceTest.java => LogClientTest.java} | 33 ++-- .../plugin/task/api/AbstractRemoteTask.java | 4 +- .../plugin/task/api/AbstractYarnTask.java | 20 +-- .../plugin/task/api/utils/LogUtils.java | 12 +- .../plugin/task/api/utils/LogUtilsTest.java | 8 +- .../plugin/task/dinky/DinkyTask.java | 39 +++-- .../plugin/task/emr/EmrAddStepsTask.java | 50 +++--- .../plugin/task/emr/EmrJobFlowTask.java | 6 +- .../plugin/task/flink/FlinkStreamTask.java | 8 +- .../plugin/task/flink/FlinkTask.java | 47 +----- .../plugin/task/hivecli/HiveCliTask.java | 6 +- .../plugin/task/jupyter/JupyterTask.java | 10 +- .../plugin/task/k8s/K8sTask.java | 8 +- .../plugin/task/pigeon/PigeonTask.java | 45 +++-- .../plugin/task/sagemaker/SagemakerTask.java | 6 +- .../plugin/task/seatunnel/SeatunnelTask.java | 22 +-- .../plugin/task/zeppelin/ZeppelinTask.java | 14 +- .../worker/processor/TaskKillProcessor.java | 49 +++--- .../runner/WorkerTaskExecuteRunnable.java | 8 +- 43 files changed, 519 insertions(+), 805 deletions(-) rename dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/{LogClientService.java => LogClient.java} (57%) delete mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java rename dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/{LogClientServiceTest.java => LogClientTest.java} (82%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java index 369fbbec3e..2fa3e01a1c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java @@ -20,48 +20,36 @@ import org.apache.dolphinscheduler.api.enums.Status; import java.text.MessageFormat; +import lombok.Data; -/** - * service exception - */ +@Data public class ServiceException extends RuntimeException { - /** - * code - */ - private Integer code; + private int code; public ServiceException() { + this(Status.INTERNAL_SERVER_ERROR_ARGS); } public ServiceException(Status status) { - super(status.getMsg()); - this.code = status.getCode(); + this(status.getCode(), status.getMsg()); } public ServiceException(Status status, Object... formatter) { - super(MessageFormat.format(status.getMsg(), formatter)); - this.code = status.getCode(); - } - - public ServiceException(Integer code,String message) { - super(message); - this.code = code; + this(status.getCode(), MessageFormat.format(status.getMsg(), formatter)); } public ServiceException(String message) { - super(message); + this(Status.INTERNAL_SERVER_ERROR_ARGS, message); } - public ServiceException(String message, Exception cause) { - super(message, cause); + public ServiceException(int code, String message) { + this(code, message, null); } - public Integer getCode() { - return this.code; - } - - public void setCode(Integer code) { + public ServiceException(int code, String message, Exception cause) { + super(message, cause); this.code = code; } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index ec878c85a3..1447ba91bd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -31,6 +31,7 @@ import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LEN import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -377,11 +378,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - if (processInstance == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); - return result; - } + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId) + .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), @@ -1094,7 +1092,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ */ @Override public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) { - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId).orElse(null); if (processInstance == null) { return null; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index 39f9208b5f..518a03004a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -31,7 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.log.LogClientService; +import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.lang3.StringUtils; @@ -66,7 +66,8 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService @Autowired private ProcessService processService; - private LogClientService logClient; + @Autowired + private LogClient logClient; @Autowired ProjectMapper projectMapper; @@ -77,20 +78,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService @Autowired TaskDefinitionMapper taskDefinitionMapper; - @PostConstruct - public void init() { - if (Objects.isNull(this.logClient)) { - this.logClient = new LogClientService(); - } - } - - @PreDestroy - public void close() { - if (Objects.nonNull(this.logClient) && this.logClient.isRunning()) { - logClient.close(); - } - } - /** * view log * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 2dcba5473b..78bb277e6d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -17,19 +17,10 @@ package org.apache.dolphinscheduler.api.service.impl; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; -import static org.apache.dolphinscheduler.common.Constants.*; -import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; -import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; -import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; -import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; - +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.enums.Status; @@ -78,9 +69,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import java.io.BufferedReader; import java.io.ByteArrayInputStream; @@ -97,12 +88,18 @@ import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; +import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST; +import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR; +import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; +import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; /** * process instance service impl @@ -228,7 +225,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId) + .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId)); ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), @@ -345,17 +343,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce public Map queryTaskListByProcessId(User loginUser, long projectCode, Integer processId) throws IOException { Project project = projectMapper.queryByCode(projectCode); - // check user access for project - Map result = - projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); - ProcessDefinition processDefinition = - processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId) + .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId)); + ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId); + putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId); return result; } List taskInstanceList = processService.findValidTaskListByProcessId(processId); @@ -487,28 +484,23 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce String globalParams, String locations, int timeout, String tenantCode) { Project project = projectMapper.queryByCode(projectCode); - // check user access for project - Map result = - projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - // check process instance exists - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - if (processInstance == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); - return result; - } - // check process instance exists in project - ProcessDefinition processDefinition0 = - processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + //check process instance exists + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId) + .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); + //check process instance exists in project + ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } - // check process instance status + //check process instance status if (!processInstance.getState().isFinished()) { - putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, + putMsg(result, PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState().toString(), "update"); return result; } @@ -626,11 +618,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } - ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId); - if (subInstance == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId); - return result; - } + ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId) + .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, subId)); if (subInstance.getIsSubProcess() == Flag.NO) { putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName()); return result; @@ -666,23 +655,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - if (null == processInstance) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId)); - return result; - } + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId) + .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId)); // check process instance status if (!processInstance.getState().isFinished()) { - putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, - processInstance.getName(), processInstance.getState().toString(), "delete"); - return result; + throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState(), "delete"); } ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId)); - return result; + throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId); } try { @@ -728,7 +711,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } @@ -817,7 +800,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } GanttDto ganttDto = new GanttDto(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 392543e788..677d580e2d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -78,9 +78,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.cronutils.model.Cron; -/** - * scheduler service impl - */ @Service public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerService { @@ -382,9 +379,10 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } } catch (Exception e) { - result.put(Constants.MSG, - scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure"); - throw new ServiceException(result.get(Constants.MSG).toString(), e); + Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR + : Status.OFFLINE_SCHEDULE_ERROR; + result.put(Constants.STATUS, status); + throw new ServiceException(status, e); } putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java index a574253d1d..3bb6479ba9 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java @@ -17,25 +17,27 @@ package org.apache.dolphinscheduler.api.exceptions; import org.apache.dolphinscheduler.api.enums.Status; + import org.junit.Assert; import org.junit.Test; public class ServiceExceptionTest { + @Test - public void getCodeTest(){ + public void getCodeTest() { ServiceException serviceException = new ServiceException(); - Assert.assertNull(serviceException.getCode()); + Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), serviceException.getCode()); serviceException = new ServiceException(Status.ALERT_GROUP_EXIST); - Assert.assertNotNull(serviceException.getCode()); + Assert.assertEquals(Status.ALERT_GROUP_EXIST.getCode(), serviceException.getCode()); serviceException = new ServiceException(10012, "alarm group already exists"); - Assert.assertNotNull(serviceException.getCode()); + Assert.assertEquals(10012, serviceException.getCode()); } @Test - public void getMessageTest(){ + public void getMessageTest() { ServiceException serviceException = new ServiceException(); - Assert.assertNull(serviceException.getMessage()); + Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), serviceException.getMessage()); serviceException = new ServiceException(Status.ALERT_GROUP_EXIST); Assert.assertNotNull(serviceException.getMessage()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 59418e2171..8cd5d681cc 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -66,6 +66,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import org.assertj.core.util.Lists; import org.junit.Assert; @@ -193,7 +194,7 @@ public class ExecutorServiceTest { doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null)); doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null)); Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); - Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); + Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(Optional.ofNullable(processInstance)); Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition); Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue); Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 4be8e246fc..1391ee0857 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; @@ -28,15 +31,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -47,9 +49,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG; - /** * logger service test */ @@ -74,10 +73,8 @@ public class LoggerServiceTest { @Mock private TaskDefinitionMapper taskDefinitionMapper; - @Before - public void init() { - this.loggerService.init(); - } + @Mock + private LogClient logClient; @Test public void testQueryDataSourceList() { @@ -85,11 +82,11 @@ public class LoggerServiceTest { TaskInstance taskInstance = new TaskInstance(); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Result result = loggerService.queryLog(2, 1, 1); - //TASK_INSTANCE_NOT_FOUND + // TASK_INSTANCE_NOT_FOUND Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); try { - //HOST NOT FOUND OR ILLEGAL + // HOST NOT FOUND OR ILLEGAL result = loggerService.queryLog(1, 1, 1); } catch (RuntimeException e) { Assert.assertTrue(true); @@ -97,7 +94,7 @@ public class LoggerServiceTest { } Assert.assertEquals(Status.TASK_INSTANCE_HOST_IS_NULL.getCode(), result.getCode().intValue()); - //SUCCESS + // SUCCESS taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); @@ -111,7 +108,7 @@ public class LoggerServiceTest { TaskInstance taskInstance = new TaskInstance(); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); - //task instance is null + // task instance is null try { loggerService.getLogBytes(2); } catch (RuntimeException e) { @@ -119,7 +116,7 @@ public class LoggerServiceTest { logger.error("testGetLogBytes error: {}", "task instance is null"); } - //task instance host is null + // task instance host is null try { loggerService.getLogBytes(1); } catch (RuntimeException e) { @@ -127,11 +124,13 @@ public class LoggerServiceTest { logger.error("testGetLogBytes error: {}", "task instance host is null"); } - //success + // success taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); - //if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage, + // if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage, // so no assert will be added here + Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString())) + .thenReturn(new byte[0]); loggerService.getLogBytes(1); } @@ -152,12 +151,12 @@ public class LoggerServiceTest { TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setProjectCode(projectCode); taskDefinition.setCode(1L); - //SUCCESS + // SUCCESS taskInstance.setTaskCode(1L); taskInstance.setId(1); taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,VIEW_LOG)).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1); @@ -179,22 +178,20 @@ public class LoggerServiceTest { TaskDefinition taskDefinition = new TaskDefinition(); taskDefinition.setProjectCode(projectCode); taskDefinition.setCode(1L); - //SUCCESS + // SUCCESS taskInstance.setTaskCode(1L); taskInstance.setId(1); taskInstance.setHost("127.0.0.1:8080"); taskInstance.setLogPath("/temp/log"); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,DOWNLOAD_LOG )).thenReturn(result); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG)) + .thenReturn(result); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); + Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString())) + .thenReturn(new byte[0]); loggerService.getLogBytes(loginUser, projectCode, 1); } - @After - public void close() { - this.loggerService.close(); - } - /** * get mock Project * @@ -218,4 +215,4 @@ public class LoggerServiceTest { result.put(Constants.MSG, status.getMsg()); } } -} \ No newline at end of file +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 042cb78c5c..f07317acf4 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -200,7 +200,7 @@ public class ProcessDefinitionServiceTest { .checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION); processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0); } catch (ServiceException serviceException) { - Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode().intValue()); + Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode()); } Map result = new HashMap<>(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 4167673faa..48dd44e468 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; @@ -70,6 +71,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.junit.Assert; import org.junit.Test; @@ -308,7 +310,8 @@ public class ProcessInstanceServiceTest { processDefinition.setProjectCode(projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance); + when(processService.findProcessInstanceDetailById(processInstance.getId())) + .thenReturn(Optional.of(processInstance)); when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition); Map successRes = processInstanceService.queryProcessInstanceById(loginUser, projectCode, 1); @@ -353,7 +356,8 @@ public class ProcessInstanceServiceTest { res.setData("xxx"); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance); + when(processService.findProcessInstanceDetailById(processInstance.getId())) + .thenReturn(Optional.of(processInstance)); when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList); when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res); Map successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1); @@ -451,14 +455,18 @@ public class ProcessInstanceServiceTest { ProcessInstance processInstance = getProcessInstance(); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result); - when(processService.findProcessInstanceDetailById(1)).thenReturn(null); - Map processInstanceNullRes = - processInstanceService.updateProcessInstance(loginUser, projectCode, 1, - shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, ""); - Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS)); + when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty()); + try { + Map processInstanceNullRes = + processInstanceService.updateProcessInstance(loginUser, projectCode, 1, + shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, ""); + Assert.fail(); + } catch (ServiceException ex) { + Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode()); + } // process instance not finish - when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance); + when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance)); processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); putMsg(result, Status.SUCCESS, projectCode); Map processInstanceNotFinishRes = @@ -523,16 +531,20 @@ public class ProcessInstanceServiceTest { putMsg(result, Status.SUCCESS, projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findProcessInstanceDetailById(1)).thenReturn(null); - Map processInstanceNullRes = - processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); - Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS)); + when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty()); + try { + Map processInstanceNullRes = + processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); + + } catch (ServiceException ex) { + Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode()); + } // not sub process ProcessInstance processInstance = getProcessInstance(); processInstance.setIsSubProcess(Flag.NO); putMsg(result, Status.SUCCESS, projectCode); - when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance); + when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance)); Map notSubProcessRes = processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, notSubProcessRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index e1e783c2ac..dd06ce66ef 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -51,6 +51,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.junit.Assert; import org.junit.Test; @@ -158,7 +159,7 @@ public class TaskInstanceServiceTest { eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))).thenReturn(pageReturn); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId())) - .thenReturn(processInstance); + .thenReturn(Optional.of(processInstance)); Result successRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "", "test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20); diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index d921d54000..0d04ed8596 100644 --- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -46,10 +46,8 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -168,9 +166,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { if (!checkPathSecurity(logPath)) { throw new IllegalArgumentException("Illegal path"); } - Set appIds = LogUtils.getAppIdsFromLogFile(logPath); + List appIds = LogUtils.getAppIdsFromLogFile(logPath); channel.writeAndFlush( - new GetAppIdResponseCommand(new ArrayList<>(appIds)).convert2Command(command.getOpaque())); + new GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque())); break; default: throw new IllegalArgumentException("unknown commandType: " + commandType); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 9e1db701fa..bdb7289aad 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.spi.utils.StringUtils; @@ -73,17 +73,21 @@ public class MasterFailoverService { private final ProcessInstanceExecCacheManager processInstanceExecCacheManager; + private final LogClient logClient; + public MasterFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, @NonNull NettyExecutorManager nettyExecutorManager, - @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) { + @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager, + @NonNull LogClient logClient) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; - this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); this.nettyExecutorManager = nettyExecutorManager; + this.localAddress = masterConfig.getMasterAddress(); this.processInstanceExecCacheManager = processInstanceExecCacheManager; + this.logClient = logClient; } @@ -233,7 +237,7 @@ public class MasterFailoverService { if (masterConfig.isKillYarnJobWhenTaskFailover()) { // only kill yarn job if exists , the local thread has exited LOGGER.info("TaskInstance failover begin kill the task related yarn job"); - ProcessUtils.killYarnJob(taskExecutionContext); + ProcessUtils.killYarnJob(logClient, taskExecutionContext); } // kill worker task, When the master failover and worker failover happened in the same time, // the task may not be failover if we don't set NEED_FAULT_TOLERANCE. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index 99c62f1172..a294874ff4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -17,13 +17,16 @@ package org.apache.dolphinscheduler.server.master.service; +import lombok.NonNull; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -37,13 +40,14 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.StopWatch; - +import javax.annotation.Nullable; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -52,14 +56,6 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import javax.annotation.Nullable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import lombok.NonNull; - @Service public class WorkerFailoverService { @@ -70,19 +66,22 @@ public class WorkerFailoverService { private final ProcessService processService; private final WorkflowExecuteThreadPool workflowExecuteThreadPool; private final ProcessInstanceExecCacheManager cacheManager; + private final LogClient logClient; private final String localAddress; public WorkerFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, @NonNull ProcessService processService, @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool, - @NonNull ProcessInstanceExecCacheManager cacheManager) { + @NonNull ProcessInstanceExecCacheManager cacheManager, + @NonNull LogClient logClient) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; this.workflowExecuteThreadPool = workflowExecuteThreadPool; this.cacheManager = cacheManager; - this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); + this.logClient = logClient; + this.localAddress = masterConfig.getMasterAddress(); } /** @@ -172,7 +171,7 @@ public class WorkerFailoverService { if (masterConfig.isKillYarnJobWhenTaskFailover()) { // only kill yarn job if exists , the local thread has exited LOGGER.info("TaskInstance failover begin kill the task related yarn job"); - ProcessUtils.killYarnJob(taskExecutionContext); + ProcessUtils.killYarnJob(logClient, taskExecutionContext); } } else { LOGGER.info("The failover taskInstance is a master task"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java index 42d9d14e49..c196def48a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java @@ -63,8 +63,7 @@ public class DataQualityResultOperator { if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) { ProcessInstance processInstance = - processService.findProcessInstanceDetailById( - Integer.parseInt(String.valueOf(taskInstance.getProcessInstanceId()))); + processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()).orElse(null); // when the task is failure or cancel, will delete the execute result and statistics value if (taskResponseEvent.getState().isFailure() diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java index b71e267216..23f209bbdf 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java @@ -25,8 +25,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.registry.api.ConnectionState; -import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -34,7 +32,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.Arrays; import java.util.Date; -import java.util.concurrent.ScheduledExecutorService; +import java.util.Optional; import org.junit.Before; import org.junit.Test; @@ -58,26 +56,17 @@ public class MasterRegistryClientTest { @InjectMocks private MasterRegistryClient masterRegistryClient; - @Mock - private MasterConfig masterConfig; - @Mock private RegistryClient registryClient; - @Mock - private ScheduledExecutorService heartBeatExecutor; - @Mock private ProcessService processService; - @Mock - private MasterConnectStrategy masterConnectStrategy; - @Mock private MasterHeartBeatTask masterHeartBeatTask; @Mock - private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; + private MasterConfig masterConfig; @Before public void before() throws Exception { @@ -105,7 +94,7 @@ public class MasterRegistryClientTest { taskInstance.setHost("127.0.0.1:8080"); given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())) .willReturn(Arrays.asList(taskInstance)); - given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance); + given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(Optional.of(processInstance)); given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true); Server server = new Server(); server.setHost("127.0.0.1"); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index e0c3c94714..27be919ff9 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.service; import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; - import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doNothing; @@ -38,12 +37,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; +import java.util.Optional; import org.junit.Assert; import org.junit.Before; @@ -89,6 +90,9 @@ public class FailoverServiceTest { @Mock private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + @Mock + private LogClient logClient; + private static int masterPort = 5678; private static int workerPort = 1234; @@ -106,17 +110,20 @@ public class FailoverServiceTest { springApplicationContext.setApplicationContext(applicationContext); given(masterConfig.getListenPort()).willReturn(masterPort); + testMasterHost = NetUtils.getAddr(masterConfig.getListenPort()); + given(masterConfig.getMasterAddress()).willReturn(testMasterHost); MasterFailoverService masterFailoverService = - new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager); + new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, + processInstanceExecCacheManager, logClient); WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, - cacheManager); + cacheManager, + logClient); failoverService = new FailoverService(masterFailoverService, workerFailoverService); - testMasterHost = NetUtils.getAddr(masterConfig.getListenPort()); String ip = testMasterHost.split(":")[0]; int port = Integer.valueOf(testMasterHost.split(":")[1]); Assert.assertEquals(masterPort, port); @@ -158,7 +165,8 @@ public class FailoverServiceTest { doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); given(processService.findValidTaskListByProcessId(Mockito.anyInt())) .willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance)); - given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance); + given(processService.findProcessInstanceDetailById(Mockito.anyInt())) + .willReturn(Optional.ofNullable(processInstance)); Thread.sleep(1000); Server masterServer = new Server(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 6668f2d36c..d47d68fa3d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -17,6 +17,17 @@ package org.apache.dolphinscheduler.remote; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.Command; @@ -35,6 +46,8 @@ import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NettyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; @@ -43,112 +56,48 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.Epoll; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.timeout.IdleStateHandler; - -/** - * remoting netty client - */ -public class NettyRemotingClient { +public class NettyRemotingClient implements AutoCloseable { private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class); - /** - * client bootstrap - */ private final Bootstrap bootstrap = new Bootstrap(); - /** - * encoder - */ private final NettyEncoder encoder = new NettyEncoder(); - /** - * channels - */ private final ConcurrentHashMap channels = new ConcurrentHashMap<>(128); - /** - * started flag - */ private final AtomicBoolean isStarted = new AtomicBoolean(false); - /** - * worker group - */ private final EventLoopGroup workerGroup; - /** - * client config - */ private final NettyClientConfig clientConfig; - /** - * saync semaphore - */ private final Semaphore asyncSemaphore = new Semaphore(200, true); - /** - * callback thread executor - */ private final ExecutorService callbackExecutor; - /** - * client handler - */ private final NettyClientHandler clientHandler; - /** - * response future executor - */ private final ScheduledExecutorService responseFutureExecutor; - /** - * client init - * - * @param clientConfig client config - */ public NettyRemotingClient(final NettyClientConfig clientConfig) { this.clientConfig = clientConfig; if (Epoll.isAvailable()) { - this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); - } - }); + this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient")); } else { - this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); - } - }); + this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient")); } - this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), + this.callbackExecutor = new ThreadPoolExecutor( + Constants.CPUS, + Constants.CPUS, + 1, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>(1000), + new NamedThreadFactory("CallbackExecutor"), new CallerThreadExecutePolicy()); this.clientHandler = new NettyClientHandler(this, callbackExecutor); @@ -157,9 +106,6 @@ public class NettyRemotingClient { this.start(); } - /** - * start - */ private void start() { this.bootstrap @@ -371,9 +317,7 @@ public class NettyRemotingClient { return null; } - /** - * close - */ + @Override public void close() { if (isStarted.compareAndSet(true, false)) { try { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java index 739cbbebe1..18ee88f696 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java @@ -17,88 +17,52 @@ package org.apache.dolphinscheduler.remote.config; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; import org.apache.dolphinscheduler.remote.utils.Constants; -/** - * netty client config - */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class NettyClientConfig { /** * worker threads,default get machine cpus */ + @Builder.Default private int workerThreads = Constants.CPUS; /** * whether tpc delay */ + @Builder.Default private boolean tcpNoDelay = true; /** * whether keep alive */ + @Builder.Default private boolean soKeepalive = true; /** * send buffer size */ + @Builder.Default private int sendBufferSize = 65535; /** * receive buffer size */ + @Builder.Default private int receiveBufferSize = 65535; /** * connect timeout millis */ + @Builder.Default private int connectTimeoutMillis = 3000; - public int getWorkerThreads() { - return workerThreads; - } - - public void setWorkerThreads(int workerThreads) { - this.workerThreads = workerThreads; - } - - public boolean isTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay(boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - public boolean isSoKeepalive() { - return soKeepalive; - } - - public void setSoKeepalive(boolean soKeepalive) { - this.soKeepalive = soKeepalive; - } - - public int getSendBufferSize() { - return sendBufferSize; - } - - public void setSendBufferSize(int sendBufferSize) { - this.sendBufferSize = sendBufferSize; - } - - public int getReceiveBufferSize() { - return receiveBufferSize; - } - - public void setReceiveBufferSize(int receiveBufferSize) { - this.receiveBufferSize = receiveBufferSize; - } - - public int getConnectTimeoutMillis() { - return connectTimeoutMillis; - } - - public void setConnectTimeoutMillis(int connectTimeoutMillis) { - this.connectTimeoutMillis = connectTimeoutMillis; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index 4392d6e14a..a8361fa4ed 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -30,10 +30,11 @@ import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.log.LogClientService; +import org.apache.dolphinscheduler.service.log.LogClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.File; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -186,17 +187,15 @@ public class ProcessUtils { * @param taskExecutionContext taskExecutionContext * @return yarn application ids */ - public static List killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) { + public static @Nullable List killYarnJob(@NonNull LogClient logClient, + @NonNull TaskExecutionContext taskExecutionContext) { if (taskExecutionContext.getLogPath() == null) { return Collections.emptyList(); } try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); - List appIds; - try (LogClientService logClient = new LogClientService()) { - Host host = Host.of(taskExecutionContext.getHost()); - appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath()); - } + Host host = Host.of(taskExecutionContext.getHost()); + List appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath()); if (CollectionUtils.isNotEmpty(appIds)) { if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) { taskExecutionContext diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java similarity index 57% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java index 735fbab7c4..611bb49b67 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java @@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Host; -import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; @@ -46,43 +45,23 @@ import lombok.NonNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; -/** - * log client - */ -public class LogClientService implements AutoCloseable { +@Service +public class LogClient implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(LogClientService.class); + private static final Logger logger = LoggerFactory.getLogger(LogClient.class); - private final NettyClientConfig clientConfig; + private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; private final NettyRemotingClient client; - private volatile boolean isRunning; - - /** - * request time out - */ private static final long LOG_REQUEST_TIMEOUT = 10 * 1000L; - /** - * construct client - */ - public LogClientService() { - this.clientConfig = new NettyClientConfig(); - this.clientConfig.setWorkerThreads(4); - this.client = new NettyRemotingClient(clientConfig); - this.isRunning = true; - } - - /** - * close - */ - @Override - public void close() { - this.client.close(); - this.isRunning = false; - logger.info("logger client closed"); + public LogClient() { + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + this.client = new NettyRemotingClient(nettyClientConfig); + logger.info("Initialized LogClientService with config: {}", nettyClientConfig); } /** @@ -96,25 +75,30 @@ public class LogClientService implements AutoCloseable { * @return log content */ public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) { - logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, + logger.info("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); - String result = ""; final Host address = new Host(host, port); try { Command command = request.convert2Command(); - Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); + Command response = client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject( - response.getBody(), RollViewLogResponseCommand.class); + RollViewLogResponseCommand rollReviewLog = + JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class); return rollReviewLog.getMsg(); } + return "Roll view log response is null"; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + logger.error( + "Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error, the current thread has been interrupted", + host, port, path, skipLineNum, limit, ex); + return "Roll view log error: " + ex.getMessage(); } catch (Exception e) { - logger.error("roll view log error", e); - } finally { - this.client.closeChannel(address); + logger.error("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error", host, port, + path, skipLineNum, limit, e); + return "Roll view log error: " + e.getMessage(); } - return result; } /** @@ -126,28 +110,31 @@ public class LogClientService implements AutoCloseable { * @return log content */ public String viewLog(String host, int port, String path) { - logger.info("view log path {}", path); + logger.info("View log from host: {}, port: {}, logPath: {}", host, port, path); ViewLogRequestCommand request = new ViewLogRequestCommand(path); - String result = ""; final Host address = new Host(host, port); try { if (NetUtils.getHost().equals(host)) { - result = LoggerUtils.readWholeFileContent(request.getPath()); + return LoggerUtils.readWholeFileContent(request.getPath()); } else { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - ViewLogResponseCommand viewLog = JSONUtils.parseObject( - response.getBody(), ViewLogResponseCommand.class); - result = viewLog.getMsg(); + ViewLogResponseCommand viewLog = + JSONUtils.parseObject(response.getBody(), ViewLogResponseCommand.class); + return viewLog.getMsg(); } + return "View log response is null"; } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + logger.error("View log from host: {}, port: {}, logPath: {} error, the current thread has been interrupted", + host, port, path, ex); + return "View log error: " + ex.getMessage(); } catch (Exception e) { - logger.error("view log error", e); - } finally { - this.client.closeChannel(address); + logger.error("View log from host: {}, port: {}, logPath: {} error", host, port, path, e); + return "View log error: " + e.getMessage(); } - return result; } /** @@ -159,23 +146,28 @@ public class LogClientService implements AutoCloseable { * @return log content bytes */ public byte[] getLogBytes(String host, int port, String path) { - logger.info("log path {}", path); + logger.info("Get log bytes from host: {}, port: {}, logPath {}", host, port, path); GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path); final Host address = new Host(host, port); try { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - GetLogBytesResponseCommand getLog = JSONUtils.parseObject( - response.getBody(), GetLogBytesResponseCommand.class); - return getLog.getData() == null ? new byte[0] : getLog.getData(); + GetLogBytesResponseCommand getLog = + JSONUtils.parseObject(response.getBody(), GetLogBytesResponseCommand.class); + return getLog.getData() == null ? EMPTY_BYTE_ARRAY : getLog.getData(); } + return EMPTY_BYTE_ARRAY; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + logger.error( + "Get logSize from host: {}, port: {}, logPath: {} error, the current thread has been interrupted", + host, port, path, ex); + return EMPTY_BYTE_ARRAY; } catch (Exception e) { - logger.error("get log size error", e); - } finally { - this.client.closeChannel(address); + logger.error("Get logSize from host: {}, port: {}, logPath: {} error", host, port, path, e); + return EMPTY_BYTE_ARRAY; } - return new byte[0]; } /** @@ -187,24 +179,28 @@ public class LogClientService implements AutoCloseable { * @return remove task status */ public Boolean removeTaskLog(String host, int port, String path) { - logger.info("log path {}", path); + logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path); RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path); - Boolean result = false; final Host address = new Host(host, port); try { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject( - response.getBody(), RemoveTaskLogResponseCommand.class); + RemoveTaskLogResponseCommand taskLogResponse = + JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class); return taskLogResponse.getStatus(); } + return false; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + logger.error( + "Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted", + host, port, path, ex); + return false; } catch (Exception e) { - logger.error("remove task log error", e); - } finally { - this.client.closeChannel(address); + logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e); + return false; } - return result; } public @Nullable List getAppIds(@NonNull String host, int port, @@ -212,26 +208,25 @@ public class LogClientService implements AutoCloseable { logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath); final Host workerAddress = new Host(host, port); List appIds = null; - try { - if (NetUtils.getHost().equals(host)) { - appIds = new ArrayList<>(LogUtils.getAppIdsFromLogFile(taskLogFilePath)); - } else { - final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command(); - Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT); - if (response != null) { - GetAppIdResponseCommand responseCommand = - JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class); - appIds = responseCommand.getAppIds(); - } + if (NetUtils.getHost().equals(host)) { + appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath); + } else { + final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command(); + Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT); + if (response != null) { + GetAppIdResponseCommand responseCommand = + JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class); + appIds = responseCommand.getAppIds(); } - } finally { - client.closeChannel(workerAddress); } logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath); return appIds; } - public boolean isRunning() { - return isRunning; + @Override + public void close() { + this.client.close(); + logger.info("LogClientService closed"); } + } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java deleted file mode 100644 index f3c1078f07..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.log; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * log asyc callback - */ -public class LogPromise { - - private static final ConcurrentHashMap PROMISES = new ConcurrentHashMap<>(); - - /** - * request unique identification - */ - private long opaque; - - /** - * start timemillis - */ - private final long start; - - /** - * timeout - */ - private final long timeout; - - /** - * latch - */ - private final CountDownLatch latch; - - /** - * result - */ - private Object result; - - public LogPromise(long opaque, long timeout) { - this.opaque = opaque; - this.timeout = timeout; - this.start = System.currentTimeMillis(); - this.latch = new CountDownLatch(1); - PROMISES.put(opaque, this); - } - - /** - * notify client finish - * @param opaque unique identification - * @param result result - */ - public static void notify(long opaque, Object result) { - LogPromise promise = PROMISES.remove(opaque); - if (promise != null) { - promise.doCountDown(result); - } - } - - /** - * countdown - * - * @param result result - */ - private void doCountDown(Object result) { - this.result = result; - this.latch.countDown(); - } - - /** - * whether timeout - * @return timeout - */ - public boolean isTimeout() { - return System.currentTimeMillis() - start > timeout; - } - - /** - * get result - * @return - */ - public Object getResult() { - try { - latch.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - } - PROMISES.remove(opaque); - return this.result; - } - -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index a94b01e609..e11ac7820e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Optional; import org.springframework.transaction.annotation.Transactional; @@ -77,7 +78,7 @@ public interface ProcessService { boolean verifyIsNeedCreateCommand(Command command); - ProcessInstance findProcessInstanceDetailById(int processId); + Optional findProcessInstanceDetailById(int processId); List getTaskNodeListByDefinition(long defineCode); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index ebba2d8781..58393891d8 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,22 +17,14 @@ package org.apache.dolphinscheduler.service.process; -import static java.util.stream.Collectors.toSet; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; - -import org.apache.commons.lang3.StringUtils; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import io.micrometer.core.annotation.Counted; +import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -134,12 +126,17 @@ import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; -import org.apache.dolphinscheduler.service.log.LogClientService; +import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.ResourceType; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; -import org.apache.commons.collections.CollectionUtils; - +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -150,23 +147,25 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import io.micrometer.core.annotation.Counted; +import static java.util.stream.Collectors.toSet; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; /** * process relative dao that some mappers in this. @@ -279,6 +278,9 @@ public class ProcessServiceImpl implements ProcessService { @Autowired private CuringParamsService curingGlobalParamsService; + @Autowired + private LogClient logClient; + /** * handle Command (construct ProcessInstance from Command) , wrapped in transaction * @@ -477,8 +479,8 @@ public class ProcessServiceImpl implements ProcessService { * @return process instance */ @Override - public ProcessInstance findProcessInstanceDetailById(int processId) { - return processInstanceMapper.queryDetailById(processId); + public Optional findProcessInstanceDetailById(int processId) { + return Optional.ofNullable(processInstanceMapper.queryDetailById(processId)); } /** @@ -600,16 +602,14 @@ public class ProcessServiceImpl implements ProcessService { if (CollectionUtils.isEmpty(taskInstanceList)) { return; } - try (LogClientService logClient = new LogClientService()) { - for (TaskInstance taskInstance : taskInstanceList) { - String taskLogPath = taskInstance.getLogPath(); - if (Strings.isNullOrEmpty(taskInstance.getHost())) { - continue; - } - Host host = Host.of(taskInstance.getHost()); - // remove task log from loggerserver - logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath); + for (TaskInstance taskInstance : taskInstanceList) { + String taskLogPath = taskInstance.getLogPath(); + if (Strings.isNullOrEmpty(taskInstance.getHost())) { + continue; } + Host host = Host.of(taskInstance.getHost()); + // remove task log from loggerserver + logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath); } } @@ -752,7 +752,7 @@ public class ProcessServiceImpl implements ProcessService { */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, Command command, - Map cmdParam) throws CodeGenerateException { + Map cmdParam) { ProcessInstance processInstance = new ProcessInstance(processDefinition); processInstance.setProcessDefinitionCode(processDefinition.getCode()); processInstance.setProcessDefinitionVersion(processDefinition.getVersion()); @@ -915,8 +915,8 @@ public class ProcessServiceImpl implements ProcessService { * @param host host * @return process instance */ - protected ProcessInstance constructProcessInstance(Command command, - String host) throws CronParseException, CodeGenerateException { + protected @Nullable ProcessInstance constructProcessInstance(Command command, + String host) throws CronParseException, CodeGenerateException { ProcessInstance processInstance; ProcessDefinition processDefinition; CommandType commandType = command.getCommandType(); @@ -932,7 +932,7 @@ public class ProcessServiceImpl implements ProcessService { if (processInstanceId == 0) { processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); } else { - processInstance = this.findProcessInstanceDetailById(processInstanceId); + processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null); if (processInstance == null) { return null; } @@ -1071,7 +1071,8 @@ public class ProcessServiceImpl implements ProcessService { * * @return ProcessDefinition */ - private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map cmdParam) { + private @Nullable ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, + Map cmdParam) { if (cmdParam != null) { int processInstanceId = 0; if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) { @@ -1083,7 +1084,7 @@ public class ProcessServiceImpl implements ProcessService { } if (processInstanceId != 0) { - ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId); + ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null); if (processInstance == null) { return null; } @@ -1177,7 +1178,8 @@ public class ProcessServiceImpl implements ProcessService { // copy parent instance user def params to sub process.. String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID); if (!Strings.isNullOrEmpty(parentInstanceId)) { - ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); + ProcessInstance parentInstance = + findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)).orElse(null); if (parentInstance != null) { subProcessInstance.setGlobalParams( joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); @@ -3147,7 +3149,7 @@ public class ProcessServiceImpl implements ProcessService { if (task == null) { return; } - ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()); + ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null); if (processInstance != null && (processInstance.getState().isFailure() || processInstance.getState().isStop())) { List validTaskList = findValidTaskListByProcessId(processInstance.getId()); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java similarity index 82% rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java index 6a6fb4c3bd..c0b6001303 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java @@ -40,8 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) -@PrepareForTest({LogClientService.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class}) -public class LogClientServiceTest { +@PrepareForTest({LogClient.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class}) +public class LogClientTest { @Test public void testViewLogFromLocal() { @@ -54,8 +54,8 @@ public class LogClientServiceTest { PowerMockito.mockStatic(LoggerUtils.class); PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11"); - LogClientService logClientService = new LogClientService(); - String log = logClientService.viewLog(localMachine, port, path); + LogClient logClient = new LogClient(); + String log = logClient.viewLog(localMachine, port, path); Assert.assertNotNull(log); } @@ -75,8 +75,8 @@ public class LogClientServiceTest { command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8)); PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) .thenReturn(command); - LogClientService logClientService = new LogClientService(); - String log = logClientService.viewLog(localMachine, port, path); + LogClient logClient = new LogClient(); + String log = logClient.viewLog(localMachine, port, path); Assert.assertNotNull(log); } @@ -86,8 +86,8 @@ public class LogClientServiceTest { PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient); PowerMockito.doNothing().when(remotingClient).close(); - LogClientService logClientService = new LogClientService(); - logClientService.close(); + LogClient logClient = new LogClient(); + logClient.close(); } @Test @@ -100,8 +100,8 @@ public class LogClientServiceTest { PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) .thenReturn(command); - LogClientService logClientService = new LogClientService(); - String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10); + LogClient logClient = new LogClient(); + String msg = logClient.rollViewLog("localhost", 1234, "/tmp/log", 0, 10); Assert.assertNotNull(msg); } @@ -115,8 +115,8 @@ public class LogClientServiceTest { PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) .thenReturn(command); - LogClientService logClientService = new LogClientService(); - byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log"); + LogClient logClient = new LogClient(); + byte[] logBytes = logClient.getLogBytes("localhost", 1234, "/tmp/log"); Assert.assertNotNull(logBytes); } @@ -130,14 +130,9 @@ public class LogClientServiceTest { PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) .thenReturn(command); - LogClientService logClientService = new LogClientService(); - Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path"); + LogClient logClient = new LogClient(); + Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path"); Assert.assertTrue(status); } - @Test - public void testIsRunning() { - LogClientService logClientService = new LogClientService(); - Assert.assertTrue(logClientService.isRunning()); - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java index 9ee702fd20..5be423d8c6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java @@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.api; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import java.util.Set; +import java.util.List; public abstract class AbstractRemoteTask extends AbstractTask { @@ -38,7 +38,7 @@ public abstract class AbstractRemoteTask extends AbstractTask { this.cancelApplication(); } - public abstract Set getApplicationIds() throws TaskException; + public abstract List getApplicationIds() throws TaskException; public abstract void cancelApplication() throws TaskException; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index e51429692f..5e3ec8fab2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -21,8 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import java.util.Set; -import java.util.regex.Matcher; +import java.util.List; import java.util.regex.Pattern; /** @@ -106,24 +105,11 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask { * @return * @throws TaskException */ - public Set getApplicationIds() throws TaskException { + @Override + public List getApplicationIds() throws TaskException { return LogUtils.getAppIdsFromLogFile(taskRequest.getLogPath(), logger); } - /** - * find app id - * - * @param line line - * @return appid - */ - protected String findAppId(String line) { - Matcher matcher = YARN_APPLICATION_REGEX.matcher(line); - if (matcher.find()) { - return matcher.group(); - } - return null; - } - /** * create command * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java index 0df5c3a911..c3833c071a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java @@ -23,8 +23,10 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -42,14 +44,14 @@ public class LogUtils { private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX); - public Set getAppIdsFromLogFile(@NonNull String logPath) { + public List getAppIdsFromLogFile(@NonNull String logPath) { return getAppIdsFromLogFile(logPath, log); } - public Set getAppIdsFromLogFile(@NonNull String logPath, Logger logger) { + public List getAppIdsFromLogFile(@NonNull String logPath, Logger logger) { File logFile = new File(logPath); if (!logFile.exists() || !logFile.isFile()) { - return Collections.emptySet(); + return Collections.emptyList(); } Set appIds = new HashSet<>(); try (Stream stream = Files.lines(Paths.get(logPath))) { @@ -65,10 +67,10 @@ public class LogUtils { } } }); - return appIds; + return new ArrayList<>(appIds); } catch (IOException e) { logger.error("Get appId from log file erro, logPath: {}", logPath, e); - return Collections.emptySet(); + return Collections.emptyList(); } } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java index bd961d3137..d72a9737bf 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java @@ -17,12 +17,12 @@ package org.apache.dolphinscheduler.plugin.task.api.utils; -import java.util.Set; +import java.util.List; import org.junit.Assert; import org.junit.Test; -import com.google.common.collect.Sets; +import com.google.common.collect.Lists; public class LogUtilsTest { @@ -31,7 +31,7 @@ public class LogUtilsTest { @Test public void getAppIdsFromLogFile() { - Set appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE); - Assert.assertEquals(Sets.newHashSet("application_1548381669007_1234"), appIds); + List appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE); + Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java index 51751fe10d..11269d3ac7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java @@ -17,13 +17,9 @@ package org.apache.dolphinscheduler.plugin.task.dinky; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.MissingNode; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; -import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -31,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; + import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; @@ -45,10 +42,13 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Set; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.MissingNode; public class DinkyTask extends AbstractRemoteTask { @@ -73,8 +73,8 @@ public class DinkyTask extends AbstractRemoteTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override @@ -112,20 +112,23 @@ public class DinkyTask extends AbstractRemoteTask { if (!checkResult(jobInstanceInfoResult)) { break; } - String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText(); + String jobInstanceStatus = + jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText(); switch (jobInstanceStatus) { case DinkyTaskConstants.STATUS_FINISHED: final int exitStatusCode = mapStatusToExitCode(status); // Use address-taskId as app id setAppIds(String.format("%s-%s", address, taskId)); setExitStatusCode(exitStatusCode); - logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS)); + logger.info("dinky task finished with results: {}", + result.get(DinkyTaskConstants.API_RESULT_DATAS)); finishFlag = true; break; case DinkyTaskConstants.STATUS_FAILED: case DinkyTaskConstants.STATUS_CANCELED: case DinkyTaskConstants.STATUS_UNKNOWN: - errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText()); + errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error") + .asText()); finishFlag = true; break; default: @@ -191,14 +194,14 @@ public class DinkyTask extends AbstractRemoteTask { String address = this.dinkyParameters.getAddress(); String taskId = this.dinkyParameters.getTaskId(); logger.info("trying terminate dinky task, taskId: {}, address: {}, taskId: {}", - this.taskExecutionContext.getTaskInstanceId(), - address, - taskId); + this.taskExecutionContext.getTaskInstanceId(), + address, + taskId); cancelTask(address, taskId); logger.warn("dinky task terminated, taskId: {}, address: {}, taskId: {}", - this.taskExecutionContext.getTaskInstanceId(), - address, - taskId); + this.taskExecutionContext.getTaskInstanceId(), + address, + taskId); } private JsonNode submitTask(String address, String taskId) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java index a746185805..0b44b7c894 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java @@ -17,6 +17,15 @@ package org.apache.dolphinscheduler.plugin.task.emr; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.TimeUnit; + import com.amazonaws.SdkBaseException; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; @@ -31,16 +40,6 @@ import com.amazonaws.services.elasticmapreduce.model.StepStatus; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Sets; -import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; -import org.apache.dolphinscheduler.plugin.task.api.TaskException; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; - /** * AddJobFlowSteps task executor * @@ -51,10 +50,9 @@ public class EmrAddStepsTask extends AbstractEmrTask { private String stepId; private final HashSet waitingStateSet = Sets.newHashSet( - StepState.PENDING.toString(), - StepState.CANCEL_PENDING.toString(), - StepState.RUNNING.toString() - ); + StepState.PENDING.toString(), + StepState.CANCEL_PENDING.toString(), + StepState.RUNNING.toString()); /** * constructor @@ -66,8 +64,8 @@ public class EmrAddStepsTask extends AbstractEmrTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override @@ -126,13 +124,16 @@ public class EmrAddStepsTask extends AbstractEmrTask { final AddJobFlowStepsRequest addJobFlowStepsRequest; try { - addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class); + addJobFlowStepsRequest = + objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class); } catch (JsonProcessingException e) { throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e); } - // When a single task definition is associated with multiple steps, the state tracking will have high complexity. - // Therefore, A task definition only supports the association of a single step, which can better ensure the reliability of the task state. + // When a single task definition is associated with multiple steps, the state tracking will have high + // complexity. + // Therefore, A task definition only supports the association of a single step, which can better ensure the + // reliability of the task state. if (addJobFlowStepsRequest.getSteps().size() > 1) { throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step"); } @@ -178,7 +179,8 @@ public class EmrAddStepsTask extends AbstractEmrTask { @Override public void cancelApplication() throws TaskException { - logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId); + logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", + this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId); CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId); CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest); @@ -187,10 +189,10 @@ public class EmrAddStepsTask extends AbstractEmrTask { } CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList() - .stream() - .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId)) - .findFirst() - .orElseThrow(() -> new EmrTaskException("cancel emr step failed")); + .stream() + .filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId)) + .findFirst() + .orElseThrow(() -> new EmrTaskException("cancel emr step failed")); if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) { throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java index 770fb9f996..38a1177555 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import java.util.Collections; import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.concurrent.TimeUnit; import com.amazonaws.SdkBaseException; @@ -57,8 +57,8 @@ public class EmrJobFlowTask extends AbstractEmrTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java index f6d4e56815..7e63db7462 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java @@ -29,11 +29,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.commons.collections4.CollectionUtils; import java.io.IOException; -import java.util.Collections; import java.util.List; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public class FlinkStreamTask extends FlinkTask implements StreamTask { @@ -99,7 +95,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask { @Override public void cancelApplication() throws TaskException { - Set appIds = getApplicationIds(); + List appIds = getApplicationIds(); if (CollectionUtils.isEmpty(appIds)) { logger.error("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId()); return; @@ -120,7 +116,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask { @Override public void savePoint() throws Exception { - Set appIds = getApplicationIds(); + List appIds = getApplicationIds(); if (CollectionUtils.isEmpty(appIds)) { logger.warn("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId()); return; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index a342b2aac3..397b2bc38c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -19,30 +19,13 @@ package org.apache.dolphinscheduler.plugin.task.flink; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; -import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; -import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; -import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; + import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -113,34 +96,6 @@ public class FlinkTask extends AbstractYarnTask { return flinkParameters; } - @Override - public Set getApplicationIds() throws TaskException { - Set appIds = new HashSet<>(); - - File file = new File(taskRequest.getLogPath()); - if (!file.exists()) { - return appIds; - } - - /* - * analysis log? get submitted yarn application id - */ - try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) { - String line; - while ((line = br.readLine()) != null) { - String appId = findAppId(line); - if (StringUtils.isNotEmpty(appId)) { - appIds.add(appId); - } - } - } catch (FileNotFoundException e) { - throw new TaskException("get application id error, file not found, path:" + taskRequest.getLogPath()); - } catch (IOException e) { - throw new TaskException("get application id error, path:" + taskRequest.getLogPath(), e); - } - return appIds; - } - /** * find app id * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java index 06ae0b953e..dc0aa6a357 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.hivecli; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; -import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -39,7 +38,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; public class HiveCliTask extends AbstractRemoteTask { @@ -59,8 +57,8 @@ public class HiveCliTask extends AbstractRemoteTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java index 6587fddc31..a83f4081ff 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java @@ -17,10 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.jupyter; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; -import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -41,7 +38,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; + +import com.fasterxml.jackson.databind.ObjectMapper; public class JupyterTask extends AbstractRemoteTask { @@ -66,8 +64,8 @@ public class JupyterTask extends AbstractRemoteTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java index b405ae5f17..691e9abf3a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java @@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.Set; public class K8sTask extends AbstractK8sTask { @@ -59,8 +59,8 @@ public class K8sTask extends AbstractK8sTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override @@ -72,7 +72,7 @@ public class K8sTask extends AbstractK8sTask { protected String buildCommand() { K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters(); Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - Map namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace()); + Map namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace()); String namespaceName = namespace.get(NAMESPACE_NAME); String clusterName = namespace.get(CLUSTER); k8sTaskMainParameters.setImage(k8sTaskParameters.getImage()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java index 177122847e..debf913431 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java @@ -17,10 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.pigeon; -import org.apache.commons.collections4.CollectionUtils; - import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; -import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -28,6 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import org.apache.commons.collections4.CollectionUtils; import org.apache.http.HttpEntity; import org.apache.http.StatusLine; import org.apache.http.client.ClientProtocolException; @@ -37,10 +36,7 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.java_websocket.client.WebSocketClient; -import org.java_websocket.handshake.ServerHandshake; -import java.io.IOException; import java.net.HttpURLConnection; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -48,9 +44,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; + /** * TIS DataX Task **/ @@ -70,8 +68,8 @@ public class PigeonTask extends AbstractRemoteTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override @@ -103,9 +101,10 @@ public class PigeonTask extends AbstractRemoteTask { ExecResult execState = null; int taskId; WebSocketClient webSocket = null; - try (CloseableHttpClient client = HttpClients.createDefault(); - // trigger to start PIGEON dataX task - CloseableHttpResponse response = client.execute(post)) { + try ( + CloseableHttpClient client = HttpClients.createDefault(); + // trigger to start PIGEON dataX task + CloseableHttpResponse response = client.execute(post)) { triggerResult = processResponse(triggerUrl, response, BizResult.class); if (!triggerResult.isSuccess()) { List errormsg = triggerResult.getErrormsg(); @@ -155,7 +154,8 @@ public class PigeonTask extends AbstractRemoteTask { long costTime = System.currentTimeMillis() - startTime; logger.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}", targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'"); - setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE); + setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS + : TaskConstants.EXIT_CODE_FAILURE); } catch (Exception e) { logger.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); @@ -187,15 +187,17 @@ public class PigeonTask extends AbstractRemoteTask { logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId()); final String triggerUrl = getTriggerUrl(); - StringEntity entity = new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8); + StringEntity entity = + new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8); CancelResult cancelResult = null; HttpPost post = new HttpPost(triggerUrl); addFormUrlencoded(post); post.setEntity(entity); - try (CloseableHttpClient client = HttpClients.createDefault(); - // trigger to start TIS dataX task - CloseableHttpResponse response = client.execute(post)) { + try ( + CloseableHttpClient client = HttpClients.createDefault(); + // trigger to start TIS dataX task + CloseableHttpResponse response = client.execute(post)) { cancelResult = processResponse(triggerUrl, response, CancelResult.class); if (!cancelResult.isSuccess()) { List errormsg = triggerResult.getErrormsg(); @@ -229,6 +231,7 @@ public class PigeonTask extends AbstractRemoteTask { final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId); logger.info("apply ws connection,uri:{}", applyURI); WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) { + @Override public void onOpen(ServerHandshake handshakedata) { logger.info("start to receive remote execute log"); @@ -254,7 +257,8 @@ public class PigeonTask extends AbstractRemoteTask { return webSocketClient; } - private T processResponse(String applyUrl, CloseableHttpResponse response, Class clazz) throws Exception { + private T processResponse(String applyUrl, CloseableHttpResponse response, + Class clazz) throws Exception { StatusLine resStatus = response.getStatusLine(); if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) { throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase()); @@ -272,6 +276,7 @@ public class PigeonTask extends AbstractRemoteTask { } private static class CancelResult extends AjaxResult { + private Object bizresult; @Override @@ -285,6 +290,7 @@ public class PigeonTask extends AbstractRemoteTask { } private static class BizResult extends AjaxResult { + private TriggerBuildResult bizresult; @Override @@ -302,6 +308,7 @@ public class PigeonTask extends AbstractRemoteTask { } private static class StatusResult extends AjaxResult { + private Map bizresult; @Override @@ -351,6 +358,7 @@ public class PigeonTask extends AbstractRemoteTask { } private static class TriggerBuildResult { + private int taskid; public int getTaskid() { @@ -387,6 +395,7 @@ public class PigeonTask extends AbstractRemoteTask { } private static class ExecLog { + private String logType; private String msg; private int taskId; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java index 22c898d355..8299f85cd2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java @@ -34,8 +34,8 @@ import org.apache.dolphinscheduler.spi.utils.PropertyUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.Set; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -73,8 +73,8 @@ public class SagemakerTask extends AbstractRemoteTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index b0e8aa5454..3d896adc58 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -17,11 +17,10 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.BooleanUtils; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; -import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -33,6 +32,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.BooleanUtils; + import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -42,10 +44,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; - -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS; /** * seatunnel task @@ -82,8 +80,8 @@ public class SeatunnelTask extends AbstractRemoteTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } @Override @@ -180,11 +178,13 @@ public class SeatunnelTask extends AbstractRemoteTask { } private String buildConfigFilePath() { - return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId()); + return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), + taskExecutionContext.getTaskAppId()); } private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException { - logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath()); + logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), + taskExecutionContext.getExecutePath()); if (!Files.exists(Paths.get(scriptFile))) { logger.info("generate script file:{}", scriptFile); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index 75b6155587..5c0473154e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -17,11 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.zeppelin; -import com.fasterxml.jackson.databind.ObjectMapper; -import kong.unirest.Unirest; - import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; -import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -29,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils; + import org.apache.zeppelin.client.ClientConfig; import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.ParagraphResult; @@ -39,7 +36,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; + +import kong.unirest.Unirest; + +import com.fasterxml.jackson.databind.ObjectMapper; public class ZeppelinTask extends AbstractRemoteTask { @@ -235,8 +235,8 @@ public class ZeppelinTask extends AbstractRemoteTask { } @Override - public Set getApplicationIds() throws TaskException { - return Collections.emptySet(); + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 927920360a..ed351c3b86 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.processor; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -25,6 +26,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; @@ -42,12 +44,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; -import org.apache.dolphinscheduler.service.log.LogClientService; - -import org.apache.commons.collections.CollectionUtils; - - - +import org.apache.dolphinscheduler.service.log.LogClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -65,15 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class); - /** - * task execute manager - */ @Autowired private WorkerManagerThread workerManager; @Autowired private MessageRetryRunner messageRetryRunner; + @Autowired + private LogClient logClient; + /** * task kill process * @@ -92,12 +89,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { logger.info("task kill command : {}", killCommand); int taskInstanceId = killCommand.getTaskInstanceId(); - TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); - if (taskExecutionContext == null) { - logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId()); - return; - } + try { + LoggerUtils.setTaskInstanceIdMDC(taskInstanceId); + TaskExecutionContext taskExecutionContext = + TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); + if (taskExecutionContext == null) { + logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId()); + return; + } int processId = taskExecutionContext.getProcessId(); if (processId == 0) { @@ -110,19 +109,22 @@ public class TaskKillProcessor implements NettyRequestProcessor { return; } - // if processId > 0, it should call cancelApplication to cancel remote application too. - this.cancelApplication(taskInstanceId); - Pair> result = doKill(taskExecutionContext); + // if processId > 0, it should call cancelApplication to cancel remote application too. + this.cancelApplication(taskInstanceId); + Pair> result = doKill(taskExecutionContext); taskExecutionContext.setCurrentExecutionStatus( result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); sendTaskKillResponseCommand(channel, taskExecutionContext); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId()); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId()); - logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId()); + logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId()); + } finally { + LoggerUtils.removeTaskInstanceIdMDC(); + } } private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) { @@ -228,10 +230,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { host, logPath, executePath, tenantCode); return Pair.of(false, Collections.emptyList()); } - try (LogClientService logClient = new LogClientService()) { + try { logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath); List appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath); if (CollectionUtils.isEmpty(appIds)) { + logger.info("The appId is empty"); return Pair.of(true, Collections.emptyList()); } @@ -241,7 +244,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { Thread.currentThread().interrupt(); logger.error("kill yarn job error, the current thread has been interrtpted", e); } catch (Exception e) { - logger.error("kill yarn job error", e); + logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, executePath, tenantCode, e); } return Pair.of(false, Collections.emptyList()); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index 275fdf113d..9cb1a1e4f4 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.runner; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.google.common.base.Strings; import lombok.NonNull; import org.apache.dolphinscheduler.common.Constants; @@ -35,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -50,6 +52,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.NoSuchFileException; import java.util.Date; +import java.util.List; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; @@ -120,7 +123,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { if (task != null) { try { task.cancel(); - ProcessUtils.killYarnJob(taskExecutionContext); + List appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath()); + if (CollectionUtils.isNotEmpty(appIds)) { + ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath()); + } } catch (Exception e) { logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e); }