基于spring的Redis Sentinel读写分离Slave连接池

0. 背景

Redis除了配置集群实现高可用之外,对于单机版的Redis,可以通过Master-Slave架构,配合使用Sentinel机制实现高可用架构,同时客户端可以实现自动失效转移。



类似于JdbcTemplate,Spring中使用RedisTemplate来操作Redis。SpringBoot中只需引入如下Maven依赖,即可自动配置一个RedisTemplate实例。

1
2
3
4
5
6
7
8
9
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>2.9.0</version>
</dependency>

RedisTemplate需要一个RedisConnectionFactory来管理Redis连接。可以在项目中定义一个RedisSentinelConfiguration给RedisConnectionFactory,即可生成一个基于Sentinel的连接池,并且实现了自动失效转移:当master失效时,Sentinel自动提升一个slave成为master保证Redis的master连接高可用。

下面是基于Sentinel的RedisConnectionFactory的典型配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Value("${spring.redis.password}")
private String redisPasswd;

@Bean
public RedisConnectionFactory jedisConnectionFactory() {
RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
.master("mymaster")
.sentinel("192.168.0.1", 26479)
.sentinel("192.168.0.2", 26479)
.sentinel("192.168.0.3", 26479);
sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(sentinelConfig);
System.out.println(jedisConnectionFactory.getClientConfiguration().getClientName());
return jedisConnectionFactory;
}

查看org.springframework.data.redis.connection.jedis.JedisConnectionFactory源码发现,当配置了RedisSentinelConfiguration后,RedisConnectionFactory会返回一个JedisSentinelPool连接池。该连接池里面所有的连接都是连接到Master上面的。 同时,在JedisSentinelPool中为每一个Sentinel都配置了+switch-master频道的监听。当监听到+switch-master消息后表示发生了master切换,有新的Master产生,然后会重新初始化到新Master的连接池。

至此,我们知道基于Sentinel可以创建RedisConnectionFactory,并可实现自动失效转移,但RedisConnectionFactory只会创建到Master的连接。一般情况下,如果所有的连接都是连接到Master上面,Slave就完全当成Master的备份了,造成性能浪费。通常,Slave只是单纯的复制Master的数据,为避免数据不一致,不应该往Slave写数据,可以在Redis配置文件中配置slave-read-onlyyes,让Slave拒绝所有的写操作。于是,对于一个基于Sentinel的Master-Slave Redis 服务器来说,可以将Master配置为可读写服务器,将所有Slave配置为只读服务器来实现读写分离,以充分利用服务器资源,并提高整个Redis系统的性能。

1. 提出问题

JedisSentinelPool连接池中的连接都是到Master的连接,那么如何获取到Slave的连接池呢? 分析了spring-boot-starter-data-redis和jedis之后,发现,并没有现成的Slave连接池可以拿来用,于是决定写一个。

2. 分析问题

通过RedisSentinelConfiguration,可以拿到sentinel的IP和端口,就可以连接到sentinel,再调用sentinel slaves mymaster命令,就可以拿到slave的IP和port。
然后就可以创建到slave的连接了


继续查看JedisFactory源码,了解到其实现了PooledObjectFactory接口,该接口来自org.apache.commons.pool2,由此可见,Jedis连接池是借助Apache
commons.pool2来实现的。

PoolFactory

由图看到,JedisConnectionFactory创建一个JedisSentinelPool,JedisSentinelPool创建JedisFactory,JedisFactory实现了PooledObjectFactory接口,在MakeObject()方法中产生新的Redis连接。在JedisSentinelPool中定义MasterListener还订阅+switch-master频道,一旦发生Master转移事件,自动作失效转移重新初始化master连接池。

3. 解决问题

模仿JedisConnectionFactory,JedisSentinelPool,和JedisFactory,创建JedisSentinelSlaveConnectionFactory,JedisSentinelSlavePool和JedisSentinelSlaveFactory它们之间的关系,如图UML-2所示。

JedisSentinel

