Complete browser extension gRPC flow

This commit is contained in:
Joe Julian
2026-04-11 23:45:48 -07:00
parent 2f2338f6f2
commit d522af7d51
24 changed files with 2744 additions and 191 deletions
+611
View File
@@ -0,0 +1,611 @@
#!/usr/bin/env python3
import argparse
import base64
import json
import os
import re
import shutil
import socket
import subprocess
import sys
import tempfile
import textwrap
import time
import zipfile
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
EXTENSION_SOURCE = REPO_ROOT / "browser" / "extension"
STUB_SERVER = REPO_ROOT / "scripts" / "browser_extension_validation_server.go"
TOKEN = "test-token"
ORIGINAL_HOME = Path(os.environ.get("HOME", ""))
def run(cmd, *, cwd=None, env=None, check=True):
result = subprocess.run(cmd, cwd=cwd, env=env, text=True, capture_output=True)
if check and result.returncode != 0:
raise RuntimeError(
f"command failed ({result.returncode}): {' '.join(cmd)}\n"
f"stdout:\n{result.stdout}\n"
f"stderr:\n{result.stderr}"
)
return result
def ensure_selenium_venv(venv_dir: Path):
python_bin = venv_dir / "bin" / "python"
if not python_bin.exists():
run([sys.executable, "-m", "venv", str(venv_dir)])
run([str(python_bin), "-m", "pip", "install", "selenium"])
return python_bin
def require_binary(name):
path = shutil.which(name)
if not path:
raise RuntimeError(f"required binary {name!r} was not found in PATH")
return path
def find_geckodriver():
direct = shutil.which("geckodriver")
if direct:
return direct
cache_root = ORIGINAL_HOME / ".cache" / "selenium" / "geckodriver"
if cache_root.exists():
candidates = sorted(cache_root.glob("**/geckodriver"))
if candidates:
return str(candidates[-1])
raise RuntimeError("required binary 'geckodriver' was not found in PATH or Selenium cache")
def write_login_fixture(path: Path):
path.write_text(
textwrap.dedent(
"""\
<!doctype html>
<html lang="en">
<body>
<form id="heist-login">
<label>Username <input id="username" type="text" autocomplete="username"></label>
<label>Password <input id="password" type="password" autocomplete="current-password"></label>
<button type="submit">Open Vault</button>
</form>
</body>
</html>
"""
),
encoding="utf-8",
)
def build_bridge(binary_path: Path):
run(["go", "build", "-o", str(binary_path), "./cmd/keepassgo-browser-bridge"], cwd=REPO_ROOT)
def patch_validation_defaults(background_js: Path, grpc_addr: str):
data = background_js.read_text(encoding="utf-8")
data = data.replace('grpcAddress: "",', f'grpcAddress: "{grpc_addr}",', 1)
data = data.replace('bearerToken: ""', f'bearerToken: "{TOKEN}"', 1)
data += textwrap.dedent(
"""
;((api) => {
api.runtime.onMessage.addListener((message, _sender, sendResponse) => {
if (message?.type === "keepassgo-validation-ping") {
sendResponse({ ok: true });
return false;
}
if (message?.type === "keepassgo-validation-status") {
(async () => {
try {
const settings = await loadSettings();
const status = await connectNative({
action: "status",
grpcAddress: settings.grpcAddress,
bearerToken: settings.bearerToken
});
sendResponse({ ok: true, settings, status });
} catch (error) {
sendResponse({ ok: false, error: String(error) });
}
})();
return true;
}
return false;
});
})(globalThis.browser ?? globalThis.chrome);
"""
)
background_js.write_text(data, encoding="utf-8")
def patch_validation_content(content_js: Path):
data = content_js.read_text(encoding="utf-8")
data += textwrap.dedent(
"""
;(() => {
const set = (name, value) => {
document.documentElement.setAttribute(name, String(value));
};
const api = globalThis.browser ?? globalThis.chrome;
set("data-keepassgo-validation-runtime-id", api?.runtime?.id || "");
const username = document.getElementById("username");
const focusTarget = username ? {
role: "username",
formIndex: 0,
fieldIndex: 0,
id: "username",
name: "",
autocomplete: "username"
} : null;
document.documentElement.setAttribute("data-keepassgo-validation-content", "loaded");
try {
if (api?.runtime?.sendMessage) {
Promise.resolve(api.runtime.sendMessage({ type: "keepassgo-validation-ping" }))
.then((response) => {
if (response?.ok) {
set("data-keepassgo-validation-background", "ok");
}
})
.catch((error) => {
set("data-keepassgo-validation-background", String(error));
});
Promise.resolve(api.runtime.sendMessage({ type: "keepassgo-validation-status" }))
.then((response) => {
if (response?.ok) {
set("data-keepassgo-validation-native", JSON.stringify(response.status || {}));
set("data-keepassgo-validation-settings", JSON.stringify(response.settings || {}));
} else {
set("data-keepassgo-validation-native-error", response?.error || "unknown");
}
})
.catch((error) => {
set("data-keepassgo-validation-native-error", String(error));
});
Promise.resolve(api.runtime.sendMessage({
type: "keepassgo-page-ready",
force: true,
pageHasLoginForm: true,
focusTarget,
signature: "validation"
}))
.then((response) => {
set("data-keepassgo-validation-page-ready", JSON.stringify(response || {}));
})
.catch((error) => {
set("data-keepassgo-validation-page-ready-error", String(error));
});
}
} catch (error) {
set("data-keepassgo-validation-background", String(error));
}
})();
"""
)
content_js.write_text(data, encoding="utf-8")
def prepare_chromium_extension(workspace: Path, grpc_addr: str):
ext_dir = workspace / "extension-chromium"
shutil.copytree(EXTENSION_SOURCE, ext_dir)
patch_validation_defaults(ext_dir / "background.js", grpc_addr)
patch_validation_content(ext_dir / "content.js")
key_pem = workspace / "extension-key.pem"
key_b64 = workspace / "extension-key.b64"
run(["openssl", "genrsa", "-out", str(key_pem), "2048"])
der = subprocess.check_output(["openssl", "rsa", "-in", str(key_pem), "-pubout", "-outform", "DER"])
key_b64.write_text(base64.b64encode(der).decode("utf-8"), encoding="utf-8")
manifest = json.loads((ext_dir / "manifest.chromium.json").read_text(encoding="utf-8"))
manifest["key"] = key_b64.read_text(encoding="utf-8").strip()
(ext_dir / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
return ext_dir, key_b64
def prepare_firefox_extension(workspace: Path, grpc_addr: str):
ext_dir = workspace / "extension-firefox"
shutil.copytree(EXTENSION_SOURCE, ext_dir)
patch_validation_defaults(ext_dir / "background.js", grpc_addr)
patch_validation_content(ext_dir / "content.js")
manifest = json.loads((ext_dir / "manifest.firefox.json").read_text(encoding="utf-8"))
(ext_dir / "manifest.json").write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
xpi_path = workspace / "keepassgo-firefox.xpi"
with zipfile.ZipFile(xpi_path, "w") as zf:
for path in ext_dir.iterdir():
if path.is_file() and path.name != "manifest.firefox.json":
zf.write(path, arcname=path.name)
return xpi_path
def install_chromium_native_host(workspace: Path, bridge_binary: Path, key_b64: Path):
home_dir = workspace / "home"
home_dir.mkdir(parents=True, exist_ok=True)
env = os.environ.copy()
env["HOME"] = str(home_dir)
env["XDG_CONFIG_HOME"] = str(home_dir / ".config")
result = run(
[
str(bridge_binary),
"install-native-host",
"--browser",
"chromium",
"--binary",
str(bridge_binary),
"--extension-key-file",
str(key_b64),
],
env=env,
)
manifest_path = Path(result.stdout.strip())
for mirror in [
home_dir / ".config" / "google-chrome" / "NativeMessagingHosts" / "com.keepassgo.browser.json",
home_dir / ".config" / "chromium-browser" / "NativeMessagingHosts" / "com.keepassgo.browser.json",
home_dir / ".config" / "chromium" / "chromium" / "NativeMessagingHosts" / "com.keepassgo.browser.json",
workspace / "chromium-profile" / "NativeMessagingHosts" / "com.keepassgo.browser.json",
]:
mirror.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(manifest_path, mirror)
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
origin = manifest["allowed_origins"][0]
extension_id = re.search(r"chrome-extension://([^/]+)/", origin).group(1)
return extension_id, home_dir
def install_firefox_native_host(workspace: Path, bridge_binary: Path):
home_dir = workspace / "home"
home_dir.mkdir(parents=True, exist_ok=True)
env = os.environ.copy()
env["HOME"] = str(home_dir)
run(
[
str(bridge_binary),
"install-native-host",
"--browser",
"firefox",
"--binary",
str(bridge_binary),
],
env=env,
)
return home_dir
def launch_process(cmd, *, cwd=None, env=None, log_path=None):
handle = open(log_path, "w", encoding="utf-8") if log_path else subprocess.DEVNULL
return subprocess.Popen(cmd, cwd=cwd, env=env, stdout=handle, stderr=handle, text=True)
def wait_for_http(url: str, timeout: float = 10.0):
import urllib.request
deadline = time.time() + timeout
while time.time() < deadline:
try:
with urllib.request.urlopen(url, timeout=1) as response:
if response.status == 200:
return
except Exception:
time.sleep(0.2)
raise RuntimeError(f"timed out waiting for HTTP endpoint {url}")
def wait_for_tcp(host: str, port: int, *, timeout: float = 20.0, process=None, log_path: Path | None = None, name: str = "TCP endpoint"):
deadline = time.time() + timeout
while time.time() < deadline:
if process is not None and process.poll() is not None:
details = ""
if log_path and log_path.exists():
details = f"\nlog:\n{log_path.read_text(encoding='utf-8')}"
raise RuntimeError(f"{name} exited before becoming ready{details}")
try:
with socket.create_connection((host, port), timeout=1):
return
except OSError:
time.sleep(0.2)
details = ""
if log_path and log_path.exists():
details = f"\nlog:\n{log_path.read_text(encoding='utf-8')}"
raise RuntimeError(f"timed out waiting for {name} on {host}:{port}{details}")
def save_artifact(path: Path, content: str):
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
def find_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return int(sock.getsockname()[1])
def run_chromium_flow(workspace: Path, extension_id: str, login_url: str):
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
ext_dir = workspace / "extension-chromium"
options = Options()
options.binary_location = require_binary("chromium")
options.set_capability("goog:loggingPrefs", {"browser": "ALL"})
for arg in [
f"--user-data-dir={workspace / 'chromium-profile'}",
f"--load-extension={ext_dir}",
f"--disable-extensions-except={ext_dir}",
"--no-first-run",
"--no-default-browser-check",
"--disable-search-engine-choice-screen",
"--disable-gpu",
"--no-sandbox",
"--disable-dev-shm-usage",
]:
options.add_argument(arg)
driver = webdriver.Chrome(service=Service(require_binary("chromedriver")), options=options)
wait = WebDriverWait(driver, 25)
stage = "launch browser"
try:
stage = "open login page"
driver.get(login_url)
wait.until(EC.element_to_be_clickable((By.ID, "username"))).click()
stage = "wait for inline root"
wait.until(lambda d: d.execute_script("return !!document.querySelector('#keepassgo-inline-root')"))
stage = "wait for inline dock"
wait.until(
lambda d: d.execute_script(
"const root=document.querySelector('#keepassgo-inline-root');"
"const dock=root?.shadowRoot?.querySelector('.dock');"
"return !!(dock && getComputedStyle(dock).display !== 'none');"
)
)
stage = "open inline chooser"
driver.execute_script(
"document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.trigger').click()"
)
stage = "wait for chooser matches"
wait.until(
lambda d: d.execute_script(
"const root=document.querySelector('#keepassgo-inline-root');"
"return root.shadowRoot.querySelectorAll('.match').length || 0;"
)
)
stage = "request browser fill"
driver.execute_script(
"document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.match').click()"
)
stage = "wait for page approval prompt"
wait.until(
lambda d: "Approve or deny"
in d.execute_script(
"const root=document.querySelector('#keepassgo-inline-root');"
"return root.shadowRoot.querySelector('.match-list').textContent;"
)
)
state_path = workspace / "state.txt"
deadline = time.time() + 10
while time.time() < deadline:
if state_path.read_text(encoding="utf-8").strip() == "pending":
break
time.sleep(0.2)
else:
raise RuntimeError("stub server never observed a pending approval state")
stage = "verify popup approval state"
target_tab_id = driver.execute_script(
"const raw = document.documentElement.getAttribute('data-keepassgo-validation-page-ready');"
"return raw ? JSON.parse(raw).tabId : null;"
)
if not target_tab_id:
raise RuntimeError("validation page did not expose a target tab id for popup state checks")
driver.switch_to.new_window("tab")
driver.get(f"chrome-extension://{extension_id}/popup.html?tabId={int(target_tab_id)}")
wait.until(lambda d: "Approval needed" in d.find_element(By.ID, "status-title").text)
stage = "approve fill and wait for completion"
state_path.write_text("approved", encoding="utf-8")
driver.switch_to.window(driver.window_handles[0])
wait.until(lambda d: d.find_element(By.ID, "username").get_attribute("value") == "dannyocean")
wait.until(lambda d: d.find_element(By.ID, "password").get_attribute("value") == "token-1")
return True
except Exception as exc: # noqa: BLE001
artifacts = workspace / "artifacts"
save_artifact(artifacts / "chromium-page.html", driver.page_source)
try:
driver.save_screenshot(str(artifacts / "chromium-page.png"))
except Exception:
pass
try:
save_artifact(artifacts / "chromium-browser.log", json.dumps(driver.get_log("browser"), indent=2))
except Exception:
pass
raise RuntimeError(f"chromium validation failed during {stage}: {type(exc).__name__}: {exc}") from exc
finally:
driver.quit()
def run_firefox_flow(workspace: Path, login_url: str):
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
xpi_path = workspace / "keepassgo-firefox.xpi"
options = Options()
options.binary_location = require_binary("firefox")
options.add_argument("-headless")
service = Service(find_geckodriver())
driver = webdriver.Firefox(service=service, options=options)
wait = WebDriverWait(driver, 25)
stage = "launch firefox"
try:
stage = "install temporary addon"
addon_id = driver.install_addon(str(xpi_path), temporary=True)
if addon_id != "browser@keepassgo.com":
raise RuntimeError(f"unexpected addon id {addon_id!r}")
stage = "open login page"
driver.get(login_url)
wait.until(EC.element_to_be_clickable((By.ID, "username"))).click()
stage = "wait for inline root"
wait.until(lambda d: d.execute_script("return !!document.querySelector('#keepassgo-inline-root')"))
stage = "wait for inline dock"
wait.until(
lambda d: d.execute_script(
"const root=document.querySelector('#keepassgo-inline-root');"
"const dock=root?.shadowRoot?.querySelector('.dock');"
"return !!(dock && getComputedStyle(dock).display !== 'none');"
)
)
stage = "open inline chooser"
driver.execute_script(
"document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.trigger').click()"
)
stage = "wait for chooser matches"
wait.until(
lambda d: d.execute_script(
"const root=document.querySelector('#keepassgo-inline-root');"
"return root.shadowRoot.querySelectorAll('.match').length || 0;"
)
)
stage = "request browser fill"
driver.execute_script(
"document.querySelector('#keepassgo-inline-root').shadowRoot.querySelector('.match').click()"
)
stage = "wait for page approval prompt"
wait.until(
lambda d: "Approve or deny"
in d.execute_script(
"const root=document.querySelector('#keepassgo-inline-root');"
"return root.shadowRoot.querySelector('.match-list').textContent;"
)
)
state_path = workspace / "state.txt"
deadline = time.time() + 10
while time.time() < deadline:
if state_path.read_text(encoding="utf-8").strip() == "pending":
break
time.sleep(0.2)
else:
raise RuntimeError("stub server never observed a pending approval state")
stage = "approve fill and wait for completion"
state_path.write_text("approved", encoding="utf-8")
wait.until(lambda d: d.find_element(By.ID, "username").get_attribute("value") == "dannyocean")
wait.until(lambda d: d.find_element(By.ID, "password").get_attribute("value") == "token-1")
return True
except Exception as exc: # noqa: BLE001
artifacts = workspace / "artifacts"
save_artifact(artifacts / "firefox-page.html", driver.page_source)
try:
driver.save_screenshot(str(artifacts / "firefox-page.png"))
except Exception:
pass
raise RuntimeError(f"firefox validation failed during {stage}: {type(exc).__name__}: {exc}") from exc
finally:
driver.quit()
def main():
parser = argparse.ArgumentParser(description="Validate the browser-extension flow with isolated real-browser harnesses.")
parser.add_argument("--browser", choices=["firefox", "chromium", "both"], default="firefox")
parser.add_argument("--keep-workspace", action="store_true")
parser.add_argument("--workspace", help=argparse.SUPPRESS)
args = parser.parse_args()
workspace = Path(args.workspace) if args.workspace else Path(tempfile.mkdtemp(prefix="keepassgo-browser-validate."))
workspace.joinpath("home").mkdir(parents=True, exist_ok=True)
workspace.joinpath("web").mkdir(parents=True, exist_ok=True)
if not args.workspace:
workspace.joinpath("state.txt").write_text("idle", encoding="utf-8")
write_login_fixture(workspace / "web" / "login.html")
python_bin = ensure_selenium_venv(workspace / "venv")
if Path(sys.executable) != python_bin:
cmd = [str(python_bin), str(Path(__file__).resolve()), "--workspace", str(workspace), "--browser", args.browser]
if args.keep_workspace:
cmd.append("--keep-workspace")
raise SystemExit(subprocess.run(cmd, cwd=REPO_ROOT).returncode)
bridge_binary = workspace / "keepassgo-browser-bridge"
build_bridge(bridge_binary)
http_port = find_free_port()
grpc_port = find_free_port()
login_url = f"http://127.0.0.1:{http_port}/login.html"
grpc_addr = f"tcp://127.0.0.1:{grpc_port}"
ext_dir_chromium = xpi_path = None
if args.browser in {"chromium", "both"}:
ext_dir_chromium, key_b64 = prepare_chromium_extension(workspace, grpc_addr)
chromium_id, chromium_home = install_chromium_native_host(workspace, bridge_binary, key_b64)
if args.browser in {"firefox", "both"}:
xpi_path = prepare_firefox_extension(workspace, grpc_addr)
firefox_home = install_firefox_native_host(workspace, bridge_binary)
home_dir = workspace / "home"
env = os.environ.copy()
env["HOME"] = str(home_dir)
env["XDG_CONFIG_HOME"] = str(home_dir / ".config")
env["CHROME_CONFIG_HOME"] = str(home_dir / ".config")
os.environ["HOME"] = str(home_dir)
os.environ["XDG_CONFIG_HOME"] = env["XDG_CONFIG_HOME"]
os.environ["CHROME_CONFIG_HOME"] = env["CHROME_CONFIG_HOME"]
http_server = launch_process(
[sys.executable, "-m", "http.server", str(http_port)],
cwd=workspace / "web",
env=env,
log_path=workspace / "http.log",
)
stub_server = launch_process(
[
"go",
"run",
str(STUB_SERVER),
"--listen",
f"127.0.0.1:{grpc_port}",
"--state",
str(workspace / "state.txt"),
"--page-url",
login_url,
],
cwd=REPO_ROOT,
env=env,
log_path=workspace / "stub.log",
)
try:
wait_for_http(login_url)
wait_for_tcp("127.0.0.1", grpc_port, process=stub_server, log_path=workspace / "stub.log", name="validation gRPC server")
browser_results = []
if args.browser in {"firefox", "both"}:
browser_results.append("firefox")
run_firefox_flow(workspace, login_url)
workspace.joinpath("state.txt").write_text("idle", encoding="utf-8")
if args.browser in {"chromium", "both"}:
browser_results.append("chromium")
run_chromium_flow(workspace, chromium_id, login_url)
print(f"browser validation passed for {', '.join(browser_results)}; workspace: {workspace}", flush=True)
if not args.keep_workspace:
shutil.rmtree(workspace, ignore_errors=True)
except Exception as exc: # noqa: BLE001
print(f"{exc}\nworkspace preserved at {workspace}", file=sys.stderr)
sys.exit(1)
finally:
for process in [stub_server, http_server]:
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=3)
except subprocess.TimeoutExpired:
process.kill()
if __name__ == "__main__":
main()