| Line |
Stmt Id |
Pos |
Tree |
Symbol |
Tests |
Code |
|
40
|
91798
|
1764
-
1784
|
Apply
|
java.util.concurrent.atomic.AtomicInteger.<init>
|
|
new java.util.concurrent.atomic.AtomicInteger(0)
|
|
41
|
91799
|
1822
-
1840
|
Apply
|
java.util.concurrent.atomic.AtomicLong.<init>
|
|
new java.util.concurrent.atomic.AtomicLong(0L)
|
|
43
|
91804
|
1873
-
1994
|
Apply
|
java.lang.Object.synchronized
|
|
LoaderStatus.this.synchronized[Boolean]({
LoaderStatus.this.count.incrementAndGet();
LoaderStatus.this.firstLoadStartTime.compareAndSet(0L, java.lang.System.currentTimeMillis())
})
|
|
44
|
91800
|
1894
-
1917
|
Apply
|
java.util.concurrent.atomic.AtomicInteger.incrementAndGet
|
|
LoaderStatus.this.count.incrementAndGet()
|
|
45
|
91801
|
1957
-
1959
|
Literal
|
<nosymbol>
|
|
0L
|
|
45
|
91803
|
1924
-
1988
|
Apply
|
java.util.concurrent.atomic.AtomicLong.compareAndSet
|
|
LoaderStatus.this.firstLoadStartTime.compareAndSet(0L, java.lang.System.currentTimeMillis())
|
|
45
|
91802
|
1961
-
1987
|
Apply
|
java.lang.System.currentTimeMillis
|
|
java.lang.System.currentTimeMillis()
|
|
47
|
91810
|
2027
-
2295
|
Apply
|
java.lang.Object.synchronized
|
|
LoaderStatus.this.synchronized[Unit](if (LoaderStatus.this.count.decrementAndGet().==(0))
{
(if (KafkaCacheLoader.this.logger.underlying.isInfoEnabled())
KafkaCacheLoader.this.logger.underlying.info(scala.StringContext.apply("Last active initial load completed. ").s().+(scala.StringContext.apply("Initial loads took ", " milliseconds.").s(java.lang.System.currentTimeMillis().-(LoaderStatus.this.firstLoadStartTime.get()))))
else
(): Unit);
LoaderStatus.this.firstLoadStartTime.set(0L)
}
else
())
|
|
48
|
91805
|
2052
-
2080
|
Apply
|
scala.Int.==
|
|
LoaderStatus.this.count.decrementAndGet().==(0)
|
|
48
|
91807
|
2082
-
2289
|
Block
|
<nosymbol>
|
|
{
(if (KafkaCacheLoader.this.logger.underlying.isInfoEnabled())
KafkaCacheLoader.this.logger.underlying.info(scala.StringContext.apply("Last active initial load completed. ").s().+(scala.StringContext.apply("Initial loads took ", " milliseconds.").s(java.lang.System.currentTimeMillis().-(LoaderStatus.this.firstLoadStartTime.get()))))
else
(): Unit);
LoaderStatus.this.firstLoadStartTime.set(0L)
}
|
|
48
|
91809
|
2048
-
2048
|
Block
|
<nosymbol>
|
|
()
|
|
48
|
91808
|
2048
-
2048
|
Literal
|
<nosymbol>
|
|
()
|
|
51
|
91806
|
2255
-
2281
|
Apply
|
java.util.concurrent.atomic.AtomicLong.set
|
|
LoaderStatus.this.firstLoadStartTime.set(0L)
|
|
55
|
91811
|
2328
-
2344
|
Apply
|
scala.Int.==
|
|
LoaderStatus.this.count.get().==(0)
|
|
59
|
91812
|
2441
-
2466
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.empty
|
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.empty(org.locationtech.geomesa.kafka.index.KafkaFeatureCache.empty$default$1)
|
|
60
|
91813
|
2500
-
2502
|
Literal
|
<nosymbol>
|
|
()
|
|
74
|
91815
|
2934
-
2990
|
Block
|
java.lang.Class.getMethod
|
|
classOf[org.apache.kafka.clients.consumer.ConsumerRecord].getMethod("timestamp")
|
|
74
|
91814
|
2934
-
2990
|
Apply
|
java.lang.Class.getMethod
|
|
classOf[org.apache.kafka.clients.consumer.ConsumerRecord].getMethod("timestamp")
|
|
75
|
91816
|
3040
-
3122
|
Typed
|
<nosymbol>
|
|
(if (KafkaCacheLoaderImpl.this.logger.underlying.isWarnEnabled())
KafkaCacheLoaderImpl.this.logger.underlying.warn("This version of Kafka doesn\'t support timestamps, using system time")
else
(): Unit)
|
|
78
|
91817
|
3166
-
3179
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.doInitialLoad
|
|
KafkaCacheLoaderImpl.this.doInitialLoad
|
|
78
|
91827
|
3181
-
3447
|
Block
|
<nosymbol>
|
|
{
val loader: org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader = new KafkaCacheLoader.this.InitialLoader(KafkaCacheLoaderImpl.this.sft, KafkaCacheLoaderImpl.this.consumers, KafkaCacheLoaderImpl.this.topic, KafkaCacheLoaderImpl.this.frequency, KafkaCacheLoaderImpl.this.serializer, KafkaCacheLoaderImpl.this.initialLoadConfig, this);
org.locationtech.geomesa.utils.concurrent.CachedThreadPool.execute(loader);
scala.Some.apply[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader](loader)
}
|
|
80
|
91819
|
3318
-
3327
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.consumers
|
|
KafkaCacheLoaderImpl.this.consumers
|
|
80
|
91818
|
3313
-
3316
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.sft
|
|
KafkaCacheLoaderImpl.this.sft
|
|
80
|
91821
|
3336
-
3345
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.frequency
|
|
KafkaCacheLoaderImpl.this.frequency
|
|
80
|
91820
|
3329
-
3334
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.topic
|
|
KafkaCacheLoaderImpl.this.topic
|
|
80
|
91823
|
3359
-
3376
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.initialLoadConfig
|
|
KafkaCacheLoaderImpl.this.initialLoadConfig
|
|
80
|
91822
|
3347
-
3357
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.serializer
|
|
KafkaCacheLoaderImpl.this.serializer
|
|
80
|
91824
|
3295
-
3383
|
Apply
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.<init>
|
|
new KafkaCacheLoader.this.InitialLoader(KafkaCacheLoaderImpl.this.sft, KafkaCacheLoaderImpl.this.consumers, KafkaCacheLoaderImpl.this.topic, KafkaCacheLoaderImpl.this.frequency, KafkaCacheLoaderImpl.this.serializer, KafkaCacheLoaderImpl.this.initialLoadConfig, this)
|
|
81
|
91825
|
3390
-
3422
|
Apply
|
org.locationtech.geomesa.utils.concurrent.CachedThreadPool.execute
|
|
org.locationtech.geomesa.utils.concurrent.CachedThreadPool.execute(loader)
|
|
82
|
91826
|
3429
-
3441
|
Apply
|
scala.Some.apply
|
|
scala.Some.apply[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader](loader)
|
|
83
|
91830
|
3453
-
3494
|
Block
|
<nosymbol>
|
|
{
KafkaCacheLoaderImpl.this.startConsumers(KafkaCacheLoaderImpl.this.startConsumers$default$1);
scala.None
}
|
|
84
|
91828
|
3461
-
3477
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.startConsumers
|
|
KafkaCacheLoaderImpl.this.startConsumers(KafkaCacheLoaderImpl.this.startConsumers$default$1)
|
|
85
|
91829
|
3484
-
3488
|
Select
|
scala.None
|
|
scala.None
|
|
90
|
91831
|
3551
-
3564
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close
|
|
KafkaCacheLoaderImpl.super.close()
|
|
90
|
91832
|
3551
-
3564
|
Block
|
org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close
|
|
KafkaCacheLoaderImpl.super.close()
|
|
91
|
91837
|
3581
-
3652
|
Block
|
<nosymbol>
|
|
{
org.locationtech.geomesa.utils.io.`package`.CloseWithLogging.apply[Option[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader]](KafkaCacheLoaderImpl.this.initialLoader)(io.this.IsCloseable.optionIsCloseable);
KafkaCacheLoaderImpl.this.cache.close()
}
|
|
92
|
91833
|
3608
-
3621
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.initialLoader
|
|
KafkaCacheLoaderImpl.this.initialLoader
|
|
92
|
91835
|
3591
-
3622
|
ApplyToImplicitArgs
|
org.locationtech.geomesa.utils.io.CloseWithLogging.apply
|
|
org.locationtech.geomesa.utils.io.`package`.CloseWithLogging.apply[Option[org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader]](KafkaCacheLoaderImpl.this.initialLoader)(io.this.IsCloseable.optionIsCloseable)
|
|
92
|
91834
|
3607
-
3607
|
Select
|
org.locationtech.geomesa.utils.io.IsCloseableImplicits.optionIsCloseable
|
|
io.this.IsCloseable.optionIsCloseable
|
|
93
|
91836
|
3631
-
3644
|
Apply
|
java.io.Closeable.close
|
|
KafkaCacheLoaderImpl.this.cache.close()
|
|
98
|
91838
|
3794
-
3827
|
Apply
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders
|
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record)
|
|
99
|
91839
|
3850
-
3885
|
Apply
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp
|
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record)
|
|
100
|
91841
|
3943
-
3957
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.value
|
|
record.value()
|
|
100
|
91840
|
3929
-
3941
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.key
|
|
record.key()
|
|
100
|
91842
|
3906
-
3978
|
Apply
|
org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.deserialize
|
|
KafkaCacheLoaderImpl.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)
|
|
103
|
91843
|
4150
-
4159
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature
|
|
m.feature
|
|
103
|
91845
|
4172
-
4181
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature
|
|
m.feature
|
|
103
|
91844
|
4122
-
4160
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaListeners.fireChange
|
|
KafkaCacheLoaderImpl.this.cache.fireChange(timestamp, m.feature)
|
|
103
|
91847
|
4119
-
4182
|
Block
|
<nosymbol>
|
|
{
KafkaCacheLoaderImpl.this.cache.fireChange(timestamp, m.feature);
KafkaCacheLoaderImpl.this.cache.put(m.feature)
}
|
|
103
|
91846
|
4162
-
4182
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.put
|
|
KafkaCacheLoaderImpl.this.cache.put(m.feature)
|
|
104
|
91849
|
4255
-
4259
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id
|
|
m.id
|
|
104
|
91848
|
4237
-
4241
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id
|
|
m.id
|
|
104
|
91851
|
4243
-
4267
|
ApplyToImplicitArgs
|
scala.Option.orNull
|
|
KafkaCacheLoaderImpl.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null])
|
|
104
|
91850
|
4261
-
4261
|
TypeApply
|
scala.Predef.$conforms
|
|
scala.Predef.$conforms[Null]
|
|
104
|
91853
|
4283
-
4287
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id
|
|
m.id
|
|
104
|
91852
|
4209
-
4268
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaListeners.fireDelete
|
|
KafkaCacheLoaderImpl.this.cache.fireDelete(timestamp, m.id, KafkaCacheLoaderImpl.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null]))
|
|
104
|
91855
|
4206
-
4288
|
Block
|
<nosymbol>
|
|
{
KafkaCacheLoaderImpl.this.cache.fireDelete(timestamp, m.id, KafkaCacheLoaderImpl.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null]));
KafkaCacheLoaderImpl.this.cache.remove(m.id)
}
|
|
104
|
91854
|
4270
-
4288
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.remove
|
|
KafkaCacheLoaderImpl.this.cache.remove(m.id)
|
|
105
|
91857
|
4343
-
4356
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.clear
|
|
KafkaCacheLoaderImpl.this.cache.clear()
|
|
105
|
91856
|
4315
-
4341
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaListeners.fireClear
|
|
KafkaCacheLoaderImpl.this.cache.fireClear(timestamp)
|
|
105
|
91858
|
4312
-
4356
|
Block
|
<nosymbol>
|
|
{
KafkaCacheLoaderImpl.this.cache.fireClear(timestamp);
KafkaCacheLoaderImpl.this.cache.clear()
}
|
|
106
|
91859
|
4375
-
4433
|
Throw
|
<nosymbol>
|
|
throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
|
|
106
|
91860
|
4375
-
4433
|
Block
|
<nosymbol>
|
|
throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
|
|
131
|
91861
|
5319
-
5322
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.sft
|
|
InitialLoader.this.sft
|
|
131
|
91863
|
5289
-
5333
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.nonIndexing
|
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.nonIndexing(InitialLoader.this.sft, InitialLoader.this.ordering)
|
|
131
|
91862
|
5324
-
5332
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.ordering
|
|
InitialLoader.this.ordering
|
|
134
|
91864
|
5410
-
5444
|
Apply
|
java.util.concurrent.ConcurrentHashMap.<init>
|
|
new java.util.concurrent.ConcurrentHashMap[Int,Long]()
|
|
136
|
91865
|
5510
-
5534
|
Apply
|
java.util.concurrent.atomic.AtomicBoolean.<init>
|
|
new java.util.concurrent.atomic.AtomicBoolean(false)
|
|
139
|
91867
|
5653
-
5675
|
Apply
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.consume
|
|
InitialLoader.this.toLoad.consume(record)
|
|
139
|
91866
|
5641
-
5649
|
Apply
|
java.util.concurrent.atomic.AtomicBoolean.get
|
|
InitialLoader.this.done.get()
|
|
139
|
91868
|
5653
-
5675
|
Block
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.KafkaCacheLoaderImpl.consume
|
|
InitialLoader.this.toLoad.consume(record)
|
|
139
|
91908
|
5683
-
7079
|
Block
|
<nosymbol>
|
|
{
val headers: Map[String,Array[Byte]] = org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record);
val timestamp: Long = org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record);
val message: org.locationtech.geomesa.kafka.utils.GeoMessage = InitialLoader.this.serializer.deserialize(record.key(), record.value(), headers, timestamp);
(if (InitialLoader.this.logger.underlying.isTraceEnabled())
InitialLoader.this.logger.underlying.trace("Consumed message [{}:{}:{}] {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], (message: AnyRef))
else
(): Unit);
message match {
case (m @ (_: org.locationtech.geomesa.kafka.utils.GeoMessage.Change)) => {
InitialLoader.this.toLoad.cache.fireChange(timestamp, m.feature);
InitialLoader.this.cache.put(m.feature)
}
case (m @ (_: org.locationtech.geomesa.kafka.utils.GeoMessage.Delete)) => {
InitialLoader.this.toLoad.cache.fireDelete(timestamp, m.id, InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null]));
InitialLoader.this.cache.remove(m.id)
}
case (_: org.locationtech.geomesa.kafka.utils.GeoMessage.Clear) => {
InitialLoader.this.toLoad.cache.fireClear(timestamp);
InitialLoader.this.cache.clear()
}
case (m @ _) => throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
};
val maxOffset: Long = InitialLoader.this.offsets.getOrDefault(record.partition(), 9223372036854775807L);
if (maxOffset.<=(record.offset()))
{
InitialLoader.this.offsets.remove(record.partition());
InitialLoader.this.latch.countDown();
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info(scala.StringContext.apply("Initial load: consumed [", ":", ":", "] of ", ", ").s(InitialLoader.this.topic, record.partition(), record.offset(), maxOffset).+(scala.StringContext.apply("", " partitions remaining").s(InitialLoader.this.latch.getCount())))
else
(): Unit)
}
else
if (record.offset().>(0).&&(record.offset().%(1048576).==(0)))
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info("Initial load: consumed [{}:{}:{}] of {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], maxOffset.asInstanceOf[AnyRef])
else
(): Unit)
else
()
}
|
|
140
|
91869
|
5707
-
5740
|
Apply
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders
|
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record)
|
|
141
|
91870
|
5765
-
5800
|
Apply
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp
|
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record)
|
|
142
|
91871
|
5846
-
5856
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.key
|
|
record.key()
|
|
142
|
91873
|
5823
-
5891
|
Apply
|
org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.deserialize
|
|
InitialLoader.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)
|
|
142
|
91872
|
5858
-
5870
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.value
|
|
record.value()
|
|
145
|
91875
|
6041
-
6086
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaListeners.fireChange
|
|
InitialLoader.this.toLoad.cache.fireChange(timestamp, m.feature)
|
|
145
|
91874
|
6076
-
6085
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature
|
|
m.feature
|
|
145
|
91877
|
6088
-
6108
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.put
|
|
InitialLoader.this.cache.put(m.feature)
|
|
145
|
91876
|
6098
-
6107
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Change.feature
|
|
m.feature
|
|
145
|
91878
|
6038
-
6108
|
Block
|
<nosymbol>
|
|
{
InitialLoader.this.toLoad.cache.fireChange(timestamp, m.feature);
InitialLoader.this.cache.put(m.feature)
}
|
|
146
|
91879
|
6172
-
6176
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id
|
|
m.id
|
|
146
|
91881
|
6196
-
6196
|
TypeApply
|
scala.Predef.$conforms
|
|
scala.Predef.$conforms[Null]
|
|
146
|
91880
|
6190
-
6194
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id
|
|
m.id
|
|
146
|
91883
|
6137
-
6203
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaListeners.fireDelete
|
|
InitialLoader.this.toLoad.cache.fireDelete(timestamp, m.id, InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null]))
|
|
146
|
91882
|
6178
-
6202
|
ApplyToImplicitArgs
|
scala.Option.orNull
|
|
InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null])
|
|
146
|
91885
|
6205
-
6223
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.remove
|
|
InitialLoader.this.cache.remove(m.id)
|
|
146
|
91884
|
6218
-
6222
|
Select
|
org.locationtech.geomesa.kafka.utils.GeoMessage.Delete.id
|
|
m.id
|
|
146
|
91886
|
6134
-
6223
|
Block
|
<nosymbol>
|
|
{
InitialLoader.this.toLoad.cache.fireDelete(timestamp, m.id, InitialLoader.this.cache.query(m.id).orNull[org.geotools.api.feature.simple.SimpleFeature](scala.Predef.$conforms[Null]));
InitialLoader.this.cache.remove(m.id)
}
|
|
147
|
91887
|
6252
-
6285
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaListeners.fireClear
|
|
InitialLoader.this.toLoad.cache.fireClear(timestamp)
|
|
147
|
91889
|
6249
-
6300
|
Block
|
<nosymbol>
|
|
{
InitialLoader.this.toLoad.cache.fireClear(timestamp);
InitialLoader.this.cache.clear()
}
|
|
147
|
91888
|
6287
-
6300
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.clear
|
|
InitialLoader.this.cache.clear()
|
|
148
|
91891
|
6321
-
6379
|
Block
|
<nosymbol>
|
|
throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
|
|
148
|
91890
|
6321
-
6379
|
Throw
|
<nosymbol>
|
|
throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Unknown message: ", "").s(m))
|
|
151
|
91893
|
6563
-
6576
|
Literal
|
<nosymbol>
|
|
9223372036854775807L
|
|
151
|
91892
|
6545
-
6561
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.partition
|
|
record.partition()
|
|
151
|
91894
|
6524
-
6577
|
Apply
|
java.util.concurrent.ConcurrentHashMap.getOrDefault
|
|
InitialLoader.this.offsets.getOrDefault(record.partition(), 9223372036854775807L)
|
|
152
|
91895
|
6603
-
6616
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.offset
|
|
record.offset()
|
|
152
|
91896
|
6590
-
6616
|
Apply
|
scala.Long.<=
|
|
maxOffset.<=(record.offset())
|
|
152
|
91900
|
6618
-
6869
|
Block
|
<nosymbol>
|
|
{
InitialLoader.this.offsets.remove(record.partition());
InitialLoader.this.latch.countDown();
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info(scala.StringContext.apply("Initial load: consumed [", ":", ":", "] of ", ", ").s(InitialLoader.this.topic, record.partition(), record.offset(), maxOffset).+(scala.StringContext.apply("", " partitions remaining").s(InitialLoader.this.latch.getCount())))
else
(): Unit)
}
|
|
153
|
91897
|
6645
-
6661
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.partition
|
|
record.partition()
|
|
153
|
91898
|
6630
-
6662
|
Apply
|
java.util.concurrent.ConcurrentHashMap.remove
|
|
InitialLoader.this.offsets.remove(record.partition())
|
|
154
|
91899
|
6673
-
6690
|
Apply
|
java.util.concurrent.CountDownLatch.countDown
|
|
InitialLoader.this.latch.countDown()
|
|
157
|
91901
|
6895
-
6896
|
Literal
|
<nosymbol>
|
|
0
|
|
157
|
91903
|
6879
-
6928
|
Apply
|
scala.Boolean.&&
|
|
record.offset().>(0).&&(record.offset().%(1048576).==(0))
|
|
157
|
91902
|
6900
-
6928
|
Apply
|
scala.Long.==
|
|
record.offset().%(1048576).==(0)
|
|
157
|
91905
|
6875
-
6875
|
Literal
|
<nosymbol>
|
|
()
|
|
157
|
91907
|
6875
-
7071
|
If
|
<nosymbol>
|
|
if (record.offset().>(0).&&(record.offset().%(1048576).==(0)))
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info("Initial load: consumed [{}:{}:{}] of {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], maxOffset.asInstanceOf[AnyRef])
else
(): Unit)
else
()
|
|
157
|
91906
|
6875
-
6875
|
Block
|
<nosymbol>
|
|
()
|
|
158
|
91904
|
6963
-
7061
|
Typed
|
<nosymbol>
|
|
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info("Initial load: consumed [{}:{}:{}] of {}", (InitialLoader.this.topic: AnyRef), record.partition().asInstanceOf[AnyRef], record.offset().asInstanceOf[AnyRef], maxOffset.asInstanceOf[AnyRef])
else
(): Unit)
|
|
164
|
91909
|
7126
-
7150
|
Apply
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.LoaderStatus.startLoad
|
|
KafkaCacheLoader.this.LoaderStatus.startLoad()
|
|
168
|
91911
|
7223
-
7258
|
Apply
|
org.apache.kafka.clients.consumer.Consumer.partitionsFor
|
|
InitialLoader.this.consumers.head.partitionsFor(InitialLoader.this.topic)
|
|
168
|
91910
|
7252
-
7257
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.topic
|
|
InitialLoader.this.topic
|
|
168
|
91913
|
7270
-
7270
|
TypeApply
|
scala.collection.mutable.Buffer.canBuildFrom
|
|
mutable.this.Buffer.canBuildFrom[Int]
|
|
168
|
91912
|
7271
-
7282
|
Apply
|
org.apache.kafka.common.PartitionInfo.partition
|
|
x$1.partition()
|
|
168
|
91914
|
7223
-
7283
|
ApplyToImplicitArgs
|
scala.collection.TraversableLike.map
|
|
scala.collection.JavaConverters.asScalaBufferConverter[org.apache.kafka.common.PartitionInfo](InitialLoader.this.consumers.head.partitionsFor(InitialLoader.this.topic)).asScala.map[Int, scala.collection.mutable.Buffer[Int]](((x$1: org.apache.kafka.common.PartitionInfo) => x$1.partition()))(mutable.this.Buffer.canBuildFrom[Int])
|
|
169
|
91931
|
7418
-
8186
|
Block
|
<nosymbol>
|
|
{
val beginningOffsets: Map[Int,Long] = org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.beginningOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq);
val endOffsets: Map[Int,Long] = org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.endOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq);
partitions.foreach[AnyVal](((p: Int) => {
val endOffset: Long = endOffsets.getOrElse[Long](p, 0L).-(1L);
val beginningOffset: Long = beginningOffsets.getOrElse[Long](p, 0L);
if (beginningOffset.<(endOffset))
InitialLoader.this.offsets.put(p, endOffset)
else
()
}))
}
|
|
171
|
91915
|
7480
-
7494
|
Select
|
scala.collection.IterableLike.head
|
|
InitialLoader.this.consumers.head
|
|
171
|
91917
|
7503
-
7519
|
Select
|
scala.collection.SeqLike.toSeq
|
|
partitions.toSeq
|
|
171
|
91916
|
7496
-
7501
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.topic
|
|
InitialLoader.this.topic
|
|
171
|
91918
|
7441
-
7520
|
Apply
|
org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.beginningOffsets
|
|
org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.beginningOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq)
|
|
172
|
91919
|
7579
-
7593
|
Select
|
scala.collection.IterableLike.head
|
|
InitialLoader.this.consumers.head
|
|
172
|
91921
|
7602
-
7618
|
Select
|
scala.collection.SeqLike.toSeq
|
|
partitions.toSeq
|
|
172
|
91920
|
7595
-
7600
|
Select
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.topic
|
|
InitialLoader.this.topic
|
|
172
|
91922
|
7546
-
7619
|
Apply
|
org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.endOffsets
|
|
org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.endOffsets(InitialLoader.this.consumers.head, InitialLoader.this.topic, partitions.toSeq)
|
|
173
|
91930
|
7628
-
8186
|
Apply
|
scala.collection.IterableLike.foreach
|
|
partitions.foreach[AnyVal](((p: Int) => {
val endOffset: Long = endOffsets.getOrElse[Long](p, 0L).-(1L);
val beginningOffset: Long = beginningOffsets.getOrElse[Long](p, 0L);
if (beginningOffset.<(endOffset))
InitialLoader.this.offsets.put(p, endOffset)
else
()
}))
|
|
176
|
91923
|
7829
-
7861
|
Apply
|
scala.Long.-
|
|
endOffsets.getOrElse[Long](p, 0L).-(1L)
|
|
179
|
91924
|
8048
-
8081
|
Apply
|
scala.collection.MapLike.getOrElse
|
|
beginningOffsets.getOrElse[Long](p, 0L)
|
|
180
|
91925
|
8096
-
8123
|
Apply
|
scala.Long.<
|
|
beginningOffset.<(endOffset)
|
|
180
|
91929
|
8092
-
8092
|
Block
|
<nosymbol>
|
|
()
|
|
180
|
91928
|
8092
-
8092
|
Literal
|
<nosymbol>
|
|
()
|
|
181
|
91927
|
8139
-
8164
|
Block
|
java.util.concurrent.ConcurrentHashMap.put
|
|
InitialLoader.this.offsets.put(p, endOffset)
|
|
181
|
91926
|
8139
-
8164
|
Apply
|
java.util.concurrent.ConcurrentHashMap.put
|
|
InitialLoader.this.offsets.put(p, endOffset)
|
|
185
|
91932
|
8244
-
8324
|
Typed
|
<nosymbol>
|
|
(if (InitialLoader.this.logger.underlying.isWarnEnabled())
InitialLoader.this.logger.underlying.warn("Can\'t support initial bulk loading for current Kafka version: {}", (e: AnyRef))
else
(): Unit)
|
|
188
|
91933
|
8446
-
8462
|
Select
|
scala.Boolean.unary_!
|
|
InitialLoader.this.offsets.isEmpty().unary_!
|
|
188
|
91947
|
8442
-
8442
|
Literal
|
<nosymbol>
|
|
()
|
|
188
|
91946
|
8464
-
9244
|
Block
|
<nosymbol>
|
|
{
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info("Starting initial load for [{}] with {} partitions", (scala.Array.apply[AnyRef]((InitialLoader.this.topic: AnyRef), InitialLoader.this.offsets.size().asInstanceOf[AnyRef])((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*))
else
(): Unit);
InitialLoader.this.latch_=(new java.util.concurrent.CountDownLatch(InitialLoader.this.offsets.size()));
InitialLoader.this.startConsumers(InitialLoader.this.startConsumers$default$1);
try {
InitialLoader.this.latch.await()
} finally InitialLoader.this.close();
InitialLoader.this.done.set(true);
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info("Finished initial load, transferring to indexed cache for [{}]", (InitialLoader.this.topic: AnyRef))
else
(): Unit);
InitialLoader.this.cache.query(org.geotools.api.filter.Filter.INCLUDE).foreach[Unit]({
((feature: org.geotools.api.feature.simple.SimpleFeature) => InitialLoader.this.toLoad.cache.put(feature))
});
(if (InitialLoader.this.logger.underlying.isInfoEnabled())
InitialLoader.this.logger.underlying.info("Finished transfer for [{}]", (InitialLoader.this.topic: AnyRef))
else
(): Unit)
}
|
|
188
|
91948
|
8442
-
8442
|
Block
|
<nosymbol>
|
|
()
|
|
190
|
91935
|
8573
-
8605
|
Apply
|
java.util.concurrent.CountDownLatch.<init>
|
|
new java.util.concurrent.CountDownLatch(InitialLoader.this.offsets.size())
|
|
190
|
91934
|
8592
-
8604
|
Apply
|
java.util.concurrent.ConcurrentHashMap.size
|
|
InitialLoader.this.offsets.size()
|
|
190
|
91936
|
8565
-
8605
|
Apply
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.InitialLoader.latch_=
|
|
InitialLoader.this.latch_=(new java.util.concurrent.CountDownLatch(InitialLoader.this.offsets.size()))
|
|
191
|
91937
|
8614
-
8630
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.startConsumers
|
|
InitialLoader.this.startConsumers(InitialLoader.this.startConsumers$default$1)
|
|
192
|
91939
|
8691
-
8704
|
Block
|
java.util.concurrent.CountDownLatch.await
|
|
InitialLoader.this.latch.await()
|
|
192
|
91938
|
8691
-
8704
|
Apply
|
java.util.concurrent.CountDownLatch.await
|
|
InitialLoader.this.latch.await()
|
|
194
|
91941
|
8821
-
8828
|
Block
|
org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close
|
|
InitialLoader.this.close()
|
|
194
|
91940
|
8821
-
8828
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.close
|
|
InitialLoader.this.close()
|
|
198
|
91942
|
9015
-
9029
|
Apply
|
java.util.concurrent.atomic.AtomicBoolean.set
|
|
InitialLoader.this.done.set(true)
|
|
200
|
91943
|
9140
-
9154
|
Select
|
org.geotools.api.filter.Filter.INCLUDE
|
|
org.geotools.api.filter.Filter.INCLUDE
|
|
200
|
91945
|
9128
-
9181
|
Apply
|
scala.collection.Iterator.foreach
|
|
InitialLoader.this.cache.query(org.geotools.api.filter.Filter.INCLUDE).foreach[Unit]({
((feature: org.geotools.api.feature.simple.SimpleFeature) => InitialLoader.this.toLoad.cache.put(feature))
})
|
|
200
|
91944
|
9164
-
9180
|
Apply
|
org.locationtech.geomesa.kafka.index.KafkaFeatureCache.put
|
|
InitialLoader.this.toLoad.cache.put(feature)
|
|
205
|
91949
|
9341
-
9364
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.startConsumers
|
|
InitialLoader.this.toLoad.startConsumers(InitialLoader.this.toLoad.startConsumers$default$1)
|
|
206
|
91950
|
9371
-
9399
|
Apply
|
org.locationtech.geomesa.kafka.data.KafkaCacheLoader.LoaderStatus.completedLoad
|
|
KafkaCacheLoader.this.LoaderStatus.completedLoad()
|