1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.index
10 
11 import com.typesafe.scalalogging.LazyLogging
12 import io.micrometer.core.instrument.Tags
13 import org.geotools.api.data.{FeatureListener, SimpleFeatureSource}
14 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
15 import org.geotools.api.filter.Filter
16 import org.geotools.api.filter.expression.Expression
17 import org.locationtech.geomesa.filter.factory.FastFilterFactory
18 import org.locationtech.geomesa.kafka.data.KafkaDataStore._
19 
20 import java.io.Closeable
21 import java.util.concurrent._
22 
/**
 * Feature cache that is updated one feature at a time and can be queried by feature ID or by filter
 */
trait KafkaFeatureCache extends KafkaListeners with Closeable {

  /**
   * Views associated with this cache
   *
   * @return views
   */
  def views: Seq[KafkaFeatureCacheView]

  // modifications

  /**
   * Write a feature to the cache
   *
   * @param feature feature to write
   */
  def put(feature: SimpleFeature): Unit

  /**
   * Remove a feature from the cache
   *
   * @param id feature ID
   */
  def remove(id: String): Unit

  /**
   * Remove everything from the cache
   */
  def clear(): Unit

  // reads

  /**
   * Look up a single feature
   *
   * @param id feature ID
   * @return the feature, if there is one for the ID
   */
  def query(id: String): Option[SimpleFeature]

  /**
   * Look up the features matching a filter
   *
   * @param filter filter to apply
   * @return matching features
   */
  def query(filter: Filter): Iterator[SimpleFeature]

  /**
   * Number of features in the cache
   *
   * @return count
   */
  def size(): Int

