# GeoMesa Spark: Spatial Join and Aggregation

This tutorial will show you how to:

1. Use GeoMesa with Apache Spark in Scala.
2. Create and use DataFrames with our geospatial User Defined Functions.
3. Calculate aggregate statistics based on a threshold distance.
4. Create a new simple feature type to represent this aggregation.
5. Write the result back to a data store.

## Background

NYCTaxi is taxi activity data published by the University of Illinois, obtained through Freedom of Information Law requests to the NYC Taxi and Limousine Commission.

GeoNames is a geographical database containing over 10 million geographical names and over 9 million unique features.

Suppose we wanted to answer the questions: “Do taxi pickups centralize near certain points of interest?”, “Are people more likely to request a pickup or be dropped off at points of interest?”. To find out, we would need to combine these two data sets and aggregate statistics over the result.

Since GeoNames is a data set of points, and NYCTaxi provides only the pickup and drop-off points of a trip, it is highly unlikely that the trip would start or end exactly on the labeled point of interest. So we can’t naively join the data sets based on where the points are equal. Instead, we will need to join where the points are within some tolerable distance of each other. This is henceforth referred to as a D-within join.
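To make the D-within idea concrete, here is a minimal plain-Scala sketch (no Spark, not part of the tutorial pipeline): a pickup matches a point of interest when the great-circle distance between them is within a threshold in meters. The haversine formula is standard; the object and method names and the sample radius are illustrative.

```scala
// Minimal sketch of the D-within predicate in plain Scala: two points
// "join" when their great-circle distance is within a threshold in meters.
// Uses the standard haversine formula; names here are illustrative only.
object DWithinSketch {
  val EarthRadiusMeters = 6371008.8 // mean Earth radius

  // Great-circle distance in meters between two (lat, lon) pairs in degrees.
  def haversineMeters(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
        math.pow(math.sin(dLon / 2), 2)
    EarthRadiusMeters * 2 * math.asin(math.sqrt(a))
  }

  // A pickup "matches" a point of interest when it falls within `radius` meters.
  def dWithin(poi: (Double, Double), pickup: (Double, Double), radius: Double): Boolean =
    haversineMeters(poi._1, poi._2, pickup._1, pickup._2) <= radius
}
```

In the Spark pipeline below, this per-pair predicate is expressed instead with the st_bufferPoint and st_contains UDFs, which lets GeoMesa optimize the join.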

## Prerequisites

For this tutorial, we will assume that you have already ingested the two data sets into the data store of your choosing. Following this tutorial without having created the necessary tables will lead to errors.

Converters for both the GeoNames and NYCTaxi data sets are provided with GeoMesa. For further guidance, you can follow one of the ingest tutorials, such as Map-Reduce Ingest of GDELT. Once you have the data ingested in GeoMesa, you may proceed with the rest of the tutorial.

## Initializing Spark

To start working with Spark, we will need an initialized SparkSession, and to apply GeoMesa’s geospatial User Defined Types (UDTs) and User Defined Functions (UDFs) to our data in Spark, we will need to initialize the SparkSQL extensions. This functionality requires having the appropriate GeoMesa Spark runtime jar on the classpath when running your Spark job. GeoMesa provides Spark runtime jars for the Accumulo, HBase, and FileSystem data stores. For example, the following would start an interactive Spark REPL with all dependencies needed for running Spark with GeoMesa version 2.0.0 on an Accumulo data store.
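The original command is not shown here, but it would look roughly like the following sketch; the jar path and the Scala build version (2.11) are assumptions, so adjust them to match your installation:

```shell
# Sketch only: the jar location and Scala version suffix (_2.11) are
# assumptions -- substitute the runtime jar matching your data store,
# GeoMesa version, and Spark/Scala build.
$ bin/spark-shell --jars /path/to/geomesa-accumulo-spark-runtime_2.11-2.0.0.jar
```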



## D-within Join

Now we’re ready to join the two data sets. This is where we will make use of two of our geospatial UDFs. st_contains takes two geometries as input, and it outputs whether the second geometry lies within the first one. st_bufferPoint takes a point and a distance in meters as input, and it outputs a circle around the point with radius equal to the provided distance. For more documentation and a full list of the UDFs provided by GeoMesa see SparkSQL Functions.
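For reference, the two DataFrames used below can be loaded from the data store roughly as follows. This is a sketch, not the tutorial's own code: the connection parameter map, the feature type names, and the `admin1Code` column used to restrict GeoNames to New York are all assumptions about your ingest.

```scala
// Sketch: load the two data sets as Spark DataFrames. Assumes the GeoMesa
// Spark runtime jar is on the classpath and the SparkSQL extensions have
// been initialized as described above. The parameter map and the type
// names "geonames" and "nyctaxi" are assumptions about your ingest.
val dsParams: Map[String, String] = ??? // your data store connection parameters

val geonamesDF = spark.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", "geonames")
  .load()

// Restrict GeoNames to New York; the `admin1Code` attribute name is an
// assumption about the GeoNames schema produced by your converter.
val geonamesNY = geonamesDF.where($"admin1Code" === "NY")

val taxiDF = spark.read
  .format("geomesa")
  .options(dsParams)
  .option("geomesa.feature", "nyctaxi")
  .load()
```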

Using these two UDFs, we can build the following join query.

```scala
val joinedDF = geonamesNY
  .select(st_bufferPoint($"geom", lit(50)).as("buffer"), $"name", $"geonameId")
  .join(taxiDF, st_contains($"buffer", $"pickup_point"))
```

The above query transforms the geometry of each GeoName point into a circle with a radius of 50 meters, and joins the result with the taxi records that had pickups anywhere in that circle.

## Aggregating

Now we have a DataFrame in which each point of interest in New York is paired with a taxi record whose pickup occurred at approximately that location. To turn this into meaningful statistics about taxi habits in the region, we can do a GROUP BY operation and use some of SparkSQL’s aggregate functions.

```scala
val aggregateDF = joinedDF.groupBy($"geonameId")
  .agg(first("name").as("name"),
       countDistinct($"trip_id").as("numPickups"),
       first("buffer").as("buffer"))
```

The above query groups the data by point of interest and counts the number of distinct pickups. The result can be used to generate a heatmap of points of interest by pickup density, but to quickly see which points of interest are most departed from via taxi, we can sort the results and look at the top ten.

```scala
val top10 = aggregateDF.orderBy($"numPickups".desc).take(10)
top10.foreach { row => println(row.getAs[String]("name") + " " + row.getAs[Long]("numPickups")) }
```


This tells us that Hotel Gansevoort has the most taxi pickups.

## Write-back

If we would like to persist this aggregated result beyond the Spark session, we will need to write it back to the underlying data store. This is done in two steps.

First we create a SimpleFeatureType that is aligned with the aggregated result:

```scala
import org.locationtech.geomesa.utils.geotools.SftBuilder

val sftBuilder = new SftBuilder()
sftBuilder.stringType("name")
sftBuilder.intType("numPickups")
sftBuilder.polygon("buffer")
val aggregateSft = sftBuilder.build("aggregate")
```


Following this, we can create the schema in the data store, then safely write the data.

```scala
import org.geotools.data.DataStoreFinder

DataStoreFinder.getDataStore(taxiParams).createSchema(aggregateSft)
aggregateDF.write.format("geomesa").options(taxiParams).option("geomesa.feature", "aggregate").save()
```


If you followed all of the above steps, the end result is a data set with the density of taxi pickups at all the points of interest in New York, optionally written back to the data store. If one were further interested in comparing this result against the distribution of taxi drop-offs, the above code could easily be adapted to use the drop-off points instead.
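As a sketch of that adaptation, only the join predicate changes; the `dropoff_point` column name here is an assumption about the NYCTaxi schema:

```scala
// Same pipeline, but matching drop-offs instead of pickups. The
// `dropoff_point` column name is an assumption about the taxi schema.
val joinedDropoffs = geonamesNY
  .select(st_bufferPoint($"geom", lit(50)).as("buffer"), $"name", $"geonameId")
  .join(taxiDF, st_contains($"buffer", $"dropoff_point"))
```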

Further steps to visualize this result can be taken by following the example in GeoMesa Spark: Broadcast Join and Aggregation.