Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions build-scripts/build_tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import argparse
import json
import logging
import os
import pathlib
Expand Down Expand Up @@ -179,9 +180,10 @@ def _process_rules(env_yaml: Dict, output_path: pathlib.Path,
resolved_root: pathlib.Path) -> None:
product = resolved_root.parent.name
for rule_id in product_rules:
rule_file = resolved_root / f'{rule_id}.yml'
rule_file = resolved_root / f'{rule_id}.json'

rendered_rule_obj = ssg.yaml.open_raw(str(rule_file))
with open(rule_file, "r") as file:
rendered_rule_obj = json.load(file)
rule_path = pathlib.Path(rendered_rule_obj["definition_location"])
rule_root = rule_path.parent

Expand All @@ -197,7 +199,8 @@ def _get_rules_in_profile(built_profiles_root) -> Generator[str, None, None]:
for profile_file in built_profiles_root.iterdir(): # type: pathlib.Path
if not profile_file.name.endswith(".profile"):
continue
profile_data = ssg.yaml.open_raw(str(profile_file.absolute()))
with open(str(profile_file.absolute()), "r") as file:
profile_data = json.load(file)
for selection in profile_data["selections"]:
if "=" not in selection:
yield selection
Expand Down
5 changes: 3 additions & 2 deletions build-scripts/collect_remediations.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ def main():
for platform_file in os.listdir(args.platforms_dir):
platform_path = os.path.join(args.platforms_dir, platform_file)
try:
platform = ssg.build_yaml.Platform.from_yaml(platform_path, env_yaml, product_cpes)
platform = ssg.build_yaml.Platform.from_compiled_json(
platform_path, env_yaml, product_cpes)
except ssg.build_yaml.DocumentationNotComplete:
# Happens on non-debug build when a platform is
# "documentation-incomplete"
Expand All @@ -145,7 +146,7 @@ def main():
for rule_file in os.listdir(args.resolved_rules_dir):
rule_path = os.path.join(args.resolved_rules_dir, rule_file)
try:
rule = ssg.build_yaml.Rule.from_yaml(rule_path, env_yaml)
rule = ssg.build_yaml.Rule.from_compiled_json(rule_path, env_yaml)
except ssg.build_yaml.DocumentationNotComplete:
# Happens on non-debug build when a rule is
# "documentation-incomplete"
Expand Down
4 changes: 3 additions & 1 deletion build-scripts/compile_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def load_benchmark_source_data_from_directory_tree(loader, env_yaml, product_yam

def dump_compiled_profile(base_dir, profile):
dest = os.path.join(base_dir, "profiles", "{name}.profile".format(name=profile.id_))
profile.dump_yaml(dest)
profile.dump_json(dest)


def get_all_resolved_profiles_by_id(
Expand Down Expand Up @@ -121,6 +121,8 @@ def load_resolve_and_validate_profiles(
def save_everything(base_dir, loader, controls_manager, profiles):
controls_manager.save_everything(os.path.join(base_dir, "controls"))
loader.save_all_entities(base_dir)
if not os.path.exists(os.path.join(base_dir, "profiles")):
os.makedirs(os.path.join(base_dir, "profiles"))
for p in profiles:
dump_compiled_profile(base_dir, p)

Expand Down
4 changes: 2 additions & 2 deletions build-scripts/generate_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def create_parser():

def add_rule_attributes(main_dir, output_dict, rule_id):
rule_filename = os.path.join(main_dir, "rules", rule_id + ".yml")
r = build_yaml.Rule.from_yaml(rule_filename)
r = build_yaml.Rule.from_compiled_json(rule_filename)
platform_names = r.cpe_platform_names.union(r.inherited_cpe_platform_names)
output_dict["platform_names"] = sorted(list(platform_names))

Expand Down Expand Up @@ -64,7 +64,7 @@ def add_profile_data(output_dict, main_dir):
profiles_glob = os.path.join(main_dir, "profiles", "*.profile")
filenames = glob(profiles_glob)
for path in sorted(filenames):
p = profile.Profile.from_yaml(path)
p = profile.Profile.from_compiled_json(path)
output_dict[p.id_] = dict()
output_dict[p.id_]["rules"] = sorted(p.selected)

Expand Down
8 changes: 5 additions & 3 deletions ssg/build_cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,17 @@ def load_cpes_from_directory_tree(self, root_path, env_yaml):
continue

_, ext = os.path.splitext(os.path.basename(dir_item_path))
if ext != '.yml':
if ext == ".yml":
cpe_item = CPEItem.from_yaml(dir_item_path, env_yaml)
elif ext == ".json":
cpe_item = CPEItem.from_compiled_json(dir_item_path, env_yaml)
else:
sys.stderr.write(
"Encountered file '%s' while looking for content CPEs, "
"extension '%s' is unknown. Skipping..\n"
% (dir_item, ext)
)
continue

cpe_item = CPEItem.from_yaml(dir_item_path, env_yaml)
self.add_cpe_item(cpe_item)

def add_cpe_item(self, cpe_item):
Expand Down
52 changes: 31 additions & 21 deletions ssg/build_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,9 @@ def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
Raises:
ValueError: If the operator in the input data is not one of the expected possible operators.
"""
input_contents["interactive"] = (
input_contents.get("interactive", "false").lower() == "true")
if "interactive" in input_contents and isinstance(input_contents["interactive"], str):
input_contents["interactive"] = (
input_contents.get("interactive", "false").lower() == "true")

data = super(Value, cls).process_input_dict(input_contents, env_yaml)

Expand Down Expand Up @@ -576,7 +577,7 @@ def add_profiles_from_dir(self, dir_, env_yaml, product_cpes):
continue

try:
new_profile = ProfileWithInlinePolicies.from_yaml(
new_profile = ProfileWithInlinePolicies.from_compiled_json(
dir_item_path, env_yaml, product_cpes)
except DocumentationNotComplete:
continue
Expand Down Expand Up @@ -2863,9 +2864,9 @@ def save_entities(self, entities, destdir):
if not entities:
return
for entity in entities:
basename = entity.id_ + ".yml"
basename = entity.id_ + ".json"
dest_filename = os.path.join(destdir, basename)
entity.dump_yaml(dest_filename)
entity.dump_json(dest_filename)


class BuildLoader(DirectoryLoader):
Expand Down Expand Up @@ -3093,18 +3094,18 @@ def find_first_groups_ids(self, start_dir):

def load_entities_by_id(self, filenames, destination, cls):
"""
Loads entities from a list of YAML files and stores them in a destination dictionary by their ID.
Loads entities from a list of JSON files and stores them in a destination dictionary by their ID.

Args:
filenames (list of str): List of file paths to YAML files.
filenames (list of str): List of file paths to JSON files.
destination (dict): Dictionary to store the loaded entities, keyed by their ID.
cls (type): Class type that has a `from_yaml` method to create an instance from a YAML file.
cls (type): Class type that has a `from_yaml` method to create an instance from a JSON file.

Returns:
None
"""
for fname in filenames:
entity = cls.from_yaml(fname, self.env_yaml, self.product_cpes)
entity = cls.from_compiled_json(fname, self.env_yaml, self.product_cpes)
destination[entity.id_] = entity

def add_fixes_to_rules(self):
Expand Down Expand Up @@ -3183,7 +3184,7 @@ def get_benchmark_xml_by_profile(self, rule_and_variables_dict):
)

for profile in self.benchmark.profiles:
if profile.single_rule_profile == "true":
if profile.single_rule_profile:
profiles_ids, benchmark = self.benchmark.get_benchmark_xml_for_profiles(
self.env_yaml, [profile], rule_and_variables_dict
)
Expand Down Expand Up @@ -3221,16 +3222,16 @@ def load_compiled_content(self):

self.fixes = ssg.build_remediations.load_compiled_remediations(self.fixes_dir)

filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.json"))
self.load_entities_by_id(filenames, self.rules, Rule)

filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml"))
filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.json"))
self.load_entities_by_id(filenames, self.groups, Group)

filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml"))
filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.json"))
self.load_entities_by_id(filenames, self.values, Value)

filenames = glob.glob(os.path.join(self.resolved_platforms_dir, "*.yml"))
filenames = glob.glob(os.path.join(self.resolved_platforms_dir, "*.json"))
self.load_entities_by_id(filenames, self.platforms, Platform)
self.product_cpes.platforms = self.platforms

Expand All @@ -3249,7 +3250,7 @@ def export_benchmark_to_xml(self, rule_and_variables_dict, ignore_single_rule_pr
str: The benchmark data in XML format.
"""
if ignore_single_rule_profiles:
profiles = [p for p in self.benchmark.profiles if p.single_rule_profile == "false"]
profiles = [p for p in self.benchmark.profiles if not p.single_rule_profile]
else:
profiles = self.benchmark.profiles
_, benchmark = self.benchmark.get_benchmark_xml_for_profiles(
Expand Down Expand Up @@ -3531,12 +3532,12 @@ def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
Platform: A Platform object created from the YAML file.
"""
platform = super(Platform, cls).from_yaml(yaml_file, env_yaml)
# If we received a product_cpes, we can restore also the original test object
# it can be later used e.g. for comparison
if product_cpes:
platform.test = product_cpes.algebra.parse(platform.original_expression, simplify=True)
product_cpes.add_resolved_cpe_items_from_platform(platform)
return platform
return restore_original_test_object(platform, product_cpes)

@classmethod
def from_compiled_json(cls, json_file_path, env_yaml=None, product_cpes=None):
platform = super(Platform, cls).from_compiled_json(json_file_path, env_yaml)
return restore_original_test_object(platform, product_cpes)

def get_fact_refs(self):
"""
Expand Down Expand Up @@ -3598,3 +3599,12 @@ def add_platform_if_not_defined(platform, product_cpes):
return p
product_cpes.platforms[platform.id_] = platform
return platform


def restore_original_test_object(platform, product_cpes):
# If we received a product_cpes, we can restore also the original test object
# it can be later used e.g. for comparison
if product_cpes:
platform.test = product_cpes.algebra.parse(platform.original_expression, simplify=True)
product_cpes.add_resolved_cpe_items_from_platform(platform)
return platform
30 changes: 18 additions & 12 deletions ssg/controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,17 @@ def __init__(self, controls_dir, env_yaml=None, existing_rules=None):
self.existing_rules = existing_rules
self.policies = {}

def _load(self, format):
if not os.path.exists(self.controls_dir):
return
for filename in sorted(glob(os.path.join(self.controls_dir, "*." + format))):
filepath = os.path.join(self.controls_dir, filename)
policy = Policy(filepath, self.env_yaml)
policy.load()
self.policies[policy.id] = policy
self.check_all_rules_exist()
self.resolve_controls()

def load(self):
"""
Load policies from YAML files in the controls directory.
Expand All @@ -822,15 +833,10 @@ def load(self):
Returns:
None
"""
if not os.path.exists(self.controls_dir):
return
for filename in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
filepath = os.path.join(self.controls_dir, filename)
policy = Policy(filepath, self.env_yaml)
policy.load()
self.policies[policy.id] = policy
self.check_all_rules_exist()
self.resolve_controls()
self._load("yml")

def load_compiled(self):
self._load("json")

def check_all_rules_exist(self):
"""
Expand Down Expand Up @@ -1050,7 +1056,7 @@ def get_all_controls(self, policy_id):

def save_everything(self, output_dir):
"""
Saves all policies to the specified output directory in YAML format.
Saves all policies to the specified output directory in JSON format.

Args:
output_dir (str): The directory where the policy files will be saved.
Expand All @@ -1060,8 +1066,8 @@ def save_everything(self, output_dir):
"""
ssg.utils.mkdir_p(output_dir)
for policy_id, policy in self.policies.items():
filename = os.path.join(output_dir, "{}.{}".format(policy_id, "yml"))
policy.dump_yaml(filename)
filename = os.path.join(output_dir, "{}.{}".format(policy_id, "json"))
policy.dump_json(filename)

def add_references(self, rules):
"""
Expand Down
Loading
Loading