| Line |
Stmt Id |
Pos |
Tree |
Symbol |
Tests |
Code |
|
65
|
2842
|
2810
-
2810
|
TypeApply
|
scala.collection.Seq.canBuildFrom
|
|
collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.utils.GeoMessage]
|
|
65
|
2843
|
2794
-
3181
|
ApplyToImplicitArgs
|
scala.collection.TraversableLike.flatMap
|
|
records.flatMap[org.locationtech.geomesa.kafka.utils.GeoMessage, Seq[org.locationtech.geomesa.kafka.utils.GeoMessage]](((record: org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]) => try {
val headers: Map[String,Array[Byte]] = org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record);
val timestamp: Long = org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record);
scala.`package`.Iterator.single[org.locationtech.geomesa.kafka.utils.GeoMessage](GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp))
} catch {
case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => {
(if (GeoMessageConsumer.this.logger.underlying.isErrorEnabled())
GeoMessageConsumer.this.logger.underlying.error("Error deserializing message:", e)
else
(): Unit);
scala.`package`.Iterator.empty
}
}))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.utils.GeoMessage])
|
|
66
|
2839
|
2846
-
3051
|
Block
|
<nosymbol>
|
|
{
val headers: Map[String,Array[Byte]] = org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record);
val timestamp: Long = org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record);
scala.`package`.Iterator.single[org.locationtech.geomesa.kafka.utils.GeoMessage](GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp))
}
|
|
67
|
2833
|
2860
-
2893
|
Apply
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders
|
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record)
|
|
68
|
2834
|
2920
-
2955
|
Apply
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp
|
|
org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record)
|
|
69
|
2835
|
3005
-
3015
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.key
|
|
record.key()
|
|
69
|
2836
|
3017
-
3029
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.value
|
|
record.value()
|
|
69
|
2837
|
2982
-
3050
|
Apply
|
org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.deserialize
|
|
GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)
|
|
69
|
2838
|
2966
-
3051
|
Apply
|
scala.collection.Iterator.single
|
|
scala.`package`.Iterator.single[org.locationtech.geomesa.kafka.utils.GeoMessage](GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp))
|
|
71
|
2840
|
3149
-
3163
|
Select
|
scala.collection.Iterator.empty
|
|
scala.`package`.Iterator.empty
|
|
71
|
2841
|
3097
-
3163
|
Block
|
<nosymbol>
|
|
{
(if (GeoMessageConsumer.this.logger.underlying.isErrorEnabled())
GeoMessageConsumer.this.logger.underlying.error("Error deserializing message:", e)
else
(): Unit);
scala.`package`.Iterator.empty
}
|
|
74
|
2844
|
3192
-
3208
|
Select
|
scala.collection.SeqLike.isEmpty
|
|
messages.isEmpty
|
|
75
|
2845
|
3220
-
3238
|
Select
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit
|
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit
|
|
75
|
2846
|
3220
-
3238
|
Block
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit
|
|
org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit
|
|
77
|
2847
|
3262
-
3289
|
Apply
|
org.locationtech.geomesa.kafka.utils.GeoMessageProcessor.consume
|
|
GeoMessageConsumer.this.processor.consume(messages)
|
|
77
|
2848
|
3262
-
3289
|
Block
|
org.locationtech.geomesa.kafka.utils.GeoMessageProcessor.consume
|
|
GeoMessageConsumer.this.processor.consume(messages)
|