转载

Java并发编程入门(十一)限流场景和Spring限流器实现

限流场景一般基于硬件资源的使用负载,包括CPU,内存,IO。例如某个报表服务需要消耗大量内存,如果并发数增加就会拖慢整个应用,甚至内存溢出导致应用挂掉。

限流适用于会动态增加的资源,已经池化的资源不一定需要限流,例如数据库连接池,它是已经确定的资源,池的大小固定(即使可以动态伸缩池大小),这种场景下并不需要通过限流来实现,只要能做到如果池内链接已经使用完,则无法再获取新的连接则可。

因此,使用限流的前提是:

1.防止资源使用过载产生不良影响。

2.使用的资源会动态增加,例如一个站点的请求。

二、Spring中实现限流

I、限流需求

1.只针对Controller限流

2.根据url请求路径限流

3.可根据正则表达式匹配url来限流 4.可定义多个限流规则,每个规则的最大流量不同

II、相关类结构

Java并发编程入门(十一)限流场景和Spring限流器实现

1.CurrentLimiteAspect是一个拦截器,在controller执行前后执行后拦截

2.CurrentLimiter是限流器,可以添加限流规则,根据限流规则获取流量通行证,释放流量通行证;如果获取通行证失败则抛出异常。

3.LimiteRule是限流规则,限流规则可设置匹配url的正则表达式和最大流量值,同时获取该规则的流量通信证和释放流量通信证。

4.AcquireResult是获取流量通信证的结果,结果有3种:获取成功,获取失败,不需要获取。

5.Application是Spring的启动类,简单起见,在启动类种添加限流规则。

III、Show me code

1.AcquireResult.java

public class AcquireResult {

    /** 获取通行证成功 */
    public static final int ACQUIRE_SUCCESS = 0;

    /** 获取通行证失败 */
    public static final int ACQUIRE_FAILED = 1;

    /** 不需要获取通行证 */
    public static final int ACQUIRE_NONEED = 2;

    /** 获取通行证结果 */
    private int result;

    /** 可用通行证数量 */
    private int availablePermits;

    public int getResult() {
        return result;
    }

    public void setResult(int result) {
        this.result = result;
    }

    public int getAvailablePermits() {
        return availablePermits;
    }

    public void setAvailablePermits(int availablePermits) {
        this.availablePermits = availablePermits;
    }
}
复制代码

2.LimiteRule.java

/**
 * @ClassName LimiteRule
 * @Description TODO
 * @Author 铿然一叶
 * @Date 2019/10/4 20:18
 * @Version 1.0
 * javashizhan.com
 **/
public class LimiteRule {

    /** 信号量 */
    private final Semaphore sema;

    /** 请求URL匹配规则 */
    private final String pattern;

    /** 最大并发数 */
    private final int maxConcurrent;

    public LimiteRule(String pattern, int maxConcurrent) {
        this.sema = new Semaphore(maxConcurrent);
        this.pattern = pattern;
        this.maxConcurrent = maxConcurrent;
    }

