1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.consumer
10 
11 import com.typesafe.scalalogging.LazyLogging
12 import org.apache.kafka.clients.consumer.Consumer
13 import org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.{ConsumerErrorHandler, LoggingConsumerErrorHandler}
14 
15 import java.io.Closeable
16 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
17 
/**
 * Base class that runs a set of Kafka consumers on a dedicated fixed-size thread pool.
 *
 * One runnable per consumer (built by `createConsumerRunnable`) is submitted to the pool when
 * `startConsumers` is invoked. The pool is sized to the number of consumers.
 *
 * @param consumers the consumers to run; may be empty, in which case nothing is started
 */
abstract class BaseThreadedConsumer(consumers: Seq[Consumer[Array[Byte], Array[Byte]]])
    extends Closeable with LazyLogging {

  import scala.collection.JavaConverters._

  // set to false by close(); volatile so the update is visible to other threads reading isOpen
  @volatile
  private var open = true

  // Executors.newFixedThreadPool throws IllegalArgumentException for a size <= 0, so clamp to 1 to
  // allow an empty consumer list. Pool threads are created lazily, so none start in that case.
  private val executor: ExecutorService = Executors.newFixedThreadPool(math.max(1, consumers.length))

  /**
   * Submits one runnable per consumer to the thread pool.
   *
   * Each runnable is given a numeric string id (the consumer's index), zero-padded to two digits
   * when there are more than ten consumers.
   *
   * @param handler optional error handler passed to each runnable; if not provided, a
   *                `LoggingConsumerErrorHandler` is created with this instance's logger and the
   *                distinct topics the consumers are subscribed to
   */
  def startConsumers(handler: Option[ConsumerErrorHandler] = None): Unit = {
    // with more than ten consumers the highest index has two digits, so pad for uniform width
    val format = if (consumers.lengthCompare(10) > 0) { "%02d" } else { "%d" }
    val topics = consumers.flatMap(_.subscription().asScala).distinct
    val h = handler.getOrElse(new LoggingConsumerErrorHandler(logger, topics))
    consumers.zipWithIndex.foreach { case (c, i) =>
      executor.execute(createConsumerRunnable(String.format(format, Int.box(i)), c, h))
    }
    logger.debug(s"Started ${consumers.length} consumer(s) on topic ${topics.mkString(", ")}")
  }

  /**
   * Marks this instance as closed, stops the pool from accepting new tasks, and blocks until all
   * previously submitted runnables have completed.
   *
   * NOTE(review): there is effectively no timeout on the wait, so this presumably relies on the
   * runnables checking `isOpen` and exiting on their own - confirm against the implementations.
   */
  override def close(): Unit = {
    open = false
    executor.shutdown()
    executor.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }

  /**
   * Whether this instance is still open.
   *
   * @return true until `close` has been invoked, false afterwards
   */
  protected def isOpen: Boolean = open

  /**
   * Creates the runnable that will be executed on the thread pool for a single consumer.
   *
   * @param id string id assigned to the consumer (its index, possibly zero-padded)
   * @param consumer the consumer the runnable is created for
   * @param handler error handler supplied to (or defaulted by) `startConsumers`
   * @return the runnable to submit to the thread pool
   */
  protected def createConsumerRunnable(
      id: String,
      consumer: Consumer[Array[Byte], Array[Byte]],
      handler: ConsumerErrorHandler): Runnable
}
Line Stmt Id Pos Tree Symbol Tests Code
24 1 1026 - 1030 Literal <nosymbol> true
26 2 1103 - 1119 Select scala.collection.SeqLike.length BaseThreadedConsumer.this.consumers.length
26 3 1074 - 1120 Apply java.util.concurrent.Executors.newFixedThreadPool java.util.concurrent.Executors.newFixedThreadPool(BaseThreadedConsumer.this.consumers.length)
29 4 1220 - 1251 Apply scala.Int.> BaseThreadedConsumer.this.consumers.lengthCompare(10).>(0)
29 5 1255 - 1261 Literal <nosymbol> "%02d"
29 6 1255 - 1261 Block <nosymbol> "%02d"
29 7 1271 - 1275 Literal <nosymbol> "%d"
29 8 1271 - 1275 Block <nosymbol> "%d"
30 9 1313 - 1329 Apply org.apache.kafka.clients.consumer.Consumer.subscription x$1.subscription()
30 10 1313 - 1337 Select scala.collection.convert.Decorators.AsScala.asScala scala.collection.JavaConverters.asScalaSetConverter[String](x$1.subscription()).asScala
30 11 1312 - 1312 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[String]
30 12 1295 - 1347 Select scala.collection.SeqLike.distinct BaseThreadedConsumer.this.consumers.flatMap[String, Seq[String]](((x$1: org.apache.kafka.clients.consumer.Consumer[Array[Byte],Array[Byte]]) => scala.collection.JavaConverters.asScalaSetConverter[String](x$1.subscription()).asScala))(collection.this.Seq.canBuildFrom[String]).distinct
31 13 1378 - 1425 Apply org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.LoggingConsumerErrorHandler.<init> new org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.LoggingConsumerErrorHandler(BaseThreadedConsumer.this.logger, topics)
31 14 1360 - 1426 Apply scala.Option.getOrElse handler.getOrElse[org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.ConsumerErrorHandler](new org.locationtech.geomesa.kafka.consumer.ThreadedConsumer.LoggingConsumerErrorHandler(BaseThreadedConsumer.this.logger, topics))
32 15 1439 - 1440 Literal <nosymbol> 0
33 21 1445 - 1576 Apply scala.collection.IterableLike.foreach BaseThreadedConsumer.this.consumers.foreach[Unit](((c: org.apache.kafka.clients.consumer.Consumer[Array[Byte],Array[Byte]]) => { BaseThreadedConsumer.this.executor.execute(BaseThreadedConsumer.this.createConsumerRunnable(java.lang.String.format(format, scala.Int.box(i)), c, h)); i = i.+(1) }))
34 16 1538 - 1548 Apply scala.Int.box scala.Int.box(i)
34 17 1516 - 1549 Apply java.lang.String.format java.lang.String.format(format, scala.Int.box(i))
34 18 1493 - 1556 Apply org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.createConsumerRunnable BaseThreadedConsumer.this.createConsumerRunnable(java.lang.String.format(format, scala.Int.box(i)), c, h)
34 19 1476 - 1557 Apply java.util.concurrent.Executor.execute BaseThreadedConsumer.this.executor.execute(BaseThreadedConsumer.this.createConsumerRunnable(java.lang.String.format(format, scala.Int.box(i)), c, h))
35 20 1564 - 1570 Apply scala.Int.+ i.+(1)
41 22 1697 - 1709 Apply org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.open_= BaseThreadedConsumer.this.open_=(false)
42 23 1714 - 1733 Apply java.util.concurrent.ExecutorService.shutdown BaseThreadedConsumer.this.executor.shutdown()
43 24 1738 - 1796 Apply java.util.concurrent.ExecutorService.awaitTermination BaseThreadedConsumer.this.executor.awaitTermination(9223372036854775807L, SECONDS)
43 25 1763 - 1763 Literal <nosymbol> ()
46 26 1836 - 1840 Select org.locationtech.geomesa.kafka.consumer.BaseThreadedConsumer.open BaseThreadedConsumer.this.open