寧波市住房和城鄉(xiāng)建設(shè)局網(wǎng)站上海百度推廣官方電話
一、多線程
threading 模塊
threading 模塊對象
對象 | 描述 |
---|---|
Thread | 表示一個執(zhí)行線程的對象 |
Lock | 鎖原語對象(與 thread 模塊中的鎖一樣) |
RLock | 可重入鎖對象,使單一線程可以(再次)獲得已持有的鎖(遞歸鎖) |
Condition | 條件變量對象,使得一個線程等待另一個線程苗族特定的條件,比如改變狀態(tài)或某個數(shù)據(jù)值 |
Event | 添加變量的通用版本,任意數(shù)量的線程等待某個時間的發(fā)生,在該事件發(fā)生后所有線程將被激活 |
Semaphore | 為線程鍵共享的有限資源提供一個 計(jì)算器(信號量),如果沒有可用資源時會被阻塞 |
BoundedSemaphore | 與 Semaphore 相似,不過不允許超過初始值 |
Timer | 與 Thread 相似,在運(yùn)行前要等待一段時間 |
Barrier | 創(chuàng)建一個障礙,必須達(dá)到指定數(shù)量的線程后才可以繼續(xù) |
守護(hù)線程
1、thread 模塊不支持守護(hù)線程,當(dāng)主線程退出后,所有子線程也會退出,不管其是否在工作
2、threading 模塊支持守護(hù)線程:等待一個客戶端請求服務(wù)的服務(wù)器,如果客戶端沒有請求,守護(hù)線程是空閑的,如果把一個線程設(shè)置為守護(hù)線程,就表示這個線程不重要的。進(jìn)程退出時不需要等待這個線程執(zhí)行完成。
如果主線程退出時,不需要等待某些子線程完成,可以將子線程設(shè)置為 守護(hù)線程,標(biāo)記為真時,表示該線程不重要。在啟動線程前執(zhí)行 thread.daemon=True
可以設(shè)置守護(hù)線程,檢查線程的守護(hù)狀態(tài)也可以判斷它。
Thread 類
threading
模塊的 Thread
類是主要的執(zhí)行對象,下面是 Thread
對象的屬性和方法列表:
屬性 | 描述 | 方法 | 描述 |
---|---|---|---|
name | 線程名 | start() | 開始執(zhí)行該線程 |
ident | 線程的標(biāo)識符 | run() | 定義線程功能的方法,通常在子類中被應(yīng)用開發(fā)者重寫 |
daemon | 布爾值,表示這個線程是否是守護(hù)線程 | join(timeout=None) | 直至啟動的線程終止之前一直掛起,除非給出 timeout,否則一直阻塞 |
Thread(group=None, target=None, name=None, agrs=(), kwargs={}, verbose=None, daemon=None)# 實(shí)例化一個線程對象,需要一個可調(diào)用的 target,一般是函數(shù),及其參數(shù) args(元組)或 kwargs
# 也可以傳遞 name 或 group 參數(shù),daemon 將會設(shè)定 thread.daemon 屬性/標(biāo)志
創(chuàng)建線程的三種方法
創(chuàng)建 Thread 實(shí)例,傳遞給它一個函數(shù)
import threading
from time import sleep, ctimeloops = [4, 2]
def loop(nloop, nsec):print('loop 函數(shù)開始執(zhí)行 %(nloop)s,時間: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})sleep(nsec)print('loop 函數(shù)結(jié)束執(zhí)行 %(nloop)s,時間: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})def main():print('主函數(shù)開始執(zhí)行:', ctime())threads = []nloops = range(len(loops))for i in nloops:t = threading.Thread(target=loop, args=(i, loops[i]))threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print('程序結(jié)束:', ctime())if __name__ == '__main__':main()
上述程序?qū)⑸蓛蓚€線程,將其添加到一個列表中,循環(huán)啟動 start()
。join()
方法將會程序掛起,會等待所有線程結(jié)束或超時。因此要比
等待釋放的無限循環(huán)更加清晰。
運(yùn)行結(jié)果如下:
主函數(shù)開始執(zhí)行: Sat Sep 7 17:31:40 2019
loop 函數(shù)開始執(zhí)行 0,時間: Sat Sep 7 17:31:40 2019
loop 函數(shù)開始執(zhí)行 1,時間: Sat Sep 7 17:31:40 2019
loop 函數(shù)結(jié)束執(zhí)行 1,時間: Sat Sep 7 17:31:42 2019
loop 函數(shù)結(jié)束執(zhí)行 0,時間: Sat Sep 7 17:31:44 2019
程序結(jié)束: Sat Sep 7 17:31:44 2019
創(chuàng)建 Thread 的實(shí)例,傳給它一個可調(diào)用的類實(shí)例
import threading
from time import sleep, ctimeloops = [4, 2]class ThreadFunc:def __init__(self, func, args, name=''):self.name = nameself.func = funcself.args = argsdef __call__(self, *args, **kwargs):"""當(dāng)調(diào)用 ThreadFunc() 時會自動執(zhí)行 fun()"""self.func(*self.args)def loop(nloop, nsec):print('loop 函數(shù)開始執(zhí)行 %(nloop)s,時間: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})sleep(nsec)print('loop 函數(shù)結(jié)束執(zhí)行 %(nloop)s,時間: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})def main():print('主函數(shù)開始執(zhí)行:', ctime())threads = []nloops = range(len(loops))for i in nloops: # 創(chuàng)建 Thread 的實(shí)例,傳給它一個可調(diào)用的類實(shí)例t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print('程序結(jié)束:', ctime())if __name__ == '__main__':main()
派生 Thread 的子類,并創(chuàng)建子類的實(shí)例
自定義的類要繼承 threading.Thread
,構(gòu)造函數(shù)必須先調(diào)用其基類的構(gòu)造函數(shù),__call__()
在子類中必須要寫 run()
import threading
from time import sleep, ctimeloops = [4, 2]class MyThread(threading.Thread):def __init__(self, func, args, name=''):threading.Thread.__init__(self)self.name = nameself.func = funcself.args = argsdef run(self, *args, **kwargs):self.func(*self.args)def loop(nloop, nsec):print('loop 函數(shù)開始執(zhí)行 %(nloop)s,時間: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})sleep(nsec)print('loop 函數(shù)結(jié)束執(zhí)行 %(nloop)s,時間: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})def main():print('主函數(shù)開始執(zhí)行:', ctime())threads = []nloops = range(len(loops))for i in nloops:t = MyThread(loop, (i, loops[i]), loop.__name__)threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print('程序結(jié)束:', ctime())if __name__ == '__main__':main()
threading 模塊的其他函數(shù)
- active_count():當(dāng)前活動的 Thread 對象個數(shù)
- current_thread:返回當(dāng)前的 Thread 對象
- enumerate():返回當(dāng)前活動的 Thread 對象列表
- settrace(func):為所有線程設(shè)置一個 trace 函數(shù)
- setprofile(func):為所有線程設(shè)置一個 profile 函數(shù)
- stack_size(size=0):返回新建線程的棧大小,或?yàn)楹罄m(xù)創(chuàng)建的線程設(shè)定棧的大小為 size
啟動和停止線程
import time
import threadingdef countdown(n):while n > 0:print('T-minus:%s,time:%s' % (n, time.ctime()))n -= 1time.sleep(1)if t.is_alive():print('Still running...')else:print('Completed')print('主線程...')if __name__ == '__main__':t = threading.Thread(target=countdown, args=(5, ))t.start()t.join()
判斷一個線程是否存活,可以調(diào)用 is_alive()
方法,解釋器會在所有線程都結(jié)束后才執(zhí)行剩余的代碼,如果需要長時間運(yùn)行的線程
或者一直運(yùn)行的后臺任務(wù),可以使用后臺線程:
threading.Thread(target=countdown, args=(5, ), daemon=True)
后臺線程無法等待,這些線程會在主線程終止時自動銷毀。但是你也不能對線程做額外的高級操作,如:發(fā)送信號,調(diào)整它的調(diào)度,終止線程等。
join() 方法
join()
方法將懸掛當(dāng)前子線程,直至所有子線程結(jié)束。
import time, threadingdef loop():print('thread %s is running...' % threading.current_thread().name)n = 0while n < 5:n += 1print('thread %s >>> %s' % (threading.current_thread().name, n))time.sleep(1)print('thread %s ended.' % threading.current_thread().name)if __name__ == '__main__':print('thread %s is running>>>>' % threading.current_thread().name)t = threading.Thread(target=loop, name='LoopThread')t.start()t.join()print('thread %s ended>>>>' % threading.current_thread().name)
- name:指定子線程名字,不指定默認(rèn)為 thread-1、thread-2
- MainThread 為主線程
- threading.current_thread().name:獲取當(dāng)前線程的實(shí)例(名字)
可以看到 join()
它將子線程添加到當(dāng)前主線程中,并等待子線程終止,才允許它后面的代碼:
thread MainThread is running>>>>
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended>>>>
停止線程
如果線程執(zhí)行一些如 I/O 這樣的阻塞操作,通過輪詢來終止線程將使得線程間的協(xié)調(diào)變得非常棘手。如,如果一個線程一直阻塞在一個 I/O
操作上,就永遠(yuǎn)無法返回檢查自己是否已經(jīng)被結(jié)束了。要正確處理這些問題,需要利用 超時循環(huán) 來小心操作線程:
class IOTask:def terminate(self):self._running = Falsedef run(self, sock):# sock is a socketsock.settimeout(5) # set timeout periodwhile self._running:try:data = sock.recv(8192)breakexcept socket.timeout:continue# continued processing...# Terminatedreturn
判斷線程是否啟動
啟動了一個線程,但是你想知道它是否真的已經(jīng)開始了,由于線程是獨(dú)立運(yùn)行的且狀態(tài)不可預(yù)測的。如果程序中其他線程
要通過判斷某個線程的狀態(tài)來確定自己的下一步操作,這就會顯得很棘手。
我們可以使用 Event 對象來解決這個問題,Event 對象包含一個可由線程設(shè)置信號的標(biāo)志,允許線程等待某事的發(fā)生。
- 初始時,標(biāo)志為假
- 標(biāo)志未假時,這個線程會被一直阻塞直至標(biāo)志為真
from threading import Thread, Event
import timedef countdown(n, started_evt):print('countdown 開始...')started_evt.set() # 設(shè)置標(biāo)志為真while n > 0:print('T-minus', n)n -= 1time.sleep(1)started_evt = Event() # 創(chuàng)建 Event 對象print('啟動 countdown 函數(shù)')
t = Thread(target=countdown, args=(5, started_evt))
t.start()# 等待線程開始
started_evt.wait()
print('countdown is running...')
可以看到 countdown is running...
一直在 countdown 開始...
輸出之后才打印,這是因?yàn)?event 在協(xié)調(diào)線程。使得主線程要等
countdown()
函數(shù)輸出啟動信息后,才繼續(xù)執(zhí)行。
啟動 countdown 函數(shù)
countdown 開始...
T-minus 5
countdown is running...
T-minus 4
T-minus 3
T-minus 2
T-minus 1
如果你將 started_evt.set()
注銷掉,再運(yùn)行程序,會發(fā)現(xiàn)程序一直被阻塞…,這是因?yàn)闃?biāo)志為假,線程被阻塞。
三、協(xié)程
Python 協(xié)程只能運(yùn)行在時間循環(huán)中,但是一旦事件循環(huán)運(yùn)行,又會阻塞當(dāng)前任務(wù)。因此 動態(tài)添加任務(wù)/協(xié)程
需要再開一個線程,這個線程主要任務(wù)時運(yùn)行事件循環(huán),因?yàn)槭菬o限循環(huán),會阻塞當(dāng)前線程:
import asyncio
from threading import Threadasync def production_task():i = 0while True:# 將 consumption 這個協(xié)程每秒注冊一個運(yùn)行到線程中的循環(huán),thread_loop 每秒會獲取一個一直打印 i 的無限循環(huán)任務(wù)# run_coroutine_threadsafe 這個方法只能用在運(yùn)行在線程中的循環(huán)事件使用asyncio.run_coroutine_threadsafe(consumption(i), thread_loop)await asyncio.sleep(1) # 必須加 awaiti += 1async def consumption(i):while True:print('我是第{}任務(wù)'.format(i))await asyncio.sleep(1)def start_loop(loop):# 運(yùn)行事件循環(huán),loop 以參數(shù)的形式傳遞進(jìn)來運(yùn)行asyncio.set_event_loop(loop)loop.run_forever()thread_loop = asyncio.new_event_loop() # 獲取一個事件循環(huán)
run_loop_thread = Thread(target=start_loop, args=(thread_loop,)) # 將每次事件循環(huán)運(yùn)行在一個線程中,防止阻塞當(dāng)前主線程
run_loop_thread.start() # 運(yùn)行線程,同時協(xié)程事件循環(huán)也會運(yùn)行advocate_loop = asyncio.get_event_loop() # 將生產(chǎn)任務(wù)的協(xié)程注冊到這個循環(huán)中
advocate_loop.run_until_complete(production_task()) # 運(yùn)行次循環(huán)
我是第0任務(wù)
我是第1任務(wù)
我是第0任務(wù)
我是第1任務(wù)
我是第0任務(wù)
我是第2任務(wù)
我是第3任務(wù)
我是第1任務(wù)
我是第0任務(wù)
四、五種 unix IO 模型
epoll 并不代表一定比 select 好(效率高)
- 在并發(fā)高的情況下,連接活躍度不是很高的情況下,epoll 比 select 好(比如:Web 連接,用戶連接可能隨時斷開)
- 在并發(fā)性不高,同時連接活躍,select 比 epoll 好(比如:游戲連接,一般要保持持續(xù)連接)
select、poll、epoll 本質(zhì)還是同步阻塞的,之所以能支持高并發(fā),是因?yàn)橐粋€進(jìn)程能同時監(jiān)聽多個文件描述符
非阻塞 IO 不一定比阻塞好,因?yàn)樗恢毖h(huán)檢測服務(wù)器是否有數(shù)據(jù)返回,如果后續(xù)的程序不依賴前面的連接,其效率要高,要是依賴前面的程序,效率不一定要好。