-
Notifications
You must be signed in to change notification settings - Fork 286
Fix updater workflow #1101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix updater workflow #1101
Changes from all commits
efea88e
9332e82
902a025
2fc25ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -217,6 +217,68 @@ def test_root_rotation(self): | |
|
|
||
|
|
||
|
|
||
| def test_verify_root_with_current_keyids_and_threshold(self): | ||
| """ | ||
| Each root file is signed by the current root threshold of keys as well | ||
| as the previous root threshold of keys. Test that a root file which is | ||
| not 'self-signed' with the current root threshold of keys causes the | ||
| update to fail | ||
| """ | ||
| # Load repository with root.json == 1.root.json (available on client) | ||
| # Signing key: "root", Threshold: 1 | ||
| repository = repo_tool.load_repository(self.repository_directory) | ||
|
|
||
| # Rotate keys and update root: 1.root.json --> 2.root.json | ||
| # Signing key: "root" (previous) and "root2" (current) | ||
| # Threshold (for both): 1 | ||
| repository.root.load_signing_key(self.role_keys['root']['private']) | ||
| repository.root.add_verification_key(self.role_keys['root2']['public']) | ||
| repository.root.load_signing_key(self.role_keys['root2']['private']) | ||
| # Remove the previous "root" key from the list of current | ||
| # verification keys | ||
| repository.root.remove_verification_key(self.role_keys['root']['public']) | ||
| repository.writeall() | ||
|
|
||
| # Move staged metadata to "live" metadata | ||
| shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) | ||
| shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), | ||
| os.path.join(self.repository_directory, 'metadata')) | ||
|
|
||
| # Intercept 2.root.json and tamper with "root2" (current) key signature | ||
| root2_path_live = os.path.join( | ||
| self.repository_directory, 'metadata', '2.root.json') | ||
| root2 = securesystemslib.util.load_json_file(root2_path_live) | ||
|
|
||
| for idx, sig in enumerate(root2['signatures']): | ||
| if sig['keyid'] == self.role_keys['root2']['public']['keyid']: | ||
| sig_len = len(root2['signatures'][idx]['sig']) | ||
| root2['signatures'][idx]['sig'] = "deadbeef".ljust(sig_len, '0') | ||
|
|
||
| roo2_fobj = tempfile.TemporaryFile() | ||
| roo2_fobj.write(tuf.repository_lib._get_written_metadata(root2)) | ||
| securesystemslib.util.persist_temp_file(roo2_fobj, root2_path_live) | ||
|
|
||
| # Update 1.root.json -> 2.root.json | ||
| # Signature verification with current keys should fail because we replaced | ||
| with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm: | ||
| self.repository_updater.refresh() | ||
|
|
||
| for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors): | ||
| self.assertTrue(mirror_url.endswith('/2.root.json')) | ||
| self.assertTrue(isinstance(mirror_error, | ||
| securesystemslib.exceptions.BadSignatureError)) | ||
|
|
||
| # Assert that the current 'root.json' on the client side is the verified one | ||
| self.assertTrue(filecmp.cmp( | ||
| os.path.join(self.repository_directory, 'metadata', '1.root.json'), | ||
| os.path.join(self.client_metadata_current, 'root.json'))) | ||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
| def test_root_rotation_full(self): | ||
| """Test that a client whose root is outdated by multiple versions and who | ||
| has none of the latest nor next-to-latest root keys can still update and | ||
|
|
@@ -340,22 +402,26 @@ def test_root_rotation_missing_keys(self): | |
| shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), | ||
| os.path.join(self.repository_directory, 'metadata')) | ||
|
|
||
| try: | ||
| with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm: | ||
| self.repository_updater.refresh() | ||
|
|
||
| except tuf.exceptions.NoWorkingMirrorError as exception: | ||
| for mirror_url, mirror_error in six.iteritems(exception.mirror_errors): | ||
| url_prefix = self.repository_mirrors['mirror1']['url_prefix'] | ||
| url_file = os.path.join(url_prefix, 'metadata', '2.root.json') | ||
|
|
||
| # Verify that '2.root.json' is the culprit. | ||
| self.assertEqual(url_file.replace('\\', '/'), mirror_url) | ||
| self.assertTrue(isinstance(mirror_error, | ||
| for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors): | ||
| self.assertTrue(mirror_url.endswith('/2.root.json')) | ||
| self.assertTrue(isinstance(mirror_error, | ||
| securesystemslib.exceptions.BadSignatureError)) | ||
|
|
||
| # Assert that the current 'root.json' on the client side is the verified one | ||
| self.assertTrue(filecmp.cmp( | ||
| os.path.join(self.repository_directory, 'metadata', '1.root.json'), | ||
| os.path.join(self.client_metadata_current, 'root.json'))) | ||
|
|
||
|
|
||
|
|
||
|
|
||
| def test_root_rotation_unmet_threshold(self): | ||
| def test_root_rotation_unmet_last_version_threshold(self): | ||
| """Test that client detects a root.json version that is not signed | ||
| by a previous threshold of signatures """ | ||
|
|
||
| repository = repo_tool.load_repository(self.repository_directory) | ||
|
|
||
| # Add verification keys | ||
|
|
@@ -411,8 +477,100 @@ def test_root_rotation_unmet_threshold(self): | |
|
|
||
| # The following refresh should fail because root must be signed by the | ||
| # previous self.role_keys['root'] key, which wasn't loaded. | ||
| self.assertRaises(tuf.exceptions.NoWorkingMirrorError, | ||
| self.repository_updater.refresh) | ||
| with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm: | ||
| self.repository_updater.refresh() | ||
|
|
||
| for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors): | ||
| self.assertTrue(mirror_url.endswith('/3.root.json')) | ||
| self.assertTrue(isinstance(mirror_error, | ||
| securesystemslib.exceptions.BadSignatureError)) | ||
|
|
||
| # Assert that the current 'root.json' on the client side is the verified one | ||
| self.assertTrue(filecmp.cmp( | ||
| os.path.join(self.repository_directory, 'metadata', '2.root.json'), | ||
| os.path.join(self.client_metadata_current, 'root.json'))) | ||
|
|
||
|
|
||
|
|
||
| def test_root_rotation_unmet_new_threshold(self): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lovely test, but there's some overlap between this test and the next. Maybe clarify that there is at least one successful intermediate rotation involved or something. |
||
| """Test that client detects a root.json version that is not signed | ||
| by a current threshold of signatures """ | ||
| repository = repo_tool.load_repository(self.repository_directory) | ||
|
|
||
| # Create a new, valid root.json. | ||
| repository.root.threshold = 2 | ||
| repository.root.load_signing_key(self.role_keys['root']['private']) | ||
| repository.root.add_verification_key(self.role_keys['root2']['public']) | ||
| repository.root.load_signing_key(self.role_keys['root2']['private']) | ||
|
|
||
| repository.writeall() | ||
|
|
||
| # Increase the threshold and add a new verification key without | ||
| # actually loading the signing key | ||
| repository.root.threshold = 3 | ||
| repository.root.add_verification_key(self.role_keys['root3']['public']) | ||
|
|
||
| # writeall fails as expected since the third signature is missing | ||
| self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall) | ||
| # write an invalid '3.root.json' as partially signed | ||
| repository.write('root') | ||
|
|
||
| # Move the staged metadata to the "live" metadata. | ||
| shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) | ||
| shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), | ||
| os.path.join(self.repository_directory, 'metadata')) | ||
|
|
||
|
|
||
| # The following refresh should fail because root must be signed by the | ||
| # current self.role_keys['root3'] key, which wasn't loaded. | ||
| with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm: | ||
| self.repository_updater.refresh() | ||
|
|
||
| for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors): | ||
| self.assertTrue(mirror_url.endswith('/3.root.json')) | ||
| self.assertTrue(isinstance(mirror_error, | ||
| securesystemslib.exceptions.BadSignatureError)) | ||
|
|
||
| # Assert that the current 'root.json' on the client side is the verified one | ||
| self.assertTrue(filecmp.cmp( | ||
| os.path.join(self.repository_directory, 'metadata', '2.root.json'), | ||
| os.path.join(self.client_metadata_current, 'root.json'))) | ||
|
|
||
|
|
||
|
|
||
| def test_root_rotation_discard_untrusted_version(self): | ||
trishankatdatadog marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Test that client discards root.json version that failed the | ||
| signature verification """ | ||
| repository = repo_tool.load_repository(self.repository_directory) | ||
|
|
||
| # Rotate the root key without signing with the previous version key 'root' | ||
| repository.root.remove_verification_key(self.role_keys['root']['public']) | ||
| repository.root.add_verification_key(self.role_keys['root2']['public']) | ||
| repository.root.load_signing_key(self.role_keys['root2']['private']) | ||
|
|
||
| # 2.root.json | ||
| repository.writeall() | ||
|
|
||
| # Move the staged metadata to the "live" metadata. | ||
| shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) | ||
| shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), | ||
| os.path.join(self.repository_directory, 'metadata')) | ||
|
|
||
| # Refresh on the client side should fail because 2.root.json is not signed | ||
| # with a threshold of prevous keys | ||
| with self.assertRaises(tuf.exceptions.NoWorkingMirrorError) as cm: | ||
| self.repository_updater.refresh() | ||
|
|
||
| for mirror_url, mirror_error in six.iteritems(cm.exception.mirror_errors): | ||
| self.assertTrue(mirror_url.endswith('/2.root.json')) | ||
| self.assertTrue(isinstance(mirror_error, | ||
| securesystemslib.exceptions.BadSignatureError)) | ||
|
|
||
| # Assert that the current 'root.json' on the client side is the trusted one | ||
| # and 2.root.json is discarded | ||
| self.assertTrue(filecmp.cmp( | ||
| os.path.join(self.repository_directory, 'metadata', '1.root.json'), | ||
| os.path.join(self.client_metadata_current, 'root.json'))) | ||
|
|
||
|
|
||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1370,12 +1370,51 @@ def verify_target_file(target_file_object): | |
|
|
||
|
|
||
|
|
||
| def _verify_uncompressed_metadata_file(self, metadata_file_object, | ||
| def _verify_root_self_signed(self, signable): | ||
| """ | ||
| Verify the root metadata in signable is signed by a threshold of keys, | ||
| where the threshold and valid keys are defined by itself | ||
| """ | ||
| threshold = signable['signed']['roles']['root']['threshold'] | ||
| keyids = signable['signed']['roles']['root']['keyids'] | ||
| keys = signable['signed']['keys'] | ||
| signatures = signable['signatures'] | ||
| signed = securesystemslib.formats.encode_canonical( | ||
| signable['signed']).encode('utf-8') | ||
| validated = 0 | ||
|
|
||
| for signature in signatures: | ||
| keyid = signature['keyid'] | ||
|
|
||
| # At this point we are verifying that the root metadata is signed by a | ||
| # threshold of keys listed in the current root role, therefore skip | ||
| # keys with a keyid that is not listed in the current root role. | ||
| if keyid not in keyids: | ||
| continue | ||
|
|
||
| key = keys[keyid] | ||
| # The ANYKEY_SCHEMA check in verify_signature expects the keydict to | ||
| # include a keyid | ||
| key['keyid'] = keyid | ||
| valid_sig = securesystemslib.keys.verify_signature(key, signature, signed) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we bail on a single exception here? Feels like we need a try-catch...
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| if valid_sig: | ||
| validated = validated + 1 | ||
|
|
||
| if validated >= threshold: | ||
| return True | ||
| return False | ||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
| def _verify_metadata_file(self, metadata_file_object, | ||
| metadata_role): | ||
| """ | ||
| <Purpose> | ||
| Non-public method that verifies an uncompressed metadata file. An | ||
| exception is raised if 'metadata_file_object is invalid. There is no | ||
| Non-public method that verifies a metadata file. An exception is | ||
| raised if 'metadata_file_object is invalid. There is no | ||
| return value. | ||
|
|
||
| <Arguments> | ||
|
|
@@ -1439,6 +1478,20 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object, | |
| if not valid: | ||
| raise securesystemslib.exceptions.BadSignatureError(metadata_role) | ||
|
|
||
| # For root metadata, verify the downloaded root metadata object with the | ||
| # new threshold of new signatures contained within the downloaded root | ||
| # metadata object | ||
| # NOTE: we perform the checks on root metadata here because this enables | ||
| # us to perform the check before the tempfile is persisted. Furthermore, | ||
| # by checking here we can easily perform the check for each download | ||
| # mirror. Whereas if we check after _verify_metadata_file we may be | ||
| # persisting invalid files and we cannot try copies of the file from other | ||
| # mirrors. | ||
| if valid and metadata_role == 'root': | ||
| valid = self._verify_root_self_signed(metadata_signable) | ||
| if not valid: | ||
| raise securesystemslib.exceptions.BadSignatureError(metadata_role) | ||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
@@ -1569,7 +1622,7 @@ def _get_metadata_file(self, metadata_role, remote_filename, | |
| except KeyError: | ||
| logger.info(metadata_role + ' not available locally.') | ||
|
|
||
| self._verify_uncompressed_metadata_file(file_object, metadata_role) | ||
| self._verify_metadata_file(file_object, metadata_role) | ||
|
|
||
| except Exception as exception: | ||
| # Remember the error from this mirror, and "reset" the target file. | ||
|
|
@@ -1590,24 +1643,6 @@ def _get_metadata_file(self, metadata_role, remote_filename, | |
|
|
||
|
|
||
|
|
||
| def _verify_root_chain_link(self, rolename, current_root_metadata, | ||
| next_root_metadata): | ||
|
|
||
| if rolename != 'root': | ||
| return True | ||
|
|
||
| current_root_role = current_root_metadata['roles'][rolename] | ||
|
|
||
| # Verify next metadata with current keys/threshold | ||
| valid = tuf.sig.verify(next_root_metadata, rolename, self.repository_name, | ||
| current_root_role['threshold'], current_root_role['keyids']) | ||
|
|
||
| if not valid: | ||
| raise securesystemslib.exceptions.BadSignatureError('Root is not signed' | ||
| ' by previous threshold of keys.') | ||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
| def _get_file(self, filepath, verify_file_function, file_type, file_length, | ||
|
|
@@ -1802,9 +1837,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None): | |
| updated_metadata_object = metadata_signable['signed'] | ||
| current_metadata_object = self.metadata['current'].get(metadata_role) | ||
|
|
||
| self._verify_root_chain_link(metadata_role, current_metadata_object, | ||
trishankatdatadog marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| metadata_signable) | ||
|
|
||
| # Finally, update the metadata and fileinfo stores, and rebuild the | ||
| # key and role info for the top-level roles if 'metadata_role' is root. | ||
| # Rebuilding the key and role info is required if the newly-installed | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.