/***********************************************************************
 * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Apache License, Version 2.0
 * which accompanies this distribution and is available at
 * https://www.apache.org/licenses/LICENSE-2.0
 ***********************************************************************/

package org.locationtech.geomesa.accumulo.tools.ingest

import com.beust.jcommander.{Parameter, ParameterException, Parameters}
import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
import org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat
import org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreCommand.AccumuloDistributedCommand
import org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreParams
import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.AccumuloBulkIngestParams
import org.locationtech.geomesa.index.conf.partition.TablePartition
import org.locationtech.geomesa.jobs.JobResult.JobSuccess
import org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat
import org.locationtech.geomesa.jobs.{Awaitable, JobResult, StatusCallback}
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
import org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode
import org.locationtech.geomesa.tools._
import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestParams, Inputs}
import org.locationtech.geomesa.tools.ingest._
import org.locationtech.geomesa.tools.utils.{DistributedCopy, Prompt}
import org.locationtech.geomesa.utils.hadoop.HadoopDelegate
import org.locationtech.geomesa.utils.index.IndexMode

import java.io.File

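/**
 * Bulk ingest command. Instead of writing directly to Accumulo, runs a distributed converter job
 * that writes out RFiles (optionally staging them under a temp path, e.g. when the final output
 * lives in s3), then bulk imports the files into the appropriate tables unless `--skip-import`
 * is specified.
 *
 * A sketch of an invocation (connection arguments elided; flag spellings are illustrative, the
 * authoritative names come from the mixed-in parameter traits and the CLI help):
 *
 * {{{
 * geomesa-accumulo bulk-ingest ... --converter my-converter --output-path /tmp/rfiles input.csv
 * }}}
 */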
class AccumuloBulkIngestCommand extends IngestCommand[AccumuloDataStore] with AccumuloDistributedCommand {

  override val name = "bulk-ingest"
  override val params = new AccumuloBulkIngestParams()

  override protected def startIngest(
      mode: RunMode,
      ds: AccumuloDataStore,
      sft: SimpleFeatureType,
      converter: Config,
      inputs: Inputs): Awaitable = {

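    // note: Some(0) enables the combine input format without an explicit size cap, as a max
    // split size is only applied to the job for positive values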
    val maxSplitSize =
      if (!params.combineInputs) { None } else { Option(params.maxSplitSize).map(_.intValue()).orElse(Some(0)) }

    // validate the index param now that we have a data store and the sft has been created
    val index = params.loadIndex(ds, sft.getTypeName, IndexMode.Write).map(_.identifier)

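    // for partitioned schemas, the '--cql' param is required, so that the partitions covered by
    // the input data can be determined up front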
    val partitions = TablePartition(ds, sft).map { tp =>
      if (params.cqlFilter == null) {
        throw new ParameterException(
          s"Schema '${sft.getTypeName}' is a partitioned store. In order to bulk load, the '--cql' parameter " +
              "must be used to specify the range of the input data set")
      }
      tp.partitions(params.cqlFilter).filter(_.nonEmpty).getOrElse {
        throw new ParameterException(
          s"Partition filter does not correspond to partition scheme ${tp.getClass.getSimpleName}. Please specify " +
              "a valid filter using the '--cql' parameter")
      }
    }

    mode match {
      case RunModes.Local =>
        throw new IllegalArgumentException("Bulk ingest must be run in distributed mode")

      case RunModes.Distributed =>
        val conf = new Configuration()
        // the file output format doesn't let you write to an existing directory
        val output = new Path(params.outputPath)
        val context = FileContext.getFileContext(output.toUri, conf)
        if (context.util.exists(output)) {
          val warning = s"Output directory '$output' exists"
          if (params.force) {
            Command.user.warn(s"$warning - deleting it")
          } else if (!Prompt.confirm(s"WARNING DATA MAY BE LOST: $warning. Delete it and continue (y/n)? ")) {
            throw new ParameterException(warning)
          }
          context.delete(output, true)
        }

        val tempPath = Option(params.tempPath).map { temp =>
          val path = new Path(temp)
          // get a new file context, as this is likely to be a different filesystem (e.g. hdfs vs s3)
          val tempContext = FileContext.getFileContext(path.toUri, conf)
          val dir = tempContext.makeQualified(path)
          if (tempContext.util.exists(dir)) {
            Command.user.info(s"Deleting temp output path $dir")
            tempContext.delete(dir, true)
          }
          dir
        }

        Command.user.info(s"Running bulk ingestion in distributed ${if (params.combineInputs) "combine " else ""}mode")
        new BulkConverterIngest(ds, connection, sft, converter, inputs.paths, output, tempPath, maxSplitSize,
          index, partitions, libjarsFiles, libjarsPaths)

      case _ =>
        throw new UnsupportedOperationException(s"Missing implementation for mode $mode")
    }
  }

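  /**
   * Converter ingest job that writes out RFiles instead of writing directly to Accumulo, copies
   * them from the temp path to the final output (if a temp path is used), and then bulk imports
   * them into the appropriate tables.
   *
   * @param ds data store
   * @param dsParams data store connection parameters
   * @param sft simple feature type
   * @param converterConfig converter definition for parsing the input files
   * @param paths input files to ingest
   * @param output final destination for the RFiles
   * @param tempOutput temp path used for the initial job output, if any
   * @param maxSplitSize max size of a combined input split, if inputs are being combined
   * @param index identifier of a single index to write out, if any
   * @param partitions partitions covered by the input data, for partitioned schemas
   * @param libjarsFiles files listing the jars needed for the distributed classpath
   * @param libjarsPaths paths to search for the library jars
   */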
  class BulkConverterIngest(
      ds: AccumuloDataStore,
      dsParams: Map[String, String],
      sft: SimpleFeatureType,
      converterConfig: Config,
      paths: Seq[String],
      output: Path,
      tempOutput: Option[Path],
      maxSplitSize: Option[Int],
      index: Option[String],
      partitions: Option[Seq[String]],
      libjarsFiles: Seq[String],
      libjarsPaths: Iterator[() => Seq[File]]
    ) extends ConverterIngestJob(dsParams, sft, converterConfig, paths, libjarsFiles, libjarsPaths) {

    // the 'tmpjars' value from the configured ingest job, captured so that the follow-on
    // distributed copy can re-use it
    private var libjars: String = _

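    // in addition to the standard converter job configuration, this points the output at the
    // RFile output format (using the temp path, if defined), opts in to combined input splits,
    // and captures the job's libjars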
    override def configureJob(job: Job): Unit = {
      super.configureJob(job)
      val dest = tempOutput.getOrElse(output)
      GeoMesaAccumuloFileOutputFormat.configure(job, ds, dsParams, sft, dest, index, partitions)
      maxSplitSize.foreach { max =>
        job.setInputFormatClass(classOf[ConverterCombineInputFormat])
        if (max > 0) {
          FileInputFormat.setMaxInputSplitSize(job, max.toLong)
        }
      }
      this.libjars = job.getConfiguration.get("tmpjars")
    }

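    // waits on the main job, then runs up to two follow-on steps: a distributed copy from the
    // temp path to the final output (if a temp path was used), and the bulk import of the
    // RFiles into Accumulo (unless '--skip-import' was specified)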
    override def await(reporter: StatusCallback): JobResult = {
      super.await(reporter).merge {
        tempOutput.map { dir =>
          reporter.reset()
          val conf = new Configuration()
          conf.set("tmpjars", this.libjars) // copy over our libjars so the s3 apis are on the classpath
          new DistributedCopy(conf).copy(Seq(dir), output, reporter) match {
            case JobSuccess(message, counts) =>
              Command.user.info(message)
              JobSuccess("", counts)

            case j => j
          }
        }
      }.merge {
        if (params.skipImport) {
          Command.user.info("Skipping import of RFiles into Accumulo")
          Command.user.info(
            "Files may be imported for each table through the Accumulo shell with the `importdirectory` command")
        } else {
          Command.user.info("Importing RFiles into Accumulo")
          val tableOps = ds.client.tableOperations()
          val filesPath = new Path(output, GeoMesaAccumuloFileOutputFormat.FilesPath)
          val fc = FileContext.getFileContext(filesPath.toUri, new Configuration())
          val files = fc.listLocatedStatus(filesPath)
          // each subdirectory of the output is named for the table its RFiles belong to
          while (files.hasNext) {
            val file = files.next()
            val path = file.getPath
            val table = path.getName
            if (file.isDirectory && HadoopDelegate.HiddenFileFilter.accept(path) && tableOps.exists(table)) {
              Command.user.info(s"Importing $table")
              tableOps.importDirectory(path.toString).to(table).load()
            }
          }
        }
        None
      }
    }
  }
}

object AccumuloBulkIngestCommand {
  @Parameters(commandDescription = "Convert various file formats into bulk loaded Accumulo RFiles")
  class AccumuloBulkIngestParams extends IngestParams with AccumuloDataStoreParams
      with OutputPathParam with OptionalIndexParam with OptionalCqlFilterParam with TempPathParam {
    @Parameter(names = Array("--skip-import"), description = "Generate the files but skip the bulk import into Accumulo")
    var skipImport: Boolean = false
  }
}