微網站技術重慶seo排名優(yōu)化
什么是ELK?
ELK是Elasticsearch
、Logstash
、Kibana
三大開源框架首字母大寫簡稱(后來出現的filebeat屬于beats家族中的一員,可以用來替代logstash的數據收集功能,比較輕量級),也被稱為Elastic Stack
。
Filebeat?
Filebeat是用于轉發(fā)和集中日志數據的輕量級傳送工具。Filebeat監(jiān)視您指定的日志文件或位置,收集日志事件,并將它們轉發(fā)到Elasticsearch或 Logstash進行索引。Filebeat的工作方式如下:啟動Filebeat時,它將啟動一個或多個輸入,這些輸入將在為日志數據指定的位置中查找。對于Filebeat所找到的每個日志,Filebeat都會啟動收集器。每個收集器都讀取單個日志以獲取新內容,并將新日志數據發(fā)送到libbeat,libbeat將聚集事件,并將聚集的數據發(fā)送到為Filebeat配置的輸出。
Logstash
Logstash是免費且開放的服務器端數據處理管道,能夠從多個來源采集數據,轉換數據,然后將數據發(fā)送到您最喜歡的“存儲庫”中。Logstash能夠動態(tài)地采集、轉換和傳輸數據,不受格式或復雜度的影響。利用Grok從非結構化數據中派生出結構,從IP地址解碼出地理坐標,匿名化或排除敏感字段,并簡化整體處理過程。
ElasticSearch
Elasticsearch是Elastic Stack核心的分布式搜索和分析引擎,是一個基于Lucene、分布式、通過Restful方式進行交互的近實時搜索平臺框架。Elasticsearch為所有類型的數據提供近乎實時的搜索和分析。無論您是結構化文本還是非結構化文本,數字數據或地理空間數據,Elasticsearch都能以支持快速搜索的方式有效地對其進行存儲和索引。
Kibana
Kibana是一個針對Elasticsearch的開源分析及可視化平臺,用來搜索、查看交互存儲在Elasticsearch索引中的數據。使用Kibana,可以通過各種圖表進行高級數據分析及展示。并且可以為 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以匯總、分析和搜索重要數據日志。還可以讓海量數據更容易理解。它操作簡單,基于瀏覽器的用戶界面可以快速創(chuàng)建儀表板(dashboard)實時顯示Elasticsearch查詢動態(tài)。
Logstash入門
使用Docker-Compose啟動Logstash服務,其中docker-compose.yml文件如下:
version:?"3.1"
#?服務配置
services:
??logstash:
????container_name:?logstash-7.17.0
????image:?docker.elastic.co/logstash/logstash:7.17.0
????volumes:
??????-?./logstash/data:/usr/share/logstash/data
??????-?./logstash/pipeline:/usr/share/logstash/pipeline
????networks:
??????-?elk_net
#?網絡配置
networks:
??elk_net:
????driver:?bridge
示例配置1(標準輸入、輸出)?
每隔10秒輸出字符串:Hello from Logstash!
?input?{
??heartbeat?{
????interval?=>?10
????message??=>?'Hello?from?Logstash!'
??}
}
output?{
????stdout?{
????????codec?=>?rubydebug
????}
}
示例配置2(讀取文件)
讀取文件內容(文件的最后一行將不會讀取),示例文件為test.log
hello?world!
From?Shanghai?To?Beijing
this?is?a?log?for?test?in?logstash!
配置文件如下:
input?{
??file?{
????path?=>?"/usr/share/logstash/data/test.log"
????start_position?=>?"beginning"
??}
}
output?{
????stdout?{
????????codec?=>?rubydebug
????}
}?
讀取結果如下(順序已打亂)
?
示例配置3(Grok插件)?
Grok為正則表達式,Logstash(v4.4.3)已內置120種表達式,參考網址為:https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns.
讀取Nginx日志文件,并使用Grok進行過濾。Nginx文件示例如下:
112.195.209.90?-?-?[20/Feb/2018:12:12:14?+0800]?"GET?/?HTTP/1.1"?200?190?"-"?"Mozilla/5.0?(Linux;?Android?6.0;?Nexus?5?Build/MRA58N)?AppleWebKit/537.36?(KHTML,?like?Gecko)?Chrome/63.0.3239.132?Mobile?Safari/537.36"?"-"?
配置文件如下:
input?{
??file?{
????path?=>?"/usr/share/logstash/data/test.log"
????start_position?=>?"beginning"
??}
}
filter?{
????grok?{
????????match?=>?{
????????????"message"?=>?"%{COMBINEDAPACHELOG}"
????????}
????}
}
output?{
????stdout?{
????????codec?=>?rubydebug
????}
}?
解析結果為:
?
Kibana在Dev Tools中內置了Grok Debugger(調試器):
?
示例解析配置4(多行讀取插件 multiline)?
對 multiline 插件來說,有三個設置比較重要:negate、pattern 和 what。
-
??pattern: 類型是string,要匹配的正則表達式
-
??negate: 類型是boolean,默認false,否定正則表達式
-
??what: 必須設置,可以為 previous 或 next, 如果正則表達式匹配了,那么該事件是屬于下一個或是前一個事件
multiline插件可以多行讀取。示例文件內容如下(注意最后一行為空行):
[Aug/08/08?14:54:03]?hello?world
[Aug/08/09?14:54:04]?hello?logstash
????hello?best?practice
????hello?raochenlin
[Aug/08/10?14:54:05]?the?end
配置文件:
input?{
??file?{
????path?=>?"/usr/share/logstash/data/test.log"
????start_position?=>?"beginning"
????codec?=>?multiline?{
????????pattern?=>?"^\["
????????negate?=>?true
????????what?=>?"previous"
????}
??}
}
output?{
????stdout?{
????????codec?=>?rubydebug
????}
}?
解析結果如下:
{
???????"message"?=>?"[Aug/08/08?14:54:03]?hello?world",
????"@timestamp"?=>?2023-12-23T10:41:20.884Z,
??????????"path"?=>?"/usr/share/logstash/data/test.log",
??????????"host"?=>?"b62820accf76",
??????"@version"?=>?"1"
}
{
???????"message"?=>?"[Aug/08/09?14:54:04]?hello?logstash\n????hello?best?practice\n????hello?raochenlin",
??????????"path"?=>?"/usr/share/logstash/data/test.log",
??????????"tags"?=>?[
????????[0]?"multiline"
????],
????"@timestamp"?=>?2023-12-23T10:44:24.846Z,
??????????"host"?=>?"b62820accf76",
??????"@version"?=>?"1"
}?
由于參數what設置為previous,因此只解析出兩條數據。當what設置為next時,可解析出三條數據,但解析結果有變化,如下:
{
???????"message"?=>?"[Aug/08/08?14:54:03]?hello?world",
??????????"path"?=>?"/usr/share/logstash/data/test.log",
????"@timestamp"?=>?2023-12-23T10:49:23.395Z,
??????????"host"?=>?"492dfb254e78",
??????"@version"?=>?"1"
}
{
???????"message"?=>?"????hello?best?practice\n????hello?raochenlin\n[Aug/08/10?14:54:05]?the?end",
????"@timestamp"?=>?2023-12-23T10:49:23.415Z,
??????????"path"?=>?"/usr/share/logstash/data/test.log",
??????????"host"?=>?"492dfb254e78",
??????"@version"?=>?"1",
??????????"tags"?=>?[
????????[0]?"multiline"
????]
}
{
???????"message"?=>?"[Aug/08/09?14:54:04]?hello?logstash",
??????????"path"?=>?"/usr/share/logstash/data/test.log",
????"@timestamp"?=>?2023-12-23T10:49:23.414Z,
??????????"host"?=>?"492dfb254e78",
??????"@version"?=>?"1"
}?
ELK搭建簡單示例?
結合Logstash, ElasticSearch與Kibana,將data文件夾中的以log結尾的文件,逐行導入至ElasticSearch中。
logstash.conf配置如下:
input?{
??file?{
????path?=>?"/usr/share/logstash/data/*.log"
????start_position?=>?"beginning"
??}
}
output?{
????stdout?{
????????codec?=>?rubydebug
????}
????elasticsearch?{
????????hosts?=>?["http://elasticsearch:9200"]
????????index?=>?"test_log"
????????action?=>?"index"
????}
}
docker-compose.yml文件如下:
version:?"3.1"
#?服務配置
services:
??logstash:
????container_name:?logstash-7.17.0
????image:?docker.elastic.co/logstash/logstash:7.17.0
????volumes:
??????-?./logstash/config/logstash.yml:/usr/share/logstash/logstash.yml
??????-?./logstash/data:/usr/share/logstash/data
??????-?./logstash/pipeline:/usr/share/logstash/pipeline
????networks:
??????-?elk_net
????depends_on:
??????-?elasticsearch
??elasticsearch:
????container_name:?elasticsearch-7.17.0
????image:?elasticsearch:7.17.0
????environment:
??????-?"ES_JAVA_OPTS=-Xms1024m?-Xmx1024m"
??????-?"http.host=0.0.0.0"
??????-?"node.name=elastic01"
??????-?"cluster.name=cluster_elasticsearch"
??????-?"discovery.type=single-node"
????ports:
??????-?"9200:9200"
??????-?"9300:9300"
????volumes:
??????-?./es/plugins:/usr/share/elasticsearch/plugins
??????-?./es/data:/usr/share/elasticsearch/data
????networks:
??????-?elk_net
??kibana:
????container_name:?kibana-7.17.0
????image:?kibana:7.17.0
????ports:
??????-?"5601:5601"
????networks:
??????-?elk_net
????depends_on:
??????-?elasticsearch
#?網絡配置
networks:
??elk_net:
????driver:?bridge
?ELK日志系統實戰(zhàn)
我們來設計這樣一個日志系統,其中Logstash可將Flask運行過程中的日志進行收集,并導入至ElasticSearch中,再使用Kibana進行數據分析。
Flask服務
使用Flask構建簡單的web服務,代碼如下:
#?-*-?coding:?utf-8?-*-
#?@file:?server.py
#?@time:?2023/12/23?19:17
import?time
import?random
from?flask?import?Flask,?Response
import?logginglogging.basicConfig(filename='../logstash/data/flask.log',level=logging.DEBUG,format='%(asctime)s-%(filename)s-%(funcName)s-%(levelname)s-%(message)s')
logger?=?logging.getLogger()app?=?Flask("elk_test")@app.route('/')
def?index():t1?=?time.time()logger.info(f"api_endpoint:?/,?status:?200,?cost_time:?{(time.time()?-?t1)?*?1000}")return?"Hello?index",?200@app.route("/io_task")
def?io_task():t1?=?time.time()time.sleep(2)logger.info(f"api_endpoint:?/io_task,?status:?200,?cost_time:?{(time.time()?-?t1)?*?1000}")return?"IO?bound?task?finish!",?200@app.route("/cpu_task")
def?cpu_task():t1?=?time.time()for?i?in?range(10000):n?=?i*i*ilogger.info(f"api_endpoint:?/cpu_task,?status:?200,?cost_time:?{(time.time()?-?t1)?*?1000}")return?"CPU?bound?task?finish!",?200@app.route("/random_sleep")
def?random_sleep():t1?=?time.time()time.sleep(random.randint(0,?5))logger.info(f"api_endpoint:?/random_sleep,?status:?200,?cost_time:?{(time.time()?-?t1)?*?1000}")return?"random?sleep",?200@app.route("/random_status")
def?random_status():t1?=?time.time()status_code?=?random.choice([200]?*?6?+?[300,?400,?400,?500])logger.info(f"api_endpoint:?/random_status,?status:?{status_code},?cost_time:?{(time.time()?-?t1)?*?1000}")return?Response("random?status",?status=status_code)if?__name__?==?'__main__':app.run(host="0.0.0.0",?port=5000,?debug=True)
使用下面的shell腳本進行HTTP請求模擬:
TIMES=5
for?i?in?$(eval?echo?"{1..$TIMES}")
do
????siege?-c?1?-r?10?http://localhost:5000/
????siege?-c?1?-r?5?http://localhost:5000/io_task
????siege?-c?1?-r?5?http://localhost:5000/cpu_task
????siege?-c?1?-r?3?http://localhost:5000/random_sleep
????siege?-c?1?-r?10?http://localhost:5000/random_status
????sleep?5
done?
日志記錄在flask.log文件中。
ELK搭建
對上述日志,搭建ELK,docker-compose.yml同上述ELK搭建簡單示例
,logstash.conf改動如下:
input?{
??file?{
????path?=>?"/usr/share/logstash/data/flask.log"
????start_position?=>?"beginning"
??}
}
filter?{
????#?只對cost_time所在列進行解析
????if?"cost_time"?in?[message]?{
????????grok?{
????????????match?=>?{
????????????????"message"?=>?"%{TIMESTAMP_ISO8601:request_finish_time}-%{WORD:script}.py-%{WORD:module}-%{LOGLEVEL:loglevel}-api_endpoint:?%{DATA:api_endpoint},?status:?%{NUMBER:status:int},?cost_time:?%{NUMBER:cost_time:float}"
????????????}
????????}
????????#?使用mutate過濾器替換字符
????????mutate?{
????????????#?替換空格為T
????????????gsub?=>?[?"request_finish_time",?"?",?"T"?]
????????????#?替換逗號為點
????????????gsub?=>?[?"request_finish_time",?",",?"."?]
????????}
????????#?使用date過濾器解析和格式化日期
????????date?{
????????????match?=>?[?"request_finish_time",?"ISO8601"?]
????????}
????}
????else?{
????????drop?{?}
????}
}
output?{
????stdout?{
????????codec?=>?rubydebug
????}
????elasticsearch?{
????????hosts?=>?["http://elasticsearch:9200"]
????????index?=>?"flask_log"
????????action?=>?"index"
????}
}
只對有cost_time所在的行進行解析,其它行丟棄,導入至ElasticSearch中的flask_log這個索引中。
數據分析?
對上述的五個API Endpoint進行請求占比分析,餅圖如下:
同時,對cost_time進行數據分析,其平均值,90, 95, 99分位數如下表:
?
上述的日志記錄方式還有待改進,比如記錄程序報錯信息,使用json字段解析而不是Grok表達式會更容易些。?