專業(yè)的標(biāo)志設(shè)計公司龍巖seo
????????需要編寫自定義集成層來滿足數(shù)據(jù)管道中的特定要求?了解如何使用 Go 通過 Kafka 和 OpenSearch 實現(xiàn)此目的。
????????可擴展的數(shù)據(jù)攝取是OpenSearch等大規(guī)模分布式搜索和分析引擎的一個關(guān)鍵方面。構(gòu)建實時數(shù)據(jù)攝取管道的方法之一是使用Apache Kafka。它是一個開源事件流平臺,用于處理高數(shù)據(jù)量(和速度),并與包括關(guān)系數(shù)據(jù)庫和 NoSQL 數(shù)據(jù)庫在內(nèi)的各種來源集成。例如,規(guī)范用例之一是異構(gòu)系統(tǒng)(源組件)之間的數(shù)據(jù)實時同步,以確保 OpenSearch 索引是最新的,并且可以通過儀表板和可視化用于分析或使用下游應(yīng)用程序。
????????這篇博文將介紹如何創(chuàng)建數(shù)據(jù)管道,其中寫入 Apache Kafka 的數(shù)據(jù)被引入 OpenSearch。我們將使用Amazon OpenSearch Serverless和Amazon Managed Streaming for Apache Kafka (Amazon MSK) Serverless。Kafka Connect非常適合此類需求。它為 OpenSearch 和 ElasticSearch 提供接收器連接器(如果您選擇將 ElasticSearch OSS 引擎與 Amazon OpenSearch 結(jié)合使用,則可以使用該連接器)。但有時,有特定的要求或原因可能需要使用定制解決方案。
????????例如,您可能正在使用 Kafka Connect 不支持的數(shù)據(jù)源(很少見,但可能會發(fā)生),并且不想從頭開始編寫數(shù)據(jù)源?;蛘?#xff0c;這可能是一次性集成,您想知道是否值得花費精力來設(shè)置和配置 Kafka Connect。也許還有其他問題,例如許可等。
????????值得慶幸的是,Kafka 和 OpenSearch 提供了多種編程語言的客戶端庫,使您可以編寫自己的集成層。這正是本博客所涵蓋的內(nèi)容!我們將利用自定義Go應(yīng)用程序通過Kafka和OpenSearch的 Go 客戶端來攝取數(shù)據(jù)。
你將學(xué)習(xí):
- 概述如何設(shè)置所需的 AWS 服務(wù):OpenSearch Serverless、MSK Serverless、AWS Cloud9 以及 IAM 策略和安全配置
- 應(yīng)用程序的高級演練
- 啟動并運行數(shù)據(jù)攝取管道
- 如何在 OpenSearch 中查詢數(shù)據(jù)
在深入討論之前,我們先簡要概述一下 OpenSearch Serverless 和 Amazon MSK Serverless。
Amazon OpenSearch 無服務(wù)器和 Amazon MSK 無服務(wù)器簡介
OpenSearch 是一個開源搜索和分析引擎,用于日志分析、實時監(jiān)控和點擊流分析。Amazon OpenSearch Service 是一項托管服務(wù),可簡化 AWS 中 OpenSearch 集群的部署和擴展。
Amazon OpenSearch Service 支持 OpenSearch 和舊版 Elasticsearch OSS(直至 7.10,該軟件的最終開源版本)。創(chuàng)建集群時,您可以選擇使用哪個搜索引擎。
您可以創(chuàng)建一個 OpenSearch Service 域(與 OpenSearch 集群同義)來表示一個集群,其中每個 Amazon EC2 實例充當(dāng)一個節(jié)點。然而,OpenSearch Serverless 通過為 OpenSearch 服務(wù)提供按需無服務(wù)器配置來消除操作復(fù)雜性。它使用索引集合來支持特定的工作負(fù)載,與傳統(tǒng)集群不同,它分離了索引和搜索組件,并使用Amazon S3作為索引的主存儲。該架構(gòu)支持獨立擴展搜索和索引功能。
您可以參考比較 OpenSearch Service 和 OpenSearch Serverless中的詳細信息。
Amazon MSK(Apache Kafka 的托管流)是一項完全托管的服務(wù),用于使用 Apache Kafka 處理流數(shù)據(jù)。它處理集群管理操作,例如創(chuàng)建、更新和刪除。您可以使用標(biāo)準(zhǔn) Apache Kafka 數(shù)據(jù)操作來生成和使用數(shù)據(jù),而無需修改應(yīng)用程序。它支持開源 Kafka 版本,確保與現(xiàn)有工具、插件和應(yīng)用程序的兼容性。
MSK Serverless 是 Amazon MSK 中的一種集群類型,無需手動管理和擴展集群容量。它根據(jù)需求自動配置和擴展資源,并負(fù)責(zé)主題分區(qū)管理。采用即用即付定價模式,您只需為實際使用量付費。MSK Serverless 非常適合需要靈活、自動擴展流容量的應(yīng)用程序。
讓我們首先討論高級應(yīng)用程序架構(gòu),然后再討論架構(gòu)注意事項。
應(yīng)用概述和關(guān)鍵架構(gòu)注意事項
這是應(yīng)用程序架構(gòu)的簡化版本,概述了組件以及它們?nèi)绾蜗嗷ソ换ァ?/span>
該應(yīng)用程序由生產(chǎn)者和消費者組件組成,它們是部署到實例的 Go 應(yīng)用程序EC2
:
- 顧名思義,生產(chǎn)者將數(shù)據(jù)發(fā)送到 MSK Serverless 集群。
- 消費者應(yīng)用程序
movie
從 MSK Serverless 主題接收數(shù)據(jù)(信息),并使用 OpenSearch Go 客戶端對movies
集合中的數(shù)據(jù)進行索引。
注重簡單性
值得注意的是,該博客文章已針對簡單性和易于理解進行了優(yōu)化,因此該解決方案并未針對運行生產(chǎn)工作負(fù)載進行調(diào)整。以下是一些已進行的簡化:
- 生產(chǎn)者和消費者應(yīng)用程序在同一計算平臺(EC2 實例)上運行。
- 有一個消費者應(yīng)用程序?qū)嵗幚韥碜?MSK 主題的數(shù)據(jù)。但是,您可以嘗試運行使用者應(yīng)用程序的多個實例,并查看數(shù)據(jù)如何在實例之間分布。
- 不是使用 Kafka CLI 來生成數(shù)據(jù),而是用 Go 編寫自定義生成器應(yīng)用程序以及 REST 端點來發(fā)送數(shù)據(jù)。這演示了如何用 Go 編寫 Kafka 生產(chǎn)者應(yīng)用程序并模仿 Kafka CLI。
- 使用的數(shù)據(jù)量很小。
- OpenSearch Serverless 集合具有公共訪問類型。
對于生產(chǎn)工作負(fù)載,您應(yīng)該考慮以下一些事項:
根據(jù)數(shù)據(jù)量和可擴展性要求為您的消費者應(yīng)用程序選擇合適的計算平臺 - 下文將詳細介紹。
為您的 OpenSearch Serverless 集合選擇?VPC訪問類型。
考慮使用Amazon OpenSearch Ingestion創(chuàng)建數(shù)據(jù)管道。
如果您仍然需要部署自定義應(yīng)用程序來構(gòu)建從 MSK 到 OpenSearch 的數(shù)據(jù)管道,以下是您可以選擇的計算選項范圍:
容器:您可以將消費者應(yīng)用程序打包為 Docker 容器(Dockerfile可在 GitHub 存儲庫中獲取)并將其部署到Amazon EKS或Amazon ECS。
如果您將應(yīng)用程序部署到 Amazon EKS,您還可以考慮使用KEDA根據(jù) MSK 主題中的消息數(shù)量自動擴展您的消費者應(yīng)用程序。
無服務(wù)器:還可以使用MSK 作為 AWS Lambda 函數(shù)的事件源。您可以將消費者應(yīng)用程序編寫為 Lambda 函數(shù),并將其配置為由 MSK 事件觸發(fā),或者在AWS Fargate上運行。
由于生產(chǎn)者應(yīng)用程序是 REST API,因此您可以將其部署到 AWS App Runner。
最后,您可以利用Amazon EC2 Auto Scaling 組為您的消費者應(yīng)用程序自動擴展 EC2 隊列。
有足夠的材料討論如何使用基于 Java 的 Kafka 應(yīng)用程序通過IAM 與 MSK Serverless連接。
讓我們繞道了解一下 Go 的工作原理。
Go 客戶端應(yīng)用程序如何使用 IAM 通過 MSK Serverless 進行身份驗證?
MSK Serverless 需要 IAM 訪問控制來處理 MSK 集群的身份驗證和授權(quán)。這意味著您的 MSK 客戶端應(yīng)用程序(在本例中為生產(chǎn)者和消費者)必須使用 IAM 向 MSK 進行身份驗證,基于此它們將被允許或拒絕特定的 Apache Kafka 操作。
好處是franz-go
Kafka 客戶端庫支持 IAM 身份驗證。以下是消費者應(yīng)用程序的片段,展示了它在實踐中的工作原理:
func init() {
//......cfg, err = config.LoadDefaultConfig(context.Background(), config.WithRegion("us-east-1"), config.WithCredentialsProvider(ec2rolecreds.New()))creds, err = cfg.Credentials.Retrieve(context.Background())
//....func initializeKafkaClient() {opts := []kgo.Opt{kgo.SeedBrokers(strings.Split(mskBroker, ",")...),kgo.SASL(sasl_aws.ManagedStreamingIAM(func(ctx context.Context) (sasl_aws.Auth, error) {return sasl_aws.Auth{AccessKey: creds.AccessKeyID,SecretKey: creds.SecretAccessKey,SessionToken: creds.SessionToken,UserAgent: "msk-ec2-consumer-app",}, nil})),
//.....
基礎(chǔ)設(shè)施設(shè)置
本節(jié)將幫助您設(shè)置以下組件:
- 所需的 IAM 角色
- MSK 無服務(wù)器集群
- OpenSearch 無服務(wù)器集合
- 用于運行應(yīng)用程序的 AWS Cloud9 EC2 環(huán)境
MSK 無服務(wù)器集群
您可以按照本文檔使用 AWS 控制臺設(shè)置 MSK 無服務(wù)器集群。執(zhí)行此操作后,記下以下集群信息:VPC、子網(wǎng)、安全組(“屬性”選項卡)和集群端點(單擊“查看客戶端信息”)。
應(yīng)用程序 IAM 角色
本教程需要不同的 IAM 角色。
首先創(chuàng)建 IAM 角色來執(zhí)行后續(xù)步驟,并按照步驟 1:配置權(quán)限(在 Amazon OpenSearch 文檔中)的權(quán)限使用 OpenSearch Serverless。
為客戶端應(yīng)用程序創(chuàng)建另一個 IAM 角色,該角色將與 MSK Serverless 集群交互,并使用 OpenSearch Go 客戶端對 OpenSearch Serverless 集合中的數(shù)據(jù)建立索引。創(chuàng)建如下內(nèi)聯(lián) IAM 策略 - 確保替換所需的值。
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": ["kafka-cluster:*"],"Resource": ["<ARN of the MSK Serverless cluster>","arn:aws:kafka:us-east-1:<AWS_ACCOUNT_ID>:topic/<MSK_CLUSTER_NAME>/*","arn:aws:kafka:us-east-1:AWS_ACCOUNT_ID:group/<MSK_CLUSTER_NAME>/*"]},{"Effect": "Allow","Action": ["aoss:APIAccessAll"],"Resource": "*"}]
}
使用以下信任策略:
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Principal": {"Service": "ec2.amazonaws.com"},"Action": "sts:AssumeRole"}]
}
最后,您將向其附加 OpenSearch Serverless數(shù)據(jù)訪問策略的另一個 IAM 角色- 下一步將詳細介紹這一點。
OpenSearch 無服務(wù)器集合
使用文檔創(chuàng)建 OpenSearch Serverless 集合。在遵循步驟 2:創(chuàng)建集合中的第 8 點時,請確保配置兩個數(shù)據(jù)策略;即,在上一節(jié)的步驟 2 和 3 中創(chuàng)建的每個 IAM 角色。
注意:出于本教程的目的,我們選擇公共訪問類型。建議為生產(chǎn)工作負(fù)載選擇VPC。
AWS Cloud9 EC2 環(huán)境
使用此文檔創(chuàng)建 AWS Cloud9 EC2 開發(fā)環(huán)境。確保使用與 MSK Serverless 集群相同的VPC。
完成后,您需要執(zhí)行以下操作: 打開 Cloud9 環(huán)境。在EC2 實例下,單擊管理 EC2 實例。在 EC2 實例中,導(dǎo)航到安全性并記下附加的安全組。
打開與 MSK Serverless 集群關(guān)聯(lián)的安全組并添加入站規(guī)則以允許 Cloud9 EC2 實例連接到它。選擇Cloud9 EC2實例的安全組作為源,9098作為端口,選擇TCP協(xié)議。
您現(xiàn)在已準(zhǔn)備好運行該應(yīng)用程序!
選擇 Cloud9 環(huán)境并選擇Open in Cloud9以啟動 IDE。打開終端窗口,克隆 GitHub 存儲庫,然后將目錄更改為該文件夾。
git clone https://github.com/build-on-aws/opensearch-using-kafka-golangcd opensearch-using-kafka-golang
啟動生產(chǎn)者應(yīng)用程序:
cd msk-producerexport MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=moviesgo run main.go
您應(yīng)該在終端中看到以下日志:
MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
starting producer app
http server ready
要將數(shù)據(jù)發(fā)送到 MSK 無服務(wù)器集群,請使用 bash 腳本,該腳本將調(diào)用HTTP
您剛剛啟動的應(yīng)用程序公開的端點,并使用movies.txt
以下格式提交電影數(shù)據(jù)(來自文件):JSON
curl
./send-data.sh
在生產(chǎn)者應(yīng)用程序終端日志中,您應(yīng)該看到類似于以下內(nèi)容的輸出:
producing data to topic
payload {"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
record produced successfully to offset 2 in partition 0 of topic moviesproducing data to topic
payload {"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
record produced successfully to offset 4 in partition 1 of topic movies.....
出于本教程的目的并使其簡單易懂,數(shù)據(jù)量已特意限制為 1500 條記錄,并且腳本在將每條記錄發(fā)送到生產(chǎn)者后有意休眠 1 秒。您應(yīng)該能夠輕松地跟隨。
當(dāng)生產(chǎn)者應(yīng)用程序忙于向movies
主題發(fā)送數(shù)據(jù)時,您可以啟動消費者應(yīng)用程序,開始處理來自 MSK Serverless 集群的數(shù)據(jù),并將其在 OpenSearch Serverless 集合中建立索引。
cd msk-consumerexport MSK_BROKER=<enter MSK Serverless cluster endpoint>
export MSK_TOPIC=movies
export OPENSEARCH_INDEX_NAME=movies-index
export OPENSEARCH_ENDPOINT_URL=<enter OpenSearch Serverless endpoint>go run main.go
您應(yīng)該在終端中看到以下輸出,這表明它確實已開始從 MSK Serverless 集群接收數(shù)據(jù)并在 OpenSearch Serverless 集合中對其建立索引。
using default value for AWS_REGION - us-east-1
MSK_BROKER <MSK Serverless cluster endpoint>
MSK_TOPIC movies
OPENSEARCH_INDEX_NAME movies-index
OPENSEARCH_ENDPOINT_URL <OpenSearch Serverless endpoint>
using credentials from: EC2RoleProvider
kafka consumer goroutine started. waiting for records
paritions ASSIGNED for topic movies [0 1 2]got record from partition 1 key= val={"directors": ["Joseph Gordon-Levitt"], "release_date": "2013-01-18T00:00:00Z", "rating": 7.4, "genres": ["Comedy", "Drama"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQxNTc3NDM2MF5BMl5BanBnXkFtZTcwNzQ5NTQ3OQ@@._V1_SX400_.jpg", "plot": "A New Jersey guy dedicated to his family, friends, and church, develops unrealistic expectations from watching porn and works to find happiness and intimacy with his potential true love.", "title": "Don Jon", "rank": 1, "running_time_secs": 5400, "actors": ["Joseph Gordon-Levitt", "Scarlett Johansson", "Julianne Moore"], "year": 2013}
movie data indexed
committing offsets
got record from partition 2 key= val={"directors": ["Ron Howard"], "release_date": "2013-09-02T00:00:00Z", "rating": 8.3, "genres": ["Action", "Biography", "Drama", "Sport"], "image_url": "http://ia.media-imdb.com/images/M/MV5BMTQyMDE0MTY0OV5BMl5BanBnXkFtZTcwMjI2OTI0OQ@@._V1_SX400_.jpg", "plot": "A re-creation of the merciless 1970s rivalry between Formula One rivals James Hunt and Niki Lauda.", "title": "Rush", "rank": 2, "running_time_secs": 7380, "actors": ["Daniel Br\u00c3\u00bchl", "Chris Hemsworth", "Olivia Wilde"], "year": 2013}
movie data indexed
committing offsets.....
該過程完成后,您應(yīng)該1500在 OpenSearch Serverless 集合中為電影建立索引。不過,您不必等待它完成。一旦有了數(shù)百條記錄,您就可以繼續(xù)導(dǎo)航到OpenSearch 儀表板中的開發(fā)工具來執(zhí)行以下查詢。
在 OpenSearch 中查詢電影數(shù)據(jù)
運行簡單查詢
讓我們從一個簡單的查詢開始,列出索引中的所有文檔(不帶任何參數(shù)或過濾器)。
GET movies-index/_search
僅獲取特定字段的數(shù)據(jù)
默認(rèn)情況下,搜索請求會檢索對文檔建立索引時提供的整個 JSON 對象。使用該_source
選項從選定字段檢索源。例如,要僅檢索title
、plot
和genres
字段,請運行以下查詢:
GET movies-index/_search
{"_source": {"includes": ["title","plot","genres"]}
}
獲取數(shù)據(jù)以與術(shù)語查詢的精確搜索術(shù)語匹配
您可以使用術(shù)語查詢來實現(xiàn)此目的。例如,要搜索字段christmas
中包含該術(shù)語的電影title
,請運行以下查詢:
GET movies-index/_search
{"query": {"term": { "title": {"value": "christmas"}}}
}
**將選擇性字段選擇與術(shù)語查詢相結(jié)合。
您可以使用此查詢僅檢索某些字段,但對特定術(shù)語感興趣:
GET movies-index/_search
{"_source": {"includes": ["title","actors"]},"query": {"query_string": {"default_field": "title","query": "harry"}}
}
聚合
使用聚合根據(jù)特定字段中的值分組來計算匯總值。例如,您可以匯總ratings
、 、genre
和 等字段year
,以根據(jù)這些字段的值搜索結(jié)果。通過聚合,我們可以回答這樣的問題:“每種類型有多少部電影?”
GET movies-index/_search
{"size":0,"aggs": {"genres": {"terms":{"field": "genres.keyword"}}}
}
結(jié)論
回顧一下,您部署了一個管道,使用 Kafka 將數(shù)據(jù)提取到 OpenSearch Serverless 中,然后以不同的方式對其進行查詢。在此過程中,您還了解了生產(chǎn)工作負(fù)載需要記住的架構(gòu)注意事項和計算選項,以及使用基于 Go 的 Kafka 應(yīng)用程序和 MSK IAM 身份驗證。我還建議閱讀文章在 Go 中為 Amazon OpenSearch 構(gòu)建 CRUD 應(yīng)用程序,特別是如果您正在尋找以通過 Go SDK 執(zhí)行 OpenSearch 操作為中心的教程。