其中,JedisSentinelSlaveConnectionFactory就是可以传递给RedisTemplate的。JedisSentinelSlaveConnectionFactory继承自JedisConnectionFactory并且覆盖了createRedisSentinelPool方法,在JedisConnectionFactory中,该方法会返回一个JedisSentinelPool,而新的方法会返回JedisSentinelSlavePool。JedisSentinelSlavePool和JedisSentinelPool都是继承自Pool的。 JedisSentinelSlavePool会生成JedisSentinelSlaveFactory,JedisSentinelSlaveFactory实现了PooledObjectFactory接口,在public PooledObjectmakeObject()方法中,通过sentinel连接,
调用sentinel slaves命令,获取所有可用的slave的ip和port,然后随机的创建一个slave连接并返回。

JedisSentinelSlaveConnectionFactory的createRedisSentinelPool方法

1
2
3
4
5
6
7
8
@Override 
protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){
GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
}
}

  1. 通过配置RedisSentinelConfiguration传递sentinel配置和master name给JedisSentinelSlaveConnectionFactory,然后sentinel配置和master name会传递到JedisSentinelSlavePool和JedisSentinelSlaveFactory中
  2. 创建 JedisSentinelSlavePool,在JedisSentinelSlavePool中启动监听,监听”+switch-master”频道,一旦新master产生,即初始化连接池
  3. 连接池有JedisSentinelSlaveFactory来代理,JedisSentinelSlaveFactory实现了PooledObjectFactory
    在makeObject()中首先根据配置的Sentinel Set找到一个可用的sentinel连接,然后执行sentinel slaves master_name获取所有slave列表
    随机选择一个slave创建连接。 如果连接不成功则重试,最大重试5次,依然不能成功创建连接则抛出异常。
  4. 由图uml-2可知,JedisConnectionFactory实现了InitializingBean,Spring会在Bean初始化之后,调用接口方法void afterPropertiesSet() throws Exception;
    在这个方法中创建连接池
  5. JedisConnectionFactory实现了DisposableBean,会在Spring 容器销毁时,调用public void destroy() 方法销毁连接池

4. 实战

4-1. 工程结构
1. pom.xml
1
2
3
4
5
6
7
8
9
10
<dependency> 
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
2. JedisSentinelSlaveFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package redis.clients.jedis; 

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.JedisURIHelper;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;
import java.net.URI;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class JedisSentinelSlaveFactory implements PooledObjectFactory<Jedis> {
private final String masterName;
private final int retryTimeWhenRetrieveSlave = 5;

private final AtomicReference<HostAndPort> hostAndPortOfASentinel = new AtomicReference<HostAndPort>();
private final int connectionTimeout;
private final int soTimeout;
private final String password;
private final int database;
private final String clientName;
private final boolean ssl;
private final SSLSocketFactory sslSocketFactory;
private SSLParameters sslParameters;
private HostnameVerifier hostnameVerifier;

public JedisSentinelSlaveFactory(final String host, final int port, final int connectionTimeout,
final int soTimeout, final String password, final int database, final String clientName,
final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters,
final HostnameVerifier hostnameVerifier,String masterName) {
this.hostAndPortOfASentinel.set(new HostAndPort(host, port));
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
this.ssl = ssl;
this.sslSocketFactory = sslSocketFactory;
this.sslParameters = sslParameters;
this.hostnameVerifier = hostnameVerifier;
this.masterName = masterName;
}

public JedisSentinelSlaveFactory(final URI uri, final int connectionTimeout, final int soTimeout,
final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory,
final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier,String masterName) {
if (!JedisURIHelper.isValid(uri)) {
throw new InvalidURIException(String.format(
"Cannot open Redis connection due invalid URI. %s", uri.toString()));
}

this.hostAndPortOfASentinel.set(new HostAndPort(uri.getHost(), uri.getPort()));
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = JedisURIHelper.getPassword(uri);
this.database = JedisURIHelper.getDBIndex(uri);
this.clientName = clientName;
this.ssl = ssl;
this.sslSocketFactory = sslSocketFactory;
this.sslParameters = sslParameters;
this.hostnameVerifier = hostnameVerifier;
this.masterName = masterName;
}

public void setHostAndPortOfASentinel(final HostAndPort hostAndPortOfASentinel) {
this.hostAndPortOfASentinel.set(hostAndPortOfASentinel);
}

@Override
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.getDB() != database) {
jedis.select(database);
}

}

