Official documentation: http://kafka.apache.org/documentation.html
(Although Kafka bundles ZooKeeper, a standalone ZooKeeper cluster is still recommended.)
Environment for the 3-node Kafka cluster:
OS: CentOS 7
Firewall: fully disabled
The 3 machines of the ZooKeeper cluster, plus 1 Logstash machine
Software version: zookeeper-3.4.12.tar.gz
Software version: kafka_2.12-2.1.0.tgz
Install the software
(on the 3 ZooKeeper cluster machines)
# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/
# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka
Create the data directory (all 3 nodes)
# mkdir -p /data/kafka-logs
Edit the configuration file on the first node
# egrep -v "^$|^#" /usr/local/kafka/config/server.properties
broker.id=1    # unique ID of this node within the cluster, same idea as ZooKeeper's myid
listeners=PLAINTEXT://192.168.148.141:9092    # listening socket
num.network.threads=3    # number of threads the broker uses for network processing
num.io.threads=8    # number of threads the broker uses for disk I/O
socket.send.buffer.bytes=102400    # send buffer size; data is not sent immediately but accumulated in the buffer and sent once it reaches a certain size, which improves performance
socket.receive.buffer.bytes=102400    # receive buffer size; data is flushed to disk once it reaches a certain size
socket.request.max.bytes=104857600    # maximum size of a single request to fetch from or produce to Kafka; must not exceed the JVM heap size
log.dirs=/data/kafka-logs    # directory for messages; may be a comma-separated list of directories, and num.io.threads above should be no smaller than the number of directories
# with multiple directories, a newly created topic persists its messages in the directory that currently holds the fewest partitions
num.partitions=1    # default number of partitions; a topic gets 1 partition by default
num.recovery.threads.per.data.dir=1    # threads per data directory for log recovery at startup and log flushing at shutdown; default 1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168    # default maximum retention time for messages: 168 hours, i.e. 7 days
message.max.bytes=5242880    # maximum message size, 5 MB
default.replication.factor=2    # number of replicas Kafka keeps for each message
replica.fetch.max.bytes=5242880    # maximum bytes per replica fetch
log.segment.bytes=1073741824    # Kafka appends messages to segment files; when a file exceeds this size, Kafka rolls over to a new one
log.retention.check.interval.ms=300000    # every 300000 ms, check the retention time configured above and delete any expired messages found in the log directories
zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
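Before copying this file around, a quick sanity check helps catch keys accidentally defined twice during editing. A minimal sketch, run here against a throwaway demo copy (point the grep at the real /usr/local/kafka/config/server.properties in practice):

```shell
# Demo stand-in for the real server.properties
cat > server.properties.demo <<'EOF'
broker.id=1
listeners=PLAINTEXT://192.168.148.141:9092
log.dirs=/data/kafka-logs
EOF
# Print any key defined more than once; no output means the file is clean
grep -Ev '^($|#)' server.properties.demo | cut -d= -f1 | sort | uniq -d
```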
Edit the configuration files on the other two nodes
# scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/
broker.id=2
listeners=PLAINTEXT://192.168.148.142:9092
# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/
broker.id=3
listeners=PLAINTEXT://192.168.148.143:9092
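The two per-node overrides above can also be stamped in with sed instead of editing by hand. A sketch, run here against a throwaway copy so the real file stays untouched (substitute the real config path and each node's ID and IP):

```shell
# Throwaway copy standing in for /usr/local/kafka/config/server.properties
cat > server.properties.node2 <<'EOF'
broker.id=1
listeners=PLAINTEXT://192.168.148.141:9092
EOF
# Node-specific values (node 2 in this tutorial's addressing)
ID=2
IP=192.168.148.142
sed -i "s/^broker\.id=.*/broker.id=${ID}/" server.properties.node2
sed -i "s|^listeners=.*|listeners=PLAINTEXT://${IP}:9092|" server.properties.node2
cat server.properties.node2
```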
Start Kafka (all 3 nodes)
[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
Check that it started (all 3 nodes)
[root@host1 ~]# jps
10754 QuorumPeerMain
11911 Kafka
12287 Jps
Create a topic to verify the cluster
[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien
If Created topic "cien" is printed, the cluster is working.
Start a producer on one server
[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien
> hello kafka
> ni hao ya
>
Start a consumer on another server
[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning
...
hello kafka
ni hao ya
If the consumer receives both messages, the Kafka deployment succeeded!
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list    # list all topics
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic qianfeng    # show details of a specific topic
Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:
Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.148.141:2181 --topic qianfeng    # delete a topic
Topic qianfeng is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
(Since Kafka 1.0, delete.topic.enable defaults to true, so the deletion does take effect here.)
Connect the ELFK cluster to ZooKeeper and Kafka
Configure the first Logstash node to produce messages into Kafka
yum -y install wget
wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm
yum localinstall jdk-8u341-linux-x64.rpm -y
java -version
1. Install Logstash
tar xf logstash-6.4.1.tar.gz -C /usr/local
ln -s /usr/local/logstash-6.4.1 /usr/local/logstash
2. Edit logstash.yml
cd /usr/local/logstash/config/
vim logstash.yml
http.host: "0.0.0.0"
3. Write the pipeline configuration
No filtering: Logstash writes the message content into the Kafka queue as-is
# cd /usr/local/logstash/config/
# vim logstash-kafka.conf
input {
  file {
    type => "sys-log"
    path => "/var/log/messages"
    start_position => "beginning"
  }
}
output {
  kafka {
    bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"    # output to the Kafka cluster
    topic_id => "sys-log-messages"    # topic name
    compression_type => "snappy"      # compression type
    codec => "json"
  }
}
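With codec => "json", each event lands in Kafka as a JSON object carrying @timestamp, type, path, and message fields. A rough sketch of such an event and of pulling out the type field (which the ES index name uses later); the payload values are illustrative, not captured from a real run:

```shell
# Illustrative event shaped like what the json codec emits (values are made up)
cat > sample-event.json <<'EOF'
{"@timestamp":"2019-01-01T00:00:00.000Z","type":"sys-log","path":"/var/log/messages","message":"example log line"}
EOF
# Extract the "type" field using python3's stdlib json module
python3 -c 'import json; print(json.load(open("sample-event.json"))["type"])'
```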
Start Logstash
# /usr/local/logstash/bin/logstash -f logstash-kafka.conf
List the topics on Kafka: sys-log-messages is now present, so the write succeeded
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list
__consumer_offsets
qianfeng
sys-log-messages
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages
Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:
Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
Configure the second Logstash node to consume the logs from Kafka and output them to the ES cluster
# cat kafka-es.conf
input {
  kafka {
    bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"
    topics => ["sys-log-messages"]    # Kafka topic name(s)
    codec => "json"
    auto_offset_reset => "earliest"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.148.131:9200","192.168.148.132:9200"]
    index => "kafka-%{type}-%{+YYYY.MM.dd}"
  }
}
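The index pattern kafka-%{type}-%{+YYYY.MM.dd} expands per event from the event's type field and its timestamp, producing one index per day. A rough shell rendering of the same naming scheme, assuming the sys-log type set earlier and today's date:

```shell
# Reproduce the index name the elasticsearch output would generate today
TYPE=sys-log
INDEX="kafka-${TYPE}-$(date +%Y.%m.%d)"
echo "$INDEX"
```

Querying that index name against ES (e.g. with curl) is a quick way to confirm documents are flowing end to end.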