From 1d173e149dfceb2d0278076279bbee947e76c953 Mon Sep 17 00:00:00 2001 From: Conor Patrick Date: Mon, 28 May 2018 15:38:30 -0400 Subject: [PATCH] passing usb hid tests --- ctap_test.py | 146 +++++++++++++++++++++++++++++++++++++++++++++++++-- ctaphid.c | 45 ++++++++++++++-- 2 files changed, 185 insertions(+), 6 deletions(-) diff --git a/ctap_test.py b/ctap_test.py index 52e3be0..026e632 100644 --- a/ctap_test.py +++ b/ctap_test.py @@ -9,7 +9,7 @@ from fido2.ctap1 import CTAP1 from fido2.ctap2 import * from fido2.cose import * from fido2.utils import Timeout -import sys,os +import sys,os,time from random import randint from binascii import hexlify import array,struct,socket @@ -61,6 +61,8 @@ class Tester(): def send_raw(self, data, cid = None): if cid is None: cid = self.dev._dev.cid + elif type(cid) != type(b''): + cid = struct.pack('%dB' % len(cid), *[ord(x) for x in cid]) if type(data) != type(b''): data = struct.pack('%dB' % len(data), *[ord(x) for x in data]) self.dev._dev.InternalSendPacket(Packet(cid + data)) @@ -68,6 +70,11 @@ class Tester(): def cid(self,): return self.dev._dev.cid + def set_cid(self,cid): + if type(cid) not in [type(b''), type(bytearray())]: + cid = struct.pack('%dB' % len(cid), *[ord(x) for x in cid]) + self.dev._dev.cid = cid + def recv_raw(self,): cmd,payload = self.dev._dev.InternalRecv() return cmd, payload @@ -98,23 +105,33 @@ class Tester(): raise ValueError('Ping data not echo\'d') except CtapError as e: print('100 byte Ping failed:', e) + raise RuntimeError('ping failed') print('PASS: 100 byte ping') pingdata = os.urandom(7609) try: + t1 = time.time() * 1000 r = self.send_data(CTAPHID.PING, pingdata) + t2 = time.time() * 1000 + delt = t2 - t1 + #if (delt < 140 ): + #raise RuntimeError('Fob is too fast (%d ms)' % delt) + if (delt > 555): + raise RuntimeError('Fob is too slow (%d ms)' % delt) if (r != pingdata): raise ValueError('Ping data not echo\'d') except CtapError as e: print('7609 byte Ping failed:', e) + raise RuntimeError('ping failed') print('PASS: 7609 byte ping') - print('Test non-active cid') try: r = self.send_data(CTAPHID.WINK, '') + assert(len(r) == 0) except CtapError as e: print('wink failed:', e) + raise RuntimeError('wink failed') print('PASS: wink') try: @@ -167,6 +184,8 @@ class Tester(): self.send_raw('\x04') cmd,resp = self.recv_raw() self.check_error(resp, CtapError.ERR.INVALID_SEQ) + cmd,resp = self.recv_raw() + assert(cmd == 0xbf) # timeout print('PASS: invalid sequence') print('Resync and send ping') @@ -190,8 +209,129 @@ class Tester(): raise RuntimeError('resync fail: ', e) print('PASS: interrupt ping with resync') + print('Send ping and abort it with different cid, expect timeout') + oldcid = self.cid() + newcid = '\x11\x22\x33\x44' + self.send_raw('\x81\x10\x00') + self.send_raw('\x00') + self.send_raw('\x01') + self.set_cid(newcid) + self.send_raw('\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88') # init from different cid + cmd,r = self.recv_raw() # init response + assert(cmd == 0x86) + self.set_cid(oldcid) + cmd,r = self.recv_raw() # timeout response + assert(cmd == 0xbf) + + print('PASS: resync and timeout') + + + print('Test timeout') + self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88') + t1 = time.time() * 1000 + self.send_raw('\x81\x10\x00') + self.send_raw('\x00') + self.send_raw('\x01') + cmd,r = self.recv_raw() # timeout response + t2 = time.time() * 1000 + delt = t2 - t1 + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.TIMEOUT) + assert(delt < 1000 and delt > 400) + print('Pass timeout') + + print('Test not cont') + self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88') + self.send_raw('\x81\x10\x00') + self.send_raw('\x00') + self.send_raw('\x01') + self.send_raw('\x81\x10\x00') # init packet + cmd,r = self.recv_raw() # timeout response + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.INVALID_SEQ) + print('PASS: Test not cont') + + print('Check random cont ignored') + self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88') + self.send_raw('\x01\x10\x00') + cmd,r = self.recv_raw() # timeout response + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.TIMEOUT) + + print('Check busy') + t1 = time.time() * 1000 + self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88') + oldcid = self.cid() + newcid = '\x11\x22\x33\x44' + self.send_raw('\x81\x10\x00') + self.set_cid(newcid) + self.send_raw('\x81\x10\x00') + cmd,r = self.recv_raw() # busy response + t2 = time.time() * 1000 + assert(t2-t1 < 100) + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.CHANNEL_BUSY) + + self.set_cid(oldcid) + cmd,r = self.recv_raw() # timeout response + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.TIMEOUT) + print('PASS: busy') + + print('Check busy interleaved') + cid1 = '\x11\x22\x33\x44' + cid2 = '\x01\x22\x33\x44' + self.set_cid(cid2) + self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88') + self.set_cid(cid1) + self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88') + self.send_raw('\x81\x00\x63') # echo 99 bytes first channel + + self.set_cid(cid2) # send ping on 2nd channel + self.send_raw('\x81\x00\x63') + self.send_raw('\x00') + + self.set_cid(cid1) # finish 1st channel ping + self.send_raw('\x00') + + self.set_cid(cid2) + cmd,r = self.recv_raw() # busy response + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.CHANNEL_BUSY) + + self.set_cid(cid1) + cmd,r = self.recv_raw() # ping response + assert(cmd == 0x81) + assert(len(r) == 0x63) + + cmd,r = self.recv_raw() # timeout + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.TIMEOUT) + print('PASS: busy interleaved') + + + print('Test idle') + try: + cmd,resp = self.recv_raw() + except socket.timeout: + print('Pass: Idle') + + print('Test cid 0 is invalid') + self.set_cid('\x00\x00\x00\x00') + self.send_raw('\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid = '\x00\x00\x00\x00') + cmd,r = self.recv_raw() # timeout + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.INVALID_CHANNEL) + print('Pass: cid 0') + + print('Test invalid broadcast cid use') + self.set_cid('\xff\xff\xff\xff') + self.send_raw('\x81\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid = '\xff\xff\xff\xff') + cmd,r = self.recv_raw() # timeout + assert(cmd == 0xbf) + assert(r[0] == CtapError.ERR.INVALID_CHANNEL) + print('Pass: cid broadcast') - print('Send ping and abort it') if __name__ == '__main__': t = Tester() diff --git a/ctaphid.c b/ctaphid.c index 03d0e33..1f0dc88 100644 --- a/ctaphid.c +++ b/ctaphid.c @@ -53,6 +53,7 @@ static struct CID CIDS[10]; static uint64_t active_cid_timestamp; static uint8_t ctap_buffer[CTAPHID_BUFFER_SIZE]; +static uint32_t ctap_buffer_cid; static int ctap_buffer_cmd; static uint16_t ctap_buffer_bcnt; static int ctap_buffer_offset; @@ -155,6 +156,7 @@ static int buffer_packet(CTAPHID_PACKET * pkt) ctap_buffer_bcnt = ctaphid_packet_len(pkt); int pkt_len = (ctap_buffer_bcnt < CTAPHID_INIT_PAYLOAD_SIZE) ? ctap_buffer_bcnt : CTAPHID_INIT_PAYLOAD_SIZE; ctap_buffer_cmd = pkt->pkt.init.cmd; + ctap_buffer_cid = pkt->cid; ctap_buffer_offset = pkt_len; ctap_packet_seq = -1; memmove(ctap_buffer, pkt->pkt.init.payload, pkt_len); @@ -189,6 +191,7 @@ static void buffer_reset() ctap_buffer_bcnt = 0; ctap_buffer_offset = 0; ctap_packet_seq = 0; + ctap_buffer_cid = 0; } static int buffer_status() @@ -212,6 +215,12 @@ static int buffer_cmd() return ctap_buffer_cmd; } +static uint32_t buffer_cid() +{ + return ctap_buffer_cid; +} + + static int buffer_len() { return ctap_buffer_bcnt; @@ -356,7 +365,7 @@ void ctaphid_handle_packet(uint8_t * pkt_raw) if (pkt->cid == 0) { printf("Error, invalid cid 0\n"); - ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_PARAMETER); + ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_CHANNEL); return; } @@ -392,16 +401,46 @@ void ctaphid_handle_packet(uint8_t * pkt_raw) else { // Check if matches existing CID + if (pkt->cid == CTAPHID_BROADCAST_CID) + { + ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_CHANNEL); + return; + } if (cid_exists(pkt->cid)) { + if (buffer_status() == BUFFERING) + { + if (pkt->cid == buffer_cid() && ! is_cont_pkt(pkt)) + { + printf("INVALID_SEQ\n"); + printf("Have %d/%d bytes\n", ctap_buffer_offset, ctap_buffer_bcnt); + ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_SEQ); + return; + } + else if (pkt->cid != buffer_cid()) + { + printf("BUSY with %08x\n", buffer_cid()); + ctaphid_send_error(pkt->cid, CTAP1_ERR_CHANNEL_BUSY); + return; + } + } if (! is_cont_pkt(pkt)) { + if (ctaphid_packet_len(pkt) > CTAPHID_BUFFER_SIZE) { ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_LENGTH); return; } } + else + { + if (buffer_status() == EMPTY || pkt->cid != buffer_cid()) + { + printf("ignoring random cont packet\n"); + return; + } + } if (buffer_packet(pkt) == SEQUENCE_ERROR) { printf("Buffering sequence error\n"); @@ -424,8 +463,8 @@ void ctaphid_handle_packet(uint8_t * pkt_raw) } else { - printf("ignoring unwarranted init packet\n"); - // Ignore + printf("BUSY\n"); + ctaphid_send_error(pkt->cid, CTAP1_ERR_CHANNEL_BUSY); return; } }