  /**
   * Number of features in the cache that match a filter
   *
   * @param filter filter to apply
   * @return count
   */
  def size(filter: Filter): Int
}
33 
34 object KafkaFeatureCache extends LazyLogging {
35 
36   /**
37    * Create a standard feature cache
38    *
39    * @param sft simple feature type
40    * @param config cache config
41    * @param views layer view config
42    * @return
43    */
44   def apply(sft: SimpleFeatureType, config: IndexConfig, views: Seq[LayerView] = Seq.empty, tags: Tags = Tags.empty()): KafkaFeatureCache = {
45     if (config.expiry == ImmediatelyExpireConfig) {
46       new NoOpFeatureCache(views.map(v => KafkaFeatureCacheView.empty(v.viewSft)))
47     } else {
48       new KafkaFeatureCacheImpl(sft, config, views, tags)
49     }
50   }
51 
52   /**
53     * No-op cache
54     *
55     * @return
56     */
57   def empty(views: Seq[LayerView] = Seq.empty): KafkaFeatureCache =
58     new EmptyFeatureCache(views.map(v => KafkaFeatureCacheView.empty(v.viewSft)))
59 
60   /**
61     * Cache that won't spatially index the features
62     *
63     * @param sft simple feature type
64     * @param ordering feature ordering
65     * @return
66     */
67   def nonIndexing(sft: SimpleFeatureType, ordering: ExpiryTimeConfig = NeverExpireConfig): KafkaFeatureCache = {
68     val event: PartialFunction[ExpiryTimeConfig, String] = {
69       case EventTimeConfig(_, exp, true) => exp
70     }
71 
72     val ord = ordering match {
73       case o if event.isDefinedAt(o) => Some(event.apply(o))
74       // all filters use the same event time ordering
75       case FilteredExpiryConfig(filters) if event.isDefinedAt(filters.head._2) => Some(event.apply(filters.head._2))
76       case _ => None
77     }
78 
79     ord.map(FastFilterFactory.toExpression(sft, _)) match {
80       case None => new NonIndexingFeatureCache()
81       case Some(exp) => new NonIndexingEventTimeFeatureCache(exp)
82     }
83   }
84 
85   /**
86     * Non-indexing feature cache that just tracks the most recent feature
87     */
88   private class NonIndexingFeatureCache extends KafkaFeatureCache {
89 
90     private val state = new ConcurrentHashMap[String, SimpleFeature]
91 
92     override def put(feature: SimpleFeature): Unit = state.put(feature.getID, feature)
93 
94     override def remove(id: String): Unit = state.remove(id)
95 
96     override def clear(): Unit = state.clear()
97 
98     override def close(): Unit = {}
99 
100     override def size(): Int = state.size()
101 
102     override def size(filter: Filter): Int = query(filter).length
103 
104     override def query(id: String): Option[SimpleFeature] = Option(state.get(id))
105 
106     override def query(filter: Filter): Iterator[SimpleFeature] = {
107       import scala.collection.JavaConverters._
108       val features = state.asScala.valuesIterator
109       if (filter == Filter.INCLUDE) { features } else {
110         features.filter(filter.evaluate)
111       }
112     }
113 
114     override def views: Seq[KafkaFeatureCacheView] = Seq.empty
115   }
116 
117   /**
118     * Non-indexing feature cache that just tracks the most recent feature, based on event time
119     *
120     * @param time event time expression
121     */
122   private class NonIndexingEventTimeFeatureCache(time: Expression) extends KafkaFeatureCache {
123 
124     private val state = new ConcurrentHashMap[String, (SimpleFeature, Long)]
125 
126     /**
127       * Note: this method is not thread-safe. The `state` and can fail to replace the correct values
128       * if the same feature is updated simultaneously from two different threads
129       *
130       * In our usage, this isn't a problem, as a given feature ID is always operated on by a single thread
131       * due to kafka consumer partitioning
132       */
133     override def put(feature: SimpleFeature): Unit = {
134       val tuple = (feature, FeatureStateFactory.time(time, feature))
135       val old = state.put(feature.getID, tuple)
136       if (old != null && old._2 > tuple._2) {
137         state.replace(feature.getID, tuple, old)
138       }
139     }
140 
141     override def remove(id: String): Unit = state.remove(id)
142 
143     override def clear(): Unit = state.clear()
144 
145     override def close(): Unit = {}
146 
147     override def size(): Int = state.size()
148 
149     override def size(filter: Filter): Int = query(filter).length
150 
151     override def query(id: String): Option[SimpleFeature] = Option(state.get(id)).map(_._1)
152 
153     override def query(filter: Filter): Iterator[SimpleFeature] = {
154       import scala.collection.JavaConverters._
155       val features = state.asScala.valuesIterator.map(_._1)
156       if (filter == Filter.INCLUDE) { features } else {
157         features.filter(filter.evaluate)
158       }
159     }
160 
161     override def views: Seq[KafkaFeatureCacheView] = Seq.empty
162   }
163 
164   class EmptyFeatureCache(val views: Seq[KafkaFeatureCacheView]) extends KafkaFeatureCache {
165     override def put(feature: SimpleFeature): Unit = throw new UnsupportedOperationException("Empty feature cache")
166     override def remove(id: String): Unit = throw new UnsupportedOperationException("Empty feature cache")
167     override def clear(): Unit = throw new UnsupportedOperationException("Empty feature cache")
168     override def size(): Int = 0
169     override def size(filter: Filter): Int = 0
170     override def query(id: String): Option[SimpleFeature] = None
171     override def query(filter: Filter): Iterator[SimpleFeature] = Iterator.empty
172     override def close(): Unit = {}
173     override def addListener(source: SimpleFeatureSource, listener: FeatureListener): Unit = {}
174     override def removeListener(source: SimpleFeatureSource, listener: FeatureListener): Unit = {}
175   }
176 
177   private class NoOpFeatureCache(val views: Seq[KafkaFeatureCacheView]) extends KafkaFeatureCache {
178     override def put(feature: SimpleFeature): Unit = {}
179     override def remove(id: String): Unit = {}
180     override def clear(): Unit = {}
181     override def size(): Int = 0
182     override def size(filter: Filter): Int = 0
183     override def query(id: String): Option[SimpleFeature] = None
184     override def query(filter: Filter): Iterator[SimpleFeature] = Iterator.empty
185     override def close(): Unit = {}
186   }
187 }
Line Stmt Id Pos Tree Symbol Tests Code
45 2414 1750 - 1773 Select org.locationtech.geomesa.kafka.data.KafkaDataStore.ImmediatelyExpireConfig org.locationtech.geomesa.kafka.data.KafkaDataStore.ImmediatelyExpireConfig
45 2415 1733 - 1773 Apply java.lang.Object.== config.expiry.==(org.locationtech.geomesa.kafka.data.KafkaDataStore.ImmediatelyExpireConfig)
46 2416 1847 - 1856 Select org.locationtech.geomesa.kafka.data.KafkaDataStore.LayerView.viewSft v.viewSft
46 2417 1819 - 1857 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView.empty KafkaFeatureCacheView.empty(v.viewSft)
46 2418 1813 - 1813 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]
46 2419 1804 - 1858 ApplyToImplicitArgs scala.collection.TraversableLike.map views.map[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView, Seq[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]](((v: org.locationtech.geomesa.kafka.data.KafkaDataStore.LayerView) => KafkaFeatureCacheView.empty(v.viewSft)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView])
46 2420 1783 - 1859 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NoOpFeatureCache.<init> new KafkaFeatureCache.this.NoOpFeatureCache(views.map[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView, Seq[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]](((v: org.locationtech.geomesa.kafka.data.KafkaDataStore.LayerView) => KafkaFeatureCacheView.empty(v.viewSft)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]))
46 2421 1783 - 1859 Block org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NoOpFeatureCache.<init> new KafkaFeatureCache.this.NoOpFeatureCache(views.map[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView, Seq[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]](((v: org.locationtech.geomesa.kafka.data.KafkaDataStore.LayerView) => KafkaFeatureCacheView.empty(v.viewSft)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]))
48 2422 1879 - 1930 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCacheImpl.<init> new KafkaFeatureCacheImpl(sft, config, views, tags)
48 2423 1879 - 1930 Block org.locationtech.geomesa.kafka.index.KafkaFeatureCacheImpl.<init> new KafkaFeatureCacheImpl(sft, config, views, tags)
58 2424 2130 - 2139 Select org.locationtech.geomesa.kafka.data.KafkaDataStore.LayerView.viewSft v.viewSft
58 2425 2102 - 2140 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView.empty KafkaFeatureCacheView.empty(v.viewSft)
58 2426 2096 - 2096 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]
58 2427 2087 - 2141 ApplyToImplicitArgs scala.collection.TraversableLike.map views.map[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView, Seq[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]](((v: org.locationtech.geomesa.kafka.data.KafkaDataStore.LayerView) => KafkaFeatureCacheView.empty(v.viewSft)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView])
58 2428 2065 - 2142 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.EmptyFeatureCache.<init> new KafkaFeatureCache.this.EmptyFeatureCache(views.map[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView, Seq[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]](((v: org.locationtech.geomesa.kafka.data.KafkaDataStore.LayerView) => KafkaFeatureCacheView.empty(v.viewSft)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.index.KafkaFeatureCacheView]))
68 2430 2477 - 2477 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.$anonfun.<init> new $anonfun()
69 2429 2523 - 2526 Ident org.locationtech.geomesa.kafka.index.KafkaFeatureCache.$anonfun.exp exp
73 2431 2581 - 2601 Apply scala.PartialFunction.isDefinedAt event.isDefinedAt(o)
73 2432 2610 - 2624 Apply scala.Function1.apply event.apply(o)
73 2433 2605 - 2625 Apply scala.Some.apply scala.Some.apply[String](event.apply(o))
73 2434 2605 - 2625 Block scala.Some.apply scala.Some.apply[String](event.apply(o))
75 2435 2742 - 2757 Select scala.Tuple2._2 filters.head._2
75 2436 2724 - 2758 Apply scala.PartialFunction.isDefinedAt event.isDefinedAt(filters.head._2)
75 2437 2779 - 2794 Select scala.Tuple2._2 filters.head._2
75 2438 2767 - 2795 Apply scala.Function1.apply event.apply(filters.head._2)
75 2439 2762 - 2796 Apply scala.Some.apply scala.Some.apply[String](event.apply(filters.head._2))
75 2440 2762 - 2796 Block scala.Some.apply scala.Some.apply[String](event.apply(filters.head._2))
76 2441 2813 - 2817 Select scala.None scala.None
76 2442 2813 - 2817 Block scala.None scala.None
79 2443 2837 - 2875 Apply org.locationtech.geomesa.filter.factory.FastFilterFactory.toExpression org.locationtech.geomesa.filter.factory.FastFilterFactory.toExpression(sft, x$1)
79 2444 2829 - 2876 Apply scala.Option.map ord.map[org.geotools.api.filter.expression.Expression](((x$1: String) => org.locationtech.geomesa.filter.factory.FastFilterFactory.toExpression(sft, x$1)))
80 2445 2904 - 2933 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingFeatureCache.<init> new KafkaFeatureCache.this.NonIndexingFeatureCache()
80 2446 2904 - 2933 Block org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingFeatureCache.<init> new KafkaFeatureCache.this.NonIndexingFeatureCache()
81 2447 2958 - 2999 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingEventTimeFeatureCache.<init> new KafkaFeatureCache.this.NonIndexingEventTimeFeatureCache(exp)
81 2448 2958 - 2999 Block org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingEventTimeFeatureCache.<init> new KafkaFeatureCache.this.NonIndexingEventTimeFeatureCache(exp)
90 2449 3191 - 3235 Apply java.util.concurrent.ConcurrentHashMap.<init> new java.util.concurrent.ConcurrentHashMap[String,org.geotools.api.feature.simple.SimpleFeature]()
92 2450 3300 - 3313 Apply org.geotools.api.feature.simple.SimpleFeature.getID feature.getID()
92 2451 3290 - 3323 Apply java.util.concurrent.ConcurrentHashMap.put NonIndexingFeatureCache.this.state.put(feature.getID(), feature)
92 2452 3299 - 3299 Literal <nosymbol> ()
94 2453 3369 - 3385 Apply java.util.concurrent.ConcurrentHashMap.remove NonIndexingFeatureCache.this.state.remove(id)
94 2454 3381 - 3381 Literal <nosymbol> ()
96 2455 3420 - 3433 Apply java.util.concurrent.ConcurrentHashMap.clear NonIndexingFeatureCache.this.state.clear()
98 2456 3468 - 3470 Literal <nosymbol> ()
100 2457 3503 - 3515 Apply java.util.concurrent.ConcurrentHashMap.size NonIndexingFeatureCache.this.state.size()
102 2458 3562 - 3582 Select scala.collection.Iterator.length NonIndexingFeatureCache.this.query(filter).length
104 2459 3651 - 3664 Apply java.util.concurrent.ConcurrentHashMap.get NonIndexingFeatureCache.this.state.get(id)
104 2460 3644 - 3665 Apply scala.Option.apply scala.Option.apply[org.geotools.api.feature.simple.SimpleFeature](NonIndexingFeatureCache.this.state.get(id))
108 2461 3803 - 3808 Select org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingFeatureCache.state NonIndexingFeatureCache.this.state
108 2462 3803 - 3831 Select scala.collection.MapLike.valuesIterator scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter[String, org.geotools.api.feature.simple.SimpleFeature](NonIndexingFeatureCache.this.state).asScala.valuesIterator
109 2463 3852 - 3866 Select org.geotools.api.filter.Filter.INCLUDE org.geotools.api.filter.Filter.INCLUDE
109 2464 3842 - 3866 Apply java.lang.Object.== filter.==(org.geotools.api.filter.Filter.INCLUDE)
109 2465 3870 - 3878 Ident org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingFeatureCache.features features
110 2466 3912 - 3927 Apply org.geotools.api.filter.Filter.evaluate filter.evaluate(x$1)
110 2467 3896 - 3928 Apply scala.collection.Iterator.filter features.filter({ ((x$1: Any) => filter.evaluate(x$1)) })
110 2468 3896 - 3928 Block scala.collection.Iterator.filter features.filter({ ((x$1: Any) => filter.evaluate(x$1)) })
114 2469 3997 - 4006 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
124 2470 4286 - 4338 Apply java.util.concurrent.ConcurrentHashMap.<init> new java.util.concurrent.ConcurrentHashMap[String,(org.geotools.api.feature.simple.SimpleFeature, Long)]()
134 2471 4805 - 4809 Select org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingEventTimeFeatureCache.time NonIndexingEventTimeFeatureCache.this.time
134 2472 4780 - 4819 Apply org.locationtech.geomesa.kafka.index.FeatureStateFactory.time FeatureStateFactory.time(NonIndexingEventTimeFeatureCache.this.time, feature)
134 2473 4770 - 4820 Apply scala.Tuple2.apply scala.Tuple2.apply[org.geotools.api.feature.simple.SimpleFeature, Long](feature, FeatureStateFactory.time(NonIndexingEventTimeFeatureCache.this.time, feature))
135 2474 4847 - 4860 Apply org.geotools.api.feature.simple.SimpleFeature.getID feature.getID()
135 2475 4837 - 4868 Apply java.util.concurrent.ConcurrentHashMap.put NonIndexingEventTimeFeatureCache.this.state.put(feature.getID(), tuple)
136 2476 4886 - 4890 Literal <nosymbol> null
136 2477 4903 - 4911 Select scala.Tuple2._2 tuple._2
136 2478 4894 - 4911 Apply scala.Long.> old._2.>(tuple._2)
136 2479 4879 - 4911 Apply scala.Boolean.&& old.!=(null).&&(old._2.>(tuple._2))
136 2484 4875 - 4875 Literal <nosymbol> ()
136 2485 4875 - 4875 Block <nosymbol> ()
137 2480 4937 - 4950 Apply org.geotools.api.feature.simple.SimpleFeature.getID feature.getID()
137 2481 4923 - 4963 Apply java.util.concurrent.ConcurrentHashMap.replace NonIndexingEventTimeFeatureCache.this.state.replace(feature.getID(), tuple, old)
137 2482 4936 - 4936 Literal <nosymbol> ()
137 2483 4923 - 4963 Block <nosymbol> { NonIndexingEventTimeFeatureCache.this.state.replace(feature.getID(), tuple, old); () }
141 2486 5023 - 5039 Apply java.util.concurrent.ConcurrentHashMap.remove NonIndexingEventTimeFeatureCache.this.state.remove(id)
141 2487 5035 - 5035 Literal <nosymbol> ()
143 2488 5074 - 5087 Apply java.util.concurrent.ConcurrentHashMap.clear NonIndexingEventTimeFeatureCache.this.state.clear()
145 2489 5122 - 5124 Literal <nosymbol> ()
147 2490 5157 - 5169 Apply java.util.concurrent.ConcurrentHashMap.size NonIndexingEventTimeFeatureCache.this.state.size()
149 2491 5216 - 5236 Select scala.collection.Iterator.length NonIndexingEventTimeFeatureCache.this.query(filter).length
151 2492 5305 - 5318 Apply java.util.concurrent.ConcurrentHashMap.get NonIndexingEventTimeFeatureCache.this.state.get(id)
151 2493 5324 - 5328 Select scala.Tuple2._1 x$2._1
151 2494 5298 - 5329 Apply scala.Option.map scala.Option.apply[(org.geotools.api.feature.simple.SimpleFeature, Long)](NonIndexingEventTimeFeatureCache.this.state.get(id)).map[org.geotools.api.feature.simple.SimpleFeature](((x$2: (org.geotools.api.feature.simple.SimpleFeature, Long)) => x$2._1))
155 2495 5467 - 5472 Select org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingEventTimeFeatureCache.state NonIndexingEventTimeFeatureCache.this.state
155 2496 5500 - 5504 Select scala.Tuple2._1 x$3._1
155 2497 5467 - 5505 Apply scala.collection.Iterator.map scala.collection.JavaConverters.mapAsScalaConcurrentMapConverter[String, (org.geotools.api.feature.simple.SimpleFeature, Long)](NonIndexingEventTimeFeatureCache.this.state).asScala.valuesIterator.map[org.geotools.api.feature.simple.SimpleFeature](((x$3: (org.geotools.api.feature.simple.SimpleFeature, Long)) => x$3._1))
156 2498 5526 - 5540 Select org.geotools.api.filter.Filter.INCLUDE org.geotools.api.filter.Filter.INCLUDE
156 2499 5516 - 5540 Apply java.lang.Object.== filter.==(org.geotools.api.filter.Filter.INCLUDE)
156 2500 5544 - 5552 Ident org.locationtech.geomesa.kafka.index.KafkaFeatureCache.NonIndexingEventTimeFeatureCache.features features
157 2501 5586 - 5601 Apply org.geotools.api.filter.Filter.evaluate filter.evaluate(x$1)
157 2502 5570 - 5602 Apply scala.collection.Iterator.filter features.filter({ ((x$1: Any) => filter.evaluate(x$1)) })
157 2503 5570 - 5602 Block scala.collection.Iterator.filter features.filter({ ((x$1: Any) => filter.evaluate(x$1)) })
161 2504 5671 - 5680 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
165 2505 5832 - 5894 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException("Empty feature cache")
166 2506 5939 - 6001 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException("Empty feature cache")
167 2507 6035 - 6097 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException("Empty feature cache")
168 2508 6129 - 6130 Literal <nosymbol> 0
169 2509 6176 - 6177 Literal <nosymbol> 0
170 2510 6238 - 6242 Select scala.None scala.None
171 2511 6309 - 6323 Select scala.collection.Iterator.empty scala.`package`.Iterator.empty
172 2512 6357 - 6359 Literal <nosymbol> ()
173 2513 6453 - 6455 Literal <nosymbol> ()
174 2514 6552 - 6554 Literal <nosymbol> ()
178 2515 6713 - 6715 Literal <nosymbol> ()
179 2516 6760 - 6762 Literal <nosymbol> ()
180 2517 6796 - 6798 Literal <nosymbol> ()
181 2518 6830 - 6831 Literal <nosymbol> 0
182 2519 6877 - 6878 Literal <nosymbol> 0
183 2520 6939 - 6943 Select scala.None scala.None
184 2521 7010 - 7024 Select scala.collection.Iterator.empty scala.`package`.Iterator.empty
185 2522 7058 - 7060 Literal <nosymbol> ()