读写分离组件Meods原理及实现


特点:

1、支持C3P0数据源的连接池功能;

2、支持当前线程事务的读写窗口一致;

3、支持主从复制数据时延情况下的智能切换;

4、从数据源层面实现读写分离,对上层调用透明;

5、配置简单

原理:

         读写分离:通过实现DataSource的Connection以及Statement,当JDBC请求执行SQL时会首先获取Connection,然后通过解析SQL判断是查询还是更新,来选择连接池的读写连接类型,同时结合主从复制检测的结果进行综合判断,从而实现读写连接分离。读连接和写连接分别从读、写两个DataSource中获取。

       读写窗口一致性:通过重写dataSource的connection,如果当前连接已经存在写连接请求就强制采用写连接;

       主从复制时延智能切换:通过启动单线程定时检测master与slave之间的数据复制是否存在时延;如果存在时延,则强制系统本次查询在主库上执行。

实现原理图:


源码实现:

先定义一个读写库检测接口:

public interface DsDelayManager {

	/**
	 * Checks whether the data sources currently show master/slave replication delay.
	 *
	 * @author flykinghg
	 * @date 2015-10-30
	 * @throws SQLException if the check cannot be carried out against the database
	 */
	void checkDsDelay() throws SQLException;

	/**
	 * Reports the master/slave replication delay flag.
	 *
	 * @author flykinghg
	 * @date 2015-10-30
	 * @return the current value of the replication delay flag
	 */
	boolean isDelay();

}
基于此接口的一个检测类:

/**
 * Manages detection of replication delay between the master (write) and slave (read)
 * data sources.
 * <p>
 * A single scheduler thread periodically writes a fresh UUID marker through the write
 * data source, waits {@code acceptDsDelayInMills} milliseconds, and reads the marker back
 * through the read data source. If the value read back differs from the value written,
 * replication is flagged as delayed; otherwise it is flagged as not delayed.
 * <p>
 * NOTE(review): the scheduler created here is never shut down by this class.
 *
 * @author flykinghg
 * @date 2015-10-30
 * @time 15:40:53
 */
public class MeoRwDsDelayManager implements DsDelayManager {

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    /** Period between two checks, in milliseconds. */
    private final long checkDsDelayIntervalInMills;

    /** How long a check waits between writing and reading back the marker, in milliseconds. */
    private final long acceptDsDelayInMills;

    private boolean started;

    /** Written by the scheduler thread and read by callers of {@link #isDelay()}, hence volatile. */
    private volatile boolean isDelay = false;

    /** Name substituted into the marker SQL statements as the table to update and query. */
    private final String schema;

    private final MeoAbstractDataSource rwDs;

    // The UUID is quoted: it is a string value and unquoted it is not valid SQL.
    private static final String updateSql = "update %s set uuid='%s' where id=1";

    // Reads back the same row (id=1) that updateSql writes.
    private static final String querySql = "select uuid from %s where id=1";

    /**
     * Creates the manager and immediately schedules the periodic delay check.
     *
     * @param rwDs                        data source providing the write and read data sources
     * @param checkDsDelayIntervalInMills period between checks, in milliseconds
     * @param acceptDsDelayInMills        wait between marker write and read-back, in milliseconds
     * @param schema                      name substituted into the marker SQL as the table
     * @throws SQLException declared for callers; not thrown by the visible code path
     */
    public MeoRwDsDelayManager(MeoAbstractDataSource rwDs, long checkDsDelayIntervalInMills, long acceptDsDelayInMills, String schema) throws SQLException {
        this.rwDs = rwDs;
        this.checkDsDelayIntervalInMills = checkDsDelayIntervalInMills;
        this.acceptDsDelayInMills = acceptDsDelayInMills;
        this.schema = schema;
        startCheckDsDelay();
    }

    /**
     * Executes an update statement through the write data source.
     * <p>
     * Both the statement and the connection are closed afterwards so that the pooled
     * connection is returned instead of leaking on every check.
     *
     * @author wenhui
     * @date 2015-10-30
     * @param sql the update statement to run
     * @return always {@code true} when the statement completed without an exception
     * @throws SQLException if obtaining the connection or executing the statement fails
     */
    protected boolean excute(String sql) throws SQLException {
        try (java.sql.Connection connection = rwDs.getWriteDataSource().getConnection();
             PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.executeUpdate();
        }
        return true;
    }

    /**
     * Executes a query through the read data source and returns the first column of the
     * first row.
     *
     * @author wenhui
     * @date 2015-10-31
     * @param sql the query to run
     * @return the first column of the first row, or {@code null} when the result is empty
     * @throws SQLException if obtaining the connection or executing the query fails
     */
    protected String query(String sql) throws SQLException {
        try (java.sql.Connection connection = rwDs.getReadDataSource().getConnection();
             PreparedStatement statement = connection.prepareStatement(sql);
             ResultSet rs = statement.executeQuery()) {
            // The cursor starts before the first row; next() must be called before reading.
            return rs.next() ? rs.getString(1) : null;
        }
    }

    /**
     * Starts the periodic delay check on the scheduler thread. Does nothing when the
     * check has already been started. The first run happens after 5000 ms.
     *
     * @author wenhui
     * @date 2015-10-30
     */
    protected void startCheckDsDelay() {
        if (this.started) {
            return;
        }
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MeoRwDsDelayManager.this.checkDsDelay();
                } catch (SQLException | RuntimeException e) {
                    // An exception escaping run() would suppress every subsequent execution
                    // of a scheduleAtFixedRate task, so runtime failures are caught as well.
                    e.printStackTrace();
                }
            }
        }, 5000, checkDsDelayIntervalInMills, TimeUnit.MILLISECONDS);
        this.started = true;
    }

    /**
     * Runs one delay check: writes a new UUID marker via the write data source, waits
     * {@code acceptDsDelayInMills}, reads the marker via the read data source and sets the
     * delay flag to whether the two values differ.
     * <p>
     * If the waiting thread is interrupted, the interrupt status is restored and the
     * check ends without changing the flag.
     *
     * @throws SQLException if writing or reading the marker fails
     */
    @Override
    public synchronized void checkDsDelay() throws SQLException {
        String uuid = UUID.randomUUID().toString();
        this.excute(String.format(updateSql, schema, uuid));
        try {
            Thread.sleep(acceptDsDelayInMills);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        String qUuid = this.query(String.format(querySql, schema));
        // A missing row (null) also counts as delayed.
        isDelay = !uuid.equals(qUuid);
    }

    /**
     * @return {@code true} if the most recent completed check found the read side lagging
     */
    @Override
    public boolean isDelay() {
        return isDelay;
    }

}
/**
 * Custom data source that holds one write (master) and one read (slave) pooled data
 * source and hands out {@code MeoRwConnection} instances on top of them.
 * <p>
 * Per the original design notes: when a write connection already exists, reads and writes
 * must both use it; otherwise the replication-delay check decides whether the read
 * data source may be used, and reads fall back to the master when the delay is too large.
 * That routing is done by {@code MeoRwConnection}, which is not part of this class.
 * <p>
 * Subclasses name the concrete pooled data source class and its JDBC-url property, and
 * register pool settings through {@link #setProperty(String, Object)}.
 *
 * @author flykinghg
 * @date 2015-10-30
 * @time 15:23:25
 */
public abstract class MeoAbstractDataSource implements DataSource {
    private static final String MYSQL_URL_PREFIX = "jdbc:mysql://";
    private Map<MeoDsType,DataSource> rwDsContaner = new ConcurrentHashMap<MeoDsType,DataSource>();
    private PrintWriter logWriter;
    private String[] readJdbcUrl;
    private String[] writeJdbcUrl;
    private int loginTimeout;
    private Map<String, Object> properties = new HashMap<String, Object>();
    private boolean inited = false;
    private long checkDsDelayIntervalInMills = 1000L;
    private long acceptDsDelayInMills = 30L;
    private MeoRwDsDelayManager mdsm;
    private String checkDsSchema = "";

    /**
     * Lazily creates the write and read data sources and the delay manager. Runs its body
     * at most once successfully; a failed attempt is retried on the next call.
     *
     * @throws SQLException if data source creation reports one
     */
    private synchronized void init() throws SQLException {
        if (inited) {
            return;
        }
        try {
            initDataSource(writeJdbcUrl, MeoDsType.WRITE);
            initDataSource(readJdbcUrl, MeoDsType.READ);
            mdsm = new MeoRwDsDelayManager(this, checkDsDelayIntervalInMills, acceptDsDelayInMills, checkDsSchema);
            inited = true;
        } catch (SQLException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the underlying data source for {@code type} and registers it, unless one is
     * already registered. Only the first url is used; further urls are ignored.
     * <p>
     * The instance is configured reflectively: the JDBC-url property receives the url, the
     * {@code description} property receives {@code type.getKey()}, and every bean property
     * with a value registered via {@link #setProperty(String, Object)} receives that value.
     *
     * @param urls parsed JDBC urls for this type
     * @param type which side (read or write) is being created
     * @throws SQLException if no url has been configured for {@code type}
     * @throws Exception    if class loading, instantiation or a reflective setter call fails
     */
    private void initDataSource(String[] urls, MeoDsType type) throws SQLException, Exception {
        if (rwDsContaner.containsKey(type)) {
            return;
        }
        if (urls == null || urls.length == 0) {
            // Fail with a clear message instead of a NullPointerException here or later.
            throw new SQLException("No JDBC url configured for data source type " + type);
        }
        Class<?> clazz = Class.forName(getDataSourceClass());
        PropertyDescriptor[] pds = Introspector.getBeanInfo(clazz).getPropertyDescriptors();
        String urlProperty = getJdbcUrlPropertyName();

        DataSource myds = (DataSource) clazz.newInstance();
        for (PropertyDescriptor pd : pds) {
            if (urlProperty.equals(pd.getName())) {
                pd.getWriteMethod().invoke(myds, urls[0]);
                continue;
            } else if ("description".equals(pd.getName())) {
                pd.getWriteMethod().invoke(myds, type.getKey());
            }
            Object param = properties.get(pd.getName());
            if (param == null) {
                continue;
            }
            pd.getWriteMethod().invoke(myds, param);
        }
        rwDsContaner.put(type, myds);
    }

    /** @return fully qualified class name of the pooled data source to instantiate */
    protected abstract String getDataSourceClass();

    /** @return name of the bean property on that class which receives the JDBC url */
    protected abstract String getJdbcUrlPropertyName();

    /**
     * Returns the registered read data source after applying the log writer and login
     * timeout. The data sources are registered by the first {@code getConnection} call.
     *
     * @return the read data source
     * @throws SQLException if applying the settings fails
     */
    public DataSource getReadDataSource() throws SQLException {
        DataSource ds = rwDsContaner.get(MeoDsType.READ);
        prepareDataSource(ds);
        return ds;
    }

    /**
     * Returns the registered write data source after applying the log writer and login
     * timeout. The data sources are registered by the first {@code getConnection} call.
     *
     * @return the write data source
     * @throws SQLException if applying the settings fails
     */
    public DataSource getWriteDataSource() throws SQLException {
        DataSource ds = rwDsContaner.get(MeoDsType.WRITE);
        prepareDataSource(ds);
        return ds;
    }

    /**
     * Obtains a connection from the read data source.
     *
     * @param username user name, or {@code null} to use the data source's own credentials
     * @param password password, used only when {@code username} is not {@code null}
     * @return a connection from the read data source
     * @throws SQLException if the connection cannot be obtained
     */
    public Connection getReadConnection(String username, String password) throws SQLException {
        DataSource ds = this.getReadDataSource();
        if (username == null) {
            return ds.getConnection();
        } else {
            return ds.getConnection(username, password);
        }
    }

    /**
     * Obtains a connection from the write data source.
     *
     * @param username user name, or {@code null} to use the data source's own credentials
     * @param password password, used only when {@code username} is not {@code null}
     * @return a connection from the write data source
     * @throws SQLException if the connection cannot be obtained
     */
    public Connection getWriteConnection(String username, String password) throws SQLException {
        DataSource ds = this.getWriteDataSource();
        if (username == null) {
            return ds.getConnection();
        } else {
            return ds.getConnection(username, password);
        }
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return logWriter;
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        this.logWriter = out;
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        this.loginTimeout = seconds;
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return loginTimeout;
    }

    // NOTE(review): returns null rather than a Logger or SQLFeatureNotSupportedException.
    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return null;
    }

    /**
     * Returns this object when it implements {@code iface}.
     *
     * @throws SQLException if this object does not implement {@code iface}
     */
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface != null && iface.isInstance(this)) {
            return iface.cast(this);
        }
        throw new SQLException("Not a wrapper for " + iface);
    }

    /** @return {@code true} if this object implements {@code iface} */
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return iface != null && iface.isInstance(this);
    }

    /** Initializes on first use and returns a new read/write-routing connection. */
    @Override
    public Connection getConnection() throws SQLException {
        init();
        return new MeoRwConnection(this);
    }

    /** Initializes on first use and returns a new read/write-routing connection. */
    @Override
    public Connection getConnection(String username, String password)
            throws SQLException {
        init();
        return new MeoRwConnection(this, username, password);
    }

    /**
     * Registers a value to be applied to the like-named bean property of each underlying
     * data source when it is created.
     *
     * @param name     bean property name on the pooled data source class
     * @param property value to pass to that property's setter
     */
    protected void setProperty(String name, Object property) {
        this.properties.put(name, property);
    }

    /**
     * Applies the configured log writer and login timeout to {@code ds}.
     */
    private void prepareDataSource(DataSource ds) throws SQLException {
        if (logWriter != null) {
            ds.setLogWriter(logWriter);
        }
        if (loginTimeout != 0) {
            // Propagate the configured value; previously a literal 0 was passed here.
            ds.setLoginTimeout(loginTimeout);
        }
    }

    /** @return the delay manager, or {@code null} before initialization */
    public MeoRwDsDelayManager getMdsm() {
        return mdsm;
    }

    /** Parses and stores the JDBC url (optionally with several comma-separated hosts) for reads. */
    public void setReadJdbcUrl(String readJdbcUrl) {
        String[] hosts = parseUrl(readJdbcUrl);
        this.readJdbcUrl = hosts;
    }

    /** Parses and stores the JDBC url (optionally with several comma-separated hosts) for writes. */
    public void setWriteJdbcUrl(String writeJdbcUrl) {
        String[] hosts = parseUrl(writeJdbcUrl);
        this.writeJdbcUrl = hosts;
    }

    /**
     * Expands a url of the form {@code prefix//host1[:port],host2[:port]/tail} into one
     * {@code jdbc:mysql://host:port/tail} url per host.
     * <p>
     * Hosts are processed from last to first. A host without a port takes the port of the
     * nearest following host that has one, or 3306 when none does. A url without a path
     * after the host list yields urls with an empty tail, and a url without "//" is
     * treated as starting directly with the host list.
     *
     * @param url the configured JDBC url; must not be {@code null}
     * @return one url per listed host
     */
    private static String[] parseUrl(String url) {
        if (url == null) {
            throw new NullPointerException("jdbc url must not be null");
        }
        int beginningOfSlashes = url.indexOf("//");
        String remainder = beginningOfSlashes < 0 ? url : url.substring(beginningOfSlashes + 2);

        int slashIdx = remainder.indexOf('/');
        String urlTail = slashIdx < 0 ? "" : remainder.substring(slashIdx);
        String hostString = slashIdx < 0 ? remainder : remainder.substring(0, slashIdx);
        String[] hosts = hostString.split(",");

        String[] urls = new String[hosts.length];
        String defaultPort = String.valueOf(3306);
        for (int i = hosts.length - 1; i >= 0; i--) {
            String host = hosts[i].trim();
            int colonIdx = host.indexOf(':');
            if (colonIdx < 0) {
                host = host + ":" + defaultPort;
            } else {
                defaultPort = host.substring(colonIdx + 1);
            }
            urls[i] = MYSQL_URL_PREFIX + host + urlTail;
        }
        return urls;
    }

    /** Sets the check interval in milliseconds; non-positive values are ignored. */
    public void setCheckDsDelayIntervalInMills(long checkDsDelayIntervalInMills) {
        this.checkDsDelayIntervalInMills = checkDsDelayIntervalInMills > 0 ? checkDsDelayIntervalInMills : this.checkDsDelayIntervalInMills;
    }

    /** Sets the accepted delay in milliseconds; non-positive values are ignored. */
    public void setAcceptDsDelayInMills(long acceptDsDelayInMills) {
        this.acceptDsDelayInMills = acceptDsDelayInMills > 0 ? acceptDsDelayInMills : this.acceptDsDelayInMills;
    }

    /** Sets the name passed to the delay manager as its {@code schema} argument. */
    public void setCheckDsSchema(String checkDsSchema) {
        this.checkDsSchema = checkDsSchema;
    }

}
一个基于C3P0的数据源实现,继承以上抽象类MeoAbstractDataSource并实现参数设置:

/**
 * 对接c3p0组件实现数据源的连接池功能
 * @author flykinghg
 * @date 2015-10-30
 * @time 下午3:25:02
 */
public class C3p0MeoRwDataSource extends MeoAbstractDataSource {

	public C3p0MeoRwDataSource() {
    }

    public void setMinPoolSize(int minPoolSize) {
        setProperty("minPoolSize", minPoolSize);
    }

    public void setMaxConnectionAge(int maxConnectionAge) {
        setProperty("maxConnectionAge", maxConnectionAge);
    }

    public void setAcquireIncrement(int acquireIncrement) {
        setProperty("acquireIncrement", acquireIncrement);
    }

    public void setAcquireRetryAttempts(int acquireRetryAttempts) {
        setProperty("acquireRetryAttempts", acquireRetryAttempts);
    }

    public void setAcquireRetryDelay(int acquireRetryDelay) {
        setProperty("acquireRetryDelay", acquireRetryDelay);
    }

    public void setAutoCommitOnClose(boolean autoCommitOnClose) {
        setProperty("autoCommitOnClose", autoCommitOnClose);
    }

    public void setAutomaticTestTable(String automaticTestTable) {
        setProperty("automaticTestTable", automaticTestTable);
    }

    public void setBreakAfterAcquireFailure(boolean breakAfterAcquireFailure) {
        setProperty("breakAfterAcquireFailure", breakAfterAcquireFailure);
    }

    public void setCheckoutTimeout(int checkoutTimeout) {
        setProperty("checkoutTimeout", checkoutTimeout);
    }

    public void setConnectionTesterClassName(String connectionTesterClassName) {
        setProperty("connectionTesterClassName", connectionTesterClassName);
    }

    public void setFactoryClassLocation(String factoryClassLocation) {
        setProperty("factoryClassLocation", factoryClassLocation);
    }

    public void setForceIgnoreUnresolvedTransactions(boolean forceIgnoreUnresolvedTransactions) {
        setProperty("forceIgnoreUnresolvedTransactions", forceIgnoreUnresolvedTransactions);
    }

    public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod) {
        setProperty("idleConnectionTestPeriod", idleConnectionTestPeriod);
    }

    public void setInitialPoolSize(int initialPoolSize) {
        setProperty("initialPoolSize", initialPoolSize);
    }

    public void setMaxIdleTime(int maxIdleTime) {
        setProperty("maxIdleTime", maxIdleTime);
    }

    public void setMaxPoolSize(int maxPoolSize) {
        setProperty("maxPoolSize", maxPoolSize);
    }

    public void setMaxStatements(int maxStatements) {
        setProperty("maxStatements", maxStatements);
    }

    public void setMaxStatementsPerConnection(int maxStatementsPerConnection) {
        setProperty("maxStatementsPerConnection", maxStatementsPerConnection);
    }

    public void setNumHelperThreads(int numHelperThreads) {
        setProperty("numHelperThreads", numHelperThreads);
    }

    public void setOverrideDefaultUser(String overrideDefaultUser) {
        setProperty("overrideDefaultUser", overrideDefaultUser);
    }

    public void setOverrideDefaultPassword(String overrideDefaultPassword) {
        setProperty("overrideDefaultPassword", overrideDefaultPassword);
    }

    public void setPreferredTestQuery(String preferredTestQuery) {
        setProperty("preferredTestQuery", preferredTestQuery);
    }

    public void setPropertyCycle(int propertyCycle) {
        setProperty("propertyCycle", propertyCycle);
    }

    public void setTestConnectionOnCheckout(boolean testConnectionOnCheckout) {
        setProperty("testConnectionOnCheckout", testConnectionOnCheckout);
    }

    public void setTestConnectionOnCheckin(boolean testConnectionOnCheckin) {
        setProperty("testConnectionOnCheckin", testConnectionOnCheckin);
    }


    /**
     * @see com.meizu.jdbc.MDataSource#getDataSourceClass()
     */
    @Override
    protected String getDataSourceClass() {
        return "com.mchange.v2.c3p0.ComboPooledDataSource";
    }

    /**
     * 
     */
    public void setDriverClass(String driver) {
        setProperty("driverClass", driver);
    }

    /**
     * @param string
     */
    public void setUser(String user) {
        setProperty("user", user);
    }

    /**
     * @param string
     */
    public void setPassword(String password) {
        setProperty("password", password);
    }

    /** 
     * @see com.meizu.jdbc.MDataSource#getJdbcUrlPropertyName()
     */
    @Override
    protected String getJdbcUrlPropertyName() {
        return "jdbcUrl";
    }

	@Override
	public Logger getParentLogger() throws SQLFeatureNotSupportedException {
		return null;
	}

}





评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值