123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- from elemental_utils import ElementalDns, ElementalCdns
- from pynetbox.models.virtualization import VirtualMachines
- from pynetbox.core.response import Record
- from elemental_utils import cpnr
- from typing import List
- import concurrent.futures
- import logging
- from cleu.config import Config as C # type: ignore
- def normalize_cnames(cnames: List[str], domain: str) -> List[str]:
- """
- Given a list of CNAMEs, ensure each one is stripped, ends with a '.'
- and has the default domain name if another domain name is not present.
- Args:
- :cnames List[str]: List of CNAMEs to normalize
- :domain str: Default domain name to append to unqualified CNAMEs
- Returns:
- :List[str]: Normalized list of CNAMEs
- """
- cnames = [s.strip() for s in cnames]
- cnames = list(map(lambda s: s + "." if ("." in s and not s.endswith(".")) else s, cnames))
- cnames = list(map(lambda s: s + f".{domain}" if (not s.endswith(".")) else s, cnames))
- return cnames
- def dedup_cnames(cnames: List[str], domain: str) -> List[str]:
- """
- Ensure a list of CNAMEs is unique
- Args:
- :cnames List[str]: List of CNAMEs to check
- :domain str: Domain name to append to those unqualified CNAMEs
- Returns:
- :List[str]: De-duped list of CNAMEs
- """
- cname_dict = {}
- cname_list = normalize_cnames(cnames, domain)
- for c in cname_list:
- cname_dict[c] = True
- return list(cname_dict.keys())
- def get_cname_record(alias: str, domain: str, edns: ElementalDns) -> cpnr.models.model.Record:
- """Get a CNAME RRSet if it exists.
- Args:
- :alias str: Alias for which to search
- :domain str: Domain name in which to look for the CNAME alias
- :edns ElementalDns: ElementalDns object that is the auth DNS
- Returns:
- :Record: Resource Record set if CNAME is found else (or if auth DNS cannot be found) None
- """
- return edns.rrset.get(alias, zoneOrigin=domain)
- def launch_parallel_task(
- task, task_name: str, iterator: list, name_attribute: str, workers: int = 20, stop_on_error: bool = False, /, *args
- ) -> bool:
- """Execute a parallel task using thread pools.
- Args:
- :task (function): Task/function to execute
- :task_name str: Description of the task
- :iterator list: List of items on which to run the task
- :name_attribute str: Name of the attribute to identify the item
- :workers int: Number of threads to use (default: 20)
- :stop_on_error bool: Whether to stop if an error is encountered (default: False)
- :*args: Arguments to the task
- Returns:
- :bool: True if the task succeeded, False otherwise
- """
- logger = logging.getLogger(__name__)
- result = True
- with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
- future_task = {executor.submit(task, item, *args): item for item in iterator}
- for ft in concurrent.futures.as_completed(future_task):
- item = future_task[ft]
- try:
- ft.result()
- except Exception as e:
- if not name_attribute:
- logger.exception(f"⛔️ Failed to {task_name} for {item}: {e}")
- else:
- logger.exception(f"⛔️ Failed to {task_name} for {getattr(item, name_attribute)}: {e}")
- result = False
- if stop_on_error:
- break
- return result
- def restart_dns_servers(edns: ElementalDns, cdnses: list) -> None:
- """Restart all affected DNS servers.
- Args:
- :edns ElementalDns: ElementalDns object to restart
- :ecdns ElementalCdns: ElementalCdns object to restart
- """
- logger = logging.getLogger(__name__)
- try:
- edns.sync_ha_pair(instance="DNSHA", add_params={"mode": "exact", "direction": "fromMain"})
- except Exception:
- # This can fail when we don't yet have an HA pair.
- pass
- edns.reload_server()
- logger.info(f"🏁 Reloaded server {edns.base_url}")
- # Restart each applicable CDNS server.
- for cdns in cdnses:
- ecdns = ElementalCdns(url=f"https://{cdns}:8443/")
- ecdns.reload_server()
- logger.info(f"🏁 Reloaded CDNS server {ecdns.base_url}")
- def get_reverse_zone(ip: str) -> str:
- """Get the reverse zone for an IP.
- Args:
- :ip str: IP address to parse
- Returns:
- :str: Reverse zone name
- """
- octets = ip.split(".")
- rzone_name = f"{'.'.join(octets[::-1][1:])}.in-addr.arpa."
- return rzone_name
- def parse_txt_record(txt_record: str) -> dict:
- """Parse a NetBox TXT record and return a dict of it.
- Args:
- :txt_record str: String representation of the TXT record data
- Returns:
- :dict: Dict of the results with each field a key
- """
- result = {}
- txt_record = txt_record.strip('"')
- if not txt_record.startswith("v=_netbox"):
- raise ValueError(f"Invalid NetBox TXT record data: {txt_record}")
- key_vals = txt_record.split(" ")
- for key_val in key_vals:
- if "=" in key_val:
- (key, value) = key_val.split("=")
- result[key] = value
- else:
- result[key_val] = None
- return result
|