Restructure repository: organize tools by purpose; create the 'what' search tool

- Move single-file tools to tools/ organized by category (security, forensics, data, etc.)
- Move multi-file projects to projects/ (go-tools, puzzlebox, timesketch, rust-tools)
- Move system scripts to scripts/ (proxy, display, setup, windows)
- Organize config files in config/ (shell, visidata, applications)
- Move experimental tools to archive/experimental
- Create 'what' fuzzy search tool with progressive enhancement (ollama->fzf->grep)
- Add initial metadata database for intelligent tool discovery
- Preserve git history using 'git mv' commands
This commit is contained in:
tobias
2025-08-24 19:50:00 +02:00
parent 9518290544
commit 619b0bc432
124 changed files with 1063 additions and 0 deletions

329
config/visidata/visidatarc Normal file
View File

@@ -0,0 +1,329 @@
# copy or link this file to ~/.visidatarc
# VisiData startup configuration: sets the date display format, loads helper
# plugins, and defines converters/aggregators usable from the expression bar.
options.disp_date_fmt="%Y-%m-%dT%H:%M:%S"
import plugins.hidecol
from datetime import datetime
import functools
import json
from urllib.parse import unquote_plus
import os.path
import pickle
import time
import sqlite3
# Location of the sqlite database used by disk_cache_decorator below.
cache_path = os.path.expanduser('~/.visidata_cache.db')
def init_cache_db():
    """Create the sqlite table backing disk_cache_decorator, if missing."""
    ddl = ('CREATE TABLE IF NOT EXISTS cache '
           '(key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)')
    with sqlite3.connect(cache_path) as conn:
        conn.execute(ddl)


# make sure the cache table exists before any cached helper runs
init_cache_db()
def disk_cache_decorator(max_age=None, lru_cache_size=1000):
    """Decorator factory: memoize results in memory (LRU) and on disk (sqlite).

    max_age: seconds before a disk entry is considered stale (None = never).
    lru_cache_size: size of the in-process functools.lru_cache layer.
    NOTE: values are serialized with pickle -- only cache trusted data.
    FIXES: adds functools.wraps (the wrapper used to hide func's metadata)
    and closes the sqlite connection ("with sqlite3.connect()" only commits,
    it never closes, so every call leaked a connection).
    """
    def decorator(func):
        @functools.lru_cache(maxsize=lru_cache_size)
        @functools.wraps(func)
        def get_from_sqlite(*args, **kwargs):
            # Cache key is derived from the function name plus the repr of
            # the arguments, so arguments must have a stable str().
            key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
            now = int(time.time())
            conn = sqlite3.connect(cache_path)
            try:
                with conn:  # transaction scope: commits on success
                    cursor = conn.cursor()
                    cursor.execute('SELECT value, timestamp FROM cache WHERE key=?', (key,))
                    row = cursor.fetchone()
                    if row and (max_age is None or now - row[1] <= max_age):
                        return pickle.loads(row[0])
                    result = func(*args, **kwargs)
                    cursor.execute(
                        'INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)',
                        (key, pickle.dumps(result), now))
                    return result
            finally:
                conn.close()
        return get_from_sqlite
    return decorator
def decode_url_safe(url_safe_string):
    """Decode a URL-encoded string ('+' becomes space, %XX sequences unescaped)."""
    return unquote_plus(url_safe_string)
def what(item):
    """Debug helper: show an object's type alongside its string form."""
    return "{}:{}".format(type(item), str(item))
def avgdiff(values):
    """Average gap between consecutive values after sorting.

    FIX: returns 0 for empty or single-element input (the original raised
    ZeroDivisionError), which matters because VisiData aggregators may be
    applied to columns with few rows.
    """
    ordered = sorted(values)
    if len(ordered) < 2:
        return 0
    # values are sorted, so consecutive differences are non-negative
    gaps = [hi - lo for lo, hi in zip(ordered, ordered[1:])]
    return sum(gaps) / len(gaps)
# register avgdiff as a VisiData column aggregator
vd.aggregator('avgdiff', avgdiff)
def distinct_list(values):
    """Return the distinct values as a list (order is unspecified)."""
    # set() already deduplicates; the original comprehension copy was redundant
    return list(set(values))
