# 通过RabbitMQ实现rpc
# 实现逻辑
1.服务端启动接收消息,监听queue队列。
2.实列化客户端,调用call方法,将消息属性内包含: 1.回调函数随机队列,接收服务端返回结果,服务端会将结果发送到这个队列。2.客户但的随机uuid,标识唯一消息。然后将body消息发送给服务端。
3.客户端,发布完消息后,进入非阻塞状态,如果没有接收到服务端返回的结果,会一直等待,直到收到结果,然后返回结果。
4.服务端接收queue队列消息,调用函数将消息进行处理,获取裴波那契数列。
5.然后服务端进行发布,将消息发送到客户端的回调函数队列,客户端的uuid。
6.客户端监听接收队列消息,调用函数处理,判断唯一uuid,确认body,然后成功收到消息并返回。
import pika
# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq服务器ip地址,credentials指定用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在连接上创建一个频道,这个频道就是我们要通过它来发送,接收消息的对象,类似于TCP中的socket对象,我们通过它来收发消息,声明队列
channel = connection.channel()
channel.queue_declare(queue='rpc_queue') # 声明队列,如果队列不存在,会自动创建
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else: # 递归调用,计算斐波那契数列
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body): # ch为频道,method为方法,props为属性,body为消息体
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to, # 将消息发送到客户端的回调函数
properties=pika.BasicProperties(correlation_id = \
props.correlation_id), # 将客户端的correlation_id传递给客户端
body=str(response))
# 发送ack消息,告诉rabbitmq,消息已经被处理
ch.basic_ack(delivery_tag=method.delivery_tag)
# 每次只接收一个消息
channel.basic_qos(prefetch_count=1)
# 消费消息
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) # queue为队列名,on_message_callback为回调函数,收到消息后,会调用回调函数
print(" [x] Awaiting RPC requests")
channel.start_consuming() # 开始接收消息,进入阻塞状态,等待消息,直到收到消息为止,收到消息后,会调用on_request函数
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
self.credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq服务器ip地址,credentials指定用户名和密码
self.connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=self.credentials))
self.channel = self.connection.channel()
# 声明一个随机队列,用来接收rpc_server返回的结果
result = self.channel.queue_declare(queue='', exclusive=True) # exclusive=True表示这个队列只能被当前连接使用,当连接关闭时,队列会被删除,exclusive=True是为了防止多个客户端同时使用一个队列
# 获取随机队列的名称
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response, # 消费消息
auto_ack=True # 自动发送ack消息,告诉rabbitmq,消息已经被处理
)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
# 生成一个随机的correlation_id, 用来标识消息, 客户端和服务端都会用这个id来标识消息,
# 客户端会将这个id传递给服务端, 服务端会将这个id传递给客户端, 客户端和服务端都会将这个id与自己的id进行比较, 如果不一致, 则丢弃这个消息
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue', # 将消息发送到rpc_queue队列
properties=pika.BasicProperties( # 消息属性, 用来标识消息
reply_to=self.callback_queue, # 将消息发送到客户端的回调函数, 用来接收服务端返回的结果, 服务端会将结果发送到这个队列
correlation_id=self.corr_id, # 将客户端的crrelation_id发送给服务端
),
body=str(n) # 将消息发送给服务端, 服务端会将这个消息作为参数传递给fib函数
)
while self.response is None: # 如果没有收到服务端返回的结果, 则一直等待, 直到收到结果, 然后返回结果
self.connection.process_data_events() # 非阻塞版的start_consuming(), 用来接收消息
return int(self.response)
fibonacci_rpc = FibonacciRpcClient() # 实例化客户端, 用来发送消息, 并接收服务端返回的结果, 并返回结果, 用来调用服务端的方法
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(10) # 调用call方法, 发送消息, 并接收服务端返回的结果, 然后打印结果
print(" [.] Got %r" % response)