Browse Source

cherry-pick Refactor LogServiceClient Singleton to avoid repeat creation of NettyClient #11777 (#12542)

Co-authored-by: Wenjun Ruan <wenjun@apache.org>
3.1.1-release
Kerwin 2 years ago committed by GitHub
parent
commit
f3277277f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
  2. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  3. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
  4. 97
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  5. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  6. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
  7. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  8. 27
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
  9. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  10. 28
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  11. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  12. 6
      dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  13. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  14. 31
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  15. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
  16. 17
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  17. 18
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  18. 106
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  19. 64
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
  20. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  21. 141
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
  22. 108
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
  23. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  24. 88
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  25. 33
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
  26. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
  27. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  28. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
  29. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
  30. 27
      dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
  31. 38
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
  32. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
  33. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
  34. 47
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  35. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
  36. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
  37. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
  38. 37
      dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
  39. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
  40. 22
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  41. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
  42. 25
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  43. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

36
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java

@ -20,48 +20,36 @@ import org.apache.dolphinscheduler.api.enums.Status;
import java.text.MessageFormat; import java.text.MessageFormat;
import lombok.Data;
/** @Data
* service exception
*/
public class ServiceException extends RuntimeException { public class ServiceException extends RuntimeException {
/** private int code;
* code
*/
private Integer code;
public ServiceException() { public ServiceException() {
this(Status.INTERNAL_SERVER_ERROR_ARGS);
} }
public ServiceException(Status status) { public ServiceException(Status status) {
super(status.getMsg()); this(status.getCode(), status.getMsg());
this.code = status.getCode();
} }
public ServiceException(Status status, Object... formatter) { public ServiceException(Status status, Object... formatter) {
super(MessageFormat.format(status.getMsg(), formatter)); this(status.getCode(), MessageFormat.format(status.getMsg(), formatter));
this.code = status.getCode();
}
public ServiceException(Integer code,String message) {
super(message);
this.code = code;
} }
public ServiceException(String message) { public ServiceException(String message) {
super(message); this(Status.INTERNAL_SERVER_ERROR_ARGS, message);
} }
public ServiceException(String message, Exception cause) { public ServiceException(int code, String message) {
super(message, cause); this(code, message, null);
}
public Integer getCode() {
return this.code;
} }
public void setCode(Integer code) { public ServiceException(int code, String message, Exception cause) {
super(message, cause);
this.code = code; this.code = code;
} }
} }

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -31,6 +31,7 @@ import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LEN
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
@ -377,11 +378,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result; return result;
} }
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
if (processInstance == null) { .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
ProcessDefinition processDefinition = ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@ -1094,7 +1092,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/ */
@Override @Override
public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) { public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) { if (processInstance == null) {
return null; return null;
} }

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@ -31,7 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -66,7 +66,8 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
private LogClientService logClient; @Autowired
private LogClient logClient;
@Autowired @Autowired
ProjectMapper projectMapper; ProjectMapper projectMapper;
@ -77,20 +78,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@Autowired @Autowired
TaskDefinitionMapper taskDefinitionMapper; TaskDefinitionMapper taskDefinitionMapper;
@PostConstruct
public void init() {
if (Objects.isNull(this.logClient)) {
this.logClient = new LogClientService();
}
}
@PreDestroy
public void close() {
if (Objects.nonNull(this.logClient) && this.logClient.isRunning()) {
logClient.close();
}
}
/** /**
* view log * view log
* *

97
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -17,19 +17,10 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*; import com.baomidou.mybatisplus.core.metadata.IPage;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; import org.apache.commons.lang3.StringUtils;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
@ -78,9 +69,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service;
import org.apache.commons.lang3.StringUtils; import org.springframework.transaction.annotation.Transactional;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -97,12 +88,18 @@ import java.util.Objects;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
import org.springframework.stereotype.Service; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import org.springframework.transaction.annotation.Transactional; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
import com.baomidou.mybatisplus.core.metadata.IPage; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
/** /**
* process instance service impl * process instance service impl
@ -228,7 +225,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
ProcessDefinition processDefinition = ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@ -346,16 +344,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
Integer processId) throws IOException { Integer processId) throws IOException {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
//check user access for project //check user access for project
Map<String, Object> result = Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
ProcessDefinition processDefinition = .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId); putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId);
return result; return result;
} }
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId); List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
@ -488,27 +485,22 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
String locations, int timeout, String tenantCode) { String locations, int timeout, String tenantCode) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
//check user access for project //check user access for project
Map<String, Object> result = Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
//check process instance exists //check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
if (processInstance == null) { .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//check process instance exists in project //check process instance exists in project
ProcessDefinition processDefinition0 = ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) { if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result; return result;
} }
//check process instance status //check process instance status
if (!processInstance.getState().isFinished()) { if (!processInstance.getState().isFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, putMsg(result, PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update"); processInstance.getName(), processInstance.getState().toString(), "update");
return result; return result;
} }
@ -626,11 +618,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result; return result;
} }
ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId); ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId)
if (subInstance == null) { .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, subId));
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
return result;
}
if (subInstance.getIsSubProcess() == Flag.NO) { if (subInstance.getIsSubProcess() == Flag.NO) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName()); putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
return result; return result;
@ -666,23 +655,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
if (null == processInstance) { .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId));
return result;
}
// check process instance status // check process instance status
if (!processInstance.getState().isFinished()) { if (!processInstance.getState().isFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState(), "delete");
processInstance.getName(), processInstance.getState().toString(), "delete");
return result;
} }
ProcessDefinition processDefinition = ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId)); throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
} }
try { try {
@ -728,7 +711,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition = ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result; return result;
} }
@ -817,7 +800,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()); processInstance.getProcessDefinitionVersion());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result; return result;
} }
GanttDto ganttDto = new GanttDto(); GanttDto ganttDto = new GanttDto();

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -78,9 +78,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.cronutils.model.Cron; import com.cronutils.model.Cron;
/**
* scheduler service impl
*/
@Service @Service
public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerService { public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerService {
@ -382,9 +379,10 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result; return result;
} }
} catch (Exception e) { } catch (Exception e) {
result.put(Constants.MSG, Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure"); : Status.OFFLINE_SCHEDULE_ERROR;
throw new ServiceException(result.get(Constants.MSG).toString(), e); result.put(Constants.STATUS, status);
throw new ServiceException(status, e);
} }
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java