    /**
     * 获取通行证
     * @param urlPath 请求Url
     * @return 0-获取成功,1-没有获取到通行证,2-不需要获取通行证
     */
    public synchronized AcquireResult tryAcquire(String urlPath) {

        AcquireResult acquireResult = new AcquireResult();
        acquireResult.setAvailablePermits(this.sema.availablePermits());

        try {
            //Url请求匹配规则则获取通行证
            if (Pattern.matches(pattern, urlPath)) {

                boolean acquire = this.sema.tryAcquire(50, TimeUnit.MILLISECONDS);

                if (acquire) {
                    acquireResult.setResult(AcquireResult.ACQUIRE_SUCCESS);
                    print(urlPath);
                } else {
                    acquireResult.setResult(AcquireResult.ACQUIRE_FAILED);
                }
            } else {
                acquireResult.setResult(AcquireResult.ACQUIRE_NONEED);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return acquireResult;
    }

    /**
     * 释放通行证
     */
    public synchronized void release() {
        this.sema.release();
        print(null);
    }

    /**
     * 得到最大并发数
     * @return
     */
    public int getMaxConcurrent() {
        return this.maxConcurrent;
    }

    /**
     * 得到匹配表达式
     * @return
     */
    public String getPattern() {
        return this.pattern;
    }

    /**
     * 打印日志
     * @param urlPath
     */
    private void print(String urlPath) {
        StringBuffer buffer = new StringBuffer();
        buffer.append("Pattern: ").append(pattern).append(", ");
        if (null != urlPath) {
            buffer.append("urlPath: ").append(urlPath).append(", ");
        }
        buffer.append("Available Permits:").append(this.sema.availablePermits());
        System.out.println(buffer.toString());
    }

}
复制代码

3.CurrentLimiter.java

/**
 * @ClassName CurrentLimiter
 * @Description TODO
 * @Author 铿然一叶
 * @Date 2019/10/4 20:18
 * @Version 1.0
 * javashizhan.com
 **/
public class CurrentLimiter {

    /** 本地线程变量,存储一次请求获取到的通行证,和其他并发请求隔离开,在controller执行完后释放本次请求获得的通行证 */
    private static ThreadLocal<Vector<LimiteRule>> localAcquiredLimiteRules = new ThreadLocal<Vector<LimiteRule>>();

    /** 所有限流规则 */
    private static Vector<LimiteRule> allLimiteRules = new Vector<LimiteRule>();

    /** 私有构造器,避免实例化 */
    private CurrentLimiter() {}

    /**
     * 添加限流规则,在spring启动时添加,不需要加锁,如果在运行中动态添加,需要加锁
     * @param rule
     */
    public static void addRule(LimiteRule rule) {
        printRule(rule);
        allLimiteRules.add(rule);
    }

    /**
     * 获取流量通信证,所有流量规则都要获取后才能通过,如果一个不能获取则抛出异常
     * 多线程并发,需要加锁
     * @param urlPath
     */
    public synchronized static void tryAcquire(String urlPath) throws Exception {
        //有限流规则则处理
        if (allLimiteRules.size() > 0) {

            //能获取到通行证的流量规则要保存下来,在Controller执行完后要释放
            Vector<LimiteRule> acquiredLimitRules = new Vector<LimiteRule>();

            for(LimiteRule rule:allLimiteRules) {
                //获取通行证
                AcquireResult acquireResult = rule.tryAcquire(urlPath);

                if (acquireResult.getResult() == AcquireResult.ACQUIRE_SUCCESS) {
                    acquiredLimitRules.add(rule);
                    //获取到通行证的流量规则添加到本地线程变量
                    localAcquiredLimiteRules.set(acquiredLimitRules);

                } else if (acquireResult.getResult() == AcquireResult.ACQUIRE_FAILED) {
                    //如果获取不到通行证则抛出异常
                    StringBuffer buffer = new StringBuffer();
                    buffer.append("The request [").append(urlPath).append("] exceeds maximum traffic limit, the limit is ").append(rule.getMaxConcurrent())
                            .append(", available permit is").append(acquireResult.getAvailablePermits()).append(".");

                    System.out.println(buffer);
                    throw new Exception(buffer.toString());

                } else {
                    StringBuffer buffer = new StringBuffer();
                    buffer.append("This path does not match the limit rule, path is [").append(urlPath)
                            .append("], pattern is [").append(rule.getPattern()).append("].");
                    System.out.println(buffer.toString());
                }
            }
        }
    }

    /**
     * 释放获取到的通行证。在controller执行完后掉调用(抛出异常也需要调用)
     */
    public synchronized static void release() {
        Vector<LimiteRule> acquiredLimitRules = localAcquiredLimiteRules.get();
        if (null != acquiredLimitRules && acquiredLimitRules.size() > 0) {
            acquiredLimitRules.forEach(rule->{
                rule.release();
            });
        }

        //destory本地线程变量,避免内存泄漏
        localAcquiredLimiteRules.remove();
    }

    /**
     * 打印限流规则信息
     * @param rule
     */
    private static void printRule(LimiteRule rule) {
        StringBuffer buffer = new StringBuffer();
        buffer.append("Add Limit Rule, Max Concurrent: ").append(rule.getMaxConcurrent())
                .append(", Pattern: ").append(rule.getPattern());
        System.out.println(buffer.toString());
    }
}
复制代码

4.CurrentLimiteAspect.java

/**
 * @ClassName CurrentLimiteAspect
 * @Description TODO
 * @Author 铿然一叶
 * @Date 2019/10/4 20:15
 * @Version 1.0
 * javashizhan.com
 **/
@Aspect
@Component
public class CurrentLimiteAspect {

    /**
     * 拦截controller,自行修改路径
     */
    @Pointcut("execution(* com.javashizhan.controller..*(..))")
    public void controller() { }

    @Before("controller()")
    public void controller(JoinPoint point) throws Exception {
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        //获取通行证,urlPath的格式如:/limit
        CurrentLimiter.tryAcquire(request.getRequestURI());
    }

    /**
     * controller执行完后调用,即使controller抛出异常这个拦截方法也会被调用
     * @param joinPoint
     */
    @After("controller()")
    public void after(JoinPoint joinPoint) {
        //释放获取到的通行证
        CurrentLimiter.release();
    }
}
复制代码

5.Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);

