1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.apache.spark.geomesa
10 
11 import com.typesafe.scalalogging.LazyLogging
12 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv, RpcTimeout}
13 import org.apache.spark.util.RpcUtils
14 import org.apache.spark.{SparkContext, SparkEnv}
15 import org.geotools.api.feature.simple.SimpleFeatureType
16 import org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator
17 import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes._
18 
19 import java.io.Serializable
20 import scala.reflect.ClassTag
21 import scala.util.{Failure, Success, Try}
22 
23 
24 object GeoMesaSparkKryoRegistratorEndpoint extends LazyLogging {
25 
26   val EnablePropertyKey = "spark.geomesa.kryo.rpc.enable"
27 
28   val EndpointName = "kryo-schema"
29 
30   private lazy val Timeout = RpcUtils.askRpcTimeout(SparkEnv.get.conf)
31   private lazy val EndpointRef = RpcUtils.makeDriverRef(EndpointName, SparkEnv.get.conf, SparkEnv.get.rpcEnv)
32 
33   lazy val Client: KryoClient = Option(SparkEnv.get)
34     .filterNot(_.executorId == SparkContext.DRIVER_IDENTIFIER)
35     .filter(endpointEnabled)
36     .map(_ => ExecutorKryoClient).getOrElse(NoOpKryoClient)
37 
38   private def endpointEnabled(sparkEnv: SparkEnv) =
39     !sparkEnv.conf.get(EnablePropertyKey, "true").equalsIgnoreCase("false")
40 
41   def init(): Unit = {
42     Option(SparkEnv.get).foreach {
43       sparkEnv =>
44         if (endpointEnabled(sparkEnv)) {
45           sparkEnv.executorId match {
46             case SparkContext.DRIVER_IDENTIFIER =>
47               val rpcEnv = sparkEnv.rpcEnv
48               Try(rpcEnv.setupEndpoint(EndpointName, new KryoEndpoint(rpcEnv))) match {
49                 case Success(ref) =>
50                   logger.info(s"$EndpointName rpc endpoint registered on driver ${ref.address}")
51                 case Failure(e: IllegalArgumentException) =>
52                   logger.debug(s"$EndpointName rpc endpoint registration failed, may have been already registered", e)
53                 case Failure(e: Exception) =>
54                   logger.warn(s"$EndpointName rpc endpoint registration failed", e)
55               }
56             case _ => GeoMesaSparkKryoRegistrator.putTypes(Client.getTypes())
57           }
58         } else {
59           logger.debug(s"$EndpointName rpc endpoint disabled")
60         }
61     }
62   }
63 
64   class KryoEndpoint(val rpcEnv: RpcEnv) extends RpcEndpoint {
65     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
66       case message: KryoMessage[_] =>
67         logger.info(s"$message received via rpc from ${context.senderAddress}")
68         context.reply(message.reply)
69     }
70   }
71 
72   trait KryoMessage[R] {
73     def reply: R
74     def ask(timeout: RpcTimeout = Timeout)(implicit c: ClassTag[R]): R = {
75       logger.info(s"$this sent via rpc to ${EndpointRef.address}")
76       val start = System.nanoTime()
77       val result = timeout.awaitResult[R](EndpointRef.ask[R](this, timeout))
78       val delta = (System.nanoTime() - start) / 1000000L
79       logger.info(s"$this response via rpc, $delta ms")
80       result
81     }
82   }
83 
84   class KryoGetTypeMessage(id: Int) extends KryoMessage[Option[(String, String)]] with Serializable {
85     def reply: Option[(String, String)] = Option(GeoMesaSparkKryoRegistrator.getType(id))
86     override def toString: String = s"getType(id=$id)"
87   }
88 
89   class KryoGetTypesMessage() extends KryoMessage[Seq[(String, String)]] with Serializable {
90     def reply: Seq[(String, String)] = GeoMesaSparkKryoRegistrator.getTypes.map(encodeSchema)
91     override def toString: String = s"getTypes()"
92   }
93 
94   class KryoPutTypeMessage(id: Int, name: String, spec: String) extends KryoMessage[Int] with Serializable {
95     def this(id: Int, sft: SimpleFeatureType) = this(id, sft.getTypeName, encodeType(sft))
96     def reply: Int = GeoMesaSparkKryoRegistrator.putType(createType(name, spec))
97     override def toString: String = s"putType(id=$id, name=$name, spec=...)"
98   }
99 
100   trait KryoClient {
101     def getTypes(): Seq[SimpleFeatureType]
102     def getType(id: Int): Option[SimpleFeatureType]
103     def putType(id: Int, schema: SimpleFeatureType): Int
104   }
105 
106   protected object ExecutorKryoClient extends KryoClient {
107     def getTypes(): Seq[SimpleFeatureType] = new KryoGetTypesMessage().ask().map(decodeSchema)
108     def getType(id: Int): Option[SimpleFeatureType] = new KryoGetTypeMessage(id).ask()
109     def putType(id: Int, sft: SimpleFeatureType): Int = new KryoPutTypeMessage(id, sft).ask()
110   }
111 
112   protected object NoOpKryoClient extends KryoClient {
113     def getTypes(): Seq[SimpleFeatureType] = Seq.empty
114     def getType(id: Int) = None
115     def putType(id: Int, sft: SimpleFeatureType): Int = id
116   }
117 
118   implicit def encodeSchema(t: SimpleFeatureType): (String, String) = (t.getTypeName, encodeType(t))
119   implicit def decodeSchema(t: (String, String)): SimpleFeatureType = createType(t._1, t._2)
120   implicit def optionSchema(t: Option[(String, String)]): Option[SimpleFeatureType] = t.map(decodeSchema)
121 
122 }
Line Stmt Id Pos Tree Symbol Tests Code
26 70118 1095 - 1126 Literal <nosymbol> "spark.geomesa.kryo.rpc.enable"
28 70119 1149 - 1162 Literal <nosymbol> "kryo-schema"
39 70121 1646 - 1652 Literal <nosymbol> "true"
39 70120 1627 - 1644 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.EnablePropertyKey GeoMesaSparkKryoRegistratorEndpoint.this.EnablePropertyKey
39 70123 1608 - 1679 Select scala.Boolean.unary_! sparkEnv.conf.get(GeoMesaSparkKryoRegistratorEndpoint.this.EnablePropertyKey, "true").equalsIgnoreCase("false").unary_!
39 70122 1671 - 1678 Literal <nosymbol> "false"
42 70124 1715 - 1727 Select org.apache.spark.SparkEnv.get org.apache.spark.SparkEnv.get
42 70141 1708 - 2663 Apply scala.Option.foreach scala.Option.apply[org.apache.spark.SparkEnv](org.apache.spark.SparkEnv.get).foreach[Any](((sparkEnv: org.apache.spark.SparkEnv) => if (GeoMesaSparkKryoRegistratorEndpoint.this.endpointEnabled(sparkEnv)) sparkEnv.executorId match { case org.apache.spark.SparkContext.DRIVER_IDENTIFIER => { val rpcEnv: org.apache.spark.rpc.RpcEnv = sparkEnv.rpcEnv; scala.util.Try.apply[org.apache.spark.rpc.RpcEndpointRef](rpcEnv.setupEndpoint(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName, new GeoMesaSparkKryoRegistratorEndpoint.this.KryoEndpoint(rpcEnv))) match { case (value: org.apache.spark.rpc.RpcEndpointRef)scala.util.Success[org.apache.spark.rpc.RpcEndpointRef]((ref @ _)) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isInfoEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.info("{} rpc endpoint registered on driver {}", (scala.Array.apply[AnyRef]((GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName: AnyRef), (ref.address: AnyRef))((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit) case (exception: Throwable)scala.util.Failure[org.apache.spark.rpc.RpcEndpointRef]((e @ (_: IllegalArgumentException))) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isDebugEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.debug(scala.StringContext.apply("", " rpc endpoint registration failed, may have been already registered").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit) case (exception: Throwable)scala.util.Failure[org.apache.spark.rpc.RpcEndpointRef]((e @ (_: Exception))) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isWarnEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.warn(scala.StringContext.apply("", " rpc endpoint registration failed").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit) } } case _ => org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putTypes(GeoMesaSparkKryoRegistratorEndpoint.this.Client.getTypes()) } else (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isDebugEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.debug("{} rpc endpoint disabled", (GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName: AnyRef)) else (): Unit)))
44 70125 1769 - 1794 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.endpointEnabled GeoMesaSparkKryoRegistratorEndpoint.this.endpointEnabled(sparkEnv)
45 70126 1808 - 1827 Select org.apache.spark.SparkEnv.executorId sparkEnv.executorId
45 70139 1808 - 2567 Match <nosymbol> sparkEnv.executorId match { case org.apache.spark.SparkContext.DRIVER_IDENTIFIER => { val rpcEnv: org.apache.spark.rpc.RpcEnv = sparkEnv.rpcEnv; scala.util.Try.apply[org.apache.spark.rpc.RpcEndpointRef](rpcEnv.setupEndpoint(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName, new GeoMesaSparkKryoRegistratorEndpoint.this.KryoEndpoint(rpcEnv))) match { case (value: org.apache.spark.rpc.RpcEndpointRef)scala.util.Success[org.apache.spark.rpc.RpcEndpointRef]((ref @ _)) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isInfoEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.info("{} rpc endpoint registered on driver {}", (scala.Array.apply[AnyRef]((GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName: AnyRef), (ref.address: AnyRef))((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit) case (exception: Throwable)scala.util.Failure[org.apache.spark.rpc.RpcEndpointRef]((e @ (_: IllegalArgumentException))) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isDebugEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.debug(scala.StringContext.apply("", " rpc endpoint registration failed, may have been already registered").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit) case (exception: Throwable)scala.util.Failure[org.apache.spark.rpc.RpcEndpointRef]((e @ (_: Exception))) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isWarnEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.warn(scala.StringContext.apply("", " rpc endpoint registration failed").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit) } } case _ => org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putTypes(GeoMesaSparkKryoRegistratorEndpoint.this.Client.getTypes()) }
46 70135 1884 - 2477 Block <nosymbol> { val rpcEnv: org.apache.spark.rpc.RpcEnv = sparkEnv.rpcEnv; scala.util.Try.apply[org.apache.spark.rpc.RpcEndpointRef](rpcEnv.setupEndpoint(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName, new GeoMesaSparkKryoRegistratorEndpoint.this.KryoEndpoint(rpcEnv))) match { case (value: org.apache.spark.rpc.RpcEndpointRef)scala.util.Success[org.apache.spark.rpc.RpcEndpointRef]((ref @ _)) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isInfoEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.info("{} rpc endpoint registered on driver {}", (scala.Array.apply[AnyRef]((GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName: AnyRef), (ref.address: AnyRef))((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit) case (exception: Throwable)scala.util.Failure[org.apache.spark.rpc.RpcEndpointRef]((e @ (_: IllegalArgumentException))) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isDebugEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.debug(scala.StringContext.apply("", " rpc endpoint registration failed, may have been already registered").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit) case (exception: Throwable)scala.util.Failure[org.apache.spark.rpc.RpcEndpointRef]((e @ (_: Exception))) => (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isWarnEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.warn(scala.StringContext.apply("", " rpc endpoint registration failed").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit) } }
47 70127 1914 - 1929 Select org.apache.spark.SparkEnv.rpcEnv sparkEnv.rpcEnv
48 70129 1983 - 2007 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoEndpoint.<init> new GeoMesaSparkKryoRegistratorEndpoint.this.KryoEndpoint(rpcEnv)
48 70128 1969 - 1981 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.EndpointName GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName
48 70131 1944 - 2009 Apply scala.util.Try.apply scala.util.Try.apply[org.apache.spark.rpc.RpcEndpointRef](rpcEnv.setupEndpoint(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName, new GeoMesaSparkKryoRegistratorEndpoint.this.KryoEndpoint(rpcEnv)))
48 70130 1948 - 2008 Apply org.apache.spark.rpc.RpcEnv.setupEndpoint rpcEnv.setupEndpoint(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName, new GeoMesaSparkKryoRegistratorEndpoint.this.KryoEndpoint(rpcEnv))
50 70132 2073 - 2151 Typed <nosymbol> (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isInfoEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.info("{} rpc endpoint registered on driver {}", (scala.Array.apply[AnyRef]((GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName: AnyRef), (ref.address: AnyRef))((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit)
52 70133 2231 - 2331 Typed <nosymbol> (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isDebugEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.debug(scala.StringContext.apply("", " rpc endpoint registration failed, may have been already registered").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit)
54 70134 2396 - 2461 Typed <nosymbol> (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isWarnEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.warn(scala.StringContext.apply("", " rpc endpoint registration failed").s(GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName), e) else (): Unit)
56 70137 2500 - 2555 Apply org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putTypes org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putTypes(GeoMesaSparkKryoRegistratorEndpoint.this.Client.getTypes())
56 70136 2537 - 2554 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoClient.getTypes GeoMesaSparkKryoRegistratorEndpoint.this.Client.getTypes()
56 70138 2500 - 2555 Block org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putTypes org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putTypes(GeoMesaSparkKryoRegistratorEndpoint.this.Client.getTypes())
59 70140 2595 - 2647 Typed <nosymbol> (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isDebugEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.debug("{} rpc endpoint disabled", (GeoMesaSparkKryoRegistratorEndpoint.this.EndpointName: AnyRef)) else (): Unit)
65 70145 2820 - 2820 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoEndpoint.$anonfun.<init> new $anonfun()
66 70144 2857 - 2976 Block <nosymbol> { (if (GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.isInfoEnabled()) GeoMesaSparkKryoRegistratorEndpoint.this.logger.underlying.info("{} received via rpc from {}", (scala.Array.apply[AnyRef]((message: AnyRef), (context.senderAddress: AnyRef))((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit); context.reply(message.reply) }
68 70143 2948 - 2976 Apply org.apache.spark.rpc.RpcCallContext.reply context.reply(message.reply)
68 70142 2962 - 2975 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoMessage.reply message.reply
76 70146 3190 - 3207 Apply java.lang.System.nanoTime java.lang.System.nanoTime()
77 70147 3250 - 3283 ApplyToImplicitArgs org.apache.spark.rpc.RpcEndpointRef.ask GeoMesaSparkKryoRegistratorEndpoint.this.EndpointRef.ask[R](this, timeout)(c)
77 70148 3227 - 3284 Apply org.apache.spark.rpc.RpcTimeout.awaitResult timeout.awaitResult[R](GeoMesaSparkKryoRegistratorEndpoint.this.EndpointRef.ask[R](this, timeout)(c))
78 70149 3303 - 3341 Apply scala.Long./ java.lang.System.nanoTime().-(start)./(1000000L)
85 70151 3573 - 3612 Apply org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.getType org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.getType(KryoGetTypeMessage.this.id)
85 70150 3609 - 3611 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoGetTypeMessage.id KryoGetTypeMessage.this.id
85 70153 3566 - 3613 Apply scala.Option.apply scala.Option.apply[(String, String)](GeoMesaSparkKryoRegistratorEndpoint.this.encodeSchema(org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.getType(KryoGetTypeMessage.this.id)))
85 70152 3573 - 3612 ApplyImplicitView org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.encodeSchema GeoMesaSparkKryoRegistratorEndpoint.this.encodeSchema(org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.getType(KryoGetTypeMessage.this.id))
86 70155 3666 - 3668 Literal <nosymbol> ")"
86 70154 3652 - 3664 Literal <nosymbol> "getType(id="
86 70157 3650 - 3668 Apply scala.StringContext.s scala.StringContext.apply("getType(id=", ")").s(KryoGetTypeMessage.this.id)
86 70156 3664 - 3666 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoGetTypeMessage.id KryoGetTypeMessage.this.id
90 70159 3846 - 3846 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[(String, String)]
90 70158 3847 - 3859 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.encodeSchema GeoMesaSparkKryoRegistratorEndpoint.this.encodeSchema(t)
90 70160 3806 - 3860 ApplyToImplicitArgs scala.collection.TraversableLike.map org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.getTypes.map[(String, String), Seq[(String, String)]]({ ((t: org.geotools.api.feature.simple.SimpleFeatureType) => GeoMesaSparkKryoRegistratorEndpoint.this.encodeSchema(t)) })(collection.this.Seq.canBuildFrom[(String, String)])
91 70161 3897 - 3910 Apply scala.StringContext.s scala.StringContext.apply("getTypes()").s()
95 70163 4099 - 4114 Apply org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.encodeType org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.encodeType(sft)
95 70162 4082 - 4097 Apply org.geotools.api.feature.simple.SimpleFeatureType.getTypeName sft.getTypeName()
95 70165 4073 - 4073 Literal <nosymbol> ()
95 70164 4073 - 4115 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoPutTypeMessage.<init> KryoPutTypeMessage.this.<init>(id, sft.getTypeName(), org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.encodeType(sft))
96 70167 4190 - 4194 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoPutTypeMessage.spec KryoPutTypeMessage.this.spec
96 70166 4184 - 4188 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoPutTypeMessage.name KryoPutTypeMessage.this.name
96 70169 4137 - 4196 Apply org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putType org.locationtech.geomesa.spark.GeoMesaSparkKryoRegistrator.putType(org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.createType(KryoPutTypeMessage.this.name, KryoPutTypeMessage.this.spec))
96 70168 4173 - 4195 Apply org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.createType org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.createType(KryoPutTypeMessage.this.name, KryoPutTypeMessage.this.spec)
97 70171 4249 - 4257 Literal <nosymbol> ", name="
97 70170 4235 - 4247 Literal <nosymbol> "putType(id="
97 70173 4247 - 4249 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoPutTypeMessage.id KryoPutTypeMessage.this.id
97 70172 4261 - 4273 Literal <nosymbol> ", spec=...)"
97 70175 4233 - 4273 Apply scala.StringContext.s scala.StringContext.apply("putType(id=", ", name=", ", spec=...)").s(KryoPutTypeMessage.this.id, KryoPutTypeMessage.this.name)
97 70174 4257 - 4261 Select org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoPutTypeMessage.name KryoPutTypeMessage.this.name
107 70177 4561 - 4592 ApplyToImplicitArgs org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoMessage.ask qual$1.ask(x$1)((ClassTag.apply[Seq[(String, String)]](classOf[scala.collection.Seq]): scala.reflect.ClassTag[Seq[(String, String)]]))
107 70176 4561 - 4586 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoGetTypesMessage.<init> new GeoMesaSparkKryoRegistratorEndpoint.this.KryoGetTypesMessage()
107 70179 4596 - 4596 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.geotools.api.feature.simple.SimpleFeatureType]
107 70178 4597 - 4609 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.decodeSchema GeoMesaSparkKryoRegistratorEndpoint.this.decodeSchema(t)
107 70180 4561 - 4610 ApplyToImplicitArgs scala.collection.TraversableLike.map { <artifact> val qual$1: org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoGetTypesMessage = new GeoMesaSparkKryoRegistratorEndpoint.this.KryoGetTypesMessage(); <artifact> val x$1: org.apache.spark.rpc.RpcTimeout = qual$1.ask$default$1; qual$1.ask(x$1)((ClassTag.apply[Seq[(String, String)]](classOf[scala.collection.Seq]): scala.reflect.ClassTag[Seq[(String, String)]])) }.map[org.geotools.api.feature.simple.SimpleFeatureType, Seq[org.geotools.api.feature.simple.SimpleFeatureType]]({ ((t: (String, String)) => GeoMesaSparkKryoRegistratorEndpoint.this.decodeSchema(t)) })(collection.this.Seq.canBuildFrom[org.geotools.api.feature.simple.SimpleFeatureType])
108 70181 4665 - 4691 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoGetTypeMessage.<init> new GeoMesaSparkKryoRegistratorEndpoint.this.KryoGetTypeMessage(id)
108 70183 4665 - 4697 ApplyImplicitView org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.optionSchema GeoMesaSparkKryoRegistratorEndpoint.this.optionSchema(qual$1.ask(x$1)((ClassTag.apply[Option[(String, String)]](classOf[scala.Option]): scala.reflect.ClassTag[Option[(String, String)]])))
108 70182 4665 - 4697 ApplyToImplicitArgs org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoMessage.ask qual$1.ask(x$1)((ClassTag.apply[Option[(String, String)]](classOf[scala.Option]): scala.reflect.ClassTag[Option[(String, String)]]))
109 70185 4754 - 4791 ApplyToImplicitArgs org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoMessage.ask qual$1.ask(x$1)((ClassTag.Int: scala.reflect.ClassTag[Int]))
109 70184 4754 - 4785 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.KryoPutTypeMessage.<init> new GeoMesaSparkKryoRegistratorEndpoint.this.KryoPutTypeMessage(id, sft)
113 70186 4897 - 4906 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
114 70187 4934 - 4938 Select scala.None scala.None
118 70189 5089 - 5102 Apply org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.encodeType org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.encodeType(t)
118 70188 5074 - 5087 Apply org.geotools.api.feature.simple.SimpleFeatureType.getTypeName t.getTypeName()
118 70190 5073 - 5103 Apply scala.Tuple2.apply scala.Tuple2.apply[String, String](t.getTypeName(), org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.encodeType(t))
119 70191 5185 - 5189 Select scala.Tuple2._1 t._1
119 70193 5174 - 5196 Apply org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.createType org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.createType(t._1, t._2)
119 70192 5191 - 5195 Select scala.Tuple2._2 t._2
120 70195 5283 - 5302 Apply scala.Option.map t.map[org.geotools.api.feature.simple.SimpleFeatureType]({ ((t: (String, String)) => GeoMesaSparkKryoRegistratorEndpoint.this.decodeSchema(t)) })
120 70194 5289 - 5301 Apply org.apache.spark.geomesa.GeoMesaSparkKryoRegistratorEndpoint.decodeSchema GeoMesaSparkKryoRegistratorEndpoint.this.decodeSchema(t)