#!/usr/bin/env python3 import argparse import base64 import json import os import re import shutil import socket import subprocess import sys import tempfile import textwrap import time import zipfile from pathlib import Path REPO_ROOT = Path(__file__).resolve().parents[1] EXTENSION_SOURCE = REPO_ROOT / "browser" / "extension" STUB_SERVER = REPO_ROOT / "scripts" / "browser_extension_validation_server.go" TOKEN = "test-token" ORIGINAL_HOME = Path(os.environ.get("HOME", "")) def run(cmd, *, cwd=None, env=None, check=True): result = subprocess.run(cmd, cwd=cwd, env=env, text=True, capture_output=True) if check and result.returncode != 0: raise RuntimeError( f"command failed ({result.returncode}): {' '.join(cmd)}\n" f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}" ) return result def ensure_selenium_venv(venv_dir: Path): python_bin = venv_dir / "bin" / "python" if not python_bin.exists(): run([sys.executable, "-m", "venv", str(venv_dir)]) run([str(python_bin), "-m", "pip", "install", "selenium"]) return python_bin def require_binary(name): path = shutil.which(name) if not path: raise RuntimeError(f"required binary {name!r} was not found in PATH") return path def find_geckodriver(): direct = shutil.which("geckodriver") if direct: return direct cache_root = ORIGINAL_HOME / ".cache" / "selenium" / "geckodriver" if cache_root.exists(): candidates = sorted(cache_root.glob("**/geckodriver")) if candidates: return str(candidates[-1]) raise RuntimeError("required binary 'geckodriver' was not found in PATH or Selenium cache") def write_login_fixture(path: Path): path.write_text( textwrap.dedent( """\
""" ), encoding="utf-8", ) def build_bridge(binary_path: Path): run(["go", "build", "-o", str(binary_path), "./cmd/keepassgo-browser-bridge"], cwd=REPO_ROOT) def patch_validation_defaults(background_js: Path, grpc_addr: str): data = background_js.read_text(encoding="utf-8") data = data.replace('grpcAddress: "",', f'grpcAddress: "{grpc_addr}",', 1) data = data.replace('bearerToken: ""', f'bearerToken: "{TOKEN}"', 1) data += textwrap.dedent( """ ;((api) => { api.runtime.onMessage.addListener((message, _sender, sendResponse) => { if (message?.type === "keepassgo-validation-ping") { sendResponse({ ok: true }); return false; } if (message?.type === "keepassgo-validation-status") { (async () => { try { const settings = await loadSettings(); const status = await connectNative({ action: "status", grpcAddress: settings.grpcAddress, bearerToken: settings.bearerToken }); sendResponse({ ok: true, settings, status }); } catch (error) { sendResponse({ ok: false, error: String(error) }); } })(); return true; } return false; }); })(globalThis.browser ?? globalThis.chrome); """ ) background_js.write_text(data, encoding="utf-8") def patch_validation_content(content_js: Path): data = content_js.read_text(encoding="utf-8") data += textwrap.dedent( """ ;(() => { const set = (name, value) => { document.documentElement.setAttribute(name, String(value)); }; const api = globalThis.browser ?? globalThis.chrome; set("data-keepassgo-validation-runtime-id", api?.runtime?.id || ""); const username = document.getElementById("username"); const focusTarget = username ? { role: "username", formIndex: 0, fieldIndex: 0, id: "username", name: "", autocomplete: "username" } : null; document.documentElement.setAttribute("data-keepassgo-validation-content", "loaded"); try { if (api?.runtime?.sendMessage) { Promise.resolve(api.runtime.sendMessage({ type: "keepassgo-validation-ping" })) .then((response) => { if (response?.ok) { set("data-keepassgo-validation-background", "ok"); } }) .catch((error) => { set("data-keepassgo-validation-background", String(error)); }); Promise.resolve(api.runtime.sendMessage({ type: "keepassgo-validation-status" })) .then((response) => { if (response?.ok) { set("data-keepassgo-validation-native", JSON.stringify(response.status || {})); set("data-keepassgo-validation-settings", JSON.stringify(response.settings || {})); } else { set("data-keepassgo-validation-native-error", response?.error || "unknown"); } }) .catch((error) => { set("data-keepassgo-validation-native-error", String(error)); }); Promise.resolve(api.runtime.sendMessage({ type: "keepassgo-page-ready", force: true, pageHasLoginForm: true, focusTarget, signature: "validation" })) .then((response) => { set("data-keepassgo-validation-page-ready", JSON.stringify(response || {})); }) .catch((error) => { set("data-keepassgo-validation-page-ready-error", String(error)); }); } } catch (error) { set("data-keepassgo-validation-background", String(error)); } })(); """ ) content_js.write_text(data, encoding="utf-8") def prepare_chromium_extension(workspace: Path, grpc_addr: str): ext_dir = workspace / "extension-chromium" shutil.copytree(EXTENSION_SOURCE, ext_dir) patch_validation_defaults(ext_dir / "background.js", grpc_addr) patch_validation_content(ext_dir / "content.js") key_pem = workspace / "extension-key.pem" key_b64 = workspace / "extension-key.b64" run(["openssl", "genrsa", "-out", str(key_pem), "2048"]) der = subprocess.check_output(["openssl", "rsa", "-in", str(key_pem), "-pubout", "-outform", "DER"]) key_b64.write_text(base64.b64encode(der).decode("utf-8"), encoding="utf-8") manifest = json.loads((ext_dir / "manifest.chromium.json").read_text(encoding="utf-8")) manifest["key"] = key_b64.read_text(encoding="utf-8").strip() (ext_dir / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8") return ext_dir, key_b64 def prepare_firefox_extension(workspace: Path, grpc_addr: str): ext_dir = workspace / "extension-firefox" shutil.copytree(EXTENSION_SOURCE, ext_dir) patch_validation_defaults(ext_dir / "background.js", grpc_addr) patch_validation_content(ext_dir / "content.js") manifest = json.loads((ext_dir / "manifest.firefox.json").read_text(encoding="utf-8")) (ext_dir / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8") xpi_path = workspace / "keepassgo-firefox.xpi" with zipfile.ZipFile(xpi_path, "w") as zf: for path in ext_dir.iterdir(): if path.is_file() and path.name != "manifest.firefox.json": zf.write(path, arcname=path.name) return xpi_path def install_chromium_native_host(workspace: Path, bridge_binary: Path, key_b64: Path): home_dir = workspace / "home" home_dir.mkdir(parents=True, exist_ok=True) env = os.environ.copy() env["HOME"] = str(home_dir) env["XDG_CONFIG_HOME"] = str(home_dir / ".config") result = run( [ str(bridge_binary), "install-native-host", "--browser", "chromium", "--binary", str(bridge_binary), "--extension-key-file", str(key_b64), ], env=env, ) manifest_path = Path(result.stdout.strip()) for mirror in [ home_dir / ".config" / "google-chrome" / "NativeMessagingHosts" / "com.keepassgo.browser.json", home_dir / ".config" / "chromium-browser" / "NativeMessagingHosts" / "com.keepassgo.browser.json", home_dir / ".config" / "chromium" / "chromium" / "NativeMessagingHosts" / "com.keepassgo.browser.json", workspace / "chromium-profile" / "NativeMessagingHosts" / "com.keepassgo.browser.json", ]: mirror.parent.mkdir(parents=True, exist_ok=True) shutil.copyfile(manifest_path, mirror) manifest = json.loads(manifest_path.read_text(encoding="utf-8")) origin = manifest["allowed_origins"][0] extension_id = re.search(r"chrome-extension://([^/]+)/", origin).group(1) return extension_id, home_dir def install_firefox_native_host(workspace: Path, bridge_binary: Path): home_dir = workspace / "home" home_dir.mkdir(parents=True, exist_ok=True) env = os.environ.copy() env["HOME"] = str(home_dir) run( [ str(bridge_binary), "install-native-host", "--browser", "firefox", "--binary", str(bridge_binary), ], env=env, ) return home_dir def launch_process(cmd, *, cwd=None, env=None, log_path=None): handle = open(log_path, "w", encoding="utf-8") if log_path else subprocess.DEVNULL return subprocess.Popen(cmd, cwd=cwd, env=env, stdout=handle, stderr=handle, text=True) def wait_for_http(url: str, timeout: float = 10.0): import urllib.request deadline = time.time() + timeout while time.time() < deadline: try: with urllib.request.urlopen(url, timeout=1) as response: if response.status == 200: return except Exception: time.sleep(0.2) raise RuntimeError(f"timed out waiting for HTTP endpoint {url}") def wait_for_tcp(host: str, port: int, *, timeout: float = 20.0, process=None, log_path: Path | None = None, name: str = "TCP endpoint"): deadline = time.time() + timeout while time.time() < deadline: if process is not None and process.poll() is not None: details = "" if log_path and log_path.exists(): details = f"\nlog:\n{log_path.read_text(encoding='utf-8')}" raise RuntimeError(f"{name} exited before becoming ready{details}") try: with socket.create_connection((host, port), timeout=1): return except OSError: time.sleep(0.2) details = "" if log_path and log_path.exists(): details = f"\nlog:\n{log_path.read_text(encoding='utf-8')}" raise RuntimeError(f"timed out waiting for {name} on {host}:{port}{details}") def save_artifact(path: Path, content: str): path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") def find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("127.0.0.1", 0)) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return int(sock.getsockname()[1]) def run_chromium_flow(workspace: Path, extension_id: str, login_url: str): from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC ext_dir = workspace / "extension-chromium" options = Options() options.binary_location = require_binary("chromium") options.set_capability("goog:loggingPrefs", {"browser": "ALL"}) for arg in [ f"--user-data-dir={workspace / 'chromium-profile'}", f"--load-extension={ext_dir}", f"--disable-extensions-except={ext_dir}", "--no-first-run", "--no-default-browser-check", "--disable-search-engine-choice-screen", "--disable-gpu", "--no-sandbox", "--disable-dev-shm-usage", ]: options.add_argument(arg) driver = webdriver.Chrome(service=Service(require_binary("chromedriver")), options=options) wait = WebDriverWait(driver, 25) stage = "launch browser" try: stage = "open login page" driver.get(login_url) wait.until(EC.element_to_be_clickable((By.ID, "username"))).click() stage = "wait for inline root" wait.until(lambda d: d.execute_script("return !!document.querySelector('#keepassgo-inline-root')")) stage = "wait for inline dock" wait.until( lambda d: d.execute_script( "const root=document.querySelector('#keepassgo-inline-root');" "const dock=root?.shadowRoot?.querySelector('.dock');" "return !!(dock && getComputedStyle(dock).display !== 'none');" ) ) stage = "open inline chooser" driver.execute_script( "document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.trigger').click()" ) stage = "wait for chooser matches" wait.until( lambda d: d.execute_script( "const root=document.querySelector('#keepassgo-inline-root');" "return root.shadowRoot.querySelectorAll('.match').length || 0;" ) ) stage = "request browser fill" driver.execute_script( "document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.match').click()" ) stage = "wait for page approval prompt" wait.until( lambda d: "Approve or deny" in d.execute_script( "const root=document.querySelector('#keepassgo-inline-root');" "return root.shadowRoot.querySelector('.match-list').textContent;" ) ) state_path = workspace / "state.txt" deadline = time.time() + 10 while time.time() < deadline: if state_path.read_text(encoding="utf-8").strip() == "pending": break time.sleep(0.2) else: raise RuntimeError("stub server never observed a pending approval state") stage = "verify popup approval state" target_tab_id = driver.execute_script( "const raw = document.documentElement.getAttribute('data-keepassgo-validation-page-ready');" "return raw ? JSON.parse(raw).tabId : null;" ) if not target_tab_id: raise RuntimeError("validation page did not expose a target tab id for popup state checks") driver.switch_to.new_window("tab") driver.get(f"chrome-extension://{extension_id}/popup.html?tabId={int(target_tab_id)}") wait.until(lambda d: "Approval needed" in d.find_element(By.ID, "status-title").text) stage = "approve fill and wait for completion" state_path.write_text("approved", encoding="utf-8") driver.switch_to.window(driver.window_handles[0]) wait.until(lambda d: d.find_element(By.ID, "username").get_attribute("value") == "dannyocean") wait.until(lambda d: d.find_element(By.ID, "password").get_attribute("value") == "token-1") return True except Exception as exc: # noqa: BLE001 artifacts = workspace / "artifacts" save_artifact(artifacts / "chromium-page.html", driver.page_source) try: driver.save_screenshot(str(artifacts / "chromium-page.png")) except Exception: pass try: save_artifact(artifacts / "chromium-browser.log", json.dumps(driver.get_log("browser"), indent=2)) except Exception: pass raise RuntimeError(f"chromium validation failed during {stage}: {type(exc).__name__}: {exc}") from exc finally: driver.quit() def run_firefox_flow(workspace: Path, login_url: str): from selenium import webdriver from selenium.webdriver.firefox.options import Options from selenium.webdriver.firefox.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC xpi_path = workspace / "keepassgo-firefox.xpi" options = Options() options.binary_location = require_binary("firefox") options.add_argument("-headless") service = Service(find_geckodriver()) driver = webdriver.Firefox(service=service, options=options) wait = WebDriverWait(driver, 25) stage = "launch firefox" try: stage = "install temporary addon" addon_id = driver.install_addon(str(xpi_path), temporary=True) if addon_id != "browser@keepassgo.com": raise RuntimeError(f"unexpected addon id {addon_id!r}") stage = "open login page" driver.get(login_url) wait.until(EC.element_to_be_clickable((By.ID, "username"))).click() stage = "wait for inline root" wait.until(lambda d: d.execute_script("return !!document.querySelector('#keepassgo-inline-root')")) stage = "wait for inline dock" wait.until( lambda d: d.execute_script( "const root=document.querySelector('#keepassgo-inline-root');" "const dock=root?.shadowRoot?.querySelector('.dock');" "return !!(dock && getComputedStyle(dock).display !== 'none');" ) ) stage = "open inline chooser" driver.execute_script( "document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.trigger').click()" ) stage = "wait for chooser matches" wait.until( lambda d: d.execute_script( "const root=document.querySelector('#keepassgo-inline-root');" "return root.shadowRoot.querySelectorAll('.match').length || 0;" ) ) stage = "request browser fill" driver.execute_script( "document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.match').click()" ) stage = "wait for page approval prompt" wait.until( lambda d: "Approve or deny" in d.execute_script( "const root=document.querySelector('#keepassgo-inline-root');" "return root.shadowRoot.querySelector('.match-list').textContent;" ) ) state_path = workspace / "state.txt" deadline = time.time() + 10 while time.time() < deadline: if state_path.read_text(encoding="utf-8").strip() == "pending": break time.sleep(0.2) else: raise RuntimeError("stub server never observed a pending approval state") stage = "approve fill and wait for completion" state_path.write_text("approved", encoding="utf-8") wait.until(lambda d: d.find_element(By.ID, "username").get_attribute("value") == "dannyocean") wait.until(lambda d: d.find_element(By.ID, "password").get_attribute("value") == "token-1") return True except Exception as exc: # noqa: BLE001 artifacts = workspace / "artifacts" save_artifact(artifacts / "firefox-page.html", driver.page_source) try: driver.save_screenshot(str(artifacts / "firefox-page.png")) except Exception: pass raise RuntimeError(f"firefox validation failed during {stage}: {type(exc).__name__}: {exc}") from exc finally: driver.quit() def main(): parser = argparse.ArgumentParser(description="Validate the browser-extension flow with isolated real-browser harnesses.") parser.add_argument("--browser", choices=["firefox", "chromium", "both"], default="firefox") parser.add_argument("--keep-workspace", action="store_true") parser.add_argument("--workspace", help=argparse.SUPPRESS) args = parser.parse_args() workspace = Path(args.workspace) if args.workspace else Path(tempfile.mkdtemp(prefix="keepassgo-browser-validate.")) workspace.joinpath("home").mkdir(parents=True, exist_ok=True) workspace.joinpath("web").mkdir(parents=True, exist_ok=True) if not args.workspace: workspace.joinpath("state.txt").write_text("idle", encoding="utf-8") write_login_fixture(workspace / "web" / "login.html") python_bin = ensure_selenium_venv(workspace / "venv") if Path(sys.executable) != python_bin: cmd = [str(python_bin), str(Path(__file__).resolve()), "--workspace", str(workspace), "--browser", args.browser] if args.keep_workspace: cmd.append("--keep-workspace") raise SystemExit(subprocess.run(cmd, cwd=REPO_ROOT).returncode) bridge_binary = workspace / "keepassgo-browser-bridge" build_bridge(bridge_binary) http_port = find_free_port() grpc_port = find_free_port() login_url = f"http://127.0.0.1:{http_port}/login.html" grpc_addr = f"tcp://127.0.0.1:{grpc_port}" ext_dir_chromium = xpi_path = None if args.browser in {"chromium", "both"}: ext_dir_chromium, key_b64 = prepare_chromium_extension(workspace, grpc_addr) chromium_id, chromium_home = install_chromium_native_host(workspace, bridge_binary, key_b64) if args.browser in {"firefox", "both"}: xpi_path = prepare_firefox_extension(workspace, grpc_addr) firefox_home = install_firefox_native_host(workspace, bridge_binary) home_dir = workspace / "home" env = os.environ.copy() env["HOME"] = str(home_dir) env["XDG_CONFIG_HOME"] = str(home_dir / ".config") env["CHROME_CONFIG_HOME"] = str(home_dir / ".config") os.environ["HOME"] = str(home_dir) os.environ["XDG_CONFIG_HOME"] = env["XDG_CONFIG_HOME"] os.environ["CHROME_CONFIG_HOME"] = env["CHROME_CONFIG_HOME"] http_server = launch_process( [sys.executable, "-m", "http.server", str(http_port)], cwd=workspace / "web", env=env, log_path=workspace / "http.log", ) stub_server = launch_process( [ "go", "run", str(STUB_SERVER), "--listen", f"127.0.0.1:{grpc_port}", "--state", str(workspace / "state.txt"), "--page-url", login_url, ], cwd=REPO_ROOT, env=env, log_path=workspace / "stub.log", ) try: wait_for_http(login_url) wait_for_tcp("127.0.0.1", grpc_port, process=stub_server, log_path=workspace / "stub.log", name="validation gRPC server") browser_results = [] if args.browser in {"firefox", "both"}: browser_results.append("firefox") run_firefox_flow(workspace, login_url) workspace.joinpath("state.txt").write_text("idle", encoding="utf-8") if args.browser in {"chromium", "both"}: browser_results.append("chromium") run_chromium_flow(workspace, chromium_id, login_url) print(f"browser validation passed for {', '.join(browser_results)}; workspace: {workspace}", flush=True) if not args.keep_workspace: shutil.rmtree(workspace, ignore_errors=True) except Exception as exc: # noqa: BLE001 print(f"{exc}\nworkspace preserved at {workspace}", file=sys.stderr) sys.exit(1) finally: for process in [stub_server, http_server]: if process.poll() is None: process.terminate() try: process.wait(timeout=3) except subprocess.TimeoutExpired: process.kill() if __name__ == "__main__": main()