Redis的发布/订阅工作模式详解

Redis的SUBSCRIBEUNSUBSCRIBEPUBLISH命令实现了消息的发布/订阅功能。发送者(消息发布者)不需要编程,就能够向特定的接收者(消息订阅者)发送消息了。Redis会将已发布的消息放入指定的频道之中,消息发布者不需要知道具体有哪些消息订阅者。订阅者可能会订阅一个或多个频道,并且只能接收已订阅频道中的消息,它们也不需要知道具体有哪些消息发布者。这种方式能够充分解耦发布者和订阅者之间的关系,也使得网络拓扑具有更好的伸缩性和动态性。

例如,若客户端想要订阅频道foo和bar,它可以运行SUBSCRIBE命令,以频道名称作为参数,如下所示:

  1. SUBSCRIBE foo bar

如果其他客户端向这些频道发送消息,那么Redis就会将这些消息推送给所有的订阅客户端。

如果一个客户端订阅了一个或多个频道,那么除了订阅和退订命令之外,这个客户端就不能运行其他命令了。订阅和退订操作的应答信息是以消息的形式发送的,客户端只能读取与其相关的消息流,每个消息的第一个部分表示消息的类型。处于订阅状态的客户端只能运行SUBSCRIBEPSUBSCRIBEUNSUBSCRIBEPUNSUBSCRIBEPINGQUIT命令。

一、环境描述

  1. 主机配置
    CPU:单核
    内存:2 GB
    IP:192.168.1.109

  2. 操作系统
    CentOS 6.6 x86_64 Minimal

  3. Redis版本
    Redis server v=3.2.4 sha=00000000:0 malloc=jemalloc-4.0.3 bits=64 build=431d8e4a684c794b

  4. Redis安装方式
    按照《在CentOS上安装Redis缓存系统

二、发布/订阅的相关命令

1. SUBSCRIBE

使得客户端订阅指定的频道。一旦客户端进入订阅状态,它就不能运行除了SUBSCRIBEPSUBSCRIBEUNSUBSCRIBEPUNSUBSCRIBE命令之外的其他命令。

这个命令的运行格式如下所示:

  1. SUBSCRIBE channel [channel ...]

时间复杂度为O(N),N是客户端想要订阅的频道的数量。

2. UNSUBSCRIBE

使得客户端退订指定的频道,如果不指定任何频道,那么就退订所有频道。

这个命令的运行格式如下所示:

  1. UNSUBSCRIBE [channel [channel ...]]

时间复杂度为O(N),N是客户端想要退订的频道的数量。

3. PSUBSCRIBE

使得客户端订阅指定模式的频道,支持glob风格的模式,例如:

  • h?llo:可以订阅hello、hallo和hxllo频道(?表示单个任意字符)。
  • h*llo:可以订阅hllo和heeeello频道(*表示任意多个任意字符,包括空字符)。
  • h[ae]llo:可以订阅hello和hallo频道,但是不能订阅hillo频道(选择[]之间的任意一个字符)。

如果想要将上述通配符作为普通字符进行处理,则需要使用\符号进行转义。

这个命令的运行格式如下所示:

  1. PSUBSCRIBE pattern [pattern ...]

时间复杂度为O(N),N是客户端已经订阅的模式的数量。

4. PUNSUBSCRIBE

使得客户端退订指定的模式所对应的频道,如果不指定任何模式,那么就退订所有的模式。

这个命令的运行格式如下所示:

  1. PUNSUBSCRIBE [pattern [pattern ...]]

时间复杂度为O(N+M),N是客户端已经订阅的模式的数量,M是系统中已经订阅的模式的总数量。。

5. PUBLISH

向指定的频道提交一条消息。返回值是一个整数,表示接收到这条消息的客户端的数量。

这个命令的运行格式如下所示:

  1. PUBLISH channel message

时间复杂度为O(N+M),N是订阅这个接收频道的客户端的数量,M是系统中已经订阅的模式的总数量。

6. PUBSUB

PUBSUB命令是一个自检命令,可用于检查发布/订阅子系统的状态。这个命令是由几个子命令组成的,下面会分别描述。这个命令的一般格式如下所示:

  1. PUBSUB <subcommand> ... args ...
6.1 PUBSUB CHANNELS [pattern]

