Extended caching capability

All cached function calls are now persistent on disk in ~/.visidata_cache
This commit is contained in:
tke
2023-05-04 16:41:02 +02:00
parent a42a20afc5
commit b114dceb2c

View File

@@ -9,6 +9,25 @@ import json
from urllib.parse import unquote_plus from urllib.parse import unquote_plus
import os.path
import shelve
cache_path = os.path.expanduser('~/.visidata_cache')
def disk_cache_decorator(func):
@functools.lru_cache(maxsize=1000)
def get_from_shelve(*args, **kwargs):
with shelve.open(cache_path) as cache:
key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
if key in cache:
return cache[key]
else:
result = func(*args, **kwargs)
cache[key] = result
return result
return get_from_shelve
def decode_url_safe(url_safe_string): def decode_url_safe(url_safe_string):
utf8_string = unquote_plus(url_safe_string) utf8_string = unquote_plus(url_safe_string)
return utf8_string return utf8_string
@@ -73,7 +92,7 @@ def sym_time(val):
return datetime.fromtimestamp(b) return datetime.fromtimestamp(b)
@functools.lru_cache() @disk_cache_decorator
def vendor(mac): def vendor(mac):
try: try:
from mac_vendor_lookup import InvalidMacError, MacLookup as mlu from mac_vendor_lookup import InvalidMacError, MacLookup as mlu
@@ -96,7 +115,7 @@ def _get_vt():
except: except:
return None return None
@functools.lru_cache() @disk_cache_decorator
def vt_ip(ip): def vt_ip(ip):
vt = _get_vt() vt = _get_vt()
if vt is None: if vt is None:
@@ -104,7 +123,8 @@ def vt_ip(ip):
response = vt.get_ip_report(ip) response = vt.get_ip_report(ip)
return response return response
@functools.lru_cache()
@disk_cache_decorator
def vt_file(hash): def vt_file(hash):
vt = _get_vt() vt = _get_vt()
if vt is None: if vt is None:
@@ -113,7 +133,7 @@ def vt_file(hash):
return response return response
@functools.lru_cache(maxsize=1000) @disk_cache_decorator
def dns_lookup(domain, record='A'): def dns_lookup(domain, record='A'):
if len(domain.split(",")) > 1: if len(domain.split(",")) > 1:
return ",".join([dns_lookup(x, record) for x in domain.split(",")]) return ",".join([dns_lookup(x, record) for x in domain.split(",")])
@@ -130,7 +150,7 @@ def dns_lookup(domain, record='A'):
except ModuleNotFoundError: except ModuleNotFoundError:
return "module not available" return "module not available"
@functools.lru_cache() @disk_cache_decorator
def _asn(ip): def _asn(ip):
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import requests import requests
@@ -150,7 +170,7 @@ def _asn(ip):
res['name']=" ".join(name_split[:-1]) res['name']=" ".join(name_split[:-1])
return res return res
@functools.lru_cache() @disk_cache_decorator
def asn(ip, type="asn"): def asn(ip, type="asn"):
if len(ip.split(",")) > 1: if len(ip.split(",")) > 1:
return ",".join([_asn(x, type) for x in ip.split(",")]) return ",".join([_asn(x, type) for x in ip.split(",")])
@@ -159,7 +179,7 @@ def asn(ip, type="asn"):
except: except:
return "" return ""
@functools.lru_cache(maxsize=1000) @disk_cache_decorator
def _ipinfo(ip): def _ipinfo(ip):
try: try:
import requests import requests
@@ -172,7 +192,6 @@ def _ipinfo(ip):
return None return None
@functools.lru_cache()
def ipinfo(ip, type="country"): def ipinfo(ip, type="country"):
if len(ip.split(",")) > 1: if len(ip.split(",")) > 1:
return ",".join([ipinfo(x, type) for x in ip.split(",")]) return ",".join([ipinfo(x, type) for x in ip.split(",")])
@@ -191,7 +210,6 @@ def split_number2ip(number):
else: else:
return number return number
@functools.lru_cache()
def mx_lookup(domain): def mx_lookup(domain):
domain = domain.lstrip("www.") domain = domain.lstrip("www.")
try: try:
@@ -202,10 +220,8 @@ def mx_lookup(domain):
return str(e) return str(e)
@functools.lru_cache(maxsize=1000) @disk_cache_decorator
def grab_banner(ip, port=25): def _grab_banner(ip, port=25):
if len(ip.split(",")) > 1:
return ",".join([grab_banner(x, port) for x in ip.split(",")])
try: try:
import socket import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
@@ -213,8 +229,11 @@ def grab_banner(ip, port=25):
sock.connect((ip, port)) sock.connect((ip, port))
ret = sock.recv(1024) ret = sock.recv(1024)
return str(ret.strip().decode()) return str(ret.strip().decode())
except: except Exception as e:
return "" return f"!Error {ip} {e}"
def grab_banner(ip, port=25):
return ",".join([_grab_banner(x.strip(), port) for x in ip.split(",")])
def sym_id(val): def sym_id(val):