项目场景:
BUG场景:使用Redis做轻量级的消息发布订阅,一直没有什么问题,但是当把redis的client-type:换为jedis时,出现订阅消息处理的方法不运行了。
问题描述:
在百度上搜索springboot使用redis发布订阅很容易就能搜到例如以下代码:
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("xxxxxx"));
return redisMessageListenerContainer;
}
添加消息监听器,即可使用指定方法执行监听到的消息,但是这么写是会有一些问题的,我们查看RedisMessageListenerContainer中源码会发现里面有两个线程池,他们是分别用来订阅和处理消息的,如果没有手动set线程池进去,就默认使用SimpleAsyncTaskExecutor,并且threadFactory默认也是null,这时监听到消息后,最终执行如下代码:
protected void doExecute(Runnable task) {
Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
thread.start();
}
会直接createThread,这里如果监听消息处理的方法较慢,同时消息量比较大时,会无限制的创建线程,存在较大风险。所以在创建RedisMessageListenerContainer时需要手动set自己创建的线程池进去如下:
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(ISSUE_TASK_TOPIC));
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setThreadNamePrefix("RedisMessageListenerContainerExecutor-");
taskExecutor.initialize();
redisMessageListenerContainer.setTaskExecutor(taskExecutor);
return redisMessageListenerContainer;
}
这里没有设置CorePoolSize,因为我们监听的消息量比较少,频率也低,所有默认有一个线程实际就足够了,实际使用也确实没有问题。
但是当我们不再使用springboot默认的lettuce客户端,换为jedis后出现了问题,导致订阅消息处理的方法不执行。
原因分析:
一开始并没有意识到线程池数量的问题,以为是不是spring-data-redis出BUG了,我只是增加了一行配置client-type: jedis,就导致消息收不到了不应该吧,后来仔细看代码发现线程池只有一个线程,猜想会不会那个线程被jedis的什么任务阻塞了,所以增加了一个线程果然问题解决。
后来看源码发现了原来Jedis在订阅消息时的如下代码:
@Override
public void pSubscribe(MessageListener listener, byte[]... patterns) {
if (isSubscribed()) {
throw new RedisSubscribedConnectionException(
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
}
if (isQueueing()) {
throw new UnsupportedOperationException();
}
if (isPipelined()) {
throw new UnsupportedOperationException();
}
try {
BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);
subscription = new JedisSubscription(listener, jedisPubSub, null, patterns);
jedis.psubscribe(jedisPubSub, patterns);
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}
jedis.psubscribe是阻塞的,相对的Lettuce的实现并没有阻塞。
解决方案:
创建RedisMessageListenerContainer时,subscriptionExecutor和taskExecutor分别创建线程池,或者将taskExecutor线程数增大
评论区