visidata: improve IP lookups (cached, keys, maxmind)

This commit is contained in:
tobias
2026-02-15 18:00:14 +01:00
parent 15934ec4a1
commit 8d40bcc3f9
4 changed files with 765 additions and 141 deletions

View File

@@ -0,0 +1,345 @@
"""
Pure-Python helpers for IP lookups.
This module intentionally does not import VisiData so it can be unit-tested with
any Python interpreter.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
class JSONNode:
"""Recursive dict/list wrapper for attribute access in expressions.
Missing keys/attrs return a JSONNode(None) which renders as empty string.
"""
__slots__ = ("_v",)
def __init__(self, v: Any):
self._v = v
def _wrap(self, v: Any) -> "JSONNode":
return JSONNode(v)
def __getattr__(self, name: str) -> "JSONNode":
if isinstance(self._v, dict):
return self._wrap(self._v.get(name))
return self._wrap(None)
def __getitem__(self, key: Any) -> "JSONNode":
try:
if isinstance(self._v, dict):
return self._wrap(self._v.get(key))
if isinstance(self._v, list) and isinstance(key, int):
return self._wrap(self._v[key] if 0 <= key < len(self._v) else None)
except Exception:
pass
return self._wrap(None)
def get(self, key: Any, default: Any = None) -> "JSONNode":
if isinstance(self._v, dict):
return self._wrap(self._v.get(key, default))
return self._wrap(default)
@property
def raw(self) -> Any:
return self._v
def __bool__(self) -> bool:
return bool(self._v)
def __str__(self) -> str:
return "" if self._v is None else str(self._v)
def __repr__(self) -> str:
return f"JSONNode({self._v!r})"
@dataclass(frozen=True)
class IPInfo:
raw: Optional[Dict[str, Any]] = None
source: str = ""
@property
def data(self) -> JSONNode:
return JSONNode(self.raw)
@property
def country(self) -> str:
return str((self.raw or {}).get("country") or "")
@property
def region(self) -> str:
return str((self.raw or {}).get("region") or "")
@property
def city(self) -> str:
return str((self.raw or {}).get("city") or "")
@property
def org(self) -> str:
return str((self.raw or {}).get("org") or "")
@property
def asn(self) -> str:
org = self.org
if org.startswith("AS"):
return org.split(" ", 1)[0]
return ""
def __getattr__(self, name: str) -> Any:
if isinstance(self.raw, dict) and name in self.raw:
return self.raw.get(name)
raise AttributeError(name)
@dataclass(frozen=True)
class ASNInfo:
asn: str = ""
name: str = ""
country: str = ""
raw: Optional[Dict[str, Any]] = None
source: str = ""
@property
def data(self) -> JSONNode:
return JSONNode(self.raw)
def __getattr__(self, name: str) -> Any:
if isinstance(self.raw, dict) and name in self.raw:
return self.raw.get(name)
raise AttributeError(name)
@dataclass(frozen=True)
class VTInfo:
malicious: int = 0
suspicious: int = 0
total: int = 0
reputation: Optional[int] = None
categories: Tuple[str, ...] = ()
raw: Optional[Dict[str, Any]] = None
source: str = ""
@property
def data(self) -> JSONNode:
return JSONNode(self.raw)
@property
def verdict(self) -> str:
return "" if self.total <= 0 else f"{self.malicious}/{self.total}"
@property
def ratio(self) -> Optional[float]:
return None if self.total <= 0 else self.malicious / self.total
@property
def type(self) -> str: # noqa: A003
return self.category
@property
def category(self) -> str:
return self.categories[0] if self.categories else ""
def __getattr__(self, name: str) -> Any:
if isinstance(self.raw, dict) and name in self.raw:
return self.raw.get(name)
raise AttributeError(name)
@dataclass(frozen=True)
class GeoInfo:
country: str = ""
country_code: str = ""
region: str = ""
city: str = ""
lat: Optional[float] = None
lon: Optional[float] = None
timezone: str = ""
postal: str = ""
raw: Optional[Dict[str, Any]] = None
source: str = ""
@property
def data(self) -> JSONNode:
return JSONNode(self.raw)
def __getattr__(self, name: str) -> Any:
if isinstance(self.raw, dict) and name in self.raw:
return self.raw.get(name)
raise AttributeError(name)
def _to_float(v: Any) -> Optional[float]:
try:
return float(v) if v is not None else None
except Exception:
return None
def parse_asn_ipinfo(raw: Optional[Dict[str, Any]]) -> ASNInfo:
raw = raw or {}
org = str(raw.get("org") or "")
cc = str(raw.get("country") or "")
if org.startswith("AS"):
parts = org.split(" ", 1)
asn = parts[0]
name = parts[1] if len(parts) > 1 else ""
return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipinfo")
return ASNInfo(raw=raw, source="ipinfo")
def parse_asn_ipapi(raw: Optional[Dict[str, Any]]) -> ASNInfo:
raw = raw or {}
asn = str(raw.get("asn") or "")
name = str(raw.get("org") or raw.get("organisation") or "")
cc = str(raw.get("country_code") or raw.get("country") or "")
if asn:
if not asn.startswith("AS"):
# ipapi typically already uses "AS123", but normalize if needed.
asn = f"AS{asn}"
return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipapi")
return ASNInfo(raw=raw, source="ipapi")
def parse_asn_ipwho(raw: Optional[Dict[str, Any]]) -> ASNInfo:
raw = raw or {}
conn = raw.get("connection")
if isinstance(conn, dict):
asn = str(conn.get("asn") or "")
name = str(conn.get("isp") or "")
cc = str(raw.get("country_code") or "")
if asn:
if not asn.startswith("AS"):
asn = f"AS{asn}"
return ASNInfo(asn=asn, name=name, country=cc, raw=raw, source="ipwho")
return ASNInfo(raw=raw, source="ipwho")
def parse_geo_ipinfo(raw: Optional[Dict[str, Any]]) -> GeoInfo:
raw = raw or {}
loc = str(raw.get("loc") or "")
lat = lon = None
if "," in loc:
a, b = loc.split(",", 1)
lat = _to_float(a)
lon = _to_float(b)
cc = str(raw.get("country") or "")
return GeoInfo(
country=cc, # ipinfo returns 2-letter country code; keep as-is
country_code=cc,
region=str(raw.get("region") or ""),
city=str(raw.get("city") or ""),
lat=lat,
lon=lon,
timezone=str(raw.get("timezone") or ""),
postal=str(raw.get("postal") or ""),
raw=raw,
source="ipinfo",
)
def parse_geo_ipapi(raw: Optional[Dict[str, Any]]) -> GeoInfo:
raw = raw or {}
return GeoInfo(
country=str(raw.get("country_name") or ""),
country_code=str(raw.get("country_code") or raw.get("country") or ""),
region=str(raw.get("region") or ""),
city=str(raw.get("city") or ""),
lat=_to_float(raw.get("latitude")),
lon=_to_float(raw.get("longitude")),
timezone=str(raw.get("timezone") or ""),
postal=str(raw.get("postal") or ""),
raw=raw,
source="ipapi",
)
def parse_geo_ipwho(raw: Optional[Dict[str, Any]]) -> GeoInfo:
raw = raw or {}
return GeoInfo(
country=str(raw.get("country") or ""),
country_code=str(raw.get("country_code") or ""),
region=str(raw.get("region") or ""),
city=str(raw.get("city") or ""),
lat=_to_float(raw.get("latitude")),
lon=_to_float(raw.get("longitude")),
timezone=str(raw.get("timezone") or ""),
postal=str(raw.get("postal") or ""),
raw=raw,
source="ipwho",
)
def parse_geo_maxmind(raw: Optional[Dict[str, Any]]) -> GeoInfo:
raw = raw or {}
cc = str(((raw.get("country") or {}).get("iso_code")) or "")
country = str(((raw.get("country") or {}).get("names") or {}).get("en") or "")
city = str(((raw.get("city") or {}).get("names") or {}).get("en") or "")
region = ""
subs = raw.get("subdivisions") or []
if isinstance(subs, list) and subs:
region = str(((subs[0] or {}).get("names") or {}).get("en") or "")
loc = raw.get("location") or {}
lat = _to_float(loc.get("latitude"))
lon = _to_float(loc.get("longitude"))
tz = str(loc.get("time_zone") or "")
postal = str(((raw.get("postal") or {}).get("code")) or "")
return GeoInfo(
country=country,
country_code=cc,
region=region,
city=city,
lat=lat,
lon=lon,
timezone=tz,
postal=postal,
raw=raw,
source="maxmind",
)
def parse_vt_ip(data: Optional[Dict[str, Any]]) -> VTInfo:
data = data or {}
attrs = (((data.get("data") or {}).get("attributes") or {}) if isinstance(data, dict) else {})
stats = attrs.get("last_analysis_stats") or {}
try:
malicious = int(stats.get("malicious") or 0)
except Exception:
malicious = 0
try:
suspicious = int(stats.get("suspicious") or 0)
except Exception:
suspicious = 0
try:
total = int(sum(int(v or 0) for v in stats.values()))
except Exception:
total = 0
cats = attrs.get("categories") or {}
if isinstance(cats, dict):
categories = tuple(str(v) for v in cats.values() if v)
elif isinstance(cats, list):
categories = tuple(str(v) for v in cats if v)
else:
categories = ()
rep = attrs.get("reputation")
try:
reputation = int(rep) if rep is not None else None
except Exception:
reputation = None
return VTInfo(
malicious=malicious,
suspicious=suspicious,
total=total,
reputation=reputation,
categories=categories,
raw=data,
source="virustotal",
)

View File

@@ -15,8 +15,8 @@ Network calls are optional; when deps/keys are missing, properties return empty
from __future__ import annotations
from dataclasses import dataclass
import functools
import hashlib
import os
import pickle
import sqlite3
@@ -28,15 +28,78 @@ import ipaddress
from visidata import vd
from visidata.sheets import TableSheet
_CACHE_PATH = os.path.expanduser("~/.visidata_cache.db")
from .iplib import (
ASNInfo,
GeoInfo,
IPInfo,
JSONNode,
VTInfo,
parse_asn_ipapi,
parse_asn_ipinfo,
parse_asn_ipwho,
parse_geo_ipapi,
parse_geo_ipinfo,
parse_geo_ipwho,
parse_geo_maxmind,
parse_vt_ip,
)
def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None):
"""Tiny sqlite+pickle cache. Falls back to computing if db can't be used."""
vd.option(
"tke_cache_db_path",
os.path.expanduser("~/.visidata_cache.db"),
"sqlite cache db path for local lookups (pickle-serialized)",
sheettype=None,
)
vd.option("tke_lookup_cache_ttl", 60 * 60 * 24, "lookup cache ttl in seconds", sheettype=None)
vd.option(
"tke_lookup_error_ttl",
5 * 60,
"cache ttl in seconds for failed lookups (to avoid tight loops)",
sheettype=None,
)
vd.option("tke_lookup_timeout", 10, "HTTP lookup timeout in seconds", sheettype=None)
vd.option("tke_ipinfo_token", "", "ipinfo token (optional)", sheettype=None)
vd.option("tke_ipapi_key", "", "ipapi.co API key (optional)", sheettype=None)
vd.option("tke_vt_api_key", "", "VirusTotal API key (required for VT lookups)", sheettype=None)
vd.option(
"tke_maxmind_mmdb_path",
"",
"path to GeoLite2/GeoIP2 .mmdb file for offline MaxMind lookups",
sheettype=None,
)
def _opt(name: str, default: Any = "") -> Any:
try:
os.makedirs(os.path.dirname(_CACHE_PATH), exist_ok=True)
with sqlite3.connect(_CACHE_PATH) as conn:
return getattr(vd.options, name)
except Exception:
return default
def _cache_path() -> str:
p = str(_opt("tke_cache_db_path", "") or os.path.expanduser("~/.visidata_cache.db"))
return os.path.expanduser(p)
def _auth_tag(secret: str) -> str:
if not secret:
return "noauth"
return hashlib.sha256(secret.encode("utf-8")).hexdigest()[:12]
def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None, error_max_age: Optional[int] = None):
"""Tiny sqlite+pickle cache. Falls back to computing if db can't be used.
`key` should NOT contain secrets; include `_auth_tag()` instead.
"""
try:
path = _cache_path()
os.makedirs(os.path.dirname(path), exist_ok=True)
with sqlite3.connect(path, timeout=2) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute(
"CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)"
)
@@ -46,8 +109,13 @@ def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None):
now = int(time.time())
if row:
val_blob, ts = row
if max_age is None or now - int(ts) <= max_age:
return pickle.loads(val_blob)
cached_val = pickle.loads(val_blob)
age = now - int(ts)
ttl = max_age
if cached_val is None and error_max_age is not None:
ttl = error_max_age
if ttl is None or age <= int(ttl):
return cached_val
val = fn()
cur.execute(
"INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)",
@@ -63,165 +131,250 @@ def _is_nullish(v: Any) -> bool:
return v is None or v == "" or v == "null"
@dataclass(frozen=True)
class IPInfo:
raw: Optional[Dict[str, Any]] = None
@property
def country(self) -> str:
return str((self.raw or {}).get("country") or "")
@property
def region(self) -> str:
return str((self.raw or {}).get("region") or "")
@property
def city(self) -> str:
return str((self.raw or {}).get("city") or "")
@property
def org(self) -> str:
return str((self.raw or {}).get("org") or "")
@property
def asn(self) -> str:
# ipinfo "org" is often "AS12345 Some Org"
org = self.org
if org.startswith("AS"):
tok = org.split(" ", 1)[0]
return tok
return ""
@dataclass(frozen=True)
class ASNInfo:
asn: str = ""
name: str = ""
country: str = ""
raw: Optional[Dict[str, Any]] = None
@dataclass(frozen=True)
class VTInfo:
malicious: int = 0
suspicious: int = 0
total: int = 0
reputation: Optional[int] = None
categories: Tuple[str, ...] = ()
raw: Optional[Dict[str, Any]] = None
@property
def verdict(self) -> str:
if self.total <= 0:
return ""
return f"{self.malicious}/{self.total}"
@property
def ratio(self) -> Optional[float]:
if self.total <= 0:
return None
return self.malicious / self.total
# User asked for vt.type; "category" is a better name, but keep alias.
@property
def type(self) -> str: # noqa: A003 - intentional API
return self.category
@property
def category(self) -> str:
return self.categories[0] if self.categories else ""
def _read_vt_key() -> str:
def _read_key_from_file(path: str) -> str:
try:
with open(os.path.expanduser("~/.virustotal_api_key")) as f:
with open(os.path.expanduser(path)) as f:
return f.readline().strip()
except Exception:
return ""
def _ipinfo_token() -> str:
return str(_opt("tke_ipinfo_token", "") or os.getenv("IPINFO_TOKEN") or "")
def _ipapi_key() -> str:
return str(_opt("tke_ipapi_key", "") or os.getenv("IPAPI_KEY") or "")
def _vt_key() -> str:
return str(
_opt("tke_vt_api_key", "")
or os.getenv("VT_API_KEY")
or os.getenv("VIRUSTOTAL_API_KEY")
or _read_key_from_file("~/.virustotal_api_key")
or ""
)
def _http_timeout() -> int:
try:
return int(_opt("tke_lookup_timeout", 10))
except Exception:
return 10
def _cache_ttl() -> int:
try:
return int(_opt("tke_lookup_cache_ttl", 60 * 60 * 24))
except Exception:
return 60 * 60 * 24
def _error_ttl() -> int:
try:
return int(_opt("tke_lookup_error_ttl", 5 * 60))
except Exception:
return 5 * 60
def _http_get_json(url: str, *, headers: Optional[Dict[str, str]] = None) -> Optional[Dict[str, Any]]:
try:
import requests # optional dep
r = requests.get(url, headers=headers, timeout=_http_timeout())
if not r.ok:
return None
return r.json()
except Exception:
return None
@functools.lru_cache(maxsize=4096)
def _ipinfo_raw(ip: str) -> Optional[Dict[str, Any]]:
def _do():
try:
import requests # optional dep
token = _ipinfo_token()
tag = _auth_tag(token)
url = f"http://ipinfo.io/{ip}/json"
if token:
url = f"{url}?token={token}"
r = requests.get(f"http://ipinfo.io/{ip}/json", timeout=5)
return r.json()
except Exception:
return None
return _sqlite_getset(
f"ipinfo:{tag}:{ip}",
lambda: _http_get_json(url),
max_age=_cache_ttl(),
error_max_age=_error_ttl(),
)
return _sqlite_getset(f"ipinfo:{ip}", _do, max_age=60 * 60 * 24)
@functools.lru_cache(maxsize=4096)
def _ipwho_raw(ip: str) -> Optional[Dict[str, Any]]:
# Free geo+asn provider; no key.
url = f"https://ipwho.is/{ip}"
return _sqlite_getset(
f"ipwho:{ip}",
lambda: _http_get_json(url),
max_age=_cache_ttl(),
error_max_age=_error_ttl(),
)
@functools.lru_cache(maxsize=4096)
def _ipapi_raw(ip: str) -> Optional[Dict[str, Any]]:
# Free tier works without key for many cases; key improves limits/features.
key = _ipapi_key()
tag = _auth_tag(key)
url = f"https://ipapi.co/{ip}/json/"
if key:
url = f"{url}?key={key}"
return _sqlite_getset(
f"ipapi:{tag}:{ip}",
lambda: _http_get_json(url),
max_age=_cache_ttl(),
error_max_age=_error_ttl(),
)
@functools.lru_cache(maxsize=4096)
def _asn_info(ip: str) -> ASNInfo:
# Prefer ipinfo-derived ASN (no scraping deps).
raw = _ipinfo_raw(ip) or {}
org = str(raw.get("org") or "")
country = str(raw.get("country") or "")
asn = ""
name = ""
if org.startswith("AS"):
parts = org.split(" ", 1)
asn = parts[0]
name = parts[1] if len(parts) > 1 else ""
return ASNInfo(asn=asn, name=name, country=country, raw={"org": org, "country": country})
# Prefer ipinfo-derived ASN; fall back to ipapi/ipwho (free).
ipinfo = _ipinfo_raw(ip) or {}
asn = parse_asn_ipinfo(ipinfo)
if asn.asn:
return asn
ipapi = _ipapi_raw(ip) or {}
asn = parse_asn_ipapi(ipapi)
if asn.asn:
return asn
ipwho = _ipwho_raw(ip) or {}
asn = parse_asn_ipwho(ipwho)
if asn.asn:
return asn
return ASNInfo(raw=None, source="")
@functools.lru_cache(maxsize=4096)
def _vt_info(ip: str) -> VTInfo:
key = _read_vt_key()
key = _vt_key()
if not key:
return VTInfo()
def _do() -> VTInfo:
try:
import requests # optional dep
r = requests.get(
data = _http_get_json(
f"https://www.virustotal.com/api/v3/ip_addresses/{ip}",
headers={"x-apikey": key},
timeout=10,
)
data = r.json() if r.ok else None
attrs = (((data or {}).get("data") or {}).get("attributes") or {})
stats = attrs.get("last_analysis_stats") or {}
malicious = int(stats.get("malicious") or 0)
suspicious = int(stats.get("suspicious") or 0)
total = 0
try:
total = int(sum(int(v or 0) for v in stats.values()))
except Exception:
total = 0
cats = attrs.get("categories") or {}
if isinstance(cats, dict):
# keep stable order
categories = tuple(str(v) for v in cats.values() if v)
elif isinstance(cats, list):
categories = tuple(str(v) for v in cats if v)
else:
categories = ()
rep = attrs.get("reputation")
try:
reputation = int(rep) if rep is not None else None
except Exception:
reputation = None
return VTInfo(
malicious=malicious,
suspicious=suspicious,
total=total,
reputation=reputation,
categories=categories,
raw=data,
)
return parse_vt_ip(data)
except Exception:
return VTInfo()
return _sqlite_getset(f"vt_ip:{ip}", _do, max_age=60 * 60 * 24)
tag = _auth_tag(key)
return _sqlite_getset(
f"vt_ip:{tag}:{ip}",
_do,
max_age=_cache_ttl(),
error_max_age=_error_ttl(),
)
def _maxmind_paths() -> Tuple[str, ...]:
p = str(_opt("tke_maxmind_mmdb_path", "") or os.getenv("MAXMIND_MMDB_PATH") or os.getenv("GEOIP_MMDB_PATH") or "")
if p:
return (os.path.expanduser(p),)
# Default under VD_DIR (visidata_dir); users can drop mmdb there.
vd_dir = str(getattr(vd.options, "visidata_dir", "") or "")
candidates = []
if vd_dir:
candidates.extend(
[
os.path.join(vd_dir, "GeoLite2-City.mmdb"),
os.path.join(vd_dir, "GeoLite2-Country.mmdb"),
os.path.join(vd_dir, "GeoIP2-City.mmdb"),
os.path.join(vd_dir, "GeoIP2-Country.mmdb"),
]
)
# Common OS locations (best-effort).
candidates.extend(
[
os.path.expanduser("~/.local/share/GeoIP/GeoLite2-City.mmdb"),
os.path.expanduser("~/.local/share/GeoIP/GeoLite2-Country.mmdb"),
]
)
return tuple(dict.fromkeys(candidates))
@functools.lru_cache(maxsize=4)
def _maxmind_reader(path: str):
try:
import geoip2.database # optional dep
return geoip2.database.Reader(path)
except Exception:
return None
@functools.lru_cache(maxsize=4096)
def _maxmind_geo(ip: str) -> GeoInfo:
# Uses local mmdb; no network required. Cache results in sqlite too.
paths = [p for p in _maxmind_paths() if p and os.path.exists(p)]
if not paths:
return GeoInfo(source="")
path = paths[0]
try:
st = os.stat(path)
sig = f"{path}:{int(st.st_mtime)}:{int(st.st_size)}"
except Exception:
sig = path
def _do() -> GeoInfo:
reader = _maxmind_reader(path)
if reader is None:
return GeoInfo(source="")
try:
# Try City first, fallback to Country models if the DB doesn't support City.
try:
rec = reader.city(ip)
except Exception:
rec = reader.country(ip)
raw = getattr(rec, "raw", None)
return parse_geo_maxmind(raw if isinstance(raw, dict) else {})
except Exception:
return GeoInfo(source="")
return _sqlite_getset(
f"maxmind:{sig}:{ip}",
_do,
max_age=_cache_ttl(),
error_max_age=_error_ttl(),
) or GeoInfo(source="")
@functools.lru_cache(maxsize=4096)
def _geo_info(ip: str) -> GeoInfo:
# Prefer offline maxmind; fall back to free HTTP services.
mm = _maxmind_geo(ip)
if mm.source:
return mm
ipinfo = _ipinfo_raw(ip) or {}
if ipinfo:
return parse_geo_ipinfo(ipinfo)
ipapi = _ipapi_raw(ip) or {}
if ipapi:
return parse_geo_ipapi(ipapi)
ipwho = _ipwho_raw(ip) or {}
if ipwho and ipwho.get("success", True):
return parse_geo_ipwho(ipwho)
return GeoInfo(source="")
@functools.total_ordering
@@ -332,7 +485,7 @@ class IPValue:
def ipinfo(self) -> IPInfo:
if not self.is_address:
return IPInfo()
return IPInfo(_ipinfo_raw(str(self)))
return IPInfo(_ipinfo_raw(str(self)), source="ipinfo")
@property
def asn(self) -> ASNInfo:
@@ -346,6 +499,19 @@ class IPValue:
return VTInfo()
return _vt_info(str(self))
@property
def geo(self) -> GeoInfo:
if not self.is_address:
return GeoInfo()
return _geo_info(str(self))
@property
def maxmind(self) -> GeoInfo:
# Explicit offline lookup (may be empty if no mmdb available).
if not self.is_address:
return GeoInfo()
return _maxmind_geo(str(self))
def ip(val: Any) -> Optional[IPValue]:
"""VisiData type converter: parse IPv4/IPv6 address or CIDR network."""
@@ -381,4 +547,3 @@ TableSheet.addCommand(
"cursorCol.type=ip",
"set type of current column to IP (IPv4/IPv6/CIDR)",
)

View File

@@ -16,3 +16,7 @@ dnspython
#
# Enrichment helpers
mac-vendor-lookup
# Optional offline MaxMind mmdb reader (GeoLite2/GeoIP2)
geoip2
maxminddb

View File

@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""
Offline validation for normalization/extraction logic in plugins/iptype.py.
This intentionally does not perform live network calls (which may be blocked in CI/sandboxes).
"""
from __future__ import annotations
import os
import sys
# Ensure repo root is on sys.path when running as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from plugins.iplib import ( # noqa: E402
ASNInfo,
GeoInfo,
IPInfo,
JSONNode,
VTInfo,
parse_asn_ipapi,
parse_asn_ipinfo,
parse_asn_ipwho,
parse_geo_ipapi,
parse_geo_ipinfo,
parse_geo_ipwho,
parse_geo_maxmind,
parse_vt_ip,
)
def _assert(cond: bool, msg: str):
if not cond:
raise AssertionError(msg)
def main() -> int:
# JSONNode chaining behavior
n = JSONNode({"a": {"b": 1}, "x": None})
_assert(str(n.a.b) == "1", "JSONNode dict nesting")
_assert(str(n.missing.anything) == "", "JSONNode missing keys should render empty")
_assert(str(n.x.y) == "", "JSONNode None chaining should render empty")
# ipinfo basic
ipinfo_raw = {
"country": "US",
"region": "California",
"city": "Mountain View",
"org": "AS15169 Google LLC",
"loc": "37.4056,-122.0775",
}
ipi = IPInfo(ipinfo_raw, source="ipinfo")
_assert(ipi.country == "US", "ipinfo.country")
_assert(ipi.asn == "AS15169", "ipinfo.asn derived from org")
_assert(str(ipi.data.country) == "US", "ipinfo.data.country")
# ASNInfo basics
asn = ASNInfo(asn="AS15169", name="Google LLC", country="US", raw={"org": "AS15169 Google LLC"}, source="ipinfo")
_assert(asn.asn == "AS15169", "asn.asn")
_assert(str(asn.data.org) == "AS15169 Google LLC", "asn.data.org")
# VTInfo basics
vt = VTInfo(malicious=3, suspicious=1, total=94, categories=("search engine",), raw={"data": "x"}, source="virustotal")
_assert(vt.verdict == "3/94", "vt.verdict")
_assert(vt.category == "search engine", "vt.category")
_assert(vt.type == "search engine", "vt.type alias")
# Parse helpers
_assert(parse_asn_ipinfo(ipinfo_raw).asn == "AS15169", "parse_asn_ipinfo")
ipapi_raw = {"asn": "AS123", "org": "Example ISP", "country_code": "DE", "country_name": "Germany"}
_assert(parse_asn_ipapi(ipapi_raw).asn == "AS123", "parse_asn_ipapi")
ipwho_raw = {"country_code": "NL", "connection": {"asn": 9009, "isp": "M247"}}
_assert(parse_asn_ipwho(ipwho_raw).asn == "AS9009", "parse_asn_ipwho")
geo1 = parse_geo_ipinfo(ipinfo_raw)
_assert(geo1.country_code == "US", "parse_geo_ipinfo country_code")
_assert(geo1.lat is not None and geo1.lon is not None, "parse_geo_ipinfo loc")
geo2 = parse_geo_ipapi({"country_code": "DE", "country_name": "Germany", "latitude": 1, "longitude": 2})
_assert(geo2.country_code == "DE", "parse_geo_ipapi country_code")
geo3 = parse_geo_ipwho({"country_code": "NL", "country": "Netherlands", "latitude": 1, "longitude": 2})
_assert(geo3.country_code == "NL", "parse_geo_ipwho country_code")
mm_raw = {"country": {"iso_code": "US", "names": {"en": "United States"}}, "location": {"latitude": 1, "longitude": 2}}
mm = parse_geo_maxmind(mm_raw)
_assert(mm.country_code == "US" and mm.country == "United States", "parse_geo_maxmind")
vt_raw = {
"data": {
"attributes": {
"last_analysis_stats": {"malicious": 2, "suspicious": 1, "harmless": 10},
"reputation": 5,
"categories": {"foo": "search engine"},
}
}
}
vt2 = parse_vt_ip(vt_raw)
_assert(vt2.verdict == "2/13", "parse_vt_ip verdict")
# GeoInfo basics
geo = GeoInfo(country="United States", country_code="US", region="California", city="Mountain View", lat=1.0, lon=2.0, raw={"country": {"iso_code": "US"}}, source="maxmind")
_assert(geo.country_code == "US", "geo.country_code")
_assert(str(geo.data.country.iso_code) == "US", "geo.data.country.iso_code")
print("ok")
return 0
if __name__ == "__main__":
raise SystemExit(main())