软件信息网 移动端开发 持久化(消息安全之durable持久化)

持久化(消息安全之durable持久化)

1.什么是rabbitmq持久化?
	数据支持持久化,运行过程中,rabbitmq宕机了,在重新启动起来,如果队列消费消息没被消费,那么就还是会存在。

2.配置队列持久化
	# 在创建队列的时候增加durable=True设置队列持久化,如果rabbitmq服务重启,队列不会丢失
	channel.queue_declare(queue='datalog',durable=True)

3.配置消息持久化
	# 在发布消息的时候增加properties设置消息持久化,如果rabbitmq服务停止,重启后,消息还在, 1:非持久化,2:持久化,默认为1
    properties=pika.BasicProperties(delivery_mode=2,)
    
# 注意:
	1.没加持久化配置之前的队列不会支持持久化,需要加持久化配置之后重新创建。
* 生产者
import pika

# 创建连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22'))  # host指定rabbitmq服务器ip地址

# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))  # host指定rabbitmq服务器ip地址,credentials指定用户名和密码


# 创建channel对象,用于发送消息,接收消息,声明队列
channel = connection.channel()  # connection.channel(): 在连接上创建一个频道,这个频道就是我们要通过它来发送,接收消息的对象,类似于TCP中的socket对象,我们通过它来收发消息,声明队列

# 声明队列,如果队列不存在,则创建队列,如果队列存在,则不创建
channel.queue_declare(queue='datalog', durable=True)  # durable=True: 队列持久化,如果rabbitmq服务停止,重启后,队列还在

# 生产者向队列中放一条消息
channel.basic_publish(exchange='',  # 交换机,如果不指定,则使用默认的交换机, 默认的交换机
                      routing_key='datalog',  # 队列名称
                      body='zll nb!',  # 发送的消息
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 消息持久化,如果rabbitmq服务停止,重启后,消息还在, 1:非持久化,2:持久化
                      )
                      )

print("Sent 'Hello World!'")

# 关闭连接
connection.close()
  • 消费者
import pika, sys, os

def main():
    # 创建连接对象
    # connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22'))  # host指定rabbitmq服务器ip地址

    # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
                                                                   credentials=credentials))  # host指定rabbitmq服务器ip地址,credentials指定用户名和密码

    # 创建channel对象,用于发送消息,接收消息,声明队列
    channel = connection.channel()  # connection.channel(): 在连接上创建一个频道,这个频道就是我们要通过它来发送,接收消息的对象,类似于TCP中的socket对象,我们通过它来收发消息,声明队列

    """消费者也声明了队列,因为如果是消费者先启动,那么队列就不存在了,消费者就无法消费消息了,所以消费者也要声明队列"""
    channel.queue_declare(queue='datalog', durable=True)  # durable=True: 队列持久化,如果rabbitmq服务重启,队列不会丢失

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        # 真正的将消息消费完了,再发确认
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # 消费者从队列queue指定的消费队列hello中取消息,拿到数据了之前将hello队列的数据丢到callback里面,如果队列中没有消息,则会一直等待,直到有消息为止
    # auto_ack=True:自动确认消息,如果不设置为True,那么消息会一直处于未确认状态,即使消费者已经消费了消息,消息也不会从队列中删除,这样就会造成消息的重复消费,所以一般都会设置为True
    # auto_ack=true: 队列接收到既直接确认,就会删除队列中的消息,不会管后面数据会不会消费完。
    channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False)  # 默认为false,不自动确认消息,需要手动确认

    print(' [*] Waiting for messages. To exit press CTRL+C')

    channel.start_consuming()  # 开始消费消息,如果队列中没有消息,那么就会一直等待,直到有消息为止,如果队列中有消息,那么就会消费消息

if __name__ == '__main__':
    main()

image

作者: 软件定制开发

李铁牛,一直致力于企业客户软件定制开发,计算机专业毕业后,一直从事于互联网产品开发到现在。系统开发,系统源码:15889726201
上一篇
下一篇
联系我们

联系我们

15889726201

在线咨询: QQ交谈

邮箱: 187395037@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部