中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁(yè) > news >正文

如何做網(wǎng)站與網(wǎng)頁(yè)微信營(yíng)銷軟件

如何做網(wǎng)站與網(wǎng)頁(yè),微信營(yíng)銷軟件,網(wǎng)站開(kāi)發(fā)實(shí)訓(xùn)日記,優(yōu)設(shè)網(wǎng)網(wǎng)址一、 分布式遠(yuǎn)程服務(wù)(Remote Service) 基于Redis的Java分布式遠(yuǎn)程服務(wù),可以用來(lái)通過(guò)共享接口執(zhí)行存在于另一個(gè)Redisson實(shí)例里的對(duì)象方法。換句話說(shuō)就是通過(guò)Redis實(shí)現(xiàn)了Java的遠(yuǎn)程過(guò)程調(diào)用(RPC)。分布式遠(yuǎn)程服務(wù)基于可…

一、?分布式遠(yuǎn)程服務(wù)(Remote Service)

基于Redis的Java分布式遠(yuǎn)程服務(wù),可以用來(lái)通過(guò)共享接口執(zhí)行存在于另一個(gè)Redisson實(shí)例里的對(duì)象方法。換句話說(shuō)就是通過(guò)Redis實(shí)現(xiàn)了Java的遠(yuǎn)程過(guò)程調(diào)用(RPC)。分布式遠(yuǎn)程服務(wù)基于可以用POJO對(duì)象,方法的參數(shù)和返回類不受限制,可以是任何類型。

分布式遠(yuǎn)程服務(wù)(Remote Service)提供了兩種類型的RRemoteService實(shí)例:

  • 服務(wù)端(遠(yuǎn)端)實(shí)例 - 用來(lái)執(zhí)行遠(yuǎn)程方法(工作者實(shí)例即worker instance). 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceImpl someServiceImpl = new SomeServiceImpl();// 在調(diào)用遠(yuǎn)程方法以前,應(yīng)該首先注冊(cè)遠(yuǎn)程服務(wù)
// 只注冊(cè)了一個(gè)服務(wù)端工作者實(shí)例,只能同時(shí)執(zhí)行一個(gè)并發(fā)調(diào)用
remoteService.register(SomeServiceInterface.class, someServiceImpl);// 注冊(cè)了12個(gè)服務(wù)端工作者實(shí)例,可以同時(shí)執(zhí)行12個(gè)并發(fā)調(diào)用
remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
  • 客戶端(本地)實(shí)例 - 用來(lái)請(qǐng)求遠(yuǎn)程方法. 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);String result = service.doSomeStuff(1L, "secondParam", new AnyParam());

客戶端和服務(wù)端必須使用一樣的共享接口,生成兩者的Redisson實(shí)例必須采用相同的連接配置??蛻舳撕头?wù)端實(shí)例可以運(yùn)行在同一個(gè)JVM里,也可以是不同的??蛻舳撕头?wù)端的數(shù)量不收限制。(注意:盡管Redisson不做任何限制,但是Redis的限制仍然有效。)

在服務(wù)端工作者可用實(shí)例數(shù)量 大于1 的時(shí)候,將并行執(zhí)行并發(fā)調(diào)用的遠(yuǎn)程方法。

并行執(zhí)行工作者數(shù)量計(jì)算方法如下: T = R * N

T - 并行執(zhí)行工作者總數(shù) R - Redisson服務(wù)端數(shù)量 N - 注冊(cè)服務(wù)端時(shí)指定的執(zhí)行工作者數(shù)量

超過(guò)該數(shù)量的并發(fā)請(qǐng)求將在列隊(duì)中等候執(zhí)行。

在服務(wù)端工作者實(shí)例可用數(shù)量為 1 時(shí),遠(yuǎn)程過(guò)程調(diào)用將會(huì)按 順序執(zhí)行。這種情況下,每次只有一個(gè)請(qǐng)求將會(huì)被執(zhí)行,其他請(qǐng)求將在列隊(duì)中等候執(zhí)行。

1. 分布式遠(yuǎn)程服務(wù)工作流程

分布式遠(yuǎn)程服務(wù)為每個(gè)注冊(cè)接口建立了兩個(gè)列隊(duì)。一個(gè)列隊(duì)用于請(qǐng)求,由服務(wù)端監(jiān)聽(tīng),另一個(gè)列隊(duì)用于應(yīng)答回執(zhí)和結(jié)果回復(fù),由客戶端監(jiān)聽(tīng)。應(yīng)答回執(zhí)用于判定該請(qǐng)求是否已經(jīng)被接受。如果在指定的超時(shí)時(shí)間內(nèi)沒(méi)有被執(zhí)行工作者執(zhí)行將會(huì)拋出RemoteServiceAckTimeoutException錯(cuò)誤。

