前言
一個(gè)業(yè)務(wù)型的服務(wù),被open接口后,遭遇并發(fā)掃數(shù)據(jù),于是要做限流操作。一直固執(zhí)的認(rèn)為,業(yè)務(wù)API和OpenAPI要分開處理,或許因?yàn)槠鸪踅尤肫渌髽I(yè)ERP系統(tǒng)都是走較為規(guī)范的OpenAPI,始終對(duì)于這種開發(fā)系統(tǒng)業(yè)務(wù)API的做法感覺不好。
窗口限流
需求是要在Django的一個(gè)工程里做限流,倘若是rest_framework的View也好辦,直接就提供了限流 rest_framework throttling
可參照文檔設(shè)置。不能直接使用設(shè)置的原因是,面對(duì)是Django做的一個(gè)服務(wù),然后proxy至別的服務(wù),工程僅僅承擔(dān)一個(gè)轉(zhuǎn)發(fā)的職責(zé)。如果在LB上限流,無(wú)法區(qū)分來(lái)源IP,只能是總量限流,很可能導(dǎo)致一旦被限流,正常平臺(tái)訪問(wèn)被拒絕。所以我需要的限流需求非常清晰,首先限流的粒度是需要先知道訪問(wèn)來(lái)源的真實(shí)IP,在一定窗口時(shí)間內(nèi)的訪問(wèn)次數(shù),諸如 100/min。
rest_framework 提供了比錯(cuò)的實(shí)現(xiàn)思路,類似實(shí)現(xiàn)一套打點(diǎn)記錄的,片段存儲(chǔ),打點(diǎn)記錄為需要限制的實(shí)時(shí)條件。就以上述 100/min為例,首先一分鐘之內(nèi),IP1沒有任何訪問(wèn),則沒有任何限制數(shù)據(jù),redis的過(guò)期時(shí)間,滿足了此數(shù)據(jù)設(shè)置,再有,1分鐘之內(nèi),要滿足次數(shù)不超過(guò)100次,維護(hù)一個(gè)數(shù)組,長(zhǎng)度超過(guò)100則意味超過(guò)訪問(wèn)限制,數(shù)組中記錄請(qǐng)求每次訪問(wèn)的時(shí)刻值,窗口滑動(dòng)就是淘汰掉連續(xù)訪問(wèn)中,以當(dāng)前時(shí)刻后置一分鐘之前的訪問(wèn)打點(diǎn),保證了數(shù)組窗口永遠(yuǎn)都是以當(dāng)前最近請(qǐng)求進(jìn)入1min之內(nèi)的記錄點(diǎn)。
# throttle setting
THROTTLE_RATES
=
{
'resource1'
:
'100/min'
,
'resource2'
:
'20/second'
}
# throttle class
class
WindowAccessThrottle
:
cache
=
Cache
(
)
timer
=
time
.
time
def
__init__
(
self
,
request
,
view
,
scope
)
:
self
.
rate
=
settings
.
THROTTLE_RATES
[
scope
]
self
.
request
=
request
self
.
view
=
view
self
.
key
=
self
.
get_cache_key
(
)
def
parse_rate
(
self
)
:
num
,
period
=
self
.
rate
.
split
(
'/'
)
num_requests
=
int
(
num
)
duration
=
{
's'
:
1
,
'm'
:
60
,
'h'
:
3600
,
'd'
:
86400
}
[
period
[
0
]
]
return
num_requests
,
duration
def
get_cache_key
(
self
)
:
host
=
self
.
request
.
META
[
'HTTP_X_FORWARDED_FOR'
]
\
if
self
.
request
.
META
.
get
(
'HTTP_X_FORWARDED_FOR'
,
None
)
else
\
self
.
request
.
META
[
'REMOTE_ADDR'
]
return
'throttle:{}:{}'
.
format
(
host
,
self
.
view
.
__name__
)
def
allow_request
(
self
)
:
history
=
self
.
cache
.
get_value
(
self
.
key
,
[
]
)
now
=
self
.
timer
(
)
num_requests
,
duration
=
self
.
parse_rate
(
)
while
history
and
history
[
-
1
]
<=
now
-
duration
:
history
.
pop
(
)
if
len
(
history
)
>=
num_requests
:
return
False
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
duration
)
return
True
注意
1,上述示例可根據(jù)實(shí)際需求修改
2,在做IP級(jí)別限定是,如果直接調(diào)用request.META[‘REMOTE_ADDR’]獲取的是請(qǐng)求直接過(guò)來(lái)的IP,實(shí)際部署服務(wù)多數(shù)是經(jīng)過(guò)LB,或者nginx反向代理的,REMOTE_ADDR多數(shù)就是前置LB的IP,所以取用HTTP_X_FORWARDED_FOR獲取發(fā)起請(qǐng)求的遠(yuǎn)端IP。
3,
cache = Cache()
就是一個(gè)redis的封裝,稍微實(shí)現(xiàn)下
cache.get_value(self.key, [])
對(duì)獲取支持默認(rèn)值
4,使用時(shí)類似原生的throttle,在view函數(shù)中設(shè)置 scope
4,配合Django的中間件,調(diào)用判定,大致如下:
from
django
.
urls
import
resolve
'''
實(shí)際下面中間件需要根據(jù)需求自定義調(diào)試,如果只是rest_framework的View可以直接用原生的設(shè)定,因?yàn)楣P者是自己封裝的轉(zhuǎn)發(fā)View,
相當(dāng)于重新自定義一個(gè)完全新的通用視圖,需要重新實(shí)現(xiàn)限流
'''
class
ThrottleMiddleware
(
MiddlewareMixin
)
:
def
process_request
(
self
,
request
)
:
resolver
=
resolve
(
request
.
path
)
throttle_scope
=
getattr
(
resolver
.
func
,
'throttle_scope'
,
None
)
throttle
=
WindowAccessThrottle
(
request
,
resolver
.
func
,
throttle_scope
)
if
throttle
.
allow_request
(
)
:
return
else
:
return
HttpResponse
(
)
漏斗限流
上面窗口限流,一定程度上解決了流量猛增的問(wèn)題,但是以上面 120/min的限流為例,用戶在1分鐘的某一瞬間,120的并發(fā),此種場(chǎng)景,上面的限流器基本沒有作用了,設(shè)想能夠在短時(shí)間內(nèi),既限制訪問(wèn)的總量,也能限制訪問(wèn)的頻率至于過(guò)高,漏斗限流就非常理想,基本抽象模型:
1,漏斗參數(shù):
- capacity:容量,漏斗大小
- rate:漏斗流出速率,可以用 total和duration計(jì)算,一段時(shí)間duration內(nèi)允許通過(guò)的總量total
2,當(dāng)漏斗為空漏斗時(shí):
- 訪問(wèn)進(jìn)入的速率 < rate,此時(shí)漏斗無(wú)積壓,請(qǐng)求一律通過(guò)
- 訪問(wèn)進(jìn)入的速率 >= rate,此時(shí)漏斗中逐漸積壓,且漏斗以rate值不斷流出
3,當(dāng)漏斗不為空時(shí):
- 出水口以最大速率流出
- 漏斗未滿,會(huì)繼續(xù)納入
- 漏斗已滿,則會(huì)直接溢出,拒絕請(qǐng)求
用漏斗限流實(shí)現(xiàn)上述IP限流,示例如下:
THROTTLE_RATES
=
{
'funnel'
:
{
'capacity'
:
15
,
'duration'
:
60
,
# seconds
'total'
:
30
,
}
,
}
class
FunnelThrottle
:
cache
=
CusCache
(
)
timer
=
time
.
time
def
__init__
(
self
,
request
,
view
,
scope
)
:
config
=
settings
.
THROTTLE_RATES
[
scope
]
self
.
rate
=
config
[
'total'
]
/
config
[
'duration'
]
self
.
capacity
=
config
[
'capacity'
]
self
.
duration
=
config
[
'duration'
]
self
.
request
=
request
self
.
view
=
view
self
.
key
=
self
.
get_cache_key
(
)
def
get_cache_key
(
self
)
:
"""
same as WindowAccessThrottle
"""
pass
def
allow_request
(
self
)
:
history
=
self
.
cache
.
get_value
(
self
.
key
,
[
]
)
now
=
self
.
timer
(
)
if
not
history
:
# 空漏斗直接放行
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
self
.
duration
)
return
True
latest_duration
=
now
-
history
[
0
]
# 距離最近的一次放行時(shí)間間隔
leak_count
=
int
(
latest_duration
*
self
.
rate
)
# 由間隔時(shí)間和漏斗流速計(jì)算此段時(shí)間漏斗騰出空間
for
i
in
range
(
leak_count
)
:
if
history
:
history
.
pop
(
)
else
:
break
# 在上述漏斗清理流出空間后,漏斗仍舊滿量,直接判定不可訪問(wèn)
if
len
(
history
)
>=
self
.
capacity
:
return
False
# 如果可訪問(wèn),請(qǐng)求進(jìn)入漏斗計(jì)量
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
self
.
duration
)
return
True
Note:
1,漏斗限流方式和之前窗口限流所用的數(shù)據(jù)結(jié)構(gòu)在cache中基本一致,只因判定算法不同,所達(dá)到的限流效果,完全不同
2,漏斗限流,進(jìn)入漏斗計(jì)量的點(diǎn),表示一律放行通過(guò)了,只是,在漏斗中會(huì)根據(jù)下一次訪問(wèn)進(jìn)入時(shí)間判定該點(diǎn)是否由漏斗的rate失效,而達(dá)到容量合理,限制流速的效果
Redis 漏斗限流 (redis-cell)
上述的漏斗限流算法,在Redis的模塊中已經(jīng)內(nèi)置實(shí)現(xiàn)了一個(gè),具體介紹請(qǐng)參見Github redis-cell詳細(xì)介紹 筆者安裝在MacOS上,基本沒有問(wèn)題:
# 下載mac版本安裝包
https://github.com/brandur/redis-cell/releases
# 解壓
tar
-zxf redis-cell-*.tar.gz
# 復(fù)制可執(zhí)行文件
cp
libredis_cell.dylib /your_redis_server_localtion
# 重啟redis-server,把libredis_cell.dylib加載上
redis-server --loadmodule /path/to/modules/libredis_cell.dylib
安裝重啟后,可以在redis中執(zhí)行 CL.THROTTLE 命令:
# CL.THROTTLE user123 15 30 60 1和實(shí)現(xiàn)算法中的配置類似,user123表示限流key,15: capacity,30: total,60: duration,
127.0.0.1:6379> CL.THROTTLE user123 15 30 60 1
1) (integer) 0 # 0表示允許,1表示拒絕
2) (integer) 16 # 漏斗容量 max_burst + 1 = 15 +1 =16
3) (integer) 15 # 漏斗剩余容量
4) (integer) -1 # 如果被拒絕,多少秒后重試
5) (integer) 2 # 多長(zhǎng)時(shí)間后漏斗完全漏空
但是redis-cell沒有找到對(duì)應(yīng)的sdk
Python Bound method
# python 3.x
def
func
(
)
:
pass
class
A
:
@
classmethod
def
method_cls
(
cls
)
:
pass
def
method_a
(
self
)
:
pass
class
B
(
A
)
:
pass
a
,
b
=
A
(
)
,
B
(
)
print
(
func
)
#
print
(
a
.
method_a
)
#
<__main__.A object at 0x10ef11978>>
print
(
b
.
method_cls
)
#
>
對(duì)于上文中
func
就是一個(gè)函數(shù)對(duì)象,而
method_a
和
method_cls
是歸屬類A的所以,是一個(gè)
bound method
,那么如何查看一個(gè)
bound method
的歸屬呢?
Python 2.x中提供了 im_func,im_class,im_self三個(gè)屬性:
-
im_func
is the function object. -
im_class
is the class the method comes from. -
im_self
is the self object the method is bound to.
Python3.x中
-
__func__
replaceim_func
-
__self__
replaceim_self
2.x中的im_class
取消
# python 3.x
print
(
a
.
method_a
.
__self__
)
print
(
b
.
method_cls
.
__self__
)
# print(func.__self__) error func 無(wú) __self__
print
(
b
.
method_cls
.
__self__
.
__name__
)
# print(b.method_cls.__self__.__name__) error b.method_cls.__self__是一個(gè)實(shí)例,無(wú)__name__屬性
關(guān)于
__name__
和
__qualname__
請(qǐng)參見 PEP 3155
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
