最通俗易懂的Redis公布订阅及代码实战

公布订阅简介

除了使用List实现简朴的新闻行列功效以外,Redis还提供了公布订阅的新闻机制。在这种机制下,新闻公布者向指定频道(channel)公布新闻,新闻订阅者可以收到指定频道的新闻,同一个频道可以有多个新闻订阅者,如下图:

最通俗易懂的Redis公布订阅及代码实战

Redis也提供了一些下令支持这个机制,接下来我们详细先容一下这些下令。

迎接关注微信民众号:万猫学社,每周一分享Java手艺干货。

公布订阅相关下令

在Redis中,公布订阅相关下令有:

  1. 公布新闻
  2. 订阅频道
  3. 作废订阅
  4. 根据模式订阅
  5. 根据模式作废订阅
  6. 查询订阅信息

公布新闻

公布新闻的下令是publish,语法是:

publish 频道名称 新闻

好比,要向channel:one-more-study:demo频道公布一条新闻“I am One More Study.”,下令如下:

> publish channel:one-more-study:demo "I am One More Study."
(integer) 0

返回的效果是订阅者的个数,上例中没有订阅者,以是返回效果为0。

订阅新闻

订阅新闻的下令是subscribe,订阅者可以订阅一个或者多个频道,语法是:

subscribe 频道名称 [频道名称 ...]

好比,订阅一个channel:one-more-study:demo频道,下令如下:

> subscribe channel:one-more-study:demo
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:one-more-study:demo"
3) (integer) 1

返回效果中有3条,划分示意:返回值的类型(订阅乐成)、订阅的频道名称、现在已订阅的频道数目。当订阅者接受到新闻时,就会显示:

1) "message"
2) "channel:one-more-study:demo"
3) "I am One More Study."

同样也是3条效果,划分示意:返回值的类型(信息)、新闻来源的频道名称、新闻内容。

新开启的订阅者,是无法收到该频道之前的历史新闻的,由于Redis没有对公布的新闻做持久化。

作废订阅

作废订阅的下令是unsubscribe,可以作废一个或者多个频道的订阅,语法是:

unsubscribe [频道名称 [频道名称 ...]]

好比,作废订阅channel:one-more-study:demo频道,下令如下:

> unsubscribe channel:one-more-study:demo
1) "unsubscribe"
2) "channel:one-more-study:demo"
3) (integer) 0

返回效果中有3条,划分示意:返回值的类型(作废订阅乐成)、作废订阅的频道名称、现在已订阅的频道数目。

迎接关注微信民众号:万猫学社,每周一分享Java手艺干货。

按模式订阅新闻

按模式订阅新闻的下令是psubscribe,订阅一个或多个相符给定模式的频道,语法是:

堆溢出—glibc malloc

psubscribe 模式 [模式 ...]

每个模式以 * 作为匹配符,好比 channel* 匹配所有以 channel 开头的频道,下令如下:

> psubscribe channel:*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "channel*"
3) (integer) 1

返回效果中有3条,划分示意:返回值的类型(按模式订阅乐成)、订阅的模式、现在已订阅的模式数目。当订阅者接受到新闻时,就会显示:

1) "pmessage"
2) "channel*"
3) "channel:one-more-study:demo"
4) "I am One More Study."

返回效果中有4条,划分示意:返回值的类型(信息)、新闻匹配的模式、新闻来源的频道名称、新闻内容。

按模式作废订阅

按模式作废订阅的下令是punsubscribe,可以作废一个或者多个模式的订阅,语法是:

punsubscribe [模式 [模式 ...]]

每个模式以 * 作为匹配符,好比 channel:* 匹配所有以 channel 开头的频道,下令如下:

1> punsubscribe channel:*
1) "punsubscribe"
2) "channel:*"
3) (integer) 0

返回效果中有3条,划分示意:返回值的类型(按模式作废订阅乐成)、作废订阅的模式、现在已订阅的模式数目。

迎接关注微信民众号:万猫学社,每周一分享Java手艺干货。

查询订阅信息

查看活跃频道

活跃频道指的是至少有一个订阅者的频道,语法是:

pubsub channels [模式]

好比:

> pubsub channels
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
3) "channel:demo"
> pubsub channels *demo
1) "channel:one-more-study:demo"
2) "channel:demo"
> pubsub channels *one-more-study*
1) "channel:one-more-study:test"
2) "channel:one-more-study:demo"
查看频道订阅数
pubsub numsub [频道名称 ...]

好比:

> pubsub numsub channel:one-more-study:demo
1) "channel:one-more-study:demo"
2) (integer) 1
查看模式订阅数
> pubsub numpat
(integer) 1

