| 知乎专栏 |
Redis Pub/Sub 基于以下核心角色:
发布者(Publisher):发送消息到指定频道(Channel)。 订阅者(Subscriber):订阅一个或多个频道,接收频道中的消息。 频道(Channel):消息的传输媒介,发布者向频道发送消息,订阅者从频道接收消息。
配置 Redis
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
 * Spring configuration that wires up a Redis Pub/Sub listener container.
 */
@Configuration
public class RedisConfig {

    /**
     * Creates the listener container and registers the given listener on the
     * "my-channel" channel.
     *
     * @param connectionFactory    connection factory handed to the container
     * @param redisMessageListener listener registered for the "my-channel" topic
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            RedisMessageListener redisMessageListener) {
        // Topic for the single channel this configuration subscribes to.
        ChannelTopic myChannel = new ChannelTopic("my-channel");

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(redisMessageListener, myChannel);
        return listenerContainer;
    }
}
使用 PatternTopic("*") 订阅所有频道(在上面的 RedisConfig 中用 PatternTopic 代替 ChannelTopic):
container.addMessageListener(redisMessageListener, new PatternTopic("*"));
import java.nio.charset.StandardCharsets;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
 * Pub/Sub listener that prints each received message together with the
 * channel it arrived on.
 */
@Component
public class RedisMessageListener implements MessageListener {

    /**
     * Decodes the channel name and the payload of an incoming message and
     * prints both to standard output.
     *
     * @param message the received message; its channel and body are read as raw bytes
     * @param pattern not used by this listener
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Decode explicitly as UTF-8 rather than with the platform default charset,
        // so non-ASCII text is decoded the same way on every JVM.
        // Assumes publishers encode strings as UTF-8 — confirm for non-String publishers.
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("接收到消息:频道=" + channel + ",内容=" + messageBody);
    }
}
package cn.netkiller.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.integration.redis.util.RedisLockRegistry;
/**
 * Redis infrastructure beans: a lock registry and a Pub/Sub listener container.
 */
@Configuration
public class RedisConfiguration {

    /**
     * Builds the lock registry on top of the given connection factory.
     *
     * @param redisConnectionFactory connection factory passed to the registry
     * @return a registry created with the key "lock" and 300000 as its third argument
     */
    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        String registryKey = "lock";
        // NOTE(review): presumably the lock expiry in milliseconds (5 minutes) —
        // confirm against the RedisLockRegistry constructor documentation.
        long expireAfter = 300000L;
        return new RedisLockRegistry(redisConnectionFactory, registryKey, expireAfter);
    }

    /**
     * Builds a listener container that only has its connection factory set;
     * no listeners are registered here.
     *
     * @param connectionFactory connection factory handed to the container
     * @return the container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        return listenerContainer;
    }
}
import org.springframework.data.redis.listener.annotation.RedisListener;
import org.springframework.stereotype.Component;
/**
 * Annotation-based listener with three handlers, each of which prints the
 * message it is given.
 *
 * NOTE(review): every handler depends on the imported {@code @RedisListener}
 * annotation — confirm it is actually provided by the Spring Data Redis
 * version on the classpath.
 */
@Component
public class RedisMessageListener {

    /**
     * Handler bound to the single channel "my-channel".
     *
     * @param message the received message text
     */
    @RedisListener(channel = "my-channel")
    public void onMessage(String message) {
        String line = "接收到消息:" + message;
        System.out.println(line);
    }

    /**
     * Handler bound to several channels: "channel1" and "channel2".
     *
     * @param message the received message text
     */
    @RedisListener(channels = {"channel1", "channel2"})
    public void onMultiChannelMessage(String message) {
        String line = "多频道消息:" + message;
        System.out.println(line);
    }

