Pass tools/ through black

This commit is contained in:
Nicolas Stalder 2019-01-03 14:27:21 +01:00
parent b2c78ca7c0
commit 6a5449b8cb
8 changed files with 614 additions and 474 deletions

View File

@ -23,7 +23,7 @@ import sys
from sys import argv from sys import argv
if len(argv) != 2: if len(argv) != 2:
print("usage: %s <input-log>" % argv[0]); print("usage: %s <input-log>" % argv[0])
sys.exit(1) sys.exit(1)
log = open(argv[1]).readlines() log = open(argv[1]).readlines()
@ -40,7 +40,7 @@ for x in log:
pass pass
if len(parse) == 0: if len(parse) == 0:
continue continue
assert(len(parse) == 64) assert len(parse) == 64
nums.append(parse) nums.append(parse)
hexlines = [] hexlines = []

View File

@ -58,7 +58,8 @@ class Packet(object):
def FromWireFormat(pkt_size, data): def FromWireFormat(pkt_size, data):
return Packet(data) return Packet(data)
class Tester():
class Tester:
def __init__(self,): def __init__(self,):
self.origin = 'https://examplo.org' self.origin = 'https://examplo.org'
self.host = 'examplo.org' self.host = 'examplo.org'
@ -95,7 +96,7 @@ class Tester():
pad = struct.pack('%dB' % len(pad), *[ord(x) for x in pad]) pad = struct.pack('%dB' % len(pad), *[ord(x) for x in pad])
data = data + pad data = data + pad
data = list(data) data = list(data)
assert(len(data) == 64) assert len(data) == 64
self.dev._dev.InternalSendPacket(Packet(data)) self.dev._dev.InternalSendPacket(Packet(data))
def cid(self,): def cid(self,):
@ -112,7 +113,7 @@ class Tester():
return cmd, payload return cmd, payload
def check_error(self, data, err=None): def check_error(self, data, err=None):
assert(len(data) == 1) assert len(data) == 1
if err is None: if err is None:
if data[0] != 0: if data[0] != 0:
raise CtapError(data[0]) raise CtapError(data[0])
@ -129,9 +130,9 @@ class Tester():
delt = t2 - t1 delt = t2 - t1
# if (delt < 140 ): # if (delt < 140 ):
# raise RuntimeError('Fob is too fast (%d ms)' % delt) # raise RuntimeError('Fob is too fast (%d ms)' % delt)
if (delt > 555 * (amt/1000)): if delt > 555 * (amt / 1000):
raise RuntimeError('Fob is too slow (%d ms)' % delt) raise RuntimeError('Fob is too slow (%d ms)' % delt)
if (r != pingdata): if r != pingdata:
raise ValueError('Ping data not echo\'d') raise ValueError('Ping data not echo\'d')
print('1000 byte ping time: %s ms' % delt) print('1000 byte ping time: %s ms' % delt)
except CtapError as e: except CtapError as e:
@ -141,7 +142,6 @@ class Tester():
# sys.flush(sys.sto) # sys.flush(sys.sto)
sys.stdout.flush() sys.stdout.flush()
def test_hid(self, check_timeouts=False): def test_hid(self, check_timeouts=False):
if check_timeouts: if check_timeouts:
print('Test idle') print('Test idle')
@ -153,11 +153,10 @@ class Tester():
print('Test init') print('Test init')
r = self.send_data(CTAPHID.INIT, '\x11\x11\x11\x11\x11\x11\x11\x11') r = self.send_data(CTAPHID.INIT, '\x11\x11\x11\x11\x11\x11\x11\x11')
pingdata = os.urandom(100) pingdata = os.urandom(100)
try: try:
r = self.send_data(CTAPHID.PING, pingdata) r = self.send_data(CTAPHID.PING, pingdata)
if (r != pingdata): if r != pingdata:
raise ValueError('Ping data not echo\'d') raise ValueError('Ping data not echo\'d')
except CtapError as e: except CtapError as e:
print('100 byte Ping failed:', e) print('100 byte Ping failed:', e)
@ -187,7 +186,7 @@ class Tester():
if len(r) > 1 or r[0] == 0: if len(r) > 1 or r[0] == 0:
raise RuntimeError('Cbor is supposed to have payload') raise RuntimeError('Cbor is supposed to have payload')
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.INVALID_LENGTH) assert e.code == CtapError.ERR.INVALID_LENGTH
print('PASS: no data cbor') print('PASS: no data cbor')
try: try:
@ -196,7 +195,7 @@ class Tester():
if len(r) > 2: if len(r) > 2:
raise RuntimeError('MSG is supposed to have payload') raise RuntimeError('MSG is supposed to have payload')
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.INVALID_LENGTH) assert e.code == CtapError.ERR.INVALID_LENGTH
print('PASS: no data msg') print('PASS: no data msg')
try: try:
@ -209,10 +208,9 @@ class Tester():
r = self.send_data(0x66, '') r = self.send_data(0x66, '')
raise RuntimeError('Invalid command did not return error') raise RuntimeError('Invalid command did not return error')
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.INVALID_COMMAND) assert e.code == CtapError.ERR.INVALID_COMMAND
print('PASS: invalid HID command') print('PASS: invalid HID command')
print('Sending packet with too large of a length.') print('Sending packet with too large of a length.')
self.send_raw('\x81\x1d\xba\x00') self.send_raw('\x81\x1d\xba\x00')
cmd, resp = self.recv_raw() cmd, resp = self.recv_raw()
@ -230,7 +228,7 @@ class Tester():
self.check_error(resp, CtapError.ERR.INVALID_SEQ) self.check_error(resp, CtapError.ERR.INVALID_SEQ)
if check_timeouts: if check_timeouts:
cmd, resp = self.recv_raw() cmd, resp = self.recv_raw()
assert(cmd == 0xbf) # timeout assert cmd == 0xBF # timeout
print('PASS: invalid sequence') print('PASS: invalid sequence')
print('Resync and send ping') print('Resync and send ping')
@ -238,7 +236,7 @@ class Tester():
r = self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88') r = self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
pingdata = os.urandom(100) pingdata = os.urandom(100)
r = self.send_data(CTAPHID.PING, pingdata) r = self.send_data(CTAPHID.PING, pingdata)
if (r != pingdata): if r != pingdata:
raise ValueError('Ping data not echo\'d') raise ValueError('Ping data not echo\'d')
except CtapError as e: except CtapError as e:
raise RuntimeError('resync fail: ', e) raise RuntimeError('resync fail: ', e)
@ -261,15 +259,17 @@ class Tester():
self.send_raw('\x00') self.send_raw('\x00')
self.send_raw('\x01') self.send_raw('\x01')
self.set_cid(newcid) self.set_cid(newcid)
self.send_raw('\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88') # init from different cid self.send_raw(
'\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88'
) # init from different cid
print('wait for init response') print('wait for init response')
cmd, r = self.recv_raw() # init response cmd, r = self.recv_raw() # init response
assert(cmd == 0x86) assert cmd == 0x86
self.set_cid(oldcid) self.set_cid(oldcid)
if check_timeouts: if check_timeouts:
# print('wait for timeout') # print('wait for timeout')
cmd, r = self.recv_raw() # timeout response cmd, r = self.recv_raw() # timeout response
assert(cmd == 0xbf) assert cmd == 0xBF
print('PASS: resync and timeout') print('PASS: resync and timeout')
@ -282,9 +282,9 @@ class Tester():
cmd, r = self.recv_raw() # timeout response cmd, r = self.recv_raw() # timeout response
t2 = time.time() * 1000 t2 = time.time() * 1000
delt = t2 - t1 delt = t2 - t1
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.TIMEOUT) assert r[0] == CtapError.ERR.TIMEOUT
assert(delt < 1000 and delt > 400) assert delt < 1000 and delt > 400
print('Pass timeout') print('Pass timeout')
print('Test not cont') print('Test not cont')
@ -294,8 +294,8 @@ class Tester():
self.send_raw('\x01') self.send_raw('\x01')
self.send_raw('\x81\x10\x00') # init packet self.send_raw('\x81\x10\x00') # init packet
cmd, r = self.recv_raw() # timeout response cmd, r = self.recv_raw() # timeout response
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.INVALID_SEQ) assert r[0] == CtapError.ERR.INVALID_SEQ
print('PASS: Test not cont') print('PASS: Test not cont')
if check_timeouts: if check_timeouts:
@ -318,14 +318,14 @@ class Tester():
self.send_raw('\x81\x04\x00') self.send_raw('\x81\x04\x00')
cmd, r = self.recv_raw() # busy response cmd, r = self.recv_raw() # busy response
t2 = time.time() * 1000 t2 = time.time() * 1000
assert(t2-t1 < 100) assert t2 - t1 < 100
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.CHANNEL_BUSY) assert r[0] == CtapError.ERR.CHANNEL_BUSY
self.set_cid(oldcid) self.set_cid(oldcid)
cmd, r = self.recv_raw() # timeout response cmd, r = self.recv_raw() # timeout response
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.TIMEOUT) assert r[0] == CtapError.ERR.TIMEOUT
print('PASS: busy') print('PASS: busy')
print('Check busy interleaved') print('Check busy interleaved')
@ -348,18 +348,18 @@ class Tester():
self.set_cid(cid2) self.set_cid(cid2)
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.CHANNEL_BUSY) assert r[0] == CtapError.ERR.CHANNEL_BUSY
self.set_cid(cid1) self.set_cid(cid1)
cmd, r = self.recv_raw() # ping response cmd, r = self.recv_raw() # ping response
assert(cmd == 0x81) assert cmd == 0x81
assert(len(r) == 0x63) assert len(r) == 0x63
if check_timeouts: if check_timeouts:
cmd, r = self.recv_raw() # timeout cmd, r = self.recv_raw() # timeout
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.TIMEOUT) assert r[0] == CtapError.ERR.TIMEOUT
print('PASS: busy interleaved') print('PASS: busy interleaved')
if check_timeouts: if check_timeouts:
@ -372,18 +372,22 @@ class Tester():
print('Test cid 0 is invalid') print('Test cid 0 is invalid')
self.set_cid('\x00\x00\x00\x00') self.set_cid('\x00\x00\x00\x00')
self.send_raw('\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid = '\x00\x00\x00\x00') self.send_raw(
'\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid='\x00\x00\x00\x00'
)
cmd, r = self.recv_raw() # timeout cmd, r = self.recv_raw() # timeout
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.INVALID_CHANNEL) assert r[0] == CtapError.ERR.INVALID_CHANNEL
print('Pass: cid 0') print('Pass: cid 0')
print('Test invalid broadcast cid use') print('Test invalid broadcast cid use')
self.set_cid('\xff\xff\xff\xff') self.set_cid('\xff\xff\xff\xff')
self.send_raw('\x81\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid = '\xff\xff\xff\xff') self.send_raw(
'\x81\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid='\xff\xff\xff\xff'
)
cmd, r = self.recv_raw() # timeout cmd, r = self.recv_raw() # timeout
assert(cmd == 0xbf) assert cmd == 0xBF
assert(r[0] == CtapError.ERR.INVALID_CHANNEL) assert r[0] == CtapError.ERR.INVALID_CHANNEL
print('Pass: cid broadcast') print('Pass: cid broadcast')
def test_u2f(self,): def test_u2f(self,):
@ -405,7 +409,9 @@ class Tester():
print('MC') print('MC')
t1 = time.time() * 1000 t1 = time.time() * 1000
attest, data = self.client.make_credential(rp, user, challenge, pin = PIN, exclude_list = []) attest, data = self.client.make_credential(
rp, user, challenge, pin=PIN, exclude_list=[]
)
t2 = time.time() * 1000 t2 = time.time() * 1000
attest.verify(data.hash) attest.verify(data.hash)
print('Register valid (%d ms)' % (t2 - t1)) print('Register valid (%d ms)' % (t2 - t1))
@ -415,7 +421,9 @@ class Tester():
allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}] allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}]
t1 = time.time() * 1000 t1 = time.time() * 1000
assertions, client_data = self.client.get_assertion(rp['id'], challenge, allow_list, pin = PIN) assertions, client_data = self.client.get_assertion(
rp['id'], challenge, allow_list, pin=PIN
)
t2 = time.time() * 1000 t2 = time.time() * 1000
assertions[0].verify(client_data.hash, creds[0].public_key) assertions[0].verify(client_data.hash, creds[0].public_key)
@ -437,8 +445,12 @@ class Tester():
challenge = ''.join([abc[randint(0, len(abc) - 1)] for x in range(0, 32)]) challenge = ''.join([abc[randint(0, len(abc) - 1)] for x in range(0, 32)])
fake_id1 = array.array('B',[randint(0,255) for i in range(0,150)]).tostring() fake_id1 = array.array(
fake_id2 = array.array('B',[randint(0,255) for i in range(0,73)]).tostring() 'B', [randint(0, 255) for i in range(0, 150)]
).tostring()
fake_id2 = array.array(
'B', [randint(0, 255) for i in range(0, 73)]
).tostring()
exclude_list.append({'id': fake_id1, 'type': 'public-key'}) exclude_list.append({'id': fake_id1, 'type': 'public-key'})
exclude_list.append({'id': fake_id2, 'type': 'public-key'}) exclude_list.append({'id': fake_id2, 'type': 'public-key'})
@ -446,7 +458,9 @@ class Tester():
# for i in range(0,2048**2): # for i in range(0,2048**2):
for i in range(0, 1): for i in range(0, 1):
t1 = time.time() * 1000 t1 = time.time() * 1000
attest, data = self.client.make_credential(rp, user, challenge, pin = PIN, exclude_list = []) attest, data = self.client.make_credential(
rp, user, challenge, pin=PIN, exclude_list=[]
)
print(attest.auth_data.counter) print(attest.auth_data.counter)
t2 = time.time() * 1000 t2 = time.time() * 1000
attest.verify(data.hash) attest.verify(data.hash)
@ -460,7 +474,9 @@ class Tester():
for i in range(0, 1): for i in range(0, 1):
allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}] allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}]
t1 = time.time() * 1000 t1 = time.time() * 1000
assertions, client_data = self.client.get_assertion(rp['id'], challenge, allow_list, pin = PIN) assertions, client_data = self.client.get_assertion(
rp['id'], challenge, allow_list, pin=PIN
)
t2 = time.time() * 1000 t2 = time.time() * 1000
assertions[0].verify(client_data.hash, creds[0].public_key) assertions[0].verify(client_data.hash, creds[0].public_key)
print(assertions[0].auth_data.counter) print(assertions[0].auth_data.counter)
@ -468,8 +484,6 @@ class Tester():
print('Assertion valid (%d ms)' % (t2 - t1)) print('Assertion valid (%d ms)' % (t2 - t1))
sys.stdout.flush() sys.stdout.flush()
def test_fido2(self): def test_fido2(self):
def test(self, pincode=None): def test(self, pincode=None):
creds = [] creds = []
@ -479,8 +493,12 @@ class Tester():
challenge = 'Y2hhbGxlbmdl' challenge = 'Y2hhbGxlbmdl'
PIN = pincode PIN = pincode
fake_id1 = array.array('B',[randint(0,255) for i in range(0,150)]).tostring() fake_id1 = array.array(
fake_id2 = array.array('B',[randint(0,255) for i in range(0,73)]).tostring() 'B', [randint(0, 255) for i in range(0, 150)]
).tostring()
fake_id2 = array.array(
'B', [randint(0, 255) for i in range(0, 73)]
).tostring()
exclude_list.append({'id': fake_id1, 'type': 'public-key'}) exclude_list.append({'id': fake_id1, 'type': 'public-key'})
exclude_list.append({'id': fake_id2, 'type': 'public-key'}) exclude_list.append({'id': fake_id2, 'type': 'public-key'})
@ -488,7 +506,9 @@ class Tester():
# test make credential # test make credential
print('make 3 credentials') print('make 3 credentials')
for i in range(0, 3): for i in range(0, 3):
attest, data = self.client.make_credential(rp, user, challenge, pin = PIN, exclude_list = []) attest, data = self.client.make_credential(
rp, user, challenge, pin=PIN, exclude_list=[]
)
attest.verify(data.hash) attest.verify(data.hash)
cred = attest.auth_data.credential_data cred = attest.auth_data.credential_data
creds.append(cred) creds.append(cred)
@ -498,15 +518,19 @@ class Tester():
if PIN is not None: if PIN is not None:
print('make credential with wrong pin code') print('make credential with wrong pin code')
try: try:
attest, data = self.client.make_credential(rp, user, challenge, pin = PIN + ' ', exclude_list = []) attest, data = self.client.make_credential(
rp, user, challenge, pin=PIN + ' ', exclude_list=[]
)
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.PIN_INVALID) assert e.code == CtapError.ERR.PIN_INVALID
except ClientError as e: except ClientError as e:
assert(e.cause.code == CtapError.ERR.PIN_INVALID) assert e.cause.code == CtapError.ERR.PIN_INVALID
print('PASS') print('PASS')
print('make credential with exclude list') print('make credential with exclude list')
attest, data = self.client.make_credential(rp, user, challenge, pin = PIN, exclude_list = exclude_list) attest, data = self.client.make_credential(
rp, user, challenge, pin=PIN, exclude_list=exclude_list
)
attest.verify(data.hash) attest.verify(data.hash)
cred = attest.auth_data.credential_data cred = attest.auth_data.credential_data
creds.append(cred) creds.append(cred)
@ -515,35 +539,42 @@ class Tester():
print('make credential with exclude list including real credential') print('make credential with exclude list including real credential')
real_excl = [{'id': cred.credential_id, 'type': 'public-key'}] real_excl = [{'id': cred.credential_id, 'type': 'public-key'}]
try: try:
attest, data = self.client.make_credential(rp, user, challenge, pin = PIN, exclude_list = exclude_list + real_excl) attest, data = self.client.make_credential(
rp, user, challenge, pin=PIN, exclude_list=exclude_list + real_excl
)
raise RuntimeError('Exclude list did not return expected error') raise RuntimeError('Exclude list did not return expected error')
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.CREDENTIAL_EXCLUDED) assert e.code == CtapError.ERR.CREDENTIAL_EXCLUDED
except ClientError as e: except ClientError as e:
assert(e.cause.code == CtapError.ERR.CREDENTIAL_EXCLUDED) assert e.cause.code == CtapError.ERR.CREDENTIAL_EXCLUDED
print('PASS') print('PASS')
for i, x in enumerate(creds): for i, x in enumerate(creds):
print('get assertion %d' % i) print('get assertion %d' % i)
allow_list = [{'id': x.credential_id, 'type': 'public-key'}] allow_list = [{'id': x.credential_id, 'type': 'public-key'}]
assertions, client_data = self.client.get_assertion(rp['id'], challenge, allow_list, pin = PIN) assertions, client_data = self.client.get_assertion(
rp['id'], challenge, allow_list, pin=PIN
)
assertions[0].verify(client_data.hash, x.public_key) assertions[0].verify(client_data.hash, x.public_key)
print('PASS') print('PASS')
if PIN is not None: if PIN is not None:
print('get assertion with wrong pin code') print('get assertion with wrong pin code')
try: try:
assertions, client_data = self.client.get_assertion(rp['id'], challenge, allow_list, pin = PIN + ' ') assertions, client_data = self.client.get_assertion(
rp['id'], challenge, allow_list, pin=PIN + ' '
)
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.PIN_INVALID) assert e.code == CtapError.ERR.PIN_INVALID
except ClientError as e: except ClientError as e:
assert(e.cause.code == CtapError.ERR.PIN_INVALID) assert e.cause.code == CtapError.ERR.PIN_INVALID
print('PASS') print('PASS')
print('get multiple assertions') print('get multiple assertions')
allow_list = [{'id': x.credential_id, 'type': 'public-key'} for x in creds] allow_list = [{'id': x.credential_id, 'type': 'public-key'} for x in creds]
assertions, client_data = self.client.get_assertion(rp['id'], challenge, allow_list, pin = PIN) assertions, client_data = self.client.get_assertion(
rp['id'], challenge, allow_list, pin=PIN
)
for ass, cred in zip(assertions, creds): for ass, cred in zip(assertions, creds):
i += 1 i += 1
@ -571,7 +602,7 @@ class Tester():
try: try:
self.client.pin_protocol.set_pin(PIN) self.client.pin_protocol.set_pin(PIN)
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.NOT_ALLOWED) assert e.code == CtapError.ERR.NOT_ALLOWED
print('PASS') print('PASS')
print('Change pin code') print('Change pin code')
@ -584,21 +615,21 @@ class Tester():
try: try:
self.client.pin_protocol.change_pin(PIN.replace('a', 'b'), '1234') self.client.pin_protocol.change_pin(PIN.replace('a', 'b'), '1234')
except CtapError as e: except CtapError as e:
assert(e.code == CtapError.ERR.PIN_INVALID) assert e.code == CtapError.ERR.PIN_INVALID
print('PASS') print('PASS')
print('MC using wrong pin') print('MC using wrong pin')
try: try:
self.test_fido2_simple('abcd3'); self.test_fido2_simple('abcd3')
except ClientError as e: except ClientError as e:
assert(e.cause.code == CtapError.ERR.PIN_INVALID) assert e.cause.code == CtapError.ERR.PIN_INVALID
print('PASS') print('PASS')
print('get info') print('get info')
inf = self.ctap.get_info() inf = self.ctap.get_info()
print('PASS') print('PASS')
self.test_fido2_simple(PIN); self.test_fido2_simple(PIN)
print('Re-run make_credential and get_assertion tests with pin code') print('Re-run make_credential and get_assertion tests with pin code')
test(self, PIN) test(self, PIN)
@ -615,7 +646,9 @@ class Tester():
rp = {'id': self.host, 'name': 'ExaRP'} rp = {'id': self.host, 'name': 'ExaRP'}
user0 = {'id': b'first one', 'name': 'single User'} user0 = {'id': b'first one', 'name': 'single User'}
users = [{'id': b'user' + os.urandom(16), 'name': 'AB User'} for i in range(0,2)] users = [
{'id': b'user' + os.urandom(16), 'name': 'AB User'} for i in range(0, 2)
]
challenge = 'Y2hhbGxlbmdl' challenge = 'Y2hhbGxlbmdl'
PIN = None PIN = None
print('reset') print('reset')
@ -624,7 +657,9 @@ class Tester():
print('registering 1 user with RK') print('registering 1 user with RK')
t1 = time.time() * 1000 t1 = time.time() * 1000
attest, data = self.client.make_credential(rp, user0, challenge, pin = PIN, exclude_list = [], rk = True) attest, data = self.client.make_credential(
rp, user0, challenge, pin=PIN, exclude_list=[], rk=True
)
t2 = time.time() * 1000 t2 = time.time() * 1000
attest.verify(data.hash) attest.verify(data.hash)
creds.append(attest.auth_data.credential_data) creds.append(attest.auth_data.credential_data)
@ -632,27 +667,31 @@ class Tester():
print('1 assertion') print('1 assertion')
t1 = time.time() * 1000 t1 = time.time() * 1000
assertions, client_data = self.client.get_assertion(rp['id'], challenge, pin = PIN) assertions, client_data = self.client.get_assertion(
rp['id'], challenge, pin=PIN
)
t2 = time.time() * 1000 t2 = time.time() * 1000
assertions[0].verify(client_data.hash, creds[0].public_key) assertions[0].verify(client_data.hash, creds[0].public_key)
print('Assertion valid (%d ms)' % (t2 - t1)) print('Assertion valid (%d ms)' % (t2 - t1))
print(assertions[0], client_data) print(assertions[0], client_data)
print('registering %d users with RK' % len(users)) print('registering %d users with RK' % len(users))
for i in range(0, len(users)): for i in range(0, len(users)):
t1 = time.time() * 1000 t1 = time.time() * 1000
attest, data = self.client.make_credential(rp, users[i], challenge, pin = PIN, exclude_list = [], rk = True) attest, data = self.client.make_credential(
rp, users[i], challenge, pin=PIN, exclude_list=[], rk=True
)
t2 = time.time() * 1000 t2 = time.time() * 1000
attest.verify(data.hash) attest.verify(data.hash)
print('Register valid (%d ms)' % (t2 - t1)) print('Register valid (%d ms)' % (t2 - t1))
creds.append(attest.auth_data.credential_data) creds.append(attest.auth_data.credential_data)
t1 = time.time() * 1000 t1 = time.time() * 1000
assertions, client_data = self.client.get_assertion(rp['id'], challenge, pin = PIN) assertions, client_data = self.client.get_assertion(
rp['id'], challenge, pin=PIN
)
t2 = time.time() * 1000 t2 = time.time() * 1000
for x, y in zip(assertions, creds): for x, y in zip(assertions, creds):
@ -660,38 +699,41 @@ class Tester():
print('Assertion(s) valid (%d ms)' % (t2 - t1)) print('Assertion(s) valid (%d ms)' % (t2 - t1))
print('registering a duplicate user ') print('registering a duplicate user ')
t1 = time.time() * 1000 t1 = time.time() * 1000
attest, data = self.client.make_credential(rp, users[1], challenge, pin = PIN, exclude_list = [], rk = True) attest, data = self.client.make_credential(
rp, users[1], challenge, pin=PIN, exclude_list=[], rk=True
)
t2 = time.time() * 1000 t2 = time.time() * 1000
attest.verify(data.hash) attest.verify(data.hash)
creds = creds[:2] + creds[3:] + [attest.auth_data.credential_data] creds = creds[:2] + creds[3:] + [attest.auth_data.credential_data]
print('Register valid (%d ms)' % (t2 - t1)) print('Register valid (%d ms)' % (t2 - t1))
t1 = time.time() * 1000 t1 = time.time() * 1000
assertions, client_data = self.client.get_assertion(rp['id'], challenge, pin = PIN) assertions, client_data = self.client.get_assertion(
rp['id'], challenge, pin=PIN
)
t2 = time.time() * 1000 t2 = time.time() * 1000
assert(len(assertions) == len(users) +1) assert len(assertions) == len(users) + 1
for x, y in zip(assertions, creds): for x, y in zip(assertions, creds):
x.verify(client_data.hash, y.public_key) x.verify(client_data.hash, y.public_key)
print('Assertion(s) valid (%d ms)' % (t2 - t1)) print('Assertion(s) valid (%d ms)' % (t2 - t1))
def test_responses(self,): def test_responses(self,):
PIN = '1234' PIN = '1234'
RPID = self.host RPID = self.host
for dev in (CtapHidDevice.list_devices()): for dev in CtapHidDevice.list_devices():
print('dev', dev) print('dev', dev)
client = Fido2Client(dev, RPID) client = Fido2Client(dev, RPID)
ctap = client.ctap2 ctap = client.ctap2
# ctap.reset() # ctap.reset()
try: try:
if PIN: client.pin_protocol.set_pin(PIN) if PIN:
except:pass client.pin_protocol.set_pin(PIN)
except:
pass
inf = ctap.get_info() inf = ctap.get_info()
# print (inf) # print (inf)
@ -708,21 +750,26 @@ class Tester():
challenge = 'Y2hhbGxlbmdl' challenge = 'Y2hhbGxlbmdl'
if 1: if 1:
attest, data = client.make_credential(rp, attest, data = client.make_credential(
user, challenge, exclude_list = [], pin = PIN, rk=True) rp, user, challenge, exclude_list=[], pin=PIN, rk=True
)
cred = attest.auth_data.credential_data cred = attest.auth_data.credential_data
creds = [cred] creds = [cred]
allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}] allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}]
allow_list = [] allow_list = []
assertions, client_data = client.get_assertion(rp['id'], challenge, pin = PIN) assertions, client_data = client.get_assertion(
rp['id'], challenge, pin=PIN
)
assertions[0].verify(client_data.hash, creds[0].public_key) assertions[0].verify(client_data.hash, creds[0].public_key)
if 0: if 0:
print('registering 1 user with RK') print('registering 1 user with RK')
t1 = time.time() * 1000 t1 = time.time() * 1000
attest, data = client.make_credential(rp, user, challenge, pin = PIN, exclude_list = [], rk = True) attest, data = client.make_credential(
rp, user, challenge, pin=PIN, exclude_list=[], rk=True
)
t2 = time.time() * 1000 t2 = time.time() * 1000
attest.verify(data.hash) attest.verify(data.hash)
creds = [attest.auth_data.credential_data] creds = [attest.auth_data.credential_data]
@ -730,15 +777,13 @@ class Tester():
print('1 assertion') print('1 assertion')
t1 = time.time() * 1000 t1 = time.time() * 1000
assertions, client_data = client.get_assertion(rp['id'], challenge, pin = PIN) assertions, client_data = client.get_assertion(
rp['id'], challenge, pin=PIN
)
t2 = time.time() * 1000 t2 = time.time() * 1000
assertions[0].verify(client_data.hash, creds[0].public_key) assertions[0].verify(client_data.hash, creds[0].public_key)
print('Assertion valid (%d ms)' % (t2 - t1)) print('Assertion valid (%d ms)' % (t2 - t1))
# print('fmt:',attest.fmt) # print('fmt:',attest.fmt)
# print('rp_id_hash',attest.auth_data.rp_id_hash) # print('rp_id_hash',attest.auth_data.rp_id_hash)
# print('flags:', hex(attest.auth_data.flags)) # print('flags:', hex(attest.auth_data.flags))
@ -763,7 +808,6 @@ class Tester():
# break # break
def test_find_brute_force(): def test_find_brute_force():
i = 0 i = 0
while 1: while 1:

