visidata: add IOC types with cached, throttled lookups

Centralize provider caching and rate-limit handling, then add Domain/URL/Hash IOC types and safer VT/IPInfo key resolution so lookups stay reliable on free-tier APIs.
This commit is contained in:
tobias
2026-02-21 23:10:44 +01:00
parent a931be4707
commit 84d912ac0a
9 changed files with 1048 additions and 173 deletions

View File

@@ -0,0 +1,446 @@
"""
IOC datatypes for VisiData: domains, URLs, and hashes.
Features:
- Domain normalization and lookups: RDAP, DNS, VirusTotal domain report.
- URL parsing and VT URL report.
- Hash detection + VT file report and MalwareBazaar fallback.
All network lookups are cached in the local sqlite cache db (`options.tke_cache_db_path`).
"""
from __future__ import annotations
import functools
import os
import re
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlsplit
from visidata import vd
from visidata.sheets import TableSheet
from .iplib import JSONNode, VTInfo, parse_vt_ip
from .ioclib import MBInfo, URLParts, parse_mb_info, vt_url_id
from .lookupcore import (
auth_tag,
cache_ttl,
error_ttl,
http_get_json,
http_post_json,
opt,
sqlite_getset,
)
# Register the option holding the RDAP base URL; _rdap_base() reads it back
# and falls back to the same default when the option is empty.
vd.option(
"tke_rdap_base", "https://rdap.org", "base URL for RDAP queries", sheettype=None
)
# Register the option holding the MalwareBazaar API base URL; _mb_base()
# reads it back, and an empty value disables MalwareBazaar lookups there.
vd.option(
"tke_mb_api_base",
"https://mb-api.abuse.ch/api/v1/",
"base URL for MalwareBazaar API",
sheettype=None,
)
def _is_nullish(v: Any) -> bool:
return v is None or v == "" or v == "null"
def _vt_key() -> str:
    """Resolve the VirusTotal API key, returning ``""`` when none is found.

    Sources are tried in this order and the first truthy one wins:
    the ``tke_vt_api_key`` option, the ``VT_API_KEY`` environment variable,
    the ``VIRUSTOTAL_API_KEY`` environment variable, and finally the file
    ``~/.virustotal_api_key`` (read through ``read_key_from_file``).
    """
    from .lookupcore import read_key_from_file

    configured = opt("tke_vt_api_key", "")
    if configured:
        return str(configured)

    for env_name in ("VT_API_KEY", "VIRUSTOTAL_API_KEY"):
        from_env = os.getenv(env_name)
        if from_env:
            return str(from_env)

    # The key file is only consulted when nothing above produced a key.
    from_file = read_key_from_file("~/.virustotal_api_key")
    return str(from_file or "")
def _rdap_base() -> str:
    """Return the RDAP base URL from ``tke_rdap_base`` without trailing slashes.

    An empty or otherwise falsy option value falls back to
    ``https://rdap.org``.
    """
    fallback = "https://rdap.org"
    configured = opt("tke_rdap_base", fallback)
    base = str(configured) if configured else fallback
    return base.rstrip("/")
def _mb_base() -> str:
    """Return the MalwareBazaar API URL from ``tke_mb_api_base``, stripped.

    A falsy option value yields ``""``.
    """
    configured = opt("tke_mb_api_base", "https://mb-api.abuse.ch/api/v1/")
    if not configured:
        return ""
    return str(configured).strip()
@functools.lru_cache(maxsize=4096)
def _rdap_domain_raw(domain: str) -> Optional[Dict[str, Any]]:
    """Fetch the RDAP record for *domain* through the sqlite-backed cache.

    The request goes to ``<rdap base>/domain/<domain>``. The cache entry is
    keyed ``rdap_domain:<domain>`` and uses ``cache_ttl()`` / ``error_ttl()``
    as the success / error maximum ages.

    NOTE(review): ``lru_cache`` additionally memoizes the result (including
    ``None``) for the lifetime of the process, independent of those TTLs.
    Confirm that is intended.
    """
    from urllib.parse import quote

    # The domain comes from sheet data. Percent-encode it so characters such
    # as "/", "?" or "#" cannot change the path or query of the request.
    # Ordinary host names (letters, digits, ".", "-", "_") pass unchanged.
    url = f"{_rdap_base()}/domain/{quote(domain, safe='')}"
    return sqlite_getset(
        f"rdap_domain:{domain}",
        lambda: http_get_json(url, provider="rdap"),
        max_age=cache_ttl(),
        error_max_age=error_ttl(),
    )
@functools.lru_cache(maxsize=4096)
def _vt_domain_raw(domain: str) -> Optional[Dict[str, Any]]:
    """Fetch the VirusTotal v3 domain report for *domain* through the cache.

    Returns ``None`` without any request when no API key is available.
    The cache entry is keyed ``vt_domain:<auth tag>:<domain>``, where the tag
    is ``auth_tag(key)``, so results obtained with different keys are kept
    apart.

    NOTE(review): ``lru_cache`` memoizes the result for the lifetime of the
    process, including the ``None`` returned while no key is configured.
    Confirm that a key added later is expected to need a cache reset.
    """
    from urllib.parse import quote

    key = _vt_key()
    if not key:
        return None
    tag = auth_tag(key)
    # The domain comes from sheet data. Percent-encode it so characters such
    # as "/", "?" or "#" cannot redirect this authenticated request to a
    # different API path. Ordinary host names pass unchanged.
    url = f"https://www.virustotal.com/api/v3/domains/{quote(domain, safe='')}"
    return sqlite_getset(
        f"vt_domain:{tag}:{domain}",
        lambda: http_get_json(url, headers={"x-apikey": key}, provider="vt"),
        max_age=cache_ttl(),
        error_max_age=error_ttl(),
    )
@functools.lru_cache(maxsize=4096)
def _vt_url_raw(url: str) -> Optional[Dict[str, Any]]:
    """Fetch the VirusTotal v3 URL report for *url* through the cache.

    Returns ``None`` without any request when no API key is available.
    The VirusTotal identifier is derived with ``vt_url_id(url)`` and is used
    both in the endpoint and in the cache key
    ``vt_url:<auth tag>:<url id>``.

    NOTE(review): ``lru_cache`` memoizes results (including ``None``) for
    the lifetime of the process, independent of the sqlite TTLs.
    """
    api_key = _vt_key()
    if not api_key:
        return None

    key_tag = auth_tag(api_key)
    identifier = vt_url_id(url)
    endpoint = f"https://www.virustotal.com/api/v3/urls/{identifier}"

    def _fetch():
        return http_get_json(endpoint, headers={"x-apikey": api_key}, provider="vt")

    return sqlite_getset(
        f"vt_url:{key_tag}:{identifier}",
        _fetch,
        max_age=cache_ttl(),
        error_max_age=error_ttl(),
    )
@functools.lru_cache(maxsize=4096)
def _vt_file_raw(h: str) -> Optional[Dict[str, Any]]:
    """Fetch the VirusTotal v3 file report for hash *h* through the cache.

    Returns ``None`` without any request when no API key is available.
    The cache entry is keyed ``vt_file:<auth tag>:<hash>``.

    NOTE(review): ``lru_cache`` memoizes results (including ``None``) for
    the lifetime of the process, independent of the sqlite TTLs.
    """
    api_key = _vt_key()
    if not api_key:
        return None

    key_tag = auth_tag(api_key)
    endpoint = f"https://www.virustotal.com/api/v3/files/{h}"

    def _fetch():
        return http_get_json(endpoint, headers={"x-apikey": api_key}, provider="vt")

    return sqlite_getset(
        f"vt_file:{key_tag}:{h}",
        _fetch,
        max_age=cache_ttl(),
        error_max_age=error_ttl(),
    )
@functools.lru_cache(maxsize=4096)
def _mb_hash_raw(h: str) -> Optional[Dict[str, Any]]:
    """Query MalwareBazaar for hash *h* through the cache.

    Returns ``None`` without any request when the configured API base is
    empty. Otherwise a ``get_info`` query for the hash is POSTed to the base
    URL, and the cache entry is keyed ``mb_hash:<hash>``.

    NOTE(review): ``lru_cache`` memoizes results (including ``None``) for
    the lifetime of the process, independent of the sqlite TTLs.
    """
    endpoint = _mb_base()
    if not endpoint:
        return None

    def _fetch():
        payload = {"query": "get_info", "hash": h}
        return http_post_json(endpoint, payload, provider="mb")

    return sqlite_getset(
        f"mb_hash:{h}",
        _fetch,
        max_age=cache_ttl(),
        error_max_age=error_ttl(),
    )
@dataclass(frozen=True)
class DNSInfo:
    """Immutable bundle of DNS answers for one name, grouped by record type.

    Each record-type field is a tuple of answer strings and defaults to an
    empty tuple. Field order is part of the positional constructor signature.
    """

    a: Tuple[str, ...] = ()
    aaaa: Tuple[str, ...] = ()
    cname: Tuple[str, ...] = ()
    mx: Tuple[str, ...] = ()
    ns: Tuple[str, ...] = ()
    txt: Tuple[str, ...] = ()
    # Mapping of record type to answers as collected by the resolver, if any.
    raw: Optional[Dict[str, Any]] = None
    # Label for where the data came from; "" when nothing was resolved.
    source: str = ""

    @property
    def data(self) -> JSONNode:
        """Expose ``raw`` wrapped in a ``JSONNode``."""
        wrapped = JSONNode(self.raw)
        return wrapped
def _dns_resolve(domain: str, rtype: str) -> Tuple[str, ...]:
    """Resolve *rtype* records for *domain*, best effort.

    Trailing dots are removed from the name and the record type is
    upper-cased before the query. Any failure, including the optional
    ``dns.resolver`` module (dnspython) not being importable, yields an
    empty tuple instead of an exception.
    """
    name = domain.rstrip(".")
    record_type = rtype.upper()
    try:
        import dns.resolver  # optional dep

        answers = dns.resolver.resolve(name, record_type)
        return tuple(map(str, answers))
    except Exception:
        # Deliberately best-effort: a failed lookup must not break the sheet.
        return ()
@functools.lru_cache(maxsize=4096)
def _dns_info(domain: str) -> DNSInfo:
    """Collect A/AAAA/CNAME/MX/NS/TXT answers for *domain* through the cache.

    The lookups run inside a producer handed to ``sqlite_getset`` under the
    key ``dns:<domain>``. A falsy result from the cache layer is replaced
    by an empty ``DNSInfo`` with ``source=""``.

    NOTE(review): the producer returns a ``DNSInfo`` object, while the other
    cached producers in this module return plain dicts. Confirm that
    ``sqlite_getset`` can store and restore a dataclass instance.
    NOTE(review): an all-empty result (e.g. resolver unavailable) is still
    a ``DNSInfo`` object. Verify whether it is cached with the success TTL.
    """

    def _collect() -> DNSInfo:
        # Same order and keys as the record types exposed on DNSInfo.
        answers = {
            rtype: _dns_resolve(domain, rtype)
            for rtype in ("A", "AAAA", "CNAME", "MX", "NS", "TXT")
        }
        return DNSInfo(
            a=answers["A"],
            aaaa=answers["AAAA"],
            cname=answers["CNAME"],
            mx=answers["MX"],
            ns=answers["NS"],
            txt=answers["TXT"],
            raw=answers,
            source="dns",
        )

    cached = sqlite_getset(
        f"dns:{domain}",
        _collect,
        max_age=cache_ttl(),
        error_max_age=error_ttl(),
    )
    if cached:
        return cached
    return DNSInfo(source="")
@functools.total_ordering
class DomainValue:
    """Hashable, orderable wrapper around a domain-name string.

    Equality, ordering and hashing all delegate to the wrapped string, and
    only other ``DomainValue`` instances compare equal or are orderable
    against it. The ``rdap``, ``dns`` and ``vt`` properties trigger the
    corresponding lookups for the wrapped name.
    """

    __slots__ = ("_d",)

    def __init__(self, domain: str):
        self._d = domain

    # --- identity / comparison -------------------------------------------

    def __repr__(self) -> str:
        return f"DomainValue({self._d!r})"

    def __str__(self) -> str:
        return self._d

    def __hash__(self) -> int:
        return hash(self._d)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DomainValue):
            return False
        return self._d == other._d

    def __lt__(self, other: object) -> bool:
        if isinstance(other, DomainValue):
            return self._d < other._d
        return NotImplemented

    # --- accessors ---------------------------------------------------------

    @property
    def domain(self) -> str:
        """The wrapped domain string."""
        return self._d

    @property
    def rdap(self) -> JSONNode:
        """RDAP record for this domain, wrapped in a ``JSONNode``."""
        record = _rdap_domain_raw(self._d)
        return JSONNode(record)

    @property
    def dns(self) -> DNSInfo:
        """DNS answers for this domain."""
        return _dns_info(self._d)

    @property
    def vt(self) -> VTInfo:
        """VirusTotal domain report parsed with ``parse_vt_ip``.

        An empty ``VTInfo`` is returned when no report data is available.
        """
        report = _vt_domain_raw(self._d)
        if not report:
            return VTInfo()
        return parse_vt_ip(report)
