visidata: add IP type with CIDR + lookups

This commit is contained in:
tobias
2026-02-15 17:23:09 +01:00
parent 1c9cdc5c19
commit 15934ec4a1
4 changed files with 409 additions and 4 deletions

View File

@@ -0,0 +1,19 @@
"""
Local VisiData plugins.
VisiData (v3.3) imports the top-level `plugins` package early (before config.py)
when `options.imports` contains "plugins" (default). Import submodules here so
their commands/types are registered on startup.
"""
# Keep imports resilient: a missing optional plugin shouldn't prevent VisiData from starting.
for _mod in (
"hidecol",
"iptype",
):
try:
__import__(f"{__name__}.{_mod}")
except Exception:
# VisiData will show exceptions in its error sheet if needed; don't hard-fail here.
pass

View File

@@ -0,0 +1,384 @@
"""
IP datatype for VisiData.
Goals:
- One type (`ip`) that accepts IPv4/IPv6 addresses and CIDR networks (e.g. 192.168.7.0/24).
- Correct sorting (by version then numeric value, with networks after addresses).
- `ip * netmask` returns True/False for membership (address in network).
- Enrichment properties with normalized shapes:
- ip.ipinfo.country
- ip.asn.asn / ip.asn.name / ip.asn.country
- ip.vt.verdict (e.g. "3/94")
Network calls are optional; when deps/keys are missing, properties return empty defaults.
"""
from __future__ import annotations
from dataclasses import dataclass
import functools
import os
import pickle
import sqlite3
import time
from typing import Any, Dict, Optional, Tuple, Union
import ipaddress
from visidata import vd
from visidata.sheets import TableSheet
_CACHE_PATH = os.path.expanduser("~/.visidata_cache.db")
def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None):
"""Tiny sqlite+pickle cache. Falls back to computing if db can't be used."""
try:
os.makedirs(os.path.dirname(_CACHE_PATH), exist_ok=True)
with sqlite3.connect(_CACHE_PATH) as conn:
conn.execute(
"CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)"
)
cur = conn.cursor()
cur.execute("SELECT value, timestamp FROM cache WHERE key=?", (key,))
row = cur.fetchone()
now = int(time.time())
if row:
val_blob, ts = row
if max_age is None or now - int(ts) <= max_age:
return pickle.loads(val_blob)
val = fn()
cur.execute(
"INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)",
(key, pickle.dumps(val), now),
)
conn.commit()
return val
except Exception:
return fn()
def _is_nullish(v: Any) -> bool:
return v is None or v == "" or v == "null"
@dataclass(frozen=True)
class IPInfo:
raw: Optional[Dict[str, Any]] = None
@property
def country(self) -> str:
return str((self.raw or {}).get("country") or "")
@property
def region(self) -> str:
return str((self.raw or {}).get("region") or "")
@property
def city(self) -> str:
return str((self.raw or {}).get("city") or "")
@property
def org(self) -> str:
return str((self.raw or {}).get("org") or "")
@property
def asn(self) -> str:
# ipinfo "org" is often "AS12345 Some Org"
org = self.org
if org.startswith("AS"):
tok = org.split(" ", 1)[0]
return tok
return ""
@dataclass(frozen=True)
class ASNInfo:
asn: str = ""
name: str = ""
country: str = ""
raw: Optional[Dict[str, Any]] = None
@dataclass(frozen=True)
class VTInfo:
malicious: int = 0
suspicious: int = 0
total: int = 0
reputation: Optional[int] = None
categories: Tuple[str, ...] = ()
raw: Optional[Dict[str, Any]] = None
@property
def verdict(self) -> str:
if self.total <= 0:
return ""
return f"{self.malicious}/{self.total}"
@property
def ratio(self) -> Optional[float]:
if self.total <= 0:
return None
return self.malicious / self.total
# User asked for vt.type; "category" is a better name, but keep alias.
@property
def type(self) -> str: # noqa: A003 - intentional API
return self.category
@property
def category(self) -> str:
return self.categories[0] if self.categories else ""
def _read_vt_key() -> str:
try:
with open(os.path.expanduser("~/.virustotal_api_key")) as f:
return f.readline().strip()
except Exception:
return ""
@functools.lru_cache(maxsize=4096)
def _ipinfo_raw(ip: str) -> Optional[Dict[str, Any]]:
def _do():
try:
import requests # optional dep
r = requests.get(f"http://ipinfo.io/{ip}/json", timeout=5)
return r.json()
except Exception:
return None
return _sqlite_getset(f"ipinfo:{ip}", _do, max_age=60 * 60 * 24)
@functools.lru_cache(maxsize=4096)
def _asn_info(ip: str) -> ASNInfo:
# Prefer ipinfo-derived ASN (no scraping deps).
raw = _ipinfo_raw(ip) or {}
org = str(raw.get("org") or "")
country = str(raw.get("country") or "")
asn = ""
name = ""
if org.startswith("AS"):
parts = org.split(" ", 1)
asn = parts[0]
name = parts[1] if len(parts) > 1 else ""
return ASNInfo(asn=asn, name=name, country=country, raw={"org": org, "country": country})
@functools.lru_cache(maxsize=4096)
def _vt_info(ip: str) -> VTInfo:
key = _read_vt_key()
if not key:
return VTInfo()
def _do() -> VTInfo:
try:
import requests # optional dep
r = requests.get(
f"https://www.virustotal.com/api/v3/ip_addresses/{ip}",
headers={"x-apikey": key},
timeout=10,
)
data = r.json() if r.ok else None
attrs = (((data or {}).get("data") or {}).get("attributes") or {})
stats = attrs.get("last_analysis_stats") or {}
malicious = int(stats.get("malicious") or 0)
suspicious = int(stats.get("suspicious") or 0)
total = 0
try:
total = int(sum(int(v or 0) for v in stats.values()))
except Exception:
total = 0
cats = attrs.get("categories") or {}
if isinstance(cats, dict):
# keep stable order
categories = tuple(str(v) for v in cats.values() if v)
elif isinstance(cats, list):
categories = tuple(str(v) for v in cats if v)
else:
categories = ()
rep = attrs.get("reputation")
try:
reputation = int(rep) if rep is not None else None
except Exception:
reputation = None
return VTInfo(
malicious=malicious,
suspicious=suspicious,
total=total,
reputation=reputation,
categories=categories,
raw=data,
)
except Exception:
return VTInfo()
return _sqlite_getset(f"vt_ip:{ip}", _do, max_age=60 * 60 * 24)
@functools.total_ordering
class IPValue:
"""Represents either an address or a network."""
__slots__ = ("_obj",)
def __init__(self, obj: Union[ipaddress._BaseAddress, ipaddress._BaseNetwork]):
self._obj = obj
@property
def obj(self):
return self._obj
@property
def version(self) -> int:
return int(self._obj.version)
@property
def is_network(self) -> bool:
return isinstance(self._obj, ipaddress._BaseNetwork)
@property
def is_address(self) -> bool:
return isinstance(self._obj, ipaddress._BaseAddress)
@property
def sort_key(self) -> Tuple[int, int, int, int]:
# (version, kind, addrint, prefixlen)
if self.is_network:
n: ipaddress._BaseNetwork = self._obj # type: ignore[assignment]
return (self.version, 1, int(n.network_address), int(n.prefixlen))
a: ipaddress._BaseAddress = self._obj # type: ignore[assignment]
return (self.version, 0, int(a), 0)
def __str__(self) -> str:
return str(self._obj)
def __repr__(self) -> str:
return f"IPValue({self._obj!r})"
def __hash__(self) -> int:
return hash(self.sort_key)
def __eq__(self, other: object) -> bool:
if not isinstance(other, IPValue):
return False
return self.sort_key == other.sort_key
def __lt__(self, other: object) -> bool:
if not isinstance(other, IPValue):
return NotImplemented
return self.sort_key < other.sort_key
def _coerce_network(self, other: Any) -> Optional[ipaddress._BaseNetwork]:
if other is None:
return None
if isinstance(other, IPValue) and other.is_network:
return other.obj # type: ignore[return-value]
if isinstance(other, ipaddress._BaseNetwork):
return other
s = str(other).strip()
if not s:
return None
try:
return ipaddress.ip_network(s, strict=False)
except Exception:
return None
def _coerce_address(self, other: Any) -> Optional[ipaddress._BaseAddress]:
if other is None:
return None
if isinstance(other, IPValue) and other.is_address:
return other.obj # type: ignore[return-value]
if isinstance(other, ipaddress._BaseAddress):
return other
s = str(other).strip()
if not s:
return None
try:
return ipaddress.ip_address(s)
except Exception:
return None
def __mul__(self, other: Any):
# "ip * netmask" -> membership test (address in network).
net = self._coerce_network(other)
if self.is_address and net is not None:
try:
return self.obj in net # type: ignore[operator]
except Exception:
return False
addr = self._coerce_address(other)
if self.is_network and addr is not None:
try:
return addr in self.obj # type: ignore[operator]
except Exception:
return False
return False
def __rmul__(self, other: Any):
# allow "netmask * ip" too
return self.__mul__(other)
# Normalized enrichments
@property
def ipinfo(self) -> IPInfo:
if not self.is_address:
return IPInfo()
return IPInfo(_ipinfo_raw(str(self)))
@property
def asn(self) -> ASNInfo:
if not self.is_address:
return ASNInfo()
return _asn_info(str(self))
@property
def vt(self) -> VTInfo:
if not self.is_address:
return VTInfo()
return _vt_info(str(self))
def ip(val: Any) -> Optional[IPValue]:
"""VisiData type converter: parse IPv4/IPv6 address or CIDR network."""
if _is_nullish(val):
return None
if isinstance(val, IPValue):
return val
if isinstance(val, (ipaddress._BaseAddress, ipaddress._BaseNetwork)):
return IPValue(val)
s = str(val).strip()
if not s:
return None
# Strip common "ip:port" for IPv4 only (keep IPv6 intact).
if "/" not in s and s.count(":") == 1:
host, port = s.rsplit(":", 1)
if port.isdigit():
s = host
try:
if "/" in s:
return IPValue(ipaddress.ip_network(s, strict=False))
return IPValue(ipaddress.ip_address(s))
except Exception:
return None
vd.addType(ip, icon=":", formatter=lambda fmt, v: "" if v is None else str(v), name="IP")
TableSheet.addCommand(
None,
"type-ip",
"cursorCol.type=ip",
"set type of current column to IP (IPv4/IPv6/CIDR)",
)

View File

@@ -5,10 +5,8 @@
# Or manually:
# python -m pip install --target ~/.visidata/plugins-deps -r requirements.txt
#
# Core HTTP / scraping
# Core HTTP
requests
beautifulsoup4
lxml
#
# Data parsing
PyYAML
@@ -18,4 +16,3 @@ dnspython
#
# Enrichment helpers
mac-vendor-lookup
virus_total_apis

View File

@@ -12,6 +12,11 @@ try:
except ModuleNotFoundError:
pass
try:
import plugins.iptype
except ModuleNotFoundError:
pass
from datetime import datetime
import functools
import json