# register distinct_list as a VisiData column aggregator
vd.aggregator('distinct_list', distinct_list)
def logtime(val):
    """Parse 'M/D/Y H:M:S AM|PM' into a local-time epoch float.

    The 12-hour clock is converted to 24-hour by hand; the result depends
    on the local timezone (naive datetime -> .timestamp()).
    """
    parts = str(val).strip().split(" ")
    month, day, year = (int(x) for x in parts[0].split("/"))
    hour, minute, second = (int(x) for x in parts[1].split(":"))
    meridiem = parts[2]
    if meridiem == "PM" and hour != 12:
        hour += 12
    elif meridiem == "AM" and hour == 12:
        hour = 0
    return datetime(year, month, day, hour, minute, second).timestamp()
def tsfromtime(val, format):
    """Parse val with a strptime format, interpreted as UTC, to an epoch int."""
    import time
    from calendar import timegm
    parsed = time.strptime(str(val).strip(), format)
    return timegm(parsed)
def timefromts(val):
    """Best-effort conversion of an epoch value to a naive UTC datetime.

    Tries the value as seconds, then milliseconds, then microseconds
    (each divisor is attempted when the previous interpretation raises
    ValueError, e.g. "year out of range").
    Returns None when val cannot be interpreted at all -- the original
    fell off the end implicitly; this makes the None return explicit.
    """
    for divisor in (1, 1000, 1000000):
        try:
            return datetime.utcfromtimestamp(float(val) / divisor)
        except ValueError:
            continue
    return None
# sym-ts = hexNcoded NT-Timestamp = Nanoseconds since 01.01.1601
def sym_time(val):
a = int(val, 16) # decode hex
# convert to seconds and subtract offset to 01.01.1970
b = (a / 10000000) - 11644473600
return datetime.fromtimestamp(b)
@functools.lru_cache(maxsize=1000)
def vendor(mac):
    """Resolve a MAC address to its vendor name (in-memory cached).

    Returns an explanatory string when the address is invalid or the
    mac_vendor_lookup package is not installed.
    """
    try:
        from mac_vendor_lookup import InvalidMacError, MacLookup as mlu
        cleaned = mac.strip()
        try:
            return mlu().lookup(cleaned)
        except InvalidMacError:
            return f"not a MAC {str(mac).strip()} of type {type(mac)}"
    except ModuleNotFoundError:
        return "module not available"
@functools.lru_cache(maxsize=1000)
def _get_vt():
try:
from virus_total_apis import PublicApi as VirusTotalPublicApi
import os.path
with open(os.path.expanduser('~/.virustotal_api_key')) as af:
API_KEY = af.readline().strip()
vt = VirusTotalPublicApi(API_KEY)
return vt
except:
return None
@disk_cache_decorator()
def vt_ip(ip):
    """Fetch the VirusTotal report for an IP address (disk-cached)."""
    client = _get_vt()
    return "VT-Error" if client is None else client.get_ip_report(ip)
@disk_cache_decorator()
def vt_file(hash):
    """Fetch the VirusTotal report for a file hash (disk-cached)."""
    client = _get_vt()
    return "VT-Error" if client is None else client.get_file_report(hash)
@disk_cache_decorator()
def dns_lookup(domain, record='A'):
    """Resolve DNS `record` entries for domain, comma-joined.

    Comma-separated input is resolved per item and re-joined. Returns ""
    on no answer or any DNS error, "module not available" without dnspython.
    """
    if len(domain.split(",")) > 1:
        return ",".join([dns_lookup(x, record) for x in domain.split(",")])
    # BUG FIX: the import must succeed before the except clauses below are
    # evaluated -- previously `except dns.resolver.NoAnswer` raised NameError
    # when dnspython was missing, making "module not available" unreachable.
    try:
        import dns
        import dns.resolver as rs
    except ModuleNotFoundError:
        return "module not available"
    try:
        result = rs.query(domain, record)
        return ",".join([x.to_text() for x in result])
    except dns.resolver.NoAnswer:
        return ""
    except dns.exception.DNSException:
        return ""
