醫(yī)院網(wǎng)站建設技術方案今日國內(nèi)最新新聞
1. 問題
如果是采用hdfs上傳加載的表、或者是flume直接寫hdfs的表空間通??磆ive的屬性是不準確的。
2. 思路
為了使結果更精確,我們直接使用linux下命令統(tǒng)計hive倉庫目錄下的每個表對應的文件夾目錄占用空間的大小。
3. 解決方法
這里建立三層表結構
ods: 原始數(shù)據(jù)采集
ods.ods_hive_tablelist
ods.ods_hive_tablespace
dw:清洗整合
dw.dw_hive_metadata
mdl: 統(tǒng)計
mdl.mdl_hive_metadata_stat
3.1 ODS層數(shù)據(jù)采集
在ods層建立文件路徑列表和每個路徑占用空間大小。
create table ods.ods_hive_tablelist(
path string comment '表路徑',
update_time string comment '更新時間'
) comment 'hive表更新時間'
partitioned by (pk_day string)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile;create table ods.ods_hive_tablespace(
path string comment '表路徑',
size string comment '表占用大小(byte)',
blocksize string comment '副本占用大小(byte)'
) comment 'hive表空間占用統(tǒng)計'
partitioned by (pk_day string)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile;
這里的數(shù)據(jù)采集使用shell命令格式,我是使用pySpark里面直接執(zhí)行的。
tableList = os.popen("""hdfs dfs -ls /user/hive/warehouse/*.db |awk '{print $8","$6" "$7}'""")
tablespaceList = os.popen("""hadoop fs -du /user/hive/warehouse/*.db|awk '{print $3","$1","$2}'""")new_tableList = []
for table in tableList:arr = table.replace('\n','').split(",")new_tableList.append((arr[0],arr[1]))new_tablespaceList = []
for tablespace in tablespaceList:arr = tablespace.replace('\n','').split(",")new_tablespaceList.append((arr[0],arr[1],arr[2]))#----ods----
current_dt = date.today().strftime("%Y-%m-%d")
print(current_dt)
spark.createDataFrame(new_tableList,['path','update_time']).registerTempTable('tablelist')
spark.createDataFrame(new_tablespaceList,['path','size','blocksize']).registerTempTable('tablespacelist')
tablelistdf = spark.sql('''(select path,update_time,current_date() as pk_day from tablelist where path != '') ''')
tablelistdf.show(10)tablelistdf.repartition(2).write.insertInto('ods.ods_hive_tablelist',True)tablespacelistdf = spark.sql('''(select path,size,blocksize,current_date() as pk_day from tablespacelist where path != '')''')
tablespacelistdf.show(10)
tablespacelistdf.repartition(2).write.insertInto('ods.ods_hive_tablespace',True)
經(jīng)過簡單的清洗后,落表。
ods.ods_hive_tablelist表的顯示如下:
在ods.ods_hive_tablespace中顯示的如下
3.2 清洗整合入倉
接下來在dw層進行整合,對應的表結構如下:
create table dw.dw_hive_metadata(
dbname string comment '數(shù)據(jù)庫名',
tblname string comment '表名',
path string comment '表路徑',
update_date string comment '更新日期',
update_time string comment '更新時間',
mb double comment '表占用大小(MB)',
gb double comment '表占用大小(GB)',
size double comment '表占用大小(byte)',
blocksize double comment '副本占用大小(byte)',
blocksize_gb double comment '副本占用大小(gb)'
) comment 'hive表元數(shù)據(jù)統(tǒng)計'
partitioned by (pk_day string)
stored as textfile;
這里整合ods層的兩張表關聯(lián),就可以拼接出每個表占用的空間大小:
#----dw----
dwdf = spark.sql('''(
selectsplit(a.path,'/')[4] as dbname,split(a.path,'/')[5] as tblname,a.path,substr(a.update_time,1,10) as update_date,a.update_time,nvl(round(b.size/1000/1000,2),0) as mb,nvl(round(b.size/1000/1000/1000,2),0) as gb,nvl(round(b.size,2),0) as size,nvl(round(b.blockSize,2),0) as blocksize,nvl(round(b.blockSize/1000/1000/1000,2),0) as blocksize_gb,a.pk_day
from(select * from ods.ods_hive_tablelist where pk_day = current_date()) aleft join(select * from ods.ods_hive_tablespace where pk_day = current_date()) b
on a.path = b.path and a.pk_day = b.pk_day
where a.path is not null
and a.path != ''
)''')
我們可以看到這個明細數(shù)據(jù)展示如下:
3.3 統(tǒng)計分析
這里可以根據(jù)需要自己增加統(tǒng)計邏輯,我這里按照db層級統(tǒng)計每天的增量大小。
統(tǒng)計層表結構如下:
create table mdl.mdl_hive_metadata_stat(
dbname string comment '數(shù)據(jù)庫名',
tblcount int comment '表個數(shù)',
dbspace double comment '數(shù)據(jù)庫空間(GB)',
dbspace_incr double comment '數(shù)據(jù)庫空間日增量(GB)',
blockspace_incr double comment '服務器空間日增量(GB)'
) comment 'hive元數(shù)據(jù)db統(tǒng)計'
partitioned by (pk_day string)
stored as textfile;
實現(xiàn)方式:
#----mdl----
spark.sql('''(select pk_day,dbname,count(tblname) as tblCount,round(sum(gb),2) as dbspace,round(sum(blocksize_gb),2) as blockSpacefrom dw.dw_hive_metadatawhere pk_day>= date_sub(current_date(),7)group by pk_day,dbname)''').createTempView('tmp_a')spark.sql('''(selectpk_day,dbname,tblCount,dbspace,blockSpace,lag(dbspace,1,0) over(partition by dbname order by pk_day) as lagSpace,lag(blockSpace,1,0) over(partition by dbname order by pk_day) as lagBlockSpacefrom tmp_a
)''').createTempView('tmp_b')mdldf = spark.sql('''(
select dbname,tblCount,dbspace,
round((dbspace-lagSpace),2) as dbspace_incr,
round((blockSpace-lagBlockSpace),2) as blockspace_incr,
pk_day
from tmp_b where pk_day = current_date()
)''')
mdldf.show(10)
mdldf.repartition(1).write.insertInto('mdl.mdl_hive_metadata_stat',True)
最后看看,統(tǒng)計層的內(nèi)容如下: