diff --git a/config/visidata/plugins/__init__.py b/config/visidata/plugins/__init__.py new file mode 100644 index 0000000..7d7b404 --- /dev/null +++ b/config/visidata/plugins/__init__.py @@ -0,0 +1,19 @@ +""" +Local VisiData plugins. + +VisiData (v3.3) imports the top-level `plugins` package early (before config.py) +when `options.imports` contains "plugins" (default). Import submodules here so +their commands/types are registered on startup. +""" + +# Keep imports resilient: a missing optional plugin shouldn't prevent VisiData from starting. +for _mod in ( + "hidecol", + "iptype", +): + try: + __import__(f"{__name__}.{_mod}") + except Exception: + # VisiData will show exceptions in its error sheet if needed; don't hard-fail here. + pass + diff --git a/config/visidata/plugins/iptype.py b/config/visidata/plugins/iptype.py new file mode 100644 index 0000000..a9595d0 --- /dev/null +++ b/config/visidata/plugins/iptype.py @@ -0,0 +1,384 @@ +""" +IP datatype for VisiData. + +Goals: +- One type (`ip`) that accepts IPv4/IPv6 addresses and CIDR networks (e.g. 192.168.7.0/24). +- Correct sorting (by version then numeric value, with networks after addresses). +- `ip * netmask` returns True/False for membership (address in network). +- Enrichment properties with normalized shapes: + - ip.ipinfo.country + - ip.asn.asn / ip.asn.name / ip.asn.country + - ip.vt.verdict (e.g. "3/94") + +Network calls are optional; when deps/keys are missing, properties return empty defaults. +""" + +from __future__ import annotations + +from dataclasses import dataclass +import functools +import os +import pickle +import sqlite3 +import time +from typing import Any, Dict, Optional, Tuple, Union + +import ipaddress + +from visidata import vd +from visidata.sheets import TableSheet + + +_CACHE_PATH = os.path.expanduser("~/.visidata_cache.db") + + +def _sqlite_getset(key: str, fn, *, max_age: Optional[int] = None): + """Tiny sqlite+pickle cache. Falls back to computing if db can't be used.""" + try: + os.makedirs(os.path.dirname(_CACHE_PATH), exist_ok=True) + with sqlite3.connect(_CACHE_PATH) as conn: + conn.execute( + "CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)" + ) + cur = conn.cursor() + cur.execute("SELECT value, timestamp FROM cache WHERE key=?", (key,)) + row = cur.fetchone() + now = int(time.time()) + if row: + val_blob, ts = row + if max_age is None or now - int(ts) <= max_age: + return pickle.loads(val_blob) + val = fn() + cur.execute( + "INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)", + (key, pickle.dumps(val), now), + ) + conn.commit() + return val + except Exception: + return fn() + + +def _is_nullish(v: Any) -> bool: + return v is None or v == "" or v == "null" + + +@dataclass(frozen=True) +class IPInfo: + raw: Optional[Dict[str, Any]] = None + + @property + def country(self) -> str: + return str((self.raw or {}).get("country") or "") + + @property + def region(self) -> str: + return str((self.raw or {}).get("region") or "") + + @property + def city(self) -> str: + return str((self.raw or {}).get("city") or "") + + @property + def org(self) -> str: + return str((self.raw or {}).get("org") or "") + + @property + def asn(self) -> str: + # ipinfo "org" is often "AS12345 Some Org" + org = self.org + if org.startswith("AS"): + tok = org.split(" ", 1)[0] + return tok + return "" + + +@dataclass(frozen=True) +class ASNInfo: + asn: str = "" + name: str = "" + country: str = "" + raw: Optional[Dict[str, Any]] = None + + +@dataclass(frozen=True) +class VTInfo: + malicious: int = 0 + suspicious: int = 0 + total: int = 0 + reputation: Optional[int] = None + categories: Tuple[str, ...] = () + raw: Optional[Dict[str, Any]] = None + + @property + def verdict(self) -> str: + if self.total <= 0: + return "" + return f"{self.malicious}/{self.total}" + + @property + def ratio(self) -> Optional[float]: + if self.total <= 0: + return None + return self.malicious / self.total + + # User asked for vt.type; "category" is a better name, but keep alias. + @property + def type(self) -> str: # noqa: A003 - intentional API + return self.category + + @property + def category(self) -> str: + return self.categories[0] if self.categories else "" + + +def _read_vt_key() -> str: + try: + with open(os.path.expanduser("~/.virustotal_api_key")) as f: + return f.readline().strip() + except Exception: + return "" + + +@functools.lru_cache(maxsize=4096) +def _ipinfo_raw(ip: str) -> Optional[Dict[str, Any]]: + def _do(): + try: + import requests # optional dep + + r = requests.get(f"http://ipinfo.io/{ip}/json", timeout=5) + return r.json() + except Exception: + return None + + return _sqlite_getset(f"ipinfo:{ip}", _do, max_age=60 * 60 * 24) + + +@functools.lru_cache(maxsize=4096) +def _asn_info(ip: str) -> ASNInfo: + # Prefer ipinfo-derived ASN (no scraping deps). + raw = _ipinfo_raw(ip) or {} + org = str(raw.get("org") or "") + country = str(raw.get("country") or "") + asn = "" + name = "" + if org.startswith("AS"): + parts = org.split(" ", 1) + asn = parts[0] + name = parts[1] if len(parts) > 1 else "" + return ASNInfo(asn=asn, name=name, country=country, raw={"org": org, "country": country}) + + +@functools.lru_cache(maxsize=4096) +def _vt_info(ip: str) -> VTInfo: + key = _read_vt_key() + if not key: + return VTInfo() + + def _do() -> VTInfo: + try: + import requests # optional dep + + r = requests.get( + f"https://www.virustotal.com/api/v3/ip_addresses/{ip}", + headers={"x-apikey": key}, + timeout=10, + ) + data = r.json() if r.ok else None + attrs = (((data or {}).get("data") or {}).get("attributes") or {}) + stats = attrs.get("last_analysis_stats") or {} + malicious = int(stats.get("malicious") or 0) + suspicious = int(stats.get("suspicious") or 0) + total = 0 + try: + total = int(sum(int(v or 0) for v in stats.values())) + except Exception: + total = 0 + + cats = attrs.get("categories") or {} + if isinstance(cats, dict): + # keep stable order + categories = tuple(str(v) for v in cats.values() if v) + elif isinstance(cats, list): + categories = tuple(str(v) for v in cats if v) + else: + categories = () + + rep = attrs.get("reputation") + try: + reputation = int(rep) if rep is not None else None + except Exception: + reputation = None + + return VTInfo( + malicious=malicious, + suspicious=suspicious, + total=total, + reputation=reputation, + categories=categories, + raw=data, + ) + except Exception: + return VTInfo() + + return _sqlite_getset(f"vt_ip:{ip}", _do, max_age=60 * 60 * 24) + + +@functools.total_ordering +class IPValue: + """Represents either an address or a network.""" + + __slots__ = ("_obj",) + + def __init__(self, obj: Union[ipaddress._BaseAddress, ipaddress._BaseNetwork]): + self._obj = obj + + @property + def obj(self): + return self._obj + + @property + def version(self) -> int: + return int(self._obj.version) + + @property + def is_network(self) -> bool: + return isinstance(self._obj, ipaddress._BaseNetwork) + + @property + def is_address(self) -> bool: + return isinstance(self._obj, ipaddress._BaseAddress) + + @property + def sort_key(self) -> Tuple[int, int, int, int]: + # (version, kind, addrint, prefixlen) + if self.is_network: + n: ipaddress._BaseNetwork = self._obj # type: ignore[assignment] + return (self.version, 1, int(n.network_address), int(n.prefixlen)) + a: ipaddress._BaseAddress = self._obj # type: ignore[assignment] + return (self.version, 0, int(a), 0) + + def __str__(self) -> str: + return str(self._obj) + + def __repr__(self) -> str: + return f"IPValue({self._obj!r})" + + def __hash__(self) -> int: + return hash(self.sort_key) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, IPValue): + return False + return self.sort_key == other.sort_key + + def __lt__(self, other: object) -> bool: + if not isinstance(other, IPValue): + return NotImplemented + return self.sort_key < other.sort_key + + def _coerce_network(self, other: Any) -> Optional[ipaddress._BaseNetwork]: + if other is None: + return None + if isinstance(other, IPValue) and other.is_network: + return other.obj # type: ignore[return-value] + if isinstance(other, ipaddress._BaseNetwork): + return other + s = str(other).strip() + if not s: + return None + try: + return ipaddress.ip_network(s, strict=False) + except Exception: + return None + + def _coerce_address(self, other: Any) -> Optional[ipaddress._BaseAddress]: + if other is None: + return None + if isinstance(other, IPValue) and other.is_address: + return other.obj # type: ignore[return-value] + if isinstance(other, ipaddress._BaseAddress): + return other + s = str(other).strip() + if not s: + return None + try: + return ipaddress.ip_address(s) + except Exception: + return None + + def __mul__(self, other: Any): + # "ip * netmask" -> membership test (address in network). + net = self._coerce_network(other) + if self.is_address and net is not None: + try: + return self.obj in net # type: ignore[operator] + except Exception: + return False + addr = self._coerce_address(other) + if self.is_network and addr is not None: + try: + return addr in self.obj # type: ignore[operator] + except Exception: + return False + return False + + def __rmul__(self, other: Any): + # allow "netmask * ip" too + return self.__mul__(other) + + # Normalized enrichments + @property + def ipinfo(self) -> IPInfo: + if not self.is_address: + return IPInfo() + return IPInfo(_ipinfo_raw(str(self))) + + @property + def asn(self) -> ASNInfo: + if not self.is_address: + return ASNInfo() + return _asn_info(str(self)) + + @property + def vt(self) -> VTInfo: + if not self.is_address: + return VTInfo() + return _vt_info(str(self)) + + +def ip(val: Any) -> Optional[IPValue]: + """VisiData type converter: parse IPv4/IPv6 address or CIDR network.""" + if _is_nullish(val): + return None + if isinstance(val, IPValue): + return val + if isinstance(val, (ipaddress._BaseAddress, ipaddress._BaseNetwork)): + return IPValue(val) + s = str(val).strip() + if not s: + return None + + # Strip common "ip:port" for IPv4 only (keep IPv6 intact). + if "/" not in s and s.count(":") == 1: + host, port = s.rsplit(":", 1) + if port.isdigit(): + s = host + + try: + if "/" in s: + return IPValue(ipaddress.ip_network(s, strict=False)) + return IPValue(ipaddress.ip_address(s)) + except Exception: + return None + + +vd.addType(ip, icon=":", formatter=lambda fmt, v: "" if v is None else str(v), name="IP") + +TableSheet.addCommand( + None, + "type-ip", + "cursorCol.type=ip", + "set type of current column to IP (IPv4/IPv6/CIDR)", +) + diff --git a/config/visidata/requirements.txt b/config/visidata/requirements.txt index de3012a..046a48b 100644 --- a/config/visidata/requirements.txt +++ b/config/visidata/requirements.txt @@ -5,10 +5,8 @@ # Or manually: # python -m pip install --target ~/.visidata/plugins-deps -r requirements.txt # -# Core HTTP / scraping +# Core HTTP requests -beautifulsoup4 -lxml # # Data parsing PyYAML @@ -18,4 +16,3 @@ dnspython # # Enrichment helpers mac-vendor-lookup -virus_total_apis diff --git a/config/visidata/visidatarc b/config/visidata/visidatarc index 7c602f5..c26f2f3 100644 --- a/config/visidata/visidatarc +++ b/config/visidata/visidatarc @@ -12,6 +12,11 @@ try: except ModuleNotFoundError: pass +try: + import plugins.iptype +except ModuleNotFoundError: + pass + from datetime import datetime import functools import json