Browse Source

delete log files while deleting process instances #2463 (#2693)

* script variable has "processDefinitionId" is error #2664

* blank in eamil and left font align  #2648

* delete log files while deleting process instances #2463

* delete log files while deleting process instances #2463

* delete log files while deleting process instances #2463

* delete log files while deleting process instances #2463

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/3/MERGE
qiaozhanwei 5 years ago committed by GitHub
parent
commit
443a6d1935
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  2. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  3. 63
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
  4. 63
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
  5. 35
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  6. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
  7. 29
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
  8. 36
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -476,8 +476,6 @@ public class ProcessInstanceService extends BaseDAGService {
return checkResult;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
if (null == processInstance) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
@ -485,8 +483,11 @@ public class ProcessInstanceService extends BaseDAGService {
processService.removeTaskLogFile(processInstanceId);
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }

63
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* remove task log request command
*/
public class RemoveTaskLogRequestCommand implements Serializable {
/**
* log path
*/
private String path;
public RemoveTaskLogRequestCommand() {
}
public RemoveTaskLogRequestCommand(String path) {
this.path = path;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(){
Command command = new Command();
command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}

63
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
/**
* remove task log request command
*/
public class RemoveTaskLogResponseCommand implements Serializable {
/**
* log path
*/
private Boolean status;
public RemoveTaskLogResponseCommand() {
}
public RemoveTaskLogResponseCommand(Boolean status) {
this.status = status;
}
public Boolean getStatus() {
return status;
}
public void setStatus(Boolean status) {
this.status = status;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command(long opaque){
Command command = new Command(opaque);
command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}

35
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -60,14 +60,14 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
*/
final CommandType commandType = command.getType();
switch (commandType){
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break;
case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath());
@ -86,6 +86,25 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break;
case REMOVE_TAK_LOG_REQUEST:
RemoveTaskLogRequestCommand removeTaskLogRequest = FastJsonSerializer.deserialize(
command.getBody(), RemoveTaskLogRequestCommand.class);
String taskLogPath = removeTaskLogRequest.getPath();
File taskLogFile = new File(taskLogPath);
Boolean status = true;
try {
if (taskLogFile.exists()){
taskLogFile.delete();
}
}catch (Exception e){
status = false;
}
RemoveTaskLogResponseCommand removeTaskLogResponse = new RemoveTaskLogResponseCommand(status);
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
break;
default:
throw new IllegalArgumentException("unknown commandType");
}

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java

@ -55,6 +55,7 @@ public class LoggerServer {
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
this.server.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
}
/**

29
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -144,4 +144,33 @@ public class LogClientService {
}
return result;
}
/**
* remove task log
* @param host host
* @param port port
* @param path path
* @return remove task status
*/
public Boolean removeTaskLog(String host, int port, String path) {
logger.info("log path {}", path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
Boolean result = false;
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if(response != null){
RemoveTaskLogResponseCommand taskLogResponse = FastJsonSerializer.deserialize(
response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus();
}
} catch (Exception e) {
logger.error("remove task log error", e);
} finally {
this.client.closeChannel(address);
}
return result;
}
}

36
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
@ -37,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.util.*;
import java.util.stream.Collectors;
@ -238,7 +241,7 @@ public class ProcessService {
* @param defineId
* @return
*/
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId){
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
if (processDefinition == null) {
logger.info("process define not exists");
@ -293,15 +296,44 @@ public class ProcessService {
List<Integer> subProcessIdList = processInstanceMapMapper.querySubIdListByParentId(processInstanceId);
for(Integer subId : subProcessIdList ){
for(Integer subId : subProcessIdList){
deleteAllSubWorkProcessByParentId(subId);
deleteWorkProcessMapByParentId(subId);
removeTaskLogFile(subId);
deleteWorkProcessInstanceById(subId);
}
return 1;
}
/**
* remove task log file
* @param processInstanceId processInstanceId
*/
public void removeTaskLogFile(Integer processInstanceId){
LogClientService logClient = new LogClientService();
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)){
return;
}
for (TaskInstance taskInstance : taskInstanceList){
String taskLogPath = taskInstance.getLogPath();
if (StringUtils.isEmpty(taskInstance.getHost())){
continue;
}
String ip = Host.of(taskInstance.getHost()).getIp();
int port = Constants.RPC_PORT;
// remove task log from loggerserver
logClient.removeTaskLog(ip,port,taskLogPath);
}
}
/**
* calculate sub process number in the process define.
* @param processDefinitionId processDefinitionId

Loading…
Cancel
Save