今天我們使用的計算機早已進入多CPU或多核時代,而我們使用的操作係統都是支持“多任務”的操作係統,這使得我們可以同時運行多個程序,也可以將一個程序分解爲若幹個相對獨立的子任務,讓多個子任務並發的執行,從而縮短程序的執行時間,同時也讓用戶獲得更好的體驗。因此在當下不管是用什麽編程語言進行開發,實現讓程序同時執行多個任務也就是常說的“並發編程”,應該是程序員必備技能之一。爲此,我們需要先討論兩個概念,一個叫進程,一個叫線程。
進程就是操作係統中執行的一個程序,操作係統以進程爲單位分配存儲空間,每個進程都有自己的地址空間、數據棧以及其他用於跟蹤進程執行的輔助數據,操作係統管理所有進程的執行,爲它們合理的分配資源。進程可以通過fork或spawn的方式來創建新的進程來執行其他的任務,不過新的進程也有自己獨立的內存空間,因此必須通過進程間通信機制(IPC,Inter-Process Communication)來實現數據共享,具體的方式包括管道、信號、套接字、共享內存區等。
一個進程還可以擁有多個並發的執行線索,簡單的說就是擁有多個可以獲得CPU調度的執行單元,這就是所謂的線程。由於線程在同一個進程下,它們可以共享相同的上下文,因此相對於進程而言,線程間的信息共享和通信更加容易。當然在單核CPU係統中,真正的並發是不可能的,因爲在某個時刻能夠獲得CPU的只有唯一的一個線程,多個線程共享了CPU的執行時間。使用多線程實現並發編程爲程序帶來的好處是不言而喻的,最主要的體現在提升程序的性能和改善用戶體驗,今天我們使用的軟件幾乎都用到了多線程技術,這一點可以利用係統自帶的進程監控工具(如macOS中的“活動監視器”、Windows中的“任務管理器”)來證實,如下圖所示。
當然多線程也並不是沒有壞處,站在其他進程的角度,多線程的程序對其他程序並不友好,因爲它占用了更多的CPU執行時間,導致其他程序無法獲得足夠的CPU執行時間;另一方面,站在開發者的角度,編寫和調試多線程的程序都對開發者有較高的要求,對於初學者來說更加困難。
Python既支持多進程又支持多線程,因此使用Python實現並發編程主要有3種方式:多進程、多線程、多進程+多線程。
Unix和Linux操作係統上提供了fork()
係統調用來創建進程,調用fork()
函數的是父進程,創建出的是子進程,子進程是父進程的一個拷貝,但是子進程擁有自己的PID。fork()
函數非常特殊它會返回兩次,父進程中可以通過fork()
函數的返回值得到子進程的PID,而子進程中的返回值永遠都是0。Python的os模塊提供了fork()
函數。由於Windows係統沒有fork()
調用,因此要實現跨平台的多進程編程,可以使用multiprocessing模塊的Process
類來創建子進程,而且該模塊還提供了更高級的封裝,例如批量啓動進程的進程池(Pool
)、用於進程間通信的隊列(Queue
)和管道(Pipe
)等。
下面用一個下載文件的例子來說明使用多進程和不使用多進程到底有什麽差別,先看看下面的代碼。
from random import randint
from time import time, sleep
def download_task(filename):
print('開始下載%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))
def main():
start = time()
download_task('Python從入門到住院.pdf')
download_task('Peking Hot.avi')
end = time()
print('總共耗費了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
下面是運行程序得到的一次運行結果。
開始下載Python從入門到住院.pdf...
Python從入門到住院.pdf下載完成! 耗費了6秒
開始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費了7秒
總共耗費了13.01秒.
從上面的例子可以看出,如果程序中的代碼只能按順序一點點的往下執行,那麽即使執行兩個毫不相關的下載任務,也需要先等待一個文件下載完成後才能開始下一個下載任務,很顯然這並不合理也沒有效率。接下來我們使用多進程的方式將兩個下載任務放到不同的進程中,代碼如下所示。
from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep
def download_task(filename):
print('啓動下載進程,進程號[%d].' % getpid())
print('開始下載%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))
def main():
start = time()
p1 = Process(target=download_task, args=('Python從入門到住院.pdf', ))
p1.start()
p2 = Process(target=download_task, args=('Peking Hot.avi', ))
p2.start()
p1.join()
p2.join()
end = time()
print('總共耗費了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
在上面的代碼中,我們通過Process
類創建了進程對象,通過target
參數我們傳入一個函數來表示進程啓動後要執行的代碼,後面的args
是一個元組,它代表了傳遞給函數的參數。Process
對象的start
方法用來啓動進程,而join
方法表示等待進程執行結束。運行上面的代碼可以明顯發現兩個下載任務“同時”啓動了,而且程序的執行時間將大大縮短,不再是兩個任務的時間總和。下面是程序的一次執行結果。
啓動下載進程,進程號[1530].
開始下載Python從入門到住院.pdf...
啓動下載進程,進程號[1531].
開始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費了7秒
Python從入門到住院.pdf下載完成! 耗費了10秒
總共耗費了10.01秒.
我們也可以使用subprocess模塊中的類和函數來創建和啓動子進程,然後通過管道來和子進程通信,這些內容我們不在此進行講解,有興趣的讀者可以自己了解這些知識。接下來我們將重點放在如何實現兩個進程間的通信。我們啓動兩個進程,一個輸出Ping,一個輸出Pong,兩個進程輸出的Ping和Pong加起來一共10個。聽起來很簡單吧,但是如果這樣寫可是錯的哦。
from multiprocessing import Process
from time import sleep
counter = 0
def sub_task(string):
global counter
while counter < 10:
print(string, end='', flush=True)
counter += 1
sleep(0.01)
def main():
Process(target=sub_task, args=('Ping', )).start()
Process(target=sub_task, args=('Pong', )).start()
if __name__ == '__main__':
main()
看起來沒毛病,但是最後的結果是Ping和Pong各輸出了10個,Why?當我們在程序中創建進程的時候,子進程複制了父進程及其所有的數據結構,每個子進程有自己獨立的內存空間,這也就意味著兩個子進程中各有一個counter
變量,所以結果也就可想而知了。要解決這個問題比較簡單的辦法是使用multiprocessing模塊中的Queue
類,它是可以被多個進程共享的隊列,底層是通過管道和信號量(semaphore)機制來實現的,有興趣的讀者可以自己嘗試一下。
在Python早期的版本中就引入了thread模塊(現在名爲_thread)來實現多線程編程,然而該模塊過於底層,而且很多功能都沒有提供,因此目前的多線程開發我們推薦使用threading模塊,該模塊對多線程編程提供了更好的面向對象的封裝。我們把剛才下載文件的例子用多線程的方式來實現一遍。
from random import randint
from threading import Thread
from time import time, sleep
def download(filename):
print('開始下載%s...' % filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))
def main():
start = time()
t1 = Thread(target=download, args=('Python從入門到住院.pdf',))
t1.start()
t2 = Thread(target=download, args=('Peking Hot.avi',))
t2.start()
t1.join()
t2.join()
end = time()
print('總共耗費了%.3f秒' % (end - start))
if __name__ == '__main__':
main()
我們可以直接使用threading模塊的Thread
類來創建線程,但是我們之前講過一個非常重要的概念叫“繼承”,我們可以從已有的類創建新類,因此也可以通過繼承Thread
類的方式來創建自定義的線程類,然後再創建線程對象並啓動線程。代碼如下所示。
from random import randint
from threading import Thread
from time import time, sleep
class DownloadTask(Thread):
def __init__(self, filename):
super().__init__()
self._filename = filename
def run(self):
print('開始下載%s...' % self._filename)
time_to_download = randint(5, 10)
sleep(time_to_download)
print('%s下載完成! 耗費了%d秒' % (self._filename, time_to_download))
def main():
start = time()
t1 = DownloadTask('Python從入門到住院.pdf')
t1.start()
t2 = DownloadTask('Peking Hot.avi')
t2.start()
t1.join()
t2.join()
end = time()
print('總共耗費了%.2f秒.' % (end - start))
if __name__ == '__main__':
main()
因爲多個線程可以共享進程的內存空間,因此要實現多個線程間的通信相對簡單,大家能想到的最直接的辦法就是設置一個全局變量,多個線程共享這個全局變量即可。但是當多個線程共享同一個變量(我們通常稱之爲“資源”)的時候,很有可能産生不可控的結果從而導致程序失效甚至崩潰。如果一個資源被多個線程競爭使用,那麽我們通常稱之爲“臨界資源”,對“臨界資源”的訪問需要加上保護,否則資源會處於“混亂”的狀態。下面的例子演示了100個線程向同一個銀行賬戶轉賬(轉入1元錢)的場景,在這個例子中,銀行賬戶就是一個臨界資源,在沒有保護的情況下我們很有可能會得到錯誤的結果。
from time import sleep
from threading import Thread
class Account(object):
def __init__(self):
self._balance = 0
def deposit(self, money):
# 計算存款後的余額
new_balance = self._balance + money
# 模擬受理存款業務需要0.01秒的時間
sleep(0.01)
# 修改賬戶余額
self._balance = new_balance
@property
def balance(self):
return self._balance
class AddMoneyThread(Thread):
def __init__(self, account, money):
super().__init__()
self._account = account
self._money = money
def run(self):
self._account.deposit(self._money)
def main():
account = Account()
threads = []
# 創建100個存款的線程向同一個賬戶中存錢
for _ in range(100):
t = AddMoneyThread(account, 1)
threads.append(t)
t.start()
# 等所有存款的線程都執行完畢
for t in threads:
t.join()
print('賬戶余額爲: ¥%d元' % account.balance)
if __name__ == '__main__':
main()
運行上面的程序,結果讓人大跌眼鏡,100個線程分別向賬戶中轉入1元錢,結果居然遠遠小於100元。之所以出現這種情況是因爲我們沒有對銀行賬戶這個“臨界資源”加以保護,多個線程同時向賬戶中存錢時,會一起執行到new_balance = self._balance + money
這行代碼,多個線程得到的賬戶余額都是初始狀態下的0
,所以都是0
上面做了+1的操作,因此得到了錯誤的結果。在這種情況下,“鎖”就可以派上用場了。我們可以通過“鎖”來保護“臨界資源”,只有獲得“鎖”的線程才能訪問“臨界資源”,而其他沒有得到“鎖”的線程只能被阻塞起來,直到獲得“鎖”的線程釋放了“鎖”,其他線程才有機會獲得“鎖”,進而訪問被保護的“臨界資源”。下面的代碼演示了如何使用“鎖”來保護對銀行賬戶的操作,從而獲得正確的結果。
from time import sleep
from threading import Thread, Lock
class Account(object):
def __init__(self):
self._balance = 0
self._lock = Lock()
def deposit(self, money):
# 先獲取鎖才能執行後續的代碼
self._lock.acquire()
try:
new_balance = self._balance + money
sleep(0.01)
self._balance = new_balance
finally:
# 在finally中執行釋放鎖的操作保證正常異常鎖都能釋放
self._lock.release()
@property
def balance(self):
return self._balance
class AddMoneyThread(Thread):
def __init__(self, account, money):
super().__init__()
self._account = account
self._money = money
def run(self):
self._account.deposit(self._money)
def main():
account = Account()
threads = []
for _ in range(100):
t = AddMoneyThread(account, 1)
threads.append(t)
t.start()
for t in threads:
t.join()
print('賬戶余額爲: ¥%d元' % account.balance)
if __name__ == '__main__':
main()
比較遺憾的一件事情是Python的多線程並不能發揮CPU的多核特性,這一點只要啓動幾個執行死循環的線程就可以得到證實了。之所以如此,是因爲Python的解釋器有一個“全局解釋器鎖”(GIL)的東西,任何線程執行前必須先獲得GIL鎖,然後每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行,這是一個曆史遺留問題,但是即便如此,就如我們之前舉的例子,使用多線程在提升執行效率和改善用戶體驗方面仍然是有積極意義的。
無論是多進程還是多線程,只要數量一多,效率肯定上不去,爲什麽呢?我們打個比方,假設你不幸正在準備中考,每天晚上需要做語文、數學、英語、物理、化學這5科的作業,每項作業耗時1小時。如果你先花1小時做語文作業,做完了,再花1小時做數學作業,這樣,依次全部做完,一共花5小時,這種方式稱爲單任務模型。如果你打算切換到多任務模型,可以先做1分鍾語文,再切換到數學作業,做1分鍾,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多任務是一樣的了,以旁觀者的角度來看,你就正在同時寫5科作業。
但是,切換作業是有代價的,比如從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫保存現場),然後,打開數學課本、找出圓規直尺(這叫準備新環境),才能開始做數學作業。操作係統在切換進程或者線程時也是一樣的,它需要先保存當前執行的現場環境(CPU寄存器狀態、內存頁等),然後,把新任務的執行環境準備好(恢複上次的寄存器狀態,切換內存頁等),才能開始執行。這個切換過程雖然很快,但是也需要耗費時間。如果有幾千個任務同時進行,操作係統可能就主要忙著切換任務,根本沒有多少時間去執行任務了,這種情況最常見的就是硬盤狂響,點窗口無反應,係統處於假死狀態。所以,多任務一旦多到一個限度,反而會使得係統性能急劇下降,最終導致所有任務都做不好。
是否采用多任務的第二個考慮是任務的類型,可以把任務分爲計算密集型和I/O密集型。計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如對視頻進行編碼解碼或者格式轉換等等,這種任務全靠CPU的運算能力,雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低。計算密集型任務由於主要消耗CPU資源,這類任務用Python這樣的腳本語言去執行效率通常很低,最能勝任這類任務的是C語言,我們之前提到了Python中有嵌入C/C++代碼的機制。
除了計算密集型任務,其他的涉及到網絡、存儲介質I/O的任務都可以視爲I/O密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待I/O操作完成(因爲I/O的速度遠遠低於CPU和內存的速度)。對於I/O密集型任務,如果啓動多任務,就可以減少I/O等待時間從而讓CPU高效率的運轉。有一大類的任務都屬於I/O密集型任務,這其中包括了我們很快會涉及到的網絡應用和Web應用。
**說明:**上面的內容和例子來自於廖雪峰官方網站的《Python教程》,因爲對作者文中的某些觀點持有不同的看法,對原文的文字描述做了適當的調整。
現代操作係統對I/O操作的改進中最爲重要的就是支持異步I/O。如果充分利用操作係統提供的異步I/O支持,就可以用單進程單線程模型來執行多任務,這種全新的模型稱爲事件驅動模型。Nginx就是支持異步I/O的Web服務器,它在單核CPU上采用單進程模型就可以高效地支持多任務。在多核CPU上,可以運行多個進程(數量與CPU核心數相同),充分利用多核CPU。用Node.js開發的服務器端程序也使用了這種工作模式,這也是當下實現多任務編程的一種趨勢。
在Python語言中,單線程+異步I/O的編程模型稱爲協程,有了協程的支持,就可以基於事件驅動編寫高效的多任務程序。協程最大的優勢就是極高的執行效率,因爲子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷。協程的第二個優勢就是不需要多線程的鎖機制,因爲只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不用加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。如果想要充分利用CPU的多核特性,最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。關於這方面的內容,我稍後會做一個專題來進行講解。
如下所示的界面中,有“下載”和“關於”兩個按鈕,用休眠的方式模擬點擊“下載”按鈕會聯網下載文件需要耗費10秒的時間,如果不使用“多線程”,我們會發現,當點擊“下載”按鈕後整個程序的其他部分都被這個耗時間的任務阻塞而無法執行了,這顯然是非常糟糕的用戶體驗,代碼如下所示。
import time
import tkinter
import tkinter.messagebox
def download():
# 模擬下載任務需要花費10秒鍾時間
time.sleep(10)
tkinter.messagebox.showinfo('提示', '下載完成!')
def show_about():
tkinter.messagebox.showinfo('關於', '作者: 駱昊(v1.0)')
def main():
top = tkinter.Tk()
top.title('單線程')
top.geometry('200x150')
top.wm_attributes('-topmost', True)
panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text='下載', command=download)
button1.pack(side='left')
button2 = tkinter.Button(panel, text='關於', command=show_about)
button2.pack(side='right')
panel.pack(side='bottom')
tkinter.mainloop()
if __name__ == '__main__':
main()
如果使用多線程將耗時間的任務放到一個獨立的線程中執行,這樣就不會因爲執行耗時間的任務而阻塞了主線程,修改後的代碼如下所示。
import time
import tkinter
import tkinter.messagebox
from threading import Thread
def main():
class DownloadTaskHandler(Thread):
def run(self):
time.sleep(10)
tkinter.messagebox.showinfo('提示', '下載完成!')
# 啓用下載按鈕
button1.config(state=tkinter.NORMAL)
def download():
# 禁用下載按鈕
button1.config(state=tkinter.DISABLED)
# 通過daemon參數將線程設置爲守護線程(主程序退出就不再保留執行)
# 在線程中處理耗時間的下載任務
DownloadTaskHandler(daemon=True).start()
def show_about():
tkinter.messagebox.showinfo('關於', '作者: 駱昊(v1.0)')
top = tkinter.Tk()
top.title('單線程')
top.geometry('200x150')
top.wm_attributes('-topmost', 1)
panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text='下載', command=download)
button1.pack(side='left')
button2 = tkinter.Button(panel, text='關於', command=show_about)
button2.pack(side='right')
panel.pack(side='bottom')
tkinter.mainloop()
if __name__ == '__main__':
main()
我們來完成1~100000000求和的計算密集型任務,這個問題本身非常簡單,有點循環的知識就能解決,代碼如下所示。
from time import time
def main():
total = 0
number_list = [x for x in range(1, 100000001)]
start = time()
for number in number_list:
total += number
print(total)
end = time()
print('Execution time: %.3fs' % (end - start))
if __name__ == '__main__':
main()
在上面的代碼中,我故意先去創建了一個列表容器然後填入了100000000個數,這一步其實是比較耗時間的,所以爲了公平起見,當我們將這個任務分解到8個進程中去執行的時候,我們暫時也不考慮列表切片操作花費的時間,只是把做運算和合並運算結果的時間統計出來,代碼如下所示。
from multiprocessing import Process, Queue
from random import randint
from time import time
def task_handler(curr_list, result_queue):
total = 0
for number in curr_list:
total += number
result_queue.put(total)
def main():
processes = []
number_list = [x for x in range(1, 100000001)]
result_queue = Queue()
index = 0
# 啓動8個進程將數據切片後進行運算
for _ in range(8):
p = Process(target=task_handler,
args=(number_list[index:index + 12500000], result_queue))
index += 12500000
processes.append(p)
p.start()
# 開始記錄所有進程執行完成花費的時間
start = time()
for p in processes:
p.join()
# 合並執行結果
total = 0
while not result_queue.empty():
total += result_queue.get()
print(total)
end = time()
print('Execution time: ', (end - start), 's', sep='')
if __name__ == '__main__':
main()
比較兩段代碼的執行結果(在我目前使用的MacBook上,上面的代碼需要大概6秒左右的時間,而下面的代碼只需要不到1秒的時間,再強調一次我們只是比較了運算的時間,不考慮列表創建及切片操作花費的時間),使用多進程後由於獲得了更多的CPU執行時間以及更好的利用了CPU的多核特性,明顯的減少了程序的執行時間,而且計算量越大效果越明顯。當然,如果願意還可以將多個進程部署在不同的計算機上,做成分布式進程,具體的做法就是通過multiprocessing.managers模塊中提供的管理器將Queue
對象通過網絡共享出來(注冊到網絡上讓其他計算機可以訪問),這部分內容也留到爬蟲的專題再進行講解。