陽谷網(wǎng)站建設公司網(wǎng)店運營教學
個人隨筆 (Owed by: 春夜喜雨 http://blog.csdn.net/chunyexiyu)
1. leveldb的Compaction全局線程
在leveldb中,有一個全局的后臺線程BGThread,用于數(shù)據(jù)庫的MinorCompact與MajorCompact。
重點關注“全局線程”:
這個標識著無論一個進程打開多少個leveldb庫,該Compact線程只有一個;
如果在一個程序中,打開了許多庫在讀寫,多個庫都觸發(fā)了MinorCompact,那么這些MinorCompact將在Compact線程中依次執(zhí)行;
也因為是全局線程,所以需要觸發(fā)minorcompact的db線程,把db的this指針傳遞給Compact線程中來。
基于此,來看一下代碼上的實現(xiàn):
2. 任務Schedule部分,建立了一個創(chuàng)建一次的全局子線程
通過started_backgroud_thread_來標識是否已創(chuàng)建;
定義了一個std::thread backgroud_thread,并detach掉,形成一個全局的子線程;
把任務函數(shù)與參數(shù)放入到backgroud_work_queue_中;
void WindowsEnv::Schedule(void (*background_work_function)(void* background_work_arg),void* background_work_arg) {background_work_mutex_.Lock();// Start the background thread, if we haven't done so already.if (!started_background_thread_) {started_background_thread_ = true;std::thread background_thread(WindowsEnv::BackgroundThreadEntryPoint, this);background_thread.detach();}// If the queue is empty, the background thread may be waiting for work.if (background_work_queue_.empty()) {background_work_cv_.Signal();}background_work_queue_.emplace(background_work_function, background_work_arg);background_work_mutex_.Unlock();
}
3. 任務線程執(zhí)行部分,不斷的從隊列中取任務函數(shù)與參數(shù),并執(zhí)行任務函數(shù)
從backgroud_work_queue_中取出任務函數(shù)與參數(shù),執(zhí)行函數(shù);
其中為了避免queue中一直沒有數(shù)據(jù)一直取的情況,使用了一個backgroupd_work_cv來在為空時等待;
static void BackgroundThreadEntryPoint(WindowsEnv* env) {env->BackgroundThreadMain();
}
void WindowsEnv::BackgroundThreadMain() {while (true) {background_work_mutex_.Lock();// Wait until there is work to be done.while (background_work_queue_.empty()) {background_work_cv_.Wait();}assert(!background_work_queue_.empty());auto background_work_function = background_work_queue_.front().function;void* background_work_arg = background_work_queue_.front().arg;background_work_queue_.pop();background_work_mutex_.Unlock();background_work_function(background_work_arg);}
}
4. 在需要觸發(fā)Compact的地方,傳入任務函數(shù)BGWork與當前庫的指針
調(diào)用MaybeScheduleCompaction,在其中判定一個需要Compact的場景,排除掉一些不需要發(fā)起Compact的場景:
- 已經(jīng)在執(zhí)行Compact的場景除外;
- 已經(jīng)關閉完畢的場景除外;
- 該數(shù)據(jù)庫后臺已經(jīng)有問題的場景除外;
- 沒有需要Compact的場景除外:沒有待執(zhí)行的Minorcompact+也沒有待執(zhí)行的手動Compaction+沒有待執(zhí)行的size-compaction/file-compaction;
void DBImpl::MaybeScheduleCompaction() {mutex_.AssertHeld();if (background_compaction_scheduled_) {// Already scheduled}else if (shutting_down_.load(std::memory_order_acquire)) {// DB is being deleted; no more background compactions}else if (!bg_error_.ok()) {// Already got an error; no more changes}else if (imm_ == nullptr && manual_compaction_ == nullptr &&!versions_->NeedsCompaction()) {// No work to be done}else {background_compaction_scheduled_ = true;env_->Schedule(&DBImpl::BGWork, this);}
}
4. 任務函數(shù)BGWork又做了什么呢?
BGWork是一層包裝,直接調(diào)用數(shù)據(jù)db參數(shù)的方法BackgroundCall;
- 里面又檢查了一次數(shù)據(jù)庫已經(jīng)關閉的場景和數(shù)據(jù)庫任務出錯情況,因為從任務排到隊列中,到排到任務執(zhí)行,可能是要等一段時間的,在這個過程中,也可能條件改變了。
- 接著執(zhí)行BackgroundCompaction來做該db的Compaction;
- 做完Compaction之后,把background_compaction_scheduled設定為false,以允許MaybeScheduleCompaction再次進入;
- 接著就調(diào)用MaybeScheduleCompaction函數(shù),再檢查一邊這個數(shù)據(jù)有沒有待compaction內(nèi)容未做完,例如這一次只做了MinorCompact,還有MajorCompact要做,再次去排隊等執(zhí)行;
- 然后再來標識庫的db后臺任務項當前執(zhí)行完了,發(fā)送一個信號background_work_finished_signal_;
background_work_finished_signal_這個信號量通常用來通知寫線程或關閉線程,通知該db的后臺一項任務做完了,通知等待線程可以繼續(xù)向后執(zhí)行了。
void DBImpl::BGWork(void* db) {reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}void DBImpl::BackgroundCall() {MutexLock l(&mutex_);assert(background_compaction_scheduled_);if (shutting_down_.load(std::memory_order_acquire)) {// No more background work when shutting down.}else if (!bg_error_.ok()) {// No more background work after a background error.}else {BackgroundCompaction();}background_compaction_scheduled_ = false;// Previous compaction may have produced too many files in a level,// so reschedule another compaction if needed.MaybeScheduleCompaction();background_work_finished_signal_.SignalAll();
}
個人隨筆 (Owed by: 春夜喜雨 http://blog.csdn.net/chunyexiyu)