diff --git a/xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java index ba3b536395c..8bd26390a4d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/FilterUtil.java @@ -20,48 +20,33 @@ import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.function.Function; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Any; -import com.google.protobuf.Message; import com.linecorp.armeria.client.ClientDecoration; import com.linecorp.armeria.client.ClientDecorationBuilder; import com.linecorp.armeria.client.ClientPreprocessors; import com.linecorp.armeria.client.ClientPreprocessorsBuilder; -import com.linecorp.armeria.client.DecoratingHttpClientFunction; -import com.linecorp.armeria.client.DecoratingRpcClientFunction; -import com.linecorp.armeria.client.HttpClient; -import com.linecorp.armeria.client.HttpPreprocessor; -import com.linecorp.armeria.client.RpcPreprocessor; -import com.linecorp.armeria.client.retry.RetryingClient; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.xds.filter.HttpFilterFactory; import com.linecorp.armeria.xds.filter.HttpFilterFactoryRegistry; +import com.linecorp.armeria.xds.filter.XdsHttpFilter; +import io.envoyproxy.envoy.config.route.v3.RetryPolicy; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter.ConfigTypeCase; final class FilterUtil { - static Map mergeFilterConfigs( - Map filterConfigs1, - Map filterConfigs2) { - final ImmutableMap.Builder builder = ImmutableMap.builder(); - builder.putAll(filterConfigs1); - builder.putAll(filterConfigs2); - return builder.buildKeepingLast(); - } - - static Map toParsedFilterConfigs(Map filterConfigMap) { - final ImmutableMap.Builder filterConfigsBuilder = ImmutableMap.builder(); - for (Entry e: filterConfigMap.entrySet()) { - filterConfigsBuilder.put(e.getKey(), ParsedFilterConfig.of(e.getKey(), e.getValue())); - } - return filterConfigsBuilder.buildKeepingLast(); + static Map mergeFilterConfigs( + Map filterConfigs1, + Map filterConfigs2) { + return ImmutableMap.builder() + .putAll(filterConfigs1) + .putAll(filterConfigs2) + .buildKeepingLast(); } static ClientPreprocessors buildDownstreamFilter( @@ -73,98 +58,58 @@ static ClientPreprocessors buildDownstreamFilter( final ClientPreprocessorsBuilder builder = ClientPreprocessors.builder(); for (int i = httpFilters.size() - 1; i >= 0; i--) { final HttpFilter httpFilter = httpFilters.get(i); - final XdsFilter xdsFilter = xdsHttpFilter(httpFilter, null); - if (xdsFilter == null) { + final XdsHttpFilter instance = resolveInstance(httpFilter, null); + if (instance == null) { continue; } - if (xdsFilter.filterConfig().disabled()) { - continue; - } - builder.add(xdsFilter.httpPreprocessor()); - builder.addRpc(xdsFilter.rpcPreprocessor()); + builder.add(instance.httpPreprocessor()); + builder.addRpc(instance.rpcPreprocessor()); } return builder.build(); } static ClientDecoration buildUpstreamFilter( - List httpFilters, Map filterConfigs, - @Nullable Function retryingDecorator) { + List httpFilters, Map filterConfigs, + @Nullable RetryPolicy retryPolicy) { final ClientDecorationBuilder builder = ClientDecoration.builder(); for (int i = httpFilters.size() - 1; i >= 0; i--) { final HttpFilter httpFilter = httpFilters.get(i); - final ParsedFilterConfig parsedFilterConfig = filterConfigs.get(httpFilter.getName()); - final XdsFilter xdsFilter = xdsHttpFilter(httpFilter, parsedFilterConfig); - if (xdsFilter == null) { - continue; - } - if (xdsFilter.filterConfig().disabled()) { + final Any perRouteConfig = filterConfigs.get(httpFilter.getName()); + final XdsHttpFilter instance = resolveInstance(httpFilter, perRouteConfig); + if (instance == null) { continue; } - builder.add(xdsFilter.httpDecorator()); - builder.addRpc(xdsFilter.rpcDecorator()); + builder.add(instance.httpDecorator()); + builder.addRpc(instance.rpcDecorator()); } - if (retryingDecorator != null) { + if (retryPolicy != null) { // add the retrying decorator as the first (outermost) decorator if exists - builder.add(retryingDecorator); + builder.add(new RetryStateFactory(retryPolicy).retryingDecorator()); } return builder.build(); } @Nullable - private static XdsFilter xdsHttpFilter(HttpFilter httpFilter, - @Nullable ParsedFilterConfig parsedFilterConfig) { - final HttpFilterFactory filterFactory = + private static XdsHttpFilter resolveInstance( + HttpFilter httpFilter, @Nullable Any perRouteConfig) { + final HttpFilterFactory filterFactory = HttpFilterFactoryRegistry.filterFactory(httpFilter.getName()); if (filterFactory == null) { - if (httpFilter.getIsOptional()) { - return null; + if (!httpFilter.getIsOptional()) { + throw new IllegalArgumentException( + "Unknown HTTP filter '" + httpFilter.getName() + + "': no HttpFilterFactory registered. Register an SPI " + + "HttpFilterFactory implementation to handle this filter."); } - throw new IllegalArgumentException("Couldn't find filter factory: " + httpFilter.getName()); + return null; } checkArgument(httpFilter.getConfigTypeCase() == ConfigTypeCase.TYPED_CONFIG || httpFilter.getConfigTypeCase() == ConfigTypeCase.CONFIGTYPE_NOT_SET, "Only 'typed_config' is supported, but '%s' was supplied", httpFilter.getConfigTypeCase()); - return new XdsFilter<>(filterFactory, httpFilter, parsedFilterConfig); - } - - private static class XdsFilter { - - private final HttpFilterFactory filterFactory; - private final T config; - private final ParsedFilterConfig filterConfig; - - XdsFilter(HttpFilterFactory filterFactory, HttpFilter httpFilter, - @Nullable ParsedFilterConfig filterConfig) { - this.filterFactory = filterFactory; - if (filterConfig != null) { - this.filterConfig = filterConfig; - } else { - this.filterConfig = ParsedFilterConfig.of(httpFilter.getName(), httpFilter.getTypedConfig(), - httpFilter.getIsOptional(), httpFilter.getDisabled()); - } - config = this.filterConfig.parsedConfig(filterFactory.defaultConfig()); - } - - public ParsedFilterConfig filterConfig() { - return filterConfig; - } - - public HttpPreprocessor httpPreprocessor() { - return filterFactory.httpPreprocessor(config); - } - - public RpcPreprocessor rpcPreprocessor() { - return filterFactory.rpcPreprocessor(config); - } - - public DecoratingHttpClientFunction httpDecorator() { - return filterFactory.httpDecorator(config); - } - - public DecoratingRpcClientFunction rpcDecorator() { - return filterFactory.rpcDecorator(config); - } + final Any effectiveConfig = + perRouteConfig != null ? perRouteConfig : httpFilter.getTypedConfig(); + return filterFactory.create(httpFilter, effectiveConfig); } private FilterUtil() {} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java index 6a9e8ff7154..3b5d587d81b 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerSnapshot.java @@ -42,11 +42,7 @@ public final class ListenerSnapshot implements Snapshot { ListenerSnapshot(ListenerXdsResource listenerXdsResource, @Nullable RouteSnapshot routeSnapshot) { this.listenerXdsResource = listenerXdsResource; - if (listenerXdsResource.router() != null && routeSnapshot != null) { - this.routeSnapshot = routeSnapshot.withRouter(listenerXdsResource.router()); - } else { - this.routeSnapshot = routeSnapshot; - } + this.routeSnapshot = routeSnapshot; downstreamFilter = FilterUtil.buildDownstreamFilter(listenerXdsResource.connectionManager()); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java b/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java index 74d22fce52c..0aeb3a41a1e 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/ListenerStream.java @@ -62,14 +62,14 @@ protected Subscription onStart(SnapshotWatcher watcher) { .subscribe(watcher); } - private SnapshotStream resource2snapshot(ListenerXdsResource resource, - @Nullable ConfigSource parentConfigSource) { + private SnapshotStream resource2snapshot( + ListenerXdsResource resource, @Nullable ConfigSource parentConfigSource) { SnapshotStream node = null; final HttpConnectionManager connectionManager = resource.connectionManager(); if (connectionManager != null) { if (connectionManager.hasRouteConfig()) { final RouteConfiguration routeConfig = connectionManager.getRouteConfig(); - node = new RouteStream(context, routeConfig) + node = new RouteStream(context, routeConfig, resource) .map(routeSnapshot -> new ListenerSnapshot(resource, routeSnapshot)); } else if (connectionManager.hasRds()) { final Rds rds = connectionManager.getRds(); @@ -81,7 +81,7 @@ private SnapshotStream resource2snapshot(ListenerXdsResource r return SnapshotStream.error(new XdsResourceException(LISTENER, resourceName, "config source not found")); } - node = new RouteStream(configSource, routeName, context) + node = new RouteStream(configSource, routeName, context, resource) .map(routeSnapshot -> new ListenerSnapshot(resource, routeSnapshot)); } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/ParsedFilterConfig.java b/xds/src/main/java/com/linecorp/armeria/xds/ParsedFilterConfig.java deleted file mode 100644 index 3b06e1cc58f..00000000000 --- a/xds/src/main/java/com/linecorp/armeria/xds/ParsedFilterConfig.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2025 LY Corporation - * - * LY Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.xds; - -import static java.util.Objects.requireNonNull; - -import com.google.protobuf.Any; -import com.google.protobuf.Message; - -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.annotation.UnstableApi; -import com.linecorp.armeria.xds.filter.HttpFilterFactory; -import com.linecorp.armeria.xds.filter.HttpFilterFactoryRegistry; - -import io.envoyproxy.envoy.config.route.v3.FilterConfig; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; - -/** - * Represents a {@link FilterConfig} that is pre-parsed by the global {@link HttpFilterFactory}. - */ -@UnstableApi -public final class ParsedFilterConfig { - - private static final String FILTER_CONFIG_TYPE_URL = "envoy.config.route.v3.FilterConfig"; - - /** - * Creates a {@link ParsedFilterConfig} based on the provided {@code config}. - * - * @param filterName the name of the {@link HttpFilter} - * @param config the config to be parsed - */ - static ParsedFilterConfig of(String filterName, Any config) { - requireNonNull(config, "config"); - if (FILTER_CONFIG_TYPE_URL.equals(config.getTypeUrl())) { - final FilterConfig filterConfig; - filterConfig = XdsValidatorIndexRegistry.unpack(config, FilterConfig.class); - return new ParsedFilterConfig(filterName, filterConfig.getConfig(), filterConfig.getIsOptional(), - filterConfig.getDisabled()); - } - return new ParsedFilterConfig(filterName, config, false, false); - } - - /** - * Creates a {@link ParsedFilterConfig} based on the provided {@code config} and flags. - * - * @param filterName the name of the {@link HttpFilter} - * @param config the config to be parsed - * @param optional true if this config is optional - * @param disabled true if the filter corresponding to this config should be disabled - */ - public static ParsedFilterConfig of(String filterName, Any config, boolean optional, boolean disabled) { - requireNonNull(filterName, "name"); - requireNonNull(config, "config"); - return new ParsedFilterConfig(filterName, config, optional, disabled); - } - - @Nullable - private final Object parsedConfig; - private final boolean disabled; - - private ParsedFilterConfig(String filterName, Any config, boolean optional, boolean disabled) { - final HttpFilterFactory filterFactory = HttpFilterFactoryRegistry.filterFactory(filterName); - if (filterFactory == null) { - if (!optional) { - throw new IllegalArgumentException("Filter config for filter '" + filterName + - "' is not registered in HttpFilterFactoryRegistry."); - } - } - if (filterFactory != null) { - parsedConfig = maybeParseConfig(config, filterFactory.configClass()); - } else { - parsedConfig = null; - } - this.disabled = disabled; - } - - @Nullable - private static T maybeParseConfig(Any config, Class clazz) { - if (config == Any.getDefaultInstance()) { - return null; - } - return XdsValidatorIndexRegistry.unpack(config, clazz); - } - - /** - * Returns {@code true} if the filter corresponding to this config should be disabled. - */ - public boolean disabled() { - return disabled; - } - - /** - * Returns the pre-parsed configuration for the supplied {@link HttpFilterFactory}. - * If the configuration is optional and the {@link HttpFilterFactory} is not registered in - * the {@link HttpFilterFactoryRegistry}, the supplied {@code defaultConfig} will be returned. - */ - @UnstableApi - @SuppressWarnings("unchecked") - public T parsedConfig(T defaultConfig) { - if (parsedConfig != null) { - return (T) parsedConfig; - } - return defaultConfig; - } -} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java index 20a08f395c8..b3e0811212e 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteEntry.java @@ -16,20 +16,18 @@ package com.linecorp.armeria.xds; -import static com.linecorp.armeria.xds.FilterUtil.toParsedFilterConfigs; - import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Function; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Any; import com.linecorp.armeria.client.ClientDecoration; import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.HttpClient; import com.linecorp.armeria.client.RpcClient; -import com.linecorp.armeria.client.retry.RetryingClient; import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.internal.client.ClientRequestContextExtension; @@ -50,56 +48,47 @@ public final class RouteEntry { private final Route route; @Nullable private final ClusterSnapshot clusterSnapshot; - private final Map filterConfigs; + private final Map filterConfigs; private final int index; - @Nullable - private final Function retryingDecorator; - private final ClientDecoration upstreamFilter; - private final RouteEntryMatcher matcher; - @Nullable private final HttpClient httpClient; - @Nullable private final RpcClient rpcClient; + private final RouteEntryMatcher matcher; - RouteEntry(Route route, @Nullable ClusterSnapshot clusterSnapshot, int index) { + RouteEntry(Route route, @Nullable ClusterSnapshot clusterSnapshot, int index, + @Nullable ListenerXdsResource listenerResource, RouteXdsResource routeResource, + VirtualHostXdsResource vhostResource) { this.route = route; this.clusterSnapshot = clusterSnapshot; - filterConfigs = toParsedFilterConfigs(route.getTypedPerFilterConfigMap()); this.index = index; - upstreamFilter = ClientDecoration.of(); - if (route.getRoute().getRetryPolicy() != RetryPolicy.getDefaultInstance()) { - retryingDecorator = new RetryStateFactory(route.getRoute().getRetryPolicy()).retryingDecorator(); - } else { - retryingDecorator = null; - } matcher = new RouteEntryMatcher(route.getMatch()); - httpClient = upstreamFilter.decorate(DelegatingHttpClient.of()); - rpcClient = upstreamFilter.rpcDecorate(DelegatingRpcClient.of()); - } - - private RouteEntry(Route route, @Nullable ClusterSnapshot clusterSnapshot, int index, - Map filterConfigs, List upstreamFilters, - @Nullable Function retryingDecorator, - RouteEntryMatcher matcher) { - this.route = route; - this.clusterSnapshot = clusterSnapshot; - this.filterConfigs = filterConfigs; - this.index = index; - upstreamFilter = FilterUtil.buildUpstreamFilter(upstreamFilters, filterConfigs, retryingDecorator); - this.retryingDecorator = retryingDecorator; - this.matcher = matcher; - - httpClient = upstreamFilter.decorate(DelegatingHttpClient.of()); - rpcClient = upstreamFilter.rpcDecorate(DelegatingRpcClient.of()); - } + // Merge per_filter_config: route-config level < vhost level < route level + final Map routeConfigFilterConfigs = + routeResource.resource().getTypedPerFilterConfigMap(); + final Map vhostFilterConfigs = + vhostResource.resource().getTypedPerFilterConfigMap(); + final Map routeFilterConfigs = + route.getTypedPerFilterConfigMap(); + filterConfigs = FilterUtil.mergeFilterConfigs( + FilterUtil.mergeFilterConfigs(routeConfigFilterConfigs, vhostFilterConfigs), + routeFilterConfigs); + + // Extract upstream HTTP filters from the Router (last filter in HCM filter chain) + final List upstreamFilters; + if (listenerResource != null && listenerResource.router() != null) { + upstreamFilters = listenerResource.router().getUpstreamHttpFiltersList(); + } else { + upstreamFilters = ImmutableList.of(); + } - RouteEntry withFilterConfigs(Map parentFilterConfigs, - List upstreamFilters) { - final Map mergedFilterConfigs = - FilterUtil.mergeFilterConfigs(parentFilterConfigs, filterConfigs); - return new RouteEntry(route, clusterSnapshot, index, mergedFilterConfigs, upstreamFilters, - retryingDecorator, matcher); + // Determine retry policy + final RetryPolicy retryPolicy = route.getRoute().getRetryPolicy(); + final RetryPolicy effectiveRetryPolicy = + retryPolicy == RetryPolicy.getDefaultInstance() ? null : retryPolicy; + final ClientDecoration clientDecoration = FilterUtil.buildUpstreamFilter( + upstreamFilters, filterConfigs, effectiveRetryPolicy); + httpClient = clientDecoration.decorate(DelegatingHttpClient.of()); + rpcClient = clientDecoration.rpcDecorate(DelegatingRpcClient.of()); } /** @@ -119,12 +108,13 @@ public ClusterSnapshot clusterSnapshot() { } /** - * Returns the parsed {@link Route#getTypedPerFilterConfigMap()}. + * Returns the raw per-route filter config {@link Any} from + * {@link Route#getTypedPerFilterConfigMap()}, or {@code null} if not present. * * @param filterName the filter name represented by {@link HttpFilter#getName()} */ @Nullable - public ParsedFilterConfig filterConfig(String filterName) { + public Any filterConfig(String filterName) { return filterConfigs.get(filterName); } @@ -144,18 +134,14 @@ public void applyUpstreamFilter(ClientRequestContext ctx) { if (ctxExt == null) { return; } - if (httpClient != null) { - ctxExt.httpClientCustomizer(actualClient -> { - DelegatingHttpClient.setDelegate(ctx, actualClient); - return httpClient; - }); - } - if (rpcClient != null) { - ctxExt.rpcClientCustomizer(actualClient -> { - DelegatingRpcClient.setDelegate(ctx, actualClient); - return rpcClient; - }); - } + ctxExt.httpClientCustomizer(actualClient -> { + DelegatingHttpClient.setDelegate(ctx, actualClient); + return httpClient; + }); + ctxExt.rpcClientCustomizer(actualClient -> { + DelegatingRpcClient.setDelegate(ctx, actualClient); + return rpcClient; + }); } /** diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteSnapshot.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteSnapshot.java index fb7915f7251..f54ddb98dfe 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteSnapshot.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteSnapshot.java @@ -16,10 +16,7 @@ package com.linecorp.armeria.xds; -import static com.linecorp.armeria.xds.FilterUtil.toParsedFilterConfigs; - import java.util.List; -import java.util.Map; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; @@ -28,8 +25,6 @@ import com.linecorp.armeria.common.annotation.UnstableApi; import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; -import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; /** * A snapshot of a {@link RouteConfiguration} resource. @@ -39,30 +34,10 @@ public final class RouteSnapshot implements Snapshot { private final RouteXdsResource routeXdsResource; private final List virtualHostSnapshots; - private final Map filterConfigs; RouteSnapshot(RouteXdsResource routeXdsResource, List virtualHostSnapshots) { this.routeXdsResource = routeXdsResource; - filterConfigs = toParsedFilterConfigs(routeXdsResource.resource().getTypedPerFilterConfigMap()); - this.virtualHostSnapshots = - virtualHostSnapshots.stream().map(vhs -> vhs.withFilterConfigs(filterConfigs, - ImmutableList.of())) - .collect(ImmutableList.toImmutableList()); - } - - private RouteSnapshot(RouteXdsResource routeXdsResource, List upstreamFilters, - Map filterConfigs, - List virtualHostSnapshots) { - this.routeXdsResource = routeXdsResource; - this.filterConfigs = filterConfigs; - this.virtualHostSnapshots = - virtualHostSnapshots.stream().map(vhs -> vhs.withFilterConfigs(filterConfigs, upstreamFilters)) - .collect(ImmutableList.toImmutableList()); - } - - RouteSnapshot withRouter(Router router) { - return new RouteSnapshot(routeXdsResource, router.getUpstreamHttpFiltersList(), - filterConfigs, virtualHostSnapshots); + this.virtualHostSnapshots = ImmutableList.copyOf(virtualHostSnapshots); } @Override diff --git a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java index 29ff2a49875..5ffee74f89a 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/RouteStream.java @@ -35,31 +35,38 @@ final class RouteStream extends RefCountedStream { private final ConfigSource configSource; private final String resourceName; private final SubscriptionContext context; + @Nullable + private final ListenerXdsResource listenerResource; - RouteStream(SubscriptionContext context, RouteConfiguration routeConfiguration) { + RouteStream(SubscriptionContext context, RouteConfiguration routeConfiguration, + @Nullable ListenerXdsResource listenerResource) { this.context = context; this.routeConfiguration = routeConfiguration; resourceName = routeConfiguration.getName(); configSource = null; + this.listenerResource = listenerResource; } - RouteStream(ConfigSource configSource, String resourceName, SubscriptionContext context) { + RouteStream(ConfigSource configSource, String resourceName, SubscriptionContext context, + @Nullable ListenerXdsResource listenerResource) { this.configSource = configSource; this.resourceName = resourceName; this.context = context; routeConfiguration = null; + this.listenerResource = listenerResource; } @Override protected Subscription onStart(SnapshotWatcher watcher) { if (routeConfiguration != null) { - return new RouteSnapshotStream(new RouteXdsResource(routeConfiguration), context) + return new RouteSnapshotStream(new RouteXdsResource(routeConfiguration), context, listenerResource) .subscribe(watcher); } assert configSource != null; - final SnapshotStream snapshotStream = new ResourceNodeAdapter( - configSource, context, resourceName, ROUTE) - .switchMapEager(routeResource -> new RouteSnapshotStream(routeResource, context)); + final SnapshotStream snapshotStream = + new ResourceNodeAdapter(configSource, context, resourceName, ROUTE) + .switchMapEager(routeResource -> new RouteSnapshotStream(routeResource, context, + listenerResource)); return snapshotStream.subscribe(watcher); } @@ -67,10 +74,14 @@ private static class RouteSnapshotStream extends RefCountedStream private final RouteXdsResource routeResource; private final SubscriptionContext context; + @Nullable + private final ListenerXdsResource listenerResource; - RouteSnapshotStream(RouteXdsResource routeResource, SubscriptionContext context) { + RouteSnapshotStream(RouteXdsResource routeResource, SubscriptionContext context, + @Nullable ListenerXdsResource listenerResource) { this.routeResource = routeResource; this.context = context; + this.listenerResource = listenerResource; } @Override @@ -82,7 +93,8 @@ protected Subscription onStart(SnapshotWatcher watcher) { final VirtualHostXdsResource vhostResource = new VirtualHostXdsResource(virtualHost, routeResource.version(), routeResource.revision()); - nodesBuilder.add(new VirtualHostStream(i, vhostResource, context)); + nodesBuilder.add(new VirtualHostStream(i, vhostResource, context, + listenerResource, routeResource)); } final SnapshotStream routeSnapshotStream = SnapshotStream.combineNLatest(nodesBuilder.build()) @@ -96,11 +108,18 @@ private static class VirtualHostStream extends RefCountedStream watcher) { final ImmutableList.Builder routeEntryNodesBuilder = ImmutableList.builder(); for (int i = 0; i < virtualHost.getRoutesList().size(); i++) { final Route route = virtualHost.getRoutesList().get(i); - routeEntryNodesBuilder.add(new RouteEntryStream(i, route, context)); + routeEntryNodesBuilder.add(new RouteEntryStream(i, route, context, + listenerResource, routeResource, resource)); } final SnapshotStream vHostStream = SnapshotStream.combineNLatest(routeEntryNodesBuilder.build()) - .map(list -> { - return new VirtualHostSnapshot(resource, list, index); - }); + .map(list -> new VirtualHostSnapshot(resource, list, index)); return vHostStream.subscribe(watcher); } } @@ -126,26 +144,37 @@ private static class RouteEntryStream extends RefCountedStream { private final Route route; private final SubscriptionContext context; private final String clusterName; + @Nullable + private final ListenerXdsResource listenerResource; + private final RouteXdsResource routeResource; + private final VirtualHostXdsResource vhostResource; - RouteEntryStream(int index, Route route, SubscriptionContext context) { + RouteEntryStream(int index, Route route, SubscriptionContext context, + @Nullable ListenerXdsResource listenerResource, + RouteXdsResource routeResource, VirtualHostXdsResource vhostResource) { this.index = index; this.route = route; this.context = context; clusterName = route.getRoute().getCluster(); + this.listenerResource = listenerResource; + this.routeResource = routeResource; + this.vhostResource = vhostResource; } @Override protected Subscription onStart(SnapshotWatcher watcher) { if (!route.getRoute().hasCluster()) { - watcher.onUpdate(new RouteEntry(route, null, index), null); - return () -> {}; + return SnapshotStream.just(new RouteEntry(route, null, index, + listenerResource, routeResource, vhostResource)) + .subscribe(watcher); } final SnapshotWatcher mapped = (snapshot, t) -> { if (snapshot == null) { watcher.onUpdate(null, t); return; } - watcher.onUpdate(new RouteEntry(route, snapshot, index), null); + watcher.onUpdate(new RouteEntry(route, snapshot, index, + listenerResource, routeResource, vhostResource), null); }; return context.clusterManager().register(clusterName, context, mapped); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostSnapshot.java b/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostSnapshot.java index e024ee8b4dc..f338bdfcf4d 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostSnapshot.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/VirtualHostSnapshot.java @@ -17,7 +17,6 @@ package com.linecorp.armeria.xds; import java.util.List; -import java.util.Map; import java.util.Objects; import com.google.common.base.MoreObjects; @@ -25,7 +24,6 @@ import io.envoyproxy.envoy.config.route.v3.RouteConfiguration; import io.envoyproxy.envoy.config.route.v3.VirtualHost; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; /** * A snapshot of a {@link VirtualHost}. @@ -33,28 +31,16 @@ public final class VirtualHostSnapshot implements Snapshot { private final VirtualHostXdsResource virtualHostXdsResource; - private final Map filterConfigs; private final List routeEntries; private final int index; VirtualHostSnapshot(VirtualHostXdsResource virtualHostXdsResource, List routeEntries, int index) { this.virtualHostXdsResource = virtualHostXdsResource; - final VirtualHost virtualHost = virtualHostXdsResource.resource(); - filterConfigs = FilterUtil.toParsedFilterConfigs(virtualHost.getTypedPerFilterConfigMap()); this.routeEntries = ImmutableList.copyOf(routeEntries); this.index = index; } - private VirtualHostSnapshot(VirtualHostXdsResource virtualHostXdsResource, - List routeEntries, Map filterConfigs, - int index) { - this.virtualHostXdsResource = virtualHostXdsResource; - this.routeEntries = routeEntries; - this.filterConfigs = filterConfigs; - this.index = index; - } - @Override public VirtualHostXdsResource xdsResource() { return virtualHostXdsResource; @@ -74,17 +60,6 @@ public int index() { return index; } - VirtualHostSnapshot withFilterConfigs(Map parentFilterConfigs, - List upstreamFilters) { - final Map mergedFilterConfigs = - FilterUtil.mergeFilterConfigs(parentFilterConfigs, filterConfigs); - final List routeEntries = - this.routeEntries.stream().map(routeEntry -> routeEntry.withFilterConfigs(mergedFilterConfigs, - upstreamFilters)) - .collect(ImmutableList.toImmutableList()); - return new VirtualHostSnapshot(virtualHostXdsResource, routeEntries, mergedFilterConfigs, index); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java index 3bb37da4bc2..f4f3aa28694 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/RouterFilterFactory.java @@ -16,6 +16,8 @@ package com.linecorp.armeria.xds.client.endpoint; +import com.google.protobuf.Any; + import com.linecorp.armeria.client.HttpPreprocessor; import com.linecorp.armeria.client.RpcPreprocessor; import com.linecorp.armeria.common.HttpRequest; @@ -24,46 +26,40 @@ import com.linecorp.armeria.common.RpcResponse; import com.linecorp.armeria.common.annotation.UnstableApi; import com.linecorp.armeria.xds.filter.HttpFilterFactory; +import com.linecorp.armeria.xds.filter.XdsHttpFilter; import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; +import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; /** * A {@link HttpFilterFactory} implementation of the {@link Router} filter. */ @UnstableApi -public final class RouterFilterFactory implements HttpFilterFactory { +public final class RouterFilterFactory implements HttpFilterFactory { private static final String NAME = "envoy.filters.http.router"; private static final RouterFilter rpcFilter = new RouterFilter<>(); private static final RouterFilter httpFilter = new RouterFilter<>(); - /** - * Creates an instance of a {@link HttpFilterFactory} for {@link Router}. - */ - public RouterFilterFactory() {} + private static final XdsHttpFilter ROUTER_FILTER = new XdsHttpFilter() { + @Override + public HttpPreprocessor httpPreprocessor() { + return httpFilter::execute; + } - @Override - public RpcPreprocessor rpcPreprocessor(Router config) { - return rpcFilter::execute; - } + @Override + public RpcPreprocessor rpcPreprocessor() { + return rpcFilter::execute; + } + }; @Override - public HttpPreprocessor httpPreprocessor(Router config) { - return httpFilter::execute; - } - - @Override - public Class configClass() { - return Router.class; - } - - @Override - public Router defaultConfig() { - return Router.getDefaultInstance(); + public String filterName() { + return NAME; } @Override - public String filterName() { - return NAME; + public XdsHttpFilter create(HttpFilter filter, Any config) { + return ROUTER_FILTER; } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java index 3c6e17cee95..f513a737845 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactory.java @@ -16,73 +16,45 @@ package com.linecorp.armeria.xds.filter; -import com.google.protobuf.Message; +import com.google.protobuf.Any; -import com.linecorp.armeria.client.DecoratingHttpClientFunction; -import com.linecorp.armeria.client.DecoratingRpcClientFunction; -import com.linecorp.armeria.client.HttpClient; -import com.linecorp.armeria.client.HttpPreprocessor; -import com.linecorp.armeria.client.PreClient; -import com.linecorp.armeria.client.RpcClient; -import com.linecorp.armeria.client.RpcPreprocessor; +import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.annotation.UnstableApi; -import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; /** - * An {@link HttpFilterFactory} is a factory for creating a decorator implementation equivalent to - * an {@link HttpFilter}. + * A factory that creates an {@link XdsHttpFilter} for a given {@link HttpFilter}. + * + *

Implementations are discovered via the Java {@link java.util.ServiceLoader} mechanism. + * The raw {@link Any} typed config is passed as-is from xDS — factories are responsible for + * all config parsing, including unwrapping {@code FilterConfig} envelopes for per-route overrides. + * Returning {@code null} from {@link #create} causes the filter to be silently skipped. */ @UnstableApi -public interface HttpFilterFactory { - - /** - * Generates a {@link RpcPreprocessor} which acts as a downstream {@link HttpFilter} when - * registered in {@link HttpConnectionManager#getHttpFiltersList()}. - */ - default RpcPreprocessor rpcPreprocessor(T config) { - return PreClient::execute; - } - - /** - * Generates a {@link HttpPreprocessor} which acts as a downstream {@link HttpFilter} when - * registered in {@link HttpConnectionManager#getHttpFiltersList()}. - */ - default HttpPreprocessor httpPreprocessor(T config) { - return PreClient::execute; - } - - /** - * Generates a {@link DecoratingHttpClientFunction} which acts as an upstream {@link HttpFilter} when - * registered in {@link Router#getUpstreamHttpFiltersList()}. - * Unlike decorators added to clients, this decorator will not be invoked for RPC clients. - */ - default DecoratingHttpClientFunction httpDecorator(T config) { - return HttpClient::execute; - } +public interface HttpFilterFactory { /** - * Generates a {@link DecoratingRpcClientFunction} which acts as an upstream {@link HttpFilter} when - * registered in {@link Router#getUpstreamHttpFiltersList()}. - */ - default DecoratingRpcClientFunction rpcDecorator(T config) { - return RpcClient::execute; - } - - /** - * The class type of the filter configuration represented by {@link HttpFilter#getTypedConfig()}. - */ - Class configClass(); - - /** - * The default configuration to be used if an appropriate configuration cannot be found. + * The filter name that should be equivalent to {@link HttpFilter#getName()}. */ - T defaultConfig(); + String filterName(); /** - * The filter name that should be equivalent to {@link HttpFilter#getName()}. + * Creates an {@link XdsHttpFilter} for the given filter and its raw typed config. + * + *

For filter-level configs, {@code config} is {@link HttpFilter#getTypedConfig()}. + * For per-route override configs, {@code config} is the raw {@link Any} from + * {@code typed_per_filter_config}, which may be a {@code FilterConfig} envelope. + * + *

Returns {@code null} to skip this filter entirely. The {@link HttpFilter} argument + * is provided so factories can inspect fields such as {@link HttpFilter#getDisabled()} or + * {@link HttpFilter#getIsOptional()}. + * + * @param httpFilter the filter descriptor from {@link HttpConnectionManager#getHttpFiltersList()} + * @param config the raw typed config {@link Any}; may be {@link Any#getDefaultInstance()} + * if no config was provided */ - String filterName(); + @Nullable + XdsHttpFilter create(HttpFilter httpFilter, Any config); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactoryRegistry.java b/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactoryRegistry.java index bee729470a6..bc8371044fa 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactoryRegistry.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/filter/HttpFilterFactoryRegistry.java @@ -32,10 +32,10 @@ @UnstableApi public final class HttpFilterFactoryRegistry { - private static final Map> factories; + private static final Map factories; static { - final ImmutableMap.Builder> factoriesBuilder = ImmutableMap.builder(); + final ImmutableMap.Builder factoriesBuilder = ImmutableMap.builder(); ServiceLoader.load(HttpFilterFactory.class).forEach(factory -> { factoriesBuilder.put(factory.filterName(), factory); }); @@ -48,7 +48,7 @@ public final class HttpFilterFactoryRegistry { * @param name the name of the filter represented by {@link HttpFilter#getName()} */ @Nullable - public static HttpFilterFactory filterFactory(String name) { + public static HttpFilterFactory filterFactory(String name) { return factories.get(name); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/filter/XdsHttpFilter.java b/xds/src/main/java/com/linecorp/armeria/xds/filter/XdsHttpFilter.java new file mode 100644 index 00000000000..7a648f898a3 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/filter/XdsHttpFilter.java @@ -0,0 +1,61 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.filter; + +import com.linecorp.armeria.client.DecoratingHttpClientFunction; +import com.linecorp.armeria.client.DecoratingRpcClientFunction; +import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.client.HttpPreprocessor; +import com.linecorp.armeria.client.PreClient; +import com.linecorp.armeria.client.RpcClient; +import com.linecorp.armeria.client.RpcPreprocessor; +import com.linecorp.armeria.common.annotation.UnstableApi; + +/** + * Represents a resolved HTTP filter returned by {@link HttpFilterFactory#create}. + */ +@UnstableApi +public interface XdsHttpFilter { + + /** + * Returns the {@link HttpPreprocessor} for downstream filter usage. + */ + default HttpPreprocessor httpPreprocessor() { + return PreClient::execute; + } + + /** + * Returns the {@link RpcPreprocessor} for downstream filter usage. + */ + default RpcPreprocessor rpcPreprocessor() { + return PreClient::execute; + } + + /** + * Returns the {@link DecoratingHttpClientFunction} for upstream filter usage. + */ + default DecoratingHttpClientFunction httpDecorator() { + return HttpClient::execute; + } + + /** + * Returns the {@link DecoratingRpcClientFunction} for upstream filter usage. + */ + default DecoratingRpcClientFunction rpcDecorator() { + return RpcClient::execute; + } +}