修改公司網(wǎng)站東莞推廣公司
文章目錄
- 1、Kafka簡介
- 1.1、消息隊(duì)列
- 1.1.1、為什么要有消息隊(duì)列?
- 1.1.2、消息隊(duì)列
- 1.1.3、消息隊(duì)列的分類
- 1.1.4、p2p 和 發(fā)布訂閱MQ的比較
- 1.1.5、消息系統(tǒng)的使用場景
- 1.1.6、常見的消息系統(tǒng)
- 1.2、Kafka簡介
- 1.2.1、簡介
- 1.2.2、設(shè)計(jì)目標(biāo)
- 1.2.3、kafka核心的概念
- 2、Kafka的分布式安裝
- 2.1 jdk & zookeeper安裝
- 1、jdk 安裝配置
- 2、zookeeper安裝
- 2.2、Kafka安裝步驟
- 修改Kafka配置
- 3、Kafka集群
- 3.1、克隆機(jī)配置修改
- broker.id
- listeners
- zookeeper.connect
- 3.2、kafka集群啟動(dòng)
- 1、zookeeper啟動(dòng)
- 2、kafka啟動(dòng)
- 3.3、kafka操作命令
- 1、查看主題
- 2、創(chuàng)建主題:
- 2.1、創(chuàng)建一個(gè)名為“test”的topic,它有一個(gè)分區(qū)和一個(gè)副本
- 2.2、測試再次創(chuàng)建一個(gè)主題,設(shè)置分區(qū)為3,(最好跟主機(jī)數(shù)量一致):
- 2.3、創(chuàng)建主題cities,復(fù)制因子為2,分區(qū)為3
- 3、刪除主題
- 4、啟動(dòng)生產(chǎn)者端/消費(fèi)者端
- 4、zokeeper查看kafka日志
- 段segment
- 查看segment
1、Kafka簡介
1.1、消息隊(duì)列
1.1.1、為什么要有消息隊(duì)列?
1.1.2、消息隊(duì)列
- 消息 Message
網(wǎng)絡(luò)中的兩臺(tái)計(jì)算機(jī)或者兩個(gè)通訊設(shè)備之間傳遞的數(shù)據(jù)。例如說:文本、音樂、視頻等內(nèi)容。 - 隊(duì)列 Queue
一種特殊的線性表(數(shù)據(jù)元素首尾相接),特殊之處在于只允許在首部刪除元素和在尾部追加元素(FIFO)。
入隊(duì)、出隊(duì)。 - 消息隊(duì)列 MQ
消息+隊(duì)列,保存消息的隊(duì)列。消息的傳輸過程中的容器;主要提供生產(chǎn)、消費(fèi)接口供外部調(diào)用做數(shù)據(jù)的存
儲(chǔ)和獲取。
1.1.3、消息隊(duì)列的分類
MQ主要分為兩類:點(diǎn)對(duì)點(diǎn)p2p、發(fā)布訂閱(Pub / Sub)
-
Peer-to-Peer 一般基于Pull或者Polling接收數(shù)據(jù) 發(fā)送到隊(duì)列中的消息被一個(gè)而且僅僅一個(gè)接收者所接受,即
使有多個(gè)接收者在同一個(gè)隊(duì)列中偵聽同一消息 即支持異步“即發(fā)即收”的消息傳遞方式,也支持同步請求/應(yīng)答
傳送方式
-
發(fā)布訂閱 發(fā)布到同一個(gè)主題的消息,可被多個(gè)訂閱者所接收 發(fā)布/訂閱即可基于Push消費(fèi)數(shù)據(jù),也可基于Pull
或者Polling消費(fèi)數(shù)據(jù) 解耦能力比P2P模型更強(qiáng)
1.1.4、p2p 和 發(fā)布訂閱MQ的比較
- 共同點(diǎn)
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消費(fèi)者從queue中讀取并且消費(fèi)消息。 - 不同點(diǎn)
p2p模型包括:消息隊(duì)列(Queue)、發(fā)送者(Sender)、接收者(Receiver)
一個(gè)生產(chǎn)者生產(chǎn)的消息只有一個(gè)消費(fèi)者(Consumer)(即一旦被消費(fèi),消息就不在消息隊(duì)列中)。比如說打電
話。
pub/Sub包含:消息隊(duì)列(Queue)、主題(Topic)、發(fā)布者(Publisher)、訂閱者(Subscriber)
每個(gè)消息可以有多個(gè)消費(fèi)者,彼此互不影響。比如我發(fā)布一個(gè)微博:關(guān)注我的人都能夠看到。
1.1.5、消息系統(tǒng)的使用場景
- 解耦 各系統(tǒng)之間通過消息系統(tǒng)這個(gè)統(tǒng)一的接口交換數(shù)據(jù),無須了解彼此的存在
- 冗余 部分消息系統(tǒng)具有消息持久化能力,可規(guī)避消息處理前丟失的風(fēng)險(xiǎn)
- 擴(kuò)展 消息系統(tǒng)是統(tǒng)一的數(shù)據(jù)接口,各系統(tǒng)可獨(dú)立擴(kuò)展
- 峰值處理能力 消息系統(tǒng)可頂住峰值流量,業(yè)務(wù)系統(tǒng)可根據(jù)處理能力從消息系統(tǒng)中獲取并處理對(duì)應(yīng)量的請求
- 可恢復(fù)性 系統(tǒng)中部分鍵失效并不會(huì)影響整個(gè)系統(tǒng),它恢復(fù)會(huì)仍然可從消息系統(tǒng)中獲取并處理數(shù)據(jù)
- 異步通信 在不需要立即處理請求的場景下,可以將請求放入消息系統(tǒng),合適的時(shí)候再處理
1.1.6、常見的消息系統(tǒng)
- RabbitMQ Erlang編寫,支持多協(xié)議AMQP,XMPP,SMTP,STOMP。支持負(fù)載均衡、數(shù)據(jù)持久化。同時(shí)支
持Peer-to-Peer和發(fā)布/訂閱模式。 - Redis 基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫,同時(shí)支持MQ功能,可做輕量級(jí)隊(duì)列服務(wù)使用。就入隊(duì)操作而言,
Redis對(duì)短消息(小于10kb)的性能比RabbitMQ好,長消息性能比RabbitMQ差。 - ZeroMQ 輕量級(jí),不需要單獨(dú)的消息服務(wù)器或中間件,應(yīng)用程序本身扮演該角色,Peer-to-Peer。它實(shí)質(zhì)上是
一個(gè)庫,需要開發(fā)人員自己組合多種技術(shù),使用復(fù)雜度高。 - ActiveMQ JMS實(shí)現(xiàn),Peer-to-Peer,支持持久化、XA(分布式)事務(wù)
- Kafka/Jafka 高性能跨語言的分布式發(fā)布/訂閱消息系統(tǒng),數(shù)據(jù)持久化,全分布式,同時(shí)支持在線和離線處理
- MetaQ/RocketMQ 純Java實(shí)現(xiàn),發(fā)布/訂閱消息系統(tǒng),支持本地事務(wù)和XA分布式事務(wù)
1.2、Kafka簡介
1.2.1、簡介
Kafka是分布式的發(fā)布—訂閱消息系統(tǒng)。它最初由LinkedIn(領(lǐng)英)公司發(fā)布,使用Scala語言編寫,與2010年12月份
開源,成為Apache的頂級(jí)項(xiàng)目。Kafka是一個(gè)高吞吐量的、持久性的、分布式發(fā)布訂閱消息系統(tǒng)。它主要用于處理
活躍live的數(shù)據(jù)(登錄、瀏覽、點(diǎn)擊、分享、喜歡等用戶行為產(chǎn)生的數(shù)據(jù))。
Kafka三大特點(diǎn):
- 高吞吐量
可以滿足每秒百萬級(jí)別消息的生產(chǎn)和消費(fèi)——生產(chǎn)消費(fèi)。 - 持久性
有一套完善的消息存儲(chǔ)機(jī)制,確保數(shù)據(jù)的高效安全的持久化——中間存儲(chǔ)。 - 分布式
基于分布式的擴(kuò)展和容錯(cuò)機(jī)制;Kafka的數(shù)據(jù)都會(huì)復(fù)制到幾臺(tái)服務(wù)器上。當(dāng)某一臺(tái)故障失效時(shí),生產(chǎn)者和消
費(fèi)者轉(zhuǎn)而使用其它的機(jī)器——整體
健壯性。
1.2.2、設(shè)計(jì)目標(biāo)
- 高吞吐率 在廉價(jià)的商用機(jī)器上單機(jī)可支持每秒100萬條消息的讀寫
- 消息持久化 所有消息均被持久化到磁盤,無消息丟失,支持消息重放
- 完全分布式 Producer,Broker,Consumer均支持水平擴(kuò)展
- 同時(shí)適應(yīng)在線流處理和離線批處理
1.2.3、kafka核心的概念
一個(gè)MQ需要哪些部分?生產(chǎn)、消費(fèi)、消息類別、存儲(chǔ)等等。 對(duì)于kafka而言,kafka服務(wù)就像是一個(gè)大的水池。不
斷的生產(chǎn)、存儲(chǔ)、消費(fèi)著各種類別的消息。那么kafka由何組成呢?
Kafka服務(wù):
- Topic:主題,Kafka處理的消息的不同分類。
- Broker:消息服務(wù)器代理,Kafka集群中的一個(gè)kafka服務(wù)節(jié)點(diǎn)稱為一個(gè)broker,主要存儲(chǔ)消息數(shù)據(jù)。存在硬
盤中。每個(gè)topic都是有分區(qū)的。- Partition:Topic物理上的分組,一個(gè)topic在broker中被分為1個(gè)或者多個(gè)partition,分區(qū)在創(chuàng)建topic的時(shí)候
指定。- Message:消息,是通信的基本單位,每個(gè)消息都屬于一個(gè)partition
Kafka服務(wù)相關(guān)
- Producer:消息和數(shù)據(jù)的生產(chǎn)者,向Kafka的一個(gè)topic發(fā)布消息。
- Consumer:消息和數(shù)據(jù)的消費(fèi)者,定于topic并處理其發(fā)布的消息。
- Zookeeper:協(xié)調(diào)kafka的正常運(yùn)行。
2、Kafka的分布式安裝
下載地址:https://kafka.apache.org/downloads
中文下載官網(wǎng):https://kafka.apachecn.org/downloads.html
安裝包鏈接:https://pan.baidu.com/s/1G9F8TEfI88wPi_j2-hkK1A?pwd=e9tu
源碼包鏈接:https://pan.baidu.com/s/1LR7X3Is-JRsOOu3DdAp2aw?pwd=7249
2.1 jdk & zookeeper安裝
我們知道Kafka是由Zookeeper管理的,那么在安裝Kafka之前,先來安裝一下Zookeeper吧~
1、jdk 安裝配置
首先CentOS7中會(huì)默認(rèn)自帶jdk的,我的虛擬機(jī)里centos7默認(rèn)自帶的是open jdk 1.8.0_262_b10。
如果想要安裝指定版本的jdk,則先下載jdk安裝包。
Linux安裝jdk的詳細(xì)步驟
2、zookeeper安裝
我的kafka安裝包是3.4.0版本的,對(duì)應(yīng)的zookeeper版本是3.6.3,那么去官網(wǎng)下載好壓縮包(注意是 bin.tar.gz壓縮包):
官網(wǎng):http://archive.apache.org/dist/zookeeper/
首先,將安裝包放到Linux目錄下執(zhí)行以下命令:
$ mkdir zk
# 創(chuàng)建Zookeeper數(shù)據(jù)存儲(chǔ)路徑
$ mkdir zk/data
# 創(chuàng)建Zookeeper日志存放路徑
$ mkdir zk/logs
# 解壓安裝包
$ tar -zxvf apache-zookeeper-3.8.1-bin.tar.gz
# 配置環(huán)境變量,添加下述內(nèi)容
$ vi /etc/profile
export ZK_HOME=/home/install_package/apache-zookeeper-3.8.1-bin/bin
export PATH=$ZK_HOME/bin:$PATH
$ source /etc/profile
# 生成Zookeeper配置文件
$ cd apache-zookeeper-3.8.1-bin/conf
$ cp zoo_sample.cfg zoo.cfg # 因?yàn)閦ookeeper默認(rèn)加載的配置文件名是zoo.cfg
然后修改一下配置(數(shù)據(jù)目錄和日志目錄):
vim zoo.cfg
# 心跳間隔時(shí)間,時(shí)間單位為毫秒值
tickTime=2000
# leader與客戶端連接超時(shí)時(shí)間,設(shè)為5個(gè)心跳間隔
initLimit=10
# Leader與Follower之間的超時(shí)時(shí)間,設(shè)為2個(gè)心跳間隔
syncLimit=5
# 數(shù)據(jù)存放目錄
dataDir=/home/admin/Study/zk/data
# 日志存放目錄
dataLogDir=/home/admin/Study/zk/logs
# 客戶端通信端口
clientPort=2181
# 清理間隔,單位是小時(shí),默認(rèn)是0,表示不開啟
#autopurge.purgeInterval=1
# 這個(gè)參數(shù)和上面的參數(shù)搭配使用,這個(gè)參數(shù)指定了需要保留的文件數(shù)目,默認(rèn)是保留3個(gè)
#autopurge.snapRetainCount=5
# 單機(jī)版不配下述配置
# server.NUM=IP:port1:port2 NUM表示本機(jī)為第幾號(hào)服務(wù)器;IP為本機(jī)ip地址;
# port1為leader與follower通信端口;port2為參與競選leader的通信端口
# 多個(gè)實(shí)例的端口配置不能重復(fù),如下:
#server.0=192.168.101.136:12888:13888
#server.1=192.168.101.146:12888:13888
1、啟動(dòng)zookeeper后臺(tái)服務(wù):
zkServer.sh start
2、關(guān)閉zookeeper后臺(tái)服務(wù):
zkServer.sh stop
3、查看zookeeper后臺(tái)服務(wù)運(yùn)行狀態(tài):
zkServer.sh status
2.2、Kafka安裝步驟
1、首先,在Linux下kafka壓縮包所在的目錄下,解壓:
$ mkdir kafka
# 創(chuàng)建kafka日志存放路徑
$ mkdir kafka/logs
# 解壓安裝包
$ tar -zxvf kafka_2.12-3.4.0.tgz
# 移動(dòng)到kafka目錄下
mv kafka_2.12-3.4.0 kafka
# 配置環(huán)境變量,添加下述內(nèi)容
$ vi /etc/profile
export KAFKA_HOME=/home/admin/Study/kafka/kafka_2.12-3.4.0
export PATH=$KAFKA_HOME/bin:$PATH
$ source /etc/profile
# 修改kafka配置
$ cd kafka_2.12-3.4.0/config
$ vi server.properties
修改Kafka配置
# broker.id每個(gè)實(shí)例的值不能重復(fù)
broker.id=0
# 配置主機(jī)的ip和端口
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.57.30:9092
#advertised.listeners=PLAINTEXT://10.11.0.203:9092
# 配置日志存儲(chǔ)路徑
log.dirs=/home/admin/Study/kafka/logs
# 配置zookeeper集群
zookeeper.connect=localhost:2181
啟動(dòng)kafka,zokeeper啟動(dòng)的前提下:
bin/kafka-server-start.sh -daemon config/server.properties
判斷kafka啟動(dòng)成功:
kafka關(guān)閉:
bin/kafka-server-stop.sh -daemon config/server.properties
3、Kafka集群
準(zhǔn)備三臺(tái)虛擬機(jī)。這里將上面安裝好的虛擬機(jī)直接克隆兩份。
克隆過程很簡單,這里不再贅述。接下來看一下克隆好后需要哪些配置的修改。
3.1、克隆機(jī)配置修改
① 修改主機(jī)名
關(guān)閉全部虛擬機(jī),打開克隆好的第一臺(tái),修改主機(jī)名為kafka02:
vim /etc/hostname
② 修改網(wǎng)絡(luò)地址
vim /etc//sysconfig/network-scripts/ifcfg-ens33
③ 重啟:
reboot
另一臺(tái)同樣方法修改:
主機(jī)名:kafka03
ip地址:192.168.255.214
(題外話:修改命令行背景色和字體色)
打開命令行——> Edit ——> Preferences ——> Colors :取消勾選Use colors from system theme
④ 進(jìn)入到kafka安裝目錄下修改kafka server.properties配置文件:
vim config/server.properties
broker.id
該屬性?于唯?標(biāo)記?個(gè)Kafka的Broker,它的值是?個(gè)任意integer值。
當(dāng)Kafka以分布式集群運(yùn)?的時(shí)候,尤為重要。
最好該值跟該Broker所在的物理主機(jī)有關(guān)的,如主機(jī)名為host1.lagou.com,則broker.id=1,如果主機(jī)名為192.168.100.101,則broker.id=101等等。
listeners
?于指定當(dāng)前Broker向外發(fā)布服務(wù)的地址和端?。
與advertised.listeners配合,?于做內(nèi)外?隔離。
內(nèi)外?隔離配置:
- listener.security.protocol.map
監(jiān)聽器名稱和安全協(xié)議的映射配置。
?如,可以將內(nèi)外?隔離,即使它們都使?SSL。
- listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
每個(gè)監(jiān)聽器的名稱只能在map中出現(xiàn)?次。
- inter.broker.listener.name
?于配置broker之間通信使?的監(jiān)聽器名稱,該名稱必須在advertised.listeners列表中。
inter.broker.listener.name=EXTERNAL
listeners
?于配置broker監(jiān)聽的URI以及監(jiān)聽器名稱列表,使?逗號(hào)隔開多個(gè)URI及監(jiān)聽器名稱。
如果監(jiān)聽器名稱代表的不是安全協(xié)議,必須配置 listener.security.protocol.map。
每個(gè)監(jiān)聽器必須使?不同的?絡(luò)端?。
- advertised.listeners
需要將該地址發(fā)布到zookeeper供客戶端使?,如果客戶端使?的地址與listeners配置不同。
可以在zookeeper的get /myKafka/brokers/ids/<broker.id>中找到。
在IaaS環(huán)境,該條?的?絡(luò)接?得與broker綁定的?絡(luò)接?不同。
如果不設(shè)置此條?,就使?listeners的配置。跟listeners不同,該條?不能使?0.0.0.0?絡(luò)端?。
advertised.listeners的地址必須是listeners中配置的或配置的?部分。
zookeeper.connect
該參數(shù)?于配置Kafka要連接的Zookeeper/集群的地址。
它的值是?個(gè)字符串,使?逗號(hào)分隔Zookeeper的多個(gè)地址。Zookeeper的單個(gè)地址是host:port形式的,可以在最后添加Kafka在Zookeeper中的根節(jié)點(diǎn)路徑。
3.2、kafka集群啟動(dòng)
1、zookeeper啟動(dòng)
zkServer.sh start
2、kafka啟動(dòng)
命令:
bin/kafka-server-start.sh -daemon config/server.properties
以下報(bào)錯(cuò)信息:
是因?yàn)閗afka目錄下logs文件夾meta.properties文件中的broker.id和server.properties中的不一致了,修改一下即可。
啟動(dòng)成功:
3.3、kafka操作命令
參考官網(wǎng)快速開始:Kafka中文文檔
1、查看主題
(新安裝kafka是沒有主題的)
bin/kafka-topics.sh --list --bootstrap-server 192.168.255.212:9092
2、創(chuàng)建主題:
2.1、創(chuàng)建一個(gè)名為“test”的topic,它有一個(gè)分區(qū)和一個(gè)副本
bin/kafka-topics.sh --create --bootstrap-server 192.168.255.212:9092 --replication-factor 1 --partitions 1 --topic test
此時(shí)查看主題:
在另外兩臺(tái)kafka主機(jī)上同樣可以看到:
此時(shí),我們查看3臺(tái)kafka的logs目錄下:
可以看到創(chuàng)建的test主題。
2.2、測試再次創(chuàng)建一個(gè)主題,設(shè)置分區(qū)為3,(最好跟主機(jī)數(shù)量一致):
bin/kafka-topics.sh --create --bootstrap-server 192.168.255.212:9092 --replication-factor 1 --partitions 3 --topic city
可以看到3臺(tái)機(jī)器logs目錄下分別都有一個(gè)主題分區(qū)。
2.3、創(chuàng)建主題cities,復(fù)制因子為2,分區(qū)為3
命令:
bin/kafka-topics.sh --create --bootstrap-server 192.168.255.212:9092 --replication-factor 2 --partitions 3 --topic cities
查看日志目錄下可以看到3個(gè)分區(qū)每個(gè)都有兩份:
3、刪除主題
bin/kafka-topics.sh --delete --bootstrap-server 192.168.255.212:9092 --topic 主題名
4、啟動(dòng)生產(chǎn)者端/消費(fèi)者端
以下命令:創(chuàng)建一個(gè)生產(chǎn)者客戶端,產(chǎn)生消息,主題為test。注意,生產(chǎn)者客戶端可以在任意主機(jī)上只要包含kafka,命令存在即可執(zhí)行,當(dāng)前在213這臺(tái)kafka上,既作為server又充當(dāng)客戶端。
bin/kafka-console-producer.sh --broker-list 192.168.255.213:9092 --topic test
回車,輸入消息:
>Beijing
>Shanghai
現(xiàn)在啟動(dòng)一個(gè)消費(fèi)者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
后面加–from-beginning表示接受所有包括之前的消息;不加表示只接受最新的消息,過往的(消費(fèi)者啟動(dòng)前)不接受。
此時(shí),生產(chǎn)一個(gè)Hello消息:
兩個(gè)消費(fèi)者的接收:
注意:消費(fèi)者不管消費(fèi)哪個(gè)ip,都可以收到消息~
4、zokeeper查看kafka日志
上述操作命令我們都依次執(zhí)行并在kafka logs目錄下查看到產(chǎn)生的主題。那么實(shí)際上我們也可以在zokeeper中查看。
命令:
進(jìn)入到zookeeper bin目錄下
zkCli.sh
ls /
ls /brokers
ls /brokers/ids
ls /brokers/topics
打開 brokerid = 0,查看數(shù)據(jù)內(nèi)容:
ls /brokers/ids/0
get /brokers/ids/0
可以查看到當(dāng)前主機(jī)的信息,以json格式存儲(chǔ)。
接下來查看主題內(nèi)容:
ls /brokers/topics/cities
ls /brokers/topics/cities/partitions
ls /brokers/topics/cities/partitions/0
ls /brokers/topics/cities/partitions/0/state
get /brokers/topics/cities/partitions/0/state
get /brokers/topics/cities
0號(hào)主機(jī)上有cities-1、cities-2分區(qū)。
同理,其他部分也可查看:
段segment
segment 是一個(gè)邏輯概念,其由兩類物理文件組成,分別為“.index”文件和“.log”文
件。“.log”文件中存放的是消息,而“.index”文件中存放的是“.log”文件中消息的索引。
進(jìn)入test主題所在主機(jī)的logs下:
表示前面有0條消息。
00000000000000001456.log 表示前面有1456條消息。
查看segment
想要查看segment中的log文件,需要通過kafka自帶的一個(gè)工具查看。
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/home/admin/Study/kafka/logs/test-0/00000000000000000000.log --print-data-log
一個(gè)用戶的一個(gè)主題會(huì)被提交到一個(gè)__consumer_offsets 分區(qū)中。使用主題字符串的
hash 值與 50 取模,結(jié)果即為分區(qū)索引。一般默認(rèn)為50個(gè)分區(qū)(0 ~ 49)。