admin管理员组

文章数量:1033333

Python 与 ActiveMQ 交互发送消息示例

网上搜到的文章大多是用一段代码生产和消费数据,和实际应用有一些区别,因此专门整理了几段代码,编写几个 Python 操作 ActiveMQ 的例子,演示 Python 使用 STOMP 协议连接 ActiveMQ 的场景,更贴合生产实际的需要。

  • 向队列中发送消息
  • 消费队列中的数据
  • 向 Topic 中发送消息
  • 消费 Topic 中的消息

本文基于 ActiveMQ 5.8.0 以及 Python 3.9.13 ActiveMQ 官方网站 中显示,ActiveMQ 已经分成 ActiveMQ Classic 和 ActiveMQ Artemis 两个分支,其中 Classic 主要是 5.x 的版本,Artemis 是 6.x 的版本。

向队列中发送消息

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection([('localhost',61613)])
conn.set_listener('logicServerQueue', MyListener())
# conn.start()
conn.connect('admin', 'admin', wait=True)
# conn.subscribe(destination='/queue/test_queue', id=1, ack='auto')

while True:
    t=time.gmtime()
    msg=" hello  " + time.strftime("%Y-%m-%d %H:%M:%S",t)
    conn.send(body=msg, destination='/queue/test', headers={'consumerId': 'qmsg_producer'})
    print(" send : " + msg)
    time.sleep(10)

conn.disconnect()

消费队列中的数据

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'admin', wait=True)
conn.subscribe(destination='/queue/test', id=1, ack='auto')

while True:
    time.sleep(5)

将消费者放入了循环中,否则消费一次后会退出整个应用。

向 Topic 中发送消息

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection([('localhost',61613)])
conn.set_listener('logicServerQueue', MyListener())
# conn.start()
conn.connect('admin', 'admin', wait=True)
# conn.subscribe(destination='/queue/test_queue', id=1, ack='auto')

while True:
    t=time.gmtime()
    msg=" hello  " + time.strftime("%Y-%m-%d %H:%M:%S",t)
    conn.send(body=msg, destination='/topic/test', headers={'consumerId': 'topic_producer'})
    print(" send : " + msg)
    time.sleep(10)

conn.disconnect()

消费 Topic 中的消息

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'admin', wait=True)
conn.subscribe(destination='/queue/test', id=1, ack='auto')

while True:
    time.sleep(5)

Python 与 ActiveMQ 交互发送消息示例

网上搜到的文章大多是用一段代码生产和消费数据,和实际应用有一些区别,因此专门整理了几段代码,编写几个 Python 操作 ActiveMQ 的例子,演示 Python 使用 STOMP 协议连接 ActiveMQ 的场景,更贴合生产实际的需要。

  • 向队列中发送消息
  • 消费队列中的数据
  • 向 Topic 中发送消息
  • 消费 Topic 中的消息

本文基于 ActiveMQ 5.8.0 以及 Python 3.9.13 ActiveMQ 官方网站 中显示,ActiveMQ 已经分成 ActiveMQ Classic 和 ActiveMQ Artemis 两个分支,其中 Classic 主要是 5.x 的版本,Artemis 是 6.x 的版本。

向队列中发送消息

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection([('localhost',61613)])
conn.set_listener('logicServerQueue', MyListener())
# conn.start()
conn.connect('admin', 'admin', wait=True)
# conn.subscribe(destination='/queue/test_queue', id=1, ack='auto')

while True:
    t=time.gmtime()
    msg=" hello  " + time.strftime("%Y-%m-%d %H:%M:%S",t)
    conn.send(body=msg, destination='/queue/test', headers={'consumerId': 'qmsg_producer'})
    print(" send : " + msg)
    time.sleep(10)

conn.disconnect()

消费队列中的数据

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'admin', wait=True)
conn.subscribe(destination='/queue/test', id=1, ack='auto')

while True:
    time.sleep(5)

将消费者放入了循环中,否则消费一次后会退出整个应用。

向 Topic 中发送消息

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection([('localhost',61613)])
conn.set_listener('logicServerQueue', MyListener())
# conn.start()
conn.connect('admin', 'admin', wait=True)
# conn.subscribe(destination='/queue/test_queue', id=1, ack='auto')

while True:
    t=time.gmtime()
    msg=" hello  " + time.strftime("%Y-%m-%d %H:%M:%S",t)
    conn.send(body=msg, destination='/topic/test', headers={'consumerId': 'topic_producer'})
    print(" send : " + msg)
    time.sleep(10)

conn.disconnect()

消费 Topic 中的消息

代码语言:javascript代码运行次数:0运行复制
import time
import sys

import stomp

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)

    def on_message(self, frame):
        print('received a message "%s"' % frame.body)

conn = stomp.Connection()
conn.set_listener('', MyListener())
conn.connect('admin', 'admin', wait=True)
conn.subscribe(destination='/queue/test', id=1, ack='auto')

while True:
    time.sleep(5)

本文标签: Python 与 ActiveMQ 交互发送消息示例