1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.utils.cache
10 
11 import com.github.benmanes.caffeine.cache.{Cache, Caffeine, Ticker}
12 import org.locationtech.geomesa.utils.concurrent.ExitingExecutor
13 
14 import java.io.Closeable
15 import java.lang.ref.WeakReference
16 import java.util.concurrent._
17 import scala.concurrent.duration.Duration
18 
19 /**
20  * Creates a per-thread cache of values with a timed expiration.
21  *
22  * Map operations will only affect/reflect the state of the current thread. Additional methods `globalIterator`
23  * and `estimatedGlobalSize` are provided for global views across all threads, which generally should only be used
24  * for debugging.
25  *
26  * Since caches are only cleaned up when accessed, uses an asynchronous thread to
27  * actively clean up any orphaned thread local values
28  *
29  * @tparam K key type
30  * @tparam V value type
31  */
32 class ThreadLocalCache[K <: AnyRef, V <: AnyRef](
33     expiry: Duration,
34     executor: ScheduledExecutorService = ThreadLocalCache.executor,
35     ticker: Option[Ticker] = None
36   ) extends Runnable with Closeable {
37 
38   import scala.collection.JavaConverters._
39 
40   // weak references to our current caches, to allow cleanup + GC
41   private val references = new ConcurrentLinkedQueue[(Long, WeakReference[Cache[K, V]])]()
42 
43   private val caches = new ThreadLocal[Cache[K, V]]() {
44     override def initialValue(): Cache[K, V] = {
45       val builder = Caffeine.newBuilder().expireAfterAccess(expiry.toMillis, TimeUnit.MILLISECONDS)
46       ticker.foreach(builder.ticker)
47       val cache = builder.build[K, V]()
48       // this will always succeed as our queue is unbounded
49       references.offer((Thread.currentThread().getId, new WeakReference(cache)))
50       cache
51     }
52   }
53 
54   private val cleanup = executor.scheduleWithFixedDelay(this, expiry.toMillis, expiry.toMillis, TimeUnit.MILLISECONDS)
55 
56   def getOrElseUpdate(key: K, op: => V): V = {
57     val cached = caches.get.getIfPresent(key)
58     if (cached != null) { cached } else {
59       val value = op
60       caches.get.put(key, value)
61       value
62     }
63   }
64 
65   /**
66    * Gets an iterator across all thread-local values, not just the current thread
67    *
68    * @return iterator of (thread-id, key, value)
69    */
70   def globalIterator: Iterator[(Long, K, V)] = {
71     references.iterator.asScala.flatMap { case (id, ref) =>
72       val cache = ref.get
73       if (cache == null) { Iterator.empty } else {
74         cache.asMap().asScala.iterator.map { case (k, v) => (id, k, v) }
75       }
76     }
77   }
78 
79   /**
80    * Gets the estimated total size across all thread-local values
81    *
82    * @return
83    */
84   def estimatedGlobalSize: Long = {
85     var size = 0L
86     references.iterator.asScala.foreach { case (_, ref) =>
87       val cache = ref.get
88       if (cache != null) {
89         size += cache.estimatedSize()
90       }
91     }
92     size
93   }
94 
95   override def run(): Unit = {
96     val iter = references.iterator()
97     while (iter.hasNext) {
98       val cache = iter.next._2.get
99       if (cache == null) {
100         // cache has been GC'd, remove our reference to it
101         iter.remove()
102       } else {
103         cache.cleanUp()
104       }
105     }
106   }
107 
108   override def close(): Unit = {
109     cleanup.cancel(true)
110     val iter = references.iterator()
111     while (iter.hasNext) {
112       val cache = iter.next._2.get
113       if (cache != null) {
114         cache.asMap().clear()
115       }
116       iter.remove()
117     }
118   }
119 }
120 
121 object ThreadLocalCache {
122   // use a 2 thread executor service for all the caches - we only use a handful across the code base
123   private val executor = ExitingExecutor(Executors.newScheduledThreadPool(2).asInstanceOf[ScheduledThreadPoolExecutor])
124 }
Line Stmt Id Pos Tree Symbol Tests Code
41 2559 1637 - 1700 Apply java.util.concurrent.ConcurrentLinkedQueue.<init> new java.util.concurrent.ConcurrentLinkedQueue[(Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])]()
43 2570 1725 - 1728 Apply org.locationtech.geomesa.utils.cache.ThreadLocalCache.$anon.<init> new $anon()
45 2560 1867 - 1882 Select scala.concurrent.duration.Duration.toMillis ThreadLocalCache.this.expiry.toMillis
45 2561 1884 - 1905 Literal <nosymbol> MILLISECONDS
45 2562 1827 - 1906 Apply com.github.benmanes.caffeine.cache.Caffeine.expireAfterAccess com.github.benmanes.caffeine.cache.Caffeine.newBuilder().expireAfterAccess(ThreadLocalCache.this.expiry.toMillis, MILLISECONDS)
46 2563 1928 - 1942 Apply com.github.benmanes.caffeine.cache.Caffeine.ticker builder.ticker(x$1)
46 2564 1913 - 1943 Apply scala.Option.foreach ThreadLocalCache.this.ticker.foreach[com.github.benmanes.caffeine.cache.Caffeine[Object,Object]]({ ((x$1: com.github.benmanes.caffeine.cache.Ticker) => builder.ticker(x$1)) })
47 2565 1962 - 1983 Apply com.github.benmanes.caffeine.cache.Caffeine.build builder.build[K, V]()
49 2566 2068 - 2096 Apply java.lang.Thread.getId java.lang.Thread.currentThread().getId()
49 2567 2098 - 2122 Apply java.lang.ref.WeakReference.<init> new java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]](cache)
49 2568 2067 - 2123 Apply scala.Tuple2.apply scala.Tuple2.apply[Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]]](java.lang.Thread.currentThread().getId(), new java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]](cache))
49 2569 2050 - 2124 Apply java.util.concurrent.ConcurrentLinkedQueue.offer ThreadLocalCache.this.references.offer(scala.Tuple2.apply[Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]]](java.lang.Thread.currentThread().getId(), new java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]](cache)))
54 2571 2210 - 2225 Select scala.concurrent.duration.Duration.toMillis ThreadLocalCache.this.expiry.toMillis
54 2572 2227 - 2242 Select scala.concurrent.duration.Duration.toMillis ThreadLocalCache.this.expiry.toMillis
54 2573 2244 - 2265 Literal <nosymbol> MILLISECONDS
54 2574 2172 - 2266 Apply java.util.concurrent.ScheduledExecutorService.scheduleWithFixedDelay ThreadLocalCache.this.executor.scheduleWithFixedDelay(this, ThreadLocalCache.this.expiry.toMillis, ThreadLocalCache.this.expiry.toMillis, MILLISECONDS)
57 2575 2332 - 2360 Apply com.github.benmanes.caffeine.cache.Cache.getIfPresent ThreadLocalCache.this.caches.get().getIfPresent(key)
58 2576 2369 - 2383 Apply java.lang.Object.!= cached.!=(null)
58 2577 2387 - 2393 Ident org.locationtech.geomesa.utils.cache.ThreadLocalCache.cached cached
58 2579 2401 - 2474 Block <nosymbol> { val value: V = op; ThreadLocalCache.this.caches.get().put(key, value); value }
60 2578 2430 - 2456 Apply com.github.benmanes.caffeine.cache.Cache.put ThreadLocalCache.this.caches.get().put(key, value)
71 2580 2681 - 2700 Apply java.util.concurrent.ConcurrentLinkedQueue.iterator ThreadLocalCache.this.references.iterator()
71 2590 2734 - 2894 Block <nosymbol> { val cache: com.github.benmanes.caffeine.cache.Cache[K,V] = ref.get(); if (cache.==(null)) scala.`package`.Iterator.empty else scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter[K, V](cache.asMap()).asScala.iterator.map[(Long, K, V)](((x0$2: (K, V)) => x0$2 match { case (_1: K, _2: V)(K, V)((k @ _), (v @ _)) => scala.Tuple3.apply[Long, K, V](id, k, v) })) }
71 2591 2681 - 2900 Apply scala.collection.Iterator.flatMap scala.collection.JavaConverters.asScalaIteratorConverter[(Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])](ThreadLocalCache.this.references.iterator()).asScala.flatMap[(Long, K, V)](((x0$1: (Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])) => x0$1 match { case (_1: Long, _2: java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])(Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])((id @ _), (ref @ _)) => { val cache: com.github.benmanes.caffeine.cache.Cache[K,V] = ref.get(); if (cache.==(null)) scala.`package`.Iterator.empty else scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter[K, V](cache.asMap()).asScala.iterator.map[(Long, K, V)](((x0$2: (K, V)) => x0$2 match { case (_1: K, _2: V)(K, V)((k @ _), (v @ _)) => scala.Tuple3.apply[Long, K, V](id, k, v) })) } }))
72 2581 2755 - 2762 Apply java.lang.ref.Reference.get ref.get()
73 2582 2773 - 2786 Apply java.lang.Object.== cache.==(null)
73 2583 2790 - 2804 Select scala.collection.Iterator.empty scala.`package`.Iterator.empty
73 2584 2790 - 2804 Block scala.collection.Iterator.empty scala.`package`.Iterator.empty
74 2585 2822 - 2835 Apply com.github.benmanes.caffeine.cache.Cache.asMap cache.asMap()
74 2586 2874 - 2884 Apply scala.Tuple3.apply scala.Tuple3.apply[Long, K, V](id, k, v)
74 2587 2874 - 2884 Block scala.Tuple3.apply scala.Tuple3.apply[Long, K, V](id, k, v)
74 2588 2822 - 2886 Apply scala.collection.Iterator.map scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter[K, V](cache.asMap()).asScala.iterator.map[(Long, K, V)](((x0$2: (K, V)) => x0$2 match { case (_1: K, _2: V)(K, V)((k @ _), (v @ _)) => scala.Tuple3.apply[Long, K, V](id, k, v) }))
74 2589 2822 - 2886 Block scala.collection.Iterator.map scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter[K, V](cache.asMap()).asScala.iterator.map[(Long, K, V)](((x0$2: (K, V)) => x0$2 match { case (_1: K, _2: V)(K, V)((k @ _), (v @ _)) => scala.Tuple3.apply[Long, K, V](id, k, v) }))
85 2592 3053 - 3055 Literal <nosymbol> 0L
86 2593 3060 - 3079 Apply java.util.concurrent.ConcurrentLinkedQueue.iterator ThreadLocalCache.this.references.iterator()
86 2601 3112 - 3213 Block <nosymbol> { val cache: com.github.benmanes.caffeine.cache.Cache[K,V] = ref.get(); if (cache.!=(null)) size = size.+(cache.estimatedSize()) else () }
86 2602 3060 - 3219 Apply scala.collection.Iterator.foreach scala.collection.JavaConverters.asScalaIteratorConverter[(Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])](ThreadLocalCache.this.references.iterator()).asScala.foreach[Unit](((x0$1: (Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])) => x0$1 match { case (_1: Long, _2: java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])(Long, java.lang.ref.WeakReference[com.github.benmanes.caffeine.cache.Cache[K,V]])(_, (ref @ _)) => { val cache: com.github.benmanes.caffeine.cache.Cache[K,V] = ref.get(); if (cache.!=(null)) size = size.+(cache.estimatedSize()) else () } }))
87 2594 3133 - 3140 Apply java.lang.ref.Reference.get ref.get()
88 2595 3151 - 3164 Apply java.lang.Object.!= cache.!=(null)
88 2599 3147 - 3147 Literal <nosymbol> ()
88 2600 3147 - 3147 Block <nosymbol> ()
89 2596 3184 - 3205 Apply com.github.benmanes.caffeine.cache.Cache.estimatedSize cache.estimatedSize()
89 2597 3176 - 3205 Apply scala.Long.+ size.+(cache.estimatedSize())
89 2598 3176 - 3205 Assign <nosymbol> size = size.+(cache.estimatedSize())
96 2603 3280 - 3301 Apply java.util.concurrent.ConcurrentLinkedQueue.iterator ThreadLocalCache.this.references.iterator()
97 2604 3313 - 3325 Apply java.util.Iterator.hasNext iter.hasNext()
97 2611 3327 - 3327 Apply org.locationtech.geomesa.utils.cache.ThreadLocalCache.while$1 while$1()
97 2612 3327 - 3524 Block <nosymbol> { { val cache: com.github.benmanes.caffeine.cache.Cache[K,V] = iter.next()._2.get(); if (cache.==(null)) iter.remove() else cache.cleanUp() }; while$1() }
97 2613 3306 - 3306 Literal <nosymbol> ()
97 2614 3306 - 3306 Block <nosymbol> ()
98 2605 3347 - 3363 Apply java.lang.ref.Reference.get iter.next()._2.get()
99 2606 3374 - 3387 Apply java.lang.Object.== cache.==(null)
101 2607 3458 - 3471 Apply java.util.Iterator.remove iter.remove()
101 2608 3458 - 3471 Block java.util.Iterator.remove iter.remove()
103 2609 3495 - 3510 Apply com.github.benmanes.caffeine.cache.Cache.cleanUp cache.cleanUp()
103 2610 3495 - 3510 Block com.github.benmanes.caffeine.cache.Cache.cleanUp cache.cleanUp()
109 2615 3567 - 3587 Apply java.util.concurrent.Future.cancel ThreadLocalCache.this.cleanup.cancel(true)
110 2616 3603 - 3624 Apply java.util.concurrent.ConcurrentLinkedQueue.iterator ThreadLocalCache.this.references.iterator()
111 2617 3636 - 3648 Apply java.util.Iterator.hasNext iter.hasNext()
111 2625 3650 - 3650 Apply org.locationtech.geomesa.utils.cache.ThreadLocalCache.while$2 while$2()
111 2626 3650 - 3777 Block <nosymbol> { { val cache: com.github.benmanes.caffeine.cache.Cache[K,V] = iter.next()._2.get(); if (cache.!=(null)) cache.asMap().clear() else (); iter.remove() }; while$2() }
111 2627 3629 - 3629 Literal <nosymbol> ()
111 2628 3629 - 3629 Block <nosymbol> ()
112 2618 3670 - 3686 Apply java.lang.ref.Reference.get iter.next()._2.get()
113 2619 3697 - 3710 Apply java.lang.Object.!= cache.!=(null)
113 2622 3693 - 3693 Literal <nosymbol> ()
113 2623 3693 - 3693 Block <nosymbol> ()
114 2620 3722 - 3743 Apply java.util.Map.clear cache.asMap().clear()
114 2621 3722 - 3743 Block java.util.Map.clear cache.asMap().clear()
116 2624 3758 - 3771 Apply java.util.Iterator.remove iter.remove()
123 2629 3986 - 3987 Literal <nosymbol> 2
123 2630 3953 - 4030 TypeApply scala.Any.asInstanceOf java.util.concurrent.Executors.newScheduledThreadPool(2).asInstanceOf[java.util.concurrent.ScheduledThreadPoolExecutor]
123 2631 3937 - 3937 TypeApply org.locationtech.geomesa.utils.concurrent.ExitingExecutor.apply$default$2 org.locationtech.geomesa.utils.concurrent.ExitingExecutor.apply$default$2[Nothing]
123 2632 3937 - 4031 Apply org.locationtech.geomesa.utils.concurrent.ExitingExecutor.apply org.locationtech.geomesa.utils.concurrent.ExitingExecutor.apply[java.util.concurrent.ScheduledThreadPoolExecutor](java.util.concurrent.Executors.newScheduledThreadPool(2).asInstanceOf[java.util.concurrent.ScheduledThreadPoolExecutor], org.locationtech.geomesa.utils.concurrent.ExitingExecutor.apply$default$2[Nothing])