五八同城找工作紹興百度seo
生產(chǎn)經(jīng)驗 面試重點
Broker面試重點
代碼,開發(fā)重點
67 章了解
如何記錄行為數(shù)據(jù)
1. Kafka概述
1.產(chǎn)生原因
前端 傳到日志 日志傳到Flume 傳到HADOOP
但是如果數(shù)據(jù)特比大,HADOOP就承受不住了
2.Kafka解決問題
控流消峰
Flume傳給Kafka
存到Kafka
Hadoop 從Kafka取數(shù)據(jù) ,而不是Kafka強行發(fā)
類似 菜鳥驛站, 先存取來,我們主動去取,或者指定他去送
存到HDFS的,一定不是實時數(shù)據(jù),因為HDFS太慢了
3.應用場景
1. 緩沖/消峰
消息隊列存儲數(shù)據(jù),而不是直接發(fā)給處理系統(tǒng),處理完一部分,再取,再處理
2.解耦
通過中間件接口,適配不同數(shù)據(jù)源和目的地
3.異步通信
允許用戶將消息放入隊列,但不立即處理,然后再需要的時候處理。
為什么異步處理快: 同步需要等待
點餐:
同步:服務員過來給我點餐 ,這里需要服務員過來
異步:掃桌子碼自己點餐
4.消息隊列模式
Kafka使用發(fā)布訂閱模式
數(shù)據(jù)會保存一段時間
5.基礎架構
生產(chǎn)者 - Broker - Group
TopicA是什么?
這里的分區(qū)是什么?
分區(qū): 物理分割
為什么要分割:結合集群分散存儲
1. 分區(qū)操作
topic 是一整個數(shù)據(jù) ,分區(qū)是為了將數(shù)據(jù)分在不同的Broker上。類似于HDFS,
Broker是物理存儲
Partition 類似于DN
2.消費者組的概念
類似權限管理把,組內并行消費,便于管理
- Producer生產(chǎn)者: 向Kafka broker發(fā)消息的客戶端(自主)
- Consumer消費者: 從Kafka取消息的客戶端(自主)
- Group 組: 消費者組。消費者組內每個消費者負責消費不同分區(qū)的數(shù)據(jù),一個分區(qū)只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。
這個說明,分區(qū)不支持并行讀取,一個分區(qū)只能同時一個組內消費者消費。
組內消費者對不同分區(qū)進行讀取,是為了優(yōu)化讀取速率. - Broker 一臺Kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
- Topic:可以理解為一個隊列,生產(chǎn)者和消費者面向的都是一個topic。
- Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。
- Leader:每個分區(qū)多個副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費者消費數(shù)據(jù)的對象都是Leader。
- Follower:每個分區(qū)多個副本中的“從”,實時從Leader中同步數(shù)據(jù),保持和Leader數(shù)據(jù)的同步。Leader發(fā)生故障時,某個Follower會成為新的Leader。
- Replica:副本。一個topic的每個分區(qū)都有若干個副本,一個Leader和若干個Follower。
3.副本
備份功能,備份partition 多個副本, 類似HDFS,類似DN(存入一下子3個)把
多副本時,一個副本叫l(wèi)eader 另一個副本叫follower,
也是選出來的角色 交互時只和leader交互
follower平時只有備份作用,但是當leader倒下時,他直接成為leader
這里是存儲數(shù)據(jù)的目錄,而不是存Kafka自己日志的目錄
高可用, 配置多個
replicas 是存儲副本的位置
lsr 是目前存活的副本
分區(qū)數(shù)只能改大,不能改小
副本數(shù)修改,
通過JSON手動修改
消費者按最新的offect進行消費
5.配置
1.解壓
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.3.1.tgz -C /opt/module/
[atguigu@hadoop102 module]$ mv kafka_2.12-3.3.1/ kafka
2.配置文件
配置文件目前只需要修改三個
broker編號 不同機器只需要編號不同即可
log.dir 數(shù)據(jù)存放位置
zookeeper.connect 連接集群的地址
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
# 修改1 broker的全局唯一編號,不能重復,只能是數(shù)字。
broker.id=0#處理網(wǎng)絡請求的線程數(shù)量
num.network.threads=3
#用來處理磁盤IO的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#修改2 這里其實是是存放到Kafka的數(shù)據(jù)的地方 kafka運行日志(數(shù)據(jù))存放的路徑,路徑不需要提前創(chuàng)建,kafka自動幫你創(chuàng)建,可以配置多個磁盤路徑,路徑與路徑之間可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在當前broker上的分區(qū)個數(shù)
num.partitions=1
#用來恢復和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
# 每個topic創(chuàng)建時的副本數(shù),默認時1個副本
offsets.topic.replication.factor=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#每個segment文件的大小,默認最大1G
log.segment.bytes=1073741824
# 檢查過期數(shù)據(jù)的時間,默認5分鐘檢查一次是否數(shù)據(jù)過期
log.retention.check.interval.ms=300000
#修改3 連接集群的位置 配置連接Zookeeper集群地址(在zk根目錄下創(chuàng)建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
3. 環(huán)境變量
sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
這個是為了啟動的時候不需要輸入一長串地址
比如:bin/kafka-server-start.sh -daemon config/server.properties
這里的config是kafka的路徑 啟動需要輸入全路徑
bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
修改后
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
4.集群腳本
#! /bin/bash
if [ $# -lt 1 ]
then echo "參數(shù)錯誤,請輸入start或者stop"exit
fi
case $1 in
"start"){for i in hadoop102 hadoop103 hadoop104 do echo "---------------啟動 $i Kafka ----------------------"ssh $i "$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104do echo "---------------停止 $i Kafka ---------------------"ssh $i "$KAFKA_HOME/bin/kafka-server-stop.sh -daemon $KAFKA_HOME/config/server.properties"done
};;
esac
2.命令
1.主題命令
1. --bootstrap-server <String: server toconnect to>
連接Broker 操作Kafka必須有這個命令
既可以輸入一個,也可以輸入多個
kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092
2.主題的創(chuàng)建和刪除
– create +空格 ±-topic+空格+主題名
– delete +空格 ±-topic+空格+主題名
主題 主題名 一般放最后
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
–topic 定義topic名
–replication-factor 定義副本數(shù)
–partitions 定義分區(qū)數(shù)
3.查看所有主題
–list
4.查看主題詳細描述
可以看單個主題,可以看全部主題
不加后綴默認查看全部
查看單個需要+空格 ±-topic+空格+主題名
5.修改–alter
設置分區(qū)數(shù)
–partitions <Integer: # of partitions>
分區(qū)只能調大,不能調小
設置分區(qū)副本
–replication-factor<Integer: replication factor>
// 手動調整kafka topic分區(qū)的副本數(shù) {// 1. 版本號 這個是自定義的版本號"version":1,// 2. 分區(qū)是重點,因為副本改變分區(qū)也要改變。// 其實就是將分區(qū)的副本重新進行布局"partitions":[{"topic":"first","partition":0,"replicas":[1,2,0]},{"topic":"first","partition":1,"replicas":[2,0,1]},{"topic":"first","partition":2,"replicas":[2,0,1]}]
}
// 運行命令
//kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file ./rep.json --execute
更新系統(tǒng)默認的配置。
–config <String: name=value>
臨時調配參數(shù)
2.生產(chǎn)者命令
1.操作
--topic <String: topic>
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
3.消費者
kafka-console-consumer.sh --bootstrap-server +集群+ 主題
- –bootstrap-server <String: server toconnect to> 連接的Kafka Broker主機名稱和端口號。
- –topic <String: topic> 操作的topic名稱。
- –from-beginning 從頭開始消費。
- –group <String: consumer group id> 指定消費者組名稱。
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
為什么消費者未開啟時,生產(chǎn)者發(fā)送的消息,等消費者起來了收不到?
沒有指定消費者組時,每次開啟,消費者屬于的消費者組就是隨機的,那么就無法進行斷點續(xù)傳
當主動指定組后,再次登錄,在指定組后,會自動開啟斷點續(xù)傳功能
想要提前的順序,就需要,–from-beginning
但是不能和用戶組一起跑