// GeoMesaStreamsBuilder.java

/***********************************************************************
 * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Apache License, Version 2.0
 * which accompanies this distribution and is available at
 * https://www.apache.org/licenses/LICENSE-2.0
 ***********************************************************************/

package org.locationtech.geomesa.kafka.jstreams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.locationtech.geomesa.kafka.streams.GeoMesaMessage;

import java.util.Map;

/**
 * Java-facing wrapper around a kafka streams builder. Serialization is configured based on a
 * GeoMesa Kafka feature store, which is identified by the data store parameters handed to one
 * of the {@code create} factory methods.
 */
public class GeoMesaStreamsBuilder {

    // Scala GeoMesa builder that every instance method forwards to
    private final org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder delegate;
    // Java streams builder that the Scala delegate was created on top of; returned by wrapped()
    private final StreamsBuilder kafkaBuilder;

    // instances are only created through the static create(...) factories
    private GeoMesaStreamsBuilder(
          org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder delegate,
          StreamsBuilder kafkaBuilder) {
        this.delegate = delegate;
        this.kafkaBuilder = kafkaBuilder;
    }

    /**
     * Create a streams builder from data store parameters only. The timestamp extractor, reset
     * policy and underlying streams builder are all passed on as null.
     *
     * @param params data store parameters
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(Map<String, String> params) {
        return create(params, null, null, null);
    }

    /**
     * Create a streams builder on top of an existing kafka streams builder. The timestamp
     * extractor and reset policy are passed on as null.
     *
     * @param params         data store parameters
     * @param streamsBuilder underlying streams builder to use
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(Map<String, String> params, StreamsBuilder streamsBuilder) {
        return create(params, null, null, streamsBuilder);
    }

    /**
     * Create a streams builder with a timestamp extractor. The reset policy and underlying
     * streams builder are passed on as null.
     *
     * @param params             data store parameters
     * @param timestampExtractor timestamp extractor for message stream
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(Map<String, String> params, TimestampExtractor timestampExtractor) {
        return create(params, timestampExtractor, null, null);
    }

    /**
     * Create a streams builder with a timestamp extractor, on top of an existing kafka streams
     * builder. The reset policy is passed on as null.
     *
     * @param params             data store parameters
     * @param timestampExtractor timestamp extractor for message stream
     * @param streamsBuilder     underlying streams builder to use
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(
          Map<String, String> params,
          TimestampExtractor timestampExtractor,
          StreamsBuilder streamsBuilder) {
        return create(params, timestampExtractor, null, streamsBuilder);
    }

    /**
     * Create a streams builder with an auto offset reset policy. The timestamp extractor and
     * underlying streams builder are passed on as null.
     *
     * @param params      data store parameters
     * @param resetPolicy auto offset reset for reading existing topics
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(Map<String, String> params, AutoOffsetReset resetPolicy) {
        return create(params, null, resetPolicy, null);
    }

    /**
     * Create a streams builder with an auto offset reset policy, on top of an existing kafka
     * streams builder. The timestamp extractor is passed on as null.
     *
     * @param params         data store parameters
     * @param resetPolicy    auto offset reset for reading existing topics
     * @param streamsBuilder underlying streams builder to use
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(
          Map<String, String> params,
          AutoOffsetReset resetPolicy,
          StreamsBuilder streamsBuilder) {
        return create(params, null, resetPolicy, streamsBuilder);
    }

    /**
     * Create a streams builder with a timestamp extractor and an auto offset reset policy. The
     * underlying streams builder is passed on as null.
     *
     * @param params             data store parameters
     * @param timestampExtractor timestamp extractor for message stream
     * @param resetPolicy        auto offset reset for reading existing topics
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(
          Map<String, String> params,
          TimestampExtractor timestampExtractor,
          AutoOffsetReset resetPolicy) {
        return create(params, timestampExtractor, resetPolicy, null);
    }

    /**
     * Create a streams builder. This is the factory that all other {@code create} overloads
     * delegate to.
     *
     * @param params             data store parameters
     * @param timestampExtractor timestamp extractor for message stream
     * @param resetPolicy        auto offset reset for reading existing topics
     * @param streamsBuilder     underlying streams builder to use; if null, a new one is created
     * @return the new builder
     */
    public static GeoMesaStreamsBuilder create(
          Map<String, String> params,
          TimestampExtractor timestampExtractor,
          AutoOffsetReset resetPolicy,
          StreamsBuilder streamsBuilder) {
        // fall back to a fresh kafka builder when the caller did not supply one
        StreamsBuilder javaBuilder = streamsBuilder == null ? new StreamsBuilder() : streamsBuilder;
        // the Scala GeoMesa builder is handed a Scala wrapper around the same Java builder
        org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder scalaDelegate =
              org.locationtech.geomesa.kafka.streams.GeoMesaStreamsBuilder.apply(
                    params,
                    timestampExtractor,
                    resetPolicy,
                    new org.apache.kafka.streams.scala.StreamsBuilder(javaBuilder));
        return new GeoMesaStreamsBuilder(scalaDelegate, javaBuilder);
    }

    /**
     * Create a stream of updates for a given feature type
     *
     * @param typeName feature type name
     * @return the Java stream underlying the stream created by the wrapped builder
     */
    public KStream<String, GeoMesaMessage> stream(String typeName) {
        return delegate.stream(typeName).inner();
    }

    /**
     * Create a table for a given feature type
     *
     * @param typeName feature type name
     * @return the Java table underlying the table created by the wrapped builder
     */
    public KTable<String, GeoMesaMessage> table(String typeName) {
        return delegate.table(typeName).inner();
    }

    /**
     * Create a table for a given feature type, passing through a materialization
     *
     * @param typeName     feature type name
     * @param materialized materialized
     * @return the Java table underlying the table created by the wrapped builder
     */
    public KTable<String, GeoMesaMessage> table(
          String typeName,
          Materialized<String, GeoMesaMessage, KeyValueStore<Bytes, byte[]>> materialized) {
        return delegate.table(typeName, materialized).inner();
    }

    /**
     * Create a global table for a given feature type
     *
     * @param typeName feature type name
     * @return the global table
     */
    public GlobalKTable<String, GeoMesaMessage> globalTable(String typeName) {
        return delegate.globalTable(typeName);
    }

    /**
     * Create a global table for a given feature type, passing through a materialization
     *
     * @param typeName     feature type name
     * @param materialized materialized
     * @return the global table
     */
    public GlobalKTable<String, GeoMesaMessage> globalTable(
          String typeName,
          Materialized<String, GeoMesaMessage, KeyValueStore<Bytes, byte[]>> materialized) {
        return delegate.globalTable(typeName, materialized);
    }

    /**
     * Write the stream to the given feature type, which must already exist. The messages
     * must conform to the feature type schema
     *
     * @param typeName feature type name
     * @param stream   stream to persist
     */
    public void to(String typeName, KStream<String, GeoMesaMessage> stream) {
        // the delegate is given a Scala KStream wrapper around the Java stream
        delegate.to(typeName, new org.apache.kafka.streams.scala.kstream.KStream<>(stream));
    }

    /**
     * Convenience method to build the underlying topology
     *
     * @return the topology
     */
    public Topology build() {
        return delegate.build();
    }

    /**
     * Get the underlying streams builder
     *
     * @return the builder
     */
    public StreamsBuilder wrapped() {
        return kafkaBuilder;
    }

    /**
     * Get the `GeoMesaMessage` serde for the given feature type
     *
     * @param typeName feature type name
     * @return the serde
     */
    public Serde<GeoMesaMessage> serde(String typeName) {
        return delegate.serde(typeName);
    }
}