列出当前有效的频道。有效频道是具有一个或多个订阅者(不包括订阅模式的客户端)的发布/订阅频道。

如果没有指定任何模式,那么就会列出所有的频道。如果指定了模式,那么就只会列出匹配这个模式的所有频道(此处使用glob风格的模式)。

这个命令的返回值是一个数组,它会列出所有的有效频道,包括匹配指定模式的有效频道。

时间复杂度为O(N),N是有效频道的数量,假设模式匹配的时间是恒定的(相对于较短的频道名称和模式而言)。

6.2 PUBSUB NUMSUB [channel-1 … channel-N]

返回指定频道的订阅者的数量(不会计算订阅模式的客户端的数量)。

这个命令的返回值是一个数组,它会列出参数指定的所有频道,以及每个频道的订阅者的数量。返回值的格式为频道、数量、频道、数量、...,因此,这个列表是扁平的。返回值列出的频道顺序和命令调用时指定的频道顺序是相同的。注意,调用这个命令时可以不指定频道,此时返回值是一个空列表。

时间复杂度为O(N),N是命令中指定的频道的数量。

6.3 PUBSUB NUMPAT

返回模式的订阅数量(也就是所有客户端运行PSUBSCRIBE命令的总次数)。注意,这个数量不是订阅模式的客户端的数量,而是所有客户端订阅的模式的总数量。

这个命令的返回值是一个整数,表示所有客户端订阅的模式的总数量。

时间复杂度为O(1)。

三、推送消息的格式

Redis的发布/订阅功能有两种工作模式:订阅频道(channel)和订阅模式(pattern)。这两种工作模式推送消息的格式是不同的,如下文所述。

1. 订阅频道

  • subscribe消息
    这种消息表示客户端已经成功地订阅了指定的频道,它由三部分组成:
    第一部分是subscribe字符串;第二部分表示想要订阅的频道名称;第三部分表示客户端当前已经订阅的频道数量。消息格式如下图所示:
    subscribe消息

  • unsubscribe消息
    这种消息表示客户端已经成功地退订了指定的频道,它由三部分组成:
    第一部分是unsubscribe字符串;第二部分表示想要退订的频道名称;第三部分表示客户端当前已经订阅的频道数量。若第三部分的值为零,则表示客户端没有订阅任何频道,此时客户端便可以运行任意种类的Redis命令了。消息格式如下图所示:
    unsubscribe消息

  • message消息
    这种消息表示订阅频道的客户端已经成功地收到了另一个客户端向这个频道发送的信息,它由三部分组成:
    第一部分是message字符串;第二部分表示推送消息的频道名称;第三部分表示另一个客户端向这个频道发送的消息内容。消息格式如下图所示:
    message消息

2. 订阅模式

  • psubscribe消息
    这种消息表示客户端已经成功地订阅了指定的模式,它由三部分组成:
    第一部分是psubscribe字符串;第二部分表示想要订阅的模式名称;第三部分表示客户端当前已经订阅的模式数量。消息格式如下图所示:
    psubscribe消息

  • punsubscribe消息
    这种消息表示客户端已经成功地退订了指定的模式,它由三部分组成:
    第一部分是punsubscribe字符串;第二部分表示想要退订的模式名称;第三部分表示客户端当前已经订阅的模式数量。若第三部分的值为零,则表示客户端没有订阅任何模式,此时客户端便可以运行任意种类的Redis命令了。消息格式如下图所示:
    punsubscribe消息

  • pmessage消息
    这种消息表示订阅模式的客户端已经成功地收到了另一个客户端向这个模式所对应的频道发送的信息,它由三部分组成:
    第一部分是pmessage字符串;第二部分表示推送消息的频道模式的名称;第三部分表示另一个客户端向这个频道模式发送的消息内容。消息格式如下图所示:
    pmessage消息

四、注意事项

1. 数据库和作用域

发布/订阅和键空间没有任何关系。它被设计为不会对键空间造成任何影响,包括数据库编号。这就意味着,在db 10上发布消息,仍然可以被在db 1上的订阅者监听到。

如果你需要某种类型的作用域,那么可以为频道名称添加环境前缀,例如:test、staging、production,等等。

2. 频道退订

