passing usb hid tests

This commit is contained in:
Conor Patrick 2018-05-28 15:38:30 -04:00
parent 9c42823cd2
commit 1d173e149d
2 changed files with 185 additions and 6 deletions

View File

@ -9,7 +9,7 @@ from fido2.ctap1 import CTAP1
from fido2.ctap2 import * from fido2.ctap2 import *
from fido2.cose import * from fido2.cose import *
from fido2.utils import Timeout from fido2.utils import Timeout
import sys,os import sys,os,time
from random import randint from random import randint
from binascii import hexlify from binascii import hexlify
import array,struct,socket import array,struct,socket
@ -61,6 +61,8 @@ class Tester():
def send_raw(self, data, cid = None): def send_raw(self, data, cid = None):
if cid is None: if cid is None:
cid = self.dev._dev.cid cid = self.dev._dev.cid
elif type(cid) != type(b''):
cid = struct.pack('%dB' % len(cid), *[ord(x) for x in cid])
if type(data) != type(b''): if type(data) != type(b''):
data = struct.pack('%dB' % len(data), *[ord(x) for x in data]) data = struct.pack('%dB' % len(data), *[ord(x) for x in data])
self.dev._dev.InternalSendPacket(Packet(cid + data)) self.dev._dev.InternalSendPacket(Packet(cid + data))
@ -68,6 +70,11 @@ class Tester():
def cid(self,): def cid(self,):
return self.dev._dev.cid return self.dev._dev.cid
def set_cid(self,cid):
if type(cid) not in [type(b''), type(bytearray())]:
cid = struct.pack('%dB' % len(cid), *[ord(x) for x in cid])
self.dev._dev.cid = cid
def recv_raw(self,): def recv_raw(self,):
cmd,payload = self.dev._dev.InternalRecv() cmd,payload = self.dev._dev.InternalRecv()
return cmd, payload return cmd, payload
@ -98,23 +105,33 @@ class Tester():
raise ValueError('Ping data not echo\'d') raise ValueError('Ping data not echo\'d')
except CtapError as e: except CtapError as e:
print('100 byte Ping failed:', e) print('100 byte Ping failed:', e)
raise RuntimeError('ping failed')
print('PASS: 100 byte ping') print('PASS: 100 byte ping')
pingdata = os.urandom(7609) pingdata = os.urandom(7609)
try: try:
t1 = time.time() * 1000
r = self.send_data(CTAPHID.PING, pingdata) r = self.send_data(CTAPHID.PING, pingdata)
t2 = time.time() * 1000
delt = t2 - t1
#if (delt < 140 ):
#raise RuntimeError('Fob is too fast (%d ms)' % delt)
if (delt > 555):
raise RuntimeError('Fob is too slow (%d ms)' % delt)
if (r != pingdata): if (r != pingdata):
raise ValueError('Ping data not echo\'d') raise ValueError('Ping data not echo\'d')
except CtapError as e: except CtapError as e:
print('7609 byte Ping failed:', e) print('7609 byte Ping failed:', e)
raise RuntimeError('ping failed')
print('PASS: 7609 byte ping') print('PASS: 7609 byte ping')
print('Test non-active cid')
try: try:
r = self.send_data(CTAPHID.WINK, '') r = self.send_data(CTAPHID.WINK, '')
assert(len(r) == 0)
except CtapError as e: except CtapError as e:
print('wink failed:', e) print('wink failed:', e)
raise RuntimeError('wink failed')
print('PASS: wink') print('PASS: wink')
try: try:
@ -167,6 +184,8 @@ class Tester():
self.send_raw('\x04') self.send_raw('\x04')
cmd,resp = self.recv_raw() cmd,resp = self.recv_raw()
self.check_error(resp, CtapError.ERR.INVALID_SEQ) self.check_error(resp, CtapError.ERR.INVALID_SEQ)
cmd,resp = self.recv_raw()
assert(cmd == 0xbf) # timeout
print('PASS: invalid sequence') print('PASS: invalid sequence')
print('Resync and send ping') print('Resync and send ping')
@ -190,8 +209,129 @@ class Tester():
raise RuntimeError('resync fail: ', e) raise RuntimeError('resync fail: ', e)
print('PASS: interrupt ping with resync') print('PASS: interrupt ping with resync')
print('Send ping and abort it with different cid, expect timeout')
oldcid = self.cid()
newcid = '\x11\x22\x33\x44'
self.send_raw('\x81\x10\x00')
self.send_raw('\x00')
self.send_raw('\x01')
self.set_cid(newcid)
self.send_raw('\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88') # init from different cid
cmd,r = self.recv_raw() # init response
assert(cmd == 0x86)
self.set_cid(oldcid)
cmd,r = self.recv_raw() # timeout response
assert(cmd == 0xbf)
print('PASS: resync and timeout')
print('Test timeout')
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
t1 = time.time() * 1000
self.send_raw('\x81\x10\x00')
self.send_raw('\x00')
self.send_raw('\x01')
cmd,r = self.recv_raw() # timeout response
t2 = time.time() * 1000
delt = t2 - t1
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.TIMEOUT)
assert(delt < 1000 and delt > 400)
print('Pass timeout')
print('Test not cont')
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
self.send_raw('\x81\x10\x00')
self.send_raw('\x00')
self.send_raw('\x01')
self.send_raw('\x81\x10\x00') # init packet
cmd,r = self.recv_raw() # timeout response
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.INVALID_SEQ)
print('PASS: Test not cont')
print('Check random cont ignored')
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
self.send_raw('\x01\x10\x00')
cmd,r = self.recv_raw() # timeout response
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.TIMEOUT)
print('Check busy')
t1 = time.time() * 1000
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
oldcid = self.cid()
newcid = '\x11\x22\x33\x44'
self.send_raw('\x81\x10\x00')
self.set_cid(newcid)
self.send_raw('\x81\x10\x00')
cmd,r = self.recv_raw() # busy response
t2 = time.time() * 1000
assert(t2-t1 < 100)
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.CHANNEL_BUSY)
self.set_cid(oldcid)
cmd,r = self.recv_raw() # timeout response
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.TIMEOUT)
print('PASS: busy')
print('Check busy interleaved')
cid1 = '\x11\x22\x33\x44'
cid2 = '\x01\x22\x33\x44'
self.set_cid(cid2)
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
self.set_cid(cid1)
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
self.send_raw('\x81\x00\x63') # echo 99 bytes first channel
self.set_cid(cid2) # send ping on 2nd channel
self.send_raw('\x81\x00\x63')
self.send_raw('\x00')
self.set_cid(cid1) # finish 1st channel ping
self.send_raw('\x00')
self.set_cid(cid2)
cmd,r = self.recv_raw() # busy response
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.CHANNEL_BUSY)
self.set_cid(cid1)
cmd,r = self.recv_raw() # ping response
assert(cmd == 0x81)
assert(len(r) == 0x63)
cmd,r = self.recv_raw() # timeout
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.TIMEOUT)
print('PASS: busy interleaved')
print('Test idle')
try:
cmd,resp = self.recv_raw()
except socket.timeout:
print('Pass: Idle')
print('Test cid 0 is invalid')
self.set_cid('\x00\x00\x00\x00')
self.send_raw('\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid = '\x00\x00\x00\x00')
cmd,r = self.recv_raw() # timeout
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.INVALID_CHANNEL)
print('Pass: cid 0')
print('Test invalid broadcast cid use')
self.set_cid('\xff\xff\xff\xff')
self.send_raw('\x81\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid = '\xff\xff\xff\xff')
cmd,r = self.recv_raw() # timeout
assert(cmd == 0xbf)
assert(r[0] == CtapError.ERR.INVALID_CHANNEL)
print('Pass: cid broadcast')
print('Send ping and abort it')
if __name__ == '__main__': if __name__ == '__main__':
t = Tester() t = Tester()

View File

@ -53,6 +53,7 @@ static struct CID CIDS[10];
static uint64_t active_cid_timestamp; static uint64_t active_cid_timestamp;
static uint8_t ctap_buffer[CTAPHID_BUFFER_SIZE]; static uint8_t ctap_buffer[CTAPHID_BUFFER_SIZE];
static uint32_t ctap_buffer_cid;
static int ctap_buffer_cmd; static int ctap_buffer_cmd;
static uint16_t ctap_buffer_bcnt; static uint16_t ctap_buffer_bcnt;
static int ctap_buffer_offset; static int ctap_buffer_offset;
@ -155,6 +156,7 @@ static int buffer_packet(CTAPHID_PACKET * pkt)
ctap_buffer_bcnt = ctaphid_packet_len(pkt); ctap_buffer_bcnt = ctaphid_packet_len(pkt);
int pkt_len = (ctap_buffer_bcnt < CTAPHID_INIT_PAYLOAD_SIZE) ? ctap_buffer_bcnt : CTAPHID_INIT_PAYLOAD_SIZE; int pkt_len = (ctap_buffer_bcnt < CTAPHID_INIT_PAYLOAD_SIZE) ? ctap_buffer_bcnt : CTAPHID_INIT_PAYLOAD_SIZE;
ctap_buffer_cmd = pkt->pkt.init.cmd; ctap_buffer_cmd = pkt->pkt.init.cmd;
ctap_buffer_cid = pkt->cid;
ctap_buffer_offset = pkt_len; ctap_buffer_offset = pkt_len;
ctap_packet_seq = -1; ctap_packet_seq = -1;
memmove(ctap_buffer, pkt->pkt.init.payload, pkt_len); memmove(ctap_buffer, pkt->pkt.init.payload, pkt_len);
@ -189,6 +191,7 @@ static void buffer_reset()
ctap_buffer_bcnt = 0; ctap_buffer_bcnt = 0;
ctap_buffer_offset = 0; ctap_buffer_offset = 0;
ctap_packet_seq = 0; ctap_packet_seq = 0;
ctap_buffer_cid = 0;
} }
static int buffer_status() static int buffer_status()
@ -212,6 +215,12 @@ static int buffer_cmd()
return ctap_buffer_cmd; return ctap_buffer_cmd;
} }
static uint32_t buffer_cid()
{
return ctap_buffer_cid;
}
static int buffer_len() static int buffer_len()
{ {
return ctap_buffer_bcnt; return ctap_buffer_bcnt;
@ -356,7 +365,7 @@ void ctaphid_handle_packet(uint8_t * pkt_raw)
if (pkt->cid == 0) if (pkt->cid == 0)
{ {
printf("Error, invalid cid 0\n"); printf("Error, invalid cid 0\n");
ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_PARAMETER); ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_CHANNEL);
return; return;
} }
@ -392,16 +401,46 @@ void ctaphid_handle_packet(uint8_t * pkt_raw)
else else
{ {
// Check if matches existing CID // Check if matches existing CID
if (pkt->cid == CTAPHID_BROADCAST_CID)
{
ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_CHANNEL);
return;
}
if (cid_exists(pkt->cid)) if (cid_exists(pkt->cid))
{ {
if (buffer_status() == BUFFERING)
{
if (pkt->cid == buffer_cid() && ! is_cont_pkt(pkt))
{
printf("INVALID_SEQ\n");
printf("Have %d/%d bytes\n", ctap_buffer_offset, ctap_buffer_bcnt);
ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_SEQ);
return;
}
else if (pkt->cid != buffer_cid())
{
printf("BUSY with %08x\n", buffer_cid());
ctaphid_send_error(pkt->cid, CTAP1_ERR_CHANNEL_BUSY);
return;
}
}
if (! is_cont_pkt(pkt)) if (! is_cont_pkt(pkt))
{ {
if (ctaphid_packet_len(pkt) > CTAPHID_BUFFER_SIZE) if (ctaphid_packet_len(pkt) > CTAPHID_BUFFER_SIZE)
{ {
ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_LENGTH); ctaphid_send_error(pkt->cid, CTAP1_ERR_INVALID_LENGTH);
return; return;
} }
} }
else
{
if (buffer_status() == EMPTY || pkt->cid != buffer_cid())
{
printf("ignoring random cont packet\n");
return;
}
}
if (buffer_packet(pkt) == SEQUENCE_ERROR) if (buffer_packet(pkt) == SEQUENCE_ERROR)
{ {
printf("Buffering sequence error\n"); printf("Buffering sequence error\n");
@ -424,8 +463,8 @@ void ctaphid_handle_packet(uint8_t * pkt_raw)
} }
else else
{ {
printf("ignoring unwarranted init packet\n"); printf("BUSY\n");
// Ignore ctaphid_send_error(pkt->cid, CTAP1_ERR_CHANNEL_BUSY);
return; return;
} }
} }