Redis Sentinel高可用集群Java客户端

java客户端Jedis在2.2.2及以上版本实现了对Sentinel的支持,只要是通过命令:
redis-cli -h 192.168.110.71 -p 6000  sentinel get-master-addr-by-name shard_a
1) "192.168.110.71"
2) "6379"
查询分片shard_a的主服务器地址,实现代码如下:
[java]  view plain  copy
  1. private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {  
  2.   
  3.     HostAndPort master = null;  
  4.     boolean sentinelAvailable = false;  
  5.   
  6.     log.info("Trying to find master from available Sentinels...");  
  7.   
  8.     for (String sentinel : sentinels) {  
  9.       final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));  
  10.   
  11.       log.fine("Connecting to Sentinel " + hap);  
  12.   
  13.       Jedis jedis = null;  
  14.       try {  
  15.         jedis = new Jedis(hap.getHost(), hap.getPort());  
  16.   
  17.         List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);  
  18.   
  19.         // connected to sentinel...  
  20.         sentinelAvailable = true;  
  21.   
  22.         if (masterAddr == null || masterAddr.size() != 2) {  
  23.           log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap  
  24.               + ".");  
  25.           continue;  
  26.         }  
  27.   
  28.         master = toHostAndPort(masterAddr);  
  29.         log.fine("Found Redis master at " + master);  
  30.         break;  
  31.       } catch (JedisConnectionException e) {  
  32.         log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one.");  
  33.       } finally {  
  34.         if (jedis != null) {  
  35.           jedis.close();  
  36.         }  
  37.       }  
  38.     }  
  39.   
  40.     if (master == null) {  
  41.       if (sentinelAvailable) {  
  42.         // can connect to sentinel, but master name seems to not  
  43.         // monitored  
  44.         throw new JedisException("Can connect to sentinel, but " + masterName  
  45.             + " seems to be not monitored...");  
  46.       } else {  
  47.         throw new JedisConnectionException("All sentinels down, cannot determine where is "  
  48.             + masterName + " master is running...");  
  49.       }  
  50.     }  
  51.   
  52.     log.info("Redis master running at " + master + ", starting Sentinel listeners...");  
  53.   
  54.     for (String sentinel : sentinels) {  
  55.       final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));  
  56.       MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());  
  57.       masterListeners.add(masterListener);  
  58.       masterListener.start();  
  59.     }  
  60.   
  61.     return master;  
  62.   }  

关于在java应用中如何使用,请参考如下代码实例:

[java]  view plain  copy
  1. package cn.slimsmart.redis.demo.sentinel;  
  2.   
  3. import java.util.HashSet;  
  4. import java.util.Set;  
  5.   
  6. import redis.clients.jedis.Jedis;  
  7. import redis.clients.jedis.JedisSentinelPool;  
  8.   
  9. public class JedisSentinelTest {  
  10.   
  11.     public static void main(String[] args) {  
  12.         Set<String> sentinels = new HashSet<String>();  
  13.         sentinels.add("192.168.100.90:6000");  
  14.         sentinels.add("192.168.110.71:6000");  
  15.           
  16.         /** 
  17.          * masterName 分片的名称 
  18.          * sentinels Redis Sentinel 服务地址列表 
  19.          */  
  20.         JedisSentinelPool poolA = new JedisSentinelPool("shard_a", sentinels);  
  21.         JedisSentinelPool poolB = new JedisSentinelPool("shard_b", sentinels);  
  22.         //获取Jedis主服务器客户端实例  
  23.         Jedis jedisA = poolA.getResource();  
  24.         jedisA.set("key""abc");  
  25.           
  26.         Jedis jedisB = poolB.getResource();  
  27.         jedisB.set("key""xyz");  
  28.           
  29.         System.out.println("jedisA key:"+jedisA.get("key"));  
  30.         System.out.println("jedisB key:"+jedisB.get("key"));  
  31.         //输出结果  
  32.         //jedisA key:abc  
  33.         //jedisB key:xyz  
  34.     }  
  35. }  
