run python black
This commit is contained in:
parent
23c140fd99
commit
a2611fb013
@ -1,26 +1,6 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
#
|
#
|
||||||
# Copyright (C) 2018 SoloKeys, Inc. <https://solokeys.com/>
|
|
||||||
#
|
|
||||||
# This file is part of Solo.
|
|
||||||
#
|
|
||||||
# Solo is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# Solo is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with Solo. If not, see <https://www.gnu.org/licenses/>
|
|
||||||
#
|
|
||||||
# This code is available under licenses for commercial use.
|
|
||||||
# Please contact SoloKeys for more information.
|
|
||||||
#
|
|
||||||
import sys
|
import sys
|
||||||
from sys import argv
|
from sys import argv
|
||||||
|
|
||||||
@ -34,7 +14,7 @@ nums = []
|
|||||||
|
|
||||||
for x in log:
|
for x in log:
|
||||||
parse = []
|
parse = []
|
||||||
for i in x.split(' '):
|
for i in x.split(" "):
|
||||||
try:
|
try:
|
||||||
n = int(i, 16)
|
n = int(i, 16)
|
||||||
parse.append(n)
|
parse.append(n)
|
||||||
@ -48,9 +28,9 @@ for x in log:
|
|||||||
hexlines = []
|
hexlines = []
|
||||||
|
|
||||||
for l in nums:
|
for l in nums:
|
||||||
s = ''
|
s = ""
|
||||||
for x in l:
|
for x in l:
|
||||||
s += '\\x%02x' % x
|
s += "\\x%02x" % x
|
||||||
hexlines.append(s)
|
hexlines.append(s)
|
||||||
|
|
||||||
for x in hexlines:
|
for x in hexlines:
|
||||||
|
@ -50,14 +50,14 @@ class Packet(object):
|
|||||||
|
|
||||||
class Tester:
|
class Tester:
|
||||||
def __init__(self,):
|
def __init__(self,):
|
||||||
self.origin = 'https://examplo.org'
|
self.origin = "https://examplo.org"
|
||||||
self.host = 'examplo.org'
|
self.host = "examplo.org"
|
||||||
|
|
||||||
def find_device(self,):
|
def find_device(self,):
|
||||||
print(list(CtapHidDevice.list_devices()))
|
print(list(CtapHidDevice.list_devices()))
|
||||||
dev = next(CtapHidDevice.list_devices(), None)
|
dev = next(CtapHidDevice.list_devices(), None)
|
||||||
if not dev:
|
if not dev:
|
||||||
raise RuntimeError('No FIDO device found')
|
raise RuntimeError("No FIDO device found")
|
||||||
self.dev = dev
|
self.dev = dev
|
||||||
self.client = Fido2Client(dev, self.origin)
|
self.client = Fido2Client(dev, self.origin)
|
||||||
self.ctap = self.client.ctap2
|
self.ctap = self.client.ctap2
|
||||||
@ -66,23 +66,23 @@ class Tester:
|
|||||||
# cmd,resp = self.recv_raw()
|
# cmd,resp = self.recv_raw()
|
||||||
|
|
||||||
def send_data(self, cmd, data):
|
def send_data(self, cmd, data):
|
||||||
if type(data) != type(b''):
|
if type(data) != type(b""):
|
||||||
data = struct.pack('%dB' % len(data), *[ord(x) for x in data])
|
data = struct.pack("%dB" % len(data), *[ord(x) for x in data])
|
||||||
with Timeout(1.0) as event:
|
with Timeout(1.0) as event:
|
||||||
return self.dev.call(cmd, data, event)
|
return self.dev.call(cmd, data, event)
|
||||||
|
|
||||||
def send_raw(self, data, cid=None):
|
def send_raw(self, data, cid=None):
|
||||||
if cid is None:
|
if cid is None:
|
||||||
cid = self.dev._dev.cid
|
cid = self.dev._dev.cid
|
||||||
elif type(cid) != type(b''):
|
elif type(cid) != type(b""):
|
||||||
cid = struct.pack('%dB' % len(cid), *[ord(x) for x in cid])
|
cid = struct.pack("%dB" % len(cid), *[ord(x) for x in cid])
|
||||||
if type(data) != type(b''):
|
if type(data) != type(b""):
|
||||||
data = struct.pack('%dB' % len(data), *[ord(x) for x in data])
|
data = struct.pack("%dB" % len(data), *[ord(x) for x in data])
|
||||||
data = cid + data
|
data = cid + data
|
||||||
l = len(data)
|
l = len(data)
|
||||||
if l != 64:
|
if l != 64:
|
||||||
pad = '\x00' * (64 - l)
|
pad = "\x00" * (64 - l)
|
||||||
pad = struct.pack('%dB' % len(pad), *[ord(x) for x in pad])
|
pad = struct.pack("%dB" % len(pad), *[ord(x) for x in pad])
|
||||||
data = data + pad
|
data = data + pad
|
||||||
data = list(data)
|
data = list(data)
|
||||||
assert len(data) == 64
|
assert len(data) == 64
|
||||||
@ -92,8 +92,8 @@ class Tester:
|
|||||||
return self.dev._dev.cid
|
return self.dev._dev.cid
|
||||||
|
|
||||||
def set_cid(self, cid):
|
def set_cid(self, cid):
|
||||||
if type(cid) not in [type(b''), type(bytearray())]:
|
if type(cid) not in [type(b""), type(bytearray())]:
|
||||||
cid = struct.pack('%dB' % len(cid), *[ord(x) for x in cid])
|
cid = struct.pack("%dB" % len(cid), *[ord(x) for x in cid])
|
||||||
self.dev._dev.cid = cid
|
self.dev._dev.cid = cid
|
||||||
|
|
||||||
def recv_raw(self,):
|
def recv_raw(self,):
|
||||||
@ -107,7 +107,7 @@ class Tester:
|
|||||||
if data[0] != 0:
|
if data[0] != 0:
|
||||||
raise CtapError(data[0])
|
raise CtapError(data[0])
|
||||||
elif data[0] != err:
|
elif data[0] != err:
|
||||||
raise ValueError('Unexpected error: %02x' % data[0])
|
raise ValueError("Unexpected error: %02x" % data[0])
|
||||||
|
|
||||||
def test_long_ping(self):
|
def test_long_ping(self):
|
||||||
amt = 1000
|
amt = 1000
|
||||||
@ -120,48 +120,48 @@ class Tester:
|
|||||||
# if (delt < 140 ):
|
# if (delt < 140 ):
|
||||||
# raise RuntimeError('Fob is too fast (%d ms)' % delt)
|
# raise RuntimeError('Fob is too fast (%d ms)' % delt)
|
||||||
if delt > 555 * (amt / 1000):
|
if delt > 555 * (amt / 1000):
|
||||||
raise RuntimeError('Fob is too slow (%d ms)' % delt)
|
raise RuntimeError("Fob is too slow (%d ms)" % delt)
|
||||||
if r != pingdata:
|
if r != pingdata:
|
||||||
raise ValueError('Ping data not echo\'d')
|
raise ValueError("Ping data not echo'd")
|
||||||
print('1000 byte ping time: %s ms' % delt)
|
print("1000 byte ping time: %s ms" % delt)
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
print('7609 byte Ping failed:', e)
|
print("7609 byte Ping failed:", e)
|
||||||
raise RuntimeError('ping failed')
|
raise RuntimeError("ping failed")
|
||||||
print('PASS: 7609 byte ping')
|
print("PASS: 7609 byte ping")
|
||||||
# sys.flush(sys.sto)
|
# sys.flush(sys.sto)
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
def test_hid(self, check_timeouts=False):
|
def test_hid(self, check_timeouts=False):
|
||||||
if check_timeouts:
|
if check_timeouts:
|
||||||
print('Test idle')
|
print("Test idle")
|
||||||
try:
|
try:
|
||||||
cmd, resp = self.recv_raw()
|
cmd, resp = self.recv_raw()
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
print('Pass: Idle')
|
print("Pass: Idle")
|
||||||
|
|
||||||
print('Test init')
|
print("Test init")
|
||||||
r = self.send_data(CTAPHID.INIT, '\x11\x11\x11\x11\x11\x11\x11\x11')
|
r = self.send_data(CTAPHID.INIT, "\x11\x11\x11\x11\x11\x11\x11\x11")
|
||||||
|
|
||||||
pingdata = os.urandom(100)
|
pingdata = os.urandom(100)
|
||||||
try:
|
try:
|
||||||
r = self.send_data(CTAPHID.PING, pingdata)
|
r = self.send_data(CTAPHID.PING, pingdata)
|
||||||
if r != pingdata:
|
if r != pingdata:
|
||||||
raise ValueError('Ping data not echo\'d')
|
raise ValueError("Ping data not echo'd")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
print('100 byte Ping failed:', e)
|
print("100 byte Ping failed:", e)
|
||||||
raise RuntimeError('ping failed')
|
raise RuntimeError("ping failed")
|
||||||
print('PASS: 100 byte ping')
|
print("PASS: 100 byte ping")
|
||||||
|
|
||||||
self.test_long_ping()
|
self.test_long_ping()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = self.send_data(CTAPHID.WINK, '')
|
r = self.send_data(CTAPHID.WINK, "")
|
||||||
print(hexlify(r))
|
print(hexlify(r))
|
||||||
# assert(len(r) == 0)
|
# assert(len(r) == 0)
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
print('wink failed:', e)
|
print("wink failed:", e)
|
||||||
raise RuntimeError('wink failed')
|
raise RuntimeError("wink failed")
|
||||||
print('PASS: wink')
|
print("PASS: wink")
|
||||||
|
|
||||||
# try:
|
# try:
|
||||||
# r = self.send_data(CTAPHID.WINK, 'we9gofrei8g')
|
# r = self.send_data(CTAPHID.WINK, 'we9gofrei8g')
|
||||||
@ -171,87 +171,87 @@ class Tester:
|
|||||||
# print('PASS: malformed wink')
|
# print('PASS: malformed wink')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = self.send_data(CTAPHID.CBOR, '')
|
r = self.send_data(CTAPHID.CBOR, "")
|
||||||
if len(r) > 1 or r[0] == 0:
|
if len(r) > 1 or r[0] == 0:
|
||||||
raise RuntimeError('Cbor is supposed to have payload')
|
raise RuntimeError("Cbor is supposed to have payload")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.INVALID_LENGTH
|
assert e.code == CtapError.ERR.INVALID_LENGTH
|
||||||
print('PASS: no data cbor')
|
print("PASS: no data cbor")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = self.send_data(CTAPHID.MSG, '')
|
r = self.send_data(CTAPHID.MSG, "")
|
||||||
print(hexlify(r))
|
print(hexlify(r))
|
||||||
if len(r) > 2:
|
if len(r) > 2:
|
||||||
raise RuntimeError('MSG is supposed to have payload')
|
raise RuntimeError("MSG is supposed to have payload")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.INVALID_LENGTH
|
assert e.code == CtapError.ERR.INVALID_LENGTH
|
||||||
print('PASS: no data msg')
|
print("PASS: no data msg")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
r = self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
raise RuntimeError('resync fail: ', e)
|
raise RuntimeError("resync fail: ", e)
|
||||||
print('PASS: resync')
|
print("PASS: resync")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = self.send_data(0x66, '')
|
r = self.send_data(0x66, "")
|
||||||
raise RuntimeError('Invalid command did not return error')
|
raise RuntimeError("Invalid command did not return error")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.INVALID_COMMAND
|
assert e.code == CtapError.ERR.INVALID_COMMAND
|
||||||
print('PASS: invalid HID command')
|
print("PASS: invalid HID command")
|
||||||
|
|
||||||
print('Sending packet with too large of a length.')
|
print("Sending packet with too large of a length.")
|
||||||
self.send_raw('\x81\x1d\xba\x00')
|
self.send_raw("\x81\x1d\xba\x00")
|
||||||
cmd, resp = self.recv_raw()
|
cmd, resp = self.recv_raw()
|
||||||
self.check_error(resp, CtapError.ERR.INVALID_LENGTH)
|
self.check_error(resp, CtapError.ERR.INVALID_LENGTH)
|
||||||
print('PASS: invalid length')
|
print("PASS: invalid length")
|
||||||
|
|
||||||
r = self.send_data(CTAPHID.PING, '\x44' * 200)
|
r = self.send_data(CTAPHID.PING, "\x44" * 200)
|
||||||
print('Sending packets that skip a sequence number.')
|
print("Sending packets that skip a sequence number.")
|
||||||
self.send_raw('\x81\x04\x90')
|
self.send_raw("\x81\x04\x90")
|
||||||
self.send_raw('\x00')
|
self.send_raw("\x00")
|
||||||
self.send_raw('\x01')
|
self.send_raw("\x01")
|
||||||
# skip 2
|
# skip 2
|
||||||
self.send_raw('\x03')
|
self.send_raw("\x03")
|
||||||
cmd, resp = self.recv_raw()
|
cmd, resp = self.recv_raw()
|
||||||
self.check_error(resp, CtapError.ERR.INVALID_SEQ)
|
self.check_error(resp, CtapError.ERR.INVALID_SEQ)
|
||||||
if check_timeouts:
|
if check_timeouts:
|
||||||
cmd, resp = self.recv_raw()
|
cmd, resp = self.recv_raw()
|
||||||
assert cmd == 0xBF # timeout
|
assert cmd == 0xBF # timeout
|
||||||
print('PASS: invalid sequence')
|
print("PASS: invalid sequence")
|
||||||
|
|
||||||
print('Resync and send ping')
|
print("Resync and send ping")
|
||||||
try:
|
try:
|
||||||
r = self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
r = self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
pingdata = os.urandom(100)
|
pingdata = os.urandom(100)
|
||||||
r = self.send_data(CTAPHID.PING, pingdata)
|
r = self.send_data(CTAPHID.PING, pingdata)
|
||||||
if r != pingdata:
|
if r != pingdata:
|
||||||
raise ValueError('Ping data not echo\'d')
|
raise ValueError("Ping data not echo'd")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
raise RuntimeError('resync fail: ', e)
|
raise RuntimeError("resync fail: ", e)
|
||||||
print('PASS: resync and ping')
|
print("PASS: resync and ping")
|
||||||
|
|
||||||
print('Send ping and abort it')
|
print("Send ping and abort it")
|
||||||
self.send_raw('\x81\x04\x00')
|
self.send_raw("\x81\x04\x00")
|
||||||
self.send_raw('\x00')
|
self.send_raw("\x00")
|
||||||
self.send_raw('\x01')
|
self.send_raw("\x01")
|
||||||
try:
|
try:
|
||||||
r = self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
r = self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
raise RuntimeError('resync fail: ', e)
|
raise RuntimeError("resync fail: ", e)
|
||||||
print('PASS: interrupt ping with resync')
|
print("PASS: interrupt ping with resync")
|
||||||
|
|
||||||
print('Send ping and abort it with different cid, expect timeout')
|
print("Send ping and abort it with different cid, expect timeout")
|
||||||
oldcid = self.cid()
|
oldcid = self.cid()
|
||||||
newcid = '\x11\x22\x33\x44'
|
newcid = "\x11\x22\x33\x44"
|
||||||
self.send_raw('\x81\x10\x00')
|
self.send_raw("\x81\x10\x00")
|
||||||
self.send_raw('\x00')
|
self.send_raw("\x00")
|
||||||
self.send_raw('\x01')
|
self.send_raw("\x01")
|
||||||
self.set_cid(newcid)
|
self.set_cid(newcid)
|
||||||
self.send_raw(
|
self.send_raw(
|
||||||
'\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88'
|
"\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88"
|
||||||
) # init from different cid
|
) # init from different cid
|
||||||
print('wait for init response')
|
print("wait for init response")
|
||||||
cmd, r = self.recv_raw() # init response
|
cmd, r = self.recv_raw() # init response
|
||||||
assert cmd == 0x86
|
assert cmd == 0x86
|
||||||
self.set_cid(oldcid)
|
self.set_cid(oldcid)
|
||||||
@ -260,51 +260,51 @@ class Tester:
|
|||||||
cmd, r = self.recv_raw() # timeout response
|
cmd, r = self.recv_raw() # timeout response
|
||||||
assert cmd == 0xBF
|
assert cmd == 0xBF
|
||||||
|
|
||||||
print('PASS: resync and timeout')
|
print("PASS: resync and timeout")
|
||||||
|
|
||||||
print('Test timeout')
|
print("Test timeout")
|
||||||
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
self.send_raw('\x81\x04\x00')
|
self.send_raw("\x81\x04\x00")
|
||||||
self.send_raw('\x00')
|
self.send_raw("\x00")
|
||||||
self.send_raw('\x01')
|
self.send_raw("\x01")
|
||||||
cmd, r = self.recv_raw() # timeout response
|
cmd, r = self.recv_raw() # timeout response
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
delt = t2 - t1
|
delt = t2 - t1
|
||||||
assert cmd == 0xBF
|
assert cmd == 0xBF
|
||||||
assert r[0] == CtapError.ERR.TIMEOUT
|
assert r[0] == CtapError.ERR.TIMEOUT
|
||||||
assert delt < 1000 and delt > 400
|
assert delt < 1000 and delt > 400
|
||||||
print('Pass timeout')
|
print("Pass timeout")
|
||||||
|
|
||||||
print('Test not cont')
|
print("Test not cont")
|
||||||
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
self.send_raw('\x81\x04\x00')
|
self.send_raw("\x81\x04\x00")
|
||||||
self.send_raw('\x00')
|
self.send_raw("\x00")
|
||||||
self.send_raw('\x01')
|
self.send_raw("\x01")
|
||||||
self.send_raw('\x81\x10\x00') # init packet
|
self.send_raw("\x81\x10\x00") # init packet
|
||||||
cmd, r = self.recv_raw() # timeout response
|
cmd, r = self.recv_raw() # timeout response
|
||||||
assert cmd == 0xBF
|
assert cmd == 0xBF
|
||||||
assert r[0] == CtapError.ERR.INVALID_SEQ
|
assert r[0] == CtapError.ERR.INVALID_SEQ
|
||||||
print('PASS: Test not cont')
|
print("PASS: Test not cont")
|
||||||
|
|
||||||
if check_timeouts:
|
if check_timeouts:
|
||||||
print('Check random cont ignored')
|
print("Check random cont ignored")
|
||||||
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
self.send_raw('\x01\x10\x00')
|
self.send_raw("\x01\x10\x00")
|
||||||
try:
|
try:
|
||||||
cmd, r = self.recv_raw() # timeout response
|
cmd, r = self.recv_raw() # timeout response
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
pass
|
pass
|
||||||
print('PASS: random cont')
|
print("PASS: random cont")
|
||||||
|
|
||||||
print('Check busy')
|
print("Check busy")
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
oldcid = self.cid()
|
oldcid = self.cid()
|
||||||
newcid = '\x11\x22\x33\x44'
|
newcid = "\x11\x22\x33\x44"
|
||||||
self.send_raw('\x81\x04\x00')
|
self.send_raw("\x81\x04\x00")
|
||||||
self.set_cid(newcid)
|
self.set_cid(newcid)
|
||||||
self.send_raw('\x81\x04\x00')
|
self.send_raw("\x81\x04\x00")
|
||||||
cmd, r = self.recv_raw() # busy response
|
cmd, r = self.recv_raw() # busy response
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
assert t2 - t1 < 100
|
assert t2 - t1 < 100
|
||||||
@ -315,25 +315,25 @@ class Tester:
|
|||||||
cmd, r = self.recv_raw() # timeout response
|
cmd, r = self.recv_raw() # timeout response
|
||||||
assert cmd == 0xBF
|
assert cmd == 0xBF
|
||||||
assert r[0] == CtapError.ERR.TIMEOUT
|
assert r[0] == CtapError.ERR.TIMEOUT
|
||||||
print('PASS: busy')
|
print("PASS: busy")
|
||||||
|
|
||||||
print('Check busy interleaved')
|
print("Check busy interleaved")
|
||||||
cid1 = '\x11\x22\x33\x44'
|
cid1 = "\x11\x22\x33\x44"
|
||||||
cid2 = '\x01\x22\x33\x44'
|
cid2 = "\x01\x22\x33\x44"
|
||||||
self.set_cid(cid2)
|
self.set_cid(cid2)
|
||||||
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
self.set_cid(cid1)
|
self.set_cid(cid1)
|
||||||
self.send_data(CTAPHID.INIT, '\x11\x22\x33\x44\x55\x66\x77\x88')
|
self.send_data(CTAPHID.INIT, "\x11\x22\x33\x44\x55\x66\x77\x88")
|
||||||
self.send_raw('\x81\x00\x63') # echo 99 bytes first channel
|
self.send_raw("\x81\x00\x63") # echo 99 bytes first channel
|
||||||
|
|
||||||
self.set_cid(cid2) # send ping on 2nd channel
|
self.set_cid(cid2) # send ping on 2nd channel
|
||||||
self.send_raw('\x81\x00\x63')
|
self.send_raw("\x81\x00\x63")
|
||||||
self.send_raw('\x00')
|
self.send_raw("\x00")
|
||||||
|
|
||||||
cmd, r = self.recv_raw() # busy response
|
cmd, r = self.recv_raw() # busy response
|
||||||
|
|
||||||
self.set_cid(cid1) # finish 1st channel ping
|
self.set_cid(cid1) # finish 1st channel ping
|
||||||
self.send_raw('\x00')
|
self.send_raw("\x00")
|
||||||
|
|
||||||
self.set_cid(cid2)
|
self.set_cid(cid2)
|
||||||
|
|
||||||
@ -349,35 +349,35 @@ class Tester:
|
|||||||
cmd, r = self.recv_raw() # timeout
|
cmd, r = self.recv_raw() # timeout
|
||||||
assert cmd == 0xBF
|
assert cmd == 0xBF
|
||||||
assert r[0] == CtapError.ERR.TIMEOUT
|
assert r[0] == CtapError.ERR.TIMEOUT
|
||||||
print('PASS: busy interleaved')
|
print("PASS: busy interleaved")
|
||||||
|
|
||||||
if check_timeouts:
|
if check_timeouts:
|
||||||
print('Test idle, wait for timeout')
|
print("Test idle, wait for timeout")
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
try:
|
try:
|
||||||
cmd, resp = self.recv_raw()
|
cmd, resp = self.recv_raw()
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
print('Pass: Idle')
|
print("Pass: Idle")
|
||||||
|
|
||||||
print('Test cid 0 is invalid')
|
print("Test cid 0 is invalid")
|
||||||
self.set_cid('\x00\x00\x00\x00')
|
self.set_cid("\x00\x00\x00\x00")
|
||||||
self.send_raw(
|
self.send_raw(
|
||||||
'\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid='\x00\x00\x00\x00'
|
"\x86\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88", cid="\x00\x00\x00\x00"
|
||||||
)
|
)
|
||||||
cmd, r = self.recv_raw() # timeout
|
cmd, r = self.recv_raw() # timeout
|
||||||
assert cmd == 0xBF
|
assert cmd == 0xBF
|
||||||
assert r[0] == CtapError.ERR.INVALID_CHANNEL
|
assert r[0] == CtapError.ERR.INVALID_CHANNEL
|
||||||
print('Pass: cid 0')
|
print("Pass: cid 0")
|
||||||
|
|
||||||
print('Test invalid broadcast cid use')
|
print("Test invalid broadcast cid use")
|
||||||
self.set_cid('\xff\xff\xff\xff')
|
self.set_cid("\xff\xff\xff\xff")
|
||||||
self.send_raw(
|
self.send_raw(
|
||||||
'\x81\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88', cid='\xff\xff\xff\xff'
|
"\x81\x00\x08\x11\x22\x33\x44\x55\x66\x77\x88", cid="\xff\xff\xff\xff"
|
||||||
)
|
)
|
||||||
cmd, r = self.recv_raw() # timeout
|
cmd, r = self.recv_raw() # timeout
|
||||||
assert cmd == 0xBF
|
assert cmd == 0xBF
|
||||||
assert r[0] == CtapError.ERR.INVALID_CHANNEL
|
assert r[0] == CtapError.ERR.INVALID_CHANNEL
|
||||||
print('Pass: cid broadcast')
|
print("Pass: cid broadcast")
|
||||||
|
|
||||||
def test_u2f(self,):
|
def test_u2f(self,):
|
||||||
pass
|
pass
|
||||||
@ -385,46 +385,46 @@ class Tester:
|
|||||||
def test_fido2_simple(self, pin_token=None):
|
def test_fido2_simple(self, pin_token=None):
|
||||||
creds = []
|
creds = []
|
||||||
exclude_list = []
|
exclude_list = []
|
||||||
rp = {'id': self.host, 'name': 'ExaRP'}
|
rp = {"id": self.host, "name": "ExaRP"}
|
||||||
user = {'id': b'usee_od', 'name': 'AB User'}
|
user = {"id": b"usee_od", "name": "AB User"}
|
||||||
challenge = 'Y2hhbGxlbmdl'
|
challenge = "Y2hhbGxlbmdl"
|
||||||
PIN = pin_token
|
PIN = pin_token
|
||||||
|
|
||||||
fake_id1 = array.array('B', [randint(0, 255) for i in range(0, 150)]).tobytes()
|
fake_id1 = array.array("B", [randint(0, 255) for i in range(0, 150)]).tobytes()
|
||||||
fake_id2 = array.array('B', [randint(0, 255) for i in range(0, 73)]).tobytes()
|
fake_id2 = array.array("B", [randint(0, 255) for i in range(0, 73)]).tobytes()
|
||||||
|
|
||||||
exclude_list.append({'id': fake_id1, 'type': 'public-key'})
|
exclude_list.append({"id": fake_id1, "type": "public-key"})
|
||||||
exclude_list.append({'id': fake_id2, 'type': 'public-key'})
|
exclude_list.append({"id": fake_id2, "type": "public-key"})
|
||||||
|
|
||||||
print('MC')
|
print("MC")
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
rp, user, challenge, pin=PIN, exclude_list=[]
|
rp, user, challenge, pin=PIN, exclude_list=[]
|
||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
print('Register valid (%d ms)' % (t2 - t1))
|
print("Register valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
cred = attest.auth_data.credential_data
|
cred = attest.auth_data.credential_data
|
||||||
creds.append(cred)
|
creds.append(cred)
|
||||||
|
|
||||||
allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}]
|
allow_list = [{"id": creds[0].credential_id, "type": "public-key"}]
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, allow_list, pin=PIN
|
rp["id"], challenge, allow_list, pin=PIN
|
||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
assertions[0].verify(client_data.hash, creds[0].public_key)
|
assertions[0].verify(client_data.hash, creds[0].public_key)
|
||||||
|
|
||||||
print('Assertion valid (%d ms)' % (t2 - t1))
|
print("Assertion valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
def test_fido2_brute_force(self):
|
def test_fido2_brute_force(self):
|
||||||
creds = []
|
creds = []
|
||||||
exclude_list = []
|
exclude_list = []
|
||||||
rp = {'id': self.host, 'name': 'ExaRP'}
|
rp = {"id": self.host, "name": "ExaRP"}
|
||||||
user = {'id': b'usee_od', 'name': 'AB User'}
|
user = {"id": b"usee_od", "name": "AB User"}
|
||||||
PIN = None
|
PIN = None
|
||||||
abc = 'abcdefghijklnmopqrstuvwxyz'
|
abc = "abcdefghijklnmopqrstuvwxyz"
|
||||||
abc += abc.upper()
|
abc += abc.upper()
|
||||||
|
|
||||||
self.ctap.reset()
|
self.ctap.reset()
|
||||||
@ -432,17 +432,17 @@ class Tester:
|
|||||||
for i in range(0, 2048 ** 2):
|
for i in range(0, 2048 ** 2):
|
||||||
creds = []
|
creds = []
|
||||||
|
|
||||||
challenge = ''.join([abc[randint(0, len(abc) - 1)] for x in range(0, 32)])
|
challenge = "".join([abc[randint(0, len(abc) - 1)] for x in range(0, 32)])
|
||||||
|
|
||||||
fake_id1 = array.array(
|
fake_id1 = array.array(
|
||||||
'B', [randint(0, 255) for i in range(0, 150)]
|
"B", [randint(0, 255) for i in range(0, 150)]
|
||||||
).tostring()
|
).tostring()
|
||||||
fake_id2 = array.array(
|
fake_id2 = array.array(
|
||||||
'B', [randint(0, 255) for i in range(0, 73)]
|
"B", [randint(0, 255) for i in range(0, 73)]
|
||||||
).tostring()
|
).tostring()
|
||||||
|
|
||||||
exclude_list.append({'id': fake_id1, 'type': 'public-key'})
|
exclude_list.append({"id": fake_id1, "type": "public-key"})
|
||||||
exclude_list.append({'id': fake_id2, 'type': 'public-key'})
|
exclude_list.append({"id": fake_id2, "type": "public-key"})
|
||||||
|
|
||||||
# for i in range(0,2048**2):
|
# for i in range(0,2048**2):
|
||||||
for i in range(0, 1):
|
for i in range(0, 1):
|
||||||
@ -453,7 +453,7 @@ class Tester:
|
|||||||
print(attest.auth_data.counter)
|
print(attest.auth_data.counter)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
print('Register valid (%d ms)' % (t2 - t1))
|
print("Register valid (%d ms)" % (t2 - t1))
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
cred = attest.auth_data.credential_data
|
cred = attest.auth_data.credential_data
|
||||||
@ -461,39 +461,39 @@ class Tester:
|
|||||||
|
|
||||||
# for i in range(0,2048**2):
|
# for i in range(0,2048**2):
|
||||||
for i in range(0, 1):
|
for i in range(0, 1):
|
||||||
allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}]
|
allow_list = [{"id": creds[0].credential_id, "type": "public-key"}]
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, allow_list, pin=PIN
|
rp["id"], challenge, allow_list, pin=PIN
|
||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
assertions[0].verify(client_data.hash, creds[0].public_key)
|
assertions[0].verify(client_data.hash, creds[0].public_key)
|
||||||
print(assertions[0].auth_data.counter)
|
print(assertions[0].auth_data.counter)
|
||||||
|
|
||||||
print('Assertion valid (%d ms)' % (t2 - t1))
|
print("Assertion valid (%d ms)" % (t2 - t1))
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
def test_fido2(self):
|
def test_fido2(self):
|
||||||
def test(self, pincode=None):
|
def test(self, pincode=None):
|
||||||
creds = []
|
creds = []
|
||||||
exclude_list = []
|
exclude_list = []
|
||||||
rp = {'id': self.host, 'name': 'ExaRP'}
|
rp = {"id": self.host, "name": "ExaRP"}
|
||||||
user = {'id': b'usee_od', 'name': 'AB User'}
|
user = {"id": b"usee_od", "name": "AB User"}
|
||||||
challenge = 'Y2hhbGxlbmdl'
|
challenge = "Y2hhbGxlbmdl"
|
||||||
PIN = pincode
|
PIN = pincode
|
||||||
|
|
||||||
fake_id1 = array.array(
|
fake_id1 = array.array(
|
||||||
'B', [randint(0, 255) for i in range(0, 150)]
|
"B", [randint(0, 255) for i in range(0, 150)]
|
||||||
).tostring()
|
).tostring()
|
||||||
fake_id2 = array.array(
|
fake_id2 = array.array(
|
||||||
'B', [randint(0, 255) for i in range(0, 73)]
|
"B", [randint(0, 255) for i in range(0, 73)]
|
||||||
).tostring()
|
).tostring()
|
||||||
|
|
||||||
exclude_list.append({'id': fake_id1, 'type': 'public-key'})
|
exclude_list.append({"id": fake_id1, "type": "public-key"})
|
||||||
exclude_list.append({'id': fake_id2, 'type': 'public-key'})
|
exclude_list.append({"id": fake_id2, "type": "public-key"})
|
||||||
|
|
||||||
# test make credential
|
# test make credential
|
||||||
print('make 3 credentials')
|
print("make 3 credentials")
|
||||||
for i in range(0, 3):
|
for i in range(0, 3):
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
rp, user, challenge, pin=PIN, exclude_list=[]
|
rp, user, challenge, pin=PIN, exclude_list=[]
|
||||||
@ -502,149 +502,149 @@ class Tester:
|
|||||||
cred = attest.auth_data.credential_data
|
cred = attest.auth_data.credential_data
|
||||||
creds.append(cred)
|
creds.append(cred)
|
||||||
print(cred)
|
print(cred)
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
if PIN is not None:
|
if PIN is not None:
|
||||||
print('make credential with wrong pin code')
|
print("make credential with wrong pin code")
|
||||||
try:
|
try:
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
rp, user, challenge, pin=PIN + ' ', exclude_list=[]
|
rp, user, challenge, pin=PIN + " ", exclude_list=[]
|
||||||
)
|
)
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.PIN_INVALID
|
assert e.code == CtapError.ERR.PIN_INVALID
|
||||||
except ClientError as e:
|
except ClientError as e:
|
||||||
assert e.cause.code == CtapError.ERR.PIN_INVALID
|
assert e.cause.code == CtapError.ERR.PIN_INVALID
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('make credential with exclude list')
|
print("make credential with exclude list")
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
rp, user, challenge, pin=PIN, exclude_list=exclude_list
|
rp, user, challenge, pin=PIN, exclude_list=exclude_list
|
||||||
)
|
)
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
cred = attest.auth_data.credential_data
|
cred = attest.auth_data.credential_data
|
||||||
creds.append(cred)
|
creds.append(cred)
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('make credential with exclude list including real credential')
|
print("make credential with exclude list including real credential")
|
||||||
real_excl = [{'id': cred.credential_id, 'type': 'public-key'}]
|
real_excl = [{"id": cred.credential_id, "type": "public-key"}]
|
||||||
try:
|
try:
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
rp, user, challenge, pin=PIN, exclude_list=exclude_list + real_excl
|
rp, user, challenge, pin=PIN, exclude_list=exclude_list + real_excl
|
||||||
)
|
)
|
||||||
raise RuntimeError('Exclude list did not return expected error')
|
raise RuntimeError("Exclude list did not return expected error")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.CREDENTIAL_EXCLUDED
|
assert e.code == CtapError.ERR.CREDENTIAL_EXCLUDED
|
||||||
except ClientError as e:
|
except ClientError as e:
|
||||||
assert e.cause.code == CtapError.ERR.CREDENTIAL_EXCLUDED
|
assert e.cause.code == CtapError.ERR.CREDENTIAL_EXCLUDED
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
for i, x in enumerate(creds):
|
for i, x in enumerate(creds):
|
||||||
print('get assertion %d' % i)
|
print("get assertion %d" % i)
|
||||||
allow_list = [{'id': x.credential_id, 'type': 'public-key'}]
|
allow_list = [{"id": x.credential_id, "type": "public-key"}]
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, allow_list, pin=PIN
|
rp["id"], challenge, allow_list, pin=PIN
|
||||||
)
|
)
|
||||||
assertions[0].verify(client_data.hash, x.public_key)
|
assertions[0].verify(client_data.hash, x.public_key)
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
if PIN is not None:
|
if PIN is not None:
|
||||||
print('get assertion with wrong pin code')
|
print("get assertion with wrong pin code")
|
||||||
try:
|
try:
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, allow_list, pin=PIN + ' '
|
rp["id"], challenge, allow_list, pin=PIN + " "
|
||||||
)
|
)
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.PIN_INVALID
|
assert e.code == CtapError.ERR.PIN_INVALID
|
||||||
except ClientError as e:
|
except ClientError as e:
|
||||||
assert e.cause.code == CtapError.ERR.PIN_INVALID
|
assert e.cause.code == CtapError.ERR.PIN_INVALID
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('get multiple assertions')
|
print("get multiple assertions")
|
||||||
allow_list = [{'id': x.credential_id, 'type': 'public-key'} for x in creds]
|
allow_list = [{"id": x.credential_id, "type": "public-key"} for x in creds]
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, allow_list, pin=PIN
|
rp["id"], challenge, allow_list, pin=PIN
|
||||||
)
|
)
|
||||||
|
|
||||||
for ass, cred in zip(assertions, creds):
|
for ass, cred in zip(assertions, creds):
|
||||||
i += 1
|
i += 1
|
||||||
|
|
||||||
ass.verify(client_data.hash, cred.public_key)
|
ass.verify(client_data.hash, cred.public_key)
|
||||||
print('%d verified' % i)
|
print("%d verified" % i)
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('Reset device')
|
print("Reset device")
|
||||||
try:
|
try:
|
||||||
self.ctap.reset()
|
self.ctap.reset()
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
print('Warning, reset failed: ', e)
|
print("Warning, reset failed: ", e)
|
||||||
pass
|
pass
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
test(self, None)
|
test(self, None)
|
||||||
|
|
||||||
print('Set a pin code')
|
print("Set a pin code")
|
||||||
PIN = '1122aabbwfg0h9g !@#=='
|
PIN = "1122aabbwfg0h9g !@#=="
|
||||||
self.client.pin_protocol.set_pin(PIN)
|
self.client.pin_protocol.set_pin(PIN)
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('Illegally set pin code again')
|
print("Illegally set pin code again")
|
||||||
try:
|
try:
|
||||||
self.client.pin_protocol.set_pin(PIN)
|
self.client.pin_protocol.set_pin(PIN)
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.NOT_ALLOWED
|
assert e.code == CtapError.ERR.NOT_ALLOWED
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('Change pin code')
|
print("Change pin code")
|
||||||
PIN2 = PIN + '_pin2'
|
PIN2 = PIN + "_pin2"
|
||||||
self.client.pin_protocol.change_pin(PIN, PIN2)
|
self.client.pin_protocol.change_pin(PIN, PIN2)
|
||||||
PIN = PIN2
|
PIN = PIN2
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('Change pin code using wrong pin')
|
print("Change pin code using wrong pin")
|
||||||
try:
|
try:
|
||||||
self.client.pin_protocol.change_pin(PIN.replace('a', 'b'), '1234')
|
self.client.pin_protocol.change_pin(PIN.replace("a", "b"), "1234")
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
assert e.code == CtapError.ERR.PIN_INVALID
|
assert e.code == CtapError.ERR.PIN_INVALID
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('MC using wrong pin')
|
print("MC using wrong pin")
|
||||||
try:
|
try:
|
||||||
self.test_fido2_simple('abcd3')
|
self.test_fido2_simple("abcd3")
|
||||||
except ClientError as e:
|
except ClientError as e:
|
||||||
assert e.cause.code == CtapError.ERR.PIN_INVALID
|
assert e.cause.code == CtapError.ERR.PIN_INVALID
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
print('get info')
|
print("get info")
|
||||||
inf = self.ctap.get_info()
|
inf = self.ctap.get_info()
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
self.test_fido2_simple(PIN)
|
self.test_fido2_simple(PIN)
|
||||||
|
|
||||||
print('Re-run make_credential and get_assertion tests with pin code')
|
print("Re-run make_credential and get_assertion tests with pin code")
|
||||||
test(self, PIN)
|
test(self, PIN)
|
||||||
|
|
||||||
print('Reset device')
|
print("Reset device")
|
||||||
try:
|
try:
|
||||||
self.ctap.reset()
|
self.ctap.reset()
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
print('Warning, reset failed: ', e)
|
print("Warning, reset failed: ", e)
|
||||||
print('PASS')
|
print("PASS")
|
||||||
|
|
||||||
def test_rk(self,):
|
def test_rk(self,):
|
||||||
creds = []
|
creds = []
|
||||||
rp = {'id': self.host, 'name': 'ExaRP'}
|
rp = {"id": self.host, "name": "ExaRP"}
|
||||||
user0 = {'id': b'first one', 'name': 'single User'}
|
user0 = {"id": b"first one", "name": "single User"}
|
||||||
|
|
||||||
users = [
|
users = [
|
||||||
{'id': b'user' + os.urandom(16), 'name': 'AB User'} for i in range(0, 2)
|
{"id": b"user" + os.urandom(16), "name": "AB User"} for i in range(0, 2)
|
||||||
]
|
]
|
||||||
challenge = 'Y2hhbGxlbmdl'
|
challenge = "Y2hhbGxlbmdl"
|
||||||
PIN = None
|
PIN = None
|
||||||
print('reset')
|
print("reset")
|
||||||
self.ctap.reset()
|
self.ctap.reset()
|
||||||
# if PIN: self.client.pin_protocol.set_pin(PIN)
|
# if PIN: self.client.pin_protocol.set_pin(PIN)
|
||||||
|
|
||||||
print('registering 1 user with RK')
|
print("registering 1 user with RK")
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
rp, user0, challenge, pin=PIN, exclude_list=[], rk=True
|
rp, user0, challenge, pin=PIN, exclude_list=[], rk=True
|
||||||
@ -652,20 +652,20 @@ class Tester:
|
|||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
creds.append(attest.auth_data.credential_data)
|
creds.append(attest.auth_data.credential_data)
|
||||||
print('Register valid (%d ms)' % (t2 - t1))
|
print("Register valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
print('1 assertion')
|
print("1 assertion")
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, pin=PIN
|
rp["id"], challenge, pin=PIN
|
||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
assertions[0].verify(client_data.hash, creds[0].public_key)
|
assertions[0].verify(client_data.hash, creds[0].public_key)
|
||||||
print('Assertion valid (%d ms)' % (t2 - t1))
|
print("Assertion valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
print(assertions[0], client_data)
|
print(assertions[0], client_data)
|
||||||
|
|
||||||
print('registering %d users with RK' % len(users))
|
print("registering %d users with RK" % len(users))
|
||||||
for i in range(0, len(users)):
|
for i in range(0, len(users)):
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
@ -673,22 +673,22 @@ class Tester:
|
|||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
print('Register valid (%d ms)' % (t2 - t1))
|
print("Register valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
creds.append(attest.auth_data.credential_data)
|
creds.append(attest.auth_data.credential_data)
|
||||||
|
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, pin=PIN
|
rp["id"], challenge, pin=PIN
|
||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
|
|
||||||
for x, y in zip(assertions, creds):
|
for x, y in zip(assertions, creds):
|
||||||
x.verify(client_data.hash, y.public_key)
|
x.verify(client_data.hash, y.public_key)
|
||||||
|
|
||||||
print('Assertion(s) valid (%d ms)' % (t2 - t1))
|
print("Assertion(s) valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
print('registering a duplicate user ')
|
print("registering a duplicate user ")
|
||||||
|
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(
|
||||||
@ -697,24 +697,24 @@ class Tester:
|
|||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
creds = creds[:2] + creds[3:] + [attest.auth_data.credential_data]
|
creds = creds[:2] + creds[3:] + [attest.auth_data.credential_data]
|
||||||
print('Register valid (%d ms)' % (t2 - t1))
|
print("Register valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
assertions, client_data = self.client.get_assertion(
|
assertions, client_data = self.client.get_assertion(
|
||||||
rp['id'], challenge, pin=PIN
|
rp["id"], challenge, pin=PIN
|
||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
assert len(assertions) == len(users) + 1
|
assert len(assertions) == len(users) + 1
|
||||||
for x, y in zip(assertions, creds):
|
for x, y in zip(assertions, creds):
|
||||||
x.verify(client_data.hash, y.public_key)
|
x.verify(client_data.hash, y.public_key)
|
||||||
|
|
||||||
print('Assertion(s) valid (%d ms)' % (t2 - t1))
|
print("Assertion(s) valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
def test_responses(self,):
|
def test_responses(self,):
|
||||||
PIN = '1234'
|
PIN = "1234"
|
||||||
RPID = self.host
|
RPID = self.host
|
||||||
for dev in CtapHidDevice.list_devices():
|
for dev in CtapHidDevice.list_devices():
|
||||||
print('dev', dev)
|
print("dev", dev)
|
||||||
client = Fido2Client(dev, RPID)
|
client = Fido2Client(dev, RPID)
|
||||||
ctap = client.ctap2
|
ctap = client.ctap2
|
||||||
# ctap.reset()
|
# ctap.reset()
|
||||||
@ -726,17 +726,17 @@ class Tester:
|
|||||||
|
|
||||||
inf = ctap.get_info()
|
inf = ctap.get_info()
|
||||||
# print (inf)
|
# print (inf)
|
||||||
print('versions: ', inf.versions)
|
print("versions: ", inf.versions)
|
||||||
print('aaguid: ', inf.aaguid)
|
print("aaguid: ", inf.aaguid)
|
||||||
print('rk: ', inf.options['rk'])
|
print("rk: ", inf.options["rk"])
|
||||||
print('clientPin: ', inf.options['clientPin'])
|
print("clientPin: ", inf.options["clientPin"])
|
||||||
print('max_message_size: ', inf.max_msg_size)
|
print("max_message_size: ", inf.max_msg_size)
|
||||||
|
|
||||||
# rp = {'id': 'SelectDevice', 'name': 'SelectDevice'}
|
# rp = {'id': 'SelectDevice', 'name': 'SelectDevice'}
|
||||||
rp = {'id': RPID, 'name': 'ExaRP'}
|
rp = {"id": RPID, "name": "ExaRP"}
|
||||||
user = {'id': os.urandom(10), 'name': 'SelectDevice'}
|
user = {"id": os.urandom(10), "name": "SelectDevice"}
|
||||||
user = {'id': b'21first one', 'name': 'single User'}
|
user = {"id": b"21first one", "name": "single User"}
|
||||||
challenge = 'Y2hhbGxlbmdl'
|
challenge = "Y2hhbGxlbmdl"
|
||||||
|
|
||||||
if 1:
|
if 1:
|
||||||
attest, data = client.make_credential(
|
attest, data = client.make_credential(
|
||||||
@ -746,15 +746,15 @@ class Tester:
|
|||||||
cred = attest.auth_data.credential_data
|
cred = attest.auth_data.credential_data
|
||||||
creds = [cred]
|
creds = [cred]
|
||||||
|
|
||||||
allow_list = [{'id': creds[0].credential_id, 'type': 'public-key'}]
|
allow_list = [{"id": creds[0].credential_id, "type": "public-key"}]
|
||||||
allow_list = []
|
allow_list = []
|
||||||
assertions, client_data = client.get_assertion(
|
assertions, client_data = client.get_assertion(
|
||||||
rp['id'], challenge, pin=PIN
|
rp["id"], challenge, pin=PIN
|
||||||
)
|
)
|
||||||
assertions[0].verify(client_data.hash, creds[0].public_key)
|
assertions[0].verify(client_data.hash, creds[0].public_key)
|
||||||
|
|
||||||
if 0:
|
if 0:
|
||||||
print('registering 1 user with RK')
|
print("registering 1 user with RK")
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
attest, data = client.make_credential(
|
attest, data = client.make_credential(
|
||||||
rp, user, challenge, pin=PIN, exclude_list=[], rk=True
|
rp, user, challenge, pin=PIN, exclude_list=[], rk=True
|
||||||
@ -762,23 +762,23 @@ class Tester:
|
|||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
creds = [attest.auth_data.credential_data]
|
creds = [attest.auth_data.credential_data]
|
||||||
print('Register valid (%d ms)' % (t2 - t1))
|
print("Register valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
print('1 assertion')
|
print("1 assertion")
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
assertions, client_data = client.get_assertion(
|
assertions, client_data = client.get_assertion(
|
||||||
rp['id'], challenge, pin=PIN
|
rp["id"], challenge, pin=PIN
|
||||||
)
|
)
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
assertions[0].verify(client_data.hash, creds[0].public_key)
|
assertions[0].verify(client_data.hash, creds[0].public_key)
|
||||||
print('Assertion valid (%d ms)' % (t2 - t1))
|
print("Assertion valid (%d ms)" % (t2 - t1))
|
||||||
|
|
||||||
# print('fmt:',attest.fmt)
|
# print('fmt:',attest.fmt)
|
||||||
# print('rp_id_hash',attest.auth_data.rp_id_hash)
|
# print('rp_id_hash',attest.auth_data.rp_id_hash)
|
||||||
# print('flags:', hex(attest.auth_data.flags))
|
# print('flags:', hex(attest.auth_data.flags))
|
||||||
# print('count:', hex(attest.auth_data.counter))
|
# print('count:', hex(attest.auth_data.counter))
|
||||||
print('flags MC:', attest.auth_data)
|
print("flags MC:", attest.auth_data)
|
||||||
print('flags GA:', assertions[0].auth_data)
|
print("flags GA:", assertions[0].auth_data)
|
||||||
# print('cred_id:',attest.auth_data.credential_data.credential_id)
|
# print('cred_id:',attest.auth_data.credential_data.credential_id)
|
||||||
# print('pubkey:',attest.auth_data.credential_data.public_key)
|
# print('pubkey:',attest.auth_data.credential_data.public_key)
|
||||||
# print('aaguid:',attest.auth_data.credential_data.aaguid)
|
# print('aaguid:',attest.auth_data.credential_data.aaguid)
|
||||||
@ -790,8 +790,8 @@ class Tester:
|
|||||||
# print('x5c:',attest.att_statement['x5c'])
|
# print('x5c:',attest.att_statement['x5c'])
|
||||||
# print('data:',data)
|
# print('data:',data)
|
||||||
|
|
||||||
print('assertion:', assertions[0])
|
print("assertion:", assertions[0])
|
||||||
print('clientData:', client_data)
|
print("clientData:", client_data)
|
||||||
|
|
||||||
print()
|
print()
|
||||||
# break
|
# break
|
||||||
@ -804,12 +804,12 @@ def test_find_brute_force():
|
|||||||
t = Tester()
|
t = Tester()
|
||||||
t.find_device()
|
t.find_device()
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
print('connected %d (%d ms)' % (i, t2 - t1))
|
print("connected %d (%d ms)" % (i, t2 - t1))
|
||||||
i += 1
|
i += 1
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
t = Tester()
|
t = Tester()
|
||||||
t.find_device()
|
t.find_device()
|
||||||
# t.test_hid()
|
# t.test_hid()
|
||||||
|
@ -24,7 +24,7 @@ from sign_firmware import *
|
|||||||
httpport = 8080
|
httpport = 8080
|
||||||
udpport = 8111
|
udpport = 8111
|
||||||
|
|
||||||
HEX_FILE = '../efm32/GNU ARM v7.2.1 - Debug/EFM32.hex'
|
HEX_FILE = "../efm32/GNU ARM v7.2.1 - Debug/EFM32.hex"
|
||||||
|
|
||||||
|
|
||||||
def ForceU2F(client, device):
|
def ForceU2F(client, device):
|
||||||
@ -34,13 +34,13 @@ def ForceU2F(client, device):
|
|||||||
client._do_get_assertion = client._ctap1_get_assertion
|
client._do_get_assertion = client._ctap1_get_assertion
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
try:
|
try:
|
||||||
dev = next(CtapHidDevice.list_devices(), None)
|
dev = next(CtapHidDevice.list_devices(), None)
|
||||||
print(dev)
|
print(dev)
|
||||||
if not dev:
|
if not dev:
|
||||||
raise RuntimeError('No FIDO device found')
|
raise RuntimeError("No FIDO device found")
|
||||||
client = Fido2Client(dev, 'https://example.com')
|
client = Fido2Client(dev, "https://example.com")
|
||||||
ForceU2F(client, dev)
|
ForceU2F(client, dev)
|
||||||
ctap = client.ctap
|
ctap = client.ctap
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -50,8 +50,8 @@ if __name__ == '__main__':
|
|||||||
def write(data):
|
def write(data):
|
||||||
msg = from_websafe(data)
|
msg = from_websafe(data)
|
||||||
msg = base64.b64decode(msg)
|
msg = base64.b64decode(msg)
|
||||||
chal = b'A' * 32
|
chal = b"A" * 32
|
||||||
appid = b'A' * 32
|
appid = b"A" * 32
|
||||||
# print (msg)
|
# print (msg)
|
||||||
# print (msg.decode())
|
# print (msg.decode())
|
||||||
# print (str(msg))
|
# print (str(msg))
|
||||||
@ -80,40 +80,40 @@ def read():
|
|||||||
|
|
||||||
class UDPBridge(BaseHTTPRequestHandler):
|
class UDPBridge(BaseHTTPRequestHandler):
|
||||||
def end_headers(self):
|
def end_headers(self):
|
||||||
self.send_header('Access-Control-Allow-Origin', '*')
|
self.send_header("Access-Control-Allow-Origin", "*")
|
||||||
BaseHTTPRequestHandler.end_headers(self)
|
BaseHTTPRequestHandler.end_headers(self)
|
||||||
|
|
||||||
def do_POST(self):
|
def do_POST(self):
|
||||||
content_len = int(self.headers.get('Content-Length', 0))
|
content_len = int(self.headers.get("Content-Length", 0))
|
||||||
post_body = self.rfile.read(content_len)
|
post_body = self.rfile.read(content_len)
|
||||||
data = json.loads(post_body)['data']
|
data = json.loads(post_body)["data"]
|
||||||
|
|
||||||
print(data)
|
print(data)
|
||||||
msg = from_websafe(data)
|
msg = from_websafe(data)
|
||||||
msg = base64.b64decode(msg)
|
msg = base64.b64decode(msg)
|
||||||
chal = b"\xf6\xa2\x3c\xa4\x0a\xf9\xda\xd4\x5f\xdc\xba\x7d\xc9\xde\xcb\xed\xb5\x84\x64\x3a\x4c\x9f\x44\xc2\x04\xb0\x17\xd7\xf4\x3e\xe0\x3f"
|
chal = b"\xf6\xa2\x3c\xa4\x0a\xf9\xda\xd4\x5f\xdc\xba\x7d\xc9\xde\xcb\xed\xb5\x84\x64\x3a\x4c\x9f\x44\xc2\x04\xb0\x17\xd7\xf4\x3e\xe0\x3f"
|
||||||
appid = b'A' * 32
|
appid = b"A" * 32
|
||||||
|
|
||||||
s = ctap.authenticate(chal, appid, msg)
|
s = ctap.authenticate(chal, appid, msg)
|
||||||
|
|
||||||
data = (
|
data = (
|
||||||
struct.pack('B', s.user_presence)
|
struct.pack("B", s.user_presence)
|
||||||
+ struct.pack('>L', s.counter)
|
+ struct.pack(">L", s.counter)
|
||||||
+ s.signature
|
+ s.signature
|
||||||
)
|
)
|
||||||
data = base64.b64encode(data).decode('ascii')
|
data = base64.b64encode(data).decode("ascii")
|
||||||
data = to_websafe(data)
|
data = to_websafe(data)
|
||||||
data = json.dumps({'data': data})
|
data = json.dumps({"data": data})
|
||||||
data = data.encode('ascii')
|
data = data.encode("ascii")
|
||||||
|
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
self.send_header('Content-type', 'text/json')
|
self.send_header("Content-type", "text/json")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(data)
|
self.wfile.write(data)
|
||||||
|
|
||||||
def do_GET(self):
|
def do_GET(self):
|
||||||
self.send_response(200)
|
self.send_response(200)
|
||||||
self.send_header('Content-type', 'text/json')
|
self.send_header("Content-type", "text/json")
|
||||||
|
|
||||||
msg = get_firmware_object("signing_key.pem", HEX_FILE)
|
msg = get_firmware_object("signing_key.pem", HEX_FILE)
|
||||||
|
|
||||||
@ -123,19 +123,19 @@ class UDPBridge(BaseHTTPRequestHandler):
|
|||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
server = HTTPServer(('', httpport), UDPBridge)
|
server = HTTPServer(("", httpport), UDPBridge)
|
||||||
print('Started httpserver on port ', httpport)
|
print("Started httpserver on port ", httpport)
|
||||||
|
|
||||||
server.socket = ssl.wrap_socket(
|
server.socket = ssl.wrap_socket(
|
||||||
server.socket,
|
server.socket,
|
||||||
keyfile="../web/localhost.key",
|
keyfile="../web/localhost.key",
|
||||||
certfile='../web/localhost.crt',
|
certfile="../web/localhost.crt",
|
||||||
server_side=True,
|
server_side=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
print('Saving signed firmware to firmware.json')
|
print("Saving signed firmware to firmware.json")
|
||||||
msg = get_firmware_object("signing_key.pem", HEX_FILE)
|
msg = get_firmware_object("signing_key.pem", HEX_FILE)
|
||||||
wfile = open('firmware.json', 'wb+')
|
wfile = open("firmware.json", "wb+")
|
||||||
wfile.write(json.dumps(msg).encode())
|
wfile.write(json.dumps(msg).encode())
|
||||||
wfile.close()
|
wfile.close()
|
||||||
|
|
||||||
|
@ -22,9 +22,9 @@ for p in Chameleon.Device.listDevices():
|
|||||||
if p:
|
if p:
|
||||||
chameleon.connect(p)
|
chameleon.connect(p)
|
||||||
else:
|
else:
|
||||||
raise RuntimeError('No chameleon mini connected')
|
raise RuntimeError("No chameleon mini connected")
|
||||||
|
|
||||||
chameleon.execCmd('LOGMODE=LIVE')
|
chameleon.execCmd("LOGMODE=LIVE")
|
||||||
|
|
||||||
while 1:
|
while 1:
|
||||||
b = chameleon.read(1, 20)
|
b = chameleon.read(1, 20)
|
||||||
@ -33,4 +33,4 @@ while 1:
|
|||||||
sys.stdout.write(h)
|
sys.stdout.write(h)
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
chameleon.execCmd('LOGMODE=NONE')
|
chameleon.execCmd("LOGMODE=NONE")
|
||||||
|
@ -39,27 +39,27 @@ import serial
|
|||||||
|
|
||||||
|
|
||||||
def to_websafe(data):
|
def to_websafe(data):
|
||||||
data = data.replace('+', '-')
|
data = data.replace("+", "-")
|
||||||
data = data.replace('/', '_')
|
data = data.replace("/", "_")
|
||||||
data = data.replace('=', '')
|
data = data.replace("=", "")
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def from_websafe(data):
|
def from_websafe(data):
|
||||||
data = data.replace('-', '+')
|
data = data.replace("-", "+")
|
||||||
data = data.replace('_', '/')
|
data = data.replace("_", "/")
|
||||||
return data + '=='[: (3 * len(data)) % 4]
|
return data + "=="[: (3 * len(data)) % 4]
|
||||||
|
|
||||||
|
|
||||||
def get_firmware_object(sk_name, hex_file):
|
def get_firmware_object(sk_name, hex_file):
|
||||||
from ecdsa import SigningKey, NIST256p
|
from ecdsa import SigningKey, NIST256p
|
||||||
|
|
||||||
sk = SigningKey.from_pem(open(sk_name).read())
|
sk = SigningKey.from_pem(open(sk_name).read())
|
||||||
fw = open(hex_file, 'r').read()
|
fw = open(hex_file, "r").read()
|
||||||
fw = base64.b64encode(fw.encode())
|
fw = base64.b64encode(fw.encode())
|
||||||
fw = to_websafe(fw.decode())
|
fw = to_websafe(fw.decode())
|
||||||
ih = IntelHex()
|
ih = IntelHex()
|
||||||
ih.fromfile(hex_file, format='hex')
|
ih.fromfile(hex_file, format="hex")
|
||||||
# start of firmware and the size of the flash region allocated for it.
|
# start of firmware and the size of the flash region allocated for it.
|
||||||
# TODO put this somewhere else.
|
# TODO put this somewhere else.
|
||||||
START = ih.segments()[0][0]
|
START = ih.segments()[0][0]
|
||||||
@ -71,29 +71,31 @@ def get_firmware_object(sk_name, hex_file):
|
|||||||
|
|
||||||
im_size = END - START
|
im_size = END - START
|
||||||
|
|
||||||
print('im_size: ', im_size)
|
print("im_size: ", im_size)
|
||||||
print('firmware_size: ', len(arr))
|
print("firmware_size: ", len(arr))
|
||||||
|
|
||||||
byts = (arr).tobytes() if hasattr(arr, 'tobytes') else (arr).tostring()
|
byts = (arr).tobytes() if hasattr(arr, "tobytes") else (arr).tostring()
|
||||||
h = sha256()
|
h = sha256()
|
||||||
h.update(byts)
|
h.update(byts)
|
||||||
sig = binascii.unhexlify(h.hexdigest())
|
sig = binascii.unhexlify(h.hexdigest())
|
||||||
print('hash', binascii.hexlify(sig))
|
print("hash", binascii.hexlify(sig))
|
||||||
sig = sk.sign_digest(sig)
|
sig = sk.sign_digest(sig)
|
||||||
|
|
||||||
print('sig', binascii.hexlify(sig))
|
print("sig", binascii.hexlify(sig))
|
||||||
|
|
||||||
sig = base64.b64encode(sig)
|
sig = base64.b64encode(sig)
|
||||||
sig = to_websafe(sig.decode())
|
sig = to_websafe(sig.decode())
|
||||||
|
|
||||||
# msg = {'data': read()}
|
# msg = {'data': read()}
|
||||||
msg = {'firmware': fw, 'signature': sig}
|
msg = {"firmware": fw, "signature": sig}
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
class SoloExtension:
|
class SoloExtension:
|
||||||
version= 0x14
|
version = 0x14
|
||||||
rng = 0x15
|
rng = 0x15
|
||||||
|
|
||||||
|
|
||||||
class SoloBootloader:
|
class SoloBootloader:
|
||||||
write = 0x40
|
write = 0x40
|
||||||
done = 0x41
|
done = 0x41
|
||||||
@ -109,13 +111,13 @@ class SoloBootloader:
|
|||||||
HIDCommandEnterSTBoot = 0x52
|
HIDCommandEnterSTBoot = 0x52
|
||||||
HIDCommandRNG = 0x60
|
HIDCommandRNG = 0x60
|
||||||
|
|
||||||
TAG = b'\x8C\x27\x90\xf6'
|
TAG = b"\x8C\x27\x90\xf6"
|
||||||
|
|
||||||
|
|
||||||
class SoloClient:
|
class SoloClient:
|
||||||
def __init__(self,):
|
def __init__(self,):
|
||||||
self.origin = 'https://example.org'
|
self.origin = "https://example.org"
|
||||||
self.host = 'example.org'
|
self.host = "example.org"
|
||||||
self.exchange = self.exchange_hid
|
self.exchange = self.exchange_hid
|
||||||
self.do_reboot = True
|
self.do_reboot = True
|
||||||
|
|
||||||
@ -139,52 +141,52 @@ class SoloClient:
|
|||||||
def find_device(self,):
|
def find_device(self,):
|
||||||
dev = next(CtapHidDevice.list_devices(), None)
|
dev = next(CtapHidDevice.list_devices(), None)
|
||||||
if not dev:
|
if not dev:
|
||||||
raise RuntimeError('No FIDO device found')
|
raise RuntimeError("No FIDO device found")
|
||||||
self.dev = dev
|
self.dev = dev
|
||||||
self.ctap1 = CTAP1(dev)
|
self.ctap1 = CTAP1(dev)
|
||||||
self.ctap2 = CTAP2(dev)
|
self.ctap2 = CTAP2(dev)
|
||||||
self.client = Fido2Client(dev, self.origin)
|
self.client = Fido2Client(dev, self.origin)
|
||||||
|
|
||||||
if self.exchange == self.exchange_hid:
|
if self.exchange == self.exchange_hid:
|
||||||
self.send_data_hid(CTAPHID.INIT, '\x11\x11\x11\x11\x11\x11\x11\x11')
|
self.send_data_hid(CTAPHID.INIT, "\x11\x11\x11\x11\x11\x11\x11\x11")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def format_request(cmd, addr=0, data=b'A' * 16):
|
def format_request(cmd, addr=0, data=b"A" * 16):
|
||||||
arr = b'\x00' * 9
|
arr = b"\x00" * 9
|
||||||
addr = struct.pack('<L', addr)
|
addr = struct.pack("<L", addr)
|
||||||
cmd = struct.pack('B', cmd)
|
cmd = struct.pack("B", cmd)
|
||||||
length = struct.pack('>H', len(data))
|
length = struct.pack(">H", len(data))
|
||||||
|
|
||||||
return cmd + addr[:3] + SoloBootloader.TAG + length + data
|
return cmd + addr[:3] + SoloBootloader.TAG + length + data
|
||||||
|
|
||||||
def send_only_hid(self, cmd, data):
|
def send_only_hid(self, cmd, data):
|
||||||
if type(data) != type(b''):
|
if type(data) != type(b""):
|
||||||
data = struct.pack('%dB' % len(data), *[ord(x) for x in data])
|
data = struct.pack("%dB" % len(data), *[ord(x) for x in data])
|
||||||
self.dev._dev.InternalSend(0x80 | cmd, bytearray(data))
|
self.dev._dev.InternalSend(0x80 | cmd, bytearray(data))
|
||||||
|
|
||||||
def send_data_hid(self, cmd, data):
|
def send_data_hid(self, cmd, data):
|
||||||
if type(data) != type(b''):
|
if type(data) != type(b""):
|
||||||
data = struct.pack('%dB' % len(data), *[ord(x) for x in data])
|
data = struct.pack("%dB" % len(data), *[ord(x) for x in data])
|
||||||
with Timeout(1.0) as event:
|
with Timeout(1.0) as event:
|
||||||
return self.dev.call(cmd, data, event)
|
return self.dev.call(cmd, data, event)
|
||||||
|
|
||||||
def exchange_hid(self, cmd, addr=0, data=b'A' * 16):
|
def exchange_hid(self, cmd, addr=0, data=b"A" * 16):
|
||||||
req = SoloClient.format_request(cmd, addr, data)
|
req = SoloClient.format_request(cmd, addr, data)
|
||||||
|
|
||||||
data = self.send_data_hid(SoloBootloader.HIDCommandBoot, req)
|
data = self.send_data_hid(SoloBootloader.HIDCommandBoot, req)
|
||||||
|
|
||||||
ret = data[0]
|
ret = data[0]
|
||||||
if ret != CtapError.ERR.SUCCESS:
|
if ret != CtapError.ERR.SUCCESS:
|
||||||
str = ''
|
str = ""
|
||||||
if ret == CtapError.ERR.NOT_ALLOWED:
|
if ret == CtapError.ERR.NOT_ALLOWED:
|
||||||
str = 'Out of bounds write'
|
str = "Out of bounds write"
|
||||||
raise RuntimeError('Device returned non-success code %02x: %s' % (ret, str))
|
raise RuntimeError("Device returned non-success code %02x: %s" % (ret, str))
|
||||||
|
|
||||||
return data[1:]
|
return data[1:]
|
||||||
|
|
||||||
def exchange_u2f(self, cmd, addr=0, data=b'A' * 16):
|
def exchange_u2f(self, cmd, addr=0, data=b"A" * 16):
|
||||||
appid = b'A' * 32
|
appid = b"A" * 32
|
||||||
chal = b'B' * 32
|
chal = b"B" * 32
|
||||||
|
|
||||||
req = SoloClient.format_request(cmd, addr, data)
|
req = SoloClient.format_request(cmd, addr, data)
|
||||||
|
|
||||||
@ -192,28 +194,28 @@ class SoloClient:
|
|||||||
|
|
||||||
ret = res.signature[0]
|
ret = res.signature[0]
|
||||||
if ret != CtapError.ERR.SUCCESS:
|
if ret != CtapError.ERR.SUCCESS:
|
||||||
str = ''
|
str = ""
|
||||||
if ret == CtapError.ERR.NOT_ALLOWED:
|
if ret == CtapError.ERR.NOT_ALLOWED:
|
||||||
str = 'Out of bounds write'
|
str = "Out of bounds write"
|
||||||
raise RuntimeError('Device returned non-success code %02x: %s' % (ret, str))
|
raise RuntimeError("Device returned non-success code %02x: %s" % (ret, str))
|
||||||
|
|
||||||
return res.signature[1:]
|
return res.signature[1:]
|
||||||
|
|
||||||
def bootloader_version(self,):
|
def bootloader_version(self,):
|
||||||
data = self.exchange(SoloBootloader.version)
|
data = self.exchange(SoloBootloader.version)
|
||||||
if len(data) > 2:
|
if len(data) > 2:
|
||||||
return (data[0],data[1],data[2])
|
return (data[0], data[1], data[2])
|
||||||
return data[0]
|
return data[0]
|
||||||
|
|
||||||
def solo_version(self,):
|
def solo_version(self,):
|
||||||
data = self.exchange_u2f(SoloExtension.version)
|
data = self.exchange_u2f(SoloExtension.version)
|
||||||
return (data[0],data[1],data[2])
|
return (data[0], data[1], data[2])
|
||||||
|
|
||||||
def write_flash(self, addr, data):
|
def write_flash(self, addr, data):
|
||||||
self.exchange(SoloBootloader.write, addr, data)
|
self.exchange(SoloBootloader.write, addr, data)
|
||||||
|
|
||||||
def get_rng(self, num=0):
|
def get_rng(self, num=0):
|
||||||
ret = self.send_data_hid(SoloBootloader.HIDCommandRNG, struct.pack('B', num))
|
ret = self.send_data_hid(SoloBootloader.HIDCommandRNG, struct.pack("B", num))
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
def verify_flash(self, sig):
|
def verify_flash(self, sig):
|
||||||
@ -225,34 +227,27 @@ class SoloClient:
|
|||||||
self.exchange(SoloBootloader.done, 0, sig)
|
self.exchange(SoloBootloader.done, 0, sig)
|
||||||
|
|
||||||
def wink(self,):
|
def wink(self,):
|
||||||
self.send_data_hid(CTAPHID.WINK, b'')
|
self.send_data_hid(CTAPHID.WINK, b"")
|
||||||
|
|
||||||
def reset(self,):
|
def reset(self,):
|
||||||
self.ctap2.reset()
|
self.ctap2.reset()
|
||||||
|
|
||||||
def make_credential(self,):
|
def make_credential(self,):
|
||||||
rp = {'id': self.host, 'name': 'example site'}
|
rp = {"id": self.host, "name": "example site"}
|
||||||
user = {'id': b'abcdef', 'name': 'example user'}
|
user = {"id": b"abcdef", "name": "example user"}
|
||||||
challenge = 'Y2hhbGxlbmdl'
|
challenge = "Y2hhbGxlbmdl"
|
||||||
attest, data = self.client.make_credential(
|
attest, data = self.client.make_credential(rp, user, challenge, exclude_list=[])
|
||||||
rp, user, challenge, exclude_list=[]
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
attest.verify(data.hash)
|
attest.verify(data.hash)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
verifier = Attestation.for_type(attest.fmt)
|
verifier = Attestation.for_type(attest.fmt)
|
||||||
verifier().verify(
|
verifier().verify(attest.att_statement, attest.auth_data, data.hash)
|
||||||
attest.att_statement,
|
print("Register valid")
|
||||||
attest.auth_data,
|
x5c = attest.att_statement["x5c"][0]
|
||||||
data.hash
|
|
||||||
)
|
|
||||||
print('Register valid')
|
|
||||||
x5c = attest.att_statement['x5c'][0]
|
|
||||||
cert = x509.load_der_x509_certificate(x5c, default_backend())
|
cert = x509.load_der_x509_certificate(x5c, default_backend())
|
||||||
|
|
||||||
return cert
|
return cert
|
||||||
|
|
||||||
|
|
||||||
def enter_solo_bootloader(self,):
|
def enter_solo_bootloader(self,):
|
||||||
"""
|
"""
|
||||||
If solo is configured as solo hacker or something similar,
|
If solo is configured as solo hacker or something similar,
|
||||||
@ -260,8 +255,8 @@ class SoloClient:
|
|||||||
so it can be reprogrammed
|
so it can be reprogrammed
|
||||||
"""
|
"""
|
||||||
if self.exchange != self.exchange_hid:
|
if self.exchange != self.exchange_hid:
|
||||||
self.send_data_hid(CTAPHID.INIT, '\x11\x11\x11\x11\x11\x11\x11\x11')
|
self.send_data_hid(CTAPHID.INIT, "\x11\x11\x11\x11\x11\x11\x11\x11")
|
||||||
self.send_data_hid(SoloBootloader.HIDCommandEnterBoot, '')
|
self.send_data_hid(SoloBootloader.HIDCommandEnterBoot, "")
|
||||||
|
|
||||||
def is_solo_bootloader(self,):
|
def is_solo_bootloader(self,):
|
||||||
try:
|
try:
|
||||||
@ -286,7 +281,7 @@ class SoloClient:
|
|||||||
req = SoloClient.format_request(SoloBootloader.st_dfu)
|
req = SoloClient.format_request(SoloBootloader.st_dfu)
|
||||||
self.send_only_hid(SoloBootloader.HIDCommandBoot, req)
|
self.send_only_hid(SoloBootloader.HIDCommandBoot, req)
|
||||||
else:
|
else:
|
||||||
self.send_only_hid(SoloBootloader.HIDCommandEnterSTBoot, '')
|
self.send_only_hid(SoloBootloader.HIDCommandEnterSTBoot, "")
|
||||||
|
|
||||||
def disable_solo_bootloader(self,):
|
def disable_solo_bootloader(self,):
|
||||||
"""
|
"""
|
||||||
@ -295,10 +290,10 @@ class SoloClient:
|
|||||||
If you've started from a solo hacker, make you you've programmed a final/production build!
|
If you've started from a solo hacker, make you you've programmed a final/production build!
|
||||||
"""
|
"""
|
||||||
ret = self.exchange(
|
ret = self.exchange(
|
||||||
SoloBootloader.disable, 0, b'\xcd\xde\xba\xaa'
|
SoloBootloader.disable, 0, b"\xcd\xde\xba\xaa"
|
||||||
) # magic number
|
) # magic number
|
||||||
if ret[0] != CtapError.ERR.SUCCESS:
|
if ret[0] != CtapError.ERR.SUCCESS:
|
||||||
print('Failed to disable bootloader')
|
print("Failed to disable bootloader")
|
||||||
return False
|
return False
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
self.exchange(SoloBootloader.do_reboot)
|
self.exchange(SoloBootloader.do_reboot)
|
||||||
@ -306,22 +301,22 @@ class SoloClient:
|
|||||||
|
|
||||||
def program_file(self, name):
|
def program_file(self, name):
|
||||||
|
|
||||||
if name.lower().endswith('.json'):
|
if name.lower().endswith(".json"):
|
||||||
data = json.loads(open(name, 'r').read())
|
data = json.loads(open(name, "r").read())
|
||||||
fw = base64.b64decode(from_websafe(data['firmware']).encode())
|
fw = base64.b64decode(from_websafe(data["firmware"]).encode())
|
||||||
sig = base64.b64decode(from_websafe(data['signature']).encode())
|
sig = base64.b64decode(from_websafe(data["signature"]).encode())
|
||||||
ih = IntelHex()
|
ih = IntelHex()
|
||||||
tmp = tempfile.NamedTemporaryFile(delete=False)
|
tmp = tempfile.NamedTemporaryFile(delete=False)
|
||||||
tmp.write(fw)
|
tmp.write(fw)
|
||||||
tmp.seek(0)
|
tmp.seek(0)
|
||||||
tmp.close()
|
tmp.close()
|
||||||
ih.fromfile(tmp.name, format='hex')
|
ih.fromfile(tmp.name, format="hex")
|
||||||
else:
|
else:
|
||||||
if not name.lower().endswith('.hex'):
|
if not name.lower().endswith(".hex"):
|
||||||
print('Warning, assuming "%s" is an Intel Hex file.' % name)
|
print('Warning, assuming "%s" is an Intel Hex file.' % name)
|
||||||
sig = None
|
sig = None
|
||||||
ih = IntelHex()
|
ih = IntelHex()
|
||||||
ih.fromfile(name, format='hex')
|
ih.fromfile(name, format="hex")
|
||||||
|
|
||||||
if self.exchange == self.exchange_hid:
|
if self.exchange == self.exchange_hid:
|
||||||
chunk = 2048
|
chunk = 2048
|
||||||
@ -332,7 +327,7 @@ class SoloClient:
|
|||||||
size = seg[1] - seg[0]
|
size = seg[1] - seg[0]
|
||||||
total = 0
|
total = 0
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
print('erasing...')
|
print("erasing...")
|
||||||
for i in range(seg[0], seg[1], chunk):
|
for i in range(seg[0], seg[1], chunk):
|
||||||
s = i
|
s = i
|
||||||
e = min(i + chunk, seg[1])
|
e = min(i + chunk, seg[1])
|
||||||
@ -340,17 +335,17 @@ class SoloClient:
|
|||||||
self.write_flash(i, data)
|
self.write_flash(i, data)
|
||||||
total += chunk
|
total += chunk
|
||||||
progress = total / float(size) * 100
|
progress = total / float(size) * 100
|
||||||
sys.stdout.write('downloading %.2f%%...\r' % progress)
|
sys.stdout.write("downloading %.2f%%...\r" % progress)
|
||||||
sys.stdout.write('downloaded 100% \r\n')
|
sys.stdout.write("downloaded 100% \r\n")
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
print('time: %.2f s' % ((t2 - t1) / 1000.0))
|
print("time: %.2f s" % ((t2 - t1) / 1000.0))
|
||||||
|
|
||||||
print('Verifying...')
|
print("Verifying...")
|
||||||
if self.do_reboot:
|
if self.do_reboot:
|
||||||
if sig is not None:
|
if sig is not None:
|
||||||
self.verify_flash(sig)
|
self.verify_flash(sig)
|
||||||
else:
|
else:
|
||||||
self.verify_flash(b'A' * 64)
|
self.verify_flash(b"A" * 64)
|
||||||
|
|
||||||
|
|
||||||
class DFU:
|
class DFU:
|
||||||
@ -431,14 +426,14 @@ class DFUDevice:
|
|||||||
devs = usb.core.find(idVendor=0x0483, idProduct=0xDF11, find_all=1)
|
devs = usb.core.find(idVendor=0x0483, idProduct=0xDF11, find_all=1)
|
||||||
for x in devs:
|
for x in devs:
|
||||||
if ser == (usb.util.get_string(x, x.iSerialNumber)):
|
if ser == (usb.util.get_string(x, x.iSerialNumber)):
|
||||||
print('connecting to ', ser)
|
print("connecting to ", ser)
|
||||||
self.dev = x
|
self.dev = x
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
self.dev = usb.core.find(idVendor=0x0483, idProduct=0xDF11)
|
self.dev = usb.core.find(idVendor=0x0483, idProduct=0xDF11)
|
||||||
|
|
||||||
if self.dev is None:
|
if self.dev is None:
|
||||||
raise RuntimeError('No ST DFU devices found.')
|
raise RuntimeError("No ST DFU devices found.")
|
||||||
self.dev.set_configuration()
|
self.dev.set_configuration()
|
||||||
|
|
||||||
for cfg in self.dev:
|
for cfg in self.dev:
|
||||||
@ -449,7 +444,7 @@ class DFUDevice:
|
|||||||
self.intNum = intf.bInterfaceNumber
|
self.intNum = intf.bInterfaceNumber
|
||||||
return self.dev
|
return self.dev
|
||||||
|
|
||||||
raise RuntimeError('No ST DFU alternate-%d found.' % altsetting)
|
raise RuntimeError("No ST DFU alternate-%d found." % altsetting)
|
||||||
|
|
||||||
def init(self,):
|
def init(self,):
|
||||||
if self.state() == DFU.state.ERROR:
|
if self.state() == DFU.state.ERROR:
|
||||||
@ -510,7 +505,7 @@ class DFUDevice:
|
|||||||
self.clear_status()
|
self.clear_status()
|
||||||
self.clear_status()
|
self.clear_status()
|
||||||
if self.state() not in (DFU.state.IDLE, DFU.state.DOWNLOAD_IDLE):
|
if self.state() not in (DFU.state.IDLE, DFU.state.DOWNLOAD_IDLE):
|
||||||
raise RuntimeError('DFU device not in correct state for writing memory.')
|
raise RuntimeError("DFU device not in correct state for writing memory.")
|
||||||
|
|
||||||
oldaddr = addr
|
oldaddr = addr
|
||||||
addr = DFUDevice.addr2block(addr, len(data))
|
addr = DFUDevice.addr2block(addr, len(data))
|
||||||
@ -527,7 +522,7 @@ class DFUDevice:
|
|||||||
self.clear_status()
|
self.clear_status()
|
||||||
self.clear_status()
|
self.clear_status()
|
||||||
if self.state() not in (DFU.state.IDLE, DFU.state.UPLOAD_IDLE):
|
if self.state() not in (DFU.state.IDLE, DFU.state.UPLOAD_IDLE):
|
||||||
raise RuntimeError('DFU device not in correct state for reading memory.')
|
raise RuntimeError("DFU device not in correct state for reading memory.")
|
||||||
|
|
||||||
return self.upload(addr, size)
|
return self.upload(addr, size)
|
||||||
|
|
||||||
@ -542,7 +537,7 @@ class DFUDevice:
|
|||||||
self.clear_status()
|
self.clear_status()
|
||||||
self.clear_status()
|
self.clear_status()
|
||||||
if self.state() not in (DFU.state.IDLE, DFU.state.DOWNLOAD_IDLE):
|
if self.state() not in (DFU.state.IDLE, DFU.state.DOWNLOAD_IDLE):
|
||||||
raise RuntimeError('DFU device not in correct state for detaching.')
|
raise RuntimeError("DFU device not in correct state for detaching.")
|
||||||
# self.set_addr(0x08000000)
|
# self.set_addr(0x08000000)
|
||||||
# self.block_on_state(DFU.state.DOWNLOAD_BUSY)
|
# self.block_on_state(DFU.state.DOWNLOAD_BUSY)
|
||||||
# assert(DFU.state.DOWNLOAD_IDLE == self.state())
|
# assert(DFU.state.DOWNLOAD_IDLE == self.state())
|
||||||
@ -572,15 +567,15 @@ def attempt_to_boot_bootloader(p):
|
|||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
if e.code == CtapError.ERR.INVALID_COMMAND:
|
if e.code == CtapError.ERR.INVALID_COMMAND:
|
||||||
print(
|
print(
|
||||||
'Solo appears to not be a solo hacker. Try holding down the button for 2 while you plug token in.'
|
"Solo appears to not be a solo hacker. Try holding down the button for 2 while you plug token in."
|
||||||
)
|
)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
else:
|
else:
|
||||||
raise (e)
|
raise (e)
|
||||||
print('Solo rebooted. Reconnecting...')
|
print("Solo rebooted. Reconnecting...")
|
||||||
time.sleep(0.500)
|
time.sleep(0.500)
|
||||||
if not attempt_to_find_device(p):
|
if not attempt_to_find_device(p):
|
||||||
raise RuntimeError('Failed to reconnect!')
|
raise RuntimeError("Failed to reconnect!")
|
||||||
|
|
||||||
|
|
||||||
def solo_main():
|
def solo_main():
|
||||||
@ -588,11 +583,19 @@ def solo_main():
|
|||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--rng",
|
"--rng",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Continuously dump random numbers generated from Solo.',
|
help="Continuously dump random numbers generated from Solo.",
|
||||||
|
)
|
||||||
|
parser.add_argument("--wink", action="store_true", help="HID Wink command.")
|
||||||
|
parser.add_argument(
|
||||||
|
"--reset",
|
||||||
|
action="store_true",
|
||||||
|
help="Issue a FIDO2 reset command. Warning: your credentials will be lost.",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--verify-solo",
|
||||||
|
action="store_true",
|
||||||
|
help="Verify that the Solo firmware is from SoloKeys. Check firmware version.",
|
||||||
)
|
)
|
||||||
parser.add_argument("--wink", action="store_true", help='HID Wink command.')
|
|
||||||
parser.add_argument("--reset", action="store_true", help='Issue a FIDO2 reset command. Warning: your credentials will be lost.')
|
|
||||||
parser.add_argument("--verify-solo", action="store_true", help='Verify that the Solo firmware is from SoloKeys. Check firmware version.')
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
p = SoloClient()
|
p = SoloClient()
|
||||||
@ -614,26 +617,26 @@ def solo_main():
|
|||||||
if args.verify_solo:
|
if args.verify_solo:
|
||||||
cert = p.make_credential()
|
cert = p.make_credential()
|
||||||
|
|
||||||
solo_fingerprint = b'r\xd5\x831&\xac\xfc\xe9\xa8\xe8&`\x18\xe6AI4\xc8\xbeJ\xb8h_\x91\xb0\x99!\x13\xbb\xd42\x95'
|
solo_fingerprint = b"r\xd5\x831&\xac\xfc\xe9\xa8\xe8&`\x18\xe6AI4\xc8\xbeJ\xb8h_\x91\xb0\x99!\x13\xbb\xd42\x95"
|
||||||
hacker_fingerprint = b"\xd0ml\xcb\xda}\xe5j\x16'\xc2\xa7\x89\x9c5\xa2\xa3\x16\xc8Q\xb3j\xd8\xed~\xd7\x84y\xbbx~\xf7"
|
hacker_fingerprint = b"\xd0ml\xcb\xda}\xe5j\x16'\xc2\xa7\x89\x9c5\xa2\xa3\x16\xc8Q\xb3j\xd8\xed~\xd7\x84y\xbbx~\xf7"
|
||||||
|
|
||||||
if (cert.fingerprint(hashes.SHA256()) == solo_fingerprint):
|
if cert.fingerprint(hashes.SHA256()) == solo_fingerprint:
|
||||||
print('Valid SOLO firmware from SoloKeys')
|
print("Valid SOLO firmware from SoloKeys")
|
||||||
elif (cert.fingerprint(hashes.SHA256()) == hacker_fingerprint):
|
elif cert.fingerprint(hashes.SHA256()) == hacker_fingerprint:
|
||||||
print('Valid HACKER firmware')
|
print("Valid HACKER firmware")
|
||||||
else:
|
else:
|
||||||
print('Unknown fingerprint! ', cert.fingerprint(hashes.SHA256()))
|
print("Unknown fingerprint! ", cert.fingerprint(hashes.SHA256()))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
v = p.solo_version()
|
v = p.solo_version()
|
||||||
print('Version: ', v)
|
print("Version: ", v)
|
||||||
except ApduError:
|
except ApduError:
|
||||||
print('Firmware is out of date.')
|
print("Firmware is out of date.")
|
||||||
|
|
||||||
|
|
||||||
def asked_for_help():
|
def asked_for_help():
|
||||||
for i, v in enumerate(sys.argv):
|
for i, v in enumerate(sys.argv):
|
||||||
if v == '-h' or v == '--help':
|
if v == "-h" or v == "--help":
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -668,9 +671,9 @@ def monitor_main():
|
|||||||
try:
|
try:
|
||||||
d = ser.read(1)
|
d = ser.read(1)
|
||||||
except serial.SerialException:
|
except serial.SerialException:
|
||||||
print('reconnecting...')
|
print("reconnecting...")
|
||||||
ser = reconnect()
|
ser = reconnect()
|
||||||
print('done')
|
print("done")
|
||||||
sys.stdout.buffer.write(d)
|
sys.stdout.buffer.write(d)
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
@ -695,26 +698,26 @@ def genkey_main():
|
|||||||
|
|
||||||
if len(sys.argv) > 2:
|
if len(sys.argv) > 2:
|
||||||
seed = sys.argv[2]
|
seed = sys.argv[2]
|
||||||
print('using input seed file ', seed)
|
print("using input seed file ", seed)
|
||||||
rng = open(seed, 'rb').read()
|
rng = open(seed, "rb").read()
|
||||||
secexp = randrange_from_seed__trytryagain(rng, NIST256p.order)
|
secexp = randrange_from_seed__trytryagain(rng, NIST256p.order)
|
||||||
sk = SigningKey.from_secret_exponent(secexp, curve=NIST256p)
|
sk = SigningKey.from_secret_exponent(secexp, curve=NIST256p)
|
||||||
else:
|
else:
|
||||||
sk = SigningKey.generate(curve=NIST256p)
|
sk = SigningKey.generate(curve=NIST256p)
|
||||||
|
|
||||||
sk_name = sys.argv[1]
|
sk_name = sys.argv[1]
|
||||||
print('Signing key for signing device firmware: ' + sk_name)
|
print("Signing key for signing device firmware: " + sk_name)
|
||||||
open(sk_name, 'wb+').write(sk.to_pem())
|
open(sk_name, "wb+").write(sk.to_pem())
|
||||||
|
|
||||||
vk = sk.get_verifying_key()
|
vk = sk.get_verifying_key()
|
||||||
|
|
||||||
print('Public key in various formats:')
|
print("Public key in various formats:")
|
||||||
print()
|
print()
|
||||||
print([c for c in vk.to_string()])
|
print([c for c in vk.to_string()])
|
||||||
print()
|
print()
|
||||||
print(''.join(['%02x' % c for c in vk.to_string()]))
|
print("".join(["%02x" % c for c in vk.to_string()]))
|
||||||
print()
|
print()
|
||||||
print('"\\x' + '\\x'.join(['%02x' % c for c in vk.to_string()]) + '"')
|
print('"\\x' + "\\x".join(["%02x" % c for c in vk.to_string()]) + '"')
|
||||||
print()
|
print()
|
||||||
|
|
||||||
|
|
||||||
@ -722,20 +725,20 @@ def sign_main():
|
|||||||
|
|
||||||
if asked_for_help() or len(sys.argv) != 4:
|
if asked_for_help() or len(sys.argv) != 4:
|
||||||
print(
|
print(
|
||||||
'Signs a firmware hex file, outputs a .json file that can be used for signed update.'
|
"Signs a firmware hex file, outputs a .json file that can be used for signed update."
|
||||||
)
|
)
|
||||||
print('usage: %s <signing-key.pem> <app.hex> <output.json> [-h]' % sys.argv[0])
|
print("usage: %s <signing-key.pem> <app.hex> <output.json> [-h]" % sys.argv[0])
|
||||||
print()
|
print()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
msg = get_firmware_object(sys.argv[1], sys.argv[2])
|
msg = get_firmware_object(sys.argv[1], sys.argv[2])
|
||||||
print('Saving signed firmware to', sys.argv[3])
|
print("Saving signed firmware to", sys.argv[3])
|
||||||
wfile = open(sys.argv[3], 'wb+')
|
wfile = open(sys.argv[3], "wb+")
|
||||||
wfile.write(json.dumps(msg).encode())
|
wfile.write(json.dumps(msg).encode())
|
||||||
wfile.close()
|
wfile.close()
|
||||||
|
|
||||||
|
|
||||||
def use_dfu(args):
|
def use_dfu(args):
|
||||||
fw = args.__dict__['[firmware]']
|
fw = args.__dict__["[firmware]"]
|
||||||
|
|
||||||
for i in range(0, 8):
|
for i in range(0, 8):
|
||||||
dfu = DFUDevice()
|
dfu = DFUDevice()
|
||||||
@ -746,15 +749,15 @@ def use_dfu(args):
|
|||||||
dfu = None
|
dfu = None
|
||||||
|
|
||||||
if dfu is None:
|
if dfu is None:
|
||||||
print('No STU DFU device found. ')
|
print("No STU DFU device found. ")
|
||||||
if args.dfu_serial:
|
if args.dfu_serial:
|
||||||
print('Serial number used: ', args.dfu_serial)
|
print("Serial number used: ", args.dfu_serial)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
dfu.init()
|
dfu.init()
|
||||||
|
|
||||||
if fw:
|
if fw:
|
||||||
ih = IntelHex()
|
ih = IntelHex()
|
||||||
ih.fromfile(fw, format='hex')
|
ih.fromfile(fw, format="hex")
|
||||||
|
|
||||||
chunk = 2048
|
chunk = 2048
|
||||||
seg = ih.segments()[0]
|
seg = ih.segments()[0]
|
||||||
@ -762,11 +765,11 @@ def use_dfu(args):
|
|||||||
total = 0
|
total = 0
|
||||||
t1 = time.time() * 1000
|
t1 = time.time() * 1000
|
||||||
|
|
||||||
print('erasing...')
|
print("erasing...")
|
||||||
try:
|
try:
|
||||||
dfu.mass_erase()
|
dfu.mass_erase()
|
||||||
except usb.core.USBError:
|
except usb.core.USBError:
|
||||||
dfu.write_page(0x08000000 + 2048 * 10, 'ZZFF' * (2048 // 4))
|
dfu.write_page(0x08000000 + 2048 * 10, "ZZFF" * (2048 // 4))
|
||||||
dfu.mass_erase()
|
dfu.mass_erase()
|
||||||
|
|
||||||
page = 0
|
page = 0
|
||||||
@ -780,7 +783,7 @@ def use_dfu(args):
|
|||||||
progress = total / float(size) * 100
|
progress = total / float(size) * 100
|
||||||
|
|
||||||
sys.stdout.write(
|
sys.stdout.write(
|
||||||
'downloading %.2f%% %08x - %08x ... \r'
|
"downloading %.2f%% %08x - %08x ... \r"
|
||||||
% (progress, i, i + page)
|
% (progress, i, i + page)
|
||||||
)
|
)
|
||||||
# time.sleep(0.100)
|
# time.sleep(0.100)
|
||||||
@ -789,8 +792,8 @@ def use_dfu(args):
|
|||||||
# print(dfu.read_mem(i,16))
|
# print(dfu.read_mem(i,16))
|
||||||
t2 = time.time() * 1000
|
t2 = time.time() * 1000
|
||||||
print()
|
print()
|
||||||
print('time: %d ms' % (t2 - t1))
|
print("time: %d ms" % (t2 - t1))
|
||||||
print('verifying...')
|
print("verifying...")
|
||||||
progress = 0
|
progress = 0
|
||||||
for start, end in ih.segments():
|
for start, end in ih.segments():
|
||||||
for i in range(start, end, chunk):
|
for i in range(start, end, chunk):
|
||||||
@ -799,13 +802,13 @@ def use_dfu(args):
|
|||||||
total += chunk
|
total += chunk
|
||||||
progress = total / float(size) * 100
|
progress = total / float(size) * 100
|
||||||
sys.stdout.write(
|
sys.stdout.write(
|
||||||
'reading %.2f%% %08x - %08x ... \r'
|
"reading %.2f%% %08x - %08x ... \r"
|
||||||
% (progress, i, i + page)
|
% (progress, i, i + page)
|
||||||
)
|
)
|
||||||
if (end - start) == chunk:
|
if (end - start) == chunk:
|
||||||
assert data1 == data2
|
assert data1 == data2
|
||||||
print()
|
print()
|
||||||
print('firmware readback verified.')
|
print("firmware readback verified.")
|
||||||
if args.detach:
|
if args.detach:
|
||||||
dfu.detach()
|
dfu.detach()
|
||||||
|
|
||||||
@ -815,71 +818,71 @@ def programmer_main():
|
|||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"[firmware]",
|
"[firmware]",
|
||||||
nargs='?',
|
nargs="?",
|
||||||
default='',
|
default="",
|
||||||
help='firmware file. Either a JSON or hex file. JSON file contains signature while hex does not.',
|
help="firmware file. Either a JSON or hex file. JSON file contains signature while hex does not.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--use-hid",
|
"--use-hid",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Programs using custom HID command (default). Quicker than using U2F authenticate which is what a browser has to use.',
|
help="Programs using custom HID command (default). Quicker than using U2F authenticate which is what a browser has to use.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--use-u2f",
|
"--use-u2f",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Programs using U2F authenticate. This is what a web application will use.',
|
help="Programs using U2F authenticate. This is what a web application will use.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--no-reset",
|
"--no-reset",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Don\'t reset after writing firmware. Stay in bootloader mode.',
|
help="Don't reset after writing firmware. Stay in bootloader mode.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--reset-only",
|
"--reset-only",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Don\'t write anything, try to boot without a signature.',
|
help="Don't write anything, try to boot without a signature.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--reboot", action="store_true", help='Tell bootloader to reboot.'
|
"--reboot", action="store_true", help="Tell bootloader to reboot."
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--enter-bootloader",
|
"--enter-bootloader",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Don\'t write anything, try to enter bootloader. Typically only supported by Solo Hacker builds.',
|
help="Don't write anything, try to enter bootloader. Typically only supported by Solo Hacker builds.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--st-dfu",
|
"--st-dfu",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Don\'t write anything, try to enter ST DFU. Warning, you could brick your Solo if you overwrite everything. You should reprogram the option bytes just to be safe (boot to Solo bootloader first, then run this command).',
|
help="Don't write anything, try to enter ST DFU. Warning, you could brick your Solo if you overwrite everything. You should reprogram the option bytes just to be safe (boot to Solo bootloader first, then run this command).",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--disable",
|
"--disable",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Disable the Solo bootloader. Cannot be undone. No future updates can be applied.',
|
help="Disable the Solo bootloader. Cannot be undone. No future updates can be applied.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--detach",
|
"--detach",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
help='Detach from ST DFU and boot from main flash. Must be in DFU mode.',
|
help="Detach from ST DFU and boot from main flash. Must be in DFU mode.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--dfu-serial",
|
"--dfu-serial",
|
||||||
default='',
|
default="",
|
||||||
help='Specify a serial number for a specific DFU device to connect to.',
|
help="Specify a serial number for a specific DFU device to connect to.",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--use-dfu", action="store_true", help='Boot to ST-DFU before continuing.'
|
"--use-dfu", action="store_true", help="Boot to ST-DFU before continuing."
|
||||||
)
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
fw = args.__dict__['[firmware]']
|
fw = args.__dict__["[firmware]"]
|
||||||
|
|
||||||
p = SoloClient()
|
p = SoloClient()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
p.find_device()
|
p.find_device()
|
||||||
if args.use_dfu:
|
if args.use_dfu:
|
||||||
print('entering dfu..')
|
print("entering dfu..")
|
||||||
try:
|
try:
|
||||||
attempt_to_boot_bootloader(p)
|
attempt_to_boot_bootloader(p)
|
||||||
p.enter_st_dfu()
|
p.enter_st_dfu()
|
||||||
@ -887,7 +890,7 @@ def programmer_main():
|
|||||||
# already in DFU mode?
|
# already in DFU mode?
|
||||||
pass
|
pass
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
print('No Solo device detected.')
|
print("No Solo device detected.")
|
||||||
if fw or args.detach:
|
if fw or args.detach:
|
||||||
use_dfu(args)
|
use_dfu(args)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
@ -905,7 +908,7 @@ def programmer_main():
|
|||||||
p.set_reboot(False)
|
p.set_reboot(False)
|
||||||
|
|
||||||
if args.enter_bootloader:
|
if args.enter_bootloader:
|
||||||
print('Attempting to boot into bootloader mode...')
|
print("Attempting to boot into bootloader mode...")
|
||||||
attempt_to_boot_bootloader(p)
|
attempt_to_boot_bootloader(p)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
@ -914,7 +917,7 @@ def programmer_main():
|
|||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
if args.st_dfu:
|
if args.st_dfu:
|
||||||
print('Sending command to boot into ST DFU...')
|
print("Sending command to boot into ST DFU...")
|
||||||
p.enter_st_dfu()
|
p.enter_st_dfu()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
@ -922,8 +925,8 @@ def programmer_main():
|
|||||||
p.disable_solo_bootloader()
|
p.disable_solo_bootloader()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
if fw == '' and not args.reset_only:
|
if fw == "" and not args.reset_only:
|
||||||
print('Need to supply firmware filename, or see help for more options.')
|
print("Need to supply firmware filename, or see help for more options.")
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
@ -931,16 +934,16 @@ def programmer_main():
|
|||||||
p.bootloader_version()
|
p.bootloader_version()
|
||||||
except CtapError as e:
|
except CtapError as e:
|
||||||
if e.code == CtapError.ERR.INVALID_COMMAND:
|
if e.code == CtapError.ERR.INVALID_COMMAND:
|
||||||
print('Bootloader not active. Attempting to boot into bootloader mode...')
|
print("Bootloader not active. Attempting to boot into bootloader mode...")
|
||||||
attempt_to_boot_bootloader(p)
|
attempt_to_boot_bootloader(p)
|
||||||
else:
|
else:
|
||||||
raise (e)
|
raise (e)
|
||||||
except ApduError:
|
except ApduError:
|
||||||
print('Bootloader not active. Attempting to boot into bootloader mode...')
|
print("Bootloader not active. Attempting to boot into bootloader mode...")
|
||||||
attempt_to_boot_bootloader(p)
|
attempt_to_boot_bootloader(p)
|
||||||
|
|
||||||
if args.reset_only:
|
if args.reset_only:
|
||||||
p.exchange(SoloBootloader.done, 0, b'A' * 64)
|
p.exchange(SoloBootloader.done, 0, b"A" * 64)
|
||||||
else:
|
else:
|
||||||
p.program_file(fw)
|
p.program_file(fw)
|
||||||
|
|
||||||
@ -948,7 +951,7 @@ def programmer_main():
|
|||||||
def main_mergehex():
|
def main_mergehex():
|
||||||
if len(sys.argv) < 3:
|
if len(sys.argv) < 3:
|
||||||
print(
|
print(
|
||||||
'usage: %s <file1.hex> <file2.hex> [...] [-s <secret_attestation_key>] <output.hex>'
|
"usage: %s <file1.hex> <file2.hex> [...] [-s <secret_attestation_key>] <output.hex>"
|
||||||
)
|
)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
@ -964,7 +967,7 @@ def main_mergehex():
|
|||||||
|
|
||||||
# user supplied, optional
|
# user supplied, optional
|
||||||
for i, x in enumerate(args):
|
for i, x in enumerate(args):
|
||||||
if x == '-s':
|
if x == "-s":
|
||||||
secret_attestation_key = args[i + 1]
|
secret_attestation_key = args[i + 1]
|
||||||
args = args[:i] + args[i + 2 :]
|
args = args[:i] + args[i + 2 :]
|
||||||
break
|
break
|
||||||
@ -977,8 +980,8 @@ def main_mergehex():
|
|||||||
|
|
||||||
first = IntelHex(args[1])
|
first = IntelHex(args[1])
|
||||||
for i in range(2, len(args) - 1):
|
for i in range(2, len(args) - 1):
|
||||||
print('merging %s with ' % (args[1]), args[i])
|
print("merging %s with " % (args[1]), args[i])
|
||||||
first.merge(IntelHex(args[i]), overlap='replace')
|
first.merge(IntelHex(args[i]), overlap="replace")
|
||||||
|
|
||||||
first[flash_addr(APPLICATION_END_PAGE - 1)] = 0x41
|
first[flash_addr(APPLICATION_END_PAGE - 1)] = 0x41
|
||||||
first[flash_addr(APPLICATION_END_PAGE - 1) + 1] = 0x41
|
first[flash_addr(APPLICATION_END_PAGE - 1) + 1] = 0x41
|
||||||
@ -1004,19 +1007,19 @@ def main_mergehex():
|
|||||||
for i, x in enumerate(key):
|
for i, x in enumerate(key):
|
||||||
first[ATTEST_ADDR + i] = x
|
first[ATTEST_ADDR + i] = x
|
||||||
|
|
||||||
first.tofile(args[len(args) - 1], format='hex')
|
first.tofile(args[len(args) - 1], format="hex")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
if sys.version_info[0] < 3:
|
||||||
print('Sorry, python3 is required.')
|
print("Sorry, python3 is required.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
if len(sys.argv) < 2 or (len(sys.argv) == 2 and asked_for_help()):
|
if len(sys.argv) < 2 or (len(sys.argv) == 2 and asked_for_help()):
|
||||||
print('Diverse command line tool for working with Solo')
|
print("Diverse command line tool for working with Solo")
|
||||||
print('usage: %s <command> [options] [-h]' % sys.argv[0])
|
print("usage: %s <command> [options] [-h]" % sys.argv[0])
|
||||||
print('commands: program, solo, monitor, sign, genkey, mergehex')
|
print("commands: program, solo, monitor, sign, genkey, mergehex")
|
||||||
print(
|
print(
|
||||||
"""
|
"""
|
||||||
Examples:
|
Examples:
|
||||||
@ -1039,19 +1042,19 @@ Examples:
|
|||||||
|
|
||||||
c = sys.argv[1]
|
c = sys.argv[1]
|
||||||
sys.argv = sys.argv[:1] + sys.argv[2:]
|
sys.argv = sys.argv[:1] + sys.argv[2:]
|
||||||
sys.argv[0] = sys.argv[0] + ' ' + c
|
sys.argv[0] = sys.argv[0] + " " + c
|
||||||
|
|
||||||
if c == 'program':
|
if c == "program":
|
||||||
programmer_main()
|
programmer_main()
|
||||||
elif c == 'solo':
|
elif c == "solo":
|
||||||
solo_main()
|
solo_main()
|
||||||
elif c == 'monitor':
|
elif c == "monitor":
|
||||||
monitor_main()
|
monitor_main()
|
||||||
elif c == 'sign':
|
elif c == "sign":
|
||||||
sign_main()
|
sign_main()
|
||||||
elif c == 'genkey':
|
elif c == "genkey":
|
||||||
genkey_main()
|
genkey_main()
|
||||||
elif c == 'mergehex':
|
elif c == "mergehex":
|
||||||
main_mergehex()
|
main_mergehex()
|
||||||
else:
|
else:
|
||||||
print('invalid command: %s' % c)
|
print("invalid command: %s" % c)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user