diff --git a/config/visidata/README.md b/config/visidata/README.md index a89138a..9f5c24c 100644 --- a/config/visidata/README.md +++ b/config/visidata/README.md @@ -36,13 +36,14 @@ vd --visidata-dir "$PWD" --config "$PWD/visidatarc" --play showcase_ioc.vdj What it showcases: - custom types: `IP`, `Domain`, `URL`, `Hash` - IP membership expressions: `src_ip * network` +- IP network fields: `src_ip.type`, `src_ip.mask`, `src_ip.range`, `src_ip.broadcast`, `src_ip.identity`, `src_ip.hostcount`, `src_ip.rfc_type` - URL parsing fields: `url.host`, `url.parts.path`, `url.domain` - hash classification: `file_hash.kind` - IP lookups: `src_ip.ipinfo.*`, `src_ip.asn.*`, `src_ip.geo.*`, `src_ip.country()` - provider visibility: `src_ip.geo.source`, `src_ip.asn.source`, `domain.dns.source` -- domain/network intel: `domain.dns.*`, `domain.rdap.*` +- domain/network intel: `domain.dns.*`, `domain.rdap.*`, `domain.resolveip`, `domain.resolveips`, `domain.resolveipv4`, `domain.resolveipv6` - hash intel: `file_hash.mb.*` (MalwareBazaar) -- VirusTotal lookups: `src_ip.vt.*`, `file_hash.vt.*`, `domain.vt.*`, `url.vt.*` +- VirusTotal lookups: `src_ip.vt.*`, `file_hash.vt.*`, `domain.vt.*`, `url.vt.*` (plus `hash.vt.name`, `hash.vt.names`, `hash.vt.score`, `domain.vt.ip`, `domain.vt.ips`) - local plugin command: `tke-hidecol` Lookup notes: @@ -82,15 +83,24 @@ Membership test: Lookup objects expose both normalized fields and raw response data: +- `ipcol.type` (`ipv4`/`ipv6`/`cidr4`/`cidr6`), `ipcol.family`, `ipcol.is_cidr` +- `ipcol.mask`, `ipcol.netmask`, `ipcol.identity`, `ipcol.broadcast`, `ipcol.range`, `ipcol.hostcount`, `ipcol.address_count` +- `ipcol.rfc_type` (classification: e.g. `global`, `private`, `documentation`, `shared`, `link-local`, ...) - `ipcol.ipinfo.country` - `ipcol.ipinfo.data.` - `ipcol.asn.asn`, `ipcol.asn.name`, `ipcol.asn.country` - `ipcol.asn.data.` -- `ipcol.vt.verdict` (e.g. `"3/94"`), `ipcol.vt.malicious`, `ipcol.vt.total`, `ipcol.vt.category` (alias: `ipcol.vt.type`) +- `ipcol.vt.verdict` (e.g. `"3/94"`), `ipcol.vt.score`, `ipcol.vt.malicious`, `ipcol.vt.total`, `ipcol.vt.category` (alias: `ipcol.vt.type`) - `ipcol.vt.data.` - `ipcol.geo.*` (best-available geo: prefers MaxMind mmdb, else free HTTP providers) - `ipcol.maxmind.*` (offline-only MaxMind lookup; empty if no mmdb) +Type shortcuts on table-like sheets: +- `;i` -> `type-ip` +- `;d` -> `type-domain` +- `;u` -> `type-url-ioc` +- `;h` -> `type-hash` + #### Caching All lookup providers cache results in a local sqlite+pickle DB (default `~/.visidata_cache.db`). @@ -124,6 +134,28 @@ Pure-Python library used by `iptype.py` for: This file intentionally does **not** import VisiData so it can be validated outside the VisiData runtime. +### VT schema (`*.vt`) + +`ip.vt`, `domain.vt`, `url.vt`, and `hash.vt` expose a normalized shape for quick querying across free + premium responses, while still preserving full raw JSON: + +Common fields: +- `verdict` (`"malicious/total"`) +- `score` / `confidence` (`malicious/total` float) +- `malicious`, `suspicious`, `harmless`, `undetected`, `timeout`, `total` +- `category` / `categories` +- `reputation`, `votes_harmless`, `votes_malicious` +- `tags`, `last_analysis_date`, `last_modification_date` +- `results` (normalized engine results map), `stats`, `data` (full raw API response) + +Object-specific conveniences: +- `ip.vt`: `asn`, `as_owner`, `country`, `continent`, `network` +- `domain.vt`: `ip` (best/last known), `ips` (all extracted A/AAAA) +- `url.vt`: URL-level verdict/score plus direct raw access via `url.vt.attrs.*` +- `hash.vt`: `name` (best malware name), `names` (all extracted names), plus verdict/score + +Raw passthrough: +- Any VT `attributes` field is also available via `obj.vt.` and `obj.vt.attrs.`. + ## Config: `visidatarc` This repo’s `visidatarc` is intended to be installed as VisiData’s `config.py`: diff --git a/config/visidata/plugins/ioc.py b/config/visidata/plugins/ioc.py index 2b5635c..edc2644 100644 --- a/config/visidata/plugins/ioc.py +++ b/config/visidata/plugins/ioc.py @@ -21,7 +21,7 @@ from urllib.parse import urlsplit from visidata import vd from visidata.sheets import TableSheet -from .iplib import JSONNode, VTInfo, parse_vt_ip +from .iplib import JSONNode, VTInfo, parse_vt_domain, parse_vt_file, parse_vt_url from .ioclib import MBInfo, URLParts, parse_mb_info, vt_url_id from .lookupcore import ( auth_tag, @@ -231,7 +231,43 @@ class DomainValue: @property def vt(self) -> VTInfo: data = _vt_domain_raw(self._d) - return parse_vt_ip(data) if data else VTInfo() + return parse_vt_domain(data) if data else VTInfo(object_type="domain") + + @property + def resolveipv4(self): + from .iptype import ip + + out = [] + for v in self.dns.a: + iv = ip(v) + if iv is not None: + out.append(iv) + return tuple(out) + + @property + def resolveipv6(self): + from .iptype import ip + + out = [] + for v in self.dns.aaaa: + iv = ip(v) + if iv is not None: + out.append(iv) + return tuple(out) + + @property + def resolveips(self): + return tuple(list(self.resolveipv4) + list(self.resolveipv6)) + + @property + def resolveip(self): + ips4 = self.resolveipv4 + if ips4: + return ips4[0] + ips6 = self.resolveipv6 + if ips6: + return ips6[0] + return None def _normalize_domain(s: str) -> str: @@ -308,7 +344,7 @@ class URLValue: @property def vt(self) -> VTInfo: data = _vt_url_raw(self._u) - return parse_vt_ip(data) if data else VTInfo() + return parse_vt_url(data) if data else VTInfo(object_type="url") def url_ioc(val: Any) -> Optional[URLValue]: @@ -381,7 +417,7 @@ class HashValue: @property def vt(self) -> VTInfo: data = _vt_file_raw(self._h) - return parse_vt_ip(data) if data else VTInfo() + return parse_vt_file(data) if data else VTInfo(object_type="file") @property def mb(self) -> MBInfo: @@ -411,16 +447,16 @@ vd.addGlobals(domain=domain, url_ioc=url_ioc, hash_ioc=hash_ioc) vd.addType( domain, - icon="d", + icon="🌐", formatter=lambda fmt, v: "" if v is None else str(v), name="Domain", ) vd.addType( - url_ioc, icon="u", formatter=lambda fmt, v: "" if v is None else str(v), name="URL" + url_ioc, icon="🔗", formatter=lambda fmt, v: "" if v is None else str(v), name="URL" ) vd.addType( hash_ioc, - icon="#", + icon="🔐", formatter=lambda fmt, v: "" if v is None else str(v), name="Hash", ) @@ -444,3 +480,15 @@ TableSheet.addCommand( vd.addMenuItem("Column", "Type", "Domain", "type-domain") vd.addMenuItem("Column", "Type", "URL (IOC)", "type-url-ioc") vd.addMenuItem("Column", "Type", "Hash", "type-hash") + + +try: + _probe = TableSheet("_probe") + if _probe.getCommand(";d") is None: + TableSheet.bindkey(";d", "type-domain") + if _probe.getCommand(";u") is None: + TableSheet.bindkey(";u", "type-url-ioc") + if _probe.getCommand(";h") is None: + TableSheet.bindkey(";h", "type-hash") +except Exception: + pass diff --git a/config/visidata/plugins/iplib.py b/config/visidata/plugins/iplib.py index 68e318b..703c972 100644 --- a/config/visidata/plugins/iplib.py +++ b/config/visidata/plugins/iplib.py @@ -8,7 +8,7 @@ any Python interpreter. from __future__ import annotations from dataclasses import dataclass -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple class JSONNode: @@ -124,11 +124,33 @@ class ASNInfo: @dataclass(frozen=True) class VTInfo: + object_type: str = "" malicious: int = 0 suspicious: int = 0 + harmless: int = 0 + undetected: int = 0 + timeout: int = 0 + failure: int = 0 + confirmed_timeout: int = 0 + type_unsupported: int = 0 total: int = 0 reputation: Optional[int] = None + votes_harmless: int = 0 + votes_malicious: int = 0 + score: Optional[float] = None categories: Tuple[str, ...] = () + tags: Tuple[str, ...] = () + names: Tuple[str, ...] = () + name: str = "" + asn: str = "" + as_owner: str = "" + continent: str = "" + country: str = "" + network: str = "" + ip: str = "" + ips: Tuple[str, ...] = () + last_analysis_date: Optional[int] = None + last_modification_date: Optional[int] = None raw: Optional[Dict[str, Any]] = None source: str = "" @@ -136,6 +158,19 @@ class VTInfo: def data(self) -> JSONNode: return JSONNode(self.raw) + @property + def attrs(self) -> JSONNode: + attrs = ((self.raw or {}).get("data") or {}).get("attributes") or {} + return JSONNode(attrs) + + @property + def results(self) -> JSONNode: + return self.attrs.get("last_analysis_results", {}) + + @property + def stats(self) -> JSONNode: + return self.attrs.get("last_analysis_stats", {}) + @property def verdict(self) -> str: return "" if self.total <= 0 else f"{self.malicious}/{self.total}" @@ -144,6 +179,10 @@ class VTInfo: def ratio(self) -> Optional[float]: return None if self.total <= 0 else self.malicious / self.total + @property + def confidence(self) -> Optional[float]: + return self.score + @property def type(self) -> str: # noqa: A003 return self.category @@ -153,6 +192,9 @@ class VTInfo: return self.categories[0] if self.categories else "" def __getattr__(self, name: str) -> Any: + attrs = ((self.raw or {}).get("data") or {}).get("attributes") or {} + if isinstance(attrs, dict) and name in attrs: + return attrs.get(name) if isinstance(self.raw, dict) and name in self.raw: return self.raw.get(name) raise AttributeError(name) @@ -317,42 +359,208 @@ def parse_geo_maxmind(raw: Optional[Dict[str, Any]]) -> GeoInfo: def parse_vt_ip(data: Optional[Dict[str, Any]]) -> VTInfo: + return _parse_vt(data, object_type="ip") + + +def parse_vt_domain(data: Optional[Dict[str, Any]]) -> VTInfo: + return _parse_vt(data, object_type="domain") + + +def parse_vt_url(data: Optional[Dict[str, Any]]) -> VTInfo: + return _parse_vt(data, object_type="url") + + +def parse_vt_file(data: Optional[Dict[str, Any]]) -> VTInfo: + return _parse_vt(data, object_type="file") + + +def _safe_int(v: Any, default: Optional[int] = 0) -> Optional[int]: + try: + return int(v) + except Exception: + return default + + +def _uniq(items: Iterable[str]) -> Tuple[str, ...]: + out: List[str] = [] + seen = set() + for x in items: + s = str(x or "").strip() + if not s or s in seen: + continue + seen.add(s) + out.append(s) + return tuple(out) + + +def _extract_categories(attrs: Dict[str, Any]) -> Tuple[str, ...]: + cats = attrs.get("categories") or {} + if isinstance(cats, dict): + return _uniq(str(v) for v in cats.values() if v) + if isinstance(cats, list): + return _uniq(str(v) for v in cats if v) + return () + + +def _extract_names(attrs: Dict[str, Any], object_type: str) -> Tuple[str, ...]: + names: List[str] = [] + + threat_names = attrs.get("threat_names") + if isinstance(threat_names, list): + names.extend(str(x) for x in threat_names if x) + elif isinstance(threat_names, str) and threat_names: + names.append(threat_names) + + ptc = attrs.get("popular_threat_classification") or {} + if isinstance(ptc, dict): + rows = ptc.get("popular_threat_name") + if isinstance(rows, list): + scored = [] + for row in rows: + if not isinstance(row, dict): + continue + val = str(row.get("value") or "").strip() + if not val: + continue + scored.append((_safe_int(row.get("count"), 0) or 0, val)) + scored.sort(reverse=True) + names.extend(v for _, v in scored) + + suggested = str(ptc.get("suggested_threat_label") or "").strip() + if suggested: + names.append(suggested) + + verdicts = attrs.get("sandbox_verdicts") or {} + if isinstance(verdicts, dict): + for v in verdicts.values(): + if not isinstance(v, dict): + continue + mn = v.get("malware_names") + if isinstance(mn, list): + names.extend(str(x) for x in mn if x) + mc = v.get("malware_classification") + if isinstance(mc, list): + names.extend(str(x) for x in mc if x) + + if object_type == "file": + meaningful = str(attrs.get("meaningful_name") or "").strip() + if meaningful: + names.append(meaningful) + + return _uniq(names) + + +def _extract_ips(attrs: Dict[str, Any]) -> Tuple[str, ...]: + ips: List[str] = [] + + records = attrs.get("last_dns_records") + if isinstance(records, list): + for rec in records: + if not isinstance(rec, dict): + continue + rtype = str(rec.get("type") or "").upper() + if rtype not in ("A", "AAAA"): + continue + val = str(rec.get("value") or "").strip() + if val: + ips.append(val) + + lserv = attrs.get("last_serving_ip_address") + if isinstance(lserv, dict): + val = str(lserv.get("id") or lserv.get("value") or "").strip() + if val: + ips.append(val) + elif isinstance(lserv, str) and lserv: + ips.append(lserv) + + return _uniq(ips) + + +def _parse_vt(data: Optional[Dict[str, Any]], *, object_type: str) -> VTInfo: data = data or {} - attrs = (((data.get("data") or {}).get("attributes") or {}) if isinstance(data, dict) else {}) + attrs = ( + ((data.get("data") or {}).get("attributes") or {}) + if isinstance(data, dict) + else {} + ) stats = attrs.get("last_analysis_stats") or {} + malicious = _safe_int(stats.get("malicious"), 0) or 0 + suspicious = _safe_int(stats.get("suspicious"), 0) or 0 + harmless = _safe_int(stats.get("harmless"), 0) or 0 + undetected = _safe_int(stats.get("undetected"), 0) or 0 + timeout = _safe_int(stats.get("timeout"), 0) or 0 + failure = _safe_int(stats.get("failure"), 0) or 0 + confirmed_timeout = _safe_int(stats.get("confirmed-timeout"), 0) or 0 + type_unsupported = _safe_int(stats.get("type-unsupported"), 0) or 0 try: - malicious = int(stats.get("malicious") or 0) - except Exception: - malicious = 0 - try: - suspicious = int(stats.get("suspicious") or 0) - except Exception: - suspicious = 0 - try: - total = int(sum(int(v or 0) for v in stats.values())) + total = int(sum(_safe_int(v, 0) or 0 for v in stats.values())) except Exception: total = 0 - cats = attrs.get("categories") or {} - if isinstance(cats, dict): - categories = tuple(str(v) for v in cats.values() if v) - elif isinstance(cats, list): - categories = tuple(str(v) for v in cats if v) - else: - categories = () + categories = _extract_categories(attrs) + names = _extract_names(attrs, object_type=object_type) + tags = _uniq(str(x) for x in (attrs.get("tags") or []) if x) + ips = _extract_ips(attrs) - rep = attrs.get("reputation") - try: - reputation = int(rep) if rep is not None else None - except Exception: - reputation = None + rep = _safe_int(attrs.get("reputation"), None) + + votes = attrs.get("total_votes") or {} + votes_harmless = 0 + votes_malicious = 0 + if isinstance(votes, dict): + votes_harmless = _safe_int(votes.get("harmless"), 0) or 0 + votes_malicious = _safe_int(votes.get("malicious"), 0) or 0 + + asn_raw = attrs.get("asn") + asn = "" + if asn_raw is not None: + sval = str(asn_raw).strip() + if sval: + asn = sval if sval.upper().startswith("AS") else f"AS{sval}" + + ip = "" + for x in ips: + if "." in x: + ip = x + break + if not ip and ips: + ip = ips[0] + + score = None if total <= 0 else malicious / total + + name = names[0] if names else "" + + la = _safe_int(attrs.get("last_analysis_date"), None) + lm = _safe_int(attrs.get("last_modification_date"), None) return VTInfo( + object_type=object_type, malicious=malicious, suspicious=suspicious, + harmless=harmless, + undetected=undetected, + timeout=timeout, + failure=failure, + confirmed_timeout=confirmed_timeout, + type_unsupported=type_unsupported, total=total, - reputation=reputation, + reputation=rep, + votes_harmless=votes_harmless, + votes_malicious=votes_malicious, + score=score, categories=categories, + tags=tags, + names=names, + name=name, + asn=asn, + as_owner=str(attrs.get("as_owner") or ""), + continent=str(attrs.get("continent") or ""), + country=str(attrs.get("country") or ""), + network=str(attrs.get("network") or ""), + ip=ip, + ips=ips, + last_analysis_date=la, + last_modification_date=lm, raw=data, source="virustotal", ) diff --git a/config/visidata/plugins/iptype.py b/config/visidata/plugins/iptype.py index 27d71b6..eb0c994 100644 --- a/config/visidata/plugins/iptype.py +++ b/config/visidata/plugins/iptype.py @@ -54,6 +54,60 @@ def _is_nullish(v: Any) -> bool: return v is None or v == "" or v == "null" +_DOC_NETS_V4 = ( + ipaddress.ip_network("192.0.2.0/24"), + ipaddress.ip_network("198.51.100.0/24"), + ipaddress.ip_network("203.0.113.0/24"), +) +_SHARED_NET_V4 = ipaddress.ip_network("100.64.0.0/10") +_BENCHMARK_NET_V4 = ipaddress.ip_network("198.18.0.0/15") +_DOC_NET_V6 = ipaddress.ip_network("2001:db8::/32") +_ULA_NET_V6 = ipaddress.ip_network("fc00::/7") + + +def _obj_net( + obj: Union[ipaddress._BaseAddress, ipaddress._BaseNetwork], +) -> ipaddress._BaseNetwork: + if isinstance(obj, ipaddress._BaseNetwork): + return obj + maxp = 32 if int(obj.version) == 4 else 128 + return ipaddress.ip_network(f"{obj}/{maxp}", strict=False) + + +def _rfc_type(obj: Union[ipaddress._BaseAddress, ipaddress._BaseNetwork]) -> str: + net = _obj_net(obj) + + if net.version == 4: + if any(net.subnet_of(n) for n in _DOC_NETS_V4): + return "documentation" + if net.subnet_of(_SHARED_NET_V4): + return "shared" + if net.subnet_of(_BENCHMARK_NET_V4): + return "benchmark" + else: + if net.subnet_of(_DOC_NET_V6): + return "documentation" + if net.subnet_of(_ULA_NET_V6): + return "unique-local" + + a = net.network_address + if a.is_loopback: + return "loopback" + if a.is_link_local: + return "link-local" + if a.is_multicast: + return "multicast" + if a.is_unspecified: + return "unspecified" + if a.is_reserved: + return "reserved" + if a.is_private: + return "private" + if a.is_global: + return "global" + return "special" + + def _ipinfo_token() -> str: return str(opt("tke_ipinfo_token", "") or os.getenv("IPINFO_TOKEN") or "") @@ -282,14 +336,83 @@ class IPValue: def version(self) -> int: return int(self._obj.version) + @property + def family(self) -> str: + return "ipv4" if self.version == 4 else "ipv6" + @property def is_network(self) -> bool: return isinstance(self._obj, ipaddress._BaseNetwork) + @property + def is_cidr(self) -> bool: + return self.is_network + @property def is_address(self) -> bool: return isinstance(self._obj, ipaddress._BaseAddress) + def _network_view(self) -> ipaddress._BaseNetwork: + return _obj_net(self._obj) + + @property + def kind(self) -> str: + if self.is_network: + return "cidr4" if self.version == 4 else "cidr6" + return "ipv4" if self.version == 4 else "ipv6" + + @property + def type(self) -> str: # noqa: A003 + return self.kind + + @property + def prefixlen(self) -> int: + return int(self._network_view().prefixlen) + + @property + def netmask(self) -> str: + return str(self._network_view().netmask) + + @property + def mask(self) -> str: + return self.netmask + + @property + def identity(self) -> str: + return str(self._network_view().network_address) + + @property + def broadcast(self) -> str: + return str(self._network_view().broadcast_address) + + @property + def range(self) -> str: + n = self._network_view() + return f"{n.network_address}-{n.broadcast_address}" + + @property + def address_count(self) -> int: + return int(self._network_view().num_addresses) + + @property + def hostcount(self) -> int: + n = self._network_view() + if n.version == 6: + return int(n.num_addresses) + if n.prefixlen == 32: + return 1 + if n.prefixlen == 31: + return 2 + return max(0, int(n.num_addresses) - 2) + + @property + def rfc_type(self) -> str: + return _rfc_type(self._obj) + + @property + def classification(self) -> str: + return self.rfc_type + @property def sort_key(self) -> Tuple[int, int, int, int]: # (version, kind, addrint, prefixlen) @@ -438,7 +561,7 @@ vd.addGlobals(ip=ip) vd.addType( - ip, icon=":", formatter=lambda fmt, v: "" if v is None else str(v), name="IP" + ip, icon="🛜", formatter=lambda fmt, v: "" if v is None else str(v), name="IP" ) TableSheet.addCommand( @@ -449,3 +572,11 @@ TableSheet.addCommand( ) vd.addMenuItem("Column", "Type", "IP (IPv4/IPv6/CIDR)", "type-ip") + + +try: + _probe = TableSheet("_probe") + if _probe.getCommand(";i") is None: + TableSheet.bindkey(";i", "type-ip") +except Exception: + pass diff --git a/config/visidata/scripts/validate_ip_lookups.py b/config/visidata/scripts/validate_ip_lookups.py index 4683992..7729de0 100644 --- a/config/visidata/scripts/validate_ip_lookups.py +++ b/config/visidata/scripts/validate_ip_lookups.py @@ -26,7 +26,10 @@ from plugins.iplib import ( # noqa: E402 parse_geo_ipinfo, parse_geo_ipwho, parse_geo_maxmind, + parse_vt_domain, + parse_vt_file, parse_vt_ip, + parse_vt_url, ) @@ -56,19 +59,43 @@ def main() -> int: _assert(str(ipi.data.country) == "US", "ipinfo.data.country") # ASNInfo basics - asn = ASNInfo(asn="AS15169", name="Google LLC", country="US", raw={"org": "AS15169 Google LLC"}, source="ipinfo") + asn = ASNInfo( + asn="AS15169", + name="Google LLC", + country="US", + raw={"org": "AS15169 Google LLC"}, + source="ipinfo", + ) _assert(asn.asn == "AS15169", "asn.asn") _assert(str(asn.data.org) == "AS15169 Google LLC", "asn.data.org") # VTInfo basics - vt = VTInfo(malicious=3, suspicious=1, total=94, categories=("search engine",), raw={"data": "x"}, source="virustotal") + vt = VTInfo( + malicious=3, + suspicious=1, + harmless=90, + total=94, + score=3 / 94, + categories=("search engine",), + names=("foo", "bar"), + name="foo", + raw={"data": "x"}, + source="virustotal", + ) _assert(vt.verdict == "3/94", "vt.verdict") _assert(vt.category == "search engine", "vt.category") _assert(vt.type == "search engine", "vt.type alias") + _assert(vt.name == "foo" and vt.names[1] == "bar", "vt names") + _assert(vt.confidence is not None and vt.confidence > 0, "vt.confidence") # Parse helpers _assert(parse_asn_ipinfo(ipinfo_raw).asn == "AS15169", "parse_asn_ipinfo") - ipapi_raw = {"asn": "AS123", "org": "Example ISP", "country_code": "DE", "country_name": "Germany"} + ipapi_raw = { + "asn": "AS123", + "org": "Example ISP", + "country_code": "DE", + "country_name": "Germany", + } _assert(parse_asn_ipapi(ipapi_raw).asn == "AS123", "parse_asn_ipapi") ipwho_raw = {"country_code": "NL", "connection": {"asn": 9009, "isp": "M247"}} _assert(parse_asn_ipwho(ipwho_raw).asn == "AS9009", "parse_asn_ipwho") @@ -76,29 +103,131 @@ def main() -> int: geo1 = parse_geo_ipinfo(ipinfo_raw) _assert(geo1.country_code == "US", "parse_geo_ipinfo country_code") _assert(geo1.lat is not None and geo1.lon is not None, "parse_geo_ipinfo loc") - geo2 = parse_geo_ipapi({"country_code": "DE", "country_name": "Germany", "latitude": 1, "longitude": 2}) + geo2 = parse_geo_ipapi( + {"country_code": "DE", "country_name": "Germany", "latitude": 1, "longitude": 2} + ) _assert(geo2.country_code == "DE", "parse_geo_ipapi country_code") - geo3 = parse_geo_ipwho({"country_code": "NL", "country": "Netherlands", "latitude": 1, "longitude": 2}) + geo3 = parse_geo_ipwho( + {"country_code": "NL", "country": "Netherlands", "latitude": 1, "longitude": 2} + ) _assert(geo3.country_code == "NL", "parse_geo_ipwho country_code") - mm_raw = {"country": {"iso_code": "US", "names": {"en": "United States"}}, "location": {"latitude": 1, "longitude": 2}} + mm_raw = { + "country": {"iso_code": "US", "names": {"en": "United States"}}, + "location": {"latitude": 1, "longitude": 2}, + } mm = parse_geo_maxmind(mm_raw) - _assert(mm.country_code == "US" and mm.country == "United States", "parse_geo_maxmind") + _assert( + mm.country_code == "US" and mm.country == "United States", "parse_geo_maxmind" + ) vt_raw = { "data": { "attributes": { - "last_analysis_stats": {"malicious": 2, "suspicious": 1, "harmless": 10}, + "last_analysis_stats": { + "malicious": 2, + "suspicious": 1, + "harmless": 10, + }, "reputation": 5, "categories": {"foo": "search engine"}, + "asn": 15169, + "as_owner": "Google", + "country": "US", + "network": "8.8.8.0/24", } } } vt2 = parse_vt_ip(vt_raw) _assert(vt2.verdict == "2/13", "parse_vt_ip verdict") + _assert(vt2.score is not None and vt2.score > 0, "parse_vt_ip score") + _assert(vt2.asn == "AS15169", "parse_vt_ip asn") + + vd = parse_vt_domain( + { + "data": { + "attributes": { + "last_analysis_stats": {"malicious": 1, "undetected": 9}, + "last_dns_records": [ + {"type": "A", "value": "1.2.3.4"}, + {"type": "AAAA", "value": "2001:db8::1"}, + ], + "categories": {"x": "phishing"}, + } + } + } + ) + _assert(vd.ip == "1.2.3.4", "parse_vt_domain last ip") + _assert(vd.ips == ("1.2.3.4", "2001:db8::1"), "parse_vt_domain ips") + _assert(vd.verdict == "1/10", "parse_vt_domain verdict") + + vf = parse_vt_file( + { + "data": { + "attributes": { + "last_analysis_stats": {"malicious": 5, "undetected": 5}, + "popular_threat_classification": { + "popular_threat_name": [ + {"value": "emotet", "count": 10}, + {"value": "trojan", "count": 7}, + ], + "suggested_threat_label": "trojan.emotet", + }, + "meaningful_name": "sample.exe", + } + } + } + ) + _assert(vf.name == "emotet", "parse_vt_file best name") + _assert("trojan.emotet" in vf.names, "parse_vt_file names") + _assert(vf.score == 0.5, "parse_vt_file score") + + vu = parse_vt_url( + { + "data": { + "attributes": { + "last_analysis_stats": {"malicious": 0, "undetected": 10}, + "threat_names": ["brand-impersonation"], + } + } + } + ) + _assert(vu.name == "brand-impersonation", "parse_vt_url name") + + try: + from plugins.iptype import ip as ipconv + + n = ipconv("192.168.12.1/24") + if n is None: + raise AssertionError("ip converter network parse") + _assert(n.type == "cidr4", "cidr type") + _assert(n.mask == "255.255.255.0", "cidr netmask") + _assert(n.identity == "192.168.12.0", "cidr identity") + _assert(n.broadcast == "192.168.12.255", "cidr broadcast") + _assert(n.range == "192.168.12.0-192.168.12.255", "cidr range") + _assert(n.hostcount == 254, "cidr hostcount") + _assert(n.rfc_type == "private", "cidr rfc type") + + a = ipconv("8.8.8.8") + if a is None: + raise AssertionError("ip converter ip parse") + _assert(a.type == "ipv4", "ip type") + _assert(a.rfc_type == "global", "ip rfc type") + except ModuleNotFoundError: + # Script is still useful in limited environments without VisiData runtime. + pass # GeoInfo basics - geo = GeoInfo(country="United States", country_code="US", region="California", city="Mountain View", lat=1.0, lon=2.0, raw={"country": {"iso_code": "US"}}, source="maxmind") + geo = GeoInfo( + country="United States", + country_code="US", + region="California", + city="Mountain View", + lat=1.0, + lon=2.0, + raw={"country": {"iso_code": "US"}}, + source="maxmind", + ) _assert(geo.country_code == "US", "geo.country_code") _assert(str(geo.data.country.iso_code) == "US", "geo.data.country.iso_code") diff --git a/config/visidata/showcase_ioc.vdj b/config/visidata/showcase_ioc.vdj index df48e6c..8bb7b98 100644 --- a/config/visidata/showcase_ioc.vdj +++ b/config/visidata/showcase_ioc.vdj @@ -12,6 +12,13 @@ {"sheet": "showcase_ioc", "col": "url", "row": "", "longname": "addcol-expr", "input": "url.domain", "keystrokes": "=", "comment": "Convert URL host into DomainValue"} {"sheet": "showcase_ioc", "col": "url", "row": "", "longname": "addcol-expr", "input": "url.parts.path", "keystrokes": "=", "comment": "Show parsed URL path"} {"sheet": "showcase_ioc", "col": "file_hash", "row": "", "longname": "addcol-expr", "input": "file_hash.kind", "keystrokes": "=", "comment": "Detect MD5/SHA1/SHA256 hash kind"} +{"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "src_ip.type", "keystrokes": "=", "comment": "IP kind (ipv4/ipv6/cidr4/cidr6)"} +{"sheet": "showcase_ioc", "col": "network", "row": "", "longname": "addcol-expr", "input": "network.mask", "keystrokes": "=", "comment": "CIDR netmask"} +{"sheet": "showcase_ioc", "col": "network", "row": "", "longname": "addcol-expr", "input": "network.range", "keystrokes": "=", "comment": "CIDR full range"} +{"sheet": "showcase_ioc", "col": "network", "row": "", "longname": "addcol-expr", "input": "network.broadcast", "keystrokes": "=", "comment": "CIDR broadcast/last IP"} +{"sheet": "showcase_ioc", "col": "network", "row": "", "longname": "addcol-expr", "input": "network.identity", "keystrokes": "=", "comment": "CIDR network identity"} +{"sheet": "showcase_ioc", "col": "network", "row": "", "longname": "addcol-expr", "input": "network.hostcount", "keystrokes": "=", "comment": "CIDR hostcount"} +{"sheet": "showcase_ioc", "col": "network", "row": "", "longname": "addcol-expr", "input": "network.rfc_type", "keystrokes": "=", "comment": "CIDR RFC classification"} {"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and src_ip and src_ip.ipinfo.country or ''", "keystrokes": "=", "comment": "IPInfo country (limited rows to keep demo fast)"} {"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and src_ip and src_ip.ipinfo.org or ''", "keystrokes": "=", "comment": "IPInfo org (limited rows to keep demo fast)"} {"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and src_ip and src_ip.asn.asn or ''", "keystrokes": "=", "comment": "ASN lookup (limited rows to keep demo fast)"} @@ -21,8 +28,17 @@ {"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and src_ip and src_ip.vt.verdict or ''", "keystrokes": "=", "comment": "VirusTotal IP verdict (single row for rate-limited API)"} {"sheet": "showcase_ioc", "col": "file_hash", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and file_hash and file_hash.vt.verdict or ''", "keystrokes": "=", "comment": "VirusTotal hash verdict (single row for rate-limited API)"} {"sheet": "showcase_ioc", "col": "file_hash", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and file_hash and file_hash.vt.malicious or ''", "keystrokes": "=", "comment": "VirusTotal hash malicious count (single row)"} +{"sheet": "showcase_ioc", "col": "file_hash", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and file_hash and file_hash.vt.score or ''", "keystrokes": "=", "comment": "VirusTotal hash score (single row)"} +{"sheet": "showcase_ioc", "col": "file_hash", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and file_hash and file_hash.vt.name or ''", "keystrokes": "=", "comment": "VirusTotal hash best malware name"} +{"sheet": "showcase_ioc", "col": "file_hash", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and file_hash and ','.join(file_hash.vt.names) or ''", "keystrokes": "=", "comment": "VirusTotal hash all malware names"} {"sheet": "showcase_ioc", "col": "domain", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and domain and domain.vt.verdict or ''", "keystrokes": "=", "comment": "VirusTotal domain verdict (single row)"} +{"sheet": "showcase_ioc", "col": "domain", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and domain and domain.vt.ip or ''", "keystrokes": "=", "comment": "VirusTotal domain last known IP"} +{"sheet": "showcase_ioc", "col": "domain", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and domain and ','.join(domain.vt.ips) or ''", "keystrokes": "=", "comment": "VirusTotal domain all known IPs"} {"sheet": "showcase_ioc", "col": "url", "row": "", "longname": "addcol-expr", "input": "event_id == 'evt-001' and url and url.vt.verdict or ''", "keystrokes": "=", "comment": "VirusTotal URL verdict (single row)"} +{"sheet": "showcase_ioc", "col": "domain", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and domain and domain.resolveip or ''", "keystrokes": "=", "comment": "Resolve first IP (A then AAAA)"} +{"sheet": "showcase_ioc", "col": "domain", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and domain and ','.join([str(x) for x in domain.resolveipv4]) or ''", "keystrokes": "=", "comment": "Resolve IPv4 addresses"} +{"sheet": "showcase_ioc", "col": "domain", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and domain and ','.join([str(x) for x in domain.resolveipv6]) or ''", "keystrokes": "=", "comment": "Resolve IPv6 addresses"} +{"sheet": "showcase_ioc", "col": "domain", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and domain and ','.join([str(x) for x in domain.resolveips]) or ''", "keystrokes": "=", "comment": "Resolve all IP addresses"} {"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and src_ip and src_ip.country() or ''", "keystrokes": "=", "comment": "Best country helper"} {"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and src_ip and src_ip.geo.source or ''", "keystrokes": "=", "comment": "Geo provider source"} {"sheet": "showcase_ioc", "col": "src_ip", "row": "", "longname": "addcol-expr", "input": "event_id in ('evt-001','evt-002') and src_ip and src_ip.asn.source or ''", "keystrokes": "=", "comment": "ASN provider source"}