GeoMesa SparkSQL support builds upon the DataFrame API present in the Spark SQL module to provide geospatial capabilities. This includes custom geospatial data types and functions, the ability to create a DataFrame from a GeoTools DataStore, and optimizations to improve SQL query performance.

GeoMesa SparkSQL code is provided by the geomesa-spark-sql module:
    <dependency>
      <groupId>org.locationtech.geomesa</groupId>
      <artifactId>geomesa-spark-sql_2.11</artifactId>
      <!-- version, etc. -->
    </dependency>
The following is a Scala example of connecting to GeoMesa Accumulo via SparkSQL:
    import org.apache.spark.sql.SparkSession

    // DataStore params to a hypothetical GeoMesa Accumulo table
    val dsParams = Map(
      "instanceId" -> "instance",
      "zookeepers" -> "zoo1,zoo2,zoo3",
      "user"       -> "user",
      "password"   -> "*****",
      "auths"      -> "USER,ADMIN",
      "tableName"  -> "geomesa_catalog")

    // Create SparkSession
    val sparkSession = SparkSession.builder()
      .appName("testSpark")
      .config("spark.sql.crossJoin.enabled", "true")
      .master("local[*]")
      .getOrCreate()

    // Create DataFrame using the "geomesa" format
    val dataFrame = sparkSession.read
      .format("geomesa")
      .options(dsParams)
      .option("geomesa.feature", "chicago")
      .load()
    dataFrame.createOrReplaceTempView("chicago")

    // Query against the "chicago" schema
    val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
    val resultDataFrame = sparkSession.sql(sqlQuery)

    resultDataFrame.show
    /*
    +-------+------+-----------+--------------------+-----------------+
    |__fid__|arrest|case_number|                 dtg|             geom|
    +-------+------+-----------+--------------------+-----------------+
    |      4|  true|          4|2016-01-04 00:00:...|POINT (76.5 38.5)|
    |      5|  true|          5|2016-01-05 00:00:...|    POINT (77 38)|
    |      6|  true|          6|2016-01-06 00:00:...|    POINT (78 39)|
    |      7|  true|          7|2016-01-07 00:00:...|    POINT (20 20)|
    |      9|  true|          9|2016-01-09 00:00:...|    POINT (50 50)|
    +-------+------+-----------+--------------------+-----------------+
    */
Because GeoMesa SparkSQL stacks on top of GeoMesa Spark Core, one or more of the SpatialRDDProvider implementations, as described in Spark Core, must be included on the classpath. The shaded JAR built by the geomesa-accumulo-spark-runtime module (found in the source distribution) contains all of the dependencies needed to run the Accumulo RDD Provider as well as geomesa-spark-sql. This shaded JAR can be passed (for example) to the spark-submit command via the --jars option, or passed to Spark via the appropriate mechanism in notebook servers such as Jupyter (see Deploying GeoMesa Spark with Jupyter Notebook) or Zeppelin.
This shaded JAR should also provide the dependencies needed for the Converter RDD Provider and GeoTools RDD Provider, so these JARs may simply be added to --jars as well (though in the latter case additional JARs may be needed to implement the GeoTools data store accessed).
When using the Accumulo RDD Provider or Converter RDD Provider with geomesa-spark-sql, it is not necessary to set up the Kryo serialization described in Simple Feature Serialization. However, this may be required when using the GeoTools RDD Provider.
If you will be joining DataFrames together, it will be necessary to add the spark.sql.crossJoin.enabled property when creating the SparkSession object:
    val spark = SparkSession.builder().
      // ...
      config("spark.sql.crossJoin.enabled", "true").
      // ...
      getOrCreate()
Cross-joins can be very inefficient. Take care to ensure that one or both sets of data joined are very small, and consider using the broadcast() function to ensure that at least one DataFrame joined is in memory.
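As an illustration of keeping the small side of a join in memory, the following is a minimal sketch using Spark's own org.apache.spark.sql.functions.broadcast hint. It does not use GeoMesa at all: the points and boxes data frames, their column names, and the pointsInBoxes helper are hypothetical stand-ins, and the plain numeric range comparison stands in for a spatial predicate such as st_contains.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {

  // Joins a (potentially large) set of points against a small set of boxes.
  // All data and column names here are hypothetical, for illustration only.
  def pointsInBoxes(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val points = Seq((1, 10.0, 10.0), (2, 50.0, 50.0), (3, 120.0, 10.0))
      .toDF("id", "x", "y")
    val boxes = Seq(("a", 0.0, 0.0, 90.0, 90.0))
      .toDF("name", "xmin", "ymin", "xmax", "ymax")

    // broadcast() hints that the small DataFrame should be shipped whole to
    // every executor, so the non-equi join does not have to shuffle both sides.
    points.join(
      broadcast(boxes),
      $"x".between($"xmin", $"xmax") && $"y".between($"ymin", $"ymax"))
  }
}
```

With a session created as shown above (including spark.sql.crossJoin.enabled), calling BroadcastJoinSketch.pointsInBoxes(spark).show() lists only the points that fall inside a box. Whether the hint helps depends on the small side actually fitting in executor memory.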
To create a GeoMesa SparkSQL-enabled DataFrame with data corresponding to a particular feature type, do the following:
    // dsParams contains the parameters to pass to the data store
    val dataFrame = sparkSession.read
      .format("geomesa")
      .options(dsParams)
      .option("geomesa.feature", typeName)
      .load()
format("geomesa") registers the GeoMesa SparkSQL data source, and
option("geomesa.feature", typeName) tells GeoMesa to use the feature type
typeName. This also registers the custom user-defined types and functions
implemented in GeoMesa SparkSQL.
By registering a DataFrame as a temporary view, it is possible to access this data frame in subsequent SQL calls. For example:

    dataFrame.createOrReplaceTempView("chicago")

makes it possible to call this data frame via the alias “chicago”:
    val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
    val resultDataFrame = sparkSession.sql(sqlQuery)
Registering user-defined types and functions can also be done manually by invoking SQLTypes.init() on the SQLContext object of the Spark session:

    SQLTypes.init(sparkSession.sqlContext)
It is also possible to write a Spark DataFrame to a GeoMesa table with:

    dataFrame.write
      .format("geomesa")
      .options(dsParams)
      .option("geomesa.feature", "featureName")
      .save()

This will automatically convert the data frame’s underlying RDD[Row] into an RDD[SimpleFeature] and write to the data store in parallel. For this to work, the feature type featureName must already exist in the data store.
22.3.4. Geospatial User-defined Types and Functions
The GeoMesa SparkSQL module takes several classes representing geometry objects (as described by the OGC OpenGIS Simple feature access common architecture specification and implemented by the Java Topology Suite) and registers them as user-defined types (UDTs) in SparkSQL. These types are:
GeoMesa SparkSQL also implements a subset of the functions described in the OGC OpenGIS Simple feature access SQL option specification as SparkSQL user-defined functions (UDFs). These include functions for creating geometries, accessing properties of geometries, casting Geometry objects to more specific subclasses, outputting geometries in other formats, measuring spatial relationships between geometries, and processing geometries.
For example, the following SQL query

    select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)

uses two UDFs, st_contains and st_makeBBOX, to find the rows in the chicago DataFrame where the column geom is contained within the specified bounding box.
A complete list of the implemented UDFs is given in the next section (SparkSQL Functions).
22.3.5. In-memory Indexing
If your data is small enough to fit in the memory of your executors, you can tell GeoMesa SparkSQL to persist RDDs in memory
and leverage the use of CQEngine as an in-memory indexed data store. To do this, add the option
when creating your data frame. This will place an index on every attribute excluding the
fid and the geometry.
To index based on geometry, add the option
option("indexGeom", "true"). Queries to this relation will automatically
hit the cached RDD and query the in-memory data store that lives on each partition, which can yield significant speedups.
Given some knowledge of your data, it is also possible to ensure that the data will fit in memory by applying an initial query. This can be done with the query option. For example, option("query", "dtg AFTER 2016-12-31T23:59:59Z") restricts the data loaded to features whose dtg falls after the end of 2016.
22.3.6. Spatial Partitioning and Faster Joins
Additional speedups can be attained by spatially partitioning your data. Adding the option
will ensure that data that are spatially near each other are placed on the same partition. By default, your data will be partitioned into an NxN grid, but four partitioning strategies are available, each of which can be specified by name:
EQUAL - Computes the bounds of your data and divides it into an NxN grid of equal size, where N = sqrt(numPartitions).
WEIGHTED - Like EQUAL, but ensures that equal proportions of the data along each axis are in each grid cell.
EARTH - Like EQUAL, but uses the whole earth as the bounds instead of computing them based on the data.
RTREE - Constructs an R-Tree based on a sample of the data, and uses a subset of the bounding rectangles as partition envelopes.
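To make the EQUAL strategy described above more concrete, here is a small, self-contained sketch of an equal-size grid in plain Scala. It is not GeoMesa's implementation: the Point and Envelope case classes, the helper names, and the choice to round N down are all assumptions made for illustration.

```scala
object EqualGridSketch {
  // Hypothetical minimal types; GeoMesa itself works with JTS geometries.
  final case class Point(x: Double, y: Double)
  final case class Envelope(xmin: Double, ymin: Double, xmax: Double, ymax: Double)

  // Compute the bounds of the data (requires a non-empty sequence).
  def bounds(points: Seq[Point]): Envelope =
    Envelope(points.map(_.x).min, points.map(_.y).min,
             points.map(_.x).max, points.map(_.y).max)

  // N = sqrt(numPartitions); rounding down is an assumption of this sketch.
  def gridSize(numPartitions: Int): Int =
    math.max(1, math.sqrt(numPartitions.toDouble).toInt)

  // Index of a value along one axis, clamped to [0, n - 1].
  private def axisIndex(v: Double, min: Double, max: Double, n: Int): Int =
    if (max <= min) 0
    else math.min(n - 1, math.max(0, ((v - min) / (max - min) * n).toInt))

  // Row-major id of the grid cell (partition) that contains the point.
  def cellOf(p: Point, env: Envelope, n: Int): Int =
    axisIndex(p.y, env.ymin, env.ymax, n) * n + axisIndex(p.x, env.xmin, env.xmax, n)
}
```

In this sketch, EARTH would amount to replacing the computed bounds with a fixed Envelope(-180, -90, 180, 90), while WEIGHTED and RTREE would choose cell boundaries from the data distribution rather than from equal-width intervals.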
The advantages of spatial partitioning are twofold:
1) Queries with a spatial predicate that lies wholly in one partition can go directly to that partition, skipping the overhead of scanning partitions that are certain not to contain the desired data.
2) If two data sets are partitioned by the same scheme, resulting in the same partition envelopes for both relations, then spatial joins can use the partition envelope as a key in the join. This dramatically reduces the number of comparisons required to complete the join.
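The second point can be illustrated with a plain Scala sketch in which the grid cell id plays the role of the partition envelope key. This is an illustration only, not GeoMesa's join code: the Point and Box types, the fixed whole-earth 4x4 grid, and the keyedJoin helper are hypothetical. Each box is registered under every cell it overlaps, so a point only needs to be compared with the boxes registered for its own cell.

```scala
object PartitionedJoinSketch {
  // Hypothetical minimal types, for illustration only.
  final case class Point(id: String, x: Double, y: Double)
  final case class Box(id: String, xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
    def contains(p: Point): Boolean =
      p.x >= xmin && p.x <= xmax && p.y >= ymin && p.y <= ymax
  }

  // Both data sets share the same whole-earth N x N grid.
  private val N = 4
  private def col(x: Double): Int =
    math.min(N - 1, math.max(0, ((x + 180.0) / 360.0 * N).toInt))
  private def row(y: Double): Int =
    math.min(N - 1, math.max(0, ((y + 90.0) / 180.0 * N).toInt))

  // A point falls in exactly one cell.
  def cellOf(p: Point): Int = row(p.y) * N + col(p.x)

  // A box may overlap several cells; list all of them.
  def cellsOf(b: Box): Seq[Int] =
    for (r <- row(b.ymin) to row(b.ymax); c <- col(b.xmin) to col(b.xmax)) yield r * N + c

  // Returns the matching (pointId, boxId) pairs and the number of
  // point-in-box comparisons that were actually evaluated.
  def keyedJoin(points: Seq[Point], boxes: Seq[Box]): (Seq[(String, String)], Int) = {
    val boxesByCell: Map[Int, Seq[Box]] =
      boxes.flatMap(b => cellsOf(b).map(_ -> b))
        .groupBy(_._1)
        .map { case (cell, pairs) => cell -> pairs.map(_._2) }

    // Only pairs that share a cell key become join candidates.
    val candidates = for {
      p <- points
      b <- boxesByCell.getOrElse(cellOf(p), Seq.empty)
    } yield (p, b)

    val matches = candidates.collect { case (p, b) if b.contains(p) => (p.id, b.id) }
    (matches, candidates.size)
  }
}
```

For three points and two boxes spread over different cells, a naive cross-join evaluates six comparisons, whereas the keyed version evaluates only the pairs that share a cell.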