如果客户端是在Redis命令行(redis-cli)中进入订阅监听状态的,那么它是不能直接运行UNSUBSCRIBEPUNSUBSCRIBE命令的,必须通过telnet之类的工具才能在订阅监听状态中进行退订操作。

3. 订阅计数

subscribeunsubscribepsubscribepunsubscribe消息类型中,消息的最后一部分是客户端仍然有效的订阅数量。这个数量实际上是客户端仍然在订阅的频道和模式的总数。只有当这个数量变为零时,客户端才会退出发布/订阅状态,这就意味着客户端已经退订了所有的频道和模式

4. 模式订阅和频道订阅

如果某个客户端订阅了多个模式(或者多个模式和频道),并且这些模式都能匹配到同一条消息,那么这个客户端就会多次收到这条相同的消息。例如,某个客户端同时订阅了一个频道和一个模式:

  1. SUBSCRIBE foo
  2. PSUBSCRIBE f*

在上面的例子中,如果向频道foo发送一条消息,那么这个客户端将会收到两条消息:一条是message类型的消息,另一条是pmessage类型的消息。

五、命令行示例

1. 订阅频道

Step-1 订阅频道

打开一个Shell终端(此处取名为终端-1),运行以下命令:

  1. telnet localhost 6379

然后,在telnet提示符中输入以下命令:

  1. subscribe mychannel

若上述命令的返回信息(也就是subscribe消息)如下图所示,则表示频道订阅成功:

订阅mychannel频道

在上图中,*3表示消息有三部分组成;$9表示下面有9字节长的字符串,也就是subscribe字符串,这是消息的第一部分;接下来,还有一个$9,表示下面有9字节长的字符串,也就是mychannel字符串,这是消息的第二部分;最后,:1表示这个客户端订阅的频道数量。在下面的示例中,返回消息的结构和含义大致相同,本文也就不再赘述。

Step-2 发布消息

打开另一个Shell终端(此处取名为终端-2),运行以下命令:

  1. redis-cli

进入Redis客户端的命令行之后,运行以下命令,发布一条消息:

  1. publish mychannel hello

若上述命令在终端-2中的返回信息如下图所示,则表示消息发送成功:

发布消息

在上图中,(integer) 1表示收到这条消息的客户端的数量。

此时,在终端-1中可以看到客户端收到的消息(也就是message消息),如下图所示:

接收消息

Step-3 退订频道

终端-1的telnet提示符中输入以下命令:

  1. unsubscribe mychannel

若上述命令的返回信息(也就是unsubscribe消息)如下图所示,则表示频道退订成功:

退订mychannel频道

2. 订阅模式

Step-1 订阅模式

打开一个Shell终端(此处取名为终端-1),运行以下命令:

  1. telnet localhost 6379

然后,在telnet提示符中输入以下命令:

  1. psubscribe mychannel.*

若上述命令的返回信息(也就是psubscribe消息)如下图所示,则表示模式订阅成功:

订阅mychannel.*模式

Step-2 发布消息

打开另一个Shell终端(此处取名为终端-2),运行以下命令:

  1. redis-cli

进入Redis客户端的命令行之后,运行以下命令,发布一条消息:

  1. publish mychannel.test hello123

若上述命令在终端-2中的返回信息如下图所示,则表示消息发送成功:

发布消息

在上图中,(integer) 1表示收到这条消息的客户端的数量。

此时,在终端-1中可以看到客户端收到的消息(也就是pmessage消息),如下图所示:

接收消息

Step-3 退订模式

终端-1的telnet提示符中输入以下命令:

  1. punsubscribe mychannel.*

若上述命令的返回信息(也就是punsubscribe消息)如下图所示,则表示模式退订成功:

退订mychannel.*模式

3. 同时订阅频道和模式

Step-1 订阅频道和模式

打开一个Shell终端(此处取名为终端-1),运行以下命令:

  1. telnet localhost 6379

然后,在telnet提示符中输入以下命令:

  1. subscribe foo
  2. psubscribe f*

若上述命令的返回信息(也就是psubscribe消息)如下图所示,则表示频道和模式订阅成功:

订阅频道和模式

Step-2 发布消息

打开另一个Shell终端(此处取名为终端-2),运行以下命令:

  1. redis-cli

进入Redis客户端的命令行之后,运行以下命令,发布一条消息:

  1. publish foo hello

