大昌建設(shè)集團(tuán)有限公司網(wǎng)站sem網(wǎng)絡(luò)推廣是什么
??熱點(diǎn)文章-實(shí)時(shí)計(jì)算
1 今日內(nèi)容
1.1 定時(shí)計(jì)算與實(shí)時(shí)計(jì)算
1.2 今日內(nèi)容
kafkaStream
- 什么是流式計(jì)算
- kafkaStream概述
- kafkaStream入門案例
- Springboot集成kafkaStream
實(shí)時(shí)計(jì)算
- 用戶行為發(fā)送消息
- kafkaStream聚合處理消息
- 更新文章行為數(shù)量
- 替換熱點(diǎn)文章數(shù)據(jù)
2 實(shí)時(shí)流式計(jì)算
2.1 概念
一般流式計(jì)算會(huì)與批量計(jì)算相比較。在流式計(jì)算模型中,輸入是持續(xù)的,可以認(rèn)為在時(shí)間上是無(wú)界的,也就意味著,永遠(yuǎn)拿不到全量數(shù)據(jù)去做計(jì)算。同時(shí),計(jì)算結(jié)果是持續(xù)輸出的,也即計(jì)算結(jié)果在時(shí)間上也是無(wú)界的。流式計(jì)算一般對(duì)實(shí)時(shí)性要求較高,同時(shí)一般是先定義目標(biāo)計(jì)算,然后數(shù)據(jù)到來(lái)之后將計(jì)算邏輯應(yīng)用于數(shù)據(jù)。同時(shí)為了提高計(jì)算效率,往往盡可能采用增量計(jì)算代替全量計(jì)算。
流式計(jì)算就相當(dāng)于上圖的右側(cè)扶梯,是可以源源不斷的產(chǎn)生數(shù)據(jù),源源不斷的接收數(shù)據(jù),沒(méi)有邊界。
2.2 應(yīng)用場(chǎng)景
- 日志分析
網(wǎng)站的用戶訪問(wèn)日志進(jìn)行實(shí)時(shí)的分析,計(jì)算訪問(wèn)量,用戶畫像,留存率等等,實(shí)時(shí)的進(jìn)行數(shù)據(jù)分析,幫助企業(yè)進(jìn)行決策
- 大屏看板統(tǒng)計(jì)
可以實(shí)時(shí)的查看網(wǎng)站注冊(cè)數(shù)量,訂單數(shù)量,購(gòu)買數(shù)量,金額等。
- 公交實(shí)時(shí)數(shù)據(jù)
可以隨時(shí)更新公交車方位,計(jì)算多久到達(dá)站牌等
- 實(shí)時(shí)文章分值計(jì)算
頭條類文章的分值計(jì)算,通過(guò)用戶的行為實(shí)時(shí)文章的分值,分值越高就越被推薦。
2.3 技術(shù)方案選型
- Hadoop
- Apche Storm
Storm 是一個(gè)分布式實(shí)時(shí)大數(shù)據(jù)處理系統(tǒng),可以幫助我們方便地處理海量數(shù)據(jù),具有高可靠、高容錯(cuò)、高擴(kuò)展的特點(diǎn)。是流式框架,有很高的數(shù)據(jù)吞吐能力。
- Kafka Stream
可以輕松地將其嵌入任何Java應(yīng)用程序中,并與用戶為其流應(yīng)用程序所擁有的任何現(xiàn)有打包,部署和操作工具集成。
3 Kafka Stream
3.1 概述
Kafka Stream是Apache Kafka從0.10版本引入的一個(gè)新Feature。它是提供了對(duì)存儲(chǔ)于Kafka內(nèi)的數(shù)據(jù)進(jìn)行流式處理和分析的功能。
Kafka Stream的特點(diǎn)如下:
- Kafka Stream提供了一個(gè)非常簡(jiǎn)單而輕量的Library,它可以非常方便地嵌入任意Java應(yīng)用中,也可以任意方式打包和部署
- 除了Kafka外,無(wú)任何外部依賴
- 充分利用Kafka分區(qū)機(jī)制實(shí)現(xiàn)水平擴(kuò)展和順序性保證
- 通過(guò)可容錯(cuò)的state store實(shí)現(xiàn)高效的狀態(tài)操作(如windowed join和aggregation)
- 支持正好一次處理語(yǔ)義
- 提供記錄級(jí)的處理能力,從而實(shí)現(xiàn)毫秒級(jí)的低延遲
- 支持基于事件時(shí)間的窗口操作,并且可處理晚到的數(shù)據(jù)(late arrival of records)
- 同時(shí)提供底層的處理原語(yǔ)Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)
3.2 Kafka Streams的關(guān)鍵概念
- 源處理器(Source Processor):源處理器是一個(gè)沒(méi)有任何上游處理器的特殊類型的流處理器。它從一個(gè)或多個(gè)kafka主題生成輸入流。通過(guò)消費(fèi)這些主題的消息并將它們轉(zhuǎn)發(fā)到下游處理器。
- Sink處理器:sink處理器是一個(gè)沒(méi)有下游流處理器的特殊類型的流處理器。它接收上游流處理器的消息發(fā)送到一個(gè)指定的Kafka主題。
3.3 KStream
(1)數(shù)據(jù)結(jié)構(gòu)類似于map,如下圖,key-value鍵值對(duì)
(2)KStream
KStream數(shù)據(jù)流(data stream),即是一段順序的,可以無(wú)限長(zhǎng),不斷更新的數(shù)據(jù)集。
數(shù)據(jù)流中比較常記錄的是事件,這些事件可以是一次鼠標(biāo)點(diǎn)擊(click),一次交易,或是傳感器記錄的位置數(shù)據(jù)。
KStream負(fù)責(zé)抽象的,就是數(shù)據(jù)流。與Kafka自身topic中的數(shù)據(jù)一樣,類似日志,每一次操作都是向其中插入(insert)新數(shù)據(jù)。
為了說(shuō)明這一點(diǎn),讓我們想象一下以下兩個(gè)數(shù)據(jù)記錄正在發(fā)送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流處理應(yīng)用是要總結(jié)每個(gè)用戶的價(jià)值,它將返回4
了alice
。為什么?因?yàn)榈诙l數(shù)據(jù)記錄將不被視為先前記錄的更新。(insert)新數(shù)據(jù)
3.4 Kafka Stream入門案例編寫
(1)需求分析,求單詞個(gè)數(shù)(word count)
(2)引入依賴
在之前的kafka-demo工程的pom文件中引入
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
(3)創(chuàng)建原生的kafka staream入門案例
package com.heima.kafka.sample;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式處理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信息Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream 構(gòu)建器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式計(jì)算streamProcessor(streamsBuilder);//創(chuàng)建kafkaStream對(duì)象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//開啟流式計(jì)算kafkaStreams.start();}/*** 流式計(jì)算* 消息的內(nèi)容:hello kafka hello itcast* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//創(chuàng)建kstream對(duì)象,同時(shí)指定從那個(gè)topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 處理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value進(jìn)行聚合處理.groupBy((key,value)->value)//時(shí)間窗口 .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//統(tǒng)計(jì)單詞的個(gè)數(shù).count()//轉(zhuǎn)換為kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//發(fā)送消息.to("itcast-topic-out");}
}
(4)測(cè)試準(zhǔn)備
- 使用生產(chǎn)者在topic為:itcast_topic_input中發(fā)送多條消息
- 使用消費(fèi)者接收topic為:itcast_topic_out
結(jié)果:
- 通過(guò)流式計(jì)算,會(huì)把生產(chǎn)者的多條消息匯總成一條發(fā)送到消費(fèi)者中輸出
3.5 SpringBoot集成Kafka Stream
(1)自定配置參數(shù)
package com.heima.kafka.config;
import java.util.HashMap;
import java.util.Map;/*** 通過(guò)重新注冊(cè)KafkaStreamsConfiguration對(duì)象,設(shè)置自定配置參數(shù)*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//連接信息props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//組props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//應(yīng)用名稱props.put(StreamsConfig.RETRIES_CONFIG, 10);//重試次數(shù)props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key序列化器props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}
修改application.yml文件,在最下方添加自定義配置
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}
(2)新增配置類,創(chuàng)建KStream對(duì)象,進(jìn)行聚合
package com.heima.kafka.stream;
import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//創(chuàng)建kstream對(duì)象,同時(shí)指定從那個(gè)topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根據(jù)value進(jìn)行聚合分組.groupBy((key,value)->value)//聚合計(jì)算時(shí)間間隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求單詞的個(gè)數(shù).count().toStream()//處理后的結(jié)果轉(zhuǎn)換為string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//發(fā)送消息.to("itcast-topic-out");return stream;}
}
測(cè)試:
啟動(dòng)微服務(wù),正常發(fā)送消息,可以正常接收到消息
3 app端熱點(diǎn)文章計(jì)算
3.1 思路說(shuō)明
3.2 功能實(shí)現(xiàn)
3.2.1 用戶行為(閱讀量,評(píng)論,點(diǎn)贊,收藏)發(fā)送消息,以閱讀和點(diǎn)贊為例
①在heima-leadnews-behavior微服務(wù)中集成kafka生產(chǎn)者配置
修改nacos,新增內(nèi)容
spring:application:name: leadnews-behaviorkafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
②修改ApLikesBehaviorServiceImpl新增發(fā)送消息
定義消息發(fā)送封裝類:UpdateArticleMess
package com.heima.model.mess;import lombok.Data;@Data
public class UpdateArticleMess {/*** 修改文章的字段類型*/private UpdateArticleType type;/*** 文章ID*/private Long articleId;/*** 修改數(shù)據(jù)的增量,可為正負(fù)*/private Integer add;public enum UpdateArticleType{COLLECTION,COMMENT,LIKES,VIEWS;}
}
topic常量類:
package com.heima.common.constants;
public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
}
完整代碼如下:
package com.heima.behavior.service.impl;import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic ResponseResult like(LikesBehaviorDto dto) {//1.檢查參數(shù)if (dto == null || dto.getArticleId() == null || checkParam(dto)) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登錄ApUser user = AppThreadLocalUtil.getUser();if (user == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);//3.點(diǎn)贊 保存數(shù)據(jù)if (dto.getOperation() == 0) {Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());if (obj != null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已點(diǎn)贊");}// 保存當(dāng)前keylog.info("保存當(dāng)前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));mess.setAdd(1);} else {// 刪除當(dāng)前keylog.info("刪除當(dāng)前key:{}, {}", dto.getArticleId(), user.getId());cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());mess.setAdd(-1);}//發(fā)送消息,數(shù)據(jù)聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}/*** 檢查參數(shù)** @return*/private boolean checkParam(LikesBehaviorDto dto) {if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {return true;}return false;}
}
③修改閱讀行為的類ApReadBehaviorServiceImpl發(fā)送消息
完整代碼:
package com.heima.behavior.service.impl;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic ResponseResult readBehavior(ReadBehaviorDto dto) {//1.檢查參數(shù)if (dto == null || dto.getArticleId() == null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登錄ApUser user = AppThreadLocalUtil.getUser();if (user == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}//更新閱讀次數(shù)String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());if (StringUtils.isNotBlank(readBehaviorJson)) {ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));}// 保存當(dāng)前keylog.info("保存當(dāng)前key:{} {} {}", dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));//發(fā)送消息,數(shù)據(jù)聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
}
3.2.2 使用kafkaStream實(shí)時(shí)接收消息,聚合內(nèi)容
①在leadnews-article微服務(wù)中集成kafkaStream (參考kafka-demo)
②定義實(shí)體類,用于聚合之后的分值封裝
package com.heima.model.article.mess;
import lombok.Data;@Data
public class ArticleVisitStreamMess {/*** 文章id*/private Long articleId;/*** 閱讀*/private int view;/*** 收藏*/private int collect;/*** 評(píng)論*/private int comment;/*** 點(diǎn)贊*/private int like;
}
修改常量類:增加常量
package com.heima.common.constans;
public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}
③ 定義stream,接收消息并聚合
package com.heima.article.stream;
import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式處理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434 和 value: likes:1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id進(jìn)行聚合.groupBy((key,value)->key)//時(shí)間窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的計(jì)算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {@Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 獲得初始值,也是時(shí)間窗口內(nèi)計(jì)算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作*/String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("當(dāng)前時(shí)間窗口內(nèi)的消息處理結(jié)果:"+formatStr);return formatStr;}}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//發(fā)送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value數(shù)據(jù)* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息處理之后的結(jié)果為:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}
3.2.3 重新計(jì)算文章的分值,更新到數(shù)據(jù)庫(kù)和緩存中
①在ApArticleService添加方法,用于更新數(shù)據(jù)庫(kù)中的文章分值
/*** 更新文章的分值 同時(shí)更新緩存中的熱點(diǎn)文章數(shù)據(jù)* @param mess*/
public void updateScore(ArticleVisitStreamMess mess);
實(shí)現(xiàn)類方法
/*** 更新文章的分值 同時(shí)更新緩存中的熱點(diǎn)文章數(shù)據(jù)* @param mess*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {//1.更新文章的閱讀、點(diǎn)贊、收藏、評(píng)論的數(shù)量ApArticle apArticle = updateArticle(mess);//2.計(jì)算文章的分值Integer score = computeScore(apArticle);score = score * 3;//3.替換當(dāng)前文章對(duì)應(yīng)頻道的熱點(diǎn)數(shù)據(jù)replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());//4.替換推薦對(duì)應(yīng)的熱點(diǎn)數(shù)據(jù)replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}/*** 替換數(shù)據(jù)并且存入到redis* @param apArticle* @param score* @param s*/
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {String articleListStr = cacheService.get(s);if (StringUtils.isNotBlank(articleListStr)) {List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);boolean flag = true;//如果緩存中存在該文章,只更新分值for (HotArticleVo hotArticleVo : hotArticleVoList) {if (hotArticleVo.getId().equals(apArticle.getId())) {hotArticleVo.setScore(score);flag = false;break;}}//如果緩存中不存在,查詢緩存中分值最小的一條數(shù)據(jù),進(jìn)行分值的比較,如果當(dāng)前文章的分值大于緩存中的數(shù)據(jù),就替換if (flag) {if (hotArticleVoList.size() >= 30) {hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if (lastHot.getScore() < score) {hotArticleVoList.remove(lastHot);HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}} else {HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}}//緩存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(s, JSON.toJSONString(hotArticleVoList));}
}/*** 更新文章行為數(shù)量* @param mess*/
private ApArticle updateArticle(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle;
}/*** 計(jì)算文章的具體分值* @param apArticle* @return*/
private Integer computeScore(ApArticle apArticle) {Integer score = 0;if(apArticle.getLikes() != null){score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;}if(apArticle.getViews() != null){score += apArticle.getViews();}if(apArticle.getComment() != null){score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;}if(apArticle.getCollection() != null){score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;}return score;
}