diff --git a/client/fleet.go b/client/fleet.go new file mode 100644 index 000000000..69755600d --- /dev/null +++ b/client/fleet.go @@ -0,0 +1,98 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" +) + +type GetFleetURLResponse struct { + URL string `json:"url"` +} + +type UpdateFleetURLOptions struct { + URL string `json:"url"` +} + +type RegisterFleetOptions struct { + ActivationToken string `json:"activationToken"` +} + +// GetFleetURL retrieves the configured Fleet server URL. +func (c *Client) GetFleetURL(ctx context.Context) (*GetFleetURLResponse, error) { + resp, err := c.Requester.Do(ctx, &RequestOptions{ + Type: SyncRequest, + Method: "GET", + Path: "api/v1/fleet/url", + }) + if err != nil { + return nil, err + } + + var response GetFleetURLResponse + + err = resp.DecodeResult(&response) + if err != nil { + return nil, err + } + + return &response, nil +} + +// UpdateFleetURL sets the Fleet server URL. +func (c *Client) UpdateFleetURL(ctx context.Context, opts *UpdateFleetURLOptions) error { + var body bytes.Buffer + + err := json.NewEncoder(&body).Encode(opts) + if err != nil { + return err + } + + _, err = c.Requester.Do(ctx, &RequestOptions{ + Type: SyncRequest, + Method: "PUT", + Path: "api/v1/fleet/url", + Body: &body, + }) + if err != nil { + return err + } + + return nil +} + +// RegisterFleet registers the Core instance with a Fleet using an activation token. +func (c *Client) RegisterFleet(ctx context.Context, opts *RegisterFleetOptions) error { + var body bytes.Buffer + + err := json.NewEncoder(&body).Encode(opts) + if err != nil { + return err + } + + _, err = c.Requester.Do(ctx, &RequestOptions{ + Type: SyncRequest, + Method: "POST", + Path: "api/v1/fleet/register", + Body: &body, + }) + if err != nil { + return err + } + + return nil +} + +// UnregisterFleet unregisters the Core instance from a Fleet. +func (c *Client) UnregisterFleet(ctx context.Context) error { + _, err := c.Requester.Do(ctx, &RequestOptions{ + Type: SyncRequest, + Method: "POST", + Path: "api/v1/fleet/unregister", + }) + if err != nil { + return err + } + + return nil +} diff --git a/client/status.go b/client/status.go index b326b257f..813c8b2f5 100644 --- a/client/status.go +++ b/client/status.go @@ -26,6 +26,11 @@ type ClusterStatus struct { PendingMigration *PendingMigration `json:"pendingMigration,omitempty"` } +type FleetStatus struct { + Managed bool `json:"managed"` + LastSyncAt string `json:"lastSyncAt,omitempty"` +} + type Status struct { Version string `json:"version"` Revision string `json:"revision,omitempty"` @@ -33,6 +38,7 @@ type Status struct { Ready bool `json:"ready"` SchemaVersion int `json:"schemaVersion"` Cluster *ClusterStatus `json:"cluster,omitempty"` + Fleet FleetStatus `json:"fleet"` } // GetStatus retrieves the current status of the system. diff --git a/fleet/buffer.go b/fleet/buffer.go new file mode 100644 index 000000000..2aec6ca3b --- /dev/null +++ b/fleet/buffer.go @@ -0,0 +1,82 @@ +// Copyright 2026 Ella Networks + +package fleet + +import ( + "sync" + + "github.com/ellanetworks/core/fleet/client" +) + +const defaultFleetBufferCapacity = 10_000 + +// FleetBuffer is a bounded in-memory ring buffer for flow entries destined for +// Fleet sync. Writes are non-blocking: when the buffer is full the oldest entry +// is dropped. Reads drain the entire buffer atomically. +type FleetBuffer struct { + mu sync.Mutex + entries []client.FlowEntry + capacity int + head int + count int +} + +// NewFleetBuffer creates a FleetBuffer with the given maximum capacity. +// If capacity <= 0 the default (10 000) is used. +func NewFleetBuffer(capacity int) *FleetBuffer { + if capacity <= 0 { + capacity = defaultFleetBufferCapacity + } + + return &FleetBuffer{ + entries: make([]client.FlowEntry, capacity), + capacity: capacity, + } +} + +// EnqueueFlow appends a flow entry to the buffer. If the buffer is at +// capacity the oldest entry is silently dropped to make room. +func (b *FleetBuffer) EnqueueFlow(entry client.FlowEntry) { + b.mu.Lock() + defer b.mu.Unlock() + + idx := (b.head + b.count) % b.capacity + + if b.count == b.capacity { + b.entries[b.head] = entry + b.head = (b.head + 1) % b.capacity + } else { + b.entries[idx] = entry + b.count++ + } +} + +// DrainFlows returns all buffered flow entries and resets the buffer. +// The caller owns the returned slice. Returns nil when empty. +func (b *FleetBuffer) DrainFlows() []client.FlowEntry { + b.mu.Lock() + defer b.mu.Unlock() + + if b.count == 0 { + return nil + } + + out := make([]client.FlowEntry, b.count) + + for i := range b.count { + out[i] = b.entries[(b.head+i)%b.capacity] + } + + b.head = 0 + b.count = 0 + + return out +} + +// Len returns the current number of buffered entries. +func (b *FleetBuffer) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + + return b.count +} diff --git a/fleet/client/fleet.go b/fleet/client/fleet.go new file mode 100644 index 000000000..718643dab --- /dev/null +++ b/fleet/client/fleet.go @@ -0,0 +1,86 @@ +// Copyright 2026 Ella Networks + +package client + +import ( + "crypto/tls" + "fmt" + "net/http" + "strings" + "time" +) + +// Fleet is the Core-side client for the Fleet management API. After a +// successful Register call the caller stores the returned token via +// SetToken; every subsequent request carries it in the Authorization header. +type Fleet struct { + url string + client *http.Client + token string +} + +// New constructs a Fleet client. +// +// When insecureSkipVerify is false (the default in production +// deployments), TLS server certificates are validated against the system +// trust store. Self-hosted Fleet deployments using self-signed certs +// either install the cert into the Core host's trust store, front Fleet +// with a real cert (Let's Encrypt or internal PKI), or set +// `fleet.insecure-skip-verify: true` in core.yaml to disable validation +// — appropriate for integration tests and local-development setups, but +// not for production. +func New(url string, insecureSkipVerify bool) *Fleet { + transport := http.DefaultTransport + + if insecureSkipVerify { + transport = &http.Transport{ + // #nosec G402 -- explicitly gated behind fleet.insecure-skip-verify in core.yaml + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } + + return &Fleet{ + url: url, + client: &http.Client{ + Timeout: 15 * time.Second, + Transport: transport, + }, + } +} + +// SetToken sets the bearer token used to authenticate sync and unregister +// requests. Empty token disables the Authorization header (only valid before +// registration completes). +func (fc *Fleet) SetToken(token string) { + fc.token = token +} + +// addAuth attaches the bearer token to the request when set. Callers use +// this for every request that requires authentication (sync, unregister). +func (fc *Fleet) addAuth(req *http.Request) { + if fc.token != "" { + req.Header.Set("Authorization", "Bearer "+fc.token) + } +} + +// checkResponseContentType returns a user-friendly error when the fleet +// server replies with something other than JSON (usually HTML from a +// mis-configured URL). +func checkResponseContentType(res *http.Response) error { + ct := res.Header.Get("Content-Type") + if ct != "" && !isJSONContentType(ct) { + return fmt.Errorf("fleet server at %s returned an unexpected response (Content-Type: %s) — verify the Fleet URL is correct", res.Request.URL.Host, ct) + } + + return nil +} + +func isJSONContentType(ct string) bool { + for _, prefix := range []string{"application/json", "text/json"} { + if strings.HasPrefix(ct, prefix) { + return true + } + } + + return false +} diff --git a/fleet/client/register.go b/fleet/client/register.go new file mode 100644 index 000000000..86024d778 --- /dev/null +++ b/fleet/client/register.go @@ -0,0 +1,450 @@ +// Copyright 2026 Ella Networks + +package client + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" +) + +// ErrUnauthorized is returned when the fleet server rejects the activation token (HTTP 401). +var ErrUnauthorized = errors.New("invalid activation code") + +type OperatorTracking struct { + SupportedTacs []string `json:"supported_tacs"` +} + +type OperatorNASSecurity struct { + Ciphering []string `json:"ciphering"` + Integrity []string `json:"integrity"` +} + +type OperatorSPN struct { + FullName string `json:"full_name"` + ShortName string `json:"short_name"` +} + +type OperatorID struct { + Mcc string `json:"mcc"` + Mnc string `json:"mnc"` +} + +// Operator carries the cluster-scoped operator configuration that Fleet +// manages. AMF identity and ClusterID are intentionally absent: they are +// cluster-local — AMF region/set IDs depend on the operator's broader 5G +// topology and ClusterID is generated inside the cluster. +type Operator struct { + ID OperatorID `json:"id"` + OperatorCode string `json:"operator_code"` + Tracking OperatorTracking `json:"tracking"` + NASSecurity OperatorNASSecurity `json:"nas_security"` + SPN OperatorSPN `json:"spn"` +} + +type HomeNetworkKey struct { + KeyIdentifier int `json:"key_identifier"` + Scheme string `json:"scheme"` + PrivateKey string `json:"private_key"` +} + +type DataNetwork struct { + Name string `json:"name"` + IPPool string `json:"ip_pool"` + DNS string `json:"dns"` + MTU int32 `json:"mtu"` +} + +type Profile struct { + Name string `json:"name"` + UeAmbrUplink string `json:"ue_ambr_uplink"` + UeAmbrDownlink string `json:"ue_ambr_downlink"` +} + +type Slice struct { + Name string `json:"name"` + Sst int32 `json:"sst"` + Sd *string `json:"sd,omitempty"` +} + +type Policy struct { + Name string `json:"name"` + ProfileName string `json:"profile_name"` + SliceName string `json:"slice_name"` + DataNetworkName string `json:"data_network_name"` + Var5qi int32 `json:"var5qi"` + Arp int32 `json:"arp"` + SessionAmbrUplink string `json:"session_ambr_uplink"` + SessionAmbrDownlink string `json:"session_ambr_downlink"` +} + +type Subscriber struct { + Imsi string `json:"imsi"` + ProfileName string `json:"profile_name"` + SequenceNumber string `json:"sequence_number"` + PermanentKey string `json:"permanent_key"` + Opc string `json:"opc"` +} + +// NetworkRule is a per-policy filter rule. Identified by +// (policy_name, direction, precedence); precedence is 1-indexed and +// unique within (policy_name, direction). +type NetworkRule struct { + PolicyName string `json:"policy_name"` + Direction string `json:"direction"` // "uplink" or "downlink" + Precedence int32 `json:"precedence"` + Description string `json:"description"` + RemotePrefix *string `json:"remote_prefix,omitempty"` + Protocol int32 `json:"protocol"` + PortLow int32 `json:"port_low"` + PortHigh int32 `json:"port_high"` + Action string `json:"action"` // "allow" or "deny" +} + +type Route struct { + ID int64 `json:"id"` + Destination string `json:"destination"` + Gateway string `json:"gateway"` + Interface string `json:"interface"` + Metric int `json:"metric"` +} + +type BGPSettings struct { + Enabled bool `json:"enabled"` + LocalAS int `json:"local_as"` + RouterID string `json:"router_id"` + ListenAddress string `json:"listen_address"` +} + +// BGPPeer is per-node: each Core receives only the peers Fleet has +// scoped to the node making the sync request, and reconciles its local +// bgp_peers table against that list. Address is unique per node. +type BGPPeer struct { + Address string `json:"address"` + RemoteAS int `json:"remote_as"` + HoldTime int `json:"hold_time"` + Password string `json:"password,omitempty"` + Description string `json:"description"` +} + +// BGPImportPrefix references a peer by its Address (natural key). +type BGPImportPrefix struct { + PeerAddress string `json:"peer_address"` + Prefix string `json:"prefix"` + MaxLength int `json:"max_length"` +} + +// RetentionPolicy holds the retention days for a single category: +// "audit", "radio", "usage", or "flow_reports". +type RetentionPolicy struct { + Category string `json:"category"` + Days int `json:"days"` +} + +type N2Interface struct { + // Addresses is the resolved list of N2 IPs for the node. When the + // node binds N2 by interface name (cfg.Interfaces.N2.Name), every + // IP currently assigned to that interface is reported; when it + // binds by literal address (cfg.Interfaces.N2.Address), a + // single-element list is reported. Empty when neither is set. + Addresses []string `json:"addresses"` + Port int `json:"port"` +} + +type Vlan struct { + MasterInterface string `json:"master_interface"` + VlanId int `json:"vlan_id"` +} + +type N3Interface struct { + Name string `json:"name"` + Address string `json:"address"` + ExternalAddress string `json:"external_address"` + Vlan *Vlan `json:"vlan,omitempty"` +} + +type N6Interface struct { + Name string `json:"name"` + Vlan *Vlan `json:"vlan,omitempty"` +} + +type APIInterface struct { + Address string `json:"address"` + Port int `json:"port"` +} + +type StatusNetworkInterfaces struct { + N2 N2Interface `json:"n2"` + N3 N3Interface `json:"n3"` + N6 N6Interface `json:"n6"` + API APIInterface `json:"api"` +} + +// NetworkInterfaces holds the per-node interface-level configuration +// that Fleet manages. Today only N3's external address is configurable; +// the struct is shaped so future N2/N6 fields can be added without +// reshaping the wire. +type NetworkInterfaces struct { + N3ExternalAddress string `json:"n3_external_address"` +} + +// ClusterConfig carries the cluster-replicated portion of the Fleet +// config. Applied by the leader via Raft; followers receive an identical +// copy in the sync response but discard it. +type ClusterConfig struct { + Operator Operator `json:"operator"` + HomeNetworkKeys []HomeNetworkKey `json:"home_network_keys"` + DataNetworks []DataNetwork `json:"data_networks"` + Profiles []Profile `json:"profiles"` + Slices []Slice `json:"slices"` + Policies []Policy `json:"policies"` + NetworkRules []NetworkRule `json:"network_rules"` + Subscribers []Subscriber `json:"subscribers"` + RetentionPolicies []RetentionPolicy `json:"retention_policies"` +} + +// NodeConfig carries the per-node portion of the Fleet config. Each +// member receives the slice scoped to its own node and applies it +// locally; writes bypass Raft. +type NodeConfig struct { + Routes []Route `json:"routes"` + NAT bool `json:"nat"` + FlowAccounting bool `json:"flow_accounting"` + NetworkInterfaces NetworkInterfaces `json:"network_interfaces"` + BGP BGPSettings `json:"bgp"` + BGPPeers []BGPPeer `json:"bgp_peers"` + BGPImportPrefixes []BGPImportPrefix `json:"bgp_import_prefixes"` +} + +// Config is the unified Fleet config payload exchanged in both +// directions: Cores send it as RegisterParams.InitialConfig, and Fleet +// returns it as SyncResponse.Config. The Cluster / Node split mirrors +// the apply paths in core2: only the leader applies Cluster; every node +// applies Node. +type Config struct { + Cluster ClusterConfig `json:"cluster"` + Node NodeConfig `json:"node"` +} + +type PlmnID struct { + Mcc string `json:"mcc"` + Mnc string `json:"mnc"` +} + +type Tai struct { + PlmnID PlmnID `json:"plmnID"` + Tac string `json:"tac"` +} + +type Snssai struct { + Sst int32 `json:"sst"` + Sd string `json:"sd"` +} + +type SupportedTAI struct { + Tai Tai `json:"tai"` + SNssais []Snssai `json:"snssais"` +} + +type Radio struct { + Name string `json:"name"` + ID string `json:"id"` + Address string `json:"address"` + SupportedTAIs []SupportedTAI `json:"supported_tais"` + RanNodeType string `json:"ran_node_type"` + LastSeenAtUnix int64 `json:"last_seen_at_unix"` +} + +// PDUSession is one active PDU session for a subscriber, projected from +// internal/amf.PDUSessionExport. Fleet uses this to populate the per-IMSI +// sessions card. +type PDUSession struct { + SessionID int `json:"session_id"` + DataNetworkName string `json:"data_network_name"` + Sst int32 `json:"sst"` + Sd string `json:"sd"` + AllocatedIP string `json:"allocated_ip"` + QoS5QI int32 `json:"qos_5qi"` + QoSARP int32 `json:"qos_arp"` + EstablishedAtUnix int64 `json:"established_at_unix"` +} + +type SubscriberStatus struct { + Imsi string `json:"imsi"` + Registered bool `json:"registered"` + IPAddress string `json:"ip_address"` + Imei string `json:"imei,omitempty"` + CipheringAlgorithm string `json:"ciphering_algorithm,omitempty"` + IntegrityAlgorithm string `json:"integrity_algorithm,omitempty"` + LastSeenAt string `json:"last_seen_at,omitempty"` + LastSeenRadio string `json:"last_seen_radio,omitempty"` + Sessions []PDUSession `json:"sessions,omitempty"` +} + +// IPLease mirrors db.IPLease on the wire. Fleet uses these to render +// the per-DNN IP allocations table and to compute pool utilization. +// `data_network_name` is included so Fleet can group by DNN without +// having to resolve UUIDs (Core's pool IDs aren't meaningful in +// Fleet). `session_id` is nil for static reservations. +type IPLease struct { + Address string `json:"address"` + IMSI string `json:"imsi"` + Type string `json:"type"` + SessionID *int `json:"session_id,omitempty"` + DataNetworkName string `json:"data_network_name"` + EstablishedAtUnix int64 `json:"established_at_unix,omitempty"` +} + +// BGPPeerState mirrors internal/bgp.BGPPeerStatus on the wire. Reported +// per peer per sync cycle when BGP is enabled; empty otherwise. +type BGPPeerState struct { + Address string `json:"address"` + RemoteAS int `json:"remote_as"` + State string `json:"state"` + Uptime string `json:"uptime,omitempty"` + PrefixesSent int `json:"prefixes_sent"` + PrefixesReceived int `json:"prefixes_received"` +} + +// BGPAdvertisedRoute mirrors internal/bgp.BGPRoute on the wire. Routes +// originated by this Core; subscriber is the IMSI when the route comes +// from a UE allocation. +type BGPAdvertisedRoute struct { + Subscriber string `json:"subscriber,omitempty"` + Prefix string `json:"prefix"` + NextHop string `json:"next_hop"` +} + +// BGPLearnedRoute mirrors internal/bgp.LearnedRoute on the wire. +type BGPLearnedRoute struct { + Prefix string `json:"prefix"` + NextHop string `json:"next_hop"` + PeerAddress string `json:"peer_address"` +} + +type EllaCoreStatus struct { + NetworkInterfaces StatusNetworkInterfaces `json:"network_interfaces"` + Radios []Radio `json:"radios"` + Subscribers []SubscriberStatus `json:"subscribers"` + BGPPeerStates []BGPPeerState `json:"bgp_peer_states,omitempty"` + AdvertisedRoutes []BGPAdvertisedRoute `json:"advertised_routes,omitempty"` + LearnedRoutes []BGPLearnedRoute `json:"learned_routes,omitempty"` + IPLeases []IPLease `json:"ip_leases,omitempty"` +} + +type EllaCoreMetrics struct { + UplinkBytesTotal int64 `json:"uplink_bytes_total"` + DownlinkBytesTotal int64 `json:"downlink_bytes_total"` + PDUSessionsTotal int64 `json:"pdu_sessions_total"` + ProcessCPUSecondsTotal float64 `json:"process_cpu_seconds_total"` + ProcessResidentMemoryBytes int64 `json:"process_resident_memory_bytes"` + GoGoroutines int64 `json:"go_goroutines"` + ProcessStartTimeSeconds float64 `json:"process_start_time_seconds"` + DatabaseStorageBytes int64 `json:"database_storage_bytes"` + IPAddresses int64 `json:"ip_addresses"` + IPAddressesAllocated int64 `json:"ip_addresses_allocated"` + RegistrationAttemptsAccepted int64 `json:"registration_attempts_accepted"` + RegistrationAttemptsRejected int64 `json:"registration_attempts_rejected"` +} + +type RegisterParams struct { + ActivationToken string `json:"activation_token"` + ClusterEnabled bool `json:"cluster_enabled"` + ClusterID string `json:"cluster_id,omitempty"` + NodeID int `json:"node_id,omitempty"` + InitialConfig Config `json:"initial_config"` + InitialStatus EllaCoreStatus `json:"initial_status"` + InitialMetrics EllaCoreMetrics `json:"initial_metrics"` + InitialUsage []SubscriberUsageEntry `json:"initial_usage,omitempty"` +} + +// RegisterResponse carries the bearer token issued by Fleet. The Core +// stores it locally and presents it on every subsequent sync request. +type RegisterResponse struct { + Token string `json:"token"` +} + +type ErrorResponse struct { + Error string `json:"error"` +} + +type Response struct { + Result json.RawMessage `json:"result"` +} + +type RegisterInput struct { + ActivationToken string + ClusterEnabled bool + ClusterID string + NodeID int + InitialConfig Config + InitialStatus EllaCoreStatus + InitialMetrics EllaCoreMetrics + InitialUsage []SubscriberUsageEntry +} + +func (fc *Fleet) Register(ctx context.Context, in RegisterInput) (*RegisterResponse, error) { + params := &RegisterParams{ + ActivationToken: in.ActivationToken, + ClusterEnabled: in.ClusterEnabled, + ClusterID: in.ClusterID, + NodeID: in.NodeID, + InitialConfig: in.InitialConfig, + InitialStatus: in.InitialStatus, + InitialMetrics: in.InitialMetrics, + InitialUsage: in.InitialUsage, + } + + body, err := json.Marshal(params) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, "POST", fc.url+"/api/v1/cores/register", bytes.NewReader(body)) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/json") + + res, err := fc.client.Do(req) + if err != nil { + return nil, err + } + + defer func() { + _ = res.Body.Close() + }() + + if err := checkResponseContentType(res); err != nil { + return nil, err + } + + if res.StatusCode != http.StatusOK { + var errResp ErrorResponse + if err := json.NewDecoder(res.Body).Decode(&errResp); err != nil { + return nil, fmt.Errorf("unexpected status code %d and failed to decode error: %w", res.StatusCode, err) + } + + if res.StatusCode == http.StatusUnauthorized { + return nil, fmt.Errorf("%w: %s", ErrUnauthorized, errResp.Error) + } + + return nil, fmt.Errorf("register failed (status %d): %s", res.StatusCode, errResp.Error) + } + + var envelope Response + if err := json.NewDecoder(res.Body).Decode(&envelope); err != nil { + return nil, fmt.Errorf("decoding response envelope: %w", err) + } + + var registerResponse RegisterResponse + if err := json.Unmarshal(envelope.Result, ®isterResponse); err != nil { + return nil, fmt.Errorf("decoding register result: %w", err) + } + + return ®isterResponse, nil +} diff --git a/fleet/client/sync.go b/fleet/client/sync.go new file mode 100644 index 000000000..31122018c --- /dev/null +++ b/fleet/client/sync.go @@ -0,0 +1,105 @@ +// Copyright 2026 Ella Networks + +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" +) + +type SubscriberUsageEntry struct { + EpochDay int64 `json:"epoch_day"` + IMSI string `json:"imsi"` + UplinkBytes int64 `json:"uplink_bytes"` + DownlinkBytes int64 `json:"downlink_bytes"` +} + +// FlowEntry represents a single completed UE data flow captured by the eBPF +// data plane and forwarded to Fleet as part of the sync cycle. +type FlowEntry struct { + SubscriberID string `json:"subscriber_id"` + SourceIP string `json:"source_ip"` + DestinationIP string `json:"destination_ip"` + SourcePort int `json:"source_port"` + DestinationPort int `json:"destination_port"` + Protocol int `json:"protocol"` + Packets int64 `json:"packets"` + Bytes int64 `json:"bytes"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` + Direction string `json:"direction"` + // Action mirrors dbwriter.FlowReport.Action: "allow" when the flow + // matched an allow rule, "drop" when it was dropped by an SDF + // filter. Empty when the producer didn't classify (older Cores). + Action string `json:"action,omitempty"` +} + +type SyncParams struct { + Version string `json:"version"` + ClusterEnabled bool `json:"cluster_enabled"` + NodeID int `json:"node_id,omitempty"` + ClusterID string `json:"cluster_id,omitempty"` + IsLeader bool `json:"is_leader"` + LastKnownRevision int64 `json:"last_known_revision"` + Status *EllaCoreStatus `json:"status,omitempty"` + Metrics EllaCoreMetrics `json:"metrics"` + Flows []FlowEntry `json:"flows,omitempty"` + SubscriberUsage []SubscriberUsageEntry `json:"subscriber_usage,omitempty"` +} + +type SyncResponse struct { + Config *Config `json:"config,omitempty"` + ConfigRevision int64 `json:"config_revision"` +} + +func (fc *Fleet) Sync(ctx context.Context, params *SyncParams) (*SyncResponse, error) { + body, err := json.Marshal(params) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, "POST", fc.url+"/api/v1/cores/sync", bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("creating sync request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + fc.addAuth(req) + + res, err := fc.client.Do(req) + if err != nil { + return nil, fmt.Errorf("sending sync: %w", err) + } + + defer func() { + _ = res.Body.Close() + }() + + if err := checkResponseContentType(res); err != nil { + return nil, err + } + + if res.StatusCode != http.StatusOK { + var errResp ErrorResponse + if err := json.NewDecoder(res.Body).Decode(&errResp); err != nil { + return nil, fmt.Errorf("sync: unexpected status code %d and failed to decode error: %w", res.StatusCode, err) + } + + return nil, fmt.Errorf("sync failed (status %d): %s", res.StatusCode, errResp.Error) + } + + var envelope Response + if err := json.NewDecoder(res.Body).Decode(&envelope); err != nil { + return nil, fmt.Errorf("decoding response envelope: %w", err) + } + + var syncResponse SyncResponse + if err := json.Unmarshal(envelope.Result, &syncResponse); err != nil { + return nil, fmt.Errorf("decoding sync result: %w", err) + } + + return &syncResponse, nil +} diff --git a/fleet/client/unregister.go b/fleet/client/unregister.go new file mode 100644 index 000000000..edd57103c --- /dev/null +++ b/fleet/client/unregister.go @@ -0,0 +1,38 @@ +// Copyright 2026 Ella Networks + +package client + +import ( + "context" + "fmt" + "net/http" +) + +func (fc *Fleet) Unregister(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, "POST", fc.url+"/api/v1/cores/unregister", nil) + if err != nil { + return fmt.Errorf("creating unregister request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + fc.addAuth(req) + + res, err := fc.client.Do(req) + if err != nil { + return fmt.Errorf("sending unregister: %w", err) + } + + defer func() { + _ = res.Body.Close() + }() + + if err := checkResponseContentType(res); err != nil { + return err + } + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("unregister failed (status %d)", res.StatusCode) + } + + return nil +} diff --git a/fleet/fleet.go b/fleet/fleet.go new file mode 100644 index 000000000..abd1fb0ec --- /dev/null +++ b/fleet/fleet.go @@ -0,0 +1,355 @@ +// Copyright 2026 Ella Networks + +package fleet + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/ellanetworks/core/fleet/client" + "github.com/ellanetworks/core/internal/db" + "github.com/ellanetworks/core/internal/logger" + "github.com/ellanetworks/core/version" + "go.uber.org/zap" +) + +// recentUsageDays is the number of recent days (including today) to include +// in each sync payload. Fleet uses upsert semantics, so re-sending the same +// day is safe and ensures late-arriving counters are propagated. +const recentUsageDays = 3 + +// syncInterval is the cadence at which each node contacts Fleet. Long +// enough to keep the Raft log overhead from leader-side state writes +// manageable, short enough to push timely metrics/flows. +const syncInterval = 15 * time.Second + +// ConfigReloader is called after a fleet sync applies a new config to the +// database so that runtime components (e.g. UPF/BPF) can be reloaded to +// match. Implementations must be safe for concurrent use. +type ConfigReloader interface { + ReloadNAT(natEnabled bool) error + ReloadFlowAccounting(flowAccountingEnabled bool) error +} + +// SyncHandle is returned by ResumeSync and lets the caller attach a +// ConfigReloader after the sync loop is already running. +type SyncHandle struct { + mu sync.Mutex + reloader ConfigReloader +} + +func (h *SyncHandle) SetConfigReloader(r ConfigReloader) { + h.mu.Lock() + defer h.mu.Unlock() + + h.reloader = r +} + +func (h *SyncHandle) getReloader() ConfigReloader { + h.mu.Lock() + defer h.mu.Unlock() + + return h.reloader +} + +// SyncCallback is invoked after each sync attempt with the success flag. +type SyncCallback func(ctx context.Context, success bool) + +// StatusProvider returns the current local status of this node. +type StatusProvider func() client.EllaCoreStatus + +// MetricsProvider returns the current local metrics snapshot. +type MetricsProvider func() client.EllaCoreMetrics + +var ( + mu sync.Mutex + cancelPrevSync context.CancelFunc +) + +func statusHash(s client.EllaCoreStatus) [sha256.Size]byte { + b, _ := json.Marshal(s) + return sha256.Sum256(b) +} + +// statusTracker tracks the SHA-256 hash of the last successfully sent +// status and decides whether to include it in the next sync request. +// Status is always included on the first call. +type statusTracker struct { + lastHash [sha256.Size]byte + hasSent bool +} + +func (t *statusTracker) Prepare(s client.EllaCoreStatus) *client.EllaCoreStatus { + h := statusHash(s) + if !t.hasSent || h != t.lastHash { + return &s + } + + return nil +} + +func (t *statusTracker) Confirm(s client.EllaCoreStatus) { + t.lastHash = statusHash(s) + t.hasSent = true +} + +// syncer abstracts the fleet HTTP Sync call for testability. +type syncer interface { + Sync(ctx context.Context, params *client.SyncParams) (*client.SyncResponse, error) +} + +// syncDB abstracts the database operations needed by the sync loop. +// UpdateClusterConfig and GetRawDailyUsage are leader-only (replicated +// tables; the leader's apply propagates via Raft). UpdateNodeConfig and +// UpdateFleetConfigRevision run on every node — they write to local-only +// tables (per-node network config and the fleet registration row). +type syncDB interface { + UpdateClusterConfig(ctx context.Context, cfg client.ClusterConfig) error + UpdateNodeConfig(ctx context.Context, cfg client.NodeConfig) error + UpdateFleetConfigRevision(ctx context.Context, revision int64) error + GetRawDailyUsage(ctx context.Context, start, end time.Time) ([]db.DailyUsage, error) + GetFleet(ctx context.Context) (*db.Fleet, error) + IsLeader() bool + NodeID() int +} + +const maxFlowRetries = 3 + +type syncRunner struct { + syncer syncer + db syncDB + statusProvider StatusProvider + metricsProvider MetricsProvider + tracker statusTracker + version string + clusterEnabled bool + clusterID string + lastKnownRev int64 + onSync SyncCallback + buffer *FleetBuffer + heldFlows []client.FlowEntry + flowRetries int + handle *SyncHandle +} + +func (r *syncRunner) runOneCycle(ctx context.Context) { + currentStatus := r.statusProvider() + isLeader := r.db.IsLeader() + + var flowsToSend []client.FlowEntry + + if len(r.heldFlows) > 0 { + flowsToSend = r.heldFlows + } else if r.buffer != nil { + flowsToSend = r.buffer.DrainFlows() + } + + params := &client.SyncParams{ + Version: r.version, + ClusterEnabled: r.clusterEnabled, + NodeID: r.db.NodeID(), + ClusterID: r.clusterID, + IsLeader: isLeader, + LastKnownRevision: r.lastKnownRev, + Status: r.tracker.Prepare(currentStatus), + Metrics: r.metricsProvider(), + Flows: flowsToSend, + } + + if isLeader { + params.SubscriberUsage = r.collectUsage(ctx) + } + + resp, err := r.syncer.Sync(ctx, params) + if err != nil { + logger.EllaLog.Error("sync failed", zap.Error(err)) + + if len(flowsToSend) > 0 { + r.heldFlows = flowsToSend + r.flowRetries++ + + if r.flowRetries >= maxFlowRetries { + logger.EllaLog.Warn("dropping flow batch after max retries", + zap.Int("dropped_flows", len(r.heldFlows)), + zap.Int("retries", r.flowRetries)) + r.heldFlows = nil + r.flowRetries = 0 + } + } + + if r.onSync != nil { + r.onSync(ctx, false) + } + + return + } + + r.heldFlows = nil + r.flowRetries = 0 + + if params.Status != nil { + r.tracker.Confirm(currentStatus) + } + + // Cluster-wide replicated state (operator, profiles, slices, policies, + // subscribers, etc.) is applied only by the leader; the resulting + // changeset propagates to followers via Raft. Per-node state (NAT, + // flow accounting, N3, routes, BGP) is applied locally on every node + // against its own node-scoped slice of the response. + if resp.Config != nil { + if isLeader { + if err := r.db.UpdateClusterConfig(ctx, resp.Config.Cluster); err != nil { + logger.EllaLog.Error("failed to apply fleet cluster config", zap.Error(err)) + } + } + + if err := r.db.UpdateNodeConfig(ctx, resp.Config.Node); err != nil { + logger.EllaLog.Error("failed to apply per-node fleet config", zap.Error(err)) + } else { + r.lastKnownRev = resp.ConfigRevision + + if err := r.db.UpdateFleetConfigRevision(ctx, resp.ConfigRevision); err != nil { + logger.EllaLog.Error("failed to update config revision", zap.Error(err)) + } + + r.reloadConfig(resp.Config) + } + } + + if r.onSync != nil { + r.onSync(ctx, true) + } +} + +func (r *syncRunner) reloadConfig(cfg *client.Config) { + if r.handle == nil { + return + } + + reloader := r.handle.getReloader() + if reloader == nil { + return + } + + if err := reloader.ReloadNAT(cfg.Node.NAT); err != nil { + logger.EllaLog.Error("failed to reload NAT after fleet sync", zap.Error(err)) + } + + if err := reloader.ReloadFlowAccounting(cfg.Node.FlowAccounting); err != nil { + logger.EllaLog.Error("failed to reload flow accounting after fleet sync", zap.Error(err)) + } +} + +func (r *syncRunner) collectUsage(ctx context.Context) []client.SubscriberUsageEntry { + now := time.Now().UTC() + start := now.AddDate(0, 0, -(recentUsageDays - 1)) + + rows, err := r.db.GetRawDailyUsage(ctx, start, now) + if err != nil { + logger.EllaLog.Warn("failed to collect subscriber usage for fleet sync", zap.Error(err)) + return nil + } + + entries := make([]client.SubscriberUsageEntry, 0, len(rows)) + for _, row := range rows { + entries = append(entries, client.SubscriberUsageEntry{ + EpochDay: row.EpochDay, + IMSI: row.IMSI, + UplinkBytes: row.BytesUplink, + DownlinkBytes: row.BytesDownlink, + }) + } + + return entries +} + +// ResumeSyncInput groups the dependencies for starting the sync loop. +type ResumeSyncInput struct { + FleetURL string + Token string + InsecureSkipVerify bool + DB *db.Database + StatusProvider StatusProvider + MetricsProvider MetricsProvider + OnSync SyncCallback + Buffer *FleetBuffer + ClusterEnabled bool + ClusterID string +} + +// ResumeSync spins up the per-node sync loop. Every node — leader or +// follower — reports metrics, flows, and status; only the current leader +// sends subscriber usage and applies config pushed back from Fleet. +func ResumeSync(ctx context.Context, in ResumeSyncInput) (*SyncHandle, error) { + fc := client.New(in.FleetURL, in.InsecureSkipVerify) + fc.SetToken(in.Token) + + fleetData, err := in.DB.GetFleet(ctx) + if err != nil { + return nil, fmt.Errorf("couldn't get fleet data: %w", err) + } + + handle := &SyncHandle{} + + runner := &syncRunner{ + syncer: fc, + db: in.DB, + statusProvider: in.StatusProvider, + metricsProvider: in.MetricsProvider, + version: version.GetVersion().Version, + clusterEnabled: in.ClusterEnabled, + clusterID: in.ClusterID, + lastKnownRev: fleetData.ConfigRevision, + onSync: in.OnSync, + buffer: in.Buffer, + handle: handle, + } + + runner.runOneCycle(ctx) + + mu.Lock() + + if cancelPrevSync != nil { + cancelPrevSync() + } + + syncCtx, cancel := context.WithCancel(ctx) + cancelPrevSync = cancel + + mu.Unlock() + + ticker := time.NewTicker(syncInterval) + + go func() { + for { + select { + case <-ticker.C: + runner.runOneCycle(syncCtx) + case <-syncCtx.Done(): + ticker.Stop() + return + } + } + }() + + logger.EllaLog.Info("Resumed fleet sync from existing credentials") + + return handle, nil +} + +// StopSync cancels the running sync goroutine, if any. +func StopSync() { + mu.Lock() + defer mu.Unlock() + + if cancelPrevSync != nil { + cancelPrevSync() + cancelPrevSync = nil + + logger.EllaLog.Info("Fleet sync stopped") + } +} diff --git a/internal/api/server/api_fleet.go b/internal/api/server/api_fleet.go new file mode 100644 index 000000000..36cff8c3f --- /dev/null +++ b/internal/api/server/api_fleet.go @@ -0,0 +1,954 @@ +// Copyright 2026 Ella Networks + +package server + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math" + "net/http" + "time" + + "github.com/ellanetworks/core/etsi" + "github.com/ellanetworks/core/fleet/client" + "github.com/ellanetworks/core/internal/amf" + "github.com/ellanetworks/core/internal/bgp" + "github.com/ellanetworks/core/internal/config" + "github.com/ellanetworks/core/internal/db" + "github.com/ellanetworks/core/internal/logger" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +type RegisterFleetParams struct { + ActivationToken string `json:"activationToken"` +} + +type UpdateFleetURLParams struct { + URL string `json:"url"` +} + +type FleetURLResponse struct { + URL string `json:"url"` +} + +const ( + RegisterFleetAction = "register_fleet" + UnregisterFleetAction = "unregister_fleet" + UpdateFleetURLAction = "update_fleet_url" +) + +// RegisterFleet handles the initial registration request. The fleet +// table is local-only: registration runs against the node receiving the +// request and stores credentials in that node's local DB only. To bring +// every node under Fleet management the operator registers each node +// individually. UPF reload wiring lives in the supervisor, not here. +func RegisterFleet(dbInstance *db.Database, cfg config.Config, amfInstance *amf.AMF, bgpService *bgp.BGPService) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + email := r.Context().Value(contextKeyEmail) + + emailStr, ok := email.(string) + if !ok { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to get email", nil, logger.APILog) + return + } + + var params RegisterFleetParams + if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil { + writeError(r.Context(), w, http.StatusBadRequest, "Invalid request data", err, logger.APILog) + return + } + + if params.ActivationToken == "" { + writeError(r.Context(), w, http.StatusBadRequest, "activationToken is missing", nil, logger.APILog) + return + } + + err := register(r.Context(), dbInstance, params.ActivationToken, cfg, amfInstance, bgpService) + if err != nil { + if errors.Is(err, client.ErrUnauthorized) { + writeError(r.Context(), w, http.StatusUnauthorized, "Invalid activation code", err, logger.APILog) + return + } + + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to register to fleet", err, logger.APILog) + + return + } + + logger.LogAuditEvent( + r.Context(), + RegisterFleetAction, + emailStr, + getClientIP(r), + "User registered Core to Fleet", + ) + + writeResponse(r.Context(), w, SuccessResponse{Message: "Core registered to Fleet successfully"}, http.StatusCreated, logger.APILog) + } +} + +func register(ctx context.Context, dbInstance *db.Database, activationToken string, cfg config.Config, amfInstance *amf.AMF, bgpService *bgp.BGPService) error { + fleetURL, err := dbInstance.GetFleetURL(ctx) + if err != nil { + return fmt.Errorf("couldn't get fleet URL from database: %w", err) + } + + if fleetURL == "" { + return fmt.Errorf("fleet URL is not configured") + } + + fc := client.New(fleetURL, cfg.Fleet.InsecureSkipVerify) + + initialConfig, err := buildInitialConfig(ctx, dbInstance) + if err != nil { + return fmt.Errorf("couldn't build initial config: %w", err) + } + + clusterID := "" + + if op, err := dbInstance.GetOperator(ctx); err == nil { + clusterID = op.ClusterID + } + + data, err := fc.Register(ctx, client.RegisterInput{ + ActivationToken: activationToken, + ClusterEnabled: cfg.Cluster.Enabled, + ClusterID: clusterID, + NodeID: dbInstance.NodeID(), + InitialConfig: initialConfig, + InitialStatus: BuildStatus(ctx, dbInstance, cfg, amfInstance, bgpService), + InitialMetrics: BuildMetrics(), + InitialUsage: collectInitialUsage(ctx, dbInstance), + }) + if err != nil { + return fmt.Errorf("couldn't register to fleet: %w", err) + } + + logger.EllaLog.Info("Registered to fleet successfully") + + if err := dbInstance.UpdateFleetToken(ctx, []byte(data.Token)); err != nil { + return fmt.Errorf("couldn't store fleet token in database: %w", err) + } + + logger.EllaLog.Info("Fleet token stored successfully; fleet supervisor will start the sync loop") + + return nil +} + +// BuildStatus returns a fresh status snapshot for this node. Each node +// reports its own network interfaces, connected radios, per-UE session +// state from its local AMF, and BGP live state when BGP is enabled. +func BuildStatus(ctx context.Context, dbInstance *db.Database, cfg config.Config, amfInstance *amf.AMF, bgpService *bgp.BGPService) client.EllaCoreStatus { + // N2 may bind by interface name (resolve to every IP on that + // interface, like the local /interfaces endpoint does) or by a + // single address. Mirror that resolution here so Fleet can list + // every N2 bind point per core. + var n2Addresses []string + + switch { + case cfg.Interfaces.N2.Name != "": + ips, err := config.GetInterfaceIPs(cfg.Interfaces.N2.Name) + if err != nil { + logger.APILog.Warn("failed to resolve N2 interface IPs for fleet sync", + zap.String("interface", cfg.Interfaces.N2.Name), zap.Error(err)) + } else { + n2Addresses = ips + } + case cfg.Interfaces.N2.Address != "": + n2Addresses = []string{cfg.Interfaces.N2.Address} + } + + networkInterfaces := client.StatusNetworkInterfaces{ + N2: client.N2Interface{ + Addresses: n2Addresses, + Port: cfg.Interfaces.N2.Port, + }, + N3: client.N3Interface{ + Name: cfg.Interfaces.N3.Name, + Address: cfg.Interfaces.N3.Address, + }, + N6: client.N6Interface{ + Name: cfg.Interfaces.N6.Name, + }, + API: client.APIInterface{ + Address: cfg.Interfaces.API.Address, + Port: cfg.Interfaces.API.Port, + }, + } + + if cfg.Interfaces.N3.VlanConfig != nil { + networkInterfaces.N3.Vlan = &client.Vlan{ + MasterInterface: cfg.Interfaces.N3.VlanConfig.MasterInterface, + VlanId: cfg.Interfaces.N3.VlanConfig.VlanId, + } + } + + if cfg.Interfaces.N6.VlanConfig != nil { + networkInterfaces.N6.Vlan = &client.Vlan{ + MasterInterface: cfg.Interfaces.N6.VlanConfig.MasterInterface, + VlanId: cfg.Interfaces.N6.VlanConfig.VlanId, + } + } + + bgpPeerStates, advertisedRoutes, learnedRoutes := getBGPState(ctx, bgpService) + + return client.EllaCoreStatus{ + NetworkInterfaces: networkInterfaces, + Radios: getRadiosStatus(amfInstance), + Subscribers: getSubscribersStatus(ctx, dbInstance, amfInstance), + BGPPeerStates: bgpPeerStates, + AdvertisedRoutes: advertisedRoutes, + LearnedRoutes: learnedRoutes, + IPLeases: getIPLeasesStatus(ctx, dbInstance), + } +} + +// getIPLeasesStatus projects every IP lease (dynamic + static, active + +// inactive) into the wire shape Fleet expects. The pool UUID is mapped +// to the data-network name via a single ListDataNetworks call so Fleet +// doesn't have to resolve UUIDs. Lease rows whose pool no longer +// exists in the data_networks table are dropped silently. +func getIPLeasesStatus(ctx context.Context, dbInstance *db.Database) []client.IPLease { + if dbInstance == nil { + return nil + } + + leases, err := dbInstance.ListAllLeases(ctx) + if err != nil { + logger.APILog.Warn("failed to list IP leases for fleet sync", zap.Error(err)) + return nil + } + + if len(leases) == 0 { + return nil + } + + dataNetworks, err := dbInstance.ListAllDataNetworks(ctx) + if err != nil { + logger.APILog.Warn("failed to list data networks for IP lease projection", zap.Error(err)) + return nil + } + + nameByID := make(map[string]string, len(dataNetworks)) + for _, dn := range dataNetworks { + nameByID[dn.ID] = dn.Name + } + + out := make([]client.IPLease, 0, len(leases)) + + for _, l := range leases { + dnName, ok := nameByID[l.PoolID] + if !ok { + continue + } + + out = append(out, client.IPLease{ + Address: l.Address().String(), + IMSI: l.IMSI, + Type: l.Type, + SessionID: l.SessionID, + DataNetworkName: dnName, + EstablishedAtUnix: l.CreatedAt, + }) + } + + return out +} + +// getBGPState projects internal/bgp.BGPService snapshots into the wire +// types Fleet expects. Returns empty slices when BGP is disabled or +// the service is nil; never errors. +func getBGPState(ctx context.Context, bgpService *bgp.BGPService) ([]client.BGPPeerState, []client.BGPAdvertisedRoute, []client.BGPLearnedRoute) { + if bgpService == nil || !bgpService.IsRunning() { + return nil, nil, nil + } + + var peerStates []client.BGPPeerState + + if status, err := bgpService.GetStatus(ctx); err == nil && status != nil { + peerStates = make([]client.BGPPeerState, 0, len(status.Peers)) + for _, p := range status.Peers { + peerStates = append(peerStates, client.BGPPeerState{ + Address: p.Address, + RemoteAS: p.RemoteAS, + State: p.State, + Uptime: p.Uptime, + PrefixesSent: p.PrefixesSent, + PrefixesReceived: p.PrefixesReceived, + }) + } + } else if err != nil { + logger.EllaLog.Warn("BGP status snapshot failed", zap.Error(err)) + } + + var advertised []client.BGPAdvertisedRoute + + if routes, err := bgpService.GetRoutes(); err == nil { + advertised = make([]client.BGPAdvertisedRoute, 0, len(routes)) + for _, r := range routes { + advertised = append(advertised, client.BGPAdvertisedRoute{ + Subscriber: r.Subscriber, + Prefix: r.Prefix, + NextHop: r.NextHop, + }) + } + } else { + logger.EllaLog.Warn("BGP advertised routes snapshot failed", zap.Error(err)) + } + + learnedSrc := bgpService.GetLearnedRoutes() + learned := make([]client.BGPLearnedRoute, 0, len(learnedSrc)) + + for _, r := range learnedSrc { + learned = append(learned, client.BGPLearnedRoute{ + Prefix: r.Prefix, + NextHop: r.NextHop, + PeerAddress: r.Peer, + }) + } + + return peerStates, advertised, learned +} + +// BuildMetrics scrapes the local Prometheus registry and packages the +// values into the EllaCoreMetrics shape expected by Fleet. +func BuildMetrics() client.EllaCoreMetrics { + metrics := client.EllaCoreMetrics{} + + mfs, err := prometheus.DefaultGatherer.Gather() + if err != nil { + logger.EllaLog.Warn("failed to gather prometheus metrics", zap.Error(err)) + return metrics + } + + for _, mf := range mfs { + name := mf.GetName() + + ms := mf.GetMetric() + if len(ms) == 0 { + continue + } + + m := ms[0] + + switch name { + case "app_uplink_bytes": + if c := m.GetCounter(); c != nil { + metrics.UplinkBytesTotal = int64(math.Round(c.GetValue())) + } + case "app_downlink_bytes": + if c := m.GetCounter(); c != nil { + metrics.DownlinkBytesTotal = int64(math.Round(c.GetValue())) + } + case "app_pdu_sessions_total": + if g := m.GetGauge(); g != nil { + metrics.PDUSessionsTotal = int64(math.Round(g.GetValue())) + } + case "process_cpu_seconds_total": + if c := m.GetCounter(); c != nil { + metrics.ProcessCPUSecondsTotal = c.GetValue() + } + case "process_resident_memory_bytes": + if g := m.GetGauge(); g != nil { + metrics.ProcessResidentMemoryBytes = int64(math.Round(g.GetValue())) + } + case "go_goroutines": + if g := m.GetGauge(); g != nil { + metrics.GoGoroutines = int64(math.Round(g.GetValue())) + } + case "process_start_time_seconds": + if g := m.GetGauge(); g != nil { + metrics.ProcessStartTimeSeconds = g.GetValue() + } + case "app_database_storage_bytes": + if g := m.GetGauge(); g != nil { + metrics.DatabaseStorageBytes = int64(math.Round(g.GetValue())) + } + case "app_ip_addresses_total": + if g := m.GetGauge(); g != nil { + metrics.IPAddresses = int64(math.Round(g.GetValue())) + } + case "app_ip_addresses_allocated_total": + if g := m.GetGauge(); g != nil { + metrics.IPAddressesAllocated = int64(math.Round(g.GetValue())) + } + case "app_registration_attempts_total": + for _, sample := range ms { + var result string + + for _, lp := range sample.GetLabel() { + if lp.GetName() == "result" { + result = lp.GetValue() + } + } + + if c := sample.GetCounter(); c != nil { + switch result { + case "accept": + metrics.RegistrationAttemptsAccepted += int64(math.Round(c.GetValue())) + case "reject": + metrics.RegistrationAttemptsRejected += int64(math.Round(c.GetValue())) + } + } + } + } + } + + return metrics +} + +func getRadiosStatus(amfInstance *amf.AMF) []client.Radio { + if amfInstance == nil { + return nil + } + + _, ranList := amfInstance.ListAmfRan(1, 1000) + + radios := make([]client.Radio, 0, len(ranList)) + for _, radio := range ranList { + supportedTAIs := make([]client.SupportedTAI, 0, len(radio.SupportedTAIs)) + for _, tai := range radio.SupportedTAIs { + snssais := make([]client.Snssai, 0, len(tai.SNssaiList)) + for _, snssai := range tai.SNssaiList { + snssais = append(snssais, client.Snssai{ + Sst: snssai.Sst, + Sd: snssai.Sd, + }) + } + + supportedTAIs = append(supportedTAIs, client.SupportedTAI{ + Tai: client.Tai{ + PlmnID: client.PlmnID{ + Mcc: tai.Tai.PlmnID.Mcc, + Mnc: tai.Tai.PlmnID.Mnc, + }, + Tac: tai.Tai.Tac, + }, + SNssais: snssais, + }) + } + + radioAddress := "" + + if radio.Conn != nil { + if addr := radio.Conn.RemoteAddr(); addr != nil { + radioAddress = addr.String() + } + } + + radioID := "" + if radio.RanID != nil && radio.RanID.GNbID != nil { + radioID = radio.RanID.GNbID.GNBValue + } + + radios = append(radios, client.Radio{ + Name: radio.Name, + ID: radioID, + Address: radioAddress, + SupportedTAIs: supportedTAIs, + RanNodeType: radio.RanNodeTypeName(), + LastSeenAtUnix: radio.GetLastSeenAt().Unix(), + }) + } + + return radios +} + +func getSubscribersStatus(ctx context.Context, dbInstance *db.Database, amfInstance *amf.AMF) []client.SubscriberStatus { + subscribers, _, err := dbInstance.ListSubscribersPage(ctx, 1, 1000) + if err != nil { + logger.EllaLog.Error("failed to list subscribers for status", zap.Error(err)) + return nil + } + + statuses := make([]client.SubscriberStatus, 0, len(subscribers)) + + for _, s := range subscribers { + status := client.SubscriberStatus{ + Imsi: s.Imsi, + } + + if amfInstance != nil { + supi, err := etsi.NewSUPIFromIMSI(s.Imsi) + if err == nil { + if snap, found := amfInstance.GetUESnapshot(supi); found { + status.Registered = snap.State == amf.Registered + status.CipheringAlgorithm = snap.CipheringAlgorithm + status.IntegrityAlgorithm = snap.IntegrityAlgorithm + status.LastSeenRadio = snap.LastSeenRadio + + if snap.Pei != "" { + if converted, convErr := etsi.IMEIFromPEI(snap.Pei); convErr == nil { + status.Imei = converted + } + } + + if !snap.LastSeenAt.IsZero() { + status.LastSeenAt = snap.LastSeenAt.UTC().Format(time.RFC3339) + } + } + + if sessions, found := amfInstance.GetUEPDUSessions(supi); found { + status.Sessions = projectPDUSessions(sessions) + } + } + } + + statuses = append(statuses, status) + } + + return statuses +} + +// projectPDUSessions converts AMF's PDUSessionExport into the wire shape +// Fleet expects. AMF's export carries Snssai, DNN, PDUAddress, and a +// PolicyData snapshot; the wire copies those plus 5QI/ARP from the +// QosData when present. +func projectPDUSessions(sessions []amf.PDUSessionExport) []client.PDUSession { + if len(sessions) == 0 { + return nil + } + + out := make([]client.PDUSession, 0, len(sessions)) + + for _, s := range sessions { + ps := client.PDUSession{ + SessionID: int(s.PDUSessionID), + DataNetworkName: s.DNN, + AllocatedIP: s.PDUAddress, + } + + if s.Snssai != nil { + ps.Sst = s.Snssai.Sst + ps.Sd = s.Snssai.Sd + } + + if s.PolicyData != nil && s.PolicyData.QosData != nil { + ps.QoS5QI = s.PolicyData.QosData.Var5qi + ps.QoSARP = s.PolicyData.QosData.Arp.PriorityLevel + } + + out = append(out, ps) + } + + return out +} + +func buildInitialConfig(ctx context.Context, dbInstance *db.Database) (client.Config, error) { + op, err := dbInstance.GetOperator(ctx) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't get operator from database: %w", err) + } + + supportedTacs, err := op.GetSupportedTacs() + if err != nil { + return client.Config{}, fmt.Errorf("couldn't get supported tacs: %w", err) + } + + ciphering, _ := op.GetCiphering() + integrity, _ := op.GetIntegrity() + + routes, _, err := dbInstance.ListRoutesPage(ctx, 1, 100) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list routes: %w", err) + } + + routesCfg := make([]client.Route, len(routes)) + for i, r := range routes { + routesCfg[i] = client.Route{ + ID: r.ID, + Destination: r.Destination, + Gateway: r.Gateway, + Interface: r.Interface.String(), + Metric: r.Metric, + } + } + + natEnabled, err := dbInstance.IsNATEnabled(ctx) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't get NAT configuration: %w", err) + } + + flowAccEnabled, err := dbInstance.IsFlowAccountingEnabled(ctx) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't get flow accounting configuration: %w", err) + } + + n3Settings, err := dbInstance.GetN3Settings(ctx) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't get N3 settings: %w", err) + } + + dataNetworks, err := dbInstance.ListAllDataNetworks(ctx) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list data networks: %w", err) + } + + dnCfg := make([]client.DataNetwork, len(dataNetworks)) + for i, dn := range dataNetworks { + dnCfg[i] = client.DataNetwork{ + Name: dn.Name, + IPPool: dn.IPPool, + DNS: dn.DNS, + MTU: dn.MTU, + } + } + + profiles, _, err := dbInstance.ListProfilesPage(ctx, 1, 1000) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list profiles: %w", err) + } + + profileCfg := make([]client.Profile, len(profiles)) + + profileNameByID := make(map[string]string, len(profiles)) + + for i, p := range profiles { + profileCfg[i] = client.Profile{ + Name: p.Name, + UeAmbrUplink: p.UeAmbrUplink, + UeAmbrDownlink: p.UeAmbrDownlink, + } + profileNameByID[p.ID] = p.Name + } + + slices, err := dbInstance.ListAllNetworkSlices(ctx) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list slices: %w", err) + } + + sliceCfg := make([]client.Slice, len(slices)) + + sliceNameByID := make(map[string]string, len(slices)) + + for i, s := range slices { + sliceCfg[i] = client.Slice{ + Name: s.Name, + Sst: s.Sst, + Sd: s.Sd, + } + sliceNameByID[s.ID] = s.Name + } + + policies, _, err := dbInstance.ListPoliciesPage(ctx, 1, 1000) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list policies: %w", err) + } + + dnNameByID := make(map[string]string, len(dataNetworks)) + for _, dn := range dataNetworks { + dnNameByID[dn.ID] = dn.Name + } + + policyCfg := make([]client.Policy, 0, len(policies)) + ruleCfg := make([]client.NetworkRule, 0, len(policies)*4) + + for _, p := range policies { + policyCfg = append(policyCfg, client.Policy{ + Name: p.Name, + ProfileName: profileNameByID[p.ProfileID], + SliceName: sliceNameByID[p.SliceID], + DataNetworkName: dnNameByID[p.DataNetworkID], + Var5qi: p.Var5qi, + Arp: p.Arp, + SessionAmbrUplink: p.SessionAmbrUplink, + SessionAmbrDownlink: p.SessionAmbrDownlink, + }) + + rules, err := dbInstance.ListRulesForPolicy(ctx, p.ID) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list rules for policy %q: %w", p.Name, err) + } + + for _, r := range rules { + ruleCfg = append(ruleCfg, client.NetworkRule{ + PolicyName: p.Name, + Direction: r.Direction, + Precedence: r.Precedence, + Description: r.Description, + RemotePrefix: r.RemotePrefix, + Protocol: r.Protocol, + PortLow: r.PortLow, + PortHigh: r.PortHigh, + Action: r.Action, + }) + } + } + + subscribers, _, err := dbInstance.ListSubscribersPage(ctx, 1, 1000) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list subscribers: %w", err) + } + + subCfg := make([]client.Subscriber, len(subscribers)) + for i, s := range subscribers { + subCfg[i] = client.Subscriber{ + Imsi: s.Imsi, + ProfileName: profileNameByID[s.ProfileID], + SequenceNumber: s.SequenceNumber, + PermanentKey: s.PermanentKey, + Opc: s.Opc, + } + } + + hnKeys, err := dbInstance.ListHomeNetworkKeys(ctx) + if err != nil { + return client.Config{}, fmt.Errorf("couldn't list home network keys: %w", err) + } + + hnKeyCfg := make([]client.HomeNetworkKey, len(hnKeys)) + for i, k := range hnKeys { + hnKeyCfg[i] = client.HomeNetworkKey{ + KeyIdentifier: k.KeyIdentifier, + Scheme: k.Scheme, + PrivateKey: k.PrivateKey, + } + } + + bgpCfg, bgpPeersCfg, bgpImportCfg, err := collectBGPConfig(ctx, dbInstance) + if err != nil { + return client.Config{}, err + } + + retentionCfg, err := collectRetentionPolicies(ctx, dbInstance) + if err != nil { + return client.Config{}, err + } + + return client.Config{ + Cluster: client.ClusterConfig{ + Operator: client.Operator{ + ID: client.OperatorID{Mcc: op.Mcc, Mnc: op.Mnc}, + OperatorCode: op.OperatorCode, + Tracking: client.OperatorTracking{SupportedTacs: supportedTacs}, + NASSecurity: client.OperatorNASSecurity{Ciphering: ciphering, Integrity: integrity}, + SPN: client.OperatorSPN{FullName: op.SpnFullName, ShortName: op.SpnShortName}, + }, + HomeNetworkKeys: hnKeyCfg, + DataNetworks: dnCfg, + Profiles: profileCfg, + Slices: sliceCfg, + Policies: policyCfg, + NetworkRules: ruleCfg, + Subscribers: subCfg, + RetentionPolicies: retentionCfg, + }, + Node: client.NodeConfig{ + Routes: routesCfg, + NAT: natEnabled, + FlowAccounting: flowAccEnabled, + NetworkInterfaces: client.NetworkInterfaces{N3ExternalAddress: n3Settings.ExternalAddress}, + BGP: bgpCfg, + BGPPeers: bgpPeersCfg, + BGPImportPrefixes: bgpImportCfg, + }, + }, nil +} + +// collectBGPConfig returns this node's BGP settings, peers, and import +// prefixes. BGP is per-node (local-only); the snapshot reflects only +// what is configured on the node serving the request. +func collectBGPConfig(ctx context.Context, dbInstance *db.Database) (client.BGPSettings, []client.BGPPeer, []client.BGPImportPrefix, error) { + settings, err := dbInstance.GetBGPSettings(ctx) + if err != nil { + return client.BGPSettings{}, nil, nil, fmt.Errorf("couldn't get BGP settings: %w", err) + } + + bgpCfg := client.BGPSettings{ + Enabled: settings.Enabled, + LocalAS: settings.LocalAS, + RouterID: settings.RouterID, + ListenAddress: settings.ListenAddress, + } + + peers, err := dbInstance.ListAllBGPPeers(ctx) + if err != nil { + return client.BGPSettings{}, nil, nil, fmt.Errorf("couldn't list BGP peers: %w", err) + } + + peerCfg := make([]client.BGPPeer, 0, len(peers)) + + var prefixCfg []client.BGPImportPrefix + + for _, p := range peers { + peerCfg = append(peerCfg, client.BGPPeer{ + Address: p.Address, + RemoteAS: p.RemoteAS, + HoldTime: p.HoldTime, + Password: p.Password, + Description: p.Description, + }) + + prefixes, err := dbInstance.ListImportPrefixesByPeer(ctx, p.ID) + if err != nil { + return client.BGPSettings{}, nil, nil, fmt.Errorf("couldn't list import prefixes for peer %s: %w", p.Address, err) + } + + for _, pr := range prefixes { + prefixCfg = append(prefixCfg, client.BGPImportPrefix{ + PeerAddress: p.Address, + Prefix: pr.Prefix, + MaxLength: pr.MaxLength, + }) + } + } + + return bgpCfg, peerCfg, prefixCfg, nil +} + +func collectRetentionPolicies(ctx context.Context, dbInstance *db.Database) ([]client.RetentionPolicy, error) { + categories := []db.RetentionCategory{ + db.CategoryAuditLogs, + db.CategoryRadioLogs, + db.CategorySubscriberUsage, + db.CategoryFlowReports, + } + + out := make([]client.RetentionPolicy, 0, len(categories)) + + for _, cat := range categories { + days, err := dbInstance.GetRetentionPolicy(ctx, cat) + if err != nil { + return nil, fmt.Errorf("couldn't get retention policy %s: %w", cat, err) + } + + out = append(out, client.RetentionPolicy{ + Category: string(cat), + Days: days, + }) + } + + return out, nil +} + +func UnregisterFleet(dbInstance *db.Database, cfg config.Config) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + email := r.Context().Value(contextKeyEmail) + + emailStr, ok := email.(string) + if !ok { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to get email", nil, logger.APILog) + return + } + + managed, err := dbInstance.IsFleetManaged(r.Context()) + if err != nil { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to check fleet status", err, logger.APILog) + return + } + + if !managed { + writeError(r.Context(), w, http.StatusBadRequest, "Core is not registered to a Fleet", nil, logger.APILog) + return + } + + fleetData, err := dbInstance.GetFleet(r.Context()) + if err != nil { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to load fleet data", err, logger.APILog) + return + } + + fc := client.New(fleetData.URL, cfg.Fleet.InsecureSkipVerify) + fc.SetToken(string(fleetData.Token)) + + if err := fc.Unregister(r.Context()); err != nil { + logger.APILog.Warn("couldn't notify fleet server about unregistration", zap.Error(err)) + } + + if err := dbInstance.ClearFleetCredentials(r.Context()); err != nil { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to unregister from Fleet", err, logger.APILog) + return + } + + logger.LogAuditEvent( + r.Context(), + UnregisterFleetAction, + emailStr, + getClientIP(r), + "User unregistered Core from Fleet", + ) + + writeResponse(r.Context(), w, SuccessResponse{Message: "Core unregistered from Fleet successfully"}, http.StatusOK, logger.APILog) + } +} + +// collectInitialUsage gathers recent per-subscriber daily usage counters +// to include in the Fleet registration payload. +func collectInitialUsage(ctx context.Context, dbInstance *db.Database) []client.SubscriberUsageEntry { + const recentDays = 7 + + now := time.Now().UTC() + start := now.AddDate(0, 0, -(recentDays - 1)) + + rows, err := dbInstance.GetRawDailyUsage(ctx, start, now) + if err != nil { + logger.APILog.Warn("failed to collect initial usage for fleet registration", zap.Error(err)) + return nil + } + + entries := make([]client.SubscriberUsageEntry, 0, len(rows)) + for _, r := range rows { + entries = append(entries, client.SubscriberUsageEntry{ + EpochDay: r.EpochDay, + IMSI: r.IMSI, + UplinkBytes: r.BytesUplink, + DownlinkBytes: r.BytesDownlink, + }) + } + + return entries +} + +func GetFleetURL(dbInstance *db.Database) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + url, err := dbInstance.GetFleetURL(r.Context()) + if err != nil { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to get fleet URL", err, logger.APILog) + return + } + + writeResponse(r.Context(), w, FleetURLResponse{URL: url}, http.StatusOK, logger.APILog) + } +} + +func UpdateFleetURL(dbInstance *db.Database) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + email := r.Context().Value(contextKeyEmail) + + emailStr, ok := email.(string) + if !ok { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to get email", nil, logger.APILog) + return + } + + var params UpdateFleetURLParams + if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil { + writeError(r.Context(), w, http.StatusBadRequest, "Invalid request data", err, logger.APILog) + return + } + + if params.URL == "" { + writeError(r.Context(), w, http.StatusBadRequest, "url is missing", nil, logger.APILog) + return + } + + err := dbInstance.UpdateFleetURL(r.Context(), params.URL) + if err != nil { + writeError(r.Context(), w, http.StatusInternalServerError, "Failed to update fleet URL", err, logger.APILog) + return + } + + logger.LogAuditEvent( + r.Context(), + UpdateFleetURLAction, + emailStr, + getClientIP(r), + fmt.Sprintf("User updated fleet URL to %s", params.URL), + ) + + writeResponse(r.Context(), w, SuccessResponse{Message: "Fleet URL updated successfully"}, http.StatusOK, logger.APILog) + } +} diff --git a/internal/api/server/api_status.go b/internal/api/server/api_status.go index 689038787..c0796bcd9 100644 --- a/internal/api/server/api_status.go +++ b/internal/api/server/api_status.go @@ -60,6 +60,11 @@ type ClusterStatusResponse struct { PendingMigration *PendingMigrationResponse `json:"pendingMigration,omitempty"` } +type FleetStatusResponse struct { + Managed bool `json:"managed"` + LastSyncAt string `json:"lastSyncAt,omitempty"` +} + type StatusResponse struct { Version string `json:"version"` Revision string `json:"revision"` @@ -67,6 +72,7 @@ type StatusResponse struct { Ready bool `json:"ready"` SchemaVersion int `json:"schemaVersion"` Cluster *ClusterStatusResponse `json:"cluster,omitempty"` + Fleet FleetStatusResponse `json:"fleet"` } func GetStatus(dbInstance *db.Database, ready *atomic.Bool) http.Handler { @@ -83,12 +89,22 @@ func GetStatus(dbInstance *db.Database, ready *atomic.Bool) http.Handler { ver := version.GetVersion() + var fleetStatus FleetStatusResponse + + if fleetData, err := dbInstance.GetFleet(ctx); err == nil && fleetData != nil { + fleetStatus.Managed = len(fleetData.Token) > 0 + fleetStatus.LastSyncAt = fleetData.LastSyncAt + } else if err != nil { + logger.APILog.Warn("couldn't read fleet row for status", zap.Error(err)) + } + statusResponse := StatusResponse{ Version: ver.Version, Revision: ver.Revision, Initialized: initialized, Ready: ready.Load(), SchemaVersion: db.SchemaVersion(), + Fleet: fleetStatus, } if dbInstance.ClusterEnabled() { diff --git a/internal/api/server/authorization_middleware.go b/internal/api/server/authorization_middleware.go index 2a20deade..63a5a7a53 100644 --- a/internal/api/server/authorization_middleware.go +++ b/internal/api/server/authorization_middleware.go @@ -192,6 +192,12 @@ const ( // Cluster permissions PermManageCluster = "cluster:manage" + + // Fleet permissions + PermGetFleetURL = "fleet:get_url" + PermUpdateFleetURL = "fleet:update_url" + PermRegisterFleet = "fleet:register" + PermUnregisterFleet = "fleet:unregister" ) func Authorize(permission string, next http.Handler) http.Handler { diff --git a/internal/api/server/fleet_middleware.go b/internal/api/server/fleet_middleware.go new file mode 100644 index 000000000..08e2204fa --- /dev/null +++ b/internal/api/server/fleet_middleware.go @@ -0,0 +1,69 @@ +// Copyright 2026 Ella Networks + +package server + +import ( + "net/http" + "strings" + + "github.com/ellanetworks/core/internal/db" + "github.com/ellanetworks/core/internal/logger" +) + +// fleetReadOnlyExemptPrefixes are URL path prefixes permitted even when +// the Core is Fleet-managed. Covers auth, observability, backup, cluster +// admin, and the fleet endpoints themselves. +var fleetReadOnlyExemptPrefixes = []string{ + "/api/v1/auth/", + "/api/v1/init", + "/api/v1/status", + "/api/v1/metrics", + "/api/v1/openapi.yaml", + "/api/v1/users", + "/api/v1/fleet/", + "/api/v1/backup", + "/api/v1/restore", + "/api/v1/support-bundle", + "/api/v1/logs/", + "/api/v1/pprof/", + "/api/v1/cluster/", +} + +// FleetReadOnlyMiddleware rejects mutating API requests (POST, PUT, +// DELETE, PATCH) while the Core is registered to a Fleet. Reads and +// exempt paths always pass through. +func FleetReadOnlyMiddleware(dbInstance *db.Database, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet || r.Method == http.MethodHead || r.Method == http.MethodOptions { + next.ServeHTTP(w, r) + return + } + + if !strings.HasPrefix(r.URL.Path, "/api/") { + next.ServeHTTP(w, r) + return + } + + for _, prefix := range fleetReadOnlyExemptPrefixes { + if strings.HasPrefix(r.URL.Path, prefix) { + next.ServeHTTP(w, r) + return + } + } + + managed, err := dbInstance.IsFleetManaged(r.Context()) + if err != nil { + logger.APILog.Warn("couldn't check fleet status for read-only guard") + next.ServeHTTP(w, r) + + return + } + + if managed { + writeError(r.Context(), w, http.StatusForbidden, "This Core is managed by Fleet. Changes must be made through Fleet.", nil, logger.APILog) + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/internal/api/server/server.go b/internal/api/server/server.go index ff7142412..0ea22fe9d 100644 --- a/internal/api/server/server.go +++ b/internal/api/server/server.go @@ -207,6 +207,12 @@ func NewHandler(cfg HandlerConfig) http.Handler { mux.HandleFunc("PUT /api/v1/logs/audit/retention", Authenticate(jwtSecret, dbInstance, Authorize(PermSetAuditLogRetentionPolicy, UpdateAuditLogRetentionPolicy(dbInstance))).ServeHTTP) mux.HandleFunc("GET /api/v1/logs/audit", Authenticate(jwtSecret, dbInstance, Authorize(PermListAuditLogs, ListAuditLogs(dbInstance))).ServeHTTP) + // Fleet (Authenticated) + mux.HandleFunc("GET /api/v1/fleet/url", Authenticate(jwtSecret, dbInstance, Authorize(PermGetFleetURL, GetFleetURL(dbInstance))).ServeHTTP) + mux.HandleFunc("PUT /api/v1/fleet/url", Authenticate(jwtSecret, dbInstance, Authorize(PermUpdateFleetURL, UpdateFleetURL(dbInstance))).ServeHTTP) + mux.HandleFunc("POST /api/v1/fleet/register", Authenticate(jwtSecret, dbInstance, Authorize(PermRegisterFleet, RegisterFleet(dbInstance, appCfg, amfInstance, bgpService))).ServeHTTP) + mux.HandleFunc("POST /api/v1/fleet/unregister", Authenticate(jwtSecret, dbInstance, Authorize(PermUnregisterFleet, UnregisterFleet(dbInstance, appCfg))).ServeHTTP) + // Cluster (Authenticated, admin only) mux.HandleFunc("GET /api/v1/cluster/members", Authenticate(jwtSecret, dbInstance, Authorize(PermManageCluster, ListClusterMembers(dbInstance))).ServeHTTP) mux.HandleFunc("DELETE /api/v1/cluster/members/{id}", Authenticate(jwtSecret, dbInstance, Authorize(PermManageCluster, RemoveClusterMember(dbInstance))).ServeHTTP) @@ -239,6 +245,7 @@ func NewHandler(cfg HandlerConfig) http.Handler { var handler http.Handler = mux handler = MaxBodySizeMiddleware(handler) + handler = FleetReadOnlyMiddleware(dbInstance, handler) handler = SecurityHeadersMiddleware(secureCookie, handler) handler = MetricsMiddleware(handler) diff --git a/internal/config/config.go b/internal/config/config.go index f72389e15..aacd8133a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -129,6 +129,19 @@ type ClusterYaml struct { InitialSuffrage string `yaml:"initial-suffrage"` } +// FleetYaml holds Core-side knobs for talking to a Fleet control plane. +// The Fleet URL and the registration credential live in the DB and are set +// through the Core API; this YAML section is for client-behavior options +// that need to be available before the operator can call any Core API. +type FleetYaml struct { + // InsecureSkipVerify, when true, disables TLS server-certificate + // verification on outbound calls to Fleet. Use only for integration + // tests and local-development setups where Fleet serves a self-signed + // cert that the Core host's trust store does not know about; never + // set in production. + InsecureSkipVerify bool `yaml:"insecure-skip-verify"` +} + type ConfigYAML struct { Logging LoggingYaml `yaml:"logging"` DB DBYaml `yaml:"db"` @@ -136,6 +149,7 @@ type ConfigYAML struct { XDP XDPYaml `yaml:"xdp"` Telemetry TelemetryYaml `yaml:"telemetry"` Cluster ClusterYaml `yaml:"cluster"` + Fleet FleetYaml `yaml:"fleet"` } type N2Interface struct { @@ -210,6 +224,11 @@ type Cluster struct { InitialSuffrage string } +// Fleet holds resolved Core-side knobs for the Fleet client. +type Fleet struct { + InsecureSkipVerify bool +} + type Config struct { Logging Logging DB DB @@ -217,6 +236,7 @@ type Config struct { XDP XDP Telemetry Telemetry Cluster Cluster + Fleet Fleet } type VlanConfig struct { @@ -412,6 +432,10 @@ func Validate(filePath string) (Config, error) { config.Cluster = cluster + config.Fleet = Fleet{ + InsecureSkipVerify: c.Fleet.InsecureSkipVerify, + } + return config, nil } diff --git a/internal/db/applier.go b/internal/db/applier.go index b99722e4e..672cb6cf5 100644 --- a/internal/db/applier.go +++ b/internal/db/applier.go @@ -1050,6 +1050,26 @@ func (db *Database) applyCreateHomeNetworkKey(ctx context.Context, k *HomeNetwor return nil, nil } +func (db *Database) applyUpdateHomeNetworkKey(ctx context.Context, k *HomeNetworkKey) (any, error) { + var outcome sqlair.Outcome + + err := db.runner(ctx).Query(ctx, db.updateHomeNetworkKeyStmt, k).Get(&outcome) + if err != nil { + return nil, fmt.Errorf("query failed: %w", err) + } + + rowsAffected, err := outcome.Result().RowsAffected() + if err != nil { + return nil, fmt.Errorf("rows affected: %w", err) + } + + if rowsAffected == 0 { + return nil, ErrNotFound + } + + return nil, nil +} + func (db *Database) applyDeleteHomeNetworkKey(ctx context.Context, p *stringPayload) (any, error) { var outcome sqlair.Outcome @@ -1090,80 +1110,80 @@ func (db *Database) applyCreateBGPPeer(ctx context.Context, p *BGPPeer) (any, er return int(id), nil } -func (db *Database) applyUpdateBGPPeer(ctx context.Context, p *BGPPeer) (any, error) { +func (db *Database) applyUpdateBGPPeer(ctx context.Context, p *BGPPeer) error { var outcome sqlair.Outcome err := db.runner(ctx).Query(ctx, db.updateBGPPeerStmt, p).Get(&outcome) if err != nil { - return nil, fmt.Errorf("query failed: %w", err) + return fmt.Errorf("query failed: %w", err) } rowsAffected, err := outcome.Result().RowsAffected() if err != nil { - return nil, fmt.Errorf("rows affected: %w", err) + return fmt.Errorf("rows affected: %w", err) } if rowsAffected == 0 { - return nil, ErrNotFound + return ErrNotFound } - return struct{}{}, nil + return nil } -func (db *Database) applyDeleteBGPPeer(ctx context.Context, p *intPayload) (any, error) { +func (db *Database) applyDeleteBGPPeer(ctx context.Context, p *intPayload) error { var outcome sqlair.Outcome err := db.runner(ctx).Query(ctx, db.deleteBGPPeerStmt, BGPPeer{ID: p.Value}).Get(&outcome) if err != nil { - return nil, fmt.Errorf("query failed: %w", err) + return fmt.Errorf("query failed: %w", err) } rowsAffected, err := outcome.Result().RowsAffected() if err != nil { - return nil, fmt.Errorf("rows affected: %w", err) + return fmt.Errorf("rows affected: %w", err) } if rowsAffected == 0 { - return nil, ErrNotFound + return ErrNotFound } - return struct{}{}, nil + return nil } -func (db *Database) applyUpdateBGPSettings(ctx context.Context, s *BGPSettings) (any, error) { +func (db *Database) applyUpdateBGPSettings(ctx context.Context, s *BGPSettings) error { err := db.runner(ctx).Query(ctx, db.upsertBGPSettingsStmt, s).Run() if err != nil { - return nil, fmt.Errorf("query failed: %w", err) + return fmt.Errorf("query failed: %w", err) } - return struct{}{}, nil + return nil } -func (db *Database) applyUpdateNATSettings(ctx context.Context, p *boolPayload) (any, error) { +func (db *Database) applyUpdateNATSettings(ctx context.Context, p *boolPayload) error { err := db.runner(ctx).Query(ctx, db.upsertNATSettingsStmt, NATSettings{Enabled: p.Value}).Run() if err != nil { - return nil, fmt.Errorf("query failed: %w", err) + return fmt.Errorf("query failed: %w", err) } - return struct{}{}, nil + return nil } -func (db *Database) applyUpdateN3Settings(ctx context.Context, p *stringPayload) (any, error) { +func (db *Database) applyUpdateN3Settings(ctx context.Context, p *stringPayload) error { err := db.runner(ctx).Query(ctx, db.updateN3SettingsStmt, N3Settings{ExternalAddress: p.Value}).Run() if err != nil { - return nil, fmt.Errorf("query failed: %w", err) + return fmt.Errorf("query failed: %w", err) } - return struct{}{}, nil + return nil } -func (db *Database) applyUpdateFlowAccountingSettings(ctx context.Context, p *boolPayload) (any, error) { +func (db *Database) applyUpdateFlowAccountingSettings(ctx context.Context, p *boolPayload) error { err := db.runner(ctx).Query(ctx, db.upsertFlowAccountingSettingsStmt, FlowAccountingSettings{Enabled: p.Value}).Run() if err != nil { - return nil, fmt.Errorf("query failed: %w", err) + return fmt.Errorf("query failed: %w", err) } - return struct{}{}, nil + return nil } func (db *Database) applySetRetentionPolicy(ctx context.Context, rp *RetentionPolicy) (any, error) { @@ -1276,24 +1296,24 @@ func (db *Database) applyCreateRoute(ctx context.Context, r *Route) (any, error) return id, nil } -func (db *Database) applyDeleteRoute(ctx context.Context, p *int64Payload) (any, error) { +func (db *Database) applyDeleteRoute(ctx context.Context, p *int64Payload) error { var outcome sqlair.Outcome err := db.runner(ctx).Query(ctx, db.deleteRouteStmt, Route{ID: p.Value}).Get(&outcome) if err != nil { - return nil, fmt.Errorf("query failed: %w", err) + return fmt.Errorf("query failed: %w", err) } rowsAffected, err := outcome.Result().RowsAffected() if err != nil { - return nil, fmt.Errorf("rows affected: %w", err) + return fmt.Errorf("rows affected: %w", err) } if rowsAffected == 0 { - return nil, ErrNotFound + return ErrNotFound } - return struct{}{}, nil + return nil } func (db *Database) applyUpsertClusterMember(ctx context.Context, m *ClusterMember) (any, error) { diff --git a/internal/db/bgp_peers.go b/internal/db/bgp_peers.go index 30cdc2c48..e32dc14b1 100644 --- a/internal/db/bgp_peers.go +++ b/internal/db/bgp_peers.go @@ -223,7 +223,7 @@ func (db *Database) UpdateBGPPeer(ctx context.Context, peer *BGPPeer) error { DBQueriesTotal.WithLabelValues(BGPPeersTableName, "update").Inc() - _, err := db.applyUpdateBGPPeer(ctx, peer) + err := db.applyUpdateBGPPeer(ctx, peer) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -255,7 +255,7 @@ func (db *Database) DeleteBGPPeer(ctx context.Context, id int) error { DBQueriesTotal.WithLabelValues(BGPPeersTableName, "delete").Inc() - _, err := db.applyDeleteBGPPeer(ctx, &intPayload{Value: id}) + err := db.applyDeleteBGPPeer(ctx, &intPayload{Value: id}) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/internal/db/bgp_settings.go b/internal/db/bgp_settings.go index 9c552c3ab..271cd780f 100644 --- a/internal/db/bgp_settings.go +++ b/internal/db/bgp_settings.go @@ -124,7 +124,7 @@ func (db *Database) UpdateBGPSettings(ctx context.Context, settings *BGPSettings DBQueriesTotal.WithLabelValues(BGPSettingsTableName, "update").Inc() - _, err := db.applyUpdateBGPSettings(ctx, settings) + err := db.applyUpdateBGPSettings(ctx, settings) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/internal/db/changeset_replication.go b/internal/db/changeset_replication.go index 046237853..2d7c467e5 100644 --- a/internal/db/changeset_replication.go +++ b/internal/db/changeset_replication.go @@ -87,6 +87,7 @@ var localOnlyTables = []string{ N3SettingsTableName, NATSettingsTableName, FlowAccountingSettingsTableName, + FleetTableName, } func (db *Database) assertTableReplicationClassification(ctx context.Context) error { diff --git a/internal/db/daily_usage.go b/internal/db/daily_usage.go index 8945852f4..ffbe8e265 100644 --- a/internal/db/daily_usage.go +++ b/internal/db/daily_usage.go @@ -54,6 +54,18 @@ GROUP BY imsi ORDER BY COALESCE(SUM(bytes_uplink), 0) + COALESCE(SUM(bytes_downlink), 0) DESC` ) +const getRawDailyUsageStmt = ` +SELECT + &DailyUsage.epoch_day, + &DailyUsage.imsi, + &DailyUsage.bytes_uplink, + &DailyUsage.bytes_downlink +FROM %s +WHERE + epoch_day >= $UsageFilters.start_date + AND epoch_day <= $UsageFilters.end_date +ORDER BY epoch_day ASC, imsi ASC` + type UsagePerDay struct { EpochDay int64 `db:"epoch_day"` BytesUplink int64 `db:"bytes_uplink"` @@ -172,6 +184,49 @@ func (db *Database) GetUsagePerDay(ctx context.Context, imsi string, startDate t return dailyUsage, nil } +func (db *Database) GetRawDailyUsage(ctx context.Context, startDate time.Time, endDate time.Time) ([]DailyUsage, error) { + ctx, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s (raw)", "SELECT", DailyUsageTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("SELECT"), + attribute.String("db.collection", DailyUsageTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(DailyUsageTableName, "select")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(DailyUsageTableName, "select").Inc() + + filters := UsageFilters{ + StartDate: DaysSinceEpoch(startDate), + EndDate: DaysSinceEpoch(endDate), + } + + var rows []DailyUsage + + err := db.conn().Query(ctx, db.getRawDailyUsageStmt, filters).GetAll(&rows) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + span.SetStatus(codes.Ok, "no rows") + return nil, nil + } + + span.RecordError(err) + span.SetStatus(codes.Error, "query failed") + + return nil, fmt.Errorf("query failed: %w", err) + } + + span.SetStatus(codes.Ok, "") + + return rows, nil +} + func (db *Database) GetUsagePerSubscriber(ctx context.Context, imsi string, startDate time.Time, endDate time.Time) ([]UsagePerSub, error) { ctx, span := tracer.Start( ctx, diff --git a/internal/db/db.go b/internal/db/db.go index 611fee44b..215b40fc3 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -98,6 +98,7 @@ type Database struct { incrementDailyUsageStmt *sqlair.Statement getUsagePerDayStmt *sqlair.Statement getUsagePerSubscriberStmt *sqlair.Statement + getRawDailyUsageStmt *sqlair.Statement deleteAllDailyUsageStmt *sqlair.Statement deleteOldDailyUsageStmt *sqlair.Statement @@ -157,6 +158,7 @@ type Database struct { getHomeNetworkKeyStmt *sqlair.Statement getHomeNetworkKeyBySchemeAndIdentifierStmt *sqlair.Statement createHomeNetworkKeyStmt *sqlair.Statement + updateHomeNetworkKeyStmt *sqlair.Statement deleteHomeNetworkKeyStmt *sqlair.Statement countHomeNetworkKeysStmt *sqlair.Statement @@ -257,6 +259,15 @@ type Database struct { deleteUserStmt *sqlair.Statement countUsersStmt *sqlair.Statement + // Fleet statements + getFleetStmt *sqlair.Statement + initializeFleetStmt *sqlair.Statement + updateFleetTokenStmt *sqlair.Statement + clearFleetCredentialsStmt *sqlair.Statement + updateFleetSyncStatusStmt *sqlair.Statement + updateFleetConfigRevisionStmt *sqlair.Statement + updateFleetURLStmt *sqlair.Statement + // Cluster Members statements listClusterMembersStmt *sqlair.Statement getClusterMemberStmt *sqlair.Statement @@ -1250,6 +1261,7 @@ func (db *Database) PrepareStatements() error { {&db.incrementDailyUsageStmt, fmt.Sprintf(incrementDailyUsageStmt, DailyUsageTableName), []any{DailyUsage{}}}, {&db.getUsagePerDayStmt, fmt.Sprintf(getUsagePerDayStmt, DailyUsageTableName), []any{UsageFilters{}, UsagePerDay{}}}, {&db.getUsagePerSubscriberStmt, fmt.Sprintf(getUsagePerSubscriberStmt, DailyUsageTableName), []any{UsageFilters{}, UsagePerSub{}}}, + {&db.getRawDailyUsageStmt, fmt.Sprintf(getRawDailyUsageStmt, DailyUsageTableName), []any{UsageFilters{}, DailyUsage{}}}, {&db.deleteAllDailyUsageStmt, fmt.Sprintf(deleteAllDailyUsageStmt, DailyUsageTableName), nil}, {&db.deleteOldDailyUsageStmt, fmt.Sprintf(deleteOldDailyUsageStmt, DailyUsageTableName), []any{cutoffDaysArgs{}}}, @@ -1309,6 +1321,7 @@ func (db *Database) PrepareStatements() error { {&db.getHomeNetworkKeyStmt, fmt.Sprintf(getHomeNetworkKeyStmtStr, HomeNetworkKeysTableName), []any{HomeNetworkKey{}}}, {&db.getHomeNetworkKeyBySchemeAndIdentifierStmt, fmt.Sprintf(getHomeNetworkKeyBySchemeAndIdentifierStmtStr, HomeNetworkKeysTableName), []any{HomeNetworkKey{}}}, {&db.createHomeNetworkKeyStmt, fmt.Sprintf(createHomeNetworkKeyStmtStr, HomeNetworkKeysTableName), []any{HomeNetworkKey{}}}, + {&db.updateHomeNetworkKeyStmt, fmt.Sprintf(updateHomeNetworkKeyStmtStr, HomeNetworkKeysTableName), []any{HomeNetworkKey{}}}, {&db.deleteHomeNetworkKeyStmt, fmt.Sprintf(deleteHomeNetworkKeyStmtStr, HomeNetworkKeysTableName), []any{HomeNetworkKey{}}}, {&db.countHomeNetworkKeysStmt, fmt.Sprintf(countHomeNetworkKeysStmtStr, HomeNetworkKeysTableName), []any{NumItems{}}}, @@ -1408,6 +1421,15 @@ func (db *Database) PrepareStatements() error { {&db.deleteUserStmt, fmt.Sprintf(deleteUserStmt, UsersTableName), []any{User{}}}, {&db.countUsersStmt, fmt.Sprintf(countUsersStmt, UsersTableName), []any{NumItems{}}}, + // Fleet + {&db.getFleetStmt, fmt.Sprintf(getFleetStmt, FleetTableName), []any{Fleet{}}}, + {&db.initializeFleetStmt, fmt.Sprintf(initializeFleetStmt, FleetTableName), []any{Fleet{}}}, + {&db.updateFleetTokenStmt, fmt.Sprintf(updateFleetTokenStmt, FleetTableName), []any{Fleet{}}}, + {&db.clearFleetCredentialsStmt, fmt.Sprintf(clearFleetCredentialsStmt, FleetTableName), []any{Fleet{}}}, + {&db.updateFleetSyncStatusStmt, fmt.Sprintf(updateFleetSyncStatusStmt, FleetTableName), []any{Fleet{}}}, + {&db.updateFleetConfigRevisionStmt, fmt.Sprintf(updateFleetConfigRevisionStmt, FleetTableName), []any{Fleet{}}}, + {&db.updateFleetURLStmt, fmt.Sprintf(updateFleetURLStmt, FleetTableName), []any{Fleet{}}}, + // Cluster Members {&db.listClusterMembersStmt, fmt.Sprintf(listClusterMembersStmtStr, ClusterMembersTableName), []any{ClusterMember{}}}, {&db.getClusterMemberStmt, fmt.Sprintf(getClusterMemberStmtStr, ClusterMembersTableName), []any{ClusterMember{}}}, @@ -1466,9 +1488,10 @@ func (db *Database) WaitForInitialization(ctx context.Context, timeout time.Dura // InitializeLocalSettings seeds the singleton row of every local-only // settings table (nat_settings, flow_accounting_settings, bgp_settings, -// n3_settings) with documented defaults. Each Initialize* is idempotent: -// an existing row (whether it holds the default or an operator-set value) -// is left untouched, so a daemon restart never overwrites operator state. +// n3_settings, fleet) with documented defaults. Each Initialize* is +// idempotent: an existing row (whether it holds the default or an +// operator-set value) is left untouched, so a daemon restart never +// overwrites operator state. // // Runs on every node — leader, follower, standalone — from NewDatabase. // Local-only writes do not go through Raft, so this is safe to call @@ -1487,6 +1510,7 @@ func (db *Database) InitializeLocalSettings(ctx context.Context) error { {"flow accounting settings", func() error { return db.InitializeFlowAccountingSettings(ctx) }}, {"BGP settings", func() error { return db.InitializeBGPSettings(ctx) }}, {"N3 settings", func() error { return db.InitializeN3Settings(ctx) }}, + {"fleet", func() error { return db.InitializeFleet(ctx) }}, } for _, step := range steps { diff --git a/internal/db/fleet.go b/internal/db/fleet.go new file mode 100644 index 000000000..9881ec7fa --- /dev/null +++ b/internal/db/fleet.go @@ -0,0 +1,364 @@ +// Copyright 2026 Ella Networks + +package db + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.40.0" + "go.opentelemetry.io/otel/trace" +) + +const FleetTableName = "fleet" + +const ( + getFleetStmt = "SELECT &Fleet.* FROM %s WHERE singleton" + initializeFleetStmt = "INSERT OR IGNORE INTO %s (singleton, url, token, last_sync_at, config_revision) VALUES (TRUE, $Fleet.url, $Fleet.token, $Fleet.last_sync_at, $Fleet.config_revision)" + updateFleetTokenStmt = "UPDATE %s SET token=$Fleet.token WHERE singleton" + clearFleetCredentialsStmt = "UPDATE %s SET token=$Fleet.token, last_sync_at=$Fleet.last_sync_at, config_revision=$Fleet.config_revision WHERE singleton" + updateFleetSyncStatusStmt = "UPDATE %s SET last_sync_at=$Fleet.last_sync_at WHERE singleton" + updateFleetConfigRevisionStmt = "UPDATE %s SET config_revision=$Fleet.config_revision WHERE singleton" + updateFleetURLStmt = "UPDATE %s SET url=$Fleet.url WHERE singleton" +) + +// Fleet is the singleton row holding this node's Fleet registration state. +// Token is the bearer credential issued by Fleet at registration time; it +// is presented in the Authorization header on every sync. +type Fleet struct { + URL string `db:"url" json:"url"` + Token []byte `db:"token" json:"token"` + LastSyncAt string `db:"last_sync_at" json:"last_sync_at"` + ConfigRevision int64 `db:"config_revision" json:"config_revision"` +} + +// InitializeFleet inserts the default fleet row if it does not exist. +func (db *Database) InitializeFleet(ctx context.Context) error { + _, err := db.GetFleet(ctx) + if err == nil { + return nil + } + + if !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("failed to check fleet row: %w", err) + } + + _, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s", "INSERT", FleetTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("INSERT"), + attribute.String("db.collection", FleetTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(FleetTableName, "insert")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(FleetTableName, "insert").Inc() + + err = db.applyInitializeFleet(ctx, &Fleet{ + URL: "", + Token: []byte{}, + LastSyncAt: "", + ConfigRevision: 0, + }) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +func (db *Database) applyInitializeFleet(ctx context.Context, fleet *Fleet) error { + if err := db.runner(ctx).Query(ctx, db.initializeFleetStmt, *fleet).Run(); err != nil { + return fmt.Errorf("query failed: %w", err) + } + + return nil +} + +// GetFleet retrieves the singleton fleet row. Returns sql.ErrNoRows when +// the row has not yet been initialized. +func (db *Database) GetFleet(ctx context.Context) (*Fleet, error) { + ctx, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s", "SELECT", FleetTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("SELECT"), + attribute.String("db.collection", FleetTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(FleetTableName, "select")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(FleetTableName, "select").Inc() + + var fleet Fleet + + err := db.conn().Query(ctx, db.getFleetStmt).Get(&fleet) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "query failed") + + return nil, err + } + + span.SetStatus(codes.Ok, "") + + return &fleet, nil +} + +// UpdateFleetToken stores the bearer token issued by Fleet at registration. +func (db *Database) UpdateFleetToken(ctx context.Context, token []byte) error { + _, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s (token)", "UPDATE", FleetTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("UPDATE"), + attribute.String("db.collection", FleetTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(FleetTableName, "update")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(FleetTableName, "update").Inc() + + err := db.applyUpdateFleetToken(ctx, &Fleet{Token: token}) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +func (db *Database) applyUpdateFleetToken(ctx context.Context, fleet *Fleet) error { + if err := db.runner(ctx).Query(ctx, db.updateFleetTokenStmt, *fleet).Run(); err != nil { + return fmt.Errorf("query failed: %w", err) + } + + return nil +} + +// IsFleetManaged returns true when the Core is registered to a Fleet +// (has a non-empty bearer token). +func (db *Database) IsFleetManaged(ctx context.Context) (bool, error) { + fleet, err := db.GetFleet(ctx) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + + return false, err + } + + return len(fleet.Token) > 0, nil +} + +// UpdateFleetSyncStatus records the timestamp of the last successful sync. +func (db *Database) UpdateFleetSyncStatus(ctx context.Context) error { + _, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s (sync status)", "UPDATE", FleetTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("UPDATE"), + attribute.String("db.collection", FleetTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(FleetTableName, "update")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(FleetTableName, "update").Inc() + + ts := time.Now().UTC().Format(time.RFC3339) + + err := db.applyUpdateFleetSyncStatus(ctx, &Fleet{LastSyncAt: ts}) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +func (db *Database) applyUpdateFleetSyncStatus(ctx context.Context, fleet *Fleet) error { + if err := db.runner(ctx).Query(ctx, db.updateFleetSyncStatusStmt, *fleet).Run(); err != nil { + return fmt.Errorf("query failed: %w", err) + } + + return nil +} + +// ClearFleetCredentials removes the bearer token, sync status, and config +// revision — effectively unregistering the Core. +func (db *Database) ClearFleetCredentials(ctx context.Context) error { + _, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s (clear)", "UPDATE", FleetTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("UPDATE"), + attribute.String("db.collection", FleetTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(FleetTableName, "update")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(FleetTableName, "update").Inc() + + err := db.applyClearFleetCredentials(ctx, &Fleet{ + Token: []byte{}, + LastSyncAt: "", + ConfigRevision: 0, + }) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +func (db *Database) applyClearFleetCredentials(ctx context.Context, fleet *Fleet) error { + if err := db.runner(ctx).Query(ctx, db.clearFleetCredentialsStmt, *fleet).Run(); err != nil { + return fmt.Errorf("query failed: %w", err) + } + + return nil +} + +// UpdateFleetConfigRevision stores the latest config revision received from Fleet. +func (db *Database) UpdateFleetConfigRevision(ctx context.Context, revision int64) error { + _, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s (config revision)", "UPDATE", FleetTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("UPDATE"), + attribute.String("db.collection", FleetTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(FleetTableName, "update")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(FleetTableName, "update").Inc() + + err := db.applyUpdateFleetConfigRevision(ctx, &Fleet{ConfigRevision: revision}) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +func (db *Database) applyUpdateFleetConfigRevision(ctx context.Context, fleet *Fleet) error { + if err := db.runner(ctx).Query(ctx, db.updateFleetConfigRevisionStmt, *fleet).Run(); err != nil { + return fmt.Errorf("query failed: %w", err) + } + + return nil +} + +// UpdateFleetURL stores the user-specified Fleet server URL. +func (db *Database) UpdateFleetURL(ctx context.Context, url string) error { + _, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s (url)", "UPDATE", FleetTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("UPDATE"), + attribute.String("db.collection", FleetTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(FleetTableName, "update")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(FleetTableName, "update").Inc() + + err := db.applyUpdateFleetURL(ctx, &Fleet{URL: url}) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +func (db *Database) applyUpdateFleetURL(ctx context.Context, fleet *Fleet) error { + if err := db.runner(ctx).Query(ctx, db.updateFleetURLStmt, *fleet).Run(); err != nil { + return fmt.Errorf("query failed: %w", err) + } + + return nil +} + +// GetFleetURL returns the user-specified Fleet server URL. +func (db *Database) GetFleetURL(ctx context.Context) (string, error) { + fleet, err := db.GetFleet(ctx) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return "", nil + } + + return "", err + } + + return fleet.URL, nil +} diff --git a/internal/db/flow_accounting_settings.go b/internal/db/flow_accounting_settings.go index 792bc7b2d..219645972 100644 --- a/internal/db/flow_accounting_settings.go +++ b/internal/db/flow_accounting_settings.go @@ -96,7 +96,7 @@ func (db *Database) UpdateFlowAccountingSettings(ctx context.Context, enabled bo DBQueriesTotal.WithLabelValues(FlowAccountingSettingsTableName, "update").Inc() - _, err := db.applyUpdateFlowAccountingSettings(ctx, &boolPayload{Value: enabled}) + err := db.applyUpdateFlowAccountingSettings(ctx, &boolPayload{Value: enabled}) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/internal/db/home_network_key.go b/internal/db/home_network_key.go index c6a15d5bb..634e6f0f1 100644 --- a/internal/db/home_network_key.go +++ b/internal/db/home_network_key.go @@ -29,6 +29,7 @@ const ( getHomeNetworkKeyStmtStr = "SELECT &HomeNetworkKey.* FROM %s WHERE id==$HomeNetworkKey.id" getHomeNetworkKeyBySchemeAndIdentifierStmtStr = "SELECT &HomeNetworkKey.* FROM %s WHERE scheme==$HomeNetworkKey.scheme AND key_identifier==$HomeNetworkKey.key_identifier" createHomeNetworkKeyStmtStr = "INSERT INTO %s (id, key_identifier, scheme, private_key) VALUES ($HomeNetworkKey.id, $HomeNetworkKey.key_identifier, $HomeNetworkKey.scheme, $HomeNetworkKey.private_key)" + updateHomeNetworkKeyStmtStr = "UPDATE %s SET scheme=$HomeNetworkKey.scheme, private_key=$HomeNetworkKey.private_key WHERE id==$HomeNetworkKey.id" deleteHomeNetworkKeyStmtStr = "DELETE FROM %s WHERE id==$HomeNetworkKey.id" countHomeNetworkKeysStmtStr = "SELECT COUNT(*) AS &NumItems.count FROM %s" ) @@ -234,6 +235,45 @@ func (db *Database) CreateHomeNetworkKey(ctx context.Context, key *HomeNetworkKe return nil } +// UpdateHomeNetworkKey rewrites the scheme and private_key of an +// existing identifier. Matches by UUID; the key_identifier is +// immutable (changing it is "rotate to a new key", a create+delete +// pair, not an update). +func (db *Database) UpdateHomeNetworkKey(ctx context.Context, key *HomeNetworkKey) error { + _, span := tracer.Start( + ctx, + fmt.Sprintf("%s %s", "UPDATE", HomeNetworkKeysTableName), + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + semconv.DBSystemNameSQLite, + semconv.DBOperationName("UPDATE"), + attribute.String("db.collection", HomeNetworkKeysTableName), + ), + ) + defer span.End() + + timer := prometheus.NewTimer(DBQueryDuration.WithLabelValues(HomeNetworkKeysTableName, "update")) + defer timer.ObserveDuration() + + DBQueriesTotal.WithLabelValues(HomeNetworkKeysTableName, "update").Inc() + + if key.ID == "" { + return fmt.Errorf("UpdateHomeNetworkKey: ID must be set by the caller") + } + + _, err := opUpdateHomeNetworkKey.Invoke(db, key) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + // DeleteHomeNetworkKey removes a home network key by its UUID. func (db *Database) DeleteHomeNetworkKey(ctx context.Context, id string) error { _, span := tracer.Start( diff --git a/internal/db/ip_leases.go b/internal/db/ip_leases.go index ec7243151..11eb24acf 100644 --- a/internal/db/ip_leases.go +++ b/internal/db/ip_leases.go @@ -496,6 +496,12 @@ func (db *Database) ListActiveLeasesByNode(ctx context.Context, nodeID int) ([]I return leases, nil } +// ListAllLeases returns every lease row. Used by the support bundle +// export and by the Fleet sync exporter. +func (db *Database) ListAllLeases(ctx context.Context) ([]IPLease, error) { + return db.listAllLeases(ctx) +} + // listAllLeases returns every lease row. Used only by the support bundle export. func (db *Database) listAllLeases(ctx context.Context) ([]IPLease, error) { var leases []IPLease diff --git a/internal/db/local_only_defaults_test.go b/internal/db/local_only_defaults_test.go index 7ad256472..0e3466fd5 100644 --- a/internal/db/local_only_defaults_test.go +++ b/internal/db/local_only_defaults_test.go @@ -106,6 +106,33 @@ func TestLocalOnlySingletons_SeededOnFreshDB(t *testing.T) { } }) + t.Run("fleet", func(t *testing.T) { + got, err := database.GetFleet(ctx) + if err != nil { + t.Fatalf("GetFleet on fresh DB returned error: %v", err) + } + + if got == nil { + t.Fatal("GetFleet returned nil row") + } + + if got.URL != "" { + t.Fatalf("URL = %q, want default \"\"", got.URL) + } + + if len(got.Token) != 0 { + t.Fatalf("Token len = %d, want 0", len(got.Token)) + } + + if got.LastSyncAt != "" { + t.Fatalf("LastSyncAt = %q, want default \"\"", got.LastSyncAt) + } + + if got.ConfigRevision != 0 { + t.Fatalf("ConfigRevision = %d, want default 0", got.ConfigRevision) + } + }) + // Restart preserves operator state. Switch every singleton away from // its default, close the DB, reopen it, and confirm the seed step // did not overwrite the operator-set values. @@ -131,6 +158,10 @@ func TestLocalOnlySingletons_SeededOnFreshDB(t *testing.T) { t.Fatalf("UpdateBGPSettings: %v", err) } + if err := database.UpdateFleetURL(ctx, "https://fleet.example.com"); err != nil { + t.Fatalf("UpdateFleetURL: %v", err) + } + if err := database.Close(); err != nil { t.Fatalf("Close: %v", err) } @@ -183,5 +214,14 @@ func TestLocalOnlySingletons_SeededOnFreshDB(t *testing.T) { if bgp.LocalAS != 65000 || bgp.RouterID != "10.0.0.1" || bgp.Enabled == db.BGPDefaultEnabled { t.Fatalf("BGP settings were reset after restart: %+v", bgp) } + + fleet, err := reopened.GetFleet(ctx) + if err != nil { + t.Fatalf("GetFleet after restart: %v", err) + } + + if fleet.URL != "https://fleet.example.com" { + t.Fatalf("fleet URL was reset after restart: got %q", fleet.URL) + } }) } diff --git a/internal/db/migration_v13.go b/internal/db/migration_v13.go new file mode 100644 index 000000000..83dd4d5dc --- /dev/null +++ b/internal/db/migration_v13.go @@ -0,0 +1,33 @@ +// Copyright 2026 Ella Networks + +package db + +import ( + "context" + "database/sql" + "fmt" +) + +// V13 introduces the fleet singleton table that tracks registration state +// with a Fleet control plane (URL, bearer sync token, and last sync +// progress). The table is local-only: each node holds its own registration +// and token and connects to Fleet independently. + +const v13CreateFleetTable = ` + CREATE TABLE IF NOT EXISTS %s ( + singleton BOOLEAN PRIMARY KEY DEFAULT TRUE, + url TEXT NOT NULL DEFAULT '', + token BLOB NOT NULL DEFAULT X'', + last_sync_at TEXT NOT NULL DEFAULT '', + config_revision INTEGER NOT NULL DEFAULT 0, + CHECK (singleton) +)` + +func migrateV13(ctx context.Context, tx *sql.Tx) error { + stmt := fmt.Sprintf(v13CreateFleetTable, FleetTableName) + if _, err := tx.ExecContext(ctx, stmt); err != nil { + return fmt.Errorf("failed to execute %q: %w", stmt, err) + } + + return nil +} diff --git a/internal/db/migrations.go b/internal/db/migrations.go index eeef0580c..8af2fe8d8 100644 --- a/internal/db/migrations.go +++ b/internal/db/migrations.go @@ -33,6 +33,7 @@ var migrations = []migration{ {10, "drop bgp_peers.nodeID and cluster_members.maxSchemaVersion (both dead post-HA-redesign)", migrateV10}, {11, "replicated AUTOINCREMENT PKs → TEXT (UUID); spec_uuid.md", migrateV11}, {12, "replace chain-PKI cluster TLS with fingerprint pinning (cluster_node_certs)", migrateV12}, + {13, "add fleet registration table", migrateV13}, } // baselineVersion is the highest migration that runs locally during diff --git a/internal/db/n3_settings.go b/internal/db/n3_settings.go index affd04951..ae48bb304 100644 --- a/internal/db/n3_settings.go +++ b/internal/db/n3_settings.go @@ -63,7 +63,7 @@ func (db *Database) UpdateN3Settings(ctx context.Context, externalAddress string DBQueriesTotal.WithLabelValues(N3SettingsTableName, "update").Inc() - _, err := db.applyUpdateN3Settings(ctx, &stringPayload{Value: externalAddress}) + err := db.applyUpdateN3Settings(ctx, &stringPayload{Value: externalAddress}) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/internal/db/nat_settings.go b/internal/db/nat_settings.go index 514ff1ff2..251033bad 100644 --- a/internal/db/nat_settings.go +++ b/internal/db/nat_settings.go @@ -97,7 +97,7 @@ func (db *Database) UpdateNATSettings(ctx context.Context, enabled bool) error { DBQueriesTotal.WithLabelValues(NATSettingsTableName, "update").Inc() - _, err := db.applyUpdateNATSettings(ctx, &boolPayload{Value: enabled}) + err := db.applyUpdateNATSettings(ctx, &boolPayload{Value: enabled}) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/internal/db/operations_register.go b/internal/db/operations_register.go index 5fb744f9d..f89f5d155 100644 --- a/internal/db/operations_register.go +++ b/internal/db/operations_register.go @@ -102,6 +102,7 @@ var ( // Home network key var ( opCreateHomeNetworkKey = registerChangesetOp("CreateHomeNetworkKey", (*Database).applyCreateHomeNetworkKey) + opUpdateHomeNetworkKey = registerChangesetOp("UpdateHomeNetworkKey", (*Database).applyUpdateHomeNetworkKey) opDeleteHomeNetworkKey = registerChangesetOp("DeleteHomeNetworkKey", (*Database).applyDeleteHomeNetworkKey) ) @@ -149,6 +150,14 @@ var ( opInitJoinHMAC = registerChangesetOp("InitClusterJoinHMACKey", (*Database).applyInitJoinHMAC, RequireSchema(12)) ) +// Fleet. The fleet table is local-only (per-node mTLS material and +// registration state); writes call apply* directly and skip Raft. +// UpdateConfig stays replicated because it mutates replicated tables +// (operator, profiles, slices, policies, subscribers, etc.). +var ( + opUpdateConfig = registerChangesetOp("UpdateConfig", (*Database).applyUpdateConfig, RequireSchema(13)) +) + // Intent ops — bulk deletes and migrations dispatched explicitly by the // FSM via CommandType. Call sites use intentOp.Invoke; the forwarded-op // envelope carries the same name the leader's dispatcher looks up here. diff --git a/internal/db/routes.go b/internal/db/routes.go index b56c334da..56b9782a1 100644 --- a/internal/db/routes.go +++ b/internal/db/routes.go @@ -273,7 +273,7 @@ func (db *Database) DeleteRoute(ctx context.Context, id int64) error { DBQueriesTotal.WithLabelValues(RoutesTableName, "delete").Inc() - _, err := db.applyDeleteRoute(ctx, &int64Payload{Value: id}) + err := db.applyDeleteRoute(ctx, &int64Payload{Value: id}) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) diff --git a/internal/db/sync_config.go b/internal/db/sync_config.go new file mode 100644 index 000000000..7a7c32a69 --- /dev/null +++ b/internal/db/sync_config.go @@ -0,0 +1,1236 @@ +// Copyright 2026 Ella Networks + +package db + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/ellanetworks/core/fleet/client" + "github.com/ellanetworks/core/internal/logger" + "github.com/google/uuid" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +// newUUIDv7 generates a fresh UUIDv7 string for entities created during +// fleet sync. Keep all generation in one place so the error path is +// uniform. +func newUUIDv7() (string, error) { + id, err := uuid.NewV7() + if err != nil { + return "", fmt.Errorf("generate uuid: %w", err) + } + + return id.String(), nil +} + +// UpdateClusterConfig applies the cluster-wide portion of the fleet +// config to the replicated tables. Only the leader calls this; the +// resulting changeset is replayed on every follower via Raft. +// +// Operator, profiles/slices/data networks, policies, subscribers, home +// network keys, network rules, and retention policies are reconciled +// here. Per-node network state (NAT, flow accounting, N3, routes, BGP) +// is applied separately via UpdateNodeConfig — those tables are +// local-only and each node owns its own copy. +// +// Fleet-side IDs are resolved to local IDs via name at apply time, so +// the IDs seen here do not need to match what the local DB last +// assigned. +func (db *Database) UpdateClusterConfig(ctx context.Context, cfg client.ClusterConfig) error { + _, span := tracer.Start(ctx, "UpdateClusterConfig", trace.WithSpanKind(trace.SpanKindClient)) + defer span.End() + + _, err := opUpdateConfig.Invoke(db, &cfg) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +// UpdateNodeConfig applies the per-node portion of the fleet config to +// local-only tables. Every node calls this with its own node-scoped +// payload; writes bypass Raft. +func (db *Database) UpdateNodeConfig(ctx context.Context, cfg client.NodeConfig) error { + _, span := tracer.Start(ctx, "UpdateNodeConfig", trace.WithSpanKind(trace.SpanKindClient)) + defer span.End() + + if err := db.applyNodeConfig(ctx, &cfg); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + span.SetStatus(codes.Ok, "") + + return nil +} + +func (db *Database) applyUpdateConfig(ctx context.Context, cfg *client.ClusterConfig) (any, error) { + return nil, db.applyClusterConfig(ctx, cfg) +} + +func (db *Database) applyClusterConfig(ctx context.Context, cfg *client.ClusterConfig) error { + if err := db.syncOperator(ctx, cfg.Operator); err != nil { + return fmt.Errorf("sync operator: %w", err) + } + + if err := db.syncHomeNetworkKeys(ctx, cfg.HomeNetworkKeys); err != nil { + return fmt.Errorf("sync home network keys: %w", err) + } + + if err := db.syncDataNetworks(ctx, cfg.DataNetworks); err != nil { + return fmt.Errorf("sync data networks: %w", err) + } + + if err := db.syncProfiles(ctx, cfg.Profiles); err != nil { + return fmt.Errorf("sync profiles: %w", err) + } + + if err := db.syncSlices(ctx, cfg.Slices); err != nil { + return fmt.Errorf("sync slices: %w", err) + } + + // Subscribers reference profiles; policies reference profiles, slices, + // and data networks. Delete-then-upsert both to survive FK constraints + // during rename-style reconciliations. + if err := db.syncSubscribersDeletes(ctx, cfg.Subscribers); err != nil { + return fmt.Errorf("sync subscribers (deletes): %w", err) + } + + if err := db.syncPoliciesDeletes(ctx, cfg.Policies); err != nil { + return fmt.Errorf("sync policies (deletes): %w", err) + } + + if err := db.syncPoliciesUpserts(ctx, cfg.Policies); err != nil { + return fmt.Errorf("sync policies (upserts): %w", err) + } + + if err := db.syncSubscribersUpserts(ctx, cfg.Subscribers); err != nil { + return fmt.Errorf("sync subscribers (upserts): %w", err) + } + + if err := db.syncProfilesDeletes(ctx, cfg.Profiles); err != nil { + return fmt.Errorf("sync profiles (deletes): %w", err) + } + + if err := db.syncSlicesDeletes(ctx, cfg.Slices); err != nil { + return fmt.Errorf("sync slices (deletes): %w", err) + } + + if err := db.syncDataNetworksDeletes(ctx, cfg.DataNetworks); err != nil { + return fmt.Errorf("sync data networks (deletes): %w", err) + } + + if err := db.syncNetworkRules(ctx, cfg.NetworkRules); err != nil { + return fmt.Errorf("sync network rules: %w", err) + } + + if err := db.syncRetentionPolicies(ctx, cfg.RetentionPolicies); err != nil { + return fmt.Errorf("sync retention policies: %w", err) + } + + return nil +} + +// applyNodeConfig writes the per-node portion of the fleet config to +// local-only tables. Every node runs this against its own DB; writes are +// not captured into a Raft changeset. +func (db *Database) applyNodeConfig(ctx context.Context, cfg *client.NodeConfig) error { + if err := db.syncRoutes(ctx, cfg.Routes); err != nil { + return fmt.Errorf("sync routes: %w", err) + } + + if err := db.syncBGPSettings(ctx, cfg.BGP); err != nil { + return fmt.Errorf("sync BGP settings: %w", err) + } + + if err := db.syncBGPPeersAndPrefixes(ctx, cfg.BGPPeers, cfg.BGPImportPrefixes); err != nil { + return fmt.Errorf("sync BGP peers: %w", err) + } + + if err := db.applyUpdateNATSettings(ctx, &boolPayload{Value: cfg.NAT}); err != nil { + return fmt.Errorf("sync NAT: %w", err) + } + + if err := db.applyUpdateFlowAccountingSettings(ctx, &boolPayload{Value: cfg.FlowAccounting}); err != nil { + return fmt.Errorf("sync flow accounting: %w", err) + } + + if err := db.applyUpdateN3Settings(ctx, &stringPayload{Value: cfg.NetworkInterfaces.N3ExternalAddress}); err != nil { + return fmt.Errorf("sync N3 settings: %w", err) + } + + return nil +} + +// syncOperator updates the singleton operator row in place. +// +// Cluster ID and AMF identity are intentionally not synced from Fleet: +// ClusterID is generated locally inside the cluster (see +// ensureClusterID) and AMF Region/Set IDs are cluster-local topology +// values that Fleet has no authority over. Operators set AMF identity +// via the cluster's own API (UpdateOperatorAMFIdentity). +func (db *Database) syncOperator(ctx context.Context, desired client.Operator) error { + if _, err := db.applyUpdateOperatorID(ctx, &Operator{Mcc: desired.ID.Mcc, Mnc: desired.ID.Mnc}); err != nil { + return fmt.Errorf("update operator id: %w", err) + } + + if _, err := db.applyUpdateOperatorCode(ctx, &Operator{OperatorCode: desired.OperatorCode}); err != nil { + return fmt.Errorf("update operator code: %w", err) + } + + tacJSON, err := json.Marshal(desired.Tracking.SupportedTacs) + if err != nil { + return fmt.Errorf("marshal supported TACs: %w", err) + } + + if _, err := db.applyUpdateOperatorTracking(ctx, &Operator{SupportedTACs: string(tacJSON)}); err != nil { + return fmt.Errorf("update operator tracking: %w", err) + } + + ciphering, err := json.Marshal(desired.NASSecurity.Ciphering) + if err != nil { + return fmt.Errorf("marshal ciphering: %w", err) + } + + integrity, err := json.Marshal(desired.NASSecurity.Integrity) + if err != nil { + return fmt.Errorf("marshal integrity: %w", err) + } + + if _, err := db.applyUpdateOperatorSecurityAlgorithms(ctx, &Operator{Ciphering: string(ciphering), Integrity: string(integrity)}); err != nil { + return fmt.Errorf("update operator NAS security: %w", err) + } + + if _, err := db.applyUpdateOperatorSPN(ctx, &Operator{SpnFullName: desired.SPN.FullName, SpnShortName: desired.SPN.ShortName}); err != nil { + return fmt.Errorf("update operator SPN: %w", err) + } + + return nil +} + +// syncHomeNetworkKeys reconciles home network keys by keyIdentifier. +// +// Reconcile contract: +// - missing identifiers are created from the desired payload, +// - identifiers present locally but absent from the desired payload +// are deleted, +// - identifiers present in both are updated in place when scheme or +// private_key drifts from the desired value. +// +// Mirrors syncDataNetworks and the other resource reconcilers so Fleet +// can treat HNKs as ordinary CRUD; the UUID stays stable across an +// update (key_identifier itself is the operator-facing handle). +func (db *Database) syncHomeNetworkKeys(ctx context.Context, desired []client.HomeNetworkKey) error { + existing, err := db.listHomeNetworkKeysPinned(ctx) + if err != nil { + return err + } + + have := make(map[int]HomeNetworkKey, len(existing)) + for _, k := range existing { + have[k.KeyIdentifier] = k + } + + want := make(map[int]bool, len(desired)) + + for _, k := range desired { + want[k.KeyIdentifier] = true + + cur, ok := have[k.KeyIdentifier] + if !ok { + id, err := newUUIDv7() + if err != nil { + return err + } + + newKey := &HomeNetworkKey{ + ID: id, + KeyIdentifier: k.KeyIdentifier, + Scheme: k.Scheme, + PrivateKey: k.PrivateKey, + } + + if _, err := db.applyCreateHomeNetworkKey(ctx, newKey); err != nil { + return fmt.Errorf("create home network key %d: %w", k.KeyIdentifier, err) + } + + logger.DBLog.Info("Created home network key from fleet config", zap.Int("key_identifier", k.KeyIdentifier)) + + continue + } + + if cur.Scheme == k.Scheme && cur.PrivateKey == k.PrivateKey { + continue + } + + updated := &HomeNetworkKey{ + ID: cur.ID, + KeyIdentifier: k.KeyIdentifier, + Scheme: k.Scheme, + PrivateKey: k.PrivateKey, + } + + if _, err := db.applyUpdateHomeNetworkKey(ctx, updated); err != nil { + return fmt.Errorf("update home network key %d: %w", k.KeyIdentifier, err) + } + + logger.DBLog.Info("Updated home network key from fleet config", zap.Int("key_identifier", k.KeyIdentifier)) + } + + for _, k := range existing { + if want[k.KeyIdentifier] { + continue + } + + if _, err := db.applyDeleteHomeNetworkKey(ctx, &stringPayload{Value: k.ID}); err != nil { + return fmt.Errorf("delete home network key %d: %w", k.KeyIdentifier, err) + } + + logger.DBLog.Info("Deleted home network key from fleet config", zap.Int("key_identifier", k.KeyIdentifier)) + } + + return nil +} + +// syncDataNetworks upserts desired data networks. Deletions are deferred +// to syncDataNetworksDeletes so policies referencing a DN can be rewired +// or removed first. +func (db *Database) syncDataNetworks(ctx context.Context, desired []client.DataNetwork) error { + existing, err := db.listAllDataNetworksPinned(ctx) + if err != nil { + return err + } + + have := make(map[string]DataNetwork, len(existing)) + for _, dn := range existing { + have[dn.Name] = dn + } + + for _, d := range desired { + want := DataNetwork{Name: d.Name, IPPool: d.IPPool, DNS: d.DNS, MTU: d.MTU} + + cur, ok := have[d.Name] + if !ok { + id, err := newUUIDv7() + if err != nil { + return err + } + + want.ID = id + + if _, err := db.applyCreateDataNetwork(ctx, &want); err != nil { + return fmt.Errorf("create data network %q: %w", d.Name, err) + } + + logger.DBLog.Info("Created data network from fleet config", zap.String("name", d.Name)) + + continue + } + + if cur.IPPool != d.IPPool || cur.DNS != d.DNS || cur.MTU != d.MTU { + if _, err := db.applyUpdateDataNetwork(ctx, &want); err != nil { + return fmt.Errorf("update data network %q: %w", d.Name, err) + } + + logger.DBLog.Info("Updated data network from fleet config", zap.String("name", d.Name)) + } + } + + return nil +} + +func (db *Database) syncDataNetworksDeletes(ctx context.Context, desired []client.DataNetwork) error { + existing, err := db.listAllDataNetworksPinned(ctx) + if err != nil { + return err + } + + want := make(map[string]bool, len(desired)) + for _, d := range desired { + want[d.Name] = true + } + + for _, cur := range existing { + if want[cur.Name] { + continue + } + + if _, err := db.applyDeleteDataNetwork(ctx, &stringPayload{Value: cur.Name}); err != nil { + return fmt.Errorf("delete data network %q: %w", cur.Name, err) + } + + logger.DBLog.Info("Deleted data network from fleet config", zap.String("name", cur.Name)) + } + + return nil +} + +func (db *Database) syncProfiles(ctx context.Context, desired []client.Profile) error { + existing, err := db.listAllProfilesPinned(ctx) + if err != nil { + return err + } + + have := make(map[string]Profile, len(existing)) + for _, p := range existing { + have[p.Name] = p + } + + for _, d := range desired { + want := Profile{Name: d.Name, UeAmbrUplink: d.UeAmbrUplink, UeAmbrDownlink: d.UeAmbrDownlink} + + cur, ok := have[d.Name] + if !ok { + id, err := newUUIDv7() + if err != nil { + return err + } + + want.ID = id + + if _, err := db.applyCreateProfile(ctx, &want); err != nil { + return fmt.Errorf("create profile %q: %w", d.Name, err) + } + + logger.DBLog.Info("Created profile from fleet config", zap.String("name", d.Name)) + + continue + } + + if cur.UeAmbrUplink != d.UeAmbrUplink || cur.UeAmbrDownlink != d.UeAmbrDownlink { + if _, err := db.applyUpdateProfile(ctx, &want); err != nil { + return fmt.Errorf("update profile %q: %w", d.Name, err) + } + + logger.DBLog.Info("Updated profile from fleet config", zap.String("name", d.Name)) + } + } + + return nil +} + +func (db *Database) syncProfilesDeletes(ctx context.Context, desired []client.Profile) error { + existing, err := db.listAllProfilesPinned(ctx) + if err != nil { + return err + } + + want := make(map[string]bool, len(desired)) + for _, d := range desired { + want[d.Name] = true + } + + for _, cur := range existing { + if want[cur.Name] { + continue + } + + if _, err := db.applyDeleteProfile(ctx, &stringPayload{Value: cur.Name}); err != nil { + return fmt.Errorf("delete profile %q: %w", cur.Name, err) + } + + logger.DBLog.Info("Deleted profile from fleet config", zap.String("name", cur.Name)) + } + + return nil +} + +func (db *Database) syncSlices(ctx context.Context, desired []client.Slice) error { + existing, err := db.listAllSlicesPinned(ctx) + if err != nil { + return err + } + + have := make(map[string]NetworkSlice, len(existing)) + for _, s := range existing { + have[s.Name] = s + } + + for _, d := range desired { + want := NetworkSlice{Name: d.Name, Sst: d.Sst, Sd: d.Sd} + + cur, ok := have[d.Name] + if !ok { + id, err := newUUIDv7() + if err != nil { + return err + } + + want.ID = id + + if _, err := db.applyCreateNetworkSlice(ctx, &want); err != nil { + return fmt.Errorf("create slice %q: %w", d.Name, err) + } + + logger.DBLog.Info("Created slice from fleet config", zap.String("name", d.Name)) + + continue + } + + if !slicesEqual(cur, want) { + if _, err := db.applyUpdateNetworkSlice(ctx, &want); err != nil { + return fmt.Errorf("update slice %q: %w", d.Name, err) + } + + logger.DBLog.Info("Updated slice from fleet config", zap.String("name", d.Name)) + } + } + + return nil +} + +func (db *Database) syncSlicesDeletes(ctx context.Context, desired []client.Slice) error { + existing, err := db.listAllSlicesPinned(ctx) + if err != nil { + return err + } + + want := make(map[string]bool, len(desired)) + for _, d := range desired { + want[d.Name] = true + } + + for _, cur := range existing { + if want[cur.Name] { + continue + } + + if _, err := db.applyDeleteNetworkSlice(ctx, &stringPayload{Value: cur.Name}); err != nil { + return fmt.Errorf("delete slice %q: %w", cur.Name, err) + } + + logger.DBLog.Info("Deleted slice from fleet config", zap.String("name", cur.Name)) + } + + return nil +} + +func slicesEqual(a NetworkSlice, b NetworkSlice) bool { + if a.Sst != b.Sst { + return false + } + + if (a.Sd == nil) != (b.Sd == nil) { + return false + } + + if a.Sd != nil && *a.Sd != *b.Sd { + return false + } + + return true +} + +func (db *Database) syncPoliciesDeletes(ctx context.Context, desired []client.Policy) error { + existing, err := db.listAllPoliciesPinned(ctx) + if err != nil { + return err + } + + want := make(map[string]bool, len(desired)) + for _, d := range desired { + want[d.Name] = true + } + + for _, cur := range existing { + if want[cur.Name] { + continue + } + + if _, err := db.applyDeletePolicy(ctx, &stringPayload{Value: cur.Name}); err != nil { + return fmt.Errorf("delete policy %q: %w", cur.Name, err) + } + + logger.DBLog.Info("Deleted policy from fleet config", zap.String("name", cur.Name)) + } + + return nil +} + +// syncPoliciesUpserts resolves fleet policy references by name and +// upserts local rows. Profile/slice/data-network rows for referenced +// names must already exist locally (they are synced earlier). +func (db *Database) syncPoliciesUpserts(ctx context.Context, desired []client.Policy) error { + profiles, err := db.listAllProfilesPinned(ctx) + if err != nil { + return err + } + + profileID := make(map[string]string, len(profiles)) + for _, p := range profiles { + profileID[p.Name] = p.ID + } + + slices, err := db.listAllSlicesPinned(ctx) + if err != nil { + return err + } + + sliceID := make(map[string]string, len(slices)) + for _, s := range slices { + sliceID[s.Name] = s.ID + } + + dataNetworks, err := db.listAllDataNetworksPinned(ctx) + if err != nil { + return err + } + + dnID := make(map[string]string, len(dataNetworks)) + for _, dn := range dataNetworks { + dnID[dn.Name] = dn.ID + } + + existing, err := db.listAllPoliciesPinned(ctx) + if err != nil { + return err + } + + have := make(map[string]Policy, len(existing)) + for _, p := range existing { + have[p.Name] = p + } + + for _, d := range desired { + pid, ok := profileID[d.ProfileName] + if !ok { + return fmt.Errorf("policy %q references unknown profile %q", d.Name, d.ProfileName) + } + + slid, ok := sliceID[d.SliceName] + if !ok { + return fmt.Errorf("policy %q references unknown slice %q", d.Name, d.SliceName) + } + + did, ok := dnID[d.DataNetworkName] + if !ok { + return fmt.Errorf("policy %q references unknown data network %q", d.Name, d.DataNetworkName) + } + + want := Policy{ + Name: d.Name, + ProfileID: pid, + SliceID: slid, + DataNetworkID: did, + Var5qi: d.Var5qi, + Arp: d.Arp, + SessionAmbrUplink: d.SessionAmbrUplink, + SessionAmbrDownlink: d.SessionAmbrDownlink, + } + + cur, ok := have[d.Name] + if !ok { + id, err := newUUIDv7() + if err != nil { + return err + } + + want.ID = id + + if _, err := db.applyCreatePolicy(ctx, &want); err != nil { + return fmt.Errorf("create policy %q: %w", d.Name, err) + } + + logger.DBLog.Info("Created policy from fleet config", zap.String("name", d.Name)) + + continue + } + + if policiesEqual(cur, want) { + continue + } + + if _, err := db.applyUpdatePolicy(ctx, &want); err != nil { + return fmt.Errorf("update policy %q: %w", d.Name, err) + } + + logger.DBLog.Info("Updated policy from fleet config", zap.String("name", d.Name)) + } + + return nil +} + +func policiesEqual(a, b Policy) bool { + return a.ProfileID == b.ProfileID && + a.SliceID == b.SliceID && + a.DataNetworkID == b.DataNetworkID && + a.Var5qi == b.Var5qi && + a.Arp == b.Arp && + a.SessionAmbrUplink == b.SessionAmbrUplink && + a.SessionAmbrDownlink == b.SessionAmbrDownlink +} + +func (db *Database) syncSubscribersDeletes(ctx context.Context, desired []client.Subscriber) error { + existing, err := db.listAllSubscribersPinned(ctx) + if err != nil { + return err + } + + want := make(map[string]bool, len(desired)) + for _, d := range desired { + want[d.Imsi] = true + } + + for _, cur := range existing { + if want[cur.Imsi] { + continue + } + + if _, err := db.applyDeleteSubscriber(ctx, &stringPayload{Value: cur.Imsi}); err != nil { + return fmt.Errorf("delete subscriber %q: %w", cur.Imsi, err) + } + + logger.DBLog.Info("Deleted subscriber from fleet config", zap.String("imsi", cur.Imsi)) + } + + return nil +} + +// syncSubscribersUpserts creates new subscribers and re-homes existing ones +// to a different profile. Permanent key / OPC / sequence number are local +// secrets and are never overwritten once the row exists. +func (db *Database) syncSubscribersUpserts(ctx context.Context, desired []client.Subscriber) error { + profiles, err := db.listAllProfilesPinned(ctx) + if err != nil { + return err + } + + profileID := make(map[string]string, len(profiles)) + for _, p := range profiles { + profileID[p.Name] = p.ID + } + + existing, err := db.listAllSubscribersPinned(ctx) + if err != nil { + return err + } + + have := make(map[string]Subscriber, len(existing)) + for _, s := range existing { + have[s.Imsi] = s + } + + for _, d := range desired { + pid, ok := profileID[d.ProfileName] + if !ok { + return fmt.Errorf("subscriber %q references unknown profile %q", d.Imsi, d.ProfileName) + } + + cur, exists := have[d.Imsi] + if !exists { + id, err := newUUIDv7() + if err != nil { + return err + } + + sub := &Subscriber{ + ID: id, + Imsi: d.Imsi, + SequenceNumber: d.SequenceNumber, + PermanentKey: d.PermanentKey, + Opc: d.Opc, + ProfileID: pid, + } + + if _, err := db.applyCreateSubscriber(ctx, sub); err != nil { + return fmt.Errorf("create subscriber %q: %w", d.Imsi, err) + } + + logger.DBLog.Info("Created subscriber from fleet config", zap.String("imsi", d.Imsi)) + + continue + } + + if cur.ProfileID == pid { + continue + } + + if _, err := db.applyUpdateSubscriberProfile(ctx, &Subscriber{Imsi: d.Imsi, ProfileID: pid}); err != nil { + return fmt.Errorf("update subscriber %q profile: %w", d.Imsi, err) + } + + logger.DBLog.Info("Updated subscriber profile from fleet config", zap.String("imsi", d.Imsi)) + } + + return nil +} + +// syncRoutes reconciles routes by their natural key (destination, gateway, +// interface, metric) since they have no stable ID from Fleet's side. +func (db *Database) syncRoutes(ctx context.Context, desired []client.Route) error { + existing, err := db.listAllRoutesPinned(ctx) + if err != nil { + return err + } + + type routeKey struct { + Destination string + Gateway string + Interface NetworkInterface + Metric int + } + + have := make(map[routeKey]int64, len(existing)) + for _, r := range existing { + have[routeKey{r.Destination, r.Gateway, r.Interface, r.Metric}] = r.ID + } + + want := make(map[routeKey]bool, len(desired)) + + for _, d := range desired { + iface := parseFleetNetworkInterface(d.Interface) + key := routeKey{d.Destination, d.Gateway, iface, d.Metric} + want[key] = true + + if _, ok := have[key]; ok { + continue + } + + r := &Route{ + Destination: d.Destination, + Gateway: d.Gateway, + Interface: iface, + Metric: d.Metric, + } + + if _, err := db.applyCreateRoute(ctx, r); err != nil { + return fmt.Errorf("create route %s→%s: %w", d.Destination, d.Gateway, err) + } + + logger.DBLog.Info("Created route from fleet config", + zap.String("destination", d.Destination), + zap.String("gateway", d.Gateway), + ) + } + + for key, id := range have { + if want[key] { + continue + } + + if err := db.applyDeleteRoute(ctx, &int64Payload{Value: id}); err != nil { + return fmt.Errorf("delete route id=%d: %w", id, err) + } + + logger.DBLog.Info("Deleted route from fleet config", zap.Int64("id", id)) + } + + return nil +} + +func parseFleetNetworkInterface(s string) NetworkInterface { + switch s { + case "n3": + return N3 + case "n6": + return N6 + default: + return N6 + } +} + +// --- pinned-runner list helpers --- +// +// These run under the pinned connection during changeset capture so the +// reads see the uncommitted writes made earlier in the same sync. Outside +// capture they fall through to the shared connection. They intentionally +// bypass tracing/metrics since the caller (applySyncConfig) already spans +// the whole operation. + +func (db *Database) listAllDataNetworksPinned(ctx context.Context) ([]DataNetwork, error) { + var rows []DataNetwork + + err := db.runner(ctx).Query(ctx, db.listAllDataNetworksStmt).GetAll(&rows) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list data networks: %w", err) + } + + return rows, nil +} + +func (db *Database) listAllProfilesPinned(ctx context.Context) ([]Profile, error) { + // No prepared listAllProfilesStmt exists yet; reuse page statement. + var rows []Profile + + args := ListArgs{Limit: 1_000_000, Offset: 0} + + var counts []NumItems + + err := db.runner(ctx).Query(ctx, db.listProfilesStmt, args).GetAll(&rows, &counts) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list profiles: %w", err) + } + + return rows, nil +} + +func (db *Database) listAllSlicesPinned(ctx context.Context) ([]NetworkSlice, error) { + var rows []NetworkSlice + + err := db.runner(ctx).Query(ctx, db.listAllNetworkSlicesStmt).GetAll(&rows) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list slices: %w", err) + } + + return rows, nil +} + +func (db *Database) listAllPoliciesPinned(ctx context.Context) ([]Policy, error) { + var rows []Policy + + args := ListArgs{Limit: 1_000_000, Offset: 0} + + var counts []NumItems + + err := db.runner(ctx).Query(ctx, db.listPoliciesStmt, args).GetAll(&rows, &counts) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list policies: %w", err) + } + + return rows, nil +} + +func (db *Database) listAllSubscribersPinned(ctx context.Context) ([]Subscriber, error) { + var rows []Subscriber + + args := ListArgs{Limit: 1_000_000, Offset: 0} + + var counts []NumItems + + err := db.runner(ctx).Query(ctx, db.listSubscribersStmt, args).GetAll(&rows, &counts) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list subscribers: %w", err) + } + + return rows, nil +} + +func (db *Database) listAllRoutesPinned(ctx context.Context) ([]Route, error) { + var rows []Route + + args := ListArgs{Limit: 1_000_000, Offset: 0} + + var counts []NumItems + + err := db.runner(ctx).Query(ctx, db.listRoutesStmt, args).GetAll(&rows, &counts) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list routes: %w", err) + } + + return rows, nil +} + +func (db *Database) listHomeNetworkKeysPinned(ctx context.Context) ([]HomeNetworkKey, error) { + var rows []HomeNetworkKey + + err := db.runner(ctx).Query(ctx, db.listHomeNetworkKeysStmt).GetAll(&rows) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list home network keys: %w", err) + } + + return rows, nil +} + +func (db *Database) listAllBGPPeersPinned(ctx context.Context) ([]BGPPeer, error) { + var rows []BGPPeer + + err := db.runner(ctx).Query(ctx, db.listAllBGPPeersStmt).GetAll(&rows) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list BGP peers: %w", err) + } + + return rows, nil +} + +func (db *Database) listImportPrefixesByPeerPinned(ctx context.Context, peerID int) ([]BGPImportPrefix, error) { + var rows []BGPImportPrefix + + err := db.runner(ctx).Query(ctx, db.listImportPrefixesByPeerStmt, BGPImportPrefix{PeerID: peerID}).GetAll(&rows) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("list import prefixes for peer %d: %w", peerID, err) + } + + return rows, nil +} + +// syncNetworkRules reconciles per-policy filter rules. For each policy in +// the local DB we delete every existing rule, then re-create the desired +// rules in precedence order — matching the API's delete-all-then-create +// semantics so there is one consistent apply path. +func (db *Database) syncNetworkRules(ctx context.Context, desired []client.NetworkRule) error { + policies, err := db.listAllPoliciesPinned(ctx) + if err != nil { + return err + } + + policyIDByName := make(map[string]string, len(policies)) + for _, p := range policies { + policyIDByName[p.Name] = p.ID + } + + byPolicy := make(map[string][]client.NetworkRule, len(desired)) + + for _, r := range desired { + byPolicy[r.PolicyName] = append(byPolicy[r.PolicyName], r) + } + + for _, p := range policies { + if _, err := db.applyDeleteNetworkRulesByPolicy(ctx, &stringPayload{Value: p.ID}); err != nil { + return fmt.Errorf("delete rules for policy %q: %w", p.Name, err) + } + } + + for policyName, rules := range byPolicy { + policyID, ok := policyIDByName[policyName] + if !ok { + return fmt.Errorf("network rule references unknown policy %q", policyName) + } + + for _, r := range rules { + id, err := newUUIDv7() + if err != nil { + return err + } + + dbRule := &NetworkRule{ + ID: id, + PolicyID: policyID, + Description: r.Description, + Direction: r.Direction, + RemotePrefix: r.RemotePrefix, + Protocol: r.Protocol, + PortLow: r.PortLow, + PortHigh: r.PortHigh, + Action: r.Action, + Precedence: r.Precedence, + } + + if _, err := db.applyCreateNetworkRule(ctx, dbRule); err != nil { + return fmt.Errorf("create rule for policy %q (precedence %d): %w", policyName, r.Precedence, err) + } + } + } + + return nil +} + +func (db *Database) syncBGPSettings(ctx context.Context, desired client.BGPSettings) error { + s := &BGPSettings{ + Enabled: desired.Enabled, + LocalAS: desired.LocalAS, + RouterID: desired.RouterID, + ListenAddress: desired.ListenAddress, + } + + if err := db.applyUpdateBGPSettings(ctx, s); err != nil { + return fmt.Errorf("update BGP settings: %w", err) + } + + return nil +} + +// syncBGPPeersAndPrefixes reconciles this node's BGP peers against the +// desired set Fleet has scoped to this node. Import prefixes are grouped +// by peer address and replaced atomically per peer. +func (db *Database) syncBGPPeersAndPrefixes(ctx context.Context, desiredPeers []client.BGPPeer, desiredPrefixes []client.BGPImportPrefix) error { + existing, err := db.listAllBGPPeersPinned(ctx) + if err != nil { + return err + } + + have := make(map[string]BGPPeer, len(existing)) + for _, p := range existing { + have[p.Address] = p + } + + want := make(map[string]client.BGPPeer, len(desiredPeers)) + for _, p := range desiredPeers { + want[p.Address] = p + } + + for addr, cur := range have { + if _, ok := want[addr]; ok { + continue + } + + if err := db.applyDeleteBGPPeer(ctx, &intPayload{Value: cur.ID}); err != nil { + return fmt.Errorf("delete BGP peer %s: %w", addr, err) + } + } + + for _, d := range desiredPeers { + cur, ok := have[d.Address] + if !ok { + newPeer := &BGPPeer{ + Address: d.Address, + RemoteAS: d.RemoteAS, + HoldTime: d.HoldTime, + Password: d.Password, + Description: d.Description, + } + + if _, err := db.applyCreateBGPPeer(ctx, newPeer); err != nil { + return fmt.Errorf("create BGP peer %s: %w", d.Address, err) + } + + continue + } + + if bgpPeerEqual(cur, d) { + continue + } + + upd := &BGPPeer{ + ID: cur.ID, + Address: d.Address, + RemoteAS: d.RemoteAS, + HoldTime: d.HoldTime, + Password: d.Password, + Description: d.Description, + } + + if err := db.applyUpdateBGPPeer(ctx, upd); err != nil { + return fmt.Errorf("update BGP peer %s: %w", d.Address, err) + } + } + + // Re-read peers so newly-created rows have their assigned IDs. + refreshed, err := db.listAllBGPPeersPinned(ctx) + if err != nil { + return err + } + + peerIDByAddress := make(map[string]int, len(refreshed)) + + for _, p := range refreshed { + peerIDByAddress[p.Address] = p.ID + } + + prefixesByPeer := make(map[string][]BGPImportPrefix, len(desiredPrefixes)) + for _, pr := range desiredPrefixes { + prefixesByPeer[pr.PeerAddress] = append(prefixesByPeer[pr.PeerAddress], BGPImportPrefix{ + Prefix: pr.Prefix, + MaxLength: pr.MaxLength, + }) + } + + // Replace prefixes for every cluster-wide peer (desired ones get the + // new list; peers with no entries get an empty list, clearing stale + // prefixes). + for addr, peerID := range peerIDByAddress { + if err := db.replacePeerImportPrefixes(ctx, peerID, prefixesByPeer[addr]); err != nil { + return fmt.Errorf("replace import prefixes for peer %s: %w", addr, err) + } + } + + for addr := range prefixesByPeer { + if _, ok := peerIDByAddress[addr]; !ok { + return fmt.Errorf("import prefix references unknown BGP peer %q", addr) + } + } + + return nil +} + +// replacePeerImportPrefixes atomically swaps a peer's import prefixes +// via the local-only delete-then-insert path on bgp_import_prefixes. +func (db *Database) replacePeerImportPrefixes(ctx context.Context, peerID int, prefixes []BGPImportPrefix) error { + current, err := db.listImportPrefixesByPeerPinned(ctx, peerID) + if err != nil { + return err + } + + if len(current) == 0 && len(prefixes) == 0 { + return nil + } + + return db.SetImportPrefixesForPeer(ctx, peerID, prefixes) +} + +func bgpPeerEqual(cur BGPPeer, want client.BGPPeer) bool { + return cur.RemoteAS == want.RemoteAS && + cur.HoldTime == want.HoldTime && + cur.Password == want.Password && + cur.Description == want.Description +} + +// fleetManagedRetentionCategories lists the retention categories +// Fleet has authority over. Any of these missing from the sync +// payload is interpreted as "Fleet deleted the override" and reset +// to Core's built-in default. Categories outside this set (audit, +// radio) are Core-local and never touched by sync. +var fleetManagedRetentionCategories = []RetentionCategory{ + CategorySubscriberUsage, + CategoryFlowReports, +} + +// defaultRetentionDaysFor returns the boot-seeded default for a +// category. Mirrors the constants applied in db.Initialize so a +// Fleet "delete" round-trips to the same value a fresh Core would +// have used at startup. +func defaultRetentionDaysFor(cat RetentionCategory) int { + switch cat { + case CategorySubscriberUsage: + return DefaultSubscriberUsageRetentionDays + case CategoryFlowReports: + return DefaultFlowReportsRetentionDays + case CategoryAuditLogs, CategoryRadioLogs: + return DefaultLogRetentionDays + } + + return 0 +} + +func (db *Database) syncRetentionPolicies(ctx context.Context, desired []client.RetentionPolicy) error { + want := make(map[RetentionCategory]bool, len(desired)) + + for _, rp := range desired { + want[RetentionCategory(rp.Category)] = true + + policy := &RetentionPolicy{ + Category: RetentionCategory(rp.Category), + Days: rp.Days, + } + + if _, err := db.applySetRetentionPolicy(ctx, policy); err != nil { + return fmt.Errorf("set retention policy %s: %w", rp.Category, err) + } + } + + // Reset Fleet-managed categories Fleet didn't send back to Core's + // boot-seeded default. Matches the symmetric CRUD contract Fleet + // expects: a DELETE on the retention endpoint clears the + // operator's override and reverts to the platform default. + for _, cat := range fleetManagedRetentionCategories { + if want[cat] { + continue + } + + policy := &RetentionPolicy{ + Category: cat, + Days: defaultRetentionDaysFor(cat), + } + + if _, err := db.applySetRetentionPolicy(ctx, policy); err != nil { + return fmt.Errorf("reset retention policy %s: %w", cat, err) + } + + logger.DBLog.Info("Reset retention policy to default from fleet config", zap.String("category", string(cat)), zap.Int("days", policy.Days)) + } + + return nil +} diff --git a/pkg/runtime/fleet_supervisor.go b/pkg/runtime/fleet_supervisor.go new file mode 100644 index 000000000..d11c8cf46 --- /dev/null +++ b/pkg/runtime/fleet_supervisor.go @@ -0,0 +1,121 @@ +// Copyright 2026 Ella Networks + +package runtime + +import ( + "context" + "time" + + "github.com/ellanetworks/core/fleet" + "github.com/ellanetworks/core/fleet/client" + "github.com/ellanetworks/core/internal/amf" + "github.com/ellanetworks/core/internal/api/server" + "github.com/ellanetworks/core/internal/bgp" + "github.com/ellanetworks/core/internal/config" + "github.com/ellanetworks/core/internal/db" + "github.com/ellanetworks/core/internal/logger" + "github.com/ellanetworks/core/internal/upf" + "go.uber.org/zap" +) + +// fleetSupervisorInterval is how often the supervisor re-checks whether +// the Core is currently Fleet-managed. +const fleetSupervisorInterval = 30 * time.Second + +// runFleetSupervisor polls the local fleet row on every node and starts +// or stops the per-node sync loop to match. The fleet table is +// local-only: each node holds its own registration state and connects +// to Fleet independently. +func runFleetSupervisor(ctx context.Context, dbInstance *db.Database, cfg config.Config, amfInstance *amf.AMF, upfInstance *upf.UPF, bgpService *bgp.BGPService, buffer *fleet.FleetBuffer) { + ticker := time.NewTicker(fleetSupervisorInterval) + defer ticker.Stop() + + var running bool + + check := func() { + managed, err := dbInstance.IsFleetManaged(ctx) + if err != nil { + logger.EllaLog.Warn("fleet supervisor: couldn't read fleet state", zap.Error(err)) + return + } + + switch { + case managed && !running: + if err := startFleetSync(ctx, dbInstance, cfg, amfInstance, upfInstance, bgpService, buffer); err != nil { + logger.EllaLog.Warn("fleet supervisor: failed to start sync", zap.Error(err)) + return + } + + running = true + + case !managed && running: + fleet.StopSync() + + running = false + } + } + + check() + + for { + select { + case <-ctx.Done(): + if running { + fleet.StopSync() + } + + return + case <-ticker.C: + check() + } + } +} + +func startFleetSync(ctx context.Context, dbInstance *db.Database, cfg config.Config, amfInstance *amf.AMF, upfInstance *upf.UPF, bgpService *bgp.BGPService, buffer *fleet.FleetBuffer) error { + fleetData, err := dbInstance.GetFleet(ctx) + if err != nil { + return err + } + + clusterID := "" + + if op, err := dbInstance.GetOperator(ctx); err == nil { + clusterID = op.ClusterID + } + + statusProvider := func() client.EllaCoreStatus { + return server.BuildStatus(context.Background(), dbInstance, cfg, amfInstance, bgpService) + } + + metricsProvider := func() client.EllaCoreMetrics { + return server.BuildMetrics() + } + + handle, err := fleet.ResumeSync(ctx, fleet.ResumeSyncInput{ + FleetURL: fleetData.URL, + Token: string(fleetData.Token), + InsecureSkipVerify: cfg.Fleet.InsecureSkipVerify, + DB: dbInstance, + StatusProvider: statusProvider, + MetricsProvider: metricsProvider, + OnSync: func(syncCtx context.Context, success bool) { + if success { + if err := dbInstance.UpdateFleetSyncStatus(syncCtx); err != nil { + logger.EllaLog.Error("couldn't update fleet sync status", zap.Error(err)) + } + } + }, + Buffer: buffer, + ClusterEnabled: cfg.Cluster.Enabled, + ClusterID: clusterID, + }) + if err != nil { + return err + } + + if upfInstance != nil { + handle.SetConfigReloader(upfInstance) + } + + return nil +} diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index e4b0914e2..45311e302 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/ellanetworks/core/fleet" "github.com/ellanetworks/core/internal/amf" "github.com/ellanetworks/core/internal/amf/nas" "github.com/ellanetworks/core/internal/amf/nas/gmm" @@ -459,6 +460,8 @@ func Start(ctx context.Context, rc RuntimeConfig) error { gmm.RegisterMetrics() ngap.RegisterMetrics() + fleetBuffer := fleet.NewFleetBuffer(0) + // --- Phase B: upgrade the API server to serve all routes now that // the cluster is formed, settings are seeded, and NFs are running. --- if err := apiServer.Upgrade(ctx, api.UpgradeConfig{ @@ -473,6 +476,10 @@ func Start(ctx context.Context, rc RuntimeConfig) error { return fmt.Errorf("couldn't upgrade API: %w", err) } + wg.Go(func() { + runFleetSupervisor(ctx, dbInstance, cfg, amfInstance, upfInstance, bgpService, fleetBuffer) + }) + nasLogger.SetLogLevel(0) // Suppress free5gc NAS log output sctpServer := service.NewServer(service.Callbacks{ diff --git a/ui/src/components/ConfigureFleetModal.tsx b/ui/src/components/ConfigureFleetModal.tsx new file mode 100644 index 000000000..e0ca13fab --- /dev/null +++ b/ui/src/components/ConfigureFleetModal.tsx @@ -0,0 +1,138 @@ +import React, { useState, useEffect } from "react"; +import { + Dialog, + DialogTitle, + DialogContent, + DialogActions, + TextField, + Button, + Alert, + Collapse, + Typography, +} from "@mui/material"; +import { registerFleet, getFleetURL, updateFleetURL } from "@/queries/fleet"; +import { useNavigate } from "react-router-dom"; +import { useAuth } from "@/contexts/AuthContext"; + +interface ConfigureFleetModalProps { + open: boolean; + onClose: () => void; +} + +const ConfigureFleetModal: React.FC = ({ + open, + onClose, +}) => { + const navigate = useNavigate(); + const { accessToken, authReady } = useAuth(); + if (!authReady || !accessToken) navigate("/login"); + + const [fleetURL, setFleetURL] = useState(""); + const [activationToken, setActivationToken] = useState(""); + const [loading, setLoading] = useState(false); + const [alert, setAlert] = useState<{ message: string }>({ message: "" }); + + useEffect(() => { + if (open && accessToken) { + getFleetURL(accessToken) + .then((resp) => { + if (resp.url) setFleetURL(resp.url); + }) + .catch(() => {}); + } + }, [open, accessToken]); + + const handleClose = () => { + setActivationToken(""); + setAlert({ message: "" }); + onClose(); + }; + + const handleSubmit = async () => { + if (!accessToken) return; + if (!fleetURL.trim()) return; + if (!activationToken.trim()) return; + setLoading(true); + setAlert({ message: "" }); + try { + await updateFleetURL(accessToken, fleetURL.trim()); + await registerFleet(accessToken, activationToken.trim()); + handleClose(); + } catch (error: unknown) { + let errorMessage = "Unknown error occurred."; + if (error instanceof Error) { + errorMessage = error.message; + } + setAlert({ + message: `Failed to register to fleet: ${errorMessage}`, + }); + console.error("Failed to register to fleet:", error); + } finally { + setLoading(false); + } + }; + + return ( + + Configure Fleet + + + setAlert({ message: "" })} + sx={{ mb: 2 }} + severity="error" + > + {alert.message} + + + + Enter the Ella Fleet server address and paste the activation token to + connect this instance. + + setFleetURL(e.target.value)} + placeholder="https://fleet.example.com:5003" + margin="normal" + disabled={loading} + /> + setActivationToken(e.target.value)} + placeholder="Paste your activation token here" + multiline + minRows={3} + maxRows={6} + margin="normal" + disabled={loading} + /> + + + + + + + ); +}; + +export default ConfigureFleetModal; diff --git a/ui/src/components/DrawerLayout.tsx b/ui/src/components/DrawerLayout.tsx index ddf87cbe1..c262d0fcd 100644 --- a/ui/src/components/DrawerLayout.tsx +++ b/ui/src/components/DrawerLayout.tsx @@ -40,11 +40,13 @@ import { Link, useLocation, useNavigate } from "react-router-dom"; import Logo from "@/components/Logo"; import SupportModal from "@/components/SupportModal"; import { useAuth } from "@/contexts/AuthContext"; +import { useFleet } from "@/contexts/FleetContext"; import useMediaQuery from "@mui/material/useMediaQuery"; import { useTheme } from "@mui/material/styles"; import IconButton from "@mui/material/IconButton"; import MenuIcon from "@mui/icons-material/Menu"; import Footer from "@/components/Footer"; +import FleetBanner from "@/components/FleetBanner"; import { logout } from "@/queries/auth"; const drawerWidth = 250; @@ -83,6 +85,7 @@ export default function DrawerLayout({ const theme = useTheme(); const isMobile = useMediaQuery(theme.breakpoints.down("lg")); const { role, setAuthData } = useAuth(); + const { isFleetManaged } = useFleet(); const isFirstRender = useRef(true); useEffect(() => { @@ -182,15 +185,27 @@ export default function DrawerLayout({ Ella Core - + {isFleetManaged ? ( + + ) : ( + + )} @@ -494,6 +509,7 @@ export default function DrawerLayout({ }} > + {children}