pika作為類redis的存儲系統(tǒng),為了彌補在性能上的不足,在整個系統(tǒng)中大量使用多線程的結(jié)構(gòu),涉及到多線程編程,勢必需要為線程加鎖來保證數(shù)據(jù)訪問的一致性和有效性。其中主要用到了三種鎖
應用掛起指令,在掛起指令的執(zhí)行中,會添加寫鎖,以確保,此時沒有其他指令執(zhí)行。其他的普通指令在會添加讀鎖,可以并行訪問。 其中掛起指令有:
保證當前服務器在執(zhí)行掛起指令時,起到阻寫作用。
行鎖,用于對一個key加鎖,保證同一時間只有一個線程對一個key進行操作。
pika中存取的數(shù)據(jù)都是類key,value數(shù)據(jù),不同key所對應的數(shù)據(jù)完全獨立,所以只需要對key加鎖可以保證數(shù)據(jù)在并發(fā)訪問時的一致性,行鎖相對來說,鎖定粒度小,也可以保證數(shù)據(jù)訪問的高效性。
在pika系統(tǒng)中,對于數(shù)據(jù)庫的操作都需要添加行鎖,主要在應用于兩個地方,在系統(tǒng)上層指令過程中和在數(shù)據(jù)引擎層面。在pika系統(tǒng)中,對于寫指令(會改變數(shù)據(jù)狀態(tài),如SET,HSET)需要除了更新數(shù)據(jù)庫狀態(tài),還涉及到pika的增量同步,需要在binlog中添加所執(zhí)行的寫指令,用于保證master和slave的數(shù)據(jù)庫狀態(tài)一致。故一條寫指令的執(zhí)行,主要有兩個部分:
其加鎖情況,如下圖:
在圖中可以看到,對同一個key,加了兩次行鎖,在實際應用中,pika上所加的鎖就已經(jīng)能夠保證數(shù)據(jù)訪問的正確性。如果只是為了pika所需要的業(yè)務,nemo層面使用行鎖是多余的,但是nemo的設計初衷就是通過對rocksdb的改造和封裝提供一套完整的類redis數(shù)據(jù)訪問的解決方案,而不僅僅是為pika提供數(shù)據(jù)庫引擎。這種設計思路也是秉承了Unix中的設計原則:Write programs that do one thing and do it well。
這樣設計大大降低了pika與nemo之間的耦合,也使得nemo可以被單獨拿出來測試和使用,在pika中的數(shù)據(jù)遷移工具就是完全使用nemo來完成,不必依賴任何pika相關(guān)的東西。另外對于nemo感興趣或者有需求的團隊也可以直接將nemo作為數(shù)據(jù)庫引擎而不需要修改任何代碼就能使用完整的數(shù)據(jù)訪問功能。
在pika系統(tǒng)中,一把行鎖就可以維護所有key。在行鎖的實現(xiàn)上是將一個key與一把互斥鎖相綁定,并將其放入哈希表中維護,來保證每次訪問key的線程只有一個,但是不可能也不需要為每一個key保留一把互斥鎖,只需要當有多條線程訪問同一個key時才需要鎖,在所有線程都訪問結(jié)束之后,就可以銷毀這個綁定key的互斥鎖,釋放資源。具體實現(xiàn)如下:
class RecordLock {
public:
RecordLock(port::RecordMutex *mu, const std::string &key)
: mu_(mu), key_(key) {
mu_->Lock(key_);
}
~RecordLock() { mu_->Unlock(key_); }
private:
port::RecordMutex *const mu_;
std::string key_;
// No copying allowed
RecordLock(const RecordLock&);
void operator=(const RecordLock&);
};
void RecordMutex::Lock(const std::string &key) {
mutex_.Lock();
std::unordered_map<std::string, RefMutex *>::const_iterator it = records_.find(key);
if (it != records_.end()) {
//log_info ("tid=(%u) >Lock key=(%s) exist, map_size=%u", pthread_self(), key.c_str(), records_.size());
RefMutex *ref_mutex = it->second;
ref_mutex->Ref();
mutex_.Unlock();
ref_mutex->Lock();
//log_info ("tid=(%u) <Lock key=(%s) exist", pthread_self(), key.c_str());
} else {
//log_info ("tid=(%u) >Lock key=(%s) new, map_size=%u ++", pthread_self(), key.c_str(), records_.size());
RefMutex *ref_mutex = new RefMutex();
records_.insert(std::make_pair(key, ref_mutex));
ref_mutex->Ref();
mutex_.Unlock();
ref_mutex->Lock();
//log_info ("tid=(%u) <Lock key=(%s) new", pthread_self(), key.c_str());
}
}
完整代碼可參考:slash_mutex.cc slash_mutex.h
更多建議: