GeoMesa Spark: Spatial Join and Aggregation =========================================== This tutorial will show you how to: 1. Use GeoMesa with `Apache Spark `__ in Scala. 2. Create and use DataFrames with our geospatial User Defined Functions. 3. Calculate aggregate statistics based on a threshold distance. 4. Create a new simple feature type to represent this aggregation. 5. Write the result back to a data store. Background ---------- `NYCTaxi `__ is taxi activity data published by the University of Illinois from Freedom of Information Law requests to NYC Taxi and Limo Commission. `GeoNames `__ is a geographical database containing over 10 million geographical names and over 9 million unique features. Suppose we wanted to answer the questions: "Do taxi pickups centralize near certain points of interest?", "Are people more likely to request a pickup or be dropped off at points of interest?". To find out, we would need to combine these two data sets and aggregate statistics over the result. Since GeoNames is a data set of points, and NYCTaxi provides only the pickup and drop-off points of a trip, it is highly unlikely that the trip would start or end exactly on the labeled point of interest. So we can't naively join the data sets based on where the points are equal. Instead, we will need to join where the points are within some tolerable distance of each other. This is henceforth referred to as a D-within join. Prerequisites ------------- For this tutorial, we will assume that your have already ingested the two data sets into the data store of your choosing. Following this tutorial without having created the necessary tables will lead to errors. The converters for the GeoNames data set, :doc:`/user/convert/premade/geonames`, and the NYCTaxi data set, :doc:`/user/convert/premade/nyctaxi`, are provided with GeoMesa. For further guidance, you can follow one of the ingest tutorials :doc:`/tutorials/geomesa-examples-gdelt`. Once you have the data ingested in GeoMesa, you may proceed with the rest of the tutorial. Initializing Spark ------------------ To start working with Spark, we will need a Spark Session initialized, and to apply GeoMesa's geospatial User Defined Types (UDTs) and User Defined Functions (UDFs) to our data in Spark, we will need to initialize our SparkSQL extensions. This functionality requires having the appropriate GeoMesa Spark runtime jar on the classpath when running your Spark job. GeoMesa provides spark runtime jars for Accumulo, HBase, and FileSystem data stores. For example, the following would start an interactive Spark REPL with all dependencies needed for running Spark with GeoMesa on an Accumulo data store. Replace ``${VERSION}`` with the appropriate Scala plus GeoMesa versions (e.g. |scala_release_version|): .. code-block:: bash $ bin/spark-shell --jars geomesa-accumulo-spark-runtime-accumulo2_${VERSION}.jar .. note:: See :ref:`spatial_rdd_providers` for details on choosing the correct GeoMesa Spark runtime JAR. To configure the Spark Session such that we can serialize Simple Features and work with geometric UDTs and UDFs, we must alter the Spark Session as follows. .. code-block:: scala import org.apache.spark.sql.SparkSession import org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator import org.locationtech.geomesa.spark.jts._ val spark: SparkSession = SparkSession.builder() .appName("testSpark") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.kryo.registrator", classOf[GeoMesaSparkKryoRegistrator].getName) .master("local[*]") .getOrCreate() .withJTS Note the ``withJTS``, which registers GeoMesa's UDTs and UDFs, and the two ``config`` options which tell Spark to use GeoMesa's custom Kryo serializer and registrator to handle serialization of Simple Features. These configuration options can also be set in the ``conf/spark-defaults.conf`` configuration file. Creating DataFrames ------------------- With our Spark Session created and configured, we can move on to loading our data from the data store into a Spark DataFrame. First we'll set up the parameters for connecting to the data store. For example, if our data is in two Accumulo catalogs, we would set up the following parameter maps: .. code-block:: scala val taxiParams = Map( "accumulo.instance.id" -> "instance", "accumulo.zookeepers" -> "zoo1:2181,zoo2:2181,zoo3:2181", "accumulo.user" -> "user", "accumulo.password" -> "password", "accumulo.catalog" -> "nyctaxi") val geonamesParams = Map( "accumulo.instance.id" -> "instance", "accumulo.zookeepers" -> "zoo1:2181,zoo2:2181,zoo3:2181", "accumulo.user" -> "user", "accumulo.password" -> "password", "accumulo.catalog" -> "geonames") .. note:: The above parameters assume Accumulo as the backing data store, but the rest of the tutorial is independent of which data store is used. Other supported data stores may be used by simply adapting the above parameters appropriately. Then we can make use of Spark's ``DataFrameReader`` and our ``SpatialRDDProvider`` to create a ``DataFrame`` .. code-block:: scala val taxiDF = spark.read.format("geomesa") .options(taxiParams) .option("geomesa.feature", "nyctaxi-single") .load() val geonamesDF = spark.read.format("geomesa") .options(geonamesParams) .option("geomesa.feature", "geonames") .load() Since we know our taxi data is limited to the state of New York, we can filter our geonames data. .. code-block:: scala import spark.implicits._ import org.apache.spark.sql.functions._ val geonamesNY = geonamesDF.where($"admin1Code" === lit("NY")) D-within Join ------------- Now we're ready to join the two data sets. This is where we will make use of two of our geospatial UDFs. ``st_contains`` takes two geometries as input, and it outputs whether the second geometry lies within the first one. ``st_bufferPoint`` takes a point and a distance in meters as input, and it outputs a circle around the point with radius equal to the provided distance. For more documentation and a full list of the UDFs provided by GeoMesa see :doc:`/user/spark/sparksql_functions`. Using these two UDFs, we can build the following join query. .. code-block:: scala val joinedDF = geonamesNY .select(st_bufferPoint($"geom", lit(50)).as("buffer"), $"name", $"geonameId") .join(taxiDF, st_contains($"buffer", $"pickup_point")) The above query transforms the geometry of each GeoName point into a circle with a radius of 50 meters, and joins the result with the taxi records that had pickups anywhere in that circle. Aggregating ----------- Now we have a DataFrame where each point of interest in New York is combined with a taxi record where a pickup was issued from approximately that location. To turn this into meaningful statistics about taxi habits in the region, we can do a ``GROUP BY`` operation and use some of SparkSQL's aggregate functions. .. code-block:: scala val aggregateDF = joinedDF.groupBy($"geonameId") .agg(first("name").as("name"), countDistinct($"trip_id")).as(s"numPickups"), first("buffer").as("buffer")) The above query groups the data based on point of interest, and counts the number of distinct pickups. The result can be used to generate a heatmap of points of interest based on density of pickups, but to quickly see which points of interest are most departed from via taxi, we can sort the results and look at the top ten. .. code-block:: scala val top10 = aggregateDF.orderBy($"numPickups".desc).take(10) top10.foreach { row => println(row.getAs[String]("name") + row.getAs[Int]("numPickups")) } This tells us that Hotel Gansevoort has the most taxi pickups. Write-back ---------- If we would like to persist this aggregated result beyond the spark session, we will need to write it back to the underlying data store. This is done is two steps. First we create a SimpleFeatureType that is aligned with the aggregated result: .. code-block:: scala import org.locationtech.geomesa.utils.geotools.SchemaBuilder val aggregateSft = SchemaBuilder.builder() .addString("name") .addInt("numPickups") .addPolygon("buffer") .build("aggregate") Following this, we can create the schema in the data store, then safely write the data. .. code-block:: scala import org.geotools.data.DataStoreFinder DataStoreFinder.getDataStore(taxiParams).createSchema(aggregateSft) aggregateDF.write.format("geomesa").options(taxiParams).option("geomesa.feature", "aggregate").save() If you followed all of the above steps, the end result is a data set with the density of taxi pickups at all the points of interest in New York, optionally written back to the data store. If one was further interested in comparing this result against the distribution of taxi drop-offs, the above code could easily be adapted to use the drop-off points instead. Further steps to visualize this result can be taken by following the example in :doc:`/tutorials/broadcast-join`. This will lead to something like the following: .. figure:: _static/img/tutorials/2018-04-04-dwithin-join/aggregate-NYCTaxi.png