由上面代码可以看出jedis的JedisSentinelPool需要指定分片名称即主服务名称(masterName),这样根据需求硬编码将缓存数据添加到对应的分片中,无法自动分片。若使用ShardedJedisPool,当分片发生主从切换无法获得通知,所有对那个分片的操作将会失败。下面我们介绍一个开源项目sharded-jedis-sentinel-pool,能及时感知所有分片主从切换行为,进行连接池重建。
项目源码: https://github.com/warmbreeze/sharded-jedis-sentinel-pool
一、ShardedJedisSentinelPool实现分析
1.构造函数

public ShardedJedisSentinelPool(final GenericObjectPoolConfig poolConfig, List<String> masters, Set<String> sentinels) 
类似之前的Jedis Pool的构造方法,需要参数poolConfig提供诸如maxIdle,maxTotal之类的配置,masters是一个List,用来保存所有分片Master在Sentinel中配置的名字(注意master的顺序不能改变,因为Shard算法是依据分片位置进行计算,如果顺序错误将导致数据存储混乱),sentinels是一个Set,其中存放所有Sentinel的地址(格式:IP:PORT,如127.0.0.1:26379),顺序无关;
2.初始化连接池
在构造函数中,通过方法
private List<HostAndPort> initSentinels(Set<String> sentinels, final List<String> masters) 
取得当前所有分片的master地址(IP&PORT),对每个分片,通过顺次连接Sentinel实例,获取该分片的master地址,如果无法获得,即所有Sentinel都无法连接,将休眠1秒后继续重试,直到取得所有分片的master地址,代码块如下: 
[java]  view plain  copy
  1. for (String masterName : masters) {  
  2.             HostAndPort master = null;  
  3.             boolean fetched = false;  
  4.   
  5.             while (!fetched && sentinelRetry < MAX_RETRY_SENTINEL) {  
  6.                 for (String sentinel : sentinels) {  
  7.                     final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));  
  8.   
  9.                     log.fine("Connecting to Sentinel " + hap);  
  10.   
  11.                     try {  
  12.                         Jedis jedis = new Jedis(hap.getHost(), hap.getPort());  
  13.                         master = masterMap.get(masterName);  
  14.                         if (master == null) {  
  15.                             List<String> hostAndPort = jedis.sentinelGetMasterAddrByName(masterName);  
  16.                             if (hostAndPort != null && hostAndPort.size() > 0) {  
  17.                                 master = toHostAndPort(hostAndPort);  
  18.                                 log.fine("Found Redis master at " + master);  
  19.                                 shardMasters.add(master);  
  20.                                 masterMap.put(masterName, master);  
  21.                                 fetched = true;  
  22.                                 jedis.disconnect();  
  23.                                 break;  
  24.                             }  
  25.                         }  
  26.                     } catch (JedisConnectionException e) {  
  27.                         log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one.");  
  28.                     }  
  29.                 }  
  30.   
  31.                 if (null == master) {  
  32.                     try {  
  33.                         log.severe("All sentinels down, cannot determine where is " + masterName + " master is running... sleeping 1000ms, Will try again.");  
  34.                         Thread.sleep(1000);  
  35.                     } catch (InterruptedException e) {  
  36.                         e.printStackTrace();  
  37.                     }  
  38.                     fetched = false;  
  39.                     sentinelRetry++;  
  40.                 }  
  41.             }  
  42.   
  43.             // Try MAX_RETRY_SENTINEL times.  
  44.             if (!fetched && sentinelRetry >= MAX_RETRY_SENTINEL) {  
  45.                 log.severe("All sentinels down and try " + MAX_RETRY_SENTINEL + " times, Abort.");  
  46.                 throw new JedisConnectionException("Cannot connect all sentinels, Abort.");  
  47.             }  
  48.         }  
通过private void initPool(List<HostAndPort> masters)初始化连接池,到此连接池中的所有连接都指向分片的master。

