还记得JDBC相关的知识点:JDBC的api本质上就是四级对象:DataSource、Connection、Statement、ResultSet
DataSource要能获取Connection,Connection要能获取Statement,Statement要能执行sql返回ResultSet
任何提供的JDBC组件都遵循这四级对象,当然ShardingJdbc也不例外
ShardingJdbc最核心的概念是逻辑连接和物理连接:
功能
逻辑连接是一个虚拟的连接,是一个协调器接口,是对所有库、表的抽象
物理连接是真正连接到数据源的连接
数量
逻辑连接只有一个
物理连接根据数据源数量决定
事务
逻辑连接开启事务,实际上是开启一个事务上下文;逻辑连接回滚事务,实际上会控制所有物理连接回滚所有分片的数据
物理连接开启事务,才会真正开启对应数据库的事务;物理连接回滚事务,实际上是回滚对应库的数据
获取时机
逻辑连接就像是传统的JDBCDatasource,在代码层面由开发人员构建
物理连接是延迟构建的,是在ShardingJdbc执行sql时才会真正建立
ShardingDataSource
从四级对象的最顶级Datasource开始看,对应是ShardingDataSource
其中核心属性只有一个:
private final ShardingRuntimeContext runtimeContext;即Sharding上下文
构造函数
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
super(dataSourceMap);
checkDataSourceType(dataSourceMap);
runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}想要构造一个ShardingDataSource,需要三个部分:
dataSourceMap:DataSource与其逻辑名
ShardingRule:分库规则
props:数据库参数
这里根据ShardingRule构造ShardingRuntimeContext
其中,上下文中持有的包括:
// 存储分库信息和分库规则配置
private final DatabaseMetaData cachedDatabaseMetaData;
// 存储分库元数据信息,包括分库信息、分库规则、数据库类型
private final ShardingSphereMetaData metaData;
// 分库引擎
private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;getConnection
ShardingDataSource应该要实现的核心方法
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}可见ShardingConnection确实是个逻辑概念,将DataSourceMap映射封装进去,同时把上下文封装进去
而后面的TransactionTypeHolder.get() 实际上是持有的ThreadLocal封装的事务类型
public final class TransactionTypeHolder {
private static final ThreadLocal<TransactionType> CONTEXT = new ThreadLocal<TransactionType>() {
@Override
protected TransactionType initialValue() {
return TransactionType.LOCAL;
}
};
public static TransactionType get() {
return CONTEXT.get();
}事务类型初始化为LOCAL
ShardingConnection
Connection首先要具备获取statement的能力
prepareStatement
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new ShardingPreparedStatement(this, sql);
}可见ShardingPreparedStatement也是一个逻辑概念,到这里还没有拿具体的物理Connection
setAutoCommit
另一个比较重要的事务方法
public void setAutoCommit(final boolean autoCommit) throws SQLException {
if (TransactionType.LOCAL == transactionType) {
super.setAutoCommit(autoCommit);
return;
}
if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) {
return;
}
if (autoCommit && shardingTransactionManager.isInTransaction()) {
shardingTransactionManager.commit();
return;
}
if (!autoCommit && !shardingTransactionManager.isInTransaction()) {
closeCachedConnections();
shardingTransactionManager.begin();
}
}先看LOCAL类型,执行super.setAutoCommit(autoCommit)
跟进到AbstractConnectionAdapter#setAutoCommitForLocalTransaction
private void setAutoCommitForLocalTransaction(final boolean autoCommit) throws SQLException {
recordMethodInvocation(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
forceExecuteTemplate.execute(cachedConnections.values(), new ForceExecuteCallback<Connection>() {
@Override
public void execute(final Connection connection) throws SQLException {
connection.setAutoCommit(autoCommit);
}
});
}到这里有一个回调,可以推断出,是实际执行sql的时候通过这里的回调去设置实际物理connection的事务属性
ShardingPrepareStatement
PrepareStatement具备执行sql返回ResultSet的能力
构造函数
private ShardingPreparedStatement(
final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys)
throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
this.sql = sql;
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
shardingEngine = new PreparedQueryShardingEngine(sql, runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());
preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
}可见逻辑PS封装了Connection,同时持有shardingEngine、shardingContext
同时构造了preparedStatementExecutor和batchPreparedStatementExecutor,这两个是执行器
其真正执行sql在executeQuery方法,即JDBC执行sql的底层方法
executeQuery
是执行的整体框架,顺序完成以下工作:
执行分库
初始化sql执行器
执行sql并获取结果合并器
合并结果
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
shard();
initPreparedStatementExecutor();
MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(),
connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), preparedStatementExecutor.executeQuery());
result = getResultSet(mergeEngine);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}首先在shard()方法中做分库,分库结果存放在sqlRouteResult
private void shard() {
sqlRouteResult = shardingEngine.shard(sql, getParameters());
}下面执行preparedStatementExecutor.executeQuery() ,结果也存入mergeEngine
然后把sqlRouteResult一起传入mergeEngine,执行getResultSet方法
评论区