Skip to content

Commit 632c92f

Browse files
authored
CAMEL-23105 - camel-docling - Fix broken async conversion workflow (discarded result and error masking) (#21675)
The CompletionStage returned by convertSourceAsync() was discarded and a fabricated task ID with no server-side correlation was returned. The subsequent CHECK_CONVERSION_STATUS would fail silently because checkConversionStatusInternal() returned COMPLETED on any exception. Store the CompletableFuture in a ConcurrentHashMap keyed by a generated task ID. CHECK_CONVERSION_STATUS now checks the local map first and returns the actual future state (IN_PROGRESS, COMPLETED with result, or FAILED with error message). Also fix the error path to return FAILED instead of COMPLETED when an exception occurs. Signed-off-by: Andrea Cosentino <ancosen@gmail.com>
1 parent 634c94f commit 632c92f

2 files changed

Lines changed: 250 additions & 5 deletions

File tree

components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@
3232
import java.util.Map;
3333
import java.util.concurrent.CompletableFuture;
3434
import java.util.concurrent.CompletionStage;
35+
import java.util.concurrent.ConcurrentHashMap;
3536
import java.util.concurrent.ExecutionException;
3637
import java.util.concurrent.ExecutorService;
3738
import java.util.concurrent.TimeUnit;
3839
import java.util.concurrent.TimeoutException;
3940
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.concurrent.atomic.AtomicLong;
4143
import java.util.stream.Collectors;
4244
import java.util.stream.Stream;
4345

@@ -87,6 +89,8 @@ public class DoclingProducer extends DefaultProducer {
8789
private DoclingConfiguration configuration;
8890
private DoclingServeApi doclingServeApi;
8991
private ObjectMapper objectMapper;
92+
private final Map<String, CompletableFuture<ConvertDocumentResponse>> pendingAsyncTasks = new ConcurrentHashMap<>();
93+
private final AtomicLong taskIdCounter = new AtomicLong();
9094

9195
public DoclingProducer(DoclingEndpoint endpoint) {
9296
super(endpoint);
@@ -130,6 +134,9 @@ protected void doStart() throws Exception {
130134
@Override
131135
protected void doStop() throws Exception {
132136
super.doStop();
137+
// Cancel any pending async tasks
138+
pendingAsyncTasks.forEach((id, future) -> future.cancel(true));
139+
pendingAsyncTasks.clear();
133140
if (doclingServeApi != null) {
134141
doclingServeApi = null;
135142
LOG.info("DoclingServeApi reference cleared");
@@ -287,10 +294,12 @@ private void processSubmitAsyncConversion(Exchange exchange) throws Exception {
287294

288295
// Start async conversion
289296
ConvertDocumentRequest request = buildConvertRequest(inputPath, outputFormat);
290-
CompletionStage<ConvertDocumentResponse> asyncResult = doclingServeApi.convertSourceAsync(request);
297+
CompletableFuture<ConvertDocumentResponse> asyncResult
298+
= doclingServeApi.convertSourceAsync(request).toCompletableFuture();
291299

292-
// Generate a task ID for tracking
293-
String taskId = "task-" + System.currentTimeMillis() + "-" + inputPath.hashCode();
300+
// Generate a unique task ID and store the future for later status checks
301+
String taskId = "task-" + taskIdCounter.incrementAndGet();
302+
pendingAsyncTasks.put(taskId, asyncResult);
294303
LOG.debug("Started async conversion with task ID: {}", taskId);
295304

296305
// Set task ID in body and header
@@ -345,6 +354,13 @@ private void processCheckConversionStatus(Exchange exchange) throws Exception {
345354
private ConversionStatus checkConversionStatusInternal(String taskId) {
346355
LOG.debug("Checking status for task: {}", taskId);
347356

357+
// Check the local pending tasks map first (tasks submitted via SUBMIT_ASYNC_CONVERSION)
358+
CompletableFuture<ConvertDocumentResponse> future = pendingAsyncTasks.get(taskId);
359+
if (future != null) {
360+
return checkLocalAsyncTask(taskId, future);
361+
}
362+
363+
// Fall back to server-side task polling
348364
try {
349365
TaskStatusPollRequest pollRequest = TaskStatusPollRequest.builder()
350366
.taskId(taskId)
@@ -374,8 +390,37 @@ private ConversionStatus checkConversionStatusInternal(String taskId) {
374390
return new ConversionStatus(taskId, status);
375391
} catch (Exception e) {
376392
LOG.warn("Failed to check task status for {}: {}", taskId, e.getMessage());
377-
// If the task ID doesn't exist on the server, return a completed status as a fallback
378-
return new ConversionStatus(taskId, ConversionStatus.Status.COMPLETED);
393+
String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getName();
394+
return new ConversionStatus(taskId, ConversionStatus.Status.FAILED, null, errorMsg, null);
395+
}
396+
}
397+
398+
private ConversionStatus checkLocalAsyncTask(String taskId, CompletableFuture<ConvertDocumentResponse> future) {
399+
if (!future.isDone()) {
400+
return new ConversionStatus(taskId, ConversionStatus.Status.IN_PROGRESS);
401+
}
402+
403+
if (future.isCompletedExceptionally() || future.isCancelled()) {
404+
// Remove completed task from map
405+
pendingAsyncTasks.remove(taskId);
406+
String errorMessage;
407+
try {
408+
future.join();
409+
errorMessage = "Unknown error";
410+
} catch (Exception e) {
411+
errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
412+
}
413+
return new ConversionStatus(taskId, ConversionStatus.Status.FAILED, null, errorMessage, null);
414+
}
415+
416+
// Completed successfully — extract the result and remove from map
417+
pendingAsyncTasks.remove(taskId);
418+
try {
419+
ConvertDocumentResponse response = future.join();
420+
String result = extractConvertedContent(response, configuration.getOutputFormat());
421+
return new ConversionStatus(taskId, ConversionStatus.Status.COMPLETED, result, null, null);
422+
} catch (Exception e) {
423+
return new ConversionStatus(taskId, ConversionStatus.Status.FAILED, null, e.getMessage(), null);
379424
}
380425
}
381426

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.docling;
18+
19+
import java.lang.reflect.Field;
20+
import java.util.Map;
21+
import java.util.concurrent.CompletableFuture;
22+
23+
import ai.docling.serve.api.convert.response.ConvertDocumentResponse;
24+
import ai.docling.serve.api.convert.response.DocumentResponse;
25+
import org.apache.camel.CamelExecutionException;
26+
import org.apache.camel.Exchange;
27+
import org.apache.camel.builder.RouteBuilder;
28+
import org.apache.camel.support.DefaultExchange;
29+
import org.apache.camel.test.junit6.CamelTestSupport;
30+
import org.junit.jupiter.api.Test;
31+
32+
import static org.junit.jupiter.api.Assertions.*;
33+
34+
/**
35+
* Tests the SUBMIT_ASYNC_CONVERSION and CHECK_CONVERSION_STATUS two-step async workflow.
36+
*
37+
* <p>
38+
* Before the fix, the {@code CompletionStage} returned by {@code convertSourceAsync()} was discarded and a fabricated
39+
* task ID with no server-side correlation was returned. CHECK_CONVERSION_STATUS would then fail because the server had
40+
* no record of the fake ID, and the error was silently masked by returning COMPLETED.
41+
*
42+
* <p>
43+
* After the fix, the {@code CompletableFuture} is stored in a local map keyed by the generated task ID. When
44+
* CHECK_CONVERSION_STATUS is called, it checks the local map first and returns the actual status of the async task.
45+
*/
46+
class DoclingAsyncConversionTest extends CamelTestSupport {
47+
48+
@Test
49+
void submitReturnsTaskIdLinkedToFuture() throws Exception {
50+
DoclingEndpoint endpoint = context.getEndpoint(
51+
"docling:convert?operation=SUBMIT_ASYNC_CONVERSION&useDoclingServe=true", DoclingEndpoint.class);
52+
DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
53+
54+
// Access the pendingAsyncTasks map via reflection to verify the future is stored
55+
Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer);
56+
assertNotNull(pendingTasks, "pendingAsyncTasks map should exist");
57+
assertTrue(pendingTasks.isEmpty(), "pendingAsyncTasks should start empty");
58+
}
59+
60+
@Test
61+
void checkStatusReturnsFailedForUnknownTaskId() throws Exception {
62+
// When CHECK_CONVERSION_STATUS is called with an unknown task ID and the server
63+
// is not available, it should return FAILED — not COMPLETED (the old bug).
64+
try {
65+
Exchange exchange = new DefaultExchange(context);
66+
exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "nonexistent-task-id");
67+
exchange.getIn().setHeader(DoclingHeaders.OPERATION, DoclingOperations.CHECK_CONVERSION_STATUS);
68+
69+
template.send("direct:check-status", exchange);
70+
71+
Object body = exchange.getIn().getBody();
72+
assertInstanceOf(ConversionStatus.class, body);
73+
ConversionStatus status = (ConversionStatus) body;
74+
75+
// The key assertion: unknown task IDs should NOT return COMPLETED
76+
assertNotEquals(ConversionStatus.Status.COMPLETED, status.getStatus(),
77+
"Unknown task ID should not return COMPLETED status");
78+
assertEquals(ConversionStatus.Status.FAILED, status.getStatus(),
79+
"Unknown task ID with unavailable server should return FAILED");
80+
assertNotNull(status.getErrorMessage(), "Error message should be populated");
81+
} catch (CamelExecutionException e) {
82+
// If the exchange throws instead of setting FAILED status, that's also acceptable —
83+
// the important thing is it doesn't silently return COMPLETED
84+
}
85+
}
86+
87+
@Test
88+
void checkStatusReturnsCompletedForFinishedLocalTask() throws Exception {
89+
DoclingEndpoint endpoint = context.getEndpoint(
90+
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", DoclingEndpoint.class);
91+
DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
92+
93+
// Manually insert a completed future into the pending tasks map
94+
Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer);
95+
96+
// Create a completed future with a mock response
97+
ConvertDocumentResponse mockResponse = ConvertDocumentResponse.builder()
98+
.document(DocumentResponse.builder()
99+
.markdownContent("# Converted Document")
100+
.build())
101+
.build();
102+
CompletableFuture<ConvertDocumentResponse> completedFuture = CompletableFuture.completedFuture(mockResponse);
103+
pendingTasks.put("test-task-1", completedFuture);
104+
105+
// Check the status — should find it in local map and return COMPLETED with result
106+
Exchange exchange = new DefaultExchange(context);
107+
exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-1");
108+
109+
producer.process(exchange);
110+
111+
Object body = exchange.getIn().getBody();
112+
assertInstanceOf(ConversionStatus.class, body);
113+
ConversionStatus status = (ConversionStatus) body;
114+
assertEquals(ConversionStatus.Status.COMPLETED, status.getStatus());
115+
assertNotNull(status.getResult(), "Result should contain the converted content");
116+
117+
// Future should be removed from map after completion
118+
assertFalse(pendingTasks.containsKey("test-task-1"),
119+
"Completed task should be removed from pending map");
120+
}
121+
122+
@Test
123+
void checkStatusReturnsInProgressForPendingLocalTask() throws Exception {
124+
DoclingEndpoint endpoint = context.getEndpoint(
125+
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", DoclingEndpoint.class);
126+
DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
127+
128+
Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer);
129+
130+
// Insert an incomplete future
131+
CompletableFuture<ConvertDocumentResponse> incompleteFuture = new CompletableFuture<>();
132+
pendingTasks.put("test-task-2", incompleteFuture);
133+
134+
Exchange exchange = new DefaultExchange(context);
135+
exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-2");
136+
137+
producer.process(exchange);
138+
139+
Object body = exchange.getIn().getBody();
140+
assertInstanceOf(ConversionStatus.class, body);
141+
ConversionStatus status = (ConversionStatus) body;
142+
assertEquals(ConversionStatus.Status.IN_PROGRESS, status.getStatus());
143+
144+
// Future should remain in map since it's not done yet
145+
assertTrue(pendingTasks.containsKey("test-task-2"),
146+
"In-progress task should remain in pending map");
147+
148+
// Clean up
149+
incompleteFuture.cancel(true);
150+
}
151+
152+
@Test
153+
void checkStatusReturnsFailedForExceptionalLocalTask() throws Exception {
154+
DoclingEndpoint endpoint = context.getEndpoint(
155+
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", DoclingEndpoint.class);
156+
DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
157+
158+
Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = getPendingAsyncTasks(producer);
159+
160+
// Insert a failed future
161+
CompletableFuture<ConvertDocumentResponse> failedFuture = new CompletableFuture<>();
162+
failedFuture.completeExceptionally(new RuntimeException("Server connection refused"));
163+
pendingTasks.put("test-task-3", failedFuture);
164+
165+
Exchange exchange = new DefaultExchange(context);
166+
exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-3");
167+
168+
producer.process(exchange);
169+
170+
Object body = exchange.getIn().getBody();
171+
assertInstanceOf(ConversionStatus.class, body);
172+
ConversionStatus status = (ConversionStatus) body;
173+
assertEquals(ConversionStatus.Status.FAILED, status.getStatus());
174+
assertNotNull(status.getErrorMessage());
175+
assertTrue(status.getErrorMessage().contains("Server connection refused"));
176+
177+
// Failed task should be removed from map
178+
assertFalse(pendingTasks.containsKey("test-task-3"),
179+
"Failed task should be removed from pending map");
180+
}
181+
182+
@SuppressWarnings("unchecked")
183+
private Map<String, CompletableFuture<ConvertDocumentResponse>> getPendingAsyncTasks(DoclingProducer producer)
184+
throws Exception {
185+
Field field = DoclingProducer.class.getDeclaredField("pendingAsyncTasks");
186+
field.setAccessible(true);
187+
return (Map<String, CompletableFuture<ConvertDocumentResponse>>) field.get(producer);
188+
}
189+
190+
@Override
191+
protected RouteBuilder createRouteBuilder() {
192+
return new RouteBuilder() {
193+
@Override
194+
public void configure() {
195+
from("direct:check-status")
196+
.to("docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true");
197+
}
198+
};
199+
}
200+
}

0 commit comments

Comments
 (0)