/***********************************************************************
 * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Apache License, Version 2.0
 * which accompanies this distribution and is available at
 * http://www.opensource.org/licenses/apache2.0.php.
 ***********************************************************************/

package org.locationtech.geomesa.jobs.mapreduce

import org.apache.hadoop.fs.{Path, Seekable}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.geotools.api.feature.simple.SimpleFeature
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.features.avro.io.AvroDataFileReader
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

import java.io.{Closeable, InputStream}

/**
 * Input format for reading Avro data files written with
 * <code>org.locationtech.geomesa.features.avro.io.AvroDataFileWriter</code>.
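 *
 * A minimal job-wiring sketch; the Hadoop configuration <code>conf</code>, the input path,
 * and the type name below are illustrative, not part of this class:
 * {{{
 * val job = Job.getInstance(conf, "read avro features")
 * job.setInputFormatClass(classOf[AvroFileInputFormat])
 * FileInputFormat.addInputPath(job, new Path("hdfs:///data/features.avro"))
 * // optional: rename the feature type as it is read
 * AvroFileInputFormat.setTypeName(job, "my-feature-type")
 * }}}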
 */
class AvroFileInputFormat extends FileStreamInputFormat {
  override def createRecordReader(): FileStreamRecordReader = new AvroFileRecordReader
}

object AvroFileInputFormat {
  object Counters {
    val Group = "org.locationtech.geomesa.jobs.input.avro"
    val Read  = "read"
  }

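  /**
   * Sets the feature type name to use for features read by this input format. If the name
   * differs from the type name stored in the Avro file, features are renamed as they are
   * read; if it is not set, features keep the file's own type name.
   *
   * @param job job to configure
   * @param typeName feature type name
   */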
  def setTypeName(job: Job, typeName: String): Unit =
    job.getConfiguration.set(FileStreamInputFormat.TypeNameKey, typeName)
}

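/**
 * Record reader that iterates over the SimpleFeatures in a GeoMesa Avro data file,
 * renaming the feature type if a type name has been configured on the job.
 */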
class AvroFileRecordReader extends FileStreamRecordReader {

  import AvroFileInputFormat.Counters

  override def createIterator(stream: InputStream with Seekable,
                              filePath: Path,
                              context: TaskAttemptContext): Iterator[SimpleFeature] with Closeable = {
    val typeName = context.getConfiguration.get(FileStreamInputFormat.TypeNameKey)
    val reader = new AvroDataFileReader(stream)
    val dataSft = reader.getSft
    val counter = context.getCounter(Counters.Group, Counters.Read)

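    // if no rename was requested, or the file already uses the requested type name,
    // return features directly from the avro reader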
    if (typeName == null || dataSft.getTypeName == typeName) {
      new Iterator[SimpleFeature] with Closeable {
        override def hasNext: Boolean = reader.hasNext
        override def next(): SimpleFeature = {
          counter.increment(1)
          reader.next()
        }
        override def close(): Unit = reader.close()
      }
    } else {
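      // otherwise, rename the feature type and re-wrap each feature with the renamed type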
      val sft = SimpleFeatureTypes.renameSft(dataSft, typeName)
      new Iterator[SimpleFeature] with Closeable {
        override def hasNext: Boolean = reader.hasNext
        override def next(): SimpleFeature = {
          val sf = reader.next()
          counter.increment(1)
          new ScalaSimpleFeature(sft, sf.getID, sf.getAttributes.toArray)
        }
        override def close(): Unit = reader.close()
      }
    }
  }
}