本文基于 ActiveMQ 5.8.0 以及 Python 3.9.13
ActiveMQ 官方网站 中显示,ActiveMQ 已经分成 ActiveMQ Classic 和 ActiveMQ Artemis 两个分支,其中 Classic 主要是 5.x 的版本,Artemis 是 6.x 的版本。
网上搜到的文章大多是用一段代码生产和消费数据,和实际应用有一些区别,因此专门整理了几段代码,更贴合生产实际的需要。如果需要 ActiveMQ 的本地环境,可以参考 构建ActiveMQ镜像并运行 快速搭建本地环境。
编写几个 Python 操作 ActiveMQ 的例子,演示 Python 使用 STOMP 协议连接 ActiveMQ 的场景。
- 向队列中发送消息
- 消费队列中的数据
- 向 Topic 中发送消息
- 消费 Topic 中的消息
向队列中发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import time import sys
import stomp
class MyListener(stomp.ConnectionListener): def on_error(self, frame): print('received an error "%s"' % frame.body)
def on_message(self, frame): print('received a message "%s"' % frame.body)
conn = stomp.Connection([('localhost',61613)]) conn.set_listener('logicServerQueue', MyListener())
conn.connect('admin', 'admin', wait=True)
while True: t=time.gmtime() msg=" hello " + time.strftime("%Y-%m-%d %H:%M:%S",t) conn.send(body=msg, destination='/queue/test', headers={'consumerId': 'qmsg_producer'}) print(" send : " + msg) time.sleep(10)
conn.disconnect()
|
消费队列中的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import time import sys
import stomp
class MyListener(stomp.ConnectionListener): def on_error(self, frame): print('received an error "%s"' % frame.body)
def on_message(self, frame): print('received a message "%s"' % frame.body)
conn = stomp.Connection() conn.set_listener('', MyListener()) conn.connect('admin', 'admin', wait=True) conn.subscribe(destination='/queue/test', id=1, ack='auto')
while True: time.sleep(5)
|
将消费者放入了循环中,否则消费一次后会退出整个应用。
向 Topic 中发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import time import sys
import stomp
class MyListener(stomp.ConnectionListener): def on_error(self, frame): print('received an error "%s"' % frame.body)
def on_message(self, frame): print('received a message "%s"' % frame.body)
conn = stomp.Connection([('localhost',61613)]) conn.set_listener('logicServerQueue', MyListener())
conn.connect('admin', 'admin', wait=True)
while True: t=time.gmtime() msg=" hello " + time.strftime("%Y-%m-%d %H:%M:%S",t) conn.send(body=msg, destination='/topic/test', headers={'consumerId': 'topic_producer'}) print(" send : " + msg) time.sleep(10)
conn.disconnect()
|
消费 Topic 中的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import time import sys
import stomp
class MyListener(stomp.ConnectionListener): def on_error(self, frame): print('received an error "%s"' % frame.body)
def on_message(self, frame): print('received a message "%s"' % frame.body)
conn = stomp.Connection() conn.set_listener('', MyListener()) conn.connect('admin', 'admin', wait=True) conn.subscribe(destination='/queue/test', id=1, ack='auto')
while True: time.sleep(5)
|
参考资料
- python stomp 收发指定的消息
- ActiveMQ 配置自动清除数据
- python接收ActiveMQ消息