GeoMesaStreamsBuilder.java
/***********************************************************************
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* https://www.apache.org/licenses/LICENSE-2.0
***********************************************************************/
package org.locationtech.geomesa.kafka.jstreams;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.locationtech.geomesa.kafka.streams.GeoMesaMessage;
import java.util.Map;
/**
* Wrapper for a kafka streams builder that will configure serialization based on a GeoMesa Kafka feature store
*/
public class GeoMesaStreamsBuilder {
private final org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder sBuilder;
private final StreamsBuilder wrapped;
private GeoMesaStreamsBuilder(org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder sBuilder, StreamsBuilder wrapped) {
this.sBuilder = sBuilder;
this.wrapped = wrapped;
}
/**
* Create a streams builder
*
* @param params data store parameters
*/
public static GeoMesaStreamsBuilder create(Map<String, String> params) {
return create(params, null, null, null);
}
/**
* Create a streams builder
*
* @param params data store parameters
* @param streamsBuilder underlying streams builder to use
*/
public static GeoMesaStreamsBuilder create(
Map<String, String> params,
StreamsBuilder streamsBuilder) {
return create(params, null, null, streamsBuilder);
}
/**
* Create a streams builder
*
* @param params data store parameters
* @param timestampExtractor timestamp extractor for message stream
*/
public static GeoMesaStreamsBuilder create(
Map<String, String> params,
TimestampExtractor timestampExtractor) {
return create(params, timestampExtractor, null, null);
}
/**
* Create a streams builder
*
* @param params data store parameters
* @param timestampExtractor timestamp extractor for message stream
* @param streamsBuilder underlying streams builder to use
*/
public static GeoMesaStreamsBuilder create(
Map<String, String> params,
TimestampExtractor timestampExtractor,
StreamsBuilder streamsBuilder) {
return create(params, timestampExtractor, null, streamsBuilder);
}
/**
* Create a streams builder
*
* @param params data store parameters
* @param resetPolicy auto offset reset for reading existing topics
*/
public static GeoMesaStreamsBuilder create(
Map<String, String> params,
AutoOffsetReset resetPolicy) {
return create(params, null, resetPolicy, null);
}
/**
* Create a streams builder
*
* @param params data store parameters
* @param resetPolicy auto offset reset for reading existing topics
* @param streamsBuilder underlying streams builder to use
*/
public static GeoMesaStreamsBuilder create(
Map<String, String> params,
AutoOffsetReset resetPolicy,
StreamsBuilder streamsBuilder) {
return create(params, null, resetPolicy, streamsBuilder);
}
/**
* Create a streams builder
*
* @param params data store parameters
* @param timestampExtractor timestamp extractor for message stream
* @param resetPolicy auto offset reset for reading existing topics
*/
public static GeoMesaStreamsBuilder create(
Map<String, String> params,
TimestampExtractor timestampExtractor,
AutoOffsetReset resetPolicy) {
return create(params, timestampExtractor, resetPolicy, null);
}
/**
* Create a streams builder
*
* @param params data store parameters
* @param timestampExtractor timestamp extractor for message stream
* @param resetPolicy auto offset reset for reading existing topics
* @param streamsBuilder underlying streams builder to use
*/
public static GeoMesaStreamsBuilder create(
Map<String, String> params,
TimestampExtractor timestampExtractor,
AutoOffsetReset resetPolicy,
StreamsBuilder streamsBuilder) {
if (streamsBuilder == null) {
streamsBuilder = new StreamsBuilder();
}
org.apache.kafka.streams.scala.StreamsBuilder scalaBuilder =
new org.apache.kafka.streams.scala.StreamsBuilder(streamsBuilder);
org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder sbuilder =
org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder.apply(params, timestampExtractor, resetPolicy, scalaBuilder);
return new GeoMesaStreamsBuilder(sbuilder, streamsBuilder);
}
/**
* Create a stream of updates for a given feature type
*
* @param typeName feature type name
*/
public KStream<String, GeoMesaMessage> stream(String typeName) {
return sBuilder.stream(typeName).inner();
}
/**
* Create a table for a given feature type
*
* @param typeName feature type name
*/
public KTable<String, GeoMesaMessage> table(String typeName) {
return sBuilder.table(typeName).inner();
}
/**
* Create a table for a given feature type
*
* @param typeName feature type name
* @param materialized materialized
*/
public KTable<String, GeoMesaMessage> table(
String typeName,
Materialized<String, GeoMesaMessage, KeyValueStore<Bytes, byte[]>> materialized) {
return sBuilder.table(typeName, materialized).inner();
}
/**
* Create a global table for a given feature type
*
* @param typeName feature type name
*/
public GlobalKTable<String, GeoMesaMessage> globalTable(String typeName) {
return sBuilder.globalTable(typeName);
}
/**
* Create a global table for a given feature type
*
* @param typeName feature type name
* @param materialized materialized
*/
public GlobalKTable<String, GeoMesaMessage> globalTable(
String typeName,
Materialized<String, GeoMesaMessage, KeyValueStore<Bytes, byte[]>> materialized) {
return sBuilder.globalTable(typeName, materialized);
}
/**
* Write the stream to the given feature type, which must already exist. The messages
* must conform to the feature type schema
*
* @param typeName feature type name
* @param stream stream to persist
*/
public void to(String typeName, KStream<String, GeoMesaMessage> stream) {
sBuilder.to(typeName, new org.apache.kafka.streams.scala.kstream.KStream<>(stream));
}
/**
* Convenience method to build the underlying topology
*/
public Topology build() { return sBuilder.build(); }
/**
* Get the underlying streams builder
*
* @return the builder
*/
public StreamsBuilder wrapped() {
return this.wrapped;
}
/**
* Get the `GeoMesaMessage` serde for the given feature type
*
* @param typeName feature type name
* @return the serde
*/
public Serde<GeoMesaMessage> serde(String typeName) {
return sBuilder.serde(typeName);
}
}