下圖描述了每次發(fā)起遠(yuǎn)程過(guò)程調(diào)用請(qǐng)求的工作流程。

2. 發(fā)送即不管(Fire-and-Forget)模式和應(yīng)答回執(zhí)(Ack-Response)模式

分布式遠(yuǎn)程服務(wù)通過(guò)org.redisson.core.RemoteInvocationOptions類,為每個(gè)遠(yuǎn)程過(guò)程調(diào)用提供了一些可配置選項(xiàng)。這些選項(xiàng)可以用來(lái)指定和修改請(qǐng)求超時(shí)和選擇跳過(guò)應(yīng)答回執(zhí)或結(jié)果的發(fā)送模式。例如:

// 應(yīng)答回執(zhí)超時(shí)1秒鐘,遠(yuǎn)程執(zhí)行超時(shí)30秒鐘
RemoteInvocationOptions options = RemoteInvocationOptions.defaults();// 無(wú)需應(yīng)答回執(zhí),遠(yuǎn)程執(zhí)行超時(shí)30秒鐘
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck();// 應(yīng)答回執(zhí)超時(shí)1秒鐘,不等待執(zhí)行結(jié)果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();// 應(yīng)答回執(zhí)超時(shí)1分鐘,不等待執(zhí)行結(jié)果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult();// 發(fā)送即不管(Fire-and-Forget)模式,無(wú)需應(yīng)答回執(zhí),不等待結(jié)果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();RRemoteService remoteService = redisson.getRemoteService();
YourService service = remoteService.get(YourService.class, options);

3. 異步調(diào)用

遠(yuǎn)程過(guò)程調(diào)用也可以采用異步的方式執(zhí)行。異步調(diào)用需要單獨(dú)提交一個(gè)帶有@RRemoteAsync注解(annotation)的異步接口類。異步接口方法簽名必須與遠(yuǎn)程接口的方法簽名相符。異步接口的返回類必須是org.redisson.api.RFuture對(duì)象或其子對(duì)象。在調(diào)用RRemoteService.get方法時(shí)將對(duì)異步接口的方法進(jìn)行驗(yàn)證。異步接口無(wú)須包含所有的遠(yuǎn)程接口里的方法,只需要包含要求異步執(zhí)行的方法即可。

// 遠(yuǎn)程接口
public interface RemoteInterface {Long someMethod1(Long param1, String param2);void someMethod2(MyObject param);MyObject someMethod3();}// 匹配遠(yuǎn)程接口的異步接口
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync {RFuture<Long> someMethod1(Long param1, String param2);RFuture<Void> someMethod2(MyObject param);}RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class);

4. 取消異步調(diào)用

通過(guò)調(diào)用Future.cancel()方法可以非常方便的取消一個(gè)異步調(diào)用。分布式遠(yuǎn)程服務(wù)允許在三個(gè)階段中任何一個(gè)階段取消異步調(diào)用:

  1. 遠(yuǎn)程調(diào)用請(qǐng)求在列隊(duì)中排隊(duì)階段
  2. 遠(yuǎn)程調(diào)用請(qǐng)求已經(jīng)被分布式遠(yuǎn)程服務(wù)接受,還未發(fā)送應(yīng)答回執(zhí),執(zhí)行尚未開(kāi)始。
  3. 遠(yuǎn)程調(diào)用請(qǐng)求已經(jīng)在執(zhí)行階段

想要正確的處理第三個(gè)階段,在服務(wù)端代碼里應(yīng)該檢查Thread.currentThread().isInterrupted()的返回狀態(tài)。范例如下:

// 遠(yuǎn)程接口
public interface MyRemoteInterface {Long myBusyMethod(Long param1, String param2);}// 匹配遠(yuǎn)程接口的異步接口
@RRemoteAsync(MyRemoteInterface.class)
public interface MyRemoteInterfaceAsync {RFuture<Long> myBusyMethod(Long param1, String param2);}// 遠(yuǎn)程接口的實(shí)現(xiàn)
public class MyRemoteServiceImpl implements MyRemoteInterface {public Long myBusyMethod(Long param1, String param2) {for (long i = 0; i < Long.MAX_VALUE; i++) {iterations.incrementAndGet();if (Thread.currentThread().isInterrupted()) {System.out.println("interrupted! " + i);return;}}}}RRemoteService remoteService = redisson.getRemoteService();
ExecutorService executor = Executors.newFixedThreadPool(5);
// 注冊(cè)遠(yuǎn)程服務(wù)的服務(wù)端的同時(shí),通過(guò)單獨(dú)指定的ExecutorService來(lái)配置執(zhí)行線程池
MyRemoteInterface serviceImpl = new MyRemoteServiceImpl();
remoteService.register(MyRemoteInterface.class, serviceImpl, 5, executor);// 異步調(diào)用方法
MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class);
RFuture<Long> future = asyncService.myBusyMethod(1L, "someparam");
// 取消異步調(diào)用
future.cancel(true);

