diff --git a/config/visidata/plugins/iplib.py b/config/visidata/plugins/iplib.py new file mode 100644 index 0000000..3c8af52 --- /dev/null +++ b/config/visidata/plugins/iplib.py @@ -0,0 +1,345 @@ +""" +Pure-Python helpers for IP lookups. + +This module intentionally does not import VisiData so it can be unit-tested with +any Python interpreter. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple + + +class JSONNode: + """Recursive dict/list wrapper for attribute access in expressions. + + Missing keys/attrs return a JSONNode(None) which renders as empty string. + """ + + __slots__ = ("_v",) + + def __init__(self, v: Any): + self._v = v + + def _wrap(self, v: Any) -> "JSONNode": + return JSONNode(v) + + def __getattr__(self, name: str) -> "JSONNode": + if isinstance(self._v, dict): + return self._wrap(self._v.get(name)) + return self._wrap(None) + + def __getitem__(self, key: Any) -> "JSONNode": + try: + if isinstance(self._v, dict): + return self._wrap(self._v.get(key)) + if isinstance(self._v, list) and isinstance(key, int): + return self._wrap(self._v[key] if 0 <= key < len(self._v) else None) + except Exception: + pass + return self._wrap(None) + + def get(self, key: Any, default: Any = None) -> "JSONNode": + if isinstance(self._v, dict): + return self._wrap(self._v.get(key, default)) + return self._wrap(default) + + @property + def raw(self) -> Any: + return self._v + + def __bool__(self) -> bool: + return bool(self._v) + + def __str__(self) -> str: + return "" if self._v is None else str(self._v) + + def __repr__(self) -> str: + return f"JSONNode({self._v!r})" + + +@dataclass(frozen=True) +class IPInfo: + raw: Optional[Dict[str, Any]] = None + source: str = "" + + @property + def data(self) -> JSONNode: + return JSONNode(self.raw) + + @property + def country(self) -> str: + return str((self.raw or {}).get("country") or "") + + @property + def region(self) -> str: + return str((self.raw or {}).get("region") or "") + + @property + def city(self) -> str: + return str((self.raw or {}).get("city") or "") + + @property + def org(self) -> str: + return str((self.raw or {}).get("org") or "") + + @property + def asn(self) -> str: + org = self.org + if org.startswith("AS"): + return org.split(" ", 1)[0] + return "" + + def __getattr__(self, name: str) -> Any: + if isinstance(self.raw, dict) and name in self.raw: + return self.raw.get(name) + raise AttributeError(name) + + +@dataclass(frozen=True) +class ASNInfo: + asn: str = "" + name: str = "" + country: str = "" + raw: Optional[Dict[str, Any]] = None + source: str = "" + + @property + def data(self) -> JSONNode: + return JSONNode(self.raw) + + def __getattr__(self, name: str) -> Any: + if isinstance(self.raw, dict) and name in self.raw: + return self.raw.get(name) + raise AttributeError(name) + + +@dataclass(frozen=True) +class VTInfo: + malicious: int = 0 + suspicious: int = 0 + total: int = 0 + reputation: Optional[int] = None + categories: Tuple[str, ...] = () + raw: Optional[Dict[str, Any]] = None + source: str = "" + + @property + def data(self) -> JSONNode: + return JSONNode(self.raw) + + @property + def verdict(self) -> str: + return "" if self.total <= 0 else f"{self.malicious}/{self.total}" + + @property + def ratio(self) -> Optional[float]: + return None if self.total <= 0 else self.malicious / self.total + + @property + def type(self) -> str: # noqa: A003 + return self.category + + @property + def category(self) -> str: + return self.categories[0] if self.categories else "" + + def __getattr__(self, name: str) -> Any: + if isinstance(self.raw, dict) and name in self.raw: + return self.raw.get(name) + raise AttributeError(name) + + +@dataclass(frozen=True) +class GeoInfo: + country: str = "" + country_code: str = "" + region: str = "" + city: str = "" + lat: Optional[float] = None + lon: Optional[float] = None + timezone: str = "" + postal: str = "" + raw: Optional[Dict[str, Any]] = None + source: str = "" + + @property + def data(self) -> JSONNode: + return JSONNode(self.raw) + + def __getattr__(self, name: str) -> Any: + if isinstance(self.raw, dict) and name in self.raw: + return self.raw.get(name) + raise AttributeError(name) + + +def _to_float(v: Any) -> Optional[float]: + try: + return float(v) if v is not None else None + except Exception: + return None + + +def parse_asn_ipinfo(raw: Optional[Dict[str, Any]]) -> ASNInfo: + raw = raw or {} + org = str(raw.get("org") or "") + cc = str(raw.get("country") or "") + if org.startswith("AS"): + parts = org.split(" ", 1) + asn = parts[0] + name = parts[1] if len(parts) > 1 else "" + return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipinfo") + return ASNInfo(raw=raw, source="ipinfo") + + +def parse_asn_ipapi(raw: Optional[Dict[str, Any]]) -> ASNInfo: + raw = raw or {} + asn = str(raw.get("asn") or "") + name = str(raw.get("org") or raw.get("organisation") or "") + cc = str(raw.get("country_code") or raw.get("country") or "") + if asn: + if not asn.startswith("AS"): + # ipapi typically already uses "AS123", but normalize if needed. + asn = f"AS{asn}" + return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipapi") + return ASNInfo(raw=raw, source="ipapi") + + +def parse_asn_ipwho(raw: Optional[Dict[str, Any]]) -> ASNInfo: + raw = raw or {} + conn = raw.get("connection") + if isinstance(conn, dict): + asn = str(conn.get("asn") or "") + name = str(conn.get("isp") or "") + cc = str(raw.get("country_code") or "") + if asn: + if not asn.startswith("AS"): + asn = f"AS{asn}" + return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipwho") + return ASNInfo(raw=raw, source="ipwho") + + +def parse_geo_ipinfo(raw: Optional[Dict[str, Any]]) -> GeoInfo: + raw = raw or {} + loc = str(raw.get("loc") or "") + lat = lon = None + if "," in loc: + a, b = loc.split(",", 1) + lat = _to_float(a) + lon = _to_float(b) + cc = str(raw.get("country") or "") + return GeoInfo( + country=cc, # ipinfo returns 2-letter country code; keep as-is + country_code=cc, + region=str(raw.get("region") or ""), + city=str(raw.get("city") or ""), + lat=lat, + lon=lon, + timezone=str(raw.get("timezone") or ""), + postal=str(raw.get("postal") or ""), + raw=raw, + source="ipinfo", + ) + + +def parse_geo_ipapi(raw: Optional[Dict[str, Any]]) -> GeoInfo: + raw = raw or {} + return GeoInfo( + country=str(raw.get("country_name") or ""), + country_code=str(raw.get("country_code") or raw.get("country") or ""), + region=str(raw.get("region") or ""), + city=str(raw.get("city") or ""), + lat=_to_float(raw.get("latitude")), + lon=_to_float(raw.get("longitude")), + timezone=str(raw.get("timezone") or ""), + postal=str(raw.get("postal") or ""), + raw=raw, + source="ipapi", + ) + + +def parse_geo_ipwho(raw: Optional[Dict[str, Any]]) -> GeoInfo: + raw = raw or {} + return GeoInfo( + country=str(raw.get("country") or ""), + country_code=str(raw.get("country_code") or ""), + region=str(raw.get("region") or ""), + city=str(raw.get("city") or ""), + lat=_to_float(raw.get("latitude")), + lon=_to_float(raw.get("longitude")), + timezone=str(raw.get("timezone") or ""), + postal=str(raw.get("postal") or ""), + raw=raw, + source="ipwho", + ) + + +def parse_geo_maxmind(raw: Optional[Dict[str, Any]]) -> GeoInfo: + raw = raw or {} + cc = str(((raw.get("country") or {}).get("iso_code")) or "") + country = str(((raw.get("country") or {}).get("names") or {}).get("en") or "") + city = str(((raw.get("city") or {}).get("names") or {}).get("en") or "") + region = "" + subs = raw.get("subdivisions") or [] + if isinstance(subs, list) and subs: + region = str(((subs[0] or {}).get("names") or {}).get("en") or "") + loc = raw.get("location") or {} + lat = _to_float(loc.get("latitude")) + lon = _to_float(loc.get("longitude")) + tz = str(loc.get("time_zone") or "") + postal = str(((raw.get("postal") or {}).get("code")) or "") + return GeoInfo( + country=country, + country_code=cc, + region=region, + city=city, + lat=lat, + lon=lon, + timezone=tz, + postal=postal, + raw=raw, + source="maxmind", + ) + + +def parse_vt_ip(data: Optional[Dict[str, Any]]) -> VTInfo: + data = data or {} + attrs = (((data.get("data") or {}).get("attributes") or {}) if isinstance(data, dict) else {}) + stats = attrs.get("last_analysis_stats") or {} + try: + malicious = int(stats.get("malicious") or 0) + except Exception: + malicious = 0 + try: + suspicious = int(stats.get("suspicious") or 0) + except Exception: + suspicious = 0 + try: + total = int(sum(int(v or 0) for v in stats.values())) + except Exception: + total = 0 + + cats = attrs.get("categories") or {} + if isinstance(cats, dict): + categories = tuple(str(v) for v in cats.values() if v) + elif isinstance(cats, list): + categories = tuple(str(v) for v in cats if v) + else: + categories = () + + rep = attrs.get("reputation") + try: + reputation = int(rep) if rep is not None else None + except Exception: + reputation = None + + return VTInfo( + malicious=malicious, + suspicious=suspicious, + total=total, + reputation=reputation, + categories=categories, + raw=data, + source="virustotal", + ) diff --git a/config/visidata/plugins/iptype.py b/config/visidata/plugins/iptype.py index a9595d0..467e83d 100644 --- a/config/visidata/plugins/iptype.py +++ b/config/visidata/plugins/iptype.py @@ -15,8 +15,8 @@ Network calls are optional; when deps/keys are missing, properties return empty from __future__ import annotations -from dataclasses import dataclass import functools +import hashlib import os import pickle import sqlite3 @@ -28,15 +28,78 @@ import ipaddress from visidata import vd from visidata.sheets import TableSheet - -_CACHE_PATH = os.path.expanduser("~/.visidata_cache.db") +from .iplib import ( + ASNInfo, + GeoInfo, + IPInfo, + JSONNode, + VTInfo, + parse_asn_ipapi, + parse_asn_ipinfo, + parse_asn_ipwho, + parse_geo_ipapi, + parse_geo_ipinfo, + parse_geo_ipwho, + parse_geo_maxmind, + parse_vt_ip, +) -def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None): - """Tiny sqlite+pickle cache. Falls back to computing if db can't be used.""" +vd.option( + "tke_cache_db_path", + os.path.expanduser("~/.visidata_cache.db"), + "sqlite cache db path for local lookups (pickle-serialized)", + sheettype=None, +) +vd.option("tke_lookup_cache_ttl", 60 * 60 * 24, "lookup cache ttl in seconds", sheettype=None) +vd.option( + "tke_lookup_error_ttl", + 5 * 60, + "cache ttl in seconds for failed lookups (to avoid tight loops)", + sheettype=None, +) +vd.option("tke_lookup_timeout", 10, "HTTP lookup timeout in seconds", sheettype=None) + +vd.option("tke_ipinfo_token", "", "ipinfo token (optional)", sheettype=None) +vd.option("tke_ipapi_key", "", "ipapi.co API key (optional)", sheettype=None) +vd.option("tke_vt_api_key", "", "VirusTotal API key (required for VT lookups)", sheettype=None) +vd.option( + "tke_maxmind_mmdb_path", + "", + "path to GeoLite2/GeoIP2 .mmdb file for offline MaxMind lookups", + sheettype=None, +) + + +def _opt(name: str, default: Any = "") -> Any: try: - os.makedirs(os.path.dirname(_CACHE_PATH), exist_ok=True) - with sqlite3.connect(_CACHE_PATH) as conn: + return getattr(vd.options, name) + except Exception: + return default + + +def _cache_path() -> str: + p = str(_opt("tke_cache_db_path", "") or os.path.expanduser("~/.visidata_cache.db")) + return os.path.expanduser(p) + + +def _auth_tag(secret: str) -> str: + if not secret: + return "noauth" + return hashlib.sha256(secret.encode("utf-8")).hexdigest()[:12] + + +def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None, error_max_age: Optional[int] = None): + """Tiny sqlite+pickle cache. Falls back to computing if db can't be used. + + `key` should NOT contain secrets; include `_auth_tag()` instead. + """ + try: + path = _cache_path() + os.makedirs(os.path.dirname(path), exist_ok=True) + with sqlite3.connect(path, timeout=2) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") conn.execute( "CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)" ) @@ -46,8 +109,13 @@ def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None): now = int(time.time()) if row: val_blob, ts = row - if max_age is None or now - int(ts) <= max_age: - return pickle.loads(val_blob) + cached_val = pickle.loads(val_blob) + age = now - int(ts) + ttl = max_age + if cached_val is None and error_max_age is not None: + ttl = error_max_age + if ttl is None or age <= int(ttl): + return cached_val val = fn() cur.execute( "INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)", @@ -63,165 +131,250 @@ def _is_nullish(v: Any) -> bool: return v is None or v == "" or v == "null" -@dataclass(frozen=True) -class IPInfo: - raw: Optional[Dict[str, Any]] = None - - @property - def country(self) -> str: - return str((self.raw or {}).get("country") or "") - - @property - def region(self) -> str: - return str((self.raw or {}).get("region") or "") - - @property - def city(self) -> str: - return str((self.raw or {}).get("city") or "") - - @property - def org(self) -> str: - return str((self.raw or {}).get("org") or "") - - @property - def asn(self) -> str: - # ipinfo "org" is often "AS12345 Some Org" - org = self.org - if org.startswith("AS"): - tok = org.split(" ", 1)[0] - return tok - return "" - - -@dataclass(frozen=True) -class ASNInfo: - asn: str = "" - name: str = "" - country: str = "" - raw: Optional[Dict[str, Any]] = None - - -@dataclass(frozen=True) -class VTInfo: - malicious: int = 0 - suspicious: int = 0 - total: int = 0 - reputation: Optional[int] = None - categories: Tuple[str, ...] = () - raw: Optional[Dict[str, Any]] = None - - @property - def verdict(self) -> str: - if self.total <= 0: - return "" - return f"{self.malicious}/{self.total}" - - @property - def ratio(self) -> Optional[float]: - if self.total <= 0: - return None - return self.malicious / self.total - - # User asked for vt.type; "category" is a better name, but keep alias. - @property - def type(self) -> str: # noqa: A003 - intentional API - return self.category - - @property - def category(self) -> str: - return self.categories[0] if self.categories else "" - - -def _read_vt_key() -> str: +def _read_key_from_file(path: str) -> str: try: - with open(os.path.expanduser("~/.virustotal_api_key")) as f: + with open(os.path.expanduser(path)) as f: return f.readline().strip() except Exception: return "" +def _ipinfo_token() -> str: + return str(_opt("tke_ipinfo_token", "") or os.getenv("IPINFO_TOKEN") or "") + + +def _ipapi_key() -> str: + return str(_opt("tke_ipapi_key", "") or os.getenv("IPAPI_KEY") or "") + + +def _vt_key() -> str: + return str( + _opt("tke_vt_api_key", "") + or os.getenv("VT_API_KEY") + or os.getenv("VIRUSTOTAL_API_KEY") + or _read_key_from_file("~/.virustotal_api_key") + or "" + ) + + +def _http_timeout() -> int: + try: + return int(_opt("tke_lookup_timeout", 10)) + except Exception: + return 10 + + +def _cache_ttl() -> int: + try: + return int(_opt("tke_lookup_cache_ttl", 60 * 60 * 24)) + except Exception: + return 60 * 60 * 24 + + +def _error_ttl() -> int: + try: + return int(_opt("tke_lookup_error_ttl", 5 * 60)) + except Exception: + return 5 * 60 + + +def _http_get_json(url: str, *, headers: Optional[Dict[str, str]] = None) -> Optional[Dict[str, Any]]: + try: + import requests # optional dep + + r = requests.get(url, headers=headers, timeout=_http_timeout()) + if not r.ok: + return None + return r.json() + except Exception: + return None + + @functools.lru_cache(maxsize=4096) def _ipinfo_raw(ip: str) -> Optional[Dict[str, Any]]: - def _do(): - try: - import requests # optional dep + token = _ipinfo_token() + tag = _auth_tag(token) + url = f"http://ipinfo.io/{ip}/json" + if token: + url = f"{url}?token={token}" - r = requests.get(f"http://ipinfo.io/{ip}/json", timeout=5) - return r.json() - except Exception: - return None + return _sqlite_getset( + f"ipinfo:{tag}:{ip}", + lambda: _http_get_json(url), + max_age=_cache_ttl(), + error_max_age=_error_ttl(), + ) - return _sqlite_getset(f"ipinfo:{ip}", _do, max_age=60 * 60 * 24) + +@functools.lru_cache(maxsize=4096) +def _ipwho_raw(ip: str) -> Optional[Dict[str, Any]]: + # Free geo+asn provider; no key. + url = f"https://ipwho.is/{ip}" + return _sqlite_getset( + f"ipwho:{ip}", + lambda: _http_get_json(url), + max_age=_cache_ttl(), + error_max_age=_error_ttl(), + ) + + +@functools.lru_cache(maxsize=4096) +def _ipapi_raw(ip: str) -> Optional[Dict[str, Any]]: + # Free tier works without key for many cases; key improves limits/features. + key = _ipapi_key() + tag = _auth_tag(key) + url = f"https://ipapi.co/{ip}/json/" + if key: + url = f"{url}?key={key}" + return _sqlite_getset( + f"ipapi:{tag}:{ip}", + lambda: _http_get_json(url), + max_age=_cache_ttl(), + error_max_age=_error_ttl(), + ) @functools.lru_cache(maxsize=4096) def _asn_info(ip: str) -> ASNInfo: - # Prefer ipinfo-derived ASN (no scraping deps). - raw = _ipinfo_raw(ip) or {} - org = str(raw.get("org") or "") - country = str(raw.get("country") or "") - asn = "" - name = "" - if org.startswith("AS"): - parts = org.split(" ", 1) - asn = parts[0] - name = parts[1] if len(parts) > 1 else "" - return ASNInfo(asn=asn, name=name, country=country, raw={"org": org, "country": country}) + # Prefer ipinfo-derived ASN; fall back to ipapi/ipwho (free). + ipinfo = _ipinfo_raw(ip) or {} + asn = parse_asn_ipinfo(ipinfo) + if asn.asn: + return asn + + ipapi = _ipapi_raw(ip) or {} + asn = parse_asn_ipapi(ipapi) + if asn.asn: + return asn + + ipwho = _ipwho_raw(ip) or {} + asn = parse_asn_ipwho(ipwho) + if asn.asn: + return asn + + return ASNInfo(raw=None, source="") @functools.lru_cache(maxsize=4096) def _vt_info(ip: str) -> VTInfo: - key = _read_vt_key() + key = _vt_key() if not key: return VTInfo() def _do() -> VTInfo: try: - import requests # optional dep - - r = requests.get( + data = _http_get_json( f"https://www.virustotal.com/api/v3/ip_addresses/{ip}", headers={"x-apikey": key}, - timeout=10, - ) - data = r.json() if r.ok else None - attrs = (((data or {}).get("data") or {}).get("attributes") or {}) - stats = attrs.get("last_analysis_stats") or {} - malicious = int(stats.get("malicious") or 0) - suspicious = int(stats.get("suspicious") or 0) - total = 0 - try: - total = int(sum(int(v or 0) for v in stats.values())) - except Exception: - total = 0 - - cats = attrs.get("categories") or {} - if isinstance(cats, dict): - # keep stable order - categories = tuple(str(v) for v in cats.values() if v) - elif isinstance(cats, list): - categories = tuple(str(v) for v in cats if v) - else: - categories = () - - rep = attrs.get("reputation") - try: - reputation = int(rep) if rep is not None else None - except Exception: - reputation = None - - return VTInfo( - malicious=malicious, - suspicious=suspicious, - total=total, - reputation=reputation, - categories=categories, - raw=data, ) + return parse_vt_ip(data) except Exception: return VTInfo() - return _sqlite_getset(f"vt_ip:{ip}", _do, max_age=60 * 60 * 24) + tag = _auth_tag(key) + return _sqlite_getset( + f"vt_ip:{tag}:{ip}", + _do, + max_age=_cache_ttl(), + error_max_age=_error_ttl(), + ) + + +def _maxmind_paths() -> Tuple[str, ...]: + p = str(_opt("tke_maxmind_mmdb_path", "") or os.getenv("MAXMIND_MMDB_PATH") or os.getenv("GEOIP_MMDB_PATH") or "") + if p: + return (os.path.expanduser(p),) + + # Default under VD_DIR (visidata_dir); users can drop mmdb there. + vd_dir = str(getattr(vd.options, "visidata_dir", "") or "") + candidates = [] + if vd_dir: + candidates.extend( + [ + os.path.join(vd_dir, "GeoLite2-City.mmdb"), + os.path.join(vd_dir, "GeoLite2-Country.mmdb"), + os.path.join(vd_dir, "GeoIP2-City.mmdb"), + os.path.join(vd_dir, "GeoIP2-Country.mmdb"), + ] + ) + # Common OS locations (best-effort). + candidates.extend( + [ + os.path.expanduser("~/.local/share/GeoIP/GeoLite2-City.mmdb"), + os.path.expanduser("~/.local/share/GeoIP/GeoLite2-Country.mmdb"), + ] + ) + return tuple(dict.fromkeys(candidates)) + + +@functools.lru_cache(maxsize=4) +def _maxmind_reader(path: str): + try: + import geoip2.database # optional dep + + return geoip2.database.Reader(path) + except Exception: + return None + + +@functools.lru_cache(maxsize=4096) +def _maxmind_geo(ip: str) -> GeoInfo: + # Uses local mmdb; no network required. Cache results in sqlite too. + paths = [p for p in _maxmind_paths() if p and os.path.exists(p)] + if not paths: + return GeoInfo(source="") + + path = paths[0] + try: + st = os.stat(path) + sig = f"{path}:{int(st.st_mtime)}:{int(st.st_size)}" + except Exception: + sig = path + + def _do() -> GeoInfo: + reader = _maxmind_reader(path) + if reader is None: + return GeoInfo(source="") + try: + # Try City first, fallback to Country models if the DB doesn't support City. + try: + rec = reader.city(ip) + except Exception: + rec = reader.country(ip) + + raw = getattr(rec, "raw", None) + return parse_geo_maxmind(raw if isinstance(raw, dict) else {}) + except Exception: + return GeoInfo(source="") + + return _sqlite_getset( + f"maxmind:{sig}:{ip}", + _do, + max_age=_cache_ttl(), + error_max_age=_error_ttl(), + ) or GeoInfo(source="") + + +@functools.lru_cache(maxsize=4096) +def _geo_info(ip: str) -> GeoInfo: + # Prefer offline maxmind; fall back to free HTTP services. + mm = _maxmind_geo(ip) + if mm.source: + return mm + + ipinfo = _ipinfo_raw(ip) or {} + if ipinfo: + return parse_geo_ipinfo(ipinfo) + + ipapi = _ipapi_raw(ip) or {} + if ipapi: + return parse_geo_ipapi(ipapi) + + ipwho = _ipwho_raw(ip) or {} + if ipwho and ipwho.get("success", True): + return parse_geo_ipwho(ipwho) + + return GeoInfo(source="") @functools.total_ordering @@ -332,7 +485,7 @@ class IPValue: def ipinfo(self) -> IPInfo: if not self.is_address: return IPInfo() - return IPInfo(_ipinfo_raw(str(self))) + return IPInfo(_ipinfo_raw(str(self)), source="ipinfo") @property def asn(self) -> ASNInfo: @@ -346,6 +499,19 @@ class IPValue: return VTInfo() return _vt_info(str(self)) + @property + def geo(self) -> GeoInfo: + if not self.is_address: + return GeoInfo() + return _geo_info(str(self)) + + @property + def maxmind(self) -> GeoInfo: + # Explicit offline lookup (may be empty if no mmdb available). + if not self.is_address: + return GeoInfo() + return _maxmind_geo(str(self)) + def ip(val: Any) -> Optional[IPValue]: """VisiData type converter: parse IPv4/IPv6 address or CIDR network.""" @@ -381,4 +547,3 @@ TableSheet.addCommand( "cursorCol.type=ip", "set type of current column to IP (IPv4/IPv6/CIDR)", ) - diff --git a/config/visidata/requirements.txt b/config/visidata/requirements.txt index 046a48b..104f321 100644 --- a/config/visidata/requirements.txt +++ b/config/visidata/requirements.txt @@ -16,3 +16,7 @@ dnspython # # Enrichment helpers mac-vendor-lookup + +# Optional offline MaxMind mmdb reader (GeoLite2/GeoIP2) +geoip2 +maxminddb diff --git a/config/visidata/scripts/validate_ip_lookups.py b/config/visidata/scripts/validate_ip_lookups.py new file mode 100644 index 0000000..4683992 --- /dev/null +++ b/config/visidata/scripts/validate_ip_lookups.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +""" +Offline validation for normalization/extraction logic in plugins/iptype.py. + +This intentionally does not perform live network calls (which may be blocked in CI/sandboxes). +""" + +from __future__ import annotations + +import os +import sys + +# Ensure repo root is on sys.path when running as a script. +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from plugins.iplib import ( # noqa: E402 + ASNInfo, + GeoInfo, + IPInfo, + JSONNode, + VTInfo, + parse_asn_ipapi, + parse_asn_ipinfo, + parse_asn_ipwho, + parse_geo_ipapi, + parse_geo_ipinfo, + parse_geo_ipwho, + parse_geo_maxmind, + parse_vt_ip, +) + + +def _assert(cond: bool, msg: str): + if not cond: + raise AssertionError(msg) + + +def main() -> int: + # JSONNode chaining behavior + n = JSONNode({"a": {"b": 1}, "x": None}) + _assert(str(n.a.b) == "1", "JSONNode dict nesting") + _assert(str(n.missing.anything) == "", "JSONNode missing keys should render empty") + _assert(str(n.x.y) == "", "JSONNode None chaining should render empty") + + # ipinfo basic + ipinfo_raw = { + "country": "US", + "region": "California", + "city": "Mountain View", + "org": "AS15169 Google LLC", + "loc": "37.4056,-122.0775", + } + ipi = IPInfo(ipinfo_raw, source="ipinfo") + _assert(ipi.country == "US", "ipinfo.country") + _assert(ipi.asn == "AS15169", "ipinfo.asn derived from org") + _assert(str(ipi.data.country) == "US", "ipinfo.data.country") + + # ASNInfo basics + asn = ASNInfo(asn="AS15169", name="Google LLC", country="US", raw={"org": "AS15169 Google LLC"}, source="ipinfo") + _assert(asn.asn == "AS15169", "asn.asn") + _assert(str(asn.data.org) == "AS15169 Google LLC", "asn.data.org") + + # VTInfo basics + vt = VTInfo(malicious=3, suspicious=1, total=94, categories=("search engine",), raw={"data": "x"}, source="virustotal") + _assert(vt.verdict == "3/94", "vt.verdict") + _assert(vt.category == "search engine", "vt.category") + _assert(vt.type == "search engine", "vt.type alias") + + # Parse helpers + _assert(parse_asn_ipinfo(ipinfo_raw).asn == "AS15169", "parse_asn_ipinfo") + ipapi_raw = {"asn": "AS123", "org": "Example ISP", "country_code": "DE", "country_name": "Germany"} + _assert(parse_asn_ipapi(ipapi_raw).asn == "AS123", "parse_asn_ipapi") + ipwho_raw = {"country_code": "NL", "connection": {"asn": 9009, "isp": "M247"}} + _assert(parse_asn_ipwho(ipwho_raw).asn == "AS9009", "parse_asn_ipwho") + + geo1 = parse_geo_ipinfo(ipinfo_raw) + _assert(geo1.country_code == "US", "parse_geo_ipinfo country_code") + _assert(geo1.lat is not None and geo1.lon is not None, "parse_geo_ipinfo loc") + geo2 = parse_geo_ipapi({"country_code": "DE", "country_name": "Germany", "latitude": 1, "longitude": 2}) + _assert(geo2.country_code == "DE", "parse_geo_ipapi country_code") + geo3 = parse_geo_ipwho({"country_code": "NL", "country": "Netherlands", "latitude": 1, "longitude": 2}) + _assert(geo3.country_code == "NL", "parse_geo_ipwho country_code") + + mm_raw = {"country": {"iso_code": "US", "names": {"en": "United States"}}, "location": {"latitude": 1, "longitude": 2}} + mm = parse_geo_maxmind(mm_raw) + _assert(mm.country_code == "US" and mm.country == "United States", "parse_geo_maxmind") + + vt_raw = { + "data": { + "attributes": { + "last_analysis_stats": {"malicious": 2, "suspicious": 1, "harmless": 10}, + "reputation": 5, + "categories": {"foo": "search engine"}, + } + } + } + vt2 = parse_vt_ip(vt_raw) + _assert(vt2.verdict == "2/13", "parse_vt_ip verdict") + + # GeoInfo basics + geo = GeoInfo(country="United States", country_code="US", region="California", city="Mountain View", lat=1.0, lon=2.0, raw={"country": {"iso_code": "US"}}, source="maxmind") + _assert(geo.country_code == "US", "geo.country_code") + _assert(str(geo.data.country.iso_code) == "US", "geo.data.country.iso_code") + + print("ok") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())