Skip to content
Open
10 changes: 8 additions & 2 deletions dimos/robot/unitree/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import asyncio
from dataclasses import dataclass
import functools
import os
import threading
import time
from typing import Any, TypeAlias
Expand Down Expand Up @@ -85,12 +86,17 @@ def to_ndarray(self, format=None): # type: ignore[no-untyped-def]
class UnitreeWebRTCConnection(Resource):
_SPORT_API_ID_RAGEMODE: int = 2059

def __init__(self, ip: str, mode: str = "ai") -> None:
def __init__(self, ip: str, mode: str = "ai", aes_128_key: str | None = None) -> None:
self.ip = ip
self.mode = mode
self.stop_timer: threading.Timer | None = None
self.cmd_vel_timeout = 0.2
self.conn = LegionConnection(WebRTCConnectionMethod.LocalSTA, ip=self.ip)
# Per-device AES-128 key required by G1 firmware >= 1.5.1 (data2=3 WebRTC handshake).
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also include GO2 firmware 1.1.15 as it has the same issue and this should fix it for the GO2 too !

# Fetch with: unitree-fetch-aes-key --email YOU --sn <serial>
if aes_128_key is None:
aes_128_key = os.environ.get("UNITREE_AES_128_KEY")
Comment thread
mihai-chiorean marked this conversation as resolved.
Outdated
extra: dict[str, Any] = {"aes_128_key": aes_128_key} if aes_128_key else {}
self.conn = LegionConnection(WebRTCConnectionMethod.LocalSTA, ip=self.ip, **extra)
self.connect()

def connect(self) -> None:
Expand Down
7 changes: 6 additions & 1 deletion dimos/robot/unitree/g1/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
class G1Config(ModuleConfig):
ip: str = Field(default_factory=lambda m: m["g"].robot_ip)
connection_type: str = Field(default_factory=lambda m: m["g"].unitree_connection_type)
# Per-device AES-128 key for G1 firmware >= 1.5.1 (data2=3 WebRTC handshake).
# If unset here, UnitreeWebRTCConnection falls back to the UNITREE_AES_128_KEY env var.
aes_128_key: str | None = None


class G1ConnectionBase(Module, ABC):
Expand Down Expand Up @@ -78,7 +81,9 @@ def start(self) -> None:

match self.config.connection_type:
case "webrtc":
self.connection = UnitreeWebRTCConnection(self.config.ip)
self.connection = UnitreeWebRTCConnection(
self.config.ip, aes_128_key=self.config.aes_128_key
)
case "replay":
raise ValueError("Replay connection not implemented for G1 robot")
case "mujoco":
Expand Down
9 changes: 8 additions & 1 deletion dimos/robot/unitree/g1/effectors/high_level/webrtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
class G1HighLevelWebRtcConfig(ModuleConfig):
ip: str | None = None
connection_mode: str = "ai"
# Per-device AES-128 key for G1 firmware >= 1.5.1 (data2=3 WebRTC handshake).
# If unset here, UnitreeWebRTCConnection falls back to the UNITREE_AES_128_KEY env var.
aes_128_key: str | None = None


class G1HighLevelWebRtc(Module, HighLevelG1Spec):
Expand All @@ -62,7 +65,11 @@ def __init__(self, *args: Any, g: GlobalConfig = global_config, **kwargs: Any) -
def start(self) -> None:
super().start()
assert self.config.ip is not None, "ip must be set in G1HighLevelWebRtcConfig"
self.connection = UnitreeWebRTCConnection(self.config.ip, self.config.connection_mode)
self.connection = UnitreeWebRTCConnection(
self.config.ip,
self.config.connection_mode,
aes_128_key=self.config.aes_128_key,
)
self.connection.start()
self.register_disposable(Disposable(self.cmd_vel.subscribe(self.move)))

Expand Down
91 changes: 91 additions & 0 deletions dimos/robot/unitree/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2025-2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for UnitreeWebRTCConnection's aes_128_key kwarg + env-var fallback.

Pure-Python tests — no hardware, no network. Mocks the LegionConnection driver
and the connect() side-effect so __init__ stays inside the kwarg-forwarding logic.
"""

from typing import Any
from unittest.mock import MagicMock

import pytest

from dimos.robot.unitree import connection as conn_mod
from dimos.robot.unitree.connection import UnitreeWebRTCConnection


@pytest.fixture
def stub_legion(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
"""Replace LegionConnection in the module with a MagicMock and suppress
UnitreeWebRTCConnection.connect so __init__ doesn't try to dial out."""
monkeypatch.setattr(UnitreeWebRTCConnection, "connect", lambda self: None)
legion = MagicMock(name="LegionConnection")
monkeypatch.setattr(conn_mod, "LegionConnection", legion)
return legion


def _aes_kwarg(legion: MagicMock) -> Any:
"""Pull aes_128_key out of the LegionConnection call args, or None if absent."""
_args, kwargs = legion.call_args
return kwargs.get("aes_128_key")


def test_aes_key_omitted_when_neither_kwarg_nor_env(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Default behaviour: no kwarg, no env var → aes_128_key not forwarded.

Guarantees the call is byte-identical to the pre-PR behaviour for users
on G1 firmware <1.5.1 and all Go2 robots.
"""
monkeypatch.delenv("UNITREE_AES_128_KEY", raising=False)
UnitreeWebRTCConnection(ip="192.168.123.161")
assert _aes_kwarg(stub_legion) is None
assert "aes_128_key" not in stub_legion.call_args.kwargs


def test_aes_key_from_explicit_kwarg(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Caller passes the key directly → forwarded verbatim."""
monkeypatch.delenv("UNITREE_AES_128_KEY", raising=False)
UnitreeWebRTCConnection(ip="192.168.123.161", aes_128_key="aa" * 16)
assert _aes_kwarg(stub_legion) == "aa" * 16


def test_aes_key_from_env_when_kwarg_none(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Env-var fallback: kwarg unset → UNITREE_AES_128_KEY is used."""
monkeypatch.setenv("UNITREE_AES_128_KEY", "bb" * 16)
UnitreeWebRTCConnection(ip="192.168.123.161")
assert _aes_kwarg(stub_legion) == "bb" * 16


def test_explicit_kwarg_beats_env(stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch) -> None:
"""Precedence: explicit kwarg wins over UNITREE_AES_128_KEY env var."""
monkeypatch.setenv("UNITREE_AES_128_KEY", "from-env")
UnitreeWebRTCConnection(ip="192.168.123.161", aes_128_key="from-kwarg")
assert _aes_kwarg(stub_legion) == "from-kwarg"


def test_empty_string_kwarg_skips_forwarding(
stub_legion: MagicMock, monkeypatch: pytest.MonkeyPatch
) -> None:
"""An empty-string kwarg is treated as 'no key' (truthiness guard)."""
monkeypatch.delenv("UNITREE_AES_128_KEY", raising=False)
UnitreeWebRTCConnection(ip="192.168.123.161", aes_128_key="")
assert "aes_128_key" not in stub_legion.call_args.kwargs