visidatarc: v3.3-safe config + helper fixes

This commit is contained in:
tobias
2026-02-14 11:15:10 +01:00
parent d994b57fee
commit 7e45c6f843

View File

@@ -1,7 +1,16 @@
# copy or link this file to ~/.visidatarc # Install targets (VisiData v3.3):
# - macOS default: ~/Library/Preferences/visidata/config.py
# - XDG default: ${XDG_CONFIG_HOME:-~/.config}/visidata/config.py
# - legacy fallback: ~/.visidatarc
options.disp_date_fmt="%Y-%m-%dT%H:%M:%S" options.disp_date_fmt="%Y-%m-%dT%H:%M:%S"
import plugins.hidecol # User plugins are typically under $VD_DIR/plugins (default ~/.visidata/plugins).
# VisiData adds $VD_DIR to sys.path, so they import as `plugins.<name>`.
# Keep local/custom plugins optional so missing files don't break startup.
try:
import plugins.hidecol
except ModuleNotFoundError:
pass
from datetime import datetime from datetime import datetime
import functools import functools
@@ -14,14 +23,26 @@ import pickle
import time import time
import sqlite3 import sqlite3
# VisiData loads the config via exec(code, vd.getGlobals(), newdefs).
# Functions defined here capture vd.getGlobals() as their global namespace, but
# `import ...` statements assign into `newdefs` (locals). Export key imports into
# the real globals dict so decorators and function bodies can resolve them.
globals().update({
'datetime': datetime,
'functools': functools,
'json': json,
'unquote_plus': unquote_plus,
'os': os,
'pickle': pickle,
'time': time,
'sqlite3': sqlite3,
})
cache_path = os.path.expanduser('~/.visidata_cache.db')

def _ensure_cache_db(conn):
    """Create the disk-cache table on *conn* if it does not exist yet.

    Safe to call on every connection: uses CREATE TABLE IF NOT EXISTS.
    Commit is handled by the caller's `with sqlite3.connect(...)` block.
    """
    conn.execute(
        'CREATE TABLE IF NOT EXISTS cache '
        '(key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)'
    )
def disk_cache_decorator(max_age=None, lru_cache_size=1000): def disk_cache_decorator(max_age=None, lru_cache_size=1000):
def decorator(func): def decorator(func):
@@ -29,6 +50,7 @@ def disk_cache_decorator(max_age=None, lru_cache_size=1000):
def get_from_sqlite(*args, **kwargs): def get_from_sqlite(*args, **kwargs):
key = f"{func.__name__}:{str(args)}:{str(kwargs)}" key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
with sqlite3.connect(cache_path) as conn: with sqlite3.connect(cache_path) as conn:
_ensure_cache_db(conn)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute('SELECT value, timestamp FROM cache WHERE key=?', (key,)) cursor.execute('SELECT value, timestamp FROM cache WHERE key=?', (key,))
row = cursor.fetchone() row = cursor.fetchone()
@@ -53,16 +75,38 @@ def what(item):
return f"{type(item)}:{str(item)}" return f"{type(item)}:{str(item)}"
def avgdiff(values):
    """Aggregate: mean absolute gap between adjacent values after sorting.

    Null-ish entries (None, "", "null") and anything that cannot be
    coerced to float are ignored.  Fewer than two usable values yields
    None so VisiData renders a blank cell instead of raising.
    """
    usable = []
    for item in values:
        if item in (None, "", "null"):
            continue
        try:
            usable.append(float(item))
        except Exception:
            continue
    if len(usable) < 2:
        return None
    usable.sort()
    gaps = [abs(later - earlier) for earlier, later in zip(usable, usable[1:])]
    return sum(gaps) / len(gaps)
vd.aggregator('avgdiff', avgdiff) vd.aggregator('avgdiff', avgdiff)
def distinct_list(values):
    """Aggregate: unique non-null values, preserving first-seen order.

    Filters out None, "" and the literal string "null" so the display
    is deterministic and free of empty placeholders.
    """
    kept = {}
    for item in values:
        if item in (None, "", "null"):
            continue
        # dicts remember insertion order, so this dedupes while keeping
        # the first occurrence's position.
        kept.setdefault(item, None)
    return list(kept)
vd.aggregator('distinct_list', distinct_list) vd.aggregator('distinct_list', distinct_list)
@@ -155,7 +199,11 @@ def dns_lookup(domain, record='A'):
try: try:
import dns import dns
import dns.resolver as rs import dns.resolver as rs
result = rs.query(domain, record) # dnspython 2.x prefers resolve(); keep a fallback for older versions.
try:
result = rs.resolve(domain, record)
except AttributeError:
result = rs.query(domain, record)
return ",".join([x.to_text() for x in result]) return ",".join([x.to_text() for x in result])
except dns.resolver.NoAnswer as e: except dns.resolver.NoAnswer as e:
return "" return ""
@@ -188,10 +236,10 @@ def _asn(ip):
@functools.lru_cache(maxsize=1000)
def asn(ip, type="asn"):
    """Format ASN info for an IP address (or comma-separated list of IPs).

    Delegates the lookup to _asn(); any lookup failure yields "" so the
    column stays renderable.  `type` is kept for backward compatibility
    with existing callers (it is passed through on recursive calls).
    """
    # Comma-separated input: resolve each address individually and re-join,
    # skipping empty fragments such as trailing commas.
    if len(ip.split(",")) > 1:
        return ",".join([asn(x.strip(), type) for x in ip.split(",") if x.strip()])
    try:
        record = _asn(ip)
        return f'({record["asn"]}:{record["name"]}[{record["country"]}])'
    except Exception:
        # Was a bare `except:`; narrowed so KeyboardInterrupt/SystemExit
        # still propagate instead of being swallowed as "".
        return ""
@@ -314,7 +362,15 @@ def int2ip(zahl):
# convert IP-String to Integer
def ip2int(ip):
    """Convert a dotted-quad IPv4 string to its 32-bit integer value.

    Returns None for malformed input: non-numeric pieces, a wrong octet
    count, or octets outside 0-255.
    """
    try:
        octets = [int(piece) for piece in str(ip).split(".")]
    except Exception:
        return None
    if len(octets) != 4:
        return None
    if not all(0 <= octet <= 255 for octet in octets):
        return None
    # Big-endian composition, equivalent to int.from_bytes(bytes(octets), "big").
    return (octets[0] << 24) | (octets[1] << 16) | (octets[2] << 8) | octets[3]
# parse KeyValue # parse KeyValue
def dirty_kv(data): def dirty_kv(data):
@@ -324,6 +380,18 @@ def dirty_kv(data):
# parse json with missing quotes around attribute names # parse json with missing quotes around attribute names
import yaml try:
import yaml
except ModuleNotFoundError:
yaml = None
globals()['yaml'] = yaml
def dirty_json(data):
    """Best-effort parse of JSON-ish text (e.g. unquoted attribute names).

    YAML is a superset of JSON, so yaml.safe_load copes with many
    "almost-JSON" payloads.  Returns None when PyYAML is unavailable
    or parsing fails.
    """
    if yaml is None:
        # PyYAML was missing at import time; degrade gracefully.
        return None
    try:
        parsed = yaml.safe_load(data)
    except Exception:
        return None
    return parsed