1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.convert.avro
10 
11 import com.typesafe.config.Config
12 import org.apache.avro.Schema
13 import org.apache.avro.Schema.Parser
14 import org.apache.avro.file.DataFileStream
15 import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
16 import org.apache.avro.io.{DecoderFactory, EncoderFactory}
17 import org.geotools.api.feature.simple.SimpleFeatureType
18 import org.locationtech.geomesa.convert.EvaluationContext
19 import org.locationtech.geomesa.convert.avro.AvroConverter._
20 import org.locationtech.geomesa.convert2.AbstractConverter.{BasicField, BasicOptions}
21 import org.locationtech.geomesa.convert2.transforms.Expression
22 import org.locationtech.geomesa.convert2.transforms.Expression.Column
23 import org.locationtech.geomesa.convert2.{AbstractConverter, ConverterConfig}
24 import org.locationtech.geomesa.utils.collection.CloseableIterator
25 import org.locationtech.geomesa.utils.io.CopyingInputStream
26 
27 import java.io.{ByteArrayOutputStream, InputStream}
28 
29 class AvroConverter(sft: SimpleFeatureType, config: AvroConfig, fields: Seq[BasicField], options: BasicOptions)
30     extends AbstractConverter[GenericRecord, AvroConfig, BasicField, BasicOptions](sft, config, fields, options) {
31 
32   private val schema = config.schema match {
33     case SchemaEmbedded => None
34     case SchemaString(s) => Some(new Parser().parse(s))
35     case SchemaFile(s) =>
36       val loader = Option(Thread.currentThread.getContextClassLoader).getOrElse(getClass.getClassLoader)
37       val res = Option(loader.getResourceAsStream(s)).orElse(Option(getClass.getResourceAsStream(s))).getOrElse {
38         throw new IllegalArgumentException(s"Could not load schema resource at $s")
39       }
40       Some(new Parser().parse(res))
41   }
42 
43   // if required, set the raw bytes in the result array
44   private val requiresBytes = {
45     val expressions = config.idField.toSeq ++ fields.flatMap(_.transforms) ++ config.userData.values
46     Expression.flatten(expressions).contains(Column(0))
47   }
48 
49   override protected def parse(is: InputStream, ec: EvaluationContext): CloseableIterator[GenericRecord] = {
50     schema match {
51       case Some(s) if requiresBytes => new GenericRecordBytesIterator(new CopyingInputStream(is), s, ec)
52       case Some(s)                  => new GenericRecordIterator(is, s, ec)
53       case None    if requiresBytes => new FileStreamBytesIterator(is, ec)
54       case None                     => new FileStreamIterator(is, ec)
55     }
56   }
57 
58   override protected def values(parsed: CloseableIterator[GenericRecord],
59                                 ec: EvaluationContext): CloseableIterator[Array[Any]] = {
60     val array = Array.ofDim[Any](2)
61     if (requiresBytes) {
62       parsed.map { record => array(0) = record.get(BytesField); array(1) = record; array }
63     } else {
64       parsed.map { record => array(1) = record; array }
65     }
66   }
67 }
68 
69 object AvroConverter {
70 
71   import scala.collection.JavaConverters._
72 
73   val BytesField = "__bytes__"
74 
75   /**
76     * Add a `__bytes__` field to the schema, for storing the raw bytes
77     *
78     * @param schema schema
79     * @return
80     */
81   def addBytes(schema: Schema): Schema = {
82     schema.getType match {
83       case Schema.Type.RECORD =>
84         val fields = new java.util.ArrayList[Schema.Field](schema.getFields.size() + 1)
85         schema.getFields.asScala.foreach { field =>
86           fields.add(new Schema.Field(field.name, field.schema, field.doc, field.defaultVal()))
87         }
88         fields.add(new Schema.Field(BytesField, Schema.create(Schema.Type.BYTES), "raw bytes", ""))
89 
90         val updated = Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError)
91         updated.setFields(fields)
92         updated
93 
94       case Schema.Type.UNION =>
95         Schema.createUnion(schema.getTypes.asScala.map(s => addBytes(s)).toSeq: _*)
96 
97       case _ =>
98         throw new NotImplementedError(
99           s"Raw Avro bytes (i.e. $$0) is not implemented for schema type ${schema.getType}")
100     }
101   }
102 
103   case class AvroConfig(
104       `type`: String,
105       schema: SchemaConfig,
106       idField: Option[Expression],
107       caches: Map[String, Config],
108       userData: Map[String, Expression]
109     ) extends ConverterConfig
110 
111   sealed trait SchemaConfig
112 
113   case class SchemaString(schema: String) extends SchemaConfig
114   case class SchemaFile(file: String) extends SchemaConfig
115   case object SchemaEmbedded extends SchemaConfig {
116     val name: String = "embedded"
117   }
118 
119   /**
120     * Reads avro records using a pre-defined schema
121     *
122     * @param is input stream
123     * @param schema schema
124     * @param ec evaluation context
125     */
126   class GenericRecordIterator private [AvroConverter] (is: InputStream, schema: Schema, ec: EvaluationContext)
127       extends CloseableIterator[GenericRecord] {
128 
129     private val reader = new GenericDatumReader[GenericRecord](schema)
130     private val decoder = DecoderFactory.get.binaryDecoder(is, null)
131     private var record: GenericRecord = _
132 
133     override def hasNext: Boolean = !decoder.isEnd
134 
135     override def next(): GenericRecord = {
136       ec.line += 1
137       record = reader.read(record, decoder)
138       record
139     }
140 
141     override def close(): Unit = is.close()
142   }
143 
144   /**
145     * Reads avro records using a pre-defined schema, setting the bytes for each record in a
146     * special `__bytes__` field
147     *
148     * @param is input stream
149     * @param schema schema
150     * @param ec evaluation context
151     */
152   class GenericRecordBytesIterator private [AvroConverter] (is: CopyingInputStream, schema: Schema, ec: EvaluationContext)
153       extends CloseableIterator[GenericRecord] {
154 
155     private val reader = new GenericDatumReader[GenericRecord](schema, addBytes(schema))
156     private val decoder = DecoderFactory.get.binaryDecoder(is, null)
157     private var record: GenericRecord = _
158 
159     override def hasNext: Boolean = !decoder.isEnd
160 
161     override def next(): GenericRecord = {
162       ec.line += 1
163       record = reader.read(record, decoder)
164       // parse out the bytes read and set them in the record
165       // check to see if the decoder buffered some bytes that weren't actually used
166       val buffered = decoder.inputStream().available()
167       record.put(BytesField, is.replay(is.copied - buffered))
168       record
169     }
170 
171     override def close(): Unit = is.close()
172   }
173 
174   /**
175     * Reads avro records from an avro file, with the schema embedded
176     *
177     * @param is input
178     * @param ec evaluation context
179     */
180   class FileStreamIterator private [AvroConverter] (is: InputStream, ec: EvaluationContext)
181       extends CloseableIterator[GenericRecord] {
182 
183     private val stream = new DataFileStream(is, new GenericDatumReader[GenericRecord]())
184     private var record: GenericRecord = _
185 
186     override def hasNext: Boolean = stream.hasNext
187 
188     override def next(): GenericRecord = {
189       ec.line += 1
190       record = stream.next(record)
191       record
192     }
193 
194     override def close(): Unit = stream.close()
195   }
196 
197   /**
198     * Reads avro records from an avro file, with the schema embedded, setting the bytes for
199     * each record in a special `__bytes__` field
200     *
201     * @param is input
202     * @param ec evaluation context
203     */
204   class FileStreamBytesIterator private [AvroConverter] (is: InputStream, ec: EvaluationContext)
205       extends CloseableIterator[GenericRecord] {
206 
207     private val reader = new GenericDatumReader[GenericRecord]()
208     private val stream = new DataFileStream(is, reader)
209     private var record: GenericRecord = _
210 
211     reader.setExpected(addBytes(reader.getSchema))
212 
213     // we can't tell which bytes correspond to which feature (due to buffering). if we could access the
214     // underlying avro encoder we could figure it out, but it is not exposed through DataFileStream. instead,
215     // re-serialize each record to get the raw bytes
216     private val out = new ByteArrayOutputStream()
217     private val writer = new GenericDatumWriter[GenericRecord](stream.getSchema)
218     private val encoder = EncoderFactory.get.binaryEncoder(out, null)
219 
220     override def hasNext: Boolean = stream.hasNext
221 
222     override def next(): GenericRecord = {
223       ec.line += 1
224       record = stream.next(record)
225       // regenerate the bytes read and set them in the record
226       out.reset()
227       writer.write(record, encoder)
228       encoder.flush()
229       record.put(BytesField, out.toByteArray)
230       record
231     }
232 
233     override def close(): Unit = stream.close()
234   }
235 }
Line Stmt Id Pos Tree Symbol Tests Code
32 57459 1707 - 1720 Select org.locationtech.geomesa.convert.avro.AvroConverter.AvroConfig.schema AvroConverter.this.config.schema
33 57460 1756 - 1760 Select scala.None scala.None
33 57461 1756 - 1760 Block scala.None scala.None
34 57462 1794 - 1815 Apply org.apache.avro.Schema.Parser.parse new org.apache.avro.Schema.Parser().parse(s)
34 57463 1789 - 1816 Apply scala.Some.apply scala.Some.apply[org.apache.avro.Schema](new org.apache.avro.Schema.Parser().parse(s))
34 57464 1789 - 1816 Block scala.Some.apply scala.Some.apply[org.apache.avro.Schema](new org.apache.avro.Schema.Parser().parse(s))
35 57475 1840 - 2189 Block <nosymbol> { val loader: ClassLoader = scala.Option.apply[ClassLoader](java.lang.Thread.currentThread().getContextClassLoader()).getOrElse[ClassLoader](AvroConverter.this.getClass().getClassLoader()); val res: java.io.InputStream = scala.Option.apply[java.io.InputStream](loader.getResourceAsStream(s)).orElse[java.io.InputStream](scala.Option.apply[java.io.InputStream](AvroConverter.this.getClass().getResourceAsStream(s))).getOrElse[java.io.InputStream](throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Could not load schema resource at ", "").s(s))); scala.Some.apply[org.apache.avro.Schema](new org.apache.avro.Schema.Parser().parse(res)) }
36 57465 1869 - 1911 Apply java.lang.Thread.getContextClassLoader java.lang.Thread.currentThread().getContextClassLoader()
36 57466 1923 - 1946 Apply java.lang.Class.getClassLoader AvroConverter.this.getClass().getClassLoader()
36 57467 1862 - 1947 Apply scala.Option.getOrElse scala.Option.apply[ClassLoader](java.lang.Thread.currentThread().getContextClassLoader()).getOrElse[ClassLoader](AvroConverter.this.getClass().getClassLoader())
37 57468 1971 - 2000 Apply java.lang.ClassLoader.getResourceAsStream loader.getResourceAsStream(s)
37 57469 2016 - 2047 Apply java.lang.Class.getResourceAsStream AvroConverter.this.getClass().getResourceAsStream(s)
37 57470 2009 - 2048 Apply scala.Option.apply scala.Option.apply[java.io.InputStream](AvroConverter.this.getClass().getResourceAsStream(s))
37 57472 1964 - 2153 Apply scala.Option.getOrElse scala.Option.apply[java.io.InputStream](loader.getResourceAsStream(s)).orElse[java.io.InputStream](scala.Option.apply[java.io.InputStream](AvroConverter.this.getClass().getResourceAsStream(s))).getOrElse[java.io.InputStream](throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Could not load schema resource at ", "").s(s)))
38 57471 2070 - 2145 Throw <nosymbol> throw new scala.`package`.IllegalArgumentException(scala.StringContext.apply("Could not load schema resource at ", "").s(s))
40 57473 2165 - 2188 Apply org.apache.avro.Schema.Parser.parse new org.apache.avro.Schema.Parser().parse(res)
40 57474 2160 - 2189 Apply scala.Some.apply scala.Some.apply[org.apache.avro.Schema](new org.apache.avro.Schema.Parser().parse(res))
45 57476 2305 - 2319 Select org.locationtech.geomesa.convert.avro.AvroConverter.AvroConfig.idField AvroConverter.this.config.idField
45 57477 2344 - 2356 Select org.locationtech.geomesa.convert2.AbstractConverter.BasicField.transforms x$1.transforms
45 57478 2344 - 2356 ApplyImplicitView scala.Option.option2Iterable scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](x$1.transforms)
45 57479 2343 - 2343 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]
45 57480 2329 - 2357 ApplyToImplicitArgs scala.collection.TraversableLike.flatMap AvroConverter.this.fields.flatMap[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](((x$1: org.locationtech.geomesa.convert2.AbstractConverter.BasicField) => scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](x$1.transforms)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression])
45 57481 2326 - 2326 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]
45 57482 2361 - 2383 Select scala.collection.MapLike.values AvroConverter.this.config.userData.values
45 57483 2358 - 2358 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]
45 57484 2305 - 2383 ApplyToImplicitArgs scala.collection.TraversableLike.++ scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](AvroConverter.this.config.idField).toSeq.++[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](AvroConverter.this.fields.flatMap[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](((x$1: org.locationtech.geomesa.convert2.AbstractConverter.BasicField) => scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](x$1.transforms)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]).++[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](AvroConverter.this.config.userData.values)(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression])
46 57485 2429 - 2438 Apply org.locationtech.geomesa.convert2.transforms.Expression.Column.apply org.locationtech.geomesa.convert2.transforms.Expression.Column.apply(0)
46 57486 2388 - 2439 Apply scala.collection.SeqLike.contains org.locationtech.geomesa.convert2.transforms.Expression.flatten(expressions).contains[org.locationtech.geomesa.convert2.transforms.Expression](org.locationtech.geomesa.convert2.transforms.Expression.Column.apply(0))
50 57487 2558 - 2564 Select org.locationtech.geomesa.convert.avro.AvroConverter.schema AvroConverter.this.schema
51 57488 2595 - 2608 Select org.locationtech.geomesa.convert.avro.AvroConverter.requiresBytes AvroConverter.this.requiresBytes
51 57489 2643 - 2669 Apply org.locationtech.geomesa.utils.io.CopyingInputStream.<init> new org.locationtech.geomesa.utils.io.CopyingInputStream(is, io.this.CopyingInputStream.<init>$default$2)
51 57490 2612 - 2677 Apply org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator(new org.locationtech.geomesa.utils.io.CopyingInputStream(is, io.this.CopyingInputStream.<init>$default$2), s, ec)
51 57491 2612 - 2677 Block org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator(new org.locationtech.geomesa.utils.io.CopyingInputStream(is, io.this.CopyingInputStream.<init>$default$2), s, ec)
52 57492 2717 - 2753 Apply org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator(is, s, ec)
52 57493 2717 - 2753 Block org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator(is, s, ec)
53 57494 2776 - 2789 Select org.locationtech.geomesa.convert.avro.AvroConverter.requiresBytes AvroConverter.this.requiresBytes
53 57495 2793 - 2828 Apply org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator(is, ec)
53 57496 2793 - 2828 Block org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator(is, ec)
54 57497 2868 - 2898 Apply org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator(is, ec)
54 57498 2868 - 2898 Block org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator.<init> new org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator(is, ec)
60 57499 3107 - 3108 Literal <nosymbol> 2
60 57500 3090 - 3109 ApplyToImplicitArgs scala.Array.ofDim scala.Array.ofDim[Any](2)((ClassTag.Any: scala.reflect.ClassTag[Any]))
61 57501 3118 - 3131 Select org.locationtech.geomesa.convert.avro.AvroConverter.requiresBytes AvroConverter.this.requiresBytes
62 57502 3170 - 3171 Literal <nosymbol> 0
62 57503 3186 - 3196 Select org.locationtech.geomesa.convert.avro.AvroConverter.BytesField org.locationtech.geomesa.convert.avro.AvroConverter.BytesField
62 57504 3175 - 3197 Apply org.apache.avro.generic.GenericRecord.get record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField)
62 57505 3164 - 3197 Apply scala.Array.update array.update(0, record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField))
62 57506 3199 - 3216 Apply scala.Array.update array.update(1, record)
62 57507 3141 - 3225 Apply org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(0, record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField)); array.update(1, record); array }))
62 57508 3141 - 3225 Block org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(0, record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField)); array.update(1, record); array }))
64 57509 3268 - 3285 Apply scala.Array.update array.update(1, record)
64 57510 3245 - 3294 Apply org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(1, record); array }))
64 57511 3245 - 3294 Block org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(1, record); array }))
73 57512 3395 - 3406 Literal <nosymbol> "__bytes__"
82 57513 3586 - 3600 Apply org.apache.avro.Schema.getType schema.getType()
83 57536 3639 - 4148 Block <nosymbol> { val fields: java.util.ArrayList[org.apache.avro.Schema.Field] = new java.util.ArrayList[org.apache.avro.Schema.Field](schema.getFields().size().+(1)); scala.collection.JavaConverters.asScalaBufferConverter[org.apache.avro.Schema.Field](schema.getFields()).asScala.foreach[Boolean](((field: org.apache.avro.Schema.Field) => fields.add(new org.apache.avro.Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())))); fields.add(new org.apache.avro.Schema.Field(AvroConverter.this.BytesField, org.apache.avro.Schema.create(BYTES), "raw bytes", "")); val updated: org.apache.avro.Schema = org.apache.avro.Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError()); updated.setFields(fields); updated }
84 57514 3701 - 3728 Apply scala.Int.+ schema.getFields().size().+(1)
84 57515 3663 - 3729 Apply java.util.ArrayList.<init> new java.util.ArrayList[org.apache.avro.Schema.Field](schema.getFields().size().+(1))
85 57516 3738 - 3754 Apply org.apache.avro.Schema.getFields schema.getFields()
85 57523 3738 - 3887 Apply scala.collection.IterableLike.foreach scala.collection.JavaConverters.asScalaBufferConverter[org.apache.avro.Schema.Field](schema.getFields()).asScala.foreach[Boolean](((field: org.apache.avro.Schema.Field) => fields.add(new org.apache.avro.Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))))
86 57517 3820 - 3830 Apply org.apache.avro.Schema.Field.name field.name()
86 57518 3832 - 3844 Apply org.apache.avro.Schema.Field.schema field.schema()
86 57519 3846 - 3855 Apply org.apache.avro.Schema.Field.doc field.doc()
86 57520 3857 - 3875 Apply org.apache.avro.Schema.Field.defaultVal field.defaultVal()
86 57521 3803 - 3876 Apply org.apache.avro.Schema.Field.<init> new org.apache.avro.Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())
86 57522 3792 - 3877 Apply java.util.ArrayList.add fields.add(new org.apache.avro.Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
88 57524 3924 - 3934 Select org.locationtech.geomesa.convert.avro.AvroConverter.BytesField AvroConverter.this.BytesField
88 57525 3936 - 3968 Apply org.apache.avro.Schema.create org.apache.avro.Schema.create(BYTES)
88 57526 3970 - 3981 Literal <nosymbol> "raw bytes"
88 57527 3983 - 3985 Literal <nosymbol> ""
88 57528 3907 - 3986 Apply org.apache.avro.Schema.Field.<init> new org.apache.avro.Schema.Field(AvroConverter.this.BytesField, org.apache.avro.Schema.create(BYTES), "raw bytes", "")
88 57529 3896 - 3987 Apply java.util.ArrayList.add fields.add(new org.apache.avro.Schema.Field(AvroConverter.this.BytesField, org.apache.avro.Schema.create(BYTES), "raw bytes", ""))
90 57530 4031 - 4045 Apply org.apache.avro.Schema.getName schema.getName()
90 57531 4047 - 4060 Apply org.apache.avro.Schema.getDoc schema.getDoc()
90 57532 4062 - 4081 Apply org.apache.avro.Schema.getNamespace schema.getNamespace()
90 57533 4083 - 4097 Apply org.apache.avro.Schema.isError schema.isError()
90 57534 4011 - 4098 Apply org.apache.avro.Schema.createRecord org.apache.avro.Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError())
91 57535 4107 - 4132 Apply org.apache.avro.Schema.setFields updated.setFields(fields)
95 57537 4209 - 4224 Apply org.apache.avro.Schema.getTypes schema.getTypes()
95 57538 4242 - 4253 Apply org.locationtech.geomesa.convert.avro.AvroConverter.addBytes AvroConverter.this.addBytes(s)
95 57539 4236 - 4236 TypeApply scala.collection.mutable.Buffer.canBuildFrom mutable.this.Buffer.canBuildFrom[org.apache.avro.Schema]
95 57540 4209 - 4260 Select scala.collection.SeqLike.toSeq scala.collection.JavaConverters.asScalaBufferConverter[org.apache.avro.Schema](schema.getTypes()).asScala.map[org.apache.avro.Schema, scala.collection.mutable.Buffer[org.apache.avro.Schema]](((s: org.apache.avro.Schema) => AvroConverter.this.addBytes(s)))(mutable.this.Buffer.canBuildFrom[org.apache.avro.Schema]).toSeq
95 57541 4190 - 4265 Apply org.apache.avro.Schema.createUnion org.apache.avro.Schema.createUnion((scala.collection.JavaConverters.asScalaBufferConverter[org.apache.avro.Schema](schema.getTypes()).asScala.map[org.apache.avro.Schema, scala.collection.mutable.Buffer[org.apache.avro.Schema]](((s: org.apache.avro.Schema) => AvroConverter.this.addBytes(s)))(mutable.this.Buffer.canBuildFrom[org.apache.avro.Schema]).toSeq: _*))
95 57542 4190 - 4265 Block org.apache.avro.Schema.createUnion org.apache.avro.Schema.createUnion((scala.collection.JavaConverters.asScalaBufferConverter[org.apache.avro.Schema](schema.getTypes()).asScala.map[org.apache.avro.Schema, scala.collection.mutable.Buffer[org.apache.avro.Schema]](((s: org.apache.avro.Schema) => AvroConverter.this.addBytes(s)))(mutable.this.Buffer.canBuildFrom[org.apache.avro.Schema]).toSeq: _*))
98 57543 4291 - 4414 Throw <nosymbol> throw new scala.NotImplementedError(scala.StringContext.apply("Raw Avro bytes (i.e. $0) is not implemented for schema type ", "").s(schema.getType()))
98 57544 4291 - 4414 Block <nosymbol> throw new scala.NotImplementedError(scala.StringContext.apply("Raw Avro bytes (i.e. $0) is not implemented for schema type ", "").s(schema.getType()))
116 57545 4868 - 4878 Literal <nosymbol> "embedded"
129 57546 5270 - 5276 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.schema GenericRecordIterator.this.schema
129 57547 5232 - 5277 Apply org.apache.avro.generic.GenericDatumReader.<init> new org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord](GenericRecordIterator.this.schema)
130 57548 5337 - 5339 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.is GenericRecordIterator.this.is
130 57549 5341 - 5345 Literal <nosymbol> null
130 57550 5304 - 5346 Apply org.apache.avro.io.DecoderFactory.binaryDecoder org.apache.avro.io.DecoderFactory.get().binaryDecoder(GenericRecordIterator.this.is, null)
133 57551 5426 - 5440 Select scala.Boolean.unary_! GenericRecordIterator.this.decoder.isEnd().unary_!
136 57552 5491 - 5503 Apply scala.Long.+ GenericRecordIterator.this.ec.line.+(1)
136 57553 5491 - 5503 Apply org.locationtech.geomesa.convert.EvaluationContext.line_= GenericRecordIterator.this.ec.line_=(GenericRecordIterator.this.ec.line.+(1))
137 57554 5531 - 5537 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.record GenericRecordIterator.this.record
137 57555 5539 - 5546 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.decoder GenericRecordIterator.this.decoder
137 57556 5519 - 5547 Apply org.apache.avro.generic.GenericDatumReader.read GenericRecordIterator.this.reader.read(GenericRecordIterator.this.record, GenericRecordIterator.this.decoder)
137 57557 5510 - 5547 Apply org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.record_= GenericRecordIterator.this.record_=(GenericRecordIterator.this.reader.read(GenericRecordIterator.this.record, GenericRecordIterator.this.decoder))
138 57558 5554 - 5560 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordIterator.record GenericRecordIterator.this.record
141 57559 5601 - 5611 Apply java.io.InputStream.close GenericRecordIterator.this.is.close()
155 57560 6087 - 6093 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.schema GenericRecordBytesIterator.this.schema
155 57561 6104 - 6110 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.schema GenericRecordBytesIterator.this.schema
155 57562 6095 - 6111 Apply org.locationtech.geomesa.convert.avro.AvroConverter.addBytes AvroConverter.this.addBytes(GenericRecordBytesIterator.this.schema)
155 57563 6049 - 6112 Apply org.apache.avro.generic.GenericDatumReader.<init> new org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord](GenericRecordBytesIterator.this.schema, AvroConverter.this.addBytes(GenericRecordBytesIterator.this.schema))
156 57564 6172 - 6174 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.is GenericRecordBytesIterator.this.is
156 57565 6176 - 6180 Literal <nosymbol> null
156 57566 6139 - 6181 Apply org.apache.avro.io.DecoderFactory.binaryDecoder org.apache.avro.io.DecoderFactory.get().binaryDecoder(GenericRecordBytesIterator.this.is, null)
159 57567 6261 - 6275 Select scala.Boolean.unary_! GenericRecordBytesIterator.this.decoder.isEnd().unary_!
162 57568 6326 - 6338 Apply scala.Long.+ GenericRecordBytesIterator.this.ec.line.+(1)
162 57569 6326 - 6338 Apply org.locationtech.geomesa.convert.EvaluationContext.line_= GenericRecordBytesIterator.this.ec.line_=(GenericRecordBytesIterator.this.ec.line.+(1))
163 57570 6366 - 6372 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.record GenericRecordBytesIterator.this.record
163 57571 6374 - 6381 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.decoder GenericRecordBytesIterator.this.decoder
163 57572 6354 - 6382 Apply org.apache.avro.generic.GenericDatumReader.read GenericRecordBytesIterator.this.reader.read(GenericRecordBytesIterator.this.record, GenericRecordBytesIterator.this.decoder)
163 57573 6345 - 6382 Apply org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.record_= GenericRecordBytesIterator.this.record_=(GenericRecordBytesIterator.this.reader.read(GenericRecordBytesIterator.this.record, GenericRecordBytesIterator.this.decoder))
166 57574 6549 - 6582 Apply java.io.InputStream.available GenericRecordBytesIterator.this.decoder.inputStream().available()
167 57575 6600 - 6610 Select org.locationtech.geomesa.convert.avro.AvroConverter.BytesField AvroConverter.this.BytesField
167 57576 6622 - 6642 Apply scala.Int.- GenericRecordBytesIterator.this.is.copied.-(buffered)
167 57577 6612 - 6643 Apply org.locationtech.geomesa.utils.io.CopyingInputStream.replay GenericRecordBytesIterator.this.is.replay(GenericRecordBytesIterator.this.is.copied.-(buffered))
167 57578 6589 - 6644 Apply org.apache.avro.generic.GenericRecord.put GenericRecordBytesIterator.this.record.put(AvroConverter.this.BytesField, GenericRecordBytesIterator.this.is.replay(GenericRecordBytesIterator.this.is.copied.-(buffered)))
168 57579 6651 - 6657 Select org.locationtech.geomesa.convert.avro.AvroConverter.GenericRecordBytesIterator.record GenericRecordBytesIterator.this.record
171 57580 6698 - 6708 Apply org.locationtech.geomesa.utils.io.CopyingInputStream.close GenericRecordBytesIterator.this.is.close()
183 57581 7045 - 7047 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator.is FileStreamIterator.this.is
183 57582 7049 - 7088 Apply org.apache.avro.generic.GenericDatumReader.<init> new org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord]()
183 57583 7026 - 7089 Apply org.apache.avro.file.DataFileStream.<init> new org.apache.avro.file.DataFileStream[org.apache.avro.generic.GenericRecord](FileStreamIterator.this.is, new org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord]())
186 57584 7169 - 7183 Apply org.apache.avro.file.DataFileStream.hasNext FileStreamIterator.this.stream.hasNext()
189 57585 7234 - 7246 Apply scala.Long.+ FileStreamIterator.this.ec.line.+(1)
189 57586 7234 - 7246 Apply org.locationtech.geomesa.convert.EvaluationContext.line_= FileStreamIterator.this.ec.line_=(FileStreamIterator.this.ec.line.+(1))
190 57587 7274 - 7280 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator.record FileStreamIterator.this.record
190 57588 7262 - 7281 Apply org.apache.avro.file.DataFileStream.next FileStreamIterator.this.stream.next(FileStreamIterator.this.record)
190 57589 7253 - 7281 Apply org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator.record_= FileStreamIterator.this.record_=(FileStreamIterator.this.stream.next(FileStreamIterator.this.record))
191 57590 7288 - 7294 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamIterator.record FileStreamIterator.this.record
194 57591 7335 - 7349 Apply org.apache.avro.file.DataFileStream.close FileStreamIterator.this.stream.close()
207 57592 7744 - 7783 Apply org.apache.avro.generic.GenericDatumReader.<init> new org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord]()
208 57593 7828 - 7830 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.is FileStreamBytesIterator.this.is
208 57594 7832 - 7838 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.reader FileStreamBytesIterator.this.reader
208 57595 7809 - 7839 Apply org.apache.avro.file.DataFileStream.<init> new org.apache.avro.file.DataFileStream[org.apache.avro.generic.GenericRecord](FileStreamBytesIterator.this.is, FileStreamBytesIterator.this.reader)
211 57596 7915 - 7931 Apply org.apache.avro.generic.GenericDatumReader.getSchema FileStreamBytesIterator.this.reader.getSchema()
211 57597 7906 - 7932 Apply org.locationtech.geomesa.convert.avro.AvroConverter.addBytes AvroConverter.this.addBytes(FileStreamBytesIterator.this.reader.getSchema())
211 57598 7887 - 7933 Apply org.apache.avro.generic.GenericDatumReader.setExpected FileStreamBytesIterator.this.reader.setExpected(AvroConverter.this.addBytes(FileStreamBytesIterator.this.reader.getSchema()))
216 57599 8224 - 8251 Apply java.io.ByteArrayOutputStream.<init> new java.io.ByteArrayOutputStream()
217 57600 8315 - 8331 Apply org.apache.avro.file.DataFileStream.getSchema FileStreamBytesIterator.this.stream.getSchema()
217 57601 8277 - 8332 Apply org.apache.avro.generic.GenericDatumWriter.<init> new org.apache.avro.generic.GenericDatumWriter[org.apache.avro.generic.GenericRecord](FileStreamBytesIterator.this.stream.getSchema())
218 57602 8392 - 8395 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.out FileStreamBytesIterator.this.out
218 57603 8397 - 8401 Literal <nosymbol> null
218 57604 8359 - 8402 Apply org.apache.avro.io.EncoderFactory.binaryEncoder org.apache.avro.io.EncoderFactory.get().binaryEncoder(FileStreamBytesIterator.this.out, null)
220 57605 8440 - 8454 Apply org.apache.avro.file.DataFileStream.hasNext FileStreamBytesIterator.this.stream.hasNext()
223 57606 8505 - 8517 Apply scala.Long.+ FileStreamBytesIterator.this.ec.line.+(1)
223 57607 8505 - 8517 Apply org.locationtech.geomesa.convert.EvaluationContext.line_= FileStreamBytesIterator.this.ec.line_=(FileStreamBytesIterator.this.ec.line.+(1))
224 57608 8545 - 8551 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.record FileStreamBytesIterator.this.record
224 57609 8533 - 8552 Apply org.apache.avro.file.DataFileStream.next FileStreamBytesIterator.this.stream.next(FileStreamBytesIterator.this.record)
224 57610 8524 - 8552 Apply org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.record_= FileStreamBytesIterator.this.record_=(FileStreamBytesIterator.this.stream.next(FileStreamBytesIterator.this.record))
226 57611 8621 - 8632 Apply java.io.ByteArrayOutputStream.reset FileStreamBytesIterator.this.out.reset()
227 57612 8652 - 8658 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.record FileStreamBytesIterator.this.record
227 57613 8660 - 8667 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.encoder FileStreamBytesIterator.this.encoder
227 57614 8639 - 8668 Apply org.apache.avro.generic.GenericDatumWriter.write FileStreamBytesIterator.this.writer.write(FileStreamBytesIterator.this.record, FileStreamBytesIterator.this.encoder)
228 57615 8675 - 8690 Apply java.io.Flushable.flush FileStreamBytesIterator.this.encoder.flush()
229 57616 8708 - 8718 Select org.locationtech.geomesa.convert.avro.AvroConverter.BytesField AvroConverter.this.BytesField
229 57617 8720 - 8735 Apply java.io.ByteArrayOutputStream.toByteArray FileStreamBytesIterator.this.out.toByteArray()
229 57618 8697 - 8736 Apply org.apache.avro.generic.GenericRecord.put FileStreamBytesIterator.this.record.put(AvroConverter.this.BytesField, FileStreamBytesIterator.this.out.toByteArray())
230 57619 8743 - 8749 Select org.locationtech.geomesa.convert.avro.AvroConverter.FileStreamBytesIterator.record FileStreamBytesIterator.this.record
233 57620 8790 - 8804 Apply org.apache.avro.file.DataFileStream.close FileStreamBytesIterator.this.stream.close()