1 /***********************************************************************
2  * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * https://www.apache.org/licenses/LICENSE-2.0
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.fs.storage.orc.jobs
10 
11 import org.apache.hadoop.conf.Configuration
12 import org.apache.hadoop.fs.Path
13 import org.apache.hadoop.io.NullWritable
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
15 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
16 import org.apache.orc.mapred.OrcStruct
17 import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}
18 import org.apache.orc.{OrcConf, OrcFile, TypeDescription}
19 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
20 import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
21 import org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter
22 import org.locationtech.geomesa.fs.storage.orc.utils.OrcOutputFormatWriter
23 
/**
 * Hadoop output format that writes simple features as ORC records.
 *
 * The simple feature type is read from the task configuration via `StorageConfiguration.getSft`, and the ORC
 * schema via `OrcSimpleFeatureOutputFormat.getDescription` (`OrcConf.MAPRED_OUTPUT_SCHEMA`). Physical ORC
 * writing is delegated to ORC's own mapreduce output format / record writer.
 */
class OrcSimpleFeatureOutputFormat extends FileOutputFormat[Void, SimpleFeature] {

  private val delegate = new OrcOutputFormat[OrcStruct]

  /**
   * Creates a record writer that writes to an explicit file, instead of the default work file.
   *
   * @param context task context, supplying the ORC writer options, feature type and ORC schema
   * @param file path of the ORC file to create
   * @return record writer accepting simple features
   */
  def getRecordWriter(context: TaskAttemptContext, file: Path): RecordWriter[Void, SimpleFeature] =
    createRecordWriter(context) {
      val options = org.apache.orc.mapred.OrcOutputFormat.buildOptions(context.getConfiguration)
      new OrcMapreduceRecordWriter[OrcStruct](OrcFile.createWriter(file, options))
    }

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Void, SimpleFeature] =
    createRecordWriter(context)(delegate.getRecordWriter(context))

  // work-file resolution is delegated to the wrapped ORC output format
  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path =
    delegate.getDefaultWorkFile(context, extension)

  /**
   * Wraps an ORC record writer so that it accepts simple features.
   *
   * The feature type and ORC schema are resolved before `open` is evaluated, so a mis-configured job fails
   * without creating (and leaking) an output file. If wrapping fails after the ORC writer has been opened,
   * the ORC writer is closed before the original error is rethrown.
   *
   * @param context task context
   * @param open opens the underlying ORC record writer; evaluated at most once, after configuration is resolved
   * @return record writer accepting simple features
   */
  private def createRecordWriter(context: TaskAttemptContext)
                                (open: => RecordWriter[NullWritable, OrcStruct]): RecordWriter[Void, SimpleFeature] = {
    val conf = context.getConfiguration
    val sft = StorageConfiguration.getSft(conf)
    val description = OrcSimpleFeatureOutputFormat.getDescription(conf)
    require(description != null,
      s"No ORC schema configured under '${OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute}' - " +
        "use OrcSimpleFeatureOutputFormat.setDescription")
    val orcWriter = open
    try {
      new OrcRecordWriter(sft, description, orcWriter)
    } catch {
      case scala.util.control.NonFatal(e) =>
        // don't leave the ORC file open if the feature wrapper can't be created
        try {
          orcWriter.close(context)
        } catch {
          case scala.util.control.NonFatal(suppressed) => e.addSuppressed(suppressed)
        }
        throw e
    }
  }
}
47 
object OrcSimpleFeatureOutputFormat {

  /**
   * Stores an ORC schema in a job configuration.
   *
   * The schema's string form is written under the `OrcConf.MAPRED_OUTPUT_SCHEMA` key, which is the key
   * read back by `getDescription`.
   *
   * @param conf configuration to update
   * @param description ORC schema to store
   */
  def setDescription(conf: Configuration, description: TypeDescription): Unit = {
    val schemaKey = OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute
    conf.set(schemaKey, description.toString)
  }

  /**
   * Reads the ORC schema previously stored with `setDescription`.
   *
   * NOTE(review): when no schema has been configured the raw value is presumably null; whether
   * `TypeDescription.fromString` then returns null or throws depends on the ORC version - confirm.
   *
   * @param conf configuration to read
   * @return parsed ORC schema
   */
  def getDescription(conf: Configuration): TypeDescription = {
    val schemaString = OrcConf.MAPRED_OUTPUT_SCHEMA.getString(conf)
    TypeDescription.fromString(schemaString)
  }

