# auto_ack: 自动确认消息(队列接收到就会确认消费,会丢失数据的可能性) 默认为false
auto_ack=true: 队列接收到既直接确认,就会删除队列中的消息,不会管后面数据会不会消费完。
auto_ack=false: 设置为false的情况,那么消息会一直处于未确认状态,即使消费者已经消费了消息,消息也不会从队列中删除,这样就会造成消息的重复消费
# ch.basic_ack: 消费完后,自动确认消费(可靠性,保证数据都完整的消费): 常用推荐
ch.basic_ack(delivery_tag=method.delivery_tag): 真正的将消息消费完了后,再发确认,就会删除掉队列中的消息。
* 生产者
import pika
# 创建连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq服务器ip地址
# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq服务器ip地址,credentials指定用户名和密码
# 创建channel对象,用于发送消息,接收消息,声明队列
channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道就是我们要通过它来发送,接收消息的对象,类似于TCP中的socket对象,我们通过它来收发消息,声明队列
# 声明队列,如果队列不存在,则创建队列,如果队列存在,则不创建
channel.queue_declare(queue='datalog')
# 生产者向队列中放一条消息
channel.basic_publish(exchange='', # 交换机,如果不指定,则使用默认的交换机, 默认的交换机
routing_key='datalog', # 队列名称
body='zll nb!' # 发送的消息
)
print("Sent 'Hello World!'")
# 关闭连接
connection.close()
-
消费者
copy
import pika, sys, os
def main():
# 创建连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq服务器ip地址
# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials)) # host指定rabbitmq服务器ip地址,credentials指定用户名和密码
# 创建channel对象,用于发送消息,接收消息,声明队列
channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道就是我们要通过它来发送,接收消息的对象,类似于TCP中的socket对象,我们通过它来收发消息,声明队列
"""消费者也声明了队列,因为如果是消费者先启动,那么队列就不存在了,消费者就无法消费消息了,所以消费者也要声明队列"""
channel.queue_declare(queue='datalog')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 真正的将消息消费完了,再发确认
ch.basic_ack(delivery_tag=method.delivery_tag)
# 消费者从队列queue指定的消费队列hello中取消息,拿到数据了之前将hello队列的数据丢到callback里面,如果队列中没有消息,则会一直等待,直到有消息为止
# auto_ack=True:自动确认消息,如果不设置为True,那么消息会一直处于未确认状态,即使消费者已经消费了消息,消息也不会从队列中删除,这样就会造成消息的重复消费,所以一般都会设置为True
# auto_ack=true: 队列接收到既直接确认,就会删除队列中的消息,不会管后面数据会不会消费完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 默认为false,不自动确认消息,需要手动确认
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 开始消费消息,如果队列中没有消息,那么就会一直等待,直到有消息为止,如果队列中有消息,那么就会消费消息
if __name__ == '__main__':
main()