Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 4 additions & 23 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -1728,38 +1728,19 @@ def test_10__visit_child_role(self):



def test_11__verify_uncompressed_metadata_file(self):
def test_11__verify_metadata_file(self):
# Test for invalid metadata content.
metadata_file_object = tempfile.TemporaryFile()
metadata_file_object.write(b'X')
metadata_file_object.seek(0)

self.assertRaises(tuf.exceptions.InvalidMetadataJSONError,
self.repository_updater._verify_uncompressed_metadata_file,
self.repository_updater._verify_metadata_file,
metadata_file_object, 'root')



def test_12__verify_root_chain_link(self):
# Test for an invalid signature in the chain link.
# current = (i.e., 1.root.json)
# next = signable for the next metadata in the chain (i.e., 2.root.json)
rolename = 'root'
current_root = self.repository_updater.metadata['current']['root']

targets_path = os.path.join(self.repository_directory, 'metadata', 'targets.json')

# 'next_invalid_root' is a Targets signable, as written to disk.
# We use the Targets metadata here to ensure the signatures are invalid.
next_invalid_root = securesystemslib.util.load_json_file(targets_path)

self.assertRaises(securesystemslib.exceptions.BadSignatureError,
self.repository_updater._verify_root_chain_link, rolename, current_root,
next_invalid_root)



def test_13__get_file(self):
def test_12__get_file(self):
# Test for an "unsafe" download, where the file is downloaded up to
# a required length (and no more). The "safe" download approach
# downloads an exact required length.
Expand All @@ -1779,7 +1760,7 @@ def verify_target_file(targets_path):
self.repository_updater._get_file('targets.json', verify_target_file,
file_type, file_size, download_safely=False)

def test_14__targets_of_role(self):
def test_13__targets_of_role(self):
# Test case where a list of targets is given. By default, the 'targets'
# parameter is None.
targets = [{'filepath': 'file1.txt', 'fileinfo': {'length': 1, 'hashes': {'sha256': 'abc'}}}]
Expand Down
182 changes: 170 additions & 12 deletions tests/test_updater_root_rotation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,68 @@ def test_root_rotation(self):



def test_verify_root_with_current_keyids_and_threshold(self):
"""
Each root file is signed by the current root threshold of keys as well
as the previous root threshold of keys. Test that a root file which is
not 'self-signed' with the current root threshold of keys causes the
update to fail
"""
# Load repository with root.json == 1.root.json (available on client)
# Signing key: "root", Threshold: 1
repository = repo_tool.load_repository(self.repository_directory)

# Rotate keys and update root: 1.root.json --> 2.root.json
# Signing key: "root" (previous) and "root2" (current)
# Threshold (for both): 1
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.root.add_verification_key(self.role_keys['root2']['public'])
repository.root.load_signing_key(self.role_keys['root2']['private'])
# Remove the previous "root" key from the list of current
# verification keys
repository.root.remove_verification_key(self.role_keys['root']['public'])
repository.writeall()

# Move staged metadata to "live" metadata
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))

# Intercept 2.root.json and tamper with "root2" (current) key signature
root2_path_live = os.path.join(
self.repository_directory, 'metadata', '2.root.json')
root2 = securesystemslib.util.load_json_file(root2_path_live)

for idx, sig in enumerate(root2['signatures']):
if sig['keyid'] == self.role_keys['root2']['public']['keyid']:
sig_len = len(root2['signatures'][idx]['sig'])
root2['signatures'][idx]['sig'] = "deadbeef".ljust(sig_len, '0')

roo2_fobj = tempfile.TemporaryFile()
roo2_fobj.write(tuf.repository_lib._get_written_metadata(root2))
securesystemslib.util.persist_temp_file(roo2_fobj, root2_path_live)

# Update 1.root.json -> 2.root.json
# Signature verification with current keys should fail because we replaced
with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
self.repository_updater.refresh()

for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
self.assertTrue(mirror_url.endswith('/2.root.json'))
self.assertTrue(isinstance(mirror_error,
securesystemslib.exceptions.BadSignatureError))

# Assert that the current 'root.json' on the client side is the verified one
self.assertTrue(filecmp.cmp(
os.path.join(self.repository_directory, 'metadata', '1.root.json'),
os.path.join(self.client_metadata_current, 'root.json')))







