icp備案網(wǎng)站更名整站優(yōu)化多少錢
一 前言
RocketMQ 作為一個功能強大的消息隊列系統(tǒng),不僅支持基本的消息發(fā)布與訂閱,還提供了順序消息、延時消息、事務(wù)消息等高級功能,適應(yīng)了復(fù)雜的分布式系統(tǒng)需求。其高可用性架構(gòu)、多副本機制、完善的運維管理工具,以及安全控制功能,使其成為企業(yè)級應(yīng)用的首選消息中間件。
在Android應(yīng)用中,你可以使用RocketMQ的客戶端庫來發(fā)送和接收消息.
二 接入流程
1 添加依賴
在Android項目的build.gradle文件中添加RocketMQ客戶端庫的依賴。
dependencies {implementation 'org.apache.rocketmq:rocketmq-client:5.3.1'
}
2 添加權(quán)限
<uses-permission android:name="android.permission.INTERNET" />
3 接收消息
ExecutorService executor = Executors.newFixedThreadPool(20); //根據(jù)項目需要設(shè)置常用線程個數(shù)
String TAG = "MainActivity";
String GROUP = "producer";
String ADDRESS = "192.168.1.84:9876";
String KEY = "key";executor.submit(() -> {try {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);consumer.setNamesrvAddr(ADDRESS);// 訂閱 topic 下的全部 tabconsumer.subscribe(TOPIC, "*");// BROADCASTING:廣播模式,把消息發(fā)給了所有訂閱了對應(yīng)主題的消費者,不管消費者是不是同一個消費者組, CLUSTERING:集群模式(默認值),每一條消息只會被同一個消費者組中的一個實例消費consumer.setMessageModel(MessageModel.CLUSTERING);// CONSUME_FROM_LAST_OFFSET:從最新的偏移值開始消費(默認值), CONSUME_FROM_FIRST_OFFSET:從隊列最開始的偏移值開始消費, CONSUME_FROM_TIMESTAMP:從指定的時間戳處開始消費consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// yyyyMMddHHmmss: 當(dāng)選擇從指定的時間戳處開始消費時, 需要指定該時間戳// consumer.setConsumeTimestamp("");// 使用并發(fā)方式從多個MessageQueue中取數(shù)據(jù)的方式監(jiān)聽consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.println();for (MessageExt msg : msgs) {Log.e(TAG,"收到消息:"+new String(msg.getBody()));}// 返回消費成功, 還可以是 RECONSUME_LATER:稍后重新消費return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();TimeUnit.DAYS.sleep(1);} catch (Throwable cause) {cause.printStackTrace();}});
4 發(fā)送消息
executor.submit(() -> {try {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();// 同步傳遞消息,消息會發(fā)給集群中的一個Broker節(jié)點。Message message = new Message(TOPIC, TAG, KEY, "android hello word ss".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult result = producer.send(message);Log.e(TAG,"發(fā)送消息結(jié)果:result:"+ JSON.toJSONString(result));producer.shutdown();} catch (Exception e) {Log.e(TAG,"發(fā)送失敗:"+e.getCause().toString());e.printStackTrace();}});
三 問題
啟動項目,點擊發(fā)送消息,項目報了異常信息,如下
java.lang.NoClassDefFoundError: Failed resolution of: Ljava/lang/management/ManagementFactory; 報錯
這是因為RocketMQ客戶端庫依賴于Java標(biāo)準(zhǔn)庫中的 java.lang.management.ManagementFactory 類,而Android并不完全支持Java標(biāo)準(zhǔn)庫,尤其是 java.lang.management 包。
RocketMQ官方?jīng)]有專門為Android提供適配版本,所以可以嘗試使用這些版本,或者自己修改RocketMQ源碼,移除對 ManagementFactory 的依賴。
四 修改源碼
在github中,把rocketmq-client源碼下載到本地
https://github.com/apache/rocketmq
導(dǎo)入到本地如下
然后找到前面ManagementFactory 報錯的地方,將它移除或者用其他方法代替,經(jīng)排查在
org.apache.rocketmq.common.UtilAll 有相關(guān)的引用
該方法則是為了通過獲取jvm的進程ID,這邊我們可以把它注釋掉,然后用個固定值代替試下
static {HEX_ARRAY = "0123456789ABCDEF".toCharArray();/* Supplier<Integer> supplier = () -> {// format: "pid@hostname"String currentJVM = ManagementFactory.getRuntimeMXBean().getName();try {return Integer.parseInt(currentJVM.substring(0, currentJVM.indexOf('@')));} catch (Exception e) {return -1;}};PID = supplier.get();*/PID = 888888;}
以及在org.apache.rocketmq.common.MixAll也有ManagementFactory相關(guān)引用,這個作用是獲取當(dāng)前java虛擬機(JVM)的進程ID,可以將其注釋,然后返回固定的結(jié)果
public static long getPID() {String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();if (StringUtils.isNotEmpty(processName)) {try {return Long.parseLong(processName.split("@")[0]);} catch (Exception e) {return 0;}}return 0;}
最后還有一個地方有涉及到,在包路徑org.apache.rocketmq.store.StoreUtil,其作用是為了獲取當(dāng)前機器的總物理內(nèi)存大小(以字節(jié)為單位)
public static long getTotalPhysicalMemorySize() {long physicalTotal = 1024 * 1024 * 1024 * 24L;OperatingSystemMXBean osmxb = ManagementFactory.getOperatingSystemMXBean();if (osmxb instanceof com.sun.management.OperatingSystemMXBean) {physicalTotal = ((com.sun.management.OperatingSystemMXBean) osmxb).getTotalPhysicalMemorySize();}return physicalTotal;}
將相關(guān)的包修改后,然后將其重新打包,在maven工具下,選擇rocketmq-common,選擇Plugins下的jar組件,選中下面的jar進行打包
打包完成后,在模塊的target目錄下生成jar包
android需要用到的包如下:
implementation files('libs\\rocketmq-remoting-5.3.1.jar')
implementation files('libs\\rocketmq-client-5.3.1.jar')
implementation files('libs\\rocketmq-common-5.3.1.jar')implementation 'io.github.aliyunmq:rocketmq-logback-classic:1.0.1'
implementation 'com.google.guava:guava:31.1-jre'
implementation 'commons-validator:commons-validator:1.7'
將模塊rocketmq-remoting,rocketmq-client,rocketmq-commo三個模塊重新打包后導(dǎo)入,然后再加上下面那三個相關(guān)聯(lián)的依賴包.重新用android應(yīng)用進行收發(fā)信息,測試如下:
2025-03-04 14:51:22.272 13676-13795/? E/MainActivity: 收到消息:android hello word ss
2025-03-04 14:51:22.279 13676-13785/? E/MainActivity: 發(fā)送消息結(jié)果:result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"TopicTestLss"},"msgId":"C10005FD90380CA347BF12A326F00000","offsetMsgId":"C2000B5400002A9F000000000007A575","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true,"transactionId":"C10005FD90380CA347BF12A326F00000"}