  /**
   * Record writer that converts each simple feature into an ORC struct and passes it to an ORC record writer.
   *
   * One `OrcStruct` instance is allocated up front and reused for every record.
   * NOTE(review): this reuse relies on `delegate.write` consuming the struct's values before returning - confirm
   * for any delegate other than ORC's own record writer.
   *
   * @param sft simple feature type of the features being written
   * @param description ORC schema used for the reusable struct (presumably derived from `sft`)
   * @param delegate underlying ORC record writer; closed when this writer is closed
   */
  class OrcRecordWriter(sft: SimpleFeatureType,
                        description: TypeDescription,
                        delegate: RecordWriter[NullWritable, OrcStruct])
      extends RecordWriter[Void, SimpleFeature] {

    // fills the reusable struct from a simple feature
    private val converter = OrcOutputFormatWriter(sft, description)
    // the incoming key is ignored; every record is written with the NullWritable key
    private val nullKey = NullWritable.get()
    private val row = new OrcStruct(description)

    override def write(key: Void, value: SimpleFeature): Unit = {
      converter(value, row)
      delegate.write(nullKey, row)
    }

    override def close(context: TaskAttemptContext): Unit = {
      delegate.close(context)
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
26 83673 1378 - 1408 Apply org.apache.orc.mapreduce.OrcOutputFormat.<init> new org.apache.orc.mapreduce.OrcOutputFormat[org.apache.orc.mapred.OrcStruct]()
29 83675 1530 - 1606 Apply org.apache.orc.mapred.OrcOutputFormat.buildOptions org.apache.orc.mapred.OrcOutputFormat.buildOptions(context.getConfiguration())
29 83674 1581 - 1605 Apply org.apache.hadoop.mapreduce.JobContext.getConfiguration context.getConfiguration()
30 83677 1624 - 1700 Apply org.apache.orc.mapreduce.OrcMapreduceRecordWriter.<init> new org.apache.orc.mapreduce.OrcMapreduceRecordWriter[org.apache.orc.mapred.OrcStruct](org.apache.orc.OrcFile.createWriter(file, options))
30 83676 1664 - 1699 Apply org.apache.orc.OrcFile.createWriter org.apache.orc.OrcFile.createWriter(file, options)
31 83678 1705 - 1737 Apply org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.getRecordWriter OrcSimpleFeatureOutputFormat.this.getRecordWriter(context, writer)
35 83679 1869 - 1902 Apply org.apache.orc.mapreduce.OrcOutputFormat.getRecordWriter OrcSimpleFeatureOutputFormat.this.delegate.getRecordWriter(context)
35 83680 1844 - 1903 Apply org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.getRecordWriter OrcSimpleFeatureOutputFormat.this.getRecordWriter(context, OrcSimpleFeatureOutputFormat.this.delegate.getRecordWriter(context))
38 83681 1999 - 2046 Apply org.apache.orc.mapreduce.OrcOutputFormat.getDefaultWorkFile OrcSimpleFeatureOutputFormat.this.delegate.getDefaultWorkFile(context, extension)
42 83683 2240 - 2293 Apply org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration.getSft org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration.getSft(context.getConfiguration())
42 83682 2268 - 2292 Apply org.apache.hadoop.mapreduce.JobContext.getConfiguration context.getConfiguration()
43 83685 2316 - 2385 Apply org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.getDescription OrcSimpleFeatureOutputFormat.getDescription(context.getConfiguration())
43 83684 2360 - 2384 Apply org.apache.hadoop.mapreduce.JobContext.getConfiguration context.getConfiguration()
44 83686 2390 - 2438 Apply org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter.<init> new org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter(sft, description, orcWriter)
51 83687 2578 - 2619 Apply org.apache.orc.OrcConf.getAttribute MAPRED_OUTPUT_SCHEMA.getAttribute()
51 83689 2569 - 2642 Apply org.apache.hadoop.conf.Configuration.set conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute(), description.toString())
51 83688 2621 - 2641 Apply org.apache.orc.TypeDescription.toString description.toString()
54 83691 2709 - 2781 Apply org.apache.orc.TypeDescription.fromString org.apache.orc.TypeDescription.fromString(MAPRED_OUTPUT_SCHEMA.getString(conf))
54 83690 2736 - 2780 Apply org.apache.orc.OrcConf.getString MAPRED_OUTPUT_SCHEMA.getString(conf)
61 83693 3061 - 3072 Select org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter.description OrcRecordWriter.this.description
61 83692 3056 - 3059 Select org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter.sft OrcRecordWriter.this.sft
61 83694 3034 - 3073 Apply org.locationtech.geomesa.fs.storage.orc.utils.OrcOutputFormatWriter.apply org.locationtech.geomesa.fs.storage.orc.utils.OrcOutputFormatWriter.apply(OrcRecordWriter.this.sft, OrcRecordWriter.this.description, org.locationtech.geomesa.fs.storage.orc.utils.OrcOutputFormatWriter.apply$default$3)
62 83695 3096 - 3114 Apply org.apache.hadoop.io.NullWritable.get org.apache.hadoop.io.NullWritable.get()
63 83697 3140 - 3166 Apply org.apache.orc.mapred.OrcStruct.<init> new org.apache.orc.mapred.OrcStruct(OrcRecordWriter.this.description)
63 83696 3154 - 3165 Select org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter.description OrcRecordWriter.this.description
66 83699 3240 - 3267 Apply org.locationtech.geomesa.fs.storage.orc.utils.OrcOutputFormatWriter.apply OrcRecordWriter.this.writer.apply(value, OrcRecordWriter.this.struct)
66 83698 3260 - 3266 Select org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter.struct OrcRecordWriter.this.struct
67 83701 3299 - 3305 Select org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter.struct OrcRecordWriter.this.struct
67 83700 3289 - 3297 Select org.locationtech.geomesa.fs.storage.orc.jobs.OrcSimpleFeatureOutputFormat.OrcRecordWriter.key this.key
67 83702 3274 - 3306 Apply org.apache.hadoop.mapreduce.RecordWriter.write OrcRecordWriter.this.delegate.write(this.key, OrcRecordWriter.this.struct)
70 83703 3374 - 3397 Apply org.apache.hadoop.mapreduce.RecordWriter.close OrcRecordWriter.this.delegate.close(context)