Browse Source

Make LogServiceClient Singleton (#11777)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
d3a77c68e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java
  2. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  3. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
  4. 105
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  5. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  6. 14
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java
  7. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  8. 49
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
  9. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  10. 38
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  11. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  12. 6
      dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  13. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  14. 31
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java
  15. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java
  16. 17
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  17. 18
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  18. 104
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  19. 64
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
  20. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  21. 155
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
  22. 108
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
  23. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  24. 102
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  25. 33
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
  26. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java
  27. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  28. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java
  29. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java
  30. 39
      dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
  31. 50
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
  32. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
  33. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
  34. 47
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  35. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
  36. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
  37. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
  38. 45
      dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
  39. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
  40. 22
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  41. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
  42. 49
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  43. 8
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

36
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/exceptions/ServiceException.java

@ -20,48 +20,36 @@ import org.apache.dolphinscheduler.api.enums.Status;
import java.text.MessageFormat;
import lombok.Data;
/**
* service exception
*/
@Data
public class ServiceException extends RuntimeException {
/**
* code
*/
private Integer code;
private int code;
public ServiceException() {
this(Status.INTERNAL_SERVER_ERROR_ARGS);
}
public ServiceException(Status status) {
super(status.getMsg());
this.code = status.getCode();
this(status.getCode(), status.getMsg());
}
public ServiceException(Status status, Object... formatter) {
super(MessageFormat.format(status.getMsg(), formatter));
this.code = status.getCode();
}
public ServiceException(Integer code,String message) {
super(message);
this.code = code;
this(status.getCode(), MessageFormat.format(status.getMsg(), formatter));
}
public ServiceException(String message) {
super(message);
this(Status.INTERNAL_SERVER_ERROR_ARGS, message);
}
public ServiceException(String message, Exception cause) {
super(message, cause);
public ServiceException(int code, String message) {
this(code, message, null);
}
public Integer getCode() {
return this.code;
}
public void setCode(Integer code) {
public ServiceException(int code, String message, Exception cause) {
super(message, cause);
this.code = code;
}
}

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -31,6 +31,7 @@ import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_TIME_MAX_LEN
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
@ -366,11 +367,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
.orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@ -1022,7 +1020,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
*/
@Override
public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
return null;
}

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@ -31,7 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang3.StringUtils;
@ -66,7 +66,8 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@Autowired
private ProcessService processService;
private LogClientService logClient;
@Autowired
private LogClient logClient;
@Autowired
ProjectMapper projectMapper;
@ -77,20 +78,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
@Autowired
TaskDefinitionMapper taskDefinitionMapper;
@PostConstruct
public void init() {
if (Objects.isNull(this.logClient)) {
this.logClient = new LogClientService();
}
}
@PreDestroy
public void close() {
if (Objects.nonNull(this.logClient) && this.logClient.isRunning()) {
logClient.close();
}
}
/**
* view log
*

105
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -17,17 +17,10 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status;
@ -76,9 +69,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@ -95,12 +88,18 @@ import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
/**
* process instance service impl
@ -226,7 +225,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
@ -339,17 +339,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
public Map<String, Object> queryTaskListByProcessId(User loginUser, long projectCode,
Integer processId) throws IOException {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId);
return result;
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
@ -481,28 +480,23 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
String globalParams,
String locations, int timeout, String tenantCode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
// check process instance exists in project
ProcessDefinition processDefinition0 =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
//check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
//check process instance exists in project
ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
// check process instance status
//check process instance status
if (!processInstance.getState().isFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
putMsg(result, PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
@ -620,11 +614,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId);
if (subInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId);
return result;
}
ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, subId));
if (subInstance.getIsSubProcess() == Flag.NO) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
return result;
@ -660,23 +651,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId));
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
// check process instance status
if (!processInstance.getState().isFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "delete");
return result;
throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState(), "delete");
}
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, String.valueOf(processInstanceId));
return result;
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
}
try {
@ -722,7 +707,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
@ -811,7 +796,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
GanttDto ganttDto = new GanttDto();

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -78,9 +78,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.cronutils.model.Cron;
/**
* scheduler service impl
*/
@Service
public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerService {
@ -382,9 +379,10 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
} catch (Exception e) {
result.put(Constants.MSG,
scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
throw new ServiceException(result.get(Constants.MSG).toString(), e);
Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
: Status.OFFLINE_SCHEDULE_ERROR;
result.put(Constants.STATUS, status);
throw new ServiceException(status, e);
}
putMsg(result, Status.SUCCESS);

14
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ServiceExceptionTest.java

@ -17,25 +17,27 @@
package org.apache.dolphinscheduler.api.exceptions;
import org.apache.dolphinscheduler.api.enums.Status;
import org.junit.Assert;
import org.junit.Test;
public class ServiceExceptionTest {
@Test
public void getCodeTest(){
public void getCodeTest() {
ServiceException serviceException = new ServiceException();
Assert.assertNull(serviceException.getCode());
Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), serviceException.getCode());
serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
Assert.assertNotNull(serviceException.getCode());
Assert.assertEquals(Status.ALERT_GROUP_EXIST.getCode(), serviceException.getCode());
serviceException = new ServiceException(10012, "alarm group already exists");
Assert.assertNotNull(serviceException.getCode());
Assert.assertEquals(10012, serviceException.getCode());
}
@Test
public void getMessageTest(){
public void getMessageTest() {
ServiceException serviceException = new ServiceException();
Assert.assertNull(serviceException.getMessage());
Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getMsg(), serviceException.getMessage());
serviceException = new ServiceException(Status.ALERT_GROUP_EXIST);
Assert.assertNotNull(serviceException.getMessage());

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -54,6 +54,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
@ -179,7 +180,7 @@ public class ExecutorServiceTest {
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(Optional.ofNullable(processInstance));
Mockito.when(processService.findProcessDefinition(1L, 1)).thenReturn(processDefinition);
Mockito.when(taskGroupQueueMapper.selectById(1)).thenReturn(taskGroupQueue);
Mockito.when(processInstanceMapper.selectById(1)).thenReturn(processInstance);

49
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
@ -28,15 +31,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@ -47,9 +49,6 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG;
/**
* logger service test
*/
@ -74,10 +73,8 @@ public class LoggerServiceTest {
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Before
public void init() {
this.loggerService.init();
}
@Mock
private LogClient logClient;
@Test
public void testQueryDataSourceList() {
@ -85,11 +82,11 @@ public class LoggerServiceTest {
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Result result = loggerService.queryLog(2, 1, 1);
//TASK_INSTANCE_NOT_FOUND
// TASK_INSTANCE_NOT_FOUND
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue());
try {
//HOST NOT FOUND OR ILLEGAL
// HOST NOT FOUND OR ILLEGAL
result = loggerService.queryLog(1, 1, 1);
} catch (RuntimeException e) {
Assert.assertTrue(true);
@ -97,7 +94,7 @@ public class LoggerServiceTest {
}
Assert.assertEquals(Status.TASK_INSTANCE_HOST_IS_NULL.getCode(), result.getCode().intValue());
//SUCCESS
// SUCCESS
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
@ -111,7 +108,7 @@ public class LoggerServiceTest {
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
//task instance is null
// task instance is null
try {
loggerService.getLogBytes(2);
} catch (RuntimeException e) {
@ -119,7 +116,7 @@ public class LoggerServiceTest {
logger.error("testGetLogBytes error: {}", "task instance is null");
}
//task instance host is null
// task instance host is null
try {
loggerService.getLogBytes(1);
} catch (RuntimeException e) {
@ -127,11 +124,13 @@ public class LoggerServiceTest {
logger.error("testGetLogBytes error: {}", "task instance host is null");
}
//success
// success
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
//if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage,
// if use @RunWith(PowerMockRunner.class) mock object,sonarcloud will not calculate the coverage,
// so no assert will be added here
Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
.thenReturn(new byte[0]);
loggerService.getLogBytes(1);
}
@ -152,12 +151,12 @@ public class LoggerServiceTest {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
//SUCCESS
// SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,VIEW_LOG)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, VIEW_LOG)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
@ -179,22 +178,20 @@ public class LoggerServiceTest {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
//SUCCESS
// SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,DOWNLOAD_LOG )).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, DOWNLOAD_LOG))
.thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
Mockito.when(logClient.getLogBytes(Mockito.anyString(), Mockito.anyInt(), Mockito.anyString()))
.thenReturn(new byte[0]);
loggerService.getLogBytes(loginUser, projectCode, 1);
}
@After
public void close() {
this.loggerService.close();
}
/**
* get mock Project
*
@ -218,4 +215,4 @@ public class LoggerServiceTest {
result.put(Constants.MSG, status.getMsg());
}
}
}
}

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -203,7 +203,7 @@ public class ProcessDefinitionServiceTest {
.checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION);
processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0);
} catch (ServiceException serviceException) {
Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode().intValue());
Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode());
}
Map<String, Object> result = new HashMap<>();

38
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
@ -70,6 +71,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.Assert;
import org.junit.Test;
@ -308,7 +310,8 @@ public class ProcessInstanceServiceTest {
processDefinition.setProjectCode(projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance));
when(processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(processDefinition);
Map<String, Object> successRes = processInstanceService.queryProcessInstanceById(loginUser, projectCode, 1);
@ -353,7 +356,8 @@ public class ProcessInstanceServiceTest {
res.setData("xxx");
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance));
when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList);
when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
@ -451,14 +455,18 @@ public class ProcessInstanceServiceTest {
ProcessInstance processInstance = getProcessInstance();
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, INSTANCE_UPDATE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
Map<String, Object> processInstanceNullRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
try {
Map<String, Object> processInstanceNullRes =
processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, taskJson, "2020-02-21 00:00:00", true, "", "", 0, "");
Assert.fail();
} catch (ServiceException ex) {
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
}
// process instance not finish
when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance);
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceNotFinishRes =
@ -523,16 +531,20 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(1)).thenReturn(null);
Map<String, Object> processInstanceNullRes =
processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceNullRes.get(Constants.STATUS));
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.empty());
try {
Map<String, Object> processInstanceNullRes =
processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
} catch (ServiceException ex) {
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_EXIST.getCode(), ex.getCode());
}
// not sub process
ProcessInstance processInstance = getProcessInstance();
processInstance.setIsSubProcess(Flag.NO);
putMsg(result, Status.SUCCESS, projectCode);
when(processService.findProcessInstanceDetailById(1)).thenReturn(processInstance);
when(processService.findProcessInstanceDetailById(1)).thenReturn(Optional.ofNullable(processInstance));
Map<String, Object> notSubProcessRes =
processInstanceService.queryParentInstanceBySubId(loginUser, projectCode, 1);
Assert.assertEquals(Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, notSubProcessRes.get(Constants.STATUS));

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -51,6 +51,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.Assert;
import org.junit.Test;
@ -158,7 +159,7 @@ public class TaskInstanceServiceTest {
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()))
.thenReturn(processInstance);
.thenReturn(Optional.of(processInstance));
Result successRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "",
"test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20);

6
dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -46,10 +46,8 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@ -168,9 +166,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
if (!checkPathSecurity(logPath)) {
throw new IllegalArgumentException("Illegal path");
}
Set<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
List<String> appIds = LogUtils.getAppIdsFromLogFile(logPath);
channel.writeAndFlush(
new GetAppIdResponseCommand(new ArrayList<>(appIds)).convert2Command(command.getOpaque()));
new GetAppIdResponseCommand(appIds).convert2Command(command.getOpaque()));
break;
default:
throw new IllegalArgumentException("unknown commandType: " + commandType);

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -38,6 +37,7 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@ -73,17 +73,21 @@ public class MasterFailoverService {
private final ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private final LogClient logClient;
public MasterFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull NettyExecutorManager nettyExecutorManager,
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager) {
@NonNull ProcessInstanceExecCacheManager processInstanceExecCacheManager,
@NonNull LogClient logClient) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.nettyExecutorManager = nettyExecutorManager;
this.localAddress = masterConfig.getMasterAddress();
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.logClient = logClient;
}
@ -233,7 +237,7 @@ public class MasterFailoverService {
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
LOGGER.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(taskExecutionContext);
ProcessUtils.killYarnJob(logClient, taskExecutionContext);
}
// kill worker task, When the master failover and worker failover happened in the same time,
// the task may not be failover if we don't set NEED_FAULT_TOLERANCE.

31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -17,13 +17,16 @@
package org.apache.dolphinscheduler.server.master.service;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -37,13 +40,14 @@ import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -52,14 +56,6 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import lombok.NonNull;
@Service
public class WorkerFailoverService {
@ -70,19 +66,22 @@ public class WorkerFailoverService {
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
private final ProcessInstanceExecCacheManager cacheManager;
private final LogClient logClient;
private final String localAddress;
public WorkerFailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
@NonNull ProcessInstanceExecCacheManager cacheManager) {
@NonNull ProcessInstanceExecCacheManager cacheManager,
@NonNull LogClient logClient) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.cacheManager = cacheManager;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
this.logClient = logClient;
this.localAddress = masterConfig.getMasterAddress();
}
/**
@ -172,7 +171,7 @@ public class WorkerFailoverService {
if (masterConfig.isKillYarnJobWhenTaskFailover()) {
// only kill yarn job if exists , the local thread has exited
LOGGER.info("TaskInstance failover begin kill the task related yarn job");
ProcessUtils.killYarnJob(taskExecutionContext);
ProcessUtils.killYarnJob(logClient, taskExecutionContext);
}
} else {
LOGGER.info("The failover taskInstance is a master task");

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DataQualityResultOperator.java

@ -63,8 +63,7 @@ public class DataQualityResultOperator {
if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) {
ProcessInstance processInstance =
processService.findProcessInstanceDetailById(
Integer.parseInt(String.valueOf(taskInstance.getProcessInstanceId())));
processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()).orElse(null);
// when the task is failure or cancel, will delete the execute result and statistics value
if (taskResponseEvent.getState().isFailure()

17
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -25,8 +25,6 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.task.MasterHeartBeatTask;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -34,7 +32,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledExecutorService;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
@ -58,26 +56,17 @@ public class MasterRegistryClientTest {
@InjectMocks
private MasterRegistryClient masterRegistryClient;
@Mock
private MasterConfig masterConfig;
@Mock
private RegistryClient registryClient;
@Mock
private ScheduledExecutorService heartBeatExecutor;
@Mock
private ProcessService processService;
@Mock
private MasterConnectStrategy masterConnectStrategy;
@Mock
private MasterHeartBeatTask masterHeartBeatTask;
@Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
private MasterConfig masterConfig;
@Before
public void before() throws Exception {
@ -105,7 +94,7 @@ public class MasterRegistryClientTest {
taskInstance.setHost("127.0.0.1:8080");
given(processService.queryNeedFailoverTaskInstances(Mockito.anyString()))
.willReturn(Arrays.asList(taskInstance));
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(Optional.of(processInstance));
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any())).willReturn(true);
Server server = new Server();
server.setHost("127.0.0.1");

18
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.service;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
@ -38,12 +37,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
@ -89,6 +90,9 @@ public class FailoverServiceTest {
@Mock
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Mock
private LogClient logClient;
private static int masterPort = 5678;
private static int workerPort = 1234;
@ -106,17 +110,20 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort);
testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
given(masterConfig.getMasterAddress()).willReturn(testMasterHost);
MasterFailoverService masterFailoverService =
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager, processInstanceExecCacheManager);
new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager,
processInstanceExecCacheManager, logClient);
WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient,
masterConfig,
processService,
workflowExecuteThreadPool,
cacheManager);
cacheManager,
logClient);
failoverService = new FailoverService(masterFailoverService, workerFailoverService);
testMasterHost = NetUtils.getAddr(masterConfig.getListenPort());
String ip = testMasterHost.split(":")[0];
int port = Integer.valueOf(testMasterHost.split(":")[1]);
Assert.assertEquals(masterPort, port);
@ -158,7 +165,8 @@ public class FailoverServiceTest {
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
given(processService.findValidTaskListByProcessId(Mockito.anyInt()))
.willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
given(processService.findProcessInstanceDetailById(Mockito.anyInt())).willReturn(processInstance);
given(processService.findProcessInstanceDetailById(Mockito.anyInt()))
.willReturn(Optional.ofNullable(processInstance));
Thread.sleep(1000);
Server masterServer = new Server();

104
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -17,6 +17,17 @@
package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
@ -35,6 +46,8 @@ import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
@ -43,112 +56,48 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
/**
* remoting netty client
*/
public class NettyRemotingClient {
public class NettyRemotingClient implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
/**
* client bootstrap
*/
private final Bootstrap bootstrap = new Bootstrap();
/**
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();
/**
* channels
*/
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
/**
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* worker group
*/
private final EventLoopGroup workerGroup;
/**
* client config
*/
private final NettyClientConfig clientConfig;
/**
* saync semaphore
*/
private final Semaphore asyncSemaphore = new Semaphore(200, true);
/**
* callback thread executor
*/
private final ExecutorService callbackExecutor;
/**
* client handler
*/
private final NettyClientHandler clientHandler;
/**
* response future executor
*/
private final ScheduledExecutorService responseFutureExecutor;
/**
* client init
*
* @param clientConfig client config
*/
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
if (Epoll.isAvailable()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
} else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
}
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
this.callbackExecutor = new ThreadPoolExecutor(
Constants.CPUS,
Constants.CPUS,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("CallbackExecutor"),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
@ -157,9 +106,6 @@ public class NettyRemotingClient {
this.start();
}
/**
* start
*/
private void start() {
this.bootstrap
@ -371,9 +317,7 @@ public class NettyRemotingClient {
return null;
}
/**
* close
*/
@Override
public void close() {
if (isStarted.compareAndSet(true, false)) {
try {

64
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java

@ -17,88 +17,52 @@
package org.apache.dolphinscheduler.remote.config;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.remote.utils.Constants;
/**
* netty client config
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class NettyClientConfig {
/**
* worker threadsdefault get machine cpus
*/
@Builder.Default
private int workerThreads = Constants.CPUS;
/**
* whether tpc delay
*/
@Builder.Default
private boolean tcpNoDelay = true;
/**
* whether keep alive
*/
@Builder.Default
private boolean soKeepalive = true;
/**
* send buffer size
*/
@Builder.Default
private int sendBufferSize = 65535;
/**
* receive buffer size
*/
@Builder.Default
private int receiveBufferSize = 65535;
/**
* connect timeout millis
*/
@Builder.Default
private int connectTimeoutMillis = 3000;
public int getWorkerThreads() {
return workerThreads;
}
public void setWorkerThreads(int workerThreads) {
this.workerThreads = workerThreads;
}
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
public boolean isSoKeepalive() {
return soKeepalive;
}
public void setSoKeepalive(boolean soKeepalive) {
this.soKeepalive = soKeepalive;
}
public int getSendBufferSize() {
return sendBufferSize;
}
public void setSendBufferSize(int sendBufferSize) {
this.sendBufferSize = sendBufferSize;
}
public int getReceiveBufferSize() {
return receiveBufferSize;
}
public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}
public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}
public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
}
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -30,10 +30,11 @@ import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -186,17 +187,15 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) {
public static @Nullable List<String> killYarnJob(@NonNull LogClient logClient,
@NonNull TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList();
}
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
List<String> appIds;
try (LogClientService logClient = new LogClientService()) {
Host host = Host.of(taskExecutionContext.getHost());
appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
}
Host host = Host.of(taskExecutionContext.getHost());
List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath());
if (CollectionUtils.isNotEmpty(appIds)) {
if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) {
taskExecutionContext

155
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java

@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
@ -46,43 +45,23 @@ import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* log client
*/
public class LogClientService implements AutoCloseable {
@Service
public class LogClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
private final NettyClientConfig clientConfig;
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private final NettyRemotingClient client;
private volatile boolean isRunning;
/**
* request time out
*/
private static final long LOG_REQUEST_TIMEOUT = 10 * 1000L;
/**
* construct client
*/
public LogClientService() {
this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
}
/**
* close
*/
@Override
public void close() {
this.client.close();
this.isRunning = false;
logger.info("logger client closed");
public LogClient() {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
this.client = new NettyRemotingClient(nettyClientConfig);
logger.info("Initialized LogClientService with config: {}", nettyClientConfig);
}
/**
@ -96,25 +75,30 @@ public class LogClientService implements AutoCloseable {
* @return log content
*/
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
logger.info("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path,
skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = "";
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
Command response = client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
response.getBody(), RollViewLogResponseCommand.class);
RollViewLogResponseCommand rollReviewLog =
JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
}
return "Roll view log response is null";
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
"Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error, the current thread has been interrupted",
host, port, path, skipLineNum, limit, ex);
return "Roll view log error: " + ex.getMessage();
} catch (Exception e) {
logger.error("roll view log error", e);
} finally {
this.client.closeChannel(address);
logger.error("Roll view log from host : {}, port : {}, path {}, skipLineNum {} ,limit {} error", host, port,
path, skipLineNum, limit, e);
return "Roll view log error: " + e.getMessage();
}
return result;
}
/**
@ -126,28 +110,31 @@ public class LogClientService implements AutoCloseable {
* @return log content
*/
public String viewLog(String host, int port, String path) {
logger.info("view log path {}", path);
logger.info("View log from host: {}, port: {}, logPath: {}", host, port, path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = "";
final Host address = new Host(host, port);
try {
if (NetUtils.getHost().equals(host)) {
result = LoggerUtils.readWholeFileContent(request.getPath());
return LoggerUtils.readWholeFileContent(request.getPath());
} else {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
ViewLogResponseCommand viewLog = JSONUtils.parseObject(
response.getBody(), ViewLogResponseCommand.class);
result = viewLog.getMsg();
ViewLogResponseCommand viewLog =
JSONUtils.parseObject(response.getBody(), ViewLogResponseCommand.class);
return viewLog.getMsg();
}
return "View log response is null";
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error("View log from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
host, port, path, ex);
return "View log error: " + ex.getMessage();
} catch (Exception e) {
logger.error("view log error", e);
} finally {
this.client.closeChannel(address);
logger.error("View log from host: {}, port: {}, logPath: {} error", host, port, path, e);
return "View log error: " + e.getMessage();
}
return result;
}
/**
@ -159,23 +146,28 @@ public class LogClientService implements AutoCloseable {
* @return log content bytes
*/
public byte[] getLogBytes(String host, int port, String path) {
logger.info("log path {}", path);
logger.info("Get log bytes from host: {}, port: {}, logPath {}", host, port, path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
GetLogBytesResponseCommand getLog = JSONUtils.parseObject(
response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData() == null ? new byte[0] : getLog.getData();
GetLogBytesResponseCommand getLog =
JSONUtils.parseObject(response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData() == null ? EMPTY_BYTE_ARRAY : getLog.getData();
}
return EMPTY_BYTE_ARRAY;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
"Get logSize from host: {}, port: {}, logPath: {} error, the current thread has been interrupted",
host, port, path, ex);
return EMPTY_BYTE_ARRAY;
} catch (Exception e) {
logger.error("get log size error", e);
} finally {
this.client.closeChannel(address);
logger.error("Get logSize from host: {}, port: {}, logPath: {} error", host, port, path, e);
return EMPTY_BYTE_ARRAY;
}
return new byte[0];
}
/**
@ -187,24 +179,28 @@ public class LogClientService implements AutoCloseable {
* @return remove task status
*/
public Boolean removeTaskLog(String host, int port, String path) {
logger.info("log path {}", path);
logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
Boolean result = false;
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject(
response.getBody(), RemoveTaskLogResponseCommand.class);
RemoveTaskLogResponseCommand taskLogResponse =
JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus();
}
return false;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
"Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted",
host, port, path, ex);
return false;
} catch (Exception e) {
logger.error("remove task log error", e);
} finally {
this.client.closeChannel(address);
logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e);
return false;
}
return result;
}
public @Nullable List<String> getAppIds(@NonNull String host, int port,
@ -212,26 +208,25 @@ public class LogClientService implements AutoCloseable {
logger.info("Begin to get appIds from worker: {}:{} taskLogPath: {}", host, port, taskLogFilePath);
final Host workerAddress = new Host(host, port);
List<String> appIds = null;
try {
if (NetUtils.getHost().equals(host)) {
appIds = new ArrayList<>(LogUtils.getAppIdsFromLogFile(taskLogFilePath));
} else {
final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
GetAppIdResponseCommand responseCommand =
JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
appIds = responseCommand.getAppIds();
}
if (NetUtils.getHost().equals(host)) {
appIds = LogUtils.getAppIdsFromLogFile(taskLogFilePath);
} else {
final Command command = new GetAppIdRequestCommand(taskLogFilePath).convert2Command();
Command response = this.client.sendSync(workerAddress, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
GetAppIdResponseCommand responseCommand =
JSONUtils.parseObject(response.getBody(), GetAppIdResponseCommand.class);
appIds = responseCommand.getAppIds();
}
} finally {
client.closeChannel(workerAddress);
}
logger.info("Get appIds: {} from worker: {}:{} taskLogPath: {}", appIds, host, port, taskLogFilePath);
return appIds;
}
public boolean isRunning() {
return isRunning;
@Override
public void close() {
this.client.close();
logger.info("LogClientService closed");
}
}

108
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.log;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* log asyc callback
*/
public class LogPromise {
private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
/**
* request unique identification
*/
private long opaque;
/**
* start timemillis
*/
private final long start;
/**
* timeout
*/
private final long timeout;
/**
* latch
*/
private final CountDownLatch latch;
/**
* result
*/
private Object result;
public LogPromise(long opaque, long timeout) {
this.opaque = opaque;
this.timeout = timeout;
this.start = System.currentTimeMillis();
this.latch = new CountDownLatch(1);
PROMISES.put(opaque, this);
}
/**
* notify client finish
* @param opaque unique identification
* @param result result
*/
public static void notify(long opaque, Object result) {
LogPromise promise = PROMISES.remove(opaque);
if (promise != null) {
promise.doCountDown(result);
}
}
/**
* countdown
*
* @param result result
*/
private void doCountDown(Object result) {
this.result = result;
this.latch.countDown();
}
/**
* whether timeout
* @return timeout
*/
public boolean isTimeout() {
return System.currentTimeMillis() - start > timeout;
}
/**
* get result
* @return
*/
public Object getResult() {
try {
latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
PROMISES.remove(opaque);
return this.result;
}
}

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -58,6 +58,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.springframework.transaction.annotation.Transactional;
@ -77,7 +78,7 @@ public interface ProcessService {
boolean verifyIsNeedCreateCommand(Command command);
ProcessInstance findProcessInstanceDetailById(int processId);
Optional<ProcessInstance> findProcessInstanceDetailById(int processId);
List<TaskDefinition> getTaskNodeListByDefinition(long defineCode);

102
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,21 +17,14 @@
package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -133,13 +126,17 @@ import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.apache.commons.collections.CollectionUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@ -150,22 +147,24 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
/**
* process relative dao that some mappers in this.
@ -278,6 +277,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private CuringParamsService curingGlobalParamsService;
@Autowired
private LogClient logClient;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@ -474,8 +476,8 @@ public class ProcessServiceImpl implements ProcessService {
* @return process instance
*/
@Override
public ProcessInstance findProcessInstanceDetailById(int processId) {
return processInstanceMapper.queryDetailById(processId);
public Optional<ProcessInstance> findProcessInstanceDetailById(int processId) {
return Optional.ofNullable(processInstanceMapper.queryDetailById(processId));
}
/**
@ -597,16 +599,14 @@ public class ProcessServiceImpl implements ProcessService {
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
try (LogClientService logClient = new LogClientService()) {
for (TaskInstance taskInstance : taskInstanceList) {
String taskLogPath = taskInstance.getLogPath();
if (Strings.isNullOrEmpty(taskInstance.getHost())) {
continue;
}
Host host = Host.of(taskInstance.getHost());
// remove task log from loggerserver
logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
for (TaskInstance taskInstance : taskInstanceList) {
String taskLogPath = taskInstance.getLogPath();
if (Strings.isNullOrEmpty(taskInstance.getHost())) {
continue;
}
Host host = Host.of(taskInstance.getHost());
// remove task log from loggerserver
logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
}
}
@ -749,7 +749,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
Command command,
Map<String, String> cmdParam) throws CodeGenerateException {
Map<String, String> cmdParam) {
ProcessInstance processInstance = new ProcessInstance(processDefinition);
processInstance.setProcessDefinitionCode(processDefinition.getCode());
processInstance.setProcessDefinitionVersion(processDefinition.getVersion());
@ -912,8 +912,8 @@ public class ProcessServiceImpl implements ProcessService {
* @param host host
* @return process instance
*/
protected ProcessInstance constructProcessInstance(Command command,
String host) throws CronParseException, CodeGenerateException {
protected @Nullable ProcessInstance constructProcessInstance(Command command,
String host) throws CronParseException, CodeGenerateException {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
@ -929,7 +929,7 @@ public class ProcessServiceImpl implements ProcessService {
if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
} else {
processInstance = this.findProcessInstanceDetailById(processInstanceId);
processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
return null;
}
@ -1068,7 +1068,8 @@ public class ProcessServiceImpl implements ProcessService {
*
* @return ProcessDefinition
*/
private ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode, Map<String, String> cmdParam) {
private @Nullable ProcessDefinition getProcessDefinitionByCommand(long processDefinitionCode,
Map<String, String> cmdParam) {
if (cmdParam != null) {
int processInstanceId = 0;
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING)) {
@ -1080,7 +1081,7 @@ public class ProcessServiceImpl implements ProcessService {
}
if (processInstanceId != 0) {
ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId);
ProcessInstance processInstance = this.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
return null;
}
@ -1174,7 +1175,8 @@ public class ProcessServiceImpl implements ProcessService {
// copy parent instance user def params to sub process..
String parentInstanceId = paramMap.get(CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID);
if (!Strings.isNullOrEmpty(parentInstanceId)) {
ProcessInstance parentInstance = findProcessInstanceDetailById(Integer.parseInt(parentInstanceId));
ProcessInstance parentInstance =
findProcessInstanceDetailById(Integer.parseInt(parentInstanceId)).orElse(null);
if (parentInstance != null) {
subProcessInstance.setGlobalParams(
joinGlobalParams(parentInstance.getGlobalParams(), subProcessInstance.getGlobalParams()));
@ -3146,7 +3148,7 @@ public class ProcessServiceImpl implements ProcessService {
if (task == null) {
return;
}
ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId());
ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());

33
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java → dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java

@ -40,8 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({LogClientService.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
public class LogClientServiceTest {
@PrepareForTest({LogClient.class, NetUtils.class, LoggerUtils.class, NettyRemotingClient.class})
public class LogClientTest {
@Test
public void testViewLogFromLocal() {
@ -54,8 +54,8 @@ public class LogClientServiceTest {
PowerMockito.mockStatic(LoggerUtils.class);
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11");
LogClientService logClientService = new LogClientService();
String log = logClientService.viewLog(localMachine, port, path);
LogClient logClient = new LogClient();
String log = logClient.viewLog(localMachine, port, path);
Assert.assertNotNull(log);
}
@ -75,8 +75,8 @@ public class LogClientServiceTest {
command.setBody(JSONUtils.toJsonString(new ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8));
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
String log = logClientService.viewLog(localMachine, port, path);
LogClient logClient = new LogClient();
String log = logClient.viewLog(localMachine, port, path);
Assert.assertNotNull(log);
}
@ -86,8 +86,8 @@ public class LogClientServiceTest {
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
PowerMockito.doNothing().when(remotingClient).close();
LogClientService logClientService = new LogClientService();
logClientService.close();
LogClient logClient = new LogClient();
logClient.close();
}
@Test
@ -100,8 +100,8 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
String msg = logClientService.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
LogClient logClient = new LogClient();
String msg = logClient.rollViewLog("localhost", 1234, "/tmp/log", 0, 10);
Assert.assertNotNull(msg);
}
@ -115,8 +115,8 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
byte[] logBytes = logClientService.getLogBytes("localhost", 1234, "/tmp/log");
LogClient logClient = new LogClient();
byte[] logBytes = logClient.getLogBytes("localhost", 1234, "/tmp/log");
Assert.assertNotNull(logBytes);
}
@ -130,14 +130,9 @@ public class LogClientServiceTest {
PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClientService logClientService = new LogClientService();
Boolean status = logClientService.removeTaskLog("localhost", 1234, "/log/path");
LogClient logClient = new LogClient();
Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path");
Assert.assertTrue(status);
}
@Test
public void testIsRunning() {
LogClientService logClientService = new LogClientService();
Assert.assertTrue(logClientService.isRunning());
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractRemoteTask.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Set;
import java.util.List;
public abstract class AbstractRemoteTask extends AbstractTask {
@ -38,7 +38,7 @@ public abstract class AbstractRemoteTask extends AbstractTask {
this.cancelApplication();
}
public abstract Set<String> getApplicationIds() throws TaskException;
public abstract List<String> getApplicationIds() throws TaskException;
public abstract void cancelApplication() throws TaskException;

20
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -21,8 +21,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.List;
import java.util.regex.Pattern;
/**
@ -106,24 +105,11 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
* @return
* @throws TaskException
*/
public Set<String> getApplicationIds() throws TaskException {
@Override
public List<String> getApplicationIds() throws TaskException {
return LogUtils.getAppIdsFromLogFile(taskRequest.getLogPath(), logger);
}
/**
* find app id
*
* @param line line
* @return appid
*/
protected String findAppId(String line) {
Matcher matcher = YARN_APPLICATION_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group();
}
return null;
}
/**
* create command
*

12
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java

@ -23,8 +23,10 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -42,14 +44,14 @@ public class LogUtils {
private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
public Set<String> getAppIdsFromLogFile(@NonNull String logPath) {
public List<String> getAppIdsFromLogFile(@NonNull String logPath) {
return getAppIdsFromLogFile(logPath, log);
}
public Set<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
public List<String> getAppIdsFromLogFile(@NonNull String logPath, Logger logger) {
File logFile = new File(logPath);
if (!logFile.exists() || !logFile.isFile()) {
return Collections.emptySet();
return Collections.emptyList();
}
Set<String> appIds = new HashSet<>();
try (Stream<String> stream = Files.lines(Paths.get(logPath))) {
@ -65,10 +67,10 @@ public class LogUtils {
}
}
});
return appIds;
return new ArrayList<>(appIds);
} catch (IOException e) {
logger.error("Get appId from log file erro, logPath: {}", logPath, e);
return Collections.emptySet();
return Collections.emptyList();
}
}
}

8
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtilsTest.java

@ -17,12 +17,12 @@
package org.apache.dolphinscheduler.plugin.task.api.utils;
import java.util.Set;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Sets;
import com.google.common.collect.Lists;
public class LogUtilsTest {
@ -31,7 +31,7 @@ public class LogUtilsTest {
@Test
public void getAppIdsFromLogFile() {
Set<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
Assert.assertEquals(Sets.newHashSet("application_1548381669007_1234"), appIds);
List<String> appIds = LogUtils.getAppIdsFromLogFile(APP_ID_FILE);
Assert.assertEquals(Lists.newArrayList("application_1548381669007_1234"), appIds);
}
}

39
dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java

@ -17,13 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.dinky;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -31,6 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
@ -45,10 +42,13 @@ import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
public class DinkyTask extends AbstractRemoteTask {
@ -73,8 +73,8 @@ public class DinkyTask extends AbstractRemoteTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override
@ -112,20 +112,23 @@ public class DinkyTask extends AbstractRemoteTask {
if (!checkResult(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus = jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
String jobInstanceStatus =
jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode);
logger.info("dinky task finished with results: {}", result.get(DinkyTaskConstants.API_RESULT_DATAS));
logger.info("dinky task finished with results: {}",
result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error").asText());
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error")
.asText());
finishFlag = true;
break;
default:
@ -191,14 +194,14 @@ public class DinkyTask extends AbstractRemoteTask {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
logger.info("trying terminate dinky task, taskId: {}, address: {}, taskId: {}",
this.taskExecutionContext.getTaskInstanceId(),
address,
taskId);
this.taskExecutionContext.getTaskInstanceId(),
address,
taskId);
cancelTask(address, taskId);
logger.warn("dinky task terminated, taskId: {}, address: {}, taskId: {}",
this.taskExecutionContext.getTaskInstanceId(),
address,
taskId);
this.taskExecutionContext.getTaskInstanceId(),
address,
taskId);
}
private JsonNode submitTask(String address, String taskId) {

50
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java

@ -17,6 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.emr;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@ -31,16 +40,6 @@ import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* AddJobFlowSteps task executor
*
@ -51,10 +50,9 @@ public class EmrAddStepsTask extends AbstractEmrTask {
private String stepId;
private final HashSet<String> waitingStateSet = Sets.newHashSet(
StepState.PENDING.toString(),
StepState.CANCEL_PENDING.toString(),
StepState.RUNNING.toString()
);
StepState.PENDING.toString(),
StepState.CANCEL_PENDING.toString(),
StepState.RUNNING.toString());
/**
* constructor
@ -66,8 +64,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override
@ -126,13 +124,16 @@ public class EmrAddStepsTask extends AbstractEmrTask {
final AddJobFlowStepsRequest addJobFlowStepsRequest;
try {
addJobFlowStepsRequest = objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
addJobFlowStepsRequest =
objectMapper.readValue(emrParameters.getStepsDefineJson(), AddJobFlowStepsRequest.class);
} catch (JsonProcessingException e) {
throw new EmrTaskException("can not parse AddJobFlowStepsRequest from json", e);
}
// When a single task definition is associated with multiple steps, the state tracking will have high complexity.
// Therefore, A task definition only supports the association of a single step, which can better ensure the reliability of the task state.
// When a single task definition is associated with multiple steps, the state tracking will have high
// complexity.
// Therefore, A task definition only supports the association of a single step, which can better ensure the
// reliability of the task state.
if (addJobFlowStepsRequest.getSteps().size() > 1) {
throw new EmrTaskException("ds emr addJobFlowStepsTask only support one step");
}
@ -178,7 +179,8 @@ public class EmrAddStepsTask extends AbstractEmrTask {
@Override
public void cancelApplication() throws TaskException {
logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}", this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
logger.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}",
this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);
@ -187,10 +189,10 @@ public class EmrAddStepsTask extends AbstractEmrTask {
}
CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList()
.stream()
.filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
.findFirst()
.orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
.stream()
.filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
.findFirst()
.orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) {
throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason());

6
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SdkBaseException;
@ -57,8 +57,8 @@ public class EmrJobFlowTask extends AbstractEmrTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override

8
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

@ -29,11 +29,7 @@ import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class FlinkStreamTask extends FlinkTask implements StreamTask {
@ -99,7 +95,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
@Override
public void cancelApplication() throws TaskException {
Set<String> appIds = getApplicationIds();
List<String> appIds = getApplicationIds();
if (CollectionUtils.isEmpty(appIds)) {
logger.error("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
return;
@ -120,7 +116,7 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
@Override
public void savePoint() throws Exception {
Set<String> appIds = getApplicationIds();
List<String> appIds = getApplicationIds();
if (CollectionUtils.isEmpty(appIds)) {
logger.warn("can not get appId, taskInstanceId:{}", taskExecutionContext.getTaskInstanceId());
return;

47
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -19,30 +19,13 @@ package org.apache.dolphinscheduler.plugin.task.flink;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -113,34 +96,6 @@ public class FlinkTask extends AbstractYarnTask {
return flinkParameters;
}
@Override
public Set<String> getApplicationIds() throws TaskException {
Set<String> appIds = new HashSet<>();
File file = new File(taskRequest.getLogPath());
if (!file.exists()) {
return appIds;
}
/*
* analysis log? get submitted yarn application id
*/
try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(taskRequest.getLogPath()), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
String appId = findAppId(line);
if (StringUtils.isNotEmpty(appId)) {
appIds.add(appId);
}
}
} catch (FileNotFoundException e) {
throw new TaskException("get application id error, file not found, path:" + taskRequest.getLogPath());
} catch (IOException e) {
throw new TaskException("get application id error, path:" + taskRequest.getLogPath(), e);
}
return appIds;
}
/**
* find app id
*

6
dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.hivecli;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -39,7 +38,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class HiveCliTask extends AbstractRemoteTask {
@ -59,8 +57,8 @@ public class HiveCliTask extends AbstractRemoteTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override

10
dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java

@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.jupyter;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -41,7 +38,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JupyterTask extends AbstractRemoteTask {
@ -66,8 +64,8 @@ public class JupyterTask extends AbstractRemoteTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override

8
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java

@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class K8sTask extends AbstractK8sTask {
@ -59,8 +59,8 @@ public class K8sTask extends AbstractK8sTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override
@ -72,7 +72,7 @@ public class K8sTask extends AbstractK8sTask {
protected String buildCommand() {
K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
Map<String, String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String namespaceName = namespace.get(NAMESPACE_NAME);
String clusterName = namespace.get(CLUSTER);
k8sTaskMainParameters.setImage(k8sTaskParameters.getImage());

45
dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java

@ -17,10 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -28,6 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
@ -37,10 +36,7 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@ -48,9 +44,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
/**
* TIS DataX Task
**/
@ -70,8 +68,8 @@ public class PigeonTask extends AbstractRemoteTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override
@ -103,9 +101,10 @@ public class PigeonTask extends AbstractRemoteTask {
ExecResult execState = null;
int taskId;
WebSocketClient webSocket = null;
try (CloseableHttpClient client = HttpClients.createDefault();
// trigger to start PIGEON dataX task
CloseableHttpResponse response = client.execute(post)) {
try (
CloseableHttpClient client = HttpClients.createDefault();
// trigger to start PIGEON dataX task
CloseableHttpResponse response = client.execute(post)) {
triggerResult = processResponse(triggerUrl, response, BizResult.class);
if (!triggerResult.isSuccess()) {
List<String> errormsg = triggerResult.getErrormsg();
@ -155,7 +154,8 @@ public class PigeonTask extends AbstractRemoteTask {
long costTime = System.currentTimeMillis() - startTime;
logger.info("PIGEON task: {},taskId:{} costTime : {} milliseconds, statusCode : {}",
targetJobName, taskId, costTime, (execState == ExecResult.SUCCESS) ? "'success'" : "'failure'");
setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS : TaskConstants.EXIT_CODE_FAILURE);
setExitStatusCode((execState == ExecResult.SUCCESS) ? TaskConstants.EXIT_CODE_SUCCESS
: TaskConstants.EXIT_CODE_FAILURE);
} catch (Exception e) {
logger.error("execute PIGEON dataX faild,PIGEON task name:" + targetJobName, e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
@ -187,15 +187,17 @@ public class PigeonTask extends AbstractRemoteTask {
logger.info("start to cancelApplication taskId:{}", triggerResult.getTaskId());
final String triggerUrl = getTriggerUrl();
StringEntity entity = new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
StringEntity entity =
new StringEntity(config.getJobCancelPostBody(triggerResult.getTaskId()), StandardCharsets.UTF_8);
CancelResult cancelResult = null;
HttpPost post = new HttpPost(triggerUrl);
addFormUrlencoded(post);
post.setEntity(entity);
try (CloseableHttpClient client = HttpClients.createDefault();
// trigger to start TIS dataX task
CloseableHttpResponse response = client.execute(post)) {
try (
CloseableHttpClient client = HttpClients.createDefault();
// trigger to start TIS dataX task
CloseableHttpResponse response = client.execute(post)) {
cancelResult = processResponse(triggerUrl, response, CancelResult.class);
if (!cancelResult.isSuccess()) {
List<String> errormsg = triggerResult.getErrormsg();
@ -229,6 +231,7 @@ public class PigeonTask extends AbstractRemoteTask {
final String applyURI = config.getJobLogsFetchUrl(tisHost, dataXName, taskId);
logger.info("apply ws connection,uri:{}", applyURI);
WebSocketClient webSocketClient = new WebSocketClient(new URI(applyURI)) {
@Override
public void onOpen(ServerHandshake handshakedata) {
logger.info("start to receive remote execute log");
@ -254,7 +257,8 @@ public class PigeonTask extends AbstractRemoteTask {
return webSocketClient;
}
private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response, Class<T> clazz) throws Exception {
private <T extends AjaxResult> T processResponse(String applyUrl, CloseableHttpResponse response,
Class<T> clazz) throws Exception {
StatusLine resStatus = response.getStatusLine();
if (HttpURLConnection.HTTP_OK != resStatus.getStatusCode()) {
throw new IllegalStateException("request server " + applyUrl + " faild:" + resStatus.getReasonPhrase());
@ -272,6 +276,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class CancelResult extends AjaxResult<Object> {
private Object bizresult;
@Override
@ -285,6 +290,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class BizResult extends AjaxResult<TriggerBuildResult> {
private TriggerBuildResult bizresult;
@Override
@ -302,6 +308,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class StatusResult extends AjaxResult<Map> {
private Map bizresult;
@Override
@ -351,6 +358,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class TriggerBuildResult {
private int taskid;
public int getTaskid() {
@ -387,6 +395,7 @@ public class PigeonTask extends AbstractRemoteTask {
}
private static class ExecLog {
private String logType;
private String msg;
private int taskId;

6
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java

@ -34,8 +34,8 @@ import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
@ -73,8 +73,8 @@ public class SagemakerTask extends AbstractRemoteTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override

22
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -17,11 +17,10 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -33,6 +32,9 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -42,10 +44,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
/**
* seatunnel task
@ -82,8 +80,8 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override
@ -180,11 +178,13 @@ public class SeatunnelTask extends AbstractRemoteTask {
}
private String buildConfigFilePath() {
return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
return String.format("%s/seatunnel_%s.conf", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
}
private void createConfigFileIfNotExists(String script, String scriptFile) throws IOException {
logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
logger.info("tenantCode :{}, task dir:{}", taskExecutionContext.getTenantCode(),
taskExecutionContext.getExecutePath());
if (!Files.exists(Paths.get(scriptFile))) {
logger.info("generate script file:{}", scriptFile);

14
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

@ -17,11 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.zeppelin;
import com.fasterxml.jackson.databind.ObjectMapper;
import kong.unirest.Unirest;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -29,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.NoteResult;
import org.apache.zeppelin.client.ParagraphResult;
@ -39,7 +36,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import kong.unirest.Unirest;
import com.fasterxml.jackson.databind.ObjectMapper;
public class ZeppelinTask extends AbstractRemoteTask {
@ -235,8 +235,8 @@ public class ZeppelinTask extends AbstractRemoteTask {
}
@Override
public Set<String> getApplicationIds() throws TaskException {
return Collections.emptySet();
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
}

49
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -25,6 +26,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@ -42,12 +44,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.service.log.LogClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -65,15 +62,15 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
/**
* task execute manager
*/
@Autowired
private WorkerManagerThread workerManager;
@Autowired
private MessageRetryRunner messageRetryRunner;
@Autowired
private LogClient logClient;
/**
* task kill process
*
@ -92,12 +89,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logger.info("task kill command : {}", killCommand);
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return;
}
try {
LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return;
}
int processId = taskExecutionContext.getProcessId();
if (processId == 0) {
@ -110,19 +109,22 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return;
}
// if processId > 0, it should call cancelApplication to cancel remote application too.
this.cancelApplication(taskInstanceId);
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
// if processId > 0, it should call cancelApplication to cancel remote application too.
this.cancelApplication(taskInstanceId);
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
}
}
private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
@ -228,10 +230,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList());
}
try (LogClientService logClient = new LogClientService()) {
try {
logger.info("Get appIds from worker {}:{} taskLogPath: {}", host.getIp(), host.getPort(), logPath);
List<String> appIds = logClient.getAppIds(host.getIp(), host.getPort(), logPath);
if (CollectionUtils.isEmpty(appIds)) {
logger.info("The appId is empty");
return Pair.of(true, Collections.emptyList());
}
@ -241,7 +244,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
Thread.currentThread().interrupt();
logger.error("kill yarn job error, the current thread has been interrtpted", e);
} catch (Exception e) {
logger.error("kill yarn job error", e);
logger.error("Kill yarn job error, host: {}, logPath: {}, executePath: {}, tenantCode: {}", host, logPath, executePath, tenantCode, e);
}
return Pair.of(false, Collections.emptyList());
}

8
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants;
@ -35,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -50,6 +52,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Date;
import java.util.List;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
@ -120,7 +123,10 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
if (task != null) {
try {
task.cancel();
ProcessUtils.killYarnJob(taskExecutionContext);
List<String> appIds = LogUtils.getAppIdsFromLogFile(taskExecutionContext.getLogPath());
if (CollectionUtils.isNotEmpty(appIds)) {
ProcessUtils.cancelApplication(appIds, logger, taskExecutionContext.getTenantCode(), taskExecutionContext.getExecutePath());
}
} catch (Exception e) {
logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
}

Loading…
Cancel
Save