Browse Source

Refactor record audit log logic (#15881)

3.2.2-prepare
旺阳 8 months ago committed by GitHub
parent
commit
d306f1d04b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 74
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/OperatorLogAspect.java
  2. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/OperatorUtils.java
  3. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/AuditOperator.java
  4. 41
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/BaseAuditOperator.java
  5. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  6. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  7. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuditModelType.java

74
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/OperatorLogAspect.java

@ -17,18 +17,26 @@
package org.apache.dolphinscheduler.api.audit;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.api.audit.operator.AuditOperator;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.AuditLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
@ -40,34 +48,74 @@ import io.swagger.v3.oas.annotations.Operation;
@Component
public class OperatorLogAspect {
private static final ThreadLocal<AuditContext> auditThreadLocal = new ThreadLocal<>();
@Pointcut("@annotation(org.apache.dolphinscheduler.api.audit.OperatorLog)")
public void logPointCut() {
}
@Around("logPointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable {
@Before("logPointCut()")
public void before(JoinPoint point) {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
OperatorLog operatorLog = method.getAnnotation(OperatorLog.class);
Operation operation = method.getAnnotation(Operation.class);
if (operation == null) {
log.warn("Operation is null of method: {}", method.getName());
return point.proceed();
return;
}
long beginTime = System.currentTimeMillis();
Map<String, Object> paramsMap = OperatorUtils.getParamsMap(point, signature);
Result<?> result = (Result<?>) point.proceed();
User user = OperatorUtils.getUser(paramsMap);
if (user == null) {
log.error("user is null");
return;
}
AuditType auditType = operatorLog.auditType();
try {
AuditOperator operator = SpringApplicationContext.getBean(operatorLog.auditType().getOperatorClass());
long latency = System.currentTimeMillis() - beginTime;
operator.recordAudit(paramsMap, result, latency, operation, operatorLog);
List<AuditLog> auditLogList = OperatorUtils.buildAuditLogList(operation.description(), auditType, user);
operator.setRequestParam(auditType, auditLogList, paramsMap);
AuditContext auditContext =
new AuditContext(auditLogList, paramsMap, operatorLog, System.currentTimeMillis(), operator);
auditThreadLocal.set(auditContext);
} catch (Throwable throwable) {
log.error("Record audit log error", throwable);
}
}
@AfterReturning(value = "logPointCut()", returning = "returnValue")
public void afterReturning(Object returnValue) {
try {
AuditContext auditContext = auditThreadLocal.get();
if (auditContext == null) {
return;
}
auditContext.getOperator().recordAudit(auditContext, returnValue);
} catch (Throwable throwable) {
log.error("Record audit log error", throwable);
} finally {
auditThreadLocal.remove();
}
}
@AfterThrowing("logPointCut()")
public void afterThrowing() {
auditThreadLocal.remove();
}
@Getter
@Setter
@AllArgsConstructor
public static class AuditContext {
return result;
List<AuditLog> auditLogList;
Map<String, Object> paramsMap;
OperatorLog operatorLog;
long beginTime;
AuditOperator operator;
}
}

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/OperatorUtils.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.audit;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuditModelType;
import org.apache.dolphinscheduler.common.enums.AuditOperationType;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
@ -36,24 +37,12 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.reflect.MethodSignature;
@Slf4j
public class OperatorUtils {
protected void changeObjectForVersionRelated(AuditOperationType auditOperationType, Map<String, Object> paramsMap,
List<AuditLog> auditLogList) {
switch (auditOperationType) {
case SWITCH_VERSION:
case DELETE_VERSION:
auditLogList.get(0).setModelName(paramsMap.get("version").toString());
break;
default:
break;
}
}
public static boolean resultFail(Result<?> result) {
return result != null && result.isFailed();
}
@ -66,7 +55,7 @@ public class OperatorUtils {
auditLog.setOperationType(auditType.getAuditOperationType().getName());
auditLog.setDescription(apiDescription);
auditLog.setCreateTime(new Date());
auditLogList.add(auditLog);
return auditLogList;
}
@ -80,7 +69,7 @@ public class OperatorUtils {
return null;
}
public static Map<String, Object> getParamsMap(ProceedingJoinPoint point, MethodSignature signature) {
public static Map<String, Object> getParamsMap(JoinPoint point, MethodSignature signature) {
Object[] args = point.getArgs();
String[] strings = signature.getParameterNames();
@ -95,7 +84,7 @@ public class OperatorUtils {
public static AuditOperationType modifyReleaseOperationType(AuditType auditType, Map<String, Object> paramsMap) {
switch (auditType.getAuditOperationType()) {
case RELEASE:
ReleaseState releaseState = (ReleaseState) paramsMap.get("releaseState");
ReleaseState releaseState = (ReleaseState) paramsMap.get(Constants.RELEASE_STATE);
if (releaseState == null) {
break;
}
@ -109,7 +98,7 @@ public class OperatorUtils {
}
break;
case EXECUTE:
ExecuteType executeType = (ExecuteType) paramsMap.get("executeType");
ExecuteType executeType = (ExecuteType) paramsMap.get(Constants.EXECUTE_TYPE);
if (executeType == null) {
break;
}
@ -184,7 +173,7 @@ public class OperatorUtils {
}
public static boolean isUdfResource(Map<String, Object> paramsMap) {
ResourceType resourceType = (ResourceType) paramsMap.get("type");
ResourceType resourceType = (ResourceType) paramsMap.get(Constants.STRING_PLUGIN_PARAM_TYPE);
return resourceType != null && resourceType.equals(ResourceType.UDF);
}

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/AuditOperator.java

@ -17,18 +17,16 @@
package org.apache.dolphinscheduler.api.audit.operator;
import org.apache.dolphinscheduler.api.audit.OperatorLog;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.audit.OperatorLogAspect;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.dao.entity.AuditLog;
import java.util.List;
import java.util.Map;
import io.swagger.v3.oas.annotations.Operation;
public interface AuditOperator {
void recordAudit(Map<String, Object> paramsMap,
Result<?> result,
long latency,
Operation operation,
OperatorLog operatorLog) throws Throwable;
void recordAudit(OperatorLogAspect.AuditContext auditContext, Object returnValue);
void setRequestParam(AuditType auditType, List<AuditLog> auditLogList, Map<String, Object> paramsMap);
}

41
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/operator/BaseAuditOperator.java

@ -18,12 +18,12 @@
package org.apache.dolphinscheduler.api.audit.operator;
import org.apache.dolphinscheduler.api.audit.OperatorLog;
import org.apache.dolphinscheduler.api.audit.OperatorLogAspect;
import org.apache.dolphinscheduler.api.audit.OperatorUtils;
import org.apache.dolphinscheduler.api.audit.enums.AuditType;
import org.apache.dolphinscheduler.api.service.AuditService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.AuditLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.commons.lang3.math.NumberUtils;
@ -36,7 +36,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.base.Strings;
import io.swagger.v3.oas.annotations.Operation;
@Service
@Slf4j
@ -46,40 +45,34 @@ public abstract class BaseAuditOperator implements AuditOperator {
private AuditService auditService;
@Override
public void recordAudit(Map<String, Object> paramsMap,
Result<?> result,
long latency,
Operation operation,
OperatorLog operatorLog) {
AuditType auditType = operatorLog.auditType();
User user = OperatorUtils.getUser(paramsMap);
if (user == null) {
log.error("user is null");
return;
public void recordAudit(OperatorLogAspect.AuditContext auditContext, Object returnValue) {
Result<?> result = new Result<>();
if (returnValue instanceof Result) {
result = (Result<?>) returnValue;
if (OperatorUtils.resultFail(result)) {
log.error("request fail, code {}", result.getCode());
return;
}
}
List<AuditLog> auditLogList = OperatorUtils.buildAuditLogList(operation.description(), auditType, user);
setRequestParam(auditType, auditLogList, paramsMap);
long latency = System.currentTimeMillis() - auditContext.getBeginTime();
List<AuditLog> auditLogList = auditContext.getAuditLogList();
if (OperatorUtils.resultFail(result)) {
log.error("request fail, code {}", result.getCode());
return;
}
Map<String, Object> paramsMap = auditContext.getParamsMap();
OperatorLog operatorLog = auditContext.getOperatorLog();
AuditType auditType = operatorLog.auditType();
setObjectIdentityFromReturnObject(auditType, result, auditLogList);
modifyAuditOperationType(auditType, paramsMap, auditLogList);
modifyAuditObjectType(auditType, paramsMap, auditLogList);
auditLogList.forEach(auditLog -> auditLog.setLatency(latency));
auditLogList.forEach(auditLog -> auditService.addAudit(auditLog));
}
protected void setRequestParam(AuditType auditType, List<AuditLog> auditLogList, Map<String, Object> paramsMap) {
@Override
public void setRequestParam(AuditType auditType, List<AuditLog> auditLogList, Map<String, Object> paramsMap) {
String[] paramNameArr = auditType.getRequestParamName();
if (paramNameArr.length == 0) {

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -105,8 +105,6 @@ import com.google.common.collect.Lists;
@Slf4j
public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDefinitionService {
private static final String RELEASESTATE = "releaseState";
@Autowired
private ProjectMapper projectMapper;
@ -1297,7 +1295,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
if (null == releaseState) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.RELEASE_STATE);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(code);
@ -1337,7 +1335,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
break;
default:
log.warn("Parameter releaseState is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.RELEASE_STATE);
return result;
}
int update = taskDefinitionMapper.updateById(taskDefinition);

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -747,4 +747,8 @@ public final class Constants {
* K8S sensitive param
*/
public static final String K8S_CONFIG_REGEX = "(?<=((?i)configYaml(\" : \"))).*?(?=(\",\\n))";
public static final String RELEASE_STATE = "releaseState";
public static final String EXECUTE_TYPE = "executeType";
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuditModelType.java

@ -29,7 +29,7 @@ import lombok.Getter;
@Getter
public enum AuditModelType {
PROJECT("Project", null), // 1
PROJECT("Project", null),
PROCESS("Process", PROJECT),
PROCESS_INSTANCE("ProcessInstance", PROCESS),
TASK("Task", PROCESS),

Loading…
Cancel
Save