1 /***********************************************************************
2  * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
3  * All rights reserved. This program and the accompanying materials
4  * are made available under the terms of the Apache License, Version 2.0
5  * which accompanies this distribution and is available at
6  * http://www.opensource.org/licenses/apache2.0.php.
7  ***********************************************************************/
8 
9 package org.locationtech.geomesa.fs.tools.compact
10 
11 import com.beust.jcommander.{Parameter, ParameterException, Parameters}
12 import com.typesafe.scalalogging.LazyLogging
13 import org.apache.hadoop.fs.Path
14 import org.locationtech.geomesa.fs.data.FileSystemDataStore
15 import org.locationtech.geomesa.fs.storage.orc.OrcFileSystemStorage
16 import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage
17 import org.locationtech.geomesa.fs.tools.FsDataStoreCommand
18 import org.locationtech.geomesa.fs.tools.FsDataStoreCommand.{FsDistributedCommand, FsParams, PartitionParam}
19 import org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.{OrcCompactionJob, ParquetCompactionJob}
20 import org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactCommand
21 import org.locationtech.geomesa.jobs.JobResult.{JobFailure, JobSuccess}
22 import org.locationtech.geomesa.tools.Command.CommandException
23 import org.locationtech.geomesa.tools.DistributedRunParam.RunModes
24 import org.locationtech.geomesa.tools._
25 import org.locationtech.geomesa.tools.ingest.IngestCommand
26 import org.locationtech.geomesa.tools.utils.ParameterConverters.BytesConverter
27 import org.locationtech.geomesa.tools.utils.TerminalCallback.PrintProgress
28 import org.locationtech.geomesa.utils.io.PathUtils
29 import org.locationtech.geomesa.utils.text.TextTools
30 
31 import java.util.Locale
32 import java.util.concurrent.{CountDownLatch, Executors}
33 import scala.util.control.NonFatal
34 
/**
 * Concrete `compact` command for the file system data store tools.
 *
 * All of the behavior comes from `CompactCommand`; `FsDistributedCommand` is mixed in
 * so that the base libjars file is picked up.
 */