@ -17,25 +17,27 @@
package org.apache.dolphinscheduler.api.exceptions; package org.apache.dolphinscheduler.api.exceptions;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class ServiceExceptionTest { public class ServiceExceptionTest {
@Test @Test
public void getCodeTest() { public void getCodeTest() {
ServiceException serviceException = new ServiceException(); ServiceException serviceException = new ServiceException();
Assert.assertNull(serviceException.getCode()); Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), serviceException.getCode());
serviceException = new ServiceException(Status.ALERT_GROUP_EXIST); serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
Assert.assertNotNull(serviceException.getCode()); Assert.assertEquals(Status.ALERT_GROUP_EXIST.getCode(), serviceException.getCode());
serviceException = new ServiceException(10012, "alarm group already exists"); serviceException = new ServiceException(10012, "alarm group already exists");
Assert.assertNotNull(serviceException.getCode()); Assert.assertEquals(10012, serviceException.getCode());
} }
@Test @Test
public void getMessageTest() { public void getMessageTest() {
ServiceException serviceException = new ServiceException(); ServiceException serviceException = new ServiceException();
Assert.assertNull(serviceException.getMessage()); Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), serviceException.getMessage());
serviceException = new ServiceException(Status.ALERT_GROUP_EXIST); serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
Assert.assertNotNull(serviceException.getMessage()); Assert.assertNotNull(serviceException.getMessage());

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -66,6 +66,7 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.assertj.core.util.Lists; import org.assertj.core.util.Lists;
import org.junit.Assert; import org.junit.Assert;
@ -193,7 +194,7 @@ public class ExecutorServiceTest {
doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null)); doReturn(1).when(processService).createCommand(argThat(c -> c.getId() == null));
doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null)); doReturn(0).when(processService).createCommand(argThat(c -> c.getId() != null));
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList()); Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance); Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(Optional.ofNullable(processInstance));
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition); Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue); Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue);
Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance); Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance);

