1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.convert.avro.registry
10 
11 import com.typesafe.config.Config
12 import com.typesafe.scalalogging.LazyLogging
13 import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
14 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
15 import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
16 import org.geotools.api.feature.simple.SimpleFeatureType
17 import org.locationtech.geomesa.convert.EvaluationContext
18 import org.locationtech.geomesa.convert.avro.AvroConverter
19 import org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.{AvroSchemaRegistryConfig, GenericRecordSchemaRegistryBytesIterator, GenericRecordSchemaRegistryIterator}
20 import org.locationtech.geomesa.convert2.AbstractConverter.{BasicField, BasicOptions}
21 import org.locationtech.geomesa.convert2.transforms.Expression
22 import org.locationtech.geomesa.convert2.transforms.Expression.Column
23 import org.locationtech.geomesa.convert2.{AbstractConverter, ConverterConfig, ConverterName}
24 import org.locationtech.geomesa.utils.collection.CloseableIterator
25 import org.locationtech.geomesa.utils.io.CopyingInputStream
26 
27 import java.io.InputStream
28 import java.nio.ByteBuffer
29 
/**
 * Converter that parses avro records from an input stream, looking up the schema for
 * each record through a schema registry client
 *
 * @param sft simple feature type
 * @param config converter config, including the schema registry setting
 * @param fields converter fields
 * @param options converter options
 */
class AvroSchemaRegistryConverter(
    sft: SimpleFeatureType,
    config: AvroSchemaRegistryConfig,
    fields: Seq[BasicField],
    options: BasicOptions
  ) extends AbstractConverter[GenericRecord, AvroSchemaRegistryConfig, BasicField, BasicOptions](
    sft, config, fields, options) {

  // one client per converter, shared by every iterator created in parse
  private val schemaRegistryClient = new CachedSchemaRegistryClient(config.schemaRegistry, 100)

  // true if any id, field or user-data expression references column 0, which is the
  // slot of the result array used for the raw record bytes
  private val requiresBytes = {
    val idExpressions = config.idField.toSeq
    val fieldExpressions = fields.flatMap(_.transforms)
    val userDataExpressions = config.userData.values
    val all = idExpressions ++ fieldExpressions ++ userDataExpressions
    Expression.flatten(all).contains(Column(0))
  }

  override protected def parse(is: InputStream, ec: EvaluationContext): CloseableIterator[GenericRecord] = {
    if (!requiresBytes) {
      new GenericRecordSchemaRegistryIterator(is, ec, schemaRegistryClient)
    } else {
      // wrap the stream so that the bytes consumed for each record can be replayed
      val copying = new CopyingInputStream(is)
      new GenericRecordSchemaRegistryBytesIterator(copying, ec, schemaRegistryClient)
    }
  }

  override protected def values(
      parsed: CloseableIterator[GenericRecord],
      ec: EvaluationContext): CloseableIterator[Array[Any]] = {
    // a single array is re-used for every record: slot 0 holds the raw bytes (only when
    // required), slot 1 holds the record itself
    val result = Array.ofDim[Any](2)
    // choose the populate function once, up front, instead of checking the flag per record
    val populate: GenericRecord => Array[Any] =
      if (requiresBytes) {
        record => {
          result(0) = record.get(AvroConverter.BytesField)
          result(1) = record
          result
        }
      } else {
        record => {
          result(1) = record
          result
        }
      }
    parsed.map(populate)
  }
}
65 
object AvroSchemaRegistryConverter {

  // framing read ahead of each record: one leading byte that is skipped, then a 4 byte schema id
  private val MagicByteLength = 1
  private val SchemaIdLength = 4

  /**
   * Configuration for the schema registry converter
   *
   * @param `type` converter type
   * @param converterName optional converter name
   * @param schemaRegistry schema registry setting handed to the registry client
   * @param idField optional expression for the feature id
   * @param caches cache configurations, keyed by name
   * @param userData user data expressions, keyed by name
   */
  case class AvroSchemaRegistryConfig(
      `type`: String,
      converterName: Option[String],
      schemaRegistry: String,
      idField: Option[Expression],
      caches: Map[String, Config],
      userData: Map[String, Expression]
    ) extends ConverterConfig with ConverterName

