Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ public interface Agent {
* request body regardless of how the original message was formatted.
* </p>
*
* @param messagePayload the message payload from the exchange body; must be either an
* {@link AiAgentBody} or a {@link String}
* @param exchange the Camel exchange containing headers and context information
* @return an {@link AiAgentBody} instance ready for agent processing; returns the
* original payload if it's already an {@link AiAgentBody}, or creates a new
* one from a string payload and relevant headers
* @throws InvalidPayloadRuntimeException if the payload is neither an {@link AiAgentBody} nor a {@link String}
* @throws Exception if any other error occurs during payload processing
* @param messagePayload the message payload from the exchange body; must be either an
* {@link AiAgentBody} or a {@link String}
* @param exchange the Camel exchange containing headers and context information
* @return an {@link AiAgentBody} instance ready for agent processing; returns
* the original payload if it's already an {@link AiAgentBody}, or
* creates a new one from a string payload and relevant headers
* @throws InvalidPayloadRuntimeException if the payload is neither an {@link AiAgentBody} nor a {@link String}
* @throws Exception if any other error occurs during payload processing
*
* @deprecated This method is no longer used by {@code LangChain4jAgentProducer}.
* Body conversion is now handled via Camel TypeConverters.
*/
@Deprecated(since = "4.19.0")
default AiAgentBody<?> processBody(Object messagePayload, Exchange exchange) throws Exception {
if (messagePayload instanceof AiAgentBody<?> payload) {
return payload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ private void registerConverters(TypeConverterRegistry registry) {
}
return answer;
});
addTypeConverter(registry, org.apache.camel.component.langchain4j.agent.api.AiAgentBody.class, java.lang.String.class, false,
(type, exchange, value) -> {
Object answer = org.apache.camel.component.langchain4j.agent.LangChain4jAgentConverter.textToAiAgentBody((java.lang.String) value, exchange);
if (false && answer == null) {
answer = Void.class;
}
return answer;
});
addTypeConverter(registry, org.apache.camel.component.langchain4j.agent.api.AiAgentBody.class, org.apache.camel.WrappedFile.class, false,
(type, exchange, value) -> {
Object answer = org.apache.camel.component.langchain4j.agent.LangChain4jAgentConverter.toAiAgentBody((org.apache.camel.WrappedFile) value, exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URLConnection;
import java.nio.file.Files;
import java.util.Base64;

Expand All @@ -34,6 +35,7 @@
import dev.langchain4j.data.video.Video;
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.WrappedFile;
import org.apache.camel.component.langchain4j.agent.api.AiAgentBody;
import org.slf4j.Logger;
Expand Down Expand Up @@ -108,17 +110,7 @@ public static AiAgentBody<?> toAiAgentBody(WrappedFile<?> wrappedFile, Exchange
byte[] fileData = readFileBytes(file);
Content content = createContent(fileData, mimeType);

String userMessage = exchange.getIn().getHeader(USER_MESSAGE, String.class);
String systemMessage = exchange.getIn().getHeader(SYSTEM_MESSAGE, String.class);
Object memoryId = exchange.getIn().getHeader(MEMORY_ID);

AiAgentBody<Content> body = new AiAgentBody<>();
body.setUserMessage(userMessage != null ? userMessage : "");
body.setSystemMessage(systemMessage);
body.setMemoryId(memoryId);
body.setContent(content);

return body;
return buildAiAgentBody(exchange, content, "");
}

/**
Expand Down Expand Up @@ -153,17 +145,7 @@ public static AiAgentBody<?> byteArrayToAiAgentBody(byte[] data, Exchange exchan
String mimeType = detectMimeTypeFromHeaders(exchange);
Content content = createContent(data, mimeType);

String userMessage = exchange.getIn().getHeader(USER_MESSAGE, String.class);
String systemMessage = exchange.getIn().getHeader(SYSTEM_MESSAGE, String.class);
Object memoryId = exchange.getIn().getHeader(MEMORY_ID);

AiAgentBody<Content> body = new AiAgentBody<>();
body.setUserMessage(userMessage != null ? userMessage : "");
body.setSystemMessage(systemMessage);
body.setMemoryId(memoryId);
body.setContent(content);

return body;
return buildAiAgentBody(exchange, content, "");
}

/**
Expand Down Expand Up @@ -191,6 +173,21 @@ public static AiAgentBody<?> inputStreamToAiAgentBody(InputStream inputStream, E
}
}

/**
* Converts a {@link String} to an {@link AiAgentBody} with the appropriate {@link Content} type.
* <p>
* This converter is useful for the text components that return Text Body.
* </p>
*
* @param text String as message
* @param exchange the Camel exchange containing headers
* @return an AiAgentBody with the appropriate Content type
*/
@Converter
public static AiAgentBody<?> textToAiAgentBody(String text, Exchange exchange) {
return buildAiAgentBody(exchange, null, text);
}

/**
* Creates the appropriate LangChain4j Content object based on the MIME type.
*/
Expand Down Expand Up @@ -242,13 +239,13 @@ static Content createContent(byte[] data, String mimeType) {
*/
private static String detectMimeType(File file, Exchange exchange) {
// Check agent-specific header first (highest priority)
String mediaType = exchange.getIn().getHeader(MEDIA_TYPE, String.class);
String mediaType = exchange.getMessage().getHeader(MEDIA_TYPE, String.class);
if (mediaType != null) {
return mediaType;
}

// Check file component's content type header
String fileContentType = exchange.getIn().getHeader(Exchange.FILE_CONTENT_TYPE, String.class);
String fileContentType = exchange.getMessage().getHeader(Exchange.FILE_CONTENT_TYPE, String.class);
if (fileContentType != null) {
return fileContentType;
}
Expand All @@ -273,11 +270,12 @@ private static String detectMimeType(File file, Exchange exchange) {
*/
private static String detectMimeTypeFromHeaders(Exchange exchange) {
// Check agent-specific header first (highest priority)
String mediaType = exchange.getIn().getHeader(MEDIA_TYPE, String.class);
Message message = exchange.getMessage();

String mediaType = message.getHeader(MEDIA_TYPE, String.class);
if (mediaType != null) {
return normalizeContentType(mediaType);
}

// Cloud storage component content type headers
String[] cloudContentTypeHeaders = {
"CamelAwsS3ContentType", // AWS S3
Expand All @@ -289,24 +287,30 @@ private static String detectMimeTypeFromHeaders(Exchange exchange) {
};

for (String header : cloudContentTypeHeaders) {
String cloudContentType = exchange.getIn().getHeader(header, String.class);
String cloudContentType = message.getHeader(header, String.class);
if (cloudContentType != null) {
return normalizeContentType(cloudContentType);
}
}

// Check standard content type header
String contentType = exchange.getIn().getHeader(Exchange.CONTENT_TYPE, String.class);
String contentType = message.getHeader(Exchange.CONTENT_TYPE, String.class);
if (contentType != null) {
return normalizeContentType(contentType);
}

// Check file component's content type header
String fileContentType = exchange.getIn().getHeader(Exchange.FILE_CONTENT_TYPE, String.class);
String fileContentType = message.getHeader(Exchange.FILE_CONTENT_TYPE, String.class);
if (fileContentType != null) {
return normalizeContentType(fileContentType);
}

String fileName = message.getHeader(Exchange.FILE_NAME, String.class);
if (fileName != null) {
String mime = URLConnection.guessContentTypeFromName(fileName);
if (mime != null) {
return normalizeContentType(mime);
}
}

throw new IllegalArgumentException(
"MIME type is required for byte[] or InputStream input. "
+ "Please set the CamelLangChain4jAgentMediaType header.");
Expand Down Expand Up @@ -405,4 +409,31 @@ private static byte[] readFileBytes(File file) {
throw new IllegalArgumentException("Failed to read file: " + file.getAbsolutePath(), e);
}
}

/**
* Utility method to build agent body.
*
* @param exchange the Camel exchange containing message headers
* @param content the LangChain4j content to attach to the agent body
* @param defaultUserMessage the fallback user message if the corresponding header is absent
* @return a fully populated {@link AiAgentBody} instance
* @param <T> the type of LangChain4j {@link Content}
*/
private static <
T extends Content> AiAgentBody<T> buildAiAgentBody(Exchange exchange, T content, String defaultUserMessage) {

Message message = exchange.getMessage();

String userMessage = message.getHeader(USER_MESSAGE, String.class);
String systemMessage = message.getHeader(SYSTEM_MESSAGE, String.class);
Object memoryId = message.getHeader(MEMORY_ID);

AiAgentBody<T> body = new AiAgentBody<>();
body.setUserMessage(userMessage != null ? userMessage : defaultUserMessage);
body.setSystemMessage(systemMessage);
body.setMemoryId(memoryId);
body.setContent(content);

return body;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void process(Exchange exchange) throws Exception {
agent = agentFactory.createAgent(exchange);
}

AiAgentBody<?> aiAgentBody = agent.processBody(messagePayload, exchange);
AiAgentBody<?> aiAgentBody = exchange.getMessage().getMandatoryBody(AiAgentBody.class);

ToolProvider toolProvider = createComposedToolProvider(tags, exchange);
String response = agent.chat(aiAgentBody, toolProvider);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.langchain4j.agent.integration;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.nio.file.Files;

import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.langchain4j.agent.api.Agent;
import org.apache.camel.component.langchain4j.agent.api.AiAgentBody;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit6.CamelTestSupport;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class LangChain4jAgentAutoConversionTest extends CamelTestSupport {

@Override
protected RoutesBuilder createRouteBuilder() throws Exception {

Agent mockAgent = (body, exchange) -> "Processed";

context.getRegistry().bind("mockAgent", mockAgent);

return new RouteBuilder() {
public void configure() {
from("direct:start")
.to("langchain4j-agent:test?agent=#mockAgent")
.to("mock:result");
}
};

}

@Test
void shouldAutoConvertPlainString() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);

template.sendBody("direct:start", "Hello world");

mock.assertIsSatisfied();

String response = mock.getExchanges().get(0)
.getMessage()
.getMandatoryBody(String.class);

assertNotNull(response);
}

@Test
void shouldAutoConvertInputStream() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
context.setStreamCaching(true);
template.send("direct:start", exchange -> {
exchange.getMessage().setBody(
new ByteArrayInputStream("Hello stream".getBytes()));
exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, "text/plain");
});

mock.assertIsSatisfied();
AiAgentBody<?> body = mock.getExchanges().get(0)
.getMessage()
.getBody(AiAgentBody.class);

assertNotNull(body);
}

@Test
void shouldConvertWrappedFile() throws Exception {
Exchange exchange = context.getEndpoint("direct:test").createExchange();

File file = File.createTempFile("camel", ".txt");
Files.writeString(file.toPath(), "Hello file");

GenericFile<File> genericFile = new GenericFile<>();
genericFile.setFile(file);
genericFile.setFileName(file.getName());

AiAgentBody<?> body = context.getTypeConverter()
.convertTo(AiAgentBody.class, exchange, genericFile);

assertNotNull(body);
}

@Test
void shouldConvertByteArray() {
Exchange exchange = context.getEndpoint("direct:test").createExchange();

exchange.getMessage().setHeader(
"CamelLangChain4jAgentMediaType",
"text/plain");

AiAgentBody<?> body = context.getTypeConverter()
.convertTo(AiAgentBody.class, exchange, "Hello".getBytes());

assertNotNull(body);
}

@Test
void shouldConvertInputStream() {
Exchange exchange = context.getEndpoint("direct:test").createExchange();

exchange.getMessage().setHeader(
"CamelLangChain4jAgentMediaType",
"text/plain");

ByteArrayInputStream stream = new ByteArrayInputStream("Hello".getBytes());

AiAgentBody<?> body = context.getTypeConverter()
.convertTo(AiAgentBody.class, exchange, stream);

assertNotNull(body);
}

@Test
void shouldFailForUnsupportedMimeType() {
Exchange exchange = context.getEndpoint("direct:test").createExchange();

exchange.getMessage().setHeader(
"CamelLangChain4jAgentMediaType",
"application/zip");

assertThrows(
org.apache.camel.TypeConversionException.class,
() -> context.getTypeConverter()
.convertTo(AiAgentBody.class, exchange, "data".getBytes()));
}

}
Loading