27
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
@ -28,15 +31,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.InjectMocks; import org.mockito.InjectMocks;
@ -47,9 +49,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
/** /**
* logger service test * logger service test
*/ */
@ -74,10 +73,8 @@ public class LoggerServiceTest {
@Mock @Mock
private TaskDefinitionMapper taskDefinitionMapper; private TaskDefinitionMapper taskDefinitionMapper;
@Before @Mock
public void init() { private LogClient logClient;
this.loggerService.init();
}
@Test @Test
public void testQueryDataSourceList() { public void testQueryDataSourceList() {
@ -132,6 +129,8 @@ public class LoggerServiceTest {
taskInstance.setLogPath("/temp/log"); taskInstance.setLogPath("/temp/log");
// if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage, // if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage,
// so no assert will be added here // so no assert will be added here
Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
.thenReturn(new byte[0]);
loggerService.getLogBytes(1); loggerService.getLogBytes(1);
} }
@ -184,17 +183,15 @@ public class LoggerServiceTest {
taskInstance.setId(1); taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080"); taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log"); taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,DOWNLOAD_LOG )).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
.thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
.thenReturn(new byte[0]);
loggerService.getLogBytes(loginUser, projectCode, 1); loggerService.getLogBytes(loginUser, projectCode, 1);
} }
@After
public void close() {
this.loggerService.close();
}
/** /**
* get mock Project * get mock Project
* *

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -200,7 +200,7 @@ public class ProcessDefinitionServiceTest {
.checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION); .checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION);
processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0); processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0);
} catch (ServiceException serviceException) { } catch (ServiceException serviceException) {
Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode().intValue()); Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode());
} }
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();

28
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
@ -70,6 +71,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -308,7 +310,8 @@ public class ProcessInstanceServiceTest {
processDefinition.setProjectCode(projectCode); processDefinition.setProjectCode(projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance); when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance));
when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition); processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition);
Map<String, Object> successRes = processInstanceService.queryProcessInstanceById(loginUser, projectCode, 1); Map<String, Object> successRes = processInstanceService.queryProcessInstanceById(loginUser, projectCode, 1);
@ -353,7 +356,8 @@ public class ProcessInstanceServiceTest {
res.setData("xxx"); res.setData("xxx");
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance); when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance));
when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList); when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList);
when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res); when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1); Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@ -451,14 +455,18 @@ public class ProcessInstanceServiceTest {
ProcessInstance processInstance = getProcessInstance(); ProcessInstance processInstance = getProcessInstance();
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(null); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
try {
Map<String, Object> processInstanceNullRes = Map<String, Object> processInstanceNullRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1, processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, ""); shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS)); Assert.fail();
} catch (ServiceException ex) {
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
}
// process instance not finish // process instance not finish
when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceNotFinishRes = Map<String, Object> processInstanceNotFinishRes =
@ -523,16 +531,20 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(null); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
try {
Map<String, Object> processInstanceNullRes = Map<String, Object> processInstanceNullRes =
processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
} catch (ServiceException ex) {
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
}
// not sub process // not sub process
ProcessInstance processInstance = getProcessInstance(); ProcessInstance processInstance = getProcessInstance();
processInstance.setIsSubProcess(Flag.NO); processInstance.setIsSubProcess(Flag.NO);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance); when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
Map<String, Object> notSubProcessRes = Map<String, Object> notSubProcessRes =
processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1); processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, notSubProcessRes.get(Constants.STATUS)); Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, notSubProcessRes.get(Constants.STATUS));

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -51,6 +51,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -158,7 +159,7 @@ public class TaskInstanceServiceTest {
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))).thenReturn(pageReturn); eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId())) when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()))
.thenReturn(processInstance); .thenReturn(Optional.of(processInstance));
Result successRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "", Result successRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "",
"test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20); "test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20);

6
dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -46,10 +46,8 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -168,9 +166,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
if (!checkPathSecurity(logPath)) { if (!checkPathSecurity(logPath)) {
throw new IllegalArgumentException("Illegal path"); throw new IllegalArgumentException("Illegal path");
} }
Set<String> appIds = LogUtils.getAppIdsFromLogFile(logPath); List<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
channel.writeAndFlush( channel.writeAndFlush(
new GetAppIdResponseCommand(new ArrayList<>(appIds)).convert2Command(command.getOpaque())); new GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque()));
break; break;
default: default:
throw new IllegalArgumentException("unknown commandType: " + commandType); throw new IllegalArgumentException("unknown commandType: " + commandType);

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -73,17 +73,21 @@ public class MasterFailoverService {
private final ProcessInstanceExecCacheManager processInstanceExecCacheManager; private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private final LogClient logClient;
public MasterFailoverService(@NonNull RegistryClient registryClient, public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig, @NonNull MasterConfig masterConfig,
@NonNull ProcessService processService, @NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager, @NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) { @NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
@NonNull LogClient logClient) {
this.registryClient = registryClient; this.registryClient = registryClient;
this.masterConfig = masterConfig; this.masterConfig = masterConfig;
this.processService = processService; this.processService = processService;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.nettyExecutorManager = nettyExecutorManager; this.nettyExecutorManager = nettyExecutorManager;
this.localAddress = masterConfig.getMasterAddress();
this.processInstanceExecCacheManager = processInstanceExecCacheManager; this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.logClient = logClient;
} }
@ -233,7 +237,7 @@ public class MasterFailoverService {
if (masterConfig.isKillYarnJobWhenTaskFailover()) { if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited // only kill yarn job if exists , the local thread has exited
LOGGER.info("TaskInstance failover begin kill the task related yarn job"); LOGGER.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(taskExecutionContext); ProcessUtils.killYarnJob(logClient, taskExecutionContext);
} }
// kill worker task, When the master failover and worker failover happened in the same time, // kill worker task, When the master failover and worker failover happened in the same time,
// the task may not be failover if we don't set NEED_FAULT_TOLERANCE. // the task may not be failover if we don't set NEED_FAULT_TOLERANCE.

31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -17,13 +17,16 @@
package org.apache.dolphinscheduler.server.master.service; package org.apache.dolphinscheduler.server.master.service;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -37,13 +40,14 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.apache.commons.collections4.CollectionUtils; import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -52,14 +56,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import lombok.NonNull;
@Service @Service
public class WorkerFailoverService { public class WorkerFailoverService {
@ -70,19 +66,22 @@ public class WorkerFailoverService {
private final ProcessService processService; private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool; private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
private final ProcessInstanceExecCacheManager cacheManager; private final ProcessInstanceExecCacheManager cacheManager;
private final LogClient logClient;
private final String localAddress; private final String localAddress;
public WorkerFailoverService(@NonNull RegistryClient registryClient, public WorkerFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig, @NonNull MasterConfig masterConfig,
@NonNull ProcessService processService, @NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool, @NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
@NonNull ProcessInstanceExecCacheManager cacheManager) { @NonNull ProcessInstanceExecCacheManager cacheManager,
@NonNull LogClient logClient) {
this.registryClient = registryClient; this.registryClient = registryClient;
this.masterConfig = masterConfig; this.masterConfig = masterConfig;
this.processService = processService; this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool; this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.cacheManager = cacheManager; this.cacheManager = cacheManager;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); this.logClient = logClient;
this.localAddress = masterConfig.getMasterAddress();
} }
/** /**
@ -172,7 +171,7 @@ public class WorkerFailoverService {
if (masterConfig.isKillYarnJobWhenTaskFailover()) { if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited // only kill yarn job if exists , the local thread has exited
LOGGER.info("TaskInstance failover begin kill the task related yarn job"); LOGGER.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(taskExecutionContext); ProcessUtils.killYarnJob(logClient, taskExecutionContext);
} }
} else { } else {
LOGGER.info("The failover taskInstance is a master task"); LOGGER.info("The failover taskInstance is a master task");

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java

@ -63,8 +63,7 @@ public class DataQualityResultOperator {
if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) { if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) {
ProcessInstance processInstance = ProcessInstance processInstance =
processService.findProcessInstanceDetailById( processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()).orElse(null);
Integer.parseInt(String.valueOf(taskInstance.getProcessInstanceId())));
// when the task is failure or cancel, will delete the execute result and statistics value // when the task is failure or cancel, will delete the execute result and statistics value
if (taskResponseEvent.getState().isFailure() if (taskResponseEvent.getState().isFailure()

17
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -25,8 +25,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask; import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -34,7 +32,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ScheduledExecutorService; import java.util.Optional;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -58,26 +56,17 @@ public class MasterRegistryClientTest {
@InjectMocks @InjectMocks
private MasterRegistryClient masterRegistryClient; private MasterRegistryClient masterRegistryClient;
@Mock
private MasterConfig masterConfig;
@Mock @Mock
private RegistryClient registryClient; private RegistryClient registryClient;
@Mock
private ScheduledExecutorService heartBeatExecutor;
@Mock @Mock
private ProcessService processService; private ProcessService processService;
@Mock
private MasterConnectStrategy masterConnectStrategy;
@Mock @Mock
private MasterHeartBeatTask masterHeartBeatTask; private MasterHeartBeatTask masterHeartBeatTask;
@Mock @Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager; private MasterConfig masterConfig;
@Before @Before
public void before() throws Exception { public void before() throws Exception {
@ -105,7 +94,7 @@ public class MasterRegistryClientTest {
taskInstance.setHost("127.0.0.1:8080"); taskInstance.setHost("127.0.0.1:8080");
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString())) given(processService.queryNeedFailoverTaskInstances(Mockito.anyString()))
.willReturn(Arrays.asList(taskInstance)); .willReturn(Arrays.asList(taskInstance));
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance); given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(Optional.of(processInstance));
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true); given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
Server server = new Server(); Server server = new Server();
server.setHost("127.0.0.1"); server.setHost("127.0.0.1");

18
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.service;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE; import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -38,12 +37,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.Optional;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -89,6 +90,9 @@ public class FailoverServiceTest {
@Mock @Mock
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Mock
private LogClient logClient;
private static int masterPort = 5678; private static int masterPort = 5678;
private static int workerPort = 1234; private static int workerPort = 1234;
@ -106,17 +110,20 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext); springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort); given(masterConfig.getListenPort()).willReturn(masterPort);
testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
given(masterConfig.getMasterAddress()).willReturn(testMasterHost);
MasterFailoverService masterFailoverService = MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager); new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
processInstanceExecCacheManager, logClient);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient, WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig, masterConfig,
processService, processService,
workflowExecuteThreadPool, workflowExecuteThreadPool,
cacheManager); cacheManager,
logClient);
failoverService = new FailoverService(masterFailoverService, workerFailoverService); failoverService = new FailoverService(masterFailoverService, workerFailoverService);
testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
String ip = testMasterHost.split(":")[0]; String ip = testMasterHost.split(":")[0];
int port = Integer.valueOf(testMasterHost.split(":")[1]); int port = Integer.valueOf(testMasterHost.split(":")[1]);
Assert.assertEquals(masterPort, port); Assert.assertEquals(masterPort, port);
@ -158,7 +165,8 @@ public class FailoverServiceTest {
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class)); doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
given(processService.findValidTaskListByProcessId(Mockito.anyInt())) given(processService.findValidTaskListByProcessId(Mockito.anyInt()))
.willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance)); .willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance); given(processService.findProcessInstanceDetailById(Mockito.anyInt()))
.willReturn(Optional.ofNullable(processInstance));
Thread.sleep(1000); Thread.sleep(1000);
Server masterServer = new Server(); Server masterServer = new Server();

106
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -17,6 +17,17 @@
package org.apache.dolphinscheduler.remote; package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -35,6 +46,8 @@ import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils; import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -43,112 +56,48 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
/** public class NettyRemotingClient implements AutoCloseable {
* remoting netty client
*/
public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class); private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
/**
* client bootstrap
*/
private final Bootstrap bootstrap = new Bootstrap(); private final Bootstrap bootstrap = new Bootstrap();
/**
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder(); private final NettyEncoder encoder = new NettyEncoder();
/**
* channels
*/
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128); private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
/**
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* worker group
*/
private final EventLoopGroup workerGroup; private final EventLoopGroup workerGroup;
/**
* client config
*/
private final NettyClientConfig clientConfig; private final NettyClientConfig clientConfig;
/**
* saync semaphore
*/
private final Semaphore asyncSemaphore = new Semaphore(200, true); private final Semaphore asyncSemaphore = new Semaphore(200, true);
/**
* callback thread executor
*/
private final ExecutorService callbackExecutor; private final ExecutorService callbackExecutor;
/**
* client handler
*/
private final NettyClientHandler clientHandler; private final NettyClientHandler clientHandler;
/**
* response future executor
*/
private final ScheduledExecutorService responseFutureExecutor; private final ScheduledExecutorService responseFutureExecutor;
/**
* client init
*
* @param clientConfig client config
*/
public NettyRemotingClient(final NettyClientConfig clientConfig) { public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
if (Epoll.isAvailable()) { if (Epoll.isAvailable()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
} else { } else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
private final AtomicInteger threadIndex = new AtomicInteger(0); }
this.callbackExecutor = new ThreadPoolExecutor(
@Override Constants.CPUS,
public Thread newThread(Runnable r) { Constants.CPUS,
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); 1,
} TimeUnit.MINUTES,
}); new LinkedBlockingQueue<>(1000),
} new NamedThreadFactory("CallbackExecutor"),
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy()); new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.clientHandler = new NettyClientHandler(this, callbackExecutor);
@ -157,9 +106,6 @@ public class NettyRemotingClient {
this.start(); this.start();
} }
/**
* start
*/
private void start() { private void start() {
this.bootstrap this.bootstrap
@ -371,9 +317,7 @@ public class NettyRemotingClient {
return null; return null;
} }
/** @Override
* close
*/
public void close() { public void close() {
if (isStarted.compareAndSet(true, false)) { if (isStarted.compareAndSet(true, false)) {
try { try {

64
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java

@ -17,88 +17,52 @@
package org.apache.dolphinscheduler.remote.config; package org.apache.dolphinscheduler.remote.config;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
/** @Data
* netty client config @Builder
*/ @NoArgsConstructor
@AllArgsConstructor
public class NettyClientConfig { public class NettyClientConfig {
/** /**
* worker threadsdefault get machine cpus * worker threadsdefault get machine cpus
*/ */
@Builder.Default
private int workerThreads = Constants.CPUS; private int workerThreads = Constants.CPUS;
/** /**
* whether tpc delay * whether tpc delay
*/ */
@Builder.Default
private boolean tcpNoDelay = true; private boolean tcpNoDelay = true;
/** /**
* whether keep alive * whether keep alive
*/ */
@Builder.Default
private boolean soKeepalive = true; private boolean soKeepalive = true;
/** /**
* send buffer size * send buffer size
*/ */
@Builder.Default
private int sendBufferSize = 65535; private int sendBufferSize = 65535;
/** /**
* receive buffer size * receive buffer size
*/ */
@Builder.Default
private int receiveBufferSize = 65535; private int receiveBufferSize = 65535;
/** /**
* connect timeout millis * connect timeout millis
*/ */
@Builder.Default
private int connectTimeoutMillis = 3000; private int connectTimeoutMillis = 3000;
public int getWorkerThreads() {
return workerThreads;
}
public void setWorkerThreads(int workerThreads) {
this.workerThreads = workerThreads;
}
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
public boolean isSoKeepalive() {
return soKeepalive;
}
public void setSoKeepalive(boolean soKeepalive) {
this.soKeepalive = soKeepalive;
}
public int getSendBufferSize() {
return sendBufferSize;
}
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}
public int getReceiveBufferSize() {
return receiveBufferSize;
}
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
}
} }

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -30,10 +30,11 @@ import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
@ -186,17 +187,15 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
* @return yarn application ids * @return yarn application ids
*/ */
public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) { public static @Nullable List<String> killYarnJob(@NonNull LogClient logClient,
@NonNull TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) { if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList(); return Collections.emptyList();
} }
try { try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
List<String> appIds;
try (LogClientService logClient = new LogClientService()) {
Host host = Host.of(taskExecutionContext.getHost()); Host host = Host.of(taskExecutionContext.getHost());
appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath()); List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
}
if (CollectionUtils.isNotEmpty(appIds)) { if (CollectionUtils.isNotEmpty(appIds)) {
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) { if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext taskExecutionContext

141
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java

@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -46,43 +45,23 @@ import lombok.NonNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/** @Service
* log client public class LogClient implements AutoCloseable {
*/
public class LogClientService implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class); private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
private final NettyClientConfig clientConfig; private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private final NettyRemotingClient client; private final NettyRemotingClient client;
private volatile boolean isRunning;
/**
* request time out
*/
private static final long LOG_REQUEST_TIMEOUT = 10 * 1000L; private static final long LOG_REQUEST_TIMEOUT = 10 * 1000L;
/** public LogClient() {
* construct client NettyClientConfig nettyClientConfig = new NettyClientConfig();
*/ this.client = new NettyRemotingClient(nettyClientConfig);
public LogClientService() { logger.info("Initialized LogClientService with config: {}", nettyClientConfig);
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
}
/**
* close
*/
@Override
public void close() {
this.client.close();
this.isRunning = false;
logger.info("logger client closed");
} }
/** /**
@ -96,25 +75,30 @@ public class LogClientService implements AutoCloseable {
* @return log content * @return log content
*/ */
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) { public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, logger.info("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
skipLineNum, limit); skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
final Host address = new Host(host, port); final Host address = new Host(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject( RollViewLogResponseCommand rollReviewLog =
response.getBody(), RollViewLogResponseCommand.class); JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg(); return rollReviewLog.getMsg();
} }
return "Roll view log response is null";
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
"Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error, the current thread has been interrupted",
host, port, path, skipLineNum, limit, ex);
return "Roll view log error: " + ex.getMessage();
} catch (Exception e) { } catch (Exception e) {
logger.error("roll view log error", e); logger.error("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error", host, port,
} finally { path, skipLineNum, limit, e);
this.client.closeChannel(address); return "Roll view log error: " + e.getMessage();
} }
return result;
} }
/** /**
@ -126,28 +110,31 @@ public class LogClientService implements AutoCloseable {
* @return log content * @return log content
*/ */
public String viewLog(String host, int port, String path) { public String viewLog(String host, int port, String path) {
logger.info("view log path {}", path); logger.info("View log from host: {}, port: {}, logPath: {}", host, port, path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path); ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
final Host address = new Host(host, port); final Host address = new Host(host, port);
try { try {
if (NetUtils.getHost().equals(host)) { if (NetUtils.getHost().equals(host)) {
result = LoggerUtils.readWholeFileContent(request.getPath()); return LoggerUtils.readWholeFileContent(request.getPath());
} else { } else {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
ViewLogResponseCommand viewLog = JSONUtils.parseObject( ViewLogResponseCommand viewLog =
response.getBody(), ViewLogResponseCommand.class); JSONUtils.parseObject(response.getBody(), ViewLogResponseCommand.class);
result = viewLog.getMsg(); return viewLog.getMsg();
} }
return "View log response is null";
} }
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error("View log from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
host, port, path, ex);
return "View log error: " + ex.getMessage();
} catch (Exception e) { } catch (Exception e) {
logger.error("view log error", e); logger.error("View log from host: {}, port: {}, logPath: {} error", host, port, path, e);
} finally { return "View log error: " + e.getMessage();
this.client.closeChannel(address);
} }
return result;
} }
/** /**
@ -159,23 +146,28 @@ public class LogClientService implements AutoCloseable {
* @return log content bytes * @return log content bytes
*/ */
public byte[] getLogBytes(String host, int port, String path) { public byte[] getLogBytes(String host, int port, String path) {
logger.info("log path {}", path); logger.info("Get log bytes from host: {}, port: {}, logPath {}", host, port, path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path); GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
final Host address = new Host(host, port); final Host address = new Host(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
GetLogBytesResponseCommand getLog = JSONUtils.parseObject( GetLogBytesResponseCommand getLog =
response.getBody(), GetLogBytesResponseCommand.class); JSONUtils.parseObject(response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData() == null ? new byte[0] : getLog.getData(); return getLog.getData() == null ? EMPTY_BYTE_ARRAY : getLog.getData();
} }
return EMPTY_BYTE_ARRAY;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
"Get logSize from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
host, port, path, ex);
return EMPTY_BYTE_ARRAY;
} catch (Exception e) { } catch (Exception e) {
logger.error("get log size error", e); logger.error("Get logSize from host: {}, port: {}, logPath: {} error", host, port, path, e);
} finally { return EMPTY_BYTE_ARRAY;
this.client.closeChannel(address);
} }
return new byte[0];
} }
/** /**
@ -187,24 +179,28 @@ public class LogClientService implements AutoCloseable {
* @return remove task status * @return remove task status
*/ */
public Boolean removeTaskLog(String host, int port, String path) { public Boolean removeTaskLog(String host, int port, String path) {
logger.info("log path {}", path); logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path); RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
Boolean result = false;
final Host address = new Host(host, port); final Host address = new Host(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject( RemoveTaskLogResponseCommand taskLogResponse =
response.getBody(), RemoveTaskLogResponseCommand.class); JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus(); return taskLogResponse.getStatus();
} }
return false;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
"Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted",
host, port, path, ex);
return false;
} catch (Exception e) { } catch (Exception e) {
logger.error("remove task log error", e); logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e);
} finally { return false;
this.client.closeChannel(address);
} }
return result;
} }
public @Nullable List<String> getAppIds(@NonNull String host, int port, public @Nullable List<String> getAppIds(@NonNull String host, int port,
@ -212,9 +208,8 @@ public class LogClientService implements AutoCloseable {
logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath); logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath);
final Host workerAddress = new Host(host, port); final Host workerAddress = new Host(host, port);
List<String> appIds = null; List<String> appIds = null;
try {
if (NetUtils.getHost().equals(host)) { if (NetUtils.getHost().equals(host)) {
appIds = new ArrayList<>(LogUtils.getAppIdsFromLogFile(taskLogFilePath)); appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath);
} else { } else {
final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command(); final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
@ -224,14 +219,14 @@ public class LogClientService implements AutoCloseable {
appIds = responseCommand.getAppIds(); appIds = responseCommand.getAppIds();
} }
} }
} finally {
client.closeChannel(workerAddress);
}
logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath); logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath);
return appIds; return appIds;
} }
public boolean isRunning() { @Override
return isRunning; public void close() {
this.client.close();
logger.info("LogClientService closed");
} }
} }

