utils.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. from elemental_utils import ElementalDns, ElementalCdns
  2. from pynetbox.models.virtualization import VirtualMachines
  3. from pynetbox.core.response import Record
  4. from elemental_utils import cpnr
  5. from typing import List
  6. import concurrent.futures
  7. import logging
  8. from cleu.config import Config as C # type: ignore
  9. def normalize_cnames(cnames: List[str], domain: str) -> List[str]:
  10. """
  11. Given a list of CNAMEs, ensure each one is stripped, ends with a '.'
  12. and has the default domain name if another domain name is not present.
  13. Args:
  14. :cnames List[str]: List of CNAMEs to normalize
  15. :domain str: Default domain name to append to unqualified CNAMEs
  16. Returns:
  17. :List[str]: Normalized list of CNAMEs
  18. """
  19. cnames = [s.strip() for s in cnames]
  20. cnames = list(map(lambda s: s + "." if ("." in s and not s.endswith(".")) else s, cnames))
  21. cnames = list(map(lambda s: s + f".{domain}" if (not s.endswith(".")) else s, cnames))
  22. return cnames
  23. def dedup_cnames(cnames: List[str], domain: str) -> List[str]:
  24. """
  25. Ensure a list of CNAMEs is unique
  26. Args:
  27. :cnames List[str]: List of CNAMEs to check
  28. :domain str: Domain name to append to those unqualified CNAMEs
  29. Returns:
  30. :List[str]: De-duped list of CNAMEs
  31. """
  32. cname_dict = {}
  33. cname_list = normalize_cnames(cnames, domain)
  34. for c in cname_list:
  35. cname_dict[c] = True
  36. return list(cname_dict.keys())
  37. def get_cname_record(alias: str, domain: str, edns: ElementalDns) -> cpnr.models.model.Record:
  38. """Get a CNAME RRSet if it exists.
  39. Args:
  40. :alias str: Alias for which to search
  41. :domain str: Domain name in which to look for the CNAME alias
  42. :edns ElementalDns: ElementalDns object that is the auth DNS
  43. Returns:
  44. :Record: Resource Record set if CNAME is found else (or if auth DNS cannot be found) None
  45. """
  46. return edns.rrset.get(alias, zoneOrigin=domain)
  47. def launch_parallel_task(
  48. task, task_name: str, iterator: list, name_attribute: str, workers: int = 20, stop_on_error: bool = False, /, *args
  49. ) -> bool:
  50. """Execute a parallel task using thread pools.
  51. Args:
  52. :task (function): Task/function to execute
  53. :task_name str: Description of the task
  54. :iterator list: List of items on which to run the task
  55. :name_attribute str: Name of the attribute to identify the item
  56. :workers int: Number of threads to use (default: 20)
  57. :stop_on_error bool: Whether to stop if an error is encountered (default: False)
  58. :*args: Arguments to the task
  59. Returns:
  60. :bool: True if the task succeeded, False otherwise
  61. """
  62. logger = logging.getLogger(__name__)
  63. result = True
  64. with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
  65. future_task = {executor.submit(task, item, *args): item for item in iterator}
  66. for ft in concurrent.futures.as_completed(future_task):
  67. item = future_task[ft]
  68. try:
  69. ft.result()
  70. except Exception as e:
  71. if not name_attribute:
  72. logger.exception(f"⛔️ Failed to {task_name} for {item}: {e}")
  73. else:
  74. logger.exception(f"⛔️ Failed to {task_name} for {getattr(item, name_attribute)}: {e}")
  75. result = False
  76. if stop_on_error:
  77. break
  78. return result
  79. def restart_dns_servers(edns: ElementalDns, cdnses: list) -> None:
  80. """Restart all affected DNS servers.
  81. Args:
  82. :edns ElementalDns: ElementalDns object to restart
  83. :ecdns ElementalCdns: ElementalCdns object to restart
  84. """
  85. logger = logging.getLogger(__name__)
  86. try:
  87. edns.sync_ha_pair(instance="DNSHA", add_params={"mode": "exact", "direction": "fromMain"})
  88. except Exception:
  89. # This can fail when we don't yet have an HA pair.
  90. pass
  91. edns.reload_server()
  92. logger.info(f"🏁 Reloaded server {edns.base_url}")
  93. # Restart each applicable CDNS server.
  94. for cdns in cdnses:
  95. ecdns = ElementalCdns(url=f"https://{cdns}:8443/")
  96. ecdns.reload_server()
  97. logger.info(f"🏁 Reloaded CDNS server {ecdns.base_url}")
  98. def get_reverse_zone(ip: str) -> str:
  99. """Get the reverse zone for an IP.
  100. Args:
  101. :ip str: IP address to parse
  102. Returns:
  103. :str: Reverse zone name
  104. """
  105. octets = ip.split(".")
  106. rzone_name = f"{'.'.join(octets[::-1][1:])}.in-addr.arpa."
  107. return rzone_name
  108. def parse_txt_record(txt_record: str) -> dict:
  109. """Parse a NetBox TXT record and return a dict of it.
  110. Args:
  111. :txt_record str: String representation of the TXT record data
  112. Returns:
  113. :dict: Dict of the results with each field a key
  114. """
  115. result = {}
  116. txt_record = txt_record.strip('"')
  117. if not txt_record.startswith("v=_netbox"):
  118. raise ValueError(f"Invalid NetBox TXT record data: {txt_record}")
  119. key_vals = txt_record.split(" ")
  120. for key_val in key_vals:
  121. if "=" in key_val:
  122. (key, value) = key_val.split("=")
  123. result[key] = value
  124. else:
  125. result[key_val] = None
  126. return result