如何鑒別建設(shè)銀行網(wǎng)站真?zhèn)位潴w育網(wǎng)體育
1. 簡(jiǎn)介
本文介紹了在并發(fā)編程中數(shù)據(jù)匯總的問題,并探討了在并發(fā)環(huán)境下使用互斥鎖和通道兩種方式來保證數(shù)據(jù)安全性的方法。
首先,通過一個(gè)實(shí)例,描述了一個(gè)并發(fā)拉取數(shù)據(jù)并匯總的案例,并使用互斥鎖來確保線程安全。然后,討論了互斥鎖的一些缺點(diǎn),引出了通道作為一種替代方案,并介紹了通道的基本使用和特性。接下來,通過實(shí)例演示了如何使用通道來實(shí)現(xiàn)并發(fā)下的數(shù)據(jù)匯總。
最后,引用了etcd中使用通道實(shí)現(xiàn)協(xié)程并發(fā)下數(shù)據(jù)匯總的例子,展示了通道在實(shí)際項(xiàng)目中的應(yīng)用。
2. 問題引入
在請(qǐng)求處理過程中,經(jīng)常需要通過RPC接口拉取數(shù)據(jù)。有時(shí)候,由于數(shù)據(jù)量較大,單個(gè)數(shù)據(jù)拉取操作可能會(huì)導(dǎo)致整個(gè)請(qǐng)求的處理時(shí)間較長(zhǎng)。為了加快處理速度,我們通常考慮同時(shí)開啟多個(gè)協(xié)程并發(fā)地拉取數(shù)據(jù)。一旦多個(gè)協(xié)程并發(fā)拉取數(shù)據(jù)后,主協(xié)程需要匯總這些協(xié)程拉取到的數(shù)據(jù),然后再返回結(jié)果。在這個(gè)過程中,往往涉及對(duì)共享資源的并發(fā)訪問,為了保證線程安全性,通常會(huì)使用互斥鎖。下面通過一個(gè)簡(jiǎn)單的代碼來展示該過程:
package mainimport ("fmt""sync""time"
)type Data struct {ID intName string
}var (// 匯總結(jié)果dataList []Data// 互斥鎖mutex sync.Mutex
)func fetchData(page int, wg *sync.WaitGroup) {// 模擬RPC接口拉取數(shù)據(jù)的耗時(shí)操作time.Sleep(time.Second)// 假設(shè)從RPC接口獲取到了一批數(shù)據(jù)data := Data{ID: page,Name: fmt.Sprintf("Data %d", page),}// 使用互斥鎖保護(hù)共享數(shù)據(jù)的并發(fā)訪問mutex.Lock()defer mutext.Unlock()dataList = append(dataList, data)wg.Done()
}func main() {var wg sync.WaitGroup// 定義需要拉取的數(shù)據(jù)頁數(shù)numPages := 10// 啟動(dòng)多個(gè)協(xié)程并發(fā)地拉取數(shù)據(jù)for i := 1; i <= numPages; i++ {wg.Add(1)go fetchData(i, &wg)}// 等待所有協(xié)程完成wg.Wait()// 打印拉取到的數(shù)據(jù)fmt.Println("Fetched data:")for _, data := range dataList {fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)}
}
在上述示例中,我們定義了一個(gè)共享的dataList
切片用于保存拉取到的數(shù)據(jù)。每個(gè)goroutine通過調(diào)用fetchData
函數(shù)來模擬拉取數(shù)據(jù)的過程,并使用互斥鎖mutex
保護(hù)dataList
的并發(fā)訪問。主協(xié)程使用sync.WaitGroup
等待所有協(xié)程完成數(shù)據(jù)拉取任務(wù),然后打印出拉取到的數(shù)據(jù)。通過并發(fā)地拉取數(shù)據(jù),并使用互斥鎖保證線程安全,我們可以顯著提高數(shù)據(jù)拉取的速度,并且確保數(shù)據(jù)的正確性和一致性。
回看上述實(shí)現(xiàn),其實(shí)是涉及到了多個(gè)協(xié)程操作同一份數(shù)據(jù),有可能導(dǎo)致線程安全的問題,然后這里是通過互斥鎖來保證線程安全的。確實(shí),使用互斥鎖是可以保證線程安全的,但是也是存在一些缺點(diǎn)的,比如競(jìng)爭(zhēng)和阻塞,兩個(gè)協(xié)程同時(shí)競(jìng)爭(zhēng)互斥鎖時(shí),只有一個(gè)協(xié)程能夠獲得鎖,而其他協(xié)程則會(huì)被阻塞,這個(gè)就可能導(dǎo)致性能瓶頸,當(dāng)然在這個(gè)場(chǎng)景下問題不大。其次就是代碼的復(fù)雜性提高了,使用互斥鎖需要仔細(xì)設(shè)計(jì)和管理,確保鎖的正確獲取和釋放。這增加了代碼的復(fù)雜性和維護(hù)成本,如果在代碼中處理鎖的方式不正確,可能會(huì)死鎖,導(dǎo)致程序無法繼續(xù)執(zhí)行。
那我們其實(shí)就有疑問,在協(xié)程并發(fā)下數(shù)據(jù)匯總的場(chǎng)景,是否存在其他方式,不需要通過使用互斥鎖,也能夠保證線程安全呢? 其實(shí)還真有,Go
語言中的channel
非常適用于這種情況。通過使用通道,我們可以實(shí)現(xiàn)線程安全的數(shù)據(jù)共享和同步,而無需顯式地使用互斥鎖。下面我們來了解一下channel
。
3. channel的使用
3.1 channel的基本介紹
3.1.1 基本說明
channel
在Go語言中是一種特殊的數(shù)據(jù)結(jié)構(gòu),用于協(xié)程之間的通信和同步。它類似于一個(gè)先進(jìn)先出(FIFO)的隊(duì)列,用于數(shù)據(jù)的傳輸和共享。在并發(fā)環(huán)境中,可以將數(shù)據(jù)發(fā)送到通道,也可以從通道中接收數(shù)據(jù),而這兩個(gè)操作都是線程安全的。
使用channel
的優(yōu)勢(shì)在于它提供了內(nèi)置的同步機(jī)制,無需顯式地使用互斥鎖來處理并發(fā)訪問。
當(dāng)一個(gè)協(xié)程向通道發(fā)送數(shù)據(jù)時(shí),如果通道已滿,發(fā)送操作會(huì)被阻塞,直到有其他協(xié)程從通道中接收數(shù)據(jù)釋放空間。同樣地,當(dāng)一個(gè)協(xié)程從通道接收數(shù)據(jù)時(shí),如果通道為空,接收操作也會(huì)被阻塞,直到有其他協(xié)程向通道發(fā)送數(shù)據(jù)。
同時(shí),當(dāng)多個(gè)協(xié)程同時(shí)訪問通道時(shí),Go運(yùn)行時(shí)系統(tǒng)會(huì)自動(dòng)處理協(xié)程之間的同步和并發(fā)訪問的細(xì)節(jié),保證數(shù)據(jù)的正確性和一致性。從而可以放心地在多個(gè)協(xié)程中使用通道進(jìn)行數(shù)據(jù)的發(fā)送和接收操作,而不需要額外的鎖或同步機(jī)制來保證線程安全。
因此,使用channel
其實(shí)是可以避免常見的并發(fā)問題,如競(jìng)態(tài)條件和死鎖,簡(jiǎn)化了并發(fā)編程的復(fù)雜性。
3.1.2 基本使用
通過上面對(duì)channel
的基本介紹,我們已經(jīng)對(duì)channel
有了基本的了解,其實(shí)可以粗略理解其為一個(gè)并發(fā)安全的隊(duì)列。下面來了解下channel
的基本語法,從而能夠開始使用channel
。
channel基本操作分為創(chuàng)建channel
,發(fā)送數(shù)據(jù)到channel
,接收channel
中的數(shù)據(jù),以及關(guān)閉channel
。下面對(duì)其進(jìn)行簡(jiǎn)單展示:
創(chuàng)建channel,使用make函數(shù)創(chuàng)建通道,通道的類型可以根據(jù)需要選擇,例如int
、string
等:
ch := make(chan int)
發(fā)送數(shù)據(jù)到channel:使用<-
操作符將數(shù)據(jù)發(fā)送到通道中
ch <- data
接收channel中的數(shù)據(jù): 使用<-
操作符從通道中接收數(shù)據(jù)
result := <-ch
關(guān)閉channel, 使用close
函數(shù)關(guān)閉通道。關(guān)閉通道后,仍然可以從通道接收數(shù)據(jù),但無法再向通道發(fā)送數(shù)據(jù)
close(ch)
通過上面channel
的四個(gè)基本操作,便能夠?qū)崿F(xiàn)在不同協(xié)程間線程安全得傳遞數(shù)據(jù)。最后通過一個(gè)例子,完整得展示channel
的基本使用。
package mainimport "fmt"func main() {ch := make(chan string) // 創(chuàng)建字符串通道defer close(ch)go func() {ch <- "hello, channel!" // 發(fā)送數(shù)據(jù)到通道}()result := <-ch // 從通道接收數(shù)據(jù)fmt.Println(result)
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè)字符串通道ch
。然后,在一個(gè)單獨(dú)的協(xié)程中,我們向通道發(fā)送了字符串"hello, channel!"。最后,主協(xié)程從通道中接收數(shù)據(jù),并將其打印出來。
通過使用通道,我們可以實(shí)現(xiàn)協(xié)程之間的數(shù)據(jù)傳輸和同步,確保數(shù)據(jù)的安全共享和線程安全性。通道的使用能夠簡(jiǎn)化并發(fā)編程的復(fù)雜性,提供一種高效、可靠的方式來處理并發(fā)場(chǎng)景下的數(shù)據(jù)傳遞。
3.2 使用channel實(shí)現(xiàn)匯總數(shù)據(jù)
下面,我們使用channel
來實(shí)現(xiàn)并發(fā)數(shù)據(jù)匯總,替換掉之前使用互斥鎖來保證線程安全的實(shí)現(xiàn):
package mainimport ("fmt""sync""time"
)type Data struct {ID intName string
}func fetchData(page int, ch chan Data, wg *sync.WaitGroup) {// 模擬 RPC 接口拉取數(shù)據(jù)的耗時(shí)操作time.Sleep(time.Second)// 假設(shè)從 RPC 接口獲取到了一批數(shù)據(jù)data := Data{ID: page,Name: fmt.Sprintf("Data %d", page),}ch <- data // 將數(shù)據(jù)發(fā)送到通道wg.Done()
}func main() {var wg sync.WaitGroup// 定義需要拉取的數(shù)據(jù)頁數(shù)numPages := 10dataCh := make(chan Data, 10) // 創(chuàng)建用于接收數(shù)據(jù)的通道// 啟動(dòng)多個(gè)協(xié)程并發(fā)地拉取數(shù)據(jù)for i := 1; i <= numPages; i++ {wg.Add(1)go fetchData(i, dataCh, &wg)}go func() {wg.Wait()close(dataCh) // 關(guān)閉通道,表示數(shù)據(jù)已經(jīng)全部發(fā)送完成}()// 從通道接收數(shù)據(jù)并匯總var dataList []Datafor data := range dataCh {dataList = append(dataList, data)}// 打印拉取到的數(shù)據(jù)fmt.Println("Fetched data:")for _, data := range dataList {fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)}
}
在修改后的代碼中,我們創(chuàng)建了一個(gè)用于接收數(shù)據(jù)的 dataCh
。每個(gè)協(xié)程通過將數(shù)據(jù)發(fā)送到該channel
來完成數(shù)據(jù)的匯總。主協(xié)程通過從channel
接收數(shù)據(jù),并將其添加到 dataList
中實(shí)現(xiàn)數(shù)據(jù)的匯總過程。這種方式不需要顯式地加鎖和解鎖,并且避免了互斥鎖帶來的復(fù)雜性和性能問題。
通過使用channel
,我們能夠以一種更直觀、更安全的方式實(shí)現(xiàn)協(xié)程之間的數(shù)據(jù)傳遞和同步。channel
在并發(fā)編程中起到了關(guān)鍵的作用,簡(jiǎn)化了并發(fā)操作的管理和實(shí)現(xiàn)。同時(shí),它提供了內(nèi)置的同步機(jī)制,保證了數(shù)據(jù)的正確性和一致性,避免了死鎖和競(jìng)態(tài)條件的問題。
3.3 總結(jié)
協(xié)程間的并發(fā)下匯總數(shù)據(jù)可以歸類為協(xié)程間的數(shù)據(jù)傳遞這個(gè)場(chǎng)景。在這個(gè)場(chǎng)景中,多個(gè)協(xié)程并發(fā)地拉取數(shù)據(jù),然后將數(shù)據(jù)匯總到一個(gè)共享的數(shù)據(jù)結(jié)構(gòu)中。為了保證數(shù)據(jù)的正確性和一致性,需要使用某種機(jī)制來確保多個(gè)協(xié)程對(duì)共享數(shù)據(jù)的并發(fā)訪問是安全的。
在原始的實(shí)現(xiàn)中,使用了互斥鎖來保護(hù)共享數(shù)據(jù)的并發(fā)訪問?;コ怄i提供了互斥訪問的機(jī)制,確保同一時(shí)間只有一個(gè)協(xié)程可以訪問共享數(shù)據(jù),從而避免了數(shù)據(jù)競(jìng)爭(zhēng)和不一致性。這種方式在保證線程安全的同時(shí),引入了鎖的開銷和復(fù)雜性。
而使用channel
來實(shí)現(xiàn)協(xié)程間的安全數(shù)據(jù)傳遞可以更簡(jiǎn)潔和高效。每個(gè)協(xié)程可以將拉取到的數(shù)據(jù)通過channel
發(fā)送到主協(xié)程,主協(xié)程通過接收channel
中的數(shù)據(jù)來進(jìn)行匯總。channel
提供了并發(fā)安全的數(shù)據(jù)傳遞機(jī)制,協(xié)程之間的數(shù)據(jù)傳輸是同步和有序的。由于channel
本身就提供了同步機(jī)制,不需要額外的鎖和同步操作,能夠更簡(jiǎn)潔地實(shí)現(xiàn)協(xié)程間的安全數(shù)據(jù)傳遞。
因此,如果需要在多個(gè)協(xié)程間實(shí)現(xiàn)數(shù)據(jù)傳遞,而且由此可能帶來線程安全的問題,此時(shí)使用channel
來實(shí)現(xiàn)是相對(duì)比較合適的。
4. 開源項(xiàng)目中的使用
假設(shè)我們需要對(duì)etcd
進(jìn)行性能測(cè)試,此時(shí)需要模擬大量并發(fā)請(qǐng)求,對(duì)etcd
進(jìn)行負(fù)載測(cè)試,并收集每個(gè)請(qǐng)求的執(zhí)行時(shí)間、成功/失敗狀態(tài)等結(jié)果數(shù)據(jù)。然后主協(xié)程需要收集每一個(gè)請(qǐng)求的結(jié)果數(shù)據(jù),并進(jìn)行統(tǒng)計(jì)計(jì)算,生成相應(yīng)的性能報(bào)告?;诖?#xff0c;能夠計(jì)算出總請(qǐng)求數(shù)、請(qǐng)求成功率、平均執(zhí)行時(shí)間、最慢/最快請(qǐng)求等統(tǒng)計(jì)信息,以及錯(cuò)誤分布情況和慢速請(qǐng)求的詳細(xì)信息。
從上面的講述來看,其實(shí)我們可以大概想象出這個(gè)模型,多個(gè)協(xié)程并發(fā)執(zhí)行,然后獲取每個(gè)請(qǐng)求的結(jié)果數(shù)據(jù)。然后主協(xié)程需要收集匯總這些數(shù)據(jù),基于此來生成性能報(bào)告。這個(gè)模型其實(shí)也就是我們上面所說的協(xié)程并發(fā)下的數(shù)據(jù)匯總,因此通過channel
來實(shí)現(xiàn)協(xié)程間的數(shù)據(jù)傳輸,是非常合適的。
下面我們來看看etcd
中對(duì)應(yīng)的實(shí)現(xiàn)。etcd
中存在一個(gè)report
對(duì)象的實(shí)現(xiàn),能夠接受一系列的請(qǐng)求數(shù)據(jù)的結(jié)果,然后生成性能報(bào)告返回回去。結(jié)構(gòu)體定義如下:
type report struct {results chan Resultstats Stats
}
func (r *report) Results() chan<- Result { return r.results }// Result describes the timings for an operation.
type Result struct {Start time.TimeEnd time.TimeErr error
}func newReport(precision string) *report {r := &report{results: make(chan Result, 16),}return r
}
Result
結(jié)構(gòu)體為單個(gè)測(cè)試的結(jié)果,而 report
結(jié)構(gòu)體則用于整個(gè)測(cè)試過程的報(bào)告和統(tǒng)計(jì)信息。通過使用 results
通道,可以將每個(gè)測(cè)試的結(jié)果發(fā)送到 report
結(jié)構(gòu)體中,以便進(jìn)行統(tǒng)計(jì)和生成報(bào)告。
當(dāng)進(jìn)行性能壓測(cè)時(shí),首先通過newReport
生成一個(gè)report
對(duì)象,然后啟動(dòng)多個(gè)協(xié)程同時(shí)進(jìn)行壓測(cè)請(qǐng)求,每一個(gè)請(qǐng)求處理完成之后,便會(huì)生成一個(gè)處理結(jié)果,存儲(chǔ)到Result
對(duì)象當(dāng)中。然后基于report
對(duì)象的Results
方法獲取到對(duì)應(yīng)的channel
,將處理結(jié)果傳輸給主協(xié)程。
主協(xié)程便通過遍歷report
對(duì)象中的results
變量對(duì)應(yīng)的channel
,匯總計(jì)算所有處理結(jié)果,基于此便能夠生成壓測(cè)結(jié)果和報(bào)告。下面來看其具體流程。
首先是創(chuàng)建一個(gè)report
對(duì)象,然后啟動(dòng)多個(gè)協(xié)程來處理請(qǐng)求,將結(jié)果發(fā)送到report
對(duì)象中的results
對(duì)應(yīng)的channel
中。
// 這里NewReportSample方法,其實(shí)是對(duì)上面newReport方法的一個(gè)封裝
r := NewReportSample("%f")
// 這里假設(shè)只有一個(gè)協(xié)程,模擬執(zhí)行一系列的測(cè)試,并將測(cè)試結(jié)果發(fā)送到 Report 對(duì)象的 results 通道中。
go func() {start := time.Now()for i := 0; i < 5; i++ {// 不真實(shí)進(jìn)行請(qǐng)求,只是簡(jiǎn)單獲取執(zhí)行結(jié)果,將測(cè)試結(jié)果進(jìn)行傳輸end := start.Add(time.Second)r.Results() <- Result{Start: start, End: end}start = end}r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}// 假設(shè)所有壓測(cè)請(qǐng)求都執(zhí)行完成了close(r.Results())
}()
// 主協(xié)程 匯總所有的處理結(jié)果,然后生成壓測(cè)報(bào)告
stats := <-r.Stats()
以上代碼中,r
是通過 NewReportSample("%f")
創(chuàng)建的一個(gè) Report
對(duì)象。然后,在一個(gè)單獨(dú)的協(xié)程中,執(zhí)行了一系列的測(cè)試,并將測(cè)試結(jié)果發(fā)送到 r.Results()
通道中。
這段代碼的作用是模擬執(zhí)行一系列的測(cè)試,并將測(cè)試結(jié)果發(fā)送到 Report
對(duì)象的 results
通道中。通過使用 r.Results()
方法返回的通道,可以將測(cè)試結(jié)果發(fā)送到報(bào)告對(duì)象中進(jìn)行統(tǒng)計(jì)和處理。
接下來,主協(xié)程應(yīng)該不斷從 r.Results()
方法返回的通道中讀取數(shù)據(jù),匯總所有的處理結(jié)果,從而生成壓測(cè)報(bào)告。這個(gè)方法其實(shí)是被封裝在r.Stas()
方法中,具體如下:
func (r *report) Stats() <-chan Stats {// 創(chuàng)建一個(gè)channeldonec := make(chan Stats, 1)// 啟動(dòng)一個(gè)協(xié)程來執(zhí)行g(shù)o func() {defer close(donec)r.processResults()s := r.stats.copy()if r.sps != nil {s.TimeSeries = r.sps.getTimeSeries()}// 執(zhí)行完成的話,將結(jié)果返回donec <- s}()// 返回channelreturn donec
}// Stats方法啟動(dòng)的協(xié)程中,實(shí)際運(yùn)行的任務(wù)
func (r *report) processResults() {st := time.Now()// 遍歷r.results方法中channel中的數(shù)據(jù),然后執(zhí)行處理流程for res := range r.results {r.processResult(&res)}// 后續(xù)執(zhí)行一些具體的計(jì)算邏輯
}
上述代碼是 report
結(jié)構(gòu)體中的兩個(gè)方法,其中 Stats()
方法返回一個(gè)只讀的 Stats
通道。這個(gè)方法會(huì)在一個(gè)單獨(dú)的協(xié)程中執(zhí)行,并處理 results
通道中的測(cè)試結(jié)果。事實(shí)上就是匯總channel
中的數(shù)據(jù),然后進(jìn)行一定的處理,然后返回。
5. 總結(jié)
本文通過介紹并發(fā)編程中的數(shù)據(jù)匯總問題,提出了使用互斥鎖和通道來保證線程安全的方法。互斥鎖適用于臨界區(qū)保護(hù)和共享資源的互斥訪問,但可能存在死鎖和性能瓶頸的問題。相比之下,通道提供了更直觀和安全的協(xié)程間通信方式,避免了鎖的問題,并提供了更靈活的并發(fā)模式。
基于以上內(nèi)容的介紹,大概能夠明確下,在數(shù)據(jù)傳遞和匯總的場(chǎng)景下,使用channel
來實(shí)現(xiàn)可能是更為合適的,能夠提高代碼的可讀性和并發(fā)安全性。希望以上內(nèi)容對(duì)你有所幫助。