3.监控每个Sentinel
在方法
public ShardedJedisSentinelPool(final GenericObjectPoolConfig poolConfig, List<String> masters, Set<String> sentinels)
最后,会为每个Sentinel启动一个Thread来监控Sentinel做出的更改: 
protected class MasterListener extends Thread
该线程的run方法通过Jedis Pub/Sub API(实现JedisPubSub接口,并通过jedis.subscribe进行订阅)向Sentinel实例订阅“+switch-master”频道,当Sentinel进行主从切换时,该线程会得到新Master地址的通知,通过master name判断哪个分片进行了切换,将新master地址替换原来位置的地址,并调用initPool(List masters)进行Jedis连接池重建;后续所有通过该连接池取得的连接都指向新Master地址,对应用程序透明。

二、应用实例

[java]  view plain  copy
  1. package cn.slimsmart.redis.demo.sentinel;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.HashSet;  
  5. import java.util.List;  
  6. import java.util.Set;  
  7.   
  8. import org.apache.commons.pool2.impl.GenericObjectPoolConfig;  
  9.   
  10. import redis.clients.jedis.ShardedJedis;  
  11.   
  12. public class ShardedJedisSentinelTest {  
  13.   
  14.     public static void main(String[] args) {  
  15.           
  16.         //连接池配置  
  17.         GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();  
  18.         //分片配置  
  19.         List<String> masters = new ArrayList<String>();  
  20.         masters.add("shard_a");  
  21.         masters.add("shard_b");  
  22.         //sentinel服务节点  
  23.         Set<String> sentinels = new HashSet<String>();  
  24.         sentinels.add("192.168.100.90:6000");  
  25.         sentinels.add("192.168.110.71:6000");  
  26.         //创建分片连接池  
  27.         ShardedJedisSentinelPool pool = new ShardedJedisSentinelPool(poolConfig, masters, sentinels);  
  28.           
  29.         ShardedJedis jedis = null;  
  30.         try {  
  31.             jedis = pool.getResource();  
  32.             jedis.set("key_sharded""abc");  
  33.             System.out.println(jedis.get("key_sharded"));  
  34.         } finally {  
  35.             if (jedis != null){  
  36.                 pool.returnResource(jedis);  
  37.             }  
  38.             pool.destroy();  
  39.         }  
  40.     }  
  41. }  
运行后,通过redis客户端查看:

redis 192.168.110.71:6379> get key_sharded
"abc"

redis 192.168.110.71:6380> get key_sharded
(nil)

可以看出key_sharded被添加到shard_a分片中。
集成spring配置参考:

[html]  view plain  copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"  
  4.     xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"  
  5.     xmlns:context="http://www.springframework.org/schema/context"  
  6.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd   
  7.     http://www.springframework.org/schema/tx   
  8.     http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
  9.     http://www.springframework.org/schema/aop   
  10.     http://www.springframework.org/schema/aop/spring-aop-3.0.xsd   
  11.     http://www.springframework.org/schema/context   
  12.     http://www.springframework.org/schema/context/spring-context-3.0.xsd"  
  13.     default-lazy-init="false" default-autowire="byName">  
  14.   
  15.     <bean id="shardedJedisPool" class="cn.slimsmart.redis.demo.sentinel.ShardedJedisSentinelPool">  
  16.         <constructor-arg index="0" ref="jedisPoolConfig" />  
  17.         <constructor-arg index="1">  
  18.             <list>  
  19.                 <value>shard_a</value>  
  20.                 <value>shard_b</value>  
  21.             </list>  
  22.         </constructor-arg>  
  23.         <constructor-arg index="2">  
  24.             <set>  
  25.                 <value>192.168.100.90:6000</value>  
  26.                 <value>192.168.110.71:6000</value>  
  27.             </set>  
  28.         </constructor-arg>  
  29.     </bean>  
  30.   
  31.     <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">  
  32.         <property name="maxTotal" value="200" />  
  33.         <property name="maxIdle" value="100" />  
  34.         <property name="maxWaitMillis" value="5000" />  
  35.         <property name="testOnBorrow" value="true" />  
  36.     </bean>  
  37. </beans>  

参考文章:

1.基于Redis Sentinel的Redis集群(主从&Sharding)高可用方案

2.Redis主从复制和主从切换(spring 集成)

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值