
ShardingJdbc

FatFish1
2025-08-20

Recall the basics of JDBC: the JDBC API is essentially a four-level object hierarchy — DataSource, Connection, Statement, ResultSet.

A DataSource must be able to produce Connections, a Connection must be able to produce Statements, and a Statement must be able to execute SQL and return a ResultSet.

Any JDBC-compliant component follows this four-level hierarchy, and ShardingJdbc is no exception.

The most fundamental concepts in ShardingJdbc are the logical connection and the physical connection:

  • Role

    • The logical connection is a virtual connection: a coordinator interface that abstracts over all databases and tables

    • A physical connection is a real connection to a data source

  • Count

    • There is exactly one logical connection

    • The number of physical connections is determined by the number of data sources

  • Transactions

    • Opening a transaction on the logical connection really just opens a transaction context; rolling back on the logical connection drives all physical connections to roll back every shard's data

    • Only opening a transaction on a physical connection actually starts a transaction in the corresponding database; rolling back a physical connection rolls back that database's data

  • Acquisition timing

    • The logical connection behaves like a traditional JDBC DataSource: it is constructed in application code by the developer

    • Physical connections are built lazily, only when ShardingJdbc actually executes SQL
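The bullet points above can be modeled in a few lines of plain Java. This is a stdlib-only sketch, not the real ShardingSphere types — `FakeDataSource`, `FakeConnection`, and `LogicalConnection` are hypothetical stand-ins that only demonstrate the lazy, per-data-source nature of physical connections:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for real data sources / connections.
class FakeDataSource {
    final String name;
    FakeDataSource(String name) { this.name = name; }
    FakeConnection getConnection() { return new FakeConnection(name); }
}

class FakeConnection {
    final String dataSourceName;
    FakeConnection(String dataSourceName) { this.dataSourceName = dataSourceName; }
}

// A "logical" connection: it holds the data source map from the start,
// but creates physical connections only when a shard is actually touched.
class LogicalConnection {
    private final Map<String, FakeDataSource> dataSourceMap;
    private final Map<String, FakeConnection> cachedConnections = new HashMap<>();

    LogicalConnection(Map<String, FakeDataSource> dataSourceMap) {
        this.dataSourceMap = dataSourceMap;
    }

    int physicalConnectionCount() { return cachedConnections.size(); }

    // Lazily obtain (and cache) the physical connection for one routed shard.
    FakeConnection getPhysicalConnection(String dataSourceName) {
        return cachedConnections.computeIfAbsent(
            dataSourceName, n -> dataSourceMap.get(n).getConnection());
    }
}
```

Constructing the `LogicalConnection` opens nothing; only routing SQL to a shard materializes that shard's physical connection, and repeated use reuses the cached one.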

ShardingDataSource

Starting from the top of the four-level hierarchy, DataSource, the corresponding class is ShardingDataSource.

It has only one core field:

private final ShardingRuntimeContext runtimeContext;

i.e., the sharding runtime context.

Constructor

public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
    super(dataSourceMap);
    checkDataSourceType(dataSourceMap);
    runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}

Constructing a ShardingDataSource takes three parts:

  • dataSourceMap: the physical DataSources keyed by their logical names

  • ShardingRule: the sharding rules

  • props: database/engine properties

The constructor builds a ShardingRuntimeContext from the ShardingRule.

The context holds:

// cached database metadata
private final DatabaseMetaData cachedDatabaseMetaData;
// sharding metadata: data source info, table metadata, and sharding rules
private final ShardingSphereMetaData metaData;
// sharding transaction manager engine
private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
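As a concrete illustration, the three constructor inputs could be assembled roughly like this. This is a hedged configuration sketch against the ShardingSphere 4.x API, not runnable as-is (it needs the library and two live DataSources); the table name, sharding column, and inline expressions are made up, and `ShardingDataSourceFactory` is the usual entry point that ends up constructing a `ShardingDataSource`:

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.strategy.InlineShardingStrategyConfiguration;
import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;

class ShardingDataSourceExample {

    static DataSource build(DataSource ds0, DataSource ds1) throws SQLException {
        // 1. dataSourceMap: physical DataSources keyed by logical name
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        dataSourceMap.put("ds0", ds0);
        dataSourceMap.put("ds1", ds1);

        // 2. sharding rule: hypothetical t_order table split across 2 databases by user_id
        TableRuleConfiguration tableRule =
            new TableRuleConfiguration("t_order", "ds${0..1}.t_order");
        tableRule.setDatabaseShardingStrategyConfig(
            new InlineShardingStrategyConfiguration("user_id", "ds${user_id % 2}"));
        ShardingRuleConfiguration ruleConfig = new ShardingRuleConfiguration();
        ruleConfig.getTableRuleConfigs().add(tableRule);

        // 3. props: engine properties, e.g. SQL logging
        Properties props = new Properties();
        props.setProperty("sql.show", "true");

        return ShardingDataSourceFactory.createDataSource(dataSourceMap, ruleConfig, props);
    }
}
```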

getConnection

The core method a ShardingDataSource must implement:

public final ShardingConnection getConnection() {
    return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}

This confirms that ShardingConnection is a logical concept: it wraps the dataSourceMap and the runtime context rather than any physical connection.

The trailing TransactionTypeHolder.get() reads the transaction type held in a ThreadLocal:

public final class TransactionTypeHolder {
    
    private static final ThreadLocal<TransactionType> CONTEXT = new ThreadLocal<TransactionType>() {

        @Override
        protected TransactionType initialValue() {
            return TransactionType.LOCAL;
        }
    };

    public static TransactionType get() {
        return CONTEXT.get();
    }
}

The transaction type is initialized to LOCAL.
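The same holder pattern can be reproduced with nothing but the JDK. `TxType` and `TxTypeHolder` below are hypothetical stand-ins mirroring `TransactionType` and `TransactionTypeHolder`; every thread sees LOCAL unless it explicitly sets another type:

```java
// Hypothetical re-creation of the holder pattern: a ThreadLocal with a
// default value, so each thread independently defaults to LOCAL.
enum TxType { LOCAL, XA, BASE }

final class TxTypeHolder {
    private static final ThreadLocal<TxType> CONTEXT =
        ThreadLocal.withInitial(() -> TxType.LOCAL);

    static TxType get() { return CONTEXT.get(); }

    static void set(TxType type) { CONTEXT.set(type); }

    // remove() restores the initial value on next get() and avoids leaks
    // when threads are pooled.
    static void clear() { CONTEXT.remove(); }
}
```

A caller would set the type before obtaining the connection, so that getConnection() picks it up for the current thread only.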

ShardingConnection

A Connection must first be able to produce Statements.

prepareStatement

public PreparedStatement prepareStatement(final String sql) throws SQLException {
    return new ShardingPreparedStatement(this, sql);
}

So ShardingPreparedStatement is likewise a logical concept; no physical Connection has been obtained at this point.

setAutoCommit

Another important transaction method:

public void setAutoCommit(final boolean autoCommit) throws SQLException {
    if (TransactionType.LOCAL == transactionType) {
        super.setAutoCommit(autoCommit);
        return;
    }
    if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) {
        return;
    }
    if (autoCommit && shardingTransactionManager.isInTransaction()) {
        shardingTransactionManager.commit();
        return;
    }
    if (!autoCommit && !shardingTransactionManager.isInTransaction()) {
        closeCachedConnections();
        shardingTransactionManager.begin();
    }
}

For the LOCAL type it calls super.setAutoCommit(autoCommit),

which leads to AbstractConnectionAdapter#setAutoCommitForLocalTransaction:

private void setAutoCommitForLocalTransaction(final boolean autoCommit) throws SQLException {
    recordMethodInvocation(Connection.class, "setAutoCommit", new Class[]{boolean.class}, new Object[]{autoCommit});
    forceExecuteTemplate.execute(cachedConnections.values(), new ForceExecuteCallback<Connection>() {
            
        @Override
        public void execute(final Connection connection) throws SQLException {
            connection.setAutoCommit(autoCommit);
        }
    });
}

Two things happen here: the invocation is recorded (recordMethodInvocation) so it can be replayed on physical connections created later, and the callback immediately applies it to any physical connections already cached. From this we can infer that the transaction attributes of the real physical connections are set when SQL actually executes.
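The record-and-replay idea can be sketched stdlib-only (all class names here are hypothetical): calls made on the logical connection before any physical connection exists are recorded, then replayed once a physical connection is created:

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// One recorded call: the reflective Method plus its arguments.
class RecordedInvocation {
    final Method method;
    final Object[] args;

    RecordedInvocation(Method method, Object[] args) {
        this.method = method;
        this.args = args;
    }

    void replayOn(Object target) {
        try {
            method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

class PhysicalConn {
    boolean autoCommit = true;
    public void setAutoCommit(boolean autoCommit) { this.autoCommit = autoCommit; }
}

class LogicalConn {
    private final List<RecordedInvocation> recorded = new ArrayList<>();

    // Analogue of recordMethodInvocation: remember the call for later.
    void setAutoCommit(boolean autoCommit) {
        try {
            recorded.add(new RecordedInvocation(
                PhysicalConn.class.getMethod("setAutoCommit", boolean.class),
                new Object[]{autoCommit}));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    // When a physical connection is finally created, replay every
    // recorded call so it inherits the logical connection's settings.
    PhysicalConn createPhysicalConn() {
        PhysicalConn conn = new PhysicalConn();
        for (RecordedInvocation each : recorded) {
            each.replayOn(conn);
        }
        return conn;
    }
}
```

This is why setAutoCommit(false) on the logical connection takes effect on shards whose connections do not exist yet.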

ShardingPreparedStatement

A PreparedStatement must be able to execute SQL and return a ResultSet.

Constructor

private ShardingPreparedStatement(
        final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys)
        throws SQLException {
    if (Strings.isNullOrEmpty(sql)) {
        throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
    }
    this.connection = connection;
    this.sql = sql;
    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
    shardingEngine = new PreparedQueryShardingEngine(sql, runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());
    preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
    batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
}

So the logical PreparedStatement wraps the ShardingConnection and holds the shardingEngine and the runtime context.

It also builds two executors: preparedStatementExecutor and batchPreparedStatementExecutor.

The SQL is actually executed in executeQuery, the low-level JDBC execution method.

executeQuery

It is the overall execution framework, completing the following steps in order:

  • perform sharding/routing

  • initialize the SQL executor

  • execute the SQL and obtain the result merge engine

  • merge the results

public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        clearPrevious();
        shard();
        initPreparedStatementExecutor();
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), 
                connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), preparedStatementExecutor.executeQuery());
        result = getResultSet(mergeEngine);
    } finally {
        clearBatch();
    }
    currentResultSet = result;
    return result;
}

First, the shard() method performs routing; the result is stored in sqlRouteResult:

private void shard() {
    sqlRouteResult = shardingEngine.shard(sql, getParameters());
}

Next, preparedStatementExecutor.executeQuery() runs; its result, together with sqlRouteResult, is passed into the mergeEngine.

Then getResultSet(mergeEngine) is called to merge the results.
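The route → execute-per-shard → merge pipeline can be illustrated with a toy, stdlib-only sketch (`MiniShardingQuery` and the modulo rule are made up): routing picks a data source from the sharding key, and merging combines per-shard sorted results the way a merge engine would for an ORDER BY query:

```java
import java.util.ArrayList;
import java.util.List;

class MiniShardingQuery {

    // shard(): decide which data source a row's sharding key routes to
    // (hypothetical rule: user_id % 2 picks ds0 or ds1).
    static String route(long userId) {
        return "ds" + (userId % 2);
    }

    // Merge two already-sorted per-shard result lists into one sorted
    // list, as a merge engine would for an ORDER BY query.
    static List<Long> mergeSorted(List<Long> a, List<Long> b) {
        List<Long> merged = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            merged.add(a.get(i) <= b.get(j) ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) merged.add(a.get(i++));
        while (j < b.size()) merged.add(b.get(j++));
        return merged;
    }
}
```

Each physical statement returns its shard's rows in order, and the final logical ResultSet is produced by merging those per-shard streams.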
