可口可樂公司建設網(wǎng)站的目的是什么seo推廣優(yōu)化工具
在 Kafka 3.3.1 中,可以使用 ACL(Access Control List)控制用戶對 topic 的訪問權限。以下是一些基本示例:
- 創(chuàng)建一個名為 my-topic 的 topic
在命令行中執(zhí)行以下命令創(chuàng)建一個名為 my-topic
的 topic:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
- 設置 ACL
a. 在文件system-acls.properties或配置文件KafkaServer或者其他支持的外部ACL模塊中設置ACLs規(guī)則,例如,我們可以通過修改/config/server.properties 配置文件來添加全局的訪問控制規(guī)則:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true #定義默認ACL規(guī)則,允許所有人都可以對topic進行讀寫操作,這很不安全,請不要在生產(chǎn)環(huán)境中使用
super.users=User:admin
其中 SimpleAclAuthorizerr
是一種內置的 ACL 校驗器,User:admin
是一組超級管理員。如果沒有在配置文件中指定 super.users,則只有在 Zookeeper 上配置了訪問控制時才會應用 ACL 規(guī)則。
b. 為特定用戶或組添加 ACL 規(guī)則
假設我們要給組開發(fā)者(developers)授予對 my-topic 的讀寫權限,我們可以通過以下命令添加 ACL 規(guī)則:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:developers --operation Read --operation Write --topic my-topic
現(xiàn)在,開發(fā)人員組可以對該 topic 進行讀寫操作。如果你需要撤銷授權(刪除 ACL),則可以使用以下命令:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove \
--allow-principal User:developers --operation Read --operation Write --topic my-topic
- 驗證 ACL 規(guī)則
驗證ACL規(guī)則的最簡單方法是嘗試讀取或寫入某個受保護的 topic。例如,在 java 中可以使用 KafkaProducer
和 KafkaConsumer
API 來測試:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 創(chuàng)建 Kafka 生產(chǎn)者對象并發(fā)送消息
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {Future<RecordMetadata> result = producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));System.out.println(result.get().toString());
}
producer.close();// 創(chuàng)建 Kafka 消費者對象并訂閱 Topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Arrays.asList(tp));
consumer.seekToBeginning(Collections.singletonList(tp));// 讀取消息
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}
參考資料
Kafka系列(五)、開啟SASL安全認證以及配置ACL權限控制_sasl_mechanism_王義凱_Rick的博客-CSDN博客
kafka權限控制 - 簡書
Kafka SASL/SCRAM+ACL實現(xiàn)動態(tài)創(chuàng)建用戶及權限控制
Kafka 權限管理實戰(zhàn)(最全整理)
Kafka基于Kraft下的權限控制_kafka kraft sasl_songjxin的博客-CSDN博客
【kafka】SSL和ACL的配置 - 知乎
深入淺出 SSL/CA 證書及其相關證書文件(pem、crt、cer、key、csr)
fabric kafka配置SSL+ACL - 簡書