Draven
4 months ago
25 changed files with 419 additions and 425 deletions
@ -1,92 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.audit; |
||||
|
||||
import org.apache.dolphinscheduler.api.configuration.ApiConfig; |
||||
|
||||
import java.util.List; |
||||
import java.util.concurrent.BlockingQueue; |
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
|
||||
import javax.annotation.PostConstruct; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
@Slf4j |
||||
public class AuditPublishService { |
||||
|
||||
private final BlockingQueue<AuditMessage> auditMessageQueue = new LinkedBlockingQueue<>(); |
||||
|
||||
@Autowired |
||||
private List<AuditSubscriber> subscribers; |
||||
|
||||
@Autowired |
||||
private ApiConfig apiConfig; |
||||
|
||||
/** |
||||
* create a daemon thread to process the message queue |
||||
*/ |
||||
@PostConstruct |
||||
private void init() { |
||||
if (apiConfig.isAuditEnable()) { |
||||
Thread thread = new Thread(this::doPublish); |
||||
thread.setDaemon(true); |
||||
thread.setName("Audit-Log-Consume-Thread"); |
||||
thread.start(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* publish a new audit message |
||||
* |
||||
* @param message audit message |
||||
*/ |
||||
public void publish(AuditMessage message) { |
||||
if (apiConfig.isAuditEnable() && !auditMessageQueue.offer(message)) { |
||||
log.error("Publish audit message failed, message:{}", message); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* subscribers execute the message processor method |
||||
*/ |
||||
private void doPublish() { |
||||
AuditMessage message = null; |
||||
while (true) { |
||||
try { |
||||
message = auditMessageQueue.take(); |
||||
for (AuditSubscriber subscriber : subscribers) { |
||||
try { |
||||
subscriber.execute(message); |
||||
} catch (Exception e) { |
||||
log.error("Consume audit message failed, message:{}", message, e); |
||||
} |
||||
} |
||||
} catch (InterruptedException e) { |
||||
log.error("Consume audit message failed, message:{}", message, e); |
||||
Thread.currentThread().interrupt(); |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
@ -1,28 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.audit; |
||||
|
||||
public interface AuditSubscriber { |
||||
|
||||
/** |
||||
* process the audit message |
||||
* |
||||
* @param message |
||||
*/ |
||||
void execute(AuditMessage message); |
||||
} |
@ -1,42 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.audit; |
||||
|
||||
import org.apache.dolphinscheduler.dao.entity.AuditLog; |
||||
import org.apache.dolphinscheduler.dao.mapper.AuditLogMapper; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Component |
||||
public class AuditSubscriberImpl implements AuditSubscriber { |
||||
|
||||
@Autowired |
||||
private AuditLogMapper logMapper; |
||||
|
||||
@Override |
||||
public void execute(AuditMessage message) { |
||||
AuditLog auditLog = new AuditLog(); |
||||
auditLog.setUserId(message.getUser().getId()); |
||||
auditLog.setResourceType(message.getResourceType().getCode()); |
||||
auditLog.setOperation(message.getOperation().getCode()); |
||||
auditLog.setTime(message.getAuditDate()); |
||||
auditLog.setResourceId(message.getResourceId()); |
||||
logMapper.insert(auditLog); |
||||
} |
||||
} |
@ -0,0 +1,21 @@
|
||||
package org.apache.dolphinscheduler.api.audit; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.Audit.AuditObjectType; |
||||
import org.apache.dolphinscheduler.common.enums.Audit.AuditOperationType; |
||||
|
||||
import java.lang.annotation.Documented; |
||||
import java.lang.annotation.ElementType; |
||||
import java.lang.annotation.Retention; |
||||
import java.lang.annotation.RetentionPolicy; |
||||
import java.lang.annotation.Target; |
||||
|
||||
@Target(ElementType.METHOD) |
||||
@Retention(RetentionPolicy.RUNTIME) |
||||
@Documented |
||||
public @interface OperatorLog { |
||||
|
||||
String describe() default ""; |
||||
AuditObjectType objectType() default AuditObjectType.PROJECT; |
||||
AuditOperationType operationType() default AuditOperationType.CREATE; |
||||
|
||||
} |
@ -0,0 +1,108 @@
|
||||
package org.apache.dolphinscheduler.api.audit; |
||||
|
||||
import java.lang.reflect.Field; |
||||
import java.lang.reflect.Method; |
||||
import java.util.*; |
||||
|
||||
import org.apache.dolphinscheduler.api.service.AuditService; |
||||
import org.apache.dolphinscheduler.api.utils.Result; |
||||
import org.apache.dolphinscheduler.common.enums.Audit.AuditOperationType; |
||||
import org.apache.dolphinscheduler.dao.entity.AuditLog; |
||||
import org.apache.dolphinscheduler.dao.entity.Project; |
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; |
||||
import org.aspectj.lang.ProceedingJoinPoint; |
||||
import org.aspectj.lang.annotation.Around; |
||||
import org.aspectj.lang.annotation.Aspect; |
||||
import org.aspectj.lang.annotation.Pointcut; |
||||
import org.aspectj.lang.reflect.MethodSignature; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
@Aspect |
||||
@Component |
||||
public class OperatorLogAspect { |
||||
|
||||
@Autowired |
||||
private AuditService auditService; |
||||
|
||||
@Autowired |
||||
private ProjectMapper projectMapper; |
||||
|
||||
@Pointcut("@annotation(OperatorLog)") |
||||
public void logPointCut() { |
||||
|
||||
} |
||||
|
||||
@Around("logPointCut()") |
||||
public Object around(ProceedingJoinPoint point) throws Throwable { |
||||
|
||||
MethodSignature signature = (MethodSignature) point.getSignature(); |
||||
Method method = signature.getMethod(); |
||||
|
||||
OperatorLog sysLog = method.getAnnotation(OperatorLog.class); |
||||
if (sysLog == null) { |
||||
return null; |
||||
} |
||||
|
||||
Object[] args = point.getArgs(); |
||||
String[] strings = signature.getParameterNames(); |
||||
|
||||
User user = null; |
||||
|
||||
Map<String, Object> map = new HashMap<>(); |
||||
for (int i = 0; i < strings.length; i++) { |
||||
map.put(strings[i], args[i]); |
||||
|
||||
if (args[i] instanceof User) { |
||||
user = (User)args[i]; |
||||
} |
||||
} |
||||
|
||||
AuditLog auditLog = new AuditLog(); |
||||
auditLog.setUserId(user.getId()); |
||||
auditLog.setObjectType(sysLog.objectType().getCode()); |
||||
auditLog.setOperationType(sysLog.operationType().getCode()); |
||||
auditLog.setTime(new Date()); |
||||
|
||||
if(sysLog.operationType().equals(AuditOperationType.DELETE)){ |
||||
// need to get the name before real deleted
|
||||
switch (sysLog.objectType()){ |
||||
case PROJECT: |
||||
Project project = projectMapper.queryByCode((long)map.get("code")); |
||||
auditLog.setDetail(project.getName()); |
||||
auditLog.setObjectId(project.getId()); |
||||
default: |
||||
break; |
||||
} |
||||
} |
||||
|
||||
long beginTime = System.currentTimeMillis(); |
||||
Object result = point.proceed(); |
||||
long time = System.currentTimeMillis() - beginTime; |
||||
|
||||
saveLog(result, sysLog, auditLog); |
||||
|
||||
return result; |
||||
} |
||||
|
||||
private void saveLog(Object object, OperatorLog sysLog, AuditLog auditLog) { |
||||
Result result = (Result)object; |
||||
if(result.isFailed()) { |
||||
return; |
||||
} |
||||
|
||||
auditService.addAudit(auditLog); |
||||
} |
||||
|
||||
public static int getId(Object obj) { |
||||
try { |
||||
Field idField = obj.getClass().getDeclaredField("id"); |
||||
idField.setAccessible(true); |
||||
return (int) idField.get(obj); |
||||
} catch (NoSuchFieldException | IllegalAccessException | ClassCastException e) { |
||||
e.printStackTrace(); |
||||
} |
||||
return -1; |
||||
} |
||||
} |
@ -0,0 +1,80 @@
|
||||
package org.apache.dolphinscheduler.api.utils; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.UserType; |
||||
import org.apache.dolphinscheduler.dao.entity.User; |
||||
|
||||
import java.lang.reflect.Field; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
import java.util.Objects; |
||||
|
||||
public class AuditLogUtils { |
||||
|
||||
public static void main(String[] args) { |
||||
User user1 = new User(); |
||||
user1.setId(1); |
||||
user1.setUserType(UserType.ADMIN_USER); |
||||
user1.setEmail("ddd"); |
||||
|
||||
User user2 = new User(); |
||||
user2.setId(3); |
||||
user2.setUserType(UserType.ADMIN_USER); |
||||
user2.setPhone("1234"); |
||||
AuditLogUtils.getDiff(user1, user2); |
||||
} |
||||
|
||||
public static String getDiff(Object pre, Object now) { |
||||
Map<String, Object> map1 = getAllFieldValues(pre); |
||||
Map<String, Object> map2 = getAllFieldValues(now); |
||||
|
||||
for (String key1 : map1.keySet()) { |
||||
Object valueOld = map1.get(key1); |
||||
if(map2.containsKey(key1)) { |
||||
Object valueNew = map2.get(key1); |
||||
if (!valueOld.equals(valueNew)) { |
||||
System.out.println(key1 + " from " + valueOld + " to " + valueNew); |
||||
} |
||||
} else { |
||||
System.out.println(key1 + " value " + valueOld + " deleted"); |
||||
} |
||||
} |
||||
|
||||
for (String key2 : map2.keySet()) { |
||||
|
||||
if(!map1.containsKey(key2)) { |
||||
Object valueNew = map2.get(key2); |
||||
System.out.println(key2 + " value " + valueNew + " added"); |
||||
} |
||||
} |
||||
return "!"; |
||||
} |
||||
|
||||
public static Map<String, Object> getAllFieldValues(Object obj) { |
||||
Class<?> clazz = obj.getClass(); |
||||
|
||||
Map<String, Object> map = new HashMap<>(); |
||||
|
||||
while (clazz != null) { |
||||
Field[] fields = clazz.getDeclaredFields(); |
||||
|
||||
for (Field field : fields) { |
||||
field.setAccessible(true); |
||||
|
||||
try { |
||||
Object value = field.get(obj); |
||||
|
||||
if(!Objects.isNull(value)) { |
||||
map.put(field.getName(), value); |
||||
} |
||||
} catch (Exception e) { |
||||
e.printStackTrace(); |
||||
} |
||||
} |
||||
|
||||
// 获取父类,直到父类为null
|
||||
clazz = clazz.getSuperclass(); |
||||
} |
||||
|
||||
return map; |
||||
} |
||||
} |
@ -0,0 +1,102 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.enums.Audit; |
||||
|
||||
import lombok.Getter; |
||||
|
||||
import java.util.*; |
||||
|
||||
/** |
||||
* Audit Operation type |
||||
*/ |
||||
|
||||
public enum AuditObjectType { |
||||
|
||||
PROJECT(0, -1, "Project", true), |
||||
RESOURCE(1,-1, "Resource", false), |
||||
DATASOURCE(2,-1, "Datasource", true), |
||||
SECURITY(3,-1, "Security", false), |
||||
WORKFLOW(4,0, "Workflow", true), |
||||
WORKFLOW_INSTANCE(5,4, "Workflow instance", true), |
||||
TASK(6,5, "Workflow instance", true), |
||||
FLINK(7,0, "Flink", true), |
||||
ETL(8,0, "Etl", true); |
||||
@Getter |
||||
private final int code; |
||||
@Getter |
||||
private final int parentCode; |
||||
@Getter |
||||
private final String name; |
||||
@Getter |
||||
private final boolean hasLogs; |
||||
@Getter |
||||
private int level; |
||||
|
||||
private static final Map<Integer, List<AuditObjectType>> AUDIT_OBJECT_LEVEL_MAP = new HashMap<>(); |
||||
|
||||
private static HashMap<Integer, AuditObjectType> AUDIT_OBJECT_MAP = new HashMap<>(); |
||||
|
||||
|
||||
static { |
||||
for (AuditObjectType auditObjectType : values()) { |
||||
int level = calculateLevel(auditObjectType); |
||||
AUDIT_OBJECT_LEVEL_MAP.computeIfAbsent(level, k -> new ArrayList<>()).add(auditObjectType); |
||||
auditObjectType.level = level; |
||||
AUDIT_OBJECT_MAP.put(auditObjectType.code, auditObjectType); |
||||
} |
||||
} |
||||
|
||||
public static AuditObjectType of(int status) { |
||||
if (AUDIT_OBJECT_MAP.containsKey(status)) { |
||||
return AUDIT_OBJECT_MAP.get(status); |
||||
} |
||||
throw new IllegalArgumentException("invalid audit operation type code " + status); |
||||
} |
||||
|
||||
AuditObjectType(int code, int parentCode, String name, boolean hasLogs) { |
||||
this.code = code; |
||||
this.parentCode = parentCode; |
||||
this.name = name; |
||||
this.hasLogs = hasLogs; |
||||
} |
||||
|
||||
private static int calculateLevel(AuditObjectType auditObjectType) { |
||||
int level = 0; |
||||
int parentCode = auditObjectType.parentCode; |
||||
|
||||
while (parentCode != -1) { |
||||
level++; |
||||
parentCode = getParentCode(parentCode); |
||||
} |
||||
|
||||
return level; |
||||
} |
||||
|
||||
private static int getParentCode(int code) { |
||||
for (AuditObjectType objectType : values()) { |
||||
if (objectType.code == code) { |
||||
return objectType.parentCode; |
||||
} |
||||
} |
||||
return -1; |
||||
} |
||||
|
||||
public static void main(String[] args) { |
||||
System.out.println(1); |
||||
} |
||||
} |
@ -1,61 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.common.enums; |
||||
|
||||
import java.util.HashMap; |
||||
|
||||
/** |
||||
* Audit Module type |
||||
*/ |
||||
public enum AuditResourceType { |
||||
|
||||
// TODO: add other audit resource enums
|
||||
USER_MODULE(0, "USER"), |
||||
PROJECT_MODULE(1, "PROJECT"); |
||||
|
||||
private final int code; |
||||
private final String enMsg; |
||||
|
||||
private static HashMap<Integer, AuditResourceType> AUDIT_RESOURCE_MAP = new HashMap<>(); |
||||
|
||||
static { |
||||
for (AuditResourceType auditResourceType : AuditResourceType.values()) { |
||||
AUDIT_RESOURCE_MAP.put(auditResourceType.code, auditResourceType); |
||||
} |
||||
} |
||||
|
||||
AuditResourceType(int code, String enMsg) { |
||||
this.code = code; |
||||
this.enMsg = enMsg; |
||||
} |
||||
|
||||
public int getCode() { |
||||
return this.code; |
||||
} |
||||
|
||||
public String getMsg() { |
||||
return this.enMsg; |
||||
} |
||||
|
||||
public static AuditResourceType of(int status) { |
||||
if (AUDIT_RESOURCE_MAP.containsKey(status)) { |
||||
return AUDIT_RESOURCE_MAP.get(status); |
||||
} |
||||
throw new IllegalArgumentException("invalid audit resource type code " + status); |
||||
} |
||||
} |
Loading…
Reference in new issue