""" Pure-Python helpers for IP lookups. This module intentionally does not import VisiData so it can be unit-tested with any Python interpreter. """ from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, Optional, Tuple class JSONNode: """Recursive dict/list wrapper for attribute access in expressions. Missing keys/attrs return a JSONNode(None) which renders as empty string. """ __slots__ = ("_v",) def __init__(self, v: Any): self._v = v def _wrap(self, v: Any) -> "JSONNode": return JSONNode(v) def __getattr__(self, name: str) -> "JSONNode": if isinstance(self._v, dict): return self._wrap(self._v.get(name)) return self._wrap(None) def __getitem__(self, key: Any) -> "JSONNode": try: if isinstance(self._v, dict): return self._wrap(self._v.get(key)) if isinstance(self._v, list) and isinstance(key, int): return self._wrap(self._v[key] if 0 <= key < len(self._v) else None) except Exception: pass return self._wrap(None) def get(self, key: Any, default: Any = None) -> "JSONNode": if isinstance(self._v, dict): return self._wrap(self._v.get(key, default)) return self._wrap(default) @property def raw(self) -> Any: return self._v def __bool__(self) -> bool: return bool(self._v) def __str__(self) -> str: return "" if self._v is None else str(self._v) def __repr__(self) -> str: return f"JSONNode({self._v!r})" @dataclass(frozen=True) class IPInfo: raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) @property def country(self) -> str: return str((self.raw or {}).get("country") or "") @property def region(self) -> str: return str((self.raw or {}).get("region") or "") @property def city(self) -> str: return str((self.raw or {}).get("city") or "") @property def org(self) -> str: return str((self.raw or {}).get("org") or "") @property def asn(self) -> str: org = self.org if org.startswith("AS"): return org.split(" ", 1)[0] return "" def __getattr__(self, name: str) -> Any: if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) @dataclass(frozen=True) class ASNInfo: asn: str = "" name: str = "" country: str = "" raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) def __getattr__(self, name: str) -> Any: if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) @dataclass(frozen=True) class VTInfo: malicious: int = 0 suspicious: int = 0 total: int = 0 reputation: Optional[int] = None categories: Tuple[str, ...] = () raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) @property def verdict(self) -> str: return "" if self.total <= 0 else f"{self.malicious}/{self.total}" @property def ratio(self) -> Optional[float]: return None if self.total <= 0 else self.malicious / self.total @property def type(self) -> str: # noqa: A003 return self.category @property def category(self) -> str: return self.categories[0] if self.categories else "" def __getattr__(self, name: str) -> Any: if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) @dataclass(frozen=True) class GeoInfo: country: str = "" country_code: str = "" region: str = "" city: str = "" lat: Optional[float] = None lon: Optional[float] = None timezone: str = "" postal: str = "" raw: Optional[Dict[str, Any]] = None source: str = "" @property def data(self) -> JSONNode: return JSONNode(self.raw) def __getattr__(self, name: str) -> Any: if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) def _to_float(v: Any) -> Optional[float]: try: return float(v) if v is not None else None except Exception: return None def parse_asn_ipinfo(raw: Optional[Dict[str, Any]]) -> ASNInfo: raw = raw or {} org = str(raw.get("org") or "") cc = str(raw.get("country") or "") if org.startswith("AS"): parts = org.split(" ", 1) asn = parts[0] name = parts[1] if len(parts) > 1 else "" return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipinfo") return ASNInfo(raw=raw, source="ipinfo") def parse_asn_ipapi(raw: Optional[Dict[str, Any]]) -> ASNInfo: raw = raw or {} asn = str(raw.get("asn") or "") name = str(raw.get("org") or raw.get("organisation") or "") cc = str(raw.get("country_code") or raw.get("country") or "") if asn: if not asn.startswith("AS"): # ipapi typically already uses "AS123", but normalize if needed. asn = f"AS{asn}" return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipapi") return ASNInfo(raw=raw, source="ipapi") def parse_asn_ipwho(raw: Optional[Dict[str, Any]]) -> ASNInfo: raw = raw or {} conn = raw.get("connection") if isinstance(conn, dict): asn = str(conn.get("asn") or "") name = str(conn.get("isp") or "") cc = str(raw.get("country_code") or "") if asn: if not asn.startswith("AS"): asn = f"AS{asn}" return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipwho") return ASNInfo(raw=raw, source="ipwho") def parse_geo_ipinfo(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} loc = str(raw.get("loc") or "") lat = lon = None if "," in loc: a, b = loc.split(",", 1) lat = _to_float(a) lon = _to_float(b) cc = str(raw.get("country") or "") return GeoInfo( country=cc, # ipinfo returns 2-letter country code; keep as-is country_code=cc, region=str(raw.get("region") or ""), city=str(raw.get("city") or ""), lat=lat, lon=lon, timezone=str(raw.get("timezone") or ""), postal=str(raw.get("postal") or ""), raw=raw, source="ipinfo", ) def parse_geo_ipapi(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} return GeoInfo( country=str(raw.get("country_name") or ""), country_code=str(raw.get("country_code") or raw.get("country") or ""), region=str(raw.get("region") or ""), city=str(raw.get("city") or ""), lat=_to_float(raw.get("latitude")), lon=_to_float(raw.get("longitude")), timezone=str(raw.get("timezone") or ""), postal=str(raw.get("postal") or ""), raw=raw, source="ipapi", ) def parse_geo_ipwho(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} return GeoInfo( country=str(raw.get("country") or ""), country_code=str(raw.get("country_code") or ""), region=str(raw.get("region") or ""), city=str(raw.get("city") or ""), lat=_to_float(raw.get("latitude")), lon=_to_float(raw.get("longitude")), timezone=str(raw.get("timezone") or ""), postal=str(raw.get("postal") or ""), raw=raw, source="ipwho", ) def parse_geo_maxmind(raw: Optional[Dict[str, Any]]) -> GeoInfo: raw = raw or {} cc = str(((raw.get("country") or {}).get("iso_code")) or "") country = str(((raw.get("country") or {}).get("names") or {}).get("en") or "") city = str(((raw.get("city") or {}).get("names") or {}).get("en") or "") region = "" subs = raw.get("subdivisions") or [] if isinstance(subs, list) and subs: region = str(((subs[0] or {}).get("names") or {}).get("en") or "") loc = raw.get("location") or {} lat = _to_float(loc.get("latitude")) lon = _to_float(loc.get("longitude")) tz = str(loc.get("time_zone") or "") postal = str(((raw.get("postal") or {}).get("code")) or "") return GeoInfo( country=country, country_code=cc, region=region, city=city, lat=lat, lon=lon, timezone=tz, postal=postal, raw=raw, source="maxmind", ) def parse_vt_ip(data: Optional[Dict[str, Any]]) -> VTInfo: data = data or {} attrs = (((data.get("data") or {}).get("attributes") or {}) if isinstance(data, dict) else {}) stats = attrs.get("last_analysis_stats") or {} try: malicious = int(stats.get("malicious") or 0) except Exception: malicious = 0 try: suspicious = int(stats.get("suspicious") or 0) except Exception: suspicious = 0 try: total = int(sum(int(v or 0) for v in stats.values())) except Exception: total = 0 cats = attrs.get("categories") or {} if isinstance(cats, dict): categories = tuple(str(v) for v in cats.values() if v) elif isinstance(cats, list): categories = tuple(str(v) for v in cats if v) else: categories = () rep = attrs.get("reputation") try: reputation = int(rep) if rep is not None else None except Exception: reputation = None return VTInfo( malicious=malicious, suspicious=suspicious, total=total, reputation=reputation, categories=categories, raw=data, source="virustotal", )