網(wǎng)站建設(shè)尾款營銷咨詢公司排名前十
概覽
技術(shù)方案:
- 日志采集服務(wù):通過利用Flume-ng對業(yè)務(wù)平臺中用戶對于電影的一次評分行為進行采集,實時發(fā)送到Kafka集群。
- 消息緩沖服務(wù):項目采用Kafka作為流式數(shù)據(jù)的緩存組件,接受來自Flume的數(shù)據(jù)采集請求。并將數(shù)據(jù)推送到項目的實時推薦系統(tǒng)部分。
- 實時推薦服務(wù):項目采用Spark Streaming作為實時推薦系統(tǒng),通過接收Kafka中緩存的數(shù)據(jù),通過設(shè)計的推薦算法實現(xiàn)對實時推薦的數(shù)據(jù)處理,并將結(jié)構(gòu)合并更新到MongoDB數(shù)據(jù)庫。
1. 實現(xiàn)思路
我們應(yīng)該如何實現(xiàn)?
- 首先應(yīng)該redis安裝,這里存儲用戶的第K次評分(用戶評分存入redis中)
- 安裝zookeeper,安裝kafka,都是standlone模式
- 測試Kafka與Spark Streaming 聯(lián)調(diào)。Kafka生產(chǎn)一條數(shù)據(jù),Spark Streaming 可以消費成功,并根據(jù)redis中的數(shù)據(jù)和MongoDB數(shù)據(jù)進行推薦,存入MongoDB中
- 在業(yè)務(wù)系統(tǒng)寫埋點信息,測試時寫入本地文件,之后再遠程測試寫入云服務(wù)器log文件中
- flume配置文件書寫,kafka創(chuàng)建兩個topic,對整個過程進行測試
2 環(huán)境準備
1.1 redis 安裝
- redis安裝redis安裝
- 密碼:123456
- 存入redis一些數(shù)據(jù) lpush uid:1 mid:score
- redis 教程:教程
1.2 zookeeper單機版安裝
- zookeeper安裝:zookeeper安裝
- 版本:3.7.1
- 遇到的坑:8080端口連接占用,我們需要在zoo.cpg文件中加上
admin.serverPort=8001
重新啟動即可。
1.3 kafka單機安裝
- kafka安裝:官網(wǎng)下載地址
- 安裝使用的為:127.0.0.1
- 啟動kafka:kafka教程
bin/kafka-server-start.sh config/server.properties
- 創(chuàng)建一個topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic recommender
- 生產(chǎn)一個消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic recommender
- 消費一個消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic recommender --from-beginning
3 測試kafka與spark streaming聯(lián)調(diào)
- kafka版本:2.2.0
- spark版本:2.3.0
- 因此使用
spark-streaming-kafka-0-10
- 啟動kafka,生產(chǎn)一條信息
- 書寫程序
// 定義kafka連接參數(shù)val kafkaParam = Map("bootstrap.servers" -> "服務(wù)器IP:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "recommender","auto.offset.reset" -> "latest")// 通過kafka創(chuàng)建一個DStreamval kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam ))// 把原始數(shù)據(jù)UID|MID|SCORE|TIMESTAMP 轉(zhuǎn)換成評分流// 1|31|4.5|val ratingStream = kafkaStream.map{msg =>val attr = msg.value().split("\\|")( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )}
- 若是kafka報錯,如果你同樣也是云服務(wù)器,請注意kafka的配置信息(很重要!)
(1)解決方法:修改kafka配置文件,設(shè)置為設(shè)置listeners為內(nèi)網(wǎng)ip,設(shè)置外網(wǎng)ip
- 解決方案修改內(nèi)網(wǎng)ip
(2)重新啟動,成功
- 內(nèi)網(wǎng)外網(wǎng)分流:內(nèi)網(wǎng)外網(wǎng)分流
- kafka入門教程:入門教程
- redis報錯:開啟保護模式了,需要修改conf文件
效果
在kafka生產(chǎn)一個數(shù)據(jù),可以在MongoDB中得到推薦的電影結(jié)果
4 后端埋點
前端進行評分后,觸發(fā)click事件,后端進行測試埋點,利用log4j寫入本地文件中。
4.1 本地測試
- log4j配置文件
log4j.rootLogger=INFO, file, stdout# write to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n# write to file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.Threshold=INFO
log4j.appender.file.File=F:/demoparent/business/src/main/log/agent.txt
log4j.appender.file.MaxFileSize=1024KB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
- 埋點實現(xiàn)
//埋點日志
import org.apache.log4j.Logger;// 關(guān)鍵代碼
Logger log = Logger.getLogger(MovieController.class.getName());
log.info(MOVIE_RATING_PREFIX + ":" + uid +"|"+ mid +"|"+ score +"|"+ System.currentTimeMillis()/1000)
4.2 寫入遠程測試
- Linux安裝syslog服務(wù),進行測試
- 主機log4j配置文件設(shè)置服務(wù)器ip
- log4j配置:寫入遠程服務(wù)器
log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
log4j.appender.syslog.SyslogHost= 服務(wù)器IP
log4j.appender.syslog.Threshold=INFO
log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%20t] %-130c:(line:%4L) : %m%n
5 flume配置
- flume對接kafka:flume對接文件
- flume設(shè)置source和sink,source為文件地址,sink為kafka的log
# log-kafka.properties
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
agent.sources.exectail.type = exec
agent.sources.exectail.command = tail -f /project/logs/agent.log agent.sources.exectail.interceptors=i1 agent.sources.exectail.interceptors.i1.type=regex_filter agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+ agent.sources.exectail.channels = memoryChannelagent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkasink.kafka.topic = log agent.sinks.kafkasink.kafka.bootstrap.servers = 服務(wù)器地址:9092 agent.sinks.kafkasink.kafka.producer.acks = 1 agent.sinks.kafkasink.kafka.flumeBatchSize = 20 agent.sinks.kafkasink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
6 實時推薦
ratingStream.foreachRDD{rdds => rdds.foreach{case (uid, mid, score, timestamp) => {println("rating data coming! >>>>>>>>>>>>>>>>")println(uid+",mid:"+mid)// 1. 從redis里獲取當(dāng)前用戶最近的K次評分,保存成Array[(mid, score)]val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )println("用戶最近的K次評分:"+userRecentlyRatings)// 2. 從相似度矩陣中取出當(dāng)前電影最相似的N個電影,作為備選列表,Array[mid]val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )println("電影最相似的N個電影:"+candidateMovies)// 3. 對每個備選電影,計算推薦優(yōu)先級,得到當(dāng)前用戶的實時推薦列表,Array[(mid, score)]val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )println("當(dāng)前用戶的實時推薦列表:"+streamRecs)// 4. 把推薦數(shù)據(jù)保存到mongodbsaveDataToMongoDB( uid, streamRecs )}}
}
def computeMovieScores(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={// 定義一個ArrayBuffer,用于保存每一個備選電影的基礎(chǔ)得分val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()// 定義一個HashMap,保存每一個備選電影的增強減弱因子val increMap = scala.collection.mutable.HashMap[Int, Int]()val decreMap = scala.collection.mutable.HashMap[Int, Int]()for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){// 拿到備選電影和最近評分電影的相似度val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )if(simScore > 0.7){// 計算備選電影的基礎(chǔ)推薦得分scores += ( (candidateMovie, simScore * userRecentlyRating._2) )if( userRecentlyRating._2 > 3 ){increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1} else{decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1}}}// 根據(jù)備選電影的mid做groupby,根據(jù)公式去求最后的推薦評分scores.groupBy(_._1).map{// groupBy之后得到的數(shù)據(jù) Map( mid -> ArrayBuffer[(mid, score)] )case (mid, scoreList) =>( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )}.toArray.sortWith(_._2>_._2)
}
7 啟動順序
- 啟動hadoop、spark的容器
cd /docker
docker-compose up -d
docker-compose ps
- 啟動mongodb和redis服務(wù)
netstat?-lanp?|?grep?"27017"
bin/redis-server etc/redis.conf
- 啟動zookeeper、kafka服務(wù)
./zkServer.sh start
bin/kafka-server-start.sh config/server.properties
- 啟動flume服務(wù)
bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent
實現(xiàn)效果
前端評分成功后寫入日志文件,flume對接log日志文件無問題,kafka對接flume無問題,spark streaming處理收到的一條數(shù)據(jù),進行推薦,存入MongoDB中。
總結(jié)
由于時間匆忙,寫的有些匆忙,如果有需要前端設(shè)計代碼和后端的代碼可以評論我,我整理整理發(fā)到github上。
前端設(shè)計部分沒有時間去詳細做,后續(xù)再對前端頁面進行美化。本科當(dāng)時整合了一個管理系統(tǒng),現(xiàn)在也沒有時間做,總之,一周多時間把當(dāng)時的系統(tǒng)快速復(fù)現(xiàn)了下,算是一個復(fù)習(xí)。
在進行開發(fā)時,遇到許多問題,版本問題、服務(wù)器內(nèi)網(wǎng)外網(wǎng)問題、docker容器相關(guān)問題、協(xié)同過濾算法設(shè)計問題,但幫著自己復(fù)習(xí)了下Vue和SpringBoot。
遇到問題時
- 遇到問題不應(yīng)該盲目解決,應(yīng)該靜下心看看報錯原因,想想為何報錯
- 版本尤其重要,因此最好在一個project的pom設(shè)定版本
- 使用服務(wù)器搭建docker-compose,利用該方法來搭建集群,快速簡單,但涉及的端口轉(zhuǎn)發(fā)等一些網(wǎng)絡(luò)知識需要耐下心來看
- Vue-Cli+Element-ui搭配起來開發(fā)簡單
- 寫程序時,我們應(yīng)該提前約定好接口,否則后續(xù)會很混亂…
后續(xù)
- 后續(xù)將優(yōu)化下前端頁面,設(shè)計更多功能
- 改進推薦算法
- 增加冷啟動方案