Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,641 changes: 1,731 additions & 1,910 deletions pixi.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ python = "3.11.*"
numpy = "==2.0.2"
pygame = "==2.6.1"
protobuf = "==3.20.2"
gymnasium = "==1.0.0"
gymnasium = "==1.3.0"
pyserial = "==3.5"
pytest = "==8.3.4"
ruff = ">=0.9.8"
Expand Down Expand Up @@ -42,7 +42,7 @@ cmake = "<3.27"
libode = ">=0.16.2,<0.17"

[feature.robosim.pypi-dependencies]
rc-robosim = "<2"
rc-robosim = ">=1.2,<2"
numpy = "<=2.0.2"

[tasks]
Expand All @@ -56,3 +56,6 @@ precommit-uninstall = "pre-commit uninstall"

[environments]
robosim = { features = ["robosim"], no-default-feature = true }

[target.linux-64.tasks]
main = "setsid python -m main"
2 changes: 1 addition & 1 deletion utama_core/config/robot_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class RobotParams:
GRSIM_PARAMS = RobotParams(
MAX_VEL=2,
MAX_ANGULAR_VEL=4,
MAX_ACCELERATION=8,
MAX_ACCELERATION=4,
MAX_ANGULAR_ACCELERATION=50,
KICK_SPD=5,
DRIBBLE_SPD=3,
Expand Down
96 changes: 13 additions & 83 deletions utama_core/data_processing/predictors/position.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,94 +3,24 @@
from utama_core.entities.data.vector import Vector2D
from utama_core.entities.game import Game

### Incomplete Code Snippet ###

# def predict_frame_after(self, t: float) -> VisionData:
# yellow_pos = [
# self.predict_object_pos_after(t, RobotEntity(Colour.YELLOW, i))
# for i in range(len(self.get_robots_pos(True)))
# ]
# blue_pos = [
# self.predict_object_pos_after(t, RobotEntity(Colour.BLUE, i))
# for i in range(len(self.get_robots_pos(False)))
# ]
# ball_pos = self.predict_object_pos_after(t, Ball)
# if ball_pos is None or None in yellow_pos or None in blue_pos:
# return None
# else:
# return VisionData(
# self._records[-1].ts + t,
# list(map(lambda pos: VisionRobotData(pos[0], pos[1], 0), yellow_pos)),
# list(map(lambda pos: VisionRobotData(pos[0], pos[1], 0), blue_pos)),
# [VisionBallData(ball_pos[0], ball_pos[1], 0, 1)], # TODO : Support z axis
# )
def predict_ball_pos_at_x(game: Game, x: float) -> Optional[Vector2D]:
"""Predict where the ball will cross a given x-coordinate.

# ### General Object Position Prediction ###
# def predict_object_pos_after(self, t: float, object: GameObject) -> Optional[tuple]:
# # If t is after the object has stopped we return the position at which object stopped.
# sx = 0
# sy = 0

# acceleration = self.get_object_acceleration(object)

# if acceleration is None:
# return None

# ax, ay = acceleration
# vels = self.get_object_velocity(object)

# if vels is None:
# ux, uy = None, None
# else:
# ux, uy = vels

# if object is Ball:
# ball = self.get_ball_pos()
# start_x, start_y = ball[0].x, ball[0].y
# else:
# posn = self._get_object_position_at_frame(len(self._records) - 1, object)
# start_x, start_y = posn.x, posn.y

# if ax and ux:
# sx = self._calculate_displacement(ux, ax, t)

# if ay and uy:
# sy = self._calculate_displacement(uy, ay, t)

# return (
# start_x + sx,
# start_y + sy,
# ) # TODO: Doesn't take into account spin / angular vel

# def _calculate_displacement(self, u, a, t) -> float:
# if a == 0: # Handle zero acceleration case
# return u * t
# else:
# stop_time = -u / a
# if stop_time < 0:
# stop_time = float("inf")
# effective_time = min(t, stop_time)
# displacement = (u * effective_time) + (0.5 * a * effective_time**2)
# logger.debug(
# f"Displacement: {displacement} for time: {effective_time}, stop time: {stop_time}"
# )
# return displacement


def predict_ball_pos_at_x(game: Game, x: float) -> Optional[tuple]:
Returns None if the ball is not moving toward ``x`` (stationary along x,
or moving away). Callers should fall back to their own positioning logic
(e.g. goalkeep uses shadow-based ``stop_y``).
"""
vel = game.ball.v.to_2d()

if not vel or not vel[0] or not vel[1]:
return None
if abs(vel.x) < 1e-12:
return None # Ball not moving along x — cannot predict arrival

ux, uy = vel
pos = game.ball.p
bx = pos.x
by = pos.y
pos = game.ball.p.to_2d()
t = (x - pos.x) / vel.x

if uy == 0:
return (bx, by)
if t < 0:
return None # Ball moving away from target x

t = (x - bx) / ux
y = by + uy * t
y = pos.y + vel.y * t
return Vector2D(x, y)
11 changes: 2 additions & 9 deletions utama_core/motion_planning/src/controllers/fastpathplanning.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
from abc import ABC, abstractmethod

import numpy as np

from utama_core.config.enums import Mode
from utama_core.config.physical_constants import ROBOT_RADIUS
from utama_core.entities.data.vector import Vector2D
from utama_core.entities.game import Game
from utama_core.entities.game.field import Field
from utama_core.motion_planning.src.common.motion_controller import MotionController
from utama_core.motion_planning.src.fastpathplanning.planner import FastPathPlanner
from utama_core.motion_planning.src.pid.pid import get_pids
Expand All @@ -15,10 +9,9 @@

class FastPathPlanningController(MotionController):
def __init__(self, mode: Mode, rsim_env: SSLStandardEnv | None = None):
self.mode = mode
self.rsim_env: SSLStandardEnv | None = rsim_env
super().__init__(mode, rsim_env)
self.pid_oren, self.pid_trans = get_pids(mode)
self.fpp = FastPathPlanner(env=self.rsim_env)
self.fpp = FastPathPlanner(env=rsim_env)

def calculate(
self,
Expand Down
17 changes: 10 additions & 7 deletions utama_core/motion_planning/src/fastpathplanning/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


class FastPathPlanner:
def __init__(self, env: SSLStandardEnv):
def __init__(self, env: SSLStandardEnv | None):
self._env = env
self.config = config
self.OBSTACLE_CLEARANCE = self.config.OBSTACLE_CLEARANCE
Expand Down Expand Up @@ -62,8 +62,9 @@ def _get_obstacles(

obstacle_list.append(obstacle_segment)

# DRAWING: Show the projected velocity line in Red
self._env.draw_line(obstacle_segment, color="Red")
# DRAWING: Show the projected velocity line in Red when an RSim renderer is available.
if self._env is not None:
self._env.draw_line(obstacle_segment, color="Red")

# Field bounds as obstacles (static, usually not drawn to keep screen clean)
tl, br = np.array(field_bounds.top_left), np.array(field_bounds.bottom_right)
Expand Down Expand Up @@ -270,12 +271,14 @@ def _path_to(
# 4. Plan geometric path
final_trajectory, _ = self.check_segment((our_pos, safe_target), obstacles, 0, safe_target, field_bounds)

# 5. Draw the resulting safe path segments
for i in final_trajectory:
self._env.draw_line(i)
# 5. Draw the resulting safe path segments when an RSim renderer is available.
if self._env is not None:
for i in final_trajectory:
self._env.draw_line(i)

# 6. Smooth the path and draw the final "Carrot" target in Blue
new_target = self.smooth_path(final_trajectory, safe_target, our_pos)
self._env.draw_line((our_pos, new_target), color="Blue")
if self._env is not None:
self._env.draw_line((our_pos, new_target), color="Blue")

return new_target
15 changes: 11 additions & 4 deletions utama_core/run/strategy_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,16 @@ def __init__(
self._stop_event = threading.Event()

def _handle_sigint(self, sig, frame):
if self._stop_event.is_set():
signal.default_int_handler(sig, frame)
self._stop_event.set()
signal.default_int_handler(sig, frame)
self._stop_fps_live()
print("\nStopping gracefully. Press Ctrl+C again to force quit.", flush=True)

def _stop_fps_live(self):
if self._fps_live:
self._fps_live.stop()
self._fps_live = None

def _load_mode(self, mode_str: str) -> Mode:
"""Convert a mode string to a Mode enum value.
Expand Down Expand Up @@ -666,8 +674,7 @@ def close(self, stop_command_repeat: int = 20):
self.replay_writer.close()
if self.rsim_env:
self.rsim_env.close()
if self._fps_live:
self._fps_live.stop()
self._stop_fps_live()

def run_test(
self,
Expand Down Expand Up @@ -752,7 +759,7 @@ def run(self):
"""Run the main loop, stepping the game until interrupted.

If an RSim environment is present, it ensures rendering is on. The loop
continues until a KeyboardInterrupt is received, after which resources
continues until interrupted via SIGINT stop event, after which resources
(such as replay writer and rsim env) are closed.
"""
signal.signal(signal.SIGINT, self._handle_sigint)
Expand Down
Loading
Loading