@Override
public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
final BinaryJedis jedis = pooledJedis.getObject();
if (jedis.isConnected()) {
try {
try {
jedis.quit();
} catch (Exception e) {
}
jedis.disconnect();
} catch (Exception e) {

}
}

}

@Override
public PooledObject<Jedis> makeObject() throws Exception {
final Jedis jedisSentinel = getASentinel();

List<Map<String,String>> slaves = jedisSentinel.sentinelSlaves(this.masterName);
if(slaves == null || slaves.isEmpty()) {
throw new JedisException(String.format("No valid slave for master: %s",this.masterName));
}

DefaultPooledObject<Jedis> result = tryToGetSlave(slaves);

if(null != result) {
return result;
} else {
throw new JedisException(String.format("No valid slave for master: %s, after try %d times.",
this.masterName,retryTimeWhenRetrieveSlave));
}

}

private DefaultPooledObject<Jedis> tryToGetSlave(List<Map<String,String>> slaves) {
SecureRandom sr = new SecureRandom();
int retry = retryTimeWhenRetrieveSlave;
while(retry >= 0) {
retry--;
int randomIndex = sr.nextInt(slaves.size());
String host = slaves.get(randomIndex).get("ip");
String port = slaves.get(randomIndex).get("port");
final Jedis jedisSlave = new Jedis(host,Integer.valueOf(port), connectionTimeout,soTimeout,
ssl, sslSocketFactory,sslParameters, hostnameVerifier);
try {
jedisSlave.connect();
if (null != this.password) {
jedisSlave.auth(this.password);
}
if (database != 0) {
jedisSlave.select(database);
}
if (clientName != null) {
jedisSlave.clientSetname(clientName);
}
return new DefaultPooledObject<Jedis>(jedisSlave);

} catch (Exception e) {
jedisSlave.close();
slaves.remove(randomIndex);
continue;
}
}

return null;
}

private Jedis getASentinel() {
final HostAndPort hostAndPort = this.hostAndPortOfASentinel.get();
final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

try {
jedis.connect();
} catch (JedisException je) {
jedis.close();
throw je;
}
return jedis;
}

@Override
public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
// TODO maybe should select db 0? Not sure right now.
}

@Override
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
final BinaryJedis jedis = pooledJedis.getObject();
try {
HostAndPort hostAndPort = this.hostAndPortOfASentinel.get();

String connectionHost = jedis.getClient().getHost();
int connectionPort = jedis.getClient().getPort();

return hostAndPort.getHost().equals(connectionHost)
&& hostAndPort.getPort() == connectionPort && jedis.isConnected()
&& jedis.ping().equals("PONG");
} catch (final Exception e) {
return false;
}
}
}
3. JedisSentinelSlavePool.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
package redis.clients.jedis; 

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.Pool;

import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class JedisSentinelSlavePool extends Pool<Jedis> {
private final String masterName;

protected GenericObjectPoolConfig poolConfig;

protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
protected int soTimeout = Protocol.DEFAULT_TIMEOUT;

protected String password;

protected int database = Protocol.DEFAULT_DATABASE;

protected String clientName;

protected final Set<JedisSentinelSlavePool.MasterListener> masterListeners = new HashSet<JedisSentinelSlavePool.MasterListener>();

protected Logger logger = LoggerFactory.getLogger(JedisSentinelSlavePool.class.getName());

private volatile JedisSentinelSlaveFactory factory;
private volatile HostAndPort currentSentinel;

private Set<String> sentinels;

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig) {
this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels) {
this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels, String password) {
this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, int timeout, final String password) {
this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int timeout) {
this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final String password) {
this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, int timeout, final String password,
final int database) {
this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, int timeout, final String password,
final int database, final String clientName) {
this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int timeout, final int soTimeout,
final String password, final int database) {
this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null);
}

public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
final String password, final int database, final String clientName) {
this.poolConfig = poolConfig;
this.connectionTimeout = connectionTimeout;
this.soTimeout = soTimeout;
this.password = password;
this.database = database;
this.clientName = clientName;
this.masterName = masterName;
this.sentinels = sentinels;

HostAndPort aSentinel = initsentinels(this.sentinels, masterName);
initPool(aSentinel);
}

public void destroy() {
for (JedisSentinelSlavePool.MasterListener m : masterListeners) {
m.shutdown();
}

super.destroy();
}

