qq電腦版官方網(wǎng)站策劃方案
1. Maxwell框架
開發(fā)公司為Zendesk公司開源,用java編寫的MySQL變更數(shù)據(jù)抓取軟件。內(nèi)部是通過監(jiān)控MySQL的Binlog日志,并將變更數(shù)據(jù)以JSON格式發(fā)送到Kafka等流處理平臺(tái)。
1.1 MySQL主從復(fù)制
主機(jī)每次變更數(shù)據(jù)都會(huì)生成對應(yīng)的Binlog日志,從機(jī)可以通過IO流的方式將Binlog日志下載到本地,可以通過它創(chuàng)造和主機(jī)一樣的環(huán)境或者作為熱備。
1.2 安裝Maxwell
- 解壓改名
- 啟動(dòng)MySQL Binlog, vim /etc/my.cnf. 增加如下配置:
- binlog_format 日志類型的三種類型:
- 基于語句:主機(jī)執(zhí)行了什么語句,在從機(jī)里同樣執(zhí)行一遍。如果使用了random語句,會(huì)導(dǎo)致主從不一致。但是量級比較低
- 基于行:主機(jī)被改動(dòng)后,從機(jī)同步一份。不會(huì)有主從不一致的問題,但是量價(jià)比較大,需要將每行修改的數(shù)據(jù)都拿一份。
- 混合模式:一般基于語句,但是如果基于語句會(huì)導(dǎo)致前后結(jié)果產(chǎn)生差異,自動(dòng)轉(zhuǎn)成基于行。
- binlog_format 日志類型的三種類型:
#數(shù)據(jù)庫id
server-id = 1
#啟動(dòng)binlog,該參數(shù)的值會(huì)作為binlog的文件名
log-bin=mysql-bin
#binlog類型,maxwell要求為row類型
binlog_format=row
#啟用binlog的數(shù)據(jù)庫,需根據(jù)實(shí)際情況作出修改
binlog-do-db=gmall
- 重啟MySQL服務(wù)
- 創(chuàng)建Maxwell所需所需的數(shù)據(jù)庫和用戶,用來存儲(chǔ)斷點(diǎn)續(xù)傳所需的數(shù)據(jù)。
CREATE DATABASE maxwell;
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';//maxwell庫的所有權(quán)限給maxwell
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';//其他庫的查詢、復(fù)制權(quán)限給maxwell
- 修改maxwell配置文件
cp 配置文件,將會(huì)復(fù)制某個(gè)文件并且可以改名。
producer=kafka
# 目標(biāo)Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目標(biāo)Kafka topic,可靜態(tài)配置,例如:maxwell,也可動(dòng)態(tài)配置,例如:%{database}_%{table}
kafka_topic=topic_db
# MySQL相關(guān)配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true# 過濾gmall中的z_log表數(shù)據(jù),該表是日志數(shù)據(jù)的備份,無須采集
filter=exclude:gmall.z_log
# 指定數(shù)據(jù)按照主鍵分組進(jìn)入Kafka不同分區(qū),避免數(shù)據(jù)傾斜
producer_partition_by=primary_key
1.3 Maxwell的使用
- 啟動(dòng)zookeeper,kafka
- 啟動(dòng)maxwell,
bin/maxwell --config config.properties --daemon
- 啟動(dòng)kafka消費(fèi)者進(jìn)程,用于消費(fèi)maxwell添加到kafka的變更數(shù)據(jù)
- 啟動(dòng)數(shù)據(jù)生成jar包,查看消費(fèi)者進(jìn)程是否有新數(shù)據(jù)。
- 編寫Maxwell啟停腳本
#!/bin/bashMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "啟動(dòng)Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell正在運(yùn)行"fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在運(yùn)行"fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac
1.4 Bootstrap全量同步
Maxwell獲取的數(shù)據(jù)都是后期變更的數(shù)據(jù),但沒有獲取到數(shù)據(jù)庫在開啟Binlog日志之前的原始數(shù)據(jù)。
全量同步命令:/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties
2. 數(shù)倉數(shù)據(jù)同步策略
2.1 用戶行為數(shù)據(jù)
數(shù)據(jù)源:Kafka
目的地:HDFS
傳輸方式采用Flume, 其中source為Kafka source, channel為Memmory channel, sink為HDFS sink。
根據(jù)官網(wǎng)查找相應(yīng)參數(shù):
- Kafka Source
- type = Kafka Source全類名
- kafka.bootstrap.servers 連接地址
- kafka.topics = topic_log
- batchSize: 批次大小
- batchDurationMillis: 批次間隔2s
- File Channel
- type: file
- dataDirs: 存儲(chǔ)路徑
- checkpointDir: 偏移量存儲(chǔ)地址
- keep-alive: 管道滿了后,生產(chǎn)者間隔多少秒再放數(shù)據(jù)
- HDFS Sink
- hdfs.rollInterval : 文件滾動(dòng),解決小文件問題,每隔多久滾動(dòng)一次
- rollSize: 文件大小
- hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d, 文件存放路徑
- hdfs.round = false, 不采用系統(tǒng)本地時(shí)間
#定義組件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false # 是否獲取本地時(shí)間a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制輸出文件類型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#組裝
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2 零點(diǎn)漂移問題
在HDFS系統(tǒng)存放文件時(shí)是按照時(shí)間進(jìn)行分區(qū)存放的,存放時(shí)查看的是header中的timestamp,但是由于數(shù)據(jù)傳輸過程中也需要一段時(shí)間,header中的時(shí)間并不是數(shù)據(jù)的實(shí)際產(chǎn)生時(shí)間,這個(gè)就是零點(diǎn)漂移問題。
解決辦法:借助攔截器,修改header中的timestamp的值。編寫攔截器代碼,需要在IDEA中創(chuàng)建對應(yīng)的項(xiàng)目并打包。
- 導(dǎo)入依賴,flume-ng-core和JSON解析依賴fastjson (1.2.62)
- 創(chuàng)建包gmall.interceptor
- 創(chuàng)建類TimeStampInterceptor, 繼承Interceptor接口
- 實(shí)現(xiàn)intercept(Event event)和intercept(Event events)
- 使用fastjson來解析json文件,得到j(luò)sonObject對象,用來獲取時(shí)間戳ts。將獲取到的時(shí)間戳覆蓋header中的timestamp, 如果數(shù)據(jù)格式錯(cuò)誤會(huì)拋異常,使用try-catch來捕獲它,并過濾掉該條數(shù)據(jù)。注意此處不能使用for循環(huán)來一邊遍歷,一邊刪除集合數(shù)據(jù)。
@Overridepublic Event intercept(Event event) {//1、獲取header和body的數(shù)據(jù)Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);try {//2、將body的數(shù)據(jù)類型轉(zhuǎn)成jsonObject類型(方便獲取數(shù)據(jù))JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp時(shí)間字段替換成日志生成的時(shí)間戳(解決數(shù)據(jù)漂移問題)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;} catch (Exception e) {e.printStackTrace();return null;}
@Override
public List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();//必須使用迭代器刪除}}return list;
}
-
打包時(shí)注意要帶上fastjson依賴,需要在maven中添加配置打包插件。依賴中有flume和fastjson,但在虛擬機(jī)上有flume,沒有fastjson,所以需要排除flume??梢允褂胮rovided標(biāo)簽來排除讓打包時(shí)排除依賴。
- compile:在單元測試、編譯、運(yùn)行三種方式都會(huì)使用compile表明的依賴;
- test:在單元測試才會(huì)使用test表明的依賴;
- provided:在編譯才會(huì)使用test表明的依賴;
-
Flume配置文件中添加攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder # 全類名建議在IDEA中復(fù)制,Builder也需要根據(jù)自己的代碼函數(shù)名修改
- 重新生成數(shù)據(jù),查看是否根據(jù)數(shù)據(jù)本身的時(shí)間戳存放到對應(yīng)的HDFS分區(qū)文件中。
3. 業(yè)務(wù)數(shù)據(jù)同步
3.1 同步策略
- 全量同步:每天將所有數(shù)據(jù)同步一份,業(yè)務(wù)數(shù)據(jù)量小,優(yōu)先考慮全量同步。
- 增量同步:每天只將新增和變化進(jìn)行同步,業(yè)務(wù)數(shù)據(jù)量大,優(yōu)先考慮增量同步。
3.2 數(shù)據(jù)同步工具
全量:DataX
、Sqoop
增量:Maxwell
、Canal
3.3 DataX
是一個(gè)數(shù)據(jù)同步工具,致力于實(shí)現(xiàn)包括關(guān)系型數(shù)據(jù)庫HDFS、Hive、ODPS、HBase、MySQL等等數(shù)據(jù)源之間的互傳。
- 架構(gòu)= reader + framework + writer
- 運(yùn)行流程
- job: 單個(gè)數(shù)據(jù)同步的作業(yè),會(huì)啟動(dòng)一個(gè)進(jìn)程。
- Task: 根據(jù)不同數(shù)據(jù)源的切分策略,一個(gè)Job會(huì)切分為多個(gè)Task,Task是DataX作業(yè)的最小單元,每個(gè)Task負(fù)責(zé)一部分,由一個(gè)線程執(zhí)行。
- 調(diào)度策略:會(huì)根據(jù)系統(tǒng)資源設(shè)置并發(fā)度,并發(fā)度為線程同時(shí)執(zhí)行的個(gè)數(shù),任務(wù)會(huì)按照并發(fā)度一組一組執(zhí)行。
3.4 DataX安裝
- 下載解壓DataX安裝包
bin/datax.py job/job.json
測試安裝包是否完整- MySQL Reader配置文件的書寫
- HDFS Writer配置文件的書寫
- 執(zhí)行datax命令
python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2022-06-08" /opt/module/datax/job/import/gmall.activity_info.json
- 執(zhí)行完后可以使用hadoop fs cat 路徑名 | zcat,來查看壓縮文件是否正確