二、分布式實(shí)時(shí)對(duì)象(Live Object)服務(wù)

1. 介紹

一個(gè) 分布式實(shí)時(shí)對(duì)象(Live Object) 可以被理解為一個(gè)功能強(qiáng)化后的Java對(duì)象。該對(duì)象不僅可以被一個(gè)JVM里的各個(gè)線程相引用,還可以被多個(gè)位于不同JVM里的線程同時(shí)引用。Wikipedia對(duì)這種特殊對(duì)象的概述是:

Live distributed object (also abbreviated as live object) refers to a running instance of a distributed multi-party (or peer-to-peer) protocol, viewed from the object-oriented perspective, as an entity that has a distinct identity, may encapsulate internal state and threads of execution, and that exhibits a well-defined externally visible behavior.

Redisson分布式實(shí)時(shí)對(duì)象(Redisson Live Object,簡(jiǎn)稱RLO)運(yùn)用即時(shí)生成的代理類(Proxy),將一個(gè)指定的普通Java類里的所有字段,以及針對(duì)這些字段的操作全部映射到一個(gè)Redis Hash的數(shù)據(jù)結(jié)構(gòu),實(shí)現(xiàn)這種理念。每個(gè)字段的get和set方法最終被轉(zhuǎn)譯為針對(duì)同一個(gè)Redis Hash的hget和hset命令,從而使所有連接到同一個(gè)Redis節(jié)點(diǎn)的所有可以客戶端同時(shí)對(duì)一個(gè)指定的對(duì)象進(jìn)行操作。眾所周知,一個(gè)對(duì)象的狀態(tài)是由其內(nèi)部的字段所賦的值來(lái)體現(xiàn)的,通過(guò)將這些值保存在一個(gè)像Redis這樣的遠(yuǎn)程共享的空間的過(guò)程,把這個(gè)對(duì)象強(qiáng)化成了一個(gè)分布式對(duì)象。這個(gè)分布式對(duì)象就叫做Redisson分布式實(shí)時(shí)對(duì)象(Redisson Live Object,簡(jiǎn)稱RLO)。

通過(guò)使用RLO,運(yùn)行在不同服務(wù)器里的多個(gè)程序之間,共享一個(gè)對(duì)象實(shí)例變得和在單機(jī)程序里共享一個(gè)對(duì)象實(shí)例一樣了。同時(shí)還避免了針對(duì)任何一個(gè)字段操作都需要將整個(gè)對(duì)象序列化和反序列化的繁瑣,進(jìn)而降低了程序開(kāi)發(fā)的復(fù)雜性和其數(shù)據(jù)模型的復(fù)雜性:從任何一個(gè)客戶端修改一個(gè)字段的值,處在其他服務(wù)器上的客戶端(幾乎^)即刻便能查看到。而且實(shí)現(xiàn)代碼與單機(jī)程序代碼無(wú)異。(^連接到從節(jié)點(diǎn)的客戶端仍然受Redis的最終一致性的特性限制)

鑒于Redis是一個(gè)單線程的程序,針對(duì)實(shí)時(shí)對(duì)象的所有的字段操作可以理解為全部是原子性操作,也就是說(shuō)在讀取一個(gè)字段的過(guò)程不會(huì)擔(dān)心被其他線程所修改。

通過(guò)使用RLO,可以把Redis當(dāng)作一個(gè)允許被多個(gè)JVM同時(shí)操作且不受GC影響的共享堆(Heap Space)。

2. 使用方法

Redisson為分布式實(shí)時(shí)對(duì)象提供了一系列不同功能的注解,其中@REntity和@RId兩個(gè)注解是分布式實(shí)時(shí)對(duì)象的必要條件。

@REntity
public class MyObject {@RIdprivate String id;@RIndexprivate String value;private MyObject parent;public MyObject(String id) {this.id = id;}public MyObject() {}// getters and setters}

在開(kāi)始使用分布式實(shí)時(shí)對(duì)象以前,需要先通過(guò)Redisson服務(wù)將指定的對(duì)象連接(attach),合并(merge)或持久化(persist)到Redis里。

RLiveObjectService service = redisson.getLiveObjectService();
MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 將myObject對(duì)象當(dāng)前的狀態(tài)持久化到Redis里并與之保持同步。
myObject = service.persist(myObject);MyLiveObject myObject = new MyLiveObject("1");
// 拋棄myObject對(duì)象當(dāng)前的狀態(tài),并與Redis里的數(shù)據(jù)建立連接并保持同步。
myObject = service.attach(myObject);MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 將myObject對(duì)象當(dāng)前的狀態(tài)與Redis里的數(shù)據(jù)合并之后與之保持同步。
myObject = service.merge(myObject);
myObject.setValue("somevalue");// 通過(guò)ID獲取分布式實(shí)時(shí)對(duì)象
MyLiveObject myObject = service.get(MyLiveObject.class, "1");// 通過(guò)索引查找分布式實(shí)時(shí)對(duì)象
Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.in("value", "somevalue", "somevalue2"));Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.and(Conditions.in("value", "somevalue", "somevalue2"), Conditions.eq("secondfield", "test")));

