Celery

Original · 05/13 14:22

http://docs.celeryproject.org/en/latest/index.html

Celery - Distributed Task Queue

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.

Celery has a large and diverse community of users and contributors, you should come join us on IRC or our mailing-list.

Celery is Open Source and licensed under the BSD License.

 

Demo install and run:

https://github.com/fanqingsong/celery_running

# install dependencies
pipenv install

# run the worker process
pipenv run celery -A tasks worker --loglevel=info -P eventlet

# run the producer
pipenv run python taskscaller.py

 

Task worker side:

 

Task sender side:

 

 

Error handling

https://blog.csdn.net/qq_30242609/article/details/79047660

Running the tasks raises the error "Celery ValueError: not enough values to unpack (expected 3, got 0)" (commonly seen when running a Celery 4 worker on Windows).

Add the `-P eventlet` pool option when starting the worker, as follows:

celery -A <mymodule> worker -l info -P eventlet

 

Task status

Explanation from the official docs:

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-get-the-result-of-a-task-if-i-have-the-id-that-points-there

How do I get the result of a task if I have the ID that points there?

Answer: Use task.AsyncResult:

>>> result = my_task.AsyncResult(task_id)
>>> result.get()

This will give you an AsyncResult instance using the task's current result backend.

If you need to specify a custom result backend, or you want to use the current application’s default backend you can use app.AsyncResult:

>>> result = app.AsyncResult(task_id)
>>> result.get()

 

StackOverflow

https://stackoverflow.com/questions/9034091/how-to-check-task-status-in-celery

 

Return the task_id (which is given from .delay()) and ask the celery instance afterwards about the state:

x = method.delay(1, 2)
print(x.task_id)

When asking, get a new AsyncResult using this task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

 

API

http://docs.celeryproject.org/en/latest/reference/celery.result.html

Results can also be gathered via the collect() interface; the underlying mechanism is Python coroutines (generators).

from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2

>>> from celery.result import ResultBase
>>> from proj.tasks import A

>>> result = A.delay(10)
>>> [v for v in result.collect()
...  if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

Periodic tasks

https://realpython.com/asynchronous-tasks-with-django-and-celery/

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger

from photos.utils import save_latest_flickr_image

logger = get_task_logger(__name__)


@periodic_task(
    run_every=(crontab(minute='*/15')),
    name="task_save_latest_flickr_image",
    ignore_result=True
)
def task_save_latest_flickr_image():
    """
    Saves latest image from Flickr
    """
    save_latest_flickr_image()
    logger.info("Saved image from Flickr")

Here, we run the save_latest_flickr_image() function every fifteen minutes by wrapping the function call in a task. The @periodic_task decorator abstracts out the code to run the Celery task, leaving the tasks.py file clean and easy to read!

 

References:

https://www.liaoxuefeng.com/article/00137760323922531a8582c08814fb09e9930cede45e3cc000

 
