最近博主手上有一個爬蟲項目,開始深入研究python爬蟲開發,這是我篇博客也相當于是我的學習筆記,我認為學習爬蟲第一步,先學習python多線程與多進程,熟悉網絡編程,接下來會陸續以博客的方式跟大家做分享。
多進程
Python實現多進程的方式主要有兩種,一種方法是使用os模塊中的fork方法,另一種方法是使用 multiprocessing模塊。這兩種方法的區別在于前者僅適用于 Unix/Linux操作系統,對 Windows不支持,后者則是跨平臺的實現方式,目前爬蟲程序多數是運行在Unix/Linux操作系統上
一、使用os模塊的fork方式實現線程
fork方法調用一次,返回兩次(操作系統會將當前進程(父進程)復制出一份子線程,這兩個線程幾乎完全相同,其中子線程永遠返回0,父線程返回的是子線程的ID)
import
os
# getpid()獲取當前線程的ID,getppid()獲取父線程的ID
if
__name__
==
"__main__"
:
print
(
'當前進程是(%s)'
%
(
os
.
getpid
(
)
)
)
pid
=
os
.
fork
(
)
if
pid
<
0
:
print
(
'error in fork'
)
elif
pid
==
0
:
print
(
'我是子線程(%s),我的父線程是(%s)'
,
(
os
.
getpid
(
)
,
os
.
getppid
)
)
else
:
print
(
'我(%s)創建了一個子線程(%s).'
,
(
os
.
getpid
(
)
,
pid
)
)
二、使用multiprocessing模塊創建多線程
multiprocessing模塊提供Process類來描述一個進程對象。創建子進程時,只需要傳入一個執行函數和函數的參數,即可完成一個 Process實例的創建,用start()方法啟動進程用 join()方法實現進程間的同步。
import
os
from
multiprocessing
import
Process
def
run_proc
(
name
)
:
print
(
'Child process %s (%s) Running...'
%
(
name
,
os
.
getpid
(
)
)
)
if
__name__
==
'__main__'
:
print
(
'Parent process %s.'
%
os
.
getpid
(
)
)
for
i
in
range
(
5
)
:
p
=
Process
(
target
=
run_proc
,
args
=
(
str
(
i
)
,
)
)
print
(
'Process will start.'
)
p
.
start
(
)
p
.
join
(
)
print
(
'Process end.'
)
三、multiprocessing模塊提供了一個Pool類來代表進程池對象
Pool可以提供指定數量的進程供用戶調用,默認大小是CPU的核數。當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來處理它。下面通過一個例子來演示進程池的工作流程,代碼如下
import
os
,
time
,
random
from
multiprocessing
import
Pool
def
run_task
(
name
)
:
print
(
'Task %s (pid = %s) is running...'
%
(
name
,
os
.
getpid
(
)
)
)
time
.
sleep
(
random
.
random
(
)
*
3
)
print
(
'Task %s is end.'
%
name
)
if
__name__
==
'__main__'
:
print
(
'Current process is %s.'
%
os
.
getpid
(
)
)
p
=
Pool
(
processes
=
3
)
for
i
in
range
(
5
)
:
p
.
apply_async
(
run_task
,
args
=
(
i
,
)
)
print
(
'waiting for all subprocesses done...'
)
p
.
close
(
)
p
.
join
(
)
print
(
'All subprocesses done.'
)
Pool對象調用 join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用 close()之后就不能繼續添加新的 Process了
四、進程間通信
Python提供了多種進程的通信方式,例如Queue、Pipe、Value+Array等
Pipe常用來兩個進程之間的通信,Queue用來在多個進程之間實現通信
1.Queue實現:
from
multiprocessing
import
Process
,
Queue
import
os
,
time
,
random
# 寫數據進程執行的代碼:
def
proc_write
(
p
,
urls
)
:
print
(
'Process(%s)is writing,,,'
%
os
.
getpid
(
)
)
for
url
in
urls
:
p
.
put
(
url
)
print
(
'Put %s to queue...'
%
url
)
time
.
sleep
(
random
.
random
(
)
)
# 讀數據進程執行的代碼:
def
proc_read
(
q
)
:
print
(
'Process (%s) is reading...'
%
os
.
getpid
(
)
)
while
True
:
url
=
q
.
get
(
True
)
print
(
'Get %s from queue.'
%
url
)
if
__name__
==
'__main__'
:
# 父進程創建Queue,并傳給各個子線程
q
=
Queue
(
)
proc_write1
=
Process
(
target
=
proc_write
,
args
=
(
q
,
[
'url_1'
,
'url_2'
,
'url_3'
]
)
)
proc_write2
=
Process
(
target
=
proc_write
,
args
=
(
q
,
[
'url_4'
,
'url_5'
,
'url_6'
]
)
)
proc_reader
=
Process
(
target
=
proc_read
,
args
=
(
q
,
)
)
# 啟動子線程 proc_writer,寫入:
proc_write1
.
start
(
)
proc_write2
.
start
(
)
# 啟動子線程 proc_reader,讀取:
proc_reader
.
start
(
)
# 等待proc_writer結束:
proc_write1
.
join
(
)
proc_write2
.
join
(
)
# proc_reader進程里是死循環,無法等待結束,只能強行終止:
proc_reader
.
terminate
(
)
2.Pipe實現:
Pipe方法返回(conn1,conn2)代表一個管道的兩個端。Pipe方法有 duplex參數,如果duplex參數為True(默認值),那么這個管道是全雙工模式,也就是說 conn1和conn2均可收發。若 duplex為 False, conn1只負責接收消息,conn2只負責發送消息。send和recv方法分別是發送和接收消息的方法。例如,在全雙工模式下,可以調用 conn. send發送消息conn.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那么recv方法會拋出 EOFError
import
multiprocessing
import
os
,
time
,
random
def
proc_send
(
pipe
,
urls
)
:
for
url
in
urls
:
print
(
'Process(%s) send:%s'
%
(
os
.
getpid
(
)
,
url
)
)
pipe
.
send
(
url
)
time
.
sleep
(
random
.
random
(
)
)
def
proc_recv
(
pipe
)
:
while
True
:
print
(
'Process(%s) recv:%s'
%
(
os
.
getpid
(
)
,
pipe
.
recv
(
)
)
)
time
.
sleep
(
random
.
random
(
)
)
if
__name__
==
'__main__'
:
# 調用Pipe()方法,返回兩個conn
pipe
=
multiprocessing
.
Pipe
(
)
p1
=
multiprocessing
.
Process
(
target
=
proc_send
,
args
=
(
pipe
[
0
]
,
[
'url_'
+
str
(
i
)
for
i
in
range
(
10
)
]
)
)
p2
=
multiprocessing
.
Process
(
target
=
proc_recv
,
args
=
(
pipe
[
1
]
,
)
)
p1
.
start
(
)
p2
.
start
(
)
p1
.
join
(
)
p2
.
join
(
)
多線程
Python的標準庫提供了兩個模塊:thread和 threading, thread是低級模塊, threading是高級模塊,對 thread進行了封裝。絕大多數情況下,我們只需要使用 threading這個高級模塊。
一、用threading模塊創建多線程
? threading模塊一般通過兩種方式創建多線程:第一種方式是把一個函數傳人并創建Thread實例,然后調用 start方法開始執行,代碼如下:
import
random
import
time
,
threading
# 新線程執行的代碼
def
thread_run
(
urls
)
:
print
(
'Current %s is running ...'
%
threading
.
current_thread
(
)
.
name
)
for
url
in
urls
:
print
(
'%s --->>> %s'
%
(
threading
.
current_thread
(
)
.
name
,
url
)
)
time
.
sleep
(
random
.
random
(
)
)
print
(
'%s ended.'
%
threading
.
current_thread
(
)
.
name
)
print
(
'%s is running...'
%
threading
.
current_thread
(
)
.
name
)
t1
=
threading
.
Thread
(
target
=
thread_run
,
name
=
'Thread_1'
,
args
=
(
[
'url_1'
,
'url_2'
,
'url_3'
]
,
)
)
t2
=
threading
.
Thread
(
target
=
thread_run
,
name
=
'Thread_2'
,
args
=
(
[
'url_4'
,
'url_5'
,
'url_6'
]
,
)
)
t1
.
start
(
)
t2
.
start
(
)
t1
.
join
(
)
t2
.
join
(
)
print
(
'%s ended.'
%
threading
.
current_thread
(
)
.
name
)
第二種方式是直接從 threading.Thread繼承并創建線程類,然后重寫init方法和run方法。
代碼如下:
import
random
import
threading
import
time
class
myThread
(
threading
.
Thread
)
:
def
__init__
(
self
,
name
,
urls
)
:
threading
.
Thread
.
__init__
(
self
,
name
=
name
)
self
.
urls
=
urls
def
run
(
self
)
:
print
(
'Current %s is running ...'
%
threading
.
current_thread
(
)
.
name
)
for
url
in
self
.
urls
:
print
(
'%s --->>> %s'
%
(
threading
.
current_thread
(
)
.
name
,
url
)
)
time
.
sleep
(
random
.
random
(
)
)
print
(
'%s ended ...'
%
threading
.
current_thread
(
)
.
name
)
print
(
'%s is running...'
%
threading
.
current_thread
(
)
.
name
)
t1
=
myThread
(
name
=
'Thread_1'
,
urls
=
[
'url_1'
,
'url_2'
,
'url_3'
]
)
t2
=
myThread
(
name
=
'Thread_2'
,
urls
=
[
'url_4'
,
'url_5'
,
'url_6'
]
)
t1
.
start
(
)
t2
.
start
(
)
t1
.
join
(
)
t2
.
join
(
)
print
(
'%s ended.'
%
threading
.
current_thread
(
)
.
name
)
二、線程同步
為了保證數據的正確性,需要對多個線程進行同步,這需要調用Thread的Lock和RLock對象
這兩個對象都有 acquire方法和 release方法,對于那些每次只允許一個線程操作的數據,可以將其操作放到 acquire和 release方法之間。
對于Lock對象而言,如果一個線程連續兩次進行 acquire操作,那么由于之后沒有 release,第二次 acquire將掛起線程。這會導致Lock對象永遠不會 release,使得線程死鎖。 RLock對象允許一個線程多次對其進行 acquire操作,因為在其內部通過一個 counter變量維護著線程 acquire的次數。而且每一次的 acquire操作必須有一個 release操作與之對應在所有的 release操作完成之后,別的線程才能申請該RLock對象。線程同步演示代碼如下:
import
threading
mylock
=
threading
.
RLock
(
)
num
=
0
class
myThread
(
threading
.
Thread
)
:
def
__init__
(
self
,
name
)
:
threading
.
Thread
.
__init__
(
self
,
name
=
name
)
def
run
(
self
)
:
global
num
while
True
:
mylock
.
acquire
(
)
print
(
'%s locked,Number:%d'
%
(
threading
.
current_thread
(
)
.
name
,
num
)
)
if
num
>=
4
:
mylock
.
release
(
)
print
(
'%s released,Number:%d'
%
(
threading
.
current_thread
(
)
.
name
,
num
)
)
break
num
+=
1
print
(
'%s released,Number:%d'
%
(
threading
.
current_thread
(
)
.
name
,
num
)
)
mylock
.
release
(
)
if
__name__
==
'__main__'
:
thread1
=
myThread
(
'Thread1'
)
thread2
=
myThread
(
'Thread2'
)
thread1
.
start
(
)
thread2
.
start
(
)
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