“parent”字段中包含了指向到另一個(gè)分布式實(shí)時(shí)對(duì)象的引用,它可以與包含類是同一類型也可以不同。Redisson內(nèi)部采用了與Java的引用類似的方式保存這個(gè)關(guān)系,而非將全部對(duì)象序列化,可視為與普通的引用同等效果。

//RLO對(duì)象:
MyObject myObject = service.get(MyObject.class, "1");
MyObject myParentObject = service.get(MyObject.class, "2");
myObject.setValue(myParentObject);

RLO的字段類型基本上無(wú)限制,可以是任何類型。比如Java util包里的集合類,Map類等,也可以是自定義的對(duì)象。只要指定的編碼解碼器能夠?qū)ζ溥M(jìn)行編碼和解碼操作便可。關(guān)于編碼解碼器的詳細(xì)信息請(qǐng)查閱高級(jí)使用方法章節(jié)。

盡管RLO的字段類型基本上無(wú)限制,個(gè)別類型還是受限。注解了RId的字段類型不能是數(shù)組類(Array),比如int[],long[],double[],byte[]等等。更多關(guān)于限制有關(guān)的介紹和原理解釋請(qǐng)查閱使用限制 章節(jié)。

為了保證RLO的用法和普通Java對(duì)象的用法盡可能一直,Redisson分布式實(shí)時(shí)對(duì)象服務(wù)自動(dòng)將以下普通Java對(duì)象轉(zhuǎn)換成與之匹配的Redisson分布式對(duì)象RObject。

普通Java類

轉(zhuǎn)換后的Redisson類

SortedSet.class

RedissonSortedSet.class

Set.class

RedissonSet.class

ConcurrentMap.class

RedissonMap.class

Map.class

RedissonMap.class

BlockingDeque.class

RedissonBlockingDeque.class

Deque.class

RedissonDeque.class

BlockingQueue.class

RedissonBlockingQueue.class

Queue.class

RedissonQueue.class

List.class

RedissonList.class

類型轉(zhuǎn)換將按照從上至下的順序匹配類型,例如LinkedList類同時(shí)實(shí)現(xiàn)了Deque,List和Queue,由于Deque排在靠上的位置,因此它將會(huì)被轉(zhuǎn)換成一個(gè)RedissonDeque類型。

Redisson的分布式對(duì)象也采用類似的方式,將自身的狀態(tài)儲(chǔ)存于Redis當(dāng)中,(幾乎^)所有的狀態(tài)改變都直接映射到Redis里,不在本地JVM中保留任何賦值。(^本地緩存對(duì)象除外,比如RLocalCachedMap)

3. 高級(jí)使用方法

正如上述介紹,RLO類其實(shí)都是按需實(shí)時(shí)生成的代理(Proxy)類。生成的代理類和原類都一同緩存Redisson實(shí)例里。這個(gè)過(guò)程會(huì)消耗一些時(shí)間,在對(duì)耗時(shí)比較敏感的情況下,建議通過(guò)RedissonLiveObjectService提前注冊(cè)所有的RLO類。這個(gè)服務(wù)也可以用來(lái)注銷不再需要的RLO類,也可以用來(lái)查詢一個(gè)類是否已經(jīng)注冊(cè)了。

RLiveObjectService service = redisson.getLiveObjectService();
service.registerClass(MyClass.class);
service.unregisterClass(MyClass.class);
Boolean registered = service.isClassRegistered(MyClass.class);

4. 注解(Annotation)使用方法

@REntity

僅適用于類。通過(guò)指定@REntity的各個(gè)參數(shù),可以詳細(xì)的對(duì)每個(gè)RLO類實(shí)現(xiàn)特殊定制,以達(dá)到改變RLO對(duì)象的行為。

  • namingScheme - 命名方案。命名方案規(guī)定了每個(gè)實(shí)例在Redis中對(duì)應(yīng)key的名稱。它不僅被用來(lái)與已存在的RLO建立關(guān)聯(lián),還被用來(lái)儲(chǔ)存新建的RLO實(shí)例。默認(rèn)采用Redisson自帶的DefaultNamingScheme對(duì)象。
  • codec - 編碼解碼器。在運(yùn)行當(dāng)中,Redisson用編碼解碼器來(lái)對(duì)RLO中的每個(gè)字段進(jìn)行編碼解碼。Redisson內(nèi)部采用了實(shí)例池管理不同類型的編碼解碼器實(shí)例。Redisson提供了多種不同的編碼解碼器,默認(rèn)使用JsonJacksonCodec。
  • fieldTransformation - 字段轉(zhuǎn)換模式。如上所述,為了盡可能的保證RLO的用法和普通Java對(duì)象一致,Redisson會(huì)自動(dòng)將常用的普通Java對(duì)象轉(zhuǎn)換成與其匹配的Redisson分布式對(duì)象。這是由于字段轉(zhuǎn)換模式的默認(rèn)值是ANNOTATION_BASED,修改為IMPLEMENTATION_BASED就可以不轉(zhuǎn)換。