108
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.log;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* log asyc callback
*/
public class LogPromise {
private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
/**
* request unique identification
*/
private long opaque;
/**
* start timemillis
*/
private final long start;
/**
* timeout
*/
private final long timeout;
/**
* latch
*/
private final CountDownLatch latch;
/**
* result
*/
private Object result;
public LogPromise(long opaque, long timeout) {
this.opaque = opaque;
this.timeout = timeout;
this.start = System.currentTimeMillis();
this.latch = new CountDownLatch(1);
PROMISES.put(opaque, this);
}
/**
* notify client finish
* @param opaque unique identification
* @param result result
*/
public static void notify(long opaque, Object result) {
LogPromise promise = PROMISES.remove(opaque);
if (promise != null) {
promise.doCountDown(result);
}
}
/**
* countdown
*
* @param result result
*/
private void doCountDown(Object result) {
this.result = result;
this.latch.countDown();
}
/**
* whether timeout
* @return timeout
*/
public boolean isTimeout() {
return System.currentTimeMillis() - start > timeout;
}
/**
* get result
* @return
*/
public Object getResult() {
try {
latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
PROMISES.remove(opaque);
return this.result;
}
}

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@ -77,7 +78,7 @@ public interface ProcessService {
boolean verifyIsNeedCreateCommand(Command command); boolean verifyIsNeedCreateCommand(Command command);
ProcessInstance findProcessInstanceDetailById(int processId); Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
List<TaskDefinition> getTaskNodeListByDefinition(long defineCode); List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);

88
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,22 +17,14 @@
package org.apache.dolphinscheduler.service.process; package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; import com.fasterxml.jackson.core.type.TypeReference;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; import com.fasterxml.jackson.databind.node.ObjectNode;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; import com.google.common.base.Joiner;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; import com.google.common.base.Strings;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; import com.google.common.collect.Lists;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import io.micrometer.core.annotation.Counted;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; import org.apache.commons.collections.CollectionUtils;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
@ -134,12 +126,17 @@ import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.apache.commons.collections.CollectionUtils; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
@ -150,23 +147,25 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger; import static java.util.stream.Collectors.toSet;
import org.slf4j.LoggerFactory; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import org.springframework.beans.factory.annotation.Autowired; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import org.springframework.stereotype.Component; import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import org.springframework.transaction.annotation.Transactional; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import com.fasterxml.jackson.core.type.TypeReference; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import com.fasterxml.jackson.databind.node.ObjectNode; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import com.google.common.base.Joiner; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import com.google.common.base.Strings; import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import com.google.common.collect.Lists; import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import io.micrometer.core.annotation.Counted; import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
/** /**
* process relative dao that some mappers in this. * process relative dao that some mappers in this.
@ -279,6 +278,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired @Autowired
private CuringParamsService curingGlobalParamsService; private CuringParamsService curingGlobalParamsService;
@Autowired
private LogClient logClient;
/** /**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction * handle Command (construct ProcessInstance from Command) , wrapped in transaction
* *
@ -477,8 +479,8 @@ public class ProcessServiceImpl implements ProcessService {
* @return process instance * @return process instance
*/ */
@Override @Override
public ProcessInstance findProcessInstanceDetailById(int processId) { public Optional<ProcessInstance> findProcessInstanceDetailById(int processId) {
return processInstanceMapper.queryDetailById(processId); return Optional.ofNullable(processInstanceMapper.queryDetailById(processId));
} }
/** /**
@ -600,7 +602,6 @@ public class ProcessServiceImpl implements ProcessService {
if (CollectionUtils.isEmpty(taskInstanceList)) { if (CollectionUtils.isEmpty(taskInstanceList)) {
return; return;
} }
try (LogClientService logClient = new LogClientService()) {
for (TaskInstance taskInstance : taskInstanceList) { for (TaskInstance taskInstance : taskInstanceList) {
String taskLogPath = taskInstance.getLogPath(); String taskLogPath = taskInstance.getLogPath();
if (Strings.isNullOrEmpty(taskInstance.getHost())) { if (Strings.isNullOrEmpty(taskInstance.getHost())) {
@ -611,7 +612,6 @@ public class ProcessServiceImpl implements ProcessService {
logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath); logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
} }
} }
}
/** /**
* recursive delete all task instance by process instance id * recursive delete all task instance by process instance id
@ -752,7 +752,7 @@ public class ProcessServiceImpl implements ProcessService {
*/ */
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
Command command, Command command,
Map<String, String> cmdParam) throws CodeGenerateException { Map<String, String> cmdParam) {
ProcessInstance processInstance = new ProcessInstance(processDefinition); ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode()); processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion()); processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
@ -915,7 +915,7 @@ public class ProcessServiceImpl implements ProcessService {
* @param host host * @param host host
* @return process instance * @return process instance
*/ */
protected ProcessInstance constructProcessInstance(Command command, protected @Nullable ProcessInstance constructProcessInstance(Command command,
String host) throws CronParseException, CodeGenerateException { String host) throws CronParseException, CodeGenerateException {
ProcessInstance processInstance; ProcessInstance processInstance;
ProcessDefinition processDefinition; ProcessDefinition processDefinition;
@ -932,7 +932,7 @@ public class ProcessServiceImpl implements ProcessService {
if (processInstanceId == 0) { if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam); processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
} else { } else {
processInstance = this.findProcessInstanceDetailById(processInstanceId); processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) { if (processInstance == null) {
return null; return null;
} }
@ -1071,7 +1071,8 @@ public class ProcessServiceImpl implements ProcessService {
* *
* @return ProcessDefinition * @return ProcessDefinition
*/ */
private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) { private @Nullable ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode,
Map<String, String> cmdParam) {
if (cmdParam != null) { if (cmdParam != null) {
int processInstanceId = 0; int processInstanceId = 0;
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) { if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
@ -1083,7 +1084,7 @@ public class ProcessServiceImpl implements ProcessService {
} }
if (processInstanceId != 0) { if (processInstanceId != 0) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId); ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) { if (processInstance == null) {
return null; return null;
} }
@ -1177,7 +1178,8 @@ public class ProcessServiceImpl implements ProcessService {
// copy parent instance user def params to sub process.. // copy parent instance user def params to sub process..
String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID); String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
if (!Strings.isNullOrEmpty(parentInstanceId)) { if (!Strings.isNullOrEmpty(parentInstanceId)) {
ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)); ProcessInstance parentInstance =
findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)).orElse(null);
if (parentInstance != null) { if (parentInstance != null) {
subProcessInstance.setGlobalParams( subProcessInstance.setGlobalParams(
joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams())); joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
@ -3147,7 +3149,7 @@ public class ProcessServiceImpl implements ProcessService {
if (task == null) { if (task == null) {
return; return;
} }
ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()); ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
if (processInstance != null if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) { && (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId()); List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());

33
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java → dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java

@ -40,8 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({LogClientService.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class}) @PrepareForTest({LogClient.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
public class LogClientServiceTest { public class LogClientTest {
@Test @Test
public void testViewLogFromLocal() { public void testViewLogFromLocal() {
@ -54,8 +54,8 @@ public class LogClientServiceTest {
PowerMockito.mockStatic(LoggerUtils.class); PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11"); PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11");
LogClientService logClientService = new LogClientService(); LogClient logClient = new LogClient();
String log = logClientService.viewLog(localMachine, port, path); String log = logClient.viewLog(localMachine, port, path);
Assert.assertNotNull(log); Assert.assertNotNull(log);
} }
@ -75,8 +75,8 @@ public class LogClientServiceTest {
command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8)); command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8));
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command); .thenReturn(command);
LogClientService logClientService = new LogClientService(); LogClient logClient = new LogClient();
String log = logClientService.viewLog(localMachine, port, path); String log = logClient.viewLog(localMachine, port, path);
Assert.assertNotNull(log); Assert.assertNotNull(log);
} }
@ -86,8 +86,8 @@ public class LogClientServiceTest {
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient); PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
PowerMockito.doNothing().when(remotingClient).close(); PowerMockito.doNothing().when(remotingClient).close();
LogClientService logClientService = new LogClientService(); LogClient logClient = new LogClient();
logClientService.close(); logClient.close();
} }
@Test @Test
@ -100,8 +100,8 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command); .thenReturn(command);
LogClientService logClientService = new LogClientService(); LogClient logClient = new LogClient();
String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10); String msg = logClient.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
Assert.assertNotNull(msg); Assert.assertNotNull(msg);
} }
@ -115,8 +115,8 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command); .thenReturn(command);
LogClientService logClientService = new LogClientService(); LogClient logClient = new LogClient();
byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log"); byte[] logBytes = logClient.getLogBytes("localhost", 1234, "/tmp/log");
Assert.assertNotNull(logBytes); Assert.assertNotNull(logBytes);
} }
@ -130,14 +130,9 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong())) PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command); .thenReturn(command);
LogClientService logClientService = new LogClientService(); LogClient logClient = new LogClient();
Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path"); Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path");
Assert.assertTrue(status); Assert.assertTrue(status);
} }
@Test
public void testIsRunning() {
LogClientService logClientService = new LogClientService();
Assert.assertTrue(logClientService.isRunning());
}
} }

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Set; import java.util.List;
public abstract class AbstractRemoteTask extends AbstractTask { public abstract class AbstractRemoteTask extends AbstractTask {
@ -38,7 +38,7 @@ public abstract class AbstractRemoteTask extends AbstractTask {
this.cancelApplication(); this.cancelApplication();
} }
public abstract Set<String> getApplicationIds() throws TaskException; public abstract List<String> getApplicationIds() throws TaskException;
public abstract void cancelApplication() throws TaskException; public abstract void cancelApplication() throws TaskException;

20
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -21,8 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import java.util.Set; import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
@ -106,24 +105,11 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
* @return * @return
* @throws TaskException * @throws TaskException
*/ */
public Set<String> getApplicationIds() throws TaskException { @Override
public List<String> getApplicationIds() throws TaskException {
return LogUtils.getAppIdsFromLogFile(taskRequest.getLogPath(), logger); return LogUtils.getAppIdsFromLogFile(taskRequest.getLogPath(), logger);
} }
/**
* find app id
*
* @param line line
* @return appid
*/
protected String findAppId(String line) {
Matcher matcher = YARN_APPLICATION_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group();
}
return null;
}
/** /**
* create command * create command
* *

12
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java

@ -23,8 +23,10 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -42,14 +44,14 @@ public class LogUtils {
private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX); private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
public Set<String> getAppIdsFromLogFile(@NonNull String logPath) { public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
return getAppIdsFromLogFile(logPath, log); return getAppIdsFromLogFile(logPath, log);
} }
public Set<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) { public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
File logFile = new File(logPath); File logFile = new File(logPath);
if (!logFile.exists() || !logFile.isFile()) { if (!logFile.exists() || !logFile.isFile()) {
return Collections.emptySet(); return Collections.emptyList();
} }
Set<String> appIds = new HashSet<>(); Set<String> appIds = new HashSet<>();
try (Stream<String> stream = Files.lines(Paths.get(logPath))) { try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
@ -65,10 +67,10 @@ public class LogUtils {
} }
} }
}); });
return appIds; return new ArrayList<>(appIds);
} catch (IOException e) { } catch (IOException e) {
logger.error("Get appId from log file erro, logPath: {}", logPath, e); logger.error("Get appId from log file erro, logPath: {}", logPath, e);
return Collections.emptySet(); return Collections.emptyList();
} }
} }
} }

8
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java

@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.api.utils; package org.apache.dolphinscheduler.plugin.task.api.utils;
import java.util.Set; import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Sets; import com.google.common.collect.Lists;
public class LogUtilsTest { public class LogUtilsTest {
@ -31,7 +31,7 @@ public class LogUtilsTest {
@Test @Test
public void getAppIdsFromLogFile() { public void getAppIdsFromLogFile() {
Set<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE); List<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
Assert.assertEquals(Sets.newHashSet("application_1548381669007_1234"), appIds); Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds);
} }
} }

27
dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java

@ -17,13 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.dinky; package org.apache.dolphinscheduler.plugin.task.dinky;
import com.fasterxml.jackson.core.JsonProcessingException; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -31,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
@ -45,10 +42,13 @@ import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
public class DinkyTask extends AbstractRemoteTask { public class DinkyTask extends AbstractRemoteTask {
@ -73,8 +73,8 @@ public class DinkyTask extends AbstractRemoteTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override
@ -112,20 +112,23 @@ public class DinkyTask extends AbstractRemoteTask {
if (!checkResult(jobInstanceInfoResult)) { if (!checkResult(jobInstanceInfoResult)) {
break; break;
} }
String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText(); String jobInstanceStatus =
jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) { switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED: case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status); final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id // Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId)); setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode); setExitStatusCode(exitStatusCode);
logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS)); logger.info("dinky task finished with results: {}",
result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true; finishFlag = true;
break; break;
case DinkyTaskConstants.STATUS_FAILED: case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED: case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN: case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText()); errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error")
.asText());
finishFlag = true; finishFlag = true;
break; break;
default: default:

38
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java

@ -17,6 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.emr; package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException; import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@ -31,16 +40,6 @@ import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/** /**
* AddJobFlowSteps task executor * AddJobFlowSteps task executor
* *
@ -53,8 +52,7 @@ public class EmrAddStepsTask extends AbstractEmrTask {
private final HashSet<String> waitingStateSet = Sets.newHashSet( private final HashSet<String> waitingStateSet = Sets.newHashSet(
StepState.PENDING.toString(), StepState.PENDING.toString(),
StepState.CANCEL_PENDING.toString(), StepState.CANCEL_PENDING.toString(),
StepState.RUNNING.toString() StepState.RUNNING.toString());
);
/** /**
* constructor * constructor
@ -66,8 +64,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override
@ -126,13 +124,16 @@ public class EmrAddStepsTask extends AbstractEmrTask {
final AddJobFlowStepsRequest addJobFlowStepsRequest; final AddJobFlowStepsRequest addJobFlowStepsRequest;
try { try {
addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class); addJobFlowStepsRequest =
objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e); throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e);
} }
// When a single task definition is associated with multiple steps, the state tracking will have high complexity. // When a single task definition is associated with multiple steps, the state tracking will have high
// Therefore, A task definition only supports the association of a single step, which can better ensure the reliability of the task state. // complexity.
// Therefore, A task definition only supports the association of a single step, which can better ensure the
// reliability of the task state.
if (addJobFlowStepsRequest.getSteps().size() > 1) { if (addJobFlowStepsRequest.getSteps().size() > 1) {
throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step"); throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step");
} }
@ -178,7 +179,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
@Override @Override
public void cancelApplication() throws TaskException { public void cancelApplication() throws TaskException {
logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId); logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}",
this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId); CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest); CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);

6
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException; import com.amazonaws.SdkBaseException;
@ -57,8 +57,8 @@ public class EmrJobFlowTask extends AbstractEmrTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override

8
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

@ -29,11 +29,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class FlinkStreamTask extends FlinkTask implements StreamTask { public class FlinkStreamTask extends FlinkTask implements StreamTask {
@ -99,7 +95,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
@Override @Override
public void cancelApplication() throws TaskException { public void cancelApplication() throws TaskException {
Set<String> appIds = getApplicationIds(); List<String> appIds = getApplicationIds();
if (CollectionUtils.isEmpty(appIds)) { if (CollectionUtils.isEmpty(appIds)) {
logger.error("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId()); logger.error("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
return; return;
@ -120,7 +116,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
@Override @Override
public void savePoint() throws Exception { public void savePoint() throws Exception {
Set<String> appIds = getApplicationIds(); List<String> appIds = getApplicationIds();
if (CollectionUtils.isEmpty(appIds)) { if (CollectionUtils.isEmpty(appIds)) {
logger.warn("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId()); logger.warn("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
return; return;

47
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -19,30 +19,13 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -113,34 +96,6 @@ public class FlinkTask extends AbstractYarnTask {
return flinkParameters; return flinkParameters;
} }
@Override
public Set<String> getApplicationIds() throws TaskException {
Set<String> appIds = new HashSet<>();
File file = new File(taskRequest.getLogPath());
if (!file.exists()) {
return appIds;
}
/*
* analysis log? get submitted yarn application id
*/
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
String appId = findAppId(line);
if (StringUtils.isNotEmpty(appId)) {
appIds.add(appId);
}
}
} catch (FileNotFoundException e) {
throw new TaskException("get application id error, file not found, path:" + taskRequest.getLogPath());
} catch (IOException e) {
throw new TaskException("get application id error, path:" + taskRequest.getLogPath(), e);
}
return appIds;
}
/** /**
* find app id * find app id
* *

6
dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.hivecli;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -39,7 +38,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
public class HiveCliTask extends AbstractRemoteTask { public class HiveCliTask extends AbstractRemoteTask {
@ -59,8 +57,8 @@ public class HiveCliTask extends AbstractRemoteTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override

10
dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java

@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.jupyter; package org.apache.dolphinscheduler.plugin.task.jupyter;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -41,7 +38,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JupyterTask extends AbstractRemoteTask { public class JupyterTask extends AbstractRemoteTask {
@ -66,8 +64,8 @@ public class JupyterTask extends AbstractRemoteTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override

6
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java

@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
public class K8sTask extends AbstractK8sTask { public class K8sTask extends AbstractK8sTask {
@ -59,8 +59,8 @@ public class K8sTask extends AbstractK8sTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override

37
dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java

@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.pigeon; package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -28,6 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.StatusLine; import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException; import org.apache.http.client.ClientProtocolException;
@ -37,10 +36,7 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -48,9 +44,11 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
/** /**
* TIS DataX Task * TIS DataX Task
**/ **/
@ -70,8 +68,8 @@ public class PigeonTask extends AbstractRemoteTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override
@ -103,7 +101,8 @@ public class PigeonTask extends AbstractRemoteTask {
ExecResult execState = null; ExecResult execState = null;
int taskId; int taskId;
WebSocketClient webSocket = null; WebSocketClient webSocket = null;
try (CloseableHttpClient client = HttpClients.createDefault(); try (
CloseableHttpClient client = HttpClients.createDefault();
// trigger to start PIGEON dataX task // trigger to start PIGEON dataX task
CloseableHttpResponse response = client.execute(post)) { CloseableHttpResponse response = client.execute(post)) {
triggerResult = processResponse(triggerUrl, response, BizResult.class); triggerResult = processResponse(triggerUrl, response, BizResult.class);
@ -155,7 +154,8 @@ public class PigeonTask extends AbstractRemoteTask {
long costTime = System.currentTimeMillis() - startTime; long costTime = System.currentTimeMillis() - startTime;
logger.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}", logger.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}",
targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'"); targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'");
setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE); setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS
: TaskConstants.EXIT_CODE_FAILURE);
} catch (Exception e) { } catch (Exception e) {
logger.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e); logger.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
@ -187,13 +187,15 @@ public class PigeonTask extends AbstractRemoteTask {
logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId()); logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId());
final String triggerUrl = getTriggerUrl(); final String triggerUrl = getTriggerUrl();
StringEntity entity = new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8); StringEntity entity =
new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
CancelResult cancelResult = null; CancelResult cancelResult = null;
HttpPost post = new HttpPost(triggerUrl); HttpPost post = new HttpPost(triggerUrl);
addFormUrlencoded(post); addFormUrlencoded(post);
post.setEntity(entity); post.setEntity(entity);
try (CloseableHttpClient client = HttpClients.createDefault(); try (
CloseableHttpClient client = HttpClients.createDefault();
// trigger to start TIS dataX task // trigger to start TIS dataX task
CloseableHttpResponse response = client.execute(post)) { CloseableHttpResponse response = client.execute(post)) {
cancelResult = processResponse(triggerUrl, response, CancelResult.class); cancelResult = processResponse(triggerUrl, response, CancelResult.class);
@ -229,6 +231,7 @@ public class PigeonTask extends AbstractRemoteTask {
final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId); final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId);
logger.info("apply ws connection,uri:{}", applyURI); logger.info("apply ws connection,uri:{}", applyURI);
WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) { WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) {
@Override @Override
public void onOpen(ServerHandshake handshakedata) { public void onOpen(ServerHandshake handshakedata) {
logger.info("start to receive remote execute log"); logger.info("start to receive remote execute log");
@ -254,7 +257,8 @@ public class PigeonTask extends AbstractRemoteTask {
return webSocketClient; return webSocketClient;
} }
private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response, Class<T> clazz) throws Exception { private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response,
Class<T> clazz) throws Exception {
StatusLine resStatus = response.getStatusLine(); StatusLine resStatus = response.getStatusLine();
if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) { if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) {
throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase()); throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase());
@ -272,6 +276,7 @@ public class PigeonTask extends AbstractRemoteTask {
} }
private static class CancelResult extends AjaxResult<Object> { private static class CancelResult extends AjaxResult<Object> {
private Object bizresult; private Object bizresult;
@Override @Override
@ -285,6 +290,7 @@ public class PigeonTask extends AbstractRemoteTask {
} }
private static class BizResult extends AjaxResult<TriggerBuildResult> { private static class BizResult extends AjaxResult<TriggerBuildResult> {
private TriggerBuildResult bizresult; private TriggerBuildResult bizresult;
@Override @Override
@ -302,6 +308,7 @@ public class PigeonTask extends AbstractRemoteTask {
} }
private static class StatusResult extends AjaxResult<Map> { private static class StatusResult extends AjaxResult<Map> {
private Map bizresult; private Map bizresult;
@Override @Override
@ -351,6 +358,7 @@ public class PigeonTask extends AbstractRemoteTask {
} }
private static class TriggerBuildResult { private static class TriggerBuildResult {
private int taskid; private int taskid;
public int getTaskid() { public int getTaskid() {
@ -387,6 +395,7 @@ public class PigeonTask extends AbstractRemoteTask {
} }
private static class ExecLog { private static class ExecLog {
private String logType; private String logType;
private String msg; private String msg;
private int taskId; private int taskId;

6
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java

@ -34,8 +34,8 @@ import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider;
@ -73,8 +73,8 @@ public class SagemakerTask extends AbstractRemoteTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override

22
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -17,11 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel; package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.commons.io.FileUtils; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.commons.lang3.BooleanUtils; import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -33,6 +32,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -42,10 +44,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
/** /**
* seatunnel task * seatunnel task
@ -82,8 +80,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
@Override @Override
@ -180,11 +178,13 @@ public class SeatunnelTask extends AbstractRemoteTask {
} }
private String buildConfigFilePath() { private String buildConfigFilePath() {
return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId()); return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
} }
private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException { private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath()); logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(),
taskExecutionContext.getExecutePath());
if (!Files.exists(Paths.get(scriptFile))) { if (!Files.exists(Paths.get(scriptFile))) {
logger.info("generate script file:{}", scriptFile); logger.info("generate script file:{}", scriptFile);

14
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

@ -17,11 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin; package org.apache.dolphinscheduler.plugin.task.zeppelin;
import com.fasterxml.jackson.databind.ObjectMapper;
import kong.unirest.Unirest;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -29,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.DateUtils; import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.zeppelin.client.ClientConfig; import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult; import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult; import org.apache.zeppelin.client.ParagraphResult;
@ -39,7 +36,10 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import kong.unirest.Unirest;
import com.fasterxml.jackson.databind.ObjectMapper;
public class ZeppelinTask extends AbstractRemoteTask { public class ZeppelinTask extends AbstractRemoteTask {
@ -235,8 +235,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
} }
@Override @Override
public Set<String> getApplicationIds() throws TaskException { public List<String> getApplicationIds() throws TaskException {
return Collections.emptySet(); return Collections.emptyList();
} }
} }

25
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings; import com.google.common.base.Strings;
@ -25,6 +26,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -42,12 +44,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -65,15 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class); private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
/**
* task execute manager
*/
@Autowired @Autowired
private WorkerManagerThread workerManager; private WorkerManagerThread workerManager;
@Autowired @Autowired
private MessageRetryRunner messageRetryRunner; private MessageRetryRunner messageRetryRunner;
@Autowired
private LogClient logClient;
/** /**
* task kill process * task kill process
* *
@ -92,6 +89,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logger.info("task kill command : {}", killCommand); logger.info("task kill command : {}", killCommand);
int taskInstanceId = killCommand.getTaskInstanceId(); int taskInstanceId = killCommand.getTaskInstanceId();
try {
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
TaskExecutionContext taskExecutionContext = TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) { if (taskExecutionContext == null) {
@ -123,6 +122,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId()); messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId()); logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
}
} }
private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) { private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
@ -228,10 +230,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
host, logPath, executePath, tenantCode); host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList()); return Pair.of(false, Collections.emptyList());
} }
try (LogClientService logClient = new LogClientService()) { try {
logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath); logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath);
List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath); List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath);
if (CollectionUtils.isEmpty(appIds)) { if (CollectionUtils.isEmpty(appIds)) {
logger.info("The appId is empty");
return Pair.of(true, Collections.emptyList()); return Pair.of(true, Collections.emptyList());
} }
@ -241,7 +244,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
logger.error("kill yarn job error, the current thread has been interrtpted", e); logger.error("kill yarn job error, the current thread has been interrtpted", e);
} catch (Exception e) { } catch (Exception e) {
logger.error("kill yarn job error", e); logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, executePath, tenantCode, e);
} }
return Pair.of(false, Collections.emptyList()); return Pair.of(false, Collections.emptyList());
} }

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import lombok.NonNull; import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -35,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo; import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -50,6 +52,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.NoSuchFileException; import java.nio.file.NoSuchFileException;
import java.util.Date; import java.util.Date;
import java.util.List;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
@ -120,7 +123,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
if (task != null) { if (task != null) {
try { try {
task.cancel(); task.cancel();
ProcessUtils.killYarnJob(taskExecutionContext); List<String> appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath());
if (CollectionUtils.isNotEmpty(appIds)) {
ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
}
} catch (Exception e) { } catch (Exception e) {
logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e); logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
} }

Loading…
Cancel
Save