Files
gists/config/visidata/visidatarc
2026-02-21 23:20:42 +01:00

403 lines
12 KiB
Plaintext
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Install targets (VisiData v3.3):
# - macOS default: ~/Library/Preferences/visidata/config.py
# - XDG default: ${XDG_CONFIG_HOME:-~/.config}/visidata/config.py
# - legacy fallback: ~/.visidatarc
# Date display format: ISO-8601 style "YYYY-MM-DDTHH:MM:SS" without a timezone suffix.
options.disp_date_fmt="%Y-%m-%dT%H:%M:%S"
# User plugins are typically under $VD_DIR/plugins (default ~/.visidata/plugins).
# VisiData adds $VD_DIR to sys.path, so they import as `plugins.<name>`.
# Keep local/custom plugins optional so missing files don't break startup.
try:
import plugins.hidecol
except ModuleNotFoundError:
pass
try:
import plugins.iptype
except ModuleNotFoundError:
pass
from datetime import datetime
import functools
import json
from urllib.parse import unquote_plus
import os.path
import pickle
import time
import sqlite3
# VisiData runs this config through exec(code, vd.getGlobals(), newdefs).
# Functions defined below resolve names through vd.getGlobals(), whereas the
# `import ...` statements above only bind into `newdefs` (the exec locals).
# Publish the imported names into the real globals dict so that decorators and
# function bodies can find them.
globals().update(
    datetime=datetime,
    functools=functools,
    json=json,
    unquote_plus=unquote_plus,
    os=os,
    pickle=pickle,
    time=time,
    sqlite3=sqlite3,
)

# SQLite file that backs disk_cache_decorator.
cache_path = os.path.expanduser('~/.visidata_cache.db')
def _ensure_cache_db(conn):
    """Create the ``cache`` table on ``conn`` unless it already exists.

    The table has three columns: ``key`` (TEXT primary key), ``value`` (BLOB)
    and ``timestamp`` (INTEGER).
    """
    schema = '''CREATE TABLE IF NOT EXISTS cache
                 (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)'''
    conn.execute(schema)
def disk_cache_decorator(max_age=None, lru_cache_size=1000):
    """Build a decorator that memoizes a function in memory and in SQLite.

    Results are pickled into the ``cache`` table of the database at
    ``cache_path`` and are additionally kept in a ``functools.lru_cache``.

    Args:
        max_age: Maximum age in seconds of a stored row before the value is
            recomputed; ``None`` means stored rows never expire.
        lru_cache_size: ``maxsize`` of the in-memory LRU layer.

    Returns:
        A decorator. The decorated function must be called with hashable
        arguments (required by ``lru_cache``) and return picklable values.
    """
    # Local import: keeps this block independent of the file-level imports.
    from contextlib import closing

    def decorator(func):
        # NOTE(review): the in-memory LRU layer never expires, so ``max_age``
        # only applies to values that are read back from the database.
        @functools.lru_cache(maxsize=lru_cache_size)
        @functools.wraps(func)
        def get_from_sqlite(*args, **kwargs):
            key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
            # sqlite3's own context manager only commits / rolls back; it does
            # not close the connection. closing() releases it after each call.
            with closing(sqlite3.connect(cache_path)) as conn:
                _ensure_cache_db(conn)
                row = conn.execute(
                    'SELECT value, timestamp FROM cache WHERE key=?', (key,)
                ).fetchone()
                current_time = int(time.time())
                if row and (max_age is None or current_time - row[1] <= max_age):
                    # SECURITY: unpickling runs arbitrary code if the cache
                    # file was tampered with; keep it writable by the owner only.
                    return pickle.loads(row[0])
                result = func(*args, **kwargs)
                conn.execute(
                    'INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)',
                    (key, pickle.dumps(result), current_time),
                )
                conn.commit()
                return result
        return get_from_sqlite
    return decorator
def decode_url_safe(url_safe_string):
    """Return ``url_safe_string`` with %XX escapes and ``+`` signs decoded."""
    return unquote_plus(url_safe_string)
def what(item):
    """Describe ``item`` as ``"<type>:<str(item)>"`` (handy for debugging cells)."""
    return "{}:{}".format(type(item), str(item))
