软件信息网 移动端开发 通过rabbitmq实现rpc(基于RabbitMQ封装RPC)

通过rabbitmq实现rpc(基于RabbitMQ封装RPC)

# 通过RabbitMQ实现rpc

# 实现逻辑
	1.服务端启动接收消息,监听queue队列。
    2.实列化客户端,调用call方法,将消息属性内包含: 1.回调函数随机队列,接收服务端返回结果,服务端会将结果发送到这个队列。2.客户但的随机uuid,标识唯一消息。然后将body消息发送给服务端。
    3.客户端,发布完消息后,进入非阻塞状态,如果没有接收到服务端返回的结果,会一直等待,直到收到结果,然后返回结果。
    4.服务端接收queue队列消息,调用函数将消息进行处理,获取裴波那契数列。
    5.然后服务端进行发布,将消息发送到客户端的回调函数队列,客户端的uuid。
    6.客户端监听接收队列消息,调用函数处理,判断唯一uuid,确认body,然后成功收到消息并返回。
  • 服务端
import pika

# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq服务器ip地址,credentials指定用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
                                                               credentials=credentials))
# 在连接上创建一个频道,这个频道就是我们要通过它来发送,接收消息的对象,类似于TCP中的socket对象,我们通过它来收发消息,声明队列
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')  # 声明队列,如果队列不存在,会自动创建

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:  # 递归调用,计算斐波那契数列
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):  # ch为频道,method为方法,props为属性,body为消息体
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,  # 将消息发送到客户端的回调函数
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),  # 将客户端的correlation_id传递给客户端
                     body=str(response))

    # 发送ack消息,告诉rabbitmq,消息已经被处理
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 每次只接收一个消息
channel.basic_qos(prefetch_count=1)
# 消费消息
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)  # queue为队列名,on_message_callback为回调函数,收到消息后,会调用回调函数

print(" [x] Awaiting RPC requests")

channel.start_consuming()  # 开始接收消息,进入阻塞状态,等待消息,直到收到消息为止,收到消息后,会调用on_request函数
  • 客户端
import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定
        self.credentials = pika.PlainCredentials("admin", "admin")
        # host指定rabbitmq服务器ip地址,credentials指定用户名和密码
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=self.credentials))
        self.channel = self.connection.channel()

        # 声明一个随机队列,用来接收rpc_server返回的结果
        result = self.channel.queue_declare(queue='', exclusive=True)  # exclusive=True表示这个队列只能被当前连接使用,当连接关闭时,队列会被删除,exclusive=True是为了防止多个客户端同时使用一个队列
        # 获取随机队列的名称
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,  # 消费消息
            auto_ack=True  # 自动发送ack消息,告诉rabbitmq,消息已经被处理
        )


    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body


    def call(self, n):
        self.response = None
        # 生成一个随机的correlation_id, 用来标识消息, 客户端和服务端都会用这个id来标识消息,
        # 客户端会将这个id传递给服务端, 服务端会将这个id传递给客户端, 客户端和服务端都会将这个id与自己的id进行比较, 如果不一致, 则丢弃这个消息
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # 将消息发送到rpc_queue队列
            properties=pika.BasicProperties(  # 消息属性, 用来标识消息
                reply_to=self.callback_queue,  # 将消息发送到客户端的回调函数, 用来接收服务端返回的结果, 服务端会将结果发送到这个队列
                correlation_id=self.corr_id,  # 将客户端的crrelation_id发送给服务端
            ),
            body=str(n)  # 将消息发送给服务端, 服务端会将这个消息作为参数传递给fib函数
        )

        while self.response is None:  # 如果没有收到服务端返回的结果, 则一直等待, 直到收到结果, 然后返回结果
            self.connection.process_data_events()  # 非阻塞版的start_consuming(), 用来接收消息
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()  # 实例化客户端, 用来发送消息, 并接收服务端返回的结果, 并返回结果, 用来调用服务端的方法

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(10)  # 调用call方法, 发送消息, 并接收服务端返回的结果, 然后打印结果
print(" [.] Got %r" % response)

image

作者: 软件定制开发

李铁牛,一直致力于企业客户软件定制开发,计算机专业毕业后,一直从事于互联网产品开发到现在。系统开发,系统源码:15889726201
上一篇
下一篇
联系我们

联系我们

15889726201

在线咨询: QQ交谈

邮箱: 187395037@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部