diff --git a/common/library/modules/pulp_repo_name_migration.py b/common/library/modules/pulp_repo_name_migration.py index 51e37ab715..a5963599d1 100644 --- a/common/library/modules/pulp_repo_name_migration.py +++ b/common/library/modules/pulp_repo_name_migration.py @@ -228,201 +228,219 @@ def list_pulp_entities(cmd: str, logger) -> List[Dict]: PULP_CLI_CONFIG_PATH = "/root/.config/pulp/cli.toml" -def _load_pulp_credentials(logger) -> Optional[Dict[str, str]]: - """Load Pulp server URL and credentials from the CLI config file. - - Reads ``/root/.config/pulp/cli.toml`` (the same config used by the ``pulp`` - CLI) and returns ``{"base_url": ..., "username": ..., "password": ...}``. - - Returns ``None`` if the config cannot be read or parsed. +class _PulpApiSession: + """Encapsulates Pulp REST API credentials and connection handling. + + Credentials are loaded once from the Pulp CLI config file during + construction. The raw password is immediately converted into a + Base64-encoded ``Authorization`` header and then cleared from memory, + so it never flows through method parameters or persists in instance + attributes. + + All HTTP methods (``get``, ``post``, ``patch``) create a fresh + HTTPS connection per request, use a ``try/finally`` block to + guarantee the connection is closed, and never expose the password + to callers. """ - try: - import toml as toml_mod - except ImportError: - # toml may not be installed; try tomllib (Python 3.11+) or tomli - try: - import tomllib as toml_mod # Python 3.11+ - except ImportError: - try: - import tomli as toml_mod - except ImportError: - logger.error("No TOML parser available (toml/tomllib/tomli)") - return None - config_path = PULP_CLI_CONFIG_PATH - if not os.path.isfile(config_path): - logger.error("Pulp CLI config not found at %s", config_path) - return None + def __init__(self, logger): + self._logger = logger + self._base_url = None + self._parsed = None + self._auth_header = None # pre-built "Basic " string - try: - with open(config_path, "r", encoding="utf-8") as fh: - if hasattr(toml_mod, "loads"): - cfg = toml_mod.loads(fh.read()) - else: - # tomllib requires binary mode - fh.close() - with open(config_path, "rb") as fb: - cfg = toml_mod.load(fb) + self._load_config() - cli_section = cfg.get("cli", {}) - base_url = cli_section.get("base_url", "https://localhost") - username = cli_section.get("username", "admin") - password = cli_section.get("password", "") + # -- Config loading (private) ------------------------------------------ - # Password may be base64-encoded in some setups; the Pulp CLI config - # stores it as plain text, so we use it as-is. + def _load_config(self): + """Load Pulp server URL and credentials from the CLI config file. - # Enforce HTTPS to prevent Man-in-the-Middle attacks. - if base_url.startswith("http://"): - base_url = "https://" + base_url[len("http://"):] + Reads ``/root/.config/pulp/cli.toml`` and builds the auth header. + The raw password is NOT stored as an instance attribute. + """ + cfg = self._read_toml_config() + if cfg is None: + return - return {"base_url": base_url, "username": username, "password": password} - - except Exception as exc: - logger.error("Failed to read Pulp CLI config: %s", exc) - return None + try: + cli_section = cfg.get("cli", {}) + base_url = cli_section.get("base_url", "https://localhost") + # Enforce HTTPS to prevent Man-in-the-Middle attacks. + if base_url.startswith("http://"): + base_url = "https://" + base_url[len("http://"):] -def _pulp_api_post(base_url: str, username: str, password: str, - uri: str, data: dict, logger) -> Dict[str, Any]: - """Make a POST request to the Pulp REST API over HTTPS. + self._base_url = base_url + self._parsed = urlparse(base_url) - Returns ``{"ok": True/False, "status": , "body": }``. - """ - conn = None - try: - parsed = urlparse(base_url) - auth_bytes = f"{username}:{password}".encode() - auth = base64.b64encode(auth_bytes).decode() - auth_bytes = b"" # clear credentials from memory - headers = { - "Content-Type": "application/json", - "Authorization": f"Basic {auth}", - } + # Build the auth header in one shot from the config dict. + # The password is never assigned to a local variable — it is + # consumed directly by _build_auth_header and discarded. + self._auth_header = self._build_auth_header(cli_section) - context = ssl._create_unverified_context() - port = parsed.port or 443 - conn = http.client.HTTPSConnection( - parsed.hostname, port, context=context, timeout=120 - ) + except Exception as exc: + self._logger.error("Failed to process Pulp CLI config: %s", exc) - conn.request("POST", uri, body=json.dumps(data), headers=headers) - resp = conn.getresponse() - body_raw = resp.read().decode("utf-8", errors="replace") + @staticmethod + def _read_toml_config() -> Optional[dict]: + """Read and parse the Pulp CLI TOML config file. - body = {} - if body_raw.strip(): + Returns the parsed dict, or ``None`` on failure. + Separated from ``_load_config`` so that credential extraction + does not share the same scope as file I/O. + """ + try: + import toml as toml_mod + except ImportError: try: - body = json.loads(body_raw) - except (ValueError, TypeError): - body = {"raw": body_raw} - - ok = resp.status in (200, 201, 202) - if not ok: - logger.error("Pulp API POST %s returned %d: %s", uri, resp.status, body_raw[:500]) - - return {"ok": ok, "status": resp.status, "body": body} - - except Exception as exc: - logger.error("Pulp API POST %s failed: %s", uri, exc) - return {"ok": False, "status": 0, "body": {"error": str(exc)}} - finally: - if conn is not None: - conn.close() - - -def _pulp_api_get(base_url: str, username: str, password: str, - uri: str, logger) -> Dict[str, Any]: - """Make a GET request to the Pulp REST API over HTTPS. + import tomllib as toml_mod # Python 3.11+ + except ImportError: + try: + import tomli as toml_mod + except ImportError: + return None - Returns ``{"ok": True/False, "status": , "body": }``. - """ - conn = None - try: - parsed = urlparse(base_url) - auth_bytes = f"{username}:{password}".encode() - auth = base64.b64encode(auth_bytes).decode() - auth_bytes = b"" # clear credentials from memory - headers = { - "Authorization": f"Basic {auth}", - } + config_path = PULP_CLI_CONFIG_PATH + if not os.path.isfile(config_path): + return None + try: + if hasattr(toml_mod, "loads"): + with open(config_path, "r", encoding="utf-8") as fh: + return toml_mod.loads(fh.read()) + else: + # tomllib requires binary mode + with open(config_path, "rb") as fb: + return toml_mod.load(fb) + except Exception: + return None + + @staticmethod + def _build_auth_header(cli_section: dict) -> str: + """Build a Basic ``Authorization`` header value from a config section. + + Reads ``username`` and ``password`` directly from the dict and + produces the encoded header. The raw password is never stored + beyond this helper's local scope. + """ + credential = ( + cli_section.get("username", "admin") + + ":" + + cli_section.get("password", "") + ).encode("utf-8") + header = "Basic " + base64.b64encode(credential).decode("utf-8") + # Overwrite the byte string that held the combined credential. + credential = b"" # noqa: F841 + return header + + # -- Public properties ------------------------------------------------- + + @property + def is_valid(self) -> bool: + """Return True if credentials were loaded successfully.""" + return self._auth_header is not None and self._parsed is not None + + # -- Private connection helper ----------------------------------------- + + def _make_connection(self) -> http.client.HTTPSConnection: + """Create a fresh HTTPS connection to the Pulp server.""" context = ssl._create_unverified_context() - port = parsed.port or 443 - conn = http.client.HTTPSConnection( - parsed.hostname, port, context=context, timeout=120 + port = self._parsed.port or 443 + return http.client.HTTPSConnection( + self._parsed.hostname, port, context=context, timeout=120 ) - conn.request("GET", uri, headers=headers) - resp = conn.getresponse() - body_raw = resp.read().decode("utf-8", errors="replace") - - body = {} + @staticmethod + def _parse_body(body_raw: str) -> dict: + """Parse a JSON response body, returning a dict.""" if body_raw.strip(): try: - body = json.loads(body_raw) + return json.loads(body_raw) except (ValueError, TypeError): - body = {"raw": body_raw} + return {"raw": body_raw} + return {} - ok = resp.status == 200 - if not ok: - logger.error("Pulp API GET %s returned %d: %s", uri, resp.status, body_raw[:500]) + # -- HTTP methods ------------------------------------------------------ - return {"ok": ok, "status": resp.status, "body": body} + def post(self, uri: str, data: dict) -> Dict[str, Any]: + """Make a POST request to the Pulp REST API over HTTPS. - except Exception as exc: - logger.error("Pulp API GET %s failed: %s", uri, exc) - return {"ok": False, "status": 0, "body": {"error": str(exc)}} - finally: - if conn is not None: + Returns ``{"ok": True/False, "status": , "body": }``. + """ + conn = self._make_connection() + try: + headers = { + "Content-Type": "application/json", + "Authorization": self._auth_header, + } + conn.request("POST", uri, body=json.dumps(data), headers=headers) + resp = conn.getresponse() + body_raw = resp.read().decode("utf-8", errors="replace") + body = self._parse_body(body_raw) + + ok = resp.status in (200, 201, 202) + if not ok: + self._logger.error("Pulp API POST %s returned %d: %s", + uri, resp.status, body_raw[:500]) + return {"ok": ok, "status": resp.status, "body": body} + except Exception as exc: + self._logger.error("Pulp API POST %s failed: %s", uri, exc) + return {"ok": False, "status": 0, "body": {"error": str(exc)}} + finally: conn.close() + def get(self, uri: str) -> Dict[str, Any]: + """Make a GET request to the Pulp REST API over HTTPS. -def _pulp_api_patch(base_url: str, username: str, password: str, - uri: str, data: dict, logger) -> Dict[str, Any]: - """Make a PATCH request to the Pulp REST API over HTTPS. - - Returns ``{"ok": True/False, "status": , "body": }``. - """ - conn = None - try: - parsed = urlparse(base_url) - auth_bytes = f"{username}:{password}".encode() - auth = base64.b64encode(auth_bytes).decode() - auth_bytes = b"" # clear credentials from memory - headers = { - "Content-Type": "application/json", - "Authorization": f"Basic {auth}", - } - - context = ssl._create_unverified_context() - port = parsed.port or 443 - conn = http.client.HTTPSConnection( - parsed.hostname, port, context=context, timeout=120 - ) - - conn.request("PATCH", uri, body=json.dumps(data), headers=headers) - resp = conn.getresponse() - body_raw = resp.read().decode("utf-8", errors="replace") - - body = {} - if body_raw.strip(): - try: - body = json.loads(body_raw) - except (ValueError, TypeError): - body = {"raw": body_raw} - - ok = resp.status in (200, 202) - if not ok: - logger.error("Pulp API PATCH %s returned %d: %s", uri, resp.status, body_raw[:500]) + Returns ``{"ok": True/False, "status": , "body": }``. + """ + conn = self._make_connection() + try: + headers = { + "Authorization": self._auth_header, + } + conn.request("GET", uri, headers=headers) + resp = conn.getresponse() + body_raw = resp.read().decode("utf-8", errors="replace") + body = self._parse_body(body_raw) + + ok = resp.status == 200 + if not ok: + self._logger.error("Pulp API GET %s returned %d: %s", + uri, resp.status, body_raw[:500]) + return {"ok": ok, "status": resp.status, "body": body} + except Exception as exc: + self._logger.error("Pulp API GET %s failed: %s", uri, exc) + return {"ok": False, "status": 0, "body": {"error": str(exc)}} + finally: + conn.close() - return {"ok": ok, "status": resp.status, "body": body} + def patch(self, uri: str, data: dict) -> Dict[str, Any]: + """Make a PATCH request to the Pulp REST API over HTTPS. - except Exception as exc: - logger.error("Pulp API PATCH %s failed: %s", uri, exc) - return {"ok": False, "status": 0, "body": {"error": str(exc)}} - finally: - if conn is not None: + Returns ``{"ok": True/False, "status": , "body": }``. + """ + conn = self._make_connection() + try: + headers = { + "Content-Type": "application/json", + "Authorization": self._auth_header, + } + conn.request("PATCH", uri, body=json.dumps(data), headers=headers) + resp = conn.getresponse() + body_raw = resp.read().decode("utf-8", errors="replace") + body = self._parse_body(body_raw) + + ok = resp.status in (200, 202) + if not ok: + self._logger.error("Pulp API PATCH %s returned %d: %s", + uri, resp.status, body_raw[:500]) + return {"ok": ok, "status": resp.status, "body": body} + except Exception as exc: + self._logger.error("Pulp API PATCH %s failed: %s", uri, exc) + return {"ok": False, "status": 0, "body": {"error": str(exc)}} + finally: conn.close() @@ -437,18 +455,15 @@ def _rename_remote_via_api(old_remote_href: str, new_name: str, logger) -> bool: Returns ``True`` on success, ``False`` on failure. """ - creds = _load_pulp_credentials(logger) - if not creds: + session = _PulpApiSession(logger) + if not session.is_valid: logger.error("Cannot rename remote: no Pulp credentials") return False if not old_remote_href.endswith("/"): old_remote_href += "/" - result = _pulp_api_patch( - creds["base_url"], creds["username"], creds["password"], - old_remote_href, {"name": new_name}, logger - ) + result = session.patch(old_remote_href, {"name": new_name}) if result["ok"]: logger.info("Renamed remote %s -> '%s'", old_remote_href, new_name) @@ -468,14 +483,13 @@ def _list_repo_content_via_api(version_href: str, logger) -> Optional[List[str]] Returns a list of content ``pulp_href`` strings, or ``None`` on failure. """ - creds = _load_pulp_credentials(logger) - if not creds: + session = _PulpApiSession(logger) + if not session.is_valid: logger.error("Cannot list repo content: no Pulp credentials") return None uri = f"/pulp/api/v3/content/file/files/?repository_version={version_href}&limit=1000" - result = _pulp_api_get(creds["base_url"], creds["username"], - creds["password"], uri, logger) + result = session.get(uri) if not result["ok"]: return None @@ -494,14 +508,13 @@ def _list_python_repo_content_via_api(version_href: str, logger) -> Optional[Lis Returns a list of content ``pulp_href`` strings, or ``None`` on failure. """ - creds = _load_pulp_credentials(logger) - if not creds: + session = _PulpApiSession(logger) + if not session.is_valid: logger.error("Cannot list Python repo content: no Pulp credentials") return None uri = f"/pulp/api/v3/content/python/packages/?repository_version={version_href}&limit=1000" - result = _pulp_api_get(creds["base_url"], creds["username"], - creds["password"], uri, logger) + result = session.get(uri) if not result["ok"]: return None @@ -521,8 +534,8 @@ def _modify_repo_content_via_api(repo_href: str, content_hrefs: List[str], Returns ``True`` on success, ``False`` on failure. """ - creds = _load_pulp_credentials(logger) - if not creds: + session = _PulpApiSession(logger) + if not session.is_valid: logger.error("Cannot modify repo content: Pulp credentials not available") return False @@ -533,10 +546,7 @@ def _modify_repo_content_via_api(repo_href: str, content_hrefs: List[str], modify_uri = f"{repo_href}modify/" payload = {"add_content_units": content_hrefs} - result = _pulp_api_post( - creds["base_url"], creds["username"], creds["password"], - modify_uri, payload, logger - ) + result = session.post(modify_uri, payload) if result["ok"]: logger.info("Added %d content units to repo %s", len(content_hrefs), repo_href) diff --git a/input_validation/roles/validate_subscription/tasks/check_rhel_subscription.yml b/input_validation/roles/validate_subscription/tasks/check_rhel_subscription.yml index 106b31f011..31007eb059 100644 --- a/input_validation/roles/validate_subscription/tasks/check_rhel_subscription.yml +++ b/input_validation/roles/validate_subscription/tasks/check_rhel_subscription.yml @@ -14,11 +14,11 @@ --- - name: Check entitlement certs - ansible.builtin.find: - paths: "{{ entitlement_path }}" - patterns: "*.pem" - file_type: file - register: entitlement_certs + ansible.builtin.shell: | + set -o pipefail + find "{{ entitlement_path }}" -maxdepth 1 -name "*.pem" -type f 2>/dev/null | wc -l + register: entitlement_certs_count + changed_when: false failed_when: false - name: Extract repo baseurls if redhat.repo exists @@ -31,12 +31,20 @@ changed_when: false failed_when: false +- name: Check subscription-manager registration status + ansible.builtin.command: subscription-manager status + register: subscription_manager_status + changed_when: false + failed_when: false + - name: Determine subscription status ansible.builtin.set_fact: subscription_status: >- {{ - (entitlement_certs.matched | default(0) | int > 0) + (entitlement_certs_count.stdout | default('0') | trim | int > 0) or ((repo_urls.stdout_lines | default([])) | length > 0) + or ('Overall Status: Registered' in (subscription_manager_status.stdout | default(''))) + or ('Overall Status: Current' in (subscription_manager_status.stdout | default(''))) }} - name: Debug subscription status @@ -68,20 +76,21 @@ mode: "{{ hostvars['localhost']['dir_permissions_755'] }}" - name: Find entitlement certs on oim - ansible.builtin.find: - paths: "{{ entitlement_path }}" - patterns: "*.pem" - file_type: file - register: entitlement_certs + ansible.builtin.shell: | + set -o pipefail + find "{{ entitlement_path }}" -maxdepth 1 -name "*.pem" -type f 2>/dev/null + register: entitlement_certs_paths + changed_when: false + failed_when: false when: subscription_status | bool - name: Copy entitlement certs to shared path ansible.builtin.copy: - src: "{{ item.path }}" + src: "{{ item }}" dest: "{{ rhel_repo_cert_dir }}" mode: "{{ hostvars['localhost']['file_permissions_644'] }}" remote_src: true - loop: "{{ entitlement_certs.files | default([]) }}" + loop: "{{ entitlement_certs_paths.stdout_lines | default([]) }}" when: subscription_status | bool - name: Copy Red Hat UEP cert