代码实战

言而不行假把式,我们使用Java语言写一个简朴的公布订阅示例。

Jedis集群示例

Jedis是Redis官方推荐的Java毗邻开发工具,我们使用Jedis写一个简朴的集群示例。

package onemore.study;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;
import java.util.Set;

/**
 * Jedis集群
 *
 * @author 万猫学社
 */
public enum Cluster {
    INSTANCE;

    //为了简朴,把IP和端口直接写在这里,现实开发中写在配置文件会更好。
    private final String hostAndPorts = "192.168.0.60:6379;192.168.0.61:6379;192.168.0.62:6379";
    private JedisCluster jedisCluster;

    Cluster() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();

        //最大毗邻数
        poolConfig.setMaxTotal(20);
        //最大空闲数
        poolConfig.setMaxIdle(10);
        //最小空闲数
        poolConfig.setMinIdle(2);

        //从jedis毗邻池获取毗邻时,校验并返回可用的毗邻
        poolConfig.setTestOnBorrow(true);
        //把毗邻放回jedis毗邻池时,校验并返回可用的毗邻
        poolConfig.setTestOnReturn(true);

        Set<HostAndPort> nodes = new HashSet<>();
        String[] hosts = hostAndPorts.split(";");
        for (String hostport : hosts) {
            String[] ipport = hostport.split(":");
            String ip = ipport[0];
            int port = Integer.parseInt(ipport[1]);
            nodes.add(new HostAndPort(ip, port));
        }
        jedisCluster = new JedisCluster(nodes, 1000, poolConfig);
    }

    public JedisCluster getJedisCluster() {
        return jedisCluster;
    }
}

迎接关注微信民众号:万猫学社,每周一分享Java手艺干货。

公布者示例

package onemore.study;

import redis.clients.jedis.JedisCluster;

/**
 * 公布者
 *
 * @author 万猫学社
 */
public class Publisher implements Runnable {
    private final String CHANNEL_NAME = "channel:one-more-study:demo";
    private final String QUIT_COMMAND = "quit";

    @Override
    public void run() {
        JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
        for (int i = 1; i <= 3; i++) {
            String message = "第" + i + "新闻";
            System.out.println(Thread.currentThread().getName() + " 公布:" + message);
            jedisCluster.publish(CHANNEL_NAME, message);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("------------------");
        }
        jedisCluster.publish(CHANNEL_NAME, QUIT_COMMAND);
    }
}

订阅者示例

package onemore.study;

import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPubSub;

/**
 * 订阅者
 *
 * @author 万猫学社
 */
public class Subscriber implements Runnable {
    private final String CHANNEL_NAME = "channel:one-more-study:demo";
    private final String QUIT_COMMAND = "quit";

    private final JedisPubSub jedisPubSub = new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            System.out.println(Thread.currentThread().getName() + " 吸收:" + message);
            if (QUIT_COMMAND.equals(message)) {
                unsubscribe(CHANNEL_NAME);
            }
        }
    };

    @Override
    public void run() {
        JedisCluster jedisCluster = Cluster.INSTANCE.getJedisCluster();
        jedisCluster.subscribe(jedisPubSub, CHANNEL_NAME);
    }
}

迎接关注微信民众号:万猫学社,每周一分享Java手艺干货。

综合示例

package onemore.study;

public class App {
    public static void main(String[] args) throws InterruptedException {
        //建立3个订阅者
        new Thread(new Subscriber()).start();
        new Thread(new Subscriber()).start();
        new Thread(new Subscriber()).start();
        Thread.sleep(1000);

        //建立公布者
        new Thread(new Publisher()).start();
    }
}

运行效果如下:

Thread-6 公布:第1新闻
Thread-0 吸收:第1新闻
Thread-1 吸收:第1新闻
Thread-2 吸收:第1新闻
------------------
Thread-6 公布:第2新闻
Thread-0 吸收:第2新闻
Thread-1 吸收:第2新闻
Thread-2 吸收:第2新闻
------------------
Thread-6 公布:第3新闻
Thread-0 吸收:第3新闻
Thread-2 吸收:第3新闻
Thread-1 吸收:第3新闻
------------------
Thread-0 吸收:quit
Thread-1 吸收:quit
Thread-2 吸收:quit

微信民众号:万猫学社

微信扫描二维码

获得更多Java手艺干货

最通俗易懂的Redis公布订阅及代码实战

原创文章,作者:admin,如若转载,请注明出处:https://www.2lxm.com/archives/6408.html