中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

網(wǎng)站建設(shè)尾款營銷咨詢公司排名前十

網(wǎng)站建設(shè)尾款,營銷咨詢公司排名前十,微網(wǎng)站,專業(yè)北京翻譯公司概覽 技術(shù)方案: 日志采集服務(wù):通過利用Flume-ng對業(yè)務(wù)平臺中用戶對于電影的一次評分行為進行采集,實時發(fā)送到Kafka集群。消息緩沖服務(wù):項目采用Kafka作為流式數(shù)據(jù)的緩存組件,接受來自Flume的數(shù)據(jù)采集請求。并將數(shù)據(jù)推…

概覽

技術(shù)方案:

  • 日志采集服務(wù):通過利用Flume-ng對業(yè)務(wù)平臺中用戶對于電影的一次評分行為進行采集,實時發(fā)送到Kafka集群。
  • 消息緩沖服務(wù):項目采用Kafka作為流式數(shù)據(jù)的緩存組件,接受來自Flume的數(shù)據(jù)采集請求。并將數(shù)據(jù)推送到項目的實時推薦系統(tǒng)部分。
  • 實時推薦服務(wù):項目采用Spark Streaming作為實時推薦系統(tǒng),通過接收Kafka中緩存的數(shù)據(jù),通過設(shè)計的推薦算法實現(xiàn)對實時推薦的數(shù)據(jù)處理,并將結(jié)構(gòu)合并更新到MongoDB數(shù)據(jù)庫。

1. 實現(xiàn)思路

我們應(yīng)該如何實現(xiàn)?

  1. 首先應(yīng)該redis安裝,這里存儲用戶的第K次評分(用戶評分存入redis中)
  2. 安裝zookeeper,安裝kafka,都是standlone模式
  3. 測試Kafka與Spark Streaming 聯(lián)調(diào)。Kafka生產(chǎn)一條數(shù)據(jù),Spark Streaming 可以消費成功,并根據(jù)redis中的數(shù)據(jù)和MongoDB數(shù)據(jù)進行推薦,存入MongoDB中
  4. 在業(yè)務(wù)系統(tǒng)寫埋點信息,測試時寫入本地文件,之后再遠程測試寫入云服務(wù)器log文件中
  5. flume配置文件書寫,kafka創(chuàng)建兩個topic,對整個過程進行測試

2 環(huán)境準備

1.1 redis 安裝

  • redis安裝redis安裝
  • 密碼:123456
  • 存入redis一些數(shù)據(jù) lpush uid:1 mid:score
  • redis 教程:教程

1.2 zookeeper單機版安裝

  • zookeeper安裝:zookeeper安裝
  • 版本:3.7.1
  • 遇到的坑:8080端口連接占用,我們需要在zoo.cpg文件中加上
    admin.serverPort=8001重新啟動即可。

1.3 kafka單機安裝

  • kafka安裝:官網(wǎng)下載地址
  • 安裝使用的為:127.0.0.1
  • 啟動kafka:kafka教程
bin/kafka-server-start.sh config/server.properties
  • 創(chuàng)建一個topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic recommender
  • 生產(chǎn)一個消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic recommender
  • 消費一個消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic recommender --from-beginning

3 測試kafka與spark streaming聯(lián)調(diào)

  • kafka版本:2.2.0
  • spark版本:2.3.0
  • 因此使用spark-streaming-kafka-0-10

image.png

  1. 啟動kafka,生產(chǎn)一條信息
  2. 書寫程序
// 定義kafka連接參數(shù)val kafkaParam = Map("bootstrap.servers" -> "服務(wù)器IP:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "recommender","auto.offset.reset" -> "latest")// 通過kafka創(chuàng)建一個DStreamval kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam ))// 把原始數(shù)據(jù)UID|MID|SCORE|TIMESTAMP 轉(zhuǎn)換成評分流// 1|31|4.5|val ratingStream = kafkaStream.map{msg =>val attr = msg.value().split("\\|")( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )}
  1. 若是kafka報錯,如果你同樣也是云服務(wù)器,請注意kafka的配置信息(很重要!)

(1)解決方法:修改kafka配置文件,設(shè)置為設(shè)置listeners為內(nèi)網(wǎng)ip,設(shè)置外網(wǎng)ip

  • 解決方案修改內(nèi)網(wǎng)ip

(2)重新啟動,成功

  • 內(nèi)網(wǎng)外網(wǎng)分流:內(nèi)網(wǎng)外網(wǎng)分流
  • kafka入門教程:入門教程
  1. redis報錯:開啟保護模式了,需要修改conf文件

效果