def test_root_rotation_full(self):
"""Test that a client whose root is outdated by multiple versions and who
has none of the latest nor next-to-latest root keys can still update and
Expand Down Expand Up @@ -340,22 +402,26 @@ def test_root_rotation_missing_keys(self):
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))

try:
with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
self.repository_updater.refresh()

except tuf.exceptions.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'metadata', '2.root.json')

# Verify that '2.root.json' is the culprit.
self.assertEqual(url_file.replace('\\', '/'), mirror_url)
self.assertTrue(isinstance(mirror_error,
for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
self.assertTrue(mirror_url.endswith('/2.root.json'))
self.assertTrue(isinstance(mirror_error,
securesystemslib.exceptions.BadSignatureError))

# Assert that the current 'root.json' on the client side is the verified one
self.assertTrue(filecmp.cmp(
os.path.join(self.repository_directory, 'metadata', '1.root.json'),
os.path.join(self.client_metadata_current, 'root.json')))




def test_root_rotation_unmet_threshold(self):
def test_root_rotation_unmet_last_version_threshold(self):
"""Test that client detects a root.json version that is not signed
by a previous threshold of signatures """

repository = repo_tool.load_repository(self.repository_directory)

# Add verification keys
Expand Down Expand Up @@ -411,8 +477,100 @@ def test_root_rotation_unmet_threshold(self):

# The following refresh should fail because root must be signed by the
# previous self.role_keys['root'] key, which wasn't loaded.
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
self.repository_updater.refresh)
with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
self.repository_updater.refresh()

for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
self.assertTrue(mirror_url.endswith('/3.root.json'))
self.assertTrue(isinstance(mirror_error,
securesystemslib.exceptions.BadSignatureError))

# Assert that the current 'root.json' on the client side is the verified one
self.assertTrue(filecmp.cmp(
os.path.join(self.repository_directory, 'metadata', '2.root.json'),
os.path.join(self.client_metadata_current, 'root.json')))



def test_root_rotation_unmet_new_threshold(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lovely test, but there's some overlap between this test and the next. Maybe clarify that there is at least one successful intermediate rotation involved or something.

"""Test that client detects a root.json version that is not signed
by a current threshold of signatures """
repository = repo_tool.load_repository(self.repository_directory)

# Create a new, valid root.json.
repository.root.threshold = 2
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.root.add_verification_key(self.role_keys['root2']['public'])
repository.root.load_signing_key(self.role_keys['root2']['private'])

repository.writeall()

# Increase the threshold and add a new verification key without
# actually loading the signing key
repository.root.threshold = 3
repository.root.add_verification_key(self.role_keys['root3']['public'])

# writeall fails as expected since the third signature is missing
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
# write an invalid '3.root.json' as partially signed
repository.write('root')

# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))


# The following refresh should fail because root must be signed by the
# current self.role_keys['root3'] key, which wasn't loaded.
with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
self.repository_updater.refresh()

for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
self.assertTrue(mirror_url.endswith('/3.root.json'))
self.assertTrue(isinstance(mirror_error,
securesystemslib.exceptions.BadSignatureError))

# Assert that the current 'root.json' on the client side is the verified one
self.assertTrue(filecmp.cmp(
os.path.join(self.repository_directory, 'metadata', '2.root.json'),
os.path.join(self.client_metadata_current, 'root.json')))



def test_root_rotation_discard_untrusted_version(self):
"""Test that client discards root.json version that failed the
signature verification """
repository = repo_tool.load_repository(self.repository_directory)

# Rotate the root key without signing with the previous version key 'root'
repository.root.remove_verification_key(self.role_keys['root']['public'])
repository.root.add_verification_key(self.role_keys['root2']['public'])
repository.root.load_signing_key(self.role_keys['root2']['private'])

# 2.root.json
repository.writeall()

# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))

# Refresh on the client side should fail because 2.root.json is not signed
# with a threshold of prevous keys
with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
self.repository_updater.refresh()

for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
self.assertTrue(mirror_url.endswith('/2.root.json'))
self.assertTrue(isinstance(mirror_error,
securesystemslib.exceptions.BadSignatureError))

# Assert that the current 'root.json' on the client side is the trusted one
# and 2.root.json is discarded
self.assertTrue(filecmp.cmp(
os.path.join(self.repository_directory, 'metadata', '1.root.json'),
os.path.join(self.client_metadata_current, 'root.json')))



Expand Down
82 changes: 57 additions & 25 deletions tuf/client/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -1370,12 +1370,51 @@ def verify_target_file(target_file_object):



def _verify_uncompressed_metadata_file(self, metadata_file_object,
def _verify_root_self_signed(self, signable):
"""
Verify the root metadata in signable is signed by a threshold of keys,
where the threshold and valid keys are defined by itself
"""
threshold = signable['signed']['roles']['root']['threshold']
keyids = signable['signed']['roles']['root']['keyids']
keys = signable['signed']['keys']
signatures = signable['signatures']
signed = securesystemslib.formats.encode_canonical(
signable['signed']).encode('utf-8')
validated = 0

for signature in signatures:
keyid = signature['keyid']

# At this point we are verifying that the root metadata is signed by a
# threshold of keys listed in the current root role, therefore skip
# keys with a keyid that is not listed in the current root role.
if keyid not in keyids:
continue

key = keys[keyid]
# The ANYKEY_SCHEMA check in verify_signature expects the keydict to
# include a keyid
key['keyid'] = keyid
valid_sig = securesystemslib.keys.verify_signature(key, signature, signed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we bail on a single exception here? Feels like we need a try-catch...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_get_metadata_file has some logic to collect all exceptions and pack them into NoWorkingMirrorError, so I think this just continues the existing pattern?


if valid_sig:
validated = validated + 1

if validated >= threshold:
return True
return False





def _verify_metadata_file(self, metadata_file_object,
metadata_role):
"""
<Purpose>
Non-public method that verifies an uncompressed metadata file. An
exception is raised if 'metadata_file_object is invalid. There is no
Non-public method that verifies a metadata file. An exception is
raised if 'metadata_file_object is invalid. There is no
return value.

