做網(wǎng)站建設(shè)百度網(wǎng)站的域名地址
溫馨提示:本文需要結(jié)合上一篇 gRPC 文章一起食用,否則可能看不懂。
前面一篇文章松哥和大家聊了 gRPC 的基本用法,今天我們再來稍微深入一點點,來看下 gRPC 中四種不同的通信模式。
gRPC 中四種不同的通信模式分別是:
- 一元 RPC
- 服務(wù)端流 RPC
- 客戶端流 RPC
- 雙向流 RPC
接下來松哥就通過四個完整的案例,來分別和向伙伴們演示這四種不同的通信模式。
1. 準(zhǔn)備工作
關(guān)于 gRPC 的基礎(chǔ)知識我們就不啰嗦了,咱們直接來看我今天的 proto 文件,如下:
這次我新建了一個名為 book.proto 的文件,這里主要定義了一些圖書相關(guān)的方法,如下:
syntax = "proto3";option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";package book;service BookService {rpc addBook(Book) returns (google.protobuf.StringValue);rpc getBook(google.protobuf.StringValue) returns (Book);rpc searchBooks(google.protobuf.StringValue) returns (stream Book);rpc updateBooks(stream Book) returns (google.protobuf.StringValue);rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}message Book {string id = 1;repeated string tags = 2;string name = 3;float price = 4;string author = 5;
}message BookSet {string id = 1;repeated Book bookList = 3;
}
這個文件中,有一些內(nèi)容我們在上篇文章中都講過了,講過的我就不再重復(fù)了,我說一些上篇文章沒有涉及到的東西:
- 由于我們在這個文件中,引用了 Google 提供的 StringValue(
google.protobuf.StringValue
),所以這個文件上面我們首先用 import 導(dǎo)入相關(guān)的文件,導(dǎo)入之后,才可以使用。 - 在方法參數(shù)和返回值中出現(xiàn)的 stream,就表示這個方法的參數(shù)或者返回值是流的形式(其實就是數(shù)據(jù)可以多次傳輸)。
- message 中出現(xiàn)了一個上篇文章沒有的關(guān)鍵字 repeated,這個表示這個字段可以重復(fù),可以簡單理解為這就是我們 Java 中的數(shù)組。
好了,和上篇文章相比,本文主要就是這幾個地方不一樣。
proto 文件寫好之后,按照上篇文章介紹的方法進行編譯,生成對應(yīng)的代碼,這里就不再重復(fù)了。
2. 一元 RPC
一元 RPC 是一種比較簡單的 RPC 模式,其實說白了我們上篇文章和大家介紹的就是一種一元 RPC,也就是客戶端發(fā)起一個請求,服務(wù)端給出一個響應(yīng),然后請求結(jié)束。
上面我們定義的五個方法中,addBook 和 getBook 都算是一種一元 RPC。
2.1 addBook
先來看 addBook 方法,這個方法的邏輯很簡單,我們提前在服務(wù)端準(zhǔn)備一個 Map 用來保存 Book,addBook 調(diào)用的時候,就把 book 對象存入到 Map 中,并且將 book 的 ID 返回,大家就這樣一件事,來看看服務(wù)端的代碼:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三國演義").setAuthor("羅貫中").setPrice(30).addTags("明清小說").addTags("通俗小說").build();Book b2 = Book.newBuilder().setId("2").setName("西游記").setAuthor("吳承恩").setPrice(40).addTags("志怪小說").addTags("通俗小說").build();Book b3 = Book.newBuilder().setId("3").setName("水滸傳").setAuthor("施耐庵").setPrice(50).addTags("明清小說").addTags("通俗小說").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic void addBook(Book request, StreamObserver<StringValue> responseObserver) {bookMap.put(request.getId(), request);responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());responseObserver.onCompleted();}
}
看過上篇文章的小伙伴,我覺得這段代碼應(yīng)該很好理解。
客戶端調(diào)用方式如下:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);addBook(stub);}private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue stringValue) {System.out.println("stringValue.getValue() = " + stringValue.getValue());}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println("添加完畢");}});countDownLatch.await();}
}
這里我使用了 CountDownLatch 來實現(xiàn)線程等待,等服務(wù)端給出響應(yīng)之后,客戶端再結(jié)束。這里在回調(diào)的 onNext 方法中,我們就可以拿到服務(wù)端的返回值。
2.2 getBook
getBook 跟上面的 addBook 類似,先來看服務(wù)端代碼,如下:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三國演義").setAuthor("羅貫中").setPrice(30).addTags("明清小說").addTags("通俗小說").build();Book b2 = Book.newBuilder().setId("2").setName("西游記").setAuthor("吳承恩").setPrice(40).addTags("志怪小說").addTags("通俗小說").build();Book b3 = Book.newBuilder().setId("3").setName("水滸傳").setAuthor("施耐庵").setPrice(50).addTags("明清小說").addTags("通俗小說").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic void getBook(StringValue request, StreamObserver<Book> responseObserver) {String id = request.getValue();Book book = bookMap.get(id);if (book != null) {responseObserver.onNext(book);responseObserver.onCompleted();} else {responseObserver.onCompleted();}}
}
這個 getBook 就是根據(jù)客戶端傳來的 id,從 Map 中查詢到一個 Book 并返回。
客戶端調(diào)用代碼如下:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);getBook(stub);}private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver<Book>() {@Overridepublic void onNext(Book book) {System.out.println("book = " + book);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println("查詢完畢");}});countDownLatch.await();}
}
小伙伴們大概也能看出來,addBook 和 getBook 基本上操作套路是一模一樣的。
3. 服務(wù)端流 RPC
前面的一元 RPC,客戶端發(fā)起一個請求,服務(wù)端給出一個響應(yīng),請求就結(jié)束了。服務(wù)端流則是客戶端發(fā)起一個請求,服務(wù)端給一個響應(yīng)序列,這個響應(yīng)序列組成一個流。
上面我們給出的 searchBook 就是這樣一個例子,searchBook 是傳遞圖書的 tags 參數(shù),然后在服務(wù)端查詢哪些書的 tags 滿足條件,將滿足條件的書全部都返回去。
我們來看下服務(wù)端的代碼:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三國演義").setAuthor("羅貫中").setPrice(30).addTags("明清小說").addTags("通俗小說").build();Book b2 = Book.newBuilder().setId("2").setName("西游記").setAuthor("吳承恩").setPrice(40).addTags("志怪小說").addTags("通俗小說").build();Book b3 = Book.newBuilder().setId("3").setName("水滸傳").setAuthor("施耐庵").setPrice(50).addTags("明清小說").addTags("通俗小說").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic void searchBooks(StringValue request, StreamObserver<Book> responseObserver) {Set<String> keySet = bookMap.keySet();String tags = request.getValue();for (String key : keySet) {Book book = bookMap.get(key);int tagsCount = book.getTagsCount();for (int i = 0; i < tagsCount; i++) {String t = book.getTags(i);if (t.equals(tags)) {responseObserver.onNext(book);break;}}}responseObserver.onCompleted();}
}
小伙伴們看下,這段 Java 代碼應(yīng)該很好理解:
- 首先從 request 中提取客戶端傳來的 tags 參數(shù)。
- 遍歷 bookMap,查看每一本書的 tags 是否等于客戶端傳來的 tags,如果相等,說明添加匹配,則通過
responseObserver.onNext(book);
將這本書寫回到客戶端。 - 等所有操作都完成后,執(zhí)行
responseObserver.onCompleted();
,表示服務(wù)端的響應(yīng)序列結(jié)束了,這樣客戶端也就知道請求結(jié)束了。
我們來看看客戶端的代碼,如下:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);searchBook(stub);}private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);stub.searchBooks(StringValue.newBuilder().setValue("明清小說").build(), new StreamObserver<Book>() {@Overridepublic void onNext(Book book) {System.out.println(book);}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {countDownLatch.countDown();System.out.println("查詢完畢!");}});countDownLatch.await();}
}
客戶端的代碼好理解,搜索的關(guān)鍵字是 明清小說
,每當(dāng)服務(wù)端返回一次數(shù)據(jù)的時候,客戶端回調(diào)的 onNext 方法就會被觸發(fā)一次,當(dāng)服務(wù)端之行了 responseObserver.onCompleted();
之后,客戶端的 onCompleted 方法也會被觸發(fā)。
這個就是服務(wù)端流,客戶端發(fā)起一個請求,服務(wù)端通過 onNext 可以多次寫回數(shù)據(jù)。
4. 客戶端流 RPC
客戶端流則是客戶端發(fā)起多個請求,服務(wù)端只給出一個響應(yīng)。
上面的 updateBooks 就是一個客戶端流的案例,客戶端想要修改圖書,可以發(fā)起多個請求修改多本書,服務(wù)端則收集多次修改的結(jié)果,將之匯總?cè)缓笠淮涡苑祷亟o客戶端。
我們先來看看服務(wù)端的代碼:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三國演義").setAuthor("羅貫中").setPrice(30).addTags("明清小說").addTags("通俗小說").build();Book b2 = Book.newBuilder().setId("2").setName("西游記").setAuthor("吳承恩").setPrice(40).addTags("志怪小說").addTags("通俗小說").build();Book b3 = Book.newBuilder().setId("3").setName("水滸傳").setAuthor("施耐庵").setPrice(50).addTags("明清小說").addTags("通俗小說").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic StreamObserver<Book> updateBooks(StreamObserver<StringValue> responseObserver) {StringBuilder sb = new StringBuilder("更新的圖書 ID 為:");return new StreamObserver<Book>() {@Overridepublic void onNext(Book book) {bookMap.put(book.getId(), book);sb.append(book.getId()).append(",");}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());responseObserver.onCompleted();}};}
}
客戶端每發(fā)送一本書來,就會觸發(fā)服務(wù)端的 onNext 方法,然后我們在這方法中進行圖書的更新操作,并記錄更新結(jié)果。最后,我們在 onCompleted 方法中,將更新結(jié)果匯總返回給客戶端,基本上就是這樣一個流程。
我們再來看看客戶端的代碼:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);updateBook(stub);}private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);StreamObserver<Book> request = stub.updateBooks(new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue stringValue) {System.out.println("stringValue.getValue() = " + stringValue.getValue());}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {System.out.println("更新完畢");countDownLatch.countDown();}});request.onNext(Book.newBuilder().setId("1").setName("a").setAuthor("b").build());request.onNext(Book.newBuilder().setId("2").setName("c").setAuthor("d").build());request.onCompleted();countDownLatch.await();}
}
在客戶端這塊,updateBooks 方法會返回一個 StreamObserver 對象,調(diào)用該對象的 onNext 方法就是給服務(wù)端傳遞數(shù)據(jù)了,可以傳遞多個數(shù)據(jù),調(diào)用該對象的 onCompleted 方法就是告訴服務(wù)端數(shù)據(jù)傳遞結(jié)束了,此時也會觸發(fā)服務(wù)端的 onCompleted 方法,服務(wù)端的 onCompleted 方法執(zhí)行之后,進而觸發(fā)了客戶端的 onCompleted 方法。
5. 雙向流 RPC
雙向流其實就是 3、4 小節(jié)的合體。即客戶端多次發(fā)送數(shù)據(jù),服務(wù)端也多次響應(yīng)數(shù)據(jù)。
我們先來看下服務(wù)端的代碼:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {private Map<String, Book> bookMap = new HashMap<>();private List<Book> books = new ArrayList<>();public BookServiceImpl() {Book b1 = Book.newBuilder().setId("1").setName("三國演義").setAuthor("羅貫中").setPrice(30).addTags("明清小說").addTags("通俗小說").build();Book b2 = Book.newBuilder().setId("2").setName("西游記").setAuthor("吳承恩").setPrice(40).addTags("志怪小說").addTags("通俗小說").build();Book b3 = Book.newBuilder().setId("3").setName("水滸傳").setAuthor("施耐庵").setPrice(50).addTags("明清小說").addTags("通俗小說").build();bookMap.put("1", b1);bookMap.put("2", b2);bookMap.put("3", b3);}@Overridepublic StreamObserver<StringValue> processBooks(StreamObserver<BookSet> responseObserver) {return new StreamObserver<StringValue>() {@Overridepublic void onNext(StringValue stringValue) {Book b = Book.newBuilder().setId(stringValue.getValue()).build();books.add(b);if (books.size() == 3) {BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();responseObserver.onNext(bookSet);books.clear();}}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();responseObserver.onNext(bookSet);books.clear();responseObserver.onCompleted();}};}
}
這段代碼沒有實際意義,單純?yōu)榱私o小伙伴們演示雙向流,我的操作邏輯是客戶端傳遞多個 ID 到服務(wù)端,然后服務(wù)端根據(jù)這些 ID 構(gòu)建對應(yīng)的 Book 對象,然后三個三個一組,再返回給客戶端??蛻舳嗣看伟l(fā)送一個請求,都會觸發(fā)服務(wù)端的 onNext 方法,我們在這個方法中對請求分組返回。最后如果還有剩余的請求,我們在 onCompleted() 方法中返回。
再來看看客戶端的代碼:
public class BookServiceClient {public static void main(String[] args) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);processBook(stub);}private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);StreamObserver<StringValue> request = stub.processBooks(new StreamObserver<BookSet>() {@Overridepublic void onNext(BookSet bookSet) {System.out.println("bookSet = " + bookSet);System.out.println("=============");}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onCompleted() {System.out.println("處理完畢!");countDownLatch.countDown();}});request.onNext(StringValue.newBuilder().setValue("a").build());request.onNext(StringValue.newBuilder().setValue("b").build());request.onNext(StringValue.newBuilder().setValue("c").build());request.onNext(StringValue.newBuilder().setValue("d").build());request.onCompleted();countDownLatch.await();}
}
這個客戶端的代碼跟第四小節(jié)一模一樣,不再贅述了。
好啦,這就是松哥和小伙伴們介紹的 gRPC 的四種不同的通信模式,文章中只給出了一些關(guān)鍵代碼,如果小伙伴們沒看明白,建議結(jié)合上篇文章一起閱讀就懂啦~