計算機網絡技術就業(yè)方向網站開發(fā)中國萬網域名注冊服務內容
同樣還是需要開通釘釘應用這里就不錯多說了
第一步:梳理邏輯流程
? ? ? ? 前提:打卡的機器是使用postgres數據庫,由于因為某些原因,釘釘userId 我已經提前獲取到了存放到數據庫里。
? ? ? ? 1.用戶打卡成功后,我們應該監(jiān)聽數據庫進行查詢,然后獲取到打卡的時間,在通過釘釘的工作消息接口,發(fā)送消息給當前考勤打卡的用戶,這樣用戶就可以知道我上班的打卡時間!
現在我們看看應該怎么去實現
先定義釘釘接口先Token:
appkey = ""
appsecret = ""
ding_url = "https://oapi.dingtalk.com/"async def dingTalkToken():async with httpx.AsyncClient() as client:response = await client.get(ding_url + 'gettoken', params={"appkey": appkey, "appsecret": appsecret})# 解析響應JSONresult = response.json()# 提取Access Tokenaccess_token = result.get("access_token")return access_token
釘釘工作消息接口
async def dingTalkTokenAsyncsend_v2(access_token, kqTime, userid):params = {"agent_id": ,"msg": {"msgtype": "text","text": {"content": "打卡成功:" + kqTime}},"userid_list": userid}async with httpx.AsyncClient() as client:response = await client.post(ding_url + 'topapi/message/corpconversation/asyncsend_v2?access_token=' + access_token, params=params)# 解析響應JSONresult = response.json()# 提取Access Tokenerrcode = result.get("errcode")return errcode
以上向釘釘工作發(fā)送消息的接口已經完成了
接下來就是核心代碼:
async def check_notifications():print('執(zhí)行')try:# 連接到數據庫with psycopg2.connect(dbname=posql.dbname, user=posql.user, password=posql.password, host=posql.host,port=posql.port) as connection:# 創(chuàng)建一個游標對象,用于執(zhí)行 SQL 語句with connection.cursor() as cursor:# 執(zhí)行查詢cursor.execute("LISTEN punch_event_channel")while True:connection.commit() # 提交事務await asyncio.sleep(1)# 檢查是否有通知connection.poll() # 從服務器獲取通知if connection.notifies:notify = connection.notifies[0]# 執(zhí)行查詢cursor.execute("這填寫打卡系統(tǒng)的數據庫使用ID去查詢最新
SELECT * FROM table WHERE id = %s" % int(notify.payload))# 獲取查詢結果result = cursor.fetchone()columns = [desc[0] for desc in cursor.description]result_dict = dict(zip(columns, result))# 處理 datetime 對象的序列化result_dict['timestamp'] = result_dict.get('timestamp', None)if result_dict['timestamp']:result_dict['timestamp'] = datetime.fromisoformat(result_dict['timestamp'])kqTime = result_dict['att_date'] + ' ' + result_dict['att_time']# 獲取用戶IDuser_name = result_dict['person_name']sql_str = """EXEC GetUserDingByName @UserName = N'%s';""" % user_nameuser_id = query_user_info(sql_str)if user_id != 'null':# 發(fā)送HTTP POST請求獲取Access Tokenaccess_token = await dingTalkToken()codes = await dingTalkTokenAsyncsend_v2(access_token, kqTime, user_id)print(f"考勤時間: " + result_dict['att_date'] + ' ' + result_dict['att_time'])breakelse:break# 去執(zhí)行 釘釘推送模塊# messages = '發(fā)送成功'else:print({"notify_payload": '沒有消息'})await asyncio.sleep(1)except Exception as e:print({"error": str(e)})
解釋一下代碼:
# 執(zhí)行查詢
cursor.execute("LISTEN punch_event_channel")
這里我是在考勤機器的數據庫里做了一個punch_event_channel 的頻道,而這個頻道是我創(chuàng)建了一個觸發(fā)函數用來觸發(fā)最新數據庫里的數據
接下來是創(chuàng)建觸發(fā)函數
-- 創(chuàng)建觸發(fā)器函數
CREATE OR REPLACE FUNCTION notify_punch_event()
RETURNS TRIGGER AS
$$
BEGINPERFORM pg_notify('punch_event_channel', 'new_punch_event');RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 創(chuàng)建觸發(fā)器,關聯(lián)到表的 AFTER INSERT 事件上CREATE TRIGGER 觸發(fā)器名稱XXXXX
AFTER INSERT ON 表名
FOR EACH ROW
EXECUTE PROCEDURE notify_punch_event();
在sql 工具執(zhí)行這兩句就可以了,替換成你自己的數據庫
另外這里是我內部拿取釘釘Userid的數據庫,我就不放代碼了:
# 獲取用戶ID
user_name = result_dict['person_name']
sql_str = """EXEC GetUserDingByName @UserName = N'%s';""" % user_name
user_id = query_user_info(sql_str)
你們可以根據自己方式,來獲取
最后,就是使用定時任務來
async def periodic_task():while True:await check_notifications()await asyncio.sleep(1) # 1秒鐘檢查一次,可以根據需要調整間隔if __name__ == '__main__':# 啟動定時任務app.add_task(periodic_task())# 啟動 Sanic 應用app.run(host='0.0.0.0', port=8089, workers=8)
最后記得導入包
import psycopg2
import httpx
import pymssql # 這是sql server 數據庫連接
如果寫的好動動你們發(fā)財的小手點贊,對你有幫助也可以打賞請我喝杯咖啡,提提神,感謝各位兄弟了