<Arguments>
Expand Down Expand Up @@ -1439,6 +1478,20 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
if not valid:
raise securesystemslib.exceptions.BadSignatureError(metadata_role)

# For root metadata, verify the downloaded root metadata object with the
# new threshold of new signatures contained within the downloaded root
# metadata object
# NOTE: we perform the checks on root metadata here because this enables
# us to perform the check before the tempfile is persisted. Furthermore,
# by checking here we can easily perform the check for each download
# mirror. Whereas if we check after _verify_metadata_file we may be
# persisting invalid files and we cannot try copies of the file from other
# mirrors.
if valid and metadata_role == 'root':
valid = self._verify_root_self_signed(metadata_signable)
if not valid:
raise securesystemslib.exceptions.BadSignatureError(metadata_role)




Expand Down Expand Up @@ -1569,7 +1622,7 @@ def _get_metadata_file(self, metadata_role, remote_filename,
except KeyError:
logger.info(metadata_role + ' not available locally.')

self._verify_uncompressed_metadata_file(file_object, metadata_role)
self._verify_metadata_file(file_object, metadata_role)

except Exception as exception:
# Remember the error from this mirror, and "reset" the target file.
Expand All @@ -1590,24 +1643,6 @@ def _get_metadata_file(self, metadata_role, remote_filename,



def _verify_root_chain_link(self, rolename, current_root_metadata,
next_root_metadata):

if rolename != 'root':
return True

current_root_role = current_root_metadata['roles'][rolename]

# Verify next metadata with current keys/threshold
valid = tuf.sig.verify(next_root_metadata, rolename, self.repository_name,
current_root_role['threshold'], current_root_role['keyids'])

if not valid:
raise securesystemslib.exceptions.BadSignatureError('Root is not signed'
' by previous threshold of keys.')





def _get_file(self, filepath, verify_file_function, file_type, file_length,
Expand Down Expand Up @@ -1802,9 +1837,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None):
updated_metadata_object = metadata_signable['signed']
current_metadata_object = self.metadata['current'].get(metadata_role)

self._verify_root_chain_link(metadata_role, current_metadata_object,
metadata_signable)

# Finally, update the metadata and fileinfo stores, and rebuild the
# key and role info for the top-level roles if 'metadata_role' is root.
# Rebuilding the key and role info is required if the newly-installed
Expand Down