Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ Changelog
Unreleased
----------

* Added optional ``spec.cluster.exposure`` field to the CrateDB CRD, supporting
``loadbalancer`` (default, existing behavior) and ``traefik``. When set to
``traefik``, the operator creates a ClusterIP service with Traefik
IngressRouteTCP and MiddlewareTCP resources instead of a cloud LoadBalancer.

2.60.0 (2026-04-22)
-------------------

Expand Down
1 change: 1 addition & 0 deletions crate/operator/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,5 @@ class OperationType(str, enum.Enum):
SUSPEND = "suspend"
RESUME = "resume"
CHANGE_COMPUTE = "change_compute"
CHANGE_EXPOSURE = "change_exposure"
UNKNOWN = "unknown"
111 changes: 72 additions & 39 deletions crate/operator/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,68 +1222,94 @@ async def create_statefulset(
)


def get_data_service(
owner_references: Optional[List[V1OwnerReference]],
name: str,
labels: LabelType,
http_port: int,
postgres_port: int,
dns_record: Optional[str],
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
) -> V1Service:
res_annotations = {}
def _lb_annotations_to_add(dns_record: Optional[str]) -> dict:
"""
Build the cloud provider LB annotations to add to a service.
"""
annotations: dict = {}
if config.CLOUD_PROVIDER == CloudProvider.AWS:
res_annotations.update(
annotations.update(
{
# https://kubernetes.io/docs/concepts/services-networking/service/#connection-draining-on-aws
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": "true", # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": "1800", # noqa
# Default idle timeout is 60s, which kills the connection on long-running queries # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", # noqa
"service.beta.kubernetes.io/aws-load-balancer-type": "nlb", # noqa
"service.beta.kubernetes.io/aws-load-balancer-type": "nlb",
"service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": "true", # noqa
}
)
elif config.CLOUD_PROVIDER == CloudProvider.AZURE:
# https://docs.microsoft.com/en-us/azure/aks/load-balancer-standard#additional-customizations-via-kubernetes-annotations
# https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-tcp-reset
res_annotations.update(
annotations.update(
{
"service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": "false", # noqa
"service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30", # noqa
"service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": "30",
}
)

if dns_record:
res_annotations.update(
{"external-dns.alpha.kubernetes.io/hostname": dns_record}
)
annotations["external-dns.alpha.kubernetes.io/hostname"] = dns_record
return annotations


def _lb_annotations_to_remove() -> dict:
"""
Return all LB annotations set to None to remove them via merge patch.
"""
return {
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-enabled": None, # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-draining-timeout": None, # noqa
"service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": None,
"service.beta.kubernetes.io/aws-load-balancer-type": None,
"service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled": None, # noqa
"service.beta.kubernetes.io/azure-load-balancer-disable-tcp-reset": None,
"service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout": None,
"external-dns.alpha.kubernetes.io/hostname": None,
}


def get_data_service(
owner_references: Optional[List[V1OwnerReference]],
name: str,
labels: LabelType,
http_port: int,
postgres_port: int,
dns_record: Optional[str],
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
use_traefik: bool = False,
) -> V1Service:
res_annotations = {}

if not use_traefik:
res_annotations.update(_lb_annotations_to_add(dns_record))

if additional_annotations:
res_annotations.update(additional_annotations)

service_name = f"crate-{name}"
service_type = "ClusterIP" if use_traefik else "LoadBalancer"

spec_kwargs = dict(
ports=[
V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value),
V1ServicePort(
name="psql", port=postgres_port, target_port=Port.POSTGRES.value
),
],
selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name},
type=service_type,
)

if not use_traefik:
spec_kwargs["external_traffic_policy"] = "Local"
if source_ranges:
spec_kwargs["load_balancer_source_ranges"] = source_ranges

return V1Service(
metadata=V1ObjectMeta(
annotations=res_annotations,
labels=labels,
name=service_name,
name=f"crate-{name}",
owner_references=owner_references,
),
spec=V1ServiceSpec(
ports=[
V1ServicePort(name="http", port=http_port, target_port=Port.HTTP.value),
V1ServicePort(
name="psql", port=postgres_port, target_port=Port.POSTGRES.value
),
],
selector={LABEL_COMPONENT: "cratedb", LABEL_NAME: name},
type="LoadBalancer",
external_traffic_policy="Local",
load_balancer_source_ranges=source_ranges if source_ranges else None,
),
spec=V1ServiceSpec(**spec_kwargs),
)


Expand Down Expand Up @@ -1366,6 +1392,7 @@ async def create_services(
logger: logging.Logger,
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
use_traefik: bool = False,
) -> None:
async with GlobalApiClient() as api_client:
core = CoreV1Api(api_client)
Expand All @@ -1380,6 +1407,7 @@ async def create_services(
dns_record,
source_ranges,
additional_annotations=additional_annotations,
use_traefik=use_traefik,
)
await call_kubeapi(
core.create_namespaced_service,
Expand Down Expand Up @@ -1425,15 +1453,16 @@ async def recreate_services(
transport_port = ports_spec.get("transport", Port.TRANSPORT.value)

cratedb_labels = build_cratedb_labels(name, meta)

owner_references = get_owner_references(name, meta)
source_ranges = spec["cluster"].get("allowedCIDRs", None)

additional_annotations = (
spec.get("cluster", {}).get("service", {}).get("annotations", {})
)
dns_record = spec.get("cluster", {}).get("externalDNS", None)

exposure = spec.get("cluster", {}).get("exposure", "loadbalancer")
use_traefik = exposure == "traefik"

await create_services(
owner_references,
namespace,
Expand All @@ -1446,6 +1475,7 @@ async def recreate_services(
logger,
source_ranges,
additional_annotations,
use_traefik=use_traefik,
)


Expand Down Expand Up @@ -1676,8 +1706,10 @@ async def handle( # type: ignore
logger: logging.Logger,
source_ranges: Optional[List[str]] = None,
additional_annotations: Optional[Dict] = None,
exposure: str = "loadbalancer",
**kwargs: Any,
):
use_traefik = exposure == "traefik"
await create_services(
owner_references,
namespace,
Expand All @@ -1690,6 +1722,7 @@ async def handle( # type: ignore
logger,
source_ranges,
additional_annotations,
use_traefik=use_traefik,
)


Expand Down
Loading
Loading