21.3. SparkSQL

GeoMesa SparkSQL support builds upon the Dataset/DataFrame API present in the Spark SQL module to provide geospatial capabilities. This includes custom geospatial data types and functions, the ability to create a DataFrame from a GeoTools DataStore, and optimizations to improve SQL query performance.

GeoMesa SparkSQL code is provided by the geomesa-spark-sql module:

<dependency>
  <groupId>org.locationtech.geomesa</groupId>
  <artifactId>geomesa-spark-sql_2.11</artifactId>
  <!-- version, etc. -->
</dependency>

21.3.1. Example

The following is a Scala example of connecting to GeoMesa Accumulo via SparkSQL:

// DataStore params to a hypothetical GeoMesa Accumulo table
val dsParams = Map(
  "instanceId" -> "instance",
  "zookeepers" -> "zoo1,zoo2,zoo3",
  "user"       -> "user",
  "password"   -> "*****",
  "auths"      -> "USER,ADMIN",
  "tableName"  -> "geomesa_catalog")

// Create SparkSession
val sparkSession = SparkSession.builder()
  .appName("testSpark")
  .config("spark.sql.crossJoin.enabled", "true")
  .master("local[*]")
  .getOrCreate()

// Create DataFrame using the "geomesa" format
val dataFrame = sparkSession.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", "chicago")
  .load()
dataFrame.createOrReplaceTempView("chicago")

// Query against the "chicago" schema
val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)

resultDataFrame.show
/*
+-------+------+-----------+--------------------+-----------------+
|__fid__|arrest|case_number|                 dtg|             geom|
+-------+------+-----------+--------------------+-----------------+
|      4|  true|          4|2016-01-04 00:00:...|POINT (76.5 38.5)|
|      5|  true|          5|2016-01-05 00:00:...|    POINT (77 38)|
|      6|  true|          6|2016-01-06 00:00:...|    POINT (78 39)|
|      7|  true|          7|2016-01-07 00:00:...|    POINT (20 20)|
|      9|  true|          9|2016-01-09 00:00:...|    POINT (50 50)|
+-------+------+-----------+--------------------+-----------------+
*/

21.3.2. Configuration

Because GeoMesa SparkSQL stacks on top of the geomesa-spark-core module, one or more of the SpatialRDDProvider implementations, as described in Spark Core, must be included on the classpath. The shaded JAR built by the geomesa-accumulo-spark-runtime module (geomesa-accumulo/geomesa-accumulo-spark-runtime in the source distribution) contains all of the dependencies needed to run the Accumulo RDD Provider as well as geomesa-spark-sql. This shaded JAR can be passed (for example) to the spark-submit command via the --jars option:

--jars file://path/to/geomesa-accumulo-spark-runtime_2.11-$VERSION.jar

or passed to Spark via the appropriate mechanism in notebook servers such as Jupyter (see Deploying GeoMesa Spark with Jupyter Notebook) or Zeppelin.

This shaded JAR should also provide the dependencies needed for the Converter RDD Provider and GeoTools RDD Provider, so these JARs may simply be added to --jars as well (though in the latter case, additional JARs may be needed for the specific GeoTools data store being accessed).
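
For example, a complete spark-submit invocation might look like the following (the --master value, application JAR, and main class here are placeholders for your own job):

spark-submit --master yarn \
  --jars file://path/to/geomesa-accumulo-spark-runtime_2.11-$VERSION.jar \
  --class com.example.MySparkJob \
  my-spark-job.jar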

Note

When using the Accumulo RDD Provider or Converter RDD Provider with geomesa-spark-sql, it is not necessary to set up the Kryo serialization described in Simple Feature Serialization. However, this may be required when using the GeoTools RDD Provider.
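
When that serialization is needed (e.g. for the GeoTools RDD Provider), a minimal sketch is to add the Kryo settings to the SparkSession builder shown above, using the registrator class described in Simple Feature Serialization:

val sparkSession = SparkSession.builder()
  .appName("testSpark")
  .master("local[*]")
  // serialize SimpleFeatures with Kryo (see Simple Feature Serialization)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator")
  .getOrCreate()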

If you will be joining multiple DataFrames together, it will be necessary to set the spark.sql.crossJoin.enabled property to true when creating the SparkSession object:

val spark = SparkSession.builder().
   // ...
   config("spark.sql.crossJoin.enabled", "true").
   // ...
   getOrCreate()

Warning

Cross-joins can be very, very inefficient. Take care to ensure that one or both sets of data joined are very small, and consider using the broadcast() method to ensure that at least one DataFrame joined is in memory.
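
As a sketch, assuming a small hypothetical regionsDf of polygons with a geofence column is joined against the larger chicago dataFrame from the example above:

import org.apache.spark.sql.functions.{broadcast, expr}

// broadcast() marks the small DataFrame to be shipped to every executor
// rather than shuffled; the join condition uses the spatial UDFs
val joined = dataFrame.join(broadcast(regionsDf), expr("st_contains(geofence, geom)"))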

21.3.3. Usage

To create a GeoMesa SparkSQL-enabled DataFrame with data corresponding to a particular feature type, do the following:

// dsParams contains the parameters to pass to the data store
val dataFrame = sparkSession.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", typeName)
  .load()

Specifically, invoking format("geomesa") registers the GeoMesa SparkSQL data source, and option("geomesa.feature", typeName) tells GeoMesa to use the feature type named typeName. This also registers the custom user-defined types and functions implemented in GeoMesa SparkSQL.

By registering a DataFrame as a temporary view, it is possible to access this data frame in subsequent SQL calls. For example:

dataFrame.createOrReplaceTempView("chicago")

makes it possible to refer to this data frame by the alias “chicago”:

val sqlQuery = "select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"
val resultDataFrame = sparkSession.sql(sqlQuery)

Registering user-defined types and functions can also be done manually by invoking SQLTypes.init() on the SQLContext object of the Spark session:

SQLTypes.init(sparkSession.sqlContext)

It is also possible to write a Spark DataFrame to a GeoMesa table; the write options mirror those used when reading, with "featureName" standing for the target feature type:
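
dataFrame.write
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", "featureName")
  .save()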

This will automatically convert the data frame’s underlying RDD[Row] into an RDD[SimpleFeature] and write to the data store in parallel. For this to work, the feature type featureName must already exist in the data store.
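
If the feature type does not exist yet, it can be created ahead of time through the GeoTools DataStore API; a minimal sketch, in which the attribute specification is purely illustrative:

import org.geotools.data.DataStoreFinder
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

// copy the connection parameters into a java.util.Map for the GeoTools API
val javaParams = new java.util.HashMap[String, java.io.Serializable]()
dsParams.foreach { case (k, v) => javaParams.put(k, v) }

// create the schema before writing; adjust the attribute spec to your data
val ds = DataStoreFinder.getDataStore(javaParams)
val sft = SimpleFeatureTypes.createType("featureName",
  "arrest:Boolean,case_number:Integer,dtg:Date,*geom:Point:srid=4326")
ds.createSchema(sft)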

21.3.4. Geospatial User-defined Types and Functions

The GeoMesa SparkSQL module takes several classes representing geometry objects (as described by the OGC OpenGIS Simple feature access common architecture specification and implemented by the Java Topology Suite) and registers them as user-defined types (UDTs) in SparkSQL. These types are:

  • Geometry
  • Point
  • LineString
  • Polygon
  • MultiPoint
  • MultiLineString
  • MultiPolygon
  • GeometryCollection
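
As a quick illustration, rows of a GeoMesa DataFrame deserialize geometry columns to JTS objects (a sketch, assuming the chicago data frame from the example above and a GeoMesa build based on org.locationtech.jts; older releases use com.vividsolutions.jts):

import org.locationtech.jts.geom.Point

// the "geom" column comes back as a JTS Point instance
val firstPoint = dataFrame.select("geom").head.getAs[Point](0)
println(firstPoint.getX)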

GeoMesa SparkSQL also implements a subset of the functions described in the OGC OpenGIS Simple feature access SQL option specification as SparkSQL user-defined functions (UDFs). These include functions for creating geometries, accessing properties of geometries, casting Geometry objects to more specific subclasses, outputting geometries in other formats, measuring spatial relationships between geometries, and processing geometries.

For example, the following SQL query

select * from chicago where st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)

uses two UDFs, st_contains and st_makeBBOX, to find the rows in the chicago DataFrame whose geom column is contained within the specified bounding box.
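
The same filter can also be expressed through the DataFrame API by using Spark's expr() to invoke the registered UDFs (a sketch, assuming the dataFrame from the Usage section above):

import org.apache.spark.sql.functions.expr

// st_contains and st_makeBBOX resolve through the function registry populated by GeoMesa
val contained = dataFrame.where(expr("st_contains(st_makeBBOX(0.0, 0.0, 90.0, 90.0), geom)"))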

A complete list of the implemented UDFs is given in the next section (SparkSQL Functions).