        //添加限流规则
        LimiteRule rule = new LimiteRule("/limit", 4);
        CurrentLimiter.addRule(rule);
    }
}
复制代码

IV、验证

测试验证碰到的两个坑:

1.人工通过浏览器刷新请求发现controller是串行的

2.通过postman设置了并发测试也还是串行的,即便设置了并发数,如下图:

Java并发编程入门(十一)限流场景和Spring限流器实现

百度无果,只能自行写代码验证了,代码如下:

/**
 * @ClassName TestClient
 * @Description TODO
 * @Author 铿然一叶
 * @Date 2019/10/5 0:51
 * @Version 1.0
 * javashizhan.com
 **/
public class CurrentLimiteTest {

    public static void main(String[] args) {
        final String limitUrlPath = "http://localhost:8080/limit";
        final String noLimitUrlPath = "http://localhost:8080/nolimit";

        //限流测试
        test(limitUrlPath);

        //休眠一会,等上一批线程执行完,方便查看日志
        sleep(5000);

        //不限流测试
        test(noLimitUrlPath);

    }

    private static void test(String urlPath) {
        Thread[] requesters = new Thread[10];

        for (int i = 0; i < requesters.length; i++) {
            requesters[i] = new Thread(new Requester(urlPath));
            requesters[i].start();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Requester implements Runnable {

    private final String urlPath;
    private final RestTemplate restTemplate = new RestTemplate();

    public Requester(String urlPath) {
        this.urlPath = urlPath;
    }

    @Override
    public void run() {
        String response = restTemplate.getForEntity(urlPath, String.class).getBody();
        System.out.println("response: " + response);
    }
}
复制代码

输出日志如下:

Pattern: /limit, urlPath: /limit, Available Permits:3
Pattern: /limit, urlPath: /limit, Available Permits:2
Pattern: /limit, urlPath: /limit, Available Permits:1
Pattern: /limit, urlPath: /limit, Available Permits:0
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
Pattern: /limit, Available Permits:1
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:3
Pattern: /limit, Available Permits:4
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
复制代码

可以看到日志输出信息为:

1.第1个测试url最大并发为4,一次10个并发请求,有4个获取通行证后,剩余6个获取通行证失败。

2.获取到通行证的4个请求在controller执行完后释放了通行证。

3.第2个测试url没有限制并发,10个请求均执行成功。

至此,限流器验证成功。

end.

相关阅读:

Java并发编程(一)知识地图

Java并发编程(二)原子性

Java并发编程(三)可见性

Java并发编程(四)有序性

Java并发编程(五)创建线程方式概览

Java并发编程入门(六)synchronized用法

Java并发编程入门(七)轻松理解wait和notify以及使用场景

Java并发编程入门(八)线程生命周期

Java并发编程入门(九)死锁和死锁定位

Java并发编程入门(十)锁优化

Java并发编程入门(十二)生产者和消费者模式-代码模板

Java并发编程入门(十三)读写锁和缓存模板
原文  https://juejin.im/post/5d9792e8518825157267f6c3
正文到此结束
Loading...