Python RabbitMQ 使用Fair dispatch公平分发确保每个消费者当前消息平均

  • A+
所属分类:python语言

最近在写一个爬虫,一个程序去采集需要爬的页面URL然后放置到rabbitMQ队列中,然后启用多个消费者去爬取内容,由于使用代理去爬取,每个消费者处理的时间都会是不一样的。

生产URL的程序非常快的就会生产大量的URL,为了控制rabbitMQ内存使用,我控制了生产者的速度。当消息队列中有操作50条未处理时候,生产者就会暂停一段时间。

 
 

rabbitmq默认使用的是平均分发队列,来了消息就平均分配给消费者。

由于每个消费者处理的时间可能会不一样,就会出现,很多消息在个别的消费者上,其他消费者会空闲,

 
 

解决办法我们就要使用Fair dispatch公平分发策略,始终保持消费一定数量的消息

 
 

credentials = pika.PlainCredentials('admin', 'xxxxxxx')

# 连接到rabbitmq服务器

connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', credentials))

channel = connection.channel

channel.queue_declare(queue='Kb_number')

channel.basic_qos(prefetch_count=5)

channel.basic_consume(self.RabbitMQ, queue='Kb_number', )

channel.start_consuming()

 
 

这样设置后,我们每个消费者都会最多有5个消息

030319 0744 PythonRabbi1 - Python RabbitMQ 使用Fair dispatch公平分发确保每个消费者当前消息平均

未分配的消息就会在reday状态中

030319 0744 PythonRabbi2 - Python RabbitMQ 使用Fair dispatch公平分发确保每个消费者当前消息平均

注: 在设置时候遇到的坑

申明basic_qos时一定要在注册consume(basic_consume)之前设置,在之后设置不生效,虽然从控制台上可以看到prefetch有设定的值

 
 

 
 

 
 

头像

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: