網(wǎng)站開發(fā)軟件系統(tǒng)教程seo推廣排名網(wǎng)站
媒資管理模塊 - 視頻處理
文章目錄
- 媒資管理模塊 - 視頻處理
- 一、視頻轉(zhuǎn)碼
- 1.1 視頻轉(zhuǎn)碼介紹
- 1.2 FFmpeg 基本使用
- 1.2.1 下載安裝配置
- 1.2.2 轉(zhuǎn)碼測(cè)試
- 1.3 工具類
- 1.3.1 VideoUtil
- 1.3.2 Mp4VideoUtil
- 1.3.3 測(cè)試工具類
- 二、分布式任務(wù)處理
- 2.1 分布式任務(wù)調(diào)度
- 2.2 XXL-JOB 配置執(zhí)行器 中間件
- 2.3 搭建XXL-JOB
- 2.3.1 調(diào)度中心
- 2.3.2 執(zhí)行器
- 2.3.3 執(zhí)行任務(wù)
- 2.4 XXL-JOB 高級(jí)配置參數(shù)
- 2.5 分片廣播
- 2.5.1 分片廣播事例
- 三、視頻處理
- 3.1 技術(shù)方案
- 3.1.1 作業(yè)分片方案
- 3.1.2 保證任務(wù)不重復(fù)執(zhí)行
- 3.1.3 視頻處理方案
一、視頻轉(zhuǎn)碼
1.1 視頻轉(zhuǎn)碼介紹
視頻轉(zhuǎn)碼是指的對(duì)視頻文件的編碼格式進(jìn)行轉(zhuǎn)換
視頻上傳成功需要對(duì)視頻的格式進(jìn)行轉(zhuǎn)碼處理,比如:avi轉(zhuǎn)成mp4
一般做文件存儲(chǔ)的服務(wù)都需要對(duì)文件進(jìn)行處理,例如對(duì)視頻進(jìn)行轉(zhuǎn)碼處理,可能由于文件量較大需要使用多線程等技術(shù)進(jìn)行高效處理
文件格式:是指.mp4、.avi、.rmvb等 這些不同擴(kuò)展名的視頻文件的文件格式
視頻文件的內(nèi)容主要包括視頻和音頻,其文件格式是按照一 定的編碼格式去編碼,并且按照該文件所規(guī)定的封裝格式將視頻、音頻、字幕等信息封裝在一起,播放器會(huì)根據(jù)它們的封裝格式去提取出編碼,然后由播放器解碼,最終播放音視頻
音視頻編碼格式:通過(guò)音視頻的壓縮技術(shù),將視頻格式轉(zhuǎn)換成另一種視頻格式,通過(guò)視頻編碼實(shí)現(xiàn)流媒體的傳輸
目前最常用的編碼標(biāo)準(zhǔn)是視頻H.264,音頻AAC
比如:
一個(gè).avi的視頻文件原來(lái)的編碼是a,通過(guò)編碼后編碼格式變?yōu)閎,
音頻原來(lái)為c,通過(guò)編碼后變?yōu)閐
1.2 FFmpeg 基本使用
1.2.1 下載安裝配置
我們Java程序員只需要調(diào)用流媒體程序員寫的工具類即可完成對(duì)視頻的操作,這個(gè)工具可能是c或c++寫的
流媒體程序員:專門做視頻處理類的東西
FFmpeg開源工具被許多開源項(xiàng)目采用,QQ影音、暴風(fēng)影音、VLC等
下載鏈接:https://www.ffmpeg.org/download.html#build-windows
最終下載之后三個(gè)exe文件
查看是否安裝成功
ffmpeg -v
也可以把ffmpeg.exe文件配置在path環(huán)境變量中
現(xiàn)在我們就可以在任意一個(gè)位置執(zhí)行命令了
1.2.2 轉(zhuǎn)碼測(cè)試
將avi文件轉(zhuǎn)換成mp4文件
ffmpeg.exe -i avi測(cè)試視頻.avi 1.mp4
轉(zhuǎn)成mp3
ffmpeg -i xxx.avi xxx1.mp3
轉(zhuǎn)成gif
ffmpeg -i xxx.avi xxx1.gif
1.3 工具類
在xuecheng-plus-base工程添加此工具類
這份工具類其實(shí)就是流媒體程序員進(jìn)行提供的
其實(shí)我們需要的是怎么調(diào)用ffmpeg.exe文件
1.3.1 VideoUtil
/*** 此文件作為視頻文件處理父類,提供:* 1、查看視頻時(shí)長(zhǎng)* 2、校驗(yàn)兩個(gè)視頻的時(shí)長(zhǎng)是否相等**/
public class VideoUtil {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安裝位置public VideoUtil(String ffmpeg_path){this.ffmpeg_path = ffmpeg_path;}//檢查視頻時(shí)間是否一致public Boolean check_video_time(String source,String target) {String source_time = get_video_time(source);//取出時(shí)分秒source_time = source_time.substring(0,source_time.lastIndexOf("."));String target_time = get_video_time(target);//取出時(shí)分秒target_time = target_time.substring(0,target_time.lastIndexOf("."));if(source_time == null || target_time == null){return false;}if(source_time.equals(target_time)){return true;}return false;}//獲取視頻時(shí)間(時(shí):分:秒:毫秒)public String get_video_time(String video_path) {/*ffmpeg -i lucene.mp4*/List<String> commend = new ArrayList<String>();commend.add(ffmpeg_path);commend.add("-i");commend.add(video_path);try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//將標(biāo)準(zhǔn)輸入流和錯(cuò)誤輸入流合并,通過(guò)標(biāo)準(zhǔn)輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();String outstring = waitFor(p);System.out.println(outstring);int start = outstring.trim().indexOf("Duration: ");if(start>=0){int end = outstring.trim().indexOf(", start:");if(end>=0){String time = outstring.substring(start+10,end);if(time!=null && !time.equals("")){return time.trim();}}}} catch (Exception ex) {ex.printStackTrace();}return null;}public String waitFor(Process p) {InputStream in = null;InputStream error = null;String result = "error";int exitValue = -1;StringBuffer outputString = new StringBuffer();try {in = p.getInputStream();error = p.getErrorStream();boolean finished = false;int maxRetry = 600;//每次休眠1秒,最長(zhǎng)執(zhí)行時(shí)間10分種int retry = 0;while (!finished) {if (retry > maxRetry) {return "error";}try {while (in.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}while (error.available() > 0) {Character c = new Character((char) in.read());outputString.append(c);System.out.print(c);}//進(jìn)程未結(jié)束時(shí)調(diào)用exitValue將拋出異常exitValue = p.exitValue();finished = true;} catch (IllegalThreadStateException e) {Thread.currentThread().sleep(1000);//休眠1秒retry++;}}} catch (Exception e) {e.printStackTrace();} finally {if (in != null) {try {in.close();} catch (IOException e) {System.out.println(e.getMessage());}}}return outputString.toString();}public static void main(String[] args) throws IOException {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安裝位置VideoUtil videoUtil = new VideoUtil(ffmpeg_path);String video_time = videoUtil.get_video_time("E:\\ffmpeg_test\\1.avi");System.out.println(video_time);}
}
1.3.2 Mp4VideoUtil
public class Mp4VideoUtil extends VideoUtil {String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安裝位置String video_path = "D:\\BaiduNetdiskDownload\\test1.avi";String mp4_name = "test1.mp4";String mp4folder_path = "D:/BaiduNetdiskDownload/Movies/test1/";public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){super(ffmpeg_path);this.ffmpeg_path = ffmpeg_path;this.video_path = video_path;this.mp4_name = mp4_name;this.mp4folder_path = mp4folder_path;}//清除已生成的mp4private void clear_mp4(String mp4_path){//刪除原來(lái)已經(jīng)生成的m3u8及ts文件File mp4File = new File(mp4_path);if(mp4File.exists() && mp4File.isFile()){mp4File.delete();}}/*** 視頻編碼,生成mp4文件* @return 成功返回success,失敗返回控制臺(tái)日志*/public String generateMp4(){//清除已生成的mp4
// clear_mp4(mp4folder_path+mp4_name);clear_mp4(mp4folder_path);/*ffmpeg.exe -i lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4*/List<String> commend = new ArrayList<String>();//commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe");commend.add(ffmpeg_path);commend.add("-i");
// commend.add("D:\\BaiduNetdiskDownload\\test1.avi");commend.add(video_path);commend.add("-c:v");commend.add("libx264");commend.add("-y");//覆蓋輸出文件commend.add("-s");commend.add("1280x720");commend.add("-pix_fmt");commend.add("yuv420p");commend.add("-b:a");commend.add("63k");commend.add("-b:v");commend.add("753k");commend.add("-r");commend.add("18");
// commend.add(mp4folder_path + mp4_name );commend.add(mp4folder_path );String outstring = null;try {ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//將標(biāo)準(zhǔn)輸入流和錯(cuò)誤輸入流合并,通過(guò)標(biāo)準(zhǔn)輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();outstring = waitFor(p);} catch (Exception ex) {ex.printStackTrace();}
// Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name);Boolean check_video_time = this.check_video_time(video_path, mp4folder_path);if(!check_video_time){return outstring;}else{return "success";}}}
上面的代碼中大多數(shù)是參數(shù)封裝,真正調(diào)用FFmpeg的是下面幾行
ProcessBuilder builder = new ProcessBuilder();builder.command(commend);//將標(biāo)準(zhǔn)輸入流和錯(cuò)誤輸入流合并,通過(guò)標(biāo)準(zhǔn)輸入流程讀取信息builder.redirectErrorStream(true);
Process p = builder.start();
1.3.3 測(cè)試工具類
我們可以測(cè)試一下,比如打開一下“咪咕視頻”
public static void main(String[] args) throws IOException {ProcessBuilder builder = new ProcessBuilder();//啟動(dòng)一下我本地的咪咕視頻(路徑中盡量不要含有中文)builder.command("D:\\soft\\MiguVideo\\MiGuApp.exe");//將標(biāo)準(zhǔn)輸入流和錯(cuò)誤輸入流合并,通過(guò)標(biāo)準(zhǔn)輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();}
可以在此類中執(zhí)行main函數(shù)調(diào)用一下此工具類是否能完成視頻轉(zhuǎn)碼
public static void main(String[] args) throws IOException {//ffmpeg的路徑String ffmpeg_path = "";//ffmpeg的安裝位置//源avi視頻的路徑String video_path = "";//轉(zhuǎn)換后mp4文件的名稱String mp4_name = "";//轉(zhuǎn)換后mp4文件的路徑String mp4_path = "";//創(chuàng)建工具類對(duì)象Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);//開始視頻轉(zhuǎn)換,成功將返回successString s = videoUtil.generateMp4();System.out.println(s);}
二、分布式任務(wù)處理
2.1 分布式任務(wù)調(diào)度
什么是任務(wù)調(diào)度?
下面的場(chǎng)景就是一個(gè)調(diào)度方案
-
每隔24小時(shí)執(zhí)行數(shù)據(jù)備份任務(wù)。
-
12306網(wǎng)站會(huì)根據(jù)車次不同,設(shè)置幾個(gè)時(shí)間點(diǎn)分批次放票。
-
某財(cái)務(wù)系統(tǒng)需要在每天上午10點(diǎn)前結(jié)算前一天的賬單數(shù)據(jù),統(tǒng)計(jì)匯總。
-
商品成功發(fā)貨后,需要向客戶發(fā)送短信提醒。
任務(wù)調(diào)度:對(duì)任務(wù)的調(diào)度,它是指系統(tǒng)為了完成特定業(yè)務(wù),基于給定時(shí)間點(diǎn),給定時(shí)間間隔或者給定執(zhí)行次數(shù)自動(dòng)執(zhí)行任務(wù)
我們可以將一個(gè)視頻的轉(zhuǎn)碼理解為一個(gè)任務(wù)的執(zhí)行,如果視頻的數(shù)量比較多,如何去高效處理一批任務(wù)呢?
- 多線程
多線程是充分利用單機(jī)的資源。
- 分布式加多線程
充分利用多臺(tái)計(jì)算機(jī),每臺(tái)計(jì)算機(jī)使用多線程處理。每臺(tái)計(jì)算機(jī)都在同時(shí)運(yùn)行指定任務(wù)處理
方案2可擴(kuò)展性更強(qiáng),并且是一種分布式任務(wù)調(diào)度的處理方案。
什么是分布式任務(wù)調(diào)度?
通常任務(wù)調(diào)度的程序是集成在應(yīng)用中的,
比如:優(yōu)惠卷服務(wù)中包括了定時(shí)發(fā)放優(yōu)惠卷的的調(diào)度程序,
結(jié)算服務(wù)中包括了定期生成報(bào)表的任務(wù)調(diào)度程序
由于采用分布式架構(gòu),一個(gè)服務(wù)往往會(huì)部署多個(gè)冗余實(shí)例來(lái)運(yùn)行我們的業(yè)務(wù),在這種分布式系統(tǒng)環(huán)境下運(yùn)行任務(wù)調(diào)度,我們稱之為分布式任務(wù)調(diào)度,如下圖:
分布式調(diào)度要實(shí)現(xiàn)的目標(biāo):
? 不管是任務(wù)調(diào)度程序集成在應(yīng)用程序中,還是單獨(dú)構(gòu)建的任務(wù)調(diào)度系統(tǒng),如果采用分布式調(diào)度任務(wù)的方式就相當(dāng)于將任務(wù)調(diào)度程序分布式構(gòu)建,這樣就可以具有分布式系統(tǒng)的特點(diǎn),并且提高任務(wù)的調(diào)度處理能力:
1、并行任務(wù)調(diào)度
? 并行任務(wù)調(diào)度實(shí)現(xiàn)靠多線程,如果有大量任務(wù)需要調(diào)度,此時(shí)光靠多線程就會(huì)有瓶頸了,因?yàn)橐慌_(tái)計(jì)算機(jī)CPU的處理能力是有限的。
? 如果將任務(wù)調(diào)度程序分布式部署,每個(gè)結(jié)點(diǎn)還可以部署為集群,這樣就可以讓多臺(tái)計(jì)算機(jī)共同去完成任務(wù)調(diào)度,我們可以將任務(wù)分割為若干個(gè)分片,由不同的實(shí)例并行執(zhí)行,來(lái)提高任務(wù)調(diào)度的處理效率。
2、高可用
? 若某一個(gè)實(shí)例宕機(jī),不影響其他實(shí)例來(lái)執(zhí)行任務(wù)。
3、彈性擴(kuò)容
? 當(dāng)集群中增加實(shí)例就可以提高并執(zhí)行任務(wù)的處理效率。
4、任務(wù)管理與監(jiān)測(cè)
? 對(duì)系統(tǒng)中存在的所有定時(shí)任務(wù)進(jìn)行統(tǒng)一的管理及監(jiān)測(cè)。讓開發(fā)人員及運(yùn)維人員能夠時(shí)刻了解任務(wù)執(zhí)行情況,從而做出快速的應(yīng)急處理響應(yīng)。
5、避免任務(wù)重復(fù)執(zhí)行
? 當(dāng)任務(wù)調(diào)度以集群方式部署,同一個(gè)任務(wù)調(diào)度可能會(huì)執(zhí)行多次
比如在上面提到的電商系統(tǒng)中到點(diǎn)發(fā)優(yōu)惠券的例子,就會(huì)發(fā)放多次優(yōu)惠券,對(duì)公司造成很多損失,所以我們需要控制相同的任務(wù)在多個(gè)運(yùn)行實(shí)例上只執(zhí)行一次
2.2 XXL-JOB 配置執(zhí)行器 中間件
我們只需要編寫任務(wù)的執(zhí)行邏輯即可,其他的部分都在中間件中
XXL-JOB是一個(gè)輕量級(jí)分布式任務(wù)調(diào)度平臺(tái)
其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡(jiǎn)單、輕量級(jí)、易擴(kuò)展。
現(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。
官網(wǎng):https://www.xuxueli.com/xxl-job/
文檔:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B
XXL-JOB主要有調(diào)度中心、執(zhí)行器、任務(wù):
調(diào)度中心:
? 負(fù)責(zé)管理調(diào)度信息,按照調(diào)度配置發(fā)出調(diào)度請(qǐng)求,自身不承擔(dān)業(yè)務(wù)代碼;
? 主要職責(zé)為執(zhí)行器管理、任務(wù)管理、監(jiān)控運(yùn)維、日志管理等
調(diào)度中心其實(shí)就是一個(gè)管理者
任務(wù)執(zhí)行器:
? 負(fù)責(zé)接收調(diào)度請(qǐng)求并執(zhí)行任務(wù)邏輯;
? 只要職責(zé)是注冊(cè)服務(wù)、任務(wù)執(zhí)行服務(wù)(接收到任務(wù)后會(huì)放入線程池中的任務(wù)隊(duì)列)、執(zhí)行結(jié)果上報(bào)、日志服務(wù)等
任務(wù)執(zhí)行器相當(dāng)于分布式部署,兩個(gè)執(zhí)行器相當(dāng)于兩個(gè)人執(zhí)行
**任務(wù):**負(fù)責(zé)執(zhí)行具體的業(yè)務(wù)處理。
執(zhí)行流程:
- 任務(wù)執(zhí)行器根據(jù)配置的調(diào)度中心的地址,自動(dòng)注冊(cè)到調(diào)度中心
調(diào)度中心要知道自己下面有多少個(gè)任務(wù)執(zhí)行器
- 達(dá)到任務(wù)觸發(fā)條件,調(diào)度中心下發(fā)任務(wù)
調(diào)度中心會(huì)根據(jù)任務(wù)的調(diào)度策略來(lái)下發(fā)任務(wù)
- 執(zhí)行器基于線程池執(zhí)行任務(wù),并把執(zhí)行結(jié)果放入內(nèi)存隊(duì)列中、把執(zhí)行日志寫入日志文件中
任務(wù)執(zhí)行器可能會(huì)執(zhí)行多個(gè)任務(wù),所以要先將任務(wù)放入線程池中
- 執(zhí)行器消費(fèi)內(nèi)存隊(duì)列中的執(zhí)行結(jié)果,主動(dòng)上報(bào)給調(diào)度中心
任務(wù)執(zhí)行器將執(zhí)行結(jié)果異步上報(bào)給調(diào)度中心
也就是能夠在調(diào)度中心里面必須能夠拿到幾點(diǎn)幾分幾秒,哪個(gè)執(zhí)行器執(zhí)行任務(wù)是成功還是失敗的
- 當(dāng)用戶在調(diào)度中心查看任務(wù)日志,調(diào)度中心請(qǐng)求任務(wù)執(zhí)行器,任務(wù)執(zhí)行器讀取任務(wù)日志文件并返回日志詳情
其實(shí)就是調(diào)度中心主動(dòng)查詢?nèi)蝿?wù)執(zhí)行器執(zhí)行的任務(wù)是成功還是失敗
2.3 搭建XXL-JOB
調(diào)度中心負(fù)責(zé)給執(zhí)行器下發(fā)任務(wù),執(zhí)行器負(fù)責(zé)執(zhí)行任務(wù)
2.3.1 調(diào)度中心
首先下載XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job
碼云:https://gitee.com/xuxueli0323/xxl-job
項(xiàng)目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
如果想本地運(yùn)行的話,我們需要修改一些參數(shù)才可以運(yùn)行
包結(jié)構(gòu)
xxl-job-admin:調(diào)度中心
xxl-job-core:公共依賴
xxl-job-executor-samples:執(zhí)行器Sample示例(選擇合適的版本執(zhí)行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通過(guò)Springboot管理執(zhí)行器,推薦這種方式;
:xxl-job-executor-sample-frameless:無(wú)框架版本;
進(jìn)入xxl-job
http://192.168.101.65:8088/xxl-job-admin/toLogin
賬號(hào):admin
密碼:123456
2.3.2 執(zhí)行器
在調(diào)度中心創(chuàng)建一個(gè)執(zhí)行器
配置執(zhí)行器,執(zhí)行器負(fù)責(zé)與調(diào)度中心通信接收調(diào)度中心發(fā)起的任務(wù)調(diào)度請(qǐng)求
創(chuàng)建“執(zhí)行器管理”,如下圖所示
此時(shí)沒有一個(gè)java程序在執(zhí)行任務(wù),知識(shí)創(chuàng)建了一個(gè)執(zhí)行器而已
因?yàn)槲覀円趍edia工程的media-service工程中使用xxl-job,所以在media-service的pom文件中增加下面這個(gè)坐標(biāo)
我們的執(zhí)行器就是在media-service工程中編寫
我們現(xiàn)在的目的是讓執(zhí)行器注冊(cè)到調(diào)度中心,我們添加之后就注冊(cè)到調(diào)度中心了
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId>
</dependency>
在nacos下的media-service-dev.yaml下配置xxl-job
我們配置上坐標(biāo)后還不能說(shuō)是完整的注冊(cè)到調(diào)度中心了,我們還需要告訴它調(diào)度中心在哪里,所以就需要下面調(diào)度中心的配置了
注意配置中的appname這是執(zhí)行器的應(yīng)用名
調(diào)度中心要給執(zhí)行器下發(fā)任務(wù),那執(zhí)行器肯定得啟動(dòng)一個(gè)服務(wù)
port是執(zhí)行器啟動(dòng)的端口,如果本地啟動(dòng)多個(gè)執(zhí)行器注意端口不能重復(fù)。執(zhí)行器啟動(dòng)起來(lái)后,調(diào)度中心會(huì)調(diào)用它
xxl:job:admin: addresses: http://192.168.101.65:8088/xxl-job-adminexecutor:appname: testHandleraddress: ip: port: 9999logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30accessToken: default_token
將下面的配置復(fù)制到media-service工程
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 針對(duì)多網(wǎng)卡、容器內(nèi)部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 組件靈活定制注冊(cè)IP;** 1、引入依賴:* <dependency>* <groupId>org.springframework.cloud</groupId>* <artifactId>spring-cloud-commons</artifactId>* <version>${version}</version>* </dependency>** 2、配置文件,或者容器啟動(dòng)變量* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'** 3、獲取IP* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}
最終結(jié)果如下圖所示
2.3.3 執(zhí)行任務(wù)
為什么要配置執(zhí)行器呢?
我們要讓執(zhí)行器執(zhí)行任務(wù)
那我們?cè)趺锤嬖V執(zhí)行器來(lái)執(zhí)行什么樣的任務(wù)呢?
如下圖所示的地方有個(gè)事例,拷貝到我們自己的工程中
第一步:定義任務(wù)類
/*** XxlJob開發(fā)示例(Bean模式)** 開發(fā)步驟:* 1、任務(wù)開發(fā):在Spring Bean實(shí)例中,開發(fā)Job方法;* 2、注解配置:為Job方法添加注解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對(duì)應(yīng)的是調(diào)度中心新建任務(wù)的JobHandler屬性的值。* 3、執(zhí)行日志:需要通過(guò) "XxlJobHelper.log" 打印執(zhí)行日志;* 4、任務(wù)結(jié)果:默認(rèn)任務(wù)結(jié)果為 "成功" 狀態(tài),不需要主動(dòng)設(shè)置;如有訴求,比如設(shè)置任務(wù)結(jié)果為失敗,可以通過(guò) "XxlJobHelper.handleFail/handleSuccess" 自主設(shè)置任務(wù)結(jié)果;** @author xuxueli 2019-12-11 21:52:51*//*** 任務(wù)類*/
@Component
public class SampleXxlJob {private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);/*** 執(zhí)行器拿到任務(wù)后就會(huì)執(zhí)行這個(gè)方法* 具體的任務(wù)方法*/@XxlJob("demoJobHandler") //任務(wù)名稱是demoJobHandlerpublic void demoJobHandler() throws Exception {System.out.println("處理視頻.....");//任務(wù)執(zhí)行邏輯...}/*** 執(zhí)行器拿到任務(wù)后就會(huì)執(zhí)行這個(gè)方法* 具體的任務(wù)方法*/@XxlJob("demoJobHandler2")public void demoJobHandler2() throws Exception {System.out.println("處理文檔.....");//任務(wù)執(zhí)行邏輯....}}
第二步:調(diào)度中心中注冊(cè)任務(wù)
調(diào)度類型:
- 固定速度:每隔多長(zhǎng)時(shí)間進(jìn)行調(diào)度
CRON:不僅可以配置每隔多長(zhǎng)時(shí)間,還可以配置年月日時(shí)分秒
cron = “0/30 * * * * ?”
- 從第0秒開始,每間隔30秒執(zhí)行1次
- 秒 分 時(shí) 日 月 周
- 以秒為例
- *:每秒都執(zhí)行
- 1-3:從第1秒開始執(zhí)行,到第3秒結(jié)束執(zhí)行
- 0/3:從第0秒開始,每隔3秒執(zhí)行1次
- 1,2,3:在指定的第1、2、3秒執(zhí)行
- ?:不指定
- 日和周不能同時(shí)制定,指定其中之一,則另一個(gè)設(shè)置為?
30 10 1 * * ? 每天1點(diǎn)10分30秒觸發(fā)
0/30 * * * * ? 每30秒觸發(fā)一次
* 0/10 * * * ? 每10分鐘觸發(fā)一次
第三步:啟動(dòng)任務(wù)
第四步:觀察控制臺(tái)
2.4 XXL-JOB 高級(jí)配置參數(shù)
XXL-JOB分布式調(diào)度平臺(tái)包括調(diào)度中心和執(zhí)行器,我們剛剛已經(jīng)在media-service工程中創(chuàng)建了一個(gè)執(zhí)行器,但是分布式任務(wù)調(diào)度要有多個(gè)執(zhí)行器來(lái)執(zhí)行任務(wù),所以我們需要把執(zhí)行器至少部署兩個(gè)節(jié)點(diǎn)
怎么部署至少兩個(gè)節(jié)點(diǎn)呢?
將media-api工程運(yùn)行兩個(gè)即可
怎么讓XXL-JOB調(diào)度多個(gè)集群(即多個(gè)執(zhí)行器)進(jìn)行執(zhí)行任務(wù)呢?
其實(shí)就是剛剛?cè)蝿?wù)管理中的這些配置
-
路由策略:
我們有一個(gè)調(diào)度中心,三個(gè)任務(wù)執(zhí)行器
當(dāng)我們規(guī)定的任務(wù)調(diào)度時(shí)間到了后調(diào)度中心就會(huì)下發(fā)任務(wù),但是現(xiàn)在面臨一個(gè)問題,這個(gè)任務(wù)分發(fā)給哪個(gè)任務(wù)執(zhí)行器?
這就需要我們配置路由策略了
第一個(gè):每次都會(huì)下發(fā)給第一個(gè)任務(wù)執(zhí)行器
最后一個(gè):每次都會(huì)下發(fā)給第最后一個(gè)任務(wù)執(zhí)行器
輪訓(xùn):每個(gè)人輪著來(lái)
一致性HASH:我們的任務(wù)有一個(gè)id,會(huì)求此id的hash值,并且此hash值一定會(huì)是執(zhí)行器中的其中一個(gè)
最不經(jīng)常使用:最不經(jīng)常執(zhí)行任務(wù)的執(zhí)行器
最近最久未使用:最近最不經(jīng)常執(zhí)行任務(wù)的執(zhí)行器
故障轉(zhuǎn)移:任務(wù)路由策略選擇"故障轉(zhuǎn)移"情況下,如果執(zhí)行器集群中某一臺(tái)機(jī)器故障,將會(huì)自動(dòng)Failover切換到一臺(tái)正常的執(zhí)行器發(fā)送調(diào)度請(qǐng)求。
忙碌轉(zhuǎn)移:某個(gè)執(zhí)行器任務(wù)挺多或者正在忙,就會(huì)發(fā)送給其他執(zhí)行器
分片廣播:執(zhí)行器集群部署時(shí),任務(wù)路由策略選擇"分片廣播"情況下,一次任務(wù)調(diào)度將會(huì)廣播觸發(fā)集群中所有執(zhí)行器執(zhí)行一次任務(wù),可根據(jù)分片參數(shù)開發(fā)分片任務(wù);
除了分片廣播之外,都是一個(gè)任務(wù)由一個(gè)執(zhí)行器進(jìn)行執(zhí)行,不能將執(zhí)行能力發(fā)揮到最大
分片廣播可以實(shí)現(xiàn)將任務(wù)同時(shí)發(fā)送給多個(gè)任務(wù)執(zhí)行器
-
子任務(wù)ID
不經(jīng)常使用
當(dāng)執(zhí)行完一個(gè)任務(wù)又想執(zhí)行第二個(gè)任務(wù),此時(shí)第二個(gè)任務(wù)就是第一個(gè)任務(wù)的子任務(wù)
-
調(diào)度過(guò)期策略
到了改調(diào)度的時(shí)候不知道什么原因沒有調(diào)度
-
堵塞處理策略
當(dāng)前執(zhí)行器在執(zhí)行任務(wù),但是任務(wù)調(diào)度中心又讓此執(zhí)行器進(jìn)行執(zhí)行任務(wù)2,此時(shí)任務(wù)2就被堵塞了
單擊串行:隊(duì)列的形式,任務(wù)進(jìn)行排隊(duì),執(zhí)行器按次序執(zhí)行任務(wù)
丟棄后續(xù)調(diào)度:執(zhí)行器正在執(zhí)行任務(wù)但是新派了任務(wù),不會(huì)干新派的任務(wù)
覆蓋之前調(diào)度:執(zhí)行器正在執(zhí)行任務(wù)但是新派了任務(wù),會(huì)把當(dāng)前的活終止,去做新的活
-
任務(wù)超時(shí)時(shí)間
假如我們?nèi)蝿?wù)訂的10秒,但是15秒還沒有執(zhí)行完,那超時(shí)了就不執(zhí)行了
- 失敗重試次數(shù)
2.5 分片廣播
分片廣播:執(zhí)行器集群部署時(shí),任務(wù)路由策略選擇"分片廣播"情況下,一次任務(wù)調(diào)度將會(huì)廣播觸發(fā)集群中所有執(zhí)行器執(zhí)行一次任務(wù),可根據(jù)分片參數(shù)開發(fā)分片任務(wù)
那這樣處理同一批視頻的話,會(huì)不會(huì)重復(fù)處理同一個(gè)視頻?
不會(huì)
分片廣播在給執(zhí)行器分配任務(wù)的時(shí)候會(huì)給執(zhí)行器分發(fā)序號(hào)
調(diào)度中心去廣播的時(shí)候,會(huì)通知三個(gè)執(zhí)行器執(zhí)行任務(wù)
比如通知第0號(hào)執(zhí)行器:把第零部分執(zhí)行一下
比如通知第1號(hào)執(zhí)行器:把第1部分執(zhí)行一下
比如通知第2號(hào)執(zhí)行器:把第2部分執(zhí)行一下
…
此時(shí)各個(gè)執(zhí)行器就能執(zhí)行各個(gè)的任務(wù)
作業(yè)分片適用哪些場(chǎng)景呢?
? 分片任務(wù)場(chǎng)景:10個(gè)執(zhí)行器的集群來(lái)處理10w條數(shù)據(jù),每臺(tái)機(jī)器只需要處理1w條數(shù)據(jù),耗時(shí)降低10倍;
? 廣播任務(wù)場(chǎng)景:廣播執(zhí)行器同時(shí)運(yùn)行shell腳本、廣播集群節(jié)點(diǎn)進(jìn)行緩存更新等。
所以,廣播分片方式不僅可以充分發(fā)揮每個(gè)執(zhí)行器的能力,并且根據(jù)分片參數(shù)可以控制任務(wù)是否執(zhí)行,最終靈活控制了執(zhí)行器集群分布式處理任務(wù)。
2.5.1 分片廣播事例
如下所示的代碼有一個(gè)示例,我們可以看一下
可以把這個(gè)復(fù)制到我們的工程中
下面的代碼需要做的就是告訴執(zhí)行器編號(hào),給每個(gè)執(zhí)行器進(jìn)行編號(hào)
/*** 2、分片廣播任務(wù)*/@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception {// 分片參數(shù)int shardIndex = XxlJobHelper.getShardIndex();//執(zhí)行器的序號(hào),從0開始int shardTotal = XxlJobHelper.getShardTotal();//執(zhí)行器總數(shù)//只要有了上面兩個(gè)參數(shù),我們就可以人為確定我們執(zhí)行器執(zhí)行哪一部分System.out.println("shardIndex:"+shardIndex);System.out.println("shardTotal:"+shardTotal);}
我們需要兩個(gè)執(zhí)行器,那只能啟動(dòng)兩個(gè)MediaApplication項(xiàng)目
其中對(duì)于media-service-dev工程一定要在nacos配置本地優(yōu)先策略
spring:cloud:config:override-none: true
-Dserver.port=63051 配置一下啟動(dòng)端口
-Dxxl.job.executor.port=9998 配置執(zhí)行器的端口
啟動(dòng)兩個(gè)執(zhí)行器,如下圖所示:
然后查看一下調(diào)度中心,是否有兩個(gè)執(zhí)行器
要想讓執(zhí)行器執(zhí)行任務(wù),還需要在調(diào)度中心的任務(wù)管理注冊(cè)任務(wù)
執(zhí)行任務(wù)
觀察控制臺(tái)情況
分別是0號(hào)執(zhí)行器和1號(hào)執(zhí)行器,總共兩個(gè)執(zhí)行器
三、視頻處理
3.1 技術(shù)方案
3.1.1 作業(yè)分片方案
此時(shí)如何保證多個(gè)執(zhí)行器不會(huì)查詢到重復(fù)的任務(wù)呢?
將待處理的文件進(jìn)行編號(hào)
兩個(gè)執(zhí)行器實(shí)例那么分片總數(shù)為2,序號(hào)為0、1
從任務(wù)1開始,如下:
1 % 2 = 1 執(zhí)行器2執(zhí)行
2 % 2 = 0 執(zhí)行器1執(zhí)行
3 % 2 = 1 執(zhí)行器2執(zhí)行
以此類推.
3.1.2 保證任務(wù)不重復(fù)執(zhí)行
多個(gè)執(zhí)行器在并行執(zhí)行,怎么保證任務(wù)不重復(fù)執(zhí)行呢?
比如說(shuō)0號(hào)執(zhí)行器有1號(hào)任務(wù)和3號(hào)任務(wù),但是此時(shí)調(diào)度中心又來(lái)進(jìn)行調(diào)度,那1號(hào)任務(wù)和3號(hào)任務(wù)可能又執(zhí)行了一遍
如果一個(gè)執(zhí)行器在處理一個(gè)視頻還沒有完成,此時(shí)調(diào)度中心又一次請(qǐng)求調(diào)度,為了不重復(fù)處理同一個(gè)視頻該怎么辦
- 配置調(diào)度過(guò)期策略
調(diào)度過(guò)期策略:調(diào)度中心錯(cuò)過(guò)調(diào)度時(shí)間的補(bǔ)償處理策略,包括:忽略、立即補(bǔ)償觸發(fā)一次等;
- 忽略:調(diào)度過(guò)期后,忽略過(guò)期的任務(wù),從當(dāng)前時(shí)間開始重新計(jì)算下次觸發(fā)時(shí)間;
- 立即執(zhí)行一次:調(diào)度過(guò)期后,立即執(zhí)行一次,并從當(dāng)前時(shí)間開始重新計(jì)算下次觸發(fā)時(shí)間;
這里我們選擇忽略,如果立即執(zhí)行一次就可能重復(fù)執(zhí)行相同的任務(wù)
-
配置阻塞處理策略
阻塞處理策略:調(diào)度過(guò)于密集執(zhí)行器來(lái)不及處理時(shí)的處理策略;
阻塞處理策略就是當(dāng)前執(zhí)行器正在執(zhí)行任務(wù)還沒有結(jié)束時(shí)調(diào)度中心進(jìn)行任務(wù)調(diào)度,此時(shí)該如何處理
單機(jī)串行(默認(rèn)):調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,調(diào)度請(qǐng)求進(jìn)入FIFO隊(duì)列并以串行方式運(yùn)行;
丟棄后續(xù)調(diào)度:調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),本次請(qǐng)求將會(huì)被丟棄并標(biāo)記為失敗;
覆蓋之前調(diào)度:調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),將會(huì)終止運(yùn)行中的調(diào)度任務(wù)并清空隊(duì)列,然后運(yùn)行本地調(diào)度任務(wù);
這里如果選擇覆蓋之前調(diào)度則可能重復(fù)執(zhí)行任務(wù),這里選擇 丟棄后續(xù)調(diào)度來(lái)避免任務(wù)重復(fù)執(zhí)行
- 保證任務(wù)處理的冪等性
任務(wù)的冪等性是指:對(duì)于數(shù)據(jù)的操作不論多少次,操作的結(jié)果始終是一致的
某一個(gè)視頻不管是調(diào)度多少次,只會(huì)轉(zhuǎn)碼一次
某個(gè)視頻轉(zhuǎn)碼成功后再來(lái)調(diào)度就不會(huì)進(jìn)行了,因?yàn)橐呀?jīng)轉(zhuǎn)碼成功了
什么是冪等性?它描述了一次和多次請(qǐng)求某一個(gè)資源對(duì)于資源本身應(yīng)該具有同樣的結(jié)果
冪等性是為了解決重復(fù)提交問題,比如:惡意刷單,重復(fù)支付等
解決冪等性常用的方案
1)數(shù)據(jù)庫(kù)約束,比如:唯一索引,主鍵。
2)樂觀鎖,常用于數(shù)據(jù)庫(kù),更新數(shù)據(jù)時(shí)根據(jù)樂觀鎖狀態(tài)去更新。
3)唯一序列號(hào),操作傳遞一個(gè)唯一序列號(hào),操作時(shí)判斷與該序列號(hào)相等則執(zhí)行。
基于以上分析,在執(zhí)行器接收調(diào)度請(qǐng)求去執(zhí)行視頻處理任務(wù)時(shí)要實(shí)現(xiàn)視頻處理的冪等性,要有辦法去判斷該視頻是否處理完成,如果正在處理中或處理完則不再處理。這里我們?cè)跀?shù)據(jù)庫(kù)視頻處理表中添加處理狀態(tài)字段,視頻處理完成更新狀態(tài)為完成,執(zhí)行視頻處理前判斷狀態(tài)是否完成,如果完成則不再處理。
3.1.3 視頻處理方案
邊梳理整個(gè)視頻上傳及處理的業(yè)務(wù)流程
上傳視頻成功向視頻處理待處理表添加記錄
我們上傳視頻的代碼已經(jīng)做了,但是沒有做向任務(wù)表插入一條待處理視頻的任務(wù)
視頻處理的詳細(xì)流程如下:
1、任務(wù)調(diào)度中心廣播作業(yè)分片。
2、執(zhí)行器收到廣播作業(yè)分片,從數(shù)據(jù)庫(kù)讀取待處理任務(wù),讀取未處理及處理失敗的任務(wù)。
3、執(zhí)行器更新任務(wù)為處理中,根據(jù)任務(wù)內(nèi)容從MinIO下載要處理的文件。
4、執(zhí)行器啟動(dòng)多線程去處理任務(wù)。
5、任務(wù)處理完成,上傳處理后的視頻到MinIO。
6、將更新任務(wù)處理結(jié)果,如果視頻處理完成除了更新任務(wù)處理結(jié)果以外還要將文件的訪問地址更新至任務(wù)處理表及文件表中,最后將任務(wù)完成記錄寫入歷史表。
行器存在運(yùn)行的調(diào)度任務(wù),本次請(qǐng)求將會(huì)被丟棄并標(biāo)記為失敗;
覆蓋之前調(diào)度:調(diào)度請(qǐng)求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),將會(huì)終止運(yùn)行中的調(diào)度任務(wù)并清空隊(duì)列,然后運(yùn)行本地調(diào)度任務(wù);
這里如果選擇覆蓋之前調(diào)度則可能重復(fù)執(zhí)行任務(wù),這里選擇 丟棄后續(xù)調(diào)度來(lái)避免任務(wù)重復(fù)執(zhí)行
[外鏈圖片轉(zhuǎn)存中…(img-Bdh0beJE-1704806448805)]
- 保證任務(wù)處理的冪等性
任務(wù)的冪等性是指:對(duì)于數(shù)據(jù)的操作不論多少次,操作的結(jié)果始終是一致的
某一個(gè)視頻不管是調(diào)度多少次,只會(huì)轉(zhuǎn)碼一次
某個(gè)視頻轉(zhuǎn)碼成功后再來(lái)調(diào)度就不會(huì)進(jìn)行了,因?yàn)橐呀?jīng)轉(zhuǎn)碼成功了
什么是冪等性?它描述了一次和多次請(qǐng)求某一個(gè)資源對(duì)于資源本身應(yīng)該具有同樣的結(jié)果
冪等性是為了解決重復(fù)提交問題,比如:惡意刷單,重復(fù)支付等
解決冪等性常用的方案
1)數(shù)據(jù)庫(kù)約束,比如:唯一索引,主鍵。
2)樂觀鎖,常用于數(shù)據(jù)庫(kù),更新數(shù)據(jù)時(shí)根據(jù)樂觀鎖狀態(tài)去更新。
3)唯一序列號(hào),操作傳遞一個(gè)唯一序列號(hào),操作時(shí)判斷與該序列號(hào)相等則執(zhí)行。
基于以上分析,在執(zhí)行器接收調(diào)度請(qǐng)求去執(zhí)行視頻處理任務(wù)時(shí)要實(shí)現(xiàn)視頻處理的冪等性,要有辦法去判斷該視頻是否處理完成,如果正在處理中或處理完則不再處理。這里我們?cè)跀?shù)據(jù)庫(kù)視頻處理表中添加處理狀態(tài)字段,視頻處理完成更新狀態(tài)為完成,執(zhí)行視頻處理前判斷狀態(tài)是否完成,如果完成則不再處理。