diff --git a/tests/test_updater.py b/tests/test_updater.py
index 3cf47ac53e..5df0cce381 100644
--- a/tests/test_updater.py
+++ b/tests/test_updater.py
@@ -1728,38 +1728,19 @@ def test_10__visit_child_role(self):

-  def test_11__verify_uncompressed_metadata_file(self):
+  def test_11__verify_metadata_file(self):
     # Test for invalid metadata content.
     metadata_file_object = tempfile.TemporaryFile()
     metadata_file_object.write(b'X')
     metadata_file_object.seek(0)

     self.assertRaises(tuf.exceptions.InvalidMetadataJSONError,
-        self.repository_updater._verify_uncompressed_metadata_file,
+        self.repository_updater._verify_metadata_file,
         metadata_file_object, 'root')



-  def test_12__verify_root_chain_link(self):
-    # Test for an invalid signature in the chain link.
-    # current = (i.e., 1.root.json)
-    # next = signable for the next metadata in the chain (i.e., 2.root.json)
-    rolename = 'root'
-    current_root = self.repository_updater.metadata['current']['root']
-
-    targets_path = os.path.join(self.repository_directory, 'metadata', 'targets.json')
-
-    # 'next_invalid_root' is a Targets signable, as written to disk.
-    # We use the Targets metadata here to ensure the signatures are invalid.
-    next_invalid_root = securesystemslib.util.load_json_file(targets_path)
-
-    self.assertRaises(securesystemslib.exceptions.BadSignatureError,
-        self.repository_updater._verify_root_chain_link, rolename, current_root,
-        next_invalid_root)
-
-
-
-  def test_13__get_file(self):
+  def test_12__get_file(self):
     # Test for an "unsafe" download, where the file is downloaded up to
     # a required length (and no more). The "safe" download approach
     # downloads an exact required length.
@@ -1779,7 +1760,7 @@ def verify_target_file(targets_path):
     self.repository_updater._get_file('targets.json', verify_target_file,
         file_type, file_size, download_safely=False)

-  def test_14__targets_of_role(self):
+  def test_13__targets_of_role(self):
     # Test case where a list of targets is given. By default, the 'targets'
     # parameter is None.
     targets = [{'filepath': 'file1.txt', 'fileinfo': {'length': 1, 'hashes': {'sha256': 'abc'}}}]
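Note on the renamed test_12__get_file above: the test's comment distinguishes a "safe" download, which must yield exactly the required length, from an "unsafe" download, which may yield anything up to that length. The snippet below is only an illustrative reading of that distinction, not the actual tuf.download implementation; the function name and parameters are made up for the example.

    def check_downloaded_length(observed_length, required_length, strict=True):
      # "Safe" download: the observed length must equal the required length.
      # "Unsafe" download: anything up to the required length is accepted.
      if strict:
        return observed_length == required_length
      return observed_length <= required_length

    print(check_downloaded_length(10, 16, strict=True))   # False
    print(check_downloaded_length(10, 16, strict=False))  # True
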
diff --git a/tests/test_updater_root_rotation_integration.py b/tests/test_updater_root_rotation_integration.py
index c49b82a726..71cdc39e0a 100755
--- a/tests/test_updater_root_rotation_integration.py
+++ b/tests/test_updater_root_rotation_integration.py
@@ -217,6 +217,68 @@ def test_root_rotation(self):



+  def test_verify_root_with_current_keyids_and_threshold(self):
+    """
+    Each root file is signed by the current root threshold of keys as well
+    as the previous root threshold of keys. Test that a root file which is
+    not 'self-signed' with the current root threshold of keys causes the
+    update to fail.
+    """
+    # Load repository with root.json == 1.root.json (available on client)
+    # Signing key: "root", Threshold: 1
+    repository = repo_tool.load_repository(self.repository_directory)
+
+    # Rotate keys and update root: 1.root.json --> 2.root.json
+    # Signing key: "root" (previous) and "root2" (current)
+    # Threshold (for both): 1
+    repository.root.load_signing_key(self.role_keys['root']['private'])
+    repository.root.add_verification_key(self.role_keys['root2']['public'])
+    repository.root.load_signing_key(self.role_keys['root2']['private'])
+    # Remove the previous "root" key from the list of current
+    # verification keys.
+    repository.root.remove_verification_key(self.role_keys['root']['public'])
+    repository.writeall()
+
+    # Move staged metadata to "live" metadata.
+    shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
+    shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
+        os.path.join(self.repository_directory, 'metadata'))
+
+    # Intercept 2.root.json and tamper with the "root2" (current) key signature.
+    root2_path_live = os.path.join(
+        self.repository_directory, 'metadata', '2.root.json')
+    root2 = securesystemslib.util.load_json_file(root2_path_live)
+
+    for idx, sig in enumerate(root2['signatures']):
+      if sig['keyid'] == self.role_keys['root2']['public']['keyid']:
+        sig_len = len(root2['signatures'][idx]['sig'])
+        root2['signatures'][idx]['sig'] = "deadbeef".ljust(sig_len, '0')
+
+    root2_fobj = tempfile.TemporaryFile()
+    root2_fobj.write(tuf.repository_lib._get_written_metadata(root2))
+    securesystemslib.util.persist_temp_file(root2_fobj, root2_path_live)
+
+    # Update 1.root.json -> 2.root.json
+    # Signature verification with current keys should fail because we replaced
+    # the "root2" signature above.
+    with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
+      self.repository_updater.refresh()
+
+    for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
+      self.assertTrue(mirror_url.endswith('/2.root.json'))
+      self.assertTrue(isinstance(mirror_error,
+          securesystemslib.exceptions.BadSignatureError))
+
+    # Assert that the current 'root.json' on the client side is the verified one.
+    self.assertTrue(filecmp.cmp(
+        os.path.join(self.repository_directory, 'metadata', '1.root.json'),
+        os.path.join(self.client_metadata_current, 'root.json')))
+
+
+
+
+
+
   def test_root_rotation_full(self):
     """Test that a client whose root is outdated by multiple versions and who
     has none of the latest nor next-to-latest root keys can still update and
@@ -340,22 +402,26 @@ def test_root_rotation_missing_keys(self):
     shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
         os.path.join(self.repository_directory, 'metadata'))

-    try:
+    with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
       self.repository_updater.refresh()

-    except tuf.exceptions.NoWorkingMirrorError as exception:
-      for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
-        url_prefix = self.repository_mirrors['mirror1']['url_prefix']
-        url_file = os.path.join(url_prefix, 'metadata', '2.root.json')
-
-        # Verify that '2.root.json' is the culprit.
-        self.assertEqual(url_file.replace('\\', '/'), mirror_url)
-        self.assertTrue(isinstance(mirror_error,
+    for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
+      self.assertTrue(mirror_url.endswith('/2.root.json'))
+      self.assertTrue(isinstance(mirror_error,
          securesystemslib.exceptions.BadSignatureError))

+    # Assert that the current 'root.json' on the client side is the verified one.
+    self.assertTrue(filecmp.cmp(
+        os.path.join(self.repository_directory, 'metadata', '1.root.json'),
+        os.path.join(self.client_metadata_current, 'root.json')))
+
+

-  def test_root_rotation_unmet_threshold(self):
+  def test_root_rotation_unmet_last_version_threshold(self):
+    """Test that the client detects a root.json version that is not signed
+    by a previous threshold of signatures."""
+
     repository = repo_tool.load_repository(self.repository_directory)

     # Add verification keys
@@ -411,8 +477,100 @@ def test_root_rotation_unmet_threshold(self):

     # The following refresh should fail because root must be signed by the
     # previous self.role_keys['root'] key, which wasn't loaded.
-    self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
-        self.repository_updater.refresh)
+    with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
+      self.repository_updater.refresh()
+
+    for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
+      self.assertTrue(mirror_url.endswith('/3.root.json'))
+      self.assertTrue(isinstance(mirror_error,
+          securesystemslib.exceptions.BadSignatureError))
+
+    # Assert that the current 'root.json' on the client side is the verified one.
+    self.assertTrue(filecmp.cmp(
+        os.path.join(self.repository_directory, 'metadata', '2.root.json'),
+        os.path.join(self.client_metadata_current, 'root.json')))
+
+
+
+  def test_root_rotation_unmet_new_threshold(self):
+    """Test that the client detects a root.json version that is not signed
+    by a current threshold of signatures."""
+    repository = repo_tool.load_repository(self.repository_directory)
+
+    # Create a new, valid root.json.
+    repository.root.threshold = 2
+    repository.root.load_signing_key(self.role_keys['root']['private'])
+    repository.root.add_verification_key(self.role_keys['root2']['public'])
+    repository.root.load_signing_key(self.role_keys['root2']['private'])
+
+    repository.writeall()
+
+    # Increase the threshold and add a new verification key without
+    # actually loading the signing key.
+    repository.root.threshold = 3
+    repository.root.add_verification_key(self.role_keys['root3']['public'])
+
+    # writeall() fails as expected since the third signature is missing.
+    self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
+    # Write an invalid '3.root.json' as partially signed.
+    repository.write('root')
+
+    # Move the staged metadata to the "live" metadata.
+    shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
+    shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
+        os.path.join(self.repository_directory, 'metadata'))
+
+
+    # The following refresh should fail because root must be signed by the
+    # current self.role_keys['root3'] key, which wasn't loaded.
+    with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
+      self.repository_updater.refresh()
+
+    for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
+      self.assertTrue(mirror_url.endswith('/3.root.json'))
+      self.assertTrue(isinstance(mirror_error,
+          securesystemslib.exceptions.BadSignatureError))
+
+    # Assert that the current 'root.json' on the client side is the verified one.
+    self.assertTrue(filecmp.cmp(
+        os.path.join(self.repository_directory, 'metadata', '2.root.json'),
+        os.path.join(self.client_metadata_current, 'root.json')))
+
+
+
+  def test_root_rotation_discard_untrusted_version(self):
+    """Test that the client discards a root.json version that failed the
+    signature verification."""
+    repository = repo_tool.load_repository(self.repository_directory)
+
+    # Rotate the root key without signing with the previous version key 'root'.
+    repository.root.remove_verification_key(self.role_keys['root']['public'])
+    repository.root.add_verification_key(self.role_keys['root2']['public'])
+    repository.root.load_signing_key(self.role_keys['root2']['private'])
+
+    # 2.root.json
+    repository.writeall()
+
+    # Move the staged metadata to the "live" metadata.
+    shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
+    shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
+        os.path.join(self.repository_directory, 'metadata'))
+
+    # Refresh on the client side should fail because 2.root.json is not signed
+    # with a threshold of previous keys.
+    with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm:
+      self.repository_updater.refresh()
+
+    for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors):
+      self.assertTrue(mirror_url.endswith('/2.root.json'))
+      self.assertTrue(isinstance(mirror_error,
+          securesystemslib.exceptions.BadSignatureError))
+
+    # Assert that the current 'root.json' on the client side is the trusted one
+    # and 2.root.json is discarded.
+    self.assertTrue(filecmp.cmp(
+        os.path.join(self.repository_directory, 'metadata', '1.root.json'),
+        os.path.join(self.client_metadata_current, 'root.json')))
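The new test_verify_root_with_current_keyids_and_threshold above invalidates exactly one signature while keeping 2.root.json structurally valid, so that only the signature check can fail. A minimal standalone sketch of that tampering step follows; the keyid, the sample signable, and the helper name are fabricated for illustration, and plain json is used in place of the tuf and securesystemslib helpers the test relies on.

    import json

    def tamper_with_signature(signable, keyid):
      # Replace the hex signature belonging to 'keyid' with a same-length
      # filler, so the field still looks well-formed but no longer verifies.
      for signature in signable['signatures']:
        if signature['keyid'] == keyid:
          signature['sig'] = 'deadbeef'.ljust(len(signature['sig']), '0')
      return signable

    # Fabricated root signable with a single signature.
    root2 = {
        'signed': {'_type': 'root', 'version': 2},
        'signatures': [{'keyid': 'aa11', 'sig': 'ab' * 64}],
    }

    tamper_with_signature(root2, 'aa11')
    print(json.dumps(root2, indent=1))
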
diff --git a/tuf/client/updater.py b/tuf/client/updater.py
index 19d518e639..b5514e35c5 100755
--- a/tuf/client/updater.py
+++ b/tuf/client/updater.py
@@ -1370,12 +1370,51 @@ def verify_target_file(target_file_object):



-  def _verify_uncompressed_metadata_file(self, metadata_file_object,
+  def _verify_root_self_signed(self, signable):
+    """
+    Verify that the root metadata in 'signable' is signed by a threshold of
+    keys, where the threshold and the valid keys are defined by the root
+    metadata itself.
+    """
+    threshold = signable['signed']['roles']['root']['threshold']
+    keyids = signable['signed']['roles']['root']['keyids']
+    keys = signable['signed']['keys']
+    signatures = signable['signatures']
+    signed = securesystemslib.formats.encode_canonical(
+        signable['signed']).encode('utf-8')
+    validated = 0
+
+    for signature in signatures:
+      keyid = signature['keyid']
+
+      # At this point we are verifying that the root metadata is signed by a
+      # threshold of keys listed in the current root role, therefore skip
+      # keys with a keyid that is not listed in the current root role.
+      if keyid not in keyids:
+        continue
+
+      key = keys[keyid]
+      # The ANYKEY_SCHEMA check in verify_signature expects the keydict to
+      # include a keyid.
+      key['keyid'] = keyid
+      valid_sig = securesystemslib.keys.verify_signature(key, signature, signed)
+
+      if valid_sig:
+        validated = validated + 1
+
+      if validated >= threshold:
+        return True
+    return False
+
+
+
+
+
+  def _verify_metadata_file(self, metadata_file_object,
       metadata_role):
     """
-    Non-public method that verifies an uncompressed metadata file. An
-    exception is raised if 'metadata_file_object is invalid. There is no
+    Non-public method that verifies a metadata file. An exception is
+    raised if 'metadata_file_object' is invalid. There is no
     return value.
@@ -1439,6 +1478,20 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
     if not valid:
       raise securesystemslib.exceptions.BadSignatureError(metadata_role)

+    # For root metadata, also verify the downloaded root metadata object
+    # against the new threshold, using the new keys and signatures contained
+    # within the downloaded root metadata object itself.
+    # NOTE: We perform the checks on root metadata here because this enables
+    # us to perform the check before the tempfile is persisted. Furthermore,
+    # by checking here we can easily perform the check for each download
+    # mirror, whereas if we checked after _verify_metadata_file we might be
+    # persisting invalid files and we could not try copies of the file from
+    # other mirrors.
+    if valid and metadata_role == 'root':
+      valid = self._verify_root_self_signed(metadata_signable)
+      if not valid:
+        raise securesystemslib.exceptions.BadSignatureError(metadata_role)
+
@@ -1569,7 +1622,7 @@ def _get_metadata_file(self, metadata_role, remote_filename,
         except KeyError:
           logger.info(metadata_role + ' not available locally.')

-        self._verify_uncompressed_metadata_file(file_object, metadata_role)
+        self._verify_metadata_file(file_object, metadata_role)

       except Exception as exception:
         # Remember the error from this mirror, and "reset" the target file.
@@ -1590,24 +1643,6 @@ def _get_metadata_file(self, metadata_role, remote_filename,



-  def _verify_root_chain_link(self, rolename, current_root_metadata,
-      next_root_metadata):
-
-    if rolename != 'root':
-      return True
-
-    current_root_role = current_root_metadata['roles'][rolename]
-
-    # Verify next metadata with current keys/threshold
-    valid = tuf.sig.verify(next_root_metadata, rolename, self.repository_name,
-        current_root_role['threshold'], current_root_role['keyids'])
-
-    if not valid:
-      raise securesystemslib.exceptions.BadSignatureError('Root is not signed'
-          ' by previous threshold of keys.')
-
-
-
   def _get_file(self, filepath, verify_file_function, file_type, file_length,
@@ -1802,9 +1837,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None):
     updated_metadata_object = metadata_signable['signed']
     current_metadata_object = self.metadata['current'].get(metadata_role)

-    self._verify_root_chain_link(metadata_role, current_metadata_object,
-        metadata_signable)
-
     # Finally, update the metadata and fileinfo stores, and rebuild the
     # key and role info for the top-level roles if 'metadata_role' is root.
     # Rebuilding the key and role info is required if the newly-installed
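Taken together, the client-side change means a downloaded N.root.json must now pass two signature checks before it is persisted: the existing tuf.sig.verify() call against the keys the client already trusts, and the new _verify_root_self_signed() check against the keys and threshold that the downloaded root itself declares. The rule enforced by the new helper can be condensed as the sketch below; it is a simplified, dependency-free illustration rather than the shipped implementation, with verify_signature standing in for securesystemslib.keys.verify_signature and a fabricated sample signable.

    def root_is_self_signed(signable, verify_signature):
      # The new root must carry at least 'threshold' valid signatures made
      # with keyids that the new root itself lists for the 'root' role.
      root_role = signable['signed']['roles']['root']
      keys = signable['signed']['keys']
      valid = 0

      for signature in signable['signatures']:
        keyid = signature['keyid']

        # Ignore signatures from keys the new root does not list for 'root'.
        if keyid not in root_role['keyids']:
          continue

        if verify_signature(keys[keyid], signature, signable['signed']):
          valid += 1

      return valid >= root_role['threshold']

    # Fabricated example: trust any signature whose 'sig' equals 'ok'.
    fake_verify = lambda key, signature, signed: signature['sig'] == 'ok'

    signable = {
        'signed': {
            'roles': {'root': {'keyids': ['k1', 'k2'], 'threshold': 2}},
            'keys': {'k1': {}, 'k2': {}},
        },
        'signatures': [
            {'keyid': 'k1', 'sig': 'ok'},
            {'keyid': 'k2', 'sig': 'bad'},
            {'keyid': 'k3', 'sig': 'ok'},  # not listed by the new root: skipped
        ],
    }

    print(root_is_self_signed(signable, fake_verify))  # False: only one valid sig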