若上述命令在终端-2中的返回信息如下图所示,则表示消息发送成功:

发布消息

在上图中,(integer) 2表示收到这条消息的客户端的数量。

此时,在终端-1中可以看到客户端收到两条消息(也就是message消息和pmessage消息),如下图所示:

接收消息

六、Java示例

Jedis是一种用Java语言开发的Redis客户端,它具有轻量级和功能完备的特点。Jedis完全兼容于Redis 2.8.x和3.0.x版本。

以下代码简单示范了如何通过Jedis操作Redis缓存。

  • App.java
  1. package org.xninja.ghoulich.JedisTest;
  2. import redis.clients.jedis.Jedis;
  3. public class App {
  4. @SuppressWarnings("resource")
  5. public static void main(String[] args) {
  6. final Jedis jedis = new Jedis("192.168.1.109", 6379);
  7. final Jedis pjedis = new Jedis("192.168.1.109", 6379);
  8. final MyListener listener = new MyListener();
  9. final MyListener plistener = new MyListener();
  10. Thread thread = new Thread(new Runnable() {
  11. public void run() {
  12. jedis.subscribe(listener, "mychannel");
  13. }
  14. });
  15. Thread pthread = new Thread(new Runnable() {
  16. public void run() {
  17. pjedis.psubscribe(plistener, "mychannel.*");
  18. }
  19. });
  20. thread.start();
  21. pthread.start();
  22. }
  23. }

该程序建立了两个Jedis客户端,然后又建立了两个发布/订阅监听器,最后启动了两个线程,分别用于监听一个频道和一个模式。

  • MyListener.java
  1. package org.xninja.ghoulich.JedisTest;
  2. import redis.clients.jedis.JedisPubSub;
  3. public class MyListener extends JedisPubSub {
  4. // 取得订阅的消息后的处理
  5. public void onMessage(String channel, String message) {
  6. System.out.println("onMessage: " + channel + "=" + message);
  7. if (message.equals("quit"))
  8. this.unsubscribe(channel);
  9. }
  10. // 初始化订阅时候的处理
  11. public void onSubscribe(String channel, int subscribedChannels) {
  12. System.out.println("onSubscribe: " + channel + "=" + subscribedChannels);
  13. }
  14. // 取消订阅时候的处理
  15. public void onUnsubscribe(String channel, int subscribedChannels) {
  16. System.out.println("onUnsubscribe: " + channel + "=" + subscribedChannels);
  17. }
  18. // 初始化按模式的方式订阅时候的处理
  19. public void onPSubscribe(String pattern, int subscribedChannels) {
  20. System.out.println("onPSubscribe: " + pattern + "=" + subscribedChannels);
  21. }
  22. // 取消按模式的方式订阅时候的处理
  23. public void onPUnsubscribe(String pattern, int subscribedChannels) {
  24. System.out.println("onPUnsubscribe: " + pattern + "=" + subscribedChannels);
  25. }
  26. // 取得按模式的方式订阅的消息后的处理
  27. public void onPMessage(String pattern, String channel, String message) {
  28. System.out.println("onPMessage: " + pattern + "=" + channel + "=" + message);
  29. if (message.equals("quit"))
  30. this.punsubscribe(pattern);
  31. }
  32. }

这个监听器会对频道和模式的订阅、接收消息和退订等事件进行监听,然后进行相应的处理。

在Eclipse中运行App.java的main函数,此时会触发频道和模式的订阅事件,控制台中的输出如下图所示:

监听订阅事件

此时,示例程序已经订阅了mychannel频道和mychannel.*模式。然后,在redis-cli命令行中输入以下命令,向mychannel频道和mychannel.*模式各发送一条消息:

  1. publish mychannel "hello message for channel"
  2. publish mychannel.test "hello message for pattern"

此时,会触发示例程序的接收消息事件,Eclipse的控制台输出如下图所示:

监听接收消息事件

最后,在redis-cli命令行中输入以下命令,向mychannel频道和mychannel.*模式各发送一条用于退订的消息:

  1. publish mychannel quit
  2. publish mychannel.test quit

由示例程序的源码可知,当监听器收到内容为“quit”的消息时,便会退订mychannel频道和mychannel.*模式,然后终止执行。Eclipse的控制台输出如下图所示:

监听退订事件