AbstractGeoIndex.java

/***********************************************************************
 * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Apache License, Version 2.0
 * which accompanies this distribution and is available at
 * https://www.apache.org/licenses/LICENSE-2.0
 ***********************************************************************/

package org.locationtech.geomesa.memory.cqengine.index;

import com.googlecode.cqengine.attribute.Attribute;
import com.googlecode.cqengine.index.Index;
import com.googlecode.cqengine.index.support.AbstractAttributeIndex;
import com.googlecode.cqengine.index.support.indextype.OnHeapTypeIndex;
import com.googlecode.cqengine.persistence.support.ObjectSet;
import com.googlecode.cqengine.persistence.support.ObjectStore;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.query.option.QueryOptions;
import com.googlecode.cqengine.resultset.ResultSet;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.locationtech.geomesa.memory.cqengine.query.Intersects;
import org.locationtech.geomesa.memory.index.SpatialIndex;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.runtime.AbstractFunction1;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public abstract class AbstractGeoIndex<A extends Geometry, O extends SimpleFeature>
      extends AbstractAttributeIndex<A, O> implements OnHeapTypeIndex {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGeoIndex.class);

    private static final int INDEX_RETRIEVAL_COST = 40;

    private final SpatialIndex<O> index;
    private final int geomAttributeIndex;

    public static final ThreadLocal<SpatialIndex<? extends SimpleFeature>> lastUsed = new ThreadLocal<>();

    static Set<Class<? extends Query>> supportedQueries = new HashSet<Class<? extends Query>>() {{
        add(Intersects.class);
    }};

    AbstractGeoIndex(SimpleFeatureType sft, Attribute<O, A> attribute, SpatialIndex<O> index) {
        super(attribute, supportedQueries);
        this.index = index;
        this.geomAttributeIndex = sft.indexOf(attribute.getAttributeName());
    }


    @Override
    public void init(ObjectStore<O> objectStore, QueryOptions queryOptions) {
        addAll(ObjectSet.fromObjectStore(objectStore, queryOptions), queryOptions);
    }

    @Override
    public boolean addAll(ObjectSet<O> objectSet, QueryOptions queryOptions) {
        try {
            boolean modified = false;

            for (O object : objectSet) {
                Geometry geom = (Geometry) object.getDefaultGeometry();
                index.insert(geom, object.getID(), object);
                modified = true;
            }

            return modified;
        } finally {
            objectSet.close();
        }
    }

    @Override
    public boolean removeAll(ObjectSet<O> objectSet, QueryOptions queryOptions) {
        try {
            boolean modified = false;

            for (O object : objectSet) {
                Geometry geom = (Geometry) object.getDefaultGeometry();
                index.remove(geom, object.getID());
                modified = true;
            }

            return modified;
        } finally {
            objectSet.close();
        }
    }

    @Override
    public void clear(QueryOptions queryOptions) {
        this.index.clear();
    }

    public void destroy(QueryOptions queryOptions) {
        this.index.clear();
    }

    @Override
    public ResultSet<O> retrieve(final Query<O> query, final QueryOptions queryOptions) {
        lastUsed.set(this.index);
        return new GeoIndexResultSet(query, queryOptions);
    }

    private scala.collection.Iterator<O> getSimpleFeatureIteratorInternal(Intersects query,
                                                                          final QueryOptions queryOptions) {
        final Intersects intersects = query;
        Envelope queryEnvelope = intersects.getEnvelope();
        return index.query(queryEnvelope).filter((Function1<O, Object>) new AbstractFunction1<SimpleFeature, Object>() {
            @Override
            public Object apply(SimpleFeature feature) {
                try {
                    Geometry geom = (Geometry) feature.getAttribute(geomAttributeIndex);
                    return intersects.matchesValue(geom, queryOptions);
                } catch (Exception e) {
                    LOGGER.warn("Caught exception while trying to look up geometry", e);
                    return false;
                }
            }
        });
    }


    @Override
    public boolean isMutable() {
        return true;
    }

    @Override
    public boolean isQuantized() {
        return false;
    }

    @Override
    public Index<O> getEffectiveIndex() {
        return this;
    }

    public class GeoIndexResultSet extends ResultSet<O> {

        private final Query<O> query;
        private final QueryOptions queryOptions;

        public GeoIndexResultSet(Query<O> query, QueryOptions queryOptions) {
            this.query = query;
            this.queryOptions = queryOptions;
        }

        @Override
        public Iterator<O> iterator() {
            scala.collection.Iterator<O> iter =
                  getSimpleFeatureIteratorInternal((Intersects) query, queryOptions);
            return new Iterator<O>() {
                @Override
                public boolean hasNext() {
                    return iter.hasNext();
                }

                @Override
                public O next() {
                    return iter.next();
                }
            };
        }

        @Override
        public boolean contains(O object) {
            final Intersects intersects = (Intersects) query;
            Geometry geom = (Geometry) object.getAttribute(geomAttributeIndex);
            return intersects.matchesValue(geom, queryOptions);
        }

        @Override
        public boolean matches(O object) {
            return query.matches(object, queryOptions);
        }

        @Override
        public Query<O> getQuery() {
            return query;
        }

        @Override
        public QueryOptions getQueryOptions() {
            return queryOptions;
        }

        @Override
        public int getRetrievalCost() {
            return INDEX_RETRIEVAL_COST;
        }

        // Returning the size here as the MergeCost.
        // The geoindex size isn't optimal, so there might be a better
        // measure of this.
        @Override
        public int getMergeCost() {
            return size();
        }

        @Override
        public int size() {
            return getSimpleFeatureIteratorInternal((Intersects) query, queryOptions).size();
        }

        @Override
        public void close() {
        }
    }
}