目 录CONTENT

文章目录

GenericObjectPool

FatFish1
2025-03-19 / 0 评论 / 0 点赞 / 46 阅读 / 0 字 / 正在检测是否收录...

对象池概述

对象池是通过一定的规则来维护对象集合的容器。commos-pool在很多场景中,用来实现"连接池"/"任务worker池"等,大家常用的dbcp数据库连接池,也是基于commons-pool实现。

一个非常常见的实现就是RedisPool的实现:

http://www.chymfatfish.cn/archives/jedis#jedispool---jedis%E6%B1%A0%E5%8C%96%E7%94%A8%E6%B3%95

commons-pool实现思想非常简单,它主要的作用就是将"对象集合"池化,任何通过pool进行对象存取的操作,都会严格按照"pool配置"(比如池的大小)实时的创建对象/阻塞控制/销毁对象等。它在一定程度上,实现了对象集合的管理以及对象的分发。

对象池具有如下优势:

  • 将创建对象的方式,使用工厂模式;

  • 通过"pool配置"来约束对象存取的时机

  • 将对象列表保存在队列中(LinkedList)

对象池获取和归还对象逻辑如下:

对象池源码分析

GenericObjectPoolConfig

对象池Config具有如下配置项:

maxTotal: 最大值总数,当已借出数量大于这个总数,激活removeAbandoned方法清理超期不还的对象
maxActive: 链接池中最大连接数,默认为8.
maxIdle: 链接池中最大空闲的连接数,默认为8.
minIdle: 连接池中最少空闲的连接数,默认为0
maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时.
minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除。
softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为-1.
numTestsPerEvictionRun: 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3.
testOnBorrow: 向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false。建议保持默认值.
testOnReturn:  向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。建议保持默认值.
testWhileIdle:  向调用者输出“链接”对象时,是否检测它的空闲超时;默认为false。如果“链接”空闲超时,将会被移除。建议保持默认值.
timeBetweenEvictionRunsMillis:  “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1.
whenExhaustedAction: 当“连接池”中active数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为1:
 -> 0 : 抛出异常,
 -> 1 : 阻塞,直到有可用链接资源
 -> 2 : 强制创建新的链接资源
blockWhenExhausted:这个参数控制在对象池没有对象可以borrow的时候,是否阻塞。搭配maxWaitDuration一起使用。
maxWaitDuration:阻塞最长等待时间。搭配blockWhenExhausted一起使用,当borrow时阻塞,判断maxWaitDuration的正负,如果是负值,borrow的时候就调用LinkedBlockingDeque的takeFirst方法阻塞(一直阻塞,等别人唤醒),如果是非负,就调用pollFirst阻塞(周期性自己唤醒自己)

GenericObjectPool

GenericObjectPool中的核心成员变量包括:

// 是一个阻塞双端队列,用于存储池子中的空闲对象
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
// 从BaseGenericObjectPool继承过来的,标志插入对象时是从头插还是从尾插,默认为true,从头插
private volatile boolean lifo = BaseObjectPoolConfig.DEFAULT_LIFO;
// 数量计数器,可以代替直接执行对象列表操作增减
private final AtomicLong createCount = new AtomicLong();
// 对象工厂,提供create、激活、验证、destroy等能力,在对象池各种操作中回调,例如JedisFactory
private final PooledObjectFactory<T> factory;
// 所有对象map,是一个concurrentHashMap,允许多个线程从这个map中获取对象,从而实现池化
private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects = new ConcurrentHashMap<>();

构造方法

public GenericObjectPool(final PooledObjectFactory<T> factory)

传入工厂类则以默认配置构造GenericObjectPool,还可以增加GenericObjectConfig参数补充对象池配置,方法如下:

public GenericObjectPool(final PooledObjectFactory<T> factory,
        final GenericObjectPoolConfig<T> config) {
    super(config, ONAME_BASE, config.getJmxNamePrefix());
    if (factory == null) {
        jmxUnregister(); // tidy up
        throw new IllegalArgumentException("Factory may not be null");
    }
    this.factory = factory;
    idleObjects = new LinkedBlockingDeque<>(config.getFairness());
    setConfig(config);
}

此构造函数实例化了一个LinkedList作为"对象池"容器,用来存取"对象"

继续跟进setConfig方法

setConfig

public void setConfig(final GenericObjectPoolConfig<T> conf) {
    super.setConfig(conf);
    setMaxIdle(conf.getMaxIdle());
    setMinIdle(conf.getMinIdle());
    setMaxTotal(conf.getMaxTotal());
}

