正規(guī)的培訓(xùn)行業(yè)網(wǎng)站開發(fā)seo關(guān)鍵詞如何布局
本人是C#出身的程序員,c#很簡單就能實(shí)現(xiàn),有需要的可以加我私聊。但是就目前流行的開發(fā)語言,尤其是面向web方向應(yīng)用的,我感覺就是Nodejs最簡單了。下面介紹:
本文將會(huì)介紹在windows環(huán)境下啟動(dòng)Kafka,并通過nodejs作為客戶端,生產(chǎn)并消費(fèi)消息。
步驟一,Kafka需要java運(yùn)行時(shí),先安裝配置java環(huán)境。通過在命令行中輸入java -version確認(rèn)java是否成功安裝(可能需要查看windows的環(huán)境變量中是否有java)。
步驟二,Kafka官網(wǎng)下載最新版本的壓縮包(.tgz格式),并解壓。分別在兩個(gè)命令行里面啟動(dòng)zookeeper、kafka(解壓縮路徑下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
說明一下zookeeper和kafka的關(guān)系:zookeeper是集群的調(diào)度者,kafka才是消息隊(duì)列。 zookeeper的默認(rèn)端口:2181,kafka的默認(rèn)端口:9092
相關(guān)配置可以在config文件下的server.properties和zookeeper.properties中找到
用記事本打開就可以編輯
建立data,logs,kafka-logs目錄,用于日志,備用。
消費(fèi)者客戶端需要的group.id可以在config->consumer.properties中找到。
步驟三,使用DOS的CMD管理員命令行方式測(cè)試生產(chǎn)者生產(chǎn)、消費(fèi)者消費(fèi)。
//創(chuàng)建一個(gè)topic:test
bin\windows\kafka-topics.bat --create --zookeeper?localhost:2181 --replication-factor 1 --partitions 1 --topic test
//查看topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
//創(chuàng)建生產(chǎn)者主題mytest
bin\windows\kafka-console-producer.bat --broker-list?localhost:9092 --topic test-nodetopic
//創(chuàng)建消費(fèi)者消費(fèi)mytest
bin\windows\kafka-console-consumer.bat?--bootstrap-server?localhost:9092 --topic test-nodetopic --from-beginning
步驟四,生產(chǎn)者發(fā)送消息
在生產(chǎn)者窗口,隨意輸入一條消息,可以在消費(fèi)者窗口看到該消息。
最后,使用nodejs訪問kafka? 首先安裝kafkajs
初始化項(xiàng)目
npm init -y
沒有安裝kafkajs的,在這里可以安裝,互聯(lián)網(wǎng)在線安裝。
npm install kafkajs
新建demo2023.js,輸入以下代碼
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
??clientId: 'my-app',
??brokers: ['localhost:9092']
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-consumer-group' })
const run = async () => {
??// Producing
??await producer.connect()
??await producer.send({
????topic: 'test-nodetopic',
????messages: [
??????{ value: ' Hello KafkaJS user,I?am producer ! ' },
????],
??})
??// Consuming
??await consumer.connect()
??await consumer.subscribe({ topic: 'test-nodetopic', fromBeginning: true })
??await consumer.run({
????eachMessage: async ({ topic, partition, message }) => {
??????console.log({
????????partition,
????????offset: message.offset,
????????value: message.value.toString(),
??????})
????},
??})
}
run().catch(console.error)
最后執(zhí)行命令
node demo2023.js