icp備案網(wǎng)站建設(shè)方案書(shū)網(wǎng)站收錄一般多久
Google Guava EventBus(事件總線)的使用和源碼的簡(jiǎn)單解析
什么是EventBus?
事件總線(EventBus)是一種廣泛用于軟件架構(gòu)中的設(shè)計(jì)模式,用于實(shí)現(xiàn)解耦和松散耦合的通信機(jī)制。它可以幫助組織和管理應(yīng)用程序中不同組件之間的通信,以提高應(yīng)用程序的可維護(hù)性、可擴(kuò)展性和靈活性。
在事件總線模式中,不同的組件通過(guò)訂閱和發(fā)布事件來(lái)進(jìn)行通信。發(fā)布者發(fā)布一個(gè)事件,訂閱者可以訂閱該事件并在事件發(fā)生時(shí)執(zhí)行相關(guān)的操作。事件總線充當(dāng)了中介者的角色,它負(fù)責(zé)管理所有的事件和訂閱者,并確保事件正確地傳遞到所有訂閱者。
事件總線模式可以在很多場(chǎng)景中使用,例如 Android 應(yīng)用程序中的通信、分布式系統(tǒng)中的消息傳遞等。常見(jiàn)的事件總線實(shí)現(xiàn)包括 Google Guava 的 EventBus 和 Square 的 Otto。
總的來(lái)說(shuō),EventBus就是應(yīng)用了發(fā)布者/訂閱者模式,用來(lái)幫助我們進(jìn)行各組件間通信的工具。我們這里解析的是Google Guava 的 EventBus,它的官方文檔在這里
EventBus三要素
github上的圖:
EventBus里有三個(gè)比較重要的概念:
- Event 消息事件
- Publisher 消息發(fā)布者
- Subscriber 消息訂閱者
乍一看這訂閱者/發(fā)布者模式還和觀察者模式有幾分相似之處,不同之處就在于EventBus的存在,它作為一個(gè)中間件充當(dāng)了消息傳遞的助手,使得訂閱者不必直接訂閱發(fā)布者,訂閱事件總線即可。
EventBus的五種線程模型
EventBus的線程模型指的是根據(jù)事件的發(fā)布和訂閱所在的線程,決定應(yīng)該在哪個(gè)線程中處理事件。EventBus中有以下四種線程模型:
-
- POSTING(默認(rèn)):在發(fā)布事件的線程中執(zhí)行,即同步執(zhí)行,速度快。
-
- MAIN:在主線程(UI線程)中執(zhí)行,如果當(dāng)前線程是主線程,直接執(zhí)行訂閱方法;否則,通過(guò)主線程的Poster來(lái)執(zhí)行。
-
- BACKGROUND:在后臺(tái)線程中執(zhí)行,如果當(dāng)前線程是主線程,通過(guò)后臺(tái)的Poster來(lái)執(zhí)行;否則,直接執(zhí)行訂閱方法。
-
- ASYNC:在新的子線程中執(zhí)行,每個(gè)事件都會(huì)創(chuàng)建一個(gè)新的子線程,即異步執(zhí)行,速度較慢。
除此之外,還有一種模型:
-
- MAIN_ORDERED :MAIN_ORDERED 模式也是運(yùn)行在主線程上的模式,不同于 MAIN 模式的是,它保證了事件的順序性。也就是說(shuō),當(dāng)一個(gè)事件在主線程中被發(fā)布時(shí),它會(huì)先進(jìn)入一個(gè)隊(duì)列中,之后再一個(gè)個(gè)的被處理。這樣可以保證相同事件類型的事件按照發(fā)送的順序依次被執(zhí)行。如果當(dāng)前線程不是主線程,那么它就直接被執(zhí)行,這也是為了避免在子線程中的事件被阻塞。
我們會(huì)在消息處理方法中利用@Subscribe注解指定線程模型。
EventBus的簡(jiǎn)單使用
使用EventBus三步走
這里我們根據(jù)github上的三步分為四步走:
- 定義Event事件類:
public class MessageEvent {private String message;public MessageEvent(String message){this.message = message;}public String getMessage(){return message;}public void setMessage(String message){this.message = message;}
}
這里定義了一個(gè)MessageEvent用于傳遞事件。
- 聲明訂閱者,注冊(cè)訂閱方法并且指定線程模型
@Subscribe(threadMode = ThreadMode.MAIN)//選擇線程模型,說(shuō)明事件將在主線程中處理public void onMoonEvent(MessageEvent messageEvent){tv_message.setText(messageEvent.getMessage());}
這里在MainActivity里注冊(cè)了一個(gè)訂閱方法(消息處理方法),指定了線程模型為MAIN,說(shuō)明訂閱方法在主線程里執(zhí)行。完整Demo在后面放出。
3.注冊(cè)訂閱者與訂閱默認(rèn)的事件總線
bt_subscription.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {//MainActivity注冊(cè)了這條事件總線if(!EventBus.getDefault().isRegistered(MainActivity.this)){EventBus.getDefault().register(MainActivity.this);}}});
這里先判斷MainActivity是否已經(jīng)與默認(rèn)的事件總線訂閱,如果沒(méi)有訂閱就進(jìn)行訂閱。
4.發(fā)送消息事件–觸發(fā)訂閱方法
bt_message.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {EventBus.getDefault().post(new MessageEvent("歡迎來(lái)到,德萊聯(lián)盟"));finish();}});
這里我在第二個(gè)Activity里發(fā)送了一個(gè)消息事件到默認(rèn)的事件總線中,這樣MainActivity中的訂閱方法就會(huì)被觸發(fā)。
2.小Demo
這里我貼出我的小Demo:
- MainActivity:
public class MainActivity extends AppCompatActivity {private TextView tv_message;private Button bt_message;private Button bt_subscription;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);tv_message = findViewById(R.id.tv_message);bt_message = findViewById(R.id.bt_message);bt_subscription = findViewById(R.id.bt_subscription);bt_message.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {startActivity(new Intent(MainActivity.this,SecondActivity.class));}});bt_subscription.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {//MainActivity注冊(cè)了這條事件總線if(!EventBus.getDefault().isRegistered(MainActivity.this)){EventBus.getDefault().register(MainActivity.this);}}});}@Overrideprotected void onDestroy() {super.onDestroy();EventBus.getDefault().unregister(this);}//通過(guò)參數(shù)類型來(lái)區(qū)分應(yīng)該執(zhí)行哪個(gè)方法@Subscribe(threadMode = ThreadMode.MAIN)//選擇線程模型,說(shuō)明事件將在主線程中處理public void onMoonEvent(MessageEvent messageEvent){tv_message.setText(messageEvent.getMessage());}@Subscribe(threadMode = ThreadMode.MAIN,sticky = true)public void ononStickyEvent(MessageEvent messageEvent){tv_message.setText(messageEvent.getMessage());}@Subscribe(threadMode = ThreadMode.MAIN)public void secondBack(String mes){Toast.makeText(this, mes, Toast.LENGTH_SHORT).show();}
}
- SecondActivity:
public class SecondActivity extends AppCompatActivity {TextView tv_message;Button bt_message;Button bt_sticky;Button mes;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_second);tv_message = findViewById(R.id.tv_message1);bt_message = findViewById(R.id.bt_message1);bt_sticky = findViewById(R.id.bt_sticky);mes = findViewById(R.id.mes_event);bt_message.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {EventBus.getDefault().post(new MessageEvent("歡迎來(lái)到,德萊聯(lián)盟"));finish();}});bt_sticky.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {EventBus.getDefault().postSticky(new MessageEvent("粘性事件"));finish();}});mes.setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {EventBus.getDefault().post(new String("另一個(gè)事件"));finish();}});}
}
- MessageEvent:
//自定義的Event事件
public class MessageEvent {private String message;public MessageEvent(String message){this.message = message;}public String getMessage(){return message;}public void setMessage(String message){this.message = message;}
}
https://github.com/MonsterTTL/Android-Impove/tree/master/EventBusDemo
具體的實(shí)例代碼放在我的github上👆了,里面還涉及到了粘性事件以及多個(gè)訂閱事件。
源碼解析
getDefault()方法
先來(lái)看我們最常用的獲取EventBus的方法
static volatile EventBus defaultInstance;public static EventBus getDefault() {EventBus instance = defaultInstance;if (instance == null) {synchronized (EventBus.class) {instance = EventBus.defaultInstance;if (instance == null) {instance = EventBus.defaultInstance = new EventBus();}}}return instance;}
很顯然這里用到了DCL單例模式,確保缺省狀態(tài)下EventBus的實(shí)例只有一個(gè)。我們順著這個(gè)方法往下捋,看new EventBus()方法。
new EventBus()方法
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();public EventBus() {this(DEFAULT_BUILDER);}EventBus(EventBusBuilder builder) {logger = builder.getLogger();subscriptionsByEventType = new HashMap<>();typesBySubscriber = new HashMap<>();stickyEvents = new ConcurrentHashMap<>();mainThreadSupport = builder.getMainThreadSupport();mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;backgroundPoster = new BackgroundPoster(this);asyncPoster = new AsyncPoster(this);indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,builder.strictMethodVerification, builder.ignoreGeneratedIndex);logSubscriberExceptions = builder.logSubscriberExceptions;logNoSubscriberMessages = builder.logNoSubscriberMessages;sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;sendNoSubscriberEvent = builder.sendNoSubscriberEvent;throwSubscriberException = builder.throwSubscriberException;eventInheritance = builder.eventInheritance;executorService = builder.executorService;}
很顯然,獲取default EventBus的調(diào)用流程是這樣的:getDefault() -> EventBus()->EventBus(DEFAULT_BUILDER)。說(shuō)道Builder,那么應(yīng)該是采取建造者模式創(chuàng)建的,我們接下來(lái)分析EventBusBuilder。
EventBusBuilder
builder里延伸下去的內(nèi)容有點(diǎn)多,我們直接先看EventBusBuilder的build方法:
public EventBus build() {return new EventBus(this);}
很顯然,該類的作用是構(gòu)建一個(gè)EventBus實(shí)例,并提供一些配置選項(xiàng)。該類具有多個(gè)屬性,如是否記錄訂閱者異常、是否發(fā)送沒(méi)有訂閱者事件等,以及設(shè)置自定義線程池和日志處理程序等。類中的各種方法可用于配置這些屬性,例如logSubscriberExceptions()方法可用于設(shè)置是否記錄訂閱者異常。 這里我們就不關(guān)注打印日志,記錄異常等配置了,我們關(guān)注具體的創(chuàng)建過(guò)程。
我們先來(lái)看看以下幾個(gè)可選的方法:
/*** 設(shè)置 EventBus 是否開(kāi)啟事件類繼承的支持。默認(rèn)情況下,EventBus會(huì)考慮事件類繼承關(guān)系,即父類有訂閱者時(shí),子類的訂閱者也會(huì)被
通知。但是,關(guān)閉這個(gè)特性可以提高事件發(fā)布的速度。如果事件類的繼承層級(jí)比較復(fù)雜,關(guān)閉繼承支持的性能提升應(yīng)該會(huì)大于20%。但需要注意
的是,事件發(fā)布通常只占應(yīng)用程序內(nèi)部CPU時(shí)間的一小部分,除非高速發(fā)布事件,例如每秒鐘發(fā)布數(shù)百/數(shù)千個(gè)事件。 ***/public EventBusBuilder eventInheritance(boolean eventInheritance) {this.eventInheritance = eventInheritance;return this;}/*** 這是用來(lái)指定線程池類型的,除了使用Builder內(nèi)部自帶的線程池,我們也可以使用自己傳入的線程池*/public EventBusBuilder executorService(ExecutorService executorService) {this.executorService = executorService;return this;}/*** 這是用來(lái)跳過(guò)指定訂閱者的方法驗(yàn)證的,當(dāng)你注冊(cè)一個(gè)類作為事件訂閱者時(shí),EventBus會(huì)自動(dòng)檢查該類是否具有合法的事件訂閱方法。* 這些方法必須是公共的,沒(méi)有返回值,只有一個(gè)參數(shù),并且在方法上需要使用@Subscribe注解。然而,有些情況下,你可能希望讓某* 個(gè)類作為訂閱者,但是它并不符合這些要求。例如,它可能有一些不是用@Subscribe注解的方法,但是你仍然希望這些方法能夠被* EventBus識(shí)別并調(diào)用。在這種情況下,你可以使用skipMethodVerificationFor()方法來(lái)跳過(guò)驗(yàn)證,允許這些方法被注冊(cè)為事件訂閱 * 方法。*/public EventBusBuilder skipMethodVerificationFor(Class<?> clazz) {if (skipMethodVerificationForClasses == null) {skipMethodVerificationForClasses = new ArrayList<>();}skipMethodVerificationForClasses.add(clazz);return this;}/** 允許在生成了索引的情況下也強(qiáng)制使用反射進(jìn)行訂閱者的查找。默認(rèn)情況下,如果有生成的索引,則會(huì)使用它來(lái)查找訂閱者,而不是使用反射。通過(guò)調(diào)用這個(gè)方法并將其參數(shù)設(shè)置為 true,可以強(qiáng)制使用反射進(jìn)行查找。 */public EventBusBuilder ignoreGeneratedIndex(boolean ignoreGeneratedIndex) {this.ignoreGeneratedIndex = ignoreGeneratedIndex;return this;}/** 開(kāi)啟嚴(yán)格方法驗(yàn)證 . 在 EventBus 中,嚴(yán)格檢查是指在注冊(cè)和訂閱事件時(shí)檢查相關(guān)方法的參數(shù)和返回類型是否正確匹配,以確保事件可以正確地被分發(fā)。如果開(kāi)啟了嚴(yán)格檢查,那么在注冊(cè)或訂閱事件時(shí),如果發(fā)現(xiàn)有方法的參數(shù)或返回類型與事件類型不匹配,EventBus 會(huì)拋出一個(gè)異常,防止程序出現(xiàn)意料之外的錯(cuò)誤。*/public EventBusBuilder strictMethodVerification(boolean strictMethodVerification) {this.strictMethodVerification = strictMethodVerification;return this;}/** 用于添加 SubscriberInfoIndex 對(duì)象到 EventBusBuilder 中。SubscriberInfoIndex 接口用于提供訂閱者類和訂閱方法信息的索引,以便 EventBus 能夠快速找到訂閱者及其訂閱方法。在 EventBus 中, */public EventBusBuilder addIndex(SubscriberInfoIndex index) {if (subscriberInfoIndexes == null) {subscriberInfoIndexes = new ArrayList<>();}subscriberInfoIndexes.add(index);return this;}
接下來(lái)看下比較重要的幾個(gè)方法:
public EventBus installDefaultEventBus() {synchronized (EventBus.class) {if (EventBus.defaultInstance != null) {throw new EventBusException("Default instance already exists." +" It may be only set once before it's used the first time to ensure consistent behavior.");}EventBus.defaultInstance = build();return EventBus.defaultInstance;}}
這個(gè)方法是用來(lái)安裝默認(rèn)EventBus里的defaultInstance的:
class EventBus{
...static volatile EventBus defaultInstance;//默認(rèn)的實(shí)例private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();//默認(rèn)的Builder
...
}
這段代碼是通過(guò)構(gòu)建者模式構(gòu)建一個(gè)默認(rèn)的EventBus實(shí)例,并安裝在默認(rèn)的EventBus上,只能在第一次使用默認(rèn)的EventBus之前調(diào)用,否則會(huì)拋出EventBusException異常。在synchronized塊內(nèi),首先判斷是否已經(jīng)存在默認(rèn)的EventBus實(shí)例,如果存在則拋出異常,否則構(gòu)建EventBus實(shí)例并將其設(shè)置為默認(rèn)實(shí)例,最后返回默認(rèn)實(shí)例。
接下來(lái)看另一個(gè)方法:
MainThreadSupport getMainThreadSupport() {if (mainThreadSupport != null) {return mainThreadSupport;} else if (AndroidComponents.areAvailable()) {return AndroidComponents.get().defaultMainThreadSupport;} else {return null;}}
這個(gè)方法是用來(lái)獲取主線程支持的,那什么是主線程支持呢,我們點(diǎn)進(jìn)去看看:
public interface MainThreadSupport {boolean isMainThread();Poster createPoster(EventBus eventBus);
}
這里我們可以看到,MainThreadSupport是一個(gè)接口,定義了isMainThread和createPoster方法,根據(jù)這兩個(gè)方法名,我們可以推測(cè)這個(gè)接口的作用就是用來(lái)支持消息在主線程里傳遞的,isMainThread用于判斷當(dāng)前線程是否是主線程,createPoster用于創(chuàng)建一個(gè)poster以在主線程里傳遞消息。
當(dāng)然這只是一個(gè)接口,我們要看具體的實(shí)現(xiàn)還是得折回去看getMainThreadSupport方法,我們看這一段:
else if (AndroidComponents.areAvailable()) {return AndroidComponents.get().defaultMainThreadSupport;}
這段代碼的作用就是判斷當(dāng)前平臺(tái)是不是Android平臺(tái),如果是Android平臺(tái),就調(diào)用Android.get().defaultMainThreadSupport獲取默認(rèn)的主線程支持類。那這里又得涉及到AndroidComponents類了,我們開(kāi)一個(gè)子標(biāo)題討論這個(gè)類。
AndroidComponents類
AndroidComponents類是一個(gè)抽象類,內(nèi)容比較短,這里先貼出它的全部源碼:
public abstract class AndroidComponents {private static final AndroidComponents implementation;static {implementation = AndroidDependenciesDetector.isAndroidSDKAvailable()? AndroidDependenciesDetector.instantiateAndroidComponents(): null;}public static boolean areAvailable() {return implementation != null;}public static AndroidComponents get() {return implementation;}public final Logger logger;public final MainThreadSupport defaultMainThreadSupport;public AndroidComponents(Logger logger, MainThreadSupport defaultMainThreadSupport) {this.logger = logger;this.defaultMainThreadSupport = defaultMainThreadSupport;}
}
這里將其聲明為抽象的意義應(yīng)該就是防止其實(shí)例化,因?yàn)锳ndroidComponents的具體實(shí)現(xiàn)是由AndroidDependenciesDetector來(lái)決定的,這個(gè)類會(huì)檢測(cè)當(dāng)前是否在Android環(huán)境下,如果是,則返回實(shí)際的實(shí)現(xiàn),否則返回null。而AndroidDependenciesDetector這個(gè)類是不允許被外部直接實(shí)例化的,因此AndroidComponents也應(yīng)該遵循同樣的規(guī)則。
真正重要的代碼部分是在靜態(tài)代碼塊內(nèi),代碼塊在初始化時(shí)會(huì)會(huì)檢測(cè)當(dāng)前是否在Android環(huán)境下,如果是,則返回實(shí)際的實(shí)現(xiàn),否則返回null。
接下來(lái)我們?cè)偕钊胂蛳掳?#xff0c;找到MainThreadSupport的具體實(shí)現(xiàn)類,再往下看我們可以發(fā)現(xiàn)這個(gè)類是由反射機(jī)制實(shí)現(xiàn)的,不過(guò)我們先不關(guān)心這個(gè),直接找到具體實(shí)現(xiàn)類DefaultAndroidMainThreadSupport,直接看它的源碼:
public class DefaultAndroidMainThreadSupport implements MainThreadSupport {@Overridepublic boolean isMainThread() {return Looper.getMainLooper() == Looper.myLooper();}@Overridepublic Poster createPoster(EventBus eventBus) {return new HandlerPoster(eventBus, Looper.getMainLooper(), 10);}
}
可以發(fā)現(xiàn),這個(gè)類實(shí)際上十分簡(jiǎn)單,通過(guò)Looper判斷是否是主線程,接下來(lái)看他的Poster類。
具體的Poster類:
public class HandlerPoster extends Handler implements Poster {private final PendingPostQueue queue;private final int maxMillisInsideHandleMessage;private final EventBus eventBus;private boolean handlerActive;public HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {super(looper);this.eventBus = eventBus;this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;queue = new PendingPostQueue();}public void enqueue(Subscription subscription, Object event) {PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);synchronized (this) {queue.enqueue(pendingPost);if (!handlerActive) {handlerActive = true;if (!sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message");}}}}
...
我們先來(lái)看這前半部分,在構(gòu)造方法中新出現(xiàn)了一個(gè)queue,是一個(gè)post隊(duì)列,是用于存儲(chǔ)post請(qǐng)求的隊(duì)列,具體的PendingPostQueue類我們就不在具體看了,只要知道其內(nèi)部使用鏈表的結(jié)構(gòu)連接的,而PendingPostQueue類里記錄了這個(gè)post隊(duì)列的頭部和尾部。
enqueue方法中將發(fā)送的Post請(qǐng)求進(jìn)行入隊(duì),如果handler不處于活躍狀態(tài),就將其置位活躍狀態(tài),然后獲取一個(gè)Message對(duì)象并將其發(fā)送給MessageQueue中,接下來(lái)我們看它是如何處理post請(qǐng)求的:
public void handleMessage(Message msg) {boolean rescheduled = false;try {long started = SystemClock.uptimeMillis();while (true) {PendingPost pendingPost = queue.poll();if (pendingPost == null) {synchronized (this) {// Check again, this time in synchronizedpendingPost = queue.poll();if (pendingPost == null) {handlerActive = false;return;}}}eventBus.invokeSubscriber(pendingPost);long timeInMethod = SystemClock.uptimeMillis() - started;if (timeInMethod >= maxMillisInsideHandleMessage) {if (!sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message");}rescheduled = true;return;}}} finally {handlerActive = rescheduled;}}
由于HandlerPoster本身就是一個(gè)Handler,所以上一步發(fā)送的Message實(shí)際上就是由它自己處理的,就是在這個(gè)方法中處理,Post請(qǐng)求具體是在這里執(zhí)行的:
eventBus.invokeSubscriber(pendingPost);
這個(gè)方法會(huì)將Event事件推送到具體的訂閱者類中去執(zhí)行,接下來(lái)我們重新回到EventBus類中看看這個(gè)過(guò)程是怎樣執(zhí)行的。
Subscription類
在正式介紹EventBus是怎樣執(zhí)行推送之前,我們需要先了解兩個(gè)前置類的信息,首先我們先來(lái)看Subscriptiont類:
final class Subscription {final Object subscriber;final SubscriberMethod subscriberMethod;/*** Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery* {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.*/volatile boolean active;Subscription(Object subscriber, SubscriberMethod subscriberMethod) {this.subscriber = subscriber;this.subscriberMethod = subscriberMethod;active = true;}@Overridepublic boolean equals(Object other) {if (other instanceof Subscription) {Subscription otherSubscription = (Subscription) other;return subscriber == otherSubscription.subscriber&& subscriberMethod.equals(otherSubscription.subscriberMethod);} else {return false;}}@Overridepublic int hashCode() {return subscriber.hashCode() + subscriberMethod.methodString.hashCode();}
}
這個(gè)類里面的具體的SubscriberMethod和Object subscriber就不詳細(xì)介紹了,根據(jù)名字我們也可以知道,final Object subscriber 存儲(chǔ)的是訂閱事件的對(duì)象,而 final SubscriberMethod subscriberMethod 則包含了訂閱事件的方法的信息,例如該方法的名稱、參數(shù)類型等等。通過(guò)這兩個(gè)屬性,EventBus 可以將事件正確地分發(fā)給訂閱者的對(duì)應(yīng)方法。
PendingPost類
然后我們?cè)賮?lái)看PendingPost類
final class PendingPost {private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();Object event;Subscription subscription;PendingPost next;...}
我們看到這個(gè)類里有一個(gè)靜態(tài)的變量:
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
就是說(shuō),所有的PendingPost類的實(shí)例都會(huì)共享這一個(gè)池子,這個(gè)池子主要是用來(lái)優(yōu)化GC性能的,用于重用已創(chuàng)建的對(duì)象,避免了大量的對(duì)象創(chuàng)建和銷(xiāo)毀,減小了內(nèi)存開(kāi)銷(xiāo)。
1.構(gòu)造方法
private PendingPost(Object event, Subscription subscription) {this.event = event;this.subscription = subscription;}
其創(chuàng)建方法主要是傳入發(fā)送的事件和訂閱信息相關(guān)的Subscription,然后將其包裝成一個(gè)PendingPost對(duì)象。
static PendingPost obtainPendingPost(Subscription subscription, Object event) {synchronized (pendingPostPool) {int size = pendingPostPool.size();if (size > 0) {PendingPost pendingPost = pendingPostPool.remove(size - 1);pendingPost.event = event;pendingPost.subscription = subscription;pendingPost.next = null;return pendingPost;}}return new PendingPost(event, subscription);}
obtainPendingPost方法是根據(jù)pendingPostPool這個(gè)共享池來(lái)創(chuàng)建PendingPost對(duì)象,如果共享池里還有可用對(duì)象,就復(fù)用這個(gè)對(duì)象來(lái)創(chuàng)建新的PendingPost對(duì)象,否則就直接創(chuàng)建一個(gè)新的對(duì)象。
static void releasePendingPost(PendingPost pendingPost) {pendingPost.event = null;pendingPost.subscription = null;pendingPost.next = null;synchronized (pendingPostPool) {// Don't let the pool grow indefinitelyif (pendingPostPool.size() < 10000) {pendingPostPool.add(pendingPost);}}}
最后是release方法,這個(gè)方法是用來(lái)將PendingPost對(duì)象釋放會(huì)共享池中的,不過(guò)這個(gè)共享池的大小也是有限制的,最大不能超過(guò)10000。
EventBus類中如何推送事件
上面說(shuō)到,具體是通過(guò)invokeSubscriber這個(gè)方法來(lái)推送事件的,事不宜遲,我們馬上來(lái)看看這個(gè)方法的內(nèi)容:
void invokeSubscriber(PendingPost pendingPost) {Object event = pendingPost.event;Subscription subscription = pendingPost.subscription;PendingPost.releasePendingPost(pendingPost);if (subscription.active) {invokeSubscriber(subscription, event);}}
可以發(fā)現(xiàn),具體又調(diào)用了invokeSubscriber(subscription, event);方法,我們緊接著看invokeSubscriber(subscription, event)方法:
void invokeSubscriber(Subscription subscription, Object event) {try {subscription.subscriberMethod.method.invoke(subscription.subscriber, event);} catch (InvocationTargetException e) {handleSubscriberException(subscription, event, e.getCause());} catch (IllegalAccessException e) {throw new IllegalStateException("Unexpected exception", e);}}
很顯然,這里是通過(guò)了反射機(jī)制,subscription.subscriberMethod.method.invoke(subscription.subscriber, event)表示將event事件發(fā)送到subscriber中并觸發(fā)subscriber的處理方法。method是SubscriberMethod中的一個(gè)字段,表示訂閱方法本身,通過(guò)invoke方法來(lái)調(diào)用這個(gè)方法,并將subscriber作為方法的調(diào)用者,event作為參數(shù)傳遞進(jìn)去。到這里,我們就看完Event事件是如何傳遞給訂閱者的了。
Register(訂閱事件總線)
接下來(lái)我們看第二個(gè)內(nèi)容,訂閱事件總線時(shí)做了什么:
public void register(Object subscriber) {...Class<?> subscriberClass = subscriber.getClass();List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);synchronized (this) {for (SubscriberMethod subscriberMethod : subscriberMethods) {subscribe(subscriber, subscriberMethod);}}}
核心方法在于subscribe方法:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {Class<?> eventType = subscriberMethod.eventType;Subscription newSubscription = new Subscription(subscriber, subscriberMethod);CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);//獲取同一個(gè)類型的消息事件下的所有Subscrption對(duì)象if (subscriptions == null) {subscriptions = new CopyOnWriteArrayList<>();subscriptionsByEventType.put(eventType, subscriptions);//如果原來(lái)還沒(méi)有注冊(cè)這一類型的消息事件,則新創(chuàng)建一個(gè)List并加入} else {if (subscriptions.contains(newSubscription)) {//如果當(dāng)前注冊(cè)的對(duì)象已經(jīng)注冊(cè)過(guò)了,就拋出異常throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "+ eventType);}}int size = subscriptions.size();for (int i = 0; i <= size; i++) {if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {subscriptions.add(i, newSubscription);break;}}//這一段是用來(lái)將新的subscription對(duì)象插入到同一類型的消息事件隊(duì)列的合適位置中--將訂閱者與注冊(cè)方法關(guān)聯(lián)起來(lái)List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);if (subscribedEvents == null) {subscribedEvents = new ArrayList<>();typesBySubscriber.put(subscriber, subscribedEvents);}subscribedEvents.add(eventType);//這一段是用來(lái)將訂閱者與消息事件類型關(guān)聯(lián)起來(lái)if (subscriberMethod.sticky) {//粘性事件的處理方法if (eventInheritance) {// Existing sticky events of all subclasses of eventType have to be considered.// Note: Iterating over all events may be inefficient with lots of sticky events,// thus data structure should be changed to allow a more efficient lookup// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();for (Map.Entry<Class<?>, Object> entry : entries) {Class<?> candidateEventType = entry.getKey();if (eventType.isAssignableFrom(candidateEventType)) {Object stickyEvent = entry.getValue();checkPostStickyEventToSubscription(newSubscription, stickyEvent);}}} else {Object stickyEvent = stickyEvents.get(eventType);checkPostStickyEventToSubscription(newSubscription, stickyEvent);}}}
具體來(lái)說(shuō),subscribe方法做了三件事情:
1. 維護(hù)了當(dāng)前EventBus中的subscriptionsByEventType表;
2. 維護(hù)了當(dāng)前EventBus中的 typesBySubscriber表;
3. 處理了粘性事件;
詳細(xì)的說(shuō)明我放在上面的注釋里面了,我們先說(shuō)維護(hù)的兩個(gè)表,第一個(gè)表是代表消息事件類型和Subscription關(guān)聯(lián)的一個(gè)表,
第二個(gè)表是表示訂閱者Subscriber和消息事件類型關(guān)聯(lián)的兩個(gè)表,這兩個(gè)表可以在發(fā)布事件時(shí)可以快速地找到對(duì)應(yīng)的訂閱者并調(diào)用其處理方法。
然后是粘性事件的處理,這里有一個(gè)標(biāo)志位eventInheritance,這個(gè)標(biāo)志位我們?cè)谥疤岬竭^(guò),我們可以回到EventBuilder的內(nèi)容查看,主要是涉及到繼承問(wèn)題的,除去繼承的問(wèn)題,粘性事件主要就是調(diào)用:
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
直接點(diǎn)開(kāi)這個(gè)方法會(huì)發(fā)現(xiàn)其調(diào)用了:
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {if (stickyEvent != null) {// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)// --> Strange corner case, which we don't take care of here.postToSubscription(newSubscription, stickyEvent, isMainThread());}}private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {switch (subscription.subscriberMethod.threadMode) {case POSTING:invokeSubscriber(subscription, event);break;case MAIN:if (isMainThread) {invokeSubscriber(subscription, event);} else {mainThreadPoster.enqueue(subscription, event);}break;case MAIN_ORDERED:if (mainThreadPoster != null) {mainThreadPoster.enqueue(subscription, event);} else {// temporary: technically not correct as poster not decoupled from subscriberinvokeSubscriber(subscription, event);}break;case BACKGROUND:if (isMainThread) {backgroundPoster.enqueue(subscription, event);} else {invokeSubscriber(subscription, event);}break;case ASYNC:asyncPoster.enqueue(subscription, event);break;default:throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);}}
實(shí)際上是調(diào)用了postToSubscription方法,該方法根據(jù)回調(diào)方法指定的線程模型來(lái)傳遞粘性事件,至于上面提到的三種poster,我們之后再解析。
Post方法
訂閱者注冊(cè)完畢后,我們就要用Post方法進(jìn)行消息的發(fā)送了,Post方法又可以分為post和postSticky,接下來(lái)我們先看post方法:
public void post(Object event) {PostingThreadState postingState = currentPostingThreadState.get();List<Object> eventQueue = postingState.eventQueue;//獲取發(fā)送線程的eventQueueeventQueue.add(event);//將當(dāng)前要發(fā)送時(shí)間添加進(jìn)eventQueue中if (!postingState.isPosting) {//如果當(dāng)前線程處于空閑狀態(tài)postingState.isMainThread = isMainThread();postingState.isPosting = true;if (postingState.canceled) {throw new EventBusException("Internal error. Abort state was not reset");}try {while (!eventQueue.isEmpty()) {//eventQueue不為空postSingleEvent(eventQueue.remove(0), postingState);//通過(guò)postSingleEvent方法依次將eventQueue中的事件發(fā)送}} finally {postingState.isPosting = false;postingState.isMainThread = false;}}}
這里又冒出來(lái)了一個(gè)PostingThreadState,我們看它是從哪里來(lái)的:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {@Overrideprotected PostingThreadState initialValue() {return new PostingThreadState();}};
...final static class PostingThreadState {final List<Object> eventQueue = new ArrayList<>();boolean isPosting;boolean isMainThread;Subscription subscription;Object event;boolean canceled;}
這里涉及到一個(gè)ThreadLocal類,我們先來(lái)看看:
在Java中,ThreadLocal是一個(gè)非常有用的類,它允許您在單個(gè)線程中存儲(chǔ)和檢索數(shù)據(jù),而不會(huì)與其他線程共享。ThreadLocal通過(guò)創(chuàng)建一個(gè)副本來(lái)解決線程安全問(wèn)題,每個(gè)線程都有自己的副本,所以每個(gè)線程都可以獨(dú)立地改變自己的副本,而不會(huì)影響其他線程。
ThreadLocal提供了三個(gè)主要方法:
get() - 返回當(dāng)前線程的變量副本(如果有);如果沒(méi)有,則返回默認(rèn)值。
set(T value) - 設(shè)置當(dāng)前線程的變量副本。
remove() - 移除當(dāng)前線程的變量副本。
ThreadLocal通常用于在單個(gè)線程中存儲(chǔ)和檢索上下文信息,例如數(shù)據(jù)庫(kù)連接、事務(wù)對(duì)象等。使用ThreadLocal可以避免在多線程環(huán)境下對(duì)這些資源進(jìn)行同步操作,從而提高性能。
所以在這里的語(yǔ)境下,PostingThreadState就是每個(gè)發(fā)送post請(qǐng)求線程中的局部變量,且各個(gè)發(fā)送線程之間是不會(huì)共享這個(gè)PostingThreadState變量的,每個(gè)發(fā)送post請(qǐng)求的線程都會(huì)有自己私有的PostingThreadState變量。
post方法做的大致如下:
-
先獲取到當(dāng)前發(fā)送線程的PostingThreadState變量
-
接著獲取當(dāng)前線程要發(fā)送的事件隊(duì)列eventQueue,然后將當(dāng)前要發(fā)送的事件添加進(jìn)入eventQueue隊(duì)列中
-
如果當(dāng)前發(fā)送線程處于空閑狀態(tài),就將eventQueue中的事件通過(guò)postSingleEvent方法依次發(fā)送
既然說(shuō)調(diào)用了postSingleEvent方法,接下來(lái)我們理所當(dāng)然地查看postSingleEvent方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {Class<?> eventClass = event.getClass();boolean subscriptionFound = false;if (eventInheritance) {List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);int countTypes = eventTypes.size();for (int h = 0; h < countTypes; h++) {Class<?> clazz = eventTypes.get(h);subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);}} else {subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);}if (!subscriptionFound) {if (logNoSubscriberMessages) {logger.log(Level.FINE, "No subscribers registered for event " + eventClass);}if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&eventClass != SubscriberExceptionEvent.class) {post(new NoSubscriberEvent(this, event));}}}
這里又涉及到了eventInheritance標(biāo)志位,這里我們不管這個(gè)標(biāo)志位,查看這個(gè)方法的核心在于:
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
我們接著查看這個(gè)postSingleEventForEventType方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {CopyOnWriteArrayList<Subscription> subscriptions;synchronized (this) {subscriptions = subscriptionsByEventType.get(eventClass);}if (subscriptions != null && !subscriptions.isEmpty()) {for (Subscription subscription : subscriptions) {postingState.event = event;postingState.subscription = subscription;boolean aborted;try {postToSubscription(subscription, event, postingState.isMainThread);aborted = postingState.canceled;} finally {postingState.event = null;postingState.subscription = null;postingState.canceled = false;}if (aborted) {break;}}return true;}return false;}
這個(gè)方法接收了發(fā)送的事件event,發(fā)送線程的局部變量postingState,還有事件類型eventClass。接下來(lái)它獲取了對(duì)應(yīng)事件類型下的subscriptions隊(duì)列,對(duì)其中的每個(gè)subscription對(duì)象調(diào)用了postToSubscription方法,如果一切成功,就返回true,否則返回false。至于postToSubscription這個(gè)方法我們?cè)谥耙蔡岬竭^(guò),就是:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread)
上次提到它是用來(lái)傳遞粘性事件,當(dāng)然它也可以用來(lái)傳遞普通事件,這個(gè)方法將根據(jù)指定的線程模型來(lái)選擇如何傳遞事件給訂閱者回調(diào)處理。
至于粘性事件的POST:
public void postSticky(Object event) {synchronized (stickyEvents) {stickyEvents.put(event.getClass(), event);}// Should be posted after it is putted, in case the subscriber wants to remove immediatelypost(event);}
不同點(diǎn)就在于在執(zhí)行post方法之前還將其加入到了粘性事件隊(duì)列stickyEvents,以便在其他訂閱者完成注冊(cè)時(shí)能接收到粘性事件。
三種Poster
之前提到過(guò)EventBus里有三種Poster,在MainThreadSuppot中我們已經(jīng)認(rèn)識(shí)了HandlerPoster,接下來(lái)我們看看其他兩種Poster,這里我們?yōu)榱朔奖?#xff0c;貼出postToSubscription的源碼方便理解:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {switch (subscription.subscriberMethod.threadMode) {case POSTING:invokeSubscriber(subscription, event);break;case MAIN:if (isMainThread) {invokeSubscriber(subscription, event);} else {mainThreadPoster.enqueue(subscription, event);}break;case MAIN_ORDERED:if (mainThreadPoster != null) {mainThreadPoster.enqueue(subscription, event);} else {// temporary: technically not correct as poster not decoupled from subscriberinvokeSubscriber(subscription, event);}break;case BACKGROUND:if (isMainThread) {backgroundPoster.enqueue(subscription, event);} else {invokeSubscriber(subscription, event);}break;case ASYNC:asyncPoster.enqueue(subscription, event);break;default:throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);}}
很顯然,根據(jù)不同的線程模型,該方法會(huì)采取的不同的策略和poster,mainThreadPoster就是我們一開(kāi)始解析過(guò)的HandlerPoster,接下來(lái)先看backgroundPoster:
private final PendingPostQueue queue;private final EventBus eventBus;private volatile boolean executorRunning;BackgroundPoster(EventBus eventBus) {this.eventBus = eventBus;queue = new PendingPostQueue();}
BackgroundPoster內(nèi)部會(huì)有自己的一條PendingPostQueue,接下來(lái)我們看入隊(duì)方法:
public void enqueue(Subscription subscription, Object event) {PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);synchronized (this) {queue.enqueue(pendingPost);if (!executorRunning) {executorRunning = true;eventBus.getExecutorService().execute(this);}}}
這里BackgroundPoster用了同步代碼塊,說(shuō)明雖然Background模型是在子線程中運(yùn)行的,但是其會(huì)遵循順序依次執(zhí)行,同一個(gè)發(fā)送隊(duì)列發(fā)送的事件觸發(fā)的多個(gè)Background線程模型的回調(diào)方法不能同時(shí)運(yùn)行。
至于任務(wù)該如何執(zhí)行,eventBus中會(huì)有一個(gè)線程池,這個(gè)線程池類型是根據(jù)builder決定的,默認(rèn)情況下,這個(gè)線程池的類型將會(huì)是CachedThreadPool。接下來(lái)再看它的run方法:
public void run() {try {try {while (true) {PendingPost pendingPost = queue.poll(1000);if (pendingPost == null) {synchronized (this) {// Check again, this time in synchronizedpendingPost = queue.poll();if (pendingPost == null) {executorRunning = false;return;}}}eventBus.invokeSubscriber(pendingPost);}} catch (InterruptedException e) {eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);}} finally {executorRunning = false;}}
run方法很簡(jiǎn)單,就是不斷將queue中的PendingPost取出放入線程池中執(zhí)行eventBus的invokeSubscriber方法:
接下來(lái)我們看AsncPoster:
AsyncPoster(EventBus eventBus) {this.eventBus = eventBus;queue = new PendingPostQueue();}public void enqueue(Subscription subscription, Object event) {PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);queue.enqueue(pendingPost);eventBus.getExecutorService().execute(this);}@Overridepublic void run() {PendingPost pendingPost = queue.poll();if(pendingPost == null) {throw new IllegalStateException("No pending post available");}eventBus.invokeSubscriber(pendingPost);}
AsyncPoster和BackgroundPoster其實(shí)有點(diǎn)像,不同之處就在于AsyncPoster的post請(qǐng)求不必順序執(zhí)行,多個(gè)回調(diào)可以同時(shí)進(jìn)行。
總結(jié)
到此為止,我們就已經(jīng)解析了EventBus源碼的核心基礎(chǔ)部分了,讓我們總結(jié)一下流程圖:
這里的流程圖是簡(jiǎn)化的版本,這里具體實(shí)現(xiàn)的細(xì)節(jié)被隱藏起來(lái)了,主要是幫助我們理解機(jī)制流程。