在GenericObjectPool中设置了最大空闲、最小空闲、最大上限三个值,继续跟进父类看看设置了什么

// BaseGenericObjectPool#setConfig
protected void setConfig(final BaseObjectPoolConfig<T> config) {
    setLifo(config.getLifo());
    setMaxWait(config.getMaxWaitDuration());
    setBlockWhenExhausted(config.getBlockWhenExhausted());
    setTestOnCreate(config.getTestOnCreate());
    setTestOnBorrow(config.getTestOnBorrow());
    setTestOnReturn(config.getTestOnReturn());
    setTestWhileIdle(config.getTestWhileIdle());
    setNumTestsPerEvictionRun(config.getNumTestsPerEvictionRun());
    setMinEvictableIdleDuration(config.getMinEvictableIdleDuration());
    setDurationBetweenEvictionRuns(config.getDurationBetweenEvictionRuns());
    setSoftMinEvictableIdleDuration(config.getSoftMinEvictableIdleDuration());
    final EvictionPolicy<T> policy = config.getEvictionPolicy();
    if (policy == null) {
        // Use the class name (pre-2.6.0 compatible)
        setEvictionPolicyClassName(config.getEvictionPolicyClassName());
    } else {
        // Otherwise, use the class (2.6.0 feature)
        setEvictionPolicy(policy);
    }
    setEvictorShutdownTimeout(config.getEvictorShutdownTimeoutDuration());
}

这里大部分都是在set,但是有一个点隐藏在里面:setDurationBetweenEvictionRuns(config.getDurationBetweenEvictionRuns());

进入对应方法:

public final void setDurationBetweenEvictionRuns(final Duration timeBetweenEvictionRuns) {
    this.durationBetweenEvictionRuns = PoolImplUtils.nonNull(timeBetweenEvictionRuns, BaseObjectPoolConfig.DEFAULT_DURATION_BETWEEN_EVICTION_RUNS);
    startEvictor(this.durationBetweenEvictionRuns);
}

发现除了设置成员属性,还启动了一个驱逐器,跟进startEvictor方法

startEvictor

final void startEvictor(final Duration delay) {
    synchronized (evictionLock) {
        final boolean isPositiverDelay = PoolImplUtils.isPositive(delay);
        if (evictor == null) { // Starting evictor for the first time or after a cancel
            if (isPositiverDelay) { // Starting new evictor
                evictor = new Evictor();
                EvictionTimer.schedule(evictor, delay, delay);
            }
        } ……
}

核心在Evictor,并且通过一个ScheduledThreadPoolExecutor执行任务,可见Evictor应该是一个Runnable实现类,可以继续跟进下

Evictor#run

Evictor是继承自父类BaseGenericObjectPool的内部类,是Runnable的实现类

class Evictor implements Runnable

因此看run方法做了什么

public void run() {
    final ClassLoader savedClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        ……
        // Evict from the pool
        try {
            evict();
        } ……
        // Re-create idle instances.
        try {
            ensureMinIdle();
        } ……
    } finally {
        // Restore the previous CCL
        Thread.currentThread().setContextClassLoader(savedClassLoader);
    }
}

核心是这两行:

  • evict()点我跳转)方法做长期未被激活对象的驱逐操作

  • ensureMinIdle()点我跳转)方法做驱逐后的补齐操作,按照最小空闲数量补齐

evict

final EvictionConfig evictionConfig = new EvictionConfig(
        getMinEvictableIdleDuration(),
        getSoftMinEvictableIdleDuration(),
        getMinIdle());

构造驱逐配置,这里的三个配置分别是对象池config中的minEvictableIdleDuration、softMinEvictableIdleDuration、minIdle和maxIdle中的最小值

final boolean testWhileIdle = getTestWhileIdle();

这里又取了testWhileIdle属性

