1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.hbase.server.common
10 
11 import com.google.protobuf.{ByteString, RpcCallback, RpcController}
12 import com.typesafe.scalalogging.StrictLogging
13 import org.apache.hadoop.hbase.client.Scan
14 import org.apache.hadoop.hbase.protobuf.ProtobufUtil
15 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos
16 import org.apache.hadoop.hbase.regionserver.RegionScanner
17 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter
18 import org.locationtech.geomesa.hbase.proto.GeoMesaProto
19 import org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse
20 import org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor
21 import org.locationtech.geomesa.hbase.server.common.CoprocessorScan.Aggregator
22 import org.locationtech.geomesa.index.iterators.AggregatingScan
23 import org.locationtech.geomesa.index.iterators.AggregatingScan.AggregateCallback
24 import org.locationtech.geomesa.utils.index.ByteArrays
25 import org.locationtech.geomesa.utils.io.WithClose
26 
27 import java.io.{IOException, InterruptedIOException}
28 import java.util.Base64
29 import scala.util.control.NonFatal
30 
31 /**
32  * Common trait for server side coprocessor class
33  */
34 trait CoprocessorScan extends StrictLogging {
35 
36   import scala.collection.JavaConverters._
37 
38   /**
39    * Gets a scanner for the current region
40    *
41    * @param scan scan
42    * @return
43    */
44   protected def getScanner(scan: Scan): RegionScanner
45 
46   /**
47    * Execute an RPC request against this coprocessor
48    *
49    * @param controller controller
50    * @param request request
51    * @param done callback
52    */
53   protected def execute(
54       controller: RpcController,
55       request: GeoMesaProto.GeoMesaCoprocessorRequest,
56       done: RpcCallback[GeoMesaProto.GeoMesaCoprocessorResponse]): Unit = {
57 
58     val results = GeoMesaCoprocessorResponse.newBuilder()
59 
60     if (request.getVersion != CoprocessorScan.AllowableRequestVersion) {
61       // We cannot handle this request.
62       // Immediately return an empty response indicating the highest response version
63       logger.error(
64         s"Got a coprocessor request with version ${request.getVersion}, " +
65             s"but can only handle ${CoprocessorScan.AllowableRequestVersion}.")
66       results.setVersion(CoprocessorScan.AllowableRequestVersion)
67       done.run(results.build)
68     } else {
69       try {
70         val options = GeoMesaCoprocessor.deserializeOptions(request.getOptions.toByteArray)
71         val timeout = options.get(GeoMesaCoprocessor.TimeoutOpt).map(_.toLong)
72         val yieldPartialResults = options.get(GeoMesaCoprocessor.YieldOpt).exists(_.toBoolean)
73         if (!controller.isCanceled && timeout.forall(_ > System.currentTimeMillis())) {
74           val clas = options(GeoMesaCoprocessor.AggregatorClass)
75           val aggregator = Class.forName(clas).newInstance().asInstanceOf[Aggregator]
76           logger.debug(s"Initializing aggregator $aggregator.")
77           aggregator.init(options)
78 
79           val scan = {
80             val bytes = Base64.getDecoder.decode(options(GeoMesaCoprocessor.ScanOpt))
81             ProtobufUtil.toScan(ClientProtos.Scan.parseFrom(bytes))
82           }
83 
84           WithClose(getScanner(scan)) { scanner =>
85             aggregator.setScanner(scanner)
86             aggregator.aggregate(
87               new CoprocessorAggregateCallback(controller, aggregator, results, yieldPartialResults, timeout))
88           }
89         }
90       } catch {
91         case _: InterruptedException | _: InterruptedIOException => // stop processing, but don't return an error to prevent retries
92         case e: IOException => ResponseConverter.setControllerException(controller, e)
93         case NonFatal(e) => ResponseConverter.setControllerException(controller, new IOException(e))
94       }
95 
96       logger.debug(
97         s"Results total size: ${results.getPayloadList.asScala.map(_.size()).sum}" +
98           s"\n\tBatch sizes: ${results.getPayloadList.asScala.map(_.size()).mkString(", ")}")
99 
100       done.run(results.build)
101     }
102   }
103 
104   /**
105    * Coprocessor aggregate callback
106    *
107    * @param controller controller
108    * @param aggregator aggregator instance
109    * @param results result builder
110    * @param timeout timeout
111    */
112   private class CoprocessorAggregateCallback(
113       controller: RpcController,
114       aggregator: Aggregator,
115       results: GeoMesaCoprocessorResponse.Builder,
116       yieldPartialResults: Boolean,
117       timeout: Option[Long]
118     ) extends AggregateCallback {
119 
120     private val start = System.currentTimeMillis()
121 
122     logger.trace(
123       s"Running first batch on aggregator $aggregator" +
124         timeout.map(t => s" with remaining timeout ${t - System.currentTimeMillis()}ms").getOrElse(""))
125 
126     override def batch(bytes: Array[Byte]): Boolean = {
127       results.addPayload(ByteString.copyFrom(bytes))
128       continue()
129     }
130 
131     override def partial(bytes: => Array[Byte]): Boolean = {
132       if (continue()) { true } else {
133         // add the partial results and stop scanning
134         results.addPayload(ByteString.copyFrom(bytes))
135         false
136       }
137     }
138 
139     private def continue(): Boolean = {
140       if (controller.isCanceled) {
141         logger.warn(s"Stopping aggregator $aggregator due to controller being cancelled")
142         false
143       } else if (timeout.exists(_ < System.currentTimeMillis())) {
144         logger.warn(s"Stopping aggregator $aggregator due to timeout of ${timeout.get}ms")
145         false
146       } else if (yieldPartialResults) {
147         val lastScanned = aggregator.getLastScanned
148         logger.trace(
149           s"Stopping aggregator $aggregator at row ${ByteArrays.printable(lastScanned)} and " +
150               "returning intermediate results")
151         // This check covers the HBase Version Aggregator case
152         if (lastScanned != null && !lastScanned.isEmpty) {
153           results.setLastScanned(ByteString.copyFrom(lastScanned))
154         }
155         false
156       } else {
157         logger.trace(
158           s"Running next batch on aggregator $aggregator " +
159             s"with elapsed time ${System.currentTimeMillis() - start}ms" +
160             timeout.map(t => s" and remaining timeout ${t - System.currentTimeMillis()}ms").getOrElse(""))
161         true
162       }
163     }
164   }
165 }
166 
167 object CoprocessorScan {
168   type Aggregator = HBaseAggregator[_ <: AggregatingScan.Result]
169   val AllowableRequestVersion = 1
170 }
Line Stmt Id Pos Tree Symbol Tests Code
58 89847 2243 - 2282 Apply org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.newBuilder org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.newBuilder()
60 89849 2292 - 2353 Apply scala.Int.!= request.getVersion().!=(CoprocessorScan.AllowableRequestVersion)
60 89848 2314 - 2353 Select org.locationtech.geomesa.hbase.server.common.CoprocessorScan.AllowableRequestVersion CoprocessorScan.AllowableRequestVersion
60 89854 2355 - 2760 Block <nosymbol> { (if (CoprocessorScan.this.logger.underlying.isErrorEnabled()) CoprocessorScan.this.logger.underlying.error(scala.StringContext.apply("Got a coprocessor request with version ", ", ").s(request.getVersion()).+(scala.StringContext.apply("but can only handle ", ".").s(CoprocessorScan.AllowableRequestVersion))) else (): Unit); results.setVersion(CoprocessorScan.AllowableRequestVersion); done.run(results.build()) }
66 89851 2665 - 2724 Apply org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.Builder.setVersion results.setVersion(CoprocessorScan.AllowableRequestVersion)
66 89850 2684 - 2723 Select org.locationtech.geomesa.hbase.server.common.CoprocessorScan.AllowableRequestVersion CoprocessorScan.AllowableRequestVersion
67 89853 2731 - 2754 Apply com.google.protobuf.RpcCallback.run done.run(results.build())
67 89852 2740 - 2753 Apply org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.Builder.build results.build()
68 89895 2766 - 4417 Block <nosymbol> { try { val options: Map[String,String] = org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.deserializeOptions(request.getOptions().toByteArray()); val timeout: Option[Long] = options.get(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.TimeoutOpt).map[Long](((x$1: String) => scala.Predef.augmentString(x$1).toLong)); val yieldPartialResults: Boolean = options.get(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.YieldOpt).exists(((x$2: String) => scala.Predef.augmentString(x$2).toBoolean)); if (controller.isCanceled().unary_!.&&(timeout.forall(((x$3: Long) => x$3.>(java.lang.System.currentTimeMillis()))))) { val clas: String = options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.AggregatorClass); val aggregator: org.locationtech.geomesa.hbase.server.common.HBaseAggregator[_ <: org.locationtech.geomesa.index.iterators.AggregatingScan.Result] = java.lang.Class.forName(clas).newInstance().asInstanceOf[org.locationtech.geomesa.hbase.server.common.CoprocessorScan.Aggregator]; (if (CoprocessorScan.this.logger.underlying.isDebugEnabled()) CoprocessorScan.this.logger.underlying.debug("Initializing aggregator {}.", (aggregator: AnyRef)) else (): Unit); aggregator.init(options); val scan: org.apache.hadoop.hbase.client.Scan = { val bytes: Array[Byte] = java.util.Base64.getDecoder().decode(options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.ScanOpt)); org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.parseFrom(bytes)) }; org.locationtech.geomesa.utils.io.`package`.WithClose.apply[org.apache.hadoop.hbase.regionserver.RegionScanner, CoprocessorScan.this.CoprocessorAggregateCallback](CoprocessorScan.this.getScanner(scan))(((scanner: org.apache.hadoop.hbase.regionserver.RegionScanner) => { aggregator.setScanner(scanner); aggregator.aggregate[CoprocessorScan.this.CoprocessorAggregateCallback](new CoprocessorScan.this.CoprocessorAggregateCallback(controller, aggregator, results, yieldPartialResults, timeout)) }))(io.this.IsCloseable.closeableIsCloseable) } else () } catch { case ((_: InterruptedException)| (_: java.io.InterruptedIOException)) => () case (e @ (_: java.io.IOException)) => org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException(controller, e) case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException(controller, new java.io.IOException(e)) }; (if (CoprocessorScan.this.logger.underlying.isDebugEnabled()) CoprocessorScan.this.logger.underlying.debug(scala.StringContext.apply("Results total size: ", "").s(scala.collection.JavaConverters.asScalaBufferConverter[com.google.protobuf.ByteString](results.getPayloadList()).asScala.map[Int, scala.collection.mutable.Buffer[Int]](((x$4: com.google.protobuf.ByteString) => x$4.size()))(mutable.this.Buffer.canBuildFrom[Int]).sum[Int](math.this.Numeric.IntIsIntegral)).+(scala.StringContext.apply("\\n\\tBatch sizes: ", "").s(scala.collection.JavaConverters.asScalaBufferConverter[com.google.protobuf.ByteString](results.getPayloadList()).asScala.map[Int, scala.collection.mutable.Buffer[Int]](((x$5: com.google.protobuf.ByteString) => x$5.size()))(mutable.this.Buffer.canBuildFrom[Int]).mkString(", ")))) else (): Unit); done.run(results.build()) }
69 89885 2788 - 3835 Block <nosymbol> { val options: Map[String,String] = org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.deserializeOptions(request.getOptions().toByteArray()); val timeout: Option[Long] = options.get(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.TimeoutOpt).map[Long](((x$1: String) => scala.Predef.augmentString(x$1).toLong)); val yieldPartialResults: Boolean = options.get(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.YieldOpt).exists(((x$2: String) => scala.Predef.augmentString(x$2).toBoolean)); if (controller.isCanceled().unary_!.&&(timeout.forall(((x$3: Long) => x$3.>(java.lang.System.currentTimeMillis()))))) { val clas: String = options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.AggregatorClass); val aggregator: org.locationtech.geomesa.hbase.server.common.HBaseAggregator[_ <: org.locationtech.geomesa.index.iterators.AggregatingScan.Result] = java.lang.Class.forName(clas).newInstance().asInstanceOf[org.locationtech.geomesa.hbase.server.common.CoprocessorScan.Aggregator]; (if (CoprocessorScan.this.logger.underlying.isDebugEnabled()) CoprocessorScan.this.logger.underlying.debug("Initializing aggregator {}.", (aggregator: AnyRef)) else (): Unit); aggregator.init(options); val scan: org.apache.hadoop.hbase.client.Scan = { val bytes: Array[Byte] = java.util.Base64.getDecoder().decode(options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.ScanOpt)); org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.parseFrom(bytes)) }; org.locationtech.geomesa.utils.io.`package`.WithClose.apply[org.apache.hadoop.hbase.regionserver.RegionScanner, CoprocessorScan.this.CoprocessorAggregateCallback](CoprocessorScan.this.getScanner(scan))(((scanner: org.apache.hadoop.hbase.regionserver.RegionScanner) => { aggregator.setScanner(scanner); aggregator.aggregate[CoprocessorScan.this.CoprocessorAggregateCallback](new CoprocessorScan.this.CoprocessorAggregateCallback(controller, aggregator, results, yieldPartialResults, timeout)) }))(io.this.IsCloseable.closeableIsCloseable) } else () }
70 89855 2840 - 2870 Apply com.google.protobuf.ByteString.toByteArray request.getOptions().toByteArray()
70 89856 2802 - 2871 Apply org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.deserializeOptions org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.deserializeOptions(request.getOptions().toByteArray())
71 89857 2906 - 2935 Select org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.TimeoutOpt org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.TimeoutOpt
71 89859 2894 - 2950 Apply scala.Option.map options.get(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.TimeoutOpt).map[Long](((x$1: String) => scala.Predef.augmentString(x$1).toLong))
71 89858 2941 - 2949 Select scala.collection.immutable.StringLike.toLong scala.Predef.augmentString(x$1).toLong
72 89861 3033 - 3044 Select scala.collection.immutable.StringLike.toBoolean scala.Predef.augmentString(x$2).toBoolean
72 89860 2997 - 3024 Select org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.YieldOpt org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.YieldOpt
72 89862 2985 - 3045 Apply scala.Option.exists options.get(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.YieldOpt).exists(((x$2: String) => scala.Predef.augmentString(x$2).toBoolean))
73 89863 3103 - 3129 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
73 89865 3084 - 3130 Apply scala.Option.forall timeout.forall(((x$3: Long) => x$3.>(java.lang.System.currentTimeMillis())))
73 89864 3099 - 3129 Apply scala.Long.> x$3.>(java.lang.System.currentTimeMillis())
73 89866 3058 - 3130 Apply scala.Boolean.&& controller.isCanceled().unary_!.&&(timeout.forall(((x$3: Long) => x$3.>(java.lang.System.currentTimeMillis()))))
73 89883 3054 - 3054 Literal <nosymbol> ()
73 89882 3132 - 3835 Block <nosymbol> { val clas: String = options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.AggregatorClass); val aggregator: org.locationtech.geomesa.hbase.server.common.HBaseAggregator[_ <: org.locationtech.geomesa.index.iterators.AggregatingScan.Result] = java.lang.Class.forName(clas).newInstance().asInstanceOf[org.locationtech.geomesa.hbase.server.common.CoprocessorScan.Aggregator]; (if (CoprocessorScan.this.logger.underlying.isDebugEnabled()) CoprocessorScan.this.logger.underlying.debug("Initializing aggregator {}.", (aggregator: AnyRef)) else (): Unit); aggregator.init(options); val scan: org.apache.hadoop.hbase.client.Scan = { val bytes: Array[Byte] = java.util.Base64.getDecoder().decode(options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.ScanOpt)); org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.parseFrom(bytes)) }; org.locationtech.geomesa.utils.io.`package`.WithClose.apply[org.apache.hadoop.hbase.regionserver.RegionScanner, CoprocessorScan.this.CoprocessorAggregateCallback](CoprocessorScan.this.getScanner(scan))(((scanner: org.apache.hadoop.hbase.regionserver.RegionScanner) => { aggregator.setScanner(scanner); aggregator.aggregate[CoprocessorScan.this.CoprocessorAggregateCallback](new CoprocessorScan.this.CoprocessorAggregateCallback(controller, aggregator, results, yieldPartialResults, timeout)) }))(io.this.IsCloseable.closeableIsCloseable) }
73 89884 3054 - 3054 Block <nosymbol> ()
74 89867 3163 - 3197 Select org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.AggregatorClass org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.AggregatorClass
74 89868 3155 - 3198 Apply scala.collection.MapLike.apply options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.AggregatorClass)
75 89869 3226 - 3284 TypeApply scala.Any.asInstanceOf java.lang.Class.forName(clas).newInstance().asInstanceOf[org.locationtech.geomesa.hbase.server.common.CoprocessorScan.Aggregator]
77 89870 3359 - 3383 Apply org.locationtech.geomesa.index.iterators.AggregatingScan.init aggregator.init(options)
80 89871 3465 - 3491 Select org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.ScanOpt org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.ScanOpt
80 89873 3432 - 3493 Apply java.util.Base64.Decoder.decode java.util.Base64.getDecoder().decode(options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.ScanOpt))
80 89872 3457 - 3492 Apply scala.collection.MapLike.apply options.apply(org.locationtech.geomesa.hbase.rpc.coprocessor.GeoMesaCoprocessor.ScanOpt)
81 89875 3506 - 3561 Apply org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.parseFrom(bytes))
81 89874 3526 - 3560 Apply org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.parseFrom org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.parseFrom(bytes)
84 89876 3595 - 3611 Apply org.locationtech.geomesa.hbase.server.common.CoprocessorScan.getScanner CoprocessorScan.this.getScanner(scan)
84 89881 3585 - 3825 ApplyToImplicitArgs org.locationtech.geomesa.utils.io.WithClose.apply org.locationtech.geomesa.utils.io.`package`.WithClose.apply[org.apache.hadoop.hbase.regionserver.RegionScanner, CoprocessorScan.this.CoprocessorAggregateCallback](CoprocessorScan.this.getScanner(scan))(((scanner: org.apache.hadoop.hbase.regionserver.RegionScanner) => { aggregator.setScanner(scanner); aggregator.aggregate[CoprocessorScan.this.CoprocessorAggregateCallback](new CoprocessorScan.this.CoprocessorAggregateCallback(controller, aggregator, results, yieldPartialResults, timeout)) }))(io.this.IsCloseable.closeableIsCloseable)
84 89880 3613 - 3613 Select org.locationtech.geomesa.utils.io.IsCloseableImplicits.closeableIsCloseable io.this.IsCloseable.closeableIsCloseable
85 89877 3638 - 3668 Apply org.locationtech.geomesa.hbase.server.common.HBaseAggregator.setScanner aggregator.setScanner(scanner)
86 89879 3681 - 3813 Apply org.locationtech.geomesa.index.iterators.AggregatingScan.aggregate aggregator.aggregate[CoprocessorScan.this.CoprocessorAggregateCallback](new CoprocessorScan.this.CoprocessorAggregateCallback(controller, aggregator, results, yieldPartialResults, timeout))
87 89878 3717 - 3812 Apply org.locationtech.geomesa.hbase.server.common.CoprocessorScan.CoprocessorAggregateCallback.<init> new CoprocessorScan.this.CoprocessorAggregateCallback(controller, aggregator, results, yieldPartialResults, timeout)
91 89887 3917 - 3919 Block <nosymbol> ()
91 89886 3917 - 3919 Literal <nosymbol> ()
92 89889 4016 - 4071 Block org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException(controller, e)
92 89888 4016 - 4071 Apply org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException(controller, e)
93 89891 4100 - 4172 Apply org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException(controller, new java.io.IOException(e))
93 89890 4153 - 4171 Apply java.io.IOException.<init> new java.io.IOException(e)
93 89892 4100 - 4172 Block org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter.setControllerException(controller, new java.io.IOException(e))
100 89893 4397 - 4410 Apply org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.Builder.build results.build()
100 89894 4388 - 4411 Apply com.google.protobuf.RpcCallback.run done.run(results.build())
120 89896 4899 - 4925 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
127 89897 5188 - 5214 Apply com.google.protobuf.ByteString.copyFrom com.google.protobuf.ByteString.copyFrom(bytes)
127 89898 5169 - 5215 Apply org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.Builder.addPayload CoprocessorAggregateCallback.this.results.addPayload(com.google.protobuf.ByteString.copyFrom(bytes))
128 89899 5222 - 5232 Apply org.locationtech.geomesa.hbase.server.common.CoprocessorScan.CoprocessorAggregateCallback.continue CoprocessorAggregateCallback.this.continue()
132 89901 5325 - 5329 Literal <nosymbol> true
132 89900 5311 - 5321 Apply org.locationtech.geomesa.hbase.server.common.CoprocessorScan.CoprocessorAggregateCallback.continue CoprocessorAggregateCallback.this.continue()
132 89902 5325 - 5329 Block <nosymbol> true
132 89906 5337 - 5468 Block <nosymbol> { CoprocessorAggregateCallback.this.results.addPayload(com.google.protobuf.ByteString.copyFrom(bytes)); false }
134 89903 5419 - 5445 Apply com.google.protobuf.ByteString.copyFrom com.google.protobuf.ByteString.copyFrom(bytes)
134 89904 5400 - 5446 Apply org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.Builder.addPayload CoprocessorAggregateCallback.this.results.addPayload(com.google.protobuf.ByteString.copyFrom(bytes))
135 89905 5455 - 5460 Literal <nosymbol> false
140 89907 5526 - 5547 Apply com.google.protobuf.RpcController.isCanceled CoprocessorAggregateCallback.this.controller.isCanceled()
140 89909 5549 - 5662 Block <nosymbol> { (if (CoprocessorScan.this.logger.underlying.isWarnEnabled()) CoprocessorScan.this.logger.underlying.warn("Stopping aggregator {} due to controller being cancelled", (CoprocessorAggregateCallback.this.aggregator: AnyRef)) else (): Unit); false }
142 89908 5649 - 5654 Literal <nosymbol> false
143 89911 5687 - 5717 Apply scala.Long.< x$6.<(java.lang.System.currentTimeMillis())
143 89910 5691 - 5717 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
143 89912 5672 - 5718 Apply scala.Option.exists CoprocessorAggregateCallback.this.timeout.exists(((x$6: Long) => x$6.<(java.lang.System.currentTimeMillis())))
143 89914 5720 - 5834 Block <nosymbol> { (if (CoprocessorScan.this.logger.underlying.isWarnEnabled()) CoprocessorScan.this.logger.underlying.warn("Stopping aggregator {} due to timeout of {}ms", (scala.Array.apply[AnyRef]((CoprocessorAggregateCallback.this.aggregator: AnyRef), CoprocessorAggregateCallback.this.timeout.get.asInstanceOf[AnyRef])((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit); false }
143 89930 5668 - 6598 If <nosymbol> if (CoprocessorAggregateCallback.this.timeout.exists(((x$6: Long) => x$6.<(java.lang.System.currentTimeMillis())))) { (if (CoprocessorScan.this.logger.underlying.isWarnEnabled()) CoprocessorScan.this.logger.underlying.warn("Stopping aggregator {} due to timeout of {}ms", (scala.Array.apply[AnyRef]((CoprocessorAggregateCallback.this.aggregator: AnyRef), CoprocessorAggregateCallback.this.timeout.get.asInstanceOf[AnyRef])((ClassTag.AnyRef: scala.reflect.ClassTag[AnyRef])): _*)) else (): Unit); false } else if (CoprocessorAggregateCallback.this.yieldPartialResults) { val lastScanned: Array[Byte] = CoprocessorAggregateCallback.this.aggregator.getLastScanned; (if (CoprocessorScan.this.logger.underlying.isTraceEnabled()) CoprocessorScan.this.logger.underlying.trace(scala.StringContext.apply("Stopping aggregator ", " at row ", " and ").s(CoprocessorAggregateCallback.this.aggregator, org.locationtech.geomesa.utils.index.ByteArrays.printable(lastScanned)).+("returning intermediate results")) else (): Unit); if (lastScanned.!=(null).&&(scala.Predef.byteArrayOps(lastScanned).isEmpty.unary_!)) CoprocessorAggregateCallback.this.results.setLastScanned(com.google.protobuf.ByteString.copyFrom(lastScanned)) else (); false } else { (if (CoprocessorScan.this.logger.underlying.isTraceEnabled()) CoprocessorScan.this.logger.underlying.trace(scala.StringContext.apply("Running next batch on aggregator ", " ").s(CoprocessorAggregateCallback.this.aggregator).+(scala.StringContext.apply("with elapsed time ", "ms").s(java.lang.System.currentTimeMillis().-(CoprocessorAggregateCallback.this.start))).+(CoprocessorAggregateCallback.this.timeout.map[String](((t: Long) => scala.StringContext.apply(" and remaining timeout ", "ms").s(t.-(java.lang.System.currentTimeMillis())))).getOrElse[String](""))) else (): Unit); true }
145 89913 5821 - 5826 Literal <nosymbol> false
146 89915 5844 - 5863 Select org.locationtech.geomesa.hbase.server.common.CoprocessorScan.CoprocessorAggregateCallback.yieldPartialResults CoprocessorAggregateCallback.this.yieldPartialResults
146 89926 5865 - 6305 Block <nosymbol> { val lastScanned: Array[Byte] = CoprocessorAggregateCallback.this.aggregator.getLastScanned; (if (CoprocessorScan.this.logger.underlying.isTraceEnabled()) CoprocessorScan.this.logger.underlying.trace(scala.StringContext.apply("Stopping aggregator ", " at row ", " and ").s(CoprocessorAggregateCallback.this.aggregator, org.locationtech.geomesa.utils.index.ByteArrays.printable(lastScanned)).+("returning intermediate results")) else (): Unit); if (lastScanned.!=(null).&&(scala.Predef.byteArrayOps(lastScanned).isEmpty.unary_!)) CoprocessorAggregateCallback.this.results.setLastScanned(com.google.protobuf.ByteString.copyFrom(lastScanned)) else (); false }
146 89929 5840 - 6598 If <nosymbol> if (CoprocessorAggregateCallback.this.yieldPartialResults) { val lastScanned: Array[Byte] = CoprocessorAggregateCallback.this.aggregator.getLastScanned; (if (CoprocessorScan.this.logger.underlying.isTraceEnabled()) CoprocessorScan.this.logger.underlying.trace(scala.StringContext.apply("Stopping aggregator ", " at row ", " and ").s(CoprocessorAggregateCallback.this.aggregator, org.locationtech.geomesa.utils.index.ByteArrays.printable(lastScanned)).+("returning intermediate results")) else (): Unit); if (lastScanned.!=(null).&&(scala.Predef.byteArrayOps(lastScanned).isEmpty.unary_!)) CoprocessorAggregateCallback.this.results.setLastScanned(com.google.protobuf.ByteString.copyFrom(lastScanned)) else (); false } else { (if (CoprocessorScan.this.logger.underlying.isTraceEnabled()) CoprocessorScan.this.logger.underlying.trace(scala.StringContext.apply("Running next batch on aggregator ", " ").s(CoprocessorAggregateCallback.this.aggregator).+(scala.StringContext.apply("with elapsed time ", "ms").s(java.lang.System.currentTimeMillis().-(CoprocessorAggregateCallback.this.start))).+(CoprocessorAggregateCallback.this.timeout.map[String](((t: Long) => scala.StringContext.apply(" and remaining timeout ", "ms").s(t.-(java.lang.System.currentTimeMillis())))).getOrElse[String](""))) else (): Unit); true }
147 89916 5893 - 5918 Select org.locationtech.geomesa.hbase.server.common.HBaseAggregator.getLastScanned CoprocessorAggregateCallback.this.aggregator.getLastScanned
152 89917 6175 - 6179 Literal <nosymbol> null
152 89919 6160 - 6203 Apply scala.Boolean.&& lastScanned.!=(null).&&(scala.Predef.byteArrayOps(lastScanned).isEmpty.unary_!)
152 89918 6183 - 6203 Select scala.Boolean.unary_! scala.Predef.byteArrayOps(lastScanned).isEmpty.unary_!
152 89923 6156 - 6156 Literal <nosymbol> ()
152 89924 6156 - 6156 Block <nosymbol> ()
153 89921 6217 - 6273 Apply org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.Builder.setLastScanned CoprocessorAggregateCallback.this.results.setLastScanned(com.google.protobuf.ByteString.copyFrom(lastScanned))
153 89920 6240 - 6272 Apply com.google.protobuf.ByteString.copyFrom com.google.protobuf.ByteString.copyFrom(lastScanned)
153 89922 6217 - 6273 Block org.locationtech.geomesa.hbase.proto.GeoMesaProto.GeoMesaCoprocessorResponse.Builder.setLastScanned CoprocessorAggregateCallback.this.results.setLastScanned(com.google.protobuf.ByteString.copyFrom(lastScanned))
155 89925 6292 - 6297 Literal <nosymbol> false
156 89928 6311 - 6598 Block <nosymbol> { (if (CoprocessorScan.this.logger.underlying.isTraceEnabled()) CoprocessorScan.this.logger.underlying.trace(scala.StringContext.apply("Running next batch on aggregator ", " ").s(CoprocessorAggregateCallback.this.aggregator).+(scala.StringContext.apply("with elapsed time ", "ms").s(java.lang.System.currentTimeMillis().-(CoprocessorAggregateCallback.this.start))).+(CoprocessorAggregateCallback.this.timeout.map[String](((t: Long) => scala.StringContext.apply(" and remaining timeout ", "ms").s(t.-(java.lang.System.currentTimeMillis())))).getOrElse[String](""))) else (): Unit); true }
161 89927 6586 - 6590 Literal <nosymbol> true
169 89931 6734 - 6735 Literal <nosymbol> 1