    /**
     * Handler bound to the channel pattern "user-*", i.e. channels whose
     * names start with "user-".
     *
     * @param message the received message text
     */
    @RedisListener(pattern = "user-*")
    public void onPatternMessage(String message) {
        String line = "模式匹配消息:" + message;
        System.out.println(line);
    }
}
使用通配符订阅频道
import org.springframework.data.redis.listener.annotation.RedisListener;
import org.springframework.stereotype.Component;
/**
 * Annotation-based listener bound to the catch-all pattern "*".
 *
 * NOTE(review): depends on the imported {@code @RedisListener} annotation —
 * confirm it is actually provided by the Spring Data Redis version on the
 * classpath.
 */
@Component
public class RedisAnnotationListener {

    /**
     * Prints the channel and the content of a received message.
     *
     * @param message the received message text
     * @param channel the channel name printed alongside the message
     */
    @RedisListener(pattern = "*")
    public void onAllChannelMessage(String message, String channel) {
        StringBuilder line = new StringBuilder("注解方式接收:频道=");
        line.append(channel);
        line.append(",内容=");
        line.append(message);
        System.out.println(line.toString());
    }
}
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
 * Publishes string messages to Redis channels through a {@code StringRedisTemplate}.
 */
@Component
public class RedisMessagePublisher {

    private final StringRedisTemplate template;

    /**
     * @param stringRedisTemplate template used to send the messages
     */
    public RedisMessagePublisher(StringRedisTemplate stringRedisTemplate) {
        this.template = stringRedisTemplate;
    }

    /**
     * Sends the message to the given channel, then prints what was published.
     *
     * @param channel name of the target channel
     * @param message text to publish
     */
    public void publish(String channel, String message) {
        template.convertAndSend(channel, message);

        // Printed only after the send call has returned.
        String logLine = "发布消息:频道=" + channel + ",内容=" + message;
        System.out.println(logLine);
    }
}
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint for trying out message publishing from a browser.
 */
@RestController
public class RedisPubSubController {

    private final RedisMessagePublisher messagePublisher;

    /**
     * @param publisher publisher that the endpoint delegates to
     */
    public RedisPubSubController(RedisMessagePublisher publisher) {
        this.messagePublisher = publisher;
    }

    /**
     * Publishes the message from the URL path to the channel from the URL path.
     * Example: http://localhost:8080/publish/my-channel/hello
     *
     * @param channel channel name taken from the path
     * @param message message text taken from the path
     * @return a confirmation string containing the channel and the message
     */
    @GetMapping("/publish/{channel}/{message}")
    public String publish(@PathVariable String channel, @PathVariable String message) {
        messagePublisher.publish(channel, message);
        String confirmation = "消息已发布:频道=" + channel + ",内容=" + message;
        return confirmation;
    }
}
package cn.netkiller.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * Redis infrastructure beans: a lock registry and a Pub/Sub listener container
 * that runs on its own thread pool.
 */
@Configuration
public class RedisConfiguration {

    /**
     * Builds the lock registry on top of the given connection factory.
     *
     * @param redisConnectionFactory connection factory passed to the registry
     * @return a registry created with the key "lock" and 300000 as its third argument
     */
    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        String registryKey = "lock";
        // NOTE(review): presumably the lock expiry in milliseconds (5 minutes) —
        // confirm against the RedisLockRegistry constructor documentation.
        long expireAfter = 300000L;
        return new RedisLockRegistry(redisConnectionFactory, registryKey, expireAfter);
    }

    /**
     * Builds a listener container with its connection factory and a dedicated
     * task executor set; no listeners are registered here.
     *
     * @param connectionFactory connection factory handed to the container
     * @return the container
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setTaskExecutor(newListenerExecutor());
        return listenerContainer;
    }

    /**
     * Creates and initializes the thread pool given to the listener container:
     * core size 5, max size 10, queue capacity 25.
     *
     * NOTE(review): the executor is not registered as a bean and nothing in this
     * class shuts it down — confirm who stops it when the context closes.
     */
    private static ThreadPoolTaskExecutor newListenerExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("RedisMessageListenerContainer-");
        pool.setThreadGroupName("Redis");
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(10);
        pool.setQueueCapacity(25);
        pool.initialize();
        return pool;
    }
}