Skip to content

Commit ae4f479

Browse files
Authored by fhuaulme (fhuaulme-sfdc) and dsmiley
SOLR-18189: Skip indexing duplicate docs via a content hash URP (apache#4263)
Co-authored-by: fhuaulme <fhuaulme@salesforce.com> Co-authored-by: David Smiley <dsmiley@apache.org>
1 parent cac69ae commit ae4f479

9 files changed

Lines changed: 1124 additions & 2 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
title: New ContentHashVersionProcessor to avoid index churn when adding same-content documents.
type: added
authors:
  - name: Francois Huaulme
  - name: David Smiley
links:
  - name: SOLR-18189
    url: https://issues.apache.org/jira/browse/SOLR-18189
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.solr.update.processor;
19+
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
import java.util.Arrays;
23+
import java.util.Collection;
24+
import java.util.Comparator;
25+
import java.util.Optional;
26+
import java.util.Set;
27+
import java.util.function.Predicate;
28+
import org.apache.lucene.util.BytesRef;
29+
import org.apache.solr.common.SolrInputDocument;
30+
import org.apache.solr.common.SolrInputField;
31+
import org.apache.solr.core.SolrCore;
32+
import org.apache.solr.handler.component.RealTimeGetComponent;
33+
import org.apache.solr.handler.component.RealTimeGetComponent.Resolution;
34+
import org.apache.solr.request.SolrQueryRequest;
35+
import org.apache.solr.response.SolrQueryResponse;
36+
import org.apache.solr.schema.IndexSchema;
37+
import org.apache.solr.schema.SchemaField;
38+
import org.apache.solr.update.AddUpdateCommand;
39+
40+
/**
41+
* An implementation of {@link UpdateRequestProcessor} which computes a hash of field values, and
42+
* uses this hash to reject/accept document updates.
43+
*
44+
* <ul>
45+
* <li>When no corresponding document with same id exists (create), the computed hash is added to
46+
* the document.
47+
* <li>When a previous document exists (update), a new hash is computed from the incoming field
48+
* values and compared with the stored hash.
49+
* </ul>
50+
*
51+
* <p>Depending on {#dropSameDocuments} value, this processor may drop or accept document updates.
52+
* This implementation can be used for monitoring or dropping no-op updates (updates that do not
53+
* change the Solr document content).
54+
*
55+
* <p>Note: the hash is computed using {@link Lookup3Signature} and must be stored in a field with
56+
* docValues enabled for retrieval.
57+
*
58+
* @see Lookup3Signature
59+
*/
60+
public class ContentHashVersionProcessor extends UpdateRequestProcessor {
61+
private final SchemaField hashField;
62+
private final SolrQueryResponse rsp;
63+
private final SolrCore core;
64+
private final Predicate<SolrInputField> includedFields; // Matcher for included fields in hash
65+
private final Predicate<SolrInputField> excludedFields; // Matcher for excluded fields from hash
66+
private boolean dropSameDocuments;
67+
private int sameCount = 0;
68+
private int differentCount = 0;
69+
70+
public ContentHashVersionProcessor(
71+
Predicate<SolrInputField> hashIncludedFields,
72+
Predicate<SolrInputField> hashExcludedFields,
73+
String hashFieldName,
74+
boolean dropSameDocuments,
75+
SolrQueryRequest req,
76+
SolrQueryResponse rsp,
77+
UpdateRequestProcessor next) {
78+
super(next);
79+
this.core = req.getCore();
80+
81+
IndexSchema schema = core.getLatestSchema();
82+
this.hashField = schema.getField(hashFieldName);
83+
this.dropSameDocuments = dropSameDocuments;
84+
this.rsp = rsp;
85+
this.includedFields = hashIncludedFields;
86+
this.excludedFields = hashExcludedFields;
87+
}
88+
89+
@Override
90+
public void processAdd(AddUpdateCommand cmd) throws IOException {
91+
SolrInputDocument newDoc = cmd.getSolrInputDocument();
92+
byte[] newHash = computeDocHash(newDoc);
93+
newDoc.setField(hashField.getName(), newHash);
94+
95+
if (!isHashAcceptable(cmd.getIndexedId(), newHash)) {
96+
return;
97+
}
98+
super.processAdd(cmd);
99+
}
100+
101+
@Override
102+
public void finish() throws IOException {
103+
try {
104+
super.finish();
105+
} finally {
106+
if (sameCount + differentCount > 0) {
107+
if (dropSameDocuments) {
108+
rsp.addToLog("contentHash.duplicatesDropped", sameCount);
109+
} else {
110+
rsp.addToLog("contentHash.duplicatesDetected", sameCount);
111+
}
112+
}
113+
}
114+
}
115+
116+
private boolean isHashAcceptable(BytesRef indexedDocId, byte[] newHash) throws IOException {
117+
assert null != indexedDocId;
118+
119+
Optional<byte[]> oldDocHash = getOldDocHash(indexedDocId);
120+
if (oldDocHash.isPresent()) {
121+
if (Arrays.equals(newHash, oldDocHash.get())) {
122+
sameCount++;
123+
return !dropSameDocuments;
124+
} else {
125+
differentCount++;
126+
return true;
127+
}
128+
}
129+
return true; // Doc not found
130+
}
131+
132+
/** Retrieves the hash value from the old document identified by the given ID. */
133+
private Optional<byte[]> getOldDocHash(BytesRef indexedDocId) throws IOException {
134+
SolrInputDocument oldDoc =
135+
RealTimeGetComponent.getInputDocument(
136+
core, indexedDocId, indexedDocId, null, Set.of(hashField.getName()), Resolution.DOC);
137+
if (oldDoc == null) {
138+
return Optional.empty();
139+
}
140+
Object o = oldDoc.getFieldValue(hashField.getName());
141+
if (o instanceof byte[] bytes) {
142+
return Optional.of(bytes);
143+
} else if (o instanceof ByteBuffer buf) {
144+
byte[] bytes = new byte[buf.remaining()];
145+
buf.duplicate().get(bytes);
146+
return Optional.of(bytes);
147+
}
148+
return Optional.empty();
149+
}
150+
151+
byte[] computeDocHash(SolrInputDocument doc) {
152+
final Signature sig = new Lookup3Signature();
153+
154+
// Stream field names, filter, sort, and process in a single pass
155+
doc.values().stream()
156+
.filter(includedFields) // Keep fields that match 'included fields' matcher
157+
.filter(excludedFields.negate()) // Exclude fields that match 'excluded fields' matcher
158+
.sorted(
159+
Comparator.comparing(
160+
SolrInputField
161+
::getName)) // Sort to ensure consistent field order across different doc field
162+
// orders
163+
.forEach(
164+
inputField -> {
165+
sig.add(inputField.getName());
166+
Object o = inputField.getValue();
167+
if (o instanceof Collection) {
168+
for (Object oo : (Collection<?>) o) {
169+
sig.add(String.valueOf(oo));
170+
}
171+
} else {
172+
sig.add(String.valueOf(o));
173+
}
174+
});
175+
176+
return sig.getSignature();
177+
}
178+
}

0 commit comments

Comments
 (0)