1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.versions
10 
11 import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener, ConsumerRecords, OffsetAndTimestamp}
12 import org.apache.kafka.common.TopicPartition
13 
14 import java.time.Duration
15 import java.util.Collections
16 
/**
  * Reflective bridge to `Consumer` methods whose signatures vary between kafka versions
  * 0.9, 0.10, 1.0, 1.1, 2.0, and 2.1.
  *
  * The methods declared on the `Consumer` interface are inspected once, when this object is
  * initialized. The lookups for `poll`, `seekToBeginning`, `pause`, `resume` and `subscribe` throw a
  * `NoSuchMethodException` during initialization if no suitable method is declared. The lookups for
  * `beginningOffsets`, `endOffsets` and `offsetsForTimes` are allowed to come back empty, and only
  * throw when the corresponding wrapper is actually invoked.
  *
  * Every reflective call is routed through `tryInvocation`, which is defined in the enclosing
  * package object.
  */
object KafkaConsumerVersions {

  // everything declared directly on the Consumer interface - all lookups below search this array
  private val consumerMethods = classOf[Consumer[_, _]].getDeclaredMethods

  /**
    * Polls the consumer, using `poll(Duration)` when the interface declares it and otherwise
    * `poll(long)` with the timeout converted to milliseconds
    *
    * @param consumer consumer to poll
    * @param timeout poll timeout
    * @return the value returned by the underlying poll method, cast to `ConsumerRecords[K, V]`
    */
  def poll[K, V](consumer: Consumer[K, V], timeout: Duration): ConsumerRecords[K, V] =
    pollInvocation(consumer, timeout).asInstanceOf[ConsumerRecords[K, V]]

  /**
    * Invokes `Consumer.seekToBeginning` for a single partition
    *
    * @param consumer consumer
    * @param topic topic partition to seek
    */
  def seekToBeginning(consumer: Consumer[_, _], topic: TopicPartition): Unit =
    seekToBeginningInvocation(consumer, topic)

  /**
    * Invokes `Consumer.pause` for a single partition
    *
    * @param consumer consumer
    * @param topic topic partition to pause
    */
  def pause(consumer: Consumer[_, _], topic: TopicPartition): Unit = pauseInvocation(consumer, topic)

  /**
    * Invokes `Consumer.resume` for a single partition
    *
    * @param consumer consumer
    * @param topic topic partition to resume
    */
  def resume(consumer: Consumer[_, _], topic: TopicPartition): Unit = resumeInvocation(consumer, topic)

  /**
    * Invokes the one-argument `Consumer.subscribe`, passing the topic as a singleton list
    *
    * @param consumer consumer
    * @param topic topic name
    */
  def subscribe(consumer: Consumer[_, _], topic: String): Unit = subscribeInvocation(consumer, topic)

  /**
    * Invokes the two-argument `Consumer.subscribe`, passing the topic as a singleton list along
    * with the rebalance listener
    *
    * @param consumer consumer
    * @param topic topic name
    * @param listener rebalance listener handed to the consumer
    */
  def subscribe(consumer: Consumer[_, _], topic: String, listener: ConsumerRebalanceListener): Unit =
    subscribeWithListenerInvocation(consumer, topic, listener)

  /**
    * Invokes `Consumer.beginningOffsets` for the given partitions of a topic
    *
    * @param consumer consumer
    * @param topic topic name
    * @param partitions partition numbers to look up
    * @return offsets returned by the consumer, keyed by partition number
    * @throws NoSuchMethodException if the `Consumer` interface has no one-argument `beginningOffsets`
    */
  def beginningOffsets(consumer: Consumer[_, _], topic: String, partitions: Seq[Int]): Map[Int, Long] =
    beginningOffsetsInvocation(consumer, topic, partitions)

  /**
    * Invokes `Consumer.endOffsets` for the given partitions of a topic
    *
    * @param consumer consumer
    * @param topic topic name
    * @param partitions partition numbers to look up
    * @return offsets returned by the consumer, keyed by partition number
    * @throws NoSuchMethodException if the `Consumer` interface has no one-argument `endOffsets`
    */
  def endOffsets(consumer: Consumer[_, _], topic: String, partitions: Seq[Int]): Map[Int, Long] =
    endOffsetsInvocation(consumer, topic, partitions)

  /**
    * Invokes `Consumer.offsetsForTimes`, using the same timestamp for each of the given partitions
    *
    * @param consumer consumer
    * @param topic topic name
    * @param partitions partition numbers to look up
    * @param time timestamp passed to the consumer for every partition
    * @return offsets keyed by partition number - partitions for which the consumer returned a
    *         null entry are left out
    * @throws NoSuchMethodException if the `Consumer` interface has no one-argument `offsetsForTimes`
    */
  def offsetsForTimes(consumer: Consumer[_, _], topic: String, partitions: Seq[Int], time: Long): Map[Int, Long] =
    offsetsForTimesInvocation(consumer, topic, partitions, time)

