java 服务接口API限流 Rate Limit

摘要:
服务接口的流量控制策略:分流、降级、流量限制等。2)使用Reids的列表结构,而不是incr命令1FUNCTIONLIMIT_API_CALLL2current=LLEN3IFcurrent˃10THEN4ERROR“toomanyrequestsperssecond”5ELSE6IFEXIST==FALSE7MULTI8RPUSH9EXPIRE10EXEC11ELSE12RPUSHX13END14PERFORM_API_CALL()15ENDRateLimit将Redis列表用作容器。LLEN用于检查访问次数。事务包含两个命令,RPUSH和EXPIRE,用于创建列表并设置执行第一个计数时的过期时间。RPUSHX增加后续计数操作中的数量。
 

一、场景描述                                                                                                

     很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。

    也就是面对大流量时,如何进行流量控制?

    服务接口的流量控制策略:分流、降级、限流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。

     实际场景中常用的限流策略:

  • Nginx前端限流

         按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流

  • 业务应用系统限流

        1、客户端限流

        2、服务端限流

  • 数据库限流

        红线区,力保数据库

二、常用的限流算法                                                                                       

     常用的限流算法由:楼桶算法和令牌桶算法。本文不具体的详细说明两种算法的原理,原理会在接下来的文章中做说明。

     1、漏桶算法

         漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:

   java 服务接口API限流 Rate Limit第1张

         可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。

         因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.

     2、令牌桶算法

         令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.

java 服务接口API限流 Rate Limit第2张

  令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.

三、基于Redis功能的实现                                                                                

       简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。

      1)使用Redis的incr命令,将计数器作为Lua脚本         

1 local current
2 current = redis.call("incr",KEYS[1])
3 if tonumber(current) == 1 then
4     redis.call("expire",KEYS[1],1)
5 end

        Lua脚本在Redis中运行,保证了incr和expire两个操作的原子性。

       2)使用Reids的列表结构代替incr命令

复制代码
 1 FUNCTION LIMIT_API_CALL(ip)
 2 current = LLEN(ip)
 3 IF current > 10 THEN
 4     ERROR "too many requests per second"
 5 ELSE
 6     IF EXISTS(ip) == FALSE
 7         MULTI
 8             RPUSH(ip,ip)
 9             EXPIRE(ip,1)
10         EXEC
11     ELSE
12         RPUSHX(ip,ip)
13     END
14     PERFORM_API_CALL()
15 END
复制代码

         Rate Limit使用Redis的列表作为容器,LLEN用于对访问次数的检查,一个事物中包含了RPUSH和EXPIRE两个命令,用于在第一次执行计数是创建列表并设置过期时间,

    RPUSHX在后续的计数操作中进行增加操作。

四、基于令牌桶算法的实现                                                                                

       令牌桶算法可以很好的支撑突然额流量的变化即满令牌桶数的峰值。

      

