搭建一個(gè)20人的辦公網(wǎng)絡(luò)優(yōu)化是什么梗
kafka入門
一種分布式的、基于發(fā)布/訂閱的消息系統(tǒng),scala編寫,具備快速、可擴(kuò)展、可持久化的特點(diǎn)。
基本概念
topic
主題
partition
分區(qū),一個(gè)topic下可以有多個(gè)partition,消息是分散到多個(gè)partition里存儲的,partition支持水平擴(kuò)展。
一個(gè)partition內(nèi)的消息是有序的,partition間的消息則是無序的。
每個(gè)partition會有若干副本。
broker
一個(gè)kafka節(jié)點(diǎn)
consumer
消費(fèi)者,從topic里取得消息。
每個(gè)consumer維護(hù)自己的offset。
consumer數(shù)量小于分區(qū)數(shù),會有一個(gè)消費(fèi)者處理多個(gè)分區(qū);反之,會有空閑的消費(fèi)者,造成浪費(fèi)。
producer
生產(chǎn)者,負(fù)責(zé)將消息寫入topic
特點(diǎn)
基于硬盤的消息保存,避免在producer上累積消息或者消息丟失。
同一個(gè)消息可以由多個(gè)consumer消費(fèi)。
可擴(kuò)展性:隨著數(shù)據(jù)的增加,可擴(kuò)展為數(shù)十臺,上百臺規(guī)模的大集群。擴(kuò)展可以在集群正常運(yùn)行的時(shí)候進(jìn)行,對于整個(gè)系統(tǒng)的運(yùn)作沒有影響;這也就意味著,對于很多臺broker 的集群,如果一臺broker 有故障,不影響為client 提供服務(wù).集群如果要同時(shí)容忍更多的故障的話, 可以配置更高的replication
factors。
高性能:上面的這些特性使得Apache Kafka 成為一個(gè)能夠在高負(fù)載的情況下表現(xiàn)出優(yōu)越性能的發(fā)布-訂閱消息系統(tǒng)。Producer, consumer 和broker 都能在大數(shù)據(jù)流的情況下輕松的擴(kuò)展.
kafka的版本
比如kafka_2.10-0.10.2.0
這里,2.10指的是編譯kafka的scala版本,真正的kafka版本號是后面的:0.10.2.0
配置kafka
config/server.properties文件里的配置項(xiàng)說明:
broker.id
每個(gè)kafka 的broker 都需要有一個(gè)整型的唯一標(biāo)識,這個(gè)標(biāo)識通過broker.id 來設(shè)置。默認(rèn)的情況下,這個(gè)數(shù)字是0,但是它可以設(shè)置成任何值。需要注意的
是,需要保證集群中這個(gè)id 是唯一的。這個(gè)值是可以任意填寫的,并且可以在必要的時(shí)候從broker 集群中刪除。比較好的做法是使用主機(jī)名相關(guān)的標(biāo)識來做
為id,比如,你的主機(jī)名當(dāng)中有數(shù)字相關(guān)的信息,如hosts1.example.com,host2.example.com,那么這個(gè)數(shù)字就可以用來作為broker.id 的值。
port
默認(rèn)啟動kafka 時(shí),監(jiān)聽的是TCP 的9092 端口,端口號可以被任意修改。如果端口號設(shè)置為小于1024,那么kafka 需要以root 身份啟動。但是并不推薦以root 身份啟動。
zookeeper.connect
這個(gè)參數(shù)指定了Zookeeper 所在的地址,它存儲了broker 的元信息。默認(rèn)是運(yùn)行在本機(jī)的2181 端口上,因此這個(gè)值被設(shè)置成
localhost:2181。這個(gè)值可以通過分號設(shè)置多個(gè)值,每個(gè)值的格式都是hostname:port/path
log.dirs
這個(gè)參數(shù)用于配置Kafka 保存數(shù)據(jù)的位置,Kafka 中所有的消息都會存在這個(gè)目錄下??梢酝ㄟ^逗號來指定多個(gè)目錄,kafka 會根據(jù)最少被使用的原則選擇目錄分配新的partition。注意kafka 在分配partition 的時(shí)候選擇的規(guī)則不是按照磁盤的空間大小來定的,而是分配的parition 的個(gè)數(shù)大小。
num.recovery.thread.per.data.dir
kafka 可以配置一個(gè)線程池,線程池的使用場景如下:
- 當(dāng)正常啟動的時(shí)候,開啟每個(gè)parition 的文檔塊
- 當(dāng)失敗后重啟時(shí),檢查parition 的文檔塊
- 當(dāng)關(guān)閉kafka 的時(shí)候,清除關(guān)閉文檔塊
默認(rèn),每個(gè)目錄只有一個(gè)線程。最好是設(shè)置多個(gè)線程數(shù),這樣在服務(wù)器啟動或者關(guān)閉的時(shí)候,都可以并行的進(jìn)行操作。尤其是當(dāng)非正常停機(jī)后,重啟時(shí),如果有大量的分區(qū)數(shù),那么啟動broker 將會花費(fèi)大量的時(shí)間。
【注意】這個(gè)參數(shù)是針對每個(gè)目錄的。比如,num.recovery.threads.per.data.dir 設(shè)置為8,如果有3個(gè)log.dirs 路徑,那么一共會有24 個(gè)線程。
num.partitions
這個(gè)參數(shù)用于配置新創(chuàng)建的topic 有多少個(gè)分區(qū),默認(rèn)是1 個(gè)。注意partition 的個(gè)數(shù)只可以被增加,不能被減少。這就意味著如果想要減少主題的分區(qū)數(shù),那
么就需要重新創(chuàng)建topic。
在第一章中介紹過,kafka 通過分區(qū)來對topic 進(jìn)行擴(kuò)展,因此需要使用分區(qū)的個(gè)數(shù)來做負(fù)載均衡,如果新增了broker,那么就會引發(fā)重新負(fù)載分配。這并不意味著所有的主題的分區(qū)數(shù)都需要大于broker 的數(shù)量,因?yàn)閗afka 是支持多個(gè)主題的,其他的主題會使用其余的broker。需要注意的是,如果消息的吞吐量很高,那么可以通過設(shè)置一個(gè)比較大的分區(qū)數(shù),來分?jǐn)倝毫Α?/p>
log.retention.ms
這個(gè)參數(shù)用于配置kafka 中消息保存的時(shí)間,也可以使用log.retention.hours,默認(rèn)這個(gè)參數(shù)是168 個(gè)小時(shí),即一周。另外,還支持log.retention.minutes 和log.retention.ms。這三個(gè)參數(shù)都會控制刪除過期數(shù)據(jù)的時(shí)間,推薦還是使用log.retention.ms。如果多個(gè)同時(shí)設(shè)置,那么會選擇最小的那個(gè)。
log.retention.bytes
這個(gè)參數(shù)也是用來配置消息過期的,它會應(yīng)用到每個(gè)分區(qū),比如,你有一個(gè)主題,有8 個(gè)分區(qū),并且設(shè)置了log.retention.bytes 為1G,那么這個(gè)主題總共可以保留8G 的數(shù)據(jù)。注意,所有的過期配置都會應(yīng)用到patition 粒度,而不是主題粒度。這也意味著,如果增加了主題的分區(qū)數(shù),那么主題所能保留的數(shù)據(jù)也就隨之增加了。
如果設(shè)置了log.retention.bytes 和log.retention.ms(或者其他過期時(shí)間的配置),只要滿足其中一個(gè)條件,消息就會被刪除。
log.segment.bytes
這個(gè)參數(shù)用來控制log 段文件的大小,而不是消息的大小。在kafka 中,所有的消息都會進(jìn)入broker,然后以追加的方式追加到分區(qū)當(dāng)前最新的segment 段文件中。一旦這個(gè)段文件到達(dá)log.segment.bytes 設(shè)置的大小,比如默認(rèn)的1G,這個(gè)段文件就會被關(guān)閉,然后創(chuàng)建一個(gè)新的。一旦這個(gè)文件被關(guān)閉,就可以理解成這個(gè)文件已經(jīng)過期了。這個(gè)參數(shù)設(shè)置的越小,那么關(guān)閉文件創(chuàng)建文件的操作就會越頻繁,這樣也會造成大量的磁盤讀寫的開銷。
通過生產(chǎn)者發(fā)送過來的消息的情況可以判斷這個(gè)值的大小。比如,主題每天接收100M 的消息,并且log.segment.bytes 為默認(rèn)設(shè)置,那么10 天后,這個(gè)段文件才會被填滿。由于段文件在沒有關(guān)閉的時(shí)候,是不能刪除的,log.retention.ms 又是默認(rèn)的設(shè)置,那么這個(gè)消息將會在17 天后,才過期刪除。因?yàn)?0 天后,段文件才關(guān)閉。再過7 天,這個(gè)文件才算真正過期,才能被清除。
message.max.bytes
這個(gè)參數(shù)用于限制生產(chǎn)者消息的大小,默認(rèn)是1000000,也就是1M。生產(chǎn)者在發(fā)送消息給broker 的時(shí)候,如果出錯(cuò),會嘗試重發(fā);但是如果是因?yàn)榇笮〉脑?#xff0c;那生產(chǎn)者是不會重發(fā)的。另外,broker上的消息可以進(jìn)行壓縮,這個(gè)參數(shù)是指壓縮后的大小,這樣能多存儲很多消息。
需要注意的是,允許發(fā)送更大的消息會對性能有很大影響。更大的消息,就意味著broker 在處理網(wǎng)絡(luò)連接的時(shí)候需要更長的時(shí)間,它也會增加磁盤的寫操作壓力,影響IO 吞吐量。
啟動kafka
進(jìn)入kafka/bin目錄。
啟動zk
nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &
測試zk是否啟動
telnet localhost 2181
輸入srvr,應(yīng)該會返回:
[2017-12-01 15:59:18,829] INFO Processing srvr command from /0:0:0:0:0:0:0:1:40194 (org.apache.zookeeper.server.NIOServerCnxn)
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0xdf
Mode: standalone
Node count: 127
[2017-12-01 15:59:18,833] INFO Closed socket connection for client /0:0:0:0:0:0:0:1:40194 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
Connection closed by foreign host.
不建議zookeeper運(yùn)行在多于7個(gè)的節(jié)點(diǎn)上,因?yàn)榧盒阅軙蛞恢滦缘奶卣鞫档汀?/p>
啟動kafka
nohup ./kafka-server-start.sh -daemon ../config/server.properties
以守護(hù)進(jìn)程方式執(zhí)行。
topic管理命令
創(chuàng)建具有指定數(shù)量分區(qū)(或復(fù)制因子)的topic
./kafka-topics.sh --create --zookeeper xx.xx.xx.xx:2181 --topic test1 --replication-factor 1 --partitions 6
查看topic的元信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
查看所有topic
./kafka-topics.sh --zookeeper localhost:2181 --list
修改已有topic的分區(qū)
./kafka-topics.sh --alter --zookeeper localhost:2181 --topic test1 --partitions 4./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.ms=5000./kafka-topics.sh --zookeeper xx.xx.xx.xx:2181 --alter --topic test1 --config flush.messages=20000
刪除topic
./kafka-topics.sh --delete --zookeeper xx.xx.xx.xx:2181 --topic test1
但刪除topic并不是真的刪除,只是打個(gè)標(biāo)記,要真正刪除,需要同時(shí)修改server.properties:
delete.topic.enable=true
并重啟kafka才能生效。
向topic發(fā)布消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1
輸入:
msg1
msg2
按^D停止發(fā)送。
若kafka的server.properties配置了host.name,則localhost必須改成host.name的值,例如:
./kafka-console-producer.sh --broker-list xx.xx.xx.xx:9092 --topic test1
查看topic里的消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
輸出:
msg1
msg2
按^C停止接收。