在kafka生產(chǎn)一個數(shù)據(jù),可以在MongoDB中得到推薦的電影結(jié)果

4 后端埋點

前端進行評分后,觸發(fā)click事件,后端進行測試埋點,利用log4j寫入本地文件中。

4.1 本地測試

  • log4j配置文件
log4j.rootLogger=INFO, file, stdout# write to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n# write to file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.Threshold=INFO
log4j.appender.file.File=F:/demoparent/business/src/main/log/agent.txt
log4j.appender.file.MaxFileSize=1024KB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n
  • 埋點實現(xiàn)
//埋點日志
import org.apache.log4j.Logger;// 關(guān)鍵代碼
Logger log = Logger.getLogger(MovieController.class.getName());
log.info(MOVIE_RATING_PREFIX + ":" + uid +"|"+ mid +"|"+ score +"|"+ System.currentTimeMillis()/1000)

4.2 寫入遠程測試

  1. Linux安裝syslog服務(wù),進行測試
  2. 主機log4j配置文件設(shè)置服務(wù)器ip
  • log4j配置:寫入遠程服務(wù)器
log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
log4j.appender.syslog.SyslogHost= 服務(wù)器IP
log4j.appender.syslog.Threshold=INFO
log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%20t]  %-130c:(line:%4L)  :   %m%n

5 flume配置

  1. flume對接kafka:flume對接文件
  2. flume設(shè)置source和sink,source為文件地址,sink為kafka的log
# log-kafka.properties
agent.sources = exectail
agent.channels = memoryChannel 
agent.sinks = kafkasink 
agent.sources.exectail.type = exec 
agent.sources.exectail.command = tail -f /project/logs/agent.log agent.sources.exectail.interceptors=i1 agent.sources.exectail.interceptors.i1.type=regex_filter agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+ agent.sources.exectail.channels = memoryChannelagent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkasink.kafka.topic = log agent.sinks.kafkasink.kafka.bootstrap.servers = 服務(wù)器地址:9092 agent.sinks.kafkasink.kafka.producer.acks = 1 agent.sinks.kafkasink.kafka.flumeBatchSize = 20 agent.sinks.kafkasink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000

6 實時推薦