public HostAndPort getCurrentSentinel() {
return currentSentinel;
}

private void initPool(HostAndPort sentinel) {
if (!sentinel.equals(currentSentinel)) {
currentSentinel = sentinel;
if (factory == null) {
factory = new JedisSentinelSlaveFactory(sentinel.getHost(), sentinel.getPort(), connectionTimeout,
soTimeout, password, database, clientName, false, null, null, null,masterName);
initPool(poolConfig, factory);
} else {
factory.setHostAndPortOfASentinel(currentSentinel);
// although we clear the pool, we still have to check the
// returned object
// in getResource, this call only clears idle instances, not
// borrowed instances
internalPool.clear();
}

logger.info("Created JedisPool to sentinel at " + sentinel);
}
}

private HostAndPort initsentinels(Set<String> sentinels, final String masterName) {

HostAndPort aSentinel = null;
boolean sentinelAvailable = false;

logger.info("Trying to find a valid sentinel from available Sentinels...");

for (String sentinelStr : sentinels) {
final HostAndPort hap = HostAndPort.parseString(sentinelStr);

logger.info("Connecting to Sentinel " + hap);

Jedis jedis = null;
try {
jedis = new Jedis(hap.getHost(), hap.getPort());
sentinelAvailable = true;

List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
logger.warn("Can not get master addr from sentinel, master name: " + masterName
+ ". Sentinel: " + hap + ".");
continue;
}

aSentinel = hap;
logger.info("Found a Redis Sentinel at " + aSentinel);
break;
} catch (JedisException e) {
logger.warn("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
+ ". Trying next one.");
} finally {
if (jedis != null) {
jedis.close();
}
}
}

if (aSentinel == null) {
if (sentinelAvailable) {
// can connect to sentinel, but master name seems to not monitored
throw new JedisException("Can connect to sentinel, but " + masterName
+ " seems to be not monitored...");
} else {
throw new JedisConnectionException("All sentinels down, cannot determine where is "
+ masterName + " master is running...");
}
}

logger.info("Found Redis sentinel running at " + aSentinel + ", starting Sentinel listeners...");

for (String sentinel : sentinels) {
final HostAndPort hap = HostAndPort.parseString(sentinel);
JedisSentinelSlavePool.MasterListener masterListener = new JedisSentinelSlavePool.MasterListener(masterName, hap.getHost(), hap.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}

return aSentinel;
}

/**
* @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be
* done using @see {@link redis.clients.jedis.Jedis#close()}
*/
@Override
@Deprecated
public void returnBrokenResource(final Jedis resource) {
if (resource != null) {
returnBrokenResourceObject(resource);
}
}

/**
* @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be
* done using @see {@link redis.clients.jedis.Jedis#close()}
*/
@Override
@Deprecated
public void returnResource(final Jedis resource) {
if (resource != null) {
resource.resetState();
returnResourceObject(resource);
}
}

private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
String host = getMasterAddrByNameResult.get(0);
int port = Integer.parseInt(getMasterAddrByNameResult.get(1));

return new HostAndPort(host, port);
}

protected class MasterListener extends Thread {

protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected volatile Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);

protected MasterListener() {
}

public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.masterName = masterName;
this.host = host;
this.port = port;
}

public MasterListener(String masterName, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}

@Override
public void run() {

running.set(true);

while (running.get()) {

j = new Jedis(host, port);

try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}

j.subscribe(new SentinelSlaveChangePubSub(), "+switch-master","+slave","+sdown","+odown","+reboot");

} catch (JedisConnectionException e) {

if (running.get()) {
logger.error("Lost connection to Sentinel at " + host + ":" + port
+ ". Sleeping 5000ms and retrying.", e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
logger.info( "Sleep interrupted: ", e1);
}
} else {
logger.info("Unsubscribing from Sentinel at " + host + ":" + port);
}
} finally {
j.close();
}
}
}

