From 7987166bf27cdf833b18d31ff7de25b5dab92839 Mon Sep 17 00:00:00 2001 From: dcaayushd Date: Thu, 2 Jul 2026 12:35:20 +0545 Subject: [PATCH] fix: correct default CS channel_map to exclude channel 76 per Core Spec 6.0 --- apps/console.py | 2 +- apps/show.py | 6 ++---- apps/usb_probe.py | 4 ++-- bumble/a2dp.py | 2 +- bumble/crypto/builtin.py | 4 ++-- bumble/device.py | 2 +- bumble/drivers/rtk.py | 1 + bumble/gatt_server.py | 2 +- bumble/pandora/host.py | 2 +- bumble/pandora/l2cap.py | 2 +- bumble/profiles/aics.py | 2 +- bumble/rfcomm.py | 2 +- bumble/smp.py | 2 +- tests/device_test.py | 20 ++++++++++++++++++++ tests/gatt_test.py | 5 +---- 15 files changed, 37 insertions(+), 21 deletions(-) diff --git a/apps/console.py b/apps/console.py index 5a66c381b..febd06778 100644 --- a/apps/console.py +++ b/apps/console.py @@ -579,7 +579,7 @@ async def rssi_monitor_loop(self): async def command(self, command): try: - (keyword, *params) = command.strip().split(' ') + keyword, *params = command.strip().split(' ') keyword = keyword.replace('-', '_').lower() handler = getattr(self, f'do_{keyword}', None) if handler: diff --git a/apps/show.py b/apps/show.py index ddd965290..a29cdf083 100644 --- a/apps/show.py +++ b/apps/show.py @@ -63,9 +63,7 @@ def __init__(self, source): raise ValueError( 'not a valid snoop file, unexpected identification pattern' ) - (self.version_number, self.data_link_type) = struct.unpack( - '>II', source.read(8) - ) + self.version_number, self.data_link_type = struct.unpack('>II', source.read(8)) if self.data_link_type not in (self.DATALINK_H4, self.DATALINK_H1): raise ValueError(f'datalink type {self.data_link_type} not supported') @@ -175,7 +173,7 @@ def read_next_packet(): while not packet_reader.at_end: try: - (timestamp, direction, packet) = read_next_packet() + timestamp, direction, packet = read_next_packet() if packet: tracer.trace(hci.HCI_Packet.from_bytes(packet), direction, timestamp) else: diff --git a/apps/usb_probe.py b/apps/usb_probe.py index 81a266f25..cef4e552a 100644 --- a/apps/usb_probe.py +++ b/apps/usb_probe.py @@ -99,7 +99,7 @@ def show_device_details(device): suffix = ( f'/{alternate_setting}' if interface.getNumSettings() > 1 else '' ) - (class_string, subclass_string) = get_class_info( + class_string, subclass_string = get_class_info( setting.getClass(), setting.getSubClass(), setting.getProtocol() ) details = f'({class_string}, {subclass_string})' @@ -191,7 +191,7 @@ def main(verbose: bool, manufacturer: str, product: str, hci_only: bool): device_id = (device.getVendorID(), device.getProductID()) - (device_class_string, device_subclass_string) = get_class_info( + device_class_string, device_subclass_string = get_class_info( device_class, device_subclass, device_protocol ) diff --git a/bumble/a2dp.py b/bumble/a2dp.py index 9bb2438e6..8e902137d 100644 --- a/bumble/a2dp.py +++ b/bumble/a2dp.py @@ -459,7 +459,7 @@ class VendorSpecificMediaCodecInformation(MediaCodecInformation): @staticmethod def from_bytes(data: bytes) -> VendorSpecificMediaCodecInformation: - (vendor_id, codec_id) = struct.unpack_from(' bytes: diff --git a/bumble/crypto/builtin.py b/bumble/crypto/builtin.py index 03d391e85..909780b2d 100644 --- a/bumble/crypto/builtin.py +++ b/bumble/crypto/builtin.py @@ -166,7 +166,7 @@ def encrypt(self, plaintext: bytes) -> bytes: raise core.InvalidArgumentError(f'wrong block length {len(plaintext)}') rounds = len(self._ke) - 1 - (s1, s2, s3) = [1, 2, 3] + s1, s2, s3 = [1, 2, 3] a = [0, 0, 0, 0] # Convert plaintext to (ints ^ key) @@ -205,7 +205,7 @@ def decrypt(self, cipher_text: bytes) -> bytes: raise core.InvalidArgumentError(f'wrong block length {len(cipher_text)}') rounds = len(self._kd) - 1 - (s1, s2, s3) = [3, 2, 1] + s1, s2, s3 = [3, 2, 1] a = [0, 0, 0, 0] # Convert ciphertext to (ints ^ key) diff --git a/bumble/device.py b/bumble/device.py index f3b431c5e..b39299cb6 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -5442,7 +5442,7 @@ async def create_cs_config( role: int = hci.CsRole.INITIATOR, rtt_type: int = hci.RttType.AA_ONLY, cs_sync_phy: int = hci.CsSyncPhy.LE_1M, - channel_map: bytes = b'\x54\x55\x55\x54\x55\x55\x55\x55\x55\x15', + channel_map: bytes = b'\x54\x55\x55\x54\x55\x55\x55\x55\x55\x05', channel_map_repetition: int = 0x01, channel_selection_type: int = hci.HCI_LE_CS_Create_Config_Command.ChannelSelectionType.ALGO_3B, ch3c_shape: int = hci.HCI_LE_CS_Create_Config_Command.Ch3cShape.HAT, diff --git a/bumble/drivers/rtk.py b/bumble/drivers/rtk.py index 24bb78b3a..d6017d2a1 100644 --- a/bumble/drivers/rtk.py +++ b/bumble/drivers/rtk.py @@ -16,6 +16,7 @@ Based on various online bits of information, including the Linux kernel. (see `drivers/bluetooth/btrtl.c`) """ + from __future__ import annotations import asyncio diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py index 31127fb06..4910aa411 100644 --- a/bumble/gatt_server.py +++ b/bumble/gatt_server.py @@ -219,7 +219,7 @@ def get_descriptor_attribute( if not characteristics: return None - (_, characteristic_value) = characteristics + _, characteristic_value = characteristics return next( ( diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py index ca686a224..a8ee85289 100644 --- a/bumble/pandora/host.py +++ b/bumble/pandora/host.py @@ -639,7 +639,7 @@ async def Inquiry( await self.device.start_discovery(auto_restart=False) try: while inquiry_result := await inquiry_queue.get(): - (address, class_of_device, eir_data, rssi) = inquiry_result + address, class_of_device, eir_data, rssi = inquiry_result # FIXME: if needed, add support for `page_scan_repetition_mode` and `clock_offset` in Bumble yield InquiryResponse( address=bytes(reversed(bytes(address))), diff --git a/bumble/pandora/l2cap.py b/bumble/pandora/l2cap.py index 85a6642bf..106a9f0b6 100644 --- a/bumble/pandora/l2cap.py +++ b/bumble/pandora/l2cap.py @@ -300,7 +300,7 @@ def craft_pandora_channel( return PandoraChannel(cookie=cookie) def lookup_channel(self, pandora_channel: PandoraChannel) -> L2capChannel: - (connection_handle, source_cid) = json.loads( + connection_handle, source_cid = json.loads( pandora_channel.cookie.value ).values() diff --git a/bumble/profiles/aics.py b/bumble/profiles/aics.py index da3764c96..a7efe4774 100644 --- a/bumble/profiles/aics.py +++ b/bumble/profiles/aics.py @@ -171,7 +171,7 @@ class GainSettingsProperties: @classmethod def from_bytes(cls, data: bytes): - (gain_settings_unit, gain_settings_minimum, gain_settings_maximum) = ( + gain_settings_unit, gain_settings_minimum, gain_settings_maximum = ( struct.unpack('BBB', data) ) return GainSettingsProperties( diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index 98f40f5e6..072fc7652 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -870,7 +870,7 @@ def on_disc_frame(self, _frame: RFCOMM_Frame) -> None: ) def on_uih_frame(self, frame: RFCOMM_Frame) -> None: - (mcc_type, c_r, value) = RFCOMM_Frame.parse_mcc(frame.information) + mcc_type, c_r, value = RFCOMM_Frame.parse_mcc(frame.information) if mcc_type == MccType.PN: pn = RFCOMM_MCC_PN.from_bytes(value) diff --git a/bumble/smp.py b/bumble/smp.py index a6b213e32..9f428d431 100644 --- a/bumble/smp.py +++ b/bumble/smp.py @@ -1720,7 +1720,7 @@ def on_smp_pairing_random_command_secure_connections( # Compute the MacKey and LTK a = self.ia + bytes([self.iat]) b = self.ra + bytes([self.rat]) - (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b) + mac_key, self.ltk = crypto.f5(self.dh_key, self.na, self.nb, a, b) # Compute the DH Key checks if self.pairing_method in ( diff --git a/tests/device_test.py b/tests/device_test.py index 3efe4db1e..7df82802c 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -17,6 +17,7 @@ # ----------------------------------------------------------------------------- import asyncio import functools +import inspect import logging import os from unittest import mock @@ -693,6 +694,25 @@ async def test_power_on_default_static_address_should_not_be_any(): assert devices[0].static_address != Address.ANY_RANDOM +# ----------------------------------------------------------------------------- +def test_cs_channel_map_excludes_forbidden_channels(): + forbidden = {0, 1, 23, 24, 25, 76, 77, 78, 79} + default_map = ( + inspect.signature(Device.create_cs_config).parameters['channel_map'].default + ) + + enabled = { + byte_idx * 8 + bit + for byte_idx, byte in enumerate(default_map) + for bit in range(8) + if byte & (1 << bit) + } + + assert enabled.isdisjoint( + forbidden + ), f"Default channel_map enables forbidden CS channels: {enabled & forbidden}" + + # ----------------------------------------------------------------------------- def test_gatt_services_with_gas_and_gatt(): device = Device(host=Host(None, None)) diff --git a/tests/gatt_test.py b/tests/gatt_test.py index e167cc1bd..7294109f7 100644 --- a/tests/gatt_test.py +++ b/tests/gatt_test.py @@ -1191,12 +1191,9 @@ def test_characteristic_property_from_string_assert(): with pytest.raises(TypeError) as e_info: Characteristic.Properties.from_string("BROADCAST,HELLO") - assert ( - str(e_info.value) - == """Characteristic.Properties::from_string() error: + assert str(e_info.value) == """Characteristic.Properties::from_string() error: Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES Got: BROADCAST,HELLO""" - ) # -----------------------------------------------------------------------------