diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index a5a341376e..f8ad4c6e8e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -476,8 +476,6 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - List taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId); - if (null == processInstance) { putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; @@ -485,8 +483,11 @@ public class ProcessInstanceService extends BaseDAGService { + processService.removeTaskLogFile(processInstanceId); // delete database cascade int delete = processService.deleteWorkProcessInstanceById(processInstanceId); + + processService.deleteAllSubWorkProcessByParentId(processInstanceId); processService.deleteWorkProcessMapByParentId(processInstanceId); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index c8d56597ee..d1ffc65f57 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java new file mode 100644 index 0000000000..4cf66265cf --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command.log; + +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; + +import java.io.Serializable; + +/** + * remove task log request command + */ +public class RemoveTaskLogRequestCommand implements Serializable { + + /** + * log path + */ + private String path; + + public RemoveTaskLogRequestCommand() { + } + + public RemoveTaskLogRequestCommand(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + + /** + * package request command + * + * @return command + */ + public Command convert2Command(){ + Command command = new Command(); + command.setType(CommandType.REMOVE_TAK_LOG_REQUEST); + byte[] body = FastJsonSerializer.serialize(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java new file mode 100644 index 0000000000..a72f84ab41 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command.log; + +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; + +import java.io.Serializable; + +/** + * remove task log request command + */ +public class RemoveTaskLogResponseCommand implements Serializable { + + /** + * log path + */ + private Boolean status; + + public RemoveTaskLogResponseCommand() { + } + + public RemoveTaskLogResponseCommand(Boolean status) { + this.status = status; + } + + public Boolean getStatus() { + return status; + } + + public void setStatus(Boolean status) { + this.status = status; + } + + /** + * package request command + * + * @return command + */ + public Command convert2Command(long opaque){ + Command command = new Command(opaque); + command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE); + byte[] body = FastJsonSerializer.serialize(this); + command.setBody(body); + return command; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index 44ec68f89f..a73e47397b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -60,14 +60,14 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { */ final CommandType commandType = command.getType(); switch (commandType){ - case GET_LOG_BYTES_REQUEST: - GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize( - command.getBody(), GetLogBytesRequestCommand.class); - byte[] bytes = getFileContentBytes(getLogRequest.getPath()); - GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); - channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); - break; - case VIEW_WHOLE_LOG_REQUEST: + case GET_LOG_BYTES_REQUEST: + GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize( + command.getBody(), GetLogBytesRequestCommand.class); + byte[] bytes = getFileContentBytes(getLogRequest.getPath()); + GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); + channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); + break; + case VIEW_WHOLE_LOG_REQUEST: ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize( command.getBody(), ViewLogRequestCommand.class); String msg = readWholeFileContent(viewLogRequest.getPath()); @@ -86,6 +86,25 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString()); channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque())); break; + case REMOVE_TAK_LOG_REQUEST: + RemoveTaskLogRequestCommand removeTaskLogRequest = FastJsonSerializer.deserialize( + command.getBody(), RemoveTaskLogRequestCommand.class); + + String taskLogPath = removeTaskLogRequest.getPath(); + + File taskLogFile = new File(taskLogPath); + Boolean status = true; + try { + if (taskLogFile.exists()){ + taskLogFile.delete(); + } + }catch (Exception e){ + status = false; + } + + RemoveTaskLogResponseCommand removeTaskLogResponse = new RemoveTaskLogResponseCommand(status); + channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque())); + break; default: throw new IllegalArgumentException("unknown commandType"); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java index 3520fb09ec..f1999e641c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java @@ -55,6 +55,7 @@ public class LoggerServer { this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor()); this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); + this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index 8e63c89405..4567d8051d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -144,4 +144,33 @@ public class LogClientService { } return result; } + + + /** + * remove task log + * @param host host + * @param port port + * @param path path + * @return remove task status + */ + public Boolean removeTaskLog(String host, int port, String path) { + logger.info("log path {}", path); + RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path); + Boolean result = false; + final Host address = new Host(host, port); + try { + Command command = request.convert2Command(); + Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); + if(response != null){ + RemoveTaskLogResponseCommand taskLogResponse = FastJsonSerializer.deserialize( + response.getBody(), RemoveTaskLogResponseCommand.class); + return taskLogResponse.getStatus(); + } + } catch (Exception e) { + logger.error("remove task log error", e); + } finally { + this.client.closeChannel(address); + } + return result; + } } \ No newline at end of file diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index d9faf8cc3d..f0ae76ea4d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.quartz.CronExpression; import org.slf4j.Logger; @@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.io.File; import java.util.*; import java.util.stream.Collectors; @@ -238,7 +241,7 @@ public class ProcessService { * @param defineId * @return */ - public List getTaskNodeListByDefinitionId(Integer defineId){ + public List getTaskNodeListByDefinitionId(Integer defineId){ ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); if (processDefinition == null) { logger.info("process define not exists"); @@ -293,15 +296,44 @@ public class ProcessService { List subProcessIdList = processInstanceMapMapper.querySubIdListByParentId(processInstanceId); - for(Integer subId : subProcessIdList ){ + for(Integer subId : subProcessIdList){ deleteAllSubWorkProcessByParentId(subId); deleteWorkProcessMapByParentId(subId); + removeTaskLogFile(subId); deleteWorkProcessInstanceById(subId); } return 1; } + /** + * remove task log file + * @param processInstanceId processInstanceId + */ + public void removeTaskLogFile(Integer processInstanceId){ + + LogClientService logClient = new LogClientService(); + + List taskInstanceList = findValidTaskListByProcessId(processInstanceId); + + if (CollectionUtils.isEmpty(taskInstanceList)){ + return; + } + + for (TaskInstance taskInstance : taskInstanceList){ + String taskLogPath = taskInstance.getLogPath(); + if (StringUtils.isEmpty(taskInstance.getHost())){ + continue; + } + String ip = Host.of(taskInstance.getHost()).getIp(); + int port = Constants.RPC_PORT; + + // remove task log from loggerserver + logClient.removeTaskLog(ip,port,taskLogPath); + } + } + + /** * calculate sub process number in the process define. * @param processDefinitionId processDefinitionId