Skip to content

Commit 46f69cb

Browse files
authored
🚚 release (#64)
2 parents ba934a6 + f8eace7 commit 46f69cb

9 files changed

Lines changed: 3423 additions & 1552 deletions

File tree

‎README.md‎

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,170 @@ Request-level metadata is included in the `IngestRequest` and can be useful for:
286286
- Debugging and auditing data imports
287287
- Adding contextual information for downstream processing
288288

289+
### Message chunking
290+
291+
When ingesting large datasets, you may encounter gRPC message size limits (typically 4 MB). The SDK provides automatic message chunking to split large entity lists into size-appropriate chunks that stay within these limits.
292+
293+
#### Automatic chunking
294+
295+
Enable chunking by using `WithChunking()` when calling `Ingest`:
296+
297+
```go
298+
package main
299+
300+
import (
301+
"context"
302+
"fmt"
303+
"log"
304+
305+
"github.com/netboxlabs/diode-sdk-go/diode"
306+
)
307+
308+
func main() {
309+
client, err := diode.NewClient(
310+
"grpc://localhost:8080/diode",
311+
"example-app",
312+
"0.1.0",
313+
diode.WithClientID("YOUR_CLIENT_ID"),
314+
diode.WithClientSecret("YOUR_CLIENT_SECRET"),
315+
)
316+
if err != nil {
317+
log.Fatal(err)
318+
}
319+
320+
// Create a large number of entities
321+
entities := make([]diode.Entity, 0)
322+
for i := 0; i < 10000; i++ {
323+
entities = append(entities, &diode.Device{
324+
Name: diode.String(fmt.Sprintf("Device %d", i)),
325+
Site: &diode.Site{
326+
Name: diode.String("Site ABC"),
327+
},
328+
DeviceType: &diode.DeviceType{
329+
Model: diode.String("Device Type A"),
330+
},
331+
Role: &diode.DeviceRole{
332+
Name: diode.String("Role ABC"),
333+
},
334+
})
335+
}
336+
337+
// Use chunking with default 3.0 MB chunk size
338+
resp, err := client.Ingest(
339+
context.Background(),
340+
entities,
341+
diode.WithChunking(0), // 0 = use default
342+
)
343+
if err != nil {
344+
log.Fatal(err)
345+
}
346+
log.Printf("Success\n")
347+
}
348+
```
349+
350+
#### Custom chunk size
351+
352+
You can specify a custom chunk size in megabytes:
353+
354+
```go
355+
// Use 3.5 MB chunks instead of the default 3.0 MB
356+
resp, err := client.Ingest(
357+
context.Background(),
358+
entities,
359+
diode.WithChunking(3.5),
360+
)
361+
```
362+
363+
#### Manual chunking
364+
365+
For more control, you can manually chunk entities using the `CreateMessageChunks` function:
366+
367+
```go
368+
package main
369+
370+
import (
371+
"context"
372+
"log"
373+
374+
"github.com/netboxlabs/diode-sdk-go/diode"
375+
pb "github.com/netboxlabs/diode-sdk-go/diode/v1/diodepb"
376+
)
377+
378+
func main() {
379+
client, err := diode.NewClient(
380+
"grpc://localhost:8080/diode",
381+
"example-app",
382+
"0.1.0",
383+
diode.WithClientID("YOUR_CLIENT_ID"),
384+
diode.WithClientSecret("YOUR_CLIENT_SECRET"),
385+
)
386+
if err != nil {
387+
log.Fatal(err)
388+
}
389+
390+
// Create entities
391+
entities := []diode.Entity{
392+
// ... many entities
393+
}
394+
395+
// Convert to proto entities
396+
protoEntities := make([]*pb.Entity, 0)
397+
for _, entity := range entities {
398+
protoEntities = append(protoEntities, entity.ConvertToProtoEntity())
399+
}
400+
401+
// Manually chunk with custom size (3.5 MB)
402+
chunks := diode.CreateMessageChunks(protoEntities, 3.5)
403+
404+
log.Printf("Split %d entities into %d chunks\n", len(protoEntities), len(chunks))
405+
406+
// Ingest each chunk
407+
for i, chunk := range chunks {
408+
log.Printf("Ingesting chunk %d of %d (%d entities)\n", i+1, len(chunks), len(chunk))
409+
resp, err := client.IngestProto(context.Background(), chunk)
410+
if err != nil {
411+
log.Fatalf("Failed to ingest chunk %d: %v\n", i+1, err)
412+
}
413+
if resp != nil && resp.Errors != nil {
414+
log.Printf("Chunk %d errors: %v\n", i+1, resp.Errors)
415+
}
416+
}
417+
log.Printf("Successfully ingested all chunks\n")
418+
}
419+
```
420+
421+
#### Estimating message size
422+
423+
You can estimate the size of your entities before chunking:
424+
425+
```go
426+
import pb "github.com/netboxlabs/diode-sdk-go/diode/v1/diodepb"
427+
428+
protoEntities := make([]*pb.Entity, 0)
429+
for _, entity := range entities {
430+
protoEntities = append(protoEntities, entity.ConvertToProtoEntity())
431+
}
432+
433+
sizeBytes := diode.EstimateMessageSize(protoEntities)
434+
sizeMB := float64(sizeBytes) / (1024 * 1024)
435+
log.Printf("Estimated message size: %.2f MB\n", sizeMB)
436+
437+
if sizeMB > 3.0 {
438+
log.Printf("Message exceeds 3 MB, chunking recommended\n")
439+
}
440+
```
441+
442+
#### How chunking works
443+
444+
The chunking algorithm uses greedy bin-packing to efficiently group entities:
445+
446+
1. It accumulates entities until adding the next one would exceed the size limit
447+
2. When the limit would be exceeded, it starts a new chunk (an entity that is larger than the limit on its own is still placed in a chunk by itself)
448+
3. Each chunk's estimated size includes the base overhead of an `IngestRequest` protobuf message
449+
4. Entity order is preserved across chunks
450+
451+
The default chunk size of 3.0 MB provides a safe margin below the gRPC 4 MB message size limit, accounting for protobuf serialization overhead and network protocol overhead.
452+
289453
### TLS verification and certificates
290454

291455
TLS verification is controlled by the target URL scheme:

‎diode/chunking.go‎

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package diode
2+
3+
import (
4+
"google.golang.org/protobuf/proto"
5+
6+
pb "github.com/netboxlabs/diode-sdk-go/diode/v1/diodepb"
7+
)
8+
9+
const (
10+
// DefaultMaxChunkSizeMB is the default maximum chunk size in megabytes,
11+
// where CreateMessageChunks treats 1 MB as 1024 * 1024 bytes. It leaves a
12+
// margin below the gRPC 4 MB message size limit for serialization overhead.
13+
DefaultMaxChunkSizeMB = 3.0
14+
)
15+
16+
// CreateMessageChunks creates size-aware chunks from entities using greedy bin-packing.
17+
//
18+
// This function chunks entities to ensure each chunk stays under the specified
19+
// size limit. It uses a greedy bin-packing algorithm that accumulates entities
20+
// until adding the next entity would exceed the limit, then starts a new chunk.
21+
//
22+
// The default chunk size of 3.0 MB provides a safe margin below the gRPC 4 MB
23+
// message size limit, accounting for protobuf serialization overhead.
24+
//
25+
// Parameters:
26+
// - entities: Slice of Entity protobuf messages to chunk
27+
// - maxChunkSizeMB: Maximum chunk size in MB (use 0 for default of 3.0 MB)
28+
//
29+
// Returns:
30+
// - Slice of entity chunks, each under maxChunkSizeMB. Returns at least
31+
// one chunk even if the input is empty.
32+
//
33+
// Examples:
34+
//
35+
// entities := []*diodepb.Entity{entity1, entity2, entity3, ...}
36+
// chunks := CreateMessageChunks(entities, 0) // Use default 3.0 MB
37+
// for _, chunk := range chunks {
38+
// client.IngestProto(ctx, chunk)
39+
// }
40+
//
41+
// // Use a custom chunk size
42+
// chunks = CreateMessageChunks(entities, 3.5)
43+
func CreateMessageChunks(entities []*pb.Entity, maxChunkSizeMB float64) [][]*pb.Entity {
44+
// Use default if not specified
45+
if maxChunkSizeMB <= 0 {
46+
maxChunkSizeMB = DefaultMaxChunkSizeMB
47+
}
48+
49+
// Handle empty input
50+
if len(entities) == 0 {
51+
return [][]*pb.Entity{entities}
52+
}
53+
54+
// Convert MB to bytes
55+
maxChunkSizeBytes := int(maxChunkSizeMB * 1024 * 1024)
56+
57+
// Quick check: if all entities fit in one chunk, return early
58+
totalSize := EstimateMessageSize(entities)
59+
if totalSize <= maxChunkSizeBytes {
60+
return [][]*pb.Entity{entities}
61+
}
62+
63+
// Greedy bin-packing: accumulate entities until limit reached
64+
baseOverhead := proto.Size(&pb.IngestRequest{})
65+
chunks := make([][]*pb.Entity, 0)
66+
currentChunk := make([]*pb.Entity, 0)
67+
currentChunkSize := baseOverhead // Start with overhead for the chunk
68+
69+
for _, entity := range entities {
70+
entitySize := proto.Size(entity)
71+
projectedSize := currentChunkSize + entitySize
72+
73+
// Check if adding this entity would exceed limit
74+
if len(currentChunk) > 0 && projectedSize > maxChunkSizeBytes {
75+
// Finalize current chunk and start new one
76+
chunks = append(chunks, currentChunk)
77+
currentChunk = []*pb.Entity{entity}
78+
currentChunkSize = baseOverhead + entitySize
79+
} else {
80+
// Add entity to current chunk
81+
currentChunk = append(currentChunk, entity)
82+
currentChunkSize = projectedSize
83+
}
84+
}
85+
86+
// Add final chunk if not empty
87+
if len(currentChunk) > 0 {
88+
chunks = append(chunks, currentChunk)
89+
}
90+
91+
// Return chunks or original entities if no chunks were created
92+
if len(chunks) == 0 {
93+
return [][]*pb.Entity{entities}
94+
}
95+
96+
return chunks
97+
}
98+
99+
// EstimateMessageSize estimates the serialized size of entities in bytes.
100+
//
101+
// Calculates the total size by summing individual entity sizes plus the
102+
// IngestRequest protobuf overhead.
103+
//
104+
// Parameters:
105+
// - entities: Slice of Entity protobuf messages
106+
//
107+
// Returns:
108+
// - Estimated size in bytes including IngestRequest overhead
109+
//
110+
// Examples:
111+
//
112+
// entities := []*diodepb.Entity{entity1, entity2, entity3}
113+
// sizeBytes := EstimateMessageSize(entities)
114+
// sizeMB := float64(sizeBytes) / (1024 * 1024)
115+
// fmt.Printf("Estimated size: %.2f MB\n", sizeMB)
116+
func EstimateMessageSize(entities []*pb.Entity) int {
117+
baseOverhead := proto.Size(&pb.IngestRequest{})
118+
entitySizesSum := 0
119+
for _, entity := range entities {
120+
entitySizesSum += proto.Size(entity)
121+
}
122+
return baseOverhead + entitySizesSum
123+
}

0 commit comments

Comments
 (0)