網(wǎng)站解析查詢?nèi)W(wǎng)營銷渠道
etcd是存儲鍵值數(shù)據(jù)的服務器
客戶端通過長連接watch實時更新數(shù)據(jù)
場景:
當主機A給服務器存儲 name: 小王
主機B從服務器中查name ,得到name-小王
當主機A更改name 小李
服務器實時通知主機B name 已經(jīng)被更改成小李了。
應用:服務注冊與發(fā)現(xiàn)
?
?
?
更改配置
若需要修改,則可以配置:/etc/default/etcd
sudo vi /etc/profile
在最后加上 export ETCDCTL_API=3 來確定etcd的版本 (每個終端都要設置,報錯了可以嘗試查看該終端上是否更改了etcd的版本)
??
??
使用
1.啟動 sudo systemctl start etcd
source /etc/profile
2.添加 etcdctl put
3.查找 etcdctl get
4.刪除 etcdctl del
??
?
一些接口的使用
?
?
使用樣例
put:
#include<iostream>
#include<etcd/Client.hpp>
#include<etcd/KeepAlive.hpp>
#include<etcd/Response.hpp>
#include<etcd/Value.hpp>
#include<etcd/Watcher.hpp>int main(int argc,char * argv[])
{//創(chuàng)建Clientconst string Host_url="http://127.0.0.1:2379"; etcd::Client client(Host_url); //用url初始化客戶端//獲取lease_id//keep_alive是一個?;顚ο?#xff0c;leasekeepalive(5),指的是生命周期為5s,//leasekeepalive()函數(shù)返回一個異步操作的對象,get()函數(shù)是指等待該異步對象操作完成,操作失敗會拋出異常auto keep_alive=client.leasekeepalive(5).get(); //keep_alive是一個//獲取租約的id,用于putauto leaseid=keep_alive->Lease();cout<<leaseid<<endl;//插入鍵值,傳入key-value 以及l(fā)easeid,put操作返回異步對象auto ret=client.put("/service/user","127.0.0.1:8080",leaseid).get();if(ret.is_ok()==false){cout<<"插入鍵值失敗了"<<endl;exit(1);}auto ret2=client.put("/service/friend","127.0.0.1:8081",leaseid).get();if(ret.is_ok()==false){cout<<"插入鍵值失敗了"<<endl;exit(1);}//暫停該進程std::this_thread::sleep_for(std::chrono::seconds(10));
}
源碼:
初始化
?
獲取保活對象,異步對象
?
?
?
get:
#include<iostream>
#include<etcd/Client.hpp>
#include<etcd/KeepAlive.hpp>
#include<etcd/Response.hpp>
#include<etcd/Value.hpp>
#include<etcd/Watcher.hpp>
//自己設置的回調函數(shù)。
//
void cback(const etcd::Response& re)
{//照搬t(yī)xt目錄下的 watch檢查是否出錯if (re.error_code()) {std::cout << "Watcher " << re.watch_id() << " fails with "<< re.error_code() << ": " << re.error_message() << std::endl;}//源碼 class Event {// public:// enum class EventType {// PUT,// DELETE_,// INVALID,// };//匹配事件for(const auto& e:re.events()){if(e.event_type()==etcd::Event::EventType::PUT){cout<<"你的key-value已經(jīng)發(fā)生了改變"<<endl;cout<<"之前的key::"<<e.prev_kv().key()<<"-value"<<e.prev_kv().as_string()<<endl;cout<<"之前的key::"<<e.kv().key()<<"-value"<<e.kv().as_string()<<endl;}else if(e.event_type()==etcd::Event::EventType::DELETE_){ cout<<"你的value已經(jīng)被刪除了"<<endl;cout<<"之前的key::"<<e.prev_kv().key()<<"-value:"<<e.prev_kv().as_string()<<endl; }}
}
int main()
{const string Host_url="http://127.0.0.1:2379";//根據(jù)庫中指定url初始化客戶端etcd::Client client(Host_url);//ls是指獲取該/service key值下的所有value,在路徑查找中通常只需要設置一個目錄即可找到該目錄下全部value值。返回異步對象auto resp=client.ls("/service").get();if(resp.is_ok()==false){cout<<"獲取信息無效"<<endl;exit(1);}//獲取keys,指的是符合/service下的文件名個數(shù),以便于遍歷auto sz=resp.keys().size();cout<<sz<<endl;for(int i=0;i<sz;i++){cout<<resp.value(i).as_string()<<"可以提供"<<resp.key(i)<<"服務"<<endl;}//監(jiān)控裝置,監(jiān)視/service下的所有key-value,通常只監(jiān)控它是否修改和是否刪除//需要自己設置cback回調函數(shù)auto watcher=etcd::Watcher(client, "/service",cback, true);watcher.Wait();//等待。相當于啟動監(jiān)聽裝置return 0;
}
源碼:
?
?
?
?
tips:txt中有各種test的使用樣例
???
???
封裝客戶端
二次封裝:封裝etcd-client-api,
? ? ? ? ?實現(xiàn)兩種類型的客戶端
? ? ?? ?1.服務注冊客戶端:向服務器新增服務信息數(shù)據(jù),并進行?;?/p>
????????2.服務發(fā)現(xiàn)客戶端:從服務器查找服務信息數(shù)據(jù),并進行改變事件監(jiān)控封裝的時候,我們盡量減少模塊之間的耦合度,本質上etcd是一個鍵值存儲系統(tǒng),并不是專門用于作為注冊中心進行服務注冊和發(fā)現(xiàn)的。
封裝思想:
?????????1.封裝服務注冊客戶端類提供一個接口:向服務器新增數(shù)據(jù)并進行保活參數(shù):注冊中心地址(etcd服務器地址),新增的服務信息(服務名-主機地址鍵值對)封裝服務發(fā)現(xiàn)客戶端類
服務下線事件接口(數(shù)據(jù)刪除)??
? ? ? ? 2.封裝服務發(fā)現(xiàn)客戶端類
提供兩個設置回調函數(shù)的接口:服務上線事件接口(數(shù)據(jù)新增),服務下線事件接口(數(shù)據(jù)刪除)
代碼
#pragma once
#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"#include <functional>
namespace common{
class Rigistry
{
public:using ptr=std::shared_ptr<Rigistry>;Rigistry(const string & Host): _client(std::make_shared<etcd::Client>(Host)), _keep_alive(_client->leasekeepalive(5).get()), _leaseid(_keep_alive->Lease()){}~Rigistry() { _keep_alive->Cancel(); }bool registry(const std::string& service, const std::string& host){auto ret = _client->put(service, host, _leaseid).get();if (ret.is_ok() == false){LOG_ERROR("客戶端服務注冊失敗,{}", ret.error_message());return false;}return true;}private:std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::KeepAlive> _keep_alive;int64_t _leaseid;
};class Discovery
{
public:using ptr=std::shared_ptr<Discovery>;using NotifyCallback = std::function<void(const std::string&, const std::string&)>;Discovery(const std::string &Host,const std::string &basedir,const NotifyCallback& put_cb,const NotifyCallback& del_cb)//在 Discovery 對象構造的過程中,online 和 offonline 會發(fā)生隱式轉換 轉換成NotifyCallback類型//因此得+const & 或者值傳遞的方式: _put_cb(put_cb), _del_cb(del_cb), _client(std::make_shared<etcd::Client>(Host)){auto resp = _client->ls(basedir).get();if (resp.is_ok() == false){LOG_ERROR("客戶端服務獲取信息失敗,{}", resp.error_message());}auto sz = resp.keys().size();for (int i = 0; i < sz; i++){if(put_cb){put_cb(resp.key(i),resp.value(i).as_string());LOG_DEBUG("新增服務:{}-{}",resp.key(i),resp.value(i).as_string());}}_watcher=(std::make_shared<etcd::Watcher>(*_client.get(), basedir, std::bind(&Discovery::cback, this, std::placeholders::_1), true));// Watcher(Client const& client, 。。。要求傳入client,// 我們的_client被用shared_ptr封裝了起來,得解引用 + get();}private:void cback(const etcd::Response &re){if (re.is_ok()==false){std::cout <<"收到一個錯誤的事間通知"<<re.error_message() << std::endl;LOG_ERROR("客戶端服務回調函數(shù)信息失敗,{}", re.error_message());}for (const auto &e : re.events()){if (e.event_type() == etcd::Event::EventType::PUT){if(_put_cb){_put_cb(e.kv().key(),e.kv().as_string());}LOG_DEBUG("新增服務:{}-{}",e.kv().key(),e.kv().as_string());}else if (e.event_type() == etcd::Event::EventType::DELETE_){if(_del_cb){_del_cb(e.prev_kv().key(),e.prev_kv().as_string());}LOG_DEBUG("下線服務:{}-{}",e.prev_kv().key(),e.prev_kv().as_string());}}}private:NotifyCallback _put_cb;NotifyCallback _del_cb;std::shared_ptr<etcd::Client> _client;std::shared_ptr<etcd::Watcher> _watcher;
};
}
?
?