網(wǎng)站發(fā)布信息技巧上海百網(wǎng)優(yōu)seo優(yōu)化公司
深入理解Python多進程編程 multiprocessing
flyfish
Python 的 multiprocessing
模塊允許創(chuàng)建多個進程,從而可以利用多核處理器的能力來并行執(zhí)行任務。這意味著程序的不同部分可以在不同的CPU核心上同時運行,極大地提高了處理效率,特別是在執(zhí)行計算密集型任務時。
與多線程相比,multiprocessing
使用的是系統(tǒng)級的進程而不是線程。每個進程都有獨立的內(nèi)存空間和系統(tǒng)資源,而線程則共享同一個進程的內(nèi)存空間。因此,在Python中(特別是由于全局解釋器鎖GIL的存在),對于CPU密集型任務,使用multiprocessing
比多線程能更有效地利用多核CPU的優(yōu)勢。
進程的概念
在計算機操作系統(tǒng)中,進程是操作系統(tǒng)進行資源分配和調度的基本單位。一個進程可以包含多個線程。當使用multiprocessing
模塊時,可以創(chuàng)建新的進程,這些新進程將與主程序并行運行,并且它們各自擁有獨立的內(nèi)存空間。
示例代碼1:單個進程打印數(shù)字
下面是一個簡單的示例,演示如何使用multiprocessing
模塊創(chuàng)建一個進程來打印從1到5的數(shù)字:
import multiprocessing
import timedef print_numbers():"""打印從1到5的數(shù)字"""for i in range(1, 6):print("數(shù)字:", i)time.sleep(1) # 模擬耗時操作if __name__ == "__main__":# 創(chuàng)建一個新的進程process = multiprocessing.Process(target=print_numbers)# 啟動進程process.start()# 等待進程完成process.join()
數(shù)字: 1
數(shù)字: 2
數(shù)字: 3
數(shù)字: 4
數(shù)字: 5
multiprocessing.Process()
:創(chuàng)建一個新的進程對象。target=print_numbers
:指定該進程的目標函數(shù)為print_numbers
。process.start()
:啟動進程。process.join()
:等待進程結束。
示例代碼2:兩個進程分別打印不同字符串
下面是另一個示例,演示如何同時啟動兩個進程,每個進程打印不同的字符串:
import multiprocessingdef print_message(message):"""打印傳入的消息"""print(f"消息: {message}")if __name__ == "__main__":# 創(chuàng)建兩個進程process1 = multiprocessing.Process(target=print_message, args=("Hello from Process 1",))process2 = multiprocessing.Process(target=print_message, args=("Hello from Process 2",))# 啟動兩個進程process1.start()process2.start()# 等待兩個進程都完成process1.join()process2.join()
消息: Hello from Process 1
消息: Hello from Process 2
在這個例子中,定義了一個print_message
函數(shù),它接受一個字符串參數(shù)并打印出來。然后,創(chuàng)建了兩個進程,每個進程都調用這個函數(shù),但傳遞了不同的字符串參數(shù)。通過args
參數(shù),可以向目標函數(shù)傳遞額外的參數(shù)。最后,啟動這兩個進程,并等待它們完成各自的執(zhí)行。這樣,就可以看到兩個進程幾乎同時開始工作,并打印出各自的消息。
示例3:使用 multiprocessing.Value
在多個進程中共享一個計數(shù)器
multiprocessing.Value
Value
允許多個進程共享一個值。它適用于需要在多個進程中共享簡單數(shù)據(jù)類型(如整數(shù)或浮點數(shù))的情況。
import multiprocessingdef increment(counter, lock):"""增加計數(shù)器的值"""for _ in range(1000):with lock:counter.value += 1if __name__ == "__main__":# 創(chuàng)建一個共享的整數(shù)值和鎖counter = multiprocessing.Value('i', 0) # 'i' 表示整數(shù)類型lock = multiprocessing.Lock()# 創(chuàng)建多個進程來增加計數(shù)器processes = [multiprocessing.Process(target=increment, args=(counter, lock)) for _ in range(10)]# 啟動所有進程for p in processes:p.start()# 等待所有進程完成for p in processes:p.join()print("最終計數(shù)器值:", counter.value)
最終計數(shù)器值: 10000
multiprocessing.Value(typecode_or_type, *args, lock=True)
:創(chuàng)建一個新的共享值對象。typecode_or_type
指定了要共享的數(shù)據(jù)類型(例如'i'
表示整數(shù))。value.value
:訪問共享值的實際內(nèi)容。lock
:確保對共享資源的安全訪問,防止競態(tài)條件。
進程(Process)和線程(Thread)在Python中的區(qū)別
特性 | 進程(Process) | 線程(Thread) |
---|---|---|
內(nèi)存空間 | 每個進程有獨立的內(nèi)存空間 | 所有線程共享同一進程的內(nèi)存空間 |
資源消耗 | 開銷較大,需要更多系統(tǒng)資源 | 輕量級,開銷小,資源共享 |
通信難度 | 進程間通信復雜(IPC),如管道、套接字等 | 線程間通信簡單,直接訪問相同變量和數(shù)據(jù)結構 |
全局解釋器鎖(GIL) | 不受GIL限制,適合計算密集型任務 | 受GIL限制,對于計算密集型任務效率提升有限 |
適用場景 | 計算密集型任務,穩(wěn)定性要求高的應用 | I/O密集型任務,快速響應用戶界面的應用 |
崩潰影響 | 一個進程崩潰不影響其他進程 | 一個線程出錯可能導致整個進程崩潰 |
Python中多線程(Thread)和多進程(Process)的區(qū)別
特性 | 多線程(Thread) | 多進程(Process) |
---|---|---|
內(nèi)存空間 | 所有線程共享同一進程的內(nèi)存空間 | 每個進程有獨立的內(nèi)存空間 |
資源消耗 | 輕量級,開銷小,資源共享 | 開銷較大,需要更多系統(tǒng)資源 |
通信難度 | 線程間通信簡單,直接訪問相同變量和數(shù)據(jù)結構 | 進程間通信復雜(IPC),如管道、套接字等 |
全局解釋器鎖 (GIL) | 受GIL限制,對于計算密集型任務效率提升有限 | 不受GIL限制,適合計算密集型任務 |
適用場景 | I/O密集型任務,快速響應用戶界面的應用 | 計算密集型任務,穩(wěn)定性要求高的應用 |
崩潰影響 | 一個線程出錯可能導致整個進程崩潰 | 一個進程崩潰不影響其他進程 |
創(chuàng)建與銷毀開銷 | 創(chuàng)建和銷毀開銷較小 | 創(chuàng)建和銷毀開銷較大 |
并發(fā)性能 | 對于I/O密集型任務性能較好,但對于CPU密集型任務受限 | 對于CPU密集型任務性能較好 |
示例用途 | 網(wǎng)絡請求、文件讀寫、GUI應用等 | 數(shù)據(jù)分析、圖像處理、科學計算等 |
進程間通信
在Python的multiprocessing
模塊中,提供了幾種常用的進程間通信(IPC)方式,包括隊列(Queue)、管道(Pipe)等。這些工具允許不同的進程之間安全地傳遞數(shù)據(jù)。
使用 multiprocessing.Queue
實現(xiàn)進程間通信
Queue
是一個線程和進程安全的 FIFO 隊列,非常適合用于進程間的簡單數(shù)據(jù)交換。
示例代碼:
import multiprocessingdef producer(queue):"""生產(chǎn)者函數(shù),向隊列中添加數(shù)據(jù)"""for i in range(5):queue.put(f"數(shù)據(jù) {i}")print(f"生產(chǎn)者放入: 數(shù)據(jù) {i}")def consumer(queue):"""消費者函數(shù),從隊列中取出數(shù)據(jù)"""while not queue.empty():data = queue.get()print(f"消費者獲取: {data}")if __name__ == "__main__":# 創(chuàng)建一個隊列對象queue = multiprocessing.Queue()# 創(chuàng)建生產(chǎn)者和消費者進程p1 = multiprocessing.Process(target=producer, args=(queue,))p2 = multiprocessing.Process(target=consumer, args=(queue,))# 啟動進程p1.start()p2.start()# 等待兩個進程完成p1.join()p2.join()
生產(chǎn)者放入: 數(shù)據(jù) 0
生產(chǎn)者放入: 數(shù)據(jù) 1
生產(chǎn)者放入: 數(shù)據(jù) 2
生產(chǎn)者放入: 數(shù)據(jù) 3
生產(chǎn)者放入: 數(shù)據(jù) 4
消費者獲取: 數(shù)據(jù) 0
消費者獲取: 數(shù)據(jù) 1
消費者獲取: 數(shù)據(jù) 2
消費者獲取: 數(shù)據(jù) 3
消費者獲取: 數(shù)據(jù) 4
- 隊列的使用:
queue.put()
用于向隊列中添加數(shù)據(jù),queue.get()
用于從隊列中取出數(shù)據(jù)。 - 數(shù)據(jù)傳遞原理:生產(chǎn)者進程通過調用
put
方法將數(shù)據(jù)放入隊列,而消費者進程通過調用get
方法從隊列中取出數(shù)據(jù)。Queue
對象是進程安全的,因此多個進程可以同時訪問它而不發(fā)生沖突。
使用 multiprocessing.Pipe
實現(xiàn)進程間通信
Pipe
提供了一個雙向通道,適用于兩個進程之間的直接通信。
示例代碼:
import multiprocessingdef sender(conn, messages):"""發(fā)送者函數(shù),通過管道發(fā)送消息"""for msg in messages:conn.send(msg)print(f"發(fā)送者發(fā)送: {msg}")conn.close()def receiver(conn):"""接收者函數(shù),通過管道接收消息"""while True:msg = conn.recv()if msg == "END":breakprint(f"接收者接收: {msg}")if __name__ == "__main__":# 創(chuàng)建一個管道對象parent_conn, child_conn = multiprocessing.Pipe()# 準備要發(fā)送的消息messages = ["Hello", "from", "sender", "END"]# 創(chuàng)建發(fā)送者和接收者進程p1 = multiprocessing.Process(target=sender, args=(child_conn, messages))p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))# 啟動進程p1.start()p2.start()# 等待兩個進程完成p1.join()p2.join()
發(fā)送者發(fā)送: Hello
發(fā)送者發(fā)送: from
發(fā)送者發(fā)送: sender
發(fā)送者發(fā)送: END
接收者接收: Hello
接收者接收: from
接收者接收: sender
進程池的使用
multiprocessing.Pool
是一個用于管理一組工作進程的類,它可以簡化并行任務的分配和結果收集。
示例代碼:使用 Pool
并行計算數(shù)字的平方
import multiprocessingdef square(n):"""計算一個數(shù)的平方"""return n * nif __name__ == "__main__":# 定義要處理的數(shù)字列表numbers = [1, 2, 3, 4, 5]# 創(chuàng)建一個包含4個進程的進程池with multiprocessing.Pool(processes=4) as pool:# 使用map方法將square函數(shù)應用于每個數(shù)字results = pool.map(square, numbers)print("結果:", results)
結果: [1, 4, 9, 16, 25]
- 進程池的概念和作用:
Pool
允許你指定一定數(shù)量的工作進程,并且可以通過map
、apply
等方法輕松地將任務分配給這些進程。這樣可以有效地利用多核CPU來加速計算密集型任務。 - 設置進程池大小:通過
processes
參數(shù)指定進程池中的工作進程數(shù)量,默認情況下,它會根據(jù)系統(tǒng)CPU核心數(shù)自動調整。 - 處理任務的方式:
pool.map()
方法類似于內(nèi)置的map()
函數(shù),但它會在多個進程中并行執(zhí)行。在這個例子中,我們將square
函數(shù)應用到numbers
列表中的每個元素,并返回計算結果。
Semaphore(信號量)
信號量是一種更高級的同步機制,可以用來控制同時訪問某一資源的進程數(shù)量。
示例:使用 Semaphore
控制并發(fā)訪問
import multiprocessing
import timedef worker(semaphore, name):with semaphore:print(f"{name} 獲得信號量")time.sleep(1)if __name__ == "__main__":semaphore = multiprocessing.Semaphore(3) # 最多允許3個進程同時訪問processes = [multiprocessing.Process(target=worker, args=(semaphore, f"進程 {i}")) for i in range(6)]for p in processes:p.start()for p in processes:p.join()
Event(事件)
事件是一種簡單的線程間通信機制,可以讓一個或多個進程等待某個特定事件的發(fā)生。
示例:使用 Event
實現(xiàn)進程間的同步
import multiprocessing
import timedef wait_for_event(event):print("等待事件觸發(fā)...")event.wait() # 阻塞直到事件被設置print("事件已觸發(fā)!")def set_event(event):time.sleep(3)event.set() # 觸發(fā)事件if __name__ == "__main__":event = multiprocessing.Event()p1 = multiprocessing.Process(target=wait_for_event, args=(event,))p2 = multiprocessing.Process(target=set_event, args=(event,))p1.start()p2.start()p1.join()p2.join()
Manager(管理器)
Manager
提供了更高層次的接口,可以創(chuàng)建可以在不同進程之間共享的數(shù)據(jù)結構,如列表、字典等。
示例:使用 Manager
創(chuàng)建共享數(shù)據(jù)結構
import multiprocessingdef append_to_list(shared_list, item):shared_list.append(item)print(f"添加到共享列表: {item}")if __name__ == "__main__":with multiprocessing.Manager() as manager:shared_list = manager.list() # 創(chuàng)建一個可共享的列表processes = [multiprocessing.Process(target=append_to_list, args=(shared_list, i)) for i in range(5)]for p in processes:p.start()for p in processes:p.join()print("最終共享列表:", list(shared_list))
文中processes = [multiprocessing.Process(target=append_to_list, args=(shared_list, i)) for i in range(5)]
這一句 等于下面的代碼
processes = []
for i in range(5):p = multiprocessing.Process(target=append_to_list, args=(shared_list, i))processes.append(p)
共享內(nèi)存
multiprocessing
還支持通過共享內(nèi)存的方式在進程之間共享數(shù)據(jù),這對于大規(guī)模數(shù)據(jù)共享特別有用。
示例:使用 Array
共享數(shù)組
import multiprocessingdef modify_array(shared_array, index, value):shared_array[index] = valueif __name__ == "__main__":array = multiprocessing.Array('i', [1, 2, 3, 4, 5]) # 創(chuàng)建共享數(shù)組processes = [multiprocessing.Process(target=modify_array, args=(array, i, i*10)) for i in range(len(array))]for p in processes:p.start()for p in processes:p.join()print("修改后的數(shù)組:", list(array))
修改后的數(shù)組: [0, 10, 20, 30, 40]