def _normalize_domain(s: str) -> str:
s = s.strip().lower()
if not s:
return ""
# Strip scheme/path if the input is a URL.
if "://" in s:
try:
sp = urlsplit(s)
if sp.hostname:
s = sp.hostname
except Exception:
pass
s = s.strip().rstrip(".")
# Strip brackets around IPv6 host literals if accidentally passed.
if s.startswith("[") and s.endswith("]"):
s = s[1:-1]
return s
def domain(val: Any) -> Optional[DomainValue]:
    """Convert *val* to a ``DomainValue``, or ``None`` when there is no domain.

    Nullish input and input that normalizes to an empty string give
    ``None``. An existing ``DomainValue`` is returned as-is. Anything else is
    stringified and normalized first.
    """
    if _is_nullish(val):
        return None
    if isinstance(val, DomainValue):
        return val

    normalized = _normalize_domain(str(val))
    return DomainValue(normalized) if normalized else None
@functools.total_ordering
class URLValue:
    """Hashable, orderable wrapper around a URL string and its parsed parts.

    Equality, ordering and hashing use only the URL string, and only other
    ``URLValue`` instances compare equal or are orderable against it.
    """

    __slots__ = ("_u", "_parts")

    def __init__(self, url: str, parts: URLParts):
        self._u = url
        self._parts = parts

    # --- identity / comparison -------------------------------------------

    def __repr__(self) -> str:
        return f"URLValue({self._u!r})"

    def __str__(self) -> str:
        return self._u

    def __hash__(self) -> int:
        return hash(self._u)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, URLValue):
            return False
        return self._u == other._u

    def __lt__(self, other: object) -> bool:
        if isinstance(other, URLValue):
            return self._u < other._u
        return NotImplemented

    # --- accessors ---------------------------------------------------------

    @property
    def url(self) -> str:
        """The wrapped URL string."""
        return self._u

    @property
    def parts(self) -> URLParts:
        """The parsed components supplied at construction."""
        return self._parts

    @property
    def host(self) -> str:
        """Host component taken from the parsed parts."""
        return self._parts.host

    @property
    def domain(self) -> Optional[DomainValue]:
        """The host converted with ``domain()``; ``None`` when that yields nothing."""
        return domain(self._parts.host)

    @property
    def vt(self) -> VTInfo:
        """VirusTotal URL report parsed with ``parse_vt_ip``.

        An empty ``VTInfo`` is returned when no report data is available.
        """
        report = _vt_url_raw(self._u)
        if not report:
            return VTInfo()
        return parse_vt_ip(report)
def url_ioc(val: Any) -> Optional[URLValue]:
    """Convert *val* to a ``URLValue``, or ``None`` when it cannot be parsed.

    Nullish and blank input give ``None``. An existing ``URLValue`` is
    returned as-is. Text without ``://`` that looks like a bare domain
    (optionally followed by ``/``) gets an ``http://`` prefix so it parses
    like any other URL. Any exception raised while splitting the URL or
    building the value results in ``None``.
    """
    if _is_nullish(val):
        return None
    if isinstance(val, URLValue):
        return val

    text = str(val).strip()
    if not text:
        return None

    # Accept bare domains by prefixing scheme (so parsing is consistent).
    looks_bare = "://" not in text and re.match(
        r"^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(/|$)", text
    )
    if looks_bare:
        text = "http://" + text

    try:
        split = urlsplit(text)
        components = URLParts(
            scheme=split.scheme or "",
            username=split.username or "",
            password=split.password or "",
            host=split.hostname or "",
            port=split.port,
            path=split.path or "",
            query=split.query or "",
            fragment=split.fragment or "",
        )
        return URLValue(text, components)
    except Exception:
        # Best effort: an unparsable URL is reported as "no value".
        return None