def avgdiff(values):
    """Return the mean absolute gap between neighbouring sorted values.

    Entries equal to ``None``, ``""`` or ``"null"`` and entries that cannot be
    converted with ``float()`` are ignored. With fewer than two usable values
    the result is ``None`` (no ZeroDivisionError; renders blank in VisiData).
    """
    blanks = (None, "", "null")
    numbers = []
    for raw in values:
        if raw not in blanks:
            try:
                numbers.append(float(raw))
            except Exception:
                pass
    numbers.sort()
    gap_count = len(numbers) - 1
    if gap_count < 1:
        return None
    total = sum([abs(hi - lo) for lo, hi in zip(numbers, numbers[1:])])
    return total / gap_count
# Register avgdiff with VisiData under the aggregator name 'avgdiff'.
vd.aggregator('avgdiff', avgdiff)
def distinct_list(values):
    """Return the distinct entries of ``values`` in first-seen order.

    Entries equal to ``None``, ``""`` or ``"null"`` are dropped. Entries must
    be hashable. Keeping first-seen order makes the display deterministic.
    """
    blanks = (None, "", "null")
    # dict keys keep insertion order and keep the first of equal entries.
    return list(dict.fromkeys(v for v in values if v not in blanks))
# Register distinct_list with VisiData under the aggregator name 'distinct_list'.
vd.aggregator('distinct_list', distinct_list)
def logtime(val):
    """Convert ``"M/D/YYYY H:MM:SS AM|PM"`` text into a POSIX timestamp.

    The wall-clock value is interpreted as local time, because a naive
    ``datetime`` is converted with ``.timestamp()``.
    """
    fields = str(val).strip().split(" ")
    date_bits = fields[0].split("/")
    clock = fields[1].split(":")
    meridiem = fields[2]
    hour = clock[0]
    if meridiem == "PM":
        # 1 PM .. 11 PM -> 13 .. 23; 12 PM stays 12.
        if hour != "12":
            hour = str(int(hour) + 12)
    elif meridiem == "AM" and hour == "12":
        # 12 AM is midnight.
        hour = "0"
    moment = datetime(
        int(date_bits[2]), int(date_bits[0]), int(date_bits[1]),
        int(hour), int(clock[1]), int(clock[2]),
    )
    return moment.timestamp()
def tsfromtime(val, format):
    """Parse ``val`` with the ``strptime`` pattern ``format`` into epoch seconds.

    The parsed fields are treated as UTC (``calendar.timegm``).
    """
    import calendar
    import time
    text = str(val).strip()
    return calendar.timegm(time.strptime(text, format))
def timefromts(val):
    """Convert an epoch timestamp of unknown resolution to a naive UTC datetime.

    ``val`` is tried as seconds first, then as milliseconds, microseconds and
    nanoseconds; the first interpretation that yields a representable date
    wins. Small millisecond values that are also valid second counts are
    therefore read as seconds.

    Returns:
        A naive ``datetime`` in UTC, or ``None`` when ``val`` is not numeric
        or no interpretation is in range.
    """
    from datetime import timezone

    try:
        stamp = float(val)
    except (TypeError, ValueError):
        return None
    for divisor in (1, 1000, 1000000, 1000000000):
        try:
            aware = datetime.fromtimestamp(stamp / divisor, tz=timezone.utc)
        except (ValueError, OverflowError, OSError):
            # Out of range for datetime or the platform gmtime(): try the
            # next, finer resolution.
            continue
        # Drop tzinfo so the result stays naive, as utcfromtimestamp() gave.
        return aware.replace(tzinfo=None)
    return None
# sym-ts = hex-encoded NT timestamp = 100-nanosecond ticks since 1601-01-01
def sym_time(val):
    """Decode a hex NT timestamp string into a local-time ``datetime``."""
    ticks = int(val, 16)
    # 10,000,000 ticks per second; 11644473600 s separate 1601-01-01 from 1970-01-01.
    unix_seconds = (ticks / 10000000) - 11644473600
    return datetime.fromtimestamp(unix_seconds)
@functools.lru_cache(maxsize=1000)
def vendor(mac):
    """Look up the vendor of a MAC address via ``mac_vendor_lookup``.

    Returns the vendor name, a "not a MAC ..." message for invalid addresses,
    or "module not available" when the package cannot be imported.
    """
    try:
        from mac_vendor_lookup import InvalidMacError, MacLookup
        try:
            registry = MacLookup()
            return registry.lookup(mac.strip())
        except InvalidMacError:
            return f"not a MAC {str(mac).strip()} of type {type(mac)}"
    except ModuleNotFoundError:
        return "module not available"
@functools.lru_cache(maxsize=1000)
def _get_vt():
    """Return a VirusTotal public-API client, or ``None`` when unavailable.

    The API key is the first line of ``~/.virustotal_api_key``. Because of the
    ``lru_cache`` the outcome (including ``None``) is reused by later calls.
    """
    try:
        from virus_total_apis import PublicApi as VirusTotalPublicApi
        import os.path
        with open(os.path.expanduser('~/.virustotal_api_key')) as af:
            API_KEY = af.readline().strip()
        return VirusTotalPublicApi(API_KEY)
    except Exception:
        # Best effort: a missing package or key file means "no client".
        # Catching Exception (not a bare except) lets Ctrl-C / SystemExit through.
        return None
@disk_cache_decorator()
def vt_ip(ip):
    """Return the VirusTotal IP report for ``ip``, or "VT-Error" without a client.

    NOTE(review): the decorator stores whatever is returned, so the
    "VT-Error" string ends up in the disk cache as well.
    """
    client = _get_vt()
    return "VT-Error" if client is None else client.get_ip_report(ip)
@disk_cache_decorator()
def vt_file(hash):
    """Return the VirusTotal file report for ``hash``, or "VT-Error" without a client.

    NOTE(review): the decorator stores whatever is returned, so the
    "VT-Error" string ends up in the disk cache as well.
    """
    client = _get_vt()
    return "VT-Error" if client is None else client.get_file_report(hash)
@disk_cache_decorator()
def dns_lookup(domain, record='A'):
    """Resolve ``record`` records for ``domain`` with dnspython.

    A comma-separated ``domain`` is resolved element by element and the
    results are joined with commas again.

    Returns:
        The answers' text forms joined by ",", "" when resolution fails, or
        "module not available" when dnspython is not installed.

    NOTE(review): the decorator persists every return value, including the
    "module not available" message.
    """
    names = domain.split(",")
    if len(names) > 1:
        return ",".join([dns_lookup(x.strip(), record) for x in names])
    # Import in its own try block: the DNS exception classes named in the
    # handler below only exist once this import has succeeded.
    try:
        import dns.exception
        import dns.resolver as rs
    except ModuleNotFoundError:
        return "module not available"
    try:
        # dnspython 2.x prefers resolve(); keep a fallback for older versions.
        try:
            result = rs.resolve(domain, record)
        except AttributeError:
            result = rs.query(domain, record)
    except dns.exception.DNSException:
        # Covers NoAnswer, NXDOMAIN, timeouts, ...
        return ""
    return ",".join([x.to_text() for x in result])
@disk_cache_decorator()
def _asn(ip):
    """Query asnip.net for ``ip`` and scrape the first result row.

    Returns:
        A dict with the keys ``asn``, ``ip``, ``name`` and ``country``.
        ``country`` is the text after the last comma of the name column, or
        "" when the name contains no comma.

    Any network or parsing error propagates to the caller.
    """
    from bs4 import BeautifulSoup
    import requests
    # A timeout keeps an unresponsive server from blocking forever.
    response = requests.post(
        'https://asnip.net/ip2asn.php',
        data={'q': ip, 'query': 'Query'},
        timeout=10,
    )
    soup = BeautifulSoup(response.text, features='lxml')
    # Second table, second row holds the data (the first row is skipped).
    row = soup.find_all('table')[1].find_all('tr')[1]
    cols = [ele.text.strip() for ele in row.find_all('td')]
    name = cols[2]
    country = ""
    if "," in name:
        *name_parts, last = name.split(",")
        country = last.strip()
        name = " ".join(name_parts)
    return {'asn': cols[0], 'ip': cols[1], 'name': name, 'country': country}
