From 3a3766ebb74f0b590c28e815bf410edd28e1a85d Mon Sep 17 00:00:00 2001 From: onlykey Date: Mon, 29 Jun 2026 17:24:04 -0400 Subject: [PATCH] age plugin: align PQ host protocol with firmware; 1.2.11 packaging/description - setup.py: enable long_description (markdown), bump 1.2.10 -> 1.2.11 - MANIFEST.in: ship README.md - client.py: prefer hidraw backend (#89) - age_plugin: PQ seed in user ECC slot 101-116 (selectable, validated); key type in buffer[6]; keygen via OKSETPRIV 0xFF trigger; Message-enum framing; multi-packet decapsulation via send_large_message2 (mirrors RSA/ECDH decrypt) - tests: hardware-free ML-KEM/X-Wing crypto + HID wire-framing/multipacket --- MANIFEST.in | 1 + onlykey/age_plugin/__init__.py | 51 +++++++++-- onlykey/age_plugin/cli.py | 74 ++++++++++------ onlykey/age_plugin/onlykey_hid.py | 103 ++++++++++++++-------- onlykey/client.py | 8 +- setup.py | 5 +- tests/test_age_pq.py | 118 +++++++++++++++++++++++++ tests/test_age_wire.py | 137 ++++++++++++++++++++++++++++++ 8 files changed, 427 insertions(+), 70 deletions(-) create mode 100644 MANIFEST.in create mode 100644 tests/test_age_pq.py create mode 100644 tests/test_age_wire.py diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..bb3ec5f --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include README.md diff --git a/onlykey/age_plugin/__init__.py b/onlykey/age_plugin/__init__.py index 66bc267..ec72689 100644 --- a/onlykey/age_plugin/__init__.py +++ b/onlykey/age_plugin/__init__.py @@ -11,11 +11,46 @@ __version__ = "0.1.0" PLUGIN_NAME = "onlykey" -# OnlyKey HID message types (from onlykey.client.Message) -OKGETPUBKEY = 236 -OKDECRYPT = 240 -OKGENKEY = 230 - -# OnlyKey reserved key slots for post-quantum -SLOT_MLKEM = 133 -SLOT_XWING = 134 +# OnlyKey HID opcodes (values match onlykey.client.Message) +OKGETPUBKEY = 236 # Message.OKGETPUBKEY +OKDECRYPT = 240 # Message.OKDECRYPT +OKSETPRIV = 239 # Message.OKSETPRIV — also generates an on-device key when + # the key body is all 0xFF (firmware okcore.cpp set_private + # -> okcrypto_generate_random_key) + +# ECC key slots that can hold the 32-byte post-quantum seed. +# +# A ML-KEM/X-Wing key is just a 32-byte seed stored in an ordinary ECC key +# slot, and the algorithm is chosen by the key-type byte (buffer[6]) below — +# not by the slot number. The slot is therefore caller-selectable, but only +# across the USER key slots: firmware exposes 101-116 (16 slots) for user keys. +# Slots 117-132 are RESERVED (e.g. 128 web-derivation, 129/130 HMAC, 131 +# backup, 132 derivation) and must never be used for PQ keys — writing there +# would clobber internal device keys. Firmware getpubkey/decaps enforce this +# with a `< 117` gate (okcrypto.cpp); the host mirrors it here. +ECC_SLOT_MIN = 101 +ECC_SLOT_MAX = 116 # 16 user ECC key slots: 101-116 (117-132 reserved) +DEFAULT_XWING_SLOT = 101 +DEFAULT_MLKEM_SLOT = 102 + + +def validate_ecc_slot(slot): + """Raise ValueError unless slot is a user ECC key slot (101-116). + + 117-132 are reserved by firmware and are intentionally rejected. + """ + if not (ECC_SLOT_MIN <= int(slot) <= ECC_SLOT_MAX): + raise ValueError( + f"PQ key slot must be a user ECC slot {ECC_SLOT_MIN}-{ECC_SLOT_MAX} " + f"(117-132 are reserved), got {slot}" + ) + return int(slot) + +# Firmware key-type identifiers (okcore.h: KEYTYPE_MLKEM768 / KEYTYPE_XWING). +# Sent in the low nibble of buffer[6] so the device routes the operation. +KEYTYPE_MLKEM768 = 5 +KEYTYPE_XWING = 6 + +# An all-0xFF key body sent with OKSETPRIV tells the firmware to generate the +# key on-device (gen_key trigger in okcore.cpp). +GENERATE_ON_DEVICE = b"\xff" * 8 diff --git a/onlykey/age_plugin/cli.py b/onlykey/age_plugin/cli.py index b5c8e31..82bdbec 100644 --- a/onlykey/age_plugin/cli.py +++ b/onlykey/age_plugin/cli.py @@ -20,7 +20,9 @@ import base64 import hashlib -from onlykey.age_plugin import __version__, PLUGIN_NAME, SLOT_XWING +from onlykey.age_plugin import ( + __version__, PLUGIN_NAME, DEFAULT_XWING_SLOT, validate_ecc_slot, +) from onlykey.age_plugin.protocol import ( Stanza, b64encode_no_pad, b64decode_no_pad, run_identity_v1, run_recipient_v1, @@ -137,7 +139,7 @@ def recipient_fingerprint(pubkey: bytes) -> bytes: return hashlib.sha256(pubkey).digest()[:IDENTITY_FINGERPRINT_LEN] -def encode_identity(slot: int = SLOT_XWING) -> str: +def encode_identity(slot: int = DEFAULT_XWING_SLOT) -> str: """Encode an identity string. Contains just the slot number.""" # Identity data: just the slot byte data = bytes([slot]) @@ -181,16 +183,16 @@ def decode_identity(identity: str) -> dict: -def cmd_generate(): +def cmd_generate(slot: int = DEFAULT_XWING_SLOT): """Generate X-Wing keypair on OnlyKey and print recipient/identity.""" from onlykey.age_plugin.onlykey_hid import OnlyKeyPQ - print("Generating X-Wing keypair on OnlyKey...", file=sys.stderr) + print(f"Generating X-Wing keypair on OnlyKey (ECC slot {slot})...", file=sys.stderr) dev = OnlyKeyPQ() - pk = dev.xwing_keygen() + pk = dev.xwing_keygen(slot) recipient = encode_recipient(pk) - identity = encode_identity(SLOT_XWING) + identity = encode_identity(slot) print("# X-Wing public key (produces native age mlkem768x25519 stanzas)", file=sys.stderr) print(f"# Recipient: {recipient}", file=sys.stderr) @@ -202,19 +204,19 @@ def cmd_generate(): print(identity) -def cmd_recipient(): - """Print the current X-Wing public key as an age recipient.""" +def cmd_recipient(slot: int = DEFAULT_XWING_SLOT): + """Print the X-Wing public key in the given ECC slot as an age recipient.""" from onlykey.age_plugin.onlykey_hid import OnlyKeyPQ dev = OnlyKeyPQ() - pk = dev.xwing_getpubkey() + pk = dev.xwing_getpubkey(slot) print(encode_recipient(pk)) -def cmd_identity(): +def cmd_identity(slot: int = DEFAULT_XWING_SLOT): """Print an identity file for use with age -i.""" - identity = encode_identity(SLOT_XWING) - print(f"# age-plugin-onlykey identity (X-Wing slot {SLOT_XWING})") + identity = encode_identity(slot) + print(f"# age-plugin-onlykey identity (X-Wing ECC slot {slot})") print(identity) @@ -240,19 +242,27 @@ def unwrap_callback(identities, stanzas_per_file): return results dev = OnlyKeyPQ() - device_pubkey = dev.xwing_getpubkey() - if len(device_pubkey) != XWING_RECIPIENT_LEN: - raise ValueError( - f"OnlyKey returned an unexpected X-Wing public key length: {len(device_pubkey)}" - ) - device_fingerprint = recipient_fingerprint(device_pubkey) + # The identity carries the ECC slot the key lives in; any of the 32 ECC + # slots (101-132) is valid. Query that slot's public key and match it. matching_identity = None + matching_slot = None for identity in parsed_identities: - if identity["slot"] != SLOT_XWING: + try: + slot = validate_ecc_slot(identity["slot"]) + except ValueError: continue - if identity["fingerprint"] is None or identity["fingerprint"] == device_fingerprint: + try: + device_pubkey = dev.xwing_getpubkey(slot) + except Exception as exc: + print(f"Could not read X-Wing key in slot {slot}: {exc}", file=sys.stderr) + continue + if len(device_pubkey) != XWING_RECIPIENT_LEN: + continue + if (identity["fingerprint"] is None + or identity["fingerprint"] == recipient_fingerprint(device_pubkey)): matching_identity = identity + matching_slot = slot break if matching_identity is None: @@ -293,8 +303,8 @@ def unwrap_callback(identities, stanzas_per_file): ) - # Send ciphertext to OnlyKey for decapsulation - ss = dev.xwing_decaps(enc) + # Send ciphertext to OnlyKey for decapsulation (in the matched slot) + ss = dev.xwing_decaps(enc, slot=matching_slot) # Use shared secret to decrypt the file key via HPKE try: @@ -360,13 +370,26 @@ def main(): print(f"Unknown state machine: {state_machine}", file=sys.stderr) sys.exit(1) + # Optional --slot N / --slot=N selects which ECC slot (101-132) holds the key. + slot = DEFAULT_XWING_SLOT + for i, arg in enumerate(args): + if arg == "--slot" and i + 1 < len(args): + slot = args[i + 1] + elif arg.startswith("--slot="): + slot = arg.split("=", 1)[1] + try: + slot = validate_ecc_slot(slot) + except ValueError as exc: + print(f"Error: {exc}", file=sys.stderr) + sys.exit(1) + # Direct invocation modes if "--generate" in args or "-g" in args: - cmd_generate() + cmd_generate(slot) elif "--recipient" in args or "-r" in args: - cmd_recipient() + cmd_recipient(slot) elif "--identity" in args or "-i" in args: - cmd_identity() + cmd_identity(slot) elif "--version" in args or "-v" in args: print(f"age-plugin-onlykey {__version__}") elif "--help" in args or "-h" in args: @@ -380,6 +403,7 @@ def main(): print(" --generate Generate X-Wing keypair on OnlyKey") print(" --recipient Print recipient (public key) for encryption") print(" --identity Print identity file for decryption") + print(" --slot N User ECC slot 101-116 to use (default 101)") print(" --help Show full help") print() print("Quick start:") diff --git a/onlykey/age_plugin/onlykey_hid.py b/onlykey/age_plugin/onlykey_hid.py index 5906cca..4bbe624 100644 --- a/onlykey/age_plugin/onlykey_hid.py +++ b/onlykey/age_plugin/onlykey_hid.py @@ -12,7 +12,11 @@ import time from onlykey.client import OnlyKey, Message -from . import OKGETPUBKEY, OKDECRYPT, OKGENKEY, SLOT_MLKEM, SLOT_XWING +from . import ( + OKGETPUBKEY, OKDECRYPT, OKSETPRIV, GENERATE_ON_DEVICE, + DEFAULT_MLKEM_SLOT, DEFAULT_XWING_SLOT, + KEYTYPE_MLKEM768, KEYTYPE_XWING, validate_ecc_slot, +) # Sizes XWING_PK_SIZE = 1216 @@ -51,11 +55,28 @@ def _connect(self): time.sleep(0.5) raise RuntimeError("Could not connect to OnlyKey. Is it plugged in and unlocked?") - def _send_and_receive(self, msg_type, slot, payload=b"", + def _send_and_receive(self, msg_type, slot, payload=b"", key_type=None, expected_size=0, timeout_ms=10000): - """Send a message and collect multi-packet response.""" - self.ok.send_message(msg=msg_type, slot_id=slot, payload=payload) + """Send a SINGLE-packet request and collect the response. + Wire layout expected by firmware: buffer[5]=slot, buffer[6]=key type, + buffer[7:]=payload. ``key_type`` is placed in buffer[6] so the device + routes the request to the ML-KEM / X-Wing handler for the ECC slot. + + This is only valid when the request payload fits in one 64-byte report + (keygen trigger, getpubkey). Large inputs that exceed one report — the + decapsulation ciphertext — must use the multi-packet send path; see + ``*_decaps`` below. + """ + body = bytearray() + if key_type is not None: + body.append(key_type & 0x0F) # firmware buffer[6] + body.extend(payload) + self.ok.send_message(msg=Message(msg_type), slot_id=slot, payload=body) + return self._read_response(expected_size=expected_size, timeout_ms=timeout_ms) + + def _read_response(self, expected_size=0, timeout_ms=10000): + """Collect a (possibly multi-packet) response from the device.""" result = bytearray() deadline = time.time() + timeout_ms / 1000 while time.time() < deadline: @@ -75,11 +96,31 @@ def _send_and_receive(self, msg_type, slot, payload=b"", return bytes(result[:expected_size] if expected_size else result) - def xwing_keygen(self): - """Generate X-Wing keypair. Returns 1216-byte public key.""" + def _decaps(self, ciphertext, slot): + """Send a KEM ciphertext for on-device decapsulation; return 32-byte SS. + + The ciphertext (1088 B for ML-KEM, 1120 B for X-Wing) is far larger than + one 64-byte HID report, so it is streamed with the multi-packet protocol + (``send_large_message2``) — the same path the OnlyKey CLI uses to send + RSA/ECDH ciphertext for OKDECRYPT. Each packet carries + [slot, 0xFF-or-final-length, <=57 bytes], which the firmware accumulates + into its large buffer. The device reads the key TYPE from the key stored + in ``slot`` (not from the packet), waits for a button press, then returns + the 32-byte shared secret. + """ + print("Press OnlyKey button to confirm decryption...", file=sys.stderr) + self.ok.send_large_message2( + msg=Message(OKDECRYPT), payload=list(ciphertext), slot_id=slot, + ) + return self._read_response(expected_size=32, timeout_ms=30000) + + def xwing_keygen(self, slot=DEFAULT_XWING_SLOT): + """Generate an X-Wing keypair in the given ECC slot. Returns 1216-byte pubkey.""" + slot = validate_ecc_slot(slot) print("Press OnlyKey button to confirm key generation...", file=sys.stderr) pk = self._send_and_receive( - OKGENKEY, SLOT_XWING, + OKSETPRIV, slot, + payload=GENERATE_ON_DEVICE, key_type=KEYTYPE_XWING, expected_size=XWING_PK_SIZE, timeout_ms=30000, ) @@ -87,10 +128,11 @@ def xwing_keygen(self): raise RuntimeError(f"X-Wing keygen: got {len(pk)} bytes, expected {XWING_PK_SIZE}") return pk - def xwing_getpubkey(self): - """Get X-Wing public key. Returns 1216-byte public key.""" + def xwing_getpubkey(self, slot=DEFAULT_XWING_SLOT): + """Get the X-Wing public key from the given ECC slot. Returns 1216-byte pubkey.""" + slot = validate_ecc_slot(slot) pk = self._send_and_receive( - OKGETPUBKEY, SLOT_XWING, + OKGETPUBKEY, slot, key_type=KEYTYPE_XWING, expected_size=XWING_PK_SIZE, timeout_ms=10000, ) @@ -98,46 +140,39 @@ def xwing_getpubkey(self): raise RuntimeError(f"X-Wing getpubkey: got {len(pk)} bytes, expected {XWING_PK_SIZE}") return pk - def xwing_decaps(self, ciphertext): - """X-Wing decapsulation. Returns 32-byte shared secret.""" + def xwing_decaps(self, ciphertext, slot=DEFAULT_XWING_SLOT): + """X-Wing decapsulation in the given ECC slot. Returns 32-byte shared secret.""" + slot = validate_ecc_slot(slot) if len(ciphertext) != XWING_CT_SIZE: raise ValueError(f"X-Wing CT must be {XWING_CT_SIZE} bytes, got {len(ciphertext)}") - print("Press OnlyKey button to confirm decryption...", file=sys.stderr) - ss = self._send_and_receive( - OKDECRYPT, SLOT_XWING, - payload=ciphertext, - expected_size=XWING_SS_SIZE, - timeout_ms=30000, - ) + ss = self._decaps(ciphertext, slot) if len(ss) != XWING_SS_SIZE: raise RuntimeError(f"X-Wing decaps: got {len(ss)} bytes, expected {XWING_SS_SIZE}") return ss - def mlkem_keygen(self): - """Generate ML-KEM-768 keypair. Returns 1184-byte public key.""" + def mlkem_keygen(self, slot=DEFAULT_MLKEM_SLOT): + """Generate an ML-KEM-768 keypair in the given ECC slot. Returns 1184-byte pubkey.""" + slot = validate_ecc_slot(slot) print("Press OnlyKey button to confirm key generation...", file=sys.stderr) return self._send_and_receive( - OKGENKEY, SLOT_MLKEM, + OKSETPRIV, slot, + payload=GENERATE_ON_DEVICE, key_type=KEYTYPE_MLKEM768, expected_size=MLKEM_PK_SIZE, timeout_ms=30000, ) - def mlkem_getpubkey(self): - """Get ML-KEM-768 public key. Returns 1184-byte public key.""" + def mlkem_getpubkey(self, slot=DEFAULT_MLKEM_SLOT): + """Get the ML-KEM-768 public key from the given ECC slot. Returns 1184-byte pubkey.""" + slot = validate_ecc_slot(slot) return self._send_and_receive( - OKGETPUBKEY, SLOT_MLKEM, + OKGETPUBKEY, slot, key_type=KEYTYPE_MLKEM768, expected_size=MLKEM_PK_SIZE, timeout_ms=10000, ) - def mlkem_decaps(self, ciphertext): - """ML-KEM-768 decapsulation. Returns 32-byte shared secret.""" + def mlkem_decaps(self, ciphertext, slot=DEFAULT_MLKEM_SLOT): + """ML-KEM-768 decapsulation in the given ECC slot. Returns 32-byte shared secret.""" + slot = validate_ecc_slot(slot) if len(ciphertext) != MLKEM_CT_SIZE: raise ValueError(f"ML-KEM CT must be {MLKEM_CT_SIZE} bytes, got {len(ciphertext)}") - print("Press OnlyKey button to confirm decryption...", file=sys.stderr) - return self._send_and_receive( - OKDECRYPT, SLOT_MLKEM, - payload=ciphertext, - expected_size=32, - timeout_ms=30000, - ) + return self._decaps(ciphertext, slot) diff --git a/onlykey/client.py b/onlykey/client.py index 92db2d3..c1587fa 100644 --- a/onlykey/client.py +++ b/onlykey/client.py @@ -11,7 +11,13 @@ import os import codecs -import hid +try: + # On Linux the hidapi package ships a hidraw-backed module alongside the + # libusb-backed one. Prefer hidraw to avoid hid "open failed" races when the + # OnlyKey's HID interface was just used by another application (e.g. KeePassXC HMAC) + import hidraw as hid +except ImportError: + import hid from aenum import Enum from sys import platform diff --git a/setup.py b/setup.py index bb41473..e6a280b 100644 --- a/setup.py +++ b/setup.py @@ -10,9 +10,10 @@ setup( name='onlykey', - version='1.2.10', + version='1.2.11', description='OnlyKey client and command-line tool', - # long_description=long_description, + long_description=long_description, + long_description_content_type='text/markdown', url='https://github.com/trustcrypto/python-onlykey', author='CryptoTrust', author_email='admin@crp.to', diff --git a/tests/test_age_pq.py b/tests/test_age_pq.py new file mode 100644 index 0000000..d7d4c02 --- /dev/null +++ b/tests/test_age_pq.py @@ -0,0 +1,118 @@ +"""Hardware-free tests for the post-quantum age plugin (ML-KEM-768 / X-Wing). + +These exercise the host-side cryptography only — ML-KEM-768 encapsulation, +the X-Wing hybrid combiner, and the HPKE seal/open that backs age's +``mlkem768x25519`` stanza. Decapsulation that would normally run on the +OnlyKey is emulated in software via ``mlkem_py`` so the full KEM roundtrip +can be verified without a device. + +Run with: pytest tests/test_age_pq.py +Requires: cryptography, kyber-py (installed via the ``age`` extra) +""" +import os +import sys +import types + +import pytest + +# Skip the whole module if the PQ crypto deps aren't present (e.g. the +# package was installed without the ``age`` extra). +pytest.importorskip("cryptography") +pytest.importorskip("kyber_py") + +# Importing the onlykey package pulls in onlykey.client, which imports hid. +# In CI without USB libraries, provide a stub so the import succeeds — the +# tests below never touch a real device. +for _name in ("hid", "hidraw"): + if _name not in sys.modules: + try: + __import__(_name) + except ImportError: + _stub = types.ModuleType(_name) + _stub.device = object + sys.modules[_name] = _stub + +from onlykey.age_plugin import protocol, xwing +from onlykey.age_plugin.mlkem_py import ( + mlkem768_decaps, + mlkem768_encaps, + mlkem768_keygen, +) + + +def test_mlkem768_sizes_and_roundtrip(): + ek, dk = mlkem768_keygen() + assert len(ek) == 1184 + assert len(dk) == 2400 + ss, ct = mlkem768_encaps(ek) + assert len(ss) == 32 + assert len(ct) == 1088 + assert mlkem768_decaps(dk, ct) == ss + + +def _xwing_keypair(): + """Build an X-Wing keypair: (ek_M, dk_M, sk_X, pk_X, pk_xwing).""" + ek, dk = mlkem768_keygen() + sk_x, pk_x = xwing.x25519_keygen() + pk = ek + pk_x # pk_M(1184) || pk_X(32) + return ek, dk, sk_x, pk_x, pk + + +def test_xwing_encaps_decaps_agree(): + _, dk, sk_x, pk_x, pk = _xwing_keypair() + assert len(pk) == 1216 + + ss, ct = xwing.xwing_encaps_host(pk) + assert len(ss) == 32 + assert len(ct) == 1120 + + ct_m, ct_x = ct[:1088], ct[1088:1120] + ss_m = mlkem768_decaps(dk, ct_m) + ss_x = xwing.x25519_scalarmult(sk_x, ct_x) + ss_dec = xwing.xwing_combiner(ss_m, ss_x, ct_x, pk_x) + + assert ss_dec == ss + + +def test_hpke_seal_open_file_key(): + _, dk, sk_x, pk_x, pk = _xwing_keypair() + ss, ct = xwing.xwing_encaps_host(pk) + + ct_m, ct_x = ct[:1088], ct[1088:1120] + ss_dec = xwing.xwing_combiner( + mlkem768_decaps(dk, ct_m), xwing.x25519_scalarmult(sk_x, ct_x), ct_x, pk_x + ) + + file_key = os.urandom(16) + aead_ct = xwing.seal_file_key(ss, ct, file_key) + assert len(aead_ct) == 32 + assert xwing.open_file_key(ss_dec, ct, aead_ct) == file_key + + +def test_wrong_identity_is_rejected(): + _, dk, sk_x, pk_x, pk = _xwing_keypair() + ss, ct = xwing.xwing_encaps_host(pk) + ct_m, ct_x = ct[:1088], ct[1088:1120] + aead_ct = xwing.seal_file_key(ss, ct, os.urandom(16)) + + _, dk_bad = mlkem768_keygen() + ss_bad = xwing.xwing_combiner( + mlkem768_decaps(dk_bad, ct_m), xwing.x25519_scalarmult(sk_x, ct_x), ct_x, pk_x + ) + with pytest.raises(Exception): + xwing.open_file_key(ss_bad, ct, aead_ct) + + +def test_xwing_spec_constants(): + # draft-connolly-cfrg-xwing-kem-09 + assert xwing.KEM_ID == 0x647A + assert xwing.KDF_ID == 0x0001 + assert xwing.AEAD_ID == 0x0003 + assert xwing.XWING_LABEL.hex() == "5c2e2f2f5e5c" + + +def test_age_stanza_roundtrip(): + blob = os.urandom(1120) + assert protocol.b64decode_no_pad(protocol.b64encode_no_pad(blob)) == blob + st = protocol.Stanza("mlkem768x25519", [protocol.b64encode_no_pad(blob)], os.urandom(32)) + assert "mlkem768x25519" in st.encode() diff --git a/tests/test_age_wire.py b/tests/test_age_wire.py new file mode 100644 index 0000000..7998881 --- /dev/null +++ b/tests/test_age_wire.py @@ -0,0 +1,137 @@ +"""Hardware-free checks of the PQ HID wire framing (onlykey_hid). + +Verifies that ML-KEM / X-Wing requests target a real ECC slot (101-116) and +carry the firmware key-type byte in buffer[6], matching the firmware dispatch +in okcrypto.cpp (slot in buffer[5], key type in buffer[6], payload in buffer[7:]). +No OnlyKey hardware is used — a fake transport records what would be sent. +""" +import sys +import types + +import pytest + +# Stub hid so importing the onlykey package doesn't need USB libraries. +for _name in ("hid", "hidraw"): + if _name not in sys.modules: + try: + __import__(_name) + except ImportError: + _m = types.ModuleType(_name) + _m.device = object + sys.modules[_name] = _m + +import pytest as _pytest + +from onlykey.client import Message, OnlyKey +from onlykey.age_plugin import ( + DEFAULT_MLKEM_SLOT, DEFAULT_XWING_SLOT, + KEYTYPE_MLKEM768, KEYTYPE_XWING, validate_ecc_slot, +) +from onlykey.age_plugin.onlykey_hid import OnlyKeyPQ + + +class FakeOK: + """Records send_message calls; feeds zero bytes back for reads. + + Reuses the real OnlyKey.send_large_message2 so the multi-packet decaps + framing is exercised for real (it only calls self.send_message). + """ + + send_large_message2 = OnlyKey.send_large_message2 + + def __init__(self): + self.sent = [] + + def send_message(self, msg=None, slot_id=None, payload=None, **kw): + self.sent.append({ + "msg": msg, + "slot": slot_id, + "body": bytes(payload) if payload is not None else b"", + }) + + def read_bytes(self, n, timeout_ms=0): + return bytes(n) # 64 zero bytes per read + + def read_string(self, timeout_ms=0): + return "" + + +def _dev(): + # Passing ok= skips the hardware _connect(). + return OnlyKeyPQ(ok=FakeOK()) + + +def test_slot_validation_user_slots_only(): + # User ECC key slots 101-116 are valid; 117-132 are reserved and rejected. + for s in range(101, 117): + assert validate_ecc_slot(s) == s + for bad in (100, 117, 128, 132, 133, 1, 200): # 117-132 reserved + with _pytest.raises(ValueError): + validate_ecc_slot(bad) + assert KEYTYPE_MLKEM768 == 5 + assert KEYTYPE_XWING == 6 + + +@_pytest.mark.parametrize("slot", [101, 108, 116]) +def test_xwing_framing_any_slot(slot): + dev = _dev() + dev.xwing_getpubkey(slot) + f = dev.ok.sent[-1] + assert f["msg"] == Message.OKGETPUBKEY + assert f["slot"] == slot + assert f["body"][0] == KEYTYPE_XWING # buffer[6] + + dev.xwing_keygen(slot) + f = dev.ok.sent[-1] + assert f["msg"] == Message.OKSETPRIV # keygen via OKSETPRIV + assert f["slot"] == slot + assert f["body"][0] == KEYTYPE_XWING + assert f["body"][1:9] == b"\xff" * 8 # generate-on-device trigger + + +@_pytest.mark.parametrize("slot", [101, 110, 116]) +def test_decaps_is_multipacket_and_reassembles(slot): + """Decaps streams the 1120-byte ciphertext via send_large_message2. + + Each packet is [slot, 0xFF(more) or final-length, <=57 bytes]; the firmware + accumulates them. No key-type byte is sent — the device reads it from the + stored key. Reassembling the packet bodies must reproduce the ciphertext. + """ + dev = _dev() + ct = bytes((i % 251) for i in range(1120)) # distinct, non-trivial bytes + dev.xwing_decaps(ct, slot=slot) + + pkts = dev.ok.sent + assert len(pkts) > 1 # genuinely multi-packet + assert all(p["msg"] == Message.OKDECRYPT for p in pkts) + assert all(p["body"][0] == slot for p in pkts) # buffer[5] = slot in every packet + flags = [p["body"][1] for p in pkts] + assert all(f == 0xFF for f in flags[:-1]) # 0xFF = "more coming" + assert flags[-1] == len(pkts[-1]["body"]) - 2 # final packet carries its length + # reassemble payload (skip the 2-byte [slot, flag] header on each packet) + reassembled = b"".join(bytes(p["body"][2:]) for p in pkts) + assert reassembled == ct + assert len(reassembled) == 1120 + + +def test_defaults_and_mlkem_framing(): + dev = _dev() + dev.xwing_getpubkey() + assert dev.ok.sent[-1]["slot"] == DEFAULT_XWING_SLOT + + dev.mlkem_keygen(110) + f = dev.ok.sent[-1] + assert f["msg"] == Message.OKSETPRIV + assert f["slot"] == 110 + assert f["body"][0] == KEYTYPE_MLKEM768 + assert f["body"][1:9] == b"\xff" * 8 + + dev.mlkem_getpubkey() + assert dev.ok.sent[-1]["slot"] == DEFAULT_MLKEM_SLOT + + +def test_reserved_slot_rejected(): + dev = _dev() + for reserved in (117, 128, 132): + with _pytest.raises(ValueError): + dev.xwing_getpubkey(reserved)