Requirements analysis
- Search keywords
- Count how often each keyword appears
IK tokenization
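Tokenization uses the IK analyzer, so its dependency has to be added to the project. IK splits a search string into words the way people normally use them. For example, 苹果iphone 手机 ("Apple iPhone phone") is split into 苹果, iphone, and 手机. The Flink Doris connector listed alongside it is needed later for the Doris sink.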
```xml
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.17</artifactId>
</dependency>
<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
</dependency>
```
Test code:
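```java
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

public class IkUtil {
    public static void main(String[] args) throws IOException {
        String s = "Apple 苹果15 5G手机";
        StringReader stringReader = new StringReader(s);
        // Second argument (useSmart): true = smart, coarse-grained mode, words are not split further;
        // false = finest-grained mode, words are split as finely as possible
        IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
        // next() returns one lexeme at a time and null when the input is exhausted
        Lexeme next = ikSegmenter.next();
        while (next != null) {
            System.out.println(next.getLexemeText());
            next = ikSegmenter.next();
        }
    }
}
```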
Overall workflow
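- Create a custom tokenizer utility class, IkUtil (IK is the tokenization library added as a dependency)
- Create a custom function class (KwSplit in the implementation)
- Register the function
- Consume the DWD page topic data from Kafka and set a watermark
- Filter search actions out of the main stream:
  - page['item'] is not null
  - item_type: "keyword"
  - last_page_id: "search"
- Split each keyword string with the tokenizer function
- Group by keyword over a tumbling window and aggregate
- Write the results to Doris
  - Create the Doris sink
  - Flink must have checkpointing enabled to write data to Doris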
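To make the filter and split steps concrete, here is a plain-Java model of what the SQL does to each page record, runnable without Flink or IK. It is only a sketch: the class, record, and method names are hypothetical, and `tokenize` is a whitespace splitter standing in for the IK analyzer, so it will not split Chinese text the way IK does. What it does reproduce is the three WHERE conditions and the `left join lateral ... on true` semantics, under which a record that yields no tokens is still kept, with a null keyword.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of the filter and split steps (hypothetical names; no Flink or IK involved).
final class SearchKeywordModel {

    // A DWD page record: the `page` map plus the event time in epoch milliseconds.
    record PageEvent(Map<String, String> page, long ts) {}

    // One output row of the lateral join: original string, one token, event time.
    record SplitRow(String keywords, String keyword, long ts) {}

    // Same three conditions as the SQL WHERE clause.
    static boolean isSearchAction(Map<String, String> page) {
        return "search".equals(page.get("last_page_id"))
                && "keyword".equals(page.get("item_type"))
                && page.get("item") != null;
    }

    // Hypothetical stand-in for IK: splits on whitespace only.
    static List<String> tokenize(String text) {
        List<String> tokens = new ArrayList<>();
        for (String token : text.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    // Keeps search actions, then expands each one into one row per token.
    // Like LEFT JOIN LATERAL ... ON TRUE, a record with no tokens is kept with a null keyword.
    static List<SplitRow> filterAndSplit(List<PageEvent> events) {
        List<SplitRow> rows = new ArrayList<>();
        for (PageEvent event : events) {
            if (!isSearchAction(event.page())) {
                continue;
            }
            String keywords = event.page().get("item");
            List<String> tokens = tokenize(keywords);
            if (tokens.isEmpty()) {
                rows.add(new SplitRow(keywords, null, event.ts()));
            } else {
                for (String token : tokens) {
                    rows.add(new SplitRow(keywords, token, event.ts()));
                }
            }
        }
        return rows;
    }
}
```

With an IK-backed function, only `tokenize` would change; the filtering and the one-row-per-token expansion stay the same.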
Implementation
```java
import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/**
 * title:
 *
 * @Author 浪拍岸
 * @Create 28/12/2023 11:06 AM
 * @Version 1.0
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {

    public static void main(String[] args) {
        new DwsTrafficSourceKeywordPageViewWindow().start(10021, 4, "dws_traffic_source_keyword_page_view_window");
    }

    @Override
    public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        // 1. Read the main stream (DWD page topic) with a 5-second watermark on row_time
        tableEnv.executeSql("create table page_info(\n" +
                " `common` map<string,string>,\n" +
                " `page` map<string,string>,\n" +
                " `ts` bigint,\n" +
                " `row_time` as to_timestamp_ltz(ts,3),\n" +
                " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));
        // Check that data arrives:
        // tableEnv.executeSql("select * from page_info").print();

        // 2. Keep only search actions and extract the keywords
        Table keywordTable = tableEnv.sqlQuery("select\n" +
                " page['item'] keywords,\n" +
                " `row_time`,\n" +
                " ts\n" +
                " from page_info\n" +
                " where page['last_page_id'] = 'search'\n" +
                " and page['item_type'] = 'keyword'\n" +
                " and page['item'] is not null");
        tableEnv.createTemporaryView("keywords_table", keywordTable);
        // Check that data arrives:
        // tableEnv.executeSql("select * from keywords_table").print();

        // 3. Register the custom tokenizer function
        tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class);

        // 4. Split each keywords string into individual keywords
        Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +
                " from keywords_table" +
                " left join lateral Table(kwSplit(keywords)) on true");
        tableEnv.createTemporaryView("split_kw_table", splitKwTable);
        // tableEnv.executeSql("select * from split_kw_table").print();

        // 5. Group by keyword over 10-second tumbling windows and count
        Table windowAggTable = tableEnv.sqlQuery("select\n" +
                " keyword,\n" +
                " cast(tumble_start(row_time, interval '10' second) as string) wStart,\n" +
                " cast(tumble_end(row_time, interval '10' second) as string) wEnd,\n" +
                " cast(current_date as string) cur_date,\n" +
                " count(*) keyword_count\n" +
                "from split_kw_table\n" +
                "group by tumble(row_time, interval '10' second), keyword");
        // tableEnv.createTemporaryView("result_table", windowAggTable);
        // tableEnv.executeSql("select keyword, keyword_count + 1 from result_table").print();

        // 6. Write the results to Doris (checkpointing must be enabled for this sink)
        tableEnv.executeSql("create table doris_sink\n" +
                "(\n" +
                " keyword STRING,\n" +
                " wStart STRING,\n" +
                " wEnd STRING,\n" +
                " cur_date STRING,\n" +
                " keyword_count BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));
        windowAggTable.insertInto("doris_sink").execute();
    }
}
```
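Step 5 assigns each split keyword to a 10-second tumbling window and counts rows per (window, keyword). The plain-Java sketch below models that `group by tumble(row_time, interval '10' second), keyword` aggregation on a finite list of events. The names are hypothetical, and it assumes epoch-aligned windows with no offset, which is how Flink aligns tumbling windows by default; each window covers [start, start + 10 s). It deliberately ignores watermarks: in the job, a window is only emitted roughly once the watermark (row_time minus 5 seconds) has moved past the window end, and later rows for that window are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of step 5: count(*) per keyword per tumbling window (hypothetical names).
final class TumbleKeywordCount {

    record KeywordEvent(String keyword, long rowTimeMillis) {}

    // One result row: [windowStart, windowEnd) in epoch millis, keyword, count.
    record WindowCount(long windowStart, long windowEnd, String keyword, long count) {}

    // Start of the epoch-aligned tumbling window containing ts (no offset);
    // floorMod keeps the result correct for negative timestamps too.
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    // Groups by (window, keyword) and counts. Works on a finite list, so there is
    // no watermark here: every window is emitted at once.
    static List<WindowCount> countPerWindow(List<KeywordEvent> events, long sizeMillis) {
        // windowStart -> (keyword -> count); TreeMaps give a deterministic output order
        TreeMap<Long, TreeMap<String, Long>> acc = new TreeMap<>();
        for (KeywordEvent event : events) {
            if (event.keyword() == null) {
                // Simplification: this model drops null keywords, whereas the SQL
                // GROUP BY would keep them as a separate NULL group.
                continue;
            }
            long start = windowStart(event.rowTimeMillis(), sizeMillis);
            acc.computeIfAbsent(start, k -> new TreeMap<>()).merge(event.keyword(), 1L, Long::sum);
        }
        List<WindowCount> result = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<String, Long>> window : acc.entrySet()) {
            long start = window.getKey();
            for (Map.Entry<String, Long> entry : window.getValue().entrySet()) {
                result.add(new WindowCount(start, start + sizeMillis, entry.getKey(), entry.getValue()));
            }
        }
        return result;
    }
}
```

An event at exactly 10,000 ms falls into the second window, because window ends are exclusive, matching the half-open [wStart, wEnd) ranges the job writes to Doris.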