111 lines
4.0 KiB
Python
111 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Offline validation for normalization/extraction logic in plugins/iptype.py.
|
|
|
|
This intentionally does not perform live network calls (which may be blocked in CI/sandboxes).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
|
|
# Ensure repo root is on sys.path when running as a script.
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from plugins.iplib import ( # noqa: E402
|
|
ASNInfo,
|
|
GeoInfo,
|
|
IPInfo,
|
|
JSONNode,
|
|
VTInfo,
|
|
parse_asn_ipapi,
|
|
parse_asn_ipinfo,
|
|
parse_asn_ipwho,
|
|
parse_geo_ipapi,
|
|
parse_geo_ipinfo,
|
|
parse_geo_ipwho,
|
|
parse_geo_maxmind,
|
|
parse_vt_ip,
|
|
)
|
|
|
|
|
|
def _assert(cond: bool, msg: str):
|
|
if not cond:
|
|
raise AssertionError(msg)
|
|
|
|
|
|
def main() -> int:
|
|
# JSONNode chaining behavior
|
|
n = JSONNode({"a": {"b": 1}, "x": None})
|
|
_assert(str(n.a.b) == "1", "JSONNode dict nesting")
|
|
_assert(str(n.missing.anything) == "", "JSONNode missing keys should render empty")
|
|
_assert(str(n.x.y) == "", "JSONNode None chaining should render empty")
|
|
|
|
# ipinfo basic
|
|
ipinfo_raw = {
|
|
"country": "US",
|
|
"region": "California",
|
|
"city": "Mountain View",
|
|
"org": "AS15169 Google LLC",
|
|
"loc": "37.4056,-122.0775",
|
|
}
|
|
ipi = IPInfo(ipinfo_raw, source="ipinfo")
|
|
_assert(ipi.country == "US", "ipinfo.country")
|
|
_assert(ipi.asn == "AS15169", "ipinfo.asn derived from org")
|
|
_assert(str(ipi.data.country) == "US", "ipinfo.data.country")
|
|
|
|
# ASNInfo basics
|
|
asn = ASNInfo(asn="AS15169", name="Google LLC", country="US", raw={"org": "AS15169 Google LLC"}, source="ipinfo")
|
|
_assert(asn.asn == "AS15169", "asn.asn")
|
|
_assert(str(asn.data.org) == "AS15169 Google LLC", "asn.data.org")
|
|
|
|
# VTInfo basics
|
|
vt = VTInfo(malicious=3, suspicious=1, total=94, categories=("search engine",), raw={"data": "x"}, source="virustotal")
|
|
_assert(vt.verdict == "3/94", "vt.verdict")
|
|
_assert(vt.category == "search engine", "vt.category")
|
|
_assert(vt.type == "search engine", "vt.type alias")
|
|
|
|
# Parse helpers
|
|
_assert(parse_asn_ipinfo(ipinfo_raw).asn == "AS15169", "parse_asn_ipinfo")
|
|
ipapi_raw = {"asn": "AS123", "org": "Example ISP", "country_code": "DE", "country_name": "Germany"}
|
|
_assert(parse_asn_ipapi(ipapi_raw).asn == "AS123", "parse_asn_ipapi")
|
|
ipwho_raw = {"country_code": "NL", "connection": {"asn": 9009, "isp": "M247"}}
|
|
_assert(parse_asn_ipwho(ipwho_raw).asn == "AS9009", "parse_asn_ipwho")
|
|
|
|
geo1 = parse_geo_ipinfo(ipinfo_raw)
|
|
_assert(geo1.country_code == "US", "parse_geo_ipinfo country_code")
|
|
_assert(geo1.lat is not None and geo1.lon is not None, "parse_geo_ipinfo loc")
|
|
geo2 = parse_geo_ipapi({"country_code": "DE", "country_name": "Germany", "latitude": 1, "longitude": 2})
|
|
_assert(geo2.country_code == "DE", "parse_geo_ipapi country_code")
|
|
geo3 = parse_geo_ipwho({"country_code": "NL", "country": "Netherlands", "latitude": 1, "longitude": 2})
|
|
_assert(geo3.country_code == "NL", "parse_geo_ipwho country_code")
|
|
|
|
mm_raw = {"country": {"iso_code": "US", "names": {"en": "United States"}}, "location": {"latitude": 1, "longitude": 2}}
|
|
mm = parse_geo_maxmind(mm_raw)
|
|
_assert(mm.country_code == "US" and mm.country == "United States", "parse_geo_maxmind")
|
|
|
|
vt_raw = {
|
|
"data": {
|
|
"attributes": {
|
|
"last_analysis_stats": {"malicious": 2, "suspicious": 1, "harmless": 10},
|
|
"reputation": 5,
|
|
"categories": {"foo": "search engine"},
|
|
}
|
|
}
|
|
}
|
|
vt2 = parse_vt_ip(vt_raw)
|
|
_assert(vt2.verdict == "2/13", "parse_vt_ip verdict")
|
|
|
|
# GeoInfo basics
|
|
geo = GeoInfo(country="United States", country_code="US", region="California", city="Mountain View", lat=1.0, lon=2.0, raw={"country": {"iso_code": "US"}}, source="maxmind")
|
|
_assert(geo.country_code == "US", "geo.country_code")
|
|
_assert(str(geo.data.country.iso_code) == "US", "geo.data.country.iso_code")
|
|
|
|
print("ok")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|