  // the underlying call returns ConsumerRecords, but this is typed as AnyRef so that the only
  // cast needed is the one in `poll`
  private val pollInvocation: (Consumer[_, _], Duration) => AnyRef = {
    val candidates = consumerMethods.filter(m => m.getParameterCount == 1 && m.getName == "poll")

    // preferred variant: poll(Duration)
    val withDuration: Option[(Consumer[_, _], Duration) => AnyRef] =
      candidates.find(_.getParameterTypes.apply(0) == classOf[Duration]).map { m =>
        (c: Consumer[_, _], d: Duration) => tryInvocation(m.invoke(c, d))
      }

    // fallback variant: poll(long) - a def, so it is only searched for if the Duration variant is missing
    def withMillis: Option[(Consumer[_, _], Duration) => AnyRef] =
      candidates.find(_.getParameterTypes.apply(0) == java.lang.Long.TYPE).map { m =>
        (c: Consumer[_, _], d: Duration) => tryInvocation(m.invoke(c, Long.box(d.toMillis)))
      }

    withDuration.orElse(withMillis) match {
      case Some(invocation) => invocation
      case None => throw new NoSuchMethodException("Couldn't find Consumer.poll method")
    }
  }

  private val seekToBeginningInvocation: (Consumer[_, _], TopicPartition) => Unit =
    consumerTopicInvocation("seekToBeginning")

  private val pauseInvocation: (Consumer[_, _], TopicPartition) => Unit = consumerTopicInvocation("pause")

  private val resumeInvocation: (Consumer[_, _], TopicPartition) => Unit = consumerTopicInvocation("resume")

  private val subscribeInvocation: (Consumer[_, _], String) => Unit = {
    val lookup = consumerMethods.find { m =>
      m.getName == "subscribe" && m.getParameterCount == 1 && acceptsTopicList(m)
    }
    lookup match {
      case Some(m) =>
        (consumer: Consumer[_, _], topic: String) =>
          tryInvocation(m.invoke(consumer, Collections.singletonList(topic)))

      case None =>
        throw new NoSuchMethodException("Couldn't find Consumer.subscribe method")
    }
  }

  private val subscribeWithListenerInvocation: (Consumer[_, _], String, ConsumerRebalanceListener) => Unit = {
    val lookup = consumerMethods.find { m =>
      m.getName == "subscribe" && m.getParameterCount == 2 && acceptsTopicList(m) &&
          m.getParameterTypes.apply(1).isAssignableFrom(classOf[ConsumerRebalanceListener])
    }
    lookup match {
      case Some(m) =>
        (consumer: Consumer[_, _], topic: String, listener: ConsumerRebalanceListener) =>
          tryInvocation(m.invoke(consumer, Collections.singletonList(topic), listener))

      case None =>
        throw new NoSuchMethodException("Couldn't find Consumer.subscribe method")
    }
  }

  // note: beginningOffsets doesn't exist in 0.9, so the lookup may come back empty
  private val beginningOffsetsInvocation: (Consumer[_, _], String, Seq[Int]) => Map[Int, Long] =
    partitionOffsetsInvocation("beginningOffsets")

  // note: endOffsets doesn't exist in 0.9, so the lookup may come back empty
  private val endOffsetsInvocation: (Consumer[_, _], String, Seq[Int]) => Map[Int, Long] =
    partitionOffsetsInvocation("endOffsets")

  private val offsetsForTimesInvocation: (Consumer[_, _], String, Seq[Int], Long) => Map[Int, Long] = {
    import scala.collection.JavaConverters._
    // note: this method doesn't exist until 0.10.1, so the lookup may come back empty
    val lookup = consumerMethods.find(m => m.getName == "offsetsForTimes" && m.getParameterCount == 1)
    (consumer, topic, partitions, time) => {
      // a missing method is only reported once someone actually asks for offsets
      val method = lookup.getOrElse {
        throw new NoSuchMethodException("Couldn't find Consumer.offsetsForTimes method")
      }
      val query = new java.util.HashMap[TopicPartition, java.lang.Long](partitions.length)
      partitions.foreach(p => query.put(new TopicPartition(topic, p), Long.box(time)))
      val found =
        tryInvocation(method.invoke(consumer, query).asInstanceOf[java.util.Map[TopicPartition, OffsetAndTimestamp]])
      // entries with a null value are dropped
      found.asScala.iterator.collect { case (tp, o) if o != null => tp.partition -> o.offset() }.toMap
    }
  }

  // true if the first parameter of the method can be passed a java.util.List
  private def acceptsTopicList(m: java.lang.reflect.Method): Boolean =
    m.getParameterTypes.apply(0).isAssignableFrom(classOf[java.util.List[_]])

