22.2. Spark Core
geomesa-spark-core is used to work directly with RDDs of features from GeoMesa and other geospatial data stores.
22.2.1. Example
The following is a complete Scala example of creating an RDD via a geospatial query against a GeoMesa data store:
// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
  "instanceId" -> "instance",
  "zookeepers" -> "zoo1,zoo2,zoo3",
  "user"       -> "user",
  "password"   -> "*****",
  "auths"      -> "USER,ADMIN",
  "tableName"  -> "geomesa_catalog")
// set SparkContext
val conf = new SparkConf().setMaster("local[*]").setAppName("testSpark")
val sc = SparkContext.getOrCreate(conf)
// create RDD with a geospatial query using GeoMesa functions
val spatialRDDProvider = GeoMesaSpark(dsParams)
val filter = ECQL.toFilter("CONTAINS(POLYGON((0 0, 0 90, 90 90, 90 0, 0 0)), geom)")
val query = new Query("chicago", filter)
val resultRDD = spatialRDDProvider.rdd(new Configuration, sc, dsParams, query)
resultRDD.collect
// Array[org.opengis.feature.simple.SimpleFeature] = Array(
// ScalaSimpleFeature:4, ScalaSimpleFeature:5, ScalaSimpleFeature:6,
// ScalaSimpleFeature:7, ScalaSimpleFeature:9)
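The result is an ordinary RDD of SimpleFeatures, so standard Spark transformations apply. As a rough sketch (the "dtg" attribute name below is an assumption about the "chicago" schema, not something established above):

```scala
import java.text.SimpleDateFormat
import java.util.Date

// count the returned features per day, assuming the schema has a "dtg" Date attribute
val countsByDay = resultRDD
  .map(f => f.getAttribute("dtg").asInstanceOf[Date]) // hypothetical attribute name
  .map(d => new SimpleDateFormat("yyyy-MM-dd").format(d))
  .map(day => (day, 1))
  .reduceByKey(_ + _)
countsByDay.collect.foreach(println)
```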
22.2.2. Configuration
geomesa-spark-core provides an API for accessing geospatial data in Spark by defining an interface called SpatialRDDProvider. Different implementations of this interface connect to GeoMesa Accumulo, generic GeoTools-based DataStores, or data files in formats readable by the GeoMesa converter library. These providers are described in more detail in Usage below.
To use these libraries in Spark, include the shaded JAR built by the geomesa-accumulo-spark-runtime module (geomesa-accumulo/geomesa-accumulo-spark-runtime in the source distribution), which contains all of the dependencies needed to run the Accumulo RDD Provider. This shaded JAR can be passed (for example) to the spark-submit command via the --jars option:
--jars file://path/to/geomesa-accumulo-spark-runtime_2.11-$VERSION.jar
or passed to Spark via the appropriate mechanism in notebook servers such as Jupyter (see Deploying GeoMesa Spark with Jupyter Notebook) or Zeppelin.
This shaded JAR should also provide the dependencies needed for the Converter RDD Provider and GeoTools RDD Provider, so these JARs may simply be added to --jars as well (though in the latter case additional JARs may be needed for the GeoTools data store being accessed).
22.2.3. Simple Feature Serialization
To serialize RDDs of SimpleFeatures between nodes of a cluster, Spark must be configured with a Kryo serialization registrator provided in geomesa-spark-core.
Note
Configuring Kryo serialization is not needed when running Spark in local
mode, as jobs will be executed within a single JVM.
Add these two entries to $SPARK_HOME/conf/spark-defaults.conf (or pass them as --conf arguments to spark-submit):
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator
Note
Alternatively, these may be set in the SparkConf object used to create the SparkContext:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName)
When using Spark in a notebook server, this will require disabling the automatic creation of a SparkContext.
After setting these configuration options, RDDs created by the GeoMesa SpatialRDDProvider implementations will be properly registered with the serializer.
22.2.4. Usage
The main entry point for the functionality provided by geomesa-spark-core is the GeoMesaSpark object:
val spatialRDDProvider = GeoMesaSpark(params)
GeoMesaSpark loads a SpatialRDDProvider implementation via SPI when the appropriate JAR is included on the classpath. The implementation returned by GeoMesaSpark is chosen based on the parameters passed as an argument, as shown in the Scala code below:
// parameters to pass to the SpatialRDDProvider implementation
val params = Map(
  "param1" -> "foo",
  "param2" -> "bar")
// GeoTools Query; may be used to filter results retrieved from the data store
val query = new Query("foo")
// val query = new Query("foo", ECQL.toFilter("name like 'A%'"))
// get the RDD, using the SparkContext configured as above
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
22.2.5. Accumulo RDD Provider
AccumuloSpatialRDDProvider is provided by the geomesa-accumulo-spark module:
<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-accumulo-spark_2.11</artifactId>
  <!-- version, etc. -->
</dependency>
This provider generates and saves RDDs of features stored in a GeoMesa AccumuloDataStore. The configuration parameters passed to AccumuloSpatialRDDProvider are the same as those passed to AccumuloDataStoreFactory.createDataStore() or DataStoreFinder.getDataStore().
The feature type to access in GeoMesa is passed as the type name of the query passed to the rdd() method. For example, to load an RDD of features of type “gdelt” from the “geomesa” Accumulo table:
val params = Map(
  "instanceId" -> "mycloud",
  "user"       -> "user",
  "password"   -> "password",
  "zookeepers" -> "zoo1,zoo2,zoo3",
  "tableName"  -> "geomesa")
val query = new Query("gdelt")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
To save features, use the save() method:
GeoMesaSpark(params).save(rdd, params, "gdelt")
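Reads and saves can be combined in one job. The sketch below is illustrative only: the "dtg" attribute and the second parameter map outParams are assumptions, and the target feature type is expected to already exist in the destination store:

```scala
import java.util.Date

// read "gdelt", keep only recent features in Spark, then save to another store
val cutoff = new java.text.SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01")
val gdeltRdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, new Query("gdelt"))
val recent = gdeltRdd.filter(f => f.getAttribute("dtg").asInstanceOf[Date].after(cutoff))
val outParams = params // hypothetical: parameters for the destination data store
GeoMesaSpark(outParams).save(recent, outParams, "gdelt")
```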
22.2.6. Converter RDD Provider
ConverterSpatialRDDProvider is provided by the geomesa-spark-converter module:
<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-converter_2.11</artifactId>
  <!-- version, etc. -->
</dependency>
ConverterSpatialRDDProvider reads features from one or more data files in formats readable by the GeoMesa Convert library, including delimited and fixed-width text, Avro, JSON, and XML files. It takes the following configuration parameters:

geomesa.converter - the converter definition, as a Typesafe Config string
geomesa.converter.inputs - input file paths, comma-delimited
geomesa.sft - the SimpleFeatureType, as a spec string, configuration string, or environment lookup name
geomesa.sft.name - (optional) the name of the SimpleFeatureType
Consider the example data described in the Example Usage section of the GeoMesa Convert documentation. If the file example.csv contains the example data, and example.conf contains the Typesafe configuration file for the converter, the following Scala code can be used to load this data into an RDD:
val exampleConf = ConfigFactory.load("example.conf").root().render()
val params = Map(
  "geomesa.converter"        -> exampleConf,
  "geomesa.converter.inputs" -> "example.csv",
  "geomesa.sft"              -> "phrase:String,dtg:Date,geom:Point:srid=4326",
  "geomesa.sft.name"         -> "example")
val query = new Query("example")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
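The loaded features are ordinary SimpleFeatures whose attributes follow the spec string above, so individual values can be pulled out with a map; a small sketch:

```scala
// collect the distinct "phrase" values from the example data
val phrases = rdd.map(_.getAttribute("phrase").asInstanceOf[String])
phrases.distinct.collect.foreach(println)
```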
It is also possible to load the prepackaged converters for public data sources (GDELT, GeoNames, etc.) via Maven or SBT. See Prepackaged Converter Definitions for more details.
Warning
ConverterSpatialRDDProvider is read-only, and does not support writing features to data files.
22.2.7. GeoTools RDD Provider
GeoToolsSpatialRDDProvider is provided by the geomesa-spark-geotools module:
<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-geotools_2.11</artifactId>
  <!-- version, etc. -->
</dependency>
GeoToolsSpatialRDDProvider generates and saves RDDs of features stored in a generic GeoTools DataStore. The configuration parameters passed are the same as those passed to DataStoreFinder.getDataStore() to create the data store of interest, plus a required boolean parameter called “geotools” to indicate to the SPI to load GeoToolsSpatialRDDProvider. For example, the CSVDataStore described in the GeoTools ContentDataStore tutorial takes a single parameter called “file”. To use this data store with GeoMesa Spark, do the following:
val params = Map(
  "geotools" -> "true",
  "file"     -> "locations.csv")
val query = new Query("locations")
val rdd = GeoMesaSpark(params).rdd(new Configuration(), sc, params, query)
The name of the feature type to access in the data store is passed as the type name of the query passed to the rdd() method. In the example of the CSVDataStore, this is the basename of the filename passed as an argument.
Warning
Do not use the GeoTools RDD provider with a GeoMesa Accumulo data store. The Accumulo RDD Provider provides additional optimizations to improve performance between Spark/SparkSQL and GeoMesa Accumulo data stores.
If both the GeoTools and Accumulo RDD providers are available on the classpath, the GeoTools provider will only be used if "geotools" -> "true" is included as a parameter, and thus should be omitted with a GeoMesa Accumulo data store.
If your data store supports it, use the save() method to save features:
GeoMesaSpark(params).save(rdd, params, "locations")