转载

聊聊dubbo的AccessLogFilter

本文主要研究一下dubbo的AccessLogFilter

AccessLogFilter

dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/AccessLogFilter.java

/**
 * Filter (activated for the {@code PROVIDER} group) that records one access-log entry per
 * invocation and flushes the buffered entries periodically from a background thread.
 *
 * <p>Entries are buffered per access-log target in {@link #LOG_ENTRIES}. A task scheduled on
 * {@link #LOG_SCHEDULED} drains each buffer every {@link #LOG_OUTPUT_INTERVAL} milliseconds,
 * either through a service logger or by appending to the file named by the target.
 */
@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)
public class AccessLogFilter implements Filter {

    private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class);

    // NOTE(review): this constant is used both as the logger-name prefix and as the URL parameter
    // key read in invoke(), while the constructor Javadoc speaks of the url key "accesslog".
    // Confirm which key is intended before relying on URL-driven configuration.
    private static final String ACCESS_LOG_KEY = "dubbo.accesslog";

    /** Maximum number of pending entries kept per access-log target; further entries are dropped. */
    private static final int LOG_MAX_BUFFER = 5000;

    /** Initial delay and delay between two flushes, in milliseconds. */
    private static final long LOG_OUTPUT_INTERVAL = 5000;

    /** Date pattern used as the suffix of archived log files. */
    private static final String FILE_DATE_FORMAT = "yyyyMMdd";

    /** Line terminator appended after every record written to a log file. */
    private static final String LINE_SEPARATOR = "\r\n";

    // It's safe to declare it as singleton since it runs on single thread only
    private static final DateFormat FILE_NAME_FORMATTER = new SimpleDateFormat(FILE_DATE_FORMAT);

    /** Pending entries, keyed by the access-log target taken from the invoker URL. */
    private static final Map<String, Set<AccessLogData>> LOG_ENTRIES = new ConcurrentHashMap<String, Set<AccessLogData>>();

    private static final ScheduledExecutorService LOG_SCHEDULED = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Access-Log", true));

    /**
     * Default constructor initialize demon thread for writing into access log file with names with access log key
     * defined in url <b>accesslog</b>
     *
     * <p>Every instance registers its own periodic flush task on the shared static executor.
     */
    public AccessLogFilter() {
        LOG_SCHEDULED.scheduleWithFixedDelay(this::writeLogToFile, LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /**
     * This method logs the access log for service method invocation call.
     *
     * <p>Any failure while building or buffering the entry is logged as a warning and never
     * prevents the actual invocation.
     *
     * @param invoker service
     * @param inv     Invocation service method.
     * @return Result from service method.
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        try {
            String accessLogKey = invoker.getUrl().getParameter(ACCESS_LOG_KEY);
            if (ConfigUtils.isNotEmpty(accessLogKey)) {
                AccessLogData logData = buildAccessLogData(invoker, inv);
                log(accessLogKey, logData);
            }
        } catch (Throwable t) {
            logger.warn("Exception in AccessLogFilter of service(" + invoker + " -> " + inv + ")", t);
        }
        return invoker.invoke(inv);
    }

    /**
     * Buffers an entry for the given target, dropping it (with a warning) once the buffer
     * already holds {@link #LOG_MAX_BUFFER} entries.
     *
     * @param accessLog     access-log target the entry belongs to
     * @param accessLogData entry to buffer
     */
    private void log(String accessLog, AccessLogData accessLogData) {
        Set<AccessLogData> logSet = LOG_ENTRIES.computeIfAbsent(accessLog, k -> new ConcurrentHashSet<>());

        if (logSet.size() < LOG_MAX_BUFFER) {
            logSet.add(accessLogData);
        } else {
            //TODO we needs use force writing to file so that buffer gets clear and new log can be written.
            logger.warn("AccessLog buffer is full skipping buffer ");
        }
    }

    /**
     * Flushes every buffered target. A failure on one target is logged and does not stop the
     * remaining targets from being processed.
     */
    private void writeLogToFile() {
        if (LOG_ENTRIES.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Set<AccessLogData>> entry : LOG_ENTRIES.entrySet()) {
            try {
                String accessLog = entry.getKey();
                Set<AccessLogData> logSet = entry.getValue();
                if (ConfigUtils.isDefault(accessLog)) {
                    processWithServiceLogger(logSet);
                } else {
                    File file = new File(accessLog);
                    createIfLogDirAbsent(file);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Append log to " + accessLog);
                    }
                    renameFile(file);
                    processWithAccessKeyLogger(logSet, file);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Appends every buffered entry to {@code file}, one record per line, removing each entry
     * from the buffer only after it has been written.
     *
     * @param logSet buffered entries for this file
     * @param file   file to append to
     * @throws IOException if opening or writing the file fails; entries not yet written stay buffered
     */
    private void processWithAccessKeyLogger(Set<AccessLogData> logSet, File file) throws IOException {
        try (FileWriter writer = new FileWriter(file, true)) {
            Iterator<AccessLogData> iterator = logSet.iterator();
            while (iterator.hasNext()) {
                writer.write(iterator.next().getLogMessage());
                // Terminate the record with a real CRLF (the previous literal "/r/n" was written verbatim).
                writer.write(LINE_SEPARATOR);
                iterator.remove();
            }
            writer.flush();
        }
    }

    /**
     * Collects service, method, version, group, invocation time, parameter types and arguments
     * of the current call into a new {@link AccessLogData}.
     *
     * @param invoker service being invoked
     * @param inv     invocation carrying method name, parameter types and arguments
     * @return populated log entry
     */
    private AccessLogData buildAccessLogData(Invoker<?> invoker, Invocation inv) {
        AccessLogData logData = AccessLogData.newLogData();
        logData.setServiceName(invoker.getInterface().getName());
        logData.setMethodName(inv.getMethodName());
        logData.setVersion(invoker.getUrl().getParameter(VERSION_KEY));
        logData.setGroup(invoker.getUrl().getParameter(GROUP_KEY));
        logData.setInvocationTime(new Date());
        logData.setTypes(inv.getParameterTypes());
        logData.setArguments(inv.getArguments());
        return logData;
    }

    /**
     * Emits every buffered entry at INFO level through a logger named
     * {@code ACCESS_LOG_KEY + "." + serviceName}, removing each entry once it has been logged.
     *
     * @param logSet buffered entries to emit
     */
    private void processWithServiceLogger(Set<AccessLogData> logSet) {
        Iterator<AccessLogData> iterator = logSet.iterator();
        while (iterator.hasNext()) {
            AccessLogData logData = iterator.next();
            LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + logData.getServiceName()).info(logData.getLogMessage());
            iterator.remove();
        }
    }

    /**
     * Creates the parent directory of {@code file} when it does not exist yet.
     *
     * @param file log file whose parent directory is required
     */
    private void createIfLogDirAbsent(File file) {
        File dir = file.getParentFile();
        // mkdirs() also returns false when another thread/process created the directory meanwhile,
        // hence the second exists() check before reporting a failure.
        if (null != dir && !dir.exists() && !dir.mkdirs() && !dir.exists()) {
            logger.warn("Failed to create access log directory " + dir.getAbsolutePath());
        }
    }

    /**
     * Archives {@code file} as {@code <path>.<yyyyMMdd>} when its last modification falls on a
     * different day than today, so that each day gets its own file.
     *
     * @param file current log file
     */
    private void renameFile(File file) {
        if (!file.exists()) {
            return;
        }
        String now = FILE_NAME_FORMATTER.format(new Date());
        String last = FILE_NAME_FORMATTER.format(new Date(file.lastModified()));
        if (!now.equals(last)) {
            File archive = new File(file.getAbsolutePath() + "." + last);
            if (!file.renameTo(archive)) {
                // Best effort: keep appending to the current file, but make the failure visible.
                logger.warn("Failed to archive access log " + file.getAbsolutePath() + " to " + archive.getAbsolutePath());
            }
        }
    }
}
  • AccessLogFilter实现了org.apache.dubbo.rpc.Filter接口,其构造器注册了一个定时任务,每隔LOG_OUTPUT_INTERVAL执行一次writeLogToFile
  • invoke方法从invoker.getUrl()获取accessLogKey,如果不为空则使用buildAccessLogData构建AccessLogData,然后放入到LOG_ENTRIES中;如果对应缓冲区已达到LOG_MAX_BUFFER,则丢弃该条日志并打印warn日志
  • writeLogToFile方法遍历LOG_ENTRIES,将AccessLogData写入文件,如果是default的则使用processWithServiceLogger,否则使用processWithAccessKeyLogger方法

AccessLogFilterTest

dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/AccessLogFilterTest.java

/**
 * Smoke tests for {@link AccessLogFilter}: an invoker built with a {@code null} URL, a URL with
 * {@code accesslog=true}, and a URL naming a custom log file.
 */
public class AccessLogFilterTest {

    Filter accessLogFilter = new AccessLogFilter();

    // Test filter won't throw an exception
    // (the invoker is built with a null URL, which presumably makes the filter's own
    // bookkeeping fail; the filter is expected to log a warning and still delegate)
    @Test
    public void testInvokeException() {
        Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(null);
        Invocation invocation = new MockInvocation();
        LogUtil.start();
        try {
            accessLogFilter.invoke(invoker, invocation);
            assertEquals(1, LogUtil.findMessage("Exception in AccessLogFilter of service"));
        } finally {
            // Always stop the log capture, even when the invocation or the assertion fails,
            // so a failure here cannot leak into other tests.
            LogUtil.stop();
        }
    }

    // TODO how to assert thread action
    @Test
    public void testDefault() {
        URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1");
        Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url);
        Invocation invocation = new MockInvocation();
        accessLogFilter.invoke(invoker, invocation);
    }

    // Same as testDefault, but the accesslog parameter names a file instead of "true".
    @Test
    public void testCustom() {
        URL url = URL.valueOf("test://test:11/test?accesslog=custom-access.log");
        Invoker<AccessLogFilterTest> invoker = new MyInvoker<AccessLogFilterTest>(url);
        Invocation invocation = new MockInvocation();
        accessLogFilter.invoke(invoker, invocation);
    }

}
  • 这里验证了invokeException、default、custom场景

AccessLogData

dubbo-2.7.3/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/AccessLogData.java

/**
 * Holder for the fields of one access-log record, stored as key/value pairs, together with the
 * logic that renders the record as a single log line.
 */
public final class AccessLogData {

    private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final DateFormat MESSAGE_DATE_FORMATTER = new SimpleDateFormat(MESSAGE_DATE_FORMAT);

    // Keys under which the individual fields are kept in the data map.
    private static final String VERSION = "version";
    private static final String GROUP = "group";
    private static final String SERVICE = "service";
    private static final String METHOD_NAME = "method-name";
    private static final String INVOCATION_TIME = "invocation-time";
    private static final String TYPES = "types";
    private static final String ARGUMENTS = "arguments";
    private static final String REMOTE_HOST = "remote-host";
    private static final String REMOTE_PORT = "remote-port";
    private static final String LOCAL_HOST = "localhost";
    private static final String LOCAL_PORT = "local-port";

    /**
     * Backing store of the record: field key to field value.
     */
    private Map<String, Object> data;

    /**
     * Creates an empty record and pre-fills the local and remote host/port from the current
     * {@link RpcContext}.
     */
    private AccessLogData() {
        RpcContext rpcContext = RpcContext.getContext();
        data = new HashMap<>();
        setLocalHost(rpcContext.getLocalHost());
        setLocalPort(rpcContext.getLocalPort());
        setRemoteHost(rpcContext.getRemoteHost());
        setRemotePort(rpcContext.getRemotePort());
    }

    //......

    /**
     * Renders the record as one line of the form
     * {@code [time] remoteHost:remotePort -> localHost:localPort - [group/]service[:version] method(types) [jsonArgs]}.
     * Group and version are left out when absent or empty; the argument list is left out when
     * there are no arguments.
     *
     * @return the formatted log line
     */
    public String getLogMessage() {
        StringBuilder message = new StringBuilder();

        // Timestamp and the two endpoints.
        message.append("[").append(MESSAGE_DATE_FORMATTER.format(getInvocationTime())).append("] ");
        message.append(get(REMOTE_HOST)).append(":").append(get(REMOTE_PORT));
        message.append(" -> ");
        message.append(get(LOCAL_HOST)).append(":").append(get(LOCAL_PORT));
        message.append(" - ");

        // Optional "group/" prefix.
        Object groupValue = get(GROUP);
        String group = (groupValue == null) ? "" : groupValue.toString();
        if (StringUtils.isNotEmpty(group)) {
            message.append(group).append("/");
        }

        message.append(get(SERVICE));

        // Optional ":version" suffix.
        Object versionValue = get(VERSION);
        String version = (versionValue == null) ? "" : versionValue.toString();
        if (StringUtils.isNotEmpty(version)) {
            message.append(":").append(version);
        }

        message.append(" ");
        message.append(get(METHOD_NAME));

        // Comma-separated parameter type names inside parentheses.
        message.append("(");
        Object rawTypes = get(TYPES);
        Class<?>[] paramTypes = (rawTypes == null) ? new Class[0] : (Class<?>[]) rawTypes;
        for (int i = 0; i < paramTypes.length; i++) {
            if (i > 0) {
                message.append(",");
            }
            message.append(paramTypes[i].getName());
        }
        message.append(") ");

        // Arguments are serialized as JSON, and only when there is at least one.
        Object rawArgs = get(ARGUMENTS);
        if (rawArgs != null) {
            Object[] args = (Object[]) rawArgs;
            if (args.length > 0) {
                message.append(JSON.toJSONString(args));
            }
        }

        return message.toString();
    }

    //......
}
  • AccessLogData定义了version、group、service、method-name、invocation-time、types、arguments、remote-host、remote-port、localhost、local-port常量;getLogMessage则构建log的输出

小结

  • AccessLogFilter实现了org.apache.dubbo.rpc.Filter接口,其构造器注册了一个定时任务,每隔LOG_OUTPUT_INTERVAL执行一次writeLogToFile
  • AccessLogFilter的invoke方法从invoker.getUrl()获取accessLogKey,如果不为空则使用buildAccessLogData构建AccessLogData,然后放入到LOG_ENTRIES中;如果对应缓冲区已达到LOG_MAX_BUFFER,则丢弃该条日志并打印warn日志
  • AccessLogFilter的writeLogToFile方法遍历LOG_ENTRIES,将AccessLogData写入文件,如果是default的则使用processWithServiceLogger,否则使用processWithAccessKeyLogger方法

doc

  • AccessLogFilter
原文  https://segmentfault.com/a/1190000020134110
正文到此结束
Loading...