Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion bumble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
HCI_Connection_Complete_Event,
HCI_Connection_Request_Event,
HCI_Disconnection_Complete_Event,
HCI_Link_Key_Request_Event,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already imported hci module, so we may just remove this chain of imports.

HCI_Encryption_Change_Event,
HCI_LE_Advertising_Report_Event,
HCI_LE_CIS_Established_Event,
Expand Down Expand Up @@ -697,7 +698,7 @@ def on_link_cis_disconnected(self, cig_id: int, cis_id: int) -> None:

############################################################
# Classic link connections
############################################################
############################################################

def on_classic_connection_request(
self, peer_address: Address, link_type: int
Expand Down Expand Up @@ -965,6 +966,56 @@ def on_hci_accept_connection_request_command(
self.link.classic_accept_connection(self, command.bd_addr, command.role)
return None

def on_classic_link_key_request_reply_command(self, command: hci.HCI_Link_Key_Request_Reply_Command) -> Optional[bytes]:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_hci_link_key_requestr_reply_command

Handler is gotten from f"on_{command.name}".

'''
See Bluetooth spec Vol 4, Part E - 7.1.10 Link Key Request Reply command
'''
bd_addr = (
bytes(self._public_address)
if self._public_address is not None
else bytes(6)
)
return bytes([HCI_SUCCESS]) + bd_addr

def on_classic_link_key_request_negative_reply_command(self, command: hci.HCI_Link_Key_Request_Negative_Reply_Command) -> Optional[bytes]:
'''
See Bluetooth spec Vol 4, Part E - 7.1.11 Link Key Request Negative Reply command
'''
bd_addr = (
bytes(self._public_address)
if self._public_address is not None
else bytes(6)
)
return bytes([HCI_SUCCESS]) + bd_addr

def on_classic_authentication_requested_command(self, command: hci.HCI_Authentication_Requested_Command) -> Optional[bytes]:
'''
See Bluetooth spec Vol 4, Part E - 7.1.15 Authentication Requested command
'''
if connection := self.find_classic_connection_by_handle(
command.connection_handle
):
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)

self.send_hci_packet(
HCI_Link_Key_Request_Event(
bd_addr=connection.peer_address
)
)
else:
logger.warning(
f'!!! no connection for handle 0x{command.connection_handle:04X}'
)
return None



def on_hci_enhanced_setup_synchronous_connection_command(
self, command: hci.HCI_Enhanced_Setup_Synchronous_Connection_Command
) -> Optional[bytes]:
Expand Down
9 changes: 9 additions & 0 deletions tests/device_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
HCI_CREATE_CONNECTION_COMMAND,
HCI_SUCCESS,
Address,
HCI_Authentication_Requested_Command,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused

HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
Expand Down Expand Up @@ -789,6 +790,14 @@ async def test_accept_classic_connection(roles: tuple[hci.Role, hci.Role]):
assert devices.connections[1].role == roles[1]



# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_secure_simple_pairing():
devices = await TwoDevices.create_with_connection()
await devices[0].authenticate(devices.connections[0])


# -----------------------------------------------------------------------------
async def run_test_device():
await test_device_connect_parallel()
Expand Down
Loading