class FsCompactCommand extends CompactCommand with FsDistributedCommand
37 
38 object FsCompactCommand {
39 
40   trait CompactCommand extends FsDataStoreCommand with DistributedCommand with LazyLogging {
41 
42     import scala.collection.JavaConverters._
43 
44     override val name: String = "compact"
45     override val params = new CompactParams
46 
47     override def libjarsFiles: Seq[String] = Seq("org/locationtech/geomesa/tools/ingest-libjars.list")
48 
49     override def execute(): Unit = withDataStore(compact)
50 
51     def compact(ds: FileSystemDataStore): Unit = {
52       Command.user.info("Beginning compaction process...")
53 
54       val storage = ds.storage(params.featureName)
55 
56       val toCompact = if (params.partitions.isEmpty) { storage.getPartitions } else {
57         val filtered = params.partitions.asScala.flatMap(storage.metadata.getPartition)
58         if (filtered.lengthCompare(params.partitions.size()) != 0) {
59           val unmatched = params.partitions.asScala.filterNot(name => filtered.exists(_.name == name))
60           throw new ParameterException(s"Partition(s) ${unmatched.mkString(", ")} cannot be found in metadata")
61         }
62         filtered
63       }
64 
65       val mode = params.mode.getOrElse {
66         if (PathUtils.isRemote(storage.context.root.toString)) { RunModes.Distributed } else { RunModes.Local }
67       }
68       val fileSize = Option(params.targetFileSize).map(_.longValue)
69 
70       Command.user.info(s"Compacting ${toCompact.size} partitions in ${mode.toString.toLowerCase(Locale.US)} mode")
71 
72       val start = System.currentTimeMillis()
73       val status = new PrintProgress(System.err, TextTools.buildString(' ', 60), '\u003d', '\u003e', '\u003e')
74 
75       mode match {
76         case RunModes.Local =>
77           val total = toCompact.length
78           val latch = new CountDownLatch(total)
79           val executor = Executors.newFixedThreadPool(math.max(1, math.min(params.threads, total)))
80 
81           try {
82             toCompact.foreach { p =>
83               executor.submit(
84                 new Runnable() {
85                   override def run(): Unit = {
86                     try {
87                       logger.info(s"Compacting ${p.name}")
88                       storage.compact(Some(p.name), fileSize)
89                     } catch {
90                       case NonFatal(e) => logger.error(s"Error processing partition '${p.name}':", e)
91                     } finally {
92                       latch.countDown()
93                     }
94                   }
95                 }
96               )
97             }
98           } finally {
99             executor.shutdown()
100           }
101 
102           while (latch.getCount > 0) {
103             Thread.sleep(1000)
104             status("", 1f - latch.getCount.toFloat / total, Seq.empty, done = false)
105           }
106           status("", 1f, Seq.empty, done = true)
107           Command.user.info("Compacting metadata")
108           storage.metadata.compact(None, None, math.max(1, params.threads))
109           Command.user.info(s"Local compaction complete in ${TextTools.getTime(start)}")
110 
111         case RunModes.Distributed =>
112           val encoding = storage.metadata.encoding
113           val job = if (ParquetFileSystemStorage.Encoding.equalsIgnoreCase(encoding)) {
114             new ParquetCompactionJob()
115           } else if (OrcFileSystemStorage.Encoding.equalsIgnoreCase(encoding)) {
116             new OrcCompactionJob()
117           } else {
118             throw new ParameterException(s"Compaction is not supported for encoding '$encoding'")
119           }
120           val tempDir = Option(params.tempPath).map(t => new Path(t))
121           job.run(storage, toCompact.toSeq, fileSize, tempDir, libjarsFiles, libjarsPaths, status) match {
122             case JobSuccess(message, counts) =>
123               Command.user.info(s"Distributed compaction complete in ${TextTools.getTime(start)}")
124               val success = counts(FileSystemCompactionJob.MappedCounter)
125               val failed = counts(FileSystemCompactionJob.FailedCounter)
126               Command.user.info(IngestCommand.getStatInfo(success, failed, "Compacted", message))
127 
128             case JobFailure(message) =>
129               Command.user.error(s"Distributed compaction failed in ${TextTools.getTime(start)}")
130               throw new CommandException(message)
131           }
132       }
133     }
134   }
135 
136   @Parameters(commandDescription = "Compact partitions")
137   class CompactParams extends FsParams
138       with RequiredTypeNameParam with TempPathParam with PartitionParam with DistributedRunParam {
139 
140     @Parameter(names = Array("-t", "--threads"), description = "Number of threads if using local mode")
141     var threads: Integer = 4
142 
143     @Parameter(
144       names = Array("--target-file-size"),
145       description = "Target size for data files",
146       converter = classOf[BytesConverter])
147     var targetFileSize: java.lang.Long = _
148   }
149 }
Line Stmt Id Pos Tree Symbol Tests Code
44 189 2249 - 2258 Literal <nosymbol> "compact"
45 190 2285 - 2302 Apply org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactParams.<init> new FsCompactCommand.this.CompactParams()
47 191 2349 - 2406 Apply scala.collection.generic.GenericCompanion.apply scala.collection.Seq.apply[String]("org/locationtech/geomesa/tools/ingest-libjars.list")
49 192 2457 - 2464 Apply org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactCommand.compact CompactCommand.this.compact(ds)
49 193 2443 - 2465 Apply org.locationtech.geomesa.tools.DataStoreCommand.withDataStore CompactCommand.this.withDataStore[Unit]({ ((ds: org.locationtech.geomesa.fs.data.FileSystemDataStore) => CompactCommand.this.compact(ds)) })
52 194 2524 - 2576 Apply org.slf4j.Logger.info org.locationtech.geomesa.tools.`package`.Command.user.info("Beginning compaction process...")
54 195 2609 - 2627 Select org.locationtech.geomesa.tools.RequiredTypeNameParam.featureName CompactCommand.this.params.featureName
54 196 2598 - 2628 Apply org.locationtech.geomesa.fs.data.FileSystemDataStore.storage ds.storage(CompactCommand.this.params.featureName)
56 197 2656 - 2681 Apply java.util.List.isEmpty CompactCommand.this.params.partitions.isEmpty()
56 198 2685 - 2706 Select org.locationtech.geomesa.fs.storage.api.FileSystemStorage.getPartitions storage.getPartitions
56 199 2685 - 2706 Block org.locationtech.geomesa.fs.storage.api.FileSystemStorage.getPartitions storage.getPartitions
56 214 2714 - 3122 Block <nosymbol> { val filtered: scala.collection.mutable.Buffer[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata] = scala.collection.JavaConverters.asScalaBufferConverter[String](CompactCommand.this.params.partitions).asScala.flatMap[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata, scala.collection.mutable.Buffer[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata]]({ <synthetic> val eta$0$1: org.locationtech.geomesa.fs.storage.api.StorageMetadata = storage.metadata; ((name: String) => scala.this.Option.option2Iterable[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata](eta$0$1.getPartition(name))) })(mutable.this.Buffer.canBuildFrom[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata]); if (filtered.lengthCompare(CompactCommand.this.params.partitions.size()).!=(0)) { val unmatched: scala.collection.mutable.Buffer[String] = scala.collection.JavaConverters.asScalaBufferConverter[String](CompactCommand.this.params.partitions).asScala.filterNot(((name: String) => filtered.exists(((x$1: org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata) => x$1.name.==(name))))); throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Partition(s) ", " cannot be found in metadata").s(unmatched.mkString(", "))) } else (); filtered }
57 200 2739 - 2756 Select org.locationtech.geomesa.fs.tools.FsDataStoreCommand.PartitionParam.partitions CompactCommand.this.params.partitions
57 201 2773 - 2802 Apply org.locationtech.geomesa.fs.storage.api.StorageMetadata.getPartition eta$0$1.getPartition(name)
57 202 2773 - 2802 ApplyImplicitView scala.Option.option2Iterable scala.this.Option.option2Iterable[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata](eta$0$1.getPartition(name))
57 203 2772 - 2772 TypeApply scala.collection.mutable.Buffer.canBuildFrom mutable.this.Buffer.canBuildFrom[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata]
57 204 2739 - 2803 ApplyToImplicitArgs scala.collection.TraversableLike.flatMap scala.collection.JavaConverters.asScalaBufferConverter[String](CompactCommand.this.params.partitions).asScala.flatMap[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata, scala.collection.mutable.Buffer[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata]]({ <synthetic> val eta$0$1: org.locationtech.geomesa.fs.storage.api.StorageMetadata = storage.metadata; ((name: String) => scala.this.Option.option2Iterable[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata](eta$0$1.getPartition(name))) })(mutable.this.Buffer.canBuildFrom[org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata])
58 205 2816 - 2869 Apply scala.Int.!= filtered.lengthCompare(CompactCommand.this.params.partitions.size()).!=(0)
58 211 2871 - 3097 Block <nosymbol> { val unmatched: scala.collection.mutable.Buffer[String] = scala.collection.JavaConverters.asScalaBufferConverter[String](CompactCommand.this.params.partitions).asScala.filterNot(((name: String) => filtered.exists(((x$1: org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata) => x$1.name.==(name))))); throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Partition(s) ", " cannot be found in metadata").s(unmatched.mkString(", "))) }
58 212 2812 - 2812 Literal <nosymbol> ()
58 213 2812 - 2812 Block <nosymbol> ()
59 206 2899 - 2916 Select org.locationtech.geomesa.fs.tools.FsDataStoreCommand.PartitionParam.partitions CompactCommand.this.params.partitions
59 207 2959 - 2973 Apply java.lang.Object.== x$1.name.==(name)
59 208 2943 - 2974 Apply scala.collection.IterableLike.exists filtered.exists(((x$1: org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata) => x$1.name.==(name)))
59 209 2899 - 2975 Apply scala.collection.TraversableLike.filterNot scala.collection.JavaConverters.asScalaBufferConverter[String](CompactCommand.this.params.partitions).asScala.filterNot(((name: String) => filtered.exists(((x$1: org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata) => x$1.name.==(name)))))
60 210 2986 - 3087 Throw <nosymbol> throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Partition(s) ", " cannot be found in metadata").s(unmatched.mkString(", ")))
65 221 3141 - 3284 Apply scala.Option.getOrElse CompactCommand.this.params.mode.getOrElse[org.locationtech.geomesa.tools.DistributedRunParam.RunModes.RunMode](if (org.locationtech.geomesa.utils.io.PathUtils.isRemote(storage.context.root.toString())) org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Distributed else org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Local)
66 215 3196 - 3225 Apply org.apache.hadoop.fs.Path.toString storage.context.root.toString()
66 216 3177 - 3226 Apply org.locationtech.geomesa.utils.io.PathUtils.isRemote org.locationtech.geomesa.utils.io.PathUtils.isRemote(storage.context.root.toString())
66 217 3230 - 3250 Select org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Distributed org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Distributed
66 218 3230 - 3250 Block org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Distributed org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Distributed
66 219 3260 - 3274 Select org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Local org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Local
66 220 3260 - 3274 Block org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Local org.locationtech.geomesa.tools.DistributedRunParam.RunModes.Local
68 222 3313 - 3334 Select org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactParams.targetFileSize CompactCommand.this.params.targetFileSize
68 223 3340 - 3351 Apply java.lang.Long.longValue x$2.longValue()
68 224 3306 - 3352 Apply scala.Option.map scala.Option.apply[Long](CompactCommand.this.params.targetFileSize).map[Long](((x$2: Long) => x$2.longValue()))
70 225 3380 - 3392 Literal <nosymbol> "Compacting "
70 226 3408 - 3424 Literal <nosymbol> " partitions in "
70 227 3462 - 3468 Literal <nosymbol> " mode"
70 228 3393 - 3407 Select scala.collection.SeqLike.size toCompact.size
70 229 3451 - 3460 Select java.util.Locale.US java.util.Locale.US
70 230 3425 - 3461 Apply java.lang.String.toLowerCase mode.toString().toLowerCase(java.util.Locale.US)
70 231 3378 - 3468 Apply scala.StringContext.s scala.StringContext.apply("Compacting ", " partitions in ", " mode").s(toCompact.size, mode.toString().toLowerCase(java.util.Locale.US))
70 232 3360 - 3469 Apply org.slf4j.Logger.info org.locationtech.geomesa.tools.`package`.Command.user.info(scala.StringContext.apply("Compacting ", " partitions in ", " mode").s(toCompact.size, mode.toString().toLowerCase(java.util.Locale.US)))
72 233 3489 - 3515 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
73 234 3553 - 3563 Select java.lang.System.err java.lang.System.err
73 235 3565 - 3595 Apply org.locationtech.geomesa.utils.text.TextTools.buildString org.locationtech.geomesa.utils.text.TextTools.buildString(' ', 60)
73 236 3597 - 3605 Literal <nosymbol> '='
73 237 3607 - 3615 Literal <nosymbol> '>'
73 238 3617 - 3625 Literal <nosymbol> '>'
73 239 3535 - 3626 Apply org.locationtech.geomesa.tools.utils.TerminalCallback.PrintProgress.<init> new org.locationtech.geomesa.tools.utils.TerminalCallback.PrintProgress(java.lang.System.err, org.locationtech.geomesa.utils.text.TextTools.buildString(' ', 60), '=', '>', '>')
76 292 3675 - 4969 Block <nosymbol> { val total: Int = toCompact.length; val latch: java.util.concurrent.CountDownLatch = new java.util.concurrent.CountDownLatch(total); val executor: java.util.concurrent.ExecutorService = java.util.concurrent.Executors.newFixedThreadPool(scala.math.`package`.max(1, scala.math.`package`.min(scala.Predef.Integer2int(CompactCommand.this.params.threads), total))); try { toCompact.foreach[java.util.concurrent.Future[_]](((p: org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata) => executor.submit({ final class $anon extends Object with Runnable { def <init>(): <$anon: Runnable> = { $anon.super.<init>(); () }; override def run(): Unit = try { (if (CompactCommand.this.logger.underlying.isInfoEnabled()) CompactCommand.this.logger.underlying.info("Compacting {}", (p.name: AnyRef)) else (): Unit); storage.compact(scala.Some.apply[String](p.name), fileSize, storage.compact$default$3) } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (CompactCommand.this.logger.underlying.isErrorEnabled()) CompactCommand.this.logger.underlying.error(scala.StringContext.apply("Error processing partition \'", "\':").s(p.name), e) else (): Unit) } finally latch.countDown() }; new $anon() }))) } finally executor.shutdown(); while$1(){ if (latch.getCount().>(0)) { { java.lang.Thread.sleep(1000L); status.apply("", 1.0.-(latch.getCount().toFloat./(total)), scala.collection.Seq.empty[Nothing], false) }; while$1() } else () }; status.apply("", 1.0, scala.collection.Seq.empty[Nothing], true); org.locationtech.geomesa.tools.`package`.Command.user.info("Compacting metadata"); storage.metadata.compact(scala.None, scala.None, scala.math.`package`.max(1, scala.Predef.Integer2int(CompactCommand.this.params.threads))); org.locationtech.geomesa.tools.`package`.Command.user.info(scala.StringContext.apply("Local compaction complete in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))) }
77 240 3700 - 3716 Select scala.collection.SeqLike.length toCompact.length
78 241 3739 - 3764 Apply java.util.concurrent.CountDownLatch.<init> new java.util.concurrent.CountDownLatch(total)
79 242 3828 - 3829 Literal <nosymbol> 1
79 243 3840 - 3854 Select org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactParams.threads CompactCommand.this.params.threads
79 244 3840 - 3854 ApplyImplicitView scala.Predef.Integer2int scala.Predef.Integer2int(CompactCommand.this.params.threads)
79 245 3831 - 3862 Apply scala.math.min scala.math.`package`.min(scala.Predef.Integer2int(CompactCommand.this.params.threads), total)
79 246 3819 - 3863 Apply scala.math.max scala.math.`package`.max(1, scala.math.`package`.min(scala.Predef.Integer2int(CompactCommand.this.params.threads), total))
79 247 3790 - 3864 Apply java.util.concurrent.Executors.newFixedThreadPool java.util.concurrent.Executors.newFixedThreadPool(scala.math.`package`.max(1, scala.math.`package`.min(scala.Predef.Integer2int(CompactCommand.this.params.threads), total)))
82 257 3894 - 4470 Apply scala.collection.IterableLike.foreach toCompact.foreach[java.util.concurrent.Future[_]](((p: org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata) => executor.submit({ final class $anon extends Object with Runnable { def <init>(): <$anon: Runnable> = { $anon.super.<init>(); () }; override def run(): Unit = try { (if (CompactCommand.this.logger.underlying.isInfoEnabled()) CompactCommand.this.logger.underlying.info("Compacting {}", (p.name: AnyRef)) else (): Unit); storage.compact(scala.Some.apply[String](p.name), fileSize, storage.compact$default$3) } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (CompactCommand.this.logger.underlying.isErrorEnabled()) CompactCommand.this.logger.underlying.error(scala.StringContext.apply("Error processing partition \'", "\':").s(p.name), e) else (): Unit) } finally latch.countDown() }; new $anon() })))
82 258 3894 - 4470 Block scala.collection.IterableLike.foreach toCompact.foreach[java.util.concurrent.Future[_]](((p: org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata) => executor.submit({ final class $anon extends Object with Runnable { def <init>(): <$anon: Runnable> = { $anon.super.<init>(); () }; override def run(): Unit = try { (if (CompactCommand.this.logger.underlying.isInfoEnabled()) CompactCommand.this.logger.underlying.info("Compacting {}", (p.name: AnyRef)) else (): Unit); storage.compact(scala.Some.apply[String](p.name), fileSize, storage.compact$default$3) } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (CompactCommand.this.logger.underlying.isErrorEnabled()) CompactCommand.this.logger.underlying.error(scala.StringContext.apply("Error processing partition \'", "\':").s(p.name), e) else (): Unit) } finally latch.countDown() }; new $anon() })))
83 256 3933 - 4456 Apply java.util.concurrent.ExecutorService.submit executor.submit({ final class $anon extends Object with Runnable { def <init>(): <$anon: Runnable> = { $anon.super.<init>(); () }; override def run(): Unit = try { (if (CompactCommand.this.logger.underlying.isInfoEnabled()) CompactCommand.this.logger.underlying.info("Compacting {}", (p.name: AnyRef)) else (): Unit); storage.compact(scala.Some.apply[String](p.name), fileSize, storage.compact$default$3) } catch { case scala.util.control.NonFatal.unapply(<unapply-selector>) <unapply> ((e @ _)) => (if (CompactCommand.this.logger.underlying.isErrorEnabled()) CompactCommand.this.logger.underlying.error(scala.StringContext.apply("Error processing partition \'", "\':").s(p.name), e) else (): Unit) } finally latch.countDown() }; new $anon() })
84 255 3966 - 3969 Apply org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactCommand.$anon.<init> new $anon()
86 251 4078 - 4176 Block <nosymbol> { (if (CompactCommand.this.logger.underlying.isInfoEnabled()) CompactCommand.this.logger.underlying.info("Compacting {}", (p.name: AnyRef)) else (): Unit); storage.compact(scala.Some.apply[String](p.name), fileSize, storage.compact$default$3) }
88 248 4158 - 4164 Select org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata.name p.name
88 249 4153 - 4165 Apply scala.Some.apply scala.Some.apply[String](p.name)
88 250 4137 - 4176 Apply org.locationtech.geomesa.fs.storage.api.Compactable.compact storage.compact(scala.Some.apply[String](p.name), fileSize, storage.compact$default$3)
90 252 4249 - 4308 Typed <nosymbol> (if (CompactCommand.this.logger.underlying.isErrorEnabled()) CompactCommand.this.logger.underlying.error(scala.StringContext.apply("Error processing partition \'", "\':").s(p.name), e) else (): Unit)
92 253 4363 - 4380 Apply java.util.concurrent.CountDownLatch.countDown latch.countDown()
92 254 4363 - 4380 Block java.util.concurrent.CountDownLatch.countDown latch.countDown()
99 259 4505 - 4524 Apply java.util.concurrent.ExecutorService.shutdown executor.shutdown()
99 260 4505 - 4524 Block java.util.concurrent.ExecutorService.shutdown executor.shutdown()
102 261 4555 - 4573 Apply scala.Long.> latch.getCount().>(0)
102 270 4575 - 4575 Apply org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactCommand.while$1 while$1()
102 271 4575 - 4704 Block <nosymbol> { { java.lang.Thread.sleep(1000L); status.apply("", 1.0.-(latch.getCount().toFloat./(total)), scala.collection.Seq.empty[Nothing], false) }; while$1() }
102 272 4548 - 4548 Literal <nosymbol> ()
102 273 4548 - 4548 Block <nosymbol> ()
103 262 4589 - 4607 Apply java.lang.Thread.sleep java.lang.Thread.sleep(1000L)
104 263 4627 - 4629 Literal <nosymbol> ""
104 264 4631 - 4633 Literal <nosymbol> 1.0
104 265 4636 - 4666 Apply scala.Float./ latch.getCount().toFloat./(total)
104 266 4631 - 4666 Apply scala.Float.- 1.0.-(latch.getCount().toFloat./(total))
104 267 4668 - 4677 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
104 268 4686 - 4691 Literal <nosymbol> false
104 269 4620 - 4692 Apply org.locationtech.geomesa.tools.utils.TerminalCallback.PrintProgress.apply status.apply("", 1.0.-(latch.getCount().toFloat./(total)), scala.collection.Seq.empty[Nothing], false)
106 274 4722 - 4724 Literal <nosymbol> ""
106 275 4726 - 4728 Literal <nosymbol> 1.0
106 276 4730 - 4739 TypeApply scala.collection.generic.GenericCompanion.empty scala.collection.Seq.empty[Nothing]
106 277 4748 - 4752 Literal <nosymbol> true
106 278 4715 - 4753 Apply org.locationtech.geomesa.tools.utils.TerminalCallback.PrintProgress.apply status.apply("", 1.0, scala.collection.Seq.empty[Nothing], true)
107 279 4764 - 4804 Apply org.slf4j.Logger.info org.locationtech.geomesa.tools.`package`.Command.user.info("Compacting metadata")
108 280 4840 - 4844 Select scala.None scala.None
108 281 4846 - 4850 Select scala.None scala.None
108 282 4861 - 4862 Literal <nosymbol> 1
108 283 4864 - 4878 Select org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactParams.threads CompactCommand.this.params.threads
108 284 4864 - 4878 ApplyImplicitView scala.Predef.Integer2int scala.Predef.Integer2int(CompactCommand.this.params.threads)
108 285 4852 - 4879 Apply scala.math.max scala.math.`package`.max(1, scala.Predef.Integer2int(CompactCommand.this.params.threads))
108 286 4815 - 4880 Apply org.locationtech.geomesa.fs.storage.api.Compactable.compact storage.metadata.compact(scala.None, scala.None, scala.math.`package`.max(1, scala.Predef.Integer2int(CompactCommand.this.params.threads)))
109 287 4911 - 4941 Literal <nosymbol> "Local compaction complete in "
109 288 4967 - 4968 Literal <nosymbol> ""
109 289 4942 - 4966 Apply org.locationtech.geomesa.utils.text.TextTools.getTime org.locationtech.geomesa.utils.text.TextTools.getTime(start)
109 290 4909 - 4968 Apply scala.StringContext.s scala.StringContext.apply("Local compaction complete in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))
109 291 4891 - 4969 Apply org.slf4j.Logger.info org.locationtech.geomesa.tools.`package`.Command.user.info(scala.StringContext.apply("Local compaction complete in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start)))
111 329 5005 - 6200 Block <nosymbol> { val encoding: String = storage.metadata.encoding; val job: org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob = if (org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.Encoding.equalsIgnoreCase(encoding)) new org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.ParquetCompactionJob() else if (org.locationtech.geomesa.fs.storage.orc.OrcFileSystemStorage.Encoding.equalsIgnoreCase(encoding)) new org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.OrcCompactionJob() else throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Compaction is not supported for encoding \'", "\'").s(encoding)); val tempDir: Option[org.apache.hadoop.fs.Path] = scala.Option.apply[String](CompactCommand.this.params.tempPath).map[org.apache.hadoop.fs.Path](((t: String) => new org.apache.hadoop.fs.Path(t))); job.run(storage, toCompact.toSeq, fileSize, tempDir, CompactCommand.this.libjarsFiles, CompactCommand.this.libjarsPaths, status) match { case (message: String, counts: Map[String,Long])org.locationtech.geomesa.jobs.JobResult.JobSuccess((message @ _), (counts @ _)) => { org.locationtech.geomesa.tools.`package`.Command.user.info(scala.StringContext.apply("Distributed compaction complete in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))); val success: Long = counts.apply(FileSystemCompactionJob.MappedCounter); val failed: Long = counts.apply(FileSystemCompactionJob.FailedCounter); org.locationtech.geomesa.tools.`package`.Command.user.info(org.locationtech.geomesa.tools.ingest.IngestCommand.getStatInfo(success, failed, "Compacted", message)) } case (message: String)org.locationtech.geomesa.jobs.JobResult.JobFailure((message @ _)) => { org.locationtech.geomesa.tools.`package`.Command.user.error(scala.StringContext.apply("Distributed compaction failed in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))); throw new 
org.locationtech.geomesa.tools.`package`.Command.CommandException(message) } } }
112 293 5033 - 5058 Select org.locationtech.geomesa.fs.storage.api.StorageMetadata.encoding storage.metadata.encoding
113 294 5083 - 5143 Apply java.lang.String.equalsIgnoreCase org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.Encoding.equalsIgnoreCase(encoding)
114 295 5159 - 5185 Apply org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.ParquetCompactionJob.<init> new org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.ParquetCompactionJob()
114 296 5159 - 5185 Block org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.ParquetCompactionJob.<init> new org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.ParquetCompactionJob()
115 297 5207 - 5263 Apply java.lang.String.equalsIgnoreCase org.locationtech.geomesa.fs.storage.orc.OrcFileSystemStorage.Encoding.equalsIgnoreCase(encoding)
115 302 5203 - 5430 If <nosymbol> if (org.locationtech.geomesa.fs.storage.orc.OrcFileSystemStorage.Encoding.equalsIgnoreCase(encoding)) new org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.OrcCompactionJob() else throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Compaction is not supported for encoding \'", "\'").s(encoding))
116 298 5279 - 5301 Apply org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.OrcCompactionJob.<init> new org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.OrcCompactionJob()
116 299 5279 - 5301 Block org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.OrcCompactionJob.<init> new org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.OrcCompactionJob()
118 300 5333 - 5418 Throw <nosymbol> throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Compaction is not supported for encoding \'", "\'").s(encoding))
118 301 5333 - 5418 Block <nosymbol> throw new com.beust.jcommander.ParameterException(scala.StringContext.apply("Compaction is not supported for encoding \'", "\'").s(encoding))
120 303 5462 - 5477 Select org.locationtech.geomesa.tools.TempPathParam.tempPath CompactCommand.this.params.tempPath
120 304 5488 - 5499 Apply org.apache.hadoop.fs.Path.<init> new org.apache.hadoop.fs.Path(t)
120 305 5455 - 5500 Apply scala.Option.map scala.Option.apply[String](CompactCommand.this.params.tempPath).map[org.apache.hadoop.fs.Path](((t: String) => new org.apache.hadoop.fs.Path(t)))
121 306 5528 - 5543 Select scala.collection.SeqLike.toSeq toCompact.toSeq
121 307 5564 - 5576 Select org.locationtech.geomesa.fs.tools.compact.FsCompactCommand.CompactCommand.libjarsFiles CompactCommand.this.libjarsFiles
121 308 5578 - 5590 Select org.locationtech.geomesa.tools.DistributedCommand.libjarsPaths CompactCommand.this.libjarsPaths
121 309 5511 - 5599 Apply org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.run job.run(storage, toCompact.toSeq, fileSize, tempDir, CompactCommand.this.libjarsFiles, CompactCommand.this.libjarsPaths, status)
122 321 5653 - 5999 Block <nosymbol> { org.locationtech.geomesa.tools.`package`.Command.user.info(scala.StringContext.apply("Distributed compaction complete in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))); val success: Long = counts.apply(FileSystemCompactionJob.MappedCounter); val failed: Long = counts.apply(FileSystemCompactionJob.FailedCounter); org.locationtech.geomesa.tools.`package`.Command.user.info(org.locationtech.geomesa.tools.ingest.IngestCommand.getStatInfo(success, failed, "Compacted", message)) }
123 310 5690 - 5726 Literal <nosymbol> "Distributed compaction complete in "
123 311 5752 - 5753 Literal <nosymbol> ""
123 312 5727 - 5751 Apply org.locationtech.geomesa.utils.text.TextTools.getTime org.locationtech.geomesa.utils.text.TextTools.getTime(start)
123 313 5688 - 5753 Apply scala.StringContext.s scala.StringContext.apply("Distributed compaction complete in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))
123 314 5670 - 5754 Apply org.slf4j.Logger.info org.locationtech.geomesa.tools.`package`.Command.user.info(scala.StringContext.apply("Distributed compaction complete in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start)))
124 315 5790 - 5827 Select org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.MappedCounter FileSystemCompactionJob.MappedCounter
124 316 5783 - 5828 Apply scala.collection.MapLike.apply counts.apply(FileSystemCompactionJob.MappedCounter)
125 317 5863 - 5900 Select org.locationtech.geomesa.fs.tools.compact.FileSystemCompactionJob.FailedCounter FileSystemCompactionJob.FailedCounter
125 318 5856 - 5901 Apply scala.collection.MapLike.apply counts.apply(FileSystemCompactionJob.FailedCounter)
126 319 5934 - 5998 Apply org.locationtech.geomesa.tools.ingest.IngestCommand.getStatInfo org.locationtech.geomesa.tools.ingest.IngestCommand.getStatInfo(success, failed, "Compacted", message)
126 320 5916 - 5999 Apply org.slf4j.Logger.info org.locationtech.geomesa.tools.`package`.Command.user.info(org.locationtech.geomesa.tools.ingest.IngestCommand.getStatInfo(success, failed, "Compacted", message))
128 328 6038 - 6188 Block <nosymbol> { org.locationtech.geomesa.tools.`package`.Command.user.error(scala.StringContext.apply("Distributed compaction failed in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))); throw new org.locationtech.geomesa.tools.`package`.Command.CommandException(message) }
129 322 6076 - 6110 Literal <nosymbol> "Distributed compaction failed in "
129 323 6136 - 6137 Literal <nosymbol> ""
129 324 6111 - 6135 Apply org.locationtech.geomesa.utils.text.TextTools.getTime org.locationtech.geomesa.utils.text.TextTools.getTime(start)
129 325 6074 - 6137 Apply scala.StringContext.s scala.StringContext.apply("Distributed compaction failed in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start))
129 326 6055 - 6138 Apply org.slf4j.Logger.error org.locationtech.geomesa.tools.`package`.Command.user.error(scala.StringContext.apply("Distributed compaction failed in ", "").s(org.locationtech.geomesa.utils.text.TextTools.getTime(start)))
130 327 6153 - 6188 Throw <nosymbol> throw new org.locationtech.geomesa.tools.`package`.Command.CommandException(message)
141 330 6547 - 6548 ApplyImplicitView scala.Predef.int2Integer scala.Predef.int2Integer(4)