Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexEditor;
import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexEditorContext;
import org.apache.jackrabbit.oak.plugins.index.search.spi.query.FulltextIndex;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.TextExtractionStatsMBean;
Expand Down Expand Up @@ -384,6 +385,9 @@ private void activate(BundleContext bundleContext, Configuration config) throws
oakRegs.add(whiteboard.register(FeatureToggle.class,
new FeatureToggle(FulltextIndexEditor.FT_OAK_12193, FulltextIndexEditor.FT_OAK_12193_DISABLE),
emptyMap()));
oakRegs.add(whiteboard.register(FeatureToggle.class,
new FeatureToggle(FulltextIndexEditorContext.FT_OAK_12247, FulltextIndexEditorContext.FT_OAK_12247_DISABLE),
emptyMap()));
initializeIndexDir(bundleContext, config);
initializeExtractedTextCache(bundleContext, config, statisticsProvider);
tracker = createTracker(bundleContext, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@

public interface LuceneIndexWriterFactory extends FulltextIndexWriterFactory<Iterable<? extends IndexableField>>, AutoCloseable {
    /**
     * Creates a {@link LuceneIndexWriter} for the given index definition.
     *
     * <p>Declared exactly once: a second declaration with the same signature
     * (the pre-reformat one-line variant) would be a duplicate method and must
     * not be kept next to this one.
     *
     * @param definition        the index definition the writer is created for
     * @param definitionBuilder builder of the index definition node
     * @param commitInfo        commit information passed through to the writer
     * @param reindex           reindex flag passed through to the writer
     *                          (NOTE(review): presumably {@code true} during a full reindex; confirm against implementations)
     * @return the writer instance
     */
    @Override
    LuceneIndexWriter newInstance(IndexDefinition definition, NodeBuilder definitionBuilder,
                                  CommitInfo commitInfo, boolean reindex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class DefaultIndexWriter implements LuceneIndexWriter {
private Directory directory;
private long genAtStart = -1;
private boolean indexUpdated = false;
private long totalDocCount = -1L;

public DefaultIndexWriter(LuceneIndexDefinition definition, NodeBuilder definitionBuilder,
DirectoryFactory directoryFactory, String dirName, String suggestDirName,
Expand Down Expand Up @@ -144,6 +145,12 @@ public boolean close(long timestamp) throws IOException {
PERF_LOGGER.end(start, -1, "Completed suggester for directory {}", definition);
}

// OAK-12247: commit() applies all pending delete queries before close(),
// making writer.numDocs() accurate for totalIndexedNodes tracking.
// close() calls commit() internally so this adds no extra I/O.
writer.commit();
totalDocCount = writer.numDocs();

writer.close();
PERF_LOGGER.end(start, -1, "Closed writer for directory {}", definition);

Expand Down Expand Up @@ -296,6 +303,11 @@ private static void trackIndexSizeInfo(@NotNull IndexWriter writer,
log.trace("Directory overall size: {}, files: {}", IOUtils.humanReadableByteCount(overallSize), sb);
}

/**
 * Returns the document count captured by this writer.
 *
 * @return the value of {@code totalDocCount}: the {@code writer.numDocs()} snapshot
 *         taken in {@code close(long)} right after the explicit commit, or {@code -1}
 *         (the field's initial value) while no such snapshot has been taken
 */
@Override
public long getTotalDocCount() {
return totalDocCount;
}

@Override
public String toString() {
return "DefaultIndexWriter{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ private DefaultIndexWriter createWriter(Mount m) {
suggestDirName, reindex, writerConfig);
}

/**
 * Sums the document counts of all writers held in {@code writers}.
 *
 * <p>Writers that report a negative count (i.e. "not tracked") are ignored. If no
 * writer reports a count at all, whether because there are no writers or because
 * every one of them returned a negative value, the result is {@code -1} rather than
 * {@code 0}, so that callers checking {@code total >= 0} do not record a bogus
 * "zero documents" total.
 *
 * @return the sum of all non-negative per-writer counts, or {@code -1} if none is available
 */
@Override
public long getTotalDocCount() {
    long total = 0L;
    boolean anyTracked = false;
    for (LuceneIndexWriter mountWriter : writers.values()) {
        long count = mountWriter.getTotalDocCount();
        if (count >= 0) {
            total += count;
            anyTracked = true;
        }
    }
    return anyTracked ? total : -1L;
}

@Override
public String toString() {
return "MultiplexingIndexWriter{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,9 @@ public String formatStatistics() {
"deletes: " + deleteCount +
"]";
}

/**
 * Forwards to the wrapped writer; this class keeps no document count of its own.
 *
 * @return whatever {@code delegateWriter.getTotalDocCount()} returns
 */
@Override
public long getTotalDocCount() {
return delegateWriter.getTotalDocCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ public void useUpdateForNormalIndexing() throws Exception{
writer.close(100);
}

/**
 * OAK-12247: once two documents with distinct paths have been written and the
 * writer has been closed, {@code getTotalDocCount()} must report exactly two.
 */
@Test
public void getTotalDocCountReflectsIndexedDocuments() throws Exception {
    FSDirectoryFactory dirFactory = new FSDirectoryFactory(folder.getRoot());
    LuceneIndexDefinition indexDefn = new LuceneIndexDefinition(root, builder.getNodeState(), "/foo");
    DefaultIndexWriter indexWriter = new DefaultIndexWriter(indexDefn, builder,
            dirFactory, INDEX_DATA_CHILD_NAME, SUGGEST_DATA_CHILD_NAME, true, writerConfig);

    // Index one document per path, in the same order as before.
    for (String path : new String[] {"/a/b", "/a/c"}) {
        Document document = new Document();
        document.add(newPathField(path));
        indexWriter.updateDocument(path, document);
    }

    indexWriter.close(0);

    long reported = indexWriter.getTotalDocCount();
    assertEquals("getTotalDocCount() should return 2 after indexing 2 documents", 2L, reported);
}

private DefaultIndexWriter createWriter(LuceneIndexDefinition defn, boolean reindex) {
return new DefaultIndexWriter(defn, builder,
new DefaultDirectoryFactory(null, null), INDEX_DATA_CHILD_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand All @@ -66,6 +67,7 @@ static class IndexInfo {
public final NodeBuilder definitionBuilder;
public final boolean waitForESAcknowledgement;
public final boolean isRealTime;
public final LongAdder docCount;
/**
* Exceptions occurred while trying to update index in elasticsearch
*/
Expand All @@ -77,11 +79,16 @@ static class IndexInfo {
boolean indexModified = false;

/**
 * Convenience constructor: same as the six-argument constructor, with a newly
 * created {@link LongAdder} (initial sum zero) as the document counter.
 */
IndexInfo(String indexName, ElasticIndexDefinition indexDefinition, NodeBuilder definitionBuilder, boolean waitForESAcknowledgement, boolean isRealTime) {
this(indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement, isRealTime, new LongAdder());
}

/**
 * Stores the given values in the corresponding fields without copying them.
 *
 * @param docCount counter kept by reference, so updates made through this
 *                 {@code IndexInfo} are visible to whoever supplied the adder
 */
IndexInfo(String indexName, ElasticIndexDefinition indexDefinition, NodeBuilder definitionBuilder, boolean waitForESAcknowledgement, boolean isRealTime, LongAdder docCount) {
this.indexName = indexName;
this.indexDefinition = indexDefinition;
this.definitionBuilder = definitionBuilder;
this.waitForESAcknowledgement = waitForESAcknowledgement;
this.isRealTime = isRealTime;
this.docCount = docCount;
}
}

Expand Down Expand Up @@ -180,6 +187,13 @@ public ElasticBulkProcessorHandler(@NotNull ElasticConnection elasticConnection)
* This option is available for sync index definitions only.
*/
public void registerIndex(String indexName, ElasticIndexDefinition indexDefinition, NodeBuilder definitionBuilder, CommitInfo commitInfo, boolean waitForESAcknowledgement) {
// Callers that do not track document counts get a throwaway counter starting at zero;
// all registration logic lives in the overload that takes the LongAdder explicitly.
registerIndex(indexName, indexDefinition, definitionBuilder, commitInfo, waitForESAcknowledgement, new LongAdder());
}

/**
* Registers an ElasticIndex with the given index definition configuration and document-count tracker.
*/
public void registerIndex(String indexName, ElasticIndexDefinition indexDefinition, NodeBuilder definitionBuilder, CommitInfo commitInfo, boolean waitForESAcknowledgement, LongAdder docCount) {
checkOpen();
if (registeredIndexes.containsKey(indexName)) {
LOG.warn("Index already registered: {}", indexName);
Expand All @@ -205,7 +219,7 @@ public void registerIndex(String indexName, ElasticIndexDefinition indexDefiniti
} else {
isRealTime = false;
}
return new IndexInfo(indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement, isRealTime);
return new IndexInfo(indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement, isRealTime, docCount);
});
}
}
Expand Down Expand Up @@ -461,6 +475,13 @@ public void afterBulk(long executionId, BulkRequest request, List<OperationConte
BulkResponseItem item = response.items().get(i);
if (item.error() == null) {
indexInfo.indexModified = true;
String result = item.result();
if ("created".equals(result)) {
indexInfo.docCount.increment();
} else if ("deleted".equals(result)) {
indexInfo.docCount.add(-1L);
}
// "updated" → no count change
} else {
if (failOnIndexingError && indexInfo.suppressedErrorCauses.size() < MAX_SUPPRESSED_ERROR_CAUSES) {
indexInfo.suppressedErrorCauses.add(item.error());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.ArrayList;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.LongAdder;

class ElasticIndexWriter implements FulltextIndexWriter<ElasticDocument> {
private static final Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
Expand All @@ -65,6 +66,7 @@ class ElasticIndexWriter implements FulltextIndexWriter<ElasticDocument> {
private final boolean reindex;
private final String indexName;
private final ElasticRetryPolicy retryPolicy;
private final LongAdder docCount;

ElasticIndexWriter(@NotNull ElasticIndexTracker indexTracker,
@NotNull ElasticConnection elasticConnection,
Expand All @@ -80,6 +82,10 @@ class ElasticIndexWriter implements FulltextIndexWriter<ElasticDocument> {
this.bulkProcessorHandler = bulkProcessorHandler;
this.retryPolicy = retryPolicy;

long prevTotal = reindex ? 0L : Math.max(0L, indexDefinition.getTotalIndexedNodes());
this.docCount = new LongAdder();
this.docCount.add(prevTotal);

// We don't use stored index definitions with elastic. Every time a new writer gets created we
// use the actual index name (based on the current seed) while reindexing, or the alias (pointing to the
// old index until the new one gets enabled) during incremental reindexing
Expand Down Expand Up @@ -115,7 +121,7 @@ class ElasticIndexWriter implements FulltextIndexWriter<ElasticDocument> {
waitForESAcknowledgement = false;
}
}
bulkProcessorHandler.registerIndex(indexName, indexDefinition, definitionBuilder, commitInfo, waitForESAcknowledgement);
bulkProcessorHandler.registerIndex(indexName, indexDefinition, definitionBuilder, commitInfo, waitForESAcknowledgement, docCount);
}

@TestOnly
Expand All @@ -140,6 +146,7 @@ class ElasticIndexWriter implements FulltextIndexWriter<ElasticDocument> {
this.indexName = indexDefinition.getIndexAlias();
this.retryPolicy = retryPolicy;
this.reindex = reindex;
this.docCount = new LongAdder();
}

@Override
Expand Down Expand Up @@ -167,11 +174,14 @@ public void updateDocument(String path, ElasticDocument doc) throws IOException

@Override
public void deleteDocuments(String path) throws IOException {
// Direct bulk delete: queues exactly 1 delete for the document AT `path`, matched by its
// document ID (derived from the path). Stats for this delete are incremented via
// OakBulkListener.afterBulk() when the response arrives (result="deleted").
retryPolicy.withRetries(() -> bulkProcessorHandler.delete(indexName, ElasticIndexUtils.idFromPath(path)));
if (!ElasticIndexEditorProvider.FT_OAK_12206_DISABLE.get()) {
// Delete all descendants: mirrors Lucene's PrefixQuery on the path term.
// The :ancestors field is indexed with path_hierarchy, so a term query on `path`
// matches every document whose ancestor chain includes that path.
// :ancestors stores parent paths only (not self), so deleteByQuery targets
// strict descendants; the direct bulk delete covers the node itself.
// The ES Bulk API does not support delete by query, so we need to issue a separate request.
// This is not ideal but should be ok since deletes are expected to be less frequent than updates.
// The alternative would be to get the list of affected documents and issue a bulk delete by id,
Expand All @@ -182,6 +192,7 @@ public void deleteDocuments(String path) throws IOException {
response.failures().forEach(f -> LOG.warn("Failed to delete descendants of {}: shard {} reason {}", path, f.id(), f.cause()));
if (response.deleted() != null && response.deleted() > 0) {
LOG.info("Deleted {} descendants of {} in {} ms", response.deleted(), path, response.took());
docCount.add(-response.deleted());
}
});
}
Expand All @@ -203,6 +214,11 @@ public boolean close(long timestamp) throws IOException {
return updateStatus;
}

/**
 * Returns the current value of this writer's document counter.
 *
 * @return {@code docCount.sum()} at the time of the call
 */
@Override
public long getTotalDocCount() {
return docCount.sum();
}

private void saveMetrics() {
ElasticIndexNode indexNode = indexTracker.acquireIndexNode(indexDefinition.getIndexPath());
if (indexNode != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public class IndexDefinition implements Aggregate.AggregateMapper {

public static final String CREATION_TIMESTAMP = "creationTimestamp";
public static final String REINDEX_COMPLETION_TIMESTAMP = "reindexCompletionTimestamp";
public static final String PROP_TOTAL_INDEXED_NODES = "totalIndexedNodes";

/**
* Meta property which provides the unique id
Expand Down Expand Up @@ -746,6 +747,23 @@ public String getUniqueId() {
return uid;
}

/**
 * Reads the {@code totalIndexedNodes} property from the status child node of
 * this index definition.
 *
 * @return the stored value as a {@code long}, or {@code -1} when the property
 *         is absent
 */
public long getTotalIndexedNodes() {
    PropertyState totalProp = definition.getChildNode(STATUS_NODE).getProperty(PROP_TOTAL_INDEXED_NODES);
    if (totalProp == null) {
        // Nothing recorded yet.
        return -1L;
    }
    return totalProp.getValue(Type.LONG);
}

/**
 * Tells whether a reindex completion timestamp has been recorded for this index.
 *
 * @return {@code true} if the {@code STATUS_NODE} child of the definition has the
 *         {@code REINDEX_COMPLETION_TIMESTAMP} property, {@code false} otherwise
 */
public boolean isReindexCompleted() {
return definition.getChildNode(STATUS_NODE).hasProperty(REINDEX_COMPLETION_TIMESTAMP);
}

/**
 * @return the value of the {@code nrtIndexMode} field
 */
public boolean isNRTIndexingEnabled() {
return nrtIndexMode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.io.IOException;
import java.util.Calendar;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Objects.requireNonNull;
import static org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.PROP_RANDOM_SEED;
Expand All @@ -61,6 +62,14 @@ public abstract class FulltextIndexEditorContext<D> {
private static final PerfLogger PERF_LOGGER =
new PerfLogger(LoggerFactory.getLogger(FulltextIndexEditorContext.class.getName() + ".perf"));

public static final String FT_OAK_12247 = "FT_OAK-12247";

/**
* Kill switch for OAK-12247 totalIndexedNodes tracking. Default {@code false}
* (tracking active). Set to {@code true} to revert to legacy behaviour.
*/
public static final AtomicBoolean FT_OAK_12247_DISABLE = new AtomicBoolean(false);

protected IndexDefinition definition;

protected final NodeBuilder definitionBuilder;
Expand Down Expand Up @@ -154,7 +163,8 @@ public void setPropertyUpdateCallback(PropertyUpdateCallback propertyUpdateCallb
public void closeWriter() throws IOException {
Calendar currentTime = getCalendar();
final long start = PERF_LOGGER.start();
boolean indexUpdated = getWriter().close(currentTime.getTimeInMillis());
FulltextIndexWriter<D> writer = getWriter(); // OAK-12247: local ref needed for getTotalDocCount() after close
boolean indexUpdated = writer.close(currentTime.getTimeInMillis());

if (indexUpdated) {
PERF_LOGGER.end(start, -1, "Closed writer for directory {}", definition);
Expand All @@ -175,6 +185,25 @@ public void closeWriter() throws IOException {
textExtractor.done(reindex);
}
}

// OAK-12247: persist totalIndexedNodes and fix the Elastic empty-reindex gap
// (empty reindex returns indexUpdated=false, so the block above never runs and
// REINDEX_COMPLETION_TIMESTAMP would not be written — planner has no signal).
// When indexUpdated=true the legacy block already wrote REINDEX_COMPLETION_TIMESTAMP,
// so we only write it here for the !indexUpdated && reindex case.
if (!FT_OAK_12247_DISABLE.get() && (indexUpdated || reindex)) {
NodeBuilder status = definitionBuilder.child(IndexDefinition.STATUS_NODE);
long total = writer.getTotalDocCount();
if (total >= 0) {
status.setProperty(IndexDefinition.PROP_TOTAL_INDEXED_NODES, total);
}
if (!indexUpdated && reindex) {
status.setProperty(IndexDefinition.REINDEX_COMPLETION_TIMESTAMP,
ISO8601.format(currentTime), Type.DATE);
log.info("{} set for index: {}", IndexDefinition.REINDEX_COMPLETION_TIMESTAMP,
definition.getIndexPath());
}
}
}

private String getUpdatedTime(Calendar currentTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@ public interface FulltextIndexWriter<D> {
* @return true if index was updated or any write happened.
*/
boolean close(long timestamp) throws IOException;

/**
 * Returns the total number of documents in the index after this writer is closed.
 *
 * <p>This default implementation does no tracking and always returns {@code -1};
 * implementations that keep a count override it.
 *
 * @return the total document count, or {@code -1} if not tracked by this implementation
 */
default long getTotalDocCount() {
return -1L;
}
}
Loading