@functools.total_ordering
class HashValue:
    """Hashable, orderable wrapper around a file-hash string.

    Equality, ordering and hashing delegate to the wrapped string, and only
    other ``HashValue`` instances compare equal or are orderable against it.
    """

    __slots__ = ("_h",)

    def __init__(self, h: str):
        self._h = h

    # --- identity / comparison -------------------------------------------

    def __repr__(self) -> str:
        return f"HashValue({self._h!r})"

    def __str__(self) -> str:
        return self._h

    def __hash__(self) -> int:
        return hash(self._h)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, HashValue):
            return False
        return self._h == other._h

    def __lt__(self, other: object) -> bool:
        if isinstance(other, HashValue):
            return self._h < other._h
        return NotImplemented

    # --- accessors ---------------------------------------------------------

    @property
    def hash(self) -> str:
        """The wrapped hash string."""
        return self._h

    @property
    def kind(self) -> str:
        """Algorithm name inferred from length alone.

        32 characters map to ``"md5"``, 40 to ``"sha1"``, 64 to ``"sha256"``;
        any other length gives ``""``.
        """
        by_length = {32: "md5", 40: "sha1", 64: "sha256"}
        return by_length.get(len(self._h), "")

    @property
    def vt(self) -> VTInfo:
        """VirusTotal file report parsed with ``parse_vt_ip``.

        An empty ``VTInfo`` is returned when no report data is available.
        """
        report = _vt_file_raw(self._h)
        if not report:
            return VTInfo()
        return parse_vt_ip(report)

    @property
    def mb(self) -> MBInfo:
        """MalwareBazaar record parsed with ``parse_mb_info``.

        An empty ``MBInfo`` is returned when no data is available.
        """
        record = _mb_hash_raw(self._h)
        if not record:
            return MBInfo()
        return parse_mb_info(record)
# Exactly 32, 40 or 64 hexadecimal characters (the lengths HashValue.kind
# maps to md5, sha1 and sha256).
_HASH_RE = re.compile(r"^[A-Fa-f0-9]{32}$|^[A-Fa-f0-9]{40}$|^[A-Fa-f0-9]{64}$")


def hash_ioc(val: Any) -> Optional[HashValue]:
    """Convert *val* to a lower-cased ``HashValue``, or ``None`` if it is not a hash.

    Nullish input, blank input and text that is not exactly 32, 40 or 64 hex
    characters give ``None``. An existing ``HashValue`` is returned as-is.
    """
    if _is_nullish(val):
        return None
    if isinstance(val, HashValue):
        return val

    candidate = str(val).strip()
    if candidate and _HASH_RE.match(candidate):
        return HashValue(candidate.lower())
    return None
# Make custom converters available in command/expr globals.
# The type-* commands below reference them by these names in their exec strings.
vd.addGlobals(domain=domain, url_ioc=url_ioc, hash_ioc=hash_ioc)
# Register each converter as a VisiData column type. All three share the same
# formatter: None renders as an empty string, anything else via str().
vd.addType(
domain,
icon="d",
formatter=lambda fmt, v: "" if v is None else str(v),
name="Domain",
)
vd.addType(
url_ioc, icon="u", formatter=lambda fmt, v: "" if v is None else str(v), name="URL"
)
vd.addType(
hash_ioc,
icon="#",
formatter=lambda fmt, v: "" if v is None else str(v),
name="Hash",
)
# Commands that set the current column's type. They are registered with None
# in the keystroke argument and are referenced from the menu entries below.
TableSheet.addCommand(
None, "type-domain", "cursorCol.type=domain", "set type of current column to Domain"
)
TableSheet.addCommand(
None,
"type-url-ioc",
"cursorCol.type=url_ioc",
"set type of current column to URL (IOC)",
)
TableSheet.addCommand(
None,
"type-hash",
"cursorCol.type=hash_ioc",
"set type of current column to Hash (md5/sha1/sha256)",
)
# Menu entries under Column > Type, one per command registered above.
vd.addMenuItem("Column", "Type", "Domain", "type-domain")
vd.addMenuItem("Column", "Type", "URL (IOC)", "type-url-ioc")
vd.addMenuItem("Column", "Type", "Hash", "type-hash")