| Line |
Stmt Id |
Pos |
Tree |
Symbol |
Tests |
Code |
|
40
|
75896
|
2230
-
2243
|
Literal
|
<nosymbol>
|
|
"bulk-ingest"
|
|
41
|
75897
|
2268
-
2298
|
Apply
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.AccumuloBulkIngestParams.<init>
|
|
new org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.AccumuloBulkIngestParams()
|
|
51
|
75899
|
2539
-
2543
|
Select
|
scala.None
|
|
scala.None
|
|
51
|
75898
|
2514
-
2535
|
Select
|
scala.Boolean.unary_!
|
|
AccumuloBulkIngestCommand.this.params.combineInputs.unary_!
|
|
51
|
75901
|
2560
-
2579
|
Select
|
org.locationtech.geomesa.tools.DistributedCombineParam.maxSplitSize
|
|
AccumuloBulkIngestCommand.this.params.maxSplitSize
|
|
51
|
75900
|
2539
-
2543
|
Block
|
scala.None
|
|
scala.None
|
|
51
|
75903
|
2606
-
2613
|
Apply
|
scala.Some.apply
|
|
scala.Some.apply[Int](0)
|
|
51
|
75902
|
2585
-
2597
|
Apply
|
java.lang.Integer.intValue
|
|
x$1.intValue()
|
|
51
|
75905
|
2553
-
2614
|
Block
|
scala.Option.orElse
|
|
scala.Option.apply[Integer](AccumuloBulkIngestCommand.this.params.maxSplitSize).map[Int](((x$1: Integer) => x$1.intValue())).orElse[Int](scala.Some.apply[Int](0))
|
|
51
|
75904
|
2553
-
2614
|
Apply
|
scala.Option.orElse
|
|
scala.Option.apply[Integer](AccumuloBulkIngestCommand.this.params.maxSplitSize).map[Int](((x$1: Integer) => x$1.intValue())).orElse[Int](scala.Some.apply[Int](0))
|
|
54
|
75907
|
2758
-
2773
|
Select
|
org.locationtech.geomesa.utils.index.IndexMode.Write
|
|
org.locationtech.geomesa.utils.index.IndexMode.Write
|
|
54
|
75906
|
2741
-
2756
|
Apply
|
org.geotools.api.feature.simple.SimpleFeatureType.getTypeName
|
|
sft.getTypeName()
|
|
54
|
75909
|
2720
-
2792
|
Apply
|
scala.Option.map
|
|
AccumuloBulkIngestCommand.this.params.loadIndex[org.locationtech.geomesa.accumulo.data.AccumuloDataStore](ds, sft.getTypeName(), org.locationtech.geomesa.utils.index.IndexMode.Write).map[String](((x$2: org.locationtech.geomesa.index.api.GeoMesaFeatureIndex[_, _]) => x$2.identifier))
|
|
54
|
75908
|
2779
-
2791
|
Select
|
org.locationtech.geomesa.index.api.GeoMesaFeatureIndex.identifier
|
|
x$2.identifier
|
|
56
|
75919
|
2815
-
3419
|
Apply
|
scala.Option.map
|
|
org.locationtech.geomesa.index.conf.partition.TablePartition.apply(ds, sft).map[Seq[String]](((tp: org.locationtech.geomesa.index.conf.partition.TablePartition) => {
if (AccumuloBulkIngestCommand.this.params.cqlFilter.==(null))
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Schema \'", "\' is a partitioned store. In order to bulk load, the \'--cql\' parameter ").s(sft.getTypeName()).+("must be used to specify the range of the input data set"))
else
();
tp.partitions(AccumuloBulkIngestCommand.this.params.cqlFilter).filter(((x$3: Seq[String]) => x$3.nonEmpty)).getOrElse[Seq[String]](throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Partition filter does not correspond to partition scheme ", ". Please specify ").s(tp.getClass().getSimpleName()).+("a valid filter using the \'--cql\' parameter")))
}))
|
|
57
|
75910
|
2861
-
2885
|
Apply
|
java.lang.Object.==
|
|
AccumuloBulkIngestCommand.this.params.cqlFilter.==(null)
|
|
57
|
75913
|
2857
-
2857
|
Literal
|
<nosymbol>
|
|
()
|
|
57
|
75914
|
2857
-
2857
|
Block
|
<nosymbol>
|
|
()
|
|
58
|
75911
|
2897
-
3112
|
Throw
|
<nosymbol>
|
|
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Schema \'", "\' is a partitioned store. In order to bulk load, the \'--cql\' parameter ").s(sft.getTypeName()).+("must be used to specify the range of the input data set"))
|
|
58
|
75912
|
2897
-
3112
|
Block
|
<nosymbol>
|
|
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Schema \'", "\' is a partitioned store. In order to bulk load, the \'--cql\' parameter ").s(sft.getTypeName()).+("must be used to specify the range of the input data set"))
|
|
62
|
75915
|
3141
-
3157
|
Select
|
org.locationtech.geomesa.tools.OptionalCqlFilterParam.cqlFilter
|
|
AccumuloBulkIngestCommand.this.params.cqlFilter
|
|
62
|
75916
|
3166
-
3176
|
Select
|
scala.collection.TraversableOnce.nonEmpty
|
|
x$3.nonEmpty
|
|
62
|
75918
|
3127
-
3413
|
Apply
|
scala.Option.getOrElse
|
|
tp.partitions(AccumuloBulkIngestCommand.this.params.cqlFilter).filter(((x$3: Seq[String]) => x$3.nonEmpty)).getOrElse[Seq[String]](throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Partition filter does not correspond to partition scheme ", ". Please specify ").s(tp.getClass().getSimpleName()).+("a valid filter using the \'--cql\' parameter")))
|
|
63
|
75917
|
3198
-
3405
|
Throw
|
<nosymbol>
|
|
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Partition filter does not correspond to partition scheme ", ". Please specify ").s(tp.getClass().getSimpleName()).+("a valid filter using the \'--cql\' parameter"))
|
|
72
|
75921
|
3476
-
3557
|
Block
|
<nosymbol>
|
|
throw new scala.`package`.IllegalArgumentException("Bulk ingest must be run in distributed mode")
|
|
72
|
75920
|
3476
-
3557
|
Throw
|
<nosymbol>
|
|
throw new scala.`package`.IllegalArgumentException("Bulk ingest must be run in distributed mode")
|
|
74
|
75959
|
3591
-
5071
|
Block
|
<nosymbol>
|
|
{
val conf: org.apache.hadoop.conf.Configuration = new org.apache.hadoop.conf.Configuration();
val output: org.apache.hadoop.fs.Path = new org.apache.hadoop.fs.Path(AccumuloBulkIngestCommand.this.params.outputPath);
val context: org.apache.hadoop.fs.FileContext = org.apache.hadoop.fs.FileContext.getFileContext(output.toUri(), conf);
if (context.util().exists(output))
{
val warning: String = scala.StringContext.apply("Output directory \'", "\' exists").s(output);
if (AccumuloBulkIngestCommand.this.params.force)
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isWarnEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.warn("{} - deleting it", (warning: AnyRef))
else
(): Unit)
else
if (org.locationtech.geomesa.tools.utils.Prompt.confirm(scala.StringContext.apply("WARNING DATA MAY BE LOST: ", ". Delete it and continue (y/n)? ").s(warning), org.locationtech.geomesa.tools.utils.Prompt.confirm$default$2, org.locationtech.geomesa.tools.utils.Prompt.confirm$default$3)(AccumuloBulkIngestCommand.this.console).unary_!)
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Output directory \'", "\' exists").s(output))
else
();
context.delete(output, true)
}
else
();
val tempPath: Option[org.apache.hadoop.fs.Path] = scala.Option.apply[String](AccumuloBulkIngestCommand.this.params.tempPath).map[org.apache.hadoop.fs.Path](((temp: String) => {
val path: org.apache.hadoop.fs.Path = new org.apache.hadoop.fs.Path(temp);
val tempContext: org.apache.hadoop.fs.FileContext = org.apache.hadoop.fs.FileContext.getFileContext(path.toUri(), conf);
val dir: org.apache.hadoop.fs.Path = tempContext.makeQualified(path);
if (tempContext.util().exists(dir))
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Deleting temp output path {}", (dir: AnyRef))
else
(): Unit);
tempContext.delete(dir, true)
}
else
();
dir
}));
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Running bulk ingestion in distributed {}mode", if (AccumuloBulkIngestCommand.this.params.combineInputs)
"combine "
else
"".asInstanceOf[AnyRef])
else
(): Unit);
new AccumuloBulkIngestCommand.this.BulkConverterIngest(ds, AccumuloBulkIngestCommand.this.connection, sft, converter, inputs.paths, output, tempPath, maxSplitSize, index, partitions, AccumuloBulkIngestCommand.this.libjarsFiles, AccumuloBulkIngestCommand.this.libjarsPaths)
}
|
|
75
|
75922
|
3613
-
3632
|
Apply
|
org.apache.hadoop.conf.Configuration.<init>
|
|
new org.apache.hadoop.conf.Configuration()
|
|
77
|
75923
|
3740
-
3757
|
Select
|
org.locationtech.geomesa.tools.OutputPathParam.outputPath
|
|
AccumuloBulkIngestCommand.this.params.outputPath
|
|
77
|
75924
|
3731
-
3758
|
Apply
|
org.apache.hadoop.fs.Path.<init>
|
|
new org.apache.hadoop.fs.Path(AccumuloBulkIngestCommand.this.params.outputPath)
|
|
78
|
75925
|
3808
-
3820
|
Apply
|
org.apache.hadoop.fs.Path.toUri
|
|
output.toUri()
|
|
78
|
75926
|
3781
-
3827
|
Apply
|
org.apache.hadoop.fs.FileContext.getFileContext
|
|
org.apache.hadoop.fs.FileContext.getFileContext(output.toUri(), conf)
|
|
79
|
75927
|
3840
-
3867
|
Apply
|
org.apache.hadoop.fs.FileContext.Util.exists
|
|
context.util().exists(output)
|
|
79
|
75941
|
3836
-
3836
|
Literal
|
<nosymbol>
|
|
()
|
|
79
|
75940
|
3869
-
4269
|
Block
|
<nosymbol>
|
|
{
val warning: String = scala.StringContext.apply("Output directory \'", "\' exists").s(output);
if (AccumuloBulkIngestCommand.this.params.force)
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isWarnEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.warn("{} - deleting it", (warning: AnyRef))
else
(): Unit)
else
if (org.locationtech.geomesa.tools.utils.Prompt.confirm(scala.StringContext.apply("WARNING DATA MAY BE LOST: ", ". Delete it and continue (y/n)? ").s(warning), org.locationtech.geomesa.tools.utils.Prompt.confirm$default$2, org.locationtech.geomesa.tools.utils.Prompt.confirm$default$3)(AccumuloBulkIngestCommand.this.console).unary_!)
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Output directory \'", "\' exists").s(output))
else
();
context.delete(output, true)
}
|
|
79
|
75942
|
3836
-
3836
|
Block
|
<nosymbol>
|
|
()
|
|
80
|
75928
|
3895
-
3931
|
Apply
|
scala.StringContext.s
|
|
scala.StringContext.apply("Output directory \'", "\' exists").s(output)
|
|
81
|
75929
|
3946
-
3958
|
Select
|
org.locationtech.geomesa.tools.OptionalForceParam.force
|
|
AccumuloBulkIngestCommand.this.params.force
|
|
82
|
75930
|
3974
-
4018
|
Typed
|
<nosymbol>
|
|
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isWarnEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.warn("{} - deleting it", (warning: AnyRef))
else
(): Unit)
|
|
83
|
75931
|
4056
-
4125
|
Apply
|
scala.StringContext.s
|
|
scala.StringContext.apply("WARNING DATA MAY BE LOST: ", ". Delete it and continue (y/n)? ").s(warning)
|
|
83
|
75933
|
4040
-
4126
|
Select
|
scala.Boolean.unary_!
|
|
org.locationtech.geomesa.tools.utils.Prompt.confirm(scala.StringContext.apply("WARNING DATA MAY BE LOST: ", ". Delete it and continue (y/n)? ").s(warning), org.locationtech.geomesa.tools.utils.Prompt.confirm$default$2, org.locationtech.geomesa.tools.utils.Prompt.confirm$default$3)(AccumuloBulkIngestCommand.this.console).unary_!
|
|
83
|
75932
|
4055
-
4055
|
Select
|
org.locationtech.geomesa.tools.InteractiveCommand.console
|
|
AccumuloBulkIngestCommand.this.console
|
|
83
|
75937
|
4036
-
4036
|
Block
|
<nosymbol>
|
|
()
|
|
83
|
75936
|
4036
-
4036
|
Literal
|
<nosymbol>
|
|
()
|
|
83
|
75938
|
4036
-
4220
|
If
|
<nosymbol>
|
|
if (org.locationtech.geomesa.tools.utils.Prompt.confirm(scala.StringContext.apply("WARNING DATA MAY BE LOST: ", ". Delete it and continue (y/n)? ").s(warning), org.locationtech.geomesa.tools.utils.Prompt.confirm$default$2, org.locationtech.geomesa.tools.utils.Prompt.confirm$default$3)(AccumuloBulkIngestCommand.this.console).unary_!)
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Output directory \'", "\' exists").s(output))
else
()
|
|
84
|
75935
|
4142
-
4208
|
Block
|
<nosymbol>
|
|
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Output directory \'", "\' exists").s(output))
|
|
84
|
75934
|
4142
-
4208
|
Throw
|
<nosymbol>
|
|
throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Output directory \'", "\' exists").s(output))
|
|
86
|
75939
|
4231
-
4259
|
Apply
|
org.apache.hadoop.fs.FileContext.delete
|
|
context.delete(output, true)
|
|
89
|
75943
|
4301
-
4316
|
Select
|
org.locationtech.geomesa.tools.TempPathParam.tempPath
|
|
AccumuloBulkIngestCommand.this.params.tempPath
|
|
89
|
75953
|
4294
-
4782
|
Apply
|
scala.Option.map
|
|
scala.Option.apply[String](AccumuloBulkIngestCommand.this.params.tempPath).map[org.apache.hadoop.fs.Path](((temp: String) => {
val path: org.apache.hadoop.fs.Path = new org.apache.hadoop.fs.Path(temp);
val tempContext: org.apache.hadoop.fs.FileContext = org.apache.hadoop.fs.FileContext.getFileContext(path.toUri(), conf);
val dir: org.apache.hadoop.fs.Path = tempContext.makeQualified(path);
if (tempContext.util().exists(dir))
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Deleting temp output path {}", (dir: AnyRef))
else
(): Unit);
tempContext.delete(dir, true)
}
else
();
dir
}))
|
|
90
|
75944
|
4353
-
4367
|
Apply
|
org.apache.hadoop.fs.Path.<init>
|
|
new org.apache.hadoop.fs.Path(temp)
|
|
92
|
75945
|
4524
-
4534
|
Apply
|
org.apache.hadoop.fs.Path.toUri
|
|
path.toUri()
|
|
92
|
75946
|
4497
-
4541
|
Apply
|
org.apache.hadoop.fs.FileContext.getFileContext
|
|
org.apache.hadoop.fs.FileContext.getFileContext(path.toUri(), conf)
|
|
93
|
75947
|
4562
-
4593
|
Apply
|
org.apache.hadoop.fs.FileContext.makeQualified
|
|
tempContext.makeQualified(path)
|
|
94
|
75948
|
4608
-
4636
|
Apply
|
org.apache.hadoop.fs.FileContext.Util.exists
|
|
tempContext.util().exists(dir)
|
|
94
|
75951
|
4604
-
4604
|
Literal
|
<nosymbol>
|
|
()
|
|
94
|
75950
|
4638
-
4758
|
Block
|
<nosymbol>
|
|
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Deleting temp output path {}", (dir: AnyRef))
else
(): Unit);
tempContext.delete(dir, true)
}
|
|
94
|
75952
|
4604
-
4604
|
Block
|
<nosymbol>
|
|
()
|
|
96
|
75949
|
4717
-
4746
|
Apply
|
org.apache.hadoop.fs.FileContext.delete
|
|
tempContext.delete(dir, true)
|
|
102
|
75955
|
4969
-
4981
|
Select
|
org.locationtech.geomesa.tools.ingest.IngestCommand.Inputs.paths
|
|
inputs.paths
|
|
102
|
75954
|
4941
-
4951
|
Select
|
org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreCommand.connection
|
|
AccumuloBulkIngestCommand.this.connection
|
|
102
|
75958
|
4913
-
5071
|
Apply
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.<init>
|
|
new AccumuloBulkIngestCommand.this.BulkConverterIngest(ds, AccumuloBulkIngestCommand.this.connection, sft, converter, inputs.paths, output, tempPath, maxSplitSize, index, partitions, AccumuloBulkIngestCommand.this.libjarsFiles, AccumuloBulkIngestCommand.this.libjarsPaths)
|
|
103
|
75957
|
5058
-
5070
|
Select
|
org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreCommand.AccumuloDistributedCommand.libjarsPaths
|
|
AccumuloBulkIngestCommand.this.libjarsPaths
|
|
103
|
75956
|
5044
-
5056
|
Select
|
org.locationtech.geomesa.accumulo.tools.AccumuloDataStoreCommand.AccumuloDistributedCommand.libjarsFiles
|
|
AccumuloBulkIngestCommand.this.libjarsFiles
|
|
106
|
75961
|
5097
-
5178
|
Block
|
<nosymbol>
|
|
throw new scala.`package`.UnsupportedOperationException(scala.StringContext.apply("Missing implementation for mode ", "").s(mode))
|
|
106
|
75960
|
5097
-
5178
|
Throw
|
<nosymbol>
|
|
throw new scala.`package`.UnsupportedOperationException(scala.StringContext.apply("Missing implementation for mode ", "").s(mode))
|
|
128
|
75962
|
5800
-
5823
|
Apply
|
org.locationtech.geomesa.tools.ingest.ConverterIngestJob.configureJob
|
|
BulkConverterIngest.super.configureJob(job)
|
|
129
|
75963
|
5862
-
5868
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.output
|
|
BulkConverterIngest.this.output
|
|
129
|
75964
|
5841
-
5869
|
Apply
|
scala.Option.getOrElse
|
|
BulkConverterIngest.this.tempOutput.getOrElse[org.apache.hadoop.fs.Path](BulkConverterIngest.this.output)
|
|
130
|
75965
|
5923
-
5925
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.ds
|
|
BulkConverterIngest.this.ds
|
|
130
|
75967
|
5937
-
5940
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.sft
|
|
BulkConverterIngest.this.sft
|
|
130
|
75966
|
5927
-
5935
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.dsParams
|
|
BulkConverterIngest.this.dsParams
|
|
130
|
75969
|
5955
-
5965
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.partitions
|
|
BulkConverterIngest.this.partitions
|
|
130
|
75968
|
5948
-
5953
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.index
|
|
BulkConverterIngest.this.index
|
|
130
|
75970
|
5876
-
5966
|
Apply
|
org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat.configure
|
|
org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat.configure(job, BulkConverterIngest.this.ds, BulkConverterIngest.this.dsParams, BulkConverterIngest.this.sft, dest, BulkConverterIngest.this.index, BulkConverterIngest.this.partitions)
|
|
131
|
75978
|
5973
-
6177
|
Apply
|
scala.Option.foreach
|
|
BulkConverterIngest.this.maxSplitSize.foreach[Unit](((max: Int) => {
job.setInputFormatClass(classOf[org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat]);
if (max.>(0))
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMaxInputSplitSize(job, max.toLong)
else
()
}))
|
|
132
|
75971
|
6011
-
6072
|
Apply
|
org.apache.hadoop.mapreduce.Job.setInputFormatClass
|
|
job.setInputFormatClass(classOf[org.locationtech.geomesa.jobs.mapreduce.ConverterCombineInputFormat])
|
|
133
|
75972
|
6085
-
6092
|
Apply
|
scala.Int.>
|
|
max.>(0)
|
|
133
|
75977
|
6081
-
6081
|
Block
|
<nosymbol>
|
|
()
|
|
133
|
75976
|
6081
-
6081
|
Literal
|
<nosymbol>
|
|
()
|
|
134
|
75973
|
6148
-
6158
|
Select
|
scala.Int.toLong
|
|
max.toLong
|
|
134
|
75975
|
6106
-
6159
|
Block
|
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMaxInputSplitSize
|
|
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMaxInputSplitSize(job, max.toLong)
|
|
134
|
75974
|
6106
-
6159
|
Apply
|
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMaxInputSplitSize
|
|
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setMaxInputSplitSize(job, max.toLong)
|
|
137
|
75979
|
6199
-
6234
|
Apply
|
org.apache.hadoop.conf.Configuration.get
|
|
job.getConfiguration().get("tmpjars")
|
|
137
|
75980
|
6184
-
6234
|
Apply
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.libjars_=
|
|
this.libjars_=(job.getConfiguration().get("tmpjars"))
|
|
142
|
75992
|
6350
-
6792
|
Apply
|
scala.Option.map
|
|
BulkConverterIngest.this.tempOutput.map[org.locationtech.geomesa.jobs.JobResult](((dir: org.apache.hadoop.fs.Path) => {
reporter.reset();
val conf: org.apache.hadoop.conf.Configuration = new org.apache.hadoop.conf.Configuration();
conf.set("tmpjars", this.libjars);
new org.locationtech.geomesa.tools.utils.DistributedCopy(conf).copy(scala.collection.Seq.apply[org.apache.hadoop.fs.Path](dir), BulkConverterIngest.this.output, reporter) match {
case (message: String, counts: Map[String,Long])org.locationtech.geomesa.jobs.JobResult.JobSuccess((message @ _), (counts @ _)) => {
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info(message)
else
(): Unit);
org.locationtech.geomesa.jobs.`package`.JobResult.JobSuccess.apply("", counts)
}
case (j @ _) => j
}
}))
|
|
143
|
75981
|
6384
-
6400
|
Apply
|
org.locationtech.geomesa.jobs.StatusCallback.reset
|
|
reporter.reset()
|
|
144
|
75982
|
6422
-
6441
|
Apply
|
org.apache.hadoop.conf.Configuration.<init>
|
|
new org.apache.hadoop.conf.Configuration()
|
|
145
|
75983
|
6461
-
6470
|
Literal
|
<nosymbol>
|
|
"tmpjars"
|
|
145
|
75985
|
6452
-
6485
|
Apply
|
org.apache.hadoop.conf.Configuration.set
|
|
conf.set("tmpjars", this.libjars)
|
|
145
|
75984
|
6472
-
6484
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.libjars
|
|
this.libjars
|
|
146
|
75987
|
6594
-
6600
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.output
|
|
BulkConverterIngest.this.output
|
|
146
|
75986
|
6584
-
6592
|
Apply
|
scala.collection.generic.GenericCompanion.apply
|
|
scala.collection.Seq.apply[org.apache.hadoop.fs.Path](dir)
|
|
146
|
75988
|
6553
-
6611
|
Apply
|
org.locationtech.geomesa.tools.utils.DistributedCopy.copy
|
|
new org.locationtech.geomesa.tools.utils.DistributedCopy(conf).copy(scala.collection.Seq.apply[org.apache.hadoop.fs.Path](dir), BulkConverterIngest.this.output, reporter)
|
|
147
|
75990
|
6665
-
6745
|
Block
|
<nosymbol>
|
|
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info(message)
else
(): Unit);
org.locationtech.geomesa.jobs.`package`.JobResult.JobSuccess.apply("", counts)
}
|
|
149
|
75989
|
6723
-
6745
|
Apply
|
org.locationtech.geomesa.jobs.JobResult.JobSuccess.apply
|
|
org.locationtech.geomesa.jobs.`package`.JobResult.JobSuccess.apply("", counts)
|
|
151
|
75991
|
6769
-
6770
|
Ident
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.j
|
|
j
|
|
154
|
76020
|
6312
-
7845
|
Apply
|
org.locationtech.geomesa.jobs.JobResult.merge
|
|
BulkConverterIngest.super.await(reporter).merge(BulkConverterIngest.this.tempOutput.map[org.locationtech.geomesa.jobs.JobResult](((dir: org.apache.hadoop.fs.Path) => {
reporter.reset();
val conf: org.apache.hadoop.conf.Configuration = new org.apache.hadoop.conf.Configuration();
conf.set("tmpjars", this.libjars);
new org.locationtech.geomesa.tools.utils.DistributedCopy(conf).copy(scala.collection.Seq.apply[org.apache.hadoop.fs.Path](dir), BulkConverterIngest.this.output, reporter) match {
case (message: String, counts: Map[String,Long])org.locationtech.geomesa.jobs.JobResult.JobSuccess((message @ _), (counts @ _)) => {
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info(message)
else
(): Unit);
org.locationtech.geomesa.jobs.`package`.JobResult.JobSuccess.apply("", counts)
}
case (j @ _) => j
}
}))).merge({
if (AccumuloBulkIngestCommand.this.params.skipImport)
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Skipping import of RFiles into Accumulo")
else
(): Unit);
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Files may be imported for each table through the Accumulo shell with the `importdirectory` command")
else
(): Unit)
}
else
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Importing RFiles into Accumulo")
else
(): Unit);
val tableOps: org.apache.accumulo.core.client.admin.TableOperations = BulkConverterIngest.this.ds.client.tableOperations();
val filesPath: org.apache.hadoop.fs.Path = new org.apache.hadoop.fs.Path(BulkConverterIngest.this.output, org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat.FilesPath);
val fc: org.apache.hadoop.fs.FileContext = org.apache.hadoop.fs.FileContext.getFileContext(filesPath.toUri(), new org.apache.hadoop.conf.Configuration());
val files: org.apache.hadoop.fs.RemoteIterator[org.apache.hadoop.fs.LocatedFileStatus] = fc.listLocatedStatus(filesPath);
while$1(){
if (files.hasNext())
{
{
val file: org.apache.hadoop.fs.LocatedFileStatus = files.next();
val path: org.apache.hadoop.fs.Path = file.getPath();
val table: String = path.getName();
if (file.isDirectory().&&(org.locationtech.geomesa.utils.hadoop.HadoopDelegate.HiddenFileFilter.accept(path)).&&(tableOps.exists(table)))
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Importing {}", (table: AnyRef))
else
(): Unit);
tableOps.importDirectory(path.toString()).to(table).load()
}
else
()
};
while$1()
}
else
()
}
};
scala.None
})
|
|
155
|
75993
|
6821
-
6838
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.AccumuloBulkIngestParams.skipImport
|
|
AccumuloBulkIngestCommand.this.params.skipImport
|
|
155
|
75994
|
6840
-
7065
|
Block
|
<nosymbol>
|
|
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Skipping import of RFiles into Accumulo")
else
(): Unit);
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Files may be imported for each table through the Accumulo shell with the `importdirectory` command")
else
(): Unit)
}
|
|
159
|
76018
|
7071
-
7824
|
Block
|
<nosymbol>
|
|
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Importing RFiles into Accumulo")
else
(): Unit);
val tableOps: org.apache.accumulo.core.client.admin.TableOperations = BulkConverterIngest.this.ds.client.tableOperations();
val filesPath: org.apache.hadoop.fs.Path = new org.apache.hadoop.fs.Path(BulkConverterIngest.this.output, org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat.FilesPath);
val fc: org.apache.hadoop.fs.FileContext = org.apache.hadoop.fs.FileContext.getFileContext(filesPath.toUri(), new org.apache.hadoop.conf.Configuration());
val files: org.apache.hadoop.fs.RemoteIterator[org.apache.hadoop.fs.LocatedFileStatus] = fc.listLocatedStatus(filesPath);
while$1(){
if (files.hasNext())
{
{
val file: org.apache.hadoop.fs.LocatedFileStatus = files.next();
val path: org.apache.hadoop.fs.Path = file.getPath();
val table: String = path.getName();
if (file.isDirectory().&&(org.locationtech.geomesa.utils.hadoop.HadoopDelegate.HiddenFileFilter.accept(path)).&&(tableOps.exists(table)))
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Importing {}", (table: AnyRef))
else
(): Unit);
tableOps.importDirectory(path.toString()).to(table).load()
}
else
()
};
while$1()
}
else
()
}
}
|
|
161
|
75995
|
7160
-
7187
|
Apply
|
org.apache.accumulo.core.client.AccumuloClient.tableOperations
|
|
BulkConverterIngest.this.ds.client.tableOperations()
|
|
162
|
75997
|
7231
-
7272
|
Select
|
org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat.FilesPath
|
|
org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat.FilesPath
|
|
162
|
75996
|
7223
-
7229
|
Select
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.output
|
|
BulkConverterIngest.this.output
|
|
162
|
75998
|
7214
-
7273
|
Apply
|
org.apache.hadoop.fs.Path.<init>
|
|
new org.apache.hadoop.fs.Path(BulkConverterIngest.this.output, org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloFileOutputFormat.FilesPath)
|
|
163
|
75999
|
7320
-
7335
|
Apply
|
org.apache.hadoop.fs.Path.toUri
|
|
filesPath.toUri()
|
|
163
|
76001
|
7293
-
7357
|
Apply
|
org.apache.hadoop.fs.FileContext.getFileContext
|
|
org.apache.hadoop.fs.FileContext.getFileContext(filesPath.toUri(), new org.apache.hadoop.conf.Configuration())
|
|
163
|
76000
|
7337
-
7356
|
Apply
|
org.apache.hadoop.conf.Configuration.<init>
|
|
new org.apache.hadoop.conf.Configuration()
|
|
164
|
76002
|
7380
-
7411
|
Apply
|
org.apache.hadoop.fs.FileContext.listLocatedStatus
|
|
fc.listLocatedStatus(filesPath)
|
|
165
|
76003
|
7429
-
7442
|
Apply
|
org.apache.hadoop.fs.RemoteIterator.hasNext
|
|
files.hasNext()
|
|
165
|
76015
|
7444
-
7814
|
Block
|
<nosymbol>
|
|
{
{
val file: org.apache.hadoop.fs.LocatedFileStatus = files.next();
val path: org.apache.hadoop.fs.Path = file.getPath();
val table: String = path.getName();
if (file.isDirectory().&&(org.locationtech.geomesa.utils.hadoop.HadoopDelegate.HiddenFileFilter.accept(path)).&&(tableOps.exists(table)))
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Importing {}", (table: AnyRef))
else
(): Unit);
tableOps.importDirectory(path.toString()).to(table).load()
}
else
()
};
while$1()
}
|
|
165
|
76014
|
7444
-
7444
|
Apply
|
org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkIngestCommand.BulkConverterIngest.while$1
|
|
while$1()
|
|
165
|
76017
|
7422
-
7422
|
Block
|
<nosymbol>
|
|
()
|
|
165
|
76016
|
7422
-
7422
|
Literal
|
<nosymbol>
|
|
()
|
|
166
|
76004
|
7469
-
7481
|
Apply
|
org.apache.hadoop.fs.RemoteIterator.next
|
|
files.next()
|
|
167
|
76005
|
7505
-
7517
|
Apply
|
org.apache.hadoop.fs.FileStatus.getPath
|
|
file.getPath()
|
|
168
|
76006
|
7542
-
7554
|
Apply
|
org.apache.hadoop.fs.Path.getName
|
|
path.getName()
|
|
169
|
76007
|
7591
-
7635
|
Apply
|
org.apache.hadoop.fs.PathFilter.accept
|
|
org.locationtech.geomesa.utils.hadoop.HadoopDelegate.HiddenFileFilter.accept(path)
|
|
169
|
76009
|
7571
-
7661
|
Apply
|
scala.Boolean.&&
|
|
file.isDirectory().&&(org.locationtech.geomesa.utils.hadoop.HadoopDelegate.HiddenFileFilter.accept(path)).&&(tableOps.exists(table))
|
|
169
|
76008
|
7639
-
7661
|
Apply
|
org.apache.accumulo.core.client.admin.TableOperations.exists
|
|
tableOps.exists(table)
|
|
169
|
76011
|
7663
-
7802
|
Block
|
<nosymbol>
|
|
{
(if (org.locationtech.geomesa.tools.`package`.Command.user.underlying.isInfoEnabled())
org.locationtech.geomesa.tools.`package`.Command.user.underlying.info("Importing {}", (table: AnyRef))
else
(): Unit);
tableOps.importDirectory(path.toString()).to(table).load()
}
|
|
169
|
76013
|
7567
-
7567
|
Block
|
<nosymbol>
|
|
()
|
|
169
|
76012
|
7567
-
7567
|
Literal
|
<nosymbol>
|
|
()
|
|
171
|
76010
|
7732
-
7788
|
Apply
|
org.apache.accumulo.core.client.admin.TableOperations.ImportOptions.load
|
|
tableOps.importDirectory(path.toString()).to(table).load()
|
|
175
|
76019
|
7833
-
7837
|
Select
|
scala.None
|
|
scala.None
|
|
186
|
76021
|
8329
-
8334
|
Literal
|
<nosymbol>
|
|
false
|