19.11. Confluent Integration

The Kafka Data Store can integrate with Confluent Kafka topics and the Confluent Schema Registry. The schema registry is a centralized store of versioned Avro schemas, each associated with a particular Kafka topic. The Confluent Kafka Data Store converts Avro schemas into SimpleFeatureTypes and deserializes records into SimpleFeatures.

To use a Confluent topic, set the schema registry URL in your data store parameter map under the key kafka.schema.registry.url. In GeoServer, select the “Confluent Kafka (GeoMesa)” store instead of the regular Kafka store.

Confluent integration requires the Confluent client JARs, which are not bundled with GeoMesa. If the GeoMesa Kafka binary distribution is installed, the script bin/install-confluent-support.sh can be used to download them.

19.11.1. Supported Avro Schema Fields

The following Avro schema field types are supported: STRING, BOOLEAN, INT, DOUBLE, LONG, FLOAT, BYTES, and ENUM. The UNION type is supported only for unions of NULL and exactly one other supported type, e.g. "type": ["null","string"].
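The rule above can be sketched as a small check over the "type" value of a field. This is an illustrative Python sketch, not GeoMesa's implementation; the names SUPPORTED, type_name and is_supported are hypothetical.

```python
# Type names listed as supported in this section (lower-cased Avro names).
SUPPORTED = {"string", "boolean", "int", "double", "long", "float", "bytes", "enum"}


def type_name(avro_type):
    """Return the type name for a primitive name or a complex type object."""
    if isinstance(avro_type, dict):
        # Complex types such as enums are written as {"type": "enum", ...}.
        return avro_type.get("type")
    return avro_type


def is_supported(avro_type):
    """Check a field's "type" value against the rules described above."""
    if isinstance(avro_type, list):
        # A union is accepted only as "null" plus exactly one other supported type.
        others = [t for t in avro_type if t != "null"]
        if len(avro_type) != 2 or len(others) != 1:
            return False
        name = type_name(others[0])
    else:
        name = type_name(avro_type)
    return isinstance(name, str) and name in SUPPORTED
```

For example, is_supported(["null", "string"]) is accepted, while a union of two non-null types or an array type is rejected.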

19.11.2. Supported SimpleFeature Fields

To encode types that may be part of a SimpleFeature but are not part of a standard Avro schema, such as Geometry or Date, the Confluent Kafka Data Store interprets several key-value metadata properties on schema fields, described below. All property values are case-insensitive.

Any additional properties on a field that are not listed below and are not standard Avro properties will be included as SimpleFeatureType attribute user data. Any additional properties on the schema will be included as SimpleFeatureType user data.
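To make the user data rule concrete, the following Python sketch separates a field's extra properties from the standard Avro field attributes and the GeoMesa properties documented in the subsections that follow. The function name attribute_user_data is hypothetical, and the sketch only illustrates the selection rule, not how GeoMesa stores the values.

```python
# Field attributes defined by the Avro specification.
AVRO_FIELD_KEYS = {"name", "type", "doc", "default", "order", "aliases"}

# GeoMesa field properties documented in this section.
GEOMESA_FIELD_KEYS = {
    "geomesa.geom.format",
    "geomesa.geom.type",
    "geomesa.geom.default",
    "geomesa.date.format",
    "geomesa.visibility.field",
    "geomesa.exclude.field",
}


def attribute_user_data(field):
    """Return the properties of an Avro field that would become attribute user data."""
    return {
        key: value
        for key, value in field.items()
        if key not in AVRO_FIELD_KEYS and key not in GEOMESA_FIELD_KEYS
    }
```

Applied to a field such as {"name": "id", "type": "string", "index": "true", "cardinality": "high"}, the sketch keeps only index and cardinality.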

19.11.2.1. geomesa.geom.format

Indicates that the field should be interpreted as a Geometry in the given format:

Value  Required Field Type  Description
-----  -------------------  ----------------------------------------------
wkt    STRING               Well-Known Text representation of a Geometry
wkb    BYTES                Well-Known Binary representation of a Geometry

Must be accompanied by the key geomesa.geom.type.

19.11.2.2. geomesa.geom.type

Indicates that the field should be interpreted as a Geometry of the given type. The value must be one of the following: Geometry, Point, LineString, Polygon, MultiPoint, MultiLineString, MultiPolygon, GeometryCollection

Must be accompanied by the key geomesa.geom.format.

19.11.2.3. geomesa.geom.default

Indicates that the field represents the default Geometry for this SimpleFeatureType. If the keys geomesa.geom.format and geomesa.geom.type are not both present on the same schema field, this property is ignored. The value must be either true or false. There may be only one default geometry field for a given schema.
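The constraints on the three geometry properties can be collected into one validation pass. The Python sketch below is illustrative only: geometry_errors and default_geometry are hypothetical helpers, and unwrapping a nullable union such as ["null","string"] before checking the required field type is an assumption, not documented behavior.

```python
# Required Avro field type for each geomesa.geom.format value.
GEOM_FORMATS = {"wkt": "string", "wkb": "bytes"}

GEOM_TYPES = {
    "geometry", "point", "linestring", "polygon",
    "multipoint", "multilinestring", "multipolygon", "geometrycollection",
}


def base_type(avro_type):
    """Unwrap ["null", X] unions and {"type": X} objects (assumed to be acceptable)."""
    if isinstance(avro_type, list):
        others = [t for t in avro_type if t != "null"]
        avro_type = others[0] if len(others) == 1 else None
    if isinstance(avro_type, dict):
        avro_type = avro_type.get("type")
    return avro_type


def geometry_errors(field):
    """Return a list of problems with the geometry properties on one field."""
    errors = []
    fmt = field.get("geomesa.geom.format")
    typ = field.get("geomesa.geom.type")
    if (fmt is None) != (typ is None):
        errors.append("geomesa.geom.format and geomesa.geom.type must be set together")
    if fmt is not None:
        # Property values are case-insensitive.
        required = GEOM_FORMATS.get(fmt.lower())
        if required is None:
            errors.append("unknown geometry format: " + fmt)
        elif base_type(field.get("type")) != required:
            errors.append(fmt + " requires a field of type " + required.upper())
    if typ is not None and typ.lower() not in GEOM_TYPES:
        errors.append("unknown geometry type: " + typ)
    return errors


def default_geometry(fields):
    """Return the name of the default geometry field, or None if there is none."""
    defaults = [
        f["name"]
        for f in fields
        # The default flag is ignored unless format and type are both present.
        if "geomesa.geom.format" in f and "geomesa.geom.type" in f
        and str(f.get("geomesa.geom.default", "false")).lower() == "true"
    ]
    if len(defaults) > 1:
        raise ValueError("only one default geometry field is allowed")
    return defaults[0] if defaults else None
```

A field that sets geomesa.geom.default without the other two keys is simply skipped by default_geometry, mirroring the "ignored" wording above.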

19.11.2.4. geomesa.date.format

Indicates that the field should be interpreted as a Date in the given format:

Value         Required Field Type  Description
------------  -------------------  ---------------------------------
epoch-millis  LONG                 Milliseconds since the Unix epoch
iso-date      STRING               Generic ISO date format
iso-datetime  STRING               Generic ISO datetime format
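As a rough illustration of what each format value means, the Python sketch below converts a raw field value into a UTC timestamp. It is not GeoMesa's parser: the function parse_date is hypothetical, the exact ISO variants GeoMesa accepts are not specified here, and treating values without a zone offset as UTC is an assumption.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_date(value, fmt):
    """Interpret a raw field value according to a geomesa.date.format value."""
    fmt = fmt.lower()  # property values are case-insensitive
    if fmt == "epoch-millis":
        # LONG field: milliseconds since the Unix epoch.
        return EPOCH + timedelta(milliseconds=value)
    if fmt == "iso-date":
        # STRING field such as "2021-01-01"; midnight UTC is assumed.
        d = date.fromisoformat(value)
        return datetime(d.year, d.month, d.day, tzinfo=timezone.utc)
    if fmt == "iso-datetime":
        # STRING field such as "2021-01-01T12:30:00Z".
        text = value[:-1] + "+00:00" if value.endswith("Z") else value
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            # Assumption: values without an offset are treated as UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    raise ValueError("unsupported date format: " + fmt)
```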

19.11.2.5. geomesa.visibility.field

Specifies that the value of this field should be used as the visibility for features of this SimpleFeatureType. The value must be either true or false. There may be only one visibility field for a given schema, and the field must be of type STRING.

19.11.2.6. geomesa.exclude.field

Specifies whether this field should be excluded from the SimpleFeatureType. All fields without this property will be included. The value must be either true or false.
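The visibility and exclusion flags are independent of each other, so a field can supply the visibility while being left out of the SimpleFeatureType. The Python sketch below illustrates both rules; the helper names are hypothetical and the code does not reflect GeoMesa internals.

```python
def is_true(field, key):
    """Read a true/false property, case-insensitively, defaulting to false."""
    return str(field.get(key, "false")).lower() == "true"


def included_attributes(fields):
    """Names of the fields that are kept in the SimpleFeatureType."""
    return [f["name"] for f in fields if not is_true(f, "geomesa.exclude.field")]


def visibility_field(fields):
    """Name of the single visibility field, or None if no field is flagged."""
    flagged = [f for f in fields if is_true(f, "geomesa.visibility.field")]
    if len(flagged) > 1:
        raise ValueError("only one visibility field is allowed")
    if not flagged:
        return None
    if flagged[0].get("type") != "string":
        raise ValueError("the visibility field must be of type STRING")
    return flagged[0]["name"]


fields = [
    {"name": "id", "type": "string"},
    {
        "name": "visibility",
        "type": "string",
        "geomesa.visibility.field": "true",
        "geomesa.exclude.field": "true",
    },
]
```

With these two fields, included_attributes(fields) keeps only id, while visibility_field(fields) still reports visibility as the source of the feature visibility.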

19.11.3. Example GeoMesa Avro Schema

{
  "namespace": "org.locationtech",
  "type": "record",
  "name": "GeoMesaAvroSchema",
  "geomesa.index.dtg": "date",
  "fields": [
    {
      "name": "id",
      "type": "string",
      "index": "true",
      "cardinality": "high"
    },
    {
      "name": "position",
      "type": "string",
      "geomesa.geom.format": "wkt",
      "geomesa.geom.type": "point",
      "geomesa.geom.default": "true",
      "srid": "4326"
    },
    {
      "name": "timestamp",
      "type": ["null","long"],
      "geomesa.date.format": "epoch-millis"
    },
    {
      "name": "date",
      "type": "string",
      "geomesa.date.format": "iso-datetime"
    },
    {
      "name": "visibility",
      "type": "string",
      "geomesa.visibility.field": "true",
      "geomesa.exclude.field": "true"
    }
  ]
}

19.11.4. Schema Overrides Config

The schema used to generate a SimpleFeatureType may optionally be overridden per topic by setting the data store configuration parameter kafka.schema.overrides. The value must be a Typesafe Config string whose top-level key, schemas, is an object that maps each topic name to a schema definition. If an override exists for a topic, it is used instead of the schema from the schema registry. Overrides are useful if you have an existing schema that lacks the GeoMesa properties.

19.11.4.1. Schema Overrides Example Config

{
  "schemas": {
    "topic1": {
      "type": "record",
      "name": "schema1",
      "fields": [
        {
          "name": "id",
          "type": "string",
          "cardinality": "high"
        },
        {
          "name": "position",
          "type": "string",
          "geomesa.geom.format": "wkt",
          "geomesa.geom.type": "point",
          "geomesa.geom.default": "true"
        },
        {
          "name": "speed",
          "type": "double"
        }
      ]
    },
    "topic2": {
      "type": "record",
      "name": "schema2",
      "fields": [
        {
          "name": "shape",
          "type": "bytes",
          "geomesa.geom.format": "wkb",
          "geomesa.geom.type": "geometry"
        },
        {
          "name": "date",
          "type": ["null","long"],
          "geomesa.date.format": "epoch-millis"
        }
      ]
    }
  }
}
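Because Typesafe Config (HOCON) is a superset of JSON, an overrides string like the one above can be generated by serializing a nested mapping as JSON. The Python sketch below builds the two string values discussed in this section; build_overrides is a hypothetical helper, the registry URL is a placeholder, and handing the resulting values to the data store is outside the scope of the sketch.

```python
import json


def build_overrides(schemas_by_topic):
    """Serialize a topic-to-schema mapping under the top-level key "schemas"."""
    return json.dumps({"schemas": schemas_by_topic}, indent=2)


topic1_schema = {
    "type": "record",
    "name": "schema1",
    "fields": [
        {"name": "id", "type": "string", "cardinality": "high"},
        {
            "name": "position",
            "type": "string",
            "geomesa.geom.format": "wkt",
            "geomesa.geom.type": "point",
            "geomesa.geom.default": "true",
        },
        {"name": "speed", "type": "double"},
    ],
}

# String values for the two parameter keys named in this section.
params = {
    "kafka.schema.registry.url": "http://localhost:8081",  # placeholder URL
    "kafka.schema.overrides": build_overrides({"topic1": topic1_schema}),
}
```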