江蘇省城鄉(xiāng)建設(shè)官網(wǎng)站免費(fèi)私人網(wǎng)站建設(shè)
用Java實(shí)現(xiàn)一個(gè)簡(jiǎn)單的KV數(shù)據(jù)庫(kù)
開(kāi)發(fā)思路:
用map存儲(chǔ)數(shù)據(jù),再用一個(gè)List記錄操作日志,開(kāi)一個(gè)新線程將List中的操作寫(xiě)入日志文件中,再開(kāi)一個(gè)線程用于網(wǎng)絡(luò)IO服務(wù)接收客戶端的命令,再啟動(dòng)時(shí)檢查日志,如果有數(shù)據(jù)就讀入map中
關(guān)于redis:
- 存儲(chǔ)結(jié)構(gòu):
- redis:
redis的數(shù)據(jù)保存其實(shí)比較復(fù)雜,使用一個(gè)哈希表保存所有鍵值對(duì),一個(gè)哈希表就是一個(gè)數(shù)組,數(shù)組的每一個(gè)元素是一個(gè)哈希桶,哈希桶中保存的是key和value的指針目錄,再通過(guò),指針去找對(duì)應(yīng)的key和value,當(dāng)然對(duì)于value是List等數(shù)據(jù)結(jié)構(gòu)還用到跳表,雙向列表,壓縮列表,整數(shù)數(shù)組等數(shù)據(jù)結(jié)構(gòu) - SimpleKVDB:
只用了Java的HashMap(偷懶~)
- redis:
- 線程:
- redis:
redis雖然成為單線程,但是redis的網(wǎng)絡(luò)IO和鍵值對(duì)讀寫(xiě)是由一個(gè)線程,但是另外的持久化,異步刪除,集群數(shù)據(jù)同步等,都是額外線程 - SimpleKVDB:
數(shù)據(jù)讀寫(xiě)網(wǎng)絡(luò)IO一個(gè)線程,持久化一個(gè)線程(集群同步本來(lái)想做但是后來(lái)沒(méi)有寫(xiě),也是新開(kāi)一條線程)
- redis:
- 網(wǎng)絡(luò)IO:
- redis:
單線程多路復(fù)用高性能IO模式 - SimpleKVDB:
直接用Java標(biāo)準(zhǔn)庫(kù)NIO,多路復(fù)用IO模式
- redis:
- 持久化:
- redis:
AOF操作日志,RDB快照,AOF用來(lái)記錄每一次的操作(增刪改)可以實(shí)時(shí)同步也可以每隔一個(gè)時(shí)間同步文件中,RDB全量數(shù)據(jù)快照但是需要開(kāi)一條子進(jìn)程開(kāi)銷(xiāo)比較大,redis4.0以后使用一種新的模式,RDB每隔一段時(shí)間全量快照內(nèi)存數(shù)據(jù),AOF記錄每個(gè)RDB之間的操作記錄,當(dāng)下一次全量RDB以后清空AOF再重新記錄操作日志 - SimpleKVDB
只記錄AOF操作日志,開(kāi)一個(gè)新線程,有新的操作就寫(xiě)入(后來(lái)我發(fā)現(xiàn)可以使用mmap內(nèi)存映射的方法,這樣更快效率更高)
- redis:
- 主從數(shù)據(jù)一致
- redis:
選一臺(tái)主服務(wù)器用于寫(xiě)入,從服務(wù)器用于讀取,主服務(wù)器有數(shù)據(jù)寫(xiě)入就同步從服務(wù)器,哨兵機(jī)制,用于監(jiān)控所有服務(wù)器,如果主服務(wù)器崩潰,就選擇一臺(tái)從服務(wù)器作為主服務(wù)器(會(huì)根據(jù)是否下線,網(wǎng)絡(luò)速度,讀寫(xiě)速度等選擇主服務(wù)器),然后通知其他從服務(wù)器連接到新的主服務(wù)器 - SimpleKVDB:
沒(méi)寫(xiě),設(shè)想:本來(lái)是想寫(xiě)一個(gè)配置文件,寫(xiě)入主服務(wù)器IP,其他從服務(wù)器IP,開(kāi)一個(gè)線程在服務(wù)端中寫(xiě)一個(gè)客戶端當(dāng)作主服務(wù)器,讀取配置文件,只有主服務(wù)器才能開(kāi)這個(gè)線程,其他從服務(wù)器還是開(kāi)啟服務(wù),用來(lái)接收主服務(wù)器的數(shù)據(jù),同步從數(shù)據(jù)庫(kù)的內(nèi)存和操作日志里
- redis:
操作展示:
客戶端:
服務(wù)端:
日志文件:
目錄結(jié)構(gòu):
- SimpleKVDB
- SimpleKVDBClient(客戶端)
- SimpleKVDBClient.java(客戶端)
- SimpleKVDBService(服務(wù)端)
- AofAnnotation.java (注解)
- AofInterface.java(接口)
- DynamicAgent.java(動(dòng)態(tài)代理)
- SimpleKVDBService.java(服務(wù)端)
- SimpleKVDBClient(客戶端)
SimpleKVDBClient.java(客戶端):
package SimpleKVDB.SimpleKVDBClient;import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class SimpleKVDBClient {public static void main(String[] args) throws Exception {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);Selector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_CONNECT);socketChannel.connect(new InetSocketAddress("127.0.0.1",5555));while (true){selector.select();//阻塞 等待事件發(fā)生Set<SelectionKey> selectionKeys = selector.selectedKeys();selectionKeys.forEach(key ->{try {if (key.isConnectable()){SocketChannel channel = (SocketChannel) key.channel();if (channel.isConnectionPending()){//是否正在連接channel.finishConnect(); //結(jié)束正在連接ByteBuffer writeBuffer = ByteBuffer.allocate(1024);writeBuffer.put((LocalDateTime.now() + " 連接成功").getBytes());writeBuffer.flip();channel.write(writeBuffer);//將buffer寫(xiě)入channelExecutorService service = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());service.submit(()->{//線程,從鍵盤(pán)讀入數(shù)據(jù)try {while (true){writeBuffer.clear();//清空bufferInputStreamReader input = new InputStreamReader(System.in);BufferedReader bufferedReader = new BufferedReader(input);String senderMessage = bufferedReader.readLine();writeBuffer.put(senderMessage.getBytes());writeBuffer.flip();channel.write(writeBuffer);}}catch (Exception e){e.printStackTrace();}});}channel.register(selector,SelectionKey.OP_READ);//注冊(cè)事件}else if (key.isReadable()){//channel 有信息的輸入SocketChannel channel = (SocketChannel) key.channel();//哪個(gè)channel 觸發(fā)了 readByteBuffer readBuffer = ByteBuffer.allocate(1024);int count = channel.read(readBuffer);//server發(fā)來(lái)的if (count > 0){String receiveMessage = new String(readBuffer.array(),0,count);System.out.println("響應(yīng)結(jié)果:"+receiveMessage);}}}catch (Exception e){e.printStackTrace();}finally {selectionKeys.clear();//移除已經(jīng)發(fā)生的事件}});}}
}
AofAnnotation.java(注解):
package SimpleKVDB.SimpleKVDBService;import java.lang.annotation.*;// ----------- 自定義的注解,用于區(qū)分是什么操作(其實(shí)也可以不用,直接獲取方法名區(qū)分也一樣) -----------
// 自定義的注解
@Retention(RetentionPolicy.RUNTIME)//注解會(huì)在class中存在,運(yùn)行時(shí)可通過(guò)反射獲取
@Target(ElementType.METHOD)//目標(biāo)是方法
@Documented
//文檔生成時(shí),該注解將被包含在javadoc中,可去掉
@interface AofAnnotation {String name() default "";
}
AofInterface.java(動(dòng)態(tài)代理接口):
package SimpleKVDB.SimpleKVDBService;// ----------- 動(dòng)態(tài)代理需要的接口,主要想實(shí)現(xiàn)切面效果在每一個(gè)操作后面加一個(gè)日志 -----------
// 動(dòng)態(tài)代理需要的接口
// 只需要給增刪改上加操作日志,保證數(shù)據(jù)一致性
interface AofInterface {
// @AofAnnotation(name="clear")
// int hashClear();@AofAnnotation(name="set")Object hashSet(String key, Object value);@AofAnnotation(name="remove")Object hashRemove(String key);
}
DynamicAgent.java(動(dòng)態(tài)代理):
package SimpleKVDB.SimpleKVDBService;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;// ----------- 動(dòng)態(tài)代理(實(shí)現(xiàn)切面效果的邏輯代碼) -----------
// 動(dòng)態(tài)代理
public class DynamicAgent<T> implements InvocationHandler {// 接口實(shí)現(xiàn)類(lèi)實(shí)例,如果不使用泛型,這里可以直接用ObjectT rent;void setObject(T obj){this.rent = obj;}// aof內(nèi)存List<String> listData;public void setListData(List<String> list){this.listData = list;}// 生成代碼類(lèi)public Object getProxy(){// 第一個(gè)參數(shù)是代理類(lèi)的類(lèi)加載器,第二個(gè)參數(shù)是代理類(lèi)要實(shí)現(xiàn)的接口,第三個(gè)參數(shù)是處理接口方法的程序// 這里代理類(lèi)是自己,所以直接this,getClass().getClassLoader()是獲取加載器// getClass().getInterfaces() 是獲取實(shí)現(xiàn)類(lèi)的接口// 因?yàn)閕nvoke()就是執(zhí)行方法,所以第三個(gè)參數(shù)也是本身thisreturn Proxy.newProxyInstance(this.getClass().getClassLoader(), rent.getClass().getInterfaces(),this);}// 處理代理實(shí)例,并返回執(zhí)行結(jié)果public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 動(dòng)態(tài)代理本質(zhì)就是通過(guò)反射實(shí)現(xiàn),這里就是執(zhí)行這個(gè)對(duì)象的方法Object result = method.invoke(rent, args);// 獲取注解AofAnnotation say = method.getAnnotation(AofAnnotation.class);// 注解的name內(nèi)容String name = say.name();System.out.println("name::"+name);// aof日志寫(xiě)入aofSetLog(name, args);return result;}// 給aof開(kāi)辟一個(gè)內(nèi)存public void aofSetLog(String name, Object[] args){Map<String, Object> dataMap = new HashMap<String, Object>();// 日志格式String aofData = "*|";if("set".equals(name)){dataMap.put(args[0].toString(), args[1]);aofData = aofData + name+"|"+args[0].toString()+"|"+dataMap.get(args[0].toString());}if("remove".equals(name)){if(null != dataMap && dataMap.size()>0){dataMap.remove(args[0].toString());}aofData = aofData + name+"|"+args[0].toString()+"|";}// 日志內(nèi)存listData.add(aofData);
// System.out.println("listData:::"+listData);}// 返回日志數(shù)據(jù)public List<String> getAofDatas(){return listData;}
}
SimpleKVDBService.java(服務(wù)端):
package SimpleKVDB.SimpleKVDBService;import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;// ----------- KV數(shù)據(jù)庫(kù)的服務(wù)端實(shí)現(xiàn) -----------
public class SimpleKVDBService implements AofInterface {// 全局存儲(chǔ)Map<String, Object> globalMap;public void setGlobalMap(Map<String, Object> map){this.globalMap = map;}// 動(dòng)態(tài)代理對(duì)象AofInterface dl;public void setAofInterface(AofInterface i){this.dl = i;}// 寫(xiě)入修改操作public Object hashSet(String key, Object value){return globalMap.put(key, value);}// 讀取操作public Object hashGet(String key){return globalMap.get(key);}// 刪除操作public Object hashRemove(String key){return globalMap.remove(key);}// 獲取長(zhǎng)度操作public int hashSize(){return globalMap.size();}// 是否為空操作操作public boolean hashIsEmpty(){return globalMap.isEmpty();}// aof日志List<String> aofList;// 引用全局aof日志變量,用來(lái)存儲(chǔ)aof操作日志public void setAofList(List<String> list){this.aofList = list;}// 創(chuàng)建aof文件public File createAofFile(){final String ROOT = '.' + File.separator;File newFolder = new File(ROOT+"simpleKVDB");if(newFolder.exists() && newFolder.isDirectory()){System.out.println("文件夾已經(jīng)存在");}else {boolean isFolder = newFolder.mkdir();if(!isFolder){System.out.println("文件夾創(chuàng)建失敗");}}// 創(chuàng)建一個(gè)文件File newFile = new File(newFolder.getPath(),"aofDatas.aof");if(newFile.exists() && newFile.isFile()){System.out.println("文件已經(jīng)存在");}boolean isFile;try {isFile = newFile.createNewFile();if(!isFile){System.out.println("文件創(chuàng)建失敗");}} catch (IOException e) {e.printStackTrace();}return newFile;}// 開(kāi)一個(gè)線程,寫(xiě)aof寫(xiě)入文件public void aofFileThread() {new Thread(()->{System.out.println("aof日志寫(xiě)入線程:"+Thread.currentThread().getName());while (true){this.setAofFile(this.aofList);}}).start();}// aof寫(xiě)入日志文件邏輯,將aof操作日志寫(xiě)入文件中,持久化public void setAofFile(List<String> aofList){if(null != aofList && aofList.size()>0){// 休眠一秒再寫(xiě)入,不頻繁使用IO寫(xiě)入try{Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 為什么文件夾和文件檢測(cè)放這里每次都要檢測(cè)是防止文件被誤刪除File newFile = this.createAofFile();// 使用try的話自動(dòng)回收/關(guān)閉資源,會(huì)自動(dòng)調(diào)用close方法,不需要手動(dòng)關(guān)閉// 將需要關(guān)閉的資源放在try(xxx; yyy;zzz;)// 流的關(guān)閉是有順序的,自己手動(dòng)關(guān)閉很繁瑣,自動(dòng)關(guān)閉大大降低了難度,非常方便try(// 創(chuàng)建一個(gè)FileOutputStream,Output是寫(xiě)入,文件的byte數(shù)據(jù)傳輸流// FileOutputStream 第二參數(shù)是否追加FileOutputStream fos = new FileOutputStream(newFile, true);// FileOutputStream是通過(guò)byte字節(jié)流的,OutputStreamWriter是將字節(jié)流包裝成想要的字符集的字符流寫(xiě)入OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8);// 使用PrintWriter,可以方便的寫(xiě)入一行字符,第二個(gè)參數(shù)自動(dòng)清空緩沖區(qū)PrintWriter pw = new PrintWriter(osw, true);){// 一邊遍歷一邊刪除aof操作日志Iterator<String> iterator = aofList.iterator();// 判斷是否還有下一個(gè)元素while (iterator.hasNext()){// 獲取下一個(gè)元素String str = iterator.next();// println是每段換行寫(xiě)入,print是不換行寫(xiě)入// 寫(xiě)入其實(shí)是一層一層走的,先是寫(xiě)入內(nèi)容進(jìn)入PrintWriter中,然后再OutputStreamWriter根據(jù)編碼轉(zhuǎn)成字節(jié)byte,然后再是FileOutputStream字節(jié)流寫(xiě)入文件pw.println(str);// 因?yàn)槭且脗鬟f,所以直接刪除元素iterator.remove();}// 清空緩沖區(qū),因?yàn)閿?shù)據(jù)是先進(jìn)入緩沖區(qū)再寫(xiě)入文件,需要在關(guān)閉前將緩沖區(qū)的數(shù)據(jù)全部寫(xiě)入文件才算完成,這樣才能關(guān)閉整個(gè)流,緩存區(qū)的作用是,一個(gè)字節(jié)一個(gè)字節(jié)寫(xiě)入太費(fèi)事兒,所以會(huì)等到一定量的字節(jié)再一起寫(xiě)入,所以會(huì)出現(xiàn)一種可能就是緩存區(qū)還有少量的字節(jié)因?yàn)闆](méi)達(dá)到量沒(méi)有寫(xiě)入,所以需要清空一下,將里面所有剩余的字節(jié)都寫(xiě)入// PrintWriter中設(shè)置了自動(dòng)清空緩沖區(qū)
// pw.flush();}catch (IOException e){e.printStackTrace();}}}// socket服務(wù),與客戶端通訊public void socketServer(AofInterface dl){try {//創(chuàng)建ServerSocketChannel,-->> ServerSocket// 打開(kāi)通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 打開(kāi) SocketChannel 并連接到端口InetSocketAddress inetSocketAddress = new InetSocketAddress(5555);serverSocketChannel.socket().bind(inetSocketAddress);// 配置通道為非阻塞模式serverSocketChannel.configureBlocking(false);//開(kāi)啟selector,并注冊(cè)accept事件// 獲取一個(gè)選擇器實(shí)例Selector selector = Selector.open();// 將套接字通過(guò)到注冊(cè)到選擇器serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true){// 阻塞,等待事件發(fā)生selector.select();// 返回已發(fā)生的注冊(cè)事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 判斷事件類(lèi)型,進(jìn)行相應(yīng)操作selectionKeys.forEach(key ->{final SocketChannel client;try {// 根據(jù)key獲得channelif (key.isAcceptable()){// 之所以轉(zhuǎn)換ServerSocketChannel,因?yàn)榍懊孀?cè)的就是這個(gè)類(lèi)ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();// 新的channel 和客戶端建立了通道client = serverChannel.accept();// 非阻塞client.configureBlocking(false);// 將新的channel和selector,綁定client.register(selector,SelectionKey.OP_READ);//是否有數(shù)據(jù)可讀}else if (key.isReadable()){client = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int count = client.read(readBuffer);if (count>0){readBuffer.flip();Charset charset = StandardCharsets.UTF_8;String receiveMassage = String.valueOf(charset.decode(readBuffer).array());// 顯示哪個(gè)client發(fā)消息System.out.println(client +": "+receiveMassage);// 向客戶端返回的信息String serverStr = "";// 根據(jù)客戶端不同的命令,執(zhí)行不同的方法if(Objects.equals(receiveMassage.split(" ")[0], "set")){dl.hashSet(receiveMassage.split(" ")[1], receiveMassage.split(" ")[2]);serverStr = "set OK";}if(Objects.equals(receiveMassage.split(" ")[0], "remove")){dl.hashRemove(receiveMassage.split(" ")[1]);serverStr = "remove OK";}if(Objects.equals(receiveMassage.split(" ")[0], "get")){serverStr = this.hashGet(receiveMassage.split(" ")[1]).toString();}if(Objects.equals(receiveMassage.split(" ")[0], "isempty")){serverStr = String.valueOf(this.hashIsEmpty());}if(Objects.equals(receiveMassage.split(" ")[0], "size")){serverStr = String.valueOf(this.hashSize());}if(receiveMassage.contains("連接成功")){serverStr = receiveMassage;}SocketChannel channel = (SocketChannel) key.channel();;ByteBuffer writeBuffer = ByteBuffer.allocate(1024);//返回客戶端數(shù)據(jù)writeBuffer.put((serverStr).getBytes());writeBuffer.flip();channel.write(writeBuffer);}}// 處理完事件一定要移除//selectionKeys.clear();}catch (Exception e){e.printStackTrace();}finally {// 處理完事件一定要移除selectionKeys.clear();}});}}catch (IOException e){e.printStackTrace();}}// socket服務(wù)線程public void socketThread(){new Thread(()->{System.out.println("socketServer線程:"+Thread.currentThread().getName());this.socketServer(this.dl);}).start();}// 啟動(dòng)時(shí)檢查持久化aof日志文件public void setAofToMap(){System.out.println("開(kāi)始從AOF中恢復(fù)數(shù)據(jù)!");File readFile = this.createAofFile();// 使用try的話自動(dòng)回收/關(guān)閉資源,會(huì)自動(dòng)調(diào)用close方法,不需要手動(dòng)關(guān)閉// 將需要關(guān)閉的資源放在try(xxx; yyy;zzz;)// 流的關(guān)閉是有順序的,自己手動(dòng)關(guān)閉很繁瑣,自動(dòng)關(guān)閉大大降低了難度,非常方便try(// 創(chuàng)建一個(gè)FileInputStream,Input是寫(xiě)入,文件的byte數(shù)據(jù)傳輸流FileInputStream fis = new FileInputStream(readFile);// FileInputStream是通過(guò)byte字節(jié)流的,InputStreamReader是將字節(jié)流包裝成想要的字符集的字符流寫(xiě)入InputStreamReader isr = new InputStreamReader(fis, StandardCharsets.UTF_8);// 使用BufferedReader,增加緩存,可以方便的寫(xiě)入一行字符BufferedReader reader = new BufferedReader(isr);){// reader.lines().map(String::trim).forEach(System.out::println); 這是一種lambda寫(xiě)法,效果和下面一樣String str;// 為什么要放在while的條件里面賦值呢?是因?yàn)閞eadLine()一行一行讀取如果到文件結(jié)尾了會(huì)返回一個(gè)null,如果放在while的代碼體里賦值,就需要多一步null的判斷// 讀取和寫(xiě)入正好相反,是先從文件讀取內(nèi)容到緩存區(qū),然后從緩存區(qū)讀出來(lái)while ((str = reader.readLine()) != null){String methodStr = str.split("\\|")[1];String keyStr = str.split("\\|")[2];// 根據(jù)不同指令操作不同方法if("set".equals(methodStr)){Object valueStr = str.split("\\|")[3];this.hashSet(keyStr, valueStr);}if("remove".equals(methodStr)){this.hashRemove(keyStr);}}System.out.println("AOF中恢復(fù)數(shù)據(jù)結(jié)束!");} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {System.out.println("主線程: "+Thread.currentThread().getName());// 全局內(nèi)存Map<String, Object> maps = new HashMap<>();// 全局aof日志內(nèi)存List<String> lists = new ArrayList<>();// 服務(wù)主體類(lèi)SimpleKVDBService sKvService = new SimpleKVDBService();// 全局存儲(chǔ)內(nèi)存sKvService.setGlobalMap(maps);// 動(dòng)態(tài)代理,主要是用于給操作添加日志DynamicAgent<AofInterface> nd = new DynamicAgent<AofInterface>();// 全局aof內(nèi)存nd.setListData(lists);nd.setObject(sKvService);// 獲取代理對(duì)象AofInterface dl = (AofInterface) nd.getProxy();// 啟動(dòng)時(shí)檢查aof文件是否存在sKvService.setAofToMap();// 服務(wù)主體獲取已經(jīng)有日志信息的aof日志信息sKvService.setAofList(nd.getAofDatas());// 引用動(dòng)態(tài)代理sKvService.setAofInterface(dl);// 子線程,寫(xiě)aof寫(xiě)入文件sKvService.aofFileThread();// 子線程,socket服務(wù)線程sKvService.socketThread(); System.out.println(sKvService.globalMap);
System.out.println("22222:"+nd.getAofDatas());
System.out.println("list:"+sKvService.aofList);
System.out.println("333333:"+sKvService.globalMap);}
}