当前位置: 56net亚洲必嬴 > 编程 > 正文

Python开拓【十生机勃勃章】:RabbitMQ队列

时间:2019-11-01 11:06来源:编程
RabbitMQ队列 率先大家在讲rabbitMQ此前大家要说一下python里的queue:二者干的事务是同样的,都以队列,用于传递新闻 在python的queue中有八个一个是线程queue,多少个是进度queue(multiproce

RabbitMQ队列

率先大家在讲rabbitMQ此前大家要说一下python里的queue:二者干的事务是同样的,都以队列,用于传递新闻

在python的queue中有八个一个是线程queue,多少个是进度queue(multiprocessing中的queue)。线程queue无法跨进度,用于三个线程之间开展多少同步交互;进程queue只是用于父进程与子进程,或然同属于同意父进程下的三个子进程实行交互。也正是说假设是多少个完全部独用立的主次,即便是python程序,也依旧无法用那么些进程queue来通讯。那即使我们有三个单身的python程序,分属于四个进度,大概是python和别的语言

安装:windows下

首先需求设置 Erlang景况

官网: 

Windows版下载地址:

Linux版:     使用yum安装

 

然后安装RabbitMQ了 

先是下载RabbitMQ 的Windows版本

下载地址:

安装pika:

前边设置过了pip,直接展开cmd,运维pip install pika

安装完成之后,达成二个最轻便易行的行列通讯:

www.56.net 1

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先成立一个中坚的socket,然后创立一个管道,在管道中发新闻,然后声多美滋(Dumex)个queue,起个类别的名字,之后真正的发信息(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要风流洒脱运营就径直运维下去,他不停收一条,恒久在此卡住。

在上边不管是produce依旧consume,里面都宣称了二个queue,那几个是干什么吧?因为大家不亮堂是客商先发轫运营依然生产者先运营,那样生龙活虎旦未有注脚的话就能够报错。

下边大家来看一下风流罗曼蒂克对多,即三个劳动者对应多少个买主:

先是大家运维3个买主,然后不断的用produce去发送数据,大家可以看看顾客是经过黄金年代种轮询的办法实行不断的收受多少,各个客户开支四个。

那么只要大家客商收到了新闻,然后管理那么些音讯须求30分钟,在管理的进度中,花费者断电了宕机了,那开销者还不曾拍卖完,大家设那几个任务大家亟须管理完,那我们相应有贰个承认的音讯,说那些职务成功了依然是还没实现,所以自个儿的劳动者要肯定花费者是不是把这么些职务管理完了,花费者处理完现在要给那几个生产者服务器端发送一个明显音信,生产者才会把那些义务从音信队列中删除。若无拍卖完,花费者宕机了,未有给劳动者发送确认音信,这就表示并未有拍卖完,那我们看看rabbitMQ是怎么管理的

大家能够在客户的callback中增添一个time.sleep()举办模拟宕机。callback是一个回调函数,只要事件一触发就能够调用那些函数。函数试行完了就意味着音信管理完了,假使函数未有拍卖完,那就注解。。。。

大家得以看来在花费者代码中的basic_consume()中有二个参数叫no_ack=True,那么些意思是那条新闻是还是不是被拍卖完都不会发送确认新闻,平时大家不加这么些参数,rabbitMQ暗许就能给您设置成新闻管理完了就自动发送确认,大家后天把那么些参数去掉,况且在callback中增加一句话运营:ch.basic_ack(delivery_tag=method.delivery_tag)(手动管理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

www.56.net 2www.56.net 3www.56.net 4

运营的结果正是,笔者先运维一遍生产者,数据被消费者1选拔到了,不过作者把顾客1宕机,结束运营,那么开支者2就选择了音讯,即只要花费者绝非发送确认音信,生产者就不会把新闻删除。

RabbitMQ音信悠久化:

大家能够改动加许多的音讯队列,那大家怎么查看音信队列的景况呢:rabbitmqctl.bat list_queues

www.56.net 5

现行的状态是,音讯队列中还有音讯,但是服务器宕机了,这那么些新闻就丢了,那自个儿就须要这么些消息强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在每便证明队列的时候增加叁个durable参数(顾客端和劳动器端都要抬高这些参数),

www.56.net 6

在此个意况下,我们把rabbitMQ服务重视启,开掘只有队列名留下了,但是队列中的新闻未有了,那样我们还要求在劳动者basic_publish中加多八个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

如此就足以使得新闻持久化

现行反革命是两个劳动者对应五个买主,很公道的收发收发,可是实际上的状态是,大家机器的布局是不均等的,有的配置是单核的部分配置是多核的,恐怕i7管理器管理4条音信的时候和别的的微型Computer管理1条新闻的时刻基本上,那差的微处理器这里就能够聚成堆音讯,而好的管理器那里就能形成闲置,在实际中做运转的,大家会在负载均衡中装置权重,哪个人的布局高权重高,职务就多一点,可是在rabbitMQ中,我们只做了三个简易的管理就足以完结公道的音讯分发,你有多大的力量就管理多少音信

即:server端给客商端发送消息的时候,先反省今后还应该有多少消息,若是当前新闻并未有管理完结,就不会发送给那一个开支者音信。若是当前的花费者绝非新闻就发送

以此只供给在花费者端进行更改加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在转移一个consume2,在callback中sleep20秒来模拟

www.56.net 7www.56.net 8www.56.net 9

本身先运转多个produce,被consume选拔,然后在开发银行叁个,就被consumer2接纳,可是因为consumer第22中学sleep20秒,管理慢,所以当时在运营produce,就又给了consume举办拍卖

 

RabbitMQ队列

前言:这一次整合治理写黄金年代篇有关rabbitMQ的博客,相比较上豆蔻梢头篇redis,认为rabbitMQ难度是进步不菲。那篇博客会插入一些英语疏解,但是简单明白的。rabbitMQ的下载与安装,请参见redis&rabbitMQ安装。

PublishSubscrible(音讯宣布订阅)

近期都以1对1的出殡和安葬选择数据,这作者想1对多,想广播相像,生产者发送三个信息,全数客商都接收新闻。那我们如何做呢?那时候大家将要用到exchange了

exchange在生龙活虎端收消息,在另风流罗曼蒂克端就把新闻放进queue,exchange必得正确的接头收到的信息要怎么,是还是不是应当发到二个一定的queue依然发给大多queue,也许说把她废弃,那么些都被exchange的品类所定义

exchange在概念的时候是有项指标,以调节到底是那么些queue切合条件,能够选用音信:

fanout:全数bind到此exchange的queue都得以担当音讯

direct:通过rounroutingKey和exchange决定的可怜唯意气风发的queue可以收到新闻

topic:全体符合routingKey的routingKey所bind的queue能够选取音讯

headers:通过headers来调节把新闻发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此处的exchange从前是空的,现在赋值log;在此边也并未有声明queue,广播不必要写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在消费者这里大家有定义了八个queue,注意一下批注中的内容。不过大家在发送端未有申明queue,为何发送端不须要选取端要求吗?在consume里有一个channel.queue_bind()函数,里面绑定了exchange转变器上,当然里面还供给贰个queue_name

运作结果:

www.56.net 10www.56.net 11www.56.net 12www.56.net 13

就约等于收音机同样,实时播报,张开多个买主,生产者发送一条数据,然后3个顾客同一时候收到到

rabbitMQ是新闻队列;想想早前的我们学过队列queue:threading queue(线程queue,多少个线程之间张开数量交互)、进程queue(父进度与子进程展开交互或许同属于同意气风发父进度下的八个子进度张开互动);假设三个独立的顺序,那么之间是无法由此queue实行交互的,当时大家就供给叁个个中代理即rabbitMQ

rabbitMQ是音讯队列;想想在此之前的大家学过队列queue:threading queue(线程queue,八个线程之间开展多少交互)、进程Queue(父进度与子进程张开互动只怕同属于同豆蔻年华父进程下的四个子进度张开交互);假诺多少个独立的前后相继,那么之间是不可能经过queue举行互动的,那时大家就供给贰此中等代理即rabbitMQ.

有取舍的收受新闻(exchange_type = direct)

RabbitMQ还协理根据注重字发送,即:队列绑定关键字,发送者将数据依照重大字发送到消息exchange,exchange依照重大字决断应该将数据发送到钦命的类别

www.56.net 14

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

更为细致的过滤(exchange_type=topic)

www.56.net 15

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

以上都以服务器端发新闻,客商端收音信,音信流是单向的,那倘使大家想要发一条命令给长途的顾客端去试行,然后想让客商端施行的结果回到,则这种情势叫做rpc

RabbitMQ RPC

www.56.net 16

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是跻身二个梗阻形式,未有音信就等候音信,有音讯就收过来

self.connection.process_data_events()是一个非阻塞版的start_consuming,就是说发了三个事物给客商端,每过一点日子去反省有没有消息,若无信息,能够去干别的事情

reply_to = self.callback_queue是用来抽取反应队列的名字

corr_id = str(uuid.uuid4()),correlation_id第风流罗曼蒂克在顾客端会通过uuid4生成,第二在劳务器端再次回到奉行结果的时候也会传过来一个,所以说假若服务器端发过来的correlation_id与协和的id相近,那么服务器端发出来的结果就一定会将是本人正好客商端发过去的下令的举办结果。现在就三个劳动器端四个客户端,不留意缺人不承认。今后顾客端是非阻塞版的,我们得以不让它打字与印刷未有信息,而是进行新的通令,那样就两条信息,不自然按梯次达成,那我们就供给去确认种种重回的结果是哪个命令的实行结果。

生机勃勃体化的情势是如此的:生产者发了贰个下令给买主,不领会顾客端曾几何时回来,依旧要去收结果的,然则它又不想步入阻塞情势,想每过生机勃勃段时间看这些音讯收回来未有,固然新闻收回来了,就代表收完了。 

运营结果:

www.56.net 17www.56.net 18

劳务器端开启,然后在开行客商端,顾客端先是等待新闻的出殡和安葬,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

消息队列:

 

  • RabbitMQ
  • ZeroMQ
  • ActiveMQ
  • ...........

大器晚成、轻便的rabbitMQ队列通讯

www.56.net 19

由上海教室能够,数据是头阵给exchange沟通器,exchage再发放相应队列。pika模块是python对rabbitMQ的API接口。选择端有一个回调函数,后生可畏接受到数码就调用该函数。一条新闻被一个费用者收到后,该消息就从队列删除。OK,驾驭上边的学问后,先来探访贰个大概的rabbitMQ列队通讯。

send端:

 1 import pika
 2 #连上rabbitMQ
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()       #生成管道,在管道里跑不同的队列
 5 
 6 #声明queue
 7 channel.queue_declare(queue='hello1')
 8 
 9 #n RabbitMQ a message can never be sent directly to the queue,it always needs to go through an exchange.
10 #向队列里发数据
11 channel.basic_publish(exchange='',      #先把数据发给exchange交换器,exchage再发给相应队列
12                       routing_key='hello1', #向"hello'队列发数据
13                       body='HelloWorld!!')  #发的消息
14 print("[x]Sent'HelloWorld!'")
15 connection.close()

receive端:

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()
 5 
 6 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
 7 # We could avoid that if we were sure that the queue already exists. For example if send.py program
 8 # was run before. But we're not yet sure which program to run first. In such cases it's a good
 9 # practice to repeat declaring the queue in both programs.
10 channel.queue_declare(queue='hello1')#声明队列,保证程序不出错
11 
12 
13 def callback(ch,method,properties,body):
14     print("-->ch",ch)
15     print("-->method",method)
16     print("-->properties",properties)
17     print("[x] Received %r" % body)         #一条消息被一个消费者接收后,该消息就从队列删除
18 
19 
20 channel.basic_consume(callback,              #回调函数,一接收到消息就调用回调函数
21                       queue='hello1',
22                       no_ack=False)    #消费完毕后向服务端发送一个确认,默认为False
23 
24 print('[*] Waiting for messages.To exit press CTRL+C')
25 channel.start_consuming()

运作结果:(下边包车型大巴代码对应自身写的注释相信是看得懂的~)

www.56.net 20www.56.net 21

rabbitMQ_1_send.py
 [x] Sent 'Hello World!'


rabbitMQ_2_receive.py
 [*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
 [x] Received b'Hello World!!'

View Code

透过浓重的测量试验,有以下五个意识:

  1. 先运行rabbitMQ_1_send.py发送数据,rabbitMQ_2_receive.py未运维。开掘当receive运维时还可以接受数据。
  2. 运营七个(eg:3个)采取数据的客商端,再运营发送端,顾客端1接纳数量,再运维发送端,客商端2收到数量,再运转载送端,顾客端3收受数额。

RabbitMQ会暗中同意把p发的新闻依次分发给各种花费者(c),跟负载均衡大致。

 

原理:

二、全英文ack

在看上边的例证,你会发觉有一句代码no_ack=False(花费完成后向服务端发送二个承认,默感到False),以作者德文四级飘过的水准,看完上面关于ack的教学感到写得很牛啊!!于是分享一下:

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code once RabbitMQ delivers message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.

自身把发送端和选用端分别比作生产者与买主。生产者发送职务A,花费者收到义务A并拍卖,管理完后生产者将新闻队列中的职分A删除。以后我们遭受了二个标题:假设开支者采用职分A,但在拍卖的经过中陡然宕机了。而此刻生产者将新闻队列中的义务A删除。实际上职责A并未有得逞拍卖完,也等于遗失了职分/新闻。为化解这一个主题素材,应使顾客收到职分并打响拍卖完后发送三个ack到生产者!生产者收到ack后就理解职分A已被成功拍卖,这个时候才从音讯队列大校义务A删除,若无吸收ack,就需求把职务A发送给下三个主顾,直到职务A被成功拍卖。

 

www.56.net 22

三、音信长久化

眼前早就知晓,生产者生产总的数量,花费者再开发银行是能够采纳数据的。

但是,生产者生产数据,然后重启rabbitMQ,开销者是束手无策采取数据。

eg:音讯在传输进程中rabbitMQ服务器宕机了,会发觉此前的新闻队列就空中楼阁了,那时候大家将在用到消息长久化,新闻持久化会让队列不趁早服务器宕机而消失,组织带头人久的保留下来。上面看下关于新闻长久化的保加比什凯克语解说:

We have learned how to make sure that even if the consumer dies, the task isn't lost(by default, if wanna disable  use no_ack=True). But our tasks will still be lost if RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

      1 channel.queue_declare(queue='hello', durable=True)

Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error(会曝错) to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for exampletask_queue:

      1 channel.queue_declare(queue='task_queue', durable=True)

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.

      1 channel.basic_publish(exchange='',
      2                       routing_key="task_queue",
      3                       body=message,
      4                       properties=pika.BasicProperties(
      5                          delivery_mode = 2,      # make message persistent
      6                       ))

地点的德语对信息持久化讲得很好。音信持久化分为两步:

  • www.56.net,长久化队列。通过代码完结长久化hello队列:channel.queue_declare(queue='hello', durable=True)
  • 持久化队列中的音讯。通过代码达成:properties=pika.BasicProperties( delivery_mode = 2, )

此间有个点要留神下:

假使您在代码中已兑现长久化hello队列与队列中的音讯。那么你重启rabbitMQ后再行运维代码大概会爆错!

因为: RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error.

为了缓慢解决那个主题材料,能够声贝因美(Beingmate)个与重启rabbitMQ以前分歧的队列名(queue_name).

 

1、安装和着力选用

四、音信公平分发

即使Rabbit只管按顺序把音信发到各类花费者身上,不思考花费者负载的话,很恐怕现身,二个机器配置不高的主顾这里聚积了不知凡几新闻管理不完,同有时候安顿高的买主却直接超级轻易。为减轻此难题,能够在各样花费者端,配置perfetch=1,意思就是告诉RabbitMQ在自己这么些花费者当前新闻尚未管理完的时候就无须再给作者发新音信了。

www.56.net 23

 

带音信长久化+公平分发的风流浪漫体化代码

生产者端:

www.56.net 24www.56.net 25

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='task_queue', durable=True)  #队列持久化
 9  
10 message = ' '.join(sys.argv[1:]) or"Hello World!"
11 channel.basic_publish(exchange='',
12                       routing_key='task_queue',
13                       body=message,
14                       properties=pika.BasicProperties(
15                          delivery_mode = 2, # make message persistent消息持久化
16                       ))
17 print(" [x] Sent %r" % message)
18 connection.close()

View Code

买主端:

www.56.net 26www.56.net 27

 1 #!/usr/bin/env python
 2 import pika
 3 import time
 4  
 5 connection =pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8  
 9 channel.queue_declare(queue='task_queue', durable=True)
10 print(' [*] Waiting for messages. To exit press CTRL+C')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14     time.sleep(body.count(b'.'))
15     print(" [x] Done")
16     ch.basic_ack(delivery_tag =method.delivery_tag)   
17  
18 channel.basic_qos(prefetch_count=1)
19 channel.basic_consume(callback,
20                       queue='task_queue')
21  
22 channel.start_consuming()

View Code

自己在运营方面程序时对顾客端里回调函数的一句代码(ch.basic_ack(delivery_tag =method.delivery_tag))拾叁分嫌疑。那句代码去掉花费者端也能长期以来收到消息啊。那句代码有毛线用处??

劳动者端音讯持久后,需求在花费者端加上(ch.basic_ack(delivery_tag =method.delivery_tag)): 保证音信被花费后,开支端发送贰个ack,然后服务端从队列删除该新闻.

 

安装RabbitMQ服务  

五、消息透露与订阅

事先的例子都基本皆以1对1的新闻发送和采用,即音讯只好发送到钦命的queue里,但有个别时候你想让您的消息被有着的queue收到,相通广播的功能,此时就要用到exchange了。PS:风乐趣的摸底redis的发表与订阅,能够看看本人写的博客python之redis。

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded(丢弃). The rules for that are defined by the exchange type.

Exchange在概念的时候是有档期的顺序的,以调整到底是什么样Queue切合条件,能够选取新闻

 

fanout: 全部bind到此exchange的queue都能够吸收接纳音讯

direct: 通过routingKey和exchange决定的极其唯少年老成的queue能够收起音信

topic:全数切合routingKey(那个时候能够是二个表明式)的routingKey所bind的queue可以选取新闻

 

表明式符号表达: #表示一个或八个字符,*意味着任何字符
          例:#.a会匹配a.a,aa.a,aaa.a等
                *.a会匹配a.a,b.a,c.a等
            注:使用RoutingKey为#,Exchange Type为topic的时候一定于选拔fanout

 

上面笔者分别说下fanout,direct,topic:

1、fanout

fanout: 全数bind到此exchange的queue都足以接过消息

www.56.net 28

send端:

www.56.net 29www.56.net 30

 1 import pika
 2 import sys
 3 
 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel=connection.channel()
 6 
 7 channel.exchange_declare(exchange='logs',
 8                       type='fanout')
 9 
10 message=''.join(sys.argv[1:])or"info:HelloWorld!"
11 channel.basic_publish(exchange='logs',
12                       routing_key='',  #fanout的话为空(默认)
13                       body=message)
14 print("[x]Sent%r"%message)
15 connection.close()

View Code

receive端:

www.56.net 31www.56.net 32

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel=connection.channel()
 5 
 6 channel.exchange_declare(exchange='logs',type='fanout')
 7 
 8 #不指定queue名字(为了收广播),rabbit会随机分配一个queue名字,
 9 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
10 result=channel.queue_declare(exclusive=True)
11 queue_name=result.method.queue
12 
13 #把声明的queue绑定到交换器exchange上
14 channel.queue_bind(exchange='logs',
15                 queue=queue_name)
16 
17 print('[*]Waitingforlogs.ToexitpressCTRL+C')
18 
19 def callback(ch,method,properties,body):
20     print("[x]%r"%body)
21 
22 
23 channel.basic_consume(callback,
24                       queue=queue_name,
25                       no_ack=True)
26 
27 channel.start_consuming()

View Code

有三个点要静心下:

  • fanout-广播,send端的routing_key='', #fanout的话为空(私下认可)

  • receive端有一句代码:result=channel.queue_declare(exclusive=True),功用:不钦点queue名字(为了收广播),rabbitMQ会随机分配一个queue名字,exclusive=True会在使用此queue的主顾断开后,自动将queue删除。

 

2、有选拔的吸收接纳消息(exchange type=direct)

RabbitMQ还援助依照重大字发送,即:队列绑定关键字,发送者将数据依赖着重字发送到新闻exchange,exchange依照 关键字 判别应该将数据发送至内定队列。

www.56.net 33

send端:

www.56.net 34www.56.net 35

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localh'))ost
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 severity = sys.argv[1] iflen(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14                       routing_key=severity, #关键字不为空,告知消息发送到哪里(info,error~)
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()

View Code

receive端:

www.56.net 36www.56.net 37

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 result =channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]n" %sys.argv[0])
17     sys.exit(1)
18  
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" %(method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

骨子里最起始自己看代码是一脸懵逼的~ 上边是自己在cmd举行测量检验的截图(同盟着截图看会轻便精晓些),二个send端,八个receive端(先起receive端,再起receive端):

send端:

www.56.net 38

receive端-1:

www.56.net 39

receive端-2:

www.56.net 40

 

3、更紧凑的新闻过滤topic(供参谋)

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

深感本人立陶宛共和国(Republic of Lithuania)语水准不高啊~,作者相比着垃圾有道翻译,加上本身的明白,大约知道地点在讲怎样。

举个例子来讲: 假若是系统的错误,就把新闻发送到A,假使是MySQL的失实,就把音信发送到B。不过对B来讲,想完毕接受MySQL的错误新闻,能够用有取舍的选择音讯(exchange type=direct),让机要字为error就落成了啊!今后B有个供给:不是装有的错误新闻都接到,只收到钦命的荒谬。在某种音信再张开过滤,那正是更紧凑的消息过滤topic。

 

send端:

www.56.net 41www.56.net 42

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')  #类型为topic
10  
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

View Code

receive端:

www.56.net 43www.56.net 44

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...n" % sys.argv[0])
17     sys.exit(1)
18  
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

 

 

python安装RabbitMQ模块

六、RPC(Remote Procedure Call)

RPC的定义可看作者百度的(其实就象是作者后面做的FTP,笔者从客商端发贰个指令,服务端再次来到相关新闻):

www.56.net 45www.56.net 46

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

View Code

上边爱慕讲下RPC通讯,小编刚初步学挺难的,学完事后感到RPC通讯的想想很有启示性,代码的事例写得也很牛!!

www.56.net 47

client端发的新闻被server端选择后,server端会调用callback函数,推行任务后,还须求把相应的音讯发送到client,然而server如何将音讯发还给client?假如有八个client连接server,server又怎么精晓是要发给哪个client??

RPC-server私下认可监听rpc_queue.肯定无法把要发给client端的音讯发到rpc_queue吧(rpc_queue是监听client端发到server端的数据)。

客观的方案是server端另起一个queue,通过queue将音信重临给对应client。但难题又来了,queue是server端起的,故client端断定不知情queue_name,连queue_name都不精通,client端选用毛线的数码??

寸草不留办法:

客户端在出殡和下葬指令的同时告诉服务端:任务试行完后,数据经过某队列重返结果。顾客端监听该队列就OK了。

client端:

 1 import pika
 2 import uuid
 3 
 4 
 5 class FibonacciRpcClient(object):
 6     def __init__(self):
 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 8 
 9         self.channel = self.connection.channel()
10         #随机建立一个queue,为了监听返回的结果
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue   ##队列名
13 
14         self.channel.basic_consume(self.on_response,  #一接收客户端发来的指令就调用回调函数on_response
15                                    no_ack=True,
16                                    queue=self.callback_queue)
17 
18     def on_response(self, ch, method, props, body):  #回调
19         #每条指令执行的速度可能不一样,指令1比指令2先发送,但可能指令2的执行结果比指令1先返回到客户端,
20         #此时如果没有下面的判断,客户端就会把指令2的结果误认为指令1执行的结果
21         if self.corr_id == props.correlation_id:
22             self.response = body
23 
24     def call(self, n):
25         self.response = None    ##指令执行后返回的消息
26         self.corr_id = str(uuid.uuid4())   ##可用来标识指令(顺序)
27         self.channel.basic_publish(exchange='',
28                                    routing_key='rpc_queue', #client发送指令,发到rpc_queue
29                                    properties=pika.BasicProperties(
30                                        reply_to=self.callback_queue, #将指令执行结果返回到reply_to队列
31                                        correlation_id=self.corr_id,
32                                    ),
33                                    body=str(n))
34         while self.response is None:
35             self.connection.process_data_events() #去queue接收数据(不阻塞)
36         return int(self.response)
37 
38 
39 fibonacci_rpc = FibonacciRpcClient()
40 
41 print(" [x] Requesting fib(30)")
42 response = fibonacci_rpc.call(30)
43 print(" [.] Got %r" % response)

server端:

 1 import pika
 2 import time
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     host='localhost'))
 6 
 7 channel = connection.channel()
 8 
 9 channel.queue_declare(queue='rpc_queue')
10 
11 
12 def fib(n):
13     if n == 0:
14         return 0
15     elif n == 1:
16         return 1
17     else:
18         return fib(n - 1) + fib(n - 2)
19 
20 
21 def on_request(ch, method, props, body):
22     n = int(body)
23 
24     print(" [.] fib(%s)" % n)
25     response = fib(n)  #从客户端收到的消息
26 
27     ch.basic_publish(exchange='',   ##服务端发送返回的数据到props.reply_to队列(客户端发送指令时声明)
28                      routing_key=props.reply_to,  #correlation_id (随机数)每条指令都有随机独立的标识符
29                      properties=pika.BasicProperties(correlation_id= 
30                                                          props.correlation_id),
31                      body=str(response))
32     ch.basic_ack(delivery_tag=method.delivery_tag)  #客户端持久化
33 
34 
35 channel.basic_qos(prefetch_count=1)  #公平分发
36 channel.basic_consume(on_request,    #一接收到消息就调用on_request
37                       queue='rpc_queue')
38 
39 print(" [x] Awaiting RPC requests")
40 channel.start_consuming()

 

转折表明出处: 

pip install pika
or
easy_install pika
or
源码

https://pypi.python.org/pypi/pika

2、达成最简单易行的体系通讯

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)   #接受到消息后不返回ack,无论本地是否处理完消息都会在队列中消失
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

