diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index be666aed70..55224c8152 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -283,7 +282,6 @@ public class MasterBaseTaskExecThread implements Callable { logger.warn("process id:{} process name:{} task id: {},name:{} execution time out", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); // send warn mail - ProcessDefinition processDefine = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); alertDao.sendTaskTimeoutAlert(processInstance.getWarningGroupId(), processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName()); return true; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java index f7a9991a08..d37ebbc793 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java @@ -442,17 +442,11 @@ public class ProcessUtils { public static List killYarnJob(TaskExecutionContext taskExecutionContext) { try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); - LogClientService logClient = null; String log; - try { - logClient = new LogClientService(); + try (LogClientService logClient = new LogClientService()) { log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(), Constants.RPC_PORT, taskExecutionContext.getLogPath()); - } finally { - if (logClient != null) { - logClient.close(); - } } if (StringUtils.isNotEmpty(log)) { List appIds = LoggerUtils.getAppIds(log, logger); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 8cbf0471b2..b4713a9844 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -97,7 +97,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { Pair> result = doKill(killCommand); taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); + new NettyRemoteChannel(channel, command.getOpaque())); TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result); taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); @@ -107,7 +107,6 @@ public class TaskKillProcessor implements NettyRequestProcessor { /** * do kill * - * @param killCommand * @return kill result */ private Pair> doKill(TaskKillRequestCommand killCommand) { @@ -148,7 +147,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { * build TaskKillResponseCommand * * @param killCommand kill command - * @param result exe result + * @param result exe result * @return build TaskKillResponseCommand */ private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand, @@ -168,16 +167,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { /** * kill yarn job * - * @param host host - * @param logPath logPath + * @param host host + * @param logPath logPath * @param executePath executePath - * @param tenantCode tenantCode - * @return Pair> yarn kill result + * @param tenantCode tenantCode + * @return Pair> yarn kill result */ private Pair> killYarnJob(String host, String logPath, String executePath, String tenantCode) { - LogClientService logClient = null; - try { - logClient = new LogClientService(); + try (LogClientService logClient = new LogClientService();) { logger.info("view log host : {},logPath : {}", host, logPath); String log = logClient.viewLog(host, Constants.RPC_PORT, logPath); List appIds = Collections.emptyList(); @@ -194,10 +191,6 @@ public class TaskKillProcessor implements NettyRequestProcessor { return Pair.of(true, appIds); } catch (Exception e) { logger.error("kill yarn job error", e); - } finally { - if (logClient != null) { - logClient.close(); - } } return Pair.of(false, Collections.emptyList()); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index 22092e51ce..f207a229e5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; /** * log client */ -public class LogClientService { +public class LogClientService implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(LogClientService.class); @@ -67,6 +67,7 @@ public class LogClientService { /** * close */ + @Override public void close() { this.client.close(); this.isRunning = false; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 7a02d70b1d..1f48821321 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -384,17 +384,11 @@ public class ProcessService { * @param processInstanceId processInstanceId */ public void removeTaskLogFile(Integer processInstanceId) { - - LogClientService logClient = null; - - try { - logClient = new LogClientService(); - List taskInstanceList = findValidTaskListByProcessId(processInstanceId); - - if (CollectionUtils.isEmpty(taskInstanceList)) { - return; - } - + List taskInstanceList = findValidTaskListByProcessId(processInstanceId); + if (CollectionUtils.isEmpty(taskInstanceList)) { + return; + } + try (LogClientService logClient = new LogClientService()) { for (TaskInstance taskInstance : taskInstanceList) { String taskLogPath = taskInstance.getLogPath(); if (StringUtils.isEmpty(taskInstance.getHost())) { @@ -408,14 +402,9 @@ public class ProcessService { // compatible old version ip = taskInstance.getHost(); } - // remove task log from loggerserver logClient.removeTaskLog(ip, port, taskLogPath); } - } finally { - if (logClient != null) { - logClient.close(); - } } } @@ -435,7 +424,7 @@ public class ProcessService { * recursive query sub process definition id by parent id. * * @param parentId parentId - * @param ids ids + * @param ids ids */ public void recurseFindSubProcessId(int parentId, List ids) { ProcessDefinition processDefinition = processDefineMapper.selectById(parentId); @@ -466,7 +455,7 @@ public class ProcessService { * create recovery waiting thread command and delete origin command at the same time. * if the recovery command is exists, only update the field update_time * - * @param originCommand originCommand + * @param originCommand originCommand * @param processInstance processInstance */ public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) { @@ -518,7 +507,7 @@ public class ProcessService { /** * get schedule time from command * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return date */ @@ -534,8 +523,8 @@ public class ProcessService { * generate a new work process instance from command. * * @param processDefinition processDefinition - * @param command command - * @param cmdParam cmdParam map + * @param command command + * @param cmdParam cmdParam map * @return process instance */ private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition, @@ -569,7 +558,7 @@ public class ProcessService { processInstance.setConnects(processDefinition.getConnects()); // reset global params while there are start parameters - setGlobalParamIfCommanded(processDefinition,cmdParam); + setGlobalParamIfCommanded(processDefinition, cmdParam); // curing global params processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( @@ -621,7 +610,7 @@ public class ProcessService { * use definition creator's tenant. * * @param tenantId tenantId - * @param userId userId + * @param userId userId * @return tenant */ public Tenant getTenantForProcess(int tenantId, int userId) { @@ -644,7 +633,7 @@ public class ProcessService { /** * check command parameters is valid * - * @param command command + * @param command command * @param cmdParam cmdParam map * @return whether command param is valid */ @@ -664,7 +653,7 @@ public class ProcessService { * construct process instance according to one command. * * @param command command - * @param host host + * @param host host * @return process instance */ private ProcessInstance constructProcessInstance(Command command, String host) { @@ -827,7 +816,7 @@ public class ProcessService { * return complement data if the process start with complement data * * @param processInstance processInstance - * @param command command + * @param command command * @return command type */ private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) { @@ -842,8 +831,8 @@ public class ProcessService { * initialize complement data parameters * * @param processDefinition processDefinition - * @param processInstance processInstance - * @param cmdParam cmdParam + * @param processInstance processInstance + * @param cmdParam cmdParam */ private void initComplementDataParam(ProcessDefinition processDefinition, ProcessInstance processInstance, @@ -915,7 +904,7 @@ public class ProcessService { * only the keys doesn't in sub process global would be joined. * * @param parentGlobalParams parentGlobalParams - * @param subGlobalParams subGlobalParams + * @param subGlobalParams subGlobalParams * @return global params join */ private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) { @@ -985,7 +974,7 @@ public class ProcessService { * set map {parent instance id, task instance id, 0(child instance id)} * * @param parentInstance parentInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) { @@ -1014,7 +1003,7 @@ public class ProcessService { * find previous task work process map. * * @param parentProcessInstance parentProcessInstance - * @param parentTask parentTask + * @param parentTask parentTask * @return process instance map */ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance, @@ -1040,7 +1029,7 @@ public class ProcessService { * create sub work process command * * @param parentProcessInstance parentProcessInstance - * @param task task + * @param task task */ public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) { if (!task.isSubProcess()) { @@ -1067,7 +1056,7 @@ public class ProcessService { /** * complement data needs transform parent parameter to child. */ - private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance,Map fatherParams) { + private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance, Map fatherParams) { // set sub work process command String processMapStr = JSONUtils.toJsonString(instanceMap); Map cmdParam = JSONUtils.toMap(processMapStr); @@ -1111,13 +1100,13 @@ public class ProcessService { Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); Map globalMap = this.getGlobalParamMap(parentProcessInstance.getGlobalParams()); - Map fatherParams = new HashMap<>(); + Map fatherParams = new HashMap<>(); if (CollectionUtils.isNotEmpty(allParam)) { for (Property info : allParam) { fatherParams.put(info.getProp(), globalMap.get(info.getProp())); } } - String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance,fatherParams); + String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance, fatherParams); return new Command( commandType, @@ -1163,7 +1152,7 @@ public class ProcessService { * update sub process definition * * @param parentProcessInstance parentProcessInstance - * @param childDefinitionId childDefinitionId + * @param childDefinitionId childDefinitionId */ private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) { ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId()); @@ -1177,7 +1166,7 @@ public class ProcessService { /** * submit task to mysql * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstance processInstance * @return task instance */ @@ -1231,7 +1220,7 @@ public class ProcessService { * return stop if work process state is ready stop * if all of above are not satisfied, return submit success * - * @param taskInstance taskInstance + * @param taskInstance taskInstance * @param processInstanceState processInstanceState * @return process instance state */ @@ -1407,7 +1396,7 @@ public class ProcessService { * get id list by task state * * @param instanceId instanceId - * @param state state + * @param state state * @return task instance states */ public List findTaskIdByInstanceState(int instanceId, ExecutionStatus state) { @@ -1462,7 +1451,7 @@ public class ProcessService { * find work process map by parent process id and parent task id. * * @param parentWorkProcessId parentWorkProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance map */ public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) { @@ -1484,7 +1473,7 @@ public class ProcessService { * find sub process instance * * @param parentProcessId parentProcessId - * @param parentTaskId parentTaskId + * @param parentTaskId parentTaskId * @return process instance */ public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) { @@ -1516,12 +1505,12 @@ public class ProcessService { /** * change task state * - * @param state state - * @param startTime startTime - * @param host host + * @param state state + * @param startTime startTime + * @param host host * @param executePath executePath - * @param logPath logPath - * @param taskInstId taskInstId + * @param logPath logPath + * @param taskInstId taskInstId */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host, String executePath, @@ -1549,12 +1538,12 @@ public class ProcessService { * update the process instance * * @param processInstanceId processInstanceId - * @param processJson processJson - * @param globalParams globalParams - * @param scheduleTime scheduleTime - * @param flag flag - * @param locations locations - * @param connects connects + * @param processJson processJson + * @param globalParams globalParams + * @param scheduleTime scheduleTime + * @param flag flag + * @param locations locations + * @param connects connects * @return update process instance result */ public int updateProcessInstance(Integer processInstanceId, String processJson, @@ -1575,10 +1564,10 @@ public class ProcessService { /** * change task state * - * @param state state - * @param endTime endTime + * @param state state + * @param endTime endTime * @param taskInstId taskInstId - * @param varPool varPool + * @param varPool varPool */ public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date endTime, @@ -1746,7 +1735,7 @@ public class ProcessService { * update process instance state by id * * @param processInstanceId processInstanceId - * @param executionStatus executionStatus + * @param executionStatus executionStatus * @return update process result */ public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) { @@ -1783,7 +1772,7 @@ public class ProcessService { /** * find tenant code by resource name * - * @param resName resource name + * @param resName resource name * @param resourceType resource type * @return tenant code */ @@ -1821,9 +1810,9 @@ public class ProcessService { /** * get dependency cycle by work process define id and scheduler fire time * - * @param masterId masterId + * @param masterId masterId * @param processDefinitionId processDefinitionId - * @param scheduledFireTime the time the task schedule is expected to trigger + * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency * @throws Exception if error throws Exception */ @@ -1836,8 +1825,8 @@ public class ProcessService { /** * get dependency cycle list by work process define id list and scheduler fire time * - * @param masterId masterId - * @param ids ids + * @param masterId masterId + * @param ids ids * @param scheduledFireTime the time the task schedule is expected to trigger * @return CycleDependency list * @throws Exception if error throws Exception @@ -1932,8 +1921,8 @@ public class ProcessService { * find last running process instance * * @param definitionId process definition id - * @param startTime start time - * @param endTime end time + * @param startTime start time + * @param endTime end time * @return process instance */ public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { @@ -2033,7 +2022,7 @@ public class ProcessService { /** * list unauthorized udf function * - * @param userId user id + * @param userId user id * @param needChecks data source id array * @return unauthorized udf function list */ @@ -2124,8 +2113,6 @@ public class ProcessService { /** * solve the branch rename bug * - * @param processData - * @param oldJson * @return String */ public String changeJson(ProcessData processData, String oldJson) { @@ -2180,6 +2167,7 @@ public class ProcessService { /** * add authorized resources + * * @param ownResources own resources * @param userId userId */