for (int i = 0, m = getNumTests(); i < m; i++) {
    if (evictionIterator == null || !evictionIterator.hasNext()) {
        evictionIterator = new EvictionIterator(idleObjects);
    }
    ……
    underTest = evictionIterator.next();

这里做了一个循环,循环基于空闲对象列表的数量。

基于空闲对象队列构造了一个迭代器,迭代器为空的场景不谈,从迭代器中取出下一个需要校验的对象underTest

if (!underTest.startEvictionTest()) {
    i--;
    continue;
}

这里调用PoolObject对象的startEvictionTest方法,如果不需要驱逐,直接continue,不再对该对象做操作

evict = evictionPolicy.evict(evictionConfig, underTest, idleObjects.size());

这里做了一个判断,看下里面具体是怎么判断的

// -- DefaultEvictionPolicy#evict --
return (config.getIdleSoftEvictDuration().compareTo(underTest.getIdleDuration()) < 0 &&
        config.getMinIdle() < idleCount) ||
        config.getIdleEvictDuration().compareTo(underTest.getIdleDuration()) < 0;

对于可驱逐的对象,||语句的左半部分判断的是:

  • 第一个条件判断idleSoftEvictDuration和idleDuration的大小,即配置项配置的允许空闲时间和池对象实际的空闲时间大小,当不进行配置,idleSoftEvictDuration默认值为-1,这个判断恒成立

  • 第二个条件判断设置的minIdle属性和idleCount,即设置的允许空闲的最小数量,和当前空闲队列对象的量,当设置属性比当前队列数量小,返回true

||语句的右半部分判断的是当前对象的空闲时间和允许空闲的时间:

  • idleEvictDuration取的是配置中的允许空闲时间,与当前对象实际的空闲实际做对比

根据上面的判断可以分析出:要么队列大小已经超过当前设置的空闲数量,要么单个对象的空闲时间已经超过了当前设置的允许空闲时间,都会对对象产生驱逐

if (evict) {
    destroy(underTest, DestroyMode.NORMAL);

这里判断上一步得到的是否驱逐的结论,如果驱逐,执行destroy方法进行驱逐,否则验证刚刚testWhileIdle属性如果是true,表示这个空闲对象需要做激活和校验

if (testWhileIdle) {
    ……
        factory.activateObject(underTest);
        active = true;
    } catch (final Exception e) {
      destroy(underTest, DestroyMode.NORMAL);
      destroyedByEvictorCount.incrementAndGet();

默认的factory.activateObject(underTest);是没有任何操作的,而jedis实现的连接池中是通过jedis.select(db)的操作确认返回码,还是报错,确认当前连接是否还可用

// JedisFactory#activateObject
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
  final BinaryJedis jedis = pooledJedis.getObject();
  if (jedis.getDB() != clientConfig.getDatabase()) {
    jedis.select(clientConfig.getDatabase());
  }
}

如果在激活同时判断连接的时候报错了,这里就判断为需要销毁的继续执行驱逐方法,如果激活没问题,再进行校验,这里是第二次判断是否驱逐了,是根据对象实际的业务状态进行判断

if (active) {
    ……
        validate = factory.validateObject(underTest);

默认的factory.validateObject(underTest)也是恒返回true的,而在jedis实现的连接池中,是通过ping操作去实现的

// JedisFactory#validateObject
public boolean validateObject(PooledObject<Jedis> pooledJedis) {
  final BinaryJedis jedis = pooledJedis.getObject();
  try {
    String host = jedisSocketFactory.getHost();
    int port = jedisSocketFactory.getPort();
    String connectionHost = jedis.getClient().getHost();
    int connectionPort = jedis.getClient().getPort();
    return host.equals(connectionHost)
        && port == connectionPort && jedis.isConnected()
        && jedis.ping().equals("PONG");
  } catch (final Exception e) {
    logger.error("Error while validating pooled Jedis object.", e);
    return false;
  }
}
if (!validate) {
    destroy(underTest, DestroyMode.NORMAL);
    destroyedByEvictorCount.incrementAndGet();
} else {
    ……
    factory.passivateObject(underTest);

如果激活校验失败了,这里依然是要做驱逐操作的,如果校验成功了,再取消对象的激活状态,这里默认实现factory.passivateObject(underTest);也是不做任何操作,同样jedis也没做任何操作。

ensureIdle - 确认空闲数量并创建

while (idleObjects.size() < idleCount) {
     final PooledObject<T> p = create();

条件是空闲对象列表元素数量小于预期的idleCount,如果是驱逐同时新建的流程,这里是通过getMinIdle方法获取到的设置的最小空闲数量minIdle属性。

符合条件调用create方法创建池中对象

if (getLifo()) {
    idleObjects.addFirst(p);
} else {
    idleObjects.addLast(p);
}

这里就比较简单了,是判断用头插还是尾插

destory - 销毁对象

idleObjects.remove(toDestroy);
allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));

主要操作idleObjects和allObjects两个列表/map

执行factory的回调函数

factory.destroyObject(toDestroy, destroyMode);

同时对计数器做操作

create - 构造对象

int localMaxTotal = getMaxTotal();

首先取配置的maxTotal属性,这个属性绝对了池子中连接数量上限

final long newCreateCount = createCount.incrementAndGet();
if (newCreateCount > localMaxTotal) {
    createCount.decrementAndGet();

这里先利用计数器执行数量增加,如果增加完了发现数量超了,再执行数量下调,先利用计数器操作的好处在于避免操作对象列表

判断可以创建后,执行创建和测试操作

p = factory.makeObject();
if (getTestOnCreate() && !factory.validateObject(p)) {
    createCount.decrementAndGet();
    return null;
}

如果测试不通,获取到任何异常,执行计数器下调操作

createCount.decrementAndGet();

创建完成一切正常的话,把新对象加入到allObject中

allObjects.put(new IdentityWrapper<>(p.getObject()), p);

borrowObject - 从对象池中取出对象

对象池提供给调用方使用的核心方法

执行borrow前有一个触发最大阈值的丢弃逻辑

if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) &&
        (getNumActive() > getMaxTotal() - 3)) {
    removeAbandoned(ac);
}

可以看到校验点:

  • 开启AbandonedOnBorrow配置

  • 空闲数量<2

  • 活跃数量>最大活跃数量-3,这里使用了一个核心对象池配置:maxTotal

符合条件执行removeAbandoned方法进行丢弃,然后开始执行borrow

首先检查对象池开启状态,以及检查池子中的废弃对象并丢弃。

PooledObject<T> p = null;
final boolean blockWhenExhausted = getBlockWhenExhausted();

准备工作:一个池对象p,一个标志取不到是否阻塞的属性blockWhenExhausted

p = idleObjects.pollFirst();
if (p == null) {
    p = create();
    if (p != null) {
        create = true;
    }
}

首先从空闲队列中取第一个对象,如果取不到,调用create方法创建一个新对象,如果创造出来了,直接跳到后面不判断blockWhenExhausted

如果创造不出来:

if (blockWhenExhausted) {
    if (PooledObject.isNull(p)) {
        p = borrowMaxWaitDuration.isNegative() ? idleObjects.takeFirst() : idleObjects.pollFirst(borrowMaxWaitDuration);
    }
    if (PooledObject.isNull(p)) {
        throw new NoSuchElementException(appendStats(
                "Timeout waiting for idle object, borrowMaxWaitDuration=" + borrowMaxWaitDuration));
    }
} else if (PooledObject.isNull(p)) {
    throw new NoSuchElementException(appendStats("Pool exhausted"));
}

这一段,先判断blockWhenExhausted属性,如果是true,即取不出来需要阻塞:

  • 当borrowMaxWaitDuration属性为负,即不做周期性获取的时候,执行blockDequeue的takeFirst方法取第一个,这里是使用Condition做的条件阻塞,取不到则await等待signal唤醒,是双端队列的特性

  • 当borrowMaxWaitDuration属性非负,即做周期性获取的时候,执行pollFirst方法加一个周期

  • 如果阻塞方法取出来也是null,抛异常

如果blocakWhenExhausted属性为false,则上面取不出来也创建不出来,不阻塞了,直接抛异常

这里使用的idleObjects是一个双端阻塞队列:

http://www.chymfatfish.cn/archives/linkedblockingdeque#takefirst---%E9%98%BB%E5%A1%9E%E6%96%B9%E6%B3%95
if (!p.allocate()) {
    p = null;
}

这里不管是之前取的还是create的还是阻塞取的,这里取到之后要改下状态,改不掉,让p为null

if (p != null) {
……
        factory.activateObject(p);
……
……
            destroy(p, DestroyMode.NORMAL);
if (p != null && getTestOnBorrow()) {
    ……
        validate = factory.validateObject(p);

p非null时说明取到了,做激活、测试,如果出问题就销毁

updateStatsBorrow(p, Duration.ofMillis(System.currentTimeMillis() - waitTimeMillis));

更新对象池状态和记录时间

return p.getObject();

最后返回对象池中封装的内容

removeAbandoned

private void removeAbandoned(final AbandonedConfig abandonedConfig) {
    // Generate a list of abandoned objects to remove
    final ArrayList<PooledObject<T>> remove = createRemoveList(abandonedConfig, allObjects);
    // Now remove the abandoned objects
    remove.forEach(pooledObject -> {
        ……
        try {
            invalidateObject(pooledObject.getObject(), DestroyMode.ABANDONED);
        } catch ……
    });
}

回收那些被取走的,但是超过了很长时间没有被使用的(被遗弃的)对象。首先构造待遗弃的列表,然后遍历执行invalidateObject进行销毁

跟进createRemoveList方法看下

createRemoveList

继承自父类BaseGenericObjectPool

final Instant timeout = Instant.now().minus(abandonedConfig.getRemoveAbandonedTimeoutDuration());

首先获取一个核心属性:丢弃对象的超时时间abandonedTimeoutDuration,默认是300s,即以当前向前推300s

synchronized (pooledObject) {
    if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
            pooledObject.getLastUsedInstant().compareTo(timeout) <= 0) {
        pooledObject.markAbandoned();
        remove.add(pooledObject);
    }
}

根据这个if,判定该对象需要销毁的条件包括:

  • 对象处于ALLOCATED状态,也就是使用中状态(使用中不一定就是执行中,也不一定真的还在被使用)

  • 上次被使用的时间比过期时间还小,说明上次被使用时间更久了,早就超过300s没用了

符合这两个条件就要被加入到remove队列中

returnObject - 归还池对象

pool2提供给调用方的核心方法,归还

有借有还,归还之后调用blockDequeue中的Condition的singal方法,激活阻塞的borrow线程

final PooledObject<T> p = getPooledObject(obj);

封装成池对象

markReturningState(p);

这里是现标记成归还中的状态

final Duration activeTime = p.getActiveDuration();

这里是记录时间状态

if (getTestOnReturn() && !factory.validateObject(p)) {
    ……
        destroy(p, DestroyMode.NORMAL);
    ……
        ensureIdle(1, false);
    ……
    updateStatsReturn(activeTime);
    return;
}

这里校验一下要归还的对象,如果直接是个不能用的了,就不归还了,直接销毁,然后通过ensureIdle方法创建一个新的空闲对象进去,同时给他更新归还状态

try {
    factory.passivateObject(p);
} catch (final Exception e1) {
    ……
        destroy(p, DestroyMode.NORMAL);
    ……
        ensureIdle(1, false);
    ……
    updateStatsReturn(activeTime);
    return;
}

校验完了还要取消归还对象的激活状态,在取消激活的时候如果出了异常,也是直接不归还了,重新创建一个新的对象,同时把时间和状态记录给他

if (!p.deallocate()) {
    throw new IllegalStateException(……);
}

取消激活后,将归还对象的状态重置为空闲,如果不成功,抛异常

final int maxIdleSave = getMaxIdle();
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
    ……
        destroy(p, DestroyMode.NORMAL);
    ……
        ensureIdle(1, false);
    ……

取出配置的最大空闲对象数量,如果当前空闲队列长度已经大于配置了,直接销毁归还对象,同时执行创建方法,如果创建也进不去了,处理下异常结束。

} else {
    if (getLifo()) {
        idleObjects.addFirst(p);
    } else {
        idleObjects.addLast(p);
    }
    if (isClosed()) {
        ……
        clear();
    }
}

如果空闲队列长度还有空余,这里取一下头插还是尾插的配置,调用阻塞队列的头插/尾插方法,在调用的同时会让blockingDequeue中的Condition同时发一个signal信号给阻塞的borrow线程,激活borrow流程,同时处理一下如果对象池关闭的场景

updateStatsReturn(activeTime);

最后做归还后的状态记录

DefaultPoolObject - 池中对象

再看下Pool2对池中对象的封装

池中对象具备的核心属性包括:

// 标志该对象的状态,IDLE为空闲即在队列但未被使用;ALLOCATED为在使用;EVICTION为即将
// 被驱逐;VALIDATION为在队列中,可用状态;INVALID为即将被销毁
private PooledObjectState state = PooledObjectState.IDLE;
// 记录上次被归还的时间点,被构造的时候默认存入构造时间点
private volatile Instant lastReturnInstant = createInstant;

startEvictionTest - 进行可驱逐性测试

public synchronized boolean startEvictionTest() {
    if (state == PooledObjectState.IDLE) {
        state = PooledObjectState.EVICTION;
        return true;
    }
    return false;
}

判断状态为IDLE空闲态,则修改为EVICTION待驱逐,同时返回true

getIdleDuration - 获取空闲周期

final Duration elapsed = Duration.between(lastReturnInstant, now());
return elapsed.isNegative() ? Duration.ZERO : elapsed;

根据代码可以判断是获取上次被归还时间点到现在之间的时间差

PooledObjectFactory - 提供对池对象的操作能力的接口

使用pool2必须要实现PooledObjectFactory,提供创建、销毁、激活、校验等能力

public interface PooledObjectFactory<T> {
  
  void activateObject(PooledObject<T> p) throws Exception;
 
  void destroyObject(PooledObject<T> p) throws Exception;
  
  default void destroyObject(final PooledObject<T> p, final DestroyMode destroyMode) throws Exception {
      destroyObject(p);
  }
  
  PooledObject<T> makeObject() throws Exception;
 
  void passivateObject(PooledObject<T> p) throws Exception;
 
  boolean validateObject(PooledObject<T> p);
}

0

评论区