Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

49.12. Redis Pub/Sub

Redis Pub/Sub 基于以下核心角色:

		
发布者(Publisher):发送消息到指定频道(Channel)。
订阅者(Subscriber):订阅一个或多个频道,接收频道中的消息。
频道(Channel):消息的传输媒介,发布者向频道发送消息,订阅者从频道接收消息。		
		
		

49.12.1. MessageListener 方案

配置 Redis

			
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisMessageListener redisMessageListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        
        // 订阅 "my-channel" 频道
        container.addMessageListener(redisMessageListener, new ChannelTopic("my-channel"));
        
        return container;
    }
}			
			
			

使用 PatternTopic("*") 订阅所有频道

			
        container.addMessageListener(redisMessageListener, new PatternTopic("*"));			
			
			
			
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class RedisMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 解析消息内容
        String channel = new String(message.getChannel());
        String messageBody = new String(message.getBody());
        System.out.println("接收到消息:频道=" + channel + ",内容=" + messageBody);
    }
}			
			
			

49.12.2. @RedisListener 注解方案

49.12.2.1. 配置 Redis 消息监听器容器

			
package cn.netkiller.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Configuration
public class RedisConfiguration {
    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        return new RedisLockRegistry(redisConnectionFactory, "lock", 300000L);
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}
			
				

49.12.2.2. 订阅者(Subscriber):订阅一个或多个频道,接收频道中的消息。

				
import org.springframework.data.redis.listener.annotation.RedisListener;
import org.springframework.stereotype.Component;

@Component
public class RedisMessageListener {

    // 订阅单个频道
    @RedisListener(channel = "my-channel")
    public void onMessage(String message) {
        System.out.println("接收到消息:" + message);
    }

    // 订阅多个频道
    @RedisListener(channels = {"channel1", "channel2"})
    public void onMultiChannelMessage(String message) {
        System.out.println("多频道消息:" + message);
    }

    // 订阅匹配模式的频道(如以 "user-" 开头的所有频道)
    @RedisListener(pattern = "user-*")
    public void onPatternMessage(String message) {
        System.out.println("模式匹配消息:" + message);
    }
}				
				
				

使用通配符订阅频道

				
import org.springframework.data.redis.listener.annotation.RedisListener;
import org.springframework.stereotype.Component;

@Component
public class RedisAnnotationListener {

    // 注解方式订阅所有频道
    @RedisListener(pattern = "*")
    public void onAllChannelMessage(String message, String channel) {
        System.out.println("注解方式接收:频道=" + channel + ",内容=" + message);
    }
}				
				
				

49.12.2.3. 发布者(Publisher):发送消息到指定频道(Channel)

				
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisMessagePublisher {

    private final StringRedisTemplate stringRedisTemplate;

    public RedisMessagePublisher(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    // 发布消息到指定频道
    public void publish(String channel, String message) {
        stringRedisTemplate.convertAndSend(channel, message);
        System.out.println("发布消息:频道=" + channel + ",内容=" + message);
    }
}
				
				

49.12.2.4. 测试演示

				
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RedisPubSubController {

    private final RedisMessagePublisher publisher;

    public RedisPubSubController(RedisMessagePublisher publisher) {
        this.publisher = publisher;
    }

    // 访问 http://localhost:8080/publish/my-channel/hello 测试发布消息
    @GetMapping("/publish/{channel}/{message}")
    public String publish(@PathVariable String channel, @PathVariable String message) {
        publisher.publish(channel, message);
        return "消息已发布:频道=" + channel + ",内容=" + message;
    }
}			
				
				

49.12.3. 线程池

			
package cn.netkiller.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class RedisConfiguration {
    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        return new RedisLockRegistry(redisConnectionFactory, "lock", 300000L);
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 配置线程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("RedisMessageListenerContainer-");
        executor.setThreadGroupName("Redis");
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        executor.initialize();
        container.setTaskExecutor(executor);

        return container;
    }
}