@RId

僅適用于字段。@RId注解只能用在具備區(qū)分實(shí)例的字段上,這類字段可以理解為一個(gè)類的id字段或主鍵字段。這個(gè)字段的值將被命名方案namingScheme用來(lái)與事先存在的RLO建立引用。加了該注解的字段是唯一在本地JVM里同時(shí)保存賦值的字段。一個(gè)類只能有一個(gè)字段包含@RId注解。

可以通過(guò)指定一個(gè)生成器generator策略來(lái)實(shí)現(xiàn)自動(dòng)生成這個(gè)字段的值。默認(rèn)不提供生成器。

@RIndex

僅適用于字段。用來(lái)指定可用于搜索的字段??梢酝ㄟ^(guò)RLiveObjectService.find方法來(lái)根據(jù)條件精細(xì)查找分布式實(shí)時(shí)對(duì)象。查詢條件可以是含(IN),或(OR),和(AND)或相等(EQ)以及它們的任意組合。

使用范例如下:

public class MyObject {@RIndexString field1;@RIndexString field2;@RIndexString field3;
}Collection<MyObject> objects = RLiveObjectService.find(MyObject.class, Conditions.or(Conditions.and(Conditions.eq("field1", "value"), Conditions.eq("field2", "value")), Conditions.in("field3", "value1", "value2"));

@RObjectField

僅適用于字段。允許通過(guò)該注解中的namingScheme或codec來(lái)改變?cè)撟侄蔚拿蚓幋a方式,用來(lái)區(qū)別于@REntity中指定的預(yù)設(shè)方式。

@RCascade

僅適用于字段。用來(lái)指定包含于分布式實(shí)時(shí)對(duì)象字段內(nèi)其它對(duì)象的級(jí)聯(lián)操作方式。

可選的級(jí)聯(lián)操作方式為如下:

RCascadeType.ALL - 執(zhí)行所有級(jí)聯(lián)操作
RCascadeType.PERSIST - 僅在執(zhí)行RLiveObjectService.persist()方法時(shí)進(jìn)行級(jí)聯(lián)操作 RCascadeType.DETACH - 僅在執(zhí)行RLiveObjectService.detach()方法時(shí)進(jìn)行級(jí)聯(lián)操作 RCascadeType.MERGE - 僅在執(zhí)行RLiveObjectService.merge()方法時(shí)進(jìn)行級(jí)聯(lián)操作 RCascadeType.DELETE - 僅在執(zhí)行RLiveObjectService.delete()方法時(shí)進(jìn)行級(jí)聯(lián)操作

5. 使用限制

如上所述,帶有RId注解字段的類型不能使數(shù)組類,這是因?yàn)槟壳澳J(rèn)的命名方案類DefaultNamingScheme還不能正確地將數(shù)組類序列化和反序列化。在改善了DefaultNamingScheme類的不足以后會(huì)考慮取消這個(gè)限制。另外由于帶有RId注解的字段是用來(lái)指定Redis中映射的key的名稱,因此組建一個(gè)只含有唯一一個(gè)字段的RLO類是毫無(wú)意義的。選用RBucket會(huì)更適合這樣的場(chǎng)景。

三、分布式執(zhí)行服務(wù)(Executor Service)

1. 分布式執(zhí)行服務(wù)概述

Redisson的分布式執(zhí)行服務(wù)實(shí)現(xiàn)了java.util.concurrent.ExecutorService接口,支持在不同的獨(dú)立節(jié)點(diǎn)里執(zhí)行基于java.util.concurrent.Callable接口或java.lang.Runnable接口或Lambda的任務(wù)。這樣的任務(wù)也可以通過(guò)使用Redisson實(shí)例,實(shí)現(xiàn)對(duì)儲(chǔ)存在Redis里的數(shù)據(jù)進(jìn)行操作。Redisson分布式執(zhí)行服務(wù)是最快速和有效執(zhí)行分布式運(yùn)算的方法。

2. 任務(wù)