View File

@ -22,6 +22,7 @@
# #
from __future__ import print_function from __future__ import print_function
import base64 import base64
""" """
cbytes.py cbytes.py
@ -49,7 +50,7 @@ size = len(buf)
a = ''.join(map(lambda c: '\\x%02x' % c, buf)) a = ''.join(map(lambda c: '\\x%02x' % c, buf))
for i in range(0, len(a), 80): for i in range(0, len(a), 80):
c_str += ("\""+a[i:i+80]+"\"\n") c_str += "\"" + a[i : i + 80] + "\"\n"
if '-s' in sys.argv: if '-s' in sys.argv:
print(c_str) print(c_str)

View File

@ -22,6 +22,7 @@
# #
from __future__ import print_function from __future__ import print_function
import sys, fileinput, binascii import sys, fileinput, binascii
try: try:
import ecdsa import ecdsa
except: except:
@ -48,4 +49,3 @@ for d1 in it:
cstr += '\\x' + d1 + d2 cstr += '\\x' + d1 + d2
print('"%s"' % cstr) print('"%s"' % cstr)

View File

@ -33,5 +33,3 @@ print(''.join(['%02x'%c for c in sk.to_string()]))
print() print()
print('"\\x' + '\\x'.join(['%02x' % c for c in sk.to_string()]) + '"') print('"\\x' + '\\x'.join(['%02x' % c for c in sk.to_string()]) + '"')
print() print()

