Celery as a microservice message queue

django,, python | Comments

引子

随着业务变得复杂,代码库变得越来越大,扇贝最近也开始了应用微服务化拆分我们的项目。 随着微服务化引入的一个问题就是,不同的服务之间需要能够实现互相通信。我们需要一个可以在不同的服务之间传递消息的方案。 在Python开发中,谈到任务队列、消息传递,首先想到的就是Celery,Celery是一个分布式的任务队列,借助一个broker,可以实现任务的分发和执行。

一个celery的task大概会是这样:

1
2
3
4
5
6
7
8
a.py
@task
def add(x, y)
    return x+y


b.py
add.delay(3, 4)

看起来使用celery的时候会有一个限制,就是调用task的地方必须能够import这个task,这就要求他们的代码运行同一个项目中。不过delay调用其实就是往celery的broker发送一条消息,如果我们知道消息的格式,是不是可以不通过delay,而是由自己往broker发送一条消息的方式来触发我们想要调用的task?如果我们把函数名想象成消息,也就相当于实现了服务间的消息传递。

Celery的内部实现

celery中一个task消息的例子:

1
2
3
4
{'task': 'myapp.tasks.add',
'id': '54086c5e-6193-4575-8308-dbab76798756',
'args': [4, 4],
'kwargs': {}}

只要我们把这种格式的消息发到对应的queue里面,celery的worker就会收到并自动执行对应的task。要想发送这条消息,我们最少需要知道两个参数:

  1. task的名字,用来构造消息
  2. celery使用的队列名,消息要发送到的地方。

Celery使用Kombu来和broker之间进行通信,我们也借助Kombu来发送这种消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from kombu.pools import producers
from kombu.utils import uuid

from django.conf import settings

def publish_message(key, body):
    # Publish a custom message to RabbitMQ
    print('Publish Message (routing_key:%s, body:%s)' % (key, body))
    body = {
        'id': uuid(),
        'task':'buses.%s' % key,
        'kwargs':body,
    }
    with producers[settings.MQ_CONNECTION].acquire(block=True) as producer:
        producer.publish(body,
            exchange=settings.SOME_EXCHANGE,
            routing_key=key,
            declare=[settings.SOME_EXCHANGE],
            serializer='json',
            compression='zlib')

publish_message('coins.charge', {'user_id':100, 'value': '1000'})

作为发送消息的Producer,我们可能并不知道对应的task被定义在什么地方,我们需要在Producer和Consumer之间统一名字来实现调用,我们这里用了buses.xxx的形式,如果没有指定,Celery会自动根据module和函数名组合成对应的task名,但是我们可以在定义task的时候指定好我们想要的名字。

1
2
3
4
coins/buses.py:
@task(name='buses.coins.charge')
def coins_charge(**kwargs):
    pass

如果使用的是Django作为开发框架,可以把这种task单独定义在一个模块中,然后通过app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name='buses')来导入。 上面的publish_message函数中只有一个意义不明确的参数exchange,并没有我们之前说到的queue的名字,那么消息是怎么传递到对应的queue里的呢?想要弄清楚这点,我们需要了解一点AMQP的知识。

AMQP(Advanced Message Queuing Protocol )

和HTTP、SMTP类似,AMQP定义了消息在不同的系统之间相互操作的方式,在AMQP中消息首先发送到exchange,再由exchange发送到对应的queue里。不同的类型的exchange使用不同的方式来分发消息到queue中。 AMQP中定义了4种类型的exchange。

  1. direct
  2. topic
  3. fanout
  4. header

创建queue的时候,需要指定一个routing key来和一个exchange绑定,这个key有时候也被称作binding key, 消息发布到exchange的时候也需要一个routing key,不同的exchange通过比较routing key和binding key来决定是否发送消息到对应的queue。 可以有多个queue绑定到同一个exchange,一个queue也可以用不同的binding key来绑定到同一个exchange。

当exchange为direct类型时,只有routing key和binding key完全一样时,消息才会发送到对应的queue中。

topic类型的exchange支持包含通配符的binding key,如果一个queue使用coins.#作为binding_key, 那么消息的routing key为coins.charge 或者coins.purchase 都会发到这个queue中。

fanout类型的exhange会忽略binding key,把接收到的消息发给所有绑定到它的queue。

header类型会根据message中header中的一些字段而不是完全靠routing key来发送消息。

更改Celery的Exchange

默认配置下Celery的worker会创建一个名为celery的exchange,一个名为celery的queue,并且使用的binding key也是celery。当调用task.delay时,实际上就是往celery这个exchange发送一条消息,routing key为celery,这样worker就会接收到然后调用对应的task。

在Celery worker启动时可以使用-Q参数来让它监听不同的queue

1
celery worker -A celery:app -Q celery,coins_tasks

如果按照Celery的默认配置,使用-Q指定的queue都会创建一个一样名字的exchange,并且使用一样的名字来binding,如果我们想把coins.charge,coins.purchase这类消息交由一个队列来处理,就需要使用topic类型的exchange修改Celery的默认配置。

1
2
3
4
5
from kombu import Exchange
EXAMPLE_EXCHANGE = Exchange('exchange_name', 'topic', durable=True)
CELERY_QUEUES = (
    Queue('coins_tasks',  EXAMPLE_EXCHANGE, routing_key='coins.#'),
)

当celery worker启动时,在配置文件中找到对应的queue的声明,就会按照声明区创建queue,这样其他服务里发送的消息就可以让Celery去监听执行了。

总结

Celery是一个借助AMQP实现的任务队列,不仅可以用它来完成异步任务,也可以借助它来作为服务之间的消息队列。虽然上面这些功能使用Redis也能完成,不过更推荐使用RabbitMQ来做Celery的broker,RabbitMQ是AMQP协议的一个实现,原生支持Exchange,Queue这些概念,从使用过程来看,可靠性和扩展性也比Redis要好。虽然现在Celery还只是个任务队列,不过也许以后会成为微服务化的标准配置也不一定。

Comments