public void shutdown() {
try {
logger.info("Shutting down listener on " + host + ":" + port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (j != null) {
j.disconnect();
}
} catch (Exception e) {
logger.error("Caught exception while shutting down: ", e);
}
}

private class SentinelSlaveChangePubSub extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
if(masterName==null) {
logger.error("Master Name is null!");
throw new InvalidParameterException("Master Name is null!");
}
logger.info("Get message on chanel: " + channel + " published: " + message + "." + " current sentinel " + host + ":" + port );

String[] msg = message.split(" ");
List<String> msgList = Arrays.asList(msg);
if(msgList.isEmpty()) {return;}
boolean needResetPool = false;
if( masterName.equalsIgnoreCase(msgList.get(0))) { //message from channel +switch-master
//message looks like [+switch-master mymaster 192.168.0.2 6479 192.168.0.1 6479]
needResetPool = true;
}
int tmpIndex = msgList.indexOf("@") + 1;
//message looks like [+reboot slave 192.168.0.3:6479 192.168.0.3 6479 @ mymaster 192.168.0.1 6479]
if(tmpIndex >0 && masterName.equalsIgnoreCase(msgList.get(tmpIndex)) ) { //message from other channels
needResetPool = true;
}
if(needResetPool) {
HostAndPort aSentinel = initsentinels(sentinels, masterName);
initPool(aSentinel);
} else {
logger.info("message is not for master " + masterName);
}

}
}
}
}
4. JedisSentinelSlaveConFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package redis.clients.jedis; 

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import redis.clients.util.Pool;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;
import java.time.Duration;
import java.util.*;

public class JedisSentinelSlaveConnectionFactory extends JedisConnectionFactory {
public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig) {
super(sentinelConfig);
}

public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisClientConfiguration clientConfig){
super(sentinelConfig,clientConfig);
}

public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisPoolConfig poolConfig) {
super(sentinelConfig,poolConfig);
}

@Override
protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){
GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
}

private int getConnectTimeout() {
return Math.toIntExact(getClientConfiguration().getConnectTimeout().toMillis());
}

private Set<String> convertToJedisSentinelSet(Collection<RedisNode> nodes) {

if (CollectionUtils.isEmpty(nodes)) {
return Collections.emptySet();
}

Set<String> convertedNodes = new LinkedHashSet<>(nodes.size());
for (RedisNode node : nodes) {
if (node != null) {
convertedNodes.add(node.asString());
}
}
return convertedNodes;
}

private int getReadTimeout() {
return Math.toIntExact(getClientConfiguration().getReadTimeout().toMillis());
}

static class MutableJedisClientConfiguration implements JedisClientConfiguration {

private boolean useSsl;
private @Nullable
SSLSocketFactory sslSocketFactory;
private @Nullable
SSLParameters sslParameters;
private @Nullable
HostnameVerifier hostnameVerifier;
private boolean usePooling = true;
private GenericObjectPoolConfig poolConfig = new JedisPoolConfig();
private @Nullable
String clientName;
private Duration readTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT);
private Duration connectTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT);

public static JedisClientConfiguration create(JedisShardInfo shardInfo) {

JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration();
configuration.setShardInfo(shardInfo);
return configuration;
}

public static JedisClientConfiguration create(GenericObjectPoolConfig jedisPoolConfig) {

JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration();
configuration.setPoolConfig(jedisPoolConfig);
return configuration;
}

/* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUseSsl()
*/
@Override
public boolean isUseSsl() {
return useSsl;
}