@disk_cache_decorator()
def _asn(ip):
    """Scrape asnip.net for ASN information about an IP (disk-cached).

    Returns a dict with keys asn/ip/name/country; country is parsed from a
    trailing ", CC" suffix of the name when present, empty otherwise.
    """
    from bs4 import BeautifulSoup
    import requests
    payload = {'q': ip, 'query': 'Query'}
    response = requests.post('https://asnip.net/ip2asn.php', data=payload)
    document = BeautifulSoup(response.text, features='lxml')
    # the second table's second row holds the result columns
    result_row = document.find_all('table')[1].find_all('tr')[1]
    cells = [cell.text.strip() for cell in result_row.find_all('td')]
    record = {'asn': cells[0], 'ip': cells[1], 'name': cells[2], 'country': ''}
    if "," in record['name']:
        *name_parts, country = record['name'].split(",")
        record['country'] = country.strip()
        record['name'] = " ".join(name_parts)
    return record
@functools.lru_cache(maxsize=1000)
def asn(ip, type="asn"):
    """Format ASN info for ip; "" on any lookup failure.

    Comma-separated input is resolved per item and re-joined.
    BUG FIX: the multi-value branch called `_asn(x, type)`, but `_asn`
    takes a single argument (TypeError); recurse through asn() instead.
    NOTE(review): the format string opens '(' without a closing ')';
    preserved as-is to avoid changing visible output.
    """
    if len(ip.split(",")) > 1:
        return ",".join([asn(x, type) for x in ip.split(",")])
    try:
        record = _asn(ip)
        return f'({record["asn"]}:{record["name"]}[{record["country"]}]'
    except Exception:  # deliberately best-effort; was a bare except
        return ""
@disk_cache_decorator()
def _ipinfo(ip):
    """Fetch the raw ipinfo.io JSON record for ip; None when unavailable."""
    try:
        import requests
        import json
        response = requests.get(url='http://ipinfo.io/{}/json'.format(ip))
        return response.json()
    except json.JSONDecodeError:
        return None
    except ModuleNotFoundError:
        return None
@functools.lru_cache(maxsize=1000)
def ipinfo(ip, type="country"):
    """Return one field (or the whole record when type is falsy) from ipinfo.io.

    Comma-separated input is resolved per item and re-joined.
    Returns "" on any lookup or parse failure.
    FIX: replaced the bare except with `except Exception` so system-exiting
    exceptions (KeyboardInterrupt etc.) are no longer swallowed.
    """
    if len(ip.split(",")) > 1:
        return ",".join([ipinfo(x, type) for x in ip.split(",")])
    try:
        record = _ipinfo(ip)
        return record[type] if type else record
    except Exception:
        return ""
def split_number2ip(number):
    """If number is four IPv4 octets concatenated, return the dotted form.

    Falls back to returning the (stringified) input when it cannot be
    split into four valid octets; regex backtracking picks the split.
    """
    import re
    text = str(number)
    octet = "([1-9]?[0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
    match = re.match("^" + octet * 4 + "$", text)
    return ".".join(match.groups()) if match else text
@functools.lru_cache(maxsize=1000)
def mx_lookup(domain):
    """Return the comma-joined MX exchange hostnames for domain.

    BUG FIX: the original used domain.lstrip("www.") which strips any run
    of leading 'w'/'.' characters (e.g. "web.example.com" became
    "eb.example.com"); strip the literal "www." prefix instead.
    On any failure the exception text is returned as the cell value.
    """
    if domain.startswith("www."):
        domain = domain[len("www."):]
    try:
        answers = dns_lookup(domain, 'MX').split(",")
        # each MX answer is "<preference> <exchange>"; keep the exchange
        hosts = [part.split(" ")[1] for part in answers if len(part.split(" ")) == 2]
        return ",".join(hosts)
    except Exception as e:
        return str(e)
