1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.kafka.utils
10 
11 import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}
12 import org.locationtech.geomesa.kafka.consumer.BatchConsumer
13 import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult
14 import org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.BatchResult
15 import org.locationtech.geomesa.kafka.versions.RecordVersions
16 
17 import java.time.Duration
18 import scala.util.control.NonFatal
19 
20 /**
21  * Message processor class. Guarantees 'at-least-once' processing.
22  */
trait GeoMessageProcessor {

  /**
   * Consume a batch of records.
   *
   * The response from this method determines the continued processing of messages. If `Commit`
   * is returned, the batch is considered complete and won't be presented again. If `Continue` is
   * returned, the batch will be presented again in the future, and more messages will be read off the topic
   * in the meantime. If `Pause` is returned, the batch will be presented again in the future, but
   * no more messages will be read off the topic in the meantime.
   *
   * This method should return in a reasonable amount of time. If too much time is spent processing
   * messages, consumers may be considered inactive and be dropped from processing. See
   * https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
   *
   * Note: if there is an error committing the batch or something else goes wrong, some messages may
   * be repeated in a subsequent call, regardless of the response from this method - i.e. processing
   * is 'at-least-once', so implementations should be idempotent where possible
   *
   * @param records deserialized messages from the current batch
   * @return indication to continue, pause, or commit
   */
  def consume(records: Seq[GeoMessage]): BatchResult
}
46 
object GeoMessageProcessor {

  /**
   * Kafka message consumer with guaranteed at-least-once processing.
   *
   * Polls the underlying consumers, deserializes each record into a [[GeoMessage]], and hands
   * the resulting batch to the processor. Records that fail to deserialize are logged and
   * dropped; an entirely-unparseable batch is committed so it is not re-presented.
   *
   * @param consumers consumers
   * @param frequency frequency
   * @param serializer serializer
   * @param processor message processor
   */
  class GeoMessageConsumer(
      consumers: Seq[Consumer[Array[Byte], Array[Byte]]],
      frequency: Duration,
      serializer: GeoMessageSerializer,
      processor: GeoMessageProcessor
    ) extends BatchConsumer(consumers, frequency) {

    override protected def consume(records: Seq[ConsumerRecord[Array[Byte], Array[Byte]]]): BatchResult = {
      val builder = Seq.newBuilder[GeoMessage]
      records.foreach { record =>
        try {
          val headers = RecordVersions.getHeaders(record)
          val timestamp = RecordVersions.getTimestamp(record)
          builder += serializer.deserialize(record.key, record.value, headers, timestamp)
        } catch {
          // best-effort: drop the bad record but keep processing the rest of the batch
          case NonFatal(e) => logger.error("Error deserializing message:", e)
        }
      }
      val messages = builder.result()
      if (messages.nonEmpty) {
        processor.consume(messages)
      } else {
        // nothing usable in this batch - commit so it isn't re-presented
        BatchResult.Commit
      }
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
65 2842 2810 - 2810 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.utils.GeoMessage]
65 2843 2794 - 3181 ApplyToImplicitArgs scala.collection.TraversableLike.flatMap records.flatMap[org.locationtech.geomesa.kafka.utils.GeoMessage, Seq[org.locationtech.geomesa.kafka.utils.GeoMessage]](((record: org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],Array[Byte]]) => try { val headers: Map[String,Array[Byte]] = org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record); val timestamp: Long = org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record); scala.`package`.Iterator.single[org.locationtech.geomesa.kafka.utils.GeoMessage](GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)) } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => { (if (GeoMessageConsumer.this.logger.underlying.isErrorEnabled()) GeoMessageConsumer.this.logger.underlying.error("Error deserializing message:", e) else (): Unit); scala.`package`.Iterator.empty } }))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.kafka.utils.GeoMessage])
66 2839 2846 - 3051 Block <nosymbol> { val headers: Map[String,Array[Byte]] = org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record); val timestamp: Long = org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record); scala.`package`.Iterator.single[org.locationtech.geomesa.kafka.utils.GeoMessage](GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)) }
67 2833 2860 - 2893 Apply org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders org.locationtech.geomesa.kafka.versions.RecordVersions.getHeaders(record)
68 2834 2920 - 2955 Apply org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp org.locationtech.geomesa.kafka.versions.RecordVersions.getTimestamp(record)
69 2835 3005 - 3015 Apply org.apache.kafka.clients.consumer.ConsumerRecord.key record.key()
69 2836 3017 - 3029 Apply org.apache.kafka.clients.consumer.ConsumerRecord.value record.value()
69 2837 2982 - 3050 Apply org.locationtech.geomesa.kafka.utils.GeoMessageSerializer.deserialize GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp)
69 2838 2966 - 3051 Apply scala.collection.Iterator.single scala.`package`.Iterator.single[org.locationtech.geomesa.kafka.utils.GeoMessage](GeoMessageConsumer.this.serializer.deserialize(record.key(), record.value(), headers, timestamp))
71 2840 3149 - 3163 Select scala.collection.Iterator.empty scala.`package`.Iterator.empty
71 2841 3097 - 3163 Block <nosymbol> { (if (GeoMessageConsumer.this.logger.underlying.isErrorEnabled()) GeoMessageConsumer.this.logger.underlying.error("Error deserializing message:", e) else (): Unit); scala.`package`.Iterator.empty }
74 2844 3192 - 3208 Select scala.collection.SeqLike.isEmpty messages.isEmpty
75 2845 3220 - 3238 Select org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit
75 2846 3220 - 3238 Block org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit org.locationtech.geomesa.kafka.consumer.BatchConsumer.BatchResult.Commit
77 2847 3262 - 3289 Apply org.locationtech.geomesa.kafka.utils.GeoMessageProcessor.consume GeoMessageConsumer.this.processor.consume(messages)
77 2848 3262 - 3289 Block org.locationtech.geomesa.kafka.utils.GeoMessageProcessor.consume GeoMessageConsumer.this.processor.consume(messages)