1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka
10 
11 import org.apache.kafka.streams.processor.StreamPartitioner
12 import org.geotools.api.data.DataStoreFinder
13 import org.geotools.api.feature.`type`.{AttributeDescriptor, Name}
14 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
15 import org.geotools.api.feature.{GeometryAttribute, Property}
16 import org.geotools.api.filter.identity.FeatureId
17 import org.geotools.api.geometry.BoundingBox
18 import org.locationtech.geomesa.features.SimpleFeatureSerializer
19 import org.locationtech.geomesa.kafka.data.KafkaDataStore
20 import org.locationtech.geomesa.kafka.utils.GeoMessageSerializer
21 import org.locationtech.geomesa.utils.geotools.converters.FastConverter
22 import org.locationtech.geomesa.utils.io.WithClose
23 
24 import java.nio.charset.StandardCharsets
25 import java.util.concurrent.ConcurrentHashMap
26 import scala.collection.mutable.ArrayBuffer
27 
28 package object streams {
29 
30   /**
31    * Trait for provided metadata about a feature type topic
32    */
33   trait HasTopicMetadata {
34 
35     /**
36      * Gets the topic associated with a feature type
37      *
38      * @param typeName feature type name
39      * @return
40      */
41     def topic(typeName: String): String
42 
43     /**
44      * Gets the partitioning associated with a feature type
45      *
46      * @param typeName feature type name
47      * @return true if Kafka default partitioning is used, false if custom partitioning is used
48      */
49     def usesDefaultPartitioning(typeName: String): Boolean
50   }
51 
52   /**
53    * Kafka partitioner for GeoMesa messages, to make sure all updates for a given
54    * feature go to the same partition
55    */
56   class GeoMessageStreamPartitioner extends StreamPartitioner[String, GeoMesaMessage] {
57     override def partition(
58         topic: String,
59         key: String,
60         value: GeoMesaMessage,
61         numPartitions: Int): Integer = {
62       GeoMessageSerializer.partition(numPartitions,
63         if (key == null) { null } else { key.getBytes(StandardCharsets.UTF_8) })
64     }
65   }
66 
67   /**
68    * Cache for serializers and topic names
69    *
70    * @param params data store params
71    */
72   class SerializerCache(params: java.util.Map[String, _]) extends HasTopicMetadata {
73 
74     private val metadataByTypeName = new ConcurrentHashMap[String, SchemaMetadata]()
75     private val serializersByTopic = new ConcurrentHashMap[String, GeoMesaMessageSerializer]()
76 
77     private val metadataLoader = new java.util.function.Function[String, SchemaMetadata]() {
78       override def apply(typeName: String): SchemaMetadata = loadMetadata(typeName)
79     }
80 
81     private val serializerLoader = new java.util.function.Function[String, GeoMesaMessageSerializer]() {
82       override def apply(topic: String): GeoMesaMessageSerializer = loadSerializer(topic)
83     }
84 
85     // track last-used serializer so we don't have to look them up by hash each
86     // time if we're just reading/writing to one topic (which is the standard use-case)
87     @volatile
88     private var last: (String, GeoMesaMessageSerializer) = ("", null)
89 
90     override def topic(typeName: String): String = metadataByTypeName.computeIfAbsent(typeName, metadataLoader).topic
91 
92     override def usesDefaultPartitioning(typeName: String): Boolean =
93       metadataByTypeName.computeIfAbsent(typeName, metadataLoader).usesDefaultPartitioning
94 
95     /**
96      * Gets the serializer associated with a topic
97      *
98      * @param topic kafka topic name
99      * @return
100      */
101     def serializer(topic: String): GeoMesaMessageSerializer = {
102       val (lastTopic, lastSerializer) = last
103       if (lastTopic == topic) { lastSerializer } else {
104         val serializer = serializersByTopic.computeIfAbsent(topic, serializerLoader)
105         // should be thread-safe due to volatile
106         last = (topic, serializer)
107         serializer
108       }
109     }
110 
111     private def loadMetadata(typeName: String): SchemaMetadata = {
112       withDataStore { ds =>
113         ds.getSchema(typeName) match {
114           case sft => SchemaMetadata(KafkaDataStore.topic(sft), KafkaDataStore.usesDefaultPartitioning(sft))
115           case null =>
116             throw new IllegalArgumentException(
117               s"Schema '$typeName' does not exist in the configured store. " +
118                   s"Available schemas: ${ds.getTypeNames.mkString(", ")}")
119         }
120       }
121     }
122 
123     private def loadSerializer(topic: String): GeoMesaMessageSerializer = {
124       withDataStore { ds =>
125         val topics = ArrayBuffer.empty[String]
126         // order so that we check the most likely ones first
127         val typeNames = ds.getTypeNames.partition(_.contains(topic)) match {
128           case (left, right) => left ++ right
129         }
130         var i = 0
131         while (i < typeNames.length) {
132           val sft = ds.getSchema(typeNames(i))
133           KafkaDataStore.topic(sft) match {
134             case t if t == topic =>
135               val internal = ds.serialization(sft).serializer
136               return new GeoMesaMessageSerializer(sft, internal)
137 
138             case t => topics += t
139           }
140           i += 1
141         }
142         throw new IllegalArgumentException(
143           s"Topic '$topic' does not exist in the configured store. Available topics: ${topics.mkString(", ")}")
144       }
145     }
146 
147     private def withDataStore[T](fn: KafkaDataStore => T): T = {
148       WithClose(DataStoreFinder.getDataStore(params)) {
149         case ds: KafkaDataStore => fn(ds)
150         case null => throw new IllegalArgumentException("Could not load data store with provided params")
151         case ds => throw new IllegalArgumentException(s"Expected a KafkaDataStore but got ${ds.getClass.getName}")
152       }
153     }
154 
155     private case class SchemaMetadata(topic: String, usesDefaultPartitioning: Boolean)
156   }
157 
158   /**
159    * Serializer for GeoMesaMessages
160    *
161    * @param sft feature type
162    * @param internal nested serializer
163    */
164   class GeoMesaMessageSerializer(val sft: SimpleFeatureType, val internal: SimpleFeatureSerializer) {
165 
166     import scala.collection.JavaConverters._
167 
168     private val converters: Array[AnyRef => AnyRef] =
169       sft.getAttributeDescriptors.toArray(Array.empty[AttributeDescriptor]).map { d =>
170         val binding = d.getType.getBinding.asInstanceOf[Class[_ <: AnyRef]]
171         (in: AnyRef) => FastConverter.convert(in, binding)
172       }
173 
174     def serialize(data: GeoMesaMessage): Array[Byte] = {
175       data.action match {
176         case MessageAction.Upsert => internal.serialize(wrap(data))
177         case MessageAction.Delete => null
178         case null => throw new NullPointerException("action is null")
179         case _ => throw new UnsupportedOperationException(s"No serialization implemented for action '${data.action}'")
180       }
181     }
182 
183     def deserialize(data: Array[Byte]): GeoMesaMessage = {
184       if (data == null || data.isEmpty) { GeoMesaMessage.delete() } else {
185         val feature = internal.deserialize(data)
186         val userData = if (feature.getUserData.isEmpty) { Map.empty[String, String] } else {
187           val builder = Map.newBuilder[String, String]
188           feature.getUserData.asScala.foreach {
189             case (k: String, v: String) => builder += k -> v
190             case (k, v) => builder += k.toString -> v.toString
191           }
192           builder.result
193         }
194         GeoMesaMessage.upsert(feature.getAttributes.asScala.toSeq, userData)
195       }
196     }
197 
198     /**
199      * Wrap a message as a simple feature
200      *
201      * @param message message
202      * @return
203      */
204     def wrap(message: GeoMesaMessage): SimpleFeature =
205       new SerializableFeature(converters, message.attributes.toIndexedSeq, message.userData)
206   }
207 
208   /**
209    * SimpleFeature skeleton that only provides the methods required for GeoMesa serialization, which are:
210    *   * `def getAttribute(i: Int): AnyRef`
211    *   * `def getUserData: java.util.Map[AnyRef, AnyRef]`
212    *
213    * See
214    *   * @see [[org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization#writeFeature]]
215    *   * @see [[org.locationtech.geomesa.features.avro.serialization.SimpleFeatureDatumWriter#write]]
216    *
217    * @param converters attribute converters to enforce feature type schema
218    * @param attributes message attributes
219    */
220   // noinspection NotImplementedCode
221   private[streams] class SerializableFeature(
222       converters: Array[AnyRef => AnyRef],
223       attributes: IndexedSeq[AnyRef],
224       userData: Map[String, String]
225     ) extends SimpleFeature {
226 
227     import scala.collection.JavaConverters._
228 
229     override def getAttribute(i: Int): AnyRef = converters(i).apply(attributes(i))
230     override def getUserData: java.util.Map[AnyRef, AnyRef] =
231       userData.asJava.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
232 
233     override def getID: String = throw new UnsupportedOperationException()
234     override def getType: SimpleFeatureType = throw new UnsupportedOperationException()
235     override def getFeatureType: SimpleFeatureType = throw new UnsupportedOperationException()
236     override def getAttributes: java.util.List[AnyRef] = throw new UnsupportedOperationException()
237     override def setAttributes(list:java.util.List[AnyRef]): Unit = throw new UnsupportedOperationException()
238     override def setAttributes(objects: Array[AnyRef]): Unit = throw new UnsupportedOperationException()
239     override def getAttribute(s: String): AnyRef = throw new UnsupportedOperationException()
240     override def setAttribute(s: String, o: Any): Unit = throw new UnsupportedOperationException()
241     override def getAttribute(name: Name): AnyRef = throw new UnsupportedOperationException()
242     override def setAttribute(name: Name, o: Any): Unit = throw new UnsupportedOperationException()
243     override def setAttribute(i: Int, o: Any): Unit = throw new UnsupportedOperationException()
244     override def getAttributeCount: Int = throw new UnsupportedOperationException()
245     override def getDefaultGeometry: AnyRef = throw new UnsupportedOperationException()
246     override def setDefaultGeometry(o: Any): Unit = throw new UnsupportedOperationException()
247     override def getIdentifier: FeatureId = throw new UnsupportedOperationException()
248     override def getBounds: BoundingBox = throw new UnsupportedOperationException()
249     override def getDefaultGeometryProperty: GeometryAttribute = throw new UnsupportedOperationException()
250     override def setDefaultGeometryProperty(geometryAttribute: GeometryAttribute): Unit = throw new UnsupportedOperationException()
251     override def setValue(collection:java.util.Collection[Property]): Unit = throw new UnsupportedOperationException()
252     override def getValue:java.util.Collection[_ <: Property] = throw new UnsupportedOperationException()
253     override def getProperties(name: Name):java.util.Collection[Property] = throw new UnsupportedOperationException()
254     override def getProperty(name: Name): Property = throw new UnsupportedOperationException()
255     override def getProperties(s: String):java.util.Collection[Property] = throw new UnsupportedOperationException()
256     override def getProperties:java.util.Collection[Property] = throw new UnsupportedOperationException()
257     override def getProperty(s: String): Property = throw new UnsupportedOperationException()
258     override def validate(): Unit = throw new UnsupportedOperationException()
259     override def getDescriptor: AttributeDescriptor = throw new UnsupportedOperationException()
260     override def setValue(o: Any): Unit = throw new UnsupportedOperationException()
261     override def getName: Name = throw new UnsupportedOperationException()
262     override def isNillable: Boolean = throw new UnsupportedOperationException()
263   }
264 }
Line Stmt Id Pos Tree Symbol Tests Code
62 95637 2308 - 2434 ApplyImplicitView scala.Predef.int2Integer scala.Predef.int2Integer(org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.partition(numPartitions, if (key.==(null)) null else key.getBytes(java.nio.charset.StandardCharsets.UTF_8)))
62 95636 2308 - 2434 Apply org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.partition org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.partition(numPartitions, if (key.==(null)) null else key.getBytes(java.nio.charset.StandardCharsets.UTF_8))
63 95631 2381 - 2385 Literal <nosymbol> null
63 95630 2366 - 2377 Apply java.lang.Object.== key.==(null)
63 95633 2408 - 2430 Select java.nio.charset.StandardCharsets.UTF_8 java.nio.charset.StandardCharsets.UTF_8
63 95632 2381 - 2385 Block <nosymbol> null
63 95635 2395 - 2431 Block java.lang.String.getBytes key.getBytes(java.nio.charset.StandardCharsets.UTF_8)
63 95634 2395 - 2431 Apply java.lang.String.getBytes key.getBytes(java.nio.charset.StandardCharsets.UTF_8)
74 95638 2666 - 2713 Apply java.util.concurrent.ConcurrentHashMap.<init> new java.util.concurrent.ConcurrentHashMap[String,SerializerCache.this.SchemaMetadata]()
75 95639 2751 - 2808 Apply java.util.concurrent.ConcurrentHashMap.<init> new java.util.concurrent.ConcurrentHashMap[String,org.locationtech.geomesa.kafka.streams.package.GeoMesaMessageSerializer]()
77 95641 2843 - 2846 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.$anon.<init> new $anon()
78 95640 2964 - 2986 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.loadMetadata SerializerCache.this.loadMetadata(typeName)
81 95643 3029 - 3032 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.$anon.<init> new $anon()
82 95642 3167 - 3188 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.loadSerializer SerializerCache.this.loadSerializer(topic)
88 95644 3437 - 3447 Apply scala.Tuple2.apply scala.Tuple2.apply[String, Null]("", null)
90 95645 3545 - 3559 Select org.locationtech.geomesa.kafka.streams.SerializerCache.metadataLoader SerializerCache.this.metadataLoader
90 95646 3500 - 3566 Select org.locationtech.geomesa.kafka.streams.SerializerCache.SchemaMetadata.topic SerializerCache.this.metadataByTypeName.computeIfAbsent(typeName, SerializerCache.this.metadataLoader).topic
93 95647 3689 - 3703 Select org.locationtech.geomesa.kafka.streams.SerializerCache.metadataLoader SerializerCache.this.metadataLoader
93 95648 3644 - 3728 Select org.locationtech.geomesa.kafka.streams.SerializerCache.SchemaMetadata.usesDefaultPartitioning SerializerCache.this.metadataByTypeName.computeIfAbsent(typeName, SerializerCache.this.metadataLoader).usesDefaultPartitioning
102 95649 3931 - 3931 Select scala.Tuple2._1 x$1._1
102 95650 3942 - 3942 Select scala.Tuple2._2 x$1._2
103 95651 3975 - 3993 Apply java.lang.Object.== lastTopic.==(topic)
103 95652 3997 - 4011 Ident org.locationtech.geomesa.kafka.streams.SerializerCache.lastSerializer lastSerializer
103 95657 4019 - 4216 Block <nosymbol> { val serializer: org.locationtech.geomesa.kafka.streams.package.GeoMesaMessageSerializer = SerializerCache.this.serializersByTopic.computeIfAbsent(topic, SerializerCache.this.serializerLoader); SerializerCache.this.last_=(scala.Tuple2.apply[String, org.locationtech.geomesa.kafka.streams.package.GeoMesaMessageSerializer](topic, serializer)); serializer }
104 95653 4088 - 4104 Select org.locationtech.geomesa.kafka.streams.SerializerCache.serializerLoader SerializerCache.this.serializerLoader
104 95654 4046 - 4105 Apply java.util.concurrent.ConcurrentHashMap.computeIfAbsent SerializerCache.this.serializersByTopic.computeIfAbsent(topic, SerializerCache.this.serializerLoader)
106 95655 4170 - 4189 Apply scala.Tuple2.apply scala.Tuple2.apply[String, org.locationtech.geomesa.kafka.streams.package.GeoMesaMessageSerializer](topic, serializer)
106 95656 4163 - 4189 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.last_= SerializerCache.this.last_=(scala.Tuple2.apply[String, org.locationtech.geomesa.kafka.streams.package.GeoMesaMessageSerializer](topic, serializer))
112 95664 4297 - 4709 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.withDataStore SerializerCache.this.withDataStore[SerializerCache.this.SchemaMetadata](((ds: org.locationtech.geomesa.kafka.data.KafkaDataStore) => ds.getSchema(typeName) match { case (sft @ _) => SerializerCache.this.SchemaMetadata.apply(org.locationtech.geomesa.kafka.data.KafkaDataStore.topic(sft), org.locationtech.geomesa.kafka.data.KafkaDataStore.usesDefaultPartitioning(sft)) case null => throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Schema \'", "\' does not exist in the configured store. ").s(typeName).+(scala.StringContext.apply("Available schemas: ", "").s(scala.Predef.refArrayOps[String](ds.getTypeNames()).mkString(", ")))) }))
114 95659 4422 - 4465 Apply org.locationtech.geomesa.kafka.data.KafkaDataStore.usesDefaultPartitioning org.locationtech.geomesa.kafka.data.KafkaDataStore.usesDefaultPartitioning(sft)
114 95658 4395 - 4420 Apply org.locationtech.geomesa.kafka.data.KafkaDataStore.topic org.locationtech.geomesa.kafka.data.KafkaDataStore.topic(sft)
114 95661 4380 - 4466 Block org.locationtech.geomesa.kafka.streams.SerializerCache.SchemaMetadata.apply SerializerCache.this.SchemaMetadata.apply(org.locationtech.geomesa.kafka.data.KafkaDataStore.topic(sft), org.locationtech.geomesa.kafka.data.KafkaDataStore.usesDefaultPartitioning(sft))
114 95660 4380 - 4466 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.SchemaMetadata.apply SerializerCache.this.SchemaMetadata.apply(org.locationtech.geomesa.kafka.data.KafkaDataStore.topic(sft), org.locationtech.geomesa.kafka.data.KafkaDataStore.usesDefaultPartitioning(sft))
116 95663 4502 - 4691 Block <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Schema \'", "\' does not exist in the configured store. ").s(typeName).+(scala.StringContext.apply("Available schemas: ", "").s(scala.Predef.refArrayOps[String](ds.getTypeNames()).mkString(", "))))
116 95662 4502 - 4691 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Schema \'", "\' does not exist in the configured store. ").s(typeName).+(scala.StringContext.apply("Available schemas: ", "").s(scala.Predef.refArrayOps[String](ds.getTypeNames()).mkString(", "))))
124 95691 4799 - 5610 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.withDataStore SerializerCache.this.withDataStore[Nothing](((ds: org.locationtech.geomesa.kafka.data.KafkaDataStore) => { val topics: scala.collection.mutable.ArrayBuffer[String] = scala.collection.mutable.ArrayBuffer.empty[String]; val typeNames: Array[String] = scala.Predef.refArrayOps[String](ds.getTypeNames()).partition(((x$2: String) => x$2.contains(topic))) match { case (_1: Array[String], _2: Array[String])(Array[String], Array[String])((left @ _), (right @ _)) => scala.Predef.refArrayOps[String](left).++[String, Array[String]](scala.Predef.refArrayOps[String](right))(scala.this.Array.canBuildFrom[String]((ClassTag.apply[String](classOf[java.lang.String]): scala.reflect.ClassTag[String]))) }; var i: Int = 0; while$1(){ if (i.<(typeNames.length)) { { val sft: org.geotools.api.feature.simple.SimpleFeatureType = ds.getSchema(typeNames.apply(i)); org.locationtech.geomesa.kafka.data.KafkaDataStore.topic(sft) match { case (t @ _) if t.==(topic) => { val internal: org.locationtech.geomesa.features.SimpleFeatureSerializer = ds.serialization.apply(sft).serializer; return new `package`.this.GeoMesaMessageSerializer(sft, internal) } case (t @ _) => topics.+=(t) }; i = i.+(1) }; while$1() } else () }; throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Topic \'", "\' does not exist in the configured store. Available topics: ", "").s(topic, topics.mkString(", "))) }))
125 95665 4842 - 4867 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.mutable.ArrayBuffer.empty[String]
127 95667 4979 - 4996 Apply java.lang.String.contains x$2.contains(topic)
127 95666 4953 - 4968 Apply org.locationtech.geomesa.kafka.data.KafkaDataStore.getTypeNames ds.getTypeNames()
127 95668 4953 - 4997 Apply scala.collection.TraversableLike.partition scala.Predef.refArrayOps[String](ds.getTypeNames()).partition(((x$2: String) => x$2.contains(topic)))
128 95669 5046 - 5051 ApplyImplicitView scala.Predef.refArrayOps scala.Predef.refArrayOps[String](right)
128 95671 5038 - 5051 ApplyToImplicitArgs scala.collection.TraversableLike.++ scala.Predef.refArrayOps[String](left).++[String, Array[String]](scala.Predef.refArrayOps[String](right))(scala.this.Array.canBuildFrom[String]((ClassTag.apply[String](classOf[java.lang.String]): scala.reflect.ClassTag[String])))
128 95670 5043 - 5043 ApplyToImplicitArgs scala.Array.canBuildFrom scala.this.Array.canBuildFrom[String]((ClassTag.apply[String](classOf[java.lang.String]): scala.reflect.ClassTag[String]))
128 95672 5038 - 5051 Block scala.collection.TraversableLike.++ scala.Predef.refArrayOps[String](left).++[String, Array[String]](scala.Predef.refArrayOps[String](right))(scala.this.Array.canBuildFrom[String]((ClassTag.apply[String](classOf[java.lang.String]): scala.reflect.ClassTag[String])))
130 95673 5078 - 5079 Literal <nosymbol> 0
131 95675 5095 - 5115 Apply scala.Int.< i.<(typeNames.length)
131 95674 5099 - 5115 Select scala.Array.length typeNames.length
131 95687 5117 - 5446 Block <nosymbol> { { val sft: org.geotools.api.feature.simple.SimpleFeatureType = ds.getSchema(typeNames.apply(i)); org.locationtech.geomesa.kafka.data.KafkaDataStore.topic(sft) match { case (t @ _) if t.==(topic) => { val internal: org.locationtech.geomesa.features.SimpleFeatureSerializer = ds.serialization.apply(sft).serializer; return new `package`.this.GeoMesaMessageSerializer(sft, internal) } case (t @ _) => topics.+=(t) }; i = i.+(1) }; while$1() }
131 95686 5117 - 5117 Apply org.locationtech.geomesa.kafka.streams.SerializerCache.while$1 while$1()
131 95689 5088 - 5088 Block <nosymbol> ()
131 95688 5088 - 5088 Literal <nosymbol> ()
132 95677 5139 - 5165 Apply org.locationtech.geomesa.kafka.data.KafkaDataStore.getSchema ds.getSchema(typeNames.apply(i))
132 95676 5152 - 5164 Apply scala.Array.apply typeNames.apply(i)
133 95678 5176 - 5201 Apply org.locationtech.geomesa.kafka.data.KafkaDataStore.topic org.locationtech.geomesa.kafka.data.KafkaDataStore.topic(sft)
134 95679 5232 - 5242 Apply java.lang.Object.== t.==(topic)
134 95682 5243 - 5372 Block <nosymbol> { val internal: org.locationtech.geomesa.features.SimpleFeatureSerializer = ds.serialization.apply(sft).serializer; return new `package`.this.GeoMesaMessageSerializer(sft, internal) }
135 95680 5275 - 5307 Select org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.serializer ds.serialization.apply(sft).serializer
136 95681 5329 - 5372 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessageSerializer.<init> new `package`.this.GeoMesaMessageSerializer(sft, internal)
138 95683 5396 - 5407 Apply scala.collection.mutable.ArrayBuffer.+= topics.+=(t)
138 95684 5396 - 5407 Block scala.collection.mutable.ArrayBuffer.+= topics.+=(t)
140 95685 5430 - 5436 Apply scala.Int.+ i.+(1)
142 95690 5455 - 5602 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Topic \'", "\' does not exist in the configured store. Available topics: ", "").s(topic, topics.mkString(", ")))
148 95693 5699 - 5735 Apply org.geotools.api.data.DataStoreFinder.getDataStore org.geotools.api.data.DataStoreFinder.getDataStore(SerializerCache.this.params)
148 95692 5728 - 5734 Select org.locationtech.geomesa.kafka.streams.SerializerCache.params SerializerCache.this.params
148 95701 5689 - 6009 ApplyToImplicitArgs org.locationtech.geomesa.utils.io.WithClose.apply org.locationtech.geomesa.utils.io.`package`.WithClose.apply[org.geotools.api.data.DataStore, T](org.geotools.api.data.DataStoreFinder.getDataStore(SerializerCache.this.params))(((x0$1: org.geotools.api.data.DataStore) => x0$1 match { case (ds @ (_: org.locationtech.geomesa.kafka.data.KafkaDataStore)) => fn.apply(ds) case null => throw new scala.`package`.IllegalArgumentException("Could not load data store with provided params") case (ds @ _) => throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Expected a KafkaDataStore but got ", "").s(ds.getClass().getName())) }))(io.this.IsCloseable.dataStoreIsCloseable)
148 95700 5737 - 5737 Select org.locationtech.geomesa.utils.io.IsCloseableImplicits.dataStoreIsCloseable io.this.IsCloseable.dataStoreIsCloseable
149 95695 5774 - 5780 Block scala.Function1.apply fn.apply(ds)
149 95694 5774 - 5780 Apply scala.Function1.apply fn.apply(ds)
150 95697 5802 - 5886 Block <nosymbol> throw new scala.`package`.IllegalArgumentException("Could not load data store with provided params")
150 95696 5802 - 5886 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException("Could not load data store with provided params")
151 95699 5906 - 6001 Block <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Expected a KafkaDataStore but got ", "").s(ds.getClass().getName()))
151 95698 5906 - 6001 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Expected a KafkaDataStore but got ", "").s(ds.getClass().getName()))
169 95703 6439 - 6508 Apply java.util.List.toArray GeoMesaMessageSerializer.this.sft.getAttributeDescriptors().toArray[org.geotools.api.feature.type.AttributeDescriptor](scala.Array.empty[org.geotools.api.feature.type.AttributeDescriptor]((ClassTag.apply[org.geotools.api.feature.type.AttributeDescriptor](classOf[org.geotools.api.feature.type.AttributeDescriptor]): scala.reflect.ClassTag[org.geotools.api.feature.type.AttributeDescriptor])))
169 95702 6475 - 6507 ApplyToImplicitArgs scala.Array.empty scala.Array.empty[org.geotools.api.feature.type.AttributeDescriptor]((ClassTag.apply[org.geotools.api.feature.type.AttributeDescriptor](classOf[org.geotools.api.feature.type.AttributeDescriptor]): scala.reflect.ClassTag[org.geotools.api.feature.type.AttributeDescriptor]))
169 95707 6439 - 6662 ApplyToImplicitArgs scala.collection.TraversableLike.map scala.Predef.refArrayOps[org.geotools.api.feature.type.AttributeDescriptor](GeoMesaMessageSerializer.this.sft.getAttributeDescriptors().toArray[org.geotools.api.feature.type.AttributeDescriptor](scala.Array.empty[org.geotools.api.feature.type.AttributeDescriptor]((ClassTag.apply[org.geotools.api.feature.type.AttributeDescriptor](classOf[org.geotools.api.feature.type.AttributeDescriptor]): scala.reflect.ClassTag[org.geotools.api.feature.type.AttributeDescriptor])))).map[AnyRef => AnyRef, Array[AnyRef => AnyRef]](((d: org.geotools.api.feature.type.AttributeDescriptor) => { val binding: Class[_ <: AnyRef] = d.getType().getBinding().asInstanceOf[Class[_ <: AnyRef]]; ((in: AnyRef) => org.locationtech.geomesa.utils.geotools.converters.FastConverter.convert[_$2](in, binding)) }))(scala.this.Array.canBuildFrom[AnyRef => AnyRef]((ClassTag.apply[AnyRef => AnyRef](classOf[scala.Function1]): scala.reflect.ClassTag[AnyRef => AnyRef])))
169 95706 6513 - 6513 ApplyToImplicitArgs scala.Array.canBuildFrom scala.this.Array.canBuildFrom[AnyRef => AnyRef]((ClassTag.apply[AnyRef => AnyRef](classOf[scala.Function1]): scala.reflect.ClassTag[AnyRef => AnyRef]))
170 95704 6542 - 6595 TypeApply scala.Any.asInstanceOf d.getType().getBinding().asInstanceOf[Class[_ <: AnyRef]]
171 95705 6620 - 6654 Apply org.locationtech.geomesa.utils.geotools.converters.FastConverter.convert org.locationtech.geomesa.utils.geotools.converters.FastConverter.convert[_$2](in, binding)
175 95708 6727 - 6738 Select org.locationtech.geomesa.kafka.streams.GeoMesaMessage.action data.action
176 95709 6803 - 6813 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessageSerializer.wrap GeoMesaMessageSerializer.this.wrap(data)
176 95711 6784 - 6814 Block org.locationtech.geomesa.features.SimpleFeatureSerializer.serialize GeoMesaMessageSerializer.this.internal.serialize(GeoMesaMessageSerializer.this.wrap(data))
176 95710 6784 - 6814 Apply org.locationtech.geomesa.features.SimpleFeatureSerializer.serialize GeoMesaMessageSerializer.this.internal.serialize(GeoMesaMessageSerializer.this.wrap(data))
177 95713 6852 - 6856 Block <nosymbol> null
177 95712 6852 - 6856 Literal <nosymbol> null
178 95715 6878 - 6926 Block <nosymbol> throw new scala.`package`.NullPointerException("action is null")
178 95714 6878 - 6926 Throw <nosymbol> throw new scala.`package`.NullPointerException("action is null")
179 95717 6945 - 7045 Block <nosymbol> throw new scala.`package`.UnsupportedOperationException(scala.StringContext.apply("No serialization implemented for action \'", "\'").s(data.action))
179 95716 6945 - 7045 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException(scala.StringContext.apply("No serialization implemented for action \'", "\'").s(data.action))
184 95719 7146 - 7158 Select scala.collection.IndexedSeqOptimized.isEmpty scala.Predef.byteArrayOps(data).isEmpty
184 95718 7138 - 7142 Literal <nosymbol> null
184 95721 7162 - 7185 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessage.delete GeoMesaMessage.delete()
184 95720 7130 - 7158 Apply scala.Boolean.|| data.==(null).||(scala.Predef.byteArrayOps(data).isEmpty)
184 95722 7162 - 7185 Block org.locationtech.geomesa.kafka.streams.GeoMesaMessage.delete GeoMesaMessage.delete()
184 95743 7193 - 7695 Block <nosymbol> { val feature: org.geotools.api.feature.simple.SimpleFeature = GeoMesaMessageSerializer.this.internal.deserialize(data); val userData: scala.collection.immutable.Map[String,String] = if (feature.getUserData().isEmpty()) scala.Predef.Map.empty[String, String] else { val builder: scala.collection.mutable.Builder[(String, String),scala.collection.immutable.Map[String,String]] = scala.Predef.Map.newBuilder[String, String]; scala.collection.JavaConverters.mapAsScalaMapConverter[Object, Object](feature.getUserData()).asScala.foreach[scala.collection.mutable.Builder[(String, String),scala.collection.immutable.Map[String,String]]](((x0$1: (Object, Object)) => x0$1 match { case (_1: Object, _2: Object)(Object, Object)((k @ (_: String)), (v @ (_: String))) => builder.+=(scala.Predef.ArrowAssoc[String](k).->[String](v)) case (_1: Object, _2: Object)(Object, Object)((k @ _), (v @ _)) => builder.+=(scala.Predef.ArrowAssoc[String](k.toString()).->[String](v.toString())) })); builder.result() }; GeoMesaMessage.upsert(scala.collection.JavaConverters.asScalaBufferConverter[Object](feature.getAttributes()).asScala.toSeq, userData) }
185 95723 7217 - 7243 Apply org.locationtech.geomesa.features.SimpleFeatureSerializer.deserialize GeoMesaMessageSerializer.this.internal.deserialize(data)
186 95725 7302 - 7327 TypeApply scala.collection.immutable.Map.empty scala.Predef.Map.empty[String, String]
186 95724 7271 - 7298 Apply java.util.Map.isEmpty feature.getUserData().isEmpty()
186 95726 7302 - 7327 Block scala.collection.immutable.Map.empty scala.Predef.Map.empty[String, String]
186 95739 7335 - 7610 Block <nosymbol> { val builder: scala.collection.mutable.Builder[(String, String),scala.collection.immutable.Map[String,String]] = scala.Predef.Map.newBuilder[String, String]; scala.collection.JavaConverters.mapAsScalaMapConverter[Object, Object](feature.getUserData()).asScala.foreach[scala.collection.mutable.Builder[(String, String),scala.collection.immutable.Map[String,String]]](((x0$1: (Object, Object)) => x0$1 match { case (_1: Object, _2: Object)(Object, Object)((k @ (_: String)), (v @ (_: String))) => builder.+=(scala.Predef.ArrowAssoc[String](k).->[String](v)) case (_1: Object, _2: Object)(Object, Object)((k @ _), (v @ _)) => builder.+=(scala.Predef.ArrowAssoc[String](k.toString()).->[String](v.toString())) })); builder.result() }
187 95727 7361 - 7391 TypeApply scala.collection.immutable.Map.newBuilder scala.Predef.Map.newBuilder[String, String]
188 95728 7402 - 7421 Apply org.geotools.api.feature.Property.getUserData feature.getUserData()
188 95737 7402 - 7575 Apply scala.collection.IterableLike.foreach scala.collection.JavaConverters.mapAsScalaMapConverter[Object, Object](feature.getUserData()).asScala.foreach[scala.collection.mutable.Builder[(String, String),scala.collection.immutable.Map[String,String]]](((x0$1: (Object, Object)) => x0$1 match { case (_1: Object, _2: Object)(Object, Object)((k @ (_: String)), (v @ (_: String))) => builder.+=(scala.Predef.ArrowAssoc[String](k).->[String](v)) case (_1: Object, _2: Object)(Object, Object)((k @ _), (v @ _)) => builder.+=(scala.Predef.ArrowAssoc[String](k.toString()).->[String](v.toString())) }))
189 95729 7494 - 7500 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String](k).->[String](v)
189 95731 7483 - 7500 Block scala.collection.mutable.Builder.+= builder.+=(scala.Predef.ArrowAssoc[String](k).->[String](v))
189 95730 7483 - 7500 Apply scala.collection.mutable.Builder.+= builder.+=(scala.Predef.ArrowAssoc[String](k).->[String](v))
190 95733 7553 - 7563 Apply java.lang.Object.toString v.toString()
190 95732 7539 - 7549 Apply java.lang.Object.toString k.toString()
190 95735 7528 - 7563 Apply scala.collection.mutable.Builder.+= builder.+=(scala.Predef.ArrowAssoc[String](k.toString()).->[String](v.toString()))
190 95734 7539 - 7563 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String](k.toString()).->[String](v.toString())
190 95736 7528 - 7563 Block scala.collection.mutable.Builder.+= builder.+=(scala.Predef.ArrowAssoc[String](k.toString()).->[String](v.toString()))
192 95738 7586 - 7600 Apply scala.collection.mutable.Builder.result builder.result()
194 95741 7641 - 7676 Select scala.collection.SeqLike.toSeq scala.collection.JavaConverters.asScalaBufferConverter[Object](feature.getAttributes()).asScala.toSeq
194 95740 7641 - 7662 Apply org.geotools.api.feature.simple.SimpleFeature.getAttributes feature.getAttributes()
194 95742 7619 - 7687 Apply org.locationtech.geomesa.kafka.streams.GeoMesaMessage.upsert GeoMesaMessage.upsert(scala.collection.JavaConverters.asScalaBufferConverter[Object](feature.getAttributes()).asScala.toSeq, userData)
205 95745 7910 - 7941 Select scala.collection.TraversableOnce.toIndexedSeq message.attributes.toIndexedSeq
205 95744 7898 - 7908 Select org.locationtech.geomesa.kafka.streams.GeoMesaMessageSerializer.converters GeoMesaMessageSerializer.this.converters
205 95747 7874 - 7960 Apply org.locationtech.geomesa.kafka.streams.SerializableFeature.<init> new `package`.this.SerializableFeature(GeoMesaMessageSerializer.this.converters, message.attributes.toIndexedSeq, message.userData)
205 95746 7943 - 7959 Select org.locationtech.geomesa.kafka.streams.GeoMesaMessage.userData message.userData
229 95749 8849 - 8883 Apply scala.Function1.apply SerializableFeature.this.converters.apply(i).apply(SerializableFeature.this.attributes.apply(i))
229 95748 8869 - 8882 Apply scala.collection.SeqLike.apply SerializableFeature.this.attributes.apply(i)
231 95751 8952 - 9011 TypeApply scala.Any.asInstanceOf scala.collection.JavaConverters.mapAsJavaMapConverter[String, String](SerializableFeature.this.userData).asJava.asInstanceOf[java.util.Map[AnyRef,AnyRef]]
231 95750 8952 - 8960 Select org.locationtech.geomesa.kafka.streams.SerializableFeature.userData SerializableFeature.this.userData
233 95752 9046 - 9087 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
234 95753 9134 - 9175 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
235 95754 9229 - 9270 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
236 95755 9328 - 9369 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
237 95756 9438 - 9479 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
238 95757 9543 - 9584 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
239 95758 9636 - 9677 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
240 95759 9735 - 9776 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
241 95760 9829 - 9870 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
242 95761 9929 - 9970 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
243 95762 10025 - 10066 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
244 95763 10109 - 10150 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
245 95764 10197 - 10238 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
246 95765 10291 - 10332 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
247 95766 10377 - 10418 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
248 95767 10461 - 10502 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
249 95768 10568 - 10609 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
250 95769 10700 - 10741 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
251 95770 10819 - 10860 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
252 95771 10925 - 10966 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
253 95772 11043 - 11084 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
254 95773 11138 - 11179 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
255 95774 11255 - 11296 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
256 95775 11361 - 11402 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
257 95776 11455 - 11496 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
258 95777 11533 - 11574 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
259 95778 11629 - 11670 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
260 95779 11713 - 11754 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
261 95780 11788 - 11829 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()
262 95781 11869 - 11910 Throw <nosymbol> throw new scala.`package`.UnsupportedOperationException()