企業(yè)注冊信息查詢單怎么打印seo內(nèi)容優(yōu)化
一、概述
1、celery框架和flask框架在運行時,是在不同的進程中,資源是獨占的。
2、celery異步任務如果想使用flask中的功能,如orm,是需要在flask應用上下文管理器中執(zhí)行orm操作的
3、使用celery是需要使用到中間件的,簡單點就使用redis做中間件
注意:
在flask工廠模式集成celery異步框架,在celery的異步任務中能夠獲取到flask的應用上下文管理器,也就是說在celery異步任務中你可以去調(diào)用flask項目中功能,如orm操作等。
使用本文配置,可以無需修改flask創(chuàng)建app應用的程序,直接將celery相關包創(chuàng)建,運行就可以使用,且能夠在異步任務使用flask的功能。
二、項目結構
依賴環(huán)境:
celery==4.4.7
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0
Werkzeug==2.0.2
目錄結構:
flask-project
????????|--apps
??????????????? |-- user
??????????????????????? |-- models
??????????????????????? |--views.py
??????????????????????? |--urls.py
??????? ????????|--__init__.py
??????? |--ext
??????????????? |--__init__.py
??????????????? |--config.py
??????? |--celery_task
??????????????? |--__init__.py
??????????????? |--async_task.py
??????????????? |--celery.py
??????????????? |--celeryconfig.py
??????????????? |--check_task.py
??????????????? |--scheduler_task.py
??????? app.py
三、flask工廠模式下各模塊功能
1、apps/user/models.py : 寫了一個user表
2、apps/user/views.py:寫了測試調(diào)用celery異步任務的接口
3、apps/user/urls.py: 注冊路由的
4、ext/__init__.py:cache、db、cors的拓展
5、ext/config.py : cache和cors使用到的配置
6、apps/__init__.py: 一個函數(shù)create_app,生成flask應用對象
7、app.py: 啟動flask應用對象的模塊
本文重點不在flask工廠模式,默認看官都懂如何創(chuàng)建flaks工廠模式的項目了。
在視圖中在執(zhí)行異步任務,并獲取異步任務的id:
from celery_task.async_task import send_email_task,cache_user_task
#用戶資源:get\put\delete, 對單個進行操作
class UserOneResource(ResourceBase):def put(self,id):#測試異步發(fā)郵件email = request.args.get('email')code = request.args.get('code')res = send_email_task.delay(email,code)print(res.id)return NewResponse(msg='put',data={'task_id':res.id})def patch(self,id):#測試異步操作flask的orm和cachep = request.args.get('p')if p=='set':res = cache_user_task.delay()print(res,type(res))return NewResponse(msg='patch',data={'task_id':res.id})else:from ext import cachedata = cache.get('all-user-data')return NewResponse(msg='patch',data=data)
res = 異步函數(shù).delay(函數(shù)需要的參數(shù))
task_id = res.id
注意:task_id 可以知道對應的任務的完成情況,獲取任務的返回值等。
四、celery項目的配置
1、celery的配置
將celery的配置都放到一個py文件中,方便后期的維護和使用
celeryconfig.py
from celery.schedules import crontab
from datetime import timedelta
'''
參數(shù)解析:
accept_content:允許的內(nèi)容類型/序列化程序的白名單,如果收到不在此列表中的消息,則該消息將被丟棄并出現(xiàn)錯誤,默認只為json;
task_serializer:標識要使用的默認序列化方法的字符串,默認值為json;
result_serializer:結果序列化格式,默認值為json;
timezone:配置Celery以使用自定義時區(qū);
enable_utc:啟用消息中的日期和時間,將轉換為使用 UTC 時區(qū),與timezone連用,當設置為 false 時,將使用系統(tǒng)本地時區(qū)。
result_expires: 異步任務結果存活時長
beat_schedule:設置定時任務
'''
#手動注冊celery的異步任務:將所有celery異步任務所在的模塊找到,寫成字符串
task_module = ['celery_task.async_task', # 寫任務模塊導入路徑,該模塊主要寫異步任務的方法'celery_task.scheduler_task', # 寫任務模塊導入路徑,該模塊主要寫定時任務的方法
]#celery的配置
config = {"broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密碼時,123456是密碼"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定時任務配置# 名字隨意命名'add-func-30-seconds': {# 執(zhí)行add_task下的addy函數(shù)'task': 'celery_task.scheduler_task.add_func', # 任務函數(shù)的導入路徑,from celery_task.scheduler_task import add_func# 每10秒執(zhí)行一次'schedule': timedelta(seconds=30),# add函數(shù)傳遞的參數(shù)'args': (10, 21)},# 名字隨意起'add-func-5-minutes': {'task': 'celery_task.scheduler_task.add_func', # 任務函數(shù)的導入路徑,from celery_task.scheduler_task import add_func# crontab不傳的參數(shù)默認就是每的意思,比如這里是每年每月每日每天每小時的5分執(zhí)行該任務'schedule': crontab(minute='5'), # 之前時間點執(zhí)行,每小時的第5分鐘執(zhí)行任務, 改成小時,分鐘,秒 就是每天的哪個小時哪分鐘哪秒鐘執(zhí)行'args': (19, 22) # 定時任務需要的參數(shù)},# 緩存用戶數(shù)據(jù)到cache中'cache-user-func': {'task': 'celery_task.scheduler_task.cache_user_func',# 導入任務函數(shù):from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1), # 每1分鐘執(zhí)行一次,將用戶消息緩存到cache中}}
}
2、創(chuàng)建celery對象
celery.py
from celery import Celery,Task
from .celeryconfig import config,task_module
import sys
import os
'1、把flask項目路徑添加到系統(tǒng)環(huán)境變量中'
project_path = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_path)'''
2、創(chuàng)建celery應用對象'task'可以任務是該celery對象名字,用于區(qū)分celery對象broker是指定消息中間件backend是指定任務結果存儲位置include是手動指定異步任務所在的模塊的位置
'''
#創(chuàng)建celery異步對象
celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)
#導入一些基本配置
celery.conf.update(**config)'3、給celery所有任務添加flask的應用上下文,在celery異步任務中就可以調(diào)用flask中的對象了'
class ContextTask(celery.Task):def __call__(self, *args, **kwargs):from apps import create_appapp = create_app()with app.app_context():return self.run(*args, **kwargs)
celery.Task = ContextTask
注意:
1、第一步很關鍵,設置到python項目運行時,加載環(huán)境變量的問題。這一步是將flask項目的根目錄加載環(huán)境變量中,這樣第3步才能從apps中導入create_app函數(shù)。
2、第二步是創(chuàng)建celery通用的方法了,沒什么好說的。
3、第三步很關鍵,涉及到celery異步任務能否在flask應用上下文管理器運行,從而可以調(diào)用flask中的功能,例如orm操作,cache操作.。(在執(zhí)行任務時,先套上flask的應用上下文管理器)
3、異步任務模塊
將所有異步任務相關的函數(shù)都集中到一個模塊中,方便維護和使用。
async_task.py
# 導入celery對象app
from celery_task.celery import celery
from ext import cache
import time'''
1、沒有返回值的,@app.task(ignore_result=True)
2、有返回值的任務,@app.task 默認就是(ignore_result=False)
'''# 沒有返回值,禁用掉結果后端
@celery.task
def send_email_task(receiver_email,code): # 此時可以直接傳郵箱,還能減少一次數(shù)據(jù)庫的IO操作''':param email: 接收消息的郵箱,用戶的郵箱:return:'''# 模擬郵件發(fā)送驗證碼time.sleep(5)return {'result':'郵件已經(jīng)發(fā)送',receiver_email:'2356'}@celery.task
def cache_user_task():#orm查詢數(shù)據(jù),放到cache中from apps.user.models import UserModeluser = UserModel.query.all()lis = []for u in user:id = u.idname = u.namedic = {'id':id,'name':name}lis.append(dic)print(dic)cache.set('all-user-data',lis)return {'code':200,'msg':'查詢數(shù)據(jù)成功'}
4、定時任務模塊
將所有定時任務相關的函數(shù)都集中到一個模塊中,方便維護和使用。
schedulser_task.py
from celery_task.celery import celery
import time# 有返回值,返回值可以從結果后端中獲取
@celery.task
def add_func(a, b):print('執(zhí)行了加法函數(shù)',a+b)return a + b# 不需要返回值,禁用掉結果后端
@celery.task(ignore_result=True)
def cache_user_func():print('all')
5、檢測任務id獲取任務狀態(tài)和返回值
check_task.py:
from celery.result import AsyncResult
from celery_task.celery import celery'''驗證任務的執(zhí)行狀態(tài)的'''def check_task_status(task_id):'''任務的執(zhí)行狀態(tài):PENDING :等待執(zhí)行STARTED :開始執(zhí)行RETRY :重新嘗試執(zhí)行SUCCESS :執(zhí)行成功FAILURE :執(zhí)行失敗:param task_id::return:'''result = AsyncResult(id=task_id, app=celery)dic = {'type': result.status,'msg': '','data': None,'code': 400}if result.status == 'PENDING':dic['msg'] = '任務等待中'elif result.status == 'STARTED':dic['msg'] = '任務開始執(zhí)行'elif result.status == 'RETRY':dic['msg'] = '任務重新嘗試執(zhí)行'elif result.status == 'FAILURE':dic['msg'] = '任務執(zhí)行失敗了'elif result.status == 'SUCCESS':result = result.get()dic['msg'] = '任務執(zhí)行成功'dic['data'] = resultdic['code'] = 200# result.forget() # 將結果刪除# async.revoke(terminate=True) # 無論現(xiàn)在是什么時候,都要終止# async.revoke(terminate=False) # 如果任務還沒有開始執(zhí)行呢,那么就可以終止。return dic
在視圖函數(shù)中調(diào)用該方法,通過task_id ,返回任務的運行結果。
五、測試
1、運行項目
flask項目(在項目根目錄下執(zhí)行):
????????flask run --host 0.0.0.0 --port 5000
celery項目(在項目根目錄下執(zhí)行):
啟動celery進程:
windows系統(tǒng):
????????celery -A celery_task.celery worker -l info? -P? eventlet
linux系統(tǒng):
????????celery -A celery_task.celery worker -l info?
啟動定時任務(先啟動celery進程在啟動定時任務):
celery -A celery_task.celery beat -l info
2、運行結果
1、執(zhí)行異步任務中,將orm數(shù)據(jù)存到cache中
2、執(zhí)行定時任務了
六、注意事項
1、在系統(tǒng)中要先安裝好redis和mysql,并都啟動了
2、在測試異步操作orm時,會使用到flask的cache存數(shù)據(jù),注意flask的cache不能配置內(nèi)存模式,不然celery進程存到cache中的數(shù)據(jù),flask進程中取不到的。
3、當前的配置下,celery的目錄必須是在flask根目錄下
七、拓展-改變celery_task的位置
如果你想將celery_task包移動到apps包下,此時你需要修改什么?
1、apps/celery_task/celery.py:將flask項目根目錄加載到系統(tǒng)環(huán)境變量中的路徑有變
'1、把flask項目路徑添加到系統(tǒng)環(huán)境變量中'
project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
2、apps/celery_task/celeryconfig.py: 注冊異步任務的模塊,定時任務的模塊的位置變化
'1、加上apps.'
task_module = ['apps.celery_task.async_task', # 寫任務模塊導入路徑,該模塊主要寫異步任務的方法'apps.celery_task.scheduler_task', # 寫任務模塊導入路徑,該模塊主要寫定時任務的方法
]'2、task參數(shù)對應的字符串,加上apps.'
config = {"broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密碼時,123456是密碼"result_backend" : 'redis://127.0.0.1:6379/1',"task_serializer" : 'json',"result_serializer" : 'json',"accept_content" : ['json'],"timezone" : 'Asia/Shanghai',"enable_utc" : False,"result_expires" : 1*60*60,"beat_schedule" : { #定時任務配置# 名字隨意命名'add-func-30-seconds': {# 執(zhí)行add_task下的addy函數(shù)'task': 'apps.celery_task.scheduler_task.add_func', # 任務函數(shù)的導入路徑,from celery_task.scheduler_task import add_func# 每10秒執(zhí)行一次'schedule': timedelta(seconds=30),# add函數(shù)傳遞的參數(shù)'args': (10, 21)},# 名字隨意起'add-func-5-minutes': {'task': 'apps.celery_task.scheduler_task.add_func', # 任務函數(shù)的導入路徑,from celery_task.scheduler_task import add_func# crontab不傳的參數(shù)默認就是每的意思,比如這里是每年每月每日每天每小時的5分執(zhí)行該任務'schedule': crontab(minute='5'), # 之前時間點執(zhí)行,每小時的第5分鐘執(zhí)行任務, 改成小時,分鐘,秒 就是每天的哪個小時哪分鐘哪秒鐘執(zhí)行'args': (19, 22) # 定時任務需要的參數(shù)},# 緩存用戶數(shù)據(jù)到cache中'cache-user-func': {'task': 'apps.celery_task.scheduler_task.cache_user_func',# 導入任務函數(shù):from celery_task.scheduler_task import cache_user_func'schedule': timedelta(minutes=1), # 每1分鐘執(zhí)行一次,將用戶消息緩存到cache中}}
}
3、在視圖函數(shù)導入異步任務的路徑也變了
#異步任務
from apps.celery_task.async_task import send_email_task,cache_user_task
4、啟動celery和定時任務的命令變量【在項目根目錄下執(zhí)行命令】
啟動celery:
windows啟動命令: celery? -A? apps.celery_task.celery worker -l info? -P? eventlet
linux啟動命令: celery? -A? apps.celery_task.celery worker -l info?
啟動定時任務:
celery -A apps.celery_task beat -l info