View File

@ -44,6 +44,7 @@ udpport = 8111
HEX_FILE = '../efm32/GNU ARM v7.2.1 - Debug/EFM32.hex' HEX_FILE = '../efm32/GNU ARM v7.2.1 - Debug/EFM32.hex'
def ForceU2F(client, device): def ForceU2F(client, device):
client.ctap = CTAP1(device) client.ctap = CTAP1(device)
client.pin_protocol = None client.pin_protocol = None
@ -64,8 +65,6 @@ if __name__ == '__main__':
print(e) print(e)
def write(data): def write(data):
msg = from_websafe(data) msg = from_websafe(data)
msg = base64.b64decode(msg) msg = base64.b64decode(msg)
@ -82,10 +81,11 @@ def write(data):
# data = client_param + app_param + struct.pack('>B', len(key_handle)) + key_handle # data = client_param + app_param + struct.pack('>B', len(key_handle)) + key_handle
# msg = str(msg.decode()) # msg = str(msg.decode())
# print(msg.decode()) # print(msg.decode())
s = ctap.authenticate(chal,appid,msg,) s = ctap.authenticate(chal, appid, msg)
print(s) print(s)
# sock.sendto(msg, ('127.0.0.1', udpport)) # sock.sendto(msg, ('127.0.0.1', udpport))
def read(): def read():
# msg = [0]*64 # msg = [0]*64
pkt, _ = sock.recvfrom(1000) pkt, _ = sock.recvfrom(1000)
@ -95,6 +95,7 @@ def read():
msg = to_websafe(pkt) msg = to_websafe(pkt)
return msg return msg
class UDPBridge(BaseHTTPRequestHandler): class UDPBridge(BaseHTTPRequestHandler):
def end_headers(self): def end_headers(self):
self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Origin', '*')
@ -111,9 +112,13 @@ class UDPBridge(BaseHTTPRequestHandler):
chal = b"\xf6\xa2\x3c\xa4\x0a\xf9\xda\xd4\x5f\xdc\xba\x7d\xc9\xde\xcb\xed\xb5\x84\x64\x3a\x4c\x9f\x44\xc2\x04\xb0\x17\xd7\xf4\x3e\xe0\x3f" chal = b"\xf6\xa2\x3c\xa4\x0a\xf9\xda\xd4\x5f\xdc\xba\x7d\xc9\xde\xcb\xed\xb5\x84\x64\x3a\x4c\x9f\x44\xc2\x04\xb0\x17\xd7\xf4\x3e\xe0\x3f"
appid = b'A' * 32 appid = b'A' * 32
s = ctap.authenticate(chal,appid,msg,) s = ctap.authenticate(chal, appid, msg)
data = struct.pack('B',s.user_presence) + struct.pack('>L',s.counter) + s.signature data = (
struct.pack('B', s.user_presence)
+ struct.pack('>L', s.counter)
+ s.signature
)
data = base64.b64encode(data).decode('ascii') data = base64.b64encode(data).decode('ascii')
data = to_websafe(data) data = to_websafe(data)
data = json.dumps({'data': data}) data = json.dumps({'data': data})
@ -124,7 +129,6 @@ class UDPBridge(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
self.wfile.write(data) self.wfile.write(data)
def do_GET(self): def do_GET(self):
self.send_response(200) self.send_response(200)
self.send_header('Content-type', 'text/json') self.send_header('Content-type', 'text/json')
@ -135,13 +139,17 @@ class UDPBridge(BaseHTTPRequestHandler):
self.wfile.write(json.dumps(msg).encode()) self.wfile.write(json.dumps(msg).encode())
try: try:
server = HTTPServer(('', httpport), UDPBridge) server = HTTPServer(('', httpport), UDPBridge)
print('Started httpserver on port ', httpport) print('Started httpserver on port ', httpport)
server.socket = ssl.wrap_socket (server.socket, server.socket = ssl.wrap_socket(
server.socket,
keyfile="../web/localhost.key", keyfile="../web/localhost.key",
certfile='../web/localhost.crt', server_side=True) certfile='../web/localhost.crt',
server_side=True,
)
print('Saving signed firmware to firmware.json') print('Saving signed firmware to firmware.json')
msg = get_firmware_object("signing_key.pem", HEX_FILE) msg = get_firmware_object("signing_key.pem", HEX_FILE)
@ -153,4 +161,3 @@ try:
except KeyboardInterrupt: except KeyboardInterrupt:
server.socket.close() server.socket.close()

View File

@ -24,10 +24,12 @@ from binascii import hexlify
import Chameleon import Chameleon
def verboseLog(text): def verboseLog(text):
formatString = "[{}] {}" formatString = "[{}] {}"
timeString = datetime.datetime.utcnow() timeString = datetime.datetime.utcnow()
print(formatString.format(timeString, text), ) print(formatString.format(timeString, text))
chameleon = Chameleon.Device(verboseLog) chameleon = Chameleon.Device(verboseLog)

View File

@ -50,13 +50,16 @@ def to_websafe(data):
data = data.replace('=', '') data = data.replace('=', '')
return data return data
def from_websafe(data): def from_websafe(data):
data = data.replace('-', '+') data = data.replace('-', '+')
data = data.replace('_', '/') data = data.replace('_', '/')
return data + '=='[: (3 * len(data)) % 4] return data + '=='[: (3 * len(data)) % 4]
def get_firmware_object(sk_name, hex_file): def get_firmware_object(sk_name, hex_file):
from ecdsa import SigningKey, NIST256p from ecdsa import SigningKey, NIST256p
sk = SigningKey.from_pem(open(sk_name).read()) sk = SigningKey.from_pem(open(sk_name).read())
fw = open(hex_file, 'r').read() fw = open(hex_file, 'r').read()
fw = base64.b64encode(fw.encode()) fw = base64.b64encode(fw.encode())
@ -66,7 +69,7 @@ def get_firmware_object(sk_name, hex_file):
# start of firmware and the size of the flash region allocated for it. # start of firmware and the size of the flash region allocated for it.
# TODO put this somewhere else. # TODO put this somewhere else.
START = ih.segments()[0][0] START = ih.segments()[0][0]
END = ((0x08000000 + ((128-19)*2048))-8) END = (0x08000000 + ((128 - 19) * 2048)) - 8
ih = IntelHex(hex_file) ih = IntelHex(hex_file)
segs = ih.segments() segs = ih.segments()
@ -93,6 +96,7 @@ def get_firmware_object(sk_name, hex_file):
msg = {'firmware': fw, 'signature': sig} msg = {'firmware': fw, 'signature': sig}
return msg return msg
class SoloBootloader: class SoloBootloader:
write = 0x40 write = 0x40
done = 0x41 done = 0x41
@ -110,8 +114,8 @@ class SoloBootloader:
TAG = b'\x8C\x27\x90\xf6' TAG = b'\x8C\x27\x90\xf6'
class SoloClient():
class SoloClient:
def __init__(self,): def __init__(self,):
self.origin = 'https://example.org' self.origin = 'https://example.org'
self.exchange = self.exchange_hid self.exchange = self.exchange_hid
@ -258,7 +262,9 @@ class SoloClient():
of any updates. of any updates.
If you've started from a solo hacker, make you you've programmed a final/production build! If you've started from a solo hacker, make you you've programmed a final/production build!
""" """
ret = self.exchange(SoloBootloader.disable, 0, b'\xcd\xde\xba\xaa') # magic number ret = self.exchange(
SoloBootloader.disable, 0, b'\xcd\xde\xba\xaa'
) # magic number
if ret[0] != CtapError.ERR.SUCCESS: if ret[0] != CtapError.ERR.SUCCESS:
print('Failed to disable bootloader') print('Failed to disable bootloader')
return False return False
@ -266,7 +272,6 @@ class SoloClient():
self.exchange(SoloBootloader.do_reboot) self.exchange(SoloBootloader.do_reboot)
return True return True
def program_file(self, name): def program_file(self, name):
if name.lower().endswith('.json'): if name.lower().endswith('.json'):
@ -315,10 +320,11 @@ class SoloClient():
else: else:
self.verify_flash(b'A' * 64) self.verify_flash(b'A' * 64)
class DFU: class DFU:
class type: class type:
SEND = 0x21 SEND = 0x21
RECEIVE = 0xa1 RECEIVE = 0xA1
class bmReq: class bmReq:
DETACH = 0x00 DETACH = 0x00
@ -340,7 +346,7 @@ class DFU:
MANIFEST = 0x07 MANIFEST = 0x07
MANIFEST_WAIT_RESET = 0x08 MANIFEST_WAIT_RESET = 0x08
UPLOAD_IDLE = 0x09 UPLOAD_IDLE = 0x09
ERROR = 0x0a ERROR = 0x0A
class status: class status:
def __init__(self, s): def __init__(self, s):
@ -349,23 +355,28 @@ class DFU:
self.state = s[4] self.state = s[4]
self.istring = s[5] self.istring = s[5]
# hot patch for windows libusb backend # hot patch for windows libusb backend
olddel = usb._objfinalizer._AutoFinalizedObjectBase.__del__ olddel = usb._objfinalizer._AutoFinalizedObjectBase.__del__
def newdel(self): def newdel(self):
try: try:
olddel(self) olddel(self)
except OSError: except OSError:
pass pass
usb._objfinalizer._AutoFinalizedObjectBase.__del__ = newdel usb._objfinalizer._AutoFinalizedObjectBase.__del__ = newdel
class DFUDevice: class DFUDevice:
def __init__(self,): def __init__(self,):
pass pass
@staticmethod @staticmethod
def addr2list(a): def addr2list(a):
return [ a & 0xff, (a >> 8) & 0xff, (a >> 16) & 0xff, (a >> 24) & 0xff ] return [a & 0xFF, (a >> 8) & 0xFF, (a >> 16) & 0xFF, (a >> 24) & 0xFF]
@staticmethod @staticmethod
def addr2block(addr, size): def addr2block(addr, size):
@ -392,8 +403,7 @@ class DFUDevice:
self.dev = x self.dev = x
break break
else: else:
self.dev = usb.core.find(idVendor=0x0483, idProduct=0xDF11,) self.dev = usb.core.find(idVendor=0x0483, idProduct=0xDF11)
if self.dev is None: if self.dev is None:
raise RuntimeError('No ST DFU devices found.') raise RuntimeError('No ST DFU devices found.')
@ -418,7 +428,9 @@ class DFUDevice:
def get_status(self,): def get_status(self,):
# bmReqType, bmReq, wValue, wIndex, data/size # bmReqType, bmReq, wValue, wIndex, data/size
s = self.dev.ctrl_transfer(DFU.type.RECEIVE, DFU.bmReq.GETSTATUS,0, self.intNum, 6) s = self.dev.ctrl_transfer(
DFU.type.RECEIVE, DFU.bmReq.GETSTATUS, 0, self.intNum, 6
)
return DFU.status(s) return DFU.status(s)
def state(self,): def state(self,):
@ -426,14 +438,18 @@ class DFUDevice:
def clear_status(self,): def clear_status(self,):
# bmReqType, bmReq, wValue, wIndex, data/size # bmReqType, bmReq, wValue, wIndex, data/size
s = self.dev.ctrl_transfer(DFU.type.SEND, DFU.bmReq.CLRSTATUS, 0, self.intNum, None) s = self.dev.ctrl_transfer(
DFU.type.SEND, DFU.bmReq.CLRSTATUS, 0, self.intNum, None
)
def upload(self, block, size): def upload(self, block, size):
""" """
address is ((block 2) × size) + 0x08000000 address is ((block 2) × size) + 0x08000000
""" """
# bmReqType, bmReq, wValue, wIndex, data/size # bmReqType, bmReq, wValue, wIndex, data/size
return self.dev.ctrl_transfer(DFU.type.RECEIVE, DFU.bmReq.UPLOAD, block, self.intNum, size) return self.dev.ctrl_transfer(
DFU.type.RECEIVE, DFU.bmReq.UPLOAD, block, self.intNum, size
)
def set_addr(self, addr): def set_addr(self, addr):
# must get_status after to take effect # must get_status after to take effect
@ -441,19 +457,21 @@ class DFUDevice:
def dnload(self, block, data): def dnload(self, block, data):
# bmReqType, bmReq, wValue, wIndex, data/size # bmReqType, bmReq, wValue, wIndex, data/size
return self.dev.ctrl_transfer(DFU.type.SEND, DFU.bmReq.DNLOAD, block, self.intNum, data) return self.dev.ctrl_transfer(
DFU.type.SEND, DFU.bmReq.DNLOAD, block, self.intNum, data
)
def erase(self, a): def erase(self, a):
d = [0x41, a & 0xff, (a >> 8) & 0xff, (a >> 16) & 0xff, (a >> 24) & 0xff] d = [0x41, a & 0xFF, (a >> 8) & 0xFF, (a >> 16) & 0xFF, (a >> 24) & 0xFF]
return self.dnload(0x0, d) return self.dnload(0x0, d)
def mass_erase(self): def mass_erase(self):
# self.set_addr(0x08000000) # self.set_addr(0x08000000)
# self.block_on_state(DFU.state.DOWNLOAD_BUSY) # self.block_on_state(DFU.state.DOWNLOAD_BUSY)
# assert(DFU.state.DOWNLOAD_IDLE == self.state()) # assert(DFU.state.DOWNLOAD_IDLE == self.state())
self.dnload(0x0, [0x41,]) self.dnload(0x0, [0x41])
self.block_on_state(DFU.state.DOWNLOAD_BUSY) self.block_on_state(DFU.state.DOWNLOAD_BUSY)
assert(DFU.state.DOWNLOAD_IDLE == self.state()) assert DFU.state.DOWNLOAD_IDLE == self.state()
def write_page(self, addr, data): def write_page(self, addr, data):
if self.state() not in (DFU.state.IDLE, DFU.state.DOWNLOAD_IDLE): if self.state() not in (DFU.state.IDLE, DFU.state.DOWNLOAD_IDLE):
@ -468,7 +486,7 @@ class DFUDevice:
self.dnload(addr, data) self.dnload(addr, data)
self.block_on_state(DFU.state.DOWNLOAD_BUSY) self.block_on_state(DFU.state.DOWNLOAD_BUSY)
assert(DFU.state.DOWNLOAD_IDLE == self.state()) assert DFU.state.DOWNLOAD_IDLE == self.state()
def read_mem(self, addr, size): def read_mem(self, addr, size):
addr = DFUDevice.addr2block(addr, size) addr = DFUDevice.addr2block(addr, size)
@ -512,6 +530,7 @@ def attempt_to_find_device(p):
time.sleep(0.2) time.sleep(0.2)
return found return found
def attempt_to_boot_bootloader(p): def attempt_to_boot_bootloader(p):
try: try:
@ -520,19 +539,25 @@ def attempt_to_boot_bootloader(p):
pass pass
except CtapError as e: except CtapError as e:
if e.code == CtapError.ERR.INVALID_COMMAND: if e.code == CtapError.ERR.INVALID_COMMAND:
print('Solo appears to not be a solo hacker. Try holding down the button for 2 while you plug token in.') print(
'Solo appears to not be a solo hacker. Try holding down the button for 2 while you plug token in.'
)
sys.exit(1) sys.exit(1)
else: else:
raise (e) raise (e)
print('Solo rebooted. Reconnecting...') print('Solo rebooted. Reconnecting...')
time.sleep(.500) time.sleep(0.500)
if not attempt_to_find_device(p): if not attempt_to_find_device(p):
raise RuntimeError('Failed to reconnect!') raise RuntimeError('Failed to reconnect!')
def solo_main(): def solo_main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--rng", action="store_true", help = 'Continuously dump random numbers generated from Solo.') parser.add_argument(
"--rng",
action="store_true",
help='Continuously dump random numbers generated from Solo.',
)
parser.add_argument("--wink", action="store_true", help='HID Wink command.') parser.add_argument("--wink", action="store_true", help='HID Wink command.')
args = parser.parse_args() args = parser.parse_args()
@ -549,12 +574,14 @@ def solo_main():
p.wink() p.wink()
sys.exit(0) sys.exit(0)
def asked_for_help(): def asked_for_help():
for i, v in enumerate(sys.argv): for i, v in enumerate(sys.argv):
if v == '-h' or v == '--help': if v == '-h' or v == '--help':
return True return True
return False return False
def monitor_main(): def monitor_main():
if asked_for_help() or len(sys.argv) != 2: if asked_for_help() or len(sys.argv) != 2:
print( print(
@ -563,21 +590,24 @@ def monitor_main():
usage: %s <serial-port> [-h] usage: %s <serial-port> [-h]
* <serial-port> will look like COM10 or /dev/ttyACM0 or something. * <serial-port> will look like COM10 or /dev/ttyACM0 or something.
* baud is 115200. * baud is 115200.
""" % sys.argv[0]) """
% sys.argv[0]
)
sys.exit(1) sys.exit(1)
port = sys.argv[1] port = sys.argv[1]
ser = serial.Serial(port,115200,timeout=.05) ser = serial.Serial(port, 115200, timeout=0.05)
def reconnect(): def reconnect():
while(1): while 1:
time.sleep(0.02) time.sleep(0.02)
try: try:
ser = serial.Serial(port,115200,timeout=.05) ser = serial.Serial(port, 115200, timeout=0.05)
return ser return ser
except serial.SerialException: except serial.SerialException:
pass pass
while 1: while 1:
try: try:
d = ser.read(1) d = ser.read(1)
@ -588,6 +618,7 @@ def monitor_main():
sys.stdout.buffer.write(d) sys.stdout.buffer.write(d)
sys.stdout.flush() sys.stdout.flush()
def genkey_main(): def genkey_main():
from ecdsa import SigningKey, NIST256p from ecdsa import SigningKey, NIST256p
from ecdsa.util import randrange_from_seed__trytryagain from ecdsa.util import randrange_from_seed__trytryagain
@ -601,7 +632,9 @@ def genkey_main():
* Public key must be copied into correct source location in solo bootloader * Public key must be copied into correct source location in solo bootloader
* The private key can be used for signing updates. * The private key can be used for signing updates.
* You may optionally supply a file to seed the RNG for key generating. * You may optionally supply a file to seed the RNG for key generating.
""" % sys.argv[0]) """
% sys.argv[0]
)
sys.exit(1) sys.exit(1)
if len(sys.argv) > 2: if len(sys.argv) > 2:
@ -628,10 +661,13 @@ def genkey_main():
print('"\\x' + '\\x'.join(['%02x' % c for c in vk.to_string()]) + '"') print('"\\x' + '\\x'.join(['%02x' % c for c in vk.to_string()]) + '"')
print() print()
def sign_main(): def sign_main():
if asked_for_help() or len(sys.argv) != 4: if asked_for_help() or len(sys.argv) != 4:
print('Signs a firmware hex file, outputs a .json file that can be used for signed update.') print(
'Signs a firmware hex file, outputs a .json file that can be used for signed update.'
)
print('usage: %s <signing-key.pem> <app.hex> <output.json> [-h]' % sys.argv[0]) print('usage: %s <signing-key.pem> <app.hex> <output.json> [-h]' % sys.argv[0])
print() print()
sys.exit(1) sys.exit(1)
@ -641,6 +677,7 @@ def sign_main():
wfile.write(json.dumps(msg).encode()) wfile.write(json.dumps(msg).encode())
wfile.close() wfile.close()
def use_dfu(args): def use_dfu(args):
fw = args.__dict__['[firmware]'] fw = args.__dict__['[firmware]']
@ -686,7 +723,10 @@ def use_dfu(args):
total += chunk total += chunk
progress = total / float(size) * 100 progress = total / float(size) * 100
sys.stdout.write('downloading %.2f%% %08x - %08x ... \r' % (progress,i,i+page)) sys.stdout.write(
'downloading %.2f%% %08x - %08x ... \r'
% (progress, i, i + page)
)
# time.sleep(0.100) # time.sleep(0.100)
# print('done') # print('done')
@ -698,43 +738,88 @@ def use_dfu(args):
progress = 0 progress = 0
for start, end in ih.segments(): for start, end in ih.segments():
for i in range(start, end, chunk): for i in range(start, end, chunk):
data1 = (dfu.read_mem(i,2048)) data1 = dfu.read_mem(i, 2048)
data2 = ih.tobinarray(start=i, size=chunk) data2 = ih.tobinarray(start=i, size=chunk)
total += chunk total += chunk
progress = total / float(size) * 100 progress = total / float(size) * 100
sys.stdout.write('reading %.2f%% %08x - %08x ... \r' % (progress,i,i+page)) sys.stdout.write(
'reading %.2f%% %08x - %08x ... \r'
% (progress, i, i + page)
)
if (end - start) == chunk: if (end - start) == chunk:
assert(data1 == data2) assert data1 == data2
print() print()
print('firmware readback verified.') print('firmware readback verified.')
if args.detach: if args.detach:
dfu.detach() dfu.detach()
def programmer_main(): def programmer_main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("[firmware]", nargs='?', default='', help = 'firmware file. Either a JSON or hex file. JSON file contains signature while hex does not.') parser.add_argument(
parser.add_argument("--use-hid", action="store_true", help = 'Programs using custom HID command (default). Quicker than using U2F authenticate which is what a browser has to use.') "[firmware]",
parser.add_argument("--use-u2f", action="store_true", help = 'Programs using U2F authenticate. This is what a web application will use.') nargs='?',
parser.add_argument("--no-reset", action="store_true", help = 'Don\'t reset after writing firmware. Stay in bootloader mode.') default='',
parser.add_argument("--reset-only", action="store_true", help = 'Don\'t write anything, try to boot without a signature.') help='firmware file. Either a JSON or hex file. JSON file contains signature while hex does not.',
parser.add_argument("--reboot", action="store_true", help = 'Tell bootloader to reboot.') )
parser.add_argument("--enter-bootloader", action="store_true", help = 'Don\'t write anything, try to enter bootloader. Typically only supported by Solo Hacker builds.') parser.add_argument(
parser.add_argument("--st-dfu", action="store_true", help = 'Don\'t write anything, try to enter ST DFU. Warning, you could brick your Solo if you overwrite everything. You should reprogram the option bytes just to be safe (boot to Solo bootloader first, then run this command).') "--use-hid",
parser.add_argument("--disable", action="store_true", help = 'Disable the Solo bootloader. Cannot be undone. No future updates can be applied.') action="store_true",
parser.add_argument("--detach", action="store_true", help = 'Detach from ST DFU and boot from main flash. Must be in DFU mode.') help='Programs using custom HID command (default). Quicker than using U2F authenticate which is what a browser has to use.',
parser.add_argument("--dfu-serial", default='', help = 'Specify a serial number for a specific DFU device to connect to.') )
parser.add_argument("--use-dfu", action="store_true", help = 'Boot to ST-DFU before continuing.') parser.add_argument(
"--use-u2f",
action="store_true",
help='Programs using U2F authenticate. This is what a web application will use.',
)
parser.add_argument(
"--no-reset",
action="store_true",
help='Don\'t reset after writing firmware. Stay in bootloader mode.',
)
parser.add_argument(
"--reset-only",
action="store_true",
help='Don\'t write anything, try to boot without a signature.',
)
parser.add_argument(
"--reboot", action="store_true", help='Tell bootloader to reboot.'
)
parser.add_argument(
"--enter-bootloader",
action="store_true",
help='Don\'t write anything, try to enter bootloader. Typically only supported by Solo Hacker builds.',
)
parser.add_argument(
"--st-dfu",
action="store_true",
help='Don\'t write anything, try to enter ST DFU. Warning, you could brick your Solo if you overwrite everything. You should reprogram the option bytes just to be safe (boot to Solo bootloader first, then run this command).',
)
parser.add_argument(
"--disable",
action="store_true",
help='Disable the Solo bootloader. Cannot be undone. No future updates can be applied.',
)
parser.add_argument(
"--detach",
action="store_true",
help='Detach from ST DFU and boot from main flash. Must be in DFU mode.',
)
parser.add_argument(
"--dfu-serial",
default='',
help='Specify a serial number for a specific DFU device to connect to.',
)
parser.add_argument(
"--use-dfu", action="store_true", help='Boot to ST-DFU before continuing.'
)
args = parser.parse_args() args = parser.parse_args()
fw = args.__dict__['[firmware]'] fw = args.__dict__['[firmware]']
p = SoloClient() p = SoloClient()
try: try:
p.find_device() p.find_device()
if args.use_dfu: if args.use_dfu:
@ -781,7 +866,6 @@ def programmer_main():
p.disable_solo_bootloader() p.disable_solo_bootloader()
sys.exit(0) sys.exit(0)
if fw == '' and not args.reset_only: if fw == '' and not args.reset_only:
print('Need to supply firmware filename, or see help for more options.') print('Need to supply firmware filename, or see help for more options.')
parser.print_help() parser.print_help()
@ -807,7 +891,9 @@ def programmer_main():
def main_mergehex(): def main_mergehex():
if len(sys.argv) < 3: if len(sys.argv) < 3:
print('usage: %s <file1.hex> <file2.hex> [...] [-s <secret_attestation_key>] <output.hex>') print(
'usage: %s <file1.hex> <file2.hex> [...] [-s <secret_attestation_key>] <output.hex>'
)
sys.exit(1) sys.exit(1)
def flash_addr(num): def flash_addr(num):
@ -816,7 +902,9 @@ def main_mergehex():
args = sys.argv[:] args = sys.argv[:]
# generic / hacker attestation key # generic / hacker attestation key
secret_attestation_key = "1b2626ecc8f69b0f69e34fb236d76466ba12ac16c3ab5750ba064e8b90e02448" secret_attestation_key = (
"1b2626ecc8f69b0f69e34fb236d76466ba12ac16c3ab5750ba064e8b90e02448"
)
# user supplied, optional # user supplied, optional
for i, x in enumerate(args): for i, x in enumerate(args):
@ -825,12 +913,11 @@ def main_mergehex():
args = args[:i] + args[i + 2 :] args = args[:i] + args[i + 2 :]
break break
# TODO put definitions somewhere else # TODO put definitions somewhere else
PAGES = 128 PAGES = 128
APPLICATION_END_PAGE = PAGES - 19 APPLICATION_END_PAGE = PAGES - 19
AUTH_WORD_ADDR = (flash_addr(APPLICATION_END_PAGE)-8) AUTH_WORD_ADDR = flash_addr(APPLICATION_END_PAGE) - 8
ATTEST_ADDR = (flash_addr(PAGES - 15)) ATTEST_ADDR = flash_addr(PAGES - 15)
first = IntelHex(args[1]) first = IntelHex(args[1])
for i in range(2, len(args) - 1): for i in range(2, len(args) - 1):
@ -850,21 +937,20 @@ def main_mergehex():
first[AUTH_WORD_ADDR + 2] = 0 first[AUTH_WORD_ADDR + 2] = 0
first[AUTH_WORD_ADDR + 3] = 0 first[AUTH_WORD_ADDR + 3] = 0
first[AUTH_WORD_ADDR+4] = 0xff first[AUTH_WORD_ADDR + 4] = 0xFF
first[AUTH_WORD_ADDR+5] = 0xff first[AUTH_WORD_ADDR + 5] = 0xFF
first[AUTH_WORD_ADDR+6] = 0xff first[AUTH_WORD_ADDR + 6] = 0xFF
first[AUTH_WORD_ADDR+7] = 0xff first[AUTH_WORD_ADDR + 7] = 0xFF
if secret_attestation_key is not None: if secret_attestation_key is not None:
key = unhexlify(secret_attestation_key) key = unhexlify(secret_attestation_key)
for i, x in enumerate(key): for i, x in enumerate(key):
first[ATTEST_ADDR + i] = x first[ATTEST_ADDR + i] = x
first.tofile(args[len(args) - 1], format='hex') first.tofile(args[len(args) - 1], format='hex')
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) < 2 or (len(sys.argv) == 2 and asked_for_help()): if len(sys.argv) < 2 or (len(sys.argv) == 2 and asked_for_help()):
@ -885,10 +971,12 @@ Examples:
{0} sign <key.pem> <firmware.hex> <output.json> {0} sign <key.pem> <firmware.hex> <output.json>
{0} genkey <output-pem-file> [rng-seed-file] {0} genkey <output-pem-file> [rng-seed-file]
{0} mergehex bootloader.hex solo.hex combined.hex {0} mergehex bootloader.hex solo.hex combined.hex
""".format(sys.argv[0])) """.format(
sys.argv[0]
)
)
sys.exit(1) sys.exit(1)
c = sys.argv[1] c = sys.argv[1]
sys.argv = sys.argv[:1] + sys.argv[2:] sys.argv = sys.argv[:1] + sys.argv[2:]
sys.argv[0] = sys.argv[0] + ' ' + c sys.argv[0] = sys.argv[0] + ' ' + c