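Background
A chat with a friend happened to turn to MySQL multi-node cluster deployments, how the code in our various systems should use them, and which strategies the MySQL driver package already supports. I realized my grasp of these concepts was fuzzy, so I sorted them out here for future reference. The focus is on replication and loadbalance, using mysql-connector-java:5.1.38 as the example.
Official Java driver documentation: Multi-Host Connections
Java connector
MySQL's multi-host connection strategy is officially called server failover. The replication and loadbalance protocols both evolved from failover, and they correspond to different MySQL deployment architectures, the Cluster architecture and the Replication architecture, which are not covered further here.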
jdbc:mysql://[primary host][:port],[secondary host 1][:port][,[secondary host 2][:port]]...[/[database]]»
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
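This is the official URL format. The primary host is the preferred connection (the master); the secondary hosts are the slaves and are tried in the order listed. Many configuration properties are also supported (see the documentation for details):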
- failOverReadOnly
- secondsBeforeRetryMaster
- queriesBeforeRetryMaster
- retriesAllDown
- autoReconnect
- autoReconnectForPools
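To make the URL format concrete, here is a minimal sketch that assembles a failover URL from a host list and a few of the properties above. FailoverUrlBuilder is a hypothetical helper of my own, not part of Connector/J, and the host names and property values are placeholders.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Connector/J) that assembles a multi-host JDBC URL. */
class FailoverUrlBuilder {

    static String build(List<String> hosts, String database, Map<String, String> props) {
        StringBuilder url = new StringBuilder("jdbc:mysql://");
        // Hosts are written in the order given; the first one is the primary.
        url.append(String.join(",", hosts));
        url.append('/').append(database);
        if (!props.isEmpty()) {
            // Properties become a query string: ?name1=value1&name2=value2
            StringJoiner query = new StringJoiner("&", "?", "");
            props.forEach((name, value) -> query.add(name + "=" + value));
            url.append(query);
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("failOverReadOnly", "false");       // placeholder value
        props.put("secondsBeforeRetryMaster", "30");  // placeholder value
        System.out.println(build(Arrays.asList("db1:3306", "db2:3306"), "mydb", props));
    }
}
```

A LinkedHashMap is used so the properties appear in the URL in the order they were added, which makes the result easier to read in logs.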
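The class NonRegisteringDriver.java in the driver package defines the connection protocols MySQL supports; as the constants below show, there are four. mxj means that MySQL switches to a different mysqld binary depending on the platform, with support for macOS, Linux, Windows, Solaris, and so on. replication and loadbalance are described later.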
private static final String REPLICATION_URL_PREFIX = "jdbc:mysql:replication://";
private static final String URL_PREFIX = "jdbc:mysql://";
private static final String MXJ_URL_PREFIX = "jdbc:mysql:mxj://";
public static final String LOADBALANCE_URL_PREFIX = "jdbc:mysql:loadbalance://";
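Looking at the driver types MySQL provides, there are three implementation classes in total: com.mysql.jdbc.Driver for regular use, com.mysql.jdbc.ReplicationDriver for master/slave mode, and org.gjt.mm.mysql.Driver, which exists for compatibility with MM.MySQL.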
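The four prefixes can be told apart with a simple case-insensitive prefix check. The sketch below is my own simplified stand-in for that idea; UrlPrefixSketch and its Mode enum are hypothetical names, not driver classes.

```java
import java.util.Locale;

/** Hypothetical classifier; a simplified stand-in for the driver's URL prefix checks. */
class UrlPrefixSketch {

    enum Mode { LOADBALANCE, REPLICATION, MXJ, PLAIN, UNSUPPORTED }

    static Mode classify(String url) {
        if (url == null) {
            return Mode.UNSUPPORTED;
        }
        // Lower-case once so that every comparison below ignores case.
        String u = url.toLowerCase(Locale.ROOT);
        if (u.startsWith("jdbc:mysql:loadbalance://")) {
            return Mode.LOADBALANCE;
        } else if (u.startsWith("jdbc:mysql:replication://")) {
            return Mode.REPLICATION;
        } else if (u.startsWith("jdbc:mysql:mxj://")) {
            return Mode.MXJ;
        } else if (u.startsWith("jdbc:mysql://")) {
            return Mode.PLAIN;
        }
        return Mode.UNSUPPORTED;
    }

    public static void main(String[] args) {
        System.out.println(classify("jdbc:mysql:replication://127.0.0.1:3306,11.12.3.44:3306/mydb"));
        System.out.println(classify("jdbc:mysql://127.0.0.1:3306/mydb"));
    }
}
```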
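Trigger conditions
The loadbalance strategy is triggered in the following three situations:
- when a transaction is committed or rolled back;
- when a communication exception occurs, that is, one whose SQL state starts with 08;
- when a custom match condition is met: a SQLException matches the conditions configured through one of the three MySQL options loadBalanceSQLStateFailover, loadBalanceSQLExceptionSubclassFailover, or loadBalanceExceptionChecker.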
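The second and third conditions can be illustrated with plain java.sql.SQLException objects. The sketch below is only an approximation: FailoverTriggerSketch is a hypothetical class, and treating the configured value as a comma-separated list of SQLState prefixes is my assumption about how loadBalanceSQLStateFailover is interpreted, so check the official documentation before relying on it.

```java
import java.sql.SQLException;

/** Hypothetical sketch of "should this exception make the connection switch hosts?". */
class FailoverTriggerSketch {

    /** SQL states in class 08 are communication (connection) errors. */
    static boolean isCommunicationError(SQLException e) {
        String state = e.getSQLState();
        return state != null && state.startsWith("08");
    }

    /** Assumption: prefixes is a comma-separated list of SQLState prefixes. */
    static boolean matchesStatePrefix(SQLException e, String prefixes) {
        String state = e.getSQLState();
        if (state == null || prefixes == null) {
            return false;
        }
        for (String prefix : prefixes.split(",")) {
            String p = prefix.trim();
            if (!p.isEmpty() && state.startsWith(p)) {
                return true;
            }
        }
        return false;
    }

    static boolean shouldSwitchHost(SQLException e, String prefixes) {
        return isCommunicationError(e) || matchesStatePrefix(e, prefixes);
    }

    public static void main(String[] args) {
        SQLException linkFailure = new SQLException("Communications link failure", "08S01");
        SQLException duplicateKey = new SQLException("Duplicate entry", "23000");
        System.out.println(shouldSwitchHost(linkFailure, ""));
        System.out.println(shouldSwitchHost(duplicateKey, ""));
        System.out.println(shouldSwitchHost(duplicateKey, "23,40"));
    }
}
```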
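Protocol: replication
jdbc.driver= com.mysql.jdbc.ReplicationDriver
jdbc.url= jdbc:mysql:replication://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false
The format is similar to failover. The main difference is that the first host is the master, used in read/write mode, while all the hosts after it are slaves, used in read mode. Configuration properties are supported as well and are described in detail on the official site.
jdbc:mysql:replication://[master host][:port],[slave host 1][:port][,[slave host 2][:port]]...[/[database]] »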
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
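The replication protocol builds on failover and loadbalance. It targets the Replication architecture and addresses read/write splitting and load balancing. When the connection is in read-only mode, requests are routed to a slave host; with several slaves, a round-robin scheme is used. All non-read-only (read/write) requests are routed to the master host. From version 5.1.27 on, multiple masters are supported, and the load-balance strategy chooses among them (see the loadbalance protocol section). From version 5.1.28 on, hosts can also be added dynamically: a new host can be added to the URL while the program is running, without restarting the server. This is the "dynamic data source" scenario we often talk about.
Read-only here is a property of the connection established with the database, set through Connection.setReadOnly(true) or Connection.setReadOnly(false).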
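The routing rule is easy to simulate without a database. The sketch below is a toy model, not the driver's implementation: ReadWriteRouterSketch is a hypothetical class, and falling back to the master when no slave is configured is a choice I made for the sketch.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of read/write splitting: read-only goes to slaves in round-robin, the rest to the master. */
class ReadWriteRouterSketch {
    private final String master;
    private final List<String> slaves;
    private boolean readOnly = false;
    private int nextSlave = 0;

    ReadWriteRouterSketch(String master, List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    /** Mirrors the idea of Connection.setReadOnly(..) on a replication connection. */
    void setReadOnly(boolean readOnly) {
        this.readOnly = readOnly;
    }

    /** Returns the host the next request would be sent to. */
    String route() {
        if (!readOnly || slaves.isEmpty()) {
            return master; // sketch assumption: no slaves means the master serves reads too
        }
        String host = slaves.get(nextSlave);
        nextSlave = (nextSlave + 1) % slaves.size(); // round-robin over the slaves
        return host;
    }

    public static void main(String[] args) {
        ReadWriteRouterSketch router =
                new ReadWriteRouterSketch("127.0.0.1:3306", Arrays.asList("11.12.3.44:3306", "11.12.3.45:3306"));
        System.out.println(router.route());   // read/write: master
        router.setReadOnly(true);
        System.out.println(router.route());   // first slave
        System.out.println(router.route());   // second slave
        System.out.println(router.route());   // first slave again
    }
}
```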
Protocol: loadbalance
jdbc.driver= com.mysql.jdbc.Driver
jdbc.url= jdbc:mysql:loadbalance://127.0.0.1:3306,11.12.3.44:3306/mydb?autoReconnect=false&loadBalanceStrategy=bestResponseTime
jdbc:mysql:loadbalance://[host1][:port],[host2][:port][,[host3][:port]]...[/[database]] »
[?propertyName1=propertyValue1[&propertyName2=propertyValue2]...]
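The format is similar to failover and replication. All hosts are peers, with no primary or secondary roles. Properties are supported here too:
- loadBalanceConnectionGroup: a custom group name. The group can be controlled through the methods of com.mysql.jdbc.jmx.LoadBalanceConnectionGroupManagerMBean, for example to view the number of active hosts, remove a host, or add a host.
- loadBalanceEnableJMX: enables JMX monitoring. Start the program with the JVM argument -Dcom.sun.management.jmxremote, then inspect it with a JVM tool such as jconsole.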
BalanceStrategy.java
public interface BalanceStrategy extends Extension {
    public abstract ConnectionImpl pickConnection(LoadBalancedConnectionProxy proxy, List<String> configuredHosts,
            Map<String, ConnectionImpl> liveConnections, long[] responseTimes, int numRetries) throws SQLException;
}
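The loadbalance policy is defined by the BalanceStrategy interface. The implementations that MySQL already ships are BestResponseTimeBalanceStrategy, RandomBalanceStrategy (the default), and SequentialBalanceStrategy.
Following the source through Driver -> NonRegisteringDriver.connect(xx) -> NonRegisteringDriver.connectLoadBalanced() -> LoadBalancedConnectionProxy.createProxyInstance() shows that the default strategy is RandomBalanceStrategy.
- RandomBalanceStrategy: picks a host at random.
- BestResponseTimeBalanceStrategy: picks the host with the fastest transaction response time.
- SequentialBalanceStrategy: picks a random host the first time, then moves to the next host in order, wrapping around at the end of the list.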
RandomBalanceStrategy.java
int random = (int) Math.floor((Math.random() * whiteList.size()));
String hostPortSpec = whiteList.get(random);
ConnectionImpl conn = liveConnections.get(hostPortSpec);
BestResponseTimeBalanceStrategy.pickConnection(..)
for (int i = 0; i < responseTimes.length; i++) {
    long candidateResponseTime = responseTimes[i];
    if (candidateResponseTime < minResponseTime && !blackList.containsKey(configuredHosts.get(i))) {
        if (candidateResponseTime == 0) {
            bestHostIndex = i;
            break;
        }
        bestHostIndex = i;
        minResponseTime = candidateResponseTime;
    }
}
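To try the selection loop above in isolation, here is a self-contained version. It is a sketch rather than the driver code: BestResponseTimeSketch is a hypothetical class, the blacklist is reduced to a plain Set of host names, and returning -1 when every host is blacklisted is my own choice for the sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Self-contained sketch of "pick the host with the smallest recorded response time". */
class BestResponseTimeSketch {

    static int pick(List<String> hosts, long[] responseTimes, Set<String> blacklisted) {
        long min = Long.MAX_VALUE;
        int best = -1; // sketch choice: -1 means no usable host was found
        for (int i = 0; i < responseTimes.length; i++) {
            long candidate = responseTimes[i];
            if (candidate < min && !blacklisted.contains(hosts.get(i))) {
                best = i;
                if (candidate == 0) {
                    break; // a host with no recorded time yet is taken immediately
                }
                min = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("a:3306", "b:3306", "c:3306");
        long[] times = { 30L, 12L, 45L };
        System.out.println(pick(hosts, times, Collections.<String>emptySet()));
        System.out.println(pick(hosts, times, Collections.singleton("b:3306")));
    }
}
```

With the sample data, the first call selects index 1 (the 12 ms host); once b:3306 is blacklisted, the second call falls back to index 0.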
SequentialBalanceStrategy.pickConnection(..)
if (numHosts == 1) {
    this.currentHostIndex = 0; // pathological case
} else if (this.currentHostIndex == -1) {
    int random = (int) Math.floor((Math.random() * numHosts));
    for (int i = random; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            break;
        }
    }
    if (this.currentHostIndex == -1) {
        for (int i = 0; i < random; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                break;
            }
        }
    }
    //...
} else {
    int i = this.currentHostIndex + 1;
    boolean foundGoodHost = false;
    for (; i < numHosts; i++) {
        if (!blackList.containsKey(configuredHosts.get(i))) {
            this.currentHostIndex = i;
            foundGoodHost = true;
            break;
        }
    }
    if (!foundGoodHost) {
        for (i = 0; i < this.currentHostIndex; i++) {
            if (!blackList.containsKey(configuredHosts.get(i))) {
                this.currentHostIndex = i;
                foundGoodHost = true;
                break;
            }
        }
    }
    //...
}
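The two branches above boil down to "start somewhere, then keep walking forward and wrap around, skipping blacklisted hosts". The sketch below condenses that into one loop. It is a simplification under stated assumptions: SequentialPickSketch is hypothetical, the random first index is passed in as a constructor argument so the run is repeatable, the scan may land on the current host again if it is the only healthy one, and the retry handling hidden behind the elided `//...` parts is not modeled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Condensed sketch of sequential host selection with wrap-around. */
class SequentialPickSketch {
    private final int firstIndex; // stands in for the random first choice
    private int current = -1;     // -1 means no host has been picked yet

    SequentialPickSketch(int firstIndex) {
        this.firstIndex = firstIndex;
    }

    /** Returns the index of the next usable host, or -1 if all are blacklisted. */
    int next(List<String> hosts, Set<String> blacklisted) {
        int n = hosts.size();
        int start = (current == -1) ? firstIndex : current + 1;
        for (int step = 0; step < n; step++) {
            int i = (start + step) % n; // wrap around at the end of the list
            if (!blacklisted.contains(hosts.get(i))) {
                current = i;
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("a:3306", "b:3306", "c:3306");
        Set<String> down = new HashSet<>(Collections.singletonList("c:3306"));
        SequentialPickSketch picker = new SequentialPickSketch(1);
        for (int k = 0; k < 4; k++) {
            System.out.println(hosts.get(picker.next(hosts, down)));
        }
    }
}
```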
NonRegisteringDriver.java
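public class NonRegisteringDriver implements java.sql.Driver {
    // ...
    public java.sql.Connection connect(String url, Properties info) throws SQLException {
        // Dispatch on the URL prefix: loadbalance and replication each get their own connection proxy.
        if (StringUtils.startsWithIgnoreCase(url, LOADBALANCE_URL_PREFIX)) {
            return connectLoadBalanced(url, info);
        } else if (StringUtils.startsWithIgnoreCase(url, REPLICATION_URL_PREFIX)) {
            return connectReplicationConnection(url, info);
        }
        // ... (remaining URL handling elided)
    }
}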
LoadBalancedConnectionProxy.java
public class LoadBalancedConnectionProxy extends MultiHostConnectionProxy implements PingTarget {
    /**
     * Creates a proxy for java.sql.Connection that routes requests between the given list of host:port and uses the given properties when creating connections.
     *
     * @param hosts
     *            The list of the hosts to load balance.
     * @param props
     *            Connection properties from where to get initial settings and to be used in new connections.
     * @throws SQLException
     */
    private LoadBalancedConnectionProxy(List<String> hosts, Properties props) throws SQLException {
        super();
        String group = props.getProperty("loadBalanceConnectionGroup", null);
        boolean enableJMX = false;
        String enableJMXAsString = props.getProperty("loadBalanceEnableJMX", "false");
        try {
            enableJMX = Boolean.parseBoolean(enableJMXAsString);
        } catch (Exception e) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceEnableJMX", new Object[] { enableJMXAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }
        if (group != null) {
            this.connectionGroup = ConnectionGroupManager.getConnectionGroupInstance(group);
            if (enableJMX) {
                ConnectionGroupManager.registerJmx();
            }
            this.connectionGroupProxyID = this.connectionGroup.registerConnectionProxy(this, hosts);
            hosts = new ArrayList<String>(this.connectionGroup.getInitialHosts());
        }
        // hosts specifications may have been reset with settings from a previous connection group
        int numHosts = initializeHostsSpecs(hosts, props);
        this.liveConnections = new HashMap<String, ConnectionImpl>(numHosts);
        this.hostsToListIndexMap = new HashMap<String, Integer>(numHosts);
        for (int i = 0; i < numHosts; i++) {
            this.hostsToListIndexMap.put(this.hostList.get(i), i);
        }
        this.connectionsToHostsMap = new HashMap<ConnectionImpl, String>(numHosts);
        this.responseTimes = new long[numHosts];
        String retriesAllDownAsString = this.localProps.getProperty("retriesAllDown", "120");
        try {
            this.retriesAllDown = Integer.parseInt(retriesAllDownAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForRetriesAllDown", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }
        String blacklistTimeoutAsString = this.localProps.getProperty(BLACKLIST_TIMEOUT_PROPERTY_KEY, "0");
        try {
            this.globalBlacklistTimeout = Integer.parseInt(blacklistTimeoutAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(
                    Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceBlacklistTimeout", new Object[] { retriesAllDownAsString }),
                    SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }
        String strategy = this.localProps.getProperty("loadBalanceStrategy", "random");
        if ("random".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, "com.mysql.jdbc.RandomBalanceStrategy", "InvalidLoadBalanceStrategy", null)
                    .get(0);
        } else if ("bestResponseTime".equals(strategy)) {
            this.balancer = (BalanceStrategy) Util
                    .loadExtensions(null, props, "com.mysql.jdbc.BestResponseTimeBalanceStrategy", "InvalidLoadBalanceStrategy", null).get(0);
        } else {
            this.balancer = (BalanceStrategy) Util.loadExtensions(null, props, strategy, "InvalidLoadBalanceStrategy", null).get(0);
        }
        String autoCommitSwapThresholdAsString = props.getProperty("loadBalanceAutoCommitStatementThreshold", "0");
        try {
            this.autoCommitSwapThreshold = Integer.parseInt(autoCommitSwapThresholdAsString);
        } catch (NumberFormatException nfe) {
            throw SQLError.createSQLException(Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementThreshold",
                    new Object[] { autoCommitSwapThresholdAsString }), SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
        }
        String autoCommitSwapRegex = props.getProperty("loadBalanceAutoCommitStatementRegex", "");
        if (!("".equals(autoCommitSwapRegex))) {
            try {
                "".matches(autoCommitSwapRegex);
            } catch (Exception e) {
                throw SQLError.createSQLException(
                        Messages.getString("LoadBalancedConnectionProxy.badValueForLoadBalanceAutoCommitStatementRegex", new Object[] { autoCommitSwapRegex }),
                        SQLError.SQL_STATE_ILLEGAL_ARGUMENT, null);
            }
        }
        if (this.autoCommitSwapThreshold > 0) {
            String statementInterceptors = this.localProps.getProperty("statementInterceptors");
            if (statementInterceptors == null) {
                this.localProps.setProperty("statementInterceptors", "com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            } else if (statementInterceptors.length() > 0) {
                this.localProps.setProperty("statementInterceptors", statementInterceptors + ",com.mysql.jdbc.LoadBalancedAutoCommitInterceptor");
            }
            props.setProperty("statementInterceptors", this.localProps.getProperty("statementInterceptors"));
        }
        this.balancer.init(null, props);
        String lbExceptionChecker = this.localProps.getProperty("loadBalanceExceptionChecker", "com.mysql.jdbc.StandardLoadBalanceExceptionChecker");
        this.exceptionChecker = (LoadBalanceExceptionChecker) Util.loadExtensions(null, props, lbExceptionChecker, "InvalidLoadBalanceExceptionChecker", null)
                .get(0);
        pickNewConnection();
    }
}
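One detail in the constructor is worth pulling out: only "random" and "bestResponseTime" are short aliases, and any other loadBalanceStrategy value is loaded as a class name. The sketch below mirrors just that mapping; StrategyNameSketch is a hypothetical class of mine, and it only returns the class name instead of loading anything.

```java
/** Sketch of how a loadBalanceStrategy value maps to an implementation class name. */
class StrategyNameSketch {

    static String resolve(String configured) {
        // An unset property behaves like "random", the default in the constructor above.
        String strategy = (configured == null) ? "random" : configured;
        switch (strategy) {
            case "random":
                return "com.mysql.jdbc.RandomBalanceStrategy";
            case "bestResponseTime":
                return "com.mysql.jdbc.BestResponseTimeBalanceStrategy";
            default:
                // Anything else is treated as the fully qualified name of a BalanceStrategy class.
                return strategy;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve("bestResponseTime"));
        System.out.println(resolve("com.mysql.jdbc.SequentialBalanceStrategy"));
    }
}
```

Judging from the excerpt, this means SequentialBalanceStrategy has no short alias and would have to be selected by its full class name, as would a custom implementation.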
Author: Owen Jia. His blog, Owen Blog, is recommended reading.