Changed Diskcache to sqlite with adjustable timeout

This commit is contained in:
tke
2023-05-05 11:13:11 +02:00
parent b114dceb2c
commit 724378945c

View File

@@ -10,24 +10,39 @@ import json
from urllib.parse import unquote_plus from urllib.parse import unquote_plus
import os.path import os.path
import shelve import pickle
import time
import sqlite3
cache_path = os.path.expanduser('~/.visidata_cache') cache_path = os.path.expanduser('~/.visidata_cache.db')
def disk_cache_decorator(func): def init_cache_db():
@functools.lru_cache(maxsize=1000) with sqlite3.connect(cache_path) as conn:
def get_from_shelve(*args, **kwargs): conn.execute('''CREATE TABLE IF NOT EXISTS cache
with shelve.open(cache_path) as cache: (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)''')
init_cache_db()
def disk_cache_decorator(max_age=None, lru_cache_size=1000):
def decorator(func):
@functools.lru_cache(maxsize=lru_cache_size)
def get_from_sqlite(*args, **kwargs):
key = f"{func.__name__}:{str(args)}:{str(kwargs)}" key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
if key in cache: with sqlite3.connect(cache_path) as conn:
return cache[key] cursor = conn.cursor()
else: cursor.execute('SELECT value, timestamp FROM cache WHERE key=?', (key,))
result = func(*args, **kwargs) row = cursor.fetchone()
cache[key] = result current_time = int(time.time())
return result if row and (max_age is None or current_time - row[1] <= max_age):
return get_from_shelve return pickle.loads(row[0])
else:
result = func(*args, **kwargs)
serialized_value = pickle.dumps(result)
cursor.execute('INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)', (key, serialized_value, current_time))
conn.commit()
return result
return get_from_sqlite
return decorator
def decode_url_safe(url_safe_string): def decode_url_safe(url_safe_string):
utf8_string = unquote_plus(url_safe_string) utf8_string = unquote_plus(url_safe_string)
return utf8_string return utf8_string
@@ -92,7 +107,7 @@ def sym_time(val):
return datetime.fromtimestamp(b) return datetime.fromtimestamp(b)
@disk_cache_decorator @functools.lru_cache(maxsize=1000)
def vendor(mac): def vendor(mac):
try: try:
from mac_vendor_lookup import InvalidMacError, MacLookup as mlu from mac_vendor_lookup import InvalidMacError, MacLookup as mlu
@@ -115,7 +130,7 @@ def _get_vt():
except: except:
return None return None
@disk_cache_decorator @disk_cache_decorator()
def vt_ip(ip): def vt_ip(ip):
vt = _get_vt() vt = _get_vt()
if vt is None: if vt is None:
@@ -124,7 +139,7 @@ def vt_ip(ip):
return response return response
@disk_cache_decorator @disk_cache_decorator()
def vt_file(hash): def vt_file(hash):
vt = _get_vt() vt = _get_vt()
if vt is None: if vt is None:
@@ -133,7 +148,7 @@ def vt_file(hash):
return response return response
@disk_cache_decorator @disk_cache_decorator()
def dns_lookup(domain, record='A'): def dns_lookup(domain, record='A'):
if len(domain.split(",")) > 1: if len(domain.split(",")) > 1:
return ",".join([dns_lookup(x, record) for x in domain.split(",")]) return ",".join([dns_lookup(x, record) for x in domain.split(",")])
@@ -150,7 +165,7 @@ def dns_lookup(domain, record='A'):
except ModuleNotFoundError: except ModuleNotFoundError:
return "module not available" return "module not available"
@disk_cache_decorator @disk_cache_decorator()
def _asn(ip): def _asn(ip):
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import requests import requests
@@ -170,7 +185,7 @@ def _asn(ip):
res['name']=" ".join(name_split[:-1]) res['name']=" ".join(name_split[:-1])
return res return res
@disk_cache_decorator @disk_cache_decorator()
def asn(ip, type="asn"): def asn(ip, type="asn"):
if len(ip.split(",")) > 1: if len(ip.split(",")) > 1:
return ",".join([_asn(x, type) for x in ip.split(",")]) return ",".join([_asn(x, type) for x in ip.split(",")])
@@ -179,7 +194,7 @@ def asn(ip, type="asn"):
except: except:
return "" return ""
@disk_cache_decorator @disk_cache_decorator()
def _ipinfo(ip): def _ipinfo(ip):
try: try:
import requests import requests
@@ -220,7 +235,7 @@ def mx_lookup(domain):
return str(e) return str(e)
@disk_cache_decorator @disk_cache_decorator(max_age=60*60*24)
def _grab_banner(ip, port=25): def _grab_banner(ip, port=25):
try: try:
import socket import socket