1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.data
10 
11 import com.typesafe.scalalogging.LazyLogging
12 import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
13 import org.geotools.api.feature.simple.SimpleFeatureType
14 import org.geotools.api.filter.Filter
15 import org.locationtech.geomesa.kafka.consumer.ThreadedConsumer
16 import org.locationtech.geomesa.kafka.data.KafkaDataStore.ExpiryTimeConfig
17 import org.locationtech.geomesa.kafka.index.KafkaFeatureCache
18 import org.locationtech.geomesa.kafka.utils.GeoMessage.{Change, Clear, Delete}
19 import org.locationtech.geomesa.kafka.utils.GeoMessageSerializer
20 import org.locationtech.geomesa.kafka.versions.{KafkaConsumerVersions, RecordVersions}
21 import org.locationtech.geomesa.utils.concurrent.CachedThreadPool
22 import org.locationtech.geomesa.utils.io.CloseWithLogging
23 
24 import java.io.Closeable
25 import java.time.Duration
26 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
27 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
28 
29 /**
30   * Reads from Kafka and populates a `KafkaFeatureCache`.
31   * Manages geotools feature listeners
32   */
33 trait KafkaCacheLoader extends Closeable with LazyLogging {
34   def cache: KafkaFeatureCache
35 }
36 
37 object KafkaCacheLoader extends LazyLogging {
38 
39   object LoaderStatus {
40     private val count = new AtomicInteger(0)
41     private val firstLoadStartTime = new AtomicLong(0L)
42 
43     def startLoad(): Boolean = synchronized {
44       count.incrementAndGet()
45       firstLoadStartTime.compareAndSet(0L, System.currentTimeMillis())
46     }
47     def completedLoad(): Unit = synchronized {
48       if (count.decrementAndGet() == 0) {
49         logger.info(s"Last active initial load completed.  " +
50           s"Initial loads took ${System.currentTimeMillis()-firstLoadStartTime.get} milliseconds.")
51         firstLoadStartTime.set(0L)
52       }
53     }
54 
55     def allLoaded(): Boolean = count.get() == 0
56   }
57 
58   object NoOpLoader extends KafkaCacheLoader {
59     override val cache: KafkaFeatureCache = KafkaFeatureCache.empty()
60     override def close(): Unit = {}
61   }
62 
63   class KafkaCacheLoaderImpl(
64       sft: SimpleFeatureType,
65       override val cache: KafkaFeatureCache,
66       consumers: Seq[Consumer[Array[Byte], Array[Byte]]],
67       topic: String,
68       frequency: Long,
69       serializer: GeoMessageSerializer,
70       doInitialLoad: Boolean,
71       initialLoadConfig: ExpiryTimeConfig
72     ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) with KafkaCacheLoader {
73 
74     try { classOf[ConsumerRecord[Any, Any]].getMethod("timestamp") } catch {
75       case _: NoSuchMethodException => logger.warn("This version of Kafka doesn't support timestamps, using system time")
76     }
77 
78     private val initialLoader = if (doInitialLoad) {
79       // for the initial load, don't bother spatially indexing until we have the final state
80       val loader = new InitialLoader(sft, consumers, topic, frequency, serializer, initialLoadConfig, this)
81       CachedThreadPool.execute(loader)
82       Some(loader)
83     } else {
84       startConsumers()
85       None
86     }
87 
88     override def close(): Unit = {
89       try {
90         super.close()
91       } finally {
92         CloseWithLogging(initialLoader)
93         cache.close()
94       }
95     }
96 
97     override protected [KafkaCacheLoader] def consume(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
98       val headers = RecordVersions.getHeaders(record)
99       val timestamp = RecordVersions.getTimestamp(record)
100       val message = serializer.deserialize(record.key(), record.value(), headers, timestamp)
101       logger.trace(s"Consumed message [$topic:${record.partition}:${record.offset}] $message")
102       message match {
103         case m: Change => cache.fireChange(timestamp, m.feature); cache.put(m.feature)
104         case m: Delete => cache.fireDelete(timestamp, m.id, cache.query(m.id).orNull); cache.remove(m.id)
105         case _: Clear  => cache.fireClear(timestamp); cache.clear()
106         case m => throw new IllegalArgumentException(s"Unknown message: $m")
107       }
108     }
109   }
110 
111   /**
112     * Handles initial loaded 'from-beginning' without indexing features in the spatial index. Will still
113     * trigger message events.
114     *
115     * @param consumers consumers, won't be closed even on call to 'close()'
116     * @param topic kafka topic
117     * @param frequency polling frequency in milliseconds
118     * @param serializer message serializer
119     * @param toLoad main cache loader, used for callback when bulk loading is done
120     */
121   private class InitialLoader(
122       sft: SimpleFeatureType,
123       consumers: Seq[Consumer[Array[Byte], Array[Byte]]],
124       topic: String,
125       frequency: Long,
126       serializer: GeoMessageSerializer,
127       ordering: ExpiryTimeConfig,
128       toLoad: KafkaCacheLoaderImpl
129     ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), false) with Runnable {
130 
131     private val cache = KafkaFeatureCache.nonIndexing(sft, ordering)
132 
133     // track the offsets that we want to read to
134     private val offsets = new ConcurrentHashMap[Int, Long]()
135     private var latch: CountDownLatch = _
136     private val done = new AtomicBoolean(false)
137 
138     override protected def consume(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
139       if (done.get) { toLoad.consume(record) } else {
140         val headers = RecordVersions.getHeaders(record)
141         val timestamp = RecordVersions.getTimestamp(record)
142         val message = serializer.deserialize(record.key, record.value, headers, timestamp)
143         logger.trace(s"Consumed message [$topic:${record.partition}:${record.offset}] $message")
144         message match {
145           case m: Change => toLoad.cache.fireChange(timestamp, m.feature); cache.put(m.feature)
146           case m: Delete => toLoad.cache.fireDelete(timestamp, m.id, cache.query(m.id).orNull); cache.remove(m.id)
147           case _: Clear  => toLoad.cache.fireClear(timestamp); cache.clear()
148           case m => throw new IllegalArgumentException(s"Unknown message: $m")
149         }
150         // once we've hit the max offset for the partition, remove from the offset map to indicate we're done
151         val maxOffset = offsets.getOrDefault(record.partition, Long.MaxValue)
152         if (maxOffset <= record.offset) {
153           offsets.remove(record.partition)
154           latch.countDown()
155           logger.info(s"Initial load: consumed [$topic:${record.partition}:${record.offset}] of $maxOffset, " +
156               s"${latch.getCount} partitions remaining")
157         } else if (record.offset > 0 && record.offset % 1048576 == 0) { // magic number 2^20
158           logger.info(s"Initial load: consumed [$topic:${record.partition}:${record.offset}] of $maxOffset")
159         }
160       }
161     }
162 
163     override def run(): Unit = {
164       LoaderStatus.startLoad()
165 
166       import scala.collection.JavaConverters._
167 
168       val partitions = consumers.head.partitionsFor(topic).asScala.map(_.partition)
169       try {
170         // note: these methods are not available in kafka 0.9, which will cause it to fall back to normal loading
171         val beginningOffsets = KafkaConsumerVersions.beginningOffsets(consumers.head, topic, partitions.toSeq)
172         val endOffsets = KafkaConsumerVersions.endOffsets(consumers.head, topic, partitions.toSeq)
173         partitions.foreach { p =>
174           // end offsets are the *next* offset that will be returned, so subtract one to track the last offset
175           // we will actually consume
176           val endOffset = endOffsets.getOrElse(p, 0L) - 1L
177           // note: not sure if start offsets are also off by one, but at the worst we would skip bulk loading
178           // for the last message per topic
179           val beginningOffset = beginningOffsets.getOrElse(p, 0L)
180           if (beginningOffset < endOffset) {
181             offsets.put(p, endOffset)
182           }
183         }
184       } catch {
185         case e: NoSuchMethodException => logger.warn(s"Can't support initial bulk loading for current Kafka version: $e")
186       }
187       // don't bother spinning up the consumer threads if we don't need to actually bulk load anything
188       if (!offsets.isEmpty) {
189         logger.info(s"Starting initial load for [$topic] with ${offsets.size} partitions")
190         latch = new CountDownLatch(offsets.size)
191         startConsumers() // kick off the asynchronous consumer threads
192         try { latch.await() } finally {
193           // stop the consumer threads, but won't close the consumers due to `closeConsumers`
194           close()
195         }
196         // set a flag just in case the consumer threads haven't finished spinning down, so that we will
197         // pass any additional messages back to the main loader
198         done.set(true)
199         logger.info(s"Finished initial load, transferring to indexed cache for [$topic]")
200         cache.query(Filter.INCLUDE).foreach(toLoad.cache.put)
201         logger.info(s"Finished transfer for [$topic]")
202       }
203       logger.info(s"Starting normal load for [$topic]")
204       // start the normal loading
205       toLoad.startConsumers()
206       LoaderStatus.completedLoad()
207     }
208   }
209 }
Line Stmt Id Pos Tree Symbol Tests Code
40 91798 1764 - 1784 Apply java.util.concurrent.atomic.AtomicInteger.<init> new java.util.concurrent.atomic.AtomicInteger(0)
41 91799 1822 - 1840 Apply java.util.concurrent.atomic.AtomicLong.<init> new java.util.concurrent.atomic.AtomicLong(0L)
43 91804 1873 - 1994 Apply java.lang.Object.synchronized LoaderStatus.this.synchronized[Boolean]({ LoaderStatus.this.count.incrementAndGet(); LoaderStatus.this.firstLoadStartTime.compareAndSet(0L, java.lang.System.currentTimeMillis()) })
44 91800 1894 - 1917 Apply java.util.concurrent.atomic.AtomicInteger.incrementAndGet LoaderStatus.this.count.incrementAndGet()
45 91801 1957 - 1959 Literal <nosymbol> 0L
45 91803 1924 - 1988 Apply java.util.concurrent.atomic.AtomicLong.compareAndSet LoaderStatus.this.firstLoadStartTime.compareAndSet(0L, java.lang.System.currentTimeMillis())
45 91802 1961 - 1987 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
47 91810 2027 - 2295 Apply java.lang.Object.synchronized LoaderStatus.this.synchronized[Unit](if (LoaderStatus.this.count.decrementAndGet().==(0)) { (if (KafkaCacheLoader.this.logger.underlying.isInfoEnabled()) KafkaCacheLoader.this.logger.underlying.info(scala.StringContext.apply("Last active initial load completed. ").s().+(scala.StringContext.apply("Initial loads took ", " milliseconds.").s(java.lang.System.currentTimeMillis().-(LoaderStatus.this.firstLoadStartTime.get())))) else (): Unit); LoaderStatus.this.firstLoadStartTime.set(0L) } else ())
48 91805 2052 - 2080 Apply scala.Int.== LoaderStatus.this.count.decrementAndGet().==(0)
48 91807 2082 - 2289 Block <nosymbol> { (if (KafkaCacheLoader.this.logger.underlying.isInfoEnabled()) KafkaCacheLoader.this.logger.underlying.info(scala.StringContext.apply("Last active initial load completed. ").s().+(scala.StringContext.apply("Initial loads took ", " milliseconds.").s(java.lang.System.currentTimeMillis().-(LoaderStatus.this.firstLoadStartTime.get())))) else (): Unit); LoaderStatus.this.firstLoadStartTime.set(0L) }
48 91809 2048 - 2048 Block <nosymbol> ()
48 91808 2048 - 2048 Literal <nosymbol> ()
51 91806 2255 - 2281 Apply java.util.concurrent.atomic.AtomicLong.set LoaderStatus.this.firstLoadStartTime.set(0L)
55 91811 2328 - 2344 Apply scala.Int.== LoaderStatus.this.count.get().==(0)
59 91812 2441 - 2466 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.empty org.locationtech.geomesa.kafka.index.KafkaFeatureCache.empty(org.locationtech.geomesa.kafka.index.KafkaFeatureCache.empty$default$1)
60 91813 2500 - 2502 Literal <nosymbol> ()
74 91815 2934 - 2990 Block java.lang.Class.getMethod classOf[org.apache.kafka.clients.consumer.ConsumerRecord].getMethod("timestamp")
74 91814 2934 - 2990 Apply java.lang.Class.getMethod classOf[org.apache.kafka.clients.consumer.ConsumerRecord].getMethod("timestamp")
75 91816 3040 - 3122 Typed <nosymbol> (if (KafkaCacheLoaderImpl.this.logger.underlying.isWarnEnabled()) KafkaCacheLoaderImpl.this.logger.underlying.warn("This version of Kafka doesn\'t support timestamps, using system time") else (): Unit)
78 91817 3166 - 3179 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.doInitialLoad KafkaCacheLoaderImpl.this.doInitialLoad
78 91827 3181 - 3447 Block <nosymbol> { val loader: org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader = new KafkaCacheLoader.this.InitialLoader(KafkaCacheLoaderImpl.this.sft, KafkaCacheLoaderImpl.this.consumers, KafkaCacheLoaderImpl.this.topic, KafkaCacheLoaderImpl.this.frequency, KafkaCacheLoaderImpl.this.serializer, KafkaCacheLoaderImpl.this.initialLoadConfig, this); org.locationtech.geomesa.utils.concurrent.CachedThreadPool.execute(loader); scala.Some.apply[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader](loader) }
80 91819 3318 - 3327 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.consumers KafkaCacheLoaderImpl.this.consumers
80 91818 3313 - 3316 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.sft KafkaCacheLoaderImpl.this.sft
80 91821 3336 - 3345 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.frequency KafkaCacheLoaderImpl.this.frequency
80 91820 3329 - 3334 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.topic KafkaCacheLoaderImpl.this.topic
80 91823 3359 - 3376 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.initialLoadConfig KafkaCacheLoaderImpl.this.initialLoadConfig
80 91822 3347 - 3357 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.serializer KafkaCacheLoaderImpl.this.serializer
80 91824 3295 - 3383 Apply org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.<init> new KafkaCacheLoader.this.InitialLoader(KafkaCacheLoaderImpl.this.sft, KafkaCacheLoaderImpl.this.consumers, KafkaCacheLoaderImpl.this.topic, KafkaCacheLoaderImpl.this.frequency, KafkaCacheLoaderImpl.this.serializer, KafkaCacheLoaderImpl.this.initialLoadConfig, this)
81 91825 3390 - 3422 Apply org.locationtech.geomesa.utils.concurrent.CachedThreadPool.execute org.locationtech.geomesa.utils.concurrent.CachedThreadPool.execute(loader)
82 91826 3429 - 3441 Apply scala.Some.apply scala.Some.apply[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader](loader)
83 91830 3453 - 3494 Block <nosymbol> { KafkaCacheLoaderImpl.this.startConsumers(KafkaCacheLoaderImpl.this.startConsumers$default$1); scala.None }
84 91828 3461 - 3477 Apply org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.startConsumers KafkaCacheLoaderImpl.this.startConsumers(KafkaCacheLoaderImpl.this.startConsumers$default$1)
85 91829 3484 - 3488 Select scala.None scala.None
90 91831 3551 - 3564 Apply org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close KafkaCacheLoaderImpl.super.close()
90 91832 3551 - 3564 Block org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close KafkaCacheLoaderImpl.super.close()
91 91837 3581 - 3652 Block <nosymbol> { org.locationtech.geomesa.utils.io.`package`.CloseWithLogging.apply[Option[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader]](KafkaCacheLoaderImpl.this.initialLoader)(io.this.IsCloseable.optionIsCloseable); KafkaCacheLoaderImpl.this.cache.close() }
92 91833 3608 - 3621 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.initialLoader KafkaCacheLoaderImpl.this.initialLoader
92 91835 3591 - 3622 ApplyToImplicitArgs org.locationtech.geomesa.utils.io.CloseWithLogging.apply org.locationtech.geomesa.utils.io.`package`.CloseWithLogging.apply[Option[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader]](KafkaCacheLoaderImpl.this.initialLoader)(io.this.IsCloseable.optionIsCloseable)
92 91834 3607 - 3607 Select org.locationtech.geomesa.utils.io.IsCloseableImplicits.optionIsCloseable io.this.IsCloseable.optionIsCloseable
93 91836 3631 - 3644 Apply java.io.Closeable.close KafkaCacheLoaderImpl.this.cache.close()
98 91838 3794 - 3827 Apply org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record)
99 91839 3850 - 3885 Apply org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record)
100 91841 3943 - 3957 Apply org.apache.kafka.clients.consumer.ConsumerRecord.value record.value()
100 91840 3929 - 3941 Apply org.apache.kafka.clients.consumer.ConsumerRecord.key record.key()
100 91842 3906 - 3978 Apply org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.deserialize KafkaCacheLoaderImpl.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)
103 91843 4150 - 4159 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature m.feature
103 91845 4172 - 4181 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature m.feature
103 91844 4122 - 4160 Apply org.locationtech.geomesa.kafka.index.KafkaListeners.fireChange KafkaCacheLoaderImpl.this.cache.fireChange(timestamp, m.feature)
103 91847 4119 - 4182 Block <nosymbol> { KafkaCacheLoaderImpl.this.cache.fireChange(timestamp, m.feature); KafkaCacheLoaderImpl.this.cache.put(m.feature) }
103 91846 4162 - 4182 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.put KafkaCacheLoaderImpl.this.cache.put(m.feature)
104 91849 4255 - 4259 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id m.id
104 91848 4237 - 4241 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id m.id
104 91851 4243 - 4267 ApplyToImplicitArgs scala.Option.orNull KafkaCacheLoaderImpl.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null])
104 91850 4261 - 4261 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
104 91853 4283 - 4287 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id m.id
104 91852 4209 - 4268 Apply org.locationtech.geomesa.kafka.index.KafkaListeners.fireDelete KafkaCacheLoaderImpl.this.cache.fireDelete(timestamp, m.id, KafkaCacheLoaderImpl.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null]))
104 91855 4206 - 4288 Block <nosymbol> { KafkaCacheLoaderImpl.this.cache.fireDelete(timestamp, m.id, KafkaCacheLoaderImpl.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null])); KafkaCacheLoaderImpl.this.cache.remove(m.id) }
104 91854 4270 - 4288 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.remove KafkaCacheLoaderImpl.this.cache.remove(m.id)
105 91857 4343 - 4356 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.clear KafkaCacheLoaderImpl.this.cache.clear()
105 91856 4315 - 4341 Apply org.locationtech.geomesa.kafka.index.KafkaListeners.fireClear KafkaCacheLoaderImpl.this.cache.fireClear(timestamp)
105 91858 4312 - 4356 Block <nosymbol> { KafkaCacheLoaderImpl.this.cache.fireClear(timestamp); KafkaCacheLoaderImpl.this.cache.clear() }
106 91859 4375 - 4433 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
106 91860 4375 - 4433 Block <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
131 91861 5319 - 5322 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.sft InitialLoader.this.sft
131 91863 5289 - 5333 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.nonIndexing org.locationtech.geomesa.kafka.index.KafkaFeatureCache.nonIndexing(InitialLoader.this.sft, InitialLoader.this.ordering)
131 91862 5324 - 5332 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.ordering InitialLoader.this.ordering
134 91864 5410 - 5444 Apply java.util.concurrent.ConcurrentHashMap.<init> new java.util.concurrent.ConcurrentHashMap[Int,Long]()
136 91865 5510 - 5534 Apply java.util.concurrent.atomic.AtomicBoolean.<init> new java.util.concurrent.atomic.AtomicBoolean(false)
139 91867 5653 - 5675 Apply org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.consume InitialLoader.this.toLoad.consume(record)
139 91866 5641 - 5649 Apply java.util.concurrent.atomic.AtomicBoolean.get InitialLoader.this.done.get()
139 91868 5653 - 5675 Block org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.consume InitialLoader.this.toLoad.consume(record)
139 91908 5683 - 7079 Block <nosymbol> { val headers: Map[String,Array[Byte]] = org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record); val timestamp: Long = org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record); val message: org.locationtech.geomesa.kafka.utils.GeoMessage = InitialLoader.this.serializer.deserialize(record.key(), record.value(), headers, timestamp); (if (InitialLoader.this.logger.underlying.isTraceEnabled()) InitialLoader.this.logger.underlying.trace("Consumed message [{}:{}:{}] {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], (message: AnyRef)) else (): Unit); message match { case (m @ (_: org.locationtech.geomesa.kafka.utils.GeoMessage.Change)) => { InitialLoader.this.toLoad.cache.fireChange(timestamp, m.feature); InitialLoader.this.cache.put(m.feature) } case (m @ (_: org.locationtech.geomesa.kafka.utils.GeoMessage.Delete)) => { InitialLoader.this.toLoad.cache.fireDelete(timestamp, m.id, InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null])); InitialLoader.this.cache.remove(m.id) } case (_: org.locationtech.geomesa.kafka.utils.GeoMessage.Clear) => { InitialLoader.this.toLoad.cache.fireClear(timestamp); InitialLoader.this.cache.clear() } case (m @ _) => throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m)) }; val maxOffset: Long = InitialLoader.this.offsets.getOrDefault(record.partition(), 9223372036854775807L); if (maxOffset.<=(record.offset())) { InitialLoader.this.offsets.remove(record.partition()); InitialLoader.this.latch.countDown(); (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info(scala.StringContext.apply("Initial load: consumed [", ":", ":", "] of ", ", ").s(InitialLoader.this.topic, record.partition(), record.offset(), maxOffset).+(scala.StringContext.apply("", " partitions remaining").s(InitialLoader.this.latch.getCount()))) else (): Unit) } else if (record.offset().>(0).&&(record.offset().%(1048576).==(0))) (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info("Initial load: consumed [{}:{}:{}] of {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], maxOffset.asInstanceOf[AnyRef]) else (): Unit) else () }
140 91869 5707 - 5740 Apply org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record)
141 91870 5765 - 5800 Apply org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record)
142 91871 5846 - 5856 Apply org.apache.kafka.clients.consumer.ConsumerRecord.key record.key()
142 91873 5823 - 5891 Apply org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.deserialize InitialLoader.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)
142 91872 5858 - 5870 Apply org.apache.kafka.clients.consumer.ConsumerRecord.value record.value()
145 91875 6041 - 6086 Apply org.locationtech.geomesa.kafka.index.KafkaListeners.fireChange InitialLoader.this.toLoad.cache.fireChange(timestamp, m.feature)
145 91874 6076 - 6085 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature m.feature
145 91877 6088 - 6108 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.put InitialLoader.this.cache.put(m.feature)
145 91876 6098 - 6107 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature m.feature
145 91878 6038 - 6108 Block <nosymbol> { InitialLoader.this.toLoad.cache.fireChange(timestamp, m.feature); InitialLoader.this.cache.put(m.feature) }
146 91879 6172 - 6176 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id m.id
146 91881 6196 - 6196 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
146 91880 6190 - 6194 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id m.id
146 91883 6137 - 6203 Apply org.locationtech.geomesa.kafka.index.KafkaListeners.fireDelete InitialLoader.this.toLoad.cache.fireDelete(timestamp, m.id, InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null]))
146 91882 6178 - 6202 ApplyToImplicitArgs scala.Option.orNull InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null])
146 91885 6205 - 6223 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.remove InitialLoader.this.cache.remove(m.id)
146 91884 6218 - 6222 Select org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id m.id
146 91886 6134 - 6223 Block <nosymbol> { InitialLoader.this.toLoad.cache.fireDelete(timestamp, m.id, InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null])); InitialLoader.this.cache.remove(m.id) }
147 91887 6252 - 6285 Apply org.locationtech.geomesa.kafka.index.KafkaListeners.fireClear InitialLoader.this.toLoad.cache.fireClear(timestamp)
147 91889 6249 - 6300 Block <nosymbol> { InitialLoader.this.toLoad.cache.fireClear(timestamp); InitialLoader.this.cache.clear() }
147 91888 6287 - 6300 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.clear InitialLoader.this.cache.clear()
148 91891 6321 - 6379 Block <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
148 91890 6321 - 6379 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
151 91893 6563 - 6576 Literal <nosymbol> 9223372036854775807L
151 91892 6545 - 6561 Apply org.apache.kafka.clients.consumer.ConsumerRecord.partition record.partition()
151 91894 6524 - 6577 Apply java.util.concurrent.ConcurrentHashMap.getOrDefault InitialLoader.this.offsets.getOrDefault(record.partition(), 9223372036854775807L)
152 91895 6603 - 6616 Apply org.apache.kafka.clients.consumer.ConsumerRecord.offset record.offset()
152 91896 6590 - 6616 Apply scala.Long.<= maxOffset.<=(record.offset())
152 91900 6618 - 6869 Block <nosymbol> { InitialLoader.this.offsets.remove(record.partition()); InitialLoader.this.latch.countDown(); (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info(scala.StringContext.apply("Initial load: consumed [", ":", ":", "] of ", ", ").s(InitialLoader.this.topic, record.partition(), record.offset(), maxOffset).+(scala.StringContext.apply("", " partitions remaining").s(InitialLoader.this.latch.getCount()))) else (): Unit) }
153 91897 6645 - 6661 Apply org.apache.kafka.clients.consumer.ConsumerRecord.partition record.partition()
153 91898 6630 - 6662 Apply java.util.concurrent.ConcurrentHashMap.remove InitialLoader.this.offsets.remove(record.partition())
154 91899 6673 - 6690 Apply java.util.concurrent.CountDownLatch.countDown InitialLoader.this.latch.countDown()
157 91901 6895 - 6896 Literal <nosymbol> 0
157 91903 6879 - 6928 Apply scala.Boolean.&& record.offset().>(0).&&(record.offset().%(1048576).==(0))
157 91902 6900 - 6928 Apply scala.Long.== record.offset().%(1048576).==(0)
157 91905 6875 - 6875 Literal <nosymbol> ()
157 91907 6875 - 7071 If <nosymbol> if (record.offset().>(0).&&(record.offset().%(1048576).==(0))) (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info("Initial load: consumed [{}:{}:{}] of {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], maxOffset.asInstanceOf[AnyRef]) else (): Unit) else ()
157 91906 6875 - 6875 Block <nosymbol> ()
158 91904 6963 - 7061 Typed <nosymbol> (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info("Initial load: consumed [{}:{}:{}] of {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], maxOffset.asInstanceOf[AnyRef]) else (): Unit)
164 91909 7126 - 7150 Apply org.locationtech.geomesa.kafka.data.KafkaCacheLoader.LoaderStatus.startLoad KafkaCacheLoader.this.LoaderStatus.startLoad()
168 91911 7223 - 7258 Apply org.apache.kafka.clients.consumer.Consumer.partitionsFor InitialLoader.this.consumers.head.partitionsFor(InitialLoader.this.topic)
168 91910 7252 - 7257 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.topic InitialLoader.this.topic
168 91913 7270 - 7270 TypeApply scala.collection.mutable.Buffer.canBuildFrom mutable.this.Buffer.canBuildFrom[Int]
168 91912 7271 - 7282 Apply org.apache.kafka.common.PartitionInfo.partition x$1.partition()
168 91914 7223 - 7283 ApplyToImplicitArgs scala.collection.TraversableLike.map scala.collection.JavaConverters.asScalaBufferConverter[org.apache.kafka.common.PartitionInfo](InitialLoader.this.consumers.head.partitionsFor(InitialLoader.this.topic)).asScala.map[Int, scala.collection.mutable.Buffer[Int]](((x$1: org.apache.kafka.common.PartitionInfo) => x$1.partition()))(mutable.this.Buffer.canBuildFrom[Int])
169 91931 7418 - 8186 Block <nosymbol> { val beginningOffsets: Map[Int,Long] = org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.beginningOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq); val endOffsets: Map[Int,Long] = org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.endOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq); partitions.foreach[AnyVal](((p: Int) => { val endOffset: Long = endOffsets.getOrElse[Long](p, 0L).-(1L); val beginningOffset: Long = beginningOffsets.getOrElse[Long](p, 0L); if (beginningOffset.<(endOffset)) InitialLoader.this.offsets.put(p, endOffset) else () })) }
171 91915 7480 - 7494 Select scala.collection.IterableLike.head InitialLoader.this.consumers.head
171 91917 7503 - 7519 Select scala.collection.SeqLike.toSeq partitions.toSeq
171 91916 7496 - 7501 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.topic InitialLoader.this.topic
171 91918 7441 - 7520 Apply org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.beginningOffsets org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.beginningOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq)
172 91919 7579 - 7593 Select scala.collection.IterableLike.head InitialLoader.this.consumers.head
172 91921 7602 - 7618 Select scala.collection.SeqLike.toSeq partitions.toSeq
172 91920 7595 - 7600 Select org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.topic InitialLoader.this.topic
172 91922 7546 - 7619 Apply org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.endOffsets org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.endOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq)
173 91930 7628 - 8186 Apply scala.collection.IterableLike.foreach partitions.foreach[AnyVal](((p: Int) => { val endOffset: Long = endOffsets.getOrElse[Long](p, 0L).-(1L); val beginningOffset: Long = beginningOffsets.getOrElse[Long](p, 0L); if (beginningOffset.<(endOffset)) InitialLoader.this.offsets.put(p, endOffset) else () }))
176 91923 7829 - 7861 Apply scala.Long.- endOffsets.getOrElse[Long](p, 0L).-(1L)
179 91924 8048 - 8081 Apply scala.collection.MapLike.getOrElse beginningOffsets.getOrElse[Long](p, 0L)
180 91925 8096 - 8123 Apply scala.Long.< beginningOffset.<(endOffset)
180 91929 8092 - 8092 Block <nosymbol> ()
180 91928 8092 - 8092 Literal <nosymbol> ()
181 91927 8139 - 8164 Block java.util.concurrent.ConcurrentHashMap.put InitialLoader.this.offsets.put(p, endOffset)
181 91926 8139 - 8164 Apply java.util.concurrent.ConcurrentHashMap.put InitialLoader.this.offsets.put(p, endOffset)
185 91932 8244 - 8324 Typed <nosymbol> (if (InitialLoader.this.logger.underlying.isWarnEnabled()) InitialLoader.this.logger.underlying.warn("Can\'t support initial bulk loading for current Kafka version: {}", (e: AnyRef)) else (): Unit)
188 91933 8446 - 8462 Select scala.Boolean.unary_! InitialLoader.this.offsets.isEmpty().unary_!
188 91947 8442 - 8442 Literal <nosymbol> ()
188 91946 8464 - 9244 Block <nosymbol> { (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info("Starting initial load for [{}] with {} partitions", (scala.Array.apply[AnyRef]((InitialLoader.this.topic: AnyRef), InitialLoader.this.offsets.size().asInstanceOf[AnyRef])((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit); InitialLoader.this.latch_=(new java.util.concurrent.CountDownLatch(InitialLoader.this.offsets.size())); InitialLoader.this.startConsumers(InitialLoader.this.startConsumers$default$1); try { InitialLoader.this.latch.await() } finally InitialLoader.this.close(); InitialLoader.this.done.set(true); (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info("Finished initial load, transferring to indexed cache for [{}]", (InitialLoader.this.topic: AnyRef)) else (): Unit); InitialLoader.this.cache.query(org.geotools.api.filter.Filter.INCLUDE).foreach[Unit]({ ((feature: org.geotools.api.feature.simple.SimpleFeature) => InitialLoader.this.toLoad.cache.put(feature)) }); (if (InitialLoader.this.logger.underlying.isInfoEnabled()) InitialLoader.this.logger.underlying.info("Finished transfer for [{}]", (InitialLoader.this.topic: AnyRef)) else (): Unit) }
188 91948 8442 - 8442 Block <nosymbol> ()
190 91935 8573 - 8605 Apply java.util.concurrent.CountDownLatch.<init> new java.util.concurrent.CountDownLatch(InitialLoader.this.offsets.size())
190 91934 8592 - 8604 Apply java.util.concurrent.ConcurrentHashMap.size InitialLoader.this.offsets.size()
190 91936 8565 - 8605 Apply org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.latch_= InitialLoader.this.latch_=(new java.util.concurrent.CountDownLatch(InitialLoader.this.offsets.size()))
191 91937 8614 - 8630 Apply org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.startConsumers InitialLoader.this.startConsumers(InitialLoader.this.startConsumers$default$1)
192 91939 8691 - 8704 Block java.util.concurrent.CountDownLatch.await InitialLoader.this.latch.await()
192 91938 8691 - 8704 Apply java.util.concurrent.CountDownLatch.await InitialLoader.this.latch.await()
194 91941 8821 - 8828 Block org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close InitialLoader.this.close()
194 91940 8821 - 8828 Apply org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close InitialLoader.this.close()
198 91942 9015 - 9029 Apply java.util.concurrent.atomic.AtomicBoolean.set InitialLoader.this.done.set(true)
200 91943 9140 - 9154 Select org.geotools.api.filter.Filter.INCLUDE org.geotools.api.filter.Filter.INCLUDE
200 91945 9128 - 9181 Apply scala.collection.Iterator.foreach InitialLoader.this.cache.query(org.geotools.api.filter.Filter.INCLUDE).foreach[Unit]({ ((feature: org.geotools.api.feature.simple.SimpleFeature) => InitialLoader.this.toLoad.cache.put(feature)) })
200 91944 9164 - 9180 Apply org.locationtech.geomesa.kafka.index.KafkaFeatureCache.put InitialLoader.this.toLoad.cache.put(feature)
205 91949 9341 - 9364 Apply org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.startConsumers InitialLoader.this.toLoad.startConsumers(InitialLoader.this.toLoad.startConsumers$default$1)
206 91950 9371 - 9399 Apply org.locationtech.geomesa.kafka.data.KafkaCacheLoader.LoaderStatus.completedLoad KafkaCacheLoader.this.LoaderStatus.completedLoad()