Redisson獨(dú)立節(jié)點(diǎn)不要求任務(wù)的類在類路徑里。他們會(huì)自動(dòng)被Redisson獨(dú)立節(jié)點(diǎn)的ClassLoader加載。因此每次執(zhí)行一個(gè)新任務(wù)時(shí),不需要重啟Redisson獨(dú)立節(jié)點(diǎn)。

采用Callable任務(wù)的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

采用Runnable任務(wù)的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param = param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}

在創(chuàng)建ExecutorService時(shí)可以配置以下參數(shù):

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新嘗試執(zhí)行任務(wù)的時(shí)間間隔。
// ExecutorService的工作節(jié)點(diǎn)將等待10分鐘后重新嘗試執(zhí)行任務(wù)
//
// 設(shè)定為0則不進(jìn)行重試
//
// 默認(rèn)值為5分鐘
options.taskRetryInterval(10, TimeUnit.MINUTES);
RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
executorService.submit(new RunnableTask(123));RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit(new CallableTask());
Long result = future.get();

使用Lambda任務(wù)的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit((Callable & Serializable)() -> {System.out.println("task has been executed!");
});
Long result = future.get();

可以通過(guò)@RInject注解來(lái)為任務(wù)實(shí)時(shí)注入Redisson實(shí)例依賴。

3. 取消任務(wù)

通過(guò)Future.cancel()方法可以很方便的取消所有已提交的任務(wù)。通過(guò)對(duì)Thread.currentThread().isInterrupted()方法的調(diào)用可以在已經(jīng)處于運(yùn)行狀態(tài)的任務(wù)里實(shí)現(xiàn)任務(wù)中斷:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了許多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任務(wù)被取消了return null;}result += value;}return result;}}RExecutorService executorService = redisson.getExecutorService("myExecutor");
Future<Long> future = executorService.submit(new CallableTask());
// 或
RFuture<Long> future = executorService.submitAsync(new CallableTask());
// ...
future.cancel(true);

四、 分布式調(diào)度任務(wù)服務(wù)(Scheduler Service)

1. 分布式調(diào)度任務(wù)服務(wù)概述

Redisson的分布式調(diào)度任務(wù)服務(wù)實(shí)現(xiàn)了java.util.concurrent.ScheduledExecutorService接口,支持在不同的獨(dú)立節(jié)點(diǎn)里執(zhí)行基于java.util.concurrent.Callable接口或java.lang.Runnable接口的任務(wù)。Redisson獨(dú)立節(jié)點(diǎn)按順序運(yùn)行Redis列隊(duì)里的任務(wù)。調(diào)度任務(wù)是一種需要在未來(lái)某個(gè)指定時(shí)間運(yùn)行一次或多次的特殊任務(wù)。

2. 設(shè)定任務(wù)計(jì)劃

Redisson獨(dú)立節(jié)點(diǎn)不要求任務(wù)的類在類路徑里。他們會(huì)自動(dòng)被Redisson獨(dú)立節(jié)點(diǎn)的ClassLoader加載。因此每次執(zhí)行一個(gè)新任務(wù)時(shí),不需要重啟Redisson獨(dú)立節(jié)點(diǎn)。

采用Callable任務(wù)的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

在創(chuàng)建ExecutorService時(shí)可以配置以下參數(shù):

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新嘗試執(zhí)行任務(wù)的時(shí)間間隔。
// ExecutorService的工作節(jié)點(diǎn)將等待10分鐘后重新嘗試執(zhí)行任務(wù)
//
// 設(shè)定為0則不進(jìn)行重試
//
// 默認(rèn)值為5分鐘
options.taskRetryInterval(10, TimeUnit.MINUTES);
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<Long> future = executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES);
Long result = future.get();

使用Lambda任務(wù)的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
ScheduledFuture<Long> future = executorService.schedule((Callable & Serializable)() -> {System.out.println("task has been executed!");
}, 10, TimeUnit.MINUTES);
Long result = future.get();