  /**
   * Reads avro records using a cached confluent-style schema registry
   *
   * @param is input stream
   * @param ec evaluation context
   * @param client schema registry lookup
   */
  private class GenericRecordSchemaRegistryIterator(is: InputStream, ec: EvaluationContext, client: SchemaRegistryClient)
      extends CloseableIterator[GenericRecord] with LazyLogging {

    protected val decoder: BinaryDecoder = DecoderFactory.get.binaryDecoder(is, null)
    // datum readers that have already been created, keyed by schema id
    private val readers = scala.collection.mutable.Map.empty[Int, GenericDatumReader[GenericRecord]]
    // scratch buffer that the schema id bytes are read into for each record
    private val schemaIdBytes = ByteBuffer.wrap(Array.ofDim[Byte](SchemaIdLength))
    // last record read, handed back to the datum reader for re-use
    private var record: GenericRecord = _

    override def hasNext: Boolean = !decoder.isEnd

    override def next(): GenericRecord = {
      ec.line += 1
      // Read confluent-style bytes - note that the leading byte is skipped without being validated
      decoder.skipFixed(MagicByteLength)
      decoder.readFixed(schemaIdBytes.array(), 0, SchemaIdLength)

      // absolute read: yields the same value as resetting the position and doing a relative read,
      // but does not touch the buffer position and does not rely on the covariant
      // ByteBuffer.position(int) return type that only exists on JDK 9+
      val id = schemaIdBytes.getInt(0)
      val reader = readers.getOrElseUpdate(id, loadReader(id))

      record = reader.read(record, decoder)
      record
    }

    override def close(): Unit = is.close()

    /**
     * Creates the datum reader for a schema id
     *
     * @param id schema id read from the record framing
     * @return datum reader for the schema returned by the registry client
     */
    protected def loadReader(id: Int): GenericDatumReader[GenericRecord] =
      new GenericDatumReader[GenericRecord](client.getById(id))
  }

