Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions client/fleet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package client

import (
"bytes"
"context"
"encoding/json"
)

type GetFleetURLResponse struct {
URL string `json:"url"`
}

type UpdateFleetURLOptions struct {
URL string `json:"url"`
}

type RegisterFleetOptions struct {
ActivationToken string `json:"activationToken"`
}

// GetFleetURL retrieves the configured Fleet server URL.
func (c *Client) GetFleetURL(ctx context.Context) (*GetFleetURLResponse, error) {
resp, err := c.Requester.Do(ctx, &RequestOptions{
Type: SyncRequest,
Method: "GET",
Path: "api/v1/fleet/url",
})
if err != nil {
return nil, err
}

var response GetFleetURLResponse

err = resp.DecodeResult(&response)
if err != nil {
return nil, err
}

return &response, nil
}

// UpdateFleetURL sets the Fleet server URL.
func (c *Client) UpdateFleetURL(ctx context.Context, opts *UpdateFleetURLOptions) error {
var body bytes.Buffer

err := json.NewEncoder(&body).Encode(opts)
if err != nil {
return err
}

_, err = c.Requester.Do(ctx, &RequestOptions{
Type: SyncRequest,
Method: "PUT",
Path: "api/v1/fleet/url",
Body: &body,
})
if err != nil {
return err
}

return nil
}

// RegisterFleet registers the Core instance with a Fleet using an activation token.
func (c *Client) RegisterFleet(ctx context.Context, opts *RegisterFleetOptions) error {
var body bytes.Buffer

err := json.NewEncoder(&body).Encode(opts)
if err != nil {
return err
}

_, err = c.Requester.Do(ctx, &RequestOptions{
Type: SyncRequest,
Method: "POST",
Path: "api/v1/fleet/register",
Body: &body,
})
if err != nil {
return err
}

return nil
}

// UnregisterFleet unregisters the Core instance from a Fleet.
func (c *Client) UnregisterFleet(ctx context.Context) error {
_, err := c.Requester.Do(ctx, &RequestOptions{
Type: SyncRequest,
Method: "POST",
Path: "api/v1/fleet/unregister",
})
if err != nil {
return err
}

return nil
}
6 changes: 6 additions & 0 deletions client/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,19 @@ type ClusterStatus struct {
PendingMigration *PendingMigration `json:"pendingMigration,omitempty"`
}

type FleetStatus struct {
Managed bool `json:"managed"`
LastSyncAt string `json:"lastSyncAt,omitempty"`
}

type Status struct {
Version string `json:"version"`
Revision string `json:"revision,omitempty"`
Initialized bool `json:"initialized"`
Ready bool `json:"ready"`
SchemaVersion int `json:"schemaVersion"`
Cluster *ClusterStatus `json:"cluster,omitempty"`
Fleet FleetStatus `json:"fleet"`
}

// GetStatus retrieves the current status of the system.
Expand Down
82 changes: 82 additions & 0 deletions fleet/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2026 Ella Networks

package fleet

import (
"sync"

"github.com/ellanetworks/core/fleet/client"
)

const defaultFleetBufferCapacity = 10_000

// FleetBuffer is a bounded in-memory ring buffer for flow entries destined for
// Fleet sync. Writes are non-blocking: when the buffer is full the oldest entry
// is dropped. Reads drain the entire buffer atomically.
type FleetBuffer struct {
mu sync.Mutex
entries []client.FlowEntry
capacity int
head int
count int
}

// NewFleetBuffer creates a FleetBuffer with the given maximum capacity.
// If capacity <= 0 the default (10 000) is used.
func NewFleetBuffer(capacity int) *FleetBuffer {
if capacity <= 0 {
capacity = defaultFleetBufferCapacity
}

return &FleetBuffer{
entries: make([]client.FlowEntry, capacity),
capacity: capacity,
}
}

// EnqueueFlow appends a flow entry to the buffer. If the buffer is at
// capacity the oldest entry is silently dropped to make room.
func (b *FleetBuffer) EnqueueFlow(entry client.FlowEntry) {
b.mu.Lock()
defer b.mu.Unlock()

idx := (b.head + b.count) % b.capacity

if b.count == b.capacity {
b.entries[b.head] = entry
b.head = (b.head + 1) % b.capacity
} else {
b.entries[idx] = entry
b.count++
}
}

// DrainFlows returns all buffered flow entries and resets the buffer.
// The caller owns the returned slice. Returns nil when empty.
func (b *FleetBuffer) DrainFlows() []client.FlowEntry {
b.mu.Lock()
defer b.mu.Unlock()

if b.count == 0 {
return nil
}

out := make([]client.FlowEntry, b.count)

for i := range b.count {
out[i] = b.entries[(b.head+i)%b.capacity]
}

b.head = 0
b.count = 0

return out
}

// Len returns the current number of buffered entries.
func (b *FleetBuffer) Len() int {
b.mu.Lock()
defer b.mu.Unlock()

return b.count
}
86 changes: 86 additions & 0 deletions fleet/client/fleet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2026 Ella Networks

package client

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"
)

// Fleet is the Core-side client for the Fleet management API. After a
// successful Register call the caller stores the returned token via
// SetToken; every subsequent request carries it in the Authorization header.
type Fleet struct {
url string
client *http.Client
token string
}

// New constructs a Fleet client.
//
// When insecureSkipVerify is false (the default in production
// deployments), TLS server certificates are validated against the system
// trust store. Self-hosted Fleet deployments using self-signed certs
// either install the cert into the Core host's trust store, front Fleet
// with a real cert (Let's Encrypt or internal PKI), or set
// `fleet.insecure-skip-verify: true` in core.yaml to disable validation
// — appropriate for integration tests and local-development setups, but
// not for production.
func New(url string, insecureSkipVerify bool) *Fleet {
transport := http.DefaultTransport

if insecureSkipVerify {
transport = &http.Transport{
// #nosec G402 -- explicitly gated behind fleet.insecure-skip-verify in core.yaml
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
}

return &Fleet{
url: url,
client: &http.Client{
Timeout: 15 * time.Second,
Transport: transport,
},
}
}

// SetToken sets the bearer token used to authenticate sync and unregister
// requests. Empty token disables the Authorization header (only valid before
// registration completes).
func (fc *Fleet) SetToken(token string) {
fc.token = token
}

// addAuth attaches the bearer token to the request when set. Callers use
// this for every request that requires authentication (sync, unregister).
func (fc *Fleet) addAuth(req *http.Request) {
if fc.token != "" {
req.Header.Set("Authorization", "Bearer "+fc.token)
}
}

// checkResponseContentType returns a user-friendly error when the fleet
// server replies with something other than JSON (usually HTML from a
// mis-configured URL).
func checkResponseContentType(res *http.Response) error {
ct := res.Header.Get("Content-Type")
if ct != "" && !isJSONContentType(ct) {
return fmt.Errorf("fleet server at %s returned an unexpected response (Content-Type: %s) — verify the Fleet URL is correct", res.Request.URL.Host, ct)
}

return nil
}

func isJSONContentType(ct string) bool {
for _, prefix := range []string{"application/json", "text/json"} {
if strings.HasPrefix(ct, prefix) {
return true
}
}

return false
}
Loading