129 lines
4.1 KiB
Python
Executable File
129 lines
4.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Adapted from: https://enix.io/en/blog/pxe-talos/
|
|
|
|
import functools
|
|
import json
|
|
import pathlib
|
|
|
|
import requests
|
|
import yaml
|
|
from jinja2 import Environment, FileSystemLoader, StrictUndefined, Template
|
|
|
|
NODES = pathlib.Path("nodes")
|
|
SCHEMATICS = pathlib.Path("schematics")
|
|
RENDERED = pathlib.Path("rendered")
|
|
PATCHES = Environment(loader=FileSystemLoader("patches"), undefined=StrictUndefined)
|
|
TEMPLATES = Environment(loader=FileSystemLoader("templates"), undefined=StrictUndefined)
|
|
|
|
|
|
def node_encoder(node: dict):
|
|
class Inner(json.JSONEncoder):
|
|
def default(self, o):
|
|
if isinstance(o, Template):
|
|
try:
|
|
rendered = o.render(node)
|
|
except Exception as e:
|
|
e.add_note(f"While rendering for: {node['hostname']}")
|
|
raise e
|
|
# Parse the rendered yaml and convert it to a json patch
|
|
return json.dumps(yaml.safe_load(rendered))
|
|
|
|
return super().default(o)
|
|
|
|
return Inner
|
|
|
|
|
|
@functools.cache
|
|
def get_schematic_id(schematic: str):
|
|
"""Lookup the schematic id associated with a given schematic"""
|
|
r = requests.post("https://factory.talos.dev/schematics", data=schematic)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
return data["id"]
|
|
|
|
|
|
def schematic_constructor(loader: yaml.SafeLoader, node: yaml.nodes.ScalarNode):
|
|
"""Load specified schematic file and get the assocatied schematic id"""
|
|
schematic_name = loader.construct_yaml_str(node)
|
|
try:
|
|
schematic = SCHEMATICS.joinpath(schematic_name).with_suffix(".yaml").read_text()
|
|
return get_schematic_id(schematic)
|
|
except Exception:
|
|
raise yaml.MarkedYAMLError("Failed to load schematic", node.start_mark)
|
|
|
|
|
|
def patch_constructor(loader: yaml.SafeLoader, node: yaml.nodes.ScalarNode):
|
|
patch_name = loader.construct_scalar(node)
|
|
try:
|
|
template = PATCHES.get_template(f"{patch_name}.yaml")
|
|
return template
|
|
except Exception:
|
|
raise yaml.MarkedYAMLError("Failed to load patch", node.start_mark)
|
|
|
|
|
|
def get_loader():
|
|
"""Add special constructors to yaml loader"""
|
|
loader = yaml.SafeLoader
|
|
loader.add_constructor("!schematic", schematic_constructor)
|
|
loader.add_constructor("!patch", patch_constructor)
|
|
|
|
return loader
|
|
|
|
|
|
@functools.cache
|
|
def get_defaults(directory: pathlib.Path, root: pathlib.Path):
|
|
"""Compute the defaults from the provided directory and parents."""
|
|
try:
|
|
with open(directory.joinpath("_defaults.yaml")) as fyaml:
|
|
yml_data = yaml.load(fyaml, Loader=get_loader())
|
|
except OSError:
|
|
yml_data = {}
|
|
|
|
# Stop recursion when reaching root directory
|
|
if directory != root:
|
|
return get_defaults(directory.parent, root) | yml_data
|
|
else:
|
|
return yml_data
|
|
|
|
|
|
def walk_files(root: pathlib.Path):
|
|
"""Get all files that do not start with and underscore"""
|
|
for dirpath, _dirnames, filenames in root.walk():
|
|
for fn in filenames:
|
|
if not fn.startswith("_"):
|
|
yield dirpath.joinpath(fn)
|
|
|
|
|
|
def main():
|
|
nodes = []
|
|
for fullname in walk_files(NODES):
|
|
filename = str(fullname.relative_to(NODES).parent) + "/" + fullname.stem
|
|
|
|
with open(fullname) as fyaml:
|
|
yml_data = yaml.load(fyaml, Loader=get_loader())
|
|
yml_data = get_defaults(fullname.parent, NODES) | yml_data
|
|
yml_data["hostname"] = fullname.stem
|
|
yml_data["filename"] = filename
|
|
nodes.append(yml_data)
|
|
|
|
final_nodes = []
|
|
for node in nodes:
|
|
# Quick and dirty way to resolve all the templates using a custom encoder
|
|
final_nodes.append(json.loads(json.dumps(node, cls=node_encoder(node))))
|
|
|
|
with open("config.yaml") as fyaml:
|
|
config = yaml.safe_load(fyaml)
|
|
|
|
RENDERED.mkdir(exist_ok=True)
|
|
for template_name in TEMPLATES.list_templates():
|
|
template = TEMPLATES.get_template(template_name)
|
|
|
|
rendered = template.render(nodes=final_nodes, config=config)
|
|
with open(RENDERED.joinpath(template_name), "w") as f:
|
|
f.write(rendered)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|