redis-cli -h 192.168.110.71 -p 6000 sentinel get-master-addr-by-name shard_a
1) "192.168.110.71"
2) "6379"
查询分片shard_a的主服务器地址,实现代码如下:
- private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
- HostAndPort master = null;
- boolean sentinelAvailable = false;
- log.info("Trying to find master from available Sentinels...");
- for (String sentinel : sentinels) {
- final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
- log.fine("Connecting to Sentinel " + hap);
- Jedis jedis = null;
- try {
- jedis = new Jedis(hap.getHost(), hap.getPort());
- List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
- // connected to sentinel...
- sentinelAvailable = true;
- if (masterAddr == null || masterAddr.size() != 2) {
- log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
- + ".");
- continue;
- }
- master = toHostAndPort(masterAddr);
- log.fine("Found Redis master at " + master);
- break;
- } catch (JedisConnectionException e) {
- log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one.");
- } finally {
- if (jedis != null) {
- jedis.close();
- }
- }
- }
- if (master == null) {
- if (sentinelAvailable) {
- // can connect to sentinel, but master name seems to not
- // monitored
- throw new JedisException("Can connect to sentinel, but " + masterName
- + " seems to be not monitored...");
- } else {
- throw new JedisConnectionException("All sentinels down, cannot determine where is "
- + masterName + " master is running...");
- }
- }
- log.info("Redis master running at " + master + ", starting Sentinel listeners...");
- for (String sentinel : sentinels) {
- final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
- MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
- masterListeners.add(masterListener);
- masterListener.start();
- }
- return master;
- }
关于在java应用中如何使用,请参考如下代码实例:
- package cn.slimsmart.redis.demo.sentinel;
- import java.util.HashSet;
- import java.util.Set;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisSentinelPool;
- public class JedisSentinelTest {
- public static void main(String[] args) {
- Set<String> sentinels = new HashSet<String>();
- sentinels.add("192.168.100.90:6000");
- sentinels.add("192.168.110.71:6000");
- /**
- * masterName 分片的名称
- * sentinels Redis Sentinel 服务地址列表
- */
- JedisSentinelPool poolA = new JedisSentinelPool("shard_a", sentinels);
- JedisSentinelPool poolB = new JedisSentinelPool("shard_b", sentinels);
- //获取Jedis主服务器客户端实例
- Jedis jedisA = poolA.getResource();
- jedisA.set("key", "abc");
- Jedis jedisB = poolB.getResource();
- jedisB.set("key", "xyz");
- System.out.println("jedisA key:"+jedisA.get("key"));
- System.out.println("jedisB key:"+jedisB.get("key"));
- //输出结果
- //jedisA key:abc
- //jedisB key:xyz
- }
- }
项目源码: https://github.com/warmbreeze/sharded-jedis-sentinel-pool
一、ShardedJedisSentinelPool实现分析
1.构造函数
public ShardedJedisSentinelPool(final GenericObjectPoolConfig poolConfig, List<String> masters, Set<String> sentinels)
类似之前的Jedis Pool的构造方法,需要参数poolConfig提供诸如maxIdle,maxTotal之类的配置,masters是一个List,用来保存所有分片Master在Sentinel中配置的名字(注意master的顺序不能改变,因为Shard算法是依据分片位置进行计算,如果顺序错误将导致数据存储混乱),sentinels是一个Set,其中存放所有Sentinel的地址(格式:IP:PORT,如127.0.0.1:26379),顺序无关;
2.初始化连接池
在构造函数中,通过方法
private List<HostAndPort> initSentinels(Set<String> sentinels, final List<String> masters)
取得当前所有分片的master地址(IP&PORT),对每个分片,通过顺次连接Sentinel实例,获取该分片的master地址,如果无法获得,即所有Sentinel都无法连接,将休眠1秒后继续重试,直到取得所有分片的master地址,代码块如下:
- for (String masterName : masters) {
- HostAndPort master = null;
- boolean fetched = false;
- while (!fetched && sentinelRetry < MAX_RETRY_SENTINEL) {
- for (String sentinel : sentinels) {
- final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
- log.fine("Connecting to Sentinel " + hap);
- try {
- Jedis jedis = new Jedis(hap.getHost(), hap.getPort());
- master = masterMap.get(masterName);
- if (master == null) {
- List<String> hostAndPort = jedis.sentinelGetMasterAddrByName(masterName);
- if (hostAndPort != null && hostAndPort.size() > 0) {
- master = toHostAndPort(hostAndPort);
- log.fine("Found Redis master at " + master);
- shardMasters.add(master);
- masterMap.put(masterName, master);
- fetched = true;
- jedis.disconnect();
- break;
- }
- }
- } catch (JedisConnectionException e) {
- log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one.");
- }
- }
- if (null == master) {
- try {
- log.severe("All sentinels down, cannot determine where is " + masterName + " master is running... sleeping 1000ms, Will try again.");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- fetched = false;
- sentinelRetry++;
- }
- }
- // Try MAX_RETRY_SENTINEL times.
- if (!fetched && sentinelRetry >= MAX_RETRY_SENTINEL) {
- log.severe("All sentinels down and try " + MAX_RETRY_SENTINEL + " times, Abort.");
- throw new JedisConnectionException("Cannot connect all sentinels, Abort.");
- }
- }
3.监控每个Sentinel
在方法
public ShardedJedisSentinelPool(final GenericObjectPoolConfig poolConfig, List<String> masters, Set<String> sentinels)
最后,会为每个Sentinel启动一个Thread来监控Sentinel做出的更改:
protected class MasterListener extends Thread
该线程的run方法通过Jedis Pub/Sub API(实现JedisPubSub接口,并通过jedis.subscribe进行订阅)向Sentinel实例订阅“+switch-master”频道,当Sentinel进行主从切换时,该线程会得到新Master地址的通知,通过master name判断哪个分片进行了切换,将新master地址替换原来位置的地址,并调用initPool(List masters)进行Jedis连接池重建;后续所有通过该连接池取得的连接都指向新Master地址,对应用程序透明。
二、应用实例
- package cn.slimsmart.redis.demo.sentinel;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Set;
- import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
- import redis.clients.jedis.ShardedJedis;
- public class ShardedJedisSentinelTest {
- public static void main(String[] args) {
- //连接池配置
- GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
- //分片配置
- List<String> masters = new ArrayList<String>();
- masters.add("shard_a");
- masters.add("shard_b");
- //sentinel服务节点
- Set<String> sentinels = new HashSet<String>();
- sentinels.add("192.168.100.90:6000");
- sentinels.add("192.168.110.71:6000");
- //创建分片连接池
- ShardedJedisSentinelPool pool = new ShardedJedisSentinelPool(poolConfig, masters, sentinels);
- ShardedJedis jedis = null;
- try {
- jedis = pool.getResource();
- jedis.set("key_sharded", "abc");
- System.out.println(jedis.get("key_sharded"));
- } finally {
- if (jedis != null){
- pool.returnResource(jedis);
- }
- pool.destroy();
- }
- }
- }
redis 192.168.110.71:6379> get key_sharded
"abc"
redis 192.168.110.71:6380> get key_sharded
(nil)
可以看出key_sharded被添加到shard_a分片中。
集成spring配置参考:
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
- xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://www.springframework.org/schema/tx
- http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
- http://www.springframework.org/schema/aop
- http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context-3.0.xsd"
- default-lazy-init="false" default-autowire="byName">
- <bean id="shardedJedisPool" class="cn.slimsmart.redis.demo.sentinel.ShardedJedisSentinelPool">
- <constructor-arg index="0" ref="jedisPoolConfig" />
- <constructor-arg index="1">
- <list>
- <value>shard_a</value>
- <value>shard_b</value>
- </list>
- </constructor-arg>
- <constructor-arg index="2">
- <set>
- <value>192.168.100.90:6000</value>
- <value>192.168.110.71:6000</value>
- </set>
- </constructor-arg>
- </bean>
- <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
- <property name="maxTotal" value="200" />
- <property name="maxIdle" value="100" />
- <property name="maxWaitMillis" value="5000" />
- <property name="testOnBorrow" value="true" />
- </bean>
- </beans>
参考文章: