312 lines
12 KiB
Go
312 lines
12 KiB
Go
package mcpserver
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/modelcontextprotocol/go-sdk/mcp"
|
|
|
|
keepassgov1 "git.julianfamily.org/keepassgo/proto/keepassgo/v1"
|
|
)
|
|
|
|
// serverName is the implementation name New reports when it creates the MCP server.
const serverName = "keepassgo"
|
|
|
|
type vaultClient interface {
|
|
Status(context.Context) (*keepassgov1.GetSessionStatusResponse, error)
|
|
FindBrowserLogins(context.Context, string) ([]*keepassgov1.BrowserLoginMatch, error)
|
|
ListEntries(context.Context, []string, string) ([]*keepassgov1.Entry, error)
|
|
GetBrowserCredential(context.Context, string, string) (*keepassgov1.GetBrowserCredentialResponse, error)
|
|
}
|
|
|
|
// Config holds the settings New reads when it builds the MCP server.
type Config struct {
	// GRPCAddress is the KeePassGO gRPC address. New trims it and stores it
	// on the handlers, which echo it back from get_session_status.
	GRPCAddress string

	// Version is the server version reported to MCP clients. New trims it
	// and substitutes "dev" when the result is empty.
	Version string
}
|
|
|
|
func New(client vaultClient, cfg Config) *mcp.Server {
|
|
version := strings.TrimSpace(cfg.Version)
|
|
if version == "" {
|
|
version = "dev"
|
|
}
|
|
handlers := &handlers{
|
|
client: client,
|
|
grpcAddress: strings.TrimSpace(cfg.GRPCAddress),
|
|
}
|
|
server := mcp.NewServer(&mcp.Implementation{
|
|
Name: serverName,
|
|
Version: version,
|
|
}, &mcp.ServerOptions{
|
|
Instructions: "Use KeePassGO tools to inspect vault status, search entry metadata, find browser login matches, and request credentials through the authenticated KeePassGO gRPC API.",
|
|
})
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "get_session_status",
|
|
Title: "Get Session Status",
|
|
Description: "Return KeePassGO lock, dirty, entry count, approval count, and gRPC connection metadata.",
|
|
}, handlers.getSessionStatus)
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "search_entries",
|
|
Title: "Search Entries",
|
|
Description: "Search KeePassGO entry metadata by query and optional group path. Passwords, notes, and custom field values are not returned.",
|
|
}, handlers.searchEntries)
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "find_browser_logins",
|
|
Title: "Find Browser Logins",
|
|
Description: "Find KeePassGO browser-login matches for a page URL without returning passwords.",
|
|
}, handlers.findBrowserLogins)
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "get_entry_password",
|
|
Title: "Get Entry Password",
|
|
Description: "Return the password for one uniquely matched KeePassGO entry using path, query, and optional metadata filters.",
|
|
}, handlers.getEntryPassword)
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "get_browser_credential",
|
|
Title: "Get Browser Credential",
|
|
Description: "Request the username and password for an entry and page URL through KeePassGO's credential access policy and approval flow.",
|
|
}, handlers.getBrowserCredential)
|
|
return server
|
|
}
|
|
|
|
type handlers struct {
|
|
client vaultClient
|
|
grpcAddress string
|
|
}
|
|
|
|
// statusInput is the argument type of getSessionStatus; it has no fields.
type statusInput struct{}
|
|
|
|
// statusOutput is the structured result of the get_session_status tool.
type statusOutput struct {
	// Connected is set to true by getSessionStatus whenever the status call succeeds.
	Connected bool `json:"connected" jsonschema:"whether the MCP server reached the KeePassGO gRPC API"`

	// Locked mirrors the Locked value of the status response.
	Locked bool `json:"locked" jsonschema:"whether the active vault is locked"`

	// Dirty mirrors the Dirty value of the status response.
	Dirty bool `json:"dirty" jsonschema:"whether the active vault has unsaved changes"`

	// EntryCount mirrors the EntryCount value of the status response.
	EntryCount uint32 `json:"entryCount" jsonschema:"number of entries visible to the token"`

	// PendingApprovalCount mirrors the PendingApprovalCount value of the status response.
	PendingApprovalCount uint32 `json:"pendingApprovalCount" jsonschema:"number of pending approval requests"`

	// TokenPendingApprovalCount mirrors the TokenPendingApprovalCount value of the status response.
	TokenPendingApprovalCount uint32 `json:"tokenPendingApprovalCount" jsonschema:"number of pending approval requests for this API token"`

	// GRPCAddress is the address stored on the handlers, not a value from the response.
	GRPCAddress string `json:"grpcAddress" jsonschema:"KeePassGO gRPC address used by the MCP server"`
}
|
|
|
|
func (h *handlers) getSessionStatus(ctx context.Context, _ *mcp.CallToolRequest, _ statusInput) (*mcp.CallToolResult, statusOutput, error) {
|
|
resp, err := h.client.Status(ctx)
|
|
if err != nil {
|
|
return nil, statusOutput{}, fmt.Errorf("get KeePassGO session status: %w", err)
|
|
}
|
|
return nil, statusOutput{
|
|
Connected: true,
|
|
Locked: resp.GetLocked(),
|
|
Dirty: resp.GetDirty(),
|
|
EntryCount: resp.GetEntryCount(),
|
|
PendingApprovalCount: resp.GetPendingApprovalCount(),
|
|
TokenPendingApprovalCount: resp.GetTokenPendingApprovalCount(),
|
|
GRPCAddress: h.grpcAddress,
|
|
}, nil
|
|
}
|
|
|
|
// searchEntriesInput is the argument object of the search_entries tool.
// Both fields are optional.
type searchEntriesInput struct {
	// Path is the group path to search within; searchEntries passes it through cleanPath.
	Path []string `json:"path,omitempty" jsonschema:"optional KeePassGO group path to search within, for example [\"Root\", \"Internet\"]"`

	// Query is the free-text query; searchEntries trims surrounding whitespace.
	Query string `json:"query,omitempty" jsonschema:"optional text query for entry title, username, URL, tags, or other indexed metadata"`
}
|
|
|
|
// entrySummary is the metadata-only view of an entry produced by
// summarizeEntry. It carries field names and presence flags but no password,
// note text or custom field values.
type entrySummary struct {
	// ID is the entry id.
	ID string `json:"id" jsonschema:"entry id"`

	// Title is the entry title.
	Title string `json:"title" jsonschema:"entry title"`

	// Username is the entry username; omitted from JSON when empty.
	Username string `json:"username,omitempty" jsonschema:"entry username"`

	// URL is the entry URL; omitted from JSON when empty.
	URL string `json:"url,omitempty" jsonschema:"entry URL"`

	// Path is a copy of the entry's group path.
	Path []string `json:"path,omitempty" jsonschema:"entry group path"`

	// Tags is a copy of the entry's tags.
	Tags []string `json:"tags,omitempty" jsonschema:"entry tags"`

	// FieldNames lists the custom field names in sorted order.
	FieldNames []string `json:"fieldNames,omitempty" jsonschema:"names of custom fields on the entry; values are not returned"`

	// HasNotes reports whether the notes are non-blank after trimming.
	HasNotes bool `json:"hasNotes" jsonschema:"whether the entry has notes; note text is not returned"`

	// HasPassword reports whether the password is a non-empty string.
	HasPassword bool `json:"hasPassword" jsonschema:"whether the entry has a password; password text is not returned"`
}
|
|
|
|
type searchEntriesOutput struct {
|
|
Entries []entrySummary `json:"entries" jsonschema:"matching entry metadata"`
|
|
}
|
|
|
|
func (h *handlers) searchEntries(ctx context.Context, _ *mcp.CallToolRequest, input searchEntriesInput) (*mcp.CallToolResult, searchEntriesOutput, error) {
|
|
entries, err := h.client.ListEntries(ctx, cleanPath(input.Path), strings.TrimSpace(input.Query))
|
|
if err != nil {
|
|
return nil, searchEntriesOutput{}, fmt.Errorf("search KeePassGO entries: %w", err)
|
|
}
|
|
out := searchEntriesOutput{Entries: make([]entrySummary, 0, len(entries))}
|
|
for _, entry := range entries {
|
|
out.Entries = append(out.Entries, summarizeEntry(entry))
|
|
}
|
|
return nil, out, nil
|
|
}
|
|
|
|
// findBrowserLoginsInput is the argument object of the find_browser_logins tool.
type findBrowserLoginsInput struct {
	// PageURL is the page to match. findBrowserLogins trims it and rejects
	// the call when nothing is left.
	PageURL string `json:"pageUrl" jsonschema:"page URL to match against KeePassGO browser-login entries"`
}
|
|
|
|
// browserLoginMatch is one result row of the find_browser_logins tool.
// It has no password field.
type browserLoginMatch struct {
	// ID is the entry id.
	ID string `json:"id" jsonschema:"entry id"`

	// Title is the entry title.
	Title string `json:"title" jsonschema:"entry title"`

	// Username is the entry username; omitted from JSON when empty.
	Username string `json:"username,omitempty" jsonschema:"entry username"`

	// URL is the entry URL; omitted from JSON when empty.
	URL string `json:"url,omitempty" jsonschema:"entry URL"`

	// Path is a copy of the entry's group path.
	Path []string `json:"path,omitempty" jsonschema:"entry group path"`

	// Quality is the match quality string taken from the backend match.
	Quality string `json:"quality,omitempty" jsonschema:"match quality reported by KeePassGO"`
}
|
|
|
|
type findBrowserLoginsOutput struct {
|
|
Matches []browserLoginMatch `json:"matches" jsonschema:"browser-login matches without passwords"`
|
|
}
|
|
|
|
func (h *handlers) findBrowserLogins(ctx context.Context, _ *mcp.CallToolRequest, input findBrowserLoginsInput) (*mcp.CallToolResult, findBrowserLoginsOutput, error) {
|
|
pageURL := strings.TrimSpace(input.PageURL)
|
|
if pageURL == "" {
|
|
return nil, findBrowserLoginsOutput{}, fmt.Errorf("pageUrl is required")
|
|
}
|
|
matches, err := h.client.FindBrowserLogins(ctx, pageURL)
|
|
if err != nil {
|
|
return nil, findBrowserLoginsOutput{}, fmt.Errorf("find KeePassGO browser logins: %w", err)
|
|
}
|
|
out := findBrowserLoginsOutput{Matches: make([]browserLoginMatch, 0, len(matches))}
|
|
for _, match := range matches {
|
|
out.Matches = append(out.Matches, browserLoginMatch{
|
|
ID: match.GetId(),
|
|
Title: match.GetTitle(),
|
|
Username: match.GetUsername(),
|
|
URL: match.GetUrl(),
|
|
Path: append([]string(nil), match.GetPath()...),
|
|
Quality: match.GetQuality(),
|
|
})
|
|
}
|
|
return nil, out, nil
|
|
}
|
|
|
|
// getBrowserCredentialInput is the argument object of the get_browser_credential tool.
type getBrowserCredentialInput struct {
	// EntryID identifies the entry. getBrowserCredential trims it and
	// rejects the call when nothing is left.
	EntryID string `json:"entryId" jsonschema:"KeePassGO entry id to request credentials for"`

	// PageURL is trimmed and forwarded to the client; the handler itself
	// does not check that it is non-empty.
	PageURL string `json:"pageUrl" jsonschema:"page URL associated with the credential request"`
}
|
|
|
|
// getBrowserCredentialOutput is the structured result of the
// get_browser_credential tool. Unlike the search results, it carries the
// password itself.
type getBrowserCredentialOutput struct {
	// ID is the entry id taken from the credential response.
	ID string `json:"id" jsonschema:"entry id"`

	// Username is the entry username; omitted from JSON when empty.
	Username string `json:"username,omitempty" jsonschema:"entry username"`

	// Password is the entry password; omitted from JSON when empty.
	Password string `json:"password,omitempty" jsonschema:"entry password"`

	// URL is the entry URL; omitted from JSON when empty.
	URL string `json:"url,omitempty" jsonschema:"entry URL"`
}
|
|
|
|
func (h *handlers) getBrowserCredential(ctx context.Context, _ *mcp.CallToolRequest, input getBrowserCredentialInput) (*mcp.CallToolResult, getBrowserCredentialOutput, error) {
|
|
entryID := strings.TrimSpace(input.EntryID)
|
|
if entryID == "" {
|
|
return nil, getBrowserCredentialOutput{}, fmt.Errorf("entryId is required")
|
|
}
|
|
resp, err := h.client.GetBrowserCredential(ctx, entryID, strings.TrimSpace(input.PageURL))
|
|
if err != nil {
|
|
return nil, getBrowserCredentialOutput{}, fmt.Errorf("get KeePassGO browser credential: %w", err)
|
|
}
|
|
return nil, getBrowserCredentialOutput{
|
|
ID: resp.GetId(),
|
|
Username: resp.GetUsername(),
|
|
Password: resp.GetPassword(),
|
|
URL: resp.GetUrl(),
|
|
}, nil
|
|
}
|
|
|
|
// getEntryPasswordInput is the argument object of the get_entry_password tool.
// Path and Query select the candidate list. Title, Username and URL then
// narrow it in uniquePasswordEntry, where a blank filter matches every entry
// and a non-blank one is compared with optionalEqual (trimmed, case-insensitive).
type getEntryPasswordInput struct {
	// Path is the group path to search within; the handler passes it through cleanPath.
	Path []string `json:"path,omitempty" jsonschema:"optional KeePassGO group path to search within"`

	// Query is the text query; the handler trims surrounding whitespace.
	Query string `json:"query,omitempty" jsonschema:"text query used to find candidate entries"`

	// Title filters candidates by entry title.
	Title string `json:"title,omitempty" jsonschema:"optional exact entry title filter"`

	// Username filters candidates by entry username.
	Username string `json:"username,omitempty" jsonschema:"optional exact entry username filter"`

	// URL filters candidates by entry URL.
	URL string `json:"url,omitempty" jsonschema:"optional exact entry URL filter"`
}
|
|
|
|
// getEntryPasswordOutput is the structured result of the get_entry_password
// tool: identifying metadata of the single matched entry plus its password.
type getEntryPasswordOutput struct {
	// ID is the entry id; omitted from JSON when empty.
	ID string `json:"id,omitempty" jsonschema:"entry id, when present"`

	// Title is the entry title.
	Title string `json:"title" jsonschema:"entry title"`

	// Username is the entry username; omitted from JSON when empty.
	Username string `json:"username,omitempty" jsonschema:"entry username"`

	// URL is the entry URL; omitted from JSON when empty.
	URL string `json:"url,omitempty" jsonschema:"entry URL"`

	// Path is a copy of the entry's group path.
	Path []string `json:"path,omitempty" jsonschema:"entry group path"`

	// Password is the entry password. getEntryPassword never returns a
	// successful result with this field empty.
	Password string `json:"password" jsonschema:"entry password"`
}
|
|
|
|
func (h *handlers) getEntryPassword(ctx context.Context, _ *mcp.CallToolRequest, input getEntryPasswordInput) (*mcp.CallToolResult, getEntryPasswordOutput, error) {
|
|
entries, err := h.client.ListEntries(ctx, cleanPath(input.Path), strings.TrimSpace(input.Query))
|
|
if err != nil {
|
|
return nil, getEntryPasswordOutput{}, fmt.Errorf("find KeePassGO entry password candidates: %w", err)
|
|
}
|
|
entry, err := uniquePasswordEntry(entries, input)
|
|
if err != nil {
|
|
return nil, getEntryPasswordOutput{}, err
|
|
}
|
|
password := entry.GetPassword()
|
|
if password == "" {
|
|
return nil, getEntryPasswordOutput{}, fmt.Errorf("matched KeePassGO entry %q has an empty password", entry.GetTitle())
|
|
}
|
|
return nil, getEntryPasswordOutput{
|
|
ID: entry.GetId(),
|
|
Title: entry.GetTitle(),
|
|
Username: entry.GetUsername(),
|
|
URL: entry.GetUrl(),
|
|
Path: append([]string(nil), entry.GetPath()...),
|
|
Password: password,
|
|
}, nil
|
|
}
|
|
|
|
func summarizeEntry(entry *keepassgov1.Entry) entrySummary {
|
|
if entry == nil {
|
|
return entrySummary{}
|
|
}
|
|
fieldNames := make([]string, 0, len(entry.GetFields()))
|
|
for name := range entry.GetFields() {
|
|
fieldNames = append(fieldNames, name)
|
|
}
|
|
sort.Strings(fieldNames)
|
|
return entrySummary{
|
|
ID: entry.GetId(),
|
|
Title: entry.GetTitle(),
|
|
Username: entry.GetUsername(),
|
|
URL: entry.GetUrl(),
|
|
Path: append([]string(nil), entry.GetPath()...),
|
|
Tags: append([]string(nil), entry.GetTags()...),
|
|
FieldNames: fieldNames,
|
|
HasNotes: strings.TrimSpace(entry.GetNotes()) != "",
|
|
HasPassword: entry.GetPassword() != "",
|
|
}
|
|
}
|
|
|
|
func uniquePasswordEntry(entries []*keepassgov1.Entry, input getEntryPasswordInput) (*keepassgov1.Entry, error) {
|
|
var matches []*keepassgov1.Entry
|
|
for _, entry := range entries {
|
|
if entry == nil {
|
|
continue
|
|
}
|
|
if !optionalEqual(input.Title, entry.GetTitle()) {
|
|
continue
|
|
}
|
|
if !optionalEqual(input.Username, entry.GetUsername()) {
|
|
continue
|
|
}
|
|
if !optionalEqual(input.URL, entry.GetUrl()) {
|
|
continue
|
|
}
|
|
matches = append(matches, entry)
|
|
}
|
|
switch len(matches) {
|
|
case 0:
|
|
return nil, fmt.Errorf("no KeePassGO entry matched the password request")
|
|
case 1:
|
|
return matches[0], nil
|
|
default:
|
|
return nil, fmt.Errorf("password request matched %d KeePassGO entries; add title, username, URL, path, or query filters", len(matches))
|
|
}
|
|
}
|
|
|
|
// optionalEqual reports whether got satisfies the optional filter want.
//
// A filter that is empty after trimming accepts any value. Otherwise both
// strings are trimmed and compared case-insensitively with strings.EqualFold.
func optionalEqual(want, got string) bool {
	filter := strings.TrimSpace(want)
	return filter == "" || strings.EqualFold(filter, strings.TrimSpace(got))
}
|
|
|
|
// cleanPath returns a new slice holding the trimmed, non-blank segments of
// path in their original order. The result is never nil, even for a nil or
// all-blank input, and the input slice is not modified.
func cleanPath(path []string) []string {
	cleaned := make([]string, 0, len(path))
	for i := range path {
		segment := strings.TrimSpace(path[i])
		if segment == "" {
			continue
		}
		cleaned = append(cleaned, segment)
	}
	return cleaned
}
|