Python操作RabbitMQ服务器实现消息队列的路由功能
admin
2023-08-02 15:39:09
0

Python使用Pika库(安装:sudo pip install pika)可以操作RabbitMQ消息队列服务器(安装:sudo apt-get install rabbitmq-server),这里我们来看一下MQ相关的路由功能。

路由键的实现

比如有一个需要给所有接收端发送消息的场景,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,要怎么办呢?这种情况下就要用到路由键了。

路由键的工作原理:每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键。发送端通过交换机发送信息时,可以指明路由键 ,交换机会根据路由键把消息发送到相应的消息队列,这样接收端就能接收到消息了。

这边继上一篇,还是用send.py和receive.py来模拟实现路由键的功能。send.py表示发送端,receive.py表示接收端。实例的功能就是将info、warning、error三种级别的信息发送到不同的接收端。

send.py代码分析

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        \'localhost\'))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange=\'messages\', type=\'direct\')
 
#定义三个路由键
routings = [\'info\', \'warning\', \'error\']
 
#将消息依次发送到交换机,并设置路由键
for routing in routings:
  message = \'%s message.\' % routing
  channel.basic_publish(exchange=\'messages\',
             routing_key=routing,
             body=message)
  print message
 
connection.close()

receive.py代码分析

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        \'localhost\'))
channel = connection.channel()
 
#定义交换机,设置类型为direct
channel.exchange_declare(exchange=\'messages\', type=\'direct\')
 
#从命令行获取路由键参数,如果没有,则设置为info
routings = sys.argv[1:]
if not routings:
  routings = [\'info\']
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange=\'messages\',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print \" [x] Received %r\" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print \' [*] Waiting for messages. To exit press CTRL+C\'
channel.start_consuming()

打开两个终端,一个运行代码python receive.py info warning,表示只接收info和warning的消息。另外一个终端运行send.py,可以观察到接收终端只接收到了info和warning的消息。如果打开多个终端运行receive.py,并传入不同的路由键参数,可以看到更明显的效果。

当接收端正在运行时,可以使用rabbitmqctl list_bindings来查看绑定情况。

路由键模糊匹配
路由键模糊匹配,就是可以使用正则表达式,和常用的正则表示式不同,这里的话“#”表示所有、全部的意思;“*”只匹配到一个词。看完示例就能明白了。

这边继上面的例子,还是用send.py和receive.py来实现路由键模糊匹配的功能。send.py表示发送端,receive.py表示接收端。实例的功能大概是这样:比如你有个知心好朋友,不管开心、伤心、工作上的还是生活上的事情都可以和她说;还有一些朋友可以分享开心的事情;还有一些朋友,你可以把不开心的事情和她说。

send.py代码分析

因为要进行路由键模糊匹配,所以交换机的类型要设置为topic,设置为topic,就可以使用#,*的匹配符号了。

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        \'localhost\'))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange=\'messages\', type=\'topic\')
 
#定义路由键
routings = [\'happy.work\', \'happy.life\', \'sad.work\', \'sad.life\']
 
#将消息依次发送到交换机,并设定路由键
for routing in routings:
  message = \'%s message.\' % routing
  channel.basic_publish(exchange=\'messages\',
             routing_key=routing,
             body=message)
  print message
 
connection.close()

上例中定义了四种类型的消息,容易理解,就不解释了,然后依次发送出去。

receive.py代码分析

同样,交换机的类型要设定为topic就可以了。从命令行接收参数的功能稍微调整了一下,就是没有参数时报错退出。

#!/usr/bin/env python
#coding=utf8
import pika, sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        \'localhost\'))
channel = connection.channel()
 
#定义交换机,设置类型为topic
channel.exchange_declare(exchange=\'messages\', type=\'topic\')
 
#从命令行获取路由参数,如果没有,则报错退出
routings = sys.argv[1:]
if not routings:
  print >> sys.stderr, \"Usage: %s [routing_key]...\" % (sys.argv[0],)
  exit()
 
#生成临时队列,并绑定到交换机上,设置路由键
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
for routing in routings:
  channel.queue_bind(exchange=\'messages\',
            queue=queue_name,
            routing_key=routing)
 
def callback(ch, method, properties, body):
  print \" [x] Received %r\" % (body,)
 
channel.basic_consume(callback, queue=queue_name, no_ack=True)
 
print \' [*] Waiting for messages. To exit press CTRL+C\'
channel.start_consuming()

打开四个终端,一个运行如下,表示任何事情都可以和她说:

python receive.py \"#\"

另外一个终端 运行如下,表示可以和她分享开心的事:

python receive.py \"happy.*\"

第三个运行如下,表示工作上的事情可以和她分享:

python receive.py \"*.work\"

最后一个运行python send.py。结果不难想象出来,就不贴出来了。

相关内容

热门资讯

500 行 Python 代码... 语法分析器描述了一个句子的语法结构,用来帮助其他的应用进行推理。自然语言引入了很多意外的歧义,以我们...
定时清理删除C:\Progra... C:\Program Files (x86)下面很多scoped_dir开头的文件夹 写个批处理 定...
65536是2的几次方 计算2... 65536是2的16次方:65536=2⁶ 65536是256的2次方:65536=256 6553...
Mobi、epub格式电子书如... 在wps里全局设置里有一个文件关联,打开,勾选电子书文件选项就可以了。
scoped_dir32_70... 一台虚拟机C盘总是莫名奇妙的空间用完,导致很多软件没法再运行。经过仔细检查发现是C:\Program...
小程序支付时提示:appid和... [Q]小程序支付时提示:appid和mch_id不匹配 [A]小程序和微信支付没有进行关联,访问“小...
pycparser 是一个用... `pycparser` 是一个用 Python 编写的 C 语言解析器。它可以用来解析 C 代码并构...
微信小程序使用slider实现... 众所周知哈,微信小程序里面的音频播放是没有进度条的,但最近有个项目呢,客户要求音频要有进度条控制,所...
python查找阿姆斯特朗数 题目解释 如果一个n位正整数等于其各位数字的n次方之和,则称该数为阿姆斯特朗数。 例如1^3 + 5...
Apache Doris 2.... 亲爱的社区小伙伴们,我们很高兴地向大家宣布,Apache Doris 2.0.0 版本已于...