Rewrite what around README catalog and Ollama

Remove the JSON tool database and move tool metadata into a compact README catalog.
Make what README-driven and Ollama-only, with shortlist generation and JSON-repair retry handling.
Pull qwen3.5:2b and ministral-3:3b, compare them on fixed repository queries, and set ministral-3:3b as the default model.
Tighten README wording so similar tools like domgrep/geturls and sparsecmp/scatterhash rank correctly.
This commit is contained in:
tke
2026-03-07 20:39:24 +01:00
parent fd515742b5
commit 559fa38c04
4 changed files with 433 additions and 547 deletions

670
what
View File

@@ -1,423 +1,303 @@
#!/usr/bin/env python3
"""
'what' - Smart repository search tool with progressive enhancement
Fallback hierarchy:
1. Ollama + Gemma2 (natural language search)
2. fzf (fuzzy finding)
3. grep (simple text search)
`what` - README-driven repository search using Ollama only.
Usage:
what <query> # Find tools matching query
what -h # Show help
what -l # List all tools with short descriptions
what -a <filepath> # Add new file to database
what <query> # Find tools matching a natural-language query
what -l # List catalogued tools
what --model <model> ... # Override the default Ollama model
"""
import os
import sys
import json
from __future__ import annotations
import argparse
import subprocess
import shutil
from pathlib import Path
import json
import os
import re
import subprocess
import sys
from pathlib import Path
# Configuration
REPO_ROOT = Path(__file__).parent.absolute()
DB_FILE = REPO_ROOT / ".what_db.json"
REPO_ROOT = Path(__file__).parent.resolve()
README_PATH = REPO_ROOT / "README.md"
DEFAULT_MODEL = os.environ.get("WHAT_OLLAMA_MODEL", "ministral-3:3b")
CATALOG_HEADING = "## Tool Catalog"
ENTRY_RE = re.compile(
r"^- `([^`]+)` \| goal: (.*?) \| usage: (.*)$"
)
TOKEN_RE = re.compile(r"[a-z0-9_.+-]+")
class WhatTool:
def __init__(self):
self.db_path = DB_FILE
self.data = self.load_db()
# Detect available tools
self.has_ollama = self.check_ollama()
self.has_fzf = shutil.which('fzf') is not None
def load_db(self):
"""Load the tool database"""
if self.db_path.exists():
try:
with open(self.db_path, 'r') as f:
return json.load(f)
except json.JSONDecodeError:
print(f"Warning: Corrupted database {self.db_path}, creating new one")
return {
"version": "1.0",
"tools": {}
}
def save_db(self):
"""Save the tool database"""
with open(self.db_path, 'w') as f:
json.dump(self.data, f, indent=2, sort_keys=True)
def check_ollama(self):
"""Check if ollama with gemma2 is available"""
try:
result = subprocess.run(['ollama', 'list'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
# Check if gemma2 model is available
models = result.stdout.lower()
return 'gemma2' in models
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError):
pass
return False
def get_file_type(self, filepath):
"""Determine file type"""
if not filepath.exists():
return "missing"
if filepath.is_dir():
return "directory"
# Check if executable
is_executable = os.access(filepath, os.X_OK)
# Check extension
suffix = filepath.suffix.lower()
if suffix == '.py':
return "python script" if is_executable else "python module"
elif suffix == '.sh':
return "shell script"
elif suffix == '.go':
return "go program"
elif suffix == '.js':
return "javascript"
elif suffix == '.ps1':
return "powershell script"
elif suffix == '.rs':
return "rust program"
elif suffix in ['.c', '.cpp']:
return "c/c++ source"
elif suffix == '.awk':
return "awk script"
elif not suffix and is_executable:
return "binary executable"
elif not suffix:
return "script"
else:
return f"{suffix[1:]} file"
def analyze_file_with_ollama(self, filepath):
"""Analyze file using Ollama Gemma2"""
try:
# Read file content (limit size for analysis)
content = ""
if filepath.stat().st_size > 50000: # Skip very large files
content = "[File too large for analysis]"
else:
try:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()[:10000] # First 10KB
except:
content = "[Binary or unreadable file]"
prompt = f"""
Analyze this code/script file and provide ONLY a JSON response with these fields:
Filename: {filepath.name}
File type: {self.get_file_type(filepath)}
Content preview:
{content[:2000]}
class WhatError(Exception):
pass
Respond with ONLY this JSON structure:
{{
"summary": "Brief 1-2 sentence summary of what this tool does and how it works",
"purpose": "What this tool is used for (e.g., 'Network analysis', 'File processing', 'Security scanning')",
"short_description": "Very short description for listings (e.g., 'like md5sum but for files inside tarballs')"
}}
"""
result = subprocess.run([
'ollama', 'run', 'gemma2:2b', prompt
], capture_output=True, text=True, timeout=30)
if result.returncode == 0:
# Extract JSON from response
response = result.stdout.strip()
# Try to find JSON in the response
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
return json.loads(json_match.group())
except (subprocess.TimeoutExpired, json.JSONDecodeError, Exception) as e:
print(f"Ollama analysis failed: {e}")
return None
def add_file_interactive(self, filepath):
"""Add file with interactive prompts"""
rel_path = str(filepath.relative_to(REPO_ROOT))
file_type = self.get_file_type(filepath)
print(f"\nAdding: {rel_path}")
print(f"Type: {file_type}")
print()
if self.has_ollama:
print("Analyzing with Ollama Gemma2...")
analysis = self.analyze_file_with_ollama(filepath)
if analysis:
print("AI Analysis complete. Review and edit if needed:")
summary = input(f"Summary [{analysis.get('summary', '')}]: ").strip()
purpose = input(f"Purpose [{analysis.get('purpose', '')}]: ").strip()
short_desc = input(f"Short description [{analysis.get('short_description', '')}]: ").strip()
# Use AI suggestions if user didn't provide alternatives
summary = summary or analysis.get('summary', '')
purpose = purpose or analysis.get('purpose', '')
short_desc = short_desc or analysis.get('short_description', '')
else:
print("AI analysis failed, using manual input:")
summary = input("Summary (what it does and how): ").strip()
purpose = input("Purpose (what it's used for): ").strip()
short_desc = input("Short description (for listings): ").strip()
else:
print("Manual input (Ollama not available):")
summary = input("Summary (what it does and how): ").strip()
purpose = input("Purpose (what it's used for): ").strip()
short_desc = input("Short description (for listings): ").strip()
# Store in database
self.data["tools"][rel_path] = {
"path": rel_path,
"name": filepath.name,
"type": file_type,
"summary": summary,
"purpose": purpose,
"short_description": short_desc,
"executable": os.access(filepath, os.X_OK)
}
self.save_db()
print(f"✓ Added {rel_path} to database")
def search_with_ollama(self, query):
"""Search using natural language with Ollama"""
try:
tools_info = []
for tool_data in self.data["tools"].values():
tools_info.append(f"{tool_data['name']}: {tool_data['summary']} (Purpose: {tool_data['purpose']})")
tools_text = "\n".join(tools_info)
prompt = f"""
Given this query: "{query}"
def load_readme() -> str:
if not README_PATH.exists():
raise WhatError(f"README not found at {README_PATH}")
return README_PATH.read_text(encoding="utf-8")
Find the most relevant tools from this list. Respond with ONLY the tool names (one per line) in order of relevance:
{tools_text}
def extract_catalog(readme_text: str) -> list[dict[str, str]]:
in_catalog = False
entries: list[dict[str, str]] = []
for raw_line in readme_text.splitlines():
line = raw_line.rstrip()
if line == CATALOG_HEADING:
in_catalog = True
continue
if in_catalog and line.startswith("## "):
break
if not in_catalog:
continue
match = ENTRY_RE.match(line)
if not match:
continue
path, goal, usage = match.groups()
entries.append(
{
"path": path,
"goal": goal.strip(),
"usage": usage.strip(),
}
)
if not entries:
raise WhatError(
"No tool catalog entries found in README. "
f"Expected entries under '{CATALOG_HEADING}'."
)
return entries
def ensure_ollama_available(model: str) -> None:
if not shutil_which("ollama"):
raise WhatError("`ollama` is not installed or not in PATH.")
try:
result = subprocess.run(
["ollama", "list"],
capture_output=True,
text=True,
timeout=10,
check=False,
)
except subprocess.SubprocessError as exc:
raise WhatError(f"Failed to talk to Ollama: {exc}") from exc
if result.returncode != 0:
stderr = result.stderr.strip() or "unknown error"
raise WhatError(f"Ollama is unavailable: {stderr}")
models = result.stdout.lower()
if model.lower() not in models:
raise WhatError(
f"Model '{model}' is not available locally. "
"Pull it first with `ollama pull ...`."
)
def shutil_which(binary: str) -> str | None:
for directory in os.environ.get("PATH", "").split(os.pathsep):
candidate = Path(directory) / binary
if candidate.is_file() and os.access(candidate, os.X_OK):
return str(candidate)
return None
def build_prompt(query: str, entries: list[dict[str, str]]) -> str:
catalog_lines = [
f'- {entry["path"]} | goal: {entry["goal"]} | usage: {entry["usage"]}'
for entry in entries
]
catalog = "\n".join(catalog_lines)
return f"""You are selecting tools from a repository catalog.
Use only the catalog below. Prefer direct matches. Use archived tools only if they clearly fit the request.
Return strict JSON only. The response must be a JSON array with up to 8 objects.
Each object must contain:
- "path": exact catalog path
- "reason": one short sentence
Do not invent paths. Do not include markdown.
Prefer the entry whose action best matches the query: compare beats hash for comparison queries, open beats convert for opening queries, and mount beats inspect for mount queries.
Query: {query}
Response (tool names only, one per line, max 10):
Catalog:
{catalog}
"""
result = subprocess.run([
'ollama', 'run', 'gemma2:2b', prompt
], capture_output=True, text=True, timeout=20)
if result.returncode == 0:
tool_names = [line.strip() for line in result.stdout.strip().split('\n') if line.strip()]
# Find matching tools in database
matches = []
for tool_name in tool_names[:10]: # Limit to top 10
for tool_data in self.data["tools"].values():
if tool_data['name'] == tool_name:
matches.append(tool_data)
break
return matches
except Exception as e:
print(f"Ollama search failed: {e}")
return None
def search_with_fzf(self, query):
"""Search using fzf fuzzy finder"""
try:
# Prepare search data for fzf
search_lines = []
for tool_data in self.data["tools"].values():
line = f"{tool_data['name']} # {tool_data['short_description']} | {tool_data['path']}"
search_lines.append(line)
search_input = "\n".join(search_lines)
# Run fzf with initial query
result = subprocess.run([
'fzf', '--filter', query, '--no-sort'
], input=search_input, capture_output=True, text=True)
if result.returncode == 0:
matches = []
for line in result.stdout.strip().split('\n'):
if ' | ' in line:
path = line.split(' | ')[-1]
if path in self.data["tools"]:
matches.append(self.data["tools"][path])
return matches
except Exception as e:
print(f"fzf search failed: {e}")
return None
def search_with_grep(self, query):
"""Fallback search using grep-like functionality"""
matches = []
query_lower = query.lower()
for tool_data in self.data["tools"].values():
# Search in name, summary, purpose, and short description
searchable = f"{tool_data['name']} {tool_data['summary']} {tool_data['purpose']} {tool_data['short_description']}".lower()
if query_lower in searchable:
matches.append(tool_data)
# Simple relevance scoring
def score_match(tool):
score = 0
query_lower = query.lower()
if query_lower in tool['name'].lower():
score += 10
if query_lower in tool['short_description'].lower():
score += 5
if query_lower in tool['summary'].lower():
score += 3
if query_lower in tool['purpose'].lower():
score += 2
return score
matches.sort(key=score_match, reverse=True)
return matches[:20] # Limit results
def search(self, query):
"""Search using the best available method"""
if not query:
return []
print(f"Searching for: {query}")
# Try Ollama first
if self.has_ollama:
print("Using Ollama Gemma2 for natural language search...")
results = self.search_with_ollama(query)
if results is not None:
return results
print("Ollama search failed, falling back to fzf...")
# Try fzf
if self.has_fzf:
print("Using fzf for fuzzy search...")
results = self.search_with_fzf(query)
if results is not None:
return results
print("fzf search failed, falling back to grep...")
# Fallback to grep
print("Using basic text search...")
return self.search_with_grep(query)
def list_all_tools(self):
"""List all tools with short descriptions"""
if not self.data["tools"]:
print("No tools in database. Use 'what -a <file>' to add tools.")
return
print("Available tools:")
print()
# Sort by name
tools = sorted(self.data["tools"].values(), key=lambda x: x['name'])
# Calculate max name length for alignment
max_name_len = max(len(tool['name']) for tool in tools)
for tool in tools:
executable_mark = "*" if tool.get('executable', False) else " "
name_padded = tool['name'].ljust(max_name_len)
print(f"{executable_mark}{name_padded} # {tool['short_description']}")
def show_search_results(self, results):
"""Display search results"""
if not results:
print("No tools found matching your query.")
return
print(f"\nFound {len(results)} tool(s):")
print()
for i, tool in enumerate(results, 1):
executable_mark = "*" if tool.get('executable', False) else " "
print(f"{i:2d}. {executable_mark}{tool['name']}")
print(f" Path: {tool['path']}")
print(f" Type: {tool['type']}")
print(f" Purpose: {tool['purpose']}")
print(f" Summary: {tool['summary']}")
print()
def main():
parser = argparse.ArgumentParser(description="Smart repository search tool")
parser.add_argument("query", nargs="?", help="Search query")
parser.add_argument("-l", "--list", action="store_true",
help="List all tools with short descriptions")
parser.add_argument("-a", "--add", metavar="PATH",
help="Add new file to database")
def tokenize(text: str) -> set[str]:
return set(TOKEN_RE.findall(text.lower()))
def shortlist_entries(query: str, entries: list[dict[str, str]], limit: int = 28) -> list[dict[str, str]]:
query_tokens = tokenize(query)
if not query_tokens:
return entries[:limit]
scored: list[tuple[int, dict[str, str]]] = []
for entry in entries:
haystack = f'{entry["path"]} {entry["goal"]} {entry["usage"]}'.lower()
entry_tokens = tokenize(haystack)
overlap = len(query_tokens & entry_tokens)
substring_hits = sum(1 for token in query_tokens if token in haystack)
archive_penalty = 1 if entry["path"].startswith("archive/") else 0
score = overlap * 5 + substring_hits - archive_penalty
scored.append((score, entry))
scored.sort(key=lambda item: item[0], reverse=True)
best = [entry for score, entry in scored if score > 0][:limit]
return best or entries[:limit]
def extract_json_array(output: str) -> list[dict[str, str]]:
match = re.search(r"\[\s*\{.*\}\s*\]", output, re.DOTALL)
payload = match.group(0) if match else output
data = json.loads(payload)
if not isinstance(data, list):
raise WhatError("Model output must be a JSON array.")
normalized: list[dict[str, str]] = []
for item in data:
if not isinstance(item, dict):
continue
path = str(item.get("path", "")).strip()
reason = str(item.get("reason", "")).strip()
if path:
normalized.append({"path": path, "reason": reason})
return normalized
def run_ollama_once(prompt: str, model: str) -> str:
try:
result = subprocess.run(
["ollama", "run", model, prompt],
capture_output=True,
text=True,
timeout=60,
check=False,
)
except subprocess.SubprocessError as exc:
raise WhatError(f"Ollama run failed: {exc}") from exc
if result.returncode != 0:
stderr = result.stderr.strip() or "unknown error"
raise WhatError(f"Ollama run failed: {stderr}")
return result.stdout.strip()
def run_ollama(prompt: str, model: str) -> list[dict[str, str]]:
first_output = run_ollama_once(prompt, model)
try:
return extract_json_array(first_output)
except (json.JSONDecodeError, WhatError):
repair_prompt = (
"Rewrite the following response as strict JSON only.\n"
'Target format: [{"path":"exact catalog path","reason":"short reason"}]\n'
"Do not add markdown or commentary.\n\n"
f"Response to repair:\n{first_output}\n"
)
repaired_output = run_ollama_once(repair_prompt, model)
try:
return extract_json_array(repaired_output)
except (json.JSONDecodeError, WhatError) as exc:
raise WhatError(
"Model output was not valid JSON after repair. "
f"Raw output was:\n{repaired_output}"
) from exc
def search(query: str, entries: list[dict[str, str]], model: str) -> list[dict[str, str]]:
ensure_ollama_available(model)
prompt_entries = shortlist_entries(query, entries)
raw_results = run_ollama(build_prompt(query, prompt_entries), model)
entry_map = {entry["path"]: entry for entry in entries}
results: list[dict[str, str]] = []
seen: set[str] = set()
for item in raw_results:
path = item["path"]
if path not in entry_map or path in seen:
continue
seen.add(path)
merged = dict(entry_map[path])
merged["reason"] = item.get("reason", "")
results.append(merged)
return results
def list_entries(entries: list[dict[str, str]]) -> None:
for entry in entries:
print(f'{entry["path"]}')
print(f' goal: {entry["goal"]}')
print(f' usage: {entry["usage"]}')
def show_results(query: str, results: list[dict[str, str]], model: str) -> None:
if not results:
print(f"No catalogued tool matched: {query}")
return
print(f"Model: {model}")
print(f"Query: {query}")
print()
for idx, item in enumerate(results, 1):
print(f"{idx}. {item['path']}")
print(f" Goal: {item['goal']}")
print(f" Usage: {item['usage']}")
if item.get("reason"):
print(f" Why: {item['reason']}")
print()
def main() -> int:
parser = argparse.ArgumentParser(description="README-driven repository search using Ollama")
parser.add_argument("query", nargs="?", help="Natural-language search query")
parser.add_argument("-l", "--list", action="store_true", help="List catalogued tools")
parser.add_argument("--model", default=DEFAULT_MODEL, help=f"Ollama model to use (default: {DEFAULT_MODEL})")
args = parser.parse_args()
tool = WhatTool()
try:
entries = extract_catalog(load_readme())
except WhatError as exc:
print(f"Error: {exc}", file=sys.stderr)
return 1
if args.list:
tool.list_all_tools()
return
if args.add:
filepath = Path(args.add)
if not filepath.exists():
print(f"Error: File {filepath} does not exist")
sys.exit(1)
if not filepath.is_relative_to(REPO_ROOT):
print(f"Error: File must be within the repository ({REPO_ROOT})")
sys.exit(1)
tool.add_file_interactive(filepath)
return
list_entries(entries)
return 0
if not args.query:
parser.print_help()
print()
print("Available search methods:")
if tool.has_ollama:
print(" ✓ Ollama + Gemma2 (natural language)")
else:
print(" ✗ Ollama + Gemma2 (not available)")
if tool.has_fzf:
print(" ✓ fzf (fuzzy finding)")
else:
print(" ✗ fzf (not available)")
print(" ✓ grep (basic text search)")
return
# Perform search
results = tool.search(args.query)
tool.show_search_results(results)
print(f"Catalog source: {README_PATH}")
print(f"Default model: {args.model}")
return 0
try:
results = search(args.query, entries, args.model)
except WhatError as exc:
print(f"Error: {exc}", file=sys.stderr)
return 1
show_results(args.query, results, args.model)
return 0
if __name__ == "__main__":
main()
raise SystemExit(main())