复制代码
  1 import java.io.BufferedWriter;
  2 import java.io.FileOutputStream;
  3 import java.io.IOException;
  4 import java.io.OutputStreamWriter;
  5 import java.util.Random;
  6 import java.util.concurrent.ArrayBlockingQueue;
  7 import java.util.concurrent.Executors;
  8 import java.util.concurrent.ScheduledExecutorService;
  9 import java.util.concurrent.TimeUnit;
 10 import java.util.concurrent.locks.ReentrantLock;
 11  
 12 import com.google.common.base.Preconditions;
 13 import com.netease.datastream.util.framework.LifeCycle;
 14  
 15
 20 public class TokenBucket implements LifeCycle {
 21  
 22 // 默认桶大小个数 即最大瞬间流量是64M
 23  private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
 24  
 25 // 一个桶的单位是1字节
 26  private int everyTokenSize = 1;
 27  
 28 // 瞬间最大流量
 29  private int maxFlowRate;
 30  
 31 // 平均流量
 32  private int avgFlowRate;
 33  
 34 // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
 35  private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
 36  
 37 private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 38  
 39 private volatile boolean isStart = false;
 40  
 41 private ReentrantLock lock = new ReentrantLock(true);
 42  
 43 private static final byte A_CHAR = 'a';
 44  
 45 public TokenBucket() {
 46  }
 47  
 48 public TokenBucket(int maxFlowRate, int avgFlowRate) {
 49  this.maxFlowRate = maxFlowRate;
 50  this.avgFlowRate = avgFlowRate;
 51  }
 52  
 53 public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
 54  this.everyTokenSize = everyTokenSize;
 55  this.maxFlowRate = maxFlowRate;
 56  this.avgFlowRate = avgFlowRate;
 57  }
 58  
 59 public void addTokens(Integer tokenNum) {
 60  
 61 // 若是桶已经满了,就不再家如新的令牌
 62  for (int i = 0; i < tokenNum; i++) {
 63  tokenQueue.offer(Byte.valueOf(A_CHAR));
 64  }
 65  }
 66  
 67 public TokenBucket build() {
 68  
 69 start();
 70  return this;
 71  }
 72  
 73 /**
 74  * 获取足够的令牌个数
 75  *
 76  * @return
 77  */
 78  public boolean getTokens(byte[] dataSize) {
 79  
 80 Preconditions.checkNotNull(dataSize);
 81  Preconditions.checkArgument(isStart, "please invoke start method first !");
 82  
 83 int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
 84  
 85 final ReentrantLock lock = this.lock;
 86  lock.lock();
 87  try {
 88  boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
 89  if (!result) {
 90  return false;
 91  }
 92  
 93 int tokenCount = 0;
 94  for (int i = 0; i < needTokenNum; i++) {
 95  Byte poll = tokenQueue.poll();
 96  if (poll != null) {
 97  tokenCount++;
 98  }
 99  }
100  
101 return tokenCount == needTokenNum;
102  } finally {
103  lock.unlock();
104  }
105  }
106  
107 @Override
108  public void start() {
109  
110 // 初始化桶队列大小
111  if (maxFlowRate != 0) {
112  tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
113  }
114  
115 // 初始化令牌生产者
116  TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
117  scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
118  isStart = true;
119  
120 }
121  
122 @Override
123  public void stop() {
124  isStart = false;
125  scheduledExecutorService.shutdown();
126  }
127  
128 @Override
129  public boolean isStarted() {
130  return isStart;
131  }
132  
133 class TokenProducer implements Runnable {
134  
135 private int avgFlowRate;
136  private TokenBucket tokenBucket;
137  
138 public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
139  this.avgFlowRate = avgFlowRate;
140  this.tokenBucket = tokenBucket;
141  }
142  
143 @Override
144  public void run() {
145  tokenBucket.addTokens(avgFlowRate);
146  }
147  }
148  
149 public static TokenBucket newBuilder() {
150  return new TokenBucket();
151  }
152  
153 public TokenBucket everyTokenSize(int everyTokenSize) {
154  this.everyTokenSize = everyTokenSize;
155  return this;
156  }
157  
158 public TokenBucket maxFlowRate(int maxFlowRate) {
159  this.maxFlowRate = maxFlowRate;
160  return this;
161  }
162  
163 public TokenBucket avgFlowRate(int avgFlowRate) {
164  this.avgFlowRate = avgFlowRate;
165  return this;
166  }
167  
168 private String stringCopy(String data, int copyNum) {
169  
170 StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
171  
172 for (int i = 0; i < copyNum; i++) {
173  sbuilder.append(data);
174  }
175  
176 return sbuilder.toString();
177  
178 }
179  
180 public static void main(String[] args) throws IOException, InterruptedException {
181  
182 tokenTest();
183  }
184  
185 private static void arrayTest() {
186  ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
187  tokenQueue.offer(1);
188  tokenQueue.offer(1);
189  tokenQueue.offer(1);
190  System.out.println(tokenQueue.size());
191  System.out.println(tokenQueue.remainingCapacity());
192  }
193  
194 private static void tokenTest() throws InterruptedException, IOException {
195  TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
196  
197 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
198  String data = "xxxx";// 四个字节
199  for (int i = 1; i <= 1000; i++) {
200  
201 Random random = new Random();
202  int i1 = random.nextInt(100);
203  boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
204  TimeUnit.MILLISECONDS.sleep(100);
205  if (tokens) {
206  bufferedWriter.write("token pass --- index:" + i1);
207  System.out.println("token pass --- index:" + i1);
208  } else {
209  bufferedWriter.write("token rejuect --- index" + i1);
210  System.out.println("token rejuect --- index" + i1);
211  }
212  
213 bufferedWriter.newLine();
214  bufferedWriter.flush();
215  }
216  
217 bufferedWriter.close();
218  }
219  
220 }
复制代码

参考:

http://xiaobaoqiu.github.io/blog/2015/07/02/ratelimiter/

http://redisdoc.com/string/incr.html

http://www.cnblogs.com/zhengyun_ustc/archive/2012/11/17/topic1.html

由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!

免责声明:文章转载自《java 服务接口API限流 Rate Limit》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇如何根据角色批量激活SAP Fiori服务antd中,popover 不同情境下设置不同背景图,无法设置className的情况下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

bladex中服务远程调用

//api中接口 //实现类 //需要调用的远程服务的接口 //需要远程调用服务的实现类 package org.springblade.flow.business.service.impl; import lombok.AllArgsConstructor; import org.flowable.task.api.Task; impor...

Elasticsearch状态API接口排障总结

ES的Restful API,共四类API: 1. 检查集群、节点、索引等健康与否,以及获取其相应状态。 2. 管理集群、节点、索引及元数据 3. 执行CRUB操作(即:增删查改) 4. 执行高级操作,如:paging,filtering等。 ES API的访问接口: TCP:9200,并且ES是基于HTT...

ASP.NET Web API身份验证和授权

英语原文地址:http://www.asp.net/web-api/overview/security/authentication-and-authorization-in-aspnet-web-api 本文是作者所理解和翻译的内容。 这篇文章包括两部分:身份验证和授权。 身份验证用来确定一个用户的身份。例如,Alice用她的用户名和密码登陆系统,服务...

使用 VSCode 创建 SpringBoot RESTful 增删改查接口项目并发布

声明:不用 idea 只是因为我想试试别的环境能不能一样起来,太复杂的配置和优化、惯例写法我不会,我并不是专业 SpringBoot 后台开发。 目录 0 准备 1 创建 maven 工程 2 认识项目文件结构 3 创建数据模型类 4 创建表示连接器的接口(也叫 repository) 5 创建控制器 6 代码写完后的目录结构 7 修改端口:修改...

C#——WebApi 接口参数传参详解

本篇打算通过get、post、put、delete四种请求方式分别谈谈基础类型(包括int/string/datetime等)、实体、数组等类型的参数如何传递。 一、get请求 对于取数据,我们使用最多的应该就是get请求了吧。下面通过几个示例看看我们的get请求参数传递。 1、基础类型参数 [HttpGet] public string GetA...

web API请求与参数获取

总结webAPI的常用请求方法与后台参数的获取: 一:get请求:(会将所以参数拼接到URL里面) 1:基础类型:string a=“hello” , 前端无论你是写到ajax里面的data属性还是直接拼接到URL里面,后台直接string a获取; [HttpGet] public JObject AddUserInfo(string a) {........