1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.consumer
10 
11 import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
12 import org.apache.kafka.common.errors.{InterruptException, WakeupException}
13 import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult
14 import org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.{ConsumerErrorHandler, LogOffsetCommitCallback}
15 import org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions
16 
17 import java.time.Duration
18 import java.util.Collections
19 import java.util.concurrent._
20 import scala.util.control.NonFatal
21 
22 /**
 * Consumer that processes messages in batches, with at-least-once processing guarantees
24  *
 * @param consumers consumers to poll; each one is polled from its own thread
 * @param frequency poll timeout used for each consumer poll
27  */
28 abstract class BatchConsumer(consumers: Seq[Consumer[Array[Byte], Array[Byte]]], frequency: Duration)
29     extends BaseThreadedConsumer(consumers) {
30 
31   import BatchResult.BatchResult
32 
33   import scala.collection.JavaConverters._
34 
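  // records gathered by all consumer threads between barrier cycles; cleared once a batch commits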
35   private val messages =
36     Collections.newSetFromMap(
37       new ConcurrentHashMap[ConsumerRecord[Array[Byte], Array[Byte]], java.lang.Boolean]()).asScala
38 
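  // consumer threads rendezvous here after each poll; the coordinator runs as the barrier action,
  // so batches are processed only while every consumer thread is parked at the barrier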
39   private val barrier = new CyclicBarrier(consumers.length, new ConsumerCoordinator())
40   private val callback = new LogOffsetCommitCallback(logger)
41 
42   override protected def createConsumerRunnable(
43       id: String,
44       consumer: Consumer[Array[Byte], Array[Byte]],
45       handler: ConsumerErrorHandler): Runnable = {
46     new ConsumerRunnable(id, consumer, handler)
47   }
48 
49   /**
50    * Consume a batch of records.
51    *
   * The value returned from this method determines how processing continues. If `Commit` is
   * returned, the batch is considered complete and won't be presented again. If `Continue` is
   * returned, the batch will be presented again in the future, and more messages will be read off
   * the topic in the meantime. If `Pause` is returned, the batch will be presented again in the
   * future, but no more messages will be read off the topic in the meantime.
   *
   * This method should return in a reasonable amount of time. If too much time is spent processing
   * messages, the consumers may be considered inactive and dropped from the consumer group. See
   * https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
   *
   * Note: if there is an error committing the batch, or if something else goes wrong, some messages
   * may be repeated in a subsequent call, regardless of the value returned from this method
64    *
65    * @param records records
66    * @return commit, continue, or pause
67    */
68   protected def consume(records: Seq[ConsumerRecord[Array[Byte], Array[Byte]]]): BatchResult
69 
70   /**
71    * Invokes a callback on a batch of messages and commits offsets
72    */
73   class ConsumerCoordinator extends Runnable {
74 
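    // only accessed from the barrier action, which runs in at most one thread at a time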
75     private var paused = false
76 
77     override def run(): Unit = {
78       if (messages.nonEmpty) {
79         try {
80           consume(messages.toSeq.sortBy(_.offset)) match {
81             case BatchResult.Commit   => resume(); commit(); messages.clear()
82             case BatchResult.Continue => resume()
83             case BatchResult.Pause    => pause()
84           }
85         } catch {
86           case NonFatal(e) => logger.error("Error processing message batch:", e)
87         }
88       }
89     }
90 
    private def commit(): Unit = {
92       consumers.foreach { c =>
93         try { c.commitAsync(callback) } catch {
94           case NonFatal(e) => logger.error("Error committing offsets:", e)
95         }
96       }
97     }
98 
99     private def pause(): Unit = {
100       if (!paused) {
101         consumers.foreach { c =>
102           try { c.pause(c.assignment()) } catch {
103             case NonFatal(e) => logger.error("Error pausing consumer:", e)
104           }
105         }
106         paused = true
107       }
108     }
109 
110     private def resume(): Unit = {
111       if (paused) {
112         consumers.foreach { c =>
113           try { c.resume(c.assignment()) } catch {
114             case NonFatal(e) => logger.error("Error resuming consumer:", e)
115           }
116         }
117         paused = false
118       }
119     }
120   }
121 
122   class ConsumerRunnable(id: String, consumer: Consumer[Array[Byte], Array[Byte]], handler: ConsumerErrorHandler)
123       extends Runnable {
124 
125     override def run(): Unit = {
126       try {
127         var interrupted = false
128         while (isOpen && !interrupted) {
129           try {
130             val result = KafkaConsumerVersions.poll(consumer, frequency)
131             lazy val topics = result.partitions.asScala.map(tp => s"[${tp.topic}:${tp.partition}]").mkString(",")
132             logger.debug(s"Consumer [$id] poll received ${result.count()} records for $topics")
133             if (!result.isEmpty) {
134               val records = result.iterator()
135               while (records.hasNext) {
136                 messages += records.next()
137               }
              logger.trace(s"Consumer [$id] finished processing ${result.count()} records from $topics")
139             }
140           } catch {
141             case _: WakeupException | _: InterruptException | _: InterruptedException => interrupted = true
142             case NonFatal(e) => if (!handler.handle(id, e)) { interrupted = true }
143           } finally {
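            // rendezvous with the other consumer threads even if the poll failed, so that the
            // barrier isn't left waiting on this thread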
144             if (!interrupted) {
145               logger.trace(s"Consumer [$id] waiting on barrier")
146               try { barrier.await() } catch {
147                 case _: BrokenBarrierException | _: InterruptedException => interrupted = true
148               }
149               logger.trace(s"Consumer [$id] passed barrier")
150             }
151           }
152         }
153       } finally {
154         try { consumer.close() } catch {
          case NonFatal(e) => logger.warn("Error calling close on consumer:", e)
156         }
157       }
158     }
159   }
160 }
161 
162 object BatchConsumer {
163   object BatchResult extends Enumeration {
164     type BatchResult = Value
165     val Commit, Continue, Pause = Value
166   }
167 }