Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ Changelog
Unreleased
----------

* Added optional ``spec.cluster.exposure`` field to the CrateDB CRD, supporting
``loadbalancer`` (default, existing behavior) and ``traefik``. When set to
``traefik``, the operator creates a ClusterIP service with Traefik
IngressRouteTCP and MiddlewareTCP resources instead of a cloud LoadBalancer.

2.60.0 (2026-04-22)
-------------------

Expand Down
1 change: 1 addition & 0 deletions crate/operator/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,5 @@ class OperationType(str, enum.Enum):
SUSPEND = "suspend"
RESUME = "resume"
CHANGE_COMPUTE = "change_compute"
CHANGE_EXPOSURE = "change_exposure"
UNKNOWN = "unknown"
97 changes: 56 additions & 41 deletions crate/operator/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -1231,59 +1231,67 @@ def get_data_service(
dns_record: Optional[str],
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
use_traefik: bool = False,
) -> V1Service:
res_annotations = {}
if config.CLOUD_PROVIDER == CloudProvider.AWS:
res_annotations.update(
{
# https://kubernetes.io/docs/concepts/services-networking/service/#connection-draining-on-aws
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": "true", # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": "1800", # noqa
# Default idle timeout is 60s, which kills the connection on long-running queries # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", # noqa
"service.beta.kubernetes.io/aws-load-balancer-type": "nlb", # noqa
"service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": "true", # noqa
}
)
elif config.CLOUD_PROVIDER == CloudProvider.AZURE:
# https://docs.microsoft.com/en-us/azure/aks/load-balancer-standard#additional-customizations-via-kubernetes-annotations
# https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-tcp-reset
res_annotations.update(
{
"service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": "false", # noqa
"service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30", # noqa
}
)

if dns_record:
res_annotations.update(
{"external-dns.alpha.kubernetes.io/hostname": dns_record}
)
if not use_traefik:
if config.CLOUD_PROVIDER == CloudProvider.AWS:
res_annotations.update(
{
# https://kubernetes.io/docs/concepts/services-networking/service/#connection-draining-on-aws
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": "true", # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": "1800", # noqa
# Default idle timeout is 60s, which kills the connection on long-running queries # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", # noqa
"service.beta.kubernetes.io/aws-load-balancer-type": "nlb", # noqa
"service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": "true", # noqa
}
)
elif config.CLOUD_PROVIDER == CloudProvider.AZURE:
# https://docs.microsoft.com/en-us/azure/aks/load-balancer-standard#additional-customizations-via-kubernetes-annotations
# https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-tcp-reset
res_annotations.update(
{
"service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": "false", # noqa
"service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30", # noqa
}
)

if dns_record:
res_annotations.update(
{"external-dns.alpha.kubernetes.io/hostname": dns_record}
)

if additional_annotations:
res_annotations.update(additional_annotations)

service_name = f"crate-{name}"
service_type = "ClusterIP" if use_traefik else "LoadBalancer"

spec_kwargs = dict(
ports=[
V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value),
V1ServicePort(
name="psql", port=postgres_port, target_port=Port.POSTGRES.value
),
],
selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name},
type=service_type,
)

if not use_traefik:
spec_kwargs["external_traffic_policy"] = "Local"
if source_ranges:
spec_kwargs["load_balancer_source_ranges"] = source_ranges

return V1Service(
metadata=V1ObjectMeta(
annotations=res_annotations,
labels=labels,
name=service_name,
name=f"crate-{name}",
owner_references=owner_references,
),
spec=V1ServiceSpec(
ports=[
V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value),
V1ServicePort(
name="psql", port=postgres_port, target_port=Port.POSTGRES.value
),
],
selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name},
type="LoadBalancer",
external_traffic_policy="Local",
load_balancer_source_ranges=source_ranges if source_ranges else None,
),
spec=V1ServiceSpec(**spec_kwargs),
)


Expand Down Expand Up @@ -1366,6 +1374,7 @@ async def create_services(
logger: logging.Logger,
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
use_traefik: bool = False,
) -> None:
async with GlobalApiClient() as api_client:
core = CoreV1Api(api_client)
Expand All @@ -1380,6 +1389,7 @@ async def create_services(
dns_record,
source_ranges,
additional_annotations=additional_annotations,
use_traefik=use_traefik,
)
await call_kubeapi(
core.create_namespaced_service,
Expand Down Expand Up @@ -1425,15 +1435,16 @@ async def recreate_services(
transport_port = ports_spec.get("transport", Port.TRANSPORT.value)

cratedb_labels = build_cratedb_labels(name, meta)

owner_references = get_owner_references(name, meta)
source_ranges = spec["cluster"].get("allowedCIDRs", None)

additional_annotations = (
spec.get("cluster", {}).get("service", {}).get("annotations", {})
)
dns_record = spec.get("cluster", {}).get("externalDNS", None)

exposure = spec.get("cluster", {}).get("exposure", "loadbalancer")
use_traefik = exposure == "traefik"

await create_services(
owner_references,
namespace,
Expand All @@ -1446,6 +1457,7 @@ async def recreate_services(
logger,
source_ranges,
additional_annotations,
use_traefik=use_traefik,
)


Expand Down Expand Up @@ -1676,8 +1688,10 @@ async def handle( # type: ignore
logger: logging.Logger,
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
exposure: str = "loadbalancer",
**kwargs: Any,
):
use_traefik = exposure == "traefik"
await create_services(
owner_references,
namespace,
Expand All @@ -1690,6 +1704,7 @@ async def handle( # type: ignore
logger,
source_ranges,
additional_annotations,
use_traefik=use_traefik,
)


Expand Down
Loading
Loading