# 22. GeoMesa Stream Processing

The GeoMesa Stream library (geomesa-stream in the source distribution) provides tools to process streams of SimpleFeatures. The library can be used to instantiate a DataStore, either in GeoServer or in a user’s application, that serves as a continuous source of SimpleFeatures. For example, you can instantiate a DataStore that connects to Twitter and shows the most recent tweets in a spatial context. How long the DataStore retains features (its cache timeout) is configurable. A stream can be defined against any source that Apache Camel can process. A SimpleFeatureConverter can be attached to the stream to translate the underlying data into SimpleFeatures.

## 22.1. Modules

• geomesa-stream-api - the stream source and processing APIs
• geomesa-stream-generic - definition of the Camel generic source
• geomesa-stream-datastore - DataStore implementation
• geomesa-stream-gs-plugin - GeoServer hooks for stream sources

## 22.2. Usage

To illustrate usage, assume we are processing a stream of Twitter data in CSV format. The configuration in GeoServer is as follows:

    {
      type         = "generic"
      source-route = "netty4:tcp://localhost:5899?textline=true"
      sft          = {
        fields = [
          { name = "user", type = "String" }
          { name = "msg",  type = "String" }
          { name = "geom", type = "Point", index = true, srid = 4326, default = true }
          { name = "dtg",  type = "Date",  index = true }
        ]
      }
      converter    = {
        id-field = "md5(string2bytes($0))"
        type     = "delimited-text"
        format   = "DEFAULT"
        fields   = [
          { name = "user", transform = "$0" }
          { name = "msg",  transform = "$1" }
          { name = "geom", transform = "point($2::double, $3::double)" }
          { name = "dtg",  transform = "datetime($4)" }
        ]
      }
    }


This defines a stream source that listens on port 5899 for CSV messages with the following columns: user, msg, lon, lat, dtg. To instantiate a DataStore for this type that keeps the last 30 seconds of tweets, use the following code:

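    import org.geotools.data.DataStoreFinder
    import scala.collection.JavaConverters._

    // sourceConf: the stream configuration shown above (assumed here to be held as a String)
    val ds = DataStoreFinder.getDataStore(
      Map(
        StreamDataStoreParams.STREAM_DATASTORE_CONFIG.key -> sourceConf,
        StreamDataStoreParams.CACHE_TIMEOUT.key           -> Integer.valueOf(30) // seconds
      ).asJava) // DataStoreFinder expects a java.util.Map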
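
To exercise a source like the one configured above, a client only needs to write newline-terminated CSV lines to the listening port. The following is a minimal sketch using only the JDK socket API; the object name `TweetSender`, the sample values, and the timestamp format are illustrative assumptions rather than part of GeoMesa, and no CSV quoting is attempted.

```scala
import java.net.Socket
import java.nio.charset.StandardCharsets

object TweetSender {
  // Builds one CSV record in the column order the source expects:
  // user, msg, lon, lat, dtg.
  // Assumption: no field contains commas, quotes, or newlines.
  def csvLine(user: String, msg: String, lon: Double, lat: Double, dtg: String): String =
    Seq(user, msg, lon.toString, lat.toString, dtg).mkString(",")

  // Opens a TCP connection and writes each record as a newline-terminated line.
  def send(host: String, port: Int, lines: Seq[String]): Unit = {
    val socket = new Socket(host, port)
    try {
      val out = socket.getOutputStream
      lines.foreach(line => out.write((line + "\n").getBytes(StandardCharsets.UTF_8)))
      out.flush()
    } finally socket.close()
  }
}
```

With the stream source running locally, `TweetSender.send("localhost", 5899, Seq(TweetSender.csvLine("alice", "hello", -77.0365, 38.8977, "2015-01-01T00:00:00.000Z")))` would submit a single record. The timestamp must be in whatever format the converter's `datetime` function accepts; the ISO 8601 value here is an assumption.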


To query this stream source, use a FilterFactory from org.geotools.factory.CommonFactoryFinder. To receive notifications on new SimpleFeatures, use a StreamListener:

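    import org.opengis.feature.simple.SimpleFeature

    val listener =
      new StreamListener {
        // Called for each new SimpleFeature that arrives on the stream
        def onNext(sf: SimpleFeature): Unit = println(s"Received a new feature: ${sf.getID}")
      }
    ds.asInstanceOf[org.locationtech.geomesa.stream.datastore.StreamDataStore].registerListener(listener)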
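
The listing above covers notifications only. For querying, a sketch along the following lines may help. It uses the standard GeoTools `FilterFactory2.bbox` and `DataStore.getFeatureSource` calls. The object name `StreamQuery` and the `typeName` parameter are assumptions for illustration, since the configuration shown earlier does not name the feature type, and the sketch presumes the stream DataStore evaluates a bounding-box filter against the features it currently holds.

```scala
import org.geotools.data.DataStore
import org.geotools.data.simple.SimpleFeatureCollection
import org.geotools.factory.CommonFactoryFinder

object StreamQuery {
  // Returns the features whose "geom" attribute falls within the given
  // lon/lat bounding box.
  // Assumption: typeName is the name under which the stream's feature
  // type is registered; it is not specified in the configuration above.
  def inBox(ds: DataStore, typeName: String,
            minLon: Double, minLat: Double,
            maxLon: Double, maxLat: Double): SimpleFeatureCollection = {
    val ff = CommonFactoryFinder.getFilterFactory2()
    val filter = ff.bbox("geom", minLon, minLat, maxLon, maxLat, "EPSG:4326")
    ds.getFeatureSource(typeName).getFeatures(filter)
  }
}
```

A whole-world query would then be `StreamQuery.inBox(ds, typeName, -180.0, -90.0, 180.0, 90.0)`, with the result iterated through the usual GeoTools feature iterator.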


## 22.3. UDP

The generic source can be used with UDP as well, although there are some caveats:

• If you are sending text, the source route must include ‘?textline=true’, even though the Camel documentation says that this option applies only to TCP
• The data in each UDP packet must end with a newline character
• The data in each UDP packet must contain exactly one line; everything after the newline is dropped
• The maximum text line length is controlled by the route parameter ‘decoderMaxLineLength’, which defaults to 1024 and can be set to at most 2048
• A message longer than the maximum line length is dropped
• Technically, the line length could be longer, but Camel does not expose the Netty UDP RCVBUF_ALLOCATOR option, which causes messages to be truncated at 2048 bytes
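
The caveats above can be enforced on the sending side before a packet leaves the client. The sketch below assumes a UDP route such as `netty4:udp://localhost:5899?textline=true&decoderMaxLineLength=2048` (an adaptation of the TCP route shown earlier, not taken from the GeoMesa documentation). The object name `UdpLineSender` is hypothetical. As a conservative assumption, the trailing newline is counted against the length limit.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

object UdpLineSender {
  // Default maximum line length quoted in the caveats above.
  val DefaultMaxLineLength = 1024

  // Returns the bytes for one packet (a single newline-terminated line),
  // or None if the message spans several lines or exceeds maxLineLength.
  // Assumption: the limit is applied to the encoded bytes, newline included.
  def packetBytes(message: String, maxLineLength: Int = DefaultMaxLineLength): Option[Array[Byte]] = {
    val line = message.stripLineEnd
    val bytes = (line + "\n").getBytes(StandardCharsets.UTF_8)
    val singleLine = !line.exists(c => c == '\n' || c == '\r')
    if (singleLine && bytes.length <= maxLineLength) Some(bytes) else None
  }

  // Sends the message as one UDP packet; returns false without sending
  // if the message fails the checks in packetBytes.
  def send(host: String, port: Int, message: String,
           maxLineLength: Int = DefaultMaxLineLength): Boolean =
    packetBytes(message, maxLineLength) match {
      case None => false
      case Some(bytes) =>
        val socket = new DatagramSocket()
        try {
          socket.send(new DatagramPacket(bytes, bytes.length, InetAddress.getByName(host), port))
          true
        } finally socket.close()
    }
}
```

Rejecting oversized or multi-line messages in the client makes the failure visible to the sender, whereas the receiving side would drop or truncate them silently.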