| Line |
Stmt Id |
Pos |
Tree |
Symbol |
Tests |
Code |
|
36
|
28
|
1456
-
1574
|
Apply
|
java.util.Collections.newSetFromMap
|
|
java.util.Collections.newSetFromMap[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]](new java.util.concurrent.ConcurrentHashMap[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]],Boolean]())
|
|
37
|
27
|
1489
-
1573
|
Apply
|
java.util.concurrent.ConcurrentHashMap.<init>
|
|
new java.util.concurrent.ConcurrentHashMap[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]],Boolean]()
|
|
37
|
29
|
1456
-
1582
|
Select
|
scala.collection.convert.Decorators.AsScala.asScala
|
|
scala.collection.JavaConverters.asScalaSetConverter[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]](java.util.Collections.newSetFromMap[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]](new java.util.concurrent.ConcurrentHashMap[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]],Boolean]())).asScala
|
|
39
|
30
|
1626
-
1642
|
Select
|
scala.collection.SeqLike.length
|
|
BatchConsumer.this.consumers.length
|
|
39
|
31
|
1644
-
1669
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.<init>
|
|
new BatchConsumer.this.ConsumerCoordinator()
|
|
39
|
32
|
1608
-
1670
|
Apply
|
java.util.concurrent.CyclicBarrier.<init>
|
|
new java.util.concurrent.CyclicBarrier(BatchConsumer.this.consumers.length, new BatchConsumer.this.ConsumerCoordinator())
|
|
40
|
33
|
1696
-
1731
|
Apply
|
org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.LogOffsetCommitCallback.<init>
|
|
new org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.LogOffsetCommitCallback(BatchConsumer.this.logger)
|
|
46
|
34
|
1907
-
1950
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerRunnable.<init>
|
|
new BatchConsumer.this.ConsumerRunnable(id, consumer, handler)
|
|
75
|
35
|
3273
-
3278
|
Literal
|
<nosymbol>
|
|
false
|
|
78
|
36
|
3323
-
3340
|
Select
|
scala.collection.TraversableOnce.nonEmpty
|
|
BatchConsumer.this.messages.nonEmpty
|
|
78
|
52
|
3319
-
3319
|
Literal
|
<nosymbol>
|
|
()
|
|
78
|
53
|
3319
-
3319
|
Block
|
<nosymbol>
|
|
()
|
|
79
|
51
|
3352
-
3714
|
Try
|
<nosymbol>
|
|
try {
BatchConsumer.this.consume(BatchConsumer.this.messages.toSeq.sortBy[Long](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]) => x$1.offset()))(math.this.Ordering.Long)) match {
case org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit => {
ConsumerCoordinator.this.resume();
ConsumerCoordinator.this.commit();
BatchConsumer.this.messages.clear()
}
case org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Continue => ConsumerCoordinator.this.resume()
case org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Pause => ConsumerCoordinator.this.pause()
}
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error processing message batch:", e)
else
(): Unit)
}
|
|
80
|
37
|
3398
-
3406
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.offset
|
|
x$1.offset()
|
|
80
|
38
|
3397
-
3397
|
Select
|
scala.math.Ordering.Long
|
|
math.this.Ordering.Long
|
|
80
|
39
|
3376
-
3407
|
ApplyToImplicitArgs
|
scala.collection.SeqLike.sortBy
|
|
BatchConsumer.this.messages.toSeq.sortBy[Long](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]) => x$1.offset()))(math.this.Ordering.Long)
|
|
80
|
40
|
3368
-
3408
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.consume
|
|
BatchConsumer.this.consume(BatchConsumer.this.messages.toSeq.sortBy[Long](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]) => x$1.offset()))(math.this.Ordering.Long))
|
|
80
|
49
|
3368
-
3605
|
Match
|
<nosymbol>
|
|
BatchConsumer.this.consume(BatchConsumer.this.messages.toSeq.sortBy[Long](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]) => x$1.offset()))(math.this.Ordering.Long)) match {
case org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit => {
ConsumerCoordinator.this.resume();
ConsumerCoordinator.this.commit();
BatchConsumer.this.messages.clear()
}
case org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Continue => ConsumerCoordinator.this.resume()
case org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Pause => ConsumerCoordinator.this.pause()
}
|
|
81
|
41
|
3458
-
3466
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.resume
|
|
ConsumerCoordinator.this.resume()
|
|
81
|
42
|
3468
-
3476
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.commit
|
|
ConsumerCoordinator.this.commit()
|
|
81
|
43
|
3478
-
3494
|
Apply
|
scala.collection.mutable.SetLike.clear
|
|
BatchConsumer.this.messages.clear()
|
|
81
|
44
|
3455
-
3494
|
Block
|
<nosymbol>
|
|
{
ConsumerCoordinator.this.resume();
ConsumerCoordinator.this.commit();
BatchConsumer.this.messages.clear()
}
|
|
82
|
45
|
3536
-
3544
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.resume
|
|
ConsumerCoordinator.this.resume()
|
|
82
|
46
|
3536
-
3544
|
Block
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.resume
|
|
ConsumerCoordinator.this.resume()
|
|
83
|
47
|
3586
-
3593
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.pause
|
|
ConsumerCoordinator.this.pause()
|
|
83
|
48
|
3586
-
3593
|
Block
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.pause
|
|
ConsumerCoordinator.this.pause()
|
|
86
|
50
|
3654
-
3704
|
Typed
|
<nosymbol>
|
|
(if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error processing message batch:", e)
else
(): Unit)
|
|
92
|
58
|
3770
-
3935
|
Apply
|
scala.collection.IterableLike.foreach
|
|
BatchConsumer.this.consumers.foreach[Unit](((c: org.apache.kafka.clients.consumer.Consumer[Array[Byte],Array[Byte]]) => try {
c.commitAsync(BatchConsumer.this.callback)
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error committing offsets:", e)
else
(): Unit)
}))
|
|
93
|
54
|
3823
-
3831
|
Select
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.callback
|
|
BatchConsumer.this.callback
|
|
93
|
55
|
3809
-
3832
|
Apply
|
org.apache.kafka.clients.consumer.Consumer.commitAsync
|
|
c.commitAsync(BatchConsumer.this.callback)
|
|
93
|
56
|
3809
-
3832
|
Block
|
org.apache.kafka.clients.consumer.Consumer.commitAsync
|
|
c.commitAsync(BatchConsumer.this.callback)
|
|
94
|
57
|
3873
-
3917
|
Typed
|
<nosymbol>
|
|
(if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error committing offsets:", e)
else
(): Unit)
|
|
100
|
59
|
3987
-
3994
|
Select
|
scala.Boolean.unary_!
|
|
ConsumerCoordinator.this.paused.unary_!
|
|
100
|
66
|
3996
-
4207
|
Block
|
<nosymbol>
|
|
{
BatchConsumer.this.consumers.foreach[Unit](((c: org.apache.kafka.clients.consumer.Consumer[Array[Byte],Array[Byte]]) => try {
c.pause(c.assignment())
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error pausing consumer:", e)
else
(): Unit)
}));
ConsumerCoordinator.this.paused_=(true)
}
|
|
100
|
67
|
3983
-
3983
|
Literal
|
<nosymbol>
|
|
()
|
|
100
|
68
|
3983
-
3983
|
Block
|
<nosymbol>
|
|
()
|
|
101
|
64
|
4006
-
4177
|
Apply
|
scala.collection.IterableLike.foreach
|
|
BatchConsumer.this.consumers.foreach[Unit](((c: org.apache.kafka.clients.consumer.Consumer[Array[Byte],Array[Byte]]) => try {
c.pause(c.assignment())
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error pausing consumer:", e)
else
(): Unit)
}))
|
|
102
|
60
|
4055
-
4069
|
Apply
|
org.apache.kafka.clients.consumer.Consumer.assignment
|
|
c.assignment()
|
|
102
|
61
|
4047
-
4070
|
Apply
|
org.apache.kafka.clients.consumer.Consumer.pause
|
|
c.pause(c.assignment())
|
|
102
|
62
|
4047
-
4070
|
Block
|
org.apache.kafka.clients.consumer.Consumer.pause
|
|
c.pause(c.assignment())
|
|
103
|
63
|
4113
-
4155
|
Typed
|
<nosymbol>
|
|
(if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error pausing consumer:", e)
else
(): Unit)
|
|
106
|
65
|
4186
-
4199
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.paused_=
|
|
ConsumerCoordinator.this.paused_=(true)
|
|
111
|
69
|
4260
-
4266
|
Select
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.paused
|
|
ConsumerCoordinator.this.paused
|
|
111
|
76
|
4268
-
4482
|
Block
|
<nosymbol>
|
|
{
BatchConsumer.this.consumers.foreach[Unit](((c: org.apache.kafka.clients.consumer.Consumer[Array[Byte],Array[Byte]]) => try {
c.resume(c.assignment())
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error resuming consumer:", e)
else
(): Unit)
}));
ConsumerCoordinator.this.paused_=(false)
}
|
|
111
|
77
|
4256
-
4256
|
Literal
|
<nosymbol>
|
|
()
|
|
111
|
78
|
4256
-
4256
|
Block
|
<nosymbol>
|
|
()
|
|
112
|
74
|
4278
-
4451
|
Apply
|
scala.collection.IterableLike.foreach
|
|
BatchConsumer.this.consumers.foreach[Unit](((c: org.apache.kafka.clients.consumer.Consumer[Array[Byte],Array[Byte]]) => try {
c.resume(c.assignment())
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error resuming consumer:", e)
else
(): Unit)
}))
|
|
113
|
70
|
4328
-
4342
|
Apply
|
org.apache.kafka.clients.consumer.Consumer.assignment
|
|
c.assignment()
|
|
113
|
71
|
4319
-
4343
|
Apply
|
org.apache.kafka.clients.consumer.Consumer.resume
|
|
c.resume(c.assignment())
|
|
113
|
72
|
4319
-
4343
|
Block
|
org.apache.kafka.clients.consumer.Consumer.resume
|
|
c.resume(c.assignment())
|
|
114
|
73
|
4386
-
4429
|
Typed
|
<nosymbol>
|
|
(if (BatchConsumer.this.logger.underlying.isErrorEnabled())
BatchConsumer.this.logger.underlying.error("Error resuming consumer:", e)
else
(): Unit)
|
|
117
|
75
|
4460
-
4474
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerCoordinator.paused_=
|
|
ConsumerCoordinator.this.paused_=(false)
|
|
126
|
120
|
4687
-
5939
|
Block
|
<nosymbol>
|
|
{
var interrupted: Boolean = false;
while$2(){
if (BatchConsumer.this.isOpen.&&(interrupted.unary_!))
{
try {
val result: org.apache.kafka.clients.consumer.ConsumerRecords[Array[Byte],Array[Byte]] = org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.poll[Array[Byte], Array[Byte]](ConsumerRunnable.this.consumer, BatchConsumer.this.frequency);
<stable> <accessor> lazy val topics: String = scala.collection.JavaConverters.asScalaSetConverter[org.apache.kafka.common.TopicPartition](result.partitions()).asScala.map[String, scala.collection.mutable.Set[String]](((tp: org.apache.kafka.common.TopicPartition) => scala.StringContext.apply("[", ":", "]").s(tp.topic(), tp.partition())))(mutable.this.Set.canBuildFrom[String]).mkString(",");
(if (BatchConsumer.this.logger.underlying.isDebugEnabled())
BatchConsumer.this.logger.underlying.debug("Consumer [{}] poll received {} records for {}", (ConsumerRunnable.this.id: AnyRef), result.count().asInstanceOf[AnyRef], (topics: AnyRef))
else
(): Unit);
if (result.isEmpty().unary_!)
{
val records: java.util.Iterator[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]] = result.iterator();
while$1(){
if (records.hasNext())
{
BatchConsumer.this.messages.+=(records.next());
while$1()
}
else
()
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] finished processing {} records from topic {}", (ConsumerRunnable.this.id: AnyRef), result.count().asInstanceOf[AnyRef], (topics: AnyRef))
else
(): Unit)
}
else
()
} catch {
case ((_: org.apache.kafka.common.errors.WakeupException)| (_: org.apache.kafka.common.errors.InterruptException)| (_: InterruptedException)) => interrupted = true
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => if (ConsumerRunnable.this.handler.handle(ConsumerRunnable.this.id, e).unary_!)
interrupted = true
else
()
} finally if (interrupted.unary_!)
{
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] waiting on barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit);
try {
BatchConsumer.this.barrier.await()
} catch {
case ((_: java.util.concurrent.BrokenBarrierException)| (_: InterruptedException)) => interrupted = true
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] passed barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit)
}
else
();
while$2()
}
else
()
}
}
|
|
127
|
79
|
4705
-
4710
|
Literal
|
<nosymbol>
|
|
false
|
|
128
|
80
|
4736
-
4748
|
Select
|
scala.Boolean.unary_!
|
|
interrupted.unary_!
|
|
128
|
81
|
4726
-
4748
|
Apply
|
scala.Boolean.&&
|
|
BatchConsumer.this.isOpen.&&(interrupted.unary_!)
|
|
128
|
117
|
4762
-
5929
|
Block
|
<nosymbol>
|
|
{
try {
val result: org.apache.kafka.clients.consumer.ConsumerRecords[Array[Byte],Array[Byte]] = org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.poll[Array[Byte], Array[Byte]](ConsumerRunnable.this.consumer, BatchConsumer.this.frequency);
<stable> <accessor> lazy val topics: String = scala.collection.JavaConverters.asScalaSetConverter[org.apache.kafka.common.TopicPartition](result.partitions()).asScala.map[String, scala.collection.mutable.Set[String]](((tp: org.apache.kafka.common.TopicPartition) => scala.StringContext.apply("[", ":", "]").s(tp.topic(), tp.partition())))(mutable.this.Set.canBuildFrom[String]).mkString(",");
(if (BatchConsumer.this.logger.underlying.isDebugEnabled())
BatchConsumer.this.logger.underlying.debug("Consumer [{}] poll received {} records for {}", (ConsumerRunnable.this.id: AnyRef), result.count().asInstanceOf[AnyRef], (topics: AnyRef))
else
(): Unit);
if (result.isEmpty().unary_!)
{
val records: java.util.Iterator[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]] = result.iterator();
while$1(){
if (records.hasNext())
{
BatchConsumer.this.messages.+=(records.next());
while$1()
}
else
()
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] finished processing {} records from topic {}", (ConsumerRunnable.this.id: AnyRef), result.count().asInstanceOf[AnyRef], (topics: AnyRef))
else
(): Unit)
}
else
()
} catch {
case ((_: org.apache.kafka.common.errors.WakeupException)| (_: org.apache.kafka.common.errors.InterruptException)| (_: InterruptedException)) => interrupted = true
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => if (ConsumerRunnable.this.handler.handle(ConsumerRunnable.this.id, e).unary_!)
interrupted = true
else
()
} finally if (interrupted.unary_!)
{
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] waiting on barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit);
try {
BatchConsumer.this.barrier.await()
} catch {
case ((_: java.util.concurrent.BrokenBarrierException)| (_: InterruptedException)) => interrupted = true
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] passed barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit)
}
else
();
while$2()
}
|
|
128
|
118
|
4719
-
4719
|
Literal
|
<nosymbol>
|
|
()
|
|
128
|
119
|
4719
-
4719
|
Block
|
<nosymbol>
|
|
()
|
|
129
|
97
|
4780
-
5355
|
Block
|
<nosymbol>
|
|
{
val result: org.apache.kafka.clients.consumer.ConsumerRecords[Array[Byte],Array[Byte]] = org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.poll[Array[Byte], Array[Byte]](ConsumerRunnable.this.consumer, BatchConsumer.this.frequency);
<stable> <accessor> lazy val topics: String = scala.collection.JavaConverters.asScalaSetConverter[org.apache.kafka.common.TopicPartition](result.partitions()).asScala.map[String, scala.collection.mutable.Set[String]](((tp: org.apache.kafka.common.TopicPartition) => scala.StringContext.apply("[", ":", "]").s(tp.topic(), tp.partition())))(mutable.this.Set.canBuildFrom[String]).mkString(",");
(if (BatchConsumer.this.logger.underlying.isDebugEnabled())
BatchConsumer.this.logger.underlying.debug("Consumer [{}] poll received {} records for {}", (ConsumerRunnable.this.id: AnyRef), result.count().asInstanceOf[AnyRef], (topics: AnyRef))
else
(): Unit);
if (result.isEmpty().unary_!)
{
val records: java.util.Iterator[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]] = result.iterator();
while$1(){
if (records.hasNext())
{
BatchConsumer.this.messages.+=(records.next());
while$1()
}
else
()
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] finished processing {} records from topic {}", (ConsumerRunnable.this.id: AnyRef), result.count().asInstanceOf[AnyRef], (topics: AnyRef))
else
(): Unit)
}
else
()
}
|
|
129
|
116
|
4762
-
4762
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerRunnable.while$2
|
|
while$2()
|
|
130
|
82
|
4820
-
4828
|
Select
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerRunnable.consumer
|
|
ConsumerRunnable.this.consumer
|
|
130
|
83
|
4830
-
4839
|
Select
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.frequency
|
|
BatchConsumer.this.frequency
|
|
130
|
84
|
4793
-
4840
|
Apply
|
org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.poll
|
|
org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.poll[Array[Byte], Array[Byte]](ConsumerRunnable.this.consumer, BatchConsumer.this.frequency)
|
|
133
|
85
|
5067
-
5082
|
Select
|
scala.Boolean.unary_!
|
|
result.isEmpty().unary_!
|
|
133
|
94
|
5084
-
5355
|
Block
|
<nosymbol>
|
|
{
val records: java.util.Iterator[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]] = result.iterator();
while$1(){
if (records.hasNext())
{
BatchConsumer.this.messages.+=(records.next());
while$1()
}
else
()
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] finished processing {} records from topic {}", (ConsumerRunnable.this.id: AnyRef), result.count().asInstanceOf[AnyRef], (topics: AnyRef))
else
(): Unit)
}
|
|
133
|
95
|
5063
-
5063
|
Literal
|
<nosymbol>
|
|
()
|
|
133
|
96
|
5063
-
5063
|
Block
|
<nosymbol>
|
|
()
|
|
134
|
86
|
5114
-
5131
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecords.iterator
|
|
result.iterator()
|
|
135
|
87
|
5153
-
5168
|
Apply
|
java.util.Iterator.hasNext
|
|
records.hasNext()
|
|
135
|
91
|
5188
-
5214
|
Block
|
<nosymbol>
|
|
{
BatchConsumer.this.messages.+=(records.next());
while$1()
}
|
|
135
|
92
|
5146
-
5146
|
Literal
|
<nosymbol>
|
|
()
|
|
135
|
93
|
5146
-
5146
|
Block
|
<nosymbol>
|
|
()
|
|
136
|
88
|
5200
-
5214
|
Apply
|
java.util.Iterator.next
|
|
records.next()
|
|
136
|
89
|
5188
-
5214
|
Apply
|
scala.collection.mutable.SetLike.+=
|
|
BatchConsumer.this.messages.+=(records.next())
|
|
136
|
90
|
5197
-
5197
|
Apply
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerRunnable.while$1
|
|
while$1()
|
|
141
|
98
|
5479
-
5483
|
Literal
|
<nosymbol>
|
|
true
|
|
141
|
99
|
5465
-
5483
|
Assign
|
<nosymbol>
|
|
interrupted = true
|
|
142
|
100
|
5536
-
5538
|
Select
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.ConsumerRunnable.id
|
|
ConsumerRunnable.this.id
|
|
142
|
101
|
5520
-
5542
|
Select
|
scala.Boolean.unary_!
|
|
ConsumerRunnable.this.handler.handle(ConsumerRunnable.this.id, e).unary_!
|
|
142
|
102
|
5560
-
5564
|
Literal
|
<nosymbol>
|
|
true
|
|
142
|
103
|
5546
-
5564
|
Assign
|
<nosymbol>
|
|
interrupted = true
|
|
142
|
104
|
5516
-
5516
|
Literal
|
<nosymbol>
|
|
()
|
|
142
|
105
|
5516
-
5516
|
Block
|
<nosymbol>
|
|
()
|
|
142
|
106
|
5516
-
5566
|
If
|
<nosymbol>
|
|
if (ConsumerRunnable.this.handler.handle(ConsumerRunnable.this.id, e).unary_!)
interrupted = true
else
()
|
|
144
|
107
|
5605
-
5617
|
Select
|
scala.Boolean.unary_!
|
|
interrupted.unary_!
|
|
144
|
112
|
5619
-
5917
|
Block
|
<nosymbol>
|
|
{
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] waiting on barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit);
try {
BatchConsumer.this.barrier.await()
} catch {
case ((_: java.util.concurrent.BrokenBarrierException)| (_: InterruptedException)) => interrupted = true
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] passed barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit)
}
|
|
144
|
113
|
5601
-
5601
|
Literal
|
<nosymbol>
|
|
()
|
|
144
|
114
|
5601
-
5601
|
Block
|
<nosymbol>
|
|
()
|
|
144
|
115
|
5601
-
5917
|
If
|
<nosymbol>
|
|
if (interrupted.unary_!)
{
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] waiting on barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit);
try {
BatchConsumer.this.barrier.await()
} catch {
case ((_: java.util.concurrent.BrokenBarrierException)| (_: InterruptedException)) => interrupted = true
};
(if (BatchConsumer.this.logger.underlying.isTraceEnabled())
BatchConsumer.this.logger.underlying.trace("Consumer [{}] passed barrier", (ConsumerRunnable.this.id: AnyRef))
else
(): Unit)
}
else
()
|
|
146
|
108
|
5706
-
5721
|
Apply
|
java.util.concurrent.CyclicBarrier.await
|
|
BatchConsumer.this.barrier.await()
|
|
146
|
109
|
5706
-
5721
|
Block
|
java.util.concurrent.CyclicBarrier.await
|
|
BatchConsumer.this.barrier.await()
|
|
147
|
110
|
5822
-
5826
|
Literal
|
<nosymbol>
|
|
true
|
|
147
|
111
|
5808
-
5826
|
Assign
|
<nosymbol>
|
|
interrupted = true
|
|
154
|
121
|
5972
-
5988
|
Apply
|
org.apache.kafka.clients.consumer.Consumer.close
|
|
ConsumerRunnable.this.consumer.close()
|
|
154
|
122
|
5972
-
5988
|
Block
|
org.apache.kafka.clients.consumer.Consumer.close
|
|
ConsumerRunnable.this.consumer.close()
|
|
154
|
124
|
5966
-
6091
|
Try
|
<nosymbol>
|
|
try {
ConsumerRunnable.this.consumer.close()
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (BatchConsumer.this.logger.underlying.isWarnEnabled())
BatchConsumer.this.logger.underlying.warn(scala.StringContext.apply("Error calling close on consumer: ").s(), e)
else
(): Unit)
}
|
|
155
|
123
|
6029
-
6081
|
Typed
|
<nosymbol>
|
|
(if (BatchConsumer.this.logger.underlying.isWarnEnabled())
BatchConsumer.this.logger.underlying.warn(scala.StringContext.apply("Error calling close on consumer: ").s(), e)
else
(): Unit)
|
|
165
|
125
|
6242
-
6242
|
Select
|
scala.Enumeration.Value
|
|
BatchResult.this.Value
|
|
165
|
126
|
6242
-
6242
|
Select
|
scala.Enumeration.Value
|
|
BatchResult.this.Value
|
|
165
|
127
|
6242
-
6247
|
Select
|
scala.Enumeration.Value
|
|
BatchResult.this.Value
|