Celery
http://docs.celeryproject.org/en/latest/index.html
Celery - Distributed Task Queue
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
It’s a task queue with focus on real-time processing, while also supporting task scheduling.
Celery has a large and diverse community of users and contributors; you should come join us on IRC or our mailing list.
Celery is Open Source and licensed under the BSD License.
Demo: install and run:
https://github.com/fanqingsong/celery_running
# install dependencies
pipenv install
# run the tasks process (worker)
pipenv run celery -A tasks worker --loglevel=info -P eventlet
# run producer
pipenv run python taskscaller.py
Task client (worker):
Task sender (producer):
Error handling
https://blog.csdn.net/qq_30242609/article/details/79047660
Running tasks fails with the error "Celery ValueError: not enough values to unpack (expected 3, got 0)".
Add the -P eventlet option when starting the worker, as follows:
celery -A <mymodule> worker -l info -P eventlet
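The wording of that error is Python's standard tuple-unpacking failure: somewhere, three names were assigned from an empty sequence. A minimal sketch that reproduces the same message without Celery (the function name `unpack_three` is hypothetical and only for illustration):

```python
def unpack_three(values):
    # Three names on the left, so `values` must contain exactly three items.
    first, second, third = values
    return first, second, third


try:
    unpack_three(())  # an empty tuple triggers the same ValueError text
except ValueError as exc:
    error_message = str(exc)

print(error_message)
```

This only shows where the message comes from; the fix for the worker is still the pool option shown above.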
Task processing status
Official documentation:
http://docs.celeryproject.org/en/latest/faq.html#how-do-i-get-the-result-of-a-task-if-i-have-the-id-that-points-there
Answer: Use task.AsyncResult:
>>> result = my_task.AsyncResult(task_id)
>>> result.get()
This will give you an AsyncResult instance using the task's current result backend.
If you need to specify a custom result backend, or you want to use the current application's default backend, you can use app.AsyncResult:
>>> result = app.AsyncResult(task_id)
>>> result.get()
StackOverflow
https://stackoverflow.com/questions/9034091/how-to-check-task-status-in-celery
Return the task_id (which is given from .delay()) and ask the celery instance afterwards about the state:
x = method.delay(1,2)
print(x.task_id)
When asking, get a new AsyncResult using this task_id:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
API
http://docs.celeryproject.org/en/latest/reference/celery.result.html
Fetching results through the collect interface; the underlying mechanism is a Python generator (coroutine-style iteration):
from celery import group
from proj.celery import app


@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()


@app.task(trail=True)
def B(i):
    return pow2.delay(i)


@app.task(trail=True)
def pow2(i):
    return i ** 2
>>> from celery.result import ResultBase
>>> from proj.tasks import A
>>> result = A.delay(10)
>>> [v for v in result.collect()
... if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
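According to the Celery reference, collect() follows the results returned by a task and yields (result, value) tuples for each result in the tree. The sketch below imitates that tree walk with a plain generator so the idea can be tried without a worker. The `Node` class and the way the tree is built are hypothetical stand-ins and not Celery's own code.

```python
class Node:
    """Hypothetical stand-in for a result object: a value plus child results."""

    def __init__(self, value, children=()):
        self.value = value
        self.children = list(children)


def collect(node):
    """Yield (node, value) pairs depth-first, parent before children."""
    yield node, node.value
    for child in node.children:
        yield from collect(child)


# Mirror the A -> B -> pow2 shape: only the leaves hold plain numbers.
leaves = [Node(i ** 2) for i in range(5)]                   # pow2 results
middles = [Node(leaf, children=[leaf]) for leaf in leaves]  # B returns pow2's result
root = Node(middles, children=middles)                      # A returns the group of B results

# Keep only plain values, dropping intermediate result objects and groups.
squares = [v for _, v in collect(root) if not isinstance(v, (Node, list))]
print(squares)
```

Because collect is a generator, values are produced one at a time as the walk proceeds, so the caller can start consuming before the whole tree has been visited.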
Scheduled tasks
https://realpython.com/asynchronous-tasks-with-django-and-celery/
# Note: celery.task and celery.decorators (including @periodic_task) were
# removed in Celery 5.0; this snippet targets Celery 4.x and earlier.
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger

from photos.utils import save_latest_flickr_image

logger = get_task_logger(__name__)


@periodic_task(
    run_every=(crontab(minute='*/15')),
    name="task_save_latest_flickr_image",
    ignore_result=True
)
def task_save_latest_flickr_image():
    """
    Saves latest image from Flickr
    """
    save_latest_flickr_image()
    logger.info("Saved image from Flickr")
Here, we run the save_latest_flickr_image() function every fifteen minutes by wrapping the function call in a task. The @periodic_task decorator abstracts out the code to run the Celery task, leaving the tasks.py file clean and easy to read!
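To make "every fifteen minutes" concrete: a crontab minute field of '*/15' matches the wall-clock minutes divisible by 15, not "15 minutes after the worker started". A minimal sketch of that expansion in plain Python follows. The helper `expand_minutes` is hypothetical, handles only '*', '*/N' and comma-separated numbers, and is not Celery's parser.

```python
def expand_minutes(expr, upper=60):
    """Expand a crontab-style minute field into a sorted list of minutes.

    Supports only '*', '*/N' and comma-separated integers (no ranges).
    """
    minutes = set()
    for part in expr.split(","):
        if part == "*":
            minutes.update(range(upper))
        elif part.startswith("*/"):
            step = int(part[2:])
            minutes.update(range(0, upper, step))
        else:
            minutes.add(int(part))
    return sorted(minutes)


quarter_hours = expand_minutes("*/15")
print(quarter_hours)
```

So the task above would fire at :00, :15, :30 and :45 of every hour.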
Reference:
https://www.liaoxuefeng.com/article/00137760323922531a8582c08814fb09e9930cede45e3cc000