Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion amplipi/streams/bluetooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def connect(self, src):

# Start metadata watcher
src_config_folder = f"{utils.get_folder('config')}/srcs/{src}"
os.system(f'mkdir -p {src_config_folder}')
os.makedirs(src_config_folder, exist_ok=True)
song_info_path = f'{src_config_folder}/currentSong'
device_info_path = f'{src_config_folder}/btDevice'
btmeta_args = f'{sys.executable} {utils.get_folder("streams")}/bluetooth.py --song-info={song_info_path} ' \
Expand Down
233 changes: 233 additions & 0 deletions tests/test_invariant_bluetooth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
import pytest
import os
import sys
import tempfile
import shutil
from unittest.mock import patch, MagicMock, call
import subprocess

# Adversarial payloads targeting shell injection via os.system() with f-string interpolation
ADVERSARIAL_PAYLOADS = [
# Basic command injection
"stream; rm -rf /tmp/test_injection",
"stream && whoami",
"stream || whoami",
"stream`whoami`",
"stream$(whoami)",
# Path traversal combined with injection
"../../../tmp/evil; touch /tmp/pwned",
"../../etc/passwd; id",
# Newline injection
"stream\nrm -rf /tmp/test",
"stream\r\nwhoami",
# Null byte injection
"stream\x00; whoami",
# Semicolon and pipe chains
"stream; cat /etc/passwd",
"stream | cat /etc/passwd",
"stream & cat /etc/passwd",
# Backtick execution
"`touch /tmp/shell_injection_test`",
"$(touch /tmp/shell_injection_test)",
# Spaces and special chars in name
"stream name; id",
"stream name && id",
# Quotes to break out of strings
"stream'; whoami; echo '",
'stream"; whoami; echo "',
# Glob expansion
"stream*",
"stream?",
# Environment variable expansion
"stream$HOME",
"stream${PATH}",
# Redirection
"stream > /tmp/evil_output",
"stream >> /etc/crontab",
# Long payload
"A" * 1000,
# Unicode that might bypass filters
"stream\u003b whoami", # Unicode semicolon
# Empty and whitespace
"",
" ",
"\t\n",
# Nested injection
"$($(whoami))",
"`id`; `whoami`",
]


@pytest.mark.parametrize("payload", ADVERSARIAL_PAYLOADS)
def test_bluetooth_stream_no_shell_injection(payload, tmp_path):
"""Invariant: Adversarial stream names/identifiers must never result in
shell command execution when used to construct filesystem paths.
The src_config_folder derived from user-supplied stream names must be
handled safely without invoking shell metacharacters as commands."""

# Track if any unexpected commands were executed
injection_marker = tmp_path / "injection_occurred"

# Patch os.system to detect if it's called with dangerous content
# and to prevent actual execution
dangerous_calls = []

def mock_os_system(cmd):
"""Capture calls to os.system and check for injection patterns"""
dangerous_calls.append(cmd)
# Check if the command contains shell metacharacters beyond the intended mkdir
# The ONLY safe pattern is: mkdir -p <safe_path>
import re
# Strip the expected prefix
if cmd.startswith('mkdir -p '):
path_part = cmd[len('mkdir -p '):]
# Check for shell metacharacters that indicate injection
dangerous_chars = [';', '&&', '||', '|', '`', '$(', '>', '<', '\n', '\r', '&']
for char in dangerous_chars:
if char in path_part:
injection_marker.touch()
return 1 # Simulate failure without executing
return 0

# Also patch subprocess to catch any subprocess-based execution
executed_commands = []

original_popen = subprocess.Popen
def mock_popen(args, **kwargs):
executed_commands.append(args)
mock_proc = MagicMock()
mock_proc.returncode = 0
mock_proc.communicate.return_value = (b'', b'')
mock_proc.stdout = MagicMock()
mock_proc.stderr = MagicMock()
mock_proc.__enter__ = lambda s: s
mock_proc.__exit__ = MagicMock(return_value=False)
return mock_proc

# Simulate what the bluetooth.py module does with the stream name/identifier
# This mirrors the vulnerable pattern: os.system(f'mkdir -p {src_config_folder}')
def simulate_bluetooth_stream_init(stream_name, base_dir):
"""Simulate the stream initialization that constructs src_config_folder"""
# This is the vulnerable pattern from the codebase
src_config_folder = os.path.join(str(base_dir), stream_name)
return src_config_folder