public void setUseSsl(boolean useSsl) {
this.useSsl = useSsl;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslSocketFactory()
*/
@Override
public Optional<SSLSocketFactory> getSslSocketFactory() {
return Optional.ofNullable(sslSocketFactory);
}

public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
this.sslSocketFactory = sslSocketFactory;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslParameters()
*/
@Override
public Optional<SSLParameters> getSslParameters() {
return Optional.ofNullable(sslParameters);
}

public void setSslParameters(SSLParameters sslParameters) {
this.sslParameters = sslParameters;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getHostnameVerifier()
*/
@Override
public Optional<HostnameVerifier> getHostnameVerifier() {
return Optional.ofNullable(hostnameVerifier);
}

public void setHostnameVerifier(HostnameVerifier hostnameVerifier) {
this.hostnameVerifier = hostnameVerifier;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUsePooling()
*/
@Override
public boolean isUsePooling() {
return usePooling;
}

public void setUsePooling(boolean usePooling) {
this.usePooling = usePooling;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getPoolConfig()
*/
@Override
public Optional<GenericObjectPoolConfig> getPoolConfig() {
return Optional.ofNullable(poolConfig);
}

public void setPoolConfig(GenericObjectPoolConfig poolConfig) {
this.poolConfig = poolConfig;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getClientName()
*/
@Override
public Optional<String> getClientName() {
return Optional.ofNullable(clientName);
}

public void setClientName(String clientName) {
this.clientName = clientName;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getReadTimeout()
*/
@Override
public Duration getReadTimeout() {
return readTimeout;
}

public void setReadTimeout(Duration readTimeout) {
this.readTimeout = readTimeout;
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getConnectTimeout()
*/
@Override
public Duration getConnectTimeout() {
return connectTimeout;
}

public void setConnectTimeout(Duration connectTimeout) {
this.connectTimeout = connectTimeout;
}

public void setShardInfo(JedisShardInfo shardInfo) {

setSslSocketFactory(shardInfo.getSslSocketFactory());
setSslParameters(shardInfo.getSslParameters());
setHostnameVerifier(shardInfo.getHostnameVerifier());
setUseSsl(shardInfo.getSsl());
setConnectTimeout(Duration.ofMillis(shardInfo.getConnectionTimeout()));
setReadTimeout(Duration.ofMillis(shardInfo.getSoTimeout()));
}
}
}
4-2. 测试

在应用中,只需配置如下的JedisSentinelSlaveConnectionFactory,Spring Boot会自动配置一个
RedisTemplate redisTemplate和StringRedisTemplate stringRedisTemplate;
在代码中使用@Autowired注入即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean 
public RedisConnectionFactory jedisConnectionFactory() {
RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
.master("mymaster")
.sentinel("192.168.0.1", 26479)
.sentinel("192.168.0.2", 26479)
.sentinel("192.168.0.3", 26479);
sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder()
.clientName("MyRedisClient")
.build();
JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
return jedisConnectionFactory;
}
  1. RedisConfiguration.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.jack.yin.redis.configuration; 

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.RedisPassword;
    import org.springframework.data.redis.connection.RedisSentinelConfiguration;
    import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
    import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
    import redis.clients.jedis.JedisPoolConfig;
    import redis.clients.jedis.JedisSentinelSlaveConnectionFactory;

    @Configuration
    public class RedisConfiguration {
    @Value("${spring.redis.password}")
    private String redisPasswd;

    @Bean
    public RedisConnectionFactory jedisConnectionFactory() {
    RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
    .master("mymaster")
    .sentinel("192.168.0.1", 26479)
    .sentinel("192.168.0.2", 26479)
    .sentinel("192.168.0.3", 26479);
    sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
    JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder()
    .clientName("MyRedisClient")
    .build();
    JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
    return jedisConnectionFactory;
    }

    }
  2. DemoApplicationTests.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.junit.Test; 
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Enumeration;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;

@RunWith(SpringRunner.class)
@SpringBootTest(classes=RedisDemoApplication.class)
public class DemoApplicationTests {

@Autowired
private RedisTemplate<String,String> redisTemplate;
@Autowired
private StringRedisTemplate stringRedisTemplate;

protected Logger log = Logger.getLogger(getClass().getName());

@Test
public void testGetAndSet() throws Exception{
System.out.println(redisTemplate.opsForValue().get("hello"));
redisTemplate.opsForValue().set("set-key","don't allowed to set");
//org.springframework.dao.InvalidDataAccessApiUsageException: READONLY You can't write against a read only slave.;
System.out.println(redisTemplate.opsForValue().get("sss"));
System.out.println(redisTemplate.opsForValue().get("bbb"));
}

}

4. 总结

优点:


连接池中的连接是随机建立的到所有slave的连接


当监测到master失效转移会自动初始化连接池,确保不会连接到master上


新增slave时可以自动被发现


slave下线会被自动侦测到,然后重新初始化连接池,确保不会连接到已经下线的slave

缺点:
reids slave 需要设置slave-read-only yes


slave同步master数据需要时间,在一个短暂时间内master和slave数据会不一致


基于spring的Redis Sentinel读写分离Slave连接池
https://vaughnn.github.io/posts/34b8fc37/
作者
vaughnn
发布于
2022年1月18日
更新于
2024年2月29日