采用Runnable任務(wù)的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param= param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<?> future1 = executorService.schedule(new RunnableTask(123), 10, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future2 = executorService.scheduleAtFixedRate(new RunnableTask(123), 10, 25, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future3 = executorService.scheduleWithFixedDelay(new RunnableTask(123), 5, 10, TimeUnit.HOURS);

3. 通過(guò)CRON表達(dá)式設(shè)定任務(wù)計(jì)劃

在分布式調(diào)度任務(wù)中,可以通過(guò)CRON表達(dá)式來(lái)為任務(wù)設(shè)定一個(gè)更復(fù)雜的計(jì)劃。表達(dá)式與Quartz的CRON格式完全兼容。

例如:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

4. 取消計(jì)劃任務(wù)

分布式調(diào)度任務(wù)服務(wù)提供了兩張取消任務(wù)的方式:通過(guò)調(diào)用ScheduledFuture.cancel()方法或調(diào)用RScheduledExecutorService.cancelScheduledTask方法。通過(guò)對(duì)Thread.currentThread().isInterrupted()方法的調(diào)用可以在已經(jīng)處于運(yùn)行狀態(tài)的任務(wù)里實(shí)現(xiàn)任務(wù)中斷:

public class RunnableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了許多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任務(wù)被取消了return null;}result += value;}return result;}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
RScheduledFuture<Long> future = executorService.scheduleAsync(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
future.cancel(true);
// 或
String taskId = future.getTaskId();
// ...
executorService.cancelScheduledTask(taskId);

五、分布式映射歸納服務(wù)(MapReduce)

1.?介紹

Redisson提供了通過(guò)映射歸納(MapReduce)編程模式來(lái)處理儲(chǔ)存在Redis環(huán)境里的大量數(shù)據(jù)的服務(wù)。這個(gè)想法來(lái)至于其他的類似實(shí)現(xiàn)方式和谷歌發(fā)表的研究。所有 映射(Map)歸納(Reduce) 階段中的任務(wù)都是被分配到各個(gè)獨(dú)立節(jié)點(diǎn)(Redisson Node)里并行執(zhí)行的。以下所有接口均支持映射歸納(MapReduce)功能: RMap、 RMapCache、 RLocalCachedMap、 RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue 和 RPriorityDeque

映射歸納(MapReduce)的功能是通過(guò)RMapper、 RCollectionMapper、 RReducer 和 RCollator 這幾個(gè)接口實(shí)現(xiàn)的。

1.RMapper映射器接口適用于映射(Map)類,它用來(lái)把映射(Map)中的每個(gè)元素轉(zhuǎn)換為另一個(gè)作為歸納(Reduce)處理用的鍵值對(duì)。

public interface RMapper<KIn, VIn, KOut, VOut> extends Serializable {void map(KIn key, VIn value, RCollector<KOut, VOut> collector);}

2.RCollectionMapper 映射器接口僅適用于集合(Collection)類型的對(duì)象,它用來(lái)把集合(Collection)中的元素轉(zhuǎn)換成一組作為歸納(Reduce)處理用的鍵值對(duì)。

public interface RCollectionMapper<VIn, KOut, VOut> extends Serializable {void map(VIn value, RCollector<KOut, VOut> collector);}

3.RReducer歸納器接口用來(lái)將上面這些,由映射器生成的鍵值對(duì)列表進(jìn)行歸納整理。

public interface RReducer<K, V> extends Serializable {V reduce(K reducedKey, Iterator<V> values);}

4.RCollator收集器接口用來(lái)把歸納整理以后的結(jié)果化簡(jiǎn)為單一一個(gè)對(duì)象。

public interface RCollator<K, V, R> extends Serializable {R collate(Map<K, V> resultMap);}

以上每個(gè)階段的任務(wù)都可以用@RInject注解的方式來(lái)獲取RedissonClient實(shí)例:

    public class WordMapper implements RMapper<String, String, String, Integer> {@RInjectprivate RedissonClient redissonClient;@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {// ...redissonClient.getAtomicLong("mapInvocations").incrementAndGet();}}

2. 映射(Map)類型的使用范例

Redisson提供的RMap、 RMapCache和RLocalCachedMap這三種映射(Map)類型的對(duì)象均可以使用這種分布式映射歸納(MapReduce)服務(wù)。

以下是在映射(Map)類型的基礎(chǔ)上采用映射歸納(MapReduce)來(lái)實(shí)現(xiàn)字?jǐn)?shù)統(tǒng)計(jì)的范例:

  1. Mapper對(duì)象將每個(gè)映射的值用空格且分開(kāi)。
    public class WordMapper implements RMapper<String, String, String, Integer> {@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
  1. Reducer對(duì)象計(jì)算統(tǒng)計(jì)所有單詞的使用情況。
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
  1. Collator對(duì)象統(tǒng)計(jì)所有單詞的使用情況。
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
  1. 把上面的各個(gè)對(duì)象串起來(lái)使用:
    RMap<String, String> map = redisson.getMap("wordsMap");map.put("line1", "Alice was beginning to get very tired");map.put("line2", "of sitting by her sister on the bank and");map.put("line3", "of having nothing to do once or twice she");map.put("line4", "had peeped into the book her sister was reading");map.put("line5", "but it had no pictures or conversations in it");map.put("line6", "and what is the use of a book");map.put("line7", "thought Alice without pictures or conversation");RMapReduce<String, String, String, Integer> mapReduce= map.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 統(tǒng)計(jì)詞頻Map<String, Integer> mapToNumber = mapReduce.execute();// 統(tǒng)計(jì)字?jǐn)?shù)Integer totalWordsAmount = mapReduce.execute(new WordCollator());

3. 集合(Collection)類型的使用范例

Redisson提供的RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue和RPriorityDeque這幾種集合(Collection)類型的對(duì)象均可以使用這種分布式映射歸納(MapReduce)服務(wù)。

以下是在集合(Collection)類型的基礎(chǔ)上采用映射歸納(MapReduce)來(lái)實(shí)現(xiàn)字?jǐn)?shù)統(tǒng)計(jì)的范例:

    public class WordMapper implements RCollectionMapper<String, String, Integer> {@Overridepublic void map(String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
    RList<String> list = redisson.getList("myList");list.add("Alice was beginning to get very tired");list.add("of sitting by her sister on the bank and");list.add("of having nothing to do once or twice she");list.add("had peeped into the book her sister was reading");list.add("but it had no pictures or conversations in it");list.add("and what is the use of a book");list.add("thought Alice without pictures or conversation");RCollectionMapReduce<String, String, Integer> mapReduce= list.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 統(tǒng)計(jì)詞頻Map<String, Integer> mapToNumber = mapReduce.execute();// 統(tǒng)計(jì)字?jǐn)?shù)Integer totalWordsAmount = mapReduce.execute(new WordCollator());

http://www.risenshineclean.com/news/929.html

相關(guān)文章:

  • 陜西省建設(shè)監(jiān)理協(xié)會(huì)證書查詢網(wǎng)站寧波網(wǎng)站推廣怎么做
  • 管理咨詢公司稅收優(yōu)惠青島seo優(yōu)化
  • 鎮(zhèn)江網(wǎng)站推廣南寧seo外包要求
  • 網(wǎng)站有了訂單郵箱提醒代碼營(yíng)銷策略模板
  • 這么做國(guó)外網(wǎng)站的國(guó)內(nèi)鏡像站甘肅搜索引擎網(wǎng)絡(luò)優(yōu)化
  • 西安網(wǎng)絡(luò)公司網(wǎng)站建設(shè)小紅書推廣平臺(tái)
  • 網(wǎng)站開(kāi)發(fā)基于什么平臺(tái)自己代理一款手游需要多少錢
  • go語(yǔ)言怎么搭建網(wǎng)頁(yè)東莞網(wǎng)站優(yōu)化關(guān)鍵詞排名
  • 用jsp做網(wǎng)站的難點(diǎn)云優(yōu)化
  • index.html網(wǎng)站怎么做重慶seo優(yōu)化公司
  • 免費(fèi)下載建設(shè)銀行官方網(wǎng)站濟(jì)南優(yōu)化網(wǎng)站的哪家好
  • 岳陽(yáng)網(wǎng)站開(kāi)發(fā)商城網(wǎng)絡(luò)推廣項(xiàng)目計(jì)劃書
  • 校園網(wǎng)站建設(shè)模板上海網(wǎng)站排名seo公司哪家好
  • 做網(wǎng)站賭博代理違法嗎品牌推廣是做什么的
  • 做營(yíng)銷網(wǎng)站制作seo綜合查詢是什么意思
  • 企業(yè)網(wǎng)站怎么做畢業(yè)設(shè)計(jì)網(wǎng)站怎么營(yíng)銷推廣
  • b2b平臺(tái)有哪些類別網(wǎng)絡(luò)營(yíng)銷優(yōu)化
  • 做免費(fèi)網(wǎng)站教程國(guó)vs百度seo排名優(yōu)化軟件化
  • 萊蕪二手房網(wǎng)湖南seo優(yōu)化報(bào)價(jià)
  • wordpress顯示輪播圖深圳市seo上詞多少錢
  • 網(wǎng)絡(luò)投注網(wǎng)站是怎么建設(shè)簡(jiǎn)述網(wǎng)絡(luò)營(yíng)銷的概念
  • 可以免費(fèi)開(kāi)店的平臺(tái)windows11優(yōu)化大師
  • 網(wǎng)站動(dòng)態(tài)小圖標(biāo)青島網(wǎng)絡(luò)seo公司
  • 日本中古手表網(wǎng)站申請(qǐng)網(wǎng)站怎么申請(qǐng)
  • 襄陽(yáng)做網(wǎng)站哪家好b2b平臺(tái)有哪些
  • 電子商務(wù)網(wǎng)站開(kāi)發(fā)的基本流程軟文營(yíng)銷的特點(diǎn)有哪些
  • 深圳 建設(shè)銀行國(guó)際互聯(lián)網(wǎng)站網(wǎng)絡(luò)推廣公司排名
  • 這幾年做網(wǎng)站怎么樣百度搜索風(fēng)云榜排行榜
  • 東莞網(wǎng)站設(shè)計(jì)報(bào)價(jià)天津百度網(wǎng)站排名優(yōu)化
  • 有哪些網(wǎng)站可以做淘寶客搜索seo怎么優(yōu)化