11.6. Data Consumers
A GeoMesa Kafka data store in consumer mode reads feature data written
to Kafka by a separate GeoMesa Kafka producer. The store supports two types of
SimpleFeatureSource: live and replay. A Kafka Consumer Feature Source operating
in live mode continually pulls data from the end of the message queue (i.e. the
latest time) and always represents the latest state of the simple features;
autoOffsetReset can be set to smallest to read from the beginning of the
message queue instead. A Kafka Consumer Feature Source operating in replay mode
pulls data from a specified time interval in the past and can provide features
as they existed at any point in time within that interval.
First, create the data store. For example:
String brokers = ...
String zookeepers = ...
String zkPath = ...
// build parameters map
Map<String, Serializable> params = new HashMap<>();
params.put("brokers", brokers);
params.put("zookeepers", zookeepers);
// optional - the default is false
params.put("isProducer", Boolean.FALSE);
// optional
params.put("zkPath", zkPath);
// optional - can be set to "smallest" to read from the beginning of the message queue
params.put("autoOffsetReset", "largest");
// create the data store
KafkaDataStoreFactory factory = new KafkaDataStoreFactory();
DataStore consumerDs = factory.createDataStore(params);
The brokers, zookeepers, and zkPath parameters must be consistent with the
values used to create the Kafka Data Store Producer.
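One way to keep the two sides consistent is to build both parameter maps from a single shared helper. The following is only a sketch: the class KafkaParamsSketch and its methods are hypothetical and not part of GeoMesa, and setting isProducer to true on the producer side is an assumption based on the parameter's name. Only the parameter keys are taken from the example above.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of GeoMesa: both parameter maps are derived
// from the same connection settings so they cannot drift apart.
class KafkaParamsSketch {

    // Settings that must match between producer and consumer.
    static Map<String, Serializable> shared(String brokers, String zookeepers, String zkPath) {
        Map<String, Serializable> params = new HashMap<>();
        params.put("brokers", brokers);
        params.put("zookeepers", zookeepers);
        params.put("zkPath", zkPath);
        return params;
    }

    // Consumer map: shared settings plus isProducer = false (the default).
    static Map<String, Serializable> consumer(String brokers, String zookeepers, String zkPath) {
        Map<String, Serializable> params = shared(brokers, zookeepers, zkPath);
        params.put("isProducer", Boolean.FALSE);
        return params;
    }

    // Producer map: shared settings plus isProducer = true (assumed).
    static Map<String, Serializable> producer(String brokers, String zookeepers, String zkPath) {
        Map<String, Serializable> params = shared(brokers, zookeepers, zkPath);
        params.put("isProducer", Boolean.TRUE);
        return params;
    }
}
```

Either map can then be passed to factory.createDataStore(params) as in the example above.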
Because createSchema was called on the Kafka Data Store Producer, it
does not need to be called on the Consumer. Calling createSchema with a
SimpleFeatureType that has already been created will result in an exception
being thrown. Note that all SimpleFeatures returned by the Kafka Data Store
Consumer will have a SimpleFeatureType equal to the streamingSFT created when
setting up the producer, i.e. the SimpleFeatureType will include the hint
added by KafkaDataStoreHelper.createStreamingSFT.
Now that the Kafka Data Store Consumer has been created, it can be queried in either live or replay mode.
11.6.1. Live Mode
Live mode is the default and requires no extra setup. In this mode the
SimpleFeatureSource contains the current state of the KafkaDataStore. As
SimpleFeatures are created, modified, deleted, or cleared by the Kafka Data
Store Producer, the current state is updated. All queries to the
SimpleFeatureSource are queries against the current state. For example:
String typeName = ...
SimpleFeatureSource liveFeatureSource = consumerDs.getFeatureSource(typeName);
Filter filter = ...
liveFeatureSource.getFeatures(filter);
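The idea of a "current state" that follows the producer's changes can be modelled in a few lines. This is an illustrative sketch only, not GeoMesa's implementation: LiveStateSketch is a hypothetical class, and a String stands in for a SimpleFeature.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only: the current state is a map from feature id to the
// latest version of that feature (a String stands in for a SimpleFeature).
class LiveStateSketch {
    private final Map<String, String> current = new LinkedHashMap<>();

    // A create or a modify both leave the latest version under the id.
    void createOrUpdate(String id, String feature) { current.put(id, feature); }

    // A delete removes a single feature.
    void delete(String id) { current.remove(id); }

    // A clear removes every feature.
    void clear() { current.clear(); }

    // Queries see a copy of whatever the state is right now.
    Map<String, String> snapshot() { return new LinkedHashMap<>(current); }

    public static void main(String[] args) {
        LiveStateSketch state = new LiveStateSketch();
        state.createOrUpdate("a", "v1");
        state.createOrUpdate("b", "v1");
        state.createOrUpdate("a", "v2"); // modify a
        state.delete("b");
        System.out.println(state.snapshot()); // prints {a=v2}
    }
}
```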
It is also possible to pass a CQL filter to getFeatureSource so that the
resulting FeatureSource only contains matching records. Reducing the number of
records held by the FeatureSource in this way improves performance when it is
queried.
String typeName = ...
SimpleFeatureSource liveFeatureSource = consumerDs.getFeatureSource(typeName, filter);
11.6.2. Replay Mode
Replay mode allows a user to query the KafkaDataStore as it existed at any
point in the past. Queries against a Kafka Replay Simple Feature Source
specify a historical time to query, and only the set and version of
SimpleFeatures that existed at that point in time will be used to answer the
query.
To use replay mode, some additional hints are required, namely the start and end times of the replay window and a read behind duration:
Instant replayStart = ...
Instant replayEnd = ...
Duration readBehind = ...
ReplayConfig replayConfig = new ReplayConfig(replayStart, replayEnd, readBehind);
The replay window is simply an optimization that allows the Kafka Replay Feature Source to load, at initialization time, all state changes that occur within the window. Any query for a time outside of the window will return no results even if features existed at that time.
The read behind is the amount of time used to rebuild state. For example, if
readBehind = 5s then for a query requesting state at time = t, all state
changes that occurred between t - 5s and t will be used to build the state at
time t, which will then be used to answer the query. Selecting an appropriate
read behind requires an understanding of the producer. The expected use case
is a producer that updates every simple feature, even if it hasn’t changed, at
a regular interval. For example, if the producer is updating every x seconds
then a read behind of x + 1s might be appropriate.
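The effect of the read behind can be sketched with a small model. This is not GeoMesa's implementation: ReadBehindSketch and its Change type are hypothetical, a String stands in for a SimpleFeature, a null value is used here to represent a delete, and treating both ends of the interval as inclusive is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: rebuilds the state at time t from the logged
// changes that fall between t - readBehind and t.
class ReadBehindSketch {

    // One logged state change; a null value stands for a delete.
    static final class Change {
        final Instant time;
        final String id;
        final String value;
        Change(Instant time, String id, String value) {
            this.time = time;
            this.id = id;
            this.value = value;
        }
    }

    // 'log' is assumed to be sorted by time, oldest first.
    static Map<String, String> stateAt(List<Change> log, Instant t, Duration readBehind) {
        Instant from = t.minus(readBehind);
        Map<String, String> state = new LinkedHashMap<>();
        for (Change c : log) {
            if (c.time.isBefore(from) || c.time.isAfter(t)) {
                continue; // outside the read-behind interval
            }
            if (c.value == null) {
                state.remove(c.id);
            } else {
                state.put(c.id, c.value);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-01-01T00:00:00Z");
        List<Change> log = List.of(
            new Change(t0, "a", "a@0"),
            new Change(t0.plusSeconds(4), "b", "b@4"),
            new Change(t0.plusSeconds(8), "b", "b@8"));
        // Feature a was last written 8s before the query time, so a 5s read
        // behind no longer sees it.
        System.out.println(stateAt(log, t0.plusSeconds(8), Duration.ofSeconds(5))); // prints {b=b@8}
    }
}
```

In this model a feature that was not rewritten within the read behind drops out of the rebuilt state, which is why the read behind should be at least as long as the producer's update interval.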
During initialization of the Kafka Replay Feature Source all state changes
from replayStart - readBehind to replayEnd will be read and cached. As the
size of the replay window and read behind increases, so does the amount of
data that must be read and cached. Both the size of the window and the read
behind should therefore be kept as small as possible.
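The interval arithmetic described above can be written out with java.time. This is a sketch under stated assumptions: ReplayWindowSketch is a hypothetical helper, not a GeoMesa class, and treating the window boundaries as inclusive is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper only: computes how much of the log a replay source has
// to load, and whether a query time falls inside the replay window.
class ReplayWindowSketch {

    // Earliest time read at initialization: replayStart - readBehind.
    static Instant loadStart(Instant replayStart, Duration readBehind) {
        return replayStart.minus(readBehind);
    }

    // Span of the log that is read and cached: loadStart .. replayEnd.
    static Duration cachedSpan(Instant replayStart, Instant replayEnd, Duration readBehind) {
        return Duration.between(loadStart(replayStart, readBehind), replayEnd);
    }

    // Queries for a time outside the window return no results.
    static boolean inWindow(Instant replayStart, Instant replayEnd, Instant queryTime) {
        return !queryTime.isBefore(replayStart) && !queryTime.isAfter(replayEnd);
    }
}
```

For a 10 minute window with a 5 second read behind, cachedSpan returns 10 minutes and 5 seconds, so both values add directly to the amount of data held in memory.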
After creating the ReplayConfig, pass it, along with the streamingSFT, to the
KafkaDataStoreHelper:
SimpleFeatureType streamingSFT = consumerDs.getSchema(typeName);
SimpleFeatureType replaySFT = KafkaDataStoreHelper.createReplaySFT(streamingSFT, replayConfig);
The streamingSFT passed to createReplaySFT must contain the hints added by
KafkaDataStoreHelper.createStreamingSFT. The easiest way to ensure this is to
call consumerDs.getSchema(typeName). The SimpleFeatureType returned by
createReplaySFT will contain the hint added by createStreamingSFT as well as a
hint containing the ReplayConfig. Additionally, the replaySFT will have a
different name than the streamingSFT, which differentiates live and replay
SimpleFeatureTypes. The replaySFT will also contain an additional attribute,
KafkaLogTime, of type java.util.Date, which represents the historical query
time.
After creating the replaySFT, the Kafka Replay Feature Source may be created:
consumerDs.createSchema(replaySFT);
String replayTimeName = replaySFT.getTypeName();
SimpleFeatureSource replayFeatureSource = consumerDs.getFeatureSource(replayTimeName);
The call to createSchema is required because the replaySFT is a new
SimpleFeatureType.
Finally, the Kafka Replay Consumer Feature Source can be queried:
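Instant historicalTime = ...
// ff is assumed to be a GeoTools FilterFactory instance and filter an
// attribute filter such as the one used in the live mode example
Filter timeFilter = ff.and(filter, ReplayTimeHelper.toFilter(historicalTime));
replayFeatureSource.getFeatures(timeFilter);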