  /**
   * Reads avro records using a cached confluent-style schema registry, adding the raw serialized bytes to the record
   *
   * @param is input stream
   * @param ec evaluation context
   * @param client schema registry lookup
   */
  private class GenericRecordSchemaRegistryBytesIterator(is: CopyingInputStream, ec: EvaluationContext, client: SchemaRegistryClient)
      extends GenericRecordSchemaRegistryIterator(is, ec, client) with LazyLogging {

    override def next(): GenericRecord = {
      val record = super.next()
      // parse out the bytes read and set them in the record
      // the decoder may have buffered bytes past the end of this record - exclude them so that
      // only the bytes that were actually consumed are replayed
      val buffered = decoder.inputStream().available()
      record.put(AvroConverter.BytesField, is.replay(is.copied - buffered))
      record
    }

    /**
     * Creates the datum reader for a schema id, using the registry schema as the writer schema and
     * the schema returned by `AvroConverter.addBytes` as the reader schema (the latter is expected
     * to contain the field populated in `next`)
     *
     * @param id schema id read from the record framing
     * @return datum reader
     */
    override protected def loadReader(id: Int): GenericDatumReader[GenericRecord] = {
      val schema = client.getById(id)
      new GenericDatumReader[GenericRecord](schema, AvroConverter.addBytes(schema))
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
38 1 2043 - 2064 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.AvroSchemaRegistryConfig.schemaRegistry AvroSchemaRegistryConverter.this.config.schemaRegistry
38 2 2066 - 2069 Literal <nosymbol> 100
38 3 2012 - 2070 Apply io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init> new io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient(AvroSchemaRegistryConverter.this.config.schemaRegistry, 100)
42 4 2182 - 2196 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.AvroSchemaRegistryConfig.idField AvroSchemaRegistryConverter.this.config.idField
42 5 2221 - 2233 Select org.locationtech.geomesa.convert2.AbstractConverter.BasicField.transforms x$1.transforms
42 6 2221 - 2233 ApplyImplicitView scala.Option.option2Iterable scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](x$1.transforms)
42 7 2220 - 2220 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]
42 8 2206 - 2234 ApplyToImplicitArgs scala.collection.TraversableLike.flatMap AvroSchemaRegistryConverter.this.fields.flatMap[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](((x$1: org.locationtech.geomesa.convert2.AbstractConverter.BasicField) => scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](x$1.transforms)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression])
42 9 2203 - 2203 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]
42 10 2238 - 2260 Select scala.collection.MapLike.values AvroSchemaRegistryConverter.this.config.userData.values
42 11 2235 - 2235 TypeApply scala.collection.Seq.canBuildFrom collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]
42 12 2182 - 2260 ApplyToImplicitArgs scala.collection.TraversableLike.++ scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](AvroSchemaRegistryConverter.this.config.idField).toSeq.++[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](AvroSchemaRegistryConverter.this.fields.flatMap[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](((x$1: org.locationtech.geomesa.convert2.AbstractConverter.BasicField) => scala.this.Option.option2Iterable[org.locationtech.geomesa.convert2.transforms.Expression](x$1.transforms)))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]))(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression]).++[org.locationtech.geomesa.convert2.transforms.Expression, Seq[org.locationtech.geomesa.convert2.transforms.Expression]](AvroSchemaRegistryConverter.this.config.userData.values)(collection.this.Seq.canBuildFrom[org.locationtech.geomesa.convert2.transforms.Expression])
43 13 2306 - 2315 Apply org.locationtech.geomesa.convert2.transforms.Expression.Column.apply org.locationtech.geomesa.convert2.transforms.Expression.Column.apply(0)
43 14 2265 - 2316 Apply scala.collection.SeqLike.contains org.locationtech.geomesa.convert2.transforms.Expression.flatten(expressions).contains[org.locationtech.geomesa.convert2.transforms.Expression](org.locationtech.geomesa.convert2.transforms.Expression.Column.apply(0))
47 15 2439 - 2452 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.requiresBytes AvroSchemaRegistryConverter.this.requiresBytes
48 16 2507 - 2533 Apply org.locationtech.geomesa.utils.io.CopyingInputStream.<init> new org.locationtech.geomesa.utils.io.CopyingInputStream(is, io.this.CopyingInputStream.<init>$default$2)
48 17 2539 - 2559 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.schemaRegistryClient AvroSchemaRegistryConverter.this.schemaRegistryClient
48 18 2462 - 2560 Apply org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryBytesIterator.<init> new org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryBytesIterator(new org.locationtech.geomesa.utils.io.CopyingInputStream(is, io.this.CopyingInputStream.<init>$default$2), ec, AvroSchemaRegistryConverter.this.schemaRegistryClient)
48 19 2462 - 2560 Block org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryBytesIterator.<init> new org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryBytesIterator(new org.locationtech.geomesa.utils.io.CopyingInputStream(is, io.this.CopyingInputStream.<init>$default$2), ec, AvroSchemaRegistryConverter.this.schemaRegistryClient)
50 20 2628 - 2648 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.schemaRegistryClient AvroSchemaRegistryConverter.this.schemaRegistryClient
50 21 2580 - 2649 Apply org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.<init> new org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator(is, ec, AvroSchemaRegistryConverter.this.schemaRegistryClient)
50 22 2580 - 2649 Block org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.<init> new org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator(is, ec, AvroSchemaRegistryConverter.this.schemaRegistryClient)
57 23 2839 - 2840 Literal <nosymbol> 2
57 24 2822 - 2841 ApplyToImplicitArgs scala.Array.ofDim scala.Array.ofDim[Any](2)((ClassTag.Any: scala.reflect.ClassTag[Any]))
58 25 2850 - 2863 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.requiresBytes AvroSchemaRegistryConverter.this.requiresBytes
59 26 2902 - 2903 Literal <nosymbol> 0
59 27 2918 - 2942 Select org.locationtech.geomesa.convert.avro.AvroConverter.BytesField org.locationtech.geomesa.convert.avro.AvroConverter.BytesField
59 28 2907 - 2943 Apply org.apache.avro.generic.GenericRecord.get record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField)
59 29 2896 - 2943 Apply scala.Array.update array.update(0, record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField))
59 30 2945 - 2962 Apply scala.Array.update array.update(1, record)
59 31 2873 - 2971 Apply org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(0, record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField)); array.update(1, record); array }))
59 32 2873 - 2971 Block org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(0, record.get(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField)); array.update(1, record); array }))
61 33 3014 - 3031 Apply scala.Array.update array.update(1, record)
61 34 2991 - 3040 Apply org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(1, record); array }))
61 35 2991 - 3040 Block org.locationtech.geomesa.utils.collection.CloseableIterator.map parsed.map[Array[Any]](((record: org.apache.avro.generic.GenericRecord) => { array.update(1, record); array }))
68 36 3124 - 3125 Literal <nosymbol> 1
69 37 3157 - 3158 Literal <nosymbol> 4
90 38 3904 - 3906 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.is GenericRecordSchemaRegistryIterator.this.is
90 39 3908 - 3912 Literal <nosymbol> null
90 40 3871 - 3913 Apply org.apache.avro.io.DecoderFactory.binaryDecoder org.apache.avro.io.DecoderFactory.get().binaryDecoder(GenericRecordSchemaRegistryIterator.this.is, null)
91 41 3940 - 4014 TypeApply scala.collection.mutable.Map.empty scala.collection.mutable.Map.empty[Int, org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord]]
92 42 4081 - 4095 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.SchemaIdLength AvroSchemaRegistryConverter.this.SchemaIdLength
92 43 4063 - 4096 ApplyToImplicitArgs scala.Array.ofDim scala.Array.ofDim[Byte](AvroSchemaRegistryConverter.this.SchemaIdLength)((ClassTag.Byte: scala.reflect.ClassTag[Byte]))
92 44 4047 - 4097 Apply java.nio.ByteBuffer.wrap java.nio.ByteBuffer.wrap(scala.Array.ofDim[Byte](AvroSchemaRegistryConverter.this.SchemaIdLength)((ClassTag.Byte: scala.reflect.ClassTag[Byte])))
95 45 4177 - 4191 Select scala.Boolean.unary_! GenericRecordSchemaRegistryIterator.this.decoder.isEnd().unary_!
98 46 4242 - 4254 Apply scala.Long.+ GenericRecordSchemaRegistryIterator.this.ec.line.+(1)
98 47 4242 - 4254 Apply org.locationtech.geomesa.convert.EvaluationContext.line_= GenericRecordSchemaRegistryIterator.this.ec.line_=(GenericRecordSchemaRegistryIterator.this.ec.line.+(1))
100 48 4315 - 4330 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.MagicByteLength AvroSchemaRegistryConverter.this.MagicByteLength
100 49 4297 - 4331 Apply org.apache.avro.io.BinaryDecoder.skipFixed GenericRecordSchemaRegistryIterator.this.decoder.skipFixed(AvroSchemaRegistryConverter.this.MagicByteLength)
101 50 4356 - 4377 Apply java.nio.ByteBuffer.array GenericRecordSchemaRegistryIterator.this.schemaIdBytes.array()
101 51 4379 - 4380 Literal <nosymbol> 0
101 52 4382 - 4396 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.SchemaIdLength AvroSchemaRegistryConverter.this.SchemaIdLength
101 53 4338 - 4397 Apply org.apache.avro.io.BinaryDecoder.readFixed GenericRecordSchemaRegistryIterator.this.decoder.readFixed(GenericRecordSchemaRegistryIterator.this.schemaIdBytes.array(), 0, AvroSchemaRegistryConverter.this.SchemaIdLength)
103 54 4414 - 4448 Apply java.nio.ByteBuffer.getInt GenericRecordSchemaRegistryIterator.this.schemaIdBytes.position(0).getInt()
104 55 4496 - 4510 Apply org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.loadReader GenericRecordSchemaRegistryIterator.this.loadReader(id)
104 56 4468 - 4511 Apply scala.collection.mutable.MapLike.getOrElseUpdate GenericRecordSchemaRegistryIterator.this.readers.getOrElseUpdate(id, GenericRecordSchemaRegistryIterator.this.loadReader(id))
106 57 4540 - 4546 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.record GenericRecordSchemaRegistryIterator.this.record
106 58 4548 - 4555 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.decoder GenericRecordSchemaRegistryIterator.this.decoder
106 59 4528 - 4556 Apply org.apache.avro.generic.GenericDatumReader.read reader.read(GenericRecordSchemaRegistryIterator.this.record, GenericRecordSchemaRegistryIterator.this.decoder)
106 60 4519 - 4556 Apply org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.record_= GenericRecordSchemaRegistryIterator.this.record_=(reader.read(GenericRecordSchemaRegistryIterator.this.record, GenericRecordSchemaRegistryIterator.this.decoder))
107 61 4563 - 4569 Select org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.record GenericRecordSchemaRegistryIterator.this.record
110 62 4610 - 4620 Apply java.io.InputStream.close GenericRecordSchemaRegistryIterator.this.is.close()
113 63 4741 - 4759 Apply io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById GenericRecordSchemaRegistryIterator.this.client.getById(id)
113 64 4703 - 4760 Apply org.apache.avro.generic.GenericDatumReader.<init> new org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord](GenericRecordSchemaRegistryIterator.this.client.getById(id))
126 65 5244 - 5256 Apply org.locationtech.geomesa.convert.avro.registry.AvroSchemaRegistryConverter.GenericRecordSchemaRegistryIterator.next GenericRecordSchemaRegistryBytesIterator.super.next()
129 66 5423 - 5456 Apply java.io.InputStream.available GenericRecordSchemaRegistryBytesIterator.this.decoder.inputStream().available()
130 67 5474 - 5498 Select org.locationtech.geomesa.convert.avro.AvroConverter.BytesField org.locationtech.geomesa.convert.avro.AvroConverter.BytesField
130 68 5510 - 5530 Apply scala.Int.- GenericRecordSchemaRegistryBytesIterator.this.is.copied.-(buffered)
130 69 5500 - 5531 Apply org.locationtech.geomesa.utils.io.CopyingInputStream.replay GenericRecordSchemaRegistryBytesIterator.this.is.replay(GenericRecordSchemaRegistryBytesIterator.this.is.copied.-(buffered))
130 70 5463 - 5532 Apply org.apache.avro.generic.GenericRecord.put record.put(org.locationtech.geomesa.convert.avro.AvroConverter.BytesField, GenericRecordSchemaRegistryBytesIterator.this.is.replay(GenericRecordSchemaRegistryBytesIterator.this.is.copied.-(buffered)))
135 71 5658 - 5676 Apply io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById GenericRecordSchemaRegistryBytesIterator.this.client.getById(id)
136 72 5729 - 5759 Apply org.locationtech.geomesa.convert.avro.AvroConverter.addBytes org.locationtech.geomesa.convert.avro.AvroConverter.addBytes(schema)
136 73 5683 - 5760 Apply org.apache.avro.generic.GenericDatumReader.<init> new org.apache.avro.generic.GenericDatumReader[org.apache.avro.generic.GenericRecord](schema, org.locationtech.geomesa.convert.avro.AvroConverter.addBytes(schema))