  /**
    * Builds a wrapper around a one-argument `Consumer` method that is passed a list of topic
    * partitions and returns a java map of partition to offset (used for `beginningOffsets` and
    * `endOffsets`)
    *
    * @param name name of the consumer method
    * @return function that throws `NoSuchMethodException` on invocation if the method was not found,
    *         otherwise returns the offsets keyed by partition number
    */
  private def partitionOffsetsInvocation(name: String): (Consumer[_, _], String, Seq[Int]) => Map[Int, Long] = {
    import scala.collection.JavaConverters._
    val lookup = consumerMethods.find(m => m.getName == name && m.getParameterCount == 1)
    (consumer, topic, partitions) => {
      // a missing method is only reported once someone actually asks for offsets
      val method = lookup.getOrElse {
        throw new NoSuchMethodException(s"Couldn't find Consumer.$name method")
      }
      val query = new java.util.ArrayList[TopicPartition](partitions.length)
      partitions.foreach(p => query.add(new TopicPartition(topic, p)))
      val found = tryInvocation(method.invoke(consumer, query).asInstanceOf[java.util.Map[TopicPartition, Long]])
      found.asScala.iterator.map { case (tp, o) => tp.partition -> o }.toMap
    }
  }

  /**
    * Builds a wrapper around a one-argument `Consumer` method that operates on topic partitions
    * (used for `seekToBeginning`, `pause` and `resume`). The single partition is wrapped in an array
    * or in a singleton list, depending on the parameter type the method declares.
    *
    * @param name name of the consumer method
    * @return function invoking the method for a single topic partition
    * @throws NoSuchMethodException if there is no one-argument method with that name, or if its
    *                               parameter is neither a `TopicPartition` array nor a `java.util.Collection`
    */
  private def consumerTopicInvocation(name: String): (Consumer[_, _], TopicPartition) => Unit =
    consumerMethods.find(m => m.getName == name && m.getParameterCount == 1) match {
      case None =>
        throw new NoSuchMethodException(s"Couldn't find Consumer.$name method")

      case Some(m) =>
        m.getParameterTypes.apply(0) match {
          case t if t == classOf[Array[TopicPartition]] =>
            (consumer: Consumer[_, _], tp: TopicPartition) => tryInvocation(m.invoke(consumer, Array(tp)))

          case t if t == classOf[java.util.Collection[TopicPartition]] =>
            (consumer: Consumer[_, _], tp: TopicPartition) =>
              tryInvocation(m.invoke(consumer, Collections.singletonList(tp)))

          case _ =>
            throw new NoSuchMethodException(s"Couldn't find Consumer.$name method with correct parameters: $m")
        }
    }
}
Line Stmt Id Pos Tree Symbol Tests Code
22 194 904 - 946 Apply java.lang.Class.getDeclaredMethods classOf[org.apache.kafka.clients.consumer.Consumer].getDeclaredMethods()
25 195 1039 - 1099 TypeApply scala.Any.asInstanceOf KafkaConsumerVersions.this._poll.apply(consumer, timeout).asInstanceOf[org.apache.kafka.clients.consumer.ConsumerRecords[K,V]]
27 196 1180 - 1213 Apply scala.Function2.apply KafkaConsumerVersions.this._seekToBeginning.apply(consumer, topic)
29 197 1284 - 1307 Apply scala.Function2.apply KafkaConsumerVersions.this._pause.apply(consumer, topic)
31 198 1379 - 1403 Apply scala.Function2.apply KafkaConsumerVersions.this._resume.apply(consumer, topic)
33 199 1470 - 1497 Apply scala.Function2.apply KafkaConsumerVersions.this._subscribe.apply(consumer, topic)
36 200 1605 - 1654 Apply scala.Function3.apply KafkaConsumerVersions.this._subscribeWithListener.apply(consumer, topic, listener)
39 201 1764 - 1810 Apply scala.Function3.apply KafkaConsumerVersions.this._beginningOffsets.apply(consumer, topic, partitions)
42 202 1914 - 1954 Apply scala.Function3.apply KafkaConsumerVersions.this._endOffsets.apply(consumer, topic, partitions)
45 203 2075 - 2126 Apply scala.Function4.apply KafkaConsumerVersions.this._offsetsForTimes.apply(consumer, topic, partitions, time)
49 204 2305 - 2312 Select org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.methods KafkaConsumerVersions.this.methods
49 205 2338 - 2344 Literal <nosymbol> "poll"
49 206 2348 - 2372 Apply scala.Int.== m.getParameterCount().==(1)
49 207 2325 - 2372 Apply scala.Boolean.&& m.getName().==("poll").&&(m.getParameterCount().==(1))
49 208 2305 - 2373 Apply scala.collection.TraversableLike.filter scala.Predef.refArrayOps[java.lang.reflect.Method](KafkaConsumerVersions.this.methods).filter(((m: java.lang.reflect.Method) => m.getName().==("poll").&&(m.getParameterCount().==(1))))
51 213 2463 - 2463 Apply org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.$anonfun.<init> new $anonfun()
51 214 2444 - 2613 Apply scala.collection.TraversableOnce.collectFirst scala.Predef.refArrayOps[java.lang.reflect.Method](polls).collectFirst[(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[java.lang.reflect.Method,(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef] with Serializable { def <init>(): <$anon: java.lang.reflect.Method => ((org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef)> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: java.lang.reflect.Method, B1 >: (org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[java.lang.reflect.Method]: java.lang.reflect.Method): java.lang.reflect.Method @unchecked) match { case (m @ _) if m.getParameterTypes().apply(0).==(classOf[java.time.Duration]) => ((c: org.apache.kafka.clients.consumer.Consumer[_, _], d: java.time.Duration) => versions.this.`package`.tryInvocation[Object](m.invoke(c, d))) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: java.lang.reflect.Method): Boolean = ((x1.asInstanceOf[java.lang.reflect.Method]: java.lang.reflect.Method): java.lang.reflect.Method @unchecked) match { case (m @ _) if m.getParameterTypes().apply(0).==(classOf[java.time.Duration]) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[java.lang.reflect.Method,(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef]))
52 209 2481 - 2530 Apply java.lang.Object.== m.getParameterTypes().apply(0).==(classOf[java.time.Duration])
53 210 2592 - 2606 Apply java.lang.reflect.Method.invoke m.invoke(c, d)
53 211 2578 - 2607 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[Object](m.invoke(c, d))
53 212 2542 - 2607 Function org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.$anonfun.$anonfun ((c: org.apache.kafka.clients.consumer.Consumer[_, _], d: java.time.Duration) => versions.this.`package`.tryInvocation[Object](m.invoke(c, d)))
55 223 2698 - 2698 Apply org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.$anonfun.<init> new $anonfun()
55 224 2679 - 2869 Apply scala.collection.TraversableOnce.collectFirst scala.Predef.refArrayOps[java.lang.reflect.Method](polls).collectFirst[(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[java.lang.reflect.Method,(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef] with Serializable { def <init>(): <$anon: java.lang.reflect.Method => ((org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef)> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: java.lang.reflect.Method, B1 >: (org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[java.lang.reflect.Method]: java.lang.reflect.Method): java.lang.reflect.Method @unchecked) match { case (m @ _) if m.getParameterTypes().apply(0).==(java.lang.Long.TYPE) => ((c: org.apache.kafka.clients.consumer.Consumer[_, _], d: java.time.Duration) => versions.this.`package`.tryInvocation[Object](m.invoke(c, scala.Long.box(d.toMillis())))) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: java.lang.reflect.Method): Boolean = ((x1.asInstanceOf[java.lang.reflect.Method]: java.lang.reflect.Method): java.lang.reflect.Method @unchecked) match { case (m @ _) if m.getParameterTypes().apply(0).==(java.lang.Long.TYPE) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[java.lang.reflect.Method,(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef]))
56 215 2742 - 2743 Literal <nosymbol> 0
56 216 2748 - 2767 Select java.lang.Long.TYPE java.lang.Long.TYPE
56 217 2716 - 2767 Apply java.lang.Object.== m.getParameterTypes().apply(0).==(java.lang.Long.TYPE)
57 218 2850 - 2860 Apply java.time.Duration.toMillis d.toMillis()
57 219 2841 - 2861 Apply scala.Long.box scala.Long.box(d.toMillis())
57 220 2829 - 2862 Apply java.lang.reflect.Method.invoke m.invoke(c, scala.Long.box(d.toMillis()))
57 221 2815 - 2863 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[Object](m.invoke(c, scala.Long.box(d.toMillis())))
57 222 2779 - 2863 Function org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.$anonfun.$anonfun ((c: org.apache.kafka.clients.consumer.Consumer[_, _], d: java.time.Duration) => versions.this.`package`.tryInvocation[Object](m.invoke(c, scala.Long.box(d.toMillis()))))
60 226 2875 - 2999 Apply scala.Option.getOrElse fromDuration.orElse[(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef](fromLong).getOrElse[(org.apache.kafka.clients.consumer.Consumer[_, _], java.time.Duration) => AnyRef](throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.poll method").s()))
61 225 2923 - 2993 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.poll method").s())
65 227 3080 - 3122 Apply org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.consumerTopicInvocation KafkaConsumerVersions.this.consumerTopicInvocation("seekToBeginning")
67 228 3189 - 3221 Apply org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.consumerTopicInvocation KafkaConsumerVersions.this.consumerTopicInvocation("pause")
69 229 3289 - 3322 Apply org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.consumerTopicInvocation KafkaConsumerVersions.this.consumerTopicInvocation("resume")
72 230 3404 - 3411 Select org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.methods KafkaConsumerVersions.this.methods
72 231 3435 - 3446 Literal <nosymbol> "subscribe"
72 232 3450 - 3474 Apply scala.Int.== m.getParameterCount().==(1)
72 234 3422 - 3559 Apply scala.Boolean.&& m.getName().==("subscribe").&&(m.getParameterCount().==(1)).&&(m.getParameterTypes().apply(0).isAssignableFrom(classOf[java.util.List]))
73 233 3486 - 3559 Apply java.lang.Class.isAssignableFrom m.getParameterTypes().apply(0).isAssignableFrom(classOf[java.util.List])
73 236 3404 - 3660 Apply scala.Option.getOrElse scala.Predef.refArrayOps[java.lang.reflect.Method](KafkaConsumerVersions.this.methods).find(((m: java.lang.reflect.Method) => m.getName().==("subscribe").&&(m.getParameterCount().==(1)).&&(m.getParameterTypes().apply(0).isAssignableFrom(classOf[java.util.List])))).getOrElse[java.lang.reflect.Method](throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.subscribe method").s()))
74 235 3579 - 3654 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.subscribe method").s())
76 237 3724 - 3756 Apply java.util.Collections.singletonList java.util.Collections.singletonList[String](topic)
76 238 3700 - 3757 Apply java.lang.reflect.Method.invoke method.invoke(consumer, java.util.Collections.singletonList[String](topic))
76 239 3686 - 3758 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[Object](method.invoke(consumer, java.util.Collections.singletonList[String](topic)))
76 240 3699 - 3699 Literal <nosymbol> ()
80 241 3883 - 3890 Select org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.methods KafkaConsumerVersions.this.methods
80 242 3914 - 3925 Literal <nosymbol> "subscribe"
80 243 3929 - 3953 Apply scala.Int.== m.getParameterCount().==(2)
81 244 3965 - 4038 Apply java.lang.Class.isAssignableFrom m.getParameterTypes().apply(0).isAssignableFrom(classOf[java.util.List])
81 246 3901 - 4131 Apply scala.Boolean.&& m.getName().==("subscribe").&&(m.getParameterCount().==(2)).&&(m.getParameterTypes().apply(0).isAssignableFrom(classOf[java.util.List])).&&(m.getParameterTypes().apply(1).isAssignableFrom(classOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener]))
82 245 4050 - 4131 Apply java.lang.Class.isAssignableFrom m.getParameterTypes().apply(1).isAssignableFrom(classOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener])
82 248 3883 - 4232 Apply scala.Option.getOrElse scala.Predef.refArrayOps[java.lang.reflect.Method](KafkaConsumerVersions.this.methods).find(((m: java.lang.reflect.Method) => m.getName().==("subscribe").&&(m.getParameterCount().==(2)).&&(m.getParameterTypes().apply(0).isAssignableFrom(classOf[java.util.List])).&&(m.getParameterTypes().apply(1).isAssignableFrom(classOf[org.apache.kafka.clients.consumer.ConsumerRebalanceListener])))).getOrElse[java.lang.reflect.Method](throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.subscribe method").s()))
83 247 4151 - 4226 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.subscribe method").s())
85 249 4306 - 4338 Apply java.util.Collections.singletonList java.util.Collections.singletonList[String](topic)
85 250 4282 - 4349 Apply java.lang.reflect.Method.invoke method.invoke(consumer, java.util.Collections.singletonList[String](topic), listener)
85 251 4268 - 4350 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[Object](method.invoke(consumer, java.util.Collections.singletonList[String](topic), listener))
85 252 4281 - 4281 Literal <nosymbol> ()
91 253 4570 - 4577 Select org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.methods KafkaConsumerVersions.this.methods
91 254 4601 - 4619 Literal <nosymbol> "beginningOffsets"
91 255 4623 - 4647 Apply scala.Int.== m.getParameterCount().==(1)
91 256 4588 - 4647 Apply scala.Boolean.&& m.getName().==("beginningOffsets").&&(m.getParameterCount().==(1))
91 257 4649 - 4649 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
91 258 4570 - 4655 ApplyToImplicitArgs scala.Option.orNull scala.Predef.refArrayOps[java.lang.reflect.Method](KafkaConsumerVersions.this.methods).find(((m: java.lang.reflect.Method) => m.getName().==("beginningOffsets").&&(m.getParameterCount().==(1)))).orNull[java.lang.reflect.Method](scala.Predef.$conforms[Null])
93 259 4705 - 4719 Apply java.lang.Object.== method.==(null)
93 262 4701 - 4701 Literal <nosymbol> ()
93 263 4701 - 4701 Block <nosymbol> ()
94 260 4731 - 4813 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.beginningOffsets method").s())
94 261 4731 - 4813 Block <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.beginningOffsets method").s())
96 264 4893 - 4910 Select scala.collection.SeqLike.length partitions.length
96 265 4853 - 4911 Apply java.util.ArrayList.<init> new java.util.ArrayList[org.apache.kafka.common.TopicPartition](partitions.length)
97 266 4965 - 4993 Apply org.apache.kafka.common.TopicPartition.<init> new org.apache.kafka.common.TopicPartition(topic, p)
97 267 4942 - 4994 Apply java.util.ArrayList.add topicAndPartitions.add(new org.apache.kafka.common.TopicPartition(topic, p))
97 268 4918 - 4995 Apply scala.collection.IterableLike.foreach partitions.foreach[Boolean](((p: Int) => topicAndPartitions.add(new org.apache.kafka.common.TopicPartition(topic, p))))
98 269 5030 - 5123 TypeApply scala.Any.asInstanceOf method.invoke(consumer, topicAndPartitions).asInstanceOf[java.util.Map[org.apache.kafka.common.TopicPartition,Long]]
98 270 5016 - 5124 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[java.util.Map[org.apache.kafka.common.TopicPartition,Long]](method.invoke(consumer, topicAndPartitions).asInstanceOf[java.util.Map[org.apache.kafka.common.TopicPartition,Long]])
99 271 5144 - 5169 TypeApply scala.collection.immutable.Map.newBuilder scala.Predef.Map.newBuilder[Int, Long]
100 272 5192 - 5206 Apply java.util.Map.size offsets.size()
100 273 5176 - 5207 Apply scala.collection.mutable.Builder.sizeHint result.sizeHint(offsets.size())
101 274 5267 - 5284 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o)
101 275 5256 - 5285 Apply scala.collection.mutable.Builder.+= result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o))
101 276 5256 - 5285 Block scala.collection.mutable.Builder.+= result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o))
101 277 5214 - 5287 Apply scala.collection.IterableLike.foreach scala.collection.JavaConverters.mapAsScalaMapConverter[org.apache.kafka.common.TopicPartition, Long](offsets).asScala.foreach[scala.collection.mutable.Builder[(Int, Long),scala.collection.immutable.Map[Int,Long]]](((x0$1: (org.apache.kafka.common.TopicPartition, Long)) => x0$1 match { case (_1: org.apache.kafka.common.TopicPartition, _2: Long)(org.apache.kafka.common.TopicPartition, Long)((tp @ _), (o @ _)) => result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o)) }))
102 278 5294 - 5309 Apply scala.collection.mutable.Builder.result result.result()
109 279 5529 - 5536 Select org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.methods KafkaConsumerVersions.this.methods
109 280 5560 - 5572 Literal <nosymbol> "endOffsets"
109 281 5576 - 5600 Apply scala.Int.== m.getParameterCount().==(1)
109 282 5547 - 5600 Apply scala.Boolean.&& m.getName().==("endOffsets").&&(m.getParameterCount().==(1))
109 283 5602 - 5602 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
109 284 5529 - 5608 ApplyToImplicitArgs scala.Option.orNull scala.Predef.refArrayOps[java.lang.reflect.Method](KafkaConsumerVersions.this.methods).find(((m: java.lang.reflect.Method) => m.getName().==("endOffsets").&&(m.getParameterCount().==(1)))).orNull[java.lang.reflect.Method](scala.Predef.$conforms[Null])
111 285 5658 - 5672 Apply java.lang.Object.== method.==(null)
111 288 5654 - 5654 Literal <nosymbol> ()
111 289 5654 - 5654 Block <nosymbol> ()
112 286 5684 - 5760 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.endOffsets method").s())
112 287 5684 - 5760 Block <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.endOffsets method").s())
114 290 5840 - 5857 Select scala.collection.SeqLike.length partitions.length
114 291 5800 - 5858 Apply java.util.ArrayList.<init> new java.util.ArrayList[org.apache.kafka.common.TopicPartition](partitions.length)
115 292 5912 - 5940 Apply org.apache.kafka.common.TopicPartition.<init> new org.apache.kafka.common.TopicPartition(topic, p)
115 293 5889 - 5941 Apply java.util.ArrayList.add topicAndPartitions.add(new org.apache.kafka.common.TopicPartition(topic, p))
115 294 5865 - 5942 Apply scala.collection.IterableLike.foreach partitions.foreach[Boolean](((p: Int) => topicAndPartitions.add(new org.apache.kafka.common.TopicPartition(topic, p))))
116 295 5977 - 6070 TypeApply scala.Any.asInstanceOf method.invoke(consumer, topicAndPartitions).asInstanceOf[java.util.Map[org.apache.kafka.common.TopicPartition,Long]]
116 296 5963 - 6071 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[java.util.Map[org.apache.kafka.common.TopicPartition,Long]](method.invoke(consumer, topicAndPartitions).asInstanceOf[java.util.Map[org.apache.kafka.common.TopicPartition,Long]])
117 297 6091 - 6116 TypeApply scala.collection.immutable.Map.newBuilder scala.Predef.Map.newBuilder[Int, Long]
118 298 6139 - 6153 Apply java.util.Map.size offsets.size()
118 299 6123 - 6154 Apply scala.collection.mutable.Builder.sizeHint result.sizeHint(offsets.size())
119 300 6214 - 6231 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o)
119 301 6203 - 6232 Apply scala.collection.mutable.Builder.+= result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o))
119 302 6203 - 6232 Block scala.collection.mutable.Builder.+= result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o))
119 303 6161 - 6234 Apply scala.collection.IterableLike.foreach scala.collection.JavaConverters.mapAsScalaMapConverter[org.apache.kafka.common.TopicPartition, Long](offsets).asScala.foreach[scala.collection.mutable.Builder[(Int, Long),scala.collection.immutable.Map[Int,Long]]](((x0$2: (org.apache.kafka.common.TopicPartition, Long)) => x0$2 match { case (_1: org.apache.kafka.common.TopicPartition, _2: Long)(org.apache.kafka.common.TopicPartition, Long)((tp @ _), (o @ _)) => result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o)) }))
120 304 6241 - 6256 Apply scala.collection.mutable.Builder.result result.result()
127 305 6493 - 6500 Select org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.methods KafkaConsumerVersions.this.methods
127 306 6524 - 6541 Literal <nosymbol> "offsetsForTimes"
127 307 6545 - 6569 Apply scala.Int.== m.getParameterCount().==(1)
127 308 6511 - 6569 Apply scala.Boolean.&& m.getName().==("offsetsForTimes").&&(m.getParameterCount().==(1))
127 309 6571 - 6571 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Null]
127 310 6493 - 6577 ApplyToImplicitArgs scala.Option.orNull scala.Predef.refArrayOps[java.lang.reflect.Method](KafkaConsumerVersions.this.methods).find(((m: java.lang.reflect.Method) => m.getName().==("offsetsForTimes").&&(m.getParameterCount().==(1)))).orNull[java.lang.reflect.Method](scala.Predef.$conforms[Null])
129 311 6633 - 6647 Apply java.lang.Object.== method.==(null)
129 314 6629 - 6629 Literal <nosymbol> ()
129 315 6629 - 6629 Block <nosymbol> ()
130 312 6659 - 6740 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.offsetsForTimes method").s())
130 313 6659 - 6740 Block <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.offsetsForTimes method").s())
132 316 6826 - 6843 Select scala.collection.SeqLike.length partitions.length
132 317 6772 - 6844 Apply java.util.HashMap.<init> new java.util.HashMap[org.apache.kafka.common.TopicPartition,Long](partitions.length)
133 318 6890 - 6918 Apply org.apache.kafka.common.TopicPartition.<init> new org.apache.kafka.common.TopicPartition(topic, p)
133 319 6920 - 6934 Apply scala.Long.box scala.Long.box(time)
133 320 6875 - 6935 Apply java.util.HashMap.put timestamps.put(new org.apache.kafka.common.TopicPartition(topic, p), scala.Long.box(time))
133 321 6851 - 6936 Apply scala.collection.IterableLike.foreach partitions.foreach[Long](((p: Int) => timestamps.put(new org.apache.kafka.common.TopicPartition(topic, p), scala.Long.box(time))))
134 322 6971 - 7070 TypeApply scala.Any.asInstanceOf method.invoke(consumer, timestamps).asInstanceOf[java.util.Map[org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp]]
134 323 6957 - 7071 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[java.util.Map[org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp]](method.invoke(consumer, timestamps).asInstanceOf[java.util.Map[org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndTimestamp]])
135 324 7091 - 7116 TypeApply scala.collection.immutable.Map.newBuilder scala.Predef.Map.newBuilder[Int, Long]
136 325 7139 - 7153 Apply java.util.Map.size offsets.size()
136 326 7123 - 7154 Apply scala.collection.mutable.Builder.sizeHint result.sizeHint(offsets.size())
137 327 7207 - 7216 Apply java.lang.Object.!= o.!=(null)
137 328 7231 - 7243 Apply org.apache.kafka.common.TopicPartition.partition tp.partition()
137 329 7247 - 7257 Apply org.apache.kafka.clients.consumer.OffsetAndTimestamp.offset o.offset()
137 330 7231 - 7257 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o.offset())
137 331 7220 - 7258 Apply scala.collection.mutable.Builder.+= result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o.offset()))
137 332 7220 - 7258 Block scala.collection.mutable.Builder.+= result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o.offset()))
137 333 7203 - 7203 Literal <nosymbol> ()
137 334 7203 - 7203 Block <nosymbol> ()
137 335 7203 - 7260 If <nosymbol> if (o.!=(null)) result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o.offset())) else ()
137 336 7161 - 7262 Apply scala.collection.IterableLike.foreach scala.collection.JavaConverters.mapAsScalaMapConverter[org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndTimestamp](offsets).asScala.foreach[Any](((x0$3: (org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndTimestamp)) => x0$3 match { case (_1: org.apache.kafka.common.TopicPartition, _2: org.apache.kafka.clients.consumer.OffsetAndTimestamp)(org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndTimestamp)((tp @ _), (o @ _)) => if (o.!=(null)) result.+=(scala.Predef.ArrowAssoc[Int](tp.partition()).->[Long](o.offset())) else () }))
138 337 7269 - 7284 Apply scala.collection.mutable.Builder.result result.result()
143 338 7411 - 7418 Select org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.methods KafkaConsumerVersions.this.methods
143 339 7450 - 7474 Apply scala.Int.== m.getParameterCount().==(1)
143 340 7429 - 7474 Apply scala.Boolean.&& m.getName().==(name).&&(m.getParameterCount().==(1))
143 342 7411 - 7571 Apply scala.Option.getOrElse scala.Predef.refArrayOps[java.lang.reflect.Method](KafkaConsumerVersions.this.methods).find(((m: java.lang.reflect.Method) => m.getName().==(name).&&(m.getParameterCount().==(1)))).getOrElse[java.lang.reflect.Method](throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.", " method").s(name)))
144 341 7494 - 7565 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.", " method").s(name))
146 343 7590 - 7623 Apply scala.Array.apply method.getParameterTypes().apply(0)
148 344 7633 - 7674 Apply java.lang.Object.== binding.==(classOf[[Lorg.apache.kafka.common.TopicPartition;])
149 345 7740 - 7749 ApplyToImplicitArgs scala.Array.apply scala.Array.apply[org.apache.kafka.common.TopicPartition](tp)((ClassTag.apply[org.apache.kafka.common.TopicPartition](classOf[org.apache.kafka.common.TopicPartition]): scala.reflect.ClassTag[org.apache.kafka.common.TopicPartition]))
149 346 7716 - 7750 Apply java.lang.reflect.Method.invoke method.invoke(consumer, scala.Array.apply[org.apache.kafka.common.TopicPartition](tp)((ClassTag.apply[org.apache.kafka.common.TopicPartition](classOf[org.apache.kafka.common.TopicPartition]): scala.reflect.ClassTag[org.apache.kafka.common.TopicPartition])))
149 347 7702 - 7751 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[Object](method.invoke(consumer, scala.Array.apply[org.apache.kafka.common.TopicPartition](tp)((ClassTag.apply[org.apache.kafka.common.TopicPartition](classOf[org.apache.kafka.common.TopicPartition]): scala.reflect.ClassTag[org.apache.kafka.common.TopicPartition]))))
149 348 7715 - 7715 Literal <nosymbol> ()
149 349 7684 - 7751 Function org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.$anonfun ((consumer: org.apache.kafka.clients.consumer.Consumer[_, _], tp: org.apache.kafka.common.TopicPartition) => { versions.this.`package`.tryInvocation[Object](method.invoke(consumer, scala.Array.apply[org.apache.kafka.common.TopicPartition](tp)((ClassTag.apply[org.apache.kafka.common.TopicPartition](classOf[org.apache.kafka.common.TopicPartition]): scala.reflect.ClassTag[org.apache.kafka.common.TopicPartition])))); () })
150 350 7767 - 7823 Apply java.lang.Object.== binding.==(classOf[java.util.Collection])
150 358 7763 - 8050 If <nosymbol> if (binding.==(classOf[java.util.Collection])) ((consumer: org.apache.kafka.clients.consumer.Consumer[_, _], tp: org.apache.kafka.common.TopicPartition) => { versions.this.`package`.tryInvocation[Object](method.invoke(consumer, java.util.Collections.singletonList[org.apache.kafka.common.TopicPartition](tp))); () }) else throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.", " method with correct parameters: ", "").s(name, method))
151 351 7889 - 7918 Apply java.util.Collections.singletonList java.util.Collections.singletonList[org.apache.kafka.common.TopicPartition](tp)
151 352 7865 - 7919 Apply java.lang.reflect.Method.invoke method.invoke(consumer, java.util.Collections.singletonList[org.apache.kafka.common.TopicPartition](tp))
151 353 7851 - 7920 Apply org.locationtech.geomesa.kafka.versions.tryInvocation versions.this.`package`.tryInvocation[Object](method.invoke(consumer, java.util.Collections.singletonList[org.apache.kafka.common.TopicPartition](tp)))
151 354 7864 - 7864 Literal <nosymbol> ()
151 355 7833 - 7920 Function org.locationtech.geomesa.kafka.versions.KafkaConsumerVersions.$anonfun ((consumer: org.apache.kafka.clients.consumer.Consumer[_, _], tp: org.apache.kafka.common.TopicPartition) => { versions.this.`package`.tryInvocation[Object](method.invoke(consumer, java.util.Collections.singletonList[org.apache.kafka.common.TopicPartition](tp))); () })
153 356 7940 - 8044 Throw <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.", " method with correct parameters: ", "").s(name, method))
153 357 7940 - 8044 Block <nosymbol> throw new java.lang.NoSuchMethodException(scala.StringContext.apply("Couldn\'t find Consumer.", " method with correct parameters: ", "").s(name, method))