Kafka official documentation
Reference: a comparison of three Kafka monitoring tools
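1 Checking the Kafka version
Go to the Kafka installation directory and look at the jar files under the libs directory.
In a jar name such as kafka_2.11-2.0.0.jar, 2.11 is the Scala version and 2.0.0 is the Kafka version.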
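The naming convention above can also be read off mechanically. The following is a minimal sketch in POSIX shell, assuming the core jar follows the kafka_<scala>-<kafka>.jar pattern; the function name and the sample path are illustrative only.

```shell
# Split a Kafka core jar name of the form kafka_<scala>-<kafka>.jar
# into its Scala and Kafka version parts.
kafka_versions_from_jar() {
    name=${1##*/}          # drop any leading directory
    name=${name#kafka_}    # drop the "kafka_" prefix
    name=${name%.jar}      # drop the ".jar" suffix
    scala=${name%%-*}      # text before the first "-" is the Scala version
    kafka=${name#*-}       # text after the first "-" is the Kafka version
    echo "scala=$scala kafka=$kafka"
}

# Hypothetical path, for illustration only.
kafka_versions_from_jar "libs/kafka_2.11-2.0.0.jar"
```

On a real installation the argument would be whatever `ls libs/kafka_*.jar` returns for the core jar.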
Test environment
#systemctl start zookeeper
#systemctl start kafka
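2 Common Kafka configuration
Kafka is configured with key-value pairs in the properties file format. The values can be supplied from a file or programmatically.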
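To make the key-value format concrete, here is a small sketch in POSIX shell that looks up one key in a properties file. It handles only plain `key=value` lines. The function name `get_prop` and the sample file contents are assumptions made for illustration, not part of Kafka.

```shell
# Print the value of one key from a Java-style properties file.
# Handles only simple "key=value" lines; comments (#) and blank lines are skipped.
get_prop() {
    # $1 = file, $2 = key
    while IFS= read -r line || [ -n "$line" ]; do
        case $line in
            ''|'#'*) continue ;;
            "$2="*) printf '%s\n' "${line#*=}"; return 0 ;;
        esac
    done < "$1"
    return 1
}

# Hypothetical sample file, for illustration only.
conf=$(mktemp)
cat > "$conf" <<'EOF'
# sample broker settings
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
EOF

get_prop "$conf" listeners
rm -f "$conf"
```

Any key that is absent from the file keeps the default listed in the sections that follow.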
2.1 Broker configuration
*************************
*****Importance: high
advertised.listeners null
auto.create.topics.enable true
auto.leader.rebalance.enable true
background.threads 10
broker.id -1
compression.type producer
control.plane.listener.name null
controller.listener.names null
controller.quorum.election.backoff.max.ms 1000
controller.quorum.election.timeout.ms 1000
controller.quorum.fetch.timeout.ms 2000
controller.quorum.voters ""
delete.topic.enable true
early.start.listeners null
leader.imbalance.check.interval.seconds 300
leader.imbalance.per.broker.percentage 10
listeners PLAINTEXT://:9092
log.dir /tmp/kafka-logs
log.dirs null
log.flush.interval.messages 9223372036854775807
log.flush.interval.ms null
log.flush.offset.checkpoint.interval.ms 60000
log.flush.scheduler.interval.ms 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms 60000
log.retention.bytes -1
log.retention.hours 168
log.retention.minutes null
log.retention.ms null
log.roll.hours 168
log.roll.jitter.hours 0
log.roll.jitter.ms null
log.roll.ms null
log.segment.bytes 1073741824 (1 gibibyte)
log.segment.delete.delay.ms 60000 (1 minute)
message.max.bytes 1048588
metadata.log.dir null
metadata.log.max.record.bytes.between.snapshots 20971520
metadata.log.segment.bytes 1073741824 (1 gibibyte)
metadata.log.segment.ms 604800000 (7 days)
metadata.max.retention.bytes -1
metadata.max.retention.ms 604800000 (7 days)
min.insync.replicas 1
node.id -1
num.io.threads 8
num.network.threads 3
num.recovery.threads.per.data.dir 1
num.replica.alter.log.dirs.threads null
num.replica.fetchers 1
offset.metadata.max.bytes 4096 (4 kibibytes)
offsets.commit.required.acks -1
offsets.commit.timeout.ms 5000 (5 seconds)
offsets.load.buffer.size 5242880
offsets.retention.check.interval.ms 600000 (10 minutes)
offsets.retention.minutes 10080
offsets.topic.compression.codec 0
offsets.topic.num.partitions 50
offsets.topic.replication.factor 3
offsets.topic.segment.bytes 104857600 (100 mebibytes)
process.roles ""
queued.max.requests 500
replica.fetch.min.bytes 1
replica.fetch.wait.max.ms 500
replica.high.watermark.checkpoint.interval.ms 5000 (5 seconds)
replica.lag.time.max.ms 30000 (30 seconds)
replica.socket.receive.buffer.bytes 65536 (64 kibibytes)
replica.socket.timeout.ms 30000 (30 seconds)
request.timeout.ms 30000 (30 seconds)
sasl.mechanism.controller.protocol GSSAPI
socket.receive.buffer.bytes 102400 (100 kibibytes)
socket.request.max.bytes 104857600 (100 mebibytes)
socket.send.buffer.bytes 102400 (100 kibibytes)
transaction.max.timeout.ms 900000 (15 minutes)
transaction.state.log.load.buffer.size 5242880
transaction.state.log.min.isr 2
transaction.state.log.num.partitions 50
transaction.state.log.replication.factor 3
transaction.state.log.segment.bytes 104857600 (100 mebibytes)
transactional.id.expiration.ms 604800000 (7 days)
unclean.leader.election.enable false
zookeeper.connect null
zookeeper.connection.timeout.ms null
zookeeper.max.in.flight.requests 10
zookeeper.session.timeout.ms 18000 (18 seconds)
zookeeper.set.acl false
*************************
*****Importance: medium
broker.heartbeat.interval.ms 2000 (2 seconds)
broker.id.generation.enable true
broker.rack null
broker.session.timeout.ms 9000 (9 seconds)
connections.max.idle.ms 600000 (10 minutes)
connections.max.reauth.ms 0
controlled.shutdown.enable true
controlled.shutdown.max.retries 3
controlled.shutdown.retry.backoff.ms 5000 (5 seconds)
controller.quorum.append.linger.ms 25
controller.quorum.request.timeout.ms 2000 (2 seconds)
controller.socket.timeout.ms 30000 (30 seconds)
default.replication.factor 1
delegation.token.expiry.time.ms 86400000 (1 day)
delegation.token.master.key null
delegation.token.max.lifetime.ms 604800000 (7 days)
delegation.token.secret.key null
delete.records.purgatory.purge.interval.requests 1
fetch.max.bytes 57671680 (55 mebibytes)
fetch.purgatory.purge.interval.requests 1000
group.initial.rebalance.delay.ms 3000 (3 seconds)
group.max.session.timeout.ms 1800000 (30 minutes)
group.max.size 2147483647
group.min.session.timeout.ms 6000 (6 seconds)
initial.broker.registration.timeout.ms 60000 (1 minute)
inter.broker.listener.name null
inter.broker.protocol.version
log.cleaner.backoff.ms 15000 (15 seconds)
log.cleaner.dedupe.buffer.size 134217728
log.cleaner.delete.retention.ms 86400000 (1 day)
log.cleaner.enable true
log.cleaner.io.buffer.load.factor 0.9
log.cleaner.io.buffer.size 524288
log.cleaner.io.max.bytes.per.second 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms 9223372036854775807
log.cleaner.min.cleanable.ratio 0.5
log.cleaner.min.compaction.lag.ms 0
log.cleaner.threads 1
log.cleanup.policy delete
log.index.interval.bytes 4096 (4 kibibytes)
log.index.size.max.bytes 10485760 (10 mebibytes)
log.message.format.version
log.message.timestamp.difference.max.ms 9223372036854775807
log.message.timestamp.type CreateTime
log.preallocate false
log.retention.check.interval.ms 300000 (5 minutes)
max.connection.creation.rate 2147483647
max.connections 2147483647
max.connections.per.ip 2147483647
max.connections.per.ip.overrides ""
max.incremental.fetch.session.cache.slots 1000
num.partitions 1
password.encoder.old.secret null
password.encoder.secret null
principal.builder.class
producer.purgatory.purge.interval.requests 1000
queued.max.request.bytes -1
replica.fetch.backoff.ms 1000 (1 second)
replica.fetch.max.bytes 1048576 (1 mebibyte)
replica.fetch.response.max.bytes 10485760 (10 mebibytes)
replica.selector.class null
reserved.broker.max.id 1000
sasl.client.callback.handler.class null
sasl.enabled.mechanisms GSSAPI
sasl.jaas.config null
sasl.kerberos.kinit.cmd /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 60000
sasl.kerberos.principal.to.local.rules DEFAULT
sasl.kerberos.service.name null
sasl.kerberos.ticket.renew.jitter 0.05
sasl.kerberos.ticket.renew.window.factor 0.8
sasl.login.callback.handler.class null
sasl.login.class null
sasl.login.refresh.buffer.seconds 300
sasl.login.refresh.min.period.seconds 60
sasl.login.refresh.window.factor 0.8
sasl.login.refresh.window.jitter 0.05
sasl.mechanism.inter.broker.protocol GSSAPI
sasl.oauthbearer.jwks.endpoint.url null
sasl.oauthbearer.token.endpoint.url null
sasl.server.callback.handler.class null
sasl.server.max.receive.size 524288
security.inter.broker.protocol PLAINTEXT
socket.connection.setup.timeout.max.ms 30000 (30 seconds)
socket.connection.setup.timeout.ms 10000 (10 seconds)
socket.listen.backlog.size 50
ssl.cipher.suites ""
ssl.client.auth none
ssl.enabled.protocols TLSv1.2,TLSv1.3
ssl.key.password null
ssl.keymanager.algorithm SunX509
ssl.keystore.certificate.chain null
ssl.keystore.key null
ssl.keystore.location null
ssl.keystore.password null
ssl.keystore.type JKS
ssl.protocol TLSv1.3
ssl.provider null
ssl.trustmanager.algorithm PKIX
ssl.truststore.certificates null
ssl.truststore.location null
ssl.truststore.password null
ssl.truststore.type JKS
zookeeper.clientCnxnSocket null
zookeeper.ssl.client.enable false
zookeeper.ssl.keystore.location null
zookeeper.ssl.keystore.password null
zookeeper.ssl.keystore.type null
zookeeper.ssl.truststore.location null
zookeeper.ssl.truststore.password null
zookeeper.ssl.truststore.type null
*************************
*****Importance: low
alter.config.policy.class.name null
alter.log.dirs.replication.quota.window.num 11
alter.log.dirs.replication.quota.window.size.seconds 1
authorizer.class.name ""
client.quota.callback.class null
connection.failed.authentication.delay.ms 100
controller.quorum.retry.backoff.ms 20
controller.quota.window.num 11
controller.quota.window.size.seconds 1
create.topic.policy.class.name null
delegation.token.expiry.check.interval.ms 3600000 (1 hour)
kafka.metrics.polling.interval.secs 10
kafka.metrics.reporters ""
listener.security.protocol.map PLAINTEXT:PLAINTEXT
log.message.downconversion.enable true
metadata.max.idle.interval.ms 500
metric.reporters ""
metrics.num.samples 2
metrics.recording.level INFO
metrics.sample.window.ms 30000 (30 seconds)
password.encoder.cipher.algorithm AES/CBC/PKCS5Padding
password.encoder.iterations 4096
password.encoder.key.length 128
password.encoder.keyfactory.algorithm null
quota.window.num 11
quota.window.size.seconds 1
replication.quota.window.num 11
replication.quota.window.size.seconds 1
sasl.login.connect.timeout.ms null
sasl.login.read.timeout.ms null
sasl.login.retry.backoff.max.ms 10000 (10 seconds)
sasl.login.retry.backoff.ms 100
sasl.oauthbearer.clock.skew.seconds 30
sasl.oauthbearer.expected.audience null
sasl.oauthbearer.expected.issuer null
sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour)
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds)
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100
sasl.oauthbearer.scope.claim.name scope
sasl.oauthbearer.sub.claim.name sub
security.providers null
ssl.endpoint.identification.algorithm https
ssl.engine.factory.class null
ssl.principal.mapping.rules DEFAULT
ssl.secure.random.implementation null
transaction.abort.timed.out.transaction.cleanup.interval.ms 10000 (10 seconds)
transaction.remove.expired.transaction.cleanup.interval.ms 3600000 (1 hour)
zookeeper.ssl.cipher.suites null
zookeeper.ssl.crl.enable false
zookeeper.ssl.enabled.protocols null
zookeeper.ssl.endpoint.identification.algorithm HTTPS
zookeeper.ssl.ocsp.enable false
zookeeper.ssl.protocol TLSv1.2
2.2 Topic configuration
*****Importance: medium
cleanup.policy delete
compression.type producer
delete.retention.ms 86400000 (1 day)
file.delete.delay.ms 60000 (1 minute)
flush.messages 9223372036854775807
flush.ms 9223372036854775807
follower.replication.throttled.replicas ""
index.interval.bytes 4096 (4 kibibytes)
leader.replication.throttled.replicas ""
max.compaction.lag.ms 9223372036854775807
max.message.bytes 1048588
message.format.version 3.0-IV1
message.timestamp.difference.max.ms 9223372036854775807
message.timestamp.type CreateTime
min.cleanable.dirty.ratio 0.5
min.compaction.lag.ms 0
min.insync.replicas 1
preallocate false
retention.bytes -1
retention.ms 604800000 (7 days)
segment.bytes 1073741824 (1 gibibyte)
segment.index.bytes 10485760 (10 mebibytes)
segment.jitter.ms 0
segment.ms 604800000 (7 days)
unclean.leader.election.enable false
*****Importance: low
message.downconversion.enable true
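The broker default log.retention.hours (168) and the topic default retention.ms (604800000) describe the same seven-day period in different units. A quick arithmetic check in shell, where the helper name `hours_to_ms` is only illustrative:

```shell
# Convert a retention period in hours to milliseconds.
hours_to_ms() {
    echo $(( $1 * 60 * 60 * 1000 ))
}

# 168 hours is the broker default for log.retention.hours.
hours_to_ms 168
```

The same conversion helps when replacing an hours-based broker setting with a per-topic retention.ms override.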
2.3 Producer configuration
*************************
*****Importance: high
key.serializer
value.serializer
bootstrap.servers ""
buffer.memory 33554432
compression.type none
retries 2147483647
ssl.key.password null
ssl.keystore.certificate.chain null
ssl.keystore.key null
ssl.keystore.location null
ssl.keystore.password null
ssl.truststore.certificates null
ssl.truststore.location null
ssl.truststore.password null
*************************
*****Importance: medium
batch.size 16384
client.dns.lookup use_all_dns_ips
client.id ""
connections.max.idle.ms 540000 (9 minutes)
delivery.timeout.ms 120000 (2 minutes)
linger.ms 0
max.block.ms 60000 (1 minute)
max.request.size 1048576
partitioner.class null
partitioner.ignore.keys false
receive.buffer.bytes 32768 (32 kibibytes)
request.timeout.ms 30000 (30 seconds)
sasl.client.callback.handler.class null
sasl.jaas.config null
sasl.kerberos.service.name null
sasl.login.callback.handler.class null
sasl.login.class null
sasl.mechanism GSSAPI
sasl.oauthbearer.jwks.endpoint.url null
sasl.oauthbearer.token.endpoint.url null
security.protocol PLAINTEXT
send.buffer.bytes 131072 (128 kibibytes)
socket.connection.setup.timeout.max.ms 30000 (30 seconds)
socket.connection.setup.timeout.ms 10000 (10 seconds)
ssl.enabled.protocols TLSv1.2,TLSv1.3
ssl.keystore.type JKS
ssl.protocol TLSv1.3
ssl.provider null
ssl.truststore.type JKS
*************************
*****Importance: low
acks all
enable.idempotence true
interceptor.classes ""
max.in.flight.requests.per.connection 5
metadata.max.age.ms 300000 (5 minutes)
metadata.max.idle.ms 300000 (5 minutes)
metric.reporters ""
metrics.num.samples 2
metrics.recording.level INFO
metrics.sample.window.ms 30000 (30 seconds)
partitioner.adaptive.partitioning.enable true
partitioner.availability.timeout.ms 0
reconnect.backoff.max.ms 1000 (1 second)
reconnect.backoff.ms 50
retry.backoff.ms 100
sasl.kerberos.kinit.cmd /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 60000
sasl.kerberos.ticket.renew.jitter 0.05
sasl.kerberos.ticket.renew.window.factor 0.8
sasl.login.connect.timeout.ms null
sasl.login.read.timeout.ms null
sasl.login.refresh.buffer.seconds 300
sasl.login.refresh.min.period.seconds 60
sasl.login.refresh.window.factor 0.8
sasl.login.refresh.window.jitter 0.05
sasl.login.retry.backoff.max.ms 10000 (10 seconds)
sasl.login.retry.backoff.ms 100
sasl.oauthbearer.clock.skew.seconds 30
sasl.oauthbearer.expected.audience null
sasl.oauthbearer.expected.issuer null
sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour)
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds)
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100
sasl.oauthbearer.scope.claim.name scope
sasl.oauthbearer.sub.claim.name sub
security.providers null
ssl.cipher.suites null
ssl.endpoint.identification.algorithm https
ssl.engine.factory.class null
ssl.keymanager.algorithm SunX509
ssl.secure.random.implementation null
ssl.trustmanager.algorithm PKIX
transaction.timeout.ms 60000 (1 minute)
transactional.id null
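Some producer settings depend on one another. The Kafka documentation states that delivery.timeout.ms should be greater than or equal to the sum of linger.ms and request.timeout.ms. Below is a minimal sketch of that check in shell, using the defaults from the table above. The function name is an assumption for illustration.

```shell
# Succeeds when delivery.timeout.ms >= linger.ms + request.timeout.ms.
# Arguments: delivery_timeout_ms linger_ms request_timeout_ms
delivery_timeout_ok() {
    [ "$1" -ge $(( $2 + $3 )) ]
}

# Defaults from the table above: 120000, 0 and 30000.
if delivery_timeout_ok 120000 0 30000; then
    echo "delivery.timeout.ms is consistent"
else
    echo "delivery.timeout.ms is too small"
fi
```

Raising linger.ms or request.timeout.ms without also raising delivery.timeout.ms is the usual way this constraint ends up violated.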
2.4 Consumer configuration
*************************
*****Importance: high
key.deserializer
value.deserializer
bootstrap.servers ""
fetch.min.bytes 1
group.id null
heartbeat.interval.ms 3000 (3 seconds)
max.partition.fetch.bytes 1048576 (1 mebibyte)
session.timeout.ms 45000 (45 seconds)
ssl.key.password null
ssl.keystore.certificate.chain null
ssl.keystore.key null
ssl.keystore.location null
ssl.keystore.password null
ssl.truststore.certificates null
ssl.truststore.location null
ssl.truststore.password null
*************************
*****Importance: medium
allow.auto.create.topics true
auto.offset.reset latest
client.dns.lookup use_all_dns_ips
connections.max.idle.ms 60000 (1 minute)
enable.auto.commit true
exclude.internal.topics true
fetch.max.bytes 52428800 (50 mebibytes)
group.instance.id null
isolation.level read_uncommitted
max.poll.interval.ms 300000 (5 minutes)
max.poll.records 500
partition.assignment.strategy
receive.buffer.bytes 65536 (64 kibibytes)
request.timeout.ms 30000 (30 seconds)
sasl.client.callback.handler.class null
sasl.jaas.config null
sasl.kerberos.service.name null
sasl.login.callback.handler.class null
sasl.login.class null
sasl.mechanism GSSAPI
sasl.oauthbearer.jwks.endpoint.url null
sasl.oauthbearer.token.endpoint.url null
security.protocol PLAINTEXT
send.buffer.bytes 131072 (128 kibibytes)
socket.connection.setup.timeout.max.ms 30000 (30 seconds)
socket.connection.setup.timeout.ms 10000 (10 seconds)
ssl.enabled.protocols TLSv1.2,TLSv1.3
ssl.keystore.type JKS
ssl.protocol TLSv1.3
ssl.provider null
ssl.truststore.type JKS
*************************
*****Importance: low
auto.commit.interval.ms 5000 (5 seconds)
check.crcs true
client.id ""
client.rack ""
fetch.max.wait.ms 500
interceptor.classes ""
metadata.max.age.ms 300000 (5 minutes)
metric.reporters ""
metrics.num.samples 2
metrics.recording.level INFO
metrics.sample.window.ms 30000 (30 seconds)
reconnect.backoff.max.ms 1000 (1 second)
reconnect.backoff.ms 50
retry.backoff.ms 100
sasl.kerberos.kinit.cmd /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 60000
sasl.kerberos.ticket.renew.jitter 0.05
sasl.kerberos.ticket.renew.window.factor 0.8
sasl.login.connect.timeout.ms null
sasl.login.read.timeout.ms null
sasl.login.refresh.buffer.seconds 300
sasl.login.refresh.min.period.seconds 60
sasl.login.refresh.window.factor 0.8
sasl.login.refresh.window.jitter 0.05
sasl.login.retry.backoff.max.ms 10000 (10 seconds)
sasl.login.retry.backoff.ms 100
sasl.oauthbearer.clock.skew.seconds 30
sasl.oauthbearer.expected.audience null
sasl.oauthbearer.expected.issuer null
sasl.oauthbearer.jwks.endpoint.refresh.ms 3600000 (1 hour)
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms 10000 (10 seconds)
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms 100
sasl.oauthbearer.scope.claim.name scope
sasl.oauthbearer.sub.claim.name sub
security.providers null
ssl.cipher.suites null
ssl.endpoint.identification.algorithm https
ssl.engine.factory.class null
ssl.keymanager.algorithm SunX509
ssl.secure.random.implementation null
ssl.trustmanager.algorithm PKIX
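The consumer defaults heartbeat.interval.ms (3000) and session.timeout.ms (45000) are related. The Kafka documentation recommends that the heartbeat interval be no higher than one third of the session timeout. A small sketch of that check in shell, with an illustrative function name:

```shell
# Succeeds when heartbeat.interval.ms is at most one third of session.timeout.ms.
# Arguments: heartbeat_interval_ms session_timeout_ms
heartbeat_ok() {
    [ $(( $1 * 3 )) -le "$2" ]
}

# Defaults from the table above: 3000 and 45000.
if heartbeat_ok 3000 45000; then
    echo "heartbeat.interval.ms fits within session.timeout.ms / 3"
else
    echo "heartbeat.interval.ms is too large for session.timeout.ms"
fi
```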
2.5 Kafka Connect configuration
2.6 Kafka Streams configuration
2.7 AdminClient configuration
3 KafkaOffsetMonitor
KafkaOffsetMonitor is a web-based consumer monitoring tool for Kafka, provided by the Kafka open-source community. It tracks, in real time, the consumers of a Kafka cluster and their offsets in each partition. You can browse the current consumer groups and see at a glance how far every partition of each topic has been consumed, which makes it easy to tell whether the messages in a partition are being consumed or whether consumption has stalled.
The web interface provides the following information:
(1) Consumption monitoring for consumers, listing the offset data of each consumer.
(2) The list of consumer groups.
(3) A list of all partitions of each topic, with the columns Topic, Pid, Offset, LogSize, Lag, and Owner.
(4) The consumption history of each topic.
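The Lag column in the partition list is derived from two of the other columns. It is the partition's LogSize (the log end offset) minus the consumer's committed Offset. A minimal sketch of that arithmetic in shell follows. The function name and the numbers are hypothetical.

```shell
# Lag of one partition: LogSize (log end offset) minus the committed Offset.
partition_lag() {
    # $1 = LogSize, $2 = Offset
    echo $(( $1 - $2 ))
}

# Hypothetical values: 1500 messages written, consumer committed up to 1420.
partition_lag 1500 1420
```

A lag that keeps growing over time indicates a consumer that cannot keep up with the producers, while a lag of 0 means the partition is fully consumed.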
3.1 Download
KafkaOffsetMonitor is hosted on GitHub and can be downloaded from there.
Download URL: https://github.com/gunsid/KafkaOffsetMonitor.git
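3.2 Startup
Upload the downloaded KafkaOffsetMonitor jar to a Linux host. You can create a directory named KafkaMonitor to hold the jar (KafkaOffsetMonitor-assembly-0.2.0.jar in the commands below; adjust the file name to the version you downloaded). Change into the KafkaMonitor directory and run the jar with the java command:
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \
--port 8088 \
--refresh 5.seconds \
--retain 1.days
If no port is specified, a random port is opened by default.
zk : ZooKeeper host addresses; separate multiple hosts with commas
port : port the web application listens on
refresh : how often the application refreshes and stores a point in the database
retain : how long points are kept in the database
dbName : name of the database file used for storage; defaults to offsetapp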
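Create a startup script
#vi kafka-monitor-start.sh
#chmod a+x kafka-monitor-start.sh
The script contains the following command. Note that -XX:PermSize and -XX:MaxPermSize only take effect on Java 7 and earlier; Java 8 ignores them with a warning.
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--port 8089 \
--zk 10.23.241.202:2181 \
--refresh 5.minutes \
--retain 1.day >/dev/null 2>&1;
#nohup /root/mm/kafka-monitor-start.sh &    start the monitor in the background
#netstat -lnp | grep 8089    check that the port is listening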
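The netstat check can be wrapped in a small helper so that a script can test for the listener instead of a person reading the output. This is a sketch under the assumption that the input looks like Linux `netstat -lnt` output, with the local address in the fourth column. The function name and the sample line are illustrative.

```shell
# Read `netstat -lnt`-style lines on stdin and succeed if any of them
# shows a socket in LISTEN state whose local address ends in ":<port>".
has_listener() {
    awk -v port="$1" '
        $4 ~ (":" port "$") && /LISTEN/ { found = 1 }
        END { exit (found ? 0 : 1) }
    '
}

# Sample line shaped like Linux netstat output (illustrative values).
sample='tcp        0      0 0.0.0.0:8089            0.0.0.0:*               LISTEN      1234/java'

if printf '%s\n' "$sample" | has_listener 8089; then
    echo "port 8089 is listening"
fi

# Against a live host: netstat -lnt | has_listener 8089
```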
3.3 Web UI
Enter http://ip:port in a browser to open the KafkaOffsetMonitor web UI, for example:
http://10.23.241.202:8089/