ratingStream.foreachRDD{rdds => rdds.foreach{case (uid, mid, score, timestamp) => {println("rating data coming! >>>>>>>>>>>>>>>>")println(uid+",mid:"+mid)// 1. 從redis里獲取當(dāng)前用戶最近的K次評分,保存成Array[(mid, score)]val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )println("用戶最近的K次評分:"+userRecentlyRatings)// 2. 從相似度矩陣中取出當(dāng)前電影最相似的N個電影,作為備選列表,Array[mid]val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )println("電影最相似的N個電影:"+candidateMovies)// 3. 對每個備選電影,計算推薦優(yōu)先級,得到當(dāng)前用戶的實時推薦列表,Array[(mid, score)]val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )println("當(dāng)前用戶的實時推薦列表:"+streamRecs)// 4. 把推薦數(shù)據(jù)保存到mongodbsaveDataToMongoDB( uid, streamRecs )}}
}
def computeMovieScores(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={// 定義一個ArrayBuffer,用于保存每一個備選電影的基礎(chǔ)得分val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()// 定義一個HashMap,保存每一個備選電影的增強減弱因子val increMap = scala.collection.mutable.HashMap[Int, Int]()val decreMap = scala.collection.mutable.HashMap[Int, Int]()for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){// 拿到備選電影和最近評分電影的相似度val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )if(simScore > 0.7){// 計算備選電影的基礎(chǔ)推薦得分scores += ( (candidateMovie, simScore * userRecentlyRating._2) )if( userRecentlyRating._2 > 3 ){increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1} else{decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1}}}// 根據(jù)備選電影的mid做groupby,根據(jù)公式去求最后的推薦評分scores.groupBy(_._1).map{// groupBy之后得到的數(shù)據(jù) Map( mid -> ArrayBuffer[(mid, score)] )case (mid, scoreList) =>( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )}.toArray.sortWith(_._2>_._2)
}

7 啟動順序

  1. 啟動hadoop、spark的容器
  • cd /docker
  • docker-compose up -d
  • docker-compose ps
  1. 啟動mongodb和redis服務(wù)
  • netstat?-lanp?|?grep?"27017"
  • bin/redis-server etc/redis.conf
  1. 啟動zookeeper、kafka服務(wù)
  • ./zkServer.sh start
  • bin/kafka-server-start.sh config/server.properties
  1. 啟動flume服務(wù)
  • bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent

實現(xiàn)效果

前端評分成功后寫入日志文件,flume對接log日志文件無問題,kafka對接flume無問題,spark streaming處理收到的一條數(shù)據(jù),進行推薦,存入MongoDB中。

image.png

總結(jié)

由于時間匆忙,寫的有些匆忙,如果有需要前端設(shè)計代碼和后端的代碼可以評論我,我整理整理發(fā)到github上。

前端設(shè)計部分沒有時間去詳細做,后續(xù)再對前端頁面進行美化。本科當(dāng)時整合了一個管理系統(tǒng),現(xiàn)在也沒有時間做,總之,一周多時間把當(dāng)時的系統(tǒng)快速復(fù)現(xiàn)了下,算是一個復(fù)習(xí)。

在進行開發(fā)時,遇到許多問題,版本問題、服務(wù)器內(nèi)網(wǎng)外網(wǎng)問題、docker容器相關(guān)問題、協(xié)同過濾算法設(shè)計問題,但幫著自己復(fù)習(xí)了下Vue和SpringBoot。

遇到問題時

  • 遇到問題不應(yīng)該盲目解決,應(yīng)該靜下心看看報錯原因,想想為何報錯
  • 版本尤其重要,因此最好在一個project的pom設(shè)定版本
  • 使用服務(wù)器搭建docker-compose,利用該方法來搭建集群,快速簡單,但涉及的端口轉(zhuǎn)發(fā)等一些網(wǎng)絡(luò)知識需要耐下心來看
  • Vue-Cli+Element-ui搭配起來開發(fā)簡單
  • 寫程序時,我們應(yīng)該提前約定好接口,否則后續(xù)會很混亂…

后續(xù)

  • 后續(xù)將優(yōu)化下前端頁面,設(shè)計更多功能
  • 改進推薦算法
  • 增加冷啟動方案
http://www.risenshineclean.com/news/1417.html

相關(guān)文章:

  • 哪些網(wǎng)站是做食品nba交易最新消息
  • 山東鑫泰建設(shè)集團網(wǎng)站微信營銷推廣公司
  • 買了個域名怎么做網(wǎng)站網(wǎng)絡(luò)輿情分析師
  • 英文網(wǎng)站建設(shè)小程序開發(fā)
  • 360seo排名點擊軟件逆冬seo
  • 微信平臺公眾號開發(fā)廊坊網(wǎng)站seo
  • 威遠移動網(wǎng)站建設(shè)黃石seo診斷
  • 賀州網(wǎng)站制作吸引顧客的營銷策略
  • 一條龍網(wǎng)站建設(shè)哪家好游戲推廣員是做什么的
  • 六安建設(shè)廳網(wǎng)站青島網(wǎng)站seo診斷
  • 住建局證件查詢系統(tǒng)怎么做關(guān)鍵詞優(yōu)化排名
  • 網(wǎng)站用gbk還是utf8惡意點擊軟件哪個好
  • 在putty做網(wǎng)站要拷貝什么seo點擊優(yōu)化
  • photoshop做圖網(wǎng)站常德seo
  • 自己做網(wǎng)站收費么成都網(wǎng)站設(shè)計公司
  • 成都哪家公司做網(wǎng)站最好建立一個網(wǎng)站的費用
  • 建網(wǎng)站注冊培訓(xùn)心得體會2000字
  • 網(wǎng)站建設(shè)的經(jīng)濟可行性怎樣做網(wǎng)站賣自己的產(chǎn)品
  • 網(wǎng)站備案信息真實性核驗網(wǎng)頁版百度云
  • 黃渡網(wǎng)站建設(shè)百度網(wǎng)盤網(wǎng)頁版登錄
  • 電子商務(wù)網(wǎng)站建設(shè)選擇服務(wù)器要考慮的因素有 seo won
  • 鄒城手機網(wǎng)站建設(shè)網(wǎng)絡(luò)營銷軟件條件
  • dw做單頁網(wǎng)站教程seo排名點擊軟件運營
  • 蘇州做網(wǎng)站的公司哪家好衡陽百度推廣
  • 企業(yè)開展網(wǎng)站建設(shè)廣告聯(lián)盟有哪些
  • ps如何做ppt模板下載網(wǎng)站谷歌三件套
  • 這是我自己做的網(wǎng)站嗎現(xiàn)在推廣平臺哪家最好
  • 望城區(qū)政府門戶網(wǎng)站城市建設(shè)短視頻營銷的發(fā)展趨勢
  • 怎樣設(shè)計一個網(wǎng)頁頁面關(guān)鍵詞優(yōu)化搜索引擎
  • 外貿(mào)網(wǎng)站做的作用是什么sem工作內(nèi)容