""" Pure-Python helpers for IP lookups. This module intentionally does not import VisiData so it can be unit-tested with any Python interpreter. """ from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional, Tuple class JSONNode: """Recursive dict/list wrapper for attribute access in expressions. Missing keys/attrs return a JSONNode(None) which renders as empty string. """ __slots__ = ("_v",) def __init__(self, v: Any): self._v = v def _wrap(self, v: Any) -> "JSONNode": return JSONNode(v) def __getattr__(self, name: str) -> "JSONNode": if isinstance(self._v, dict): return self._wrap(self._v.get(name)) return self._wrap(None) def __getitem__(self, key: Any) -> "JSONNode": try: if isinstance(self._v, dict): return self._wrap(self._v.get(key)) if isinstance(self._v, list) and isinstance(key, int): return self._wrap(self._v[key] if 0 <= key < len(self._v) else None) except Exception: pass return self._wrap(None) def get(self, key: Any, default: Any = None) -> "JSONNode": if isinstance(self._v, dict): return self._wrap(self._v.get(key, default)) return self._wrap(default) @property def raw(self) -> Any: return self._v def __bool__(self) -> bool: return bool(self._v) def __str__(self) -> str: return "" if self._v is None else str(self._v) def __repr__(self) -> str: return f"JSONNode({self._v!r})" @dataclass(frozen=True) class IPInfo: raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) @property def country(self) -> str: return str((self.raw or {}).get("country") or "") @property def region(self) -> str: return str((self.raw or {}).get("region") or "") @property def city(self) -> str: return str((self.raw or {}).get("city") or "") @property def org(self) -> str: return str((self.raw or {}).get("org") or "") @property def asn(self) -> str: org = self.org if org.startswith("AS"): return org.split(" ", 1)[0] return "" def __getattr__(self, name: str) -> Any: if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) def __call__(self) -> "IPInfo": # Allow `ip.ipinfo()` in VisiData expressions. return self @dataclass(frozen=True) class ASNInfo: asn: str = "" name: str = "" country: str = "" raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) def __getattr__(self, name: str) -> Any: if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) def __call__(self) -> "ASNInfo": return self @dataclass(frozen=True) class VTInfo: object_type: str = "" malicious: int = 0 suspicious: int = 0 harmless: int = 0 undetected: int = 0 timeout: int = 0 failure: int = 0 confirmed_timeout: int = 0 type_unsupported: int = 0 total: int = 0 reputation: Optional[int] = None votes_harmless: int = 0 votes_malicious: int = 0 score: Optional[float] = None categories: Tuple[str, ...] = () tags: Tuple[str, ...] = () names: Tuple[str, ...] = () name: str = "" asn: str = "" as_owner: str = "" continent: str = "" country: str = "" network: str = "" ip: str = "" ips: Tuple[str, ...] = () last_analysis_date: Optional[int] = None last_modification_date: Optional[int] = None raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) @property def attrs(self) -> JSONNode: attrs = ((self.raw or {}).get("data") or {}).get("attributes") or {} return JSONNode(attrs) @property def results(self) -> JSONNode: return self.attrs.get("last_analysis_results", {}) @property def stats(self) -> JSONNode: return self.attrs.get("last_analysis_stats", {}) @property def verdict(self) -> str: return "" if self.total <= 0 else f"{self.malicious}/{self.total}" @property def ratio(self) -> Optional[float]: return None if self.total <= 0 else self.malicious / self.total @property def confidence(self) -> Optional[float]: return self.score @property def type(self) -> str: # noqa: A003 return self.category @property def category(self) -> str: return self.categories[0] if self.categories else "" def __getattr__(self, name: str) -> Any: attrs = ((self.raw or {}).get("data") or {}).get("attributes") or {} if isinstance(attrs, dict) and name in attrs: return attrs.get(name) if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) def __call__(self) -> "VTInfo": return self @dataclass(frozen=True) class GeoInfo: country: str = "" country_code: str = "" region: str = "" city: str = "" lat: Optional[float] = None lon: Optional[float] = None timezone: str = "" postal: str = "" raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) def __getattr__(self, name: str) -> Any: if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) def __call__(self) -> "GeoInfo": return self def _to_float(v: Any) -> Optional[float]: try: return float(v) if v is not None else None except Exception: return None def parse_asn_ipinfo(raw: Optional[Dict[str, Any]]) -> ASNInfo: raw = raw or {} org = str(raw.get("org") or "") cc = str(raw.get("country") or "") if org.startswith("AS"): parts = org.split(" ", 1) asn = parts[0] name = parts[1] if len(parts) > 1 else "" return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipinfo") return ASNInfo(raw=raw, source="ipinfo") def parse_asn_ipapi(raw: Optional[Dict[str, Any]]) -> ASNInfo: raw = raw or {} asn = str(raw.get("asn") or "") name = str(raw.get("org") or raw.get("organisation") or "") cc = str(raw.get("country_code") or raw.get("country") or "") if asn: if not asn.startswith("AS"): # ipapi typically already uses "AS123", but normalize if needed. asn = f"AS{asn}" return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipapi") return ASNInfo(raw=raw, source="ipapi") def parse_asn_ipwho(raw: Optional[Dict[str, Any]]) -> ASNInfo: raw = raw or {} conn = raw.get("connection") if isinstance(conn, dict): asn = str(conn.get("asn") or "") name = str(conn.get("isp") or "") cc = str(raw.get("country_code") or "") if asn: if not asn.startswith("AS"): asn = f"AS{asn}" return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipwho") return ASNInfo(raw=raw, source="ipwho") def parse_geo_ipinfo(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} loc = str(raw.get("loc") or "") lat = lon = None if "," in loc: a, b = loc.split(",", 1) lat = _to_float(a) lon = _to_float(b) cc = str(raw.get("country") or "") return GeoInfo( country=cc, # ipinfo returns 2-letter country code; keep as-is country_code=cc, region=str(raw.get("region") or ""), city=str(raw.get("city") or ""), lat=lat, lon=lon, timezone=str(raw.get("timezone") or ""), postal=str(raw.get("postal") or ""), raw=raw, source="ipinfo", ) def parse_geo_ipapi(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} return GeoInfo( country=str(raw.get("country_name") or ""), country_code=str(raw.get("country_code") or raw.get("country") or ""), region=str(raw.get("region") or ""), city=str(raw.get("city") or ""), lat=_to_float(raw.get("latitude")), lon=_to_float(raw.get("longitude")), timezone=str(raw.get("timezone") or ""), postal=str(raw.get("postal") or ""), raw=raw, source="ipapi", ) def parse_geo_ipwho(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} return GeoInfo( country=str(raw.get("country") or ""), country_code=str(raw.get("country_code") or ""), region=str(raw.get("region") or ""), city=str(raw.get("city") or ""), lat=_to_float(raw.get("latitude")), lon=_to_float(raw.get("longitude")), timezone=str(raw.get("timezone") or ""), postal=str(raw.get("postal") or ""), raw=raw, source="ipwho", ) def parse_geo_maxmind(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} cc = str(((raw.get("country") or {}).get("iso_code")) or "") country = str(((raw.get("country") or {}).get("names") or {}).get("en") or "") city = str(((raw.get("city") or {}).get("names") or {}).get("en") or "") region = "" subs = raw.get("subdivisions") or [] if isinstance(subs, list) and subs: region = str(((subs[0] or {}).get("names") or {}).get("en") or "") loc = raw.get("location") or {} lat = _to_float(loc.get("latitude")) lon = _to_float(loc.get("longitude")) tz = str(loc.get("time_zone") or "") postal = str(((raw.get("postal") or {}).get("code")) or "") return GeoInfo( country=country, country_code=cc, region=region, city=city, lat=lat, lon=lon, timezone=tz, postal=postal, raw=raw, source="maxmind", ) def parse_vt_ip(data: Optional[Dict[str, Any]]) -> VTInfo: return _parse_vt(data, object_type="ip") def parse_vt_domain(data: Optional[Dict[str, Any]]) -> VTInfo: return _parse_vt(data, object_type="domain") def parse_vt_url(data: Optional[Dict[str, Any]]) -> VTInfo: return _parse_vt(data, object_type="url") def parse_vt_file(data: Optional[Dict[str, Any]]) -> VTInfo: return _parse_vt(data, object_type="file") def _safe_int(v: Any, default: Optional[int] = 0) -> Optional[int]: try: return int(v) except Exception: return default def _uniq(items: Iterable[str]) -> Tuple[str, ...]: out: List[str] = [] seen = set() for x in items: s = str(x or "").strip() if not s or s in seen: continue seen.add(s) out.append(s) return tuple(out) def _extract_categories(attrs: Dict[str, Any]) -> Tuple[str, ...]: cats = attrs.get("categories") or {} if isinstance(cats, dict): return _uniq(str(v) for v in cats.values() if v) if isinstance(cats, list): return _uniq(str(v) for v in cats if v) return () def _extract_names(attrs: Dict[str, Any], object_type: str) -> Tuple[str, ...]: names: List[str] = [] threat_names = attrs.get("threat_names") if isinstance(threat_names, list): names.extend(str(x) for x in threat_names if x) elif isinstance(threat_names, str) and threat_names: names.append(threat_names) ptc = attrs.get("popular_threat_classification") or {} if isinstance(ptc, dict): rows = ptc.get("popular_threat_name") if isinstance(rows, list): scored = [] for row in rows: if not isinstance(row, dict): continue val = str(row.get("value") or "").strip() if not val: continue scored.append((_safe_int(row.get("count"), 0) or 0, val)) scored.sort(reverse=True) names.extend(v for _, v in scored) suggested = str(ptc.get("suggested_threat_label") or "").strip() if suggested: names.append(suggested) verdicts = attrs.get("sandbox_verdicts") or {} if isinstance(verdicts, dict): for v in verdicts.values(): if not isinstance(v, dict): continue mn = v.get("malware_names") if isinstance(mn, list): names.extend(str(x) for x in mn if x) mc = v.get("malware_classification") if isinstance(mc, list): names.extend(str(x) for x in mc if x) if object_type == "file": meaningful = str(attrs.get("meaningful_name") or "").strip() if meaningful: names.append(meaningful) return _uniq(names) def _extract_ips(attrs: Dict[str, Any]) -> Tuple[str, ...]: ips: List[str] = [] records = attrs.get("last_dns_records") if isinstance(records, list): for rec in records: if not isinstance(rec, dict): continue rtype = str(rec.get("type") or "").upper() if rtype not in ("A", "AAAA"): continue val = str(rec.get("value") or "").strip() if val: ips.append(val) lserv = attrs.get("last_serving_ip_address") if isinstance(lserv, dict): val = str(lserv.get("id") or lserv.get("value") or "").strip() if val: ips.append(val) elif isinstance(lserv, str) and lserv: ips.append(lserv) return _uniq(ips) def _parse_vt(data: Optional[Dict[str, Any]], *, object_type: str) -> VTInfo: data = data or {} attrs = ( ((data.get("data") or {}).get("attributes") or {}) if isinstance(data, dict) else {} ) stats = attrs.get("last_analysis_stats") or {} malicious = _safe_int(stats.get("malicious"), 0) or 0 suspicious = _safe_int(stats.get("suspicious"), 0) or 0 harmless = _safe_int(stats.get("harmless"), 0) or 0 undetected = _safe_int(stats.get("undetected"), 0) or 0 timeout = _safe_int(stats.get("timeout"), 0) or 0 failure = _safe_int(stats.get("failure"), 0) or 0 confirmed_timeout = _safe_int(stats.get("confirmed-timeout"), 0) or 0 type_unsupported = _safe_int(stats.get("type-unsupported"), 0) or 0 try: total = int(sum(_safe_int(v, 0) or 0 for v in stats.values())) except Exception: total = 0 categories = _extract_categories(attrs) names = _extract_names(attrs, object_type=object_type) tags = _uniq(str(x) for x in (attrs.get("tags") or []) if x) ips = _extract_ips(attrs) rep = _safe_int(attrs.get("reputation"), None) votes = attrs.get("total_votes") or {} votes_harmless = 0 votes_malicious = 0 if isinstance(votes, dict): votes_harmless = _safe_int(votes.get("harmless"), 0) or 0 votes_malicious = _safe_int(votes.get("malicious"), 0) or 0 asn_raw = attrs.get("asn") asn = "" if asn_raw is not None: sval = str(asn_raw).strip() if sval: asn = sval if sval.upper().startswith("AS") else f"AS{sval}" ip = "" for x in ips: if "." in x: ip = x break if not ip and ips: ip = ips[0] score = None if total <= 0 else malicious / total name = names[0] if names else "" la = _safe_int(attrs.get("last_analysis_date"), None) lm = _safe_int(attrs.get("last_modification_date"), None) return VTInfo( object_type=object_type, malicious=malicious, suspicious=suspicious, harmless=harmless, undetected=undetected, timeout=timeout, failure=failure, confirmed_timeout=confirmed_timeout, type_unsupported=type_unsupported, total=total, reputation=rep, votes_harmless=votes_harmless, votes_malicious=votes_malicious, score=score, categories=categories, tags=tags, names=names, name=name, asn=asn, as_owner=str(attrs.get("as_owner") or ""), continent=str(attrs.get("continent") or ""), country=str(attrs.get("country") or ""), network=str(attrs.get("network") or ""), ip=ip, ips=ips, last_analysis_date=la, last_modification_date=lm, raw=data, source="virustotal", )