Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 85 additions & 8 deletions python/osd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ def crc8_dvb_s2(crc, a):
crc = crc << 1
return crc & 0xFF

def format_bytes(data):
return " ".join(f"{b:02X}" for b in data)

def send_msp(s, body):
crc = 0
for x in body:
crc = crc8_dvb_s2(crc, x)
msp = [ord('$'),ord('X'),ord('<')]
msp = msp + body
msp.append(crc)
msp = bytes([ord('$'), ord('X'), ord('<')] + body + [crc])
s.write(msp)
print('Sending ' + str(msp))
print("Sending " + format_bytes(msp))

def send_clear(s):
msp = [0,0xb6,0x00,1,0,0x02]
Expand All @@ -46,10 +47,86 @@ def send_msg(s, row, col, str):
msp.append(ord(x))
send_msp(s, msp)

class MspReader:
def __init__(self):
self.buffer = bytearray()
self.text_buffer = bytearray()

def _flush_text(self, chunk):
events = []
if not chunk:
return events
self.text_buffer.extend(chunk)
while True:
newline_pos = -1
for marker in (b"\n", b"\r"):
pos = self.text_buffer.find(marker)
if pos != -1 and (newline_pos == -1 or pos < newline_pos):
newline_pos = pos
if newline_pos == -1:
break
line = bytes(self.text_buffer[:newline_pos]).decode("utf-8", errors="replace").strip()
del self.text_buffer[:newline_pos + 1]
if line:
events.append(("text", line))
return events

def feed(self, data):
self.buffer.extend(data)
events = []
while True:
start = self.buffer.find(b"$X")
if start < 0:
events.extend(self._flush_text(self.buffer))
self.buffer.clear()
break
if start > 0:
events.extend(self._flush_text(self.buffer[:start]))
del self.buffer[:start]
if len(self.buffer) < 9:
break
direction = self.buffer[2]
if direction not in (ord("<"), ord(">"), ord("!")):
events.extend(self._flush_text(self.buffer[:1]))
del self.buffer[0]
continue
payload_size = self.buffer[6] | (self.buffer[7] << 8)
frame_size = 9 + payload_size
if len(self.buffer) < frame_size:
break
frame = bytes(self.buffer[:frame_size])
del self.buffer[:frame_size]
body = frame[3:-1]
expected_crc = frame[-1]
crc = 0
for value in body:
crc = crc8_dvb_s2(crc, value)
events.append(("packet", frame, crc == expected_crc))
return events

def describe_packet(frame, crc_valid):
direction = chr(frame[2])
function = frame[5] << 8 | frame[4]
payload_size = frame[6] | (frame[7] << 8)
payload = frame[8:8 + payload_size]
status = "OK" if crc_valid else "BAD CRC"
prefix = f"Received {direction} 0x{function:04X} [{status}]"
if payload:
return f"{prefix}: {format_bytes(payload)}"
return prefix

def thread_function(s: serial.Serial):
reader = MspReader()
while True:
b = s.readall()
if len(b): print(b)
data = s.read(s.in_waiting or 1)
if not data:
continue
for event in reader.feed(data):
if event[0] == "text":
print(f"Serial log: {event[1]}")
else:
_, frame, crc_valid = event
print(describe_packet(frame, crc_valid))

def short_help():
print("Command should be one of:")
Expand All @@ -60,7 +137,7 @@ def short_help():

def help():
print()
print("Depending on the OSD font only UPPERCASE letters ay display as actual letters,")
print("Depending on the OSD font only UPPERCASE letters may display as actual letters,")
print("this is because the other character positions are used to display other symbols on the OSD.")
short_help()
print()
Expand All @@ -82,7 +159,7 @@ def help():
args.port = serials_find.get_serial_port()

s = serial.Serial(port=args.port, baudrate=args.baud, bytesize=8, parity='N', stopbits=1, timeout=1, xonxoff=0, rtscts=0)
threading.Thread(target=thread_function, args=(s,)).start()
threading.Thread(target=thread_function, args=(s,), daemon=True).start()

help()
for line in sys.stdin:
Expand Down
Loading
Loading