GeoMessageProcessor.java
/***********************************************************************
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* https://www.apache.org/licenses/LICENSE-2.0
***********************************************************************/
package org.locationtech.geomesa.kafka.utils.interop;
import org.locationtech.geomesa.kafka.consumer.BatchConsumer;
import org.locationtech.geomesa.kafka.utils.GeoMessage;
import scala.Enumeration;
import java.util.List;
/**
 * Message processor class. Guarantees 'at-least-once' processing.
 * <p>
 * Implementors provide {@link #consume(List)}. The default methods on this interface adapt the
 * Scala sequence-based {@code consume} overloads to that method, converting the input to a
 * {@link List} and mapping the returned {@link BatchResult} to the corresponding
 * {@code BatchConsumer.BatchResult} enumeration value.
 */
public interface GeoMessageProcessor extends org.locationtech.geomesa.kafka.utils.GeoMessageProcessor {

    /**
     * Consume a batch of records.
     * <p>
     * The response from this method will determine the continued processing of messages. If
     * {@link BatchResult#COMMIT} is returned, the batch is considered complete and won't be presented
     * again. If {@link BatchResult#CONTINUE} is returned, the batch will be presented again in the
     * future, and more messages will be read off the topic in the meantime. If
     * {@link BatchResult#PAUSE} is returned, the batch will be presented again in the future, but
     * no more messages will be read off the topic in the meantime.
     * <p>
     * This method should return in a reasonable amount of time. If too much time is spent processing
     * messages, consumers may be considered inactive and be dropped from processing. See
     * <a href="https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html">https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html</a>
     * <p>
     * Note: if there is an error committing the batch or something else goes wrong, some messages may
     * be repeated in a subsequent call, regardless of the response from this method.
     *
     * @param records records
     * @return indication to continue, pause, or commit; must not be {@code null}
     */
    BatchResult consume(List<GeoMessage> records);

    // scala 2.12 - note, can't @Override these due to scala version differences
    /**
     * Adapter for the {@code scala.collection.Seq} overload: converts the records to a
     * {@link List}, delegates to {@link #consume(List)} and translates the result.
     *
     * @param records records, as a Scala sequence
     * @return the {@code BatchConsumer.BatchResult} value matching the result of {@link #consume(List)}
     * @throws NullPointerException if {@link #consume(List)} returns {@code null}
     * @throws IllegalStateException if {@link #consume(List)} returns a result with no known mapping
     */
    default Enumeration.Value consume(scala.collection.Seq<GeoMessage> records) {
        List<GeoMessage> list = scala.collection.JavaConverters.seqAsJavaListConverter(records).asJava();
        BatchResult result = consume(list);
        if (result == null) {
            // fail with a descriptive message instead of the bare NPE the enum switch would raise
            throw new NullPointerException("consume(List) returned null, expected COMMIT, CONTINUE or PAUSE");
        }
        switch (result) {
            case COMMIT:
                return BatchConsumer.BatchResult$.MODULE$.Commit();
            case CONTINUE:
                return BatchConsumer.BatchResult$.MODULE$.Continue();
            case PAUSE:
                return BatchConsumer.BatchResult$.MODULE$.Pause();
            default:
                // never hand a null enumeration value back to the caller
                throw new IllegalStateException("Unhandled batch result: " + result);
        }
    }

    // scala 2.13 - note, can't @Override these due to scala version differences
    /**
     * Adapter for the {@code scala.collection.immutable.Seq} overload: converts the records to a
     * {@link List}, delegates to {@link #consume(List)} and translates the result.
     *
     * @param records records, as an immutable Scala sequence
     * @return the {@code BatchConsumer.BatchResult} value matching the result of {@link #consume(List)}
     * @throws NullPointerException if {@link #consume(List)} returns {@code null}
     * @throws IllegalStateException if {@link #consume(List)} returns a result with no known mapping
     */
    default Enumeration.Value consume(scala.collection.immutable.Seq<GeoMessage> records) {
        List<GeoMessage> list = scala.collection.JavaConverters.seqAsJavaListConverter(records).asJava();
        BatchResult result = consume(list);
        if (result == null) {
            // fail with a descriptive message instead of the bare NPE the enum switch would raise
            throw new NullPointerException("consume(List) returned null, expected COMMIT, CONTINUE or PAUSE");
        }
        switch (result) {
            case COMMIT:
                return BatchConsumer.BatchResult$.MODULE$.Commit();
            case CONTINUE:
                return BatchConsumer.BatchResult$.MODULE$.Continue();
            case PAUSE:
                return BatchConsumer.BatchResult$.MODULE$.Pause();
            default:
                // never hand a null enumeration value back to the caller
                throw new IllegalStateException("Unhandled batch result: " + result);
        }
    }

    /**
     * Outcome of processing a batch, as returned by {@link #consume(List)}.
     */
    enum BatchResult {
        /** The batch is complete and won't be presented again. */
        COMMIT,
        /** The batch will be presented again later; more messages are read off the topic in the meantime. */
        CONTINUE,
        /** The batch will be presented again later; no more messages are read off the topic in the meantime. */
        PAUSE
    }
}