Environment

This article is based on the official Celery documentation.

# Python packages
Django==3.1.7
celery==5.0.5
django-celery-results==2.0.1
# Message broker
RabbitMQ==3.8.14

Step 1: Create a new Django project

My Django project structure is as follows:

(venv) w@w-Vulcan-Series:~/PycharmProjects/beer_server$ tree -I "venv|migrations|__pycache__" -L 5
.
├── apps
│   ├── config
│   │   ├── admin.py
│   │   ├── apps.py
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── serializers.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   └── views.py
│   ├── __init__.py
│   ├── project
│   │   ├── admin.py
│   │   ├── apps.py
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── serializers.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   └── views.py
│   ├── testcase
│   │   ├── admin.py
│   │   ├── apps.py
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── serializers.py
│   │   ├── tasks.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   └── views.py
│   ├── testsuite
│   │   ├── admin.py
│   │   ├── apps.py
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── serializers.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   └── views.py
│   └── user
│       ├── admin.py
│       ├── apps.py
│       ├── __init__.py
│       ├── models.py
│       ├── serializers.py
│       ├── tests.py
│       ├── urls.py
│       └── views.py
├── beer_server
│   ├── asgi.py
│   ├── celery.py
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── LICENSE
├── manage.py
├── README.md
├── requirements.txt

Step 2: Install the RabbitMQ message broker

For this demonstration I start a RabbitMQ container with Docker:

docker run -d --name ramq -p 5672:5672 -p 15672:15672 rabbitmq && docker ps

Enter the container and enable RabbitMQ's web management UI (optional):

# Enter the ramq container
docker exec -it ramq /bin/bash
# Enable the web management plugin
rabbitmq-plugins enable rabbitmq_management

Now visit http://127.0.0.1:15672/#/ to reach the RabbitMQ login page. The default username and password are both guest; after logging in you land on the management console.
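
If you want to verify from Python that the broker is reachable before wiring anything into Django, a quick sketch using Celery's connection handling (the URL assumes the default guest account on localhost) looks like this:

from celery import Celery

# Assumes RabbitMQ is exposed on localhost with the default guest/guest account.
app = Celery(broker='amqp://guest:guest@127.0.0.1:5672//')

with app.connection() as conn:
    # Raises an exception if the broker cannot be reached within a few retries.
    conn.ensure_connection(max_retries=3)
    print('RabbitMQ connection OK')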

Step 3: Install the Celery package with pip3

pip3 install -i https://pypi.douban.com/simple celery

Step 4: Create a celery.py file in the same directory as the project's settings.py, with the following content:

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'beer_server.settings')

app = Celery('beer_server')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
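
Optionally, a small bound task can be appended to this same celery.py to confirm the worker is wired up correctly; a minimal sketch (debug_task is just an illustrative name, not required by Celery):

@app.task(bind=True)
def debug_task(self):
    # Prints the incoming task request, handy for checking the worker setup.
    print(f'Request: {self.request!r}')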

Step 5: Add the following to the __init__.py file in the same directory as settings.py:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

Step 6: Create a test task module: add a tasks.py file inside the testcase app under apps (e.g. apps/testcase/tasks.py) with the following content:

from celery import shared_task
from testcase.models import TestCase


@shared_task
def add(x, y):
    return x + y


@shared_task
def count_testcases():
    return TestCase.objects.count()
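
These shared tasks can be queued from anywhere in the project, for example from a view. The snippet below is only a hypothetical sketch (AddTaskView is not part of the original project) showing how a DRF view could hand the work to Celery and return immediately:

# Hypothetical example, e.g. in apps/testcase/views.py
from rest_framework.response import Response
from rest_framework.views import APIView

from testcase.tasks import add


class AddTaskView(APIView):
    def post(self, request):
        # delay() enqueues the task and returns an AsyncResult right away.
        result = add.delay(int(request.data.get('x', 0)),
                           int(request.data.get('y', 0)))
        return Response({'task_id': result.id})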

Step 7: Install the django-celery-results package to use the Django ORM as the result backend

Install django_celery_results:

pip3 install -i https://pypi.douban.com/simple django-celery-results

Add django_celery_results to the INSTALLED_APPS list in settings.py:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'django_celery_results',
    ]

Run the database migration:

python3 manage.py migrate django_celery_results

Then add the following to settings.py:

# Celery configuration options
# Use the same timezone as the Django project
CELERY_TIMEZONE = TIME_ZONE
# Store async task results with Django's own ORM (via django-celery-results)
CELERY_RESULT_BACKEND = 'django-db'
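
Because config_from_object was called with namespace='CELERY', broker settings also live in settings.py with a CELERY_ prefix. Celery defaults to amqp://guest@localhost:5672//, which matches the Docker container above; if your RabbitMQ runs elsewhere, a setting along these lines (the URL is a placeholder) would be needed:

# Only needed if the broker is not the default local guest account
CELERY_BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672//'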

Step 8: Start the Celery worker

(venv) w@w-Vulcan-Series:~/PycharmProjects/beer_server$ celery -A beer_server worker -l INFO
 
 -------------- celery@w-Vulcan-Series v5.0.5 (singularity)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-60-generic-x86_64-with-glibc2.29 2021-04-26 23:21:07
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         beer_server:0x7f62b67dcfa0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . testcase.tasks.add
  . testcase.tasks.count_testcases

[2021-04-26 23:21:07,357: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-04-26 23:21:07,367: INFO/MainProcess] mingle: searching for neighbors
[2021-04-26 23:21:08,394: INFO/MainProcess] mingle: all alone
[2021-04-26 23:21:08,410: WARNING/MainProcess] /home/w/PycharmProjects/beer_server/venv/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
            leak, never use this setting in production environments!
  warnings.warn('''Using settings.DEBUG leads to a memory

[2021-04-26 23:21:08,411: INFO/MainProcess] celery@w-Vulcan-Series ready.

Step 9: Test the Celery setup from the Django shell

(venv) w@w-Vulcan-Series:~/PycharmProjects/beer_server$ python3 manage.py shell
Python 3.8.5 (default, Jan 27 2021, 15:41:15) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from testcase.tasks import count_testcases, add
>>> print(count_testcases.delay())
57bdf093-a6ff-4846-acd9-cacc83a3f513
>>> print(add.delay(10, 25))
f8125941-2887-4590-9e63-02f61e353690

The Celery worker log below shows that both async tasks were received and completed successfully:

[2021-04-26 23:21:08,411: INFO/MainProcess] celery@w-Vulcan-Series ready.
[2021-04-26 23:23:44,528: INFO/MainProcess] Received task: testcase.tasks.count_testcases[57bdf093-a6ff-4846-acd9-cacc83a3f513]  
[2021-04-26 23:23:44,555: INFO/ForkPoolWorker-8] Task testcase.tasks.count_testcases[57bdf093-a6ff-4846-acd9-cacc83a3f513] succeeded in 0.025395925000339048s: 2
[2021-04-26 23:23:57,057: INFO/MainProcess] Received task: testcase.tasks.add[f8125941-2887-4590-9e63-02f61e353690]  
[2021-04-26 23:23:57,075: INFO/ForkPoolWorker-8] Task testcase.tasks.add[f8125941-2887-4590-9e63-02f61e353690] succeeded in 0.017471159000706393s: 35
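
With the django-db result backend configured, the stored return value can also be fetched back from the same Django shell; a minimal sketch reusing the task id printed by delay() above:

from celery.result import AsyncResult

# Look up the add task by the id printed earlier; get() blocks until the result is ready.
res = AsyncResult('f8125941-2887-4590-9e63-02f61e353690')
print(res.status, res.get(timeout=10))  # e.g. SUCCESS 35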

Looking at the django_celery_results_taskresult table, you can see a record for each successfully executed async task.

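The same records can be queried through the model that django-celery-results provides; a quick sketch from the Django shell:

from django_celery_results.models import TaskResult

# One row per completed task; the return value is stored as a serialized string.
for tr in TaskResult.objects.all():
    print(tr.task_id, tr.status, tr.result)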

end!
