Files
keepassgo/internal/mcpserver/server.go
T
Joe Julian 32f9abe5e2
ci / lint-test (pull_request) Successful in 9m39s
ci / build (pull_request) Successful in 2m52s
Add official MCP server
2026-05-14 08:54:01 -07:00

232 lines
9.2 KiB
Go

package mcpserver
import (
"context"
"fmt"
"sort"
"strings"
"github.com/modelcontextprotocol/go-sdk/mcp"
keepassgov1 "git.julianfamily.org/keepassgo/proto/keepassgo/v1"
)
const serverName = "keepassgo"
type vaultClient interface {
Status(context.Context) (*keepassgov1.GetSessionStatusResponse, error)
FindBrowserLogins(context.Context, string) ([]*keepassgov1.BrowserLoginMatch, error)
ListEntries(context.Context, []string, string) ([]*keepassgov1.Entry, error)
GetBrowserCredential(context.Context, string, string) (*keepassgov1.GetBrowserCredentialResponse, error)
}
type Config struct {
GRPCAddress string
Version string
}
func New(client vaultClient, cfg Config) *mcp.Server {
version := strings.TrimSpace(cfg.Version)
if version == "" {
version = "dev"
}
handlers := &handlers{
client: client,
grpcAddress: strings.TrimSpace(cfg.GRPCAddress),
}
server := mcp.NewServer(&mcp.Implementation{
Name: serverName,
Version: version,
}, &mcp.ServerOptions{
Instructions: "Use KeePassGO tools to inspect vault status, search entry metadata, find browser login matches, and request credentials through the authenticated KeePassGO gRPC API.",
})
mcp.AddTool(server, &mcp.Tool{
Name: "get_session_status",
Title: "Get Session Status",
Description: "Return KeePassGO lock, dirty, entry count, approval count, and gRPC connection metadata.",
}, handlers.getSessionStatus)
mcp.AddTool(server, &mcp.Tool{
Name: "search_entries",
Title: "Search Entries",
Description: "Search KeePassGO entry metadata by query and optional group path. Passwords, notes, and custom field values are not returned.",
}, handlers.searchEntries)
mcp.AddTool(server, &mcp.Tool{
Name: "find_browser_logins",
Title: "Find Browser Logins",
Description: "Find KeePassGO browser-login matches for a page URL without returning passwords.",
}, handlers.findBrowserLogins)
mcp.AddTool(server, &mcp.Tool{
Name: "get_browser_credential",
Title: "Get Browser Credential",
Description: "Request the username and password for an entry and page URL through KeePassGO's credential access policy and approval flow.",
}, handlers.getBrowserCredential)
return server
}
type handlers struct {
client vaultClient
grpcAddress string
}
type statusInput struct{}
type statusOutput struct {
Connected bool `json:"connected" jsonschema:"whether the MCP server reached the KeePassGO gRPC API"`
Locked bool `json:"locked" jsonschema:"whether the active vault is locked"`
Dirty bool `json:"dirty" jsonschema:"whether the active vault has unsaved changes"`
EntryCount uint32 `json:"entryCount" jsonschema:"number of entries visible to the token"`
PendingApprovalCount uint32 `json:"pendingApprovalCount" jsonschema:"number of pending approval requests"`
TokenPendingApprovalCount uint32 `json:"tokenPendingApprovalCount" jsonschema:"number of pending approval requests for this API token"`
GRPCAddress string `json:"grpcAddress" jsonschema:"KeePassGO gRPC address used by the MCP server"`
}
func (h *handlers) getSessionStatus(ctx context.Context, _ *mcp.CallToolRequest, _ statusInput) (*mcp.CallToolResult, statusOutput, error) {
resp, err := h.client.Status(ctx)
if err != nil {
return nil, statusOutput{}, fmt.Errorf("get KeePassGO session status: %w", err)
}
return nil, statusOutput{
Connected: true,
Locked: resp.GetLocked(),
Dirty: resp.GetDirty(),
EntryCount: resp.GetEntryCount(),
PendingApprovalCount: resp.GetPendingApprovalCount(),
TokenPendingApprovalCount: resp.GetTokenPendingApprovalCount(),
GRPCAddress: h.grpcAddress,
}, nil
}
type searchEntriesInput struct {
Path []string `json:"path,omitempty" jsonschema:"optional KeePassGO group path to search within, for example [\"Root\", \"Internet\"]"`
Query string `json:"query,omitempty" jsonschema:"optional text query for entry title, username, URL, tags, or other indexed metadata"`
}
type entrySummary struct {
ID string `json:"id" jsonschema:"entry id"`
Title string `json:"title" jsonschema:"entry title"`
Username string `json:"username,omitempty" jsonschema:"entry username"`
URL string `json:"url,omitempty" jsonschema:"entry URL"`
Path []string `json:"path,omitempty" jsonschema:"entry group path"`
Tags []string `json:"tags,omitempty" jsonschema:"entry tags"`
FieldNames []string `json:"fieldNames,omitempty" jsonschema:"names of custom fields on the entry; values are not returned"`
HasNotes bool `json:"hasNotes" jsonschema:"whether the entry has notes; note text is not returned"`
HasPassword bool `json:"hasPassword" jsonschema:"whether the entry has a password; password text is not returned"`
}
type searchEntriesOutput struct {
Entries []entrySummary `json:"entries" jsonschema:"matching entry metadata"`
}
func (h *handlers) searchEntries(ctx context.Context, _ *mcp.CallToolRequest, input searchEntriesInput) (*mcp.CallToolResult, searchEntriesOutput, error) {
entries, err := h.client.ListEntries(ctx, cleanPath(input.Path), strings.TrimSpace(input.Query))
if err != nil {
return nil, searchEntriesOutput{}, fmt.Errorf("search KeePassGO entries: %w", err)
}
out := searchEntriesOutput{Entries: make([]entrySummary, 0, len(entries))}
for _, entry := range entries {
out.Entries = append(out.Entries, summarizeEntry(entry))
}
return nil, out, nil
}
type findBrowserLoginsInput struct {
PageURL string `json:"pageUrl" jsonschema:"page URL to match against KeePassGO browser-login entries"`
}
type browserLoginMatch struct {
ID string `json:"id" jsonschema:"entry id"`
Title string `json:"title" jsonschema:"entry title"`
Username string `json:"username,omitempty" jsonschema:"entry username"`
URL string `json:"url,omitempty" jsonschema:"entry URL"`
Path []string `json:"path,omitempty" jsonschema:"entry group path"`
Quality string `json:"quality,omitempty" jsonschema:"match quality reported by KeePassGO"`
}
type findBrowserLoginsOutput struct {
Matches []browserLoginMatch `json:"matches" jsonschema:"browser-login matches without passwords"`
}
func (h *handlers) findBrowserLogins(ctx context.Context, _ *mcp.CallToolRequest, input findBrowserLoginsInput) (*mcp.CallToolResult, findBrowserLoginsOutput, error) {
pageURL := strings.TrimSpace(input.PageURL)
if pageURL == "" {
return nil, findBrowserLoginsOutput{}, fmt.Errorf("pageUrl is required")
}
matches, err := h.client.FindBrowserLogins(ctx, pageURL)
if err != nil {
return nil, findBrowserLoginsOutput{}, fmt.Errorf("find KeePassGO browser logins: %w", err)
}
out := findBrowserLoginsOutput{Matches: make([]browserLoginMatch, 0, len(matches))}
for _, match := range matches {
out.Matches = append(out.Matches, browserLoginMatch{
ID: match.GetId(),
Title: match.GetTitle(),
Username: match.GetUsername(),
URL: match.GetUrl(),
Path: append([]string(nil), match.GetPath()...),
Quality: match.GetQuality(),
})
}
return nil, out, nil
}
type getBrowserCredentialInput struct {
EntryID string `json:"entryId" jsonschema:"KeePassGO entry id to request credentials for"`
PageURL string `json:"pageUrl" jsonschema:"page URL associated with the credential request"`
}
type getBrowserCredentialOutput struct {
ID string `json:"id" jsonschema:"entry id"`
Username string `json:"username,omitempty" jsonschema:"entry username"`
Password string `json:"password,omitempty" jsonschema:"entry password"`
URL string `json:"url,omitempty" jsonschema:"entry URL"`
}
func (h *handlers) getBrowserCredential(ctx context.Context, _ *mcp.CallToolRequest, input getBrowserCredentialInput) (*mcp.CallToolResult, getBrowserCredentialOutput, error) {
entryID := strings.TrimSpace(input.EntryID)
if entryID == "" {
return nil, getBrowserCredentialOutput{}, fmt.Errorf("entryId is required")
}
resp, err := h.client.GetBrowserCredential(ctx, entryID, strings.TrimSpace(input.PageURL))
if err != nil {
return nil, getBrowserCredentialOutput{}, fmt.Errorf("get KeePassGO browser credential: %w", err)
}
return nil, getBrowserCredentialOutput{
ID: resp.GetId(),
Username: resp.GetUsername(),
Password: resp.GetPassword(),
URL: resp.GetUrl(),
}, nil
}
func summarizeEntry(entry *keepassgov1.Entry) entrySummary {
if entry == nil {
return entrySummary{}
}
fieldNames := make([]string, 0, len(entry.GetFields()))
for name := range entry.GetFields() {
fieldNames = append(fieldNames, name)
}
sort.Strings(fieldNames)
return entrySummary{
ID: entry.GetId(),
Title: entry.GetTitle(),
Username: entry.GetUsername(),
URL: entry.GetUrl(),
Path: append([]string(nil), entry.GetPath()...),
Tags: append([]string(nil), entry.GetTags()...),
FieldNames: fieldNames,
HasNotes: strings.TrimSpace(entry.GetNotes()) != "",
HasPassword: entry.GetPassword() != "",
}
}
func cleanPath(path []string) []string {
out := make([]string, 0, len(path))
for _, part := range path {
if trimmed := strings.TrimSpace(part); trimmed != "" {
out = append(out, trimmed)
}
}
return out
}