Overview
First, check that the worker started yesterday is running properly, then do some simple cleaning and store the data into ClickHouse.
Contents
1 Checking the Data
from Basefuncs import *

# Convert an ordinary datetime string to a UCS name
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1 = some_dt_str.replace('-', '.').replace(' ', '.').replace(':', '.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# Declare the test queue
qm = QManager(redis_agent_host='http://192.168.0.4:xx/',
              redis_connection_hash=None,
              q_max_len=1000000,
              batch_size=10000)
qm.info()
target_stream_name = 'xxx'
qm.stream_len(target_stream_name)
2804
Fetch the data (using a single worker; the pattern is simple and the performance is sufficient):
data = qm.xrange(target_stream_name)['data']
data_df = pd.DataFrame(data)
keep_cols = ['rec_id', 'data_dt', 'open', 'close', 'high', 'low',
             'vol', 'amt', 'data_source', 'code', 'market']
data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])  # first pass: drop the older, irrelevant records
data_df1 = data_df1[data_df1['data_dt'] >= '2024-06-24 00:00:00']
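For reference, after pd.DataFrame(data) each queue message becomes one row with the fields listed in keep_cols. The record below is purely illustrative (every value is made up; the post does not show real payloads):

# Hypothetical example record -- field names from keep_cols, all values invented.
sample_msg = {
    'rec_id': 'xxx',                    # unique record id (format not shown in the post)
    'data_dt': '2024-06-24 09:30:00',   # bar timestamp as a string
    'open': 3.52, 'close': 3.53, 'high': 3.54, 'low': 3.51,
    'vol': 120000.0, 'amt': 423000.0,
    'data_source': 'xx', 'code': '510300', 'market': 'sh',
}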
Next, issue a query to ClickHouse for each ETF's latest timestamp; newly added data must be later than that time. The target table's fields are those of the CREATE TABLE statement shown further below.
This design was made a while back, and after the gap I had half forgotten it. It is sound, though, as will become clear below.
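Concretely, the query (it appears again inside the worker script below) uses ClickHouse's argMax: argMax(data_dt, ts) returns, for each code group, the data_dt of the row whose ts is largest, i.e. each code's latest data date.

latest_sql = '''
SELECT
    code,
    argMax(data_dt, ts) AS last_data_dt
FROM
    market_data_v2
GROUP BY
    code
'''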
The transformations required are simple as well:
- 1 Convert the datetime string to a timestamp.
- 2 Decompose the date into shard, part, block, and brick.
The transformation code:
import time

data_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)
data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
data_df1['block'] = data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
data_df1['part'] = data_df1['block'].apply(lambda x: x[:x.rfind('.')])
data_df1['shard'] = data_df1['part'].apply(lambda x: x[:x.rfind('.')])
data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)
keep_cols1 = ['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt',
              'brick', 'block', 'part', 'shard', 'code', 'ts', 'pid']
data_df2 = data_df1[keep_cols1]
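To make the naming hierarchy concrete, here is a minimal standalone sketch (it reuses dt_str2ucs_blockname from above; each rfind('.') truncation peels off one level):

dt = '2024-06-24 09:30:00'
brick = dt_str2ucs_blockname(dt)   # '2024.06.24.09' -- hour level
block = brick[:brick.rfind('.')]   # '2024.06.24'    -- day level
part = block[:block.rfind('.')]    # '2024.06'       -- month level
shard = part[:part.rfind('.')]     # '2024'          -- year level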
That's it for today; I'll continue tomorrow evening.
Go on …
I overlooked something yesterday: the data should not be written straight into the database. It should be cleaned up and pushed to a queue, and the default worker then moves it into ClickHouse.
2 Storage Rules
The input queue for this second step is BUFF.xxx.stream_in and the output queue is BUFF.xxx.stream_out.
On the first run you need to make sure the target table exists. ClickHouse is strict about numeric types, so to avoid trouble every numeric column is declared as Float32 (this also allows a single, uniform sync worker). Another peculiarity is that ClickHouse has no conventional row-level DELETE, though it can drop all data while keeping the table structure: TRUNCATE TABLE market_data_v2.
create_table_sql = '''
CREATE TABLE market_data_v2
(
    data_dt String,
    open Float32,
    close Float32,
    high Float32,
    low Float32,
    vol Float32,
    amt Float32,
    brick String,
    block String,
    part String,
    shard String,
    code String,
    ts Float32,
    pid String
)
ENGINE = MergeTree
ORDER BY (ts)
'''

click_para = gb.getx('sp_global.buffer.lan.xxx.xxx.para')
chc = CHClient(**click_para)
chc._exe_sql(create_table_sql)
chc._exe_sql('show tables')
[('market_data',), ('market_data_v2',)]
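A quick sanity check right after creation (a sketch; it assumes, as the show tables call above suggests, that _exe_sql returns a list of tuples):

print(chc._exe_sql('SELECT count() FROM market_data_v2'))   # expected: [(0,)]
# During development the table can be reset without dropping the schema:
# chc._exe_sql('TRUNCATE TABLE market_data_v2')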
etl_worker.py
# 0 logging
import logging
from logging.handlers import RotatingFileHandler

logger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024 * 1024 * 100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# ---------------------------------------- logging set up

from Basefuncs import *

def tuple_list2dict(tuple_list):
    """Convert a list of 2-element tuples into a dict.

    Args:
        tuple_list (List[Tuple[K, V]]): a list of (key, value) tuples.

    Returns:
        Dict[K, V]: the resulting dict.
    """
    return {key: value1 for key, value1 in tuple_list}

# Convert an ordinary datetime string to a UCS name
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1 = some_dt_str.replace('-', '.').replace(' ', '.').replace(':', '.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# ---------------------------------------- basic functions

# Declare the queues
qm = QManager(redis_agent_host='http://192.168.0.4:xx/',
              redis_connection_hash=None,
              q_max_len=1000000,
              batch_size=10000)
qm.info()
source_stream_name = 'stream_in'
target_stream_name = 'stream_out'
source_stream_len = qm.stream_len(source_stream_name)
target_stream_len = qm.stream_len(target_stream_name)
print('source', source_stream_len)
print('target', target_stream_len)
# qm.ensure_group(target_stream_name)
cur_dt_str = get_time_str1()
if source_stream_len:
    is_source_recs = True
else:
    is_source_recs = False
    logger.info('%s %s source No Recs' % (cur_dt_str, 'etl_worker'))

# ---------------------------------------- read from the queue; the rest runs only if there is data
if is_source_recs:
    # ---------------------------------------- fetch the message list and keep the needed columns
    # the worker is launched every 30 seconds
    data = qm.xrange(source_stream_name)['data']
    data_df = pd.DataFrame(data)
    msg_id_list = list(data_df['_msg_id'])
    keep_cols = ['rec_id', 'data_dt', 'open', 'close', 'high', 'low',
                 'vol', 'amt', 'data_source', 'code', 'market']
    data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])
    # first pass: drop the older, irrelevant records
    # data_df1 = data_df1[data_df1['data_dt'] >= '2024-06-24 00:00:00']

    import time
    data_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)
    data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
    data_df1['block'] = data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
    data_df1['part'] = data_df1['block'].apply(lambda x: x[:x.rfind('.')])
    data_df1['shard'] = data_df1['part'].apply(lambda x: x[:x.rfind('.')])
    data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)
    keep_cols1 = ['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt',
                  'brick', 'block', 'part', 'shard', 'code', 'ts', 'pid']
    data_df2 = data_df1[keep_cols1]

    # ------------------------------------- fetch what the database already holds
    # latest timestamp per code
    click_para = {'database': 'xx', 'host': '192.168.0.4', 'name': 'xx',
                  'password': 'xx', 'port': xxx, 'user': 'xx'}
    chc = CHClient(**click_para)
    '''
    This SQL groups by `code` and, for each `code`, finds the latest `data_dt`,
    where "latest" is determined by the maximum of the `ts` field.
    `argMax(data_dt, ts)` returns, within each group, the `data_dt` of the row
    whose `ts` is largest -- i.e. the most recent data date per code. The result
    set therefore contains every `code` together with its `last_data_dt`.
    '''
    latest_sql = '''
    SELECT
        code,
        argMax(data_dt, ts) AS last_data_dt
    FROM
        market_data_v2
    GROUP BY
        code
    '''
    latest_date_tuple_list = chc._exe_sql(latest_sql)
    latest_date_dict = tuple_list2dict(latest_date_tuple_list)

    # ------------------------------------- filter by time
    # keep only the rows newer than what is already stored
    data_df2['existed_dt'] = data_df2['code'].map(latest_date_dict).fillna('')
    output_sel = data_df2['data_dt'] > data_df2['existed_dt']
    output_df = data_df2[output_sel][keep_cols1]
    output_data_listofdict = output_df.to_dict(orient='records')
    output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)
    for some_data_listofdict in output_data_listofdict2:
        qm.parrallel_write_msg(target_stream_name, some_data_listofdict)

    del_msg = qm.xdel(source_stream_name, msg_id_list)
    logger.info('%s %s del source %s Recs' % (cur_dt_str, 'etl_worker', del_msg['data']))
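The incremental filter is the one subtle step, so here is a standalone toy run of the same logic (codes and dates are invented; lexicographic string comparison is safe because the datetime format is fixed-width, and unseen codes fall back to '' so all their rows pass):

import pandas as pd

toy = pd.DataFrame({
    'code': ['510300', '510300', '159915'],
    'data_dt': ['2024-06-24 09:30:00', '2024-06-24 09:31:00', '2024-06-24 09:30:00'],
})
# pretend the database already holds 510300 up to 09:30 and nothing for 159915
latest_date_dict = {'510300': '2024-06-24 09:30:00'}

toy['existed_dt'] = toy['code'].map(latest_date_dict).fillna('')
new_rows = toy[toy['data_dt'] > toy['existed_dt']]
print(new_rows)   # keeps 510300 @ 09:31 and 159915 @ 09:30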
Publish this script as a task that runs the sync every 30 seconds.
exe_qtv200_etl_worker.sh
#!/bin/bash
# Notes
# sh /home/test_exe.sh com_info_change_pattern running
# In some environments `source` has to be replaced with `.`
# . /root/anaconda3/etc/profile.d/conda.sh
# Activate the base environment (or whichever environment you created)
source /root/miniconda3/etc/profile.d/conda.sh
# conda init
conda activate base
cd /home/workers && python3 etl_worker.py
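The post does not show which scheduler fires the task every 30 seconds; purely as an illustration (an assumption, not the author's actual setup), a minimal Python stand-in would be:

import subprocess
import time

# re-run the worker every 30 seconds, mirroring exe_qtv200_etl_worker.sh
while True:
    subprocess.run(['python3', 'etl_worker.py'], cwd='/home/workers')
    time.sleep(30)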
The data was stored successfully; from here on the pipeline runs automatically.