Files
gists/config/visidata/plugins/iplib.py
tobias 49db614262 visidata: enhance IOC plugins with improved lookups and validation
Expand iplib, iptype, and ioc plugins with better caching, throttling,
and lookup logic. Update validation script and showcase journal accordingly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 22:49:49 +01:00

567 lines
16 KiB
Python

"""
Pure-Python helpers for IP lookups.
This module intentionally does not import VisiData so it can be unit-tested with
any Python interpreter.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple
class JSONNode:
    """Recursive dict/list wrapper for attribute access in expressions.

    Missing keys/attrs return a ``JSONNode(None)`` which renders as an empty
    string, so chained lookups such as ``node.a.b.c`` never raise.

    Dunder names (``__x__``) and the internal ``_v`` slot are never resolved
    from the wrapped value: looking them up raises ``AttributeError`` like on
    any ordinary object. Without that, protocol probes such as
    ``hasattr(obj, "__setstate__")`` (performed by ``copy`` and ``pickle`` on
    a not-yet-initialised instance) would recurse forever through
    ``__getattr__``.
    """

    __slots__ = ("_v",)

    def __init__(self, v: Any):
        self._v = v

    def _wrap(self, v: Any) -> "JSONNode":
        """Wrap *v* in a new node."""
        return JSONNode(v)

    def __getattr__(self, name: str) -> "JSONNode":
        """Return the child stored under key *name*, or an empty node.

        Raises:
            AttributeError: for dunder names and for the ``_v`` slot itself.
        """
        # Only reached when normal lookup failed. If ``_v`` is what is
        # missing, the instance is half-built and touching ``self._v`` below
        # would re-enter this method endlessly.
        if name == "_v" or (name.startswith("__") and name.endswith("__")):
            raise AttributeError(name)
        if isinstance(self._v, dict):
            return self._wrap(self._v.get(name))
        return self._wrap(None)

    def __getitem__(self, key: Any) -> "JSONNode":
        """Index into a wrapped dict (by key) or list (by non-negative int)."""
        try:
            if isinstance(self._v, dict):
                return self._wrap(self._v.get(key))
            if isinstance(self._v, list) and isinstance(key, int):
                # Out-of-range and negative indexes yield an empty node.
                return self._wrap(self._v[key] if 0 <= key < len(self._v) else None)
        except Exception:
            # Best effort by design (e.g. an unhashable key): fall through to
            # the empty node instead of breaking the calling expression.
            pass
        return self._wrap(None)

    def get(self, key: Any, default: Any = None) -> "JSONNode":
        """Dict-style lookup; *default* is wrapped when the key is absent
        or the wrapped value is not a dict."""
        if isinstance(self._v, dict):
            return self._wrap(self._v.get(key, default))
        return self._wrap(default)

    @property
    def raw(self) -> Any:
        """The wrapped value, unwrapped."""
        return self._v

    def __bool__(self) -> bool:
        return bool(self._v)

    def __str__(self) -> str:
        return "" if self._v is None else str(self._v)

    def __repr__(self) -> str:
        return f"JSONNode({self._v!r})"
@dataclass(frozen=True)
class IPInfo:
    """Immutable view over the raw dict returned by an IP lookup.

    The well-known keys ``country``, ``region``, ``city`` and ``org`` are
    exposed as string properties (empty string when absent or falsy). Any
    other top-level key of ``raw`` is reachable as a plain attribute.
    """

    raw: Optional[Dict[str, Any]] = None
    source: str = ""

    @property
    def data(self) -> "JSONNode":
        """``raw`` wrapped for chained, never-raising attribute access."""
        return JSONNode(self.raw)

    @property
    def country(self) -> str:
        return str((self.raw or {}).get("country") or "")

    @property
    def region(self) -> str:
        return str((self.raw or {}).get("region") or "")

    @property
    def city(self) -> str:
        return str((self.raw or {}).get("city") or "")

    @property
    def org(self) -> str:
        return str((self.raw or {}).get("org") or "")

    @property
    def asn(self) -> str:
        """AS number taken from the first word of ``org`` (e.g. ``"AS15169"``).

        Returns an empty string unless that word is ``"AS"`` followed by
        digits only, so an organisation name that merely starts with "AS"
        (``"ASUSTEK Computer Inc."``) is not mistaken for an AS number.
        """
        token = self.org.split(" ", 1)[0]
        digits = token[2:]
        if token.startswith("AS") and digits.isascii() and digits.isdigit():
            return token
        return ""

    def __getattr__(self, name: str) -> Any:
        """Fall back to top-level keys of ``raw`` for unknown attributes.

        Raises:
            AttributeError: if ``raw`` is not a dict or lacks *name*.
        """
        if isinstance(self.raw, dict) and name in self.raw:
            return self.raw.get(name)
        raise AttributeError(name)

    def __call__(self) -> "IPInfo":
        # Allow `ip.ipinfo()` in VisiData expressions.
        return self
@dataclass(frozen=True)
class ASNInfo:
    """Immutable record describing an autonomous system.

    ``raw`` keeps the provider response the record was built from; its
    top-level keys are reachable as attributes when no field of the same
    name exists.
    """

    asn: str = ""
    name: str = ""
    country: str = ""
    raw: Optional[Dict[str, Any]] = None
    source: str = ""

    @property
    def data(self) -> "JSONNode":
        """``raw`` wrapped in a JSONNode for chained access."""
        return JSONNode(self.raw)

    def __getattr__(self, name: str) -> Any:
        """Resolve unknown attributes from ``raw``; AttributeError otherwise."""
        payload = self.raw
        if not isinstance(payload, dict) or name not in payload:
            raise AttributeError(name)
        return payload.get(name)

    def __call__(self) -> "ASNInfo":
        # Calling the record yields the record itself.
        return self
@dataclass(frozen=True)
class VTInfo:
    """Immutable summary of one analysis report plus the raw payload.

    The scalar fields hold pre-parsed values; ``raw`` keeps the full response
    whose ``data.attributes`` section (and, after that, its top-level keys)
    back the attribute fallback in ``__getattr__``.
    """

    object_type: str = ""
    # Per-verdict engine counts and their sum.
    malicious: int = 0
    suspicious: int = 0
    harmless: int = 0
    undetected: int = 0
    timeout: int = 0
    failure: int = 0
    confirmed_timeout: int = 0
    type_unsupported: int = 0
    total: int = 0
    # Reputation and votes.
    reputation: Optional[int] = None
    votes_harmless: int = 0
    votes_malicious: int = 0
    score: Optional[float] = None
    # Labels.
    categories: Tuple[str, ...] = ()
    tags: Tuple[str, ...] = ()
    names: Tuple[str, ...] = ()
    name: str = ""
    # Network context.
    asn: str = ""
    as_owner: str = ""
    continent: str = ""
    country: str = ""
    network: str = ""
    ip: str = ""
    ips: Tuple[str, ...] = ()
    # Timestamps as stored in the report.
    last_analysis_date: Optional[int] = None
    last_modification_date: Optional[int] = None
    raw: Optional[Dict[str, Any]] = None
    source: str = ""

    @property
    def data(self) -> "JSONNode":
        """The whole ``raw`` payload as a JSONNode."""
        return JSONNode(self.raw)

    @property
    def attrs(self) -> "JSONNode":
        """``raw["data"]["attributes"]`` as a JSONNode (empty dict if absent)."""
        payload = self.raw or {}
        envelope = payload.get("data") or {}
        return JSONNode(envelope.get("attributes") or {})

    @property
    def results(self) -> "JSONNode":
        """The ``last_analysis_results`` attribute, defaulting to ``{}``."""
        return self.attrs.get("last_analysis_results", {})

    @property
    def stats(self) -> "JSONNode":
        """The ``last_analysis_stats`` attribute, defaulting to ``{}``."""
        return self.attrs.get("last_analysis_stats", {})

    @property
    def verdict(self) -> str:
        """``"malicious/total"``, or ``""`` when nothing was counted."""
        if self.total <= 0:
            return ""
        return f"{self.malicious}/{self.total}"

    @property
    def ratio(self) -> Optional[float]:
        """Malicious share of ``total``; ``None`` when ``total`` is not positive."""
        if self.total <= 0:
            return None
        return self.malicious / self.total

    @property
    def confidence(self) -> Optional[float]:
        """Alias of ``score``."""
        return self.score

    @property
    def type(self) -> str:  # noqa: A003
        """Alias of ``category``."""
        return self.category

    @property
    def category(self) -> str:
        """First entry of ``categories``, or ``""`` when there is none."""
        if self.categories:
            return self.categories[0]
        return ""

    def __getattr__(self, name: str) -> Any:
        """Resolve unknown attributes from the report attributes, then from
        the top level of ``raw``; raise AttributeError when neither has it."""
        payload = self.raw
        attributes = ((payload or {}).get("data") or {}).get("attributes") or {}
        # Report attributes take precedence over top-level keys.
        for scope in (attributes, payload):
            if isinstance(scope, dict) and name in scope:
                return scope.get(name)
        raise AttributeError(name)

    def __call__(self) -> "VTInfo":
        # Calling the record yields the record itself.
        return self
@dataclass(frozen=True)
class GeoInfo:
    """Immutable geolocation record.

    ``raw`` keeps the provider response the record was built from; its
    top-level keys are reachable as attributes when no field of the same
    name exists.
    """

    country: str = ""
    country_code: str = ""
    region: str = ""
    city: str = ""
    lat: Optional[float] = None
    lon: Optional[float] = None
    timezone: str = ""
    postal: str = ""
    raw: Optional[Dict[str, Any]] = None
    source: str = ""

    @property
    def data(self) -> "JSONNode":
        """``raw`` wrapped in a JSONNode for chained access."""
        return JSONNode(self.raw)

    def __getattr__(self, name: str) -> Any:
        """Resolve unknown attributes from ``raw``; AttributeError otherwise."""
        payload = self.raw
        if not isinstance(payload, dict) or name not in payload:
            raise AttributeError(name)
        return payload.get(name)

    def __call__(self) -> "GeoInfo":
        # Calling the record yields the record itself.
        return self
def _to_float(v: Any) -> Optional[float]:
    """Convert *v* to ``float``; return ``None`` for ``None`` or on any failure."""
    if v is None:
        return None
    try:
        return float(v)
    except Exception:
        # Unparseable or unsupported input is reported as "no value".
        return None
def parse_asn_ipinfo(raw: Optional[Dict[str, Any]]) -> ASNInfo:
    """Build an ASNInfo from an ipinfo response.

    The ``org`` value is expected to look like ``"AS15169 Google LLC"``: the
    first word becomes ``asn`` and the remainder ``name``.

    Args:
        raw: Response dict; ``None`` is treated as an empty dict.

    Returns:
        A populated ASNInfo when ``org`` starts with ``"AS"`` followed by
        digits only; otherwise an ASNInfo carrying just ``raw`` and
        ``source``. An organisation name that merely begins with "AS"
        (e.g. ``"ASUSTEK Computer Inc."``) is therefore not reported as an
        AS number.
    """
    raw = raw or {}
    org = str(raw.get("org") or "")
    asn, _, name = org.partition(" ")
    number = asn[2:]
    if asn.startswith("AS") and number.isascii() and number.isdigit():
        cc = str(raw.get("country") or "")
        return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipinfo")
    return ASNInfo(raw=raw, source="ipinfo")
def parse_asn_ipapi(raw: Optional[Dict[str, Any]]) -> ASNInfo:
    """Build an ASNInfo from an ipapi response.

    Reads ``asn``, ``org`` (or ``organisation``) and ``country_code`` (or
    ``country``). Without an ``asn`` value the result carries only ``raw``
    and ``source``.
    """
    raw = raw or {}
    number = str(raw.get("asn") or "")
    if not number:
        return ASNInfo(raw=raw, source="ipapi")

    # ipapi typically already uses "AS123", but normalize if needed.
    if not number.startswith("AS"):
        number = f"AS{number}"

    owner = str(raw.get("org") or raw.get("organisation") or "")
    country = str(raw.get("country_code") or raw.get("country") or "")
    return ASNInfo(asn=number, name=owner, country=country, raw=raw, source="ipapi")
def parse_asn_ipwho(raw: Optional[Dict[str, Any]]) -> ASNInfo:
    """Build an ASNInfo from an ipwho response.

    The AS number and ISP name are read from the nested ``connection`` dict,
    the country from top-level ``country_code``. If ``connection`` is not a
    dict or has no ``asn``, the result carries only ``raw`` and ``source``.
    """
    raw = raw or {}
    empty = ASNInfo(raw=raw, source="ipwho")

    connection = raw.get("connection")
    if not isinstance(connection, dict):
        return empty

    number = str(connection.get("asn") or "")
    if not number:
        return empty

    # Ensure the "AS" prefix is present.
    if not number.startswith("AS"):
        number = f"AS{number}"
    return ASNInfo(
        asn=number,
        name=str(connection.get("isp") or ""),
        country=str(raw.get("country_code") or ""),
        raw=raw,
        source="ipwho",
    )
def parse_geo_ipinfo(raw: Optional[Dict[str, Any]]) -> GeoInfo:
    """Build a GeoInfo from an ipinfo response.

    ``loc`` is split at its first comma into latitude and longitude; without
    a comma both stay ``None``.
    """
    raw = raw or {}

    lat_text, comma, lon_text = str(raw.get("loc") or "").partition(",")
    lat = _to_float(lat_text) if comma else None
    lon = _to_float(lon_text) if comma else None

    # ipinfo returns 2-letter country code; keep as-is for both fields.
    country_code = str(raw.get("country") or "")

    return GeoInfo(
        country=country_code,
        country_code=country_code,
        region=str(raw.get("region") or ""),
        city=str(raw.get("city") or ""),
        lat=lat,
        lon=lon,
        timezone=str(raw.get("timezone") or ""),
        postal=str(raw.get("postal") or ""),
        raw=raw,
        source="ipinfo",
    )
def parse_geo_ipapi(raw: Optional[Dict[str, Any]]) -> GeoInfo:
    """Build a GeoInfo from an ipapi response.

    Text fields default to ``""``; ``country_code`` falls back to the
    ``country`` key; coordinates come from ``latitude`` / ``longitude``.
    """
    raw = raw or {}

    def text(*keys: str) -> str:
        # First truthy value among *keys*, stringified; "" when none is set.
        for key in keys:
            value = raw.get(key)
            if value:
                return str(value)
        return ""

    return GeoInfo(
        country=text("country_name"),
        country_code=text("country_code", "country"),
        region=text("region"),
        city=text("city"),
        lat=_to_float(raw.get("latitude")),
        lon=_to_float(raw.get("longitude")),
        timezone=text("timezone"),
        postal=text("postal"),
        raw=raw,
        source="ipapi",
    )
def parse_geo_ipwho(raw: Optional[Dict[str, Any]]) -> GeoInfo:
    """Build a GeoInfo from an ipwho response.

    Text fields are copied from the identically named top-level keys
    (``""`` when absent); coordinates come from ``latitude`` / ``longitude``.
    """
    raw = raw or {}

    # Every text field maps 1:1 onto a response key of the same name.
    text_fields = {
        key: str(raw.get(key) or "")
        for key in ("country", "country_code", "region", "city", "timezone", "postal")
    }
    return GeoInfo(
        lat=_to_float(raw.get("latitude")),
        lon=_to_float(raw.get("longitude")),
        raw=raw,
        source="ipwho",
        **text_fields,
    )
def parse_geo_maxmind(raw: Optional[Dict[str, Any]]) -> GeoInfo:
    """Build a GeoInfo from a MaxMind-style nested record.

    Reads ``country.iso_code``, the English (``names.en``) country, city and
    first-subdivision names, ``location.latitude`` / ``longitude`` /
    ``time_zone`` and ``postal.code``.

    Any nested section that is missing or is not a dict is treated as empty,
    so a malformed record yields empty fields instead of raising
    ``AttributeError``.

    Args:
        raw: Record dict; ``None`` is treated as an empty dict.
    """
    raw = raw or {}

    def section(container: Dict[str, Any], key: str) -> Dict[str, Any]:
        # Nested dict stored under *key*, or {} for anything that isn't one.
        value = container.get(key)
        return value if isinstance(value, dict) else {}

    def english_name(record: Dict[str, Any]) -> str:
        return str(section(record, "names").get("en") or "")

    country_rec = section(raw, "country")
    location = section(raw, "location")

    # Only the first subdivision is used as the region.
    subs = raw.get("subdivisions")
    first_sub = subs[0] if isinstance(subs, list) and subs else None
    region = english_name(first_sub) if isinstance(first_sub, dict) else ""

    return GeoInfo(
        country=english_name(country_rec),
        country_code=str(country_rec.get("iso_code") or ""),
        region=region,
        city=english_name(section(raw, "city")),
        lat=_to_float(location.get("latitude")),
        lon=_to_float(location.get("longitude")),
        timezone=str(location.get("time_zone") or ""),
        postal=str(section(raw, "postal").get("code") or ""),
        raw=raw,
        source="maxmind",
    )
def parse_vt_ip(data: Optional[Dict[str, Any]]) -> VTInfo:
    """Parse *data* as a report whose ``object_type`` is ``"ip"``."""
    kind = "ip"
    return _parse_vt(data, object_type=kind)
def parse_vt_domain(data: Optional[Dict[str, Any]]) -> VTInfo:
    """Parse *data* as a report whose ``object_type`` is ``"domain"``."""
    kind = "domain"
    return _parse_vt(data, object_type=kind)
def parse_vt_url(data: Optional[Dict[str, Any]]) -> VTInfo:
    """Parse *data* as a report whose ``object_type`` is ``"url"``."""
    kind = "url"
    return _parse_vt(data, object_type=kind)
def parse_vt_file(data: Optional[Dict[str, Any]]) -> VTInfo:
    """Parse *data* as a report whose ``object_type`` is ``"file"``."""
    kind = "file"
    return _parse_vt(data, object_type=kind)
def _safe_int(v: Any, default: Optional[int] = 0) -> Optional[int]:
    """Return ``int(v)``, or *default* when the conversion fails for any reason."""
    try:
        converted = int(v)
    except Exception:
        return default
    return converted
def _uniq(items: Iterable[str]) -> Tuple[str, ...]:
    """Return the distinct, stripped, non-empty strings of *items*.

    Each item is stringified (falsy items count as empty) and stripped; the
    first occurrence of every value is kept, in input order.
    """
    cleaned = (str(item or "").strip() for item in items)
    # dict keys preserve insertion order, which gives ordered de-duplication.
    return tuple(dict.fromkeys(text for text in cleaned if text))
def _extract_categories(attrs: Dict[str, Any]) -> Tuple[str, ...]:
    """Return the distinct category labels found under ``attrs["categories"]``.

    A dict contributes its values, a list its items; any other type yields
    an empty tuple. Falsy entries are skipped.
    """
    found = attrs.get("categories") or {}
    if isinstance(found, dict):
        candidates = found.values()
    elif isinstance(found, list):
        candidates = found
    else:
        return ()
    return _uniq(str(label) for label in candidates if label)
def _extract_names(attrs: Dict[str, Any], object_type: str) -> Tuple[str, ...]:
    """Collect the distinct threat/malware names mentioned in *attrs*.

    Sources, in output order:

    1. ``threat_names`` (a list or a single string);
    2. ``popular_threat_classification.popular_threat_name`` values, sorted
       by ``(count, value)`` descending, followed by
       ``suggested_threat_label``;
    3. each sandbox verdict's ``malware_names`` then
       ``malware_classification``;
    4. ``meaningful_name``, only when *object_type* is ``"file"``.
    """
    collected: List[str] = []

    # 1. Explicit threat names.
    threat = attrs.get("threat_names")
    if isinstance(threat, str):
        if threat:
            collected.append(threat)
    elif isinstance(threat, list):
        collected.extend(str(item) for item in threat if item)

    # 2. Popular threat classification.
    classification = attrs.get("popular_threat_classification") or {}
    if isinstance(classification, dict):
        popular = classification.get("popular_threat_name")
        if isinstance(popular, list):
            ranked = []
            for entry in popular:
                if not isinstance(entry, dict):
                    continue
                label = str(entry.get("value") or "").strip()
                if label:
                    ranked.append((_safe_int(entry.get("count"), 0) or 0, label))
            # Highest count first; equal counts fall back to the label, descending.
            collected.extend(label for _, label in sorted(ranked, reverse=True))
        suggestion = str(classification.get("suggested_threat_label") or "").strip()
        if suggestion:
            collected.append(suggestion)

    # 3. Sandbox verdicts: names first, then classifications, per verdict.
    sandboxes = attrs.get("sandbox_verdicts") or {}
    if isinstance(sandboxes, dict):
        for verdict in sandboxes.values():
            if not isinstance(verdict, dict):
                continue
            for field in ("malware_names", "malware_classification"):
                entries = verdict.get(field)
                if isinstance(entries, list):
                    collected.extend(str(item) for item in entries if item)

    # 4. File reports also contribute their meaningful name.
    if object_type == "file":
        meaningful = str(attrs.get("meaningful_name") or "").strip()
        if meaningful:
            collected.append(meaningful)

    return _uniq(collected)
def _extract_ips(attrs: Dict[str, Any]) -> Tuple[str, ...]:
    """Collect the distinct IP addresses referenced by *attrs*.

    Takes the ``value`` of every ``A`` / ``AAAA`` entry in
    ``last_dns_records``, then ``last_serving_ip_address`` (either a plain
    string or a dict with ``id`` / ``value``).
    """
    found: List[str] = []

    dns_records = attrs.get("last_dns_records")
    for record in dns_records if isinstance(dns_records, list) else ():
        if not isinstance(record, dict):
            continue
        # Only address records carry an IP in their value.
        if str(record.get("type") or "").upper() in ("A", "AAAA"):
            address = str(record.get("value") or "").strip()
            if address:
                found.append(address)

    serving = attrs.get("last_serving_ip_address")
    if isinstance(serving, str):
        if serving:
            found.append(serving)
    elif isinstance(serving, dict):
        address = str(serving.get("id") or serving.get("value") or "").strip()
        if address:
            found.append(address)

    return _uniq(found)
def _parse_vt(data: Optional[Dict[str, Any]], *, object_type: str) -> VTInfo:
    """Turn a raw report payload into a VTInfo.

    Everything is read from ``data["data"]["attributes"]``. Parsing is best
    effort: a section that is missing or has an unexpected type is treated as
    empty instead of raising, and numeric fields that cannot be converted
    fall back to ``0`` (or ``None`` for ``reputation`` and the two dates).

    Args:
        data: Decoded response; ``None`` is treated as an empty dict.
        object_type: Stored on the result and forwarded to the name
            extraction (file reports also contribute ``meaningful_name``).

    Returns:
        A VTInfo with ``raw`` set to *data* and ``source`` set to
        ``"virustotal"``.
    """
    data = data or {}

    def as_dict(value: Any) -> Dict[str, Any]:
        # Anything that is not a dict counts as an empty section.
        return value if isinstance(value, dict) else {}

    attrs = as_dict(as_dict(as_dict(data).get("data")).get("attributes"))
    stats = as_dict(attrs.get("last_analysis_stats"))

    def stat(key: str) -> int:
        return _safe_int(stats.get(key), 0) or 0

    # Sum over every key so verdict kinds without a dedicated field count too.
    total = sum(_safe_int(v, 0) or 0 for v in stats.values())

    # A lone string is one tag, not a sequence of characters.
    raw_tags = attrs.get("tags")
    if isinstance(raw_tags, str):
        raw_tags = [raw_tags]
    elif not isinstance(raw_tags, (list, tuple)):
        raw_tags = []
    tags = _uniq(str(x) for x in raw_tags if x)

    votes = as_dict(attrs.get("total_votes"))

    # Normalise the AS number to an "AS"-prefixed string.
    asn = ""
    asn_raw = attrs.get("asn")
    if asn_raw is not None:
        sval = str(asn_raw).strip()
        if sval:
            asn = sval if sval.upper().startswith("AS") else f"AS{sval}"

    # Primary address: the first one containing a dot, else the first one at all.
    ips = _extract_ips(attrs)
    ip = next((x for x in ips if "." in x), ips[0] if ips else "")

    malicious = stat("malicious")
    names = _extract_names(attrs, object_type=object_type)

    return VTInfo(
        object_type=object_type,
        malicious=malicious,
        suspicious=stat("suspicious"),
        harmless=stat("harmless"),
        undetected=stat("undetected"),
        timeout=stat("timeout"),
        failure=stat("failure"),
        confirmed_timeout=stat("confirmed-timeout"),
        type_unsupported=stat("type-unsupported"),
        total=total,
        reputation=_safe_int(attrs.get("reputation"), None),
        votes_harmless=_safe_int(votes.get("harmless"), 0) or 0,
        votes_malicious=_safe_int(votes.get("malicious"), 0) or 0,
        score=None if total <= 0 else malicious / total,
        categories=_extract_categories(attrs),
        tags=tags,
        names=names,
        name=names[0] if names else "",
        asn=asn,
        as_owner=str(attrs.get("as_owner") or ""),
        continent=str(attrs.get("continent") or ""),
        country=str(attrs.get("country") or ""),
        network=str(attrs.get("network") or ""),
        ip=ip,
        ips=ips,
        last_analysis_date=_safe_int(attrs.get("last_analysis_date"), None),
        last_modification_date=_safe_int(attrs.get("last_modification_date"), None),
        raw=data,
        source="virustotal",
    )