@functools.lru_cache(maxsize=1000)
def asn(ip, type="asn"):
    """Format the ASN record of ``ip`` as ``"(asn:name[country])"``.

    A comma-separated ``ip`` is handled element by element (blank elements
    are skipped) and the results are joined with commas. Any lookup failure
    yields "". ``type`` is accepted for call compatibility and only passed
    along to the recursive calls.
    """
    if len(ip.split(",")) > 1:
        return ",".join([asn(x.strip(), type) for x in ip.split(",") if x.strip()])
    try:
        record = _asn(ip)
        return f'({record["asn"]}:{record["name"]}[{record["country"]}])'
    except Exception:
        # Best effort; Exception (not a bare except) lets Ctrl-C through.
        return ""
@disk_cache_decorator()
def _ipinfo(ip):
    """Fetch the ipinfo.io JSON document for ``ip``.

    Returns:
        The decoded JSON, or ``None`` when ``requests`` is not installed or
        the response body is not valid JSON.

    NOTE(review): the decorator persists every return value, including None.
    """
    import json
    # Import separately: the handler below must not reference names that
    # are still unbound when this import fails.
    try:
        import requests
    except ModuleNotFoundError:
        return None
    try:
        # A timeout keeps an unresponsive server from blocking forever.
        r = requests.get(url='http://ipinfo.io/{}/json'.format(ip), timeout=10)
        return r.json()
    except json.JSONDecodeError:
        return None
@functools.lru_cache(maxsize=1000)
def ipinfo(ip, type="country"):
    """Return one field of the ipinfo.io record for ``ip``.

    Args:
        ip: One address, or several separated by commas (processed element
            by element, blank elements skipped, results joined with ",").
        type: Key to extract from the record; a falsy value returns the whole
            record.

    Returns:
        The requested value, or "" on any lookup failure.
    """
    parts = ip.split(",")
    if len(parts) > 1:
        return ",".join([ipinfo(x.strip(), type) for x in parts if x.strip()])
    try:
        info = _ipinfo(ip)
        return info[type] if type else info
    except Exception:
        # Best effort; Exception (not a bare except) lets Ctrl-C through.
        return ""
def split_number2ip(number):
    """Insert dots into a run of digits that can be read as four octets.

    The text form of ``number`` is returned unchanged when it cannot be split
    into four groups in the range 0-255.
    """
    import re
    text = str(number)
    found = re.match(
        "^([1-9]?[0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])([1-9]?[0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])([1-9]?[0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])([1-9]?[0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$",
        text,
    )
    return ".".join(found.groups()) if found else text
@functools.lru_cache(maxsize=1000)
def mx_lookup(domain):
    """Return the mail exchangers of ``domain`` as a comma-separated string.

    A leading "www." label is removed first. Only answers of the form
    "<preference> <host>" contribute their host part. On any error the
    exception text is returned instead.
    """
    # Remove the literal prefix only; str.lstrip("www.") would strip every
    # leading 'w' and '.' character (e.g. "web.de" -> "eb.de").
    if domain.startswith("www."):
        domain = domain[len("www."):]
    try:
        mxs = dns_lookup(domain, 'MX').split(",")
        mxt = [x.split(" ")[1] for x in mxs if len(x.split(" ")) == 2]
        return ",".join(mxt)
    except Exception as e:
        return str(e)
@disk_cache_decorator(max_age=60*60*24)
def _grab_banner(ip, port=25):
    """Connect to ``ip``:``port`` over TCP and return the first bytes sent.

    Up to 1024 bytes are read with a 2 second timeout, stripped and decoded.
    On any failure a string of the form "!Error <ip> <exception>" is returned.
    """
    import socket
    try:
        # The with-block closes the socket on success and on error alike.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:  # TCP
            sock.settimeout(2)
            sock.connect((ip, port))
            ret = sock.recv(1024)
        return str(ret.strip().decode())
    except Exception as e:
        return f"!Error {ip} {e}"
def grab_banner(ip, port=25):
    """Grab the banner of every comma-separated host in ``ip`` on ``port``.

    The individual banners are joined with commas in input order.
    """
    banners = []
    for host in ip.split(","):
        banners.append(_grab_banner(host.strip(), port))
    return ",".join(banners)