注:windows连linux上的rabbitMQ会并发报错,必要提供客户名密码

3、RabbitMQ新闻分发轮询

先运转音信生产者,然后再分别运转3个买主,通过生产者多发送几条新闻,你会开掘,这几条音讯会被每种分配到各种花费者身上

www.56.net 48

 

在这里种格局下,RabbitMQ会默许把p发的新闻公平的相继分发给各类花费者(c),跟负载均衡大概

www.56.net 49www.56.net 50

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

www.56.net 51www.56.net 52

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

通过实行pubulish.py和consume.py能够兑现地点的消息公平分发,这如若c1收受音讯之后宕机了,会冒出什么样意况吧?rabbitMQ是什么样管理的?现在大家模拟一下

www.56.net 53www.56.net 54

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

www.56.net 55www.56.net 56

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

在consume.py的callback函数里增添了time.sleep模拟函数管理,通过上边程序开展模拟开掘,c1接受到音讯后未有拍卖完乍然宕机,音讯就从队列上海消防失了,rabbitMQ把音讯删除掉了;假如程序供给新闻一定要拍卖完技能从队列里删除,那我们就须求对程序实行拍卖一下

www.56.net 57www.56.net 58

编辑:编程 本文来源:Python开拓【十生机勃勃章】:RabbitMQ队列

关键词: