MySQL driver protocols: loadbalance and replication

Owen Jia, December 29, 2018

Background

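A chance conversation with a friend turned to MySQL multi-node cluster architectures, how our application code should use them, and the strategies the MySQL driver package already supports. I found my grasp of these concepts somewhat fuzzy, so I sorted them out here for future reference. The focus is on replication and loadbalance, using mysql-connector-java:5.1.38 as the example.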

Official Java driver documentation: Multi-Host Connections

[Figure: Java connector]

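MySQL's multi-host connection strategy is officially called server failover. The replication and loadbalance protocols both evolved from failover, and they correspond to different MySQL deployment architectures: the Cluster architecture and the Replication architecture. Neither architecture is covered further here.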

jdbc:mysql://[primary host][:port],[secondary host 1][:port][,[secondary host 2][:port]]...[/[database]]»
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]

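In the official URL format, primary host is the preferred primary connection and the secondary hosts are the backup (slave) hosts, which are tried in the order listed. Many configuration properties are also supported (see the documentation for details):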

  • failOverReadOnly
  • secondsBeforeRetryMaster
  • queriesBeforeRetryMaster
  • retriesAllDown
  • autoReconnect
  • autoReconnectForPools
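
To make the URL format concrete, here is a minimal sketch that assembles a failover URL from a host list and a property map. The class name FailoverUrlSketch, the method buildUrl, and the example host names are made up for illustration; only the URL layout and the property names come from the text above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Connector/J.
final class FailoverUrlSketch {

    // Builds jdbc:mysql://host1,host2/db?k1=v1&k2=v2 in the layout shown above.
    static String buildUrl(List<String> hosts, String database, Map<String, String> props) {
        StringBuilder url = new StringBuilder("jdbc:mysql://");
        url.append(String.join(",", hosts)); // the first entry is the primary host
        url.append('/').append(database);
        if (!props.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            props.forEach((key, value) -> query.add(key + "=" + value));
            url.append(query.toString());
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("failOverReadOnly", "false");
        props.put("secondsBeforeRetryMaster", "30");
        // Host names are placeholders.
        List<String> hosts = Arrays.asList("db1.example.com:3306", "db2.example.com:3306");
        System.out.println(buildUrl(hosts, "mydb", props));
    }
}
```

The resulting string could then be passed to DriverManager.getConnection with the Connector/J jar on the classpath; that step is left out so the sketch runs without a database.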

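The class NonRegisteringDriver.java in the driver package defines the connection protocols MySQL supports; as shown below, there are four. mxj means that a different mysqld binary is run depending on the system platform (macOS, Linux, Windows, Solaris, and so on). replication and loadbalance are described later.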

    private static final String REPLICATION_URL_PREFIX = "jdbc:mysql:replication://";
    private static final String URL_PREFIX = "jdbc:mysql://";
    private static final String MXJ_URL_PREFIX = "jdbc:mysql:mxj://";
    public static final String LOADBALANCE_URL_PREFIX = "jdbc:mysql:loadbalance://";
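
As a rough illustration of how these four prefixes separate the connection modes, the sketch below classifies a URL by its prefix, ignoring case. The class UrlPrefixSketch and its Mode enum are hypothetical names for this example; the prefix strings are the constants quoted above.

```java
import java.util.Locale;

// Hypothetical classifier, not the driver's own code.
final class UrlPrefixSketch {

    enum Mode { REPLICATION, LOADBALANCE, MXJ, PLAIN, UNSUPPORTED }

    // Maps a JDBC URL to a connection mode by its prefix, case-insensitively.
    static Mode classify(String url) {
        if (url == null) {
            return Mode.UNSUPPORTED;
        }
        String lower = url.toLowerCase(Locale.ROOT);
        if (lower.startsWith("jdbc:mysql:replication://")) {
            return Mode.REPLICATION;
        }
        if (lower.startsWith("jdbc:mysql:loadbalance://")) {
            return Mode.LOADBALANCE;
        }
        if (lower.startsWith("jdbc:mysql:mxj://")) {
            return Mode.MXJ;
        }
        if (lower.startsWith("jdbc:mysql://")) {
            return Mode.PLAIN;
        }
        return Mode.UNSUPPORTED;
    }

    public static void main(String[] args) {
        System.out.println(classify("jdbc:mysql:loadbalance://127.0.0.1:3306,11.12.3.44:3306/mydb"));
        System.out.println(classify("jdbc:mysql://127.0.0.1:3306/mydb"));
    }
}
```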

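The driver types MySQL provides likewise show three implementation classes: the commonly used com.mysql.jdbc.Driver, com.mysql.jdbc.ReplicationDriver for master/slave setups, and org.gjt.mm.mysql.Driver, which is kept for compatibility with MM.MySQL.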

[Figure: driver architecture]

Trigger conditions

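The loadbalance strategy is triggered in the following three situations:

  1. When a transaction is committed or rolled back;
  2. When a communication exception occurs whose SQL state starts with 08;
  3. When a user-defined matching condition is met, that is, when a SQLException matches the conditions configured through one of the three options loadBalanceSQLStateFailover, loadBalanceSQLExceptionSubclassFailover, or loadBalanceExceptionChecker.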
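
Conditions 2 and 3 can be sketched as a simple check on the exception's SQL state. This is only an illustration under assumptions: the class FailoverTriggerSketch and the method shouldSwitchHost are invented here, and treating the extra values as SQL state prefixes reflects my reading of loadBalanceSQLStateFailover rather than the driver's actual checker.

```java
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical check, not the driver's LoadBalanceExceptionChecker.
final class FailoverTriggerSketch {

    // Returns true when the SQL state starts with "08" (communication error)
    // or with any of the additional, user-supplied prefixes.
    static boolean shouldSwitchHost(SQLException ex, List<String> extraSqlStatePrefixes) {
        String sqlState = ex.getSQLState();
        if (sqlState == null) {
            return false;
        }
        if (sqlState.startsWith("08")) {
            return true;
        }
        for (String prefix : extraSqlStatePrefixes) {
            if (sqlState.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        SQLException linkFailure = new SQLException("Communications link failure", "08S01");
        SQLException syntaxError = new SQLException("Syntax error", "42000");
        System.out.println(shouldSwitchHost(linkFailure, Collections.<String>emptyList()));
        System.out.println(shouldSwitchHost(syntaxError, Collections.<String>emptyList()));
        // "42" stands in for a user-configured prefix.
        System.out.println(shouldSwitchHost(syntaxError, Arrays.asList("42")));
    }
}
```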

Protocol: replication

jdbc.driver=com.mysql.jdbc.ReplicationDriver
jdbc.url=jdbc:mysql:replication://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false

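The format is similar to failover. The main difference is that the first host is the master, in write/read mode, and all following hosts are slaves, in read mode. Configuration properties are supported here as well; the official documentation describes them in detail.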

jdbc:mysql:replication://[master host][:port],[slave host 1][:port][,[slave host 2][:port]]...[/[database]] »
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]

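The replication protocol builds on failover and loadbalance. It fits the Replication architecture and addresses read/write splitting and load balancing. Requests on a connection in read-only mode are directed to a slave host; with multiple slaves, a round-robin strategy is used. Requests that are not read-only (write/read) all go to the master host. Versions from 5.1.27 on support multiple masters, which are chosen with the load balance strategy; see the loadbalance protocol section for details. Versions from 5.1.28 on also support adding nodes dynamically: new hosts can be added to the URL's host list while the program is running, without restarting the service. This is the dynamic data source scenario that often comes up.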

Read only here refers to a property of the connection established with the database, set through Connection.setReadOnly(false) or Connection.setReadOnly(true).
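
The routing rule can be modeled in a few lines. The sketch below is a simplified stand-in, not the driver's implementation: the class ReadWriteRoutingSketch is invented, the host names are placeholders, and falling back to the master when no slave is configured is an assumption of this example.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of read/write splitting with round-robin over slaves.
final class ReadWriteRoutingSketch {

    private final String master;
    private final List<String> slaves;
    private int nextSlave = 0; // index of the slave to use for the next read-only request

    ReadWriteRoutingSketch(String master, List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    // readOnly mirrors the flag an application sets with Connection.setReadOnly(..).
    String route(boolean readOnly) {
        if (!readOnly || slaves.isEmpty()) {
            return master; // write/read work goes to the master (assumed fallback when no slaves)
        }
        String host = slaves.get(nextSlave);
        nextSlave = (nextSlave + 1) % slaves.size(); // advance round-robin
        return host;
    }

    public static void main(String[] args) {
        ReadWriteRoutingSketch router = new ReadWriteRoutingSketch(
                "master:3306", Arrays.asList("slave1:3306", "slave2:3306"));
        System.out.println(router.route(false)); // master:3306
        System.out.println(router.route(true));  // slave1:3306
        System.out.println(router.route(true));  // slave2:3306
        System.out.println(router.route(true));  // slave1:3306
    }
}
```

In application code the same switch is made on a real connection: call setReadOnly(true) before queries that may run on a slave, and setReadOnly(false) before statements that must reach the master.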


Protocol: loadbalance

jdbc.driver=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql:loadbalance://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false&loadBalanceStrategy=bestResponseTime

jdbc:mysql:loadbalance://[host1][:port],[host2][:port][,[host3][:port]]...[/[database]] »
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]

The format is similar to failover and replication. All hosts are peers, with no primary/secondary distinction, and properties are supported here too:

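  • loadBalanceConnectionGroup: a custom group name. The group can be controlled through the methods of com.mysql.jdbc.jmx.LoadBalanceConnectionGroupManagerMBean, for example to view the number of active hosts, remove a host, or add a host.
  • loadBalanceEnableJMX: enables JMX monitoring. Start the program with the JVM argument -Dcom.sun.management.jmxremote, then inspect it with a JVM tool such as jconsole.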

BalanceStrategy.java

public interface BalanceStrategy extends Extension {

    public abstract ConnectionImpl pickConnection(LoadBalancedConnectionProxy proxy, List<String> configuredHosts,
            Map<String, ConnectionImpl> liveConnections, long[] responseTimes, int numRetries) throws SQLException;
}

The loadbalance strategy is defined by the BalanceStrategy interface. The implementations that ship with the driver are BestResponseTimeBalanceStrategy, RandomBalanceStrategy (the default), and SequentialBalanceStrategy.

Following the source path Driver -> NonRegisteringDriver.connect(xx) -> NonRegisteringDriver.connectLoadBalanced() -> LoadBalancedConnectionProxy.createProxyInstance() shows that the default strategy is RandomBalanceStrategy.

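  • RandomBalanceStrategy: picks a host at random.
  • BestResponseTimeBalanceStrategy: picks the host with the fastest transaction response time.
  • SequentialBalanceStrategy: picks a random host the first time, then moves to the next host in order, wrapping around.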

RandomBalanceStrategy.java

int random = (int) Math.floor((Math.random() * whiteList.size()));
String hostPortSpec = whiteList.get(random);
ConnectionImpl conn = liveConnections.get(hostPortSpec);

BestResponseTimeBalanceStrategy.pickConnection(..)

for (int i = 0; i < responseTimes.length; i++) {
    long candidateResponseTime = responseTimes[i];
    if (candidateResponseTime < minResponseTime && !blackList.containsKey(configuredHosts.get(i))) {
        if (candidateResponseTime == 0) {
            bestHostIndex = i;
            break;
        }
        bestHostIndex = i;
        minResponseTime = candidateResponseTime;
    }
}
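
The loop above is only an excerpt. A self-contained version of the same selection idea might look like the sketch below. The class BestResponseTimeSketch is hypothetical, the blacklist is simplified to a Set of host strings, and returning -1 when every host is blacklisted is a choice made for this example rather than the driver's behavior. A response time of 0 is taken to mean that the host has no measurement yet.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical standalone version of the selection loop.
final class BestResponseTimeSketch {

    // Returns the index of the non-blacklisted host with the smallest response time,
    // stopping early at the first eligible host whose time is 0.
    // Returns -1 if no host is eligible (assumption of this sketch).
    static int pickBestHost(List<String> hosts, long[] responseTimes, Set<String> blacklist) {
        long minResponseTime = Long.MAX_VALUE;
        int bestHostIndex = -1;
        for (int i = 0; i < responseTimes.length; i++) {
            long candidate = responseTimes[i];
            if (candidate < minResponseTime && !blacklist.contains(hosts.get(i))) {
                if (candidate == 0) {
                    return i; // nothing can beat 0, so stop searching
                }
                bestHostIndex = i;
                minResponseTime = candidate;
            }
        }
        return bestHostIndex;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("a:3306", "b:3306", "c:3306");
        long[] times = {30L, 10L, 20L};
        System.out.println(pickBestHost(hosts, times, Collections.<String>emptySet()));        // 1
        System.out.println(pickBestHost(hosts, times, new HashSet<>(Arrays.asList("b:3306")))); // 2
    }
}
```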

SequentialBalanceStrategy.pickConnection(..)

if (numHosts == 1) {
    this.currentHostIndex = 0; // pathological case
} else if (this.currentHostIndex == -1) {
    int random = (int) Math.floor((Math.random() * numHosts));

    for (int i = random; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            break;
        }
    }

    if (this.currentHostIndex == -1) {
        for (int i = 0; i < random; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                break;
            }
        }
    }

    //...
} else {

    int i = this.currentHostIndex + 1;
    boolean foundGoodHost = false;

    for (; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            foundGoodHost = true;
            break;
        }
    }

    if (!foundGoodHost) {
        for (i = 0; i < this.currentHostIndex; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                foundGoodHost = true;
                break;
            }
        }
    }
    //...
}
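
The two branches above share one idea: start at some index and walk forward, wrapping around, until a host that is not blacklisted is found. The sketch below expresses that with modular arithmetic. It is a simplification, not the driver's code: the class SequentialPickSketch is hypothetical, the random first index is passed in as a parameter so the run is repeatable, and, unlike the excerpt, the current host is kept as a last-resort candidate.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical standalone version of sequential host selection.
final class SequentialPickSketch {

    private int currentHostIndex = -1; // -1 means no host has been picked yet

    // firstIndex plays the role of the random starting point on the first call.
    // Returns the chosen index, or -1 if every host is blacklisted.
    int next(List<String> hosts, Set<String> blacklist, int firstIndex) {
        int numHosts = hosts.size();
        int start = (currentHostIndex == -1) ? firstIndex : currentHostIndex + 1;
        for (int step = 0; step < numHosts; step++) {
            int i = (start + step) % numHosts; // wrap around the end of the list
            if (!blacklist.contains(hosts.get(i))) {
                currentHostIndex = i;
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("a:3306", "b:3306", "c:3306");
        Set<String> none = Collections.emptySet();
        SequentialPickSketch picker = new SequentialPickSketch();
        System.out.println(picker.next(hosts, none, 1)); // 1
        System.out.println(picker.next(hosts, none, 1)); // 2
        System.out.println(picker.next(hosts, none, 1)); // 0
        // With b blacklisted, the walk from index 1 skips to index 2.
        System.out.println(picker.next(hosts, new HashSet<>(Arrays.asList("b:3306")), 1)); // 2
    }
}
```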

NonRegisteringDriver.java

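public class NonRegisteringDriver implements java.sql.Driver {
	// ...
	public java.sql.Connection connect(String url, Properties info) throws SQLException {
		// Multi-host URLs are dispatched by their prefix.
		if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) {
			return connectLoadBalanced(url, info);
		} else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) {
			return connectReplicationConnection(url, info);
		}
		// ...
	}
}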

LoadBalancedConnectionProxy.java

public class LoadBalancedConnectionProxy extends MultiHostConnectionProxy implements PingTarget {
    /**
     * Creates a proxy for java.sql.Connection that routes requests between the given list of host:port and uses the given properties when creating connections.
     * 
     * @param hosts
     *            The list of the hosts to load balance.
     * @param props
     *            Connection properties from where to get initial settings and to be used in new connections.
     * @throws SQLException
     */
    private LoadBalancedConnectionProxy(List<String> hosts, Properties props) throws SQLException {
        super();

        String group = props.getProperty("loadBalanceConnectionGroup", null);
        boolean enableJMX = false;
        String enableJMXAsString = props.getProperty("loadBalanceEnableJMX", "false");
        try {
            enableJMX = Boolean.parseBoolean(enableJMXAsString);
        } catch (Exception e) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceEnableJMX", new Object[] { enableJMXAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        if (group != null) {
            this.connectionGroup = ConnectionGroupManager.getConnectionGroupInstance(group);
            if (enableJMX) {
                ConnectionGroupManager.registerJmx();
            }
            this.connectionGroupProxyID = this.connectionGroup.registerConnectionProxy(this, hosts);
            hosts = new ArrayList<String>(this.connectionGroup.getInitialHosts());
        }

        // hosts specifications may have been reset with settings from a previous connection group
        int numHosts = initializeHostsSpecs(hosts, props);

        this.liveConnections = new HashMap<String, ConnectionImpl>(numHosts);
        this.hostsToListIndexMap = new HashMap<String, Integer>(numHosts);
        for (int i = 0; i < numHosts; i++) {
            this.hostsToListIndexMap.put(this.hostList.get(i), i);
        }
        this.connectionsToHostsMap = new HashMap<ConnectionImpl, String>(numHosts);
        this.responseTimes = new long[numHosts];

        String retriesAllDownAsString = this.localProps.getProperty("retriesAllDown", "120");
        try {
            this.retriesAllDown = Integer.parseInt(retriesAllDownAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String blacklistTimeoutAsString = this.localProps.getProperty(BLACKLIST_TIMEOUT_PROPERTY_KEY, "0");
        try {
            this.globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String strategy = this.localProps.getProperty("loadBalanceStrategy", "random");
        if ("random".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null)
                    .get(0);
        } else if ("bestResponseTime".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util
                    .loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        } else {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0);
        }

        String autoCommitSwapThresholdAsString = props.getProperty("loadBalanceAutoCommitStatementThreshold", "0");
        try {
            this.autoCommitSwapThreshold = Integer.parseInt(autoCommitSwapThresholdAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementThreshold",
                    new Object[] { autoCommitSwapThresholdAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }

        String autoCommitSwapRegex = props.getProperty("loadBalanceAutoCommitStatementRegex", "");
        if (!("".equals(autoCommitSwapRegex))) {
            try {
                "".matches(autoCommitSwapRegex);
            } catch (Exception e) {
                throw SQLError.createSQLException(
                        Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementRegex", new Object[] { autoCommitSwapRegex }),
                        SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
            }
        }

        if (this.autoCommitSwapThreshold > 0) {
            String statementInterceptors = this.localProps.getProperty("statementInterceptors");
            if (statementInterceptors == null) {
                this.localProps.setProperty("statementInterceptors", "com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            } else if (statementInterceptors.length() > 0) {
                this.localProps.setProperty("statementInterceptors", statementInterceptors + ",com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            }
            props.setProperty("statementInterceptors", this.localProps.getProperty("statementInterceptors"));

        }

        this.balancer.init(null, props);

        String lbExceptionChecker = this.localProps.getProperty("loadBalanceExceptionChecker", "com.mysql.jdbc.StandardLoadBalanceExceptionChecker");
        this.exceptionChecker = (LoadBalanceExceptionChecker) Util.loadExtensions(null, props, lbExceptionChecker, "InvalidLoadBalanceExceptionChecker", null)
                .get(0);

        pickNewConnection();
    }
}
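
The strategy lookup in the constructor above boils down to a small mapping from the loadBalanceStrategy property to a class name. The sketch below isolates that step; the class StrategyNameSketch and the method resolveStrategyClass are hypothetical names, while the property key, the default value, and the two class names are taken from the excerpt.

```java
import java.util.Properties;

// Hypothetical helper that mirrors the strategy lookup in the excerpt.
final class StrategyNameSketch {

    // "random" (the default) and "bestResponseTime" are shortcuts;
    // any other value is treated as a class name to load.
    static String resolveStrategyClass(Properties props) {
        String strategy = props.getProperty("loadBalanceStrategy", "random");
        if ("random".equals(strategy)) {
            return "com.mysql.jdbc.RandomBalanceStrategy";
        }
        if ("bestResponseTime".equals(strategy)) {
            return "com.mysql.jdbc.BestResponseTimeBalanceStrategy";
        }
        return strategy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolveStrategyClass(props)); // default: random strategy class
        props.setProperty("loadBalanceStrategy", "bestResponseTime");
        System.out.println(resolveStrategyClass(props));
    }
}
```

Going by this excerpt alone, there is no shortcut name for SequentialBalanceStrategy, so it would presumably have to be selected by its fully qualified class name.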

Author: Owen Jia. His blog, Owen Blog, is recommended reading.