Centralize provider caching and rate-limit handling, then add Domain/URL/Hash IOC types and safer VT/IPInfo key resolution so lookups stay reliable on free-tier APIs.
85 lines
2.1 KiB
Python
85 lines
2.1 KiB
Python
"""
Pure-Python IOC helpers for domains, URLs, and hashes.

No VisiData imports; safe to unit-test with any interpreter.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Optional, Tuple
|
|
|
|
from .iplib import JSONNode
|
|
import base64
|
|
|
|
|
|
@dataclass(frozen=True)
class URLParts:
    """Immutable record holding the individual components of a URL.

    Every textual component defaults to the empty string; ``port`` defaults
    to ``None``.
    """

    scheme: str = ""
    username: str = ""
    password: str = ""
    host: str = ""
    port: Optional[int] = None
    path: str = ""
    query: str = ""
    fragment: str = ""

    @property
    def data(self) -> JSONNode:
        """Return all components wrapped in a ``JSONNode``.

        The wrapped mapping has one key per field, in declaration order.
        """
        # Listed explicitly so the emitted key order is fixed.
        component_names = (
            "scheme",
            "username",
            "password",
            "host",
            "port",
            "path",
            "query",
            "fragment",
        )
        components = {name: getattr(self, name) for name in component_names}
        return JSONNode(components)
|
|
|
|
|
|
@dataclass(frozen=True)
class MBInfo:
    """Immutable summary of a MalwareBazaar (abuse.ch) hash lookup."""

    # Value of the response's ``query_status`` field.
    status: str = ""
    # Signature names attached to the sample.
    signatures: Tuple[str, ...] = ()
    # Tags attached to the sample.
    tags: Tuple[str, ...] = ()
    # The response payload this record was built from, if any.
    raw: Optional[Dict[str, Any]] = None
    # Name of the provider that produced this record.
    source: str = "malwarebazaar"

    @property
    def data(self) -> JSONNode:
        """Return the raw payload wrapped in a ``JSONNode``."""
        payload = self.raw
        return JSONNode(payload)

    @property
    def signature(self) -> str:
        """Return the first signature, or ``""`` when there are none."""
        if not self.signatures:
            return ""
        return self.signatures[0]
|
|
|
|
|
|
def parse_mb_info(raw: Optional[Dict[str, Any]]) -> MBInfo:
    """Build an ``MBInfo`` from a decoded MalwareBazaar response payload.

    Only the first entry of the payload's ``data`` list is inspected: its
    ``signature`` (when truthy) becomes the single element of
    ``signatures``, and the truthy members of its ``tags`` list become
    ``tags``. Missing or unexpectedly shaped fields are treated as absent
    instead of raising.

    Args:
        raw: Decoded response payload. ``None``, an empty dict, or any value
            that is not a dict is treated as an empty payload.

    Returns:
        An ``MBInfo`` whose ``status`` is the stringified ``query_status``
        (``""`` when absent or falsy) and whose ``raw`` is the payload dict
        (``{}`` when the input was unusable).
    """
    # The nested fields below are all shape-checked; apply the same leniency
    # to the top level so a list/str payload does not crash on ``.get``.
    if not raw or not isinstance(raw, dict):
        raw = {}

    status = str(raw.get("query_status") or "")
    signatures: Tuple[str, ...] = ()
    tags: Tuple[str, ...] = ()

    entries = raw.get("data")
    first = entries[0] if isinstance(entries, list) and entries else None
    if isinstance(first, dict):
        signature = first.get("signature")
        if signature:
            signatures = (str(signature),)
        entry_tags = first.get("tags")
        if isinstance(entry_tags, list):
            # Drop empty/None tags; stringify the rest.
            tags = tuple(str(tag) for tag in entry_tags if tag)

    return MBInfo(status=status, signatures=signatures, tags=tags, raw=raw)
|
|
|
|
|
|
def vt_url_id(url: str) -> str:
    """Return the VirusTotal URL identifier for *url*.

    The identifier is the URL-safe base64 encoding of the URL's UTF-8 bytes
    with the trailing ``=`` padding removed.
    """
    encoded = base64.urlsafe_b64encode(url.encode("utf-8"))
    # Padding is stripped on the bytes; the remainder is pure ASCII.
    return encoded.rstrip(b"=").decode("ascii")
|