一、寫在前面
在上一篇博客中提到過對(duì)于網(wǎng)絡(luò)爬蟲這種包含大量網(wǎng)絡(luò)請(qǐng)求的任務(wù),是可以用Celery來做到加速爬取的,那么,這一篇博客就要具體說一下怎么用Celery來對(duì)我們的爬蟲進(jìn)行一個(gè)加速!
?
二、知識(shí)補(bǔ)充
1.class celery.group
group這個(gè)類表示創(chuàng)建一組要并行執(zhí)行的任務(wù),不過一組任務(wù)是懶惰的,所以你需要運(yùn)行并對(duì)其進(jìn)行評(píng)估。要了解這個(gè)類,可以查看文檔,或者在Pycharm中直接Ctrl+左鍵就能直接查看源碼了,如下圖:
當(dāng)然了,直接看源碼還不夠,最好還是自己動(dòng)下手。所以先創(chuàng)建一個(gè)test.py,其中代碼如下:
1
from
celery
import
Celery
2
3
4
app = Celery(
"
test
"
, broker=
"
redis://127.0.0.1:6379
"
, backend=
"
redis://127.0.0.1:6379
"
)
5
6
7
@app.task
8
def
add(x, y):
9
return
x +
y
10
11
12
if
__name__
==
'
__main__
'
:
13
app.start()
然后運(yùn)行Celery服務(wù)器,再在test.py所在目錄下創(chuàng)建一個(gè)test_run.py用于測(cè)試,其中代碼如下:
1
from
celery
import
group
2
from
.test
import
add
3
4
5
lazy_group = group(add.s(2, 2), add.s(4, 4
))
6
print
(type(lazy_group))
7
result =
lazy_group()
8
print
(result)
9
print
(type(result))
10
print
(result.get())
在Pycharm中運(yùn)行test_run.py,得到的結(jié)果如下:
fe54f453-eb9c-4b24-87e3-a26fab75967f
[4, 8]
? 通過查看源碼可以知道,是可以往group中傳入一個(gè)由任務(wù)組成的可迭代對(duì)象的,所以這就進(jìn)行一下測(cè)試,對(duì)上面的代碼進(jìn)行一點(diǎn)修改:
1
from
celery
import
group
2
from
CelerySpider.test
import
add
3
4
5
lazy_group = group(add.s(x, y)
for
x, y
in
zip([1, 3, 5, 7, 9], [2, 4, 6, 8, 10
]))
6
result =
lazy_group()
7
print
(result)
8
print
(result.get())
運(yùn)行之后得到了我們想要的結(jié)果:
f03387f1-af00-400b-b58a-37901563251d
[3, 7, 11, 15, 19]
2.celer.result.collect()
在Celery中有一個(gè)類result,這個(gè)類包含了任務(wù)運(yùn)行的結(jié)果和狀態(tài)等,而在這個(gè)類中就有一個(gè)collect()方法,使用該方法能在結(jié)果返回時(shí)收集結(jié)果。和之前一樣的步驟,先看看源碼:
這里看源碼也是看得一頭霧水,不如動(dòng)手寫代碼試試看。創(chuàng)建一個(gè)app.py,其中代碼如下:
1
from
celery
import
Celery, group, result
2
3
4
app = Celery(
"
test
"
, broker=
"
redis://127.0.0.1:6379
"
, backend=
"
redis://127.0.0.1:6379
"
)
5
6
7
@app.task(trail=
True)
8
def
A(how_many):
9
return
group(B.s(i)
for
i
in
range(how_many))()
10
11
12
@app.task(trail=
True)
13
def
B(i):
14
return
pow2.delay(i)
15
16
17
@app.task(trail=
True)
18
def
pow2(i):
19
return
i ** 2
20
21
22
if
__name__
==
'
__main__
'
:
23
app.start()
可以看到在設(shè)置任務(wù)的時(shí)候都加了參數(shù)trail=True,這是為了存儲(chǔ)子任務(wù)列表運(yùn)行后的結(jié)果,雖然是默認(rèn)設(shè)置,但這里明確啟用。在運(yùn)行Celery服務(wù)器之中,進(jìn)入app.py同級(jí)目錄,輸入python,然后執(zhí)行如下代碼:
>>> from app import A
>>> res = A.delay(10)
>>> [i[1] for i in res.collect() if isinstance(i[1], int)]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
?
三、具體步驟
1.項(xiàng)目結(jié)構(gòu)
這個(gè)爬蟲項(xiàng)目的基本文件如下:
其中app.py用于創(chuàng)建Celery實(shí)例,celeryconfig.py是Celery需要使用的配置文件,tasks.py里面的則是具體的任務(wù),crawl.py是爬蟲腳本,在打開Celery服務(wù)器之后,運(yùn)行此文件即可。
2.主要代碼
首先是app.py,代碼如下,其中config_from_object()方法用于配置Celery,傳入的參數(shù)是一個(gè)可被導(dǎo)入的模塊:
1
from
celery
import
Celery
2
3
4
app = Celery(
"
spiders
"
, include=[
"
CelerySpider.tasks
"
])
5
#
導(dǎo)入配置文件
6
app.config_from_object(
"
CelerySpider.celeryconfig
"
)
7
8
9
if
__name__
==
'
__main__
'
:
10
app.start()
? 下面是tasks.py中的代碼,其中包含了發(fā)送請(qǐng)求和解析網(wǎng)頁的代碼:
1
import
requests
2
from
lxml
import
etree
3
from
celery
import
group
4
from
CelerySpider.app
import
app
5
6
7
headers =
{
8
"
Cookie
"
:
"
__cfduid=d5d815918f19b7370d14f80fc93f1f27e1566719058; UM_distinctid=16cc7bba92f7b6-0aac860ea9b9a7-7373e61-144000-16cc7bba930727; CNZZDATA1256911977=1379501843-1566718872-https%253A%252F%252Fwww.baidu.com%252F%7C1566718872; XSRF-TOKEN=eyJpdiI6InJvNVdZM0krZ1wvXC9BQjg3YUk5aGM1Zz09IiwidmFsdWUiOiI5WkI4QU42a0VTQUxKU2ZZelVxK1dFdVFydlVxb3g0NVpicEdkSGtyN0Uya3VkXC9pUkhTd2plVUtUTE5FNWR1aCIsIm1hYyI6Ijg4NjViZTQzNGRhZDcxNTdhMDZlMWM5MzI4NmVkOGZhNmRlNTBlYWM0MzUyODIyOWQ4ZmFhOTUxYjBjMTRmNDMifQ%3D%3D; doutula_session=eyJpdiI6IjFoK25pTG50azEwOXlZbmpWZGtacnc9PSIsInZhbHVlIjoiVGY2MU5Ob2pocnJsNVBLZUNMTWw5OVpjT0J6REJmOGVpSkZwNFlUZVwvd0tsMnZsaiszWEpTbEdyZFZ6cW9UR1QiLCJtYWMiOiIxZGQzNTJlNzBmYWE0MmQzMzQ0YzUzYmYwYmMyOWY3YzkxZjJlZTllNDdiZTlkODA2YmQ3YWRjNGRmZDgzYzNmIn0%3D
"
,
9
"
Referer
"
:
"
https://www.doutula.com/article/list/?page=1
"
,
10
"
UserAgent
"
:
"
Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36
"
11
}
12
13
14
@app.task(trail=
True)
15
def
main(urls):
16
#
主函數(shù)
17
return
group(call.s(url)
for
url
in
urls)()
18
19
20
@app.task(trail=
True)
21
def
call(url):
22
#
發(fā)送請(qǐng)求
23
try
:
24
res = requests.get(url, headers=
headers)
25
parse.delay(res.text)
26
except
Exception as e:
27
print
(e)
28
29
30
@app.task(trail=
True)
31
def
parse(html):
32
#
解析網(wǎng)頁
33
et =
etree.HTML(html)
34
href_list = et.xpath(
'
//*[@id="home"]/div/div[2]/a/@href
'
)
35
result =
[]
36
for
href
in
href_list:
37
href_res = requests.get(href, headers=
headers)
38
href_et =
etree.HTML(href_res.text)
39
src_list = href_et.xpath(
'
//*[@class="artile_des"]/table/tbody/tr/td/a/img/@src
'
)
40
result.extend(src_list)
41
return
result
最后是crawl.py中的代碼:
1
import
time
2
from
CelerySpider.tasks
import
main
3
4
5
start_time =
time.time()
6
7
8
url_list = [
"
https://www.doutula.com/article/list/?page={}
"
.format(i)
for
i
in
range(1, 31
)]
9
res =
main.delay(url_list)
10
all_src =
[]
11
for
i
in
res.collect():
12
if
isinstance(i[1], list)
and
isinstance(i[1
][0], str):
13
all_src.extend(i[1
])
14
15
print
(
"
Src count:
"
, len(all_src))
16
17
18
end_time =
time.time()
19
print
(
"
Cost time:
"
, end_time - start_time)
? 此次爬取的網(wǎng)站是一個(gè)表情包網(wǎng)站,url_list就表示要爬取的url,這里我選擇爬取30頁來測(cè)試。all_src用于存儲(chǔ)表情包圖片的資源鏈接,通過collect()方法提取出要爬取的鏈接,然后將這些表情包下載下來,最后打印出下載的圖片數(shù)量和整個(gè)程序所耗費(fèi)的時(shí)間。
?
?四、運(yùn)行結(jié)果
當(dāng)運(yùn)行Celery服務(wù)后,再運(yùn)行crawl.py文件,會(huì)看到如下信息打印出來:
當(dāng)整個(gè)爬蟲運(yùn)行完畢后,會(huì)打印出所耗費(fèi)的時(shí)間:
?
完整代碼已上傳到GitHub!
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元