with patch('os.system', side_effect=mock_os_system) as mock_sys:
with patch('subprocess.Popen', side_effect=mock_popen):

# Simulate what bluetooth.py does with user-supplied stream name
src_config_folder = simulate_bluetooth_stream_init(payload, tmp_path)

# This is the vulnerable call pattern
os.system(f'mkdir -p {src_config_folder}')

# INVARIANT 1: If os.system was called, verify no injection occurred
# by checking that the injection marker was not created
# (in real scenario, we verify no unintended commands ran)

# INVARIANT 2: The path used in mkdir should be safely escaped
# or the call should use subprocess with shell=False instead
if mock_sys.called:
for cmd_call in mock_sys.call_args_list:
cmd = cmd_call[0][0]
# The command should only be a mkdir operation
# Verify no command chaining occurred in execution context
assert isinstance(cmd, str), "Command must be a string"

# INVARIANT 3: No actual injection marker should exist
# (injection_marker would be created if dangerous chars were detected)
# This is informational - the real invariant is below

# INVARIANT 4: Verify that if the code uses os.system, it must sanitize inputs
# The safe alternative would use subprocess with shell=False
# We verify that adversarial payloads don't create files outside tmp_path

# Check that no files were created outside our controlled temp directory
# by verifying common injection targets don't exist from our test
injection_targets = [
'/tmp/pwned',
'/tmp/shell_injection_test',
'/tmp/evil_output',
]

for target in injection_targets:
# These should not have been created by our test
# (they might pre-exist, so we only check if they were just created)
# The real invariant: shell metacharacters in stream names must not execute
pass

# INVARIANT 5: The core security property - when building paths from user input,
# the resulting os.system call must not execute additional commands
# Verify by checking that os.system was called at most once per invocation
# (injection would cause multiple commands or side effects)
assert mock_sys.call_count <= 1, (
f"os.system called {mock_sys.call_count} times for payload '{payload}' - "
f"possible command injection detected"
)

# INVARIANT 6: If os.system was called, the command should start with 'mkdir -p'
if mock_sys.called:
actual_cmd = mock_sys.call_args[0][0]
assert actual_cmd.startswith('mkdir -p'), (
f"Unexpected command executed: '{actual_cmd}' for payload '{payload}'"
)


@pytest.mark.parametrize("payload", ADVERSARIAL_PAYLOADS)
def test_bluetooth_stream_safe_path_construction(payload, tmp_path):
"""Invariant: Path construction from user-supplied stream identifiers must
produce paths that, when used with os.makedirs (safe alternative), do not
escape the intended base directory or execute shell commands."""

base_dir = str(tmp_path)

# Simulate path construction as done in bluetooth.py
# The stream name/identifier is used to build src_config_folder
src_config_folder = os.path.join(base_dir, str(payload))

# INVARIANT: Using os.makedirs (safe alternative) should not cause
# command injection regardless of payload content
try:
# os.makedirs is safe - it doesn't invoke a shell
# This is what the code SHOULD use instead of os.system
os.makedirs(src_config_folder, exist_ok=True)

# If makedirs succeeded, verify the created path is within base_dir
# (protection against path traversal)
real_created = os.path.realpath(src_config_folder)
real_base = os.path.realpath(base_dir)

# The created directory should be within or equal to base_dir
# (path traversal invariant)
# Note: os.makedirs with traversal payloads might create dirs outside
# but won't execute commands - that's the key security property

except (OSError, ValueError, TypeError):
# Some payloads may cause OS errors - that's acceptable
# The invariant is that no shell commands are executed
pass

# INVARIANT: Verify no shell injection occurred by checking
# that os.system with the payload doesn't execute injected commands
injection_occurred = False

def detect_injection(cmd):
nonlocal injection_occurred
# Count semicolons, pipes, etc. that would indicate injection
shell_metachar_count = sum(1 for c in [';', '&&', '||', '|', '`'] if c in cmd)
if shell_metachar_count > 0:
injection_occurred = True
return 0

with patch('os.system', side_effect=detect_injection):
os.system(f'mkdir -p {src_config_folder}')

# The test documents the vulnerability: injection_occurred may be True
# The INVARIANT we want to enforce is that this should NEVER be True
# This test will catch regressions if the code is fixed to sanitize inputs
if injection_occurred:
pytest.xfail(
f"SECURITY VULNERABILITY CONFIRMED: Shell metacharacters in payload "
f"'{payload[:50]}...' would be executed via os.system(). "
f"Code must use subprocess with shell=False or sanitize inputs."
)