1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.utils.io
10 
11 import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
12 import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject, SwallowedExceptionListener}
13 
14 import java.io.Closeable
15 
/**
  * A closeable pool that lends out a finite set of closeable resources
  *
  * @tparam T type of the pooled resource
  */
trait CloseablePool[T <: Closeable] extends Closeable {

  /**
    * Execute a function against an item borrowed from the pool. The contract is that
    * the item goes back to the pool once the function has executed.
    *
    * @param fn function to execute on the borrowed item
    * @tparam U return type
    * @return the value produced by `fn`
    */
  def borrow[U](fn: T => U): U
}
32 
33 object CloseablePool {
34 
35   /**
36     * Create a pool
37     *
38     * @param factory method for instantiating pool objects
39     * @param size max size of the pool
40     * @tparam T object type
41     * @return
42     */
43   def apply[T <: Closeable](factory: => T, size: Int = 8): CloseablePool[T] = {
44     val config = new GenericObjectPoolConfig[T]()
45     config.setMaxTotal(size)
46     new CommonsPoolPool(factory, config)
47   }
48 
49   /**
50     * Apache commons-pool-backed pool
51     *
52     * @param create factory method for creating new pool objects
53     * @param config pool configuration options
54     * @tparam T object type
55     */
56   class CommonsPoolPool[T <: Closeable](create: => T, config: GenericObjectPoolConfig[T]) extends CloseablePool[T] {
57 
58     private val factory: BasePooledObjectFactory[T] = new BasePooledObjectFactory[T] {
59       override def wrap(obj: T) = new DefaultPooledObject[T](obj)
60       override def create(): T = CommonsPoolPool.this.create
61       override def destroyObject(p: PooledObject[T]): Unit = p.getObject.close()
62     }
63 
64     private val pool = new GenericObjectPool[T](factory, config)
65 
66     override def borrow[U](fn: T => U): U = {
67       val t = pool.borrowObject()
68       try { fn(t) } finally {
69         pool.returnObject(t)
70       }
71     }
72 
73     override def close(): Unit = {
74       val errors = new java.util.concurrent.LinkedBlockingQueue[Exception]()
75       pool.setSwallowedExceptionListener(new SwallowedExceptionListener() {
76         override def onSwallowException(e: Exception): Unit = errors.offer(e)
77       })
78       pool.close()
79       if (!errors.isEmpty) {
80         val e = errors.poll()
81         while (!errors.isEmpty) {
82           e.addSuppressed(errors.poll())
83         }
84         throw e
85       }
86     }
87   }
88 }
Line Stmt Id Pos Tree Symbol Tests Code
44 8504 1434 - 1466 Apply org.apache.commons.pool2.impl.GenericObjectPoolConfig.<init> new org.apache.commons.pool2.impl.GenericObjectPoolConfig[T]()
45 8505 1471 - 1495 Apply org.apache.commons.pool2.impl.GenericObjectPoolConfig.setMaxTotal config.setMaxTotal(size)
46 8506 1500 - 1536 Apply org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool.<init> new org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool[T](factory, config)
58 8510 1911 - 1914 Apply org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool.$anon.<init> new $anon()
59 8507 1978 - 2009 Apply org.apache.commons.pool2.impl.DefaultPooledObject.<init> new org.apache.commons.pool2.impl.DefaultPooledObject[T](obj)
60 8508 2043 - 2070 Select org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool.create CommonsPoolPool.this.create
61 8509 2132 - 2151 Apply java.io.Closeable.close p.getObject().close()
64 8511 2207 - 2214 Select org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool.factory CommonsPoolPool.this.factory
64 8512 2216 - 2222 Select org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool.config CommonsPoolPool.this.config
64 8513 2182 - 2223 Apply org.apache.commons.pool2.impl.GenericObjectPool.<init> new org.apache.commons.pool2.impl.GenericObjectPool[T](CommonsPoolPool.this.factory, CommonsPoolPool.this.config)
67 8514 2285 - 2304 Apply org.apache.commons.pool2.impl.GenericObjectPool.borrowObject CommonsPoolPool.this.pool.borrowObject()
68 8515 2317 - 2322 Apply scala.Function1.apply fn.apply(t)
68 8516 2317 - 2322 Block scala.Function1.apply fn.apply(t)
69 8517 2343 - 2363 Apply org.apache.commons.pool2.impl.GenericObjectPool.returnObject CommonsPoolPool.this.pool.returnObject(t)
69 8518 2343 - 2363 Block org.apache.commons.pool2.impl.GenericObjectPool.returnObject CommonsPoolPool.this.pool.returnObject(t)
74 8519 2433 - 2490 Apply java.util.concurrent.LinkedBlockingQueue.<init> new java.util.concurrent.LinkedBlockingQueue[Exception]()
75 8522 2532 - 2535 Apply org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool.$anon.<init> new $anon()
75 8523 2497 - 2653 Apply org.apache.commons.pool2.impl.BaseGenericObjectPool.setSwallowedExceptionListener CommonsPoolPool.this.pool.setSwallowedExceptionListener({ final class $anon extends Object with org.apache.commons.pool2.SwallowedExceptionListener { def <init>(): <$anon: org.apache.commons.pool2.SwallowedExceptionListener> = { $anon.super.<init>(); () }; override def onSwallowException(e: Exception): Unit = { errors.offer(e); () } }; new $anon() })
76 8520 2629 - 2644 Apply java.util.concurrent.LinkedBlockingQueue.offer errors.offer(e)
76 8521 2641 - 2641 Literal <nosymbol> ()
78 8524 2660 - 2672 Apply org.apache.commons.pool2.impl.GenericObjectPool.close CommonsPoolPool.this.pool.close()
79 8525 2683 - 2698 Select scala.Boolean.unary_! errors.isEmpty().unary_!
79 8535 2700 - 2840 Block <nosymbol> { val e: Exception = errors.poll(); while$1(){ if (errors.isEmpty().unary_!) { e.addSuppressed(errors.poll()); while$1() } else () }; throw e }
79 8536 2679 - 2679 Literal <nosymbol> ()
79 8537 2679 - 2679 Block <nosymbol> ()
80 8526 2718 - 2731 Apply java.util.concurrent.LinkedBlockingQueue.poll errors.poll()
81 8527 2747 - 2762 Select scala.Boolean.unary_! errors.isEmpty().unary_!
81 8531 2776 - 2806 Block <nosymbol> { e.addSuppressed(errors.poll()); while$1() }
81 8532 2740 - 2740 Literal <nosymbol> ()
81 8533 2740 - 2740 Block <nosymbol> ()
82 8528 2792 - 2805 Apply java.util.concurrent.LinkedBlockingQueue.poll errors.poll()
82 8529 2776 - 2806 Apply java.lang.Throwable.addSuppressed e.addSuppressed(errors.poll())
82 8530 2791 - 2791 Apply org.locationtech.geomesa.utils.io.CloseablePool.CommonsPoolPool.while$1 while$1()
84 8534 2825 - 2832 Throw <nosymbol> throw e