From 84d912ac0ab1d05ddcd94ada4f934d544ae2f00b Mon Sep 17 00:00:00 2001 From: tobias Date: Sat, 21 Feb 2026 23:10:44 +0100 Subject: [PATCH] visidata: add IOC types with cached, throttled lookups Centralize provider caching and rate-limit handling, then add Domain/URL/Hash IOC types and safer VT/IPInfo key resolution so lookups stay reliable on free-tier APIs. --- config/visidata/.gitignore | 1 + config/visidata/plugins/__init__.py | 15 +- config/visidata/plugins/ioc.py | 446 +++++++++++++++++++++ config/visidata/plugins/ioclib.py | 84 ++++ config/visidata/plugins/iplib.py | 13 + config/visidata/plugins/iptype.py | 210 +++------- config/visidata/plugins/lookupcore.py | 336 ++++++++++++++++ config/visidata/scripts/validate_ioclib.py | 45 +++ config/visidata/visidatarc | 71 +++- 9 files changed, 1048 insertions(+), 173 deletions(-) create mode 100644 config/visidata/plugins/ioc.py create mode 100644 config/visidata/plugins/ioclib.py create mode 100644 config/visidata/plugins/lookupcore.py create mode 100644 config/visidata/scripts/validate_ioclib.py diff --git a/config/visidata/.gitignore b/config/visidata/.gitignore index c4c5c83..c8a7a75 100644 --- a/config/visidata/.gitignore +++ b/config/visidata/.gitignore @@ -1,3 +1,4 @@ __pycache__/ plugins/__pycache__/ *.pyc +lookup_config.py diff --git a/config/visidata/plugins/__init__.py b/config/visidata/plugins/__init__.py index 7d7b404..45aa24a 100644 --- a/config/visidata/plugins/__init__.py +++ b/config/visidata/plugins/__init__.py @@ -10,10 +10,19 @@ their commands/types are registered on startup. for _mod in ( "hidecol", "iptype", + "ioc", ): try: __import__(f"{__name__}.{_mod}") - except Exception: - # VisiData will show exceptions in its error sheet if needed; don't hard-fail here. - pass + except ModuleNotFoundError: + # Optional/missing plugin file. + continue + except Exception as e: + # Don't silently swallow unexpected import errors; surface them. + try: + from visidata import vd + vd.warning(f"plugin import failed: plugins.{_mod}") + vd.exceptionCaught(e) + except Exception: + raise diff --git a/config/visidata/plugins/ioc.py b/config/visidata/plugins/ioc.py new file mode 100644 index 0000000..2b5635c --- /dev/null +++ b/config/visidata/plugins/ioc.py @@ -0,0 +1,446 @@ +""" +IOC datatypes for VisiData: domains, URLs, and hashes. + +Features: +- Domain normalization and lookups: RDAP, DNS, VirusTotal domain report. +- URL parsing and VT URL report. +- Hash detection + VT file report and MalwareBazaar fallback. + +All network lookups are cached in the local sqlite cache db (`options.tke_cache_db_path`). +""" + +from __future__ import annotations + +import functools +import os +import re +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple +from urllib.parse import urlsplit + +from visidata import vd +from visidata.sheets import TableSheet + +from .iplib import JSONNode, VTInfo, parse_vt_ip +from .ioclib import MBInfo, URLParts, parse_mb_info, vt_url_id +from .lookupcore import ( + auth_tag, + cache_ttl, + error_ttl, + http_get_json, + http_post_json, + opt, + sqlite_getset, +) + + +vd.option( + "tke_rdap_base", "https://rdap.org", "base URL for RDAP queries", sheettype=None +) +vd.option( + "tke_mb_api_base", + "https://mb-api.abuse.ch/api/v1/", + "base URL for MalwareBazaar API", + sheettype=None, +) + + +def _is_nullish(v: Any) -> bool: + return v is None or v == "" or v == "null" + + +def _vt_key() -> str: + from .lookupcore import read_key_from_file + + return str( + opt("tke_vt_api_key", "") + or os.getenv("VT_API_KEY") + or os.getenv("VIRUSTOTAL_API_KEY") + or read_key_from_file("~/.virustotal_api_key") + or "" + ) + + +def _rdap_base() -> str: + return str(opt("tke_rdap_base", "https://rdap.org") or "https://rdap.org").rstrip( + "/" + ) + + +def _mb_base() -> str: + return str(opt("tke_mb_api_base", "https://mb-api.abuse.ch/api/v1/") or "").strip() + + +@functools.lru_cache(maxsize=4096) +def _rdap_domain_raw(domain: str) -> Optional[Dict[str, Any]]: + base = _rdap_base() + url = f"{base}/domain/{domain}" + return sqlite_getset( + f"rdap_domain:{domain}", + lambda: http_get_json(url, provider="rdap"), + max_age=cache_ttl(), + error_max_age=error_ttl(), + ) + + +@functools.lru_cache(maxsize=4096) +def _vt_domain_raw(domain: str) -> Optional[Dict[str, Any]]: + key = _vt_key() + if not key: + return None + tag = auth_tag(key) + url = f"https://www.virustotal.com/api/v3/domains/{domain}" + return sqlite_getset( + f"vt_domain:{tag}:{domain}", + lambda: http_get_json(url, headers={"x-apikey": key}, provider="vt"), + max_age=cache_ttl(), + error_max_age=error_ttl(), + ) + + +@functools.lru_cache(maxsize=4096) +def _vt_url_raw(url: str) -> Optional[Dict[str, Any]]: + key = _vt_key() + if not key: + return None + tag = auth_tag(key) + url_id = vt_url_id(url) + api = f"https://www.virustotal.com/api/v3/urls/{url_id}" + return sqlite_getset( + f"vt_url:{tag}:{url_id}", + lambda: http_get_json(api, headers={"x-apikey": key}, provider="vt"), + max_age=cache_ttl(), + error_max_age=error_ttl(), + ) + + +@functools.lru_cache(maxsize=4096) +def _vt_file_raw(h: str) -> Optional[Dict[str, Any]]: + key = _vt_key() + if not key: + return None + tag = auth_tag(key) + url = f"https://www.virustotal.com/api/v3/files/{h}" + return sqlite_getset( + f"vt_file:{tag}:{h}", + lambda: http_get_json(url, headers={"x-apikey": key}, provider="vt"), + max_age=cache_ttl(), + error_max_age=error_ttl(), + ) + + +@functools.lru_cache(maxsize=4096) +def _mb_hash_raw(h: str) -> Optional[Dict[str, Any]]: + base = _mb_base() + if not base: + return None + return sqlite_getset( + f"mb_hash:{h}", + lambda: http_post_json(base, {"query": "get_info", "hash": h}, provider="mb"), + max_age=cache_ttl(), + error_max_age=error_ttl(), + ) + + +@dataclass(frozen=True) +class DNSInfo: + a: Tuple[str, ...] = () + aaaa: Tuple[str, ...] = () + cname: Tuple[str, ...] = () + mx: Tuple[str, ...] = () + ns: Tuple[str, ...] = () + txt: Tuple[str, ...] = () + raw: Optional[Dict[str, Any]] = None + source: str = "" + + @property + def data(self) -> JSONNode: + return JSONNode(self.raw) + + +def _dns_resolve(domain: str, rtype: str) -> Tuple[str, ...]: + domain = domain.rstrip(".") + rtype = rtype.upper() + try: + import dns.resolver # optional dep + + ans = dns.resolver.resolve(domain, rtype) + return tuple(str(r) for r in ans) + except Exception: + return () + + +@functools.lru_cache(maxsize=4096) +def _dns_info(domain: str) -> DNSInfo: + def _do() -> DNSInfo: + a = _dns_resolve(domain, "A") + aaaa = _dns_resolve(domain, "AAAA") + cname = _dns_resolve(domain, "CNAME") + mx = _dns_resolve(domain, "MX") + ns = _dns_resolve(domain, "NS") + txt = _dns_resolve(domain, "TXT") + raw = {"A": a, "AAAA": aaaa, "CNAME": cname, "MX": mx, "NS": ns, "TXT": txt} + return DNSInfo( + a=a, aaaa=aaaa, cname=cname, mx=mx, ns=ns, txt=txt, raw=raw, source="dns" + ) + + return sqlite_getset( + f"dns:{domain}", + _do, + max_age=cache_ttl(), + error_max_age=error_ttl(), + ) or DNSInfo(source="") + + +@functools.total_ordering +class DomainValue: + __slots__ = ("_d",) + + def __init__(self, domain: str): + self._d = domain + + @property + def domain(self) -> str: + return self._d + + def __str__(self) -> str: + return self._d + + def __repr__(self) -> str: + return f"DomainValue({self._d!r})" + + def __hash__(self) -> int: + return hash(self._d) + + def __eq__(self, other: object) -> bool: + return isinstance(other, DomainValue) and self._d == other._d + + def __lt__(self, other: object) -> bool: + if not isinstance(other, DomainValue): + return NotImplemented + return self._d < other._d + + @property + def rdap(self) -> JSONNode: + return JSONNode(_rdap_domain_raw(self._d)) + + @property + def dns(self) -> DNSInfo: + return _dns_info(self._d) + + @property + def vt(self) -> VTInfo: + data = _vt_domain_raw(self._d) + return parse_vt_ip(data) if data else VTInfo() + + +def _normalize_domain(s: str) -> str: + s = s.strip().lower() + if not s: + return "" + # Strip scheme/path if the input is a URL. + if "://" in s: + try: + sp = urlsplit(s) + if sp.hostname: + s = sp.hostname + except Exception: + pass + s = s.strip().rstrip(".") + # Strip brackets around IPv6 host literals if accidentally passed. + if s.startswith("[") and s.endswith("]"): + s = s[1:-1] + return s + + +def domain(val: Any) -> Optional[DomainValue]: + if _is_nullish(val): + return None + if isinstance(val, DomainValue): + return val + s = _normalize_domain(str(val)) + if not s: + return None + return DomainValue(s) + + +@functools.total_ordering +class URLValue: + __slots__ = ("_u", "_parts") + + def __init__(self, url: str, parts: URLParts): + self._u = url + self._parts = parts + + @property + def url(self) -> str: + return self._u + + def __str__(self) -> str: + return self._u + + def __repr__(self) -> str: + return f"URLValue({self._u!r})" + + def __hash__(self) -> int: + return hash(self._u) + + def __eq__(self, other: object) -> bool: + return isinstance(other, URLValue) and self._u == other._u + + def __lt__(self, other: object) -> bool: + if not isinstance(other, URLValue): + return NotImplemented + return self._u < other._u + + @property + def parts(self) -> URLParts: + return self._parts + + @property + def host(self) -> str: + return self._parts.host + + @property + def domain(self) -> Optional[DomainValue]: + return domain(self._parts.host) + + @property + def vt(self) -> VTInfo: + data = _vt_url_raw(self._u) + return parse_vt_ip(data) if data else VTInfo() + + +def url_ioc(val: Any) -> Optional[URLValue]: + if _is_nullish(val): + return None + if isinstance(val, URLValue): + return val + s = str(val).strip() + if not s: + return None + # Accept bare domains by prefixing scheme (so parsing is consistent). + if "://" not in s and re.match(r"^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(/|$)", s): + s = "http://" + s + try: + sp = urlsplit(s) + parts = URLParts( + scheme=sp.scheme or "", + username=sp.username or "", + password=sp.password or "", + host=sp.hostname or "", + port=sp.port, + path=sp.path or "", + query=sp.query or "", + fragment=sp.fragment or "", + ) + return URLValue(s, parts) + except Exception: + return None + + +@functools.total_ordering +class HashValue: + __slots__ = ("_h",) + + def __init__(self, h: str): + self._h = h + + @property + def hash(self) -> str: + return self._h + + def __str__(self) -> str: + return self._h + + def __repr__(self) -> str: + return f"HashValue({self._h!r})" + + def __hash__(self) -> int: + return hash(self._h) + + def __eq__(self, other: object) -> bool: + return isinstance(other, HashValue) and self._h == other._h + + def __lt__(self, other: object) -> bool: + if not isinstance(other, HashValue): + return NotImplemented + return self._h < other._h + + @property + def kind(self) -> str: + n = len(self._h) + if n == 32: + return "md5" + if n == 40: + return "sha1" + if n == 64: + return "sha256" + return "" + + @property + def vt(self) -> VTInfo: + data = _vt_file_raw(self._h) + return parse_vt_ip(data) if data else VTInfo() + + @property + def mb(self) -> MBInfo: + data = _mb_hash_raw(self._h) + return parse_mb_info(data) if data else MBInfo() + + +_HASH_RE = re.compile(r"^[A-Fa-f0-9]{32}$|^[A-Fa-f0-9]{40}$|^[A-Fa-f0-9]{64}$") + + +def hash_ioc(val: Any) -> Optional[HashValue]: + if _is_nullish(val): + return None + if isinstance(val, HashValue): + return val + s = str(val).strip() + if not s: + return None + if not _HASH_RE.match(s): + return None + return HashValue(s.lower()) + + +# Make custom converters available in command/expr globals. +vd.addGlobals(domain=domain, url_ioc=url_ioc, hash_ioc=hash_ioc) + + +vd.addType( + domain, + icon="d", + formatter=lambda fmt, v: "" if v is None else str(v), + name="Domain", +) +vd.addType( + url_ioc, icon="u", formatter=lambda fmt, v: "" if v is None else str(v), name="URL" +) +vd.addType( + hash_ioc, + icon="#", + formatter=lambda fmt, v: "" if v is None else str(v), + name="Hash", +) + +TableSheet.addCommand( + None, "type-domain", "cursorCol.type=domain", "set type of current column to Domain" +) +TableSheet.addCommand( + None, + "type-url-ioc", + "cursorCol.type=url_ioc", + "set type of current column to URL (IOC)", +) +TableSheet.addCommand( + None, + "type-hash", + "cursorCol.type=hash_ioc", + "set type of current column to Hash (md5/sha1/sha256)", +) + +vd.addMenuItem("Column", "Type", "Domain", "type-domain") +vd.addMenuItem("Column", "Type", "URL (IOC)", "type-url-ioc") +vd.addMenuItem("Column", "Type", "Hash", "type-hash") diff --git a/config/visidata/plugins/ioclib.py b/config/visidata/plugins/ioclib.py new file mode 100644 index 0000000..14c5abd --- /dev/null +++ b/config/visidata/plugins/ioclib.py @@ -0,0 +1,84 @@ +""" +Pure-Python IOC helpers for domains/urls/hashes. + +No VisiData imports; safe to unit-test with any interpreter. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple + +from .iplib import JSONNode +import base64 + + +@dataclass(frozen=True) +class URLParts: + scheme: str = "" + username: str = "" + password: str = "" + host: str = "" + port: Optional[int] = None + path: str = "" + query: str = "" + fragment: str = "" + + @property + def data(self) -> JSONNode: + return JSONNode( + { + "scheme": self.scheme, + "username": self.username, + "password": self.password, + "host": self.host, + "port": self.port, + "path": self.path, + "query": self.query, + "fragment": self.fragment, + } + ) + + +@dataclass(frozen=True) +class MBInfo: + """MalwareBazaar hash info (abuse.ch).""" + + status: str = "" # query_status + signatures: Tuple[str, ...] = () + tags: Tuple[str, ...] = () + raw: Optional[Dict[str, Any]] = None + source: str = "malwarebazaar" + + @property + def data(self) -> JSONNode: + return JSONNode(self.raw) + + @property + def signature(self) -> str: + return self.signatures[0] if self.signatures else "" + + +def parse_mb_info(raw: Optional[Dict[str, Any]]) -> MBInfo: + raw = raw or {} + status = str(raw.get("query_status") or "") + sigs = [] + tags = [] + + data = raw.get("data") + if isinstance(data, list) and data: + item = data[0] if isinstance(data[0], dict) else {} + sig = item.get("signature") + if sig: + sigs.append(str(sig)) + t = item.get("tags") + if isinstance(t, list): + tags.extend(str(x) for x in t if x) + + return MBInfo(status=status, signatures=tuple(sigs), tags=tuple(tags), raw=raw) + + +def vt_url_id(url: str) -> str: + """Compute VirusTotal URL ID (urlsafe base64 without padding).""" + b = base64.urlsafe_b64encode(url.encode("utf-8")).decode("ascii") + return b.rstrip("=") diff --git a/config/visidata/plugins/iplib.py b/config/visidata/plugins/iplib.py index 3c8af52..68e318b 100644 --- a/config/visidata/plugins/iplib.py +++ b/config/visidata/plugins/iplib.py @@ -96,6 +96,10 @@ class IPInfo: return self.raw.get(name) raise AttributeError(name) + def __call__(self) -> "IPInfo": + # Allow `ip.ipinfo()` in VisiData expressions. + return self + @dataclass(frozen=True) class ASNInfo: @@ -114,6 +118,9 @@ class ASNInfo: return self.raw.get(name) raise AttributeError(name) + def __call__(self) -> "ASNInfo": + return self + @dataclass(frozen=True) class VTInfo: @@ -150,6 +157,9 @@ class VTInfo: return self.raw.get(name) raise AttributeError(name) + def __call__(self) -> "VTInfo": + return self + @dataclass(frozen=True) class GeoInfo: @@ -173,6 +183,9 @@ class GeoInfo: return self.raw.get(name) raise AttributeError(name) + def __call__(self) -> "GeoInfo": + return self + def _to_float(v: Any) -> Optional[float]: try: diff --git a/config/visidata/plugins/iptype.py b/config/visidata/plugins/iptype.py index 467e83d..27d71b6 100644 --- a/config/visidata/plugins/iptype.py +++ b/config/visidata/plugins/iptype.py @@ -16,11 +16,7 @@ Network calls are optional; when deps/keys are missing, properties return empty from __future__ import annotations import functools -import hashlib import os -import pickle -import sqlite3 -import time from typing import Any, Dict, Optional, Tuple, Union import ipaddress @@ -43,166 +39,52 @@ from .iplib import ( parse_geo_maxmind, parse_vt_ip, ) - - -vd.option( - "tke_cache_db_path", - os.path.expanduser("~/.visidata_cache.db"), - "sqlite cache db path for local lookups (pickle-serialized)", - sheettype=None, +from .lookupcore import ( + auth_tag, + cache_ttl, + error_ttl, + http_get_json, + opt, + read_key_from_file, + sqlite_getset, ) -vd.option("tke_lookup_cache_ttl", 60 * 60 * 24, "lookup cache ttl in seconds", sheettype=None) -vd.option( - "tke_lookup_error_ttl", - 5 * 60, - "cache ttl in seconds for failed lookups (to avoid tight loops)", - sheettype=None, -) -vd.option("tke_lookup_timeout", 10, "HTTP lookup timeout in seconds", sheettype=None) - -vd.option("tke_ipinfo_token", "", "ipinfo token (optional)", sheettype=None) -vd.option("tke_ipapi_key", "", "ipapi.co API key (optional)", sheettype=None) -vd.option("tke_vt_api_key", "", "VirusTotal API key (required for VT lookups)", sheettype=None) -vd.option( - "tke_maxmind_mmdb_path", - "", - "path to GeoLite2/GeoIP2 .mmdb file for offline MaxMind lookups", - sheettype=None, -) - - -def _opt(name: str, default: Any = "") -> Any: - try: - return getattr(vd.options, name) - except Exception: - return default - - -def _cache_path() -> str: - p = str(_opt("tke_cache_db_path", "") or os.path.expanduser("~/.visidata_cache.db")) - return os.path.expanduser(p) - - -def _auth_tag(secret: str) -> str: - if not secret: - return "noauth" - return hashlib.sha256(secret.encode("utf-8")).hexdigest()[:12] - - -def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None, error_max_age: Optional[int] = None): - """Tiny sqlite+pickle cache. Falls back to computing if db can't be used. - - `key` should NOT contain secrets; include `_auth_tag()` instead. - """ - try: - path = _cache_path() - os.makedirs(os.path.dirname(path), exist_ok=True) - with sqlite3.connect(path, timeout=2) as conn: - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA synchronous=NORMAL") - conn.execute( - "CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)" - ) - cur = conn.cursor() - cur.execute("SELECT value, timestamp FROM cache WHERE key=?", (key,)) - row = cur.fetchone() - now = int(time.time()) - if row: - val_blob, ts = row - cached_val = pickle.loads(val_blob) - age = now - int(ts) - ttl = max_age - if cached_val is None and error_max_age is not None: - ttl = error_max_age - if ttl is None or age <= int(ttl): - return cached_val - val = fn() - cur.execute( - "INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)", - (key, pickle.dumps(val), now), - ) - conn.commit() - return val - except Exception: - return fn() def _is_nullish(v: Any) -> bool: return v is None or v == "" or v == "null" -def _read_key_from_file(path: str) -> str: - try: - with open(os.path.expanduser(path)) as f: - return f.readline().strip() - except Exception: - return "" - - def _ipinfo_token() -> str: - return str(_opt("tke_ipinfo_token", "") or os.getenv("IPINFO_TOKEN") or "") + return str(opt("tke_ipinfo_token", "") or os.getenv("IPINFO_TOKEN") or "") def _ipapi_key() -> str: - return str(_opt("tke_ipapi_key", "") or os.getenv("IPAPI_KEY") or "") + return str(opt("tke_ipapi_key", "") or os.getenv("IPAPI_KEY") or "") def _vt_key() -> str: return str( - _opt("tke_vt_api_key", "") + opt("tke_vt_api_key", "") or os.getenv("VT_API_KEY") or os.getenv("VIRUSTOTAL_API_KEY") - or _read_key_from_file("~/.virustotal_api_key") + or read_key_from_file("~/.virustotal_api_key") or "" ) -def _http_timeout() -> int: - try: - return int(_opt("tke_lookup_timeout", 10)) - except Exception: - return 10 - - -def _cache_ttl() -> int: - try: - return int(_opt("tke_lookup_cache_ttl", 60 * 60 * 24)) - except Exception: - return 60 * 60 * 24 - - -def _error_ttl() -> int: - try: - return int(_opt("tke_lookup_error_ttl", 5 * 60)) - except Exception: - return 5 * 60 - - -def _http_get_json(url: str, *, headers: Optional[Dict[str, str]] = None) -> Optional[Dict[str, Any]]: - try: - import requests # optional dep - - r = requests.get(url, headers=headers, timeout=_http_timeout()) - if not r.ok: - return None - return r.json() - except Exception: - return None - - @functools.lru_cache(maxsize=4096) def _ipinfo_raw(ip: str) -> Optional[Dict[str, Any]]: token = _ipinfo_token() - tag = _auth_tag(token) + tag = auth_tag(token) url = f"http://ipinfo.io/{ip}/json" if token: url = f"{url}?token={token}" - return _sqlite_getset( + return sqlite_getset( f"ipinfo:{tag}:{ip}", - lambda: _http_get_json(url), - max_age=_cache_ttl(), - error_max_age=_error_ttl(), + lambda: http_get_json(url, provider="ipinfo"), + max_age=cache_ttl(), + error_max_age=error_ttl(), ) @@ -210,11 +92,11 @@ def _ipinfo_raw(ip: str) -> Optional[Dict[str, Any]]: def _ipwho_raw(ip: str) -> Optional[Dict[str, Any]]: # Free geo+asn provider; no key. url = f"https://ipwho.is/{ip}" - return _sqlite_getset( + return sqlite_getset( f"ipwho:{ip}", - lambda: _http_get_json(url), - max_age=_cache_ttl(), - error_max_age=_error_ttl(), + lambda: http_get_json(url, provider="ipwho"), + max_age=cache_ttl(), + error_max_age=error_ttl(), ) @@ -222,15 +104,15 @@ def _ipwho_raw(ip: str) -> Optional[Dict[str, Any]]: def _ipapi_raw(ip: str) -> Optional[Dict[str, Any]]: # Free tier works without key for many cases; key improves limits/features. key = _ipapi_key() - tag = _auth_tag(key) + tag = auth_tag(key) url = f"https://ipapi.co/{ip}/json/" if key: url = f"{url}?key={key}" - return _sqlite_getset( + return sqlite_getset( f"ipapi:{tag}:{ip}", - lambda: _http_get_json(url), - max_age=_cache_ttl(), - error_max_age=_error_ttl(), + lambda: http_get_json(url, provider="ipapi"), + max_age=cache_ttl(), + error_max_age=error_ttl(), ) @@ -263,25 +145,31 @@ def _vt_info(ip: str) -> VTInfo: def _do() -> VTInfo: try: - data = _http_get_json( + data = http_get_json( f"https://www.virustotal.com/api/v3/ip_addresses/{ip}", headers={"x-apikey": key}, + provider="vt", ) return parse_vt_ip(data) except Exception: return VTInfo() - tag = _auth_tag(key) - return _sqlite_getset( + tag = auth_tag(key) + return sqlite_getset( f"vt_ip:{tag}:{ip}", _do, - max_age=_cache_ttl(), - error_max_age=_error_ttl(), + max_age=cache_ttl(), + error_max_age=error_ttl(), ) def _maxmind_paths() -> Tuple[str, ...]: - p = str(_opt("tke_maxmind_mmdb_path", "") or os.getenv("MAXMIND_MMDB_PATH") or os.getenv("GEOIP_MMDB_PATH") or "") + p = str( + opt("tke_maxmind_mmdb_path", "") + or os.getenv("MAXMIND_MMDB_PATH") + or os.getenv("GEOIP_MMDB_PATH") + or "" + ) if p: return (os.path.expanduser(p),) @@ -347,11 +235,11 @@ def _maxmind_geo(ip: str) -> GeoInfo: except Exception: return GeoInfo(source="") - return _sqlite_getset( + return sqlite_getset( f"maxmind:{sig}:{ip}", _do, - max_age=_cache_ttl(), - error_max_age=_error_ttl(), + max_age=cache_ttl(), + error_max_age=error_ttl(), ) or GeoInfo(source="") @@ -480,6 +368,12 @@ class IPValue: # allow "netmask * ip" too return self.__mul__(other) + # Convenience helpers (to match common expectations in expressions) + def country(self) -> str: + # Prefer best-available geo; fall back to ipinfo. + g = self.geo + return g.country_code or self.ipinfo.country or "" + # Normalized enrichments @property def ipinfo(self) -> IPInfo: @@ -539,7 +433,13 @@ def ip(val: Any) -> Optional[IPValue]: return None -vd.addType(ip, icon=":", formatter=lambda fmt, v: "" if v is None else str(v), name="IP") +# Make custom converter available in command/expr globals. +vd.addGlobals(ip=ip) + + +vd.addType( + ip, icon=":", formatter=lambda fmt, v: "" if v is None else str(v), name="IP" +) TableSheet.addCommand( None, @@ -547,3 +447,5 @@ TableSheet.addCommand( "cursorCol.type=ip", "set type of current column to IP (IPv4/IPv6/CIDR)", ) + +vd.addMenuItem("Column", "Type", "IP (IPv4/IPv6/CIDR)", "type-ip") diff --git a/config/visidata/plugins/lookupcore.py b/config/visidata/plugins/lookupcore.py new file mode 100644 index 0000000..b297167 --- /dev/null +++ b/config/visidata/plugins/lookupcore.py @@ -0,0 +1,336 @@ +""" +Lookup/caching helpers shared across local plugins. + +Depends on VisiData (`vd`) because options are stored in vd.options. +""" + +from __future__ import annotations + +import hashlib +import os +import pickle +import sqlite3 +import threading +import time +from email.utils import parsedate_to_datetime +from typing import Any, Callable, Dict, Optional + +from visidata import vd + + +vd.option( + "tke_cache_db_path", + os.path.expanduser("~/.visidata_cache.db"), + "sqlite cache db path for local lookups (pickle-serialized)", + sheettype=None, +) +vd.option( + "tke_lookup_cache_ttl", 60 * 60 * 24, "lookup cache ttl in seconds", sheettype=None +) +vd.option( + "tke_lookup_error_ttl", + 5 * 60, + "cache ttl in seconds for failed lookups (to avoid tight loops)", + sheettype=None, +) +vd.option("tke_lookup_timeout", 10, "HTTP lookup timeout in seconds", sheettype=None) +vd.option( + "tke_http_retries", + 1, + "number of retries for transient HTTP failures", + sheettype=None, +) + +# Provider-specific minimum delay between requests (seconds). +vd.option( + "tke_throttle_default_sec", + 0.0, + "default min delay between HTTP requests", + sheettype=None, +) +vd.option( + "tke_throttle_vt_sec", + 16.0, + "min delay between VirusTotal API requests", + sheettype=None, +) +vd.option( + "tke_throttle_ipinfo_sec", 0.5, "min delay between ipinfo requests", sheettype=None +) +vd.option( + "tke_throttle_ipapi_sec", 1.0, "min delay between ipapi.co requests", sheettype=None +) +vd.option( + "tke_throttle_ipwho_sec", 0.5, "min delay between ipwho.is requests", sheettype=None +) +vd.option( + "tke_throttle_rdap_sec", 1.0, "min delay between RDAP requests", sheettype=None +) +vd.option( + "tke_throttle_mb_sec", + 1.0, + "min delay between MalwareBazaar requests", + sheettype=None, +) + +# API keys/tokens (optional unless otherwise stated by the provider). +vd.option("tke_ipinfo_token", "", "ipinfo token (optional)", sheettype=None) +vd.option("tke_ipapi_key", "", "ipapi.co API key (optional)", sheettype=None) +vd.option( + "tke_vt_api_key", "", "VirusTotal API key (required for VT lookups)", sheettype=None +) +vd.option( + "tke_maxmind_mmdb_path", + "", + "path to GeoLite2/GeoIP2 .mmdb file for offline MaxMind lookups", + sheettype=None, +) + + +def opt(name: str, default: Any = "") -> Any: + try: + return getattr(vd.options, name) + except Exception: + return default + + +def cache_path() -> str: + p = str(opt("tke_cache_db_path", "") or os.path.expanduser("~/.visidata_cache.db")) + return os.path.expanduser(p) + + +def auth_tag(secret: str) -> str: + if not secret: + return "noauth" + return hashlib.sha256(secret.encode("utf-8")).hexdigest()[:12] + + +def cache_ttl() -> int: + try: + return int(opt("tke_lookup_cache_ttl", 60 * 60 * 24)) + except Exception: + return 60 * 60 * 24 + + +def error_ttl() -> int: + try: + return int(opt("tke_lookup_error_ttl", 5 * 60)) + except Exception: + return 5 * 60 + + +def http_timeout() -> int: + try: + return int(opt("tke_lookup_timeout", 10)) + except Exception: + return 10 + + +def sqlite_getset( + key: str, + fn: Callable[[], Any], + *, + max_age: Optional[int] = None, + error_max_age: Optional[int] = None, +): + """SQLite+pickle cache. Falls back to computing if db can't be used. + + `key` should NOT contain secrets; include `auth_tag()` instead. + """ + try: + path = cache_path() + os.makedirs(os.path.dirname(path), exist_ok=True) + with sqlite3.connect(path, timeout=2) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute( + "CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)" + ) + cur = conn.cursor() + cur.execute("SELECT value, timestamp FROM cache WHERE key=?", (key,)) + row = cur.fetchone() + now = int(time.time()) + if row: + val_blob, ts = row + cached_val = pickle.loads(val_blob) + age = now - int(ts) + ttl = max_age + if cached_val is None and error_max_age is not None: + ttl = error_max_age + if ttl is None or age <= int(ttl): + return cached_val + val = fn() + cur.execute( + "INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)", + (key, pickle.dumps(val), now), + ) + conn.commit() + return val + except Exception: + return fn() + + +def read_key_from_file(path: str) -> str: + try: + with open(os.path.expanduser(path)) as f: + return f.readline().strip() + except Exception: + return "" + + +_rate_lock = threading.Lock() +_next_allowed_at: Dict[str, float] = {} +_retry_after_until: Dict[str, float] = {} + + +def _provider_for_url(url: str) -> str: + u = str(url).lower() + if "virustotal.com" in u: + return "vt" + if "ipinfo.io" in u: + return "ipinfo" + if "ipapi.co" in u: + return "ipapi" + if "ipwho.is" in u: + return "ipwho" + if "rdap" in u: + return "rdap" + if "mb-api.abuse.ch" in u: + return "mb" + return "default" + + +def _provider_delay(provider: str) -> float: + optname = { + "vt": "tke_throttle_vt_sec", + "ipinfo": "tke_throttle_ipinfo_sec", + "ipapi": "tke_throttle_ipapi_sec", + "ipwho": "tke_throttle_ipwho_sec", + "rdap": "tke_throttle_rdap_sec", + "mb": "tke_throttle_mb_sec", + }.get(provider, "tke_throttle_default_sec") + try: + return max(0.0, float(opt(optname, 0.0))) + except Exception: + return 0.0 + + +def _wait_for_slot(provider: str) -> None: + now = time.monotonic() + with _rate_lock: + ready = max( + now, + _next_allowed_at.get(provider, 0.0), + _retry_after_until.get(provider, 0.0), + ) + _next_allowed_at[provider] = ready + _provider_delay(provider) + if ready > now: + time.sleep(ready - now) + + +def _mark_retry_after(provider: str, retry_after_s: float) -> None: + if retry_after_s <= 0: + return + until = time.monotonic() + retry_after_s + with _rate_lock: + prev = _retry_after_until.get(provider, 0.0) + if until > prev: + _retry_after_until[provider] = until + + +def _parse_retry_after(value: str) -> Optional[float]: + v = (value or "").strip() + if not v: + return None + try: + sec = float(v) + if sec >= 0: + return sec + except Exception: + pass + try: + dt = parsedate_to_datetime(v) + if dt is None: + return None + # parsedate_to_datetime can return naive dt; treat as UTC then. + if dt.tzinfo is None: + return max(0.0, dt.timestamp() - time.time()) + return max(0.0, dt.timestamp() - time.time()) + except Exception: + return None + + +def _request_json( + method: str, + url: str, + *, + headers: Optional[Dict[str, str]] = None, + data: Optional[Dict[str, Any]] = None, + provider: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + try: + import requests # optional dep + except Exception: + return None + + prov = provider or _provider_for_url(url) + try: + retries = max(0, int(opt("tke_http_retries", 1))) + except Exception: + retries = 1 + + for attempt in range(retries + 1): + _wait_for_slot(prov) + try: + r = requests.request( + method, + url, + headers=headers, + data=data, + timeout=http_timeout(), + ) + except Exception: + if attempt < retries: + continue + return None + + if r.status_code == 429: + ra = _parse_retry_after(r.headers.get("Retry-After", "")) + if ra is None: + ra = max(1.0, _provider_delay(prov)) + _mark_retry_after(prov, ra) + if attempt < retries: + continue + return None + + if not r.ok: + if 500 <= r.status_code < 600 and attempt < retries: + _mark_retry_after(prov, max(1.0, _provider_delay(prov))) + continue + return None + + try: + return r.json() + except Exception: + return None + + return None + + +def http_get_json( + url: str, + *, + headers: Optional[Dict[str, str]] = None, + provider: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + return _request_json("GET", url, headers=headers, provider=provider) + + +def http_post_json( + url: str, + data: Dict[str, Any], + *, + headers: Optional[Dict[str, str]] = None, + provider: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + return _request_json("POST", url, headers=headers, data=data, provider=provider) diff --git a/config/visidata/scripts/validate_ioclib.py b/config/visidata/scripts/validate_ioclib.py new file mode 100644 index 0000000..9d4c223 --- /dev/null +++ b/config/visidata/scripts/validate_ioclib.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +""" +Offline validation for plugins/ioclib.py helpers. +""" + +from __future__ import annotations + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from plugins.ioclib import parse_mb_info, vt_url_id # noqa: E402 + + +def _assert(cond: bool, msg: str): + if not cond: + raise AssertionError(msg) + + +def main() -> int: + _assert(vt_url_id("http://example.com/") == "aHR0cDovL2V4YW1wbGUuY29tLw", "vt_url_id known example") + + mb = parse_mb_info( + { + "query_status": "ok", + "data": [ + { + "sha256_hash": "0" * 64, + "signature": "Emotet", + "tags": ["tag1", "tag2"], + } + ], + } + ) + _assert(mb.status == "ok", "mb.status") + _assert(mb.signature == "Emotet", "mb.signature") + _assert(mb.tags == ("tag1", "tag2"), "mb.tags") + print("ok") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) + diff --git a/config/visidata/visidatarc b/config/visidata/visidatarc index c26f2f3..ce0ad2a 100644 --- a/config/visidata/visidatarc +++ b/config/visidata/visidatarc @@ -17,6 +17,18 @@ try: except ModuleNotFoundError: pass +try: + import plugins.ioc +except ModuleNotFoundError: + pass + +# Optional local lookup settings (tokens, key preference, throttling). +# Keep this as a separate module so secrets can stay out of versioned config. +try: + import lookup_config +except ModuleNotFoundError: + pass + from datetime import datetime import functools import json @@ -171,12 +183,23 @@ def vendor(mac): def _get_vt(): try: from virus_total_apis import PublicApi as VirusTotalPublicApi - import os.path - with open(os.path.expanduser('~/.virustotal_api_key')) as af: - API_KEY = af.readline().strip() - vt = VirusTotalPublicApi(API_KEY) + api_key = str( + getattr(options, 'tke_vt_api_key', '') + or os.getenv('VT_API_KEY') + or os.getenv('VIRUSTOTAL_API_KEY') + or '' + ) + if not api_key: + try: + with open(os.path.expanduser('~/.virustotal_api_key')) as af: + api_key = af.readline().strip() + except Exception: + api_key = '' + if not api_key: + return None + vt = VirusTotalPublicApi(api_key) return vt - except: + except Exception: return None @disk_cache_decorator() @@ -204,19 +227,20 @@ def dns_lookup(domain, record='A'): try: import dns import dns.resolver as rs + except ModuleNotFoundError: + return "module not available" + try: # dnspython 2.x prefers resolve(); keep a fallback for older versions. try: result = rs.resolve(domain, record) except AttributeError: result = rs.query(domain, record) return ",".join([x.to_text() for x in result]) - except dns.resolver.NoAnswer as e: + except dns.resolver.NoAnswer: return "" - except dns.exception.DNSException as e: + except dns.exception.DNSException: # return e.msg return "" - except ModuleNotFoundError: - return "module not available" @disk_cache_decorator() def _asn(ip): @@ -250,14 +274,28 @@ def asn(ip, type="asn"): @disk_cache_decorator() def _ipinfo(ip): + token = str(getattr(options, 'tke_ipinfo_token', '') or os.getenv('IPINFO_TOKEN') or '') + url = 'https://ipinfo.io/{}/json'.format(ip) + if token: + url = '{}?token={}'.format(url, token) try: - import requests - import json - r = requests.get(url='http://ipinfo.io/{}/json'.format(ip)) - return r.json() - except json.JSONDecodeError as e: - return None + from plugins.lookupcore import http_get_json + + return http_get_json(url, provider='ipinfo') except ModuleNotFoundError: + try: + import requests + import json + + r = requests.get(url=url, timeout=10) + if not r.ok: + return None + return r.json() + except json.JSONDecodeError: + return None + except ModuleNotFoundError: + return None + except Exception: return None @functools.lru_cache(maxsize=1000) @@ -284,7 +322,8 @@ def split_number2ip(number): @functools.lru_cache(maxsize=1000) def mx_lookup(domain): - domain = domain.lstrip("www.") + if domain.startswith("www."): + domain = domain[4:] try: mxs = dns_lookup(domain, 'MX').split(",") mxt = [x.split(" ")[1] for x in mxs if len(x.split(" ")) == 2]