Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type Channel struct {
label string
rwc io.ReadWriteCloser

// PacketOriented enables datagram-boundary recovery: after a parse error the
// remainder of the current datagram is discarded so the next Read always starts
// on a fresh datagram. Set by channel_provider when the endpoint implements
// packetOrientedEndpoint.
PacketOriented bool

ctx context.Context
ctxCancel func()
frameWriter *frame.ReadWriter
Expand All @@ -54,6 +60,7 @@ func (ch *Channel) initialize() error {
ByteReadWriter: ch.rwc,
DialectRW: ch.node.dialectRW,
InKey: ch.node.InKey,
PacketOriented: ch.PacketOriented,
}
err = ch.frameWriter.Initialize()
if err != nil {
Expand Down Expand Up @@ -153,6 +160,9 @@ func (ch *Channel) runReader() error {
if err != nil {
var eerr frame.ReadError
if errors.As(err, &eerr) {
if ch.PacketOriented {
ch.frameWriter.DiscardBuffered()
}
ch.node.pushEvent(&EventParseError{err, ch})
continue
}
Expand Down
12 changes: 12 additions & 0 deletions channel_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ import (
"fmt"
)

// packetOrientedEndpoint is implemented by endpoints whose connections carry
// discrete, self-contained packets (e.g. UDP datagrams). When recognised by
// channelProvider, the resulting Channel gets PacketOriented = true, which
// causes the frame reader to discard the remainder of a bad datagram after a
// parse error instead of continuing to scan inside it.
type packetOrientedEndpoint interface {
isPacketOrientedEndpoint() bool
}

type channelProvider struct {
node *Node
endpoint Endpoint
Expand Down Expand Up @@ -45,6 +54,9 @@ func (cp *channelProvider) run() {
label: label,
rwc: rwc,
}
if poe, ok := cp.endpoint.(packetOrientedEndpoint); ok && poe.isPacketOrientedEndpoint() {
ch.PacketOriented = true
}
err = ch.initialize()
if err != nil {
panic(fmt.Errorf("newChannel unexpected error: %w", err))
Expand Down
130 changes: 130 additions & 0 deletions endpoint_client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gomavlib

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -194,6 +195,135 @@ func TestEndpointClient(t *testing.T) {
}
}

func TestEndpointUDPClientRecoverAfterMalformedDatagram(t *testing.T) {
pc, err := net.ListenPacket("udp4", "127.0.0.1:5604")
require.NoError(t, err)
defer pc.Close()

var rawFrame bytes.Buffer
dialectRW := &dialect.ReadWriter{Dialect: testDialect}
err = dialectRW.Initialize()
require.NoError(t, err)

fw := &frame.Writer{
ByteWriter: &rawFrame,
DialectRW: dialectRW,
}
err = fw.Initialize()
require.NoError(t, err)

sw := &streamwriter.Writer{
FrameWriter: fw,
Version: streamwriter.V2,
SystemID: 11,
}
err = sw.Initialize()
require.NoError(t, err)

err = sw.Write(&MessageHeartbeat{
Type: 6,
Autopilot: 5,
BaseMode: 4,
CustomMode: 3,
SystemStatus: 2,
MavlinkVersion: 1,
})
require.NoError(t, err)

validDatagram := append([]byte(nil), rawFrame.Bytes()...)
malformedDatagram := []byte("\x11\x22\x33\x44\x55")
serverErr := make(chan error, 1)

go func() {
buf := make([]byte, 2048)
n, addr, err2 := pc.ReadFrom(buf)
if err2 != nil {
serverErr <- err2
return
}
if n <= 0 {
serverErr <- fmt.Errorf("empty datagram")
return
}

_, err2 = pc.WriteTo(malformedDatagram, addr)
if err2 != nil {
serverErr <- err2
return
}

_, err2 = pc.WriteTo(validDatagram, addr)
if err2 != nil {
serverErr <- err2
return
}

serverErr <- nil
}()

node, err := NewNode(NodeConf{
Dialect: testDialect,
OutVersion: V2,
OutSystemID: 10,
Endpoints: []EndpointConf{EndpointUDPClient{"127.0.0.1:5604"}},
HeartbeatDisable: true,
})
require.NoError(t, err)
defer node.Close()

evt := <-node.Events()
require.Equal(t, &EventChannelOpen{
Channel: evt.(*EventChannelOpen).Channel,
}, evt)

err = node.WriteMessageAll(&MessageHeartbeat{
Type: 1,
Autopilot: 2,
BaseMode: 3,
CustomMode: 6,
SystemStatus: 4,
MavlinkVersion: 5,
})
require.NoError(t, err)

deadline := time.After(2 * time.Second)
gotParseError := false

for {
select {
case err := <-serverErr:
require.NoError(t, err)

case ev := <-node.Events():
switch et := ev.(type) {
case *EventParseError:
gotParseError = true

case *EventFrame:
require.True(t, gotParseError)
require.Equal(t, &frame.V2Frame{
SequenceNumber: 0,
SystemID: 11,
ComponentID: 1,
Message: &MessageHeartbeat{
Type: 6,
Autopilot: 5,
BaseMode: 4,
CustomMode: 3,
SystemStatus: 2,
MavlinkVersion: 1,
},
Checksum: et.Frame.GetChecksum(),
}, et.Frame)
return
}

case <-deadline:
t.Fatalf("did not receive valid frame after malformed UDP datagram")
}
}
}

func TestEndpointClientIdleTimeout(t *testing.T) {
for _, ca := range []string{"tcp"} {
t.Run(ca, func(t *testing.T) {
Expand Down
57 changes: 57 additions & 0 deletions endpoint_udp_client_packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gomavlib

import (
"context"
"net"
)

// EndpointUDPClientPacket is a UDP client endpoint that treats each incoming
// UDP datagram as an atomic unit.
//
// Unlike EndpointUDPClient (which uses a stream-oriented bufio reader), this
// endpoint sets PacketOriented = true on the resulting Channel. After any
// parse error the frame reader discards the remaining bytes of the malformed
// datagram before attempting the next read, so a single bad datagram never
// contaminates parsing of the next one.
//
// Use this endpoint when the remote peer is a MAVLink device that sends
// well-formed datagrams but the link may occasionally deliver corrupted or
// out-of-order packets (e.g. a radio-link UDP tunnel in a lossy RF channel).
type EndpointUDPClientPacket struct {
// domain name or IP of the server to connect to, example: 1.2.3.4:5600
Address string
}

func (conf EndpointUDPClientPacket) init(node *Node) (Endpoint, error) {
e := &endpointClientPacket{
conf: conf,
endpointClient: &endpointClient{
node: node,
conf: EndpointCustomClient{
Connect: func(ctx context.Context) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "udp4", conf.Address)
},
Label: "udp:" + conf.Address,
},
},
}
err := e.endpointClient.initialize()
return e, err
}

// endpointClientPacket wraps endpointClient and marks itself as
// packet-oriented so that channelProvider enables per-datagram recovery.
type endpointClientPacket struct {
conf EndpointUDPClientPacket
*endpointClient
}

// Conf returns the EndpointUDPClientPacket configuration.
func (e *endpointClientPacket) Conf() EndpointConf {
return e.conf
}

// isPacketOrientedEndpoint implements packetOrientedEndpoint.
func (e *endpointClientPacket) isPacketOrientedEndpoint() bool {
return true
}
Loading