@disk_cache_decorator(max_age=60*60*24)
def _grab_banner(ip, port=25):
    """Connect to ip:port over TCP and return the first banner line.

    Uses a 2-second timeout; any failure is reported as an "!Error" string
    so the result is still displayable (and cacheable) in a column.
    """
    try:
        import socket
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # TCP
        conn.settimeout(2)
        conn.connect((ip, port))
        banner = conn.recv(1024)
        return str(banner.strip().decode())
    except Exception as e:
        return f"!Error {ip} {e}"
def grab_banner(ip, port=25):
    """Grab banners for each comma-separated host in ip; join with commas."""
    hosts = (part.strip() for part in ip.split(","))
    return ",".join(_grab_banner(host, port) for host in hosts)
def sym_id(val):
    """Map a Symantec AV event id (as a string) to its human-readable name.

    FIX: unknown ids return a descriptive placeholder instead of raising
    KeyError, so a column of event ids never errors on an unmapped value.
    """
    event_ids = {
        "2": "Scan Stopped",
        "3": "Scan Started",
        "4": "Definition File Sent To Server",
        "5": "Virus Found",
        "6": "Scan Omission",
        "7": "Definition File Loaded",
        "10": "Checksum",
        "11": "Auto-Protect",
        "12": "Configuration Changed",
        "13": "Symantec AntiVirus Shutdown",
        "14": "Symantec AntiVirus Startup",
        "16": "Definition File Download",
        "17": "Scan Action Auto-Changed",
        "18": "Sent To Quarantine Server",
        "19": "Delivered To Symantec Security Response",
        "20": "Backup Restore Error",
        "21": "Scan Aborted",
        "22": "Load Error",
        "23": "Symantec AntiVirus Auto-Protect Loaded",
        "24": "Symantec AntiVirus Auto-Protect Unloaded",
        "26": "Scan Delayed",
        "27": "Scan Re-started",
        "34": "Log Forwarding Error",
        "39": "Definitions Rollback",
        "40": "Definitions Unprotected",
        "41": "Auto-Protect Error",
        "42": "Configuration Error",
        "45": "SymProtect Action",
        "46": "Detection Start",
        "47": "Detection Action",
        "48": "Pending Remediation Action",
        "49": "Failed Remediation Action",
        "50": "Successful Remediation Action",
        "51": "Detection Finish",
        "65": "Scan Stopped",
        "66": "Scan Started",
        "71": "Threat Now Whitelisted",
        "72": "Interesting Process Found Start",
        "73": "SONAR engine load error",
        "74": "SONAR definitions load error",
        "75": "Interesting Process Found Finish",
        "76": "SONAR operating system not supported",
        "77": "SONAR Detected Threat Now Known",
        "78": "SONAR engine is disabled",
        "79": "SONAR engine is enabled",
        "80": "Definition load failed",
        "81": "Cache server error",
        "82": "Reputation check timed out"}
    return event_ids.get(val, f"unknown event id {val}")
# convert 4-byte integer to IP-String
def int2ip(zahl):
    """Render a 32-bit integer as a dotted-quad IPv4 string."""
    octets = zahl.to_bytes(4, 'big')  # iterating bytes yields ints
    return "{}.{}.{}.{}".format(*octets)
# convert IP-String to Integer
def ip2int(ip):
    """Convert a dotted-quad IPv4 string to its 32-bit integer value.

    BUG FIX: the original split an undefined name `b` (`b.split('.')`),
    raising NameError on every call; split the `ip` argument instead.
    """
    packed = b"".join(int(part).to_bytes(1, 'big') for part in ip.split('.'))
    return int.from_bytes(packed, 'big')
# parse KeyValue
def dirty_kv(data):
    """Parse a loose '{k=v, k2=v2}'-style string into a str->str dict.

    BUG FIX: each pair is split on the first '=' only, so values that
    themselves contain '=' are no longer truncated.
    """
    items = data.strip().strip('"{}').split(',')
    pairs = [item.strip().split("=", 1) for item in items]
    return {pair[0]: pair[1] for pair in pairs}
# parse json with missing quotes around attribute names (YAML is a JSON superset)
import yaml
def dirty_json(data):
    """Parse JSON whose attribute names lack quotes, via the YAML safe loader."""
    return yaml.safe_load(data)