1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.apache.spark.geomesa.api.python
10 
11 import com.typesafe.scalalogging.LazyLogging
12 import net.razorvine.pickle.{IObjectPickler, Pickler}
13 import org.apache.spark.api.java.JavaRDD
14 import org.apache.spark.api.python.SerDeUtil
15 import org.apache.spark.rdd.RDD
16 import org.locationtech.geomesa.utils.text.WKBUtils
17 import org.locationtech.jts.geom.Geometry
18 
19 import java.io.OutputStream
20 
21 object GeoMesaSeDerUtil extends LazyLogging {
22 
23   implicit def toPickledRDD(rdd: RDD[_]): JavaRDD[Array[Byte]] =
24     rdd.mapPartitions{ i =>  AutoBatchedPickler(i) }.toJavaRDD
25 
26   implicit def toPickledRDD(jrdd: JavaRDD[_]): JavaRDD[Array[Byte]] = toPickledRDD(jrdd.rdd)
27 
28 }
29 
30 object AutoBatchedPickler extends LazyLogging {
31 
32   val module = "geomesa_pyspark.types"
33   val function = "_deserialize_from_wkb"
34 
35   val pickler = new IObjectPickler {
36     def pickle(o: Object, os: OutputStream, p: Pickler): Unit = {
37       import net.razorvine.pickle.Opcodes
38       os.write(Opcodes.GLOBAL)
39       os.write(s"$module\n$function\n".getBytes)
40       p.save(WKBUtils.write(o.asInstanceOf[Geometry]))
41       os.write(Opcodes.TUPLE1)
42       os.write(Opcodes.REDUCE)
43     }
44   }
45   Pickler.registerCustomPickler(classOf[Geometry], pickler)
46   logger.info("registered JTS Geometry to WKB pickler")
47 
48   def apply(i: Iterator[Any]): AutoBatchedPickler =  new AutoBatchedPickler(i)
49 }
50 
51 class AutoBatchedPickler(i: Iterator[Any]) extends SerDeUtil.AutoBatchedPickler(i)
Line Stmt Id Pos Tree Symbol Tests Code
24 70196 967 - 1025 Apply org.apache.spark.rdd.RDD.toJavaRDD rdd.mapPartitions[Array[Byte]](((i: Iterator[_$1]) => AutoBatchedPickler.apply(i)), rdd.mapPartitions$default$2[Nothing])((ClassTag.apply[Array[Byte]](scala.runtime.ScalaRunTime.arrayClass(classOf[scala.Byte])): scala.reflect.ClassTag[Array[Byte]])).toJavaRDD()
26 70197 1110 - 1118 Select org.apache.spark.api.java.JavaRDD.rdd jrdd.rdd
26 70198 1097 - 1119 Apply org.apache.spark.geomesa.api.python.GeoMesaSeDerUtil.toPickledRDD GeoMesaSeDerUtil.this.toPickledRDD(jrdd.rdd)
32 70199 1188 - 1211 Literal <nosymbol> "geomesa_pyspark.types"
33 70200 1229 - 1252 Literal <nosymbol> "_deserialize_from_wkb"
35 70209 1270 - 1273 Apply org.apache.spark.geomesa.api.python.AutoBatchedPickler.$anon.<init> new $anon()
38 70201 1405 - 1429 Apply java.io.OutputStream.write os.write(99)
39 70203 1436 - 1478 Apply java.io.OutputStream.write os.write(scala.StringContext.apply("", "\\n", "\\n").s(AutoBatchedPickler.this.module, AutoBatchedPickler.this.function).getBytes())
39 70202 1445 - 1477 Apply java.lang.String.getBytes scala.StringContext.apply("", "\\n", "\\n").s(AutoBatchedPickler.this.module, AutoBatchedPickler.this.function).getBytes()
40 70205 1492 - 1532 Apply org.locationtech.geomesa.utils.text.WKBUtils.write org.locationtech.geomesa.utils.text.WKBUtils.write(o.asInstanceOf[org.locationtech.jts.geom.Geometry])
40 70204 1507 - 1531 TypeApply scala.Any.asInstanceOf o.asInstanceOf[org.locationtech.jts.geom.Geometry]
40 70206 1485 - 1533 Apply net.razorvine.pickle.Pickler.save p.save(org.locationtech.geomesa.utils.text.WKBUtils.write(o.asInstanceOf[org.locationtech.jts.geom.Geometry]))
41 70207 1540 - 1564 Apply java.io.OutputStream.write os.write(133)
42 70208 1571 - 1595 Apply java.io.OutputStream.write os.write(82)
45 70211 1657 - 1664 Select org.apache.spark.geomesa.api.python.AutoBatchedPickler.pickler AutoBatchedPickler.this.pickler
45 70210 1638 - 1655 Literal <nosymbol> classOf[org.locationtech.jts.geom.Geometry]
45 70212 1608 - 1665 Apply net.razorvine.pickle.Pickler.registerCustomPickler net.razorvine.pickle.Pickler.registerCustomPickler(classOf[org.locationtech.jts.geom.Geometry], AutoBatchedPickler.this.pickler)
48 70213 1776 - 1801 Apply org.apache.spark.geomesa.api.python.AutoBatchedPickler.<init> new AutoBatchedPickler(i)