# Event-ID -> description table used by sym_id; built once at load time.
_SYM_EVENT_IDS = {
    "2": "Scan Stopped",
    "3": "Scan Started",
    "4": "Definition File Sent To Server",
    "5": "Virus Found",
    "6": "Scan Omission",
    "7": "Definition File Loaded",
    "10": "Checksum",
    "11": "Auto-Protect",
    "12": "Configuration Changed",
    "13": "Symantec AntiVirus Shutdown",
    "14": "Symantec AntiVirus Startup",
    "16": "Definition File Download",
    "17": "Scan Action Auto-Changed",
    "18": "Sent To Quarantine Server",
    "19": "Delivered To Symantec Security Response",
    "20": "Backup Restore Error",
    "21": "Scan Aborted",
    "22": "Load Error",
    "23": "Symantec AntiVirus Auto-Protect Loaded",
    "24": "Symantec AntiVirus Auto-Protect Unloaded",
    "26": "Scan Delayed",
    "27": "Scan Re-started",
    "34": "Log Forwarding Error",
    "39": "Definitions Rollback",
    "40": "Definitions Unprotected",
    "41": "Auto-Protect Error",
    "42": "Configuration Error",
    "45": "SymProtect Action",
    "46": "Detection Start",
    "47": "Detection Action",
    "48": "Pending Remediation Action",
    "49": "Failed Remediation Action",
    "50": "Successful Remediation Action",
    "51": "Detection Finish",
    "65": "Scan Stopped",
    "66": "Scan Started",
    "71": "Threat Now Whitelisted",
    "72": "Interesting Process Found Start",
    "73": "SONAR engine load error",
    "74": "SONAR definitions load error",
    "75": "Interesting Process Found Finish",
    "76": "SONAR operating system not supported",
    "77": "SONAR Detected Threat Now Known",
    "78": "SONAR engine is disabled",
    "79": "SONAR engine is enabled",
    "80": "Definition load failed",
    "81": "Cache server error",
    "82": "Reputation check timed out",
}


def sym_id(val):
    """Translate an event ID into its description.

    Args:
        val: The event ID as a string or an integer; surrounding whitespace
            is ignored.

    Raises:
        KeyError: If the ID is not in the table.
    """
    # Normalise so integer-typed columns work as well as text columns.
    return _SYM_EVENT_IDS[str(val).strip()]
# Convert a 4-byte integer to a dotted IP string.
def int2ip(zahl):
    """Render ``zahl`` (0 .. 2**32-1) as big-endian dotted-quad text."""
    octets = zahl.to_bytes(4, 'big')
    return ".".join(map(str, octets))
# Convert a dotted IP string to an integer.
def ip2int(ip):
    """Return the big-endian integer value of a dotted-quad address.

    ``None`` is returned unless the text form of ``ip`` consists of exactly
    four integer parts, each within 0-255.
    """
    try:
        octets = list(map(int, str(ip).split(".")))
    except Exception:
        return None
    if len(octets) != 4 or not all(0 <= octet <= 255 for octet in octets):
        return None
    value = 0
    for octet in octets:
        value = (value << 8) | octet
    return value
# Parse loosely formatted key=value text.
def dirty_kv(data):
    """Parse ``key=value`` pairs separated by commas into a dict.

    Surrounding whitespace and any leading/trailing ``"``, ``{`` and ``}``
    characters are removed first. Each pair is split at its first ``=``, so a
    value may itself contain ``=``. Fragments without ``=`` (including empty
    input) are skipped.
    """
    body = data.strip().strip('"{}')
    pairs = {}
    for fragment in body.split(','):
        key, sep, value = fragment.strip().partition("=")
        if sep:
            pairs[key] = value
    return pairs
# Parse JSON-like text whose attribute names may lack quotes.
# PyYAML is optional: without it dirty_json always returns None.
try:
    import yaml
except ModuleNotFoundError:
    yaml = None
# Make the name visible to function bodies (see the globals note near the imports).
globals()['yaml'] = yaml


def dirty_json(data):
    """Load ``data`` with ``yaml.safe_load``; return ``None`` on any failure.

    yaml.safe_load can handle many "almost-json" payloads.
    """
    if yaml is not None:
        try:
            return yaml.safe_load(data)
        except Exception:
            pass
    return None