Files
gists/config/visidatarc

329 lines
9.7 KiB
Plaintext
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# copy or link this file to ~/.visidatarc
# Use the strftime pattern "%Y-%m-%dT%H:%M:%S" (year-month-day, a literal "T",
# then hour:minute:second) as the date display format option.
# NOTE(review): `options` is not defined in this file; presumably VisiData
# injects it when it executes the rc file -- confirm.
options.disp_date_fmt="%Y-%m-%dT%H:%M:%S"
import plugins.hidecol
from datetime import datetime
import functools
import json
from urllib.parse import unquote_plus
import os.path
import pickle
import time
import sqlite3
# SQLite file used to persist lookup results between sessions.
cache_path = os.path.expanduser('~/.visidata_cache.db')


def init_cache_db():
    """Create the ``cache`` table in the SQLite file at ``cache_path`` if missing.

    The table has three columns: ``key`` (TEXT, primary key), ``value``
    (BLOB) and ``timestamp`` (INTEGER). Calling this repeatedly is harmless
    because of ``CREATE TABLE IF NOT EXISTS``.
    """
    conn = sqlite3.connect(cache_path)
    try:
        # ``with conn`` only scopes a transaction (commit / rollback); it does
        # not close the connection, hence the explicit close() below.
        with conn:
            conn.execute('''CREATE TABLE IF NOT EXISTS cache
                            (key TEXT PRIMARY KEY, value BLOB, timestamp INTEGER)''')
    finally:
        conn.close()


# Make sure the table exists as soon as the rc file is loaded.
init_cache_db()
def disk_cache_decorator(max_age=None, lru_cache_size=1000):
    """Build a decorator that caches a function's results in the SQLite cache.

    Results are pickled into the ``cache`` table of the database at
    ``cache_path``, keyed by the function name plus the textual form of its
    arguments.

    Args:
        max_age: Maximum age of a stored entry in seconds. ``None`` means
            entries never expire.
        lru_cache_size: Size of the in-memory ``functools.lru_cache`` layer
            placed in front of SQLite. The layer is only installed when
            ``max_age`` is ``None``: an LRU entry carries no timestamp, so it
            would keep serving expired results for the rest of the session.

    Returns:
        A decorator. The decorated function returns the cached value when a
        fresh entry exists, otherwise calls the original function, stores the
        result and returns it.

    Note:
        Every return value is stored, including error markers. Values are
        restored with ``pickle.loads``, which is only safe as long as the
        cache file is written by this code alone; do not point ``cache_path``
        at a file from an untrusted source.
    """
    def decorator(func):
        @functools.wraps(func)
        def get_from_sqlite(*args, **kwargs):
            # Sorting makes the key independent of keyword order while keeping
            # the exact text previously produced for zero or one keyword, so
            # existing cache rows stay valid.
            key = f"{func.__name__}:{str(args)}:{str(dict(sorted(kwargs.items())))}"
            conn = sqlite3.connect(cache_path)
            try:
                row = conn.execute(
                    'SELECT value, timestamp FROM cache WHERE key=?', (key,)
                ).fetchone()
                current_time = int(time.time())
                if row and (max_age is None or current_time - row[1] <= max_age):
                    return pickle.loads(row[0])
                result = func(*args, **kwargs)
                conn.execute(
                    'INSERT OR REPLACE INTO cache (key, value, timestamp) VALUES (?, ?, ?)',
                    (key, pickle.dumps(result), current_time),
                )
                conn.commit()
                return result
            finally:
                # sqlite3 connections are not closed by their context manager.
                conn.close()

        if max_age is None:
            return functools.lru_cache(maxsize=lru_cache_size)(get_from_sqlite)
        return get_from_sqlite
    return decorator
def decode_url_safe(url_safe_string):
    """Decode a URL-encoded string via ``urllib.parse.unquote_plus``.

    Percent escapes are decoded and ``+`` is turned into a space.
    """
    return unquote_plus(url_safe_string)
def what(item):
    """Describe *item* as ``"<type>:<string value>"`` (debugging helper)."""
    kind = str(type(item))
    text = str(item)
    return kind + ":" + text
def avgdiff(values):
    """Return the mean gap between neighbouring values after sorting.

    Args:
        values: Iterable of mutually comparable, subtractable values.

    Returns:
        The average absolute difference between consecutive sorted values,
        or ``None`` when fewer than two values are given (there is no gap to
        average, and dividing by the gap count would fail).
    """
    ordered = sorted(values)
    if len(ordered) < 2:
        return None
    gaps = [abs(upper - lower) for lower, upper in zip(ordered, ordered[1:])]
    return sum(gaps) / len(gaps)
# Register avgdiff under the name 'avgdiff' through vd.aggregator.
# NOTE(review): `vd` is not defined in this file; presumably VisiData provides
# it when it loads the rc file -- confirm.
vd.aggregator('avgdiff', avgdiff)
def distinct_list(values):
    """Return the distinct members of *values* as a list.

    Deduplication goes through ``set``, so the values must be hashable and
    the order of the result is not defined.
    """
    unique = set(values)
    return list(unique)
# Register distinct_list under the name 'distinct_list' through vd.aggregator.
# NOTE(review): `vd` is not defined in this file; presumably VisiData provides
# it when it loads the rc file -- confirm.
vd.aggregator('distinct_list', distinct_list)
def logtime(val):
    """Convert a ``"MM/DD/YYYY HH:MM:SS AM|PM"`` value to a POSIX timestamp.

    The value is stringified, stripped and split on single spaces into a
    date part, a clock part and a meridiem marker. ``PM`` adds 12 to every
    hour except "12"; ``AM`` turns hour "12" into "0"; any other marker
    leaves the hour untouched. The resulting ``datetime`` is naive, so
    ``timestamp()`` interprets it as local time.
    """
    fields = str(val).strip().split(" ")
    date_part = fields[0].split("/")
    clock = fields[1].split(":")
    meridiem = fields[2]

    hour = clock[0]
    if meridiem == "PM" and hour != "12":
        hour = str(int(hour) + 12)
    if meridiem == "AM" and hour == "12":
        hour = "0"

    moment = datetime(
        int(date_part[2]),
        int(date_part[0]),
        int(date_part[1]),
        int(hour),
        int(clock[1]),
        int(clock[2]),
    )
    return moment.timestamp()
def tsfromtime(val, format):
    """Parse *val* with the strptime pattern *format* and return epoch seconds.

    The value is stringified and stripped first. The parsed struct_time is
    passed to ``calendar.timegm``, i.e. it is treated as UTC.
    """
    import calendar
    import time

    text = str(val).strip()
    parsed = time.strptime(text, format)
    return calendar.timegm(parsed)
def timefromts(val):
    """Convert an epoch timestamp of unknown resolution to a naive UTC datetime.

    The value is interpreted as seconds first; if that falls outside the
    range ``datetime`` can represent it is retried as milliseconds and then
    as microseconds.

    Args:
        val: Number or numeric string.

    Returns:
        A naive ``datetime`` in UTC, or ``None`` when *val* is not numeric or
        is out of range at every resolution.
    """
    from datetime import timezone

    try:
        raw = float(val)
    except ValueError:
        return None

    for divisor in (1, 1000, 1000000):
        try:
            # fromtimestamp(..., tz=UTC) replaces the deprecated
            # utcfromtimestamp(); tzinfo is dropped to keep the result naive.
            moment = datetime.fromtimestamp(raw / divisor, tz=timezone.utc)
            return moment.replace(tzinfo=None)
        except (ValueError, OverflowError, OSError):
            # Out-of-range values surface as any of these three depending on
            # magnitude and platform; try the next, finer resolution.
            continue
    return None
# sym-ts = hex-encoded NT timestamp; the conversion below divides by 10^7 to
# get seconds, i.e. it treats the value as 100-nanosecond ticks since 01.01.1601
def sym_time(val):
    """Decode a hexadecimal NT-style timestamp string into a local datetime.

    The hex string is parsed as an integer, scaled to seconds and shifted by
    11644473600 seconds (the offset between 1601-01-01 and 1970-01-01). The
    result comes from ``datetime.fromtimestamp`` and is therefore naive local
    time.
    """
    ticks = int(val, 16)
    seconds_since_1601 = ticks / 10000000
    unix_seconds = seconds_since_1601 - 11644473600
    return datetime.fromtimestamp(unix_seconds)
@functools.lru_cache(maxsize=1000)
def vendor(mac):
    """Look up the vendor of a MAC address with ``mac_vendor_lookup``.

    Returns the lookup result, a "not a MAC ..." message when the library
    raises ``InvalidMacError``, or "module not available" when the library
    cannot be imported. Results are memoised in memory per argument.
    """
    try:
        from mac_vendor_lookup import InvalidMacError, MacLookup

        try:
            lookup = MacLookup()
            return lookup.lookup(mac.strip())
        except InvalidMacError:
            return "not a MAC {} of type {}".format(str(mac).strip(), type(mac))
    except ModuleNotFoundError:
        return "module not available"
@functools.lru_cache(maxsize=1000)
def _get_vt():
try:
from virus_total_apis import PublicApi as VirusTotalPublicApi
import os.path
with open(os.path.expanduser('~/.virustotal_api_key')) as af:
API_KEY = af.readline().strip()
vt = VirusTotalPublicApi(API_KEY)
return vt
except:
return None
@disk_cache_decorator()
def vt_ip(ip):
    """Return the VirusTotal IP report for *ip*, or "VT-Error" without a client.

    The result (including the error marker) goes through the disk cache.
    """
    client = _get_vt()
    return "VT-Error" if client is None else client.get_ip_report(ip)
@disk_cache_decorator()
def vt_file(hash):
    """Return the VirusTotal file report for *hash*, or "VT-Error" without a client.

    The result (including the error marker) goes through the disk cache.
    """
    client = _get_vt()
    return "VT-Error" if client is None else client.get_file_report(hash)
@disk_cache_decorator()
def dns_lookup(domain, record='A'):
    """Resolve *domain* for the given record type using dnspython.

    Args:
        domain: A domain name, or several names separated by commas; each
            name is then resolved separately and the results are joined
            with ",".
        record: DNS record type, ``'A'`` by default.

    Returns:
        The answers' text forms joined by ",", an empty string on any DNS
        error (including "no answer"), or "module not available" when
        dnspython is not installed. Results are stored in the disk cache.
    """
    names = domain.split(",")
    if len(names) > 1:
        return ",".join([dns_lookup(name, record) for name in names])

    # Import before the lookup's try block: the except clauses below mention
    # ``dns...``, and evaluating them while the import itself had failed
    # raised UnboundLocalError instead of reaching the fallback.
    try:
        import dns.exception
        import dns.resolver
    except ModuleNotFoundError:
        return "module not available"

    try:
        if hasattr(dns.resolver, "resolve"):
            # dnspython >= 2.0: query() is deprecated. search=True keeps the
            # search-list behaviour query() had.
            answer = dns.resolver.resolve(domain, record, search=True)
        else:
            answer = dns.resolver.query(domain, record)
    except dns.exception.DNSException:
        # Covers NoAnswer too, which is a DNSException subclass.
        return ""
    return ",".join([rdata.to_text() for rdata in answer])
@disk_cache_decorator()
def _asn(ip):
    """Query asnip.net for *ip* and return its ASN record as a dict.

    The form is posted to ``https://asnip.net/ip2asn.php`` and the reply is
    parsed with BeautifulSoup. The first three cells of the second row of
    the second table become ``asn``, ``ip`` and ``name``. If the name
    contains a comma, the part after the last comma becomes ``country`` and
    the remaining parts are joined with spaces; otherwise ``country`` is "".
    NOTE(review): the table layout is assumed from the indexing here --
    verify against the live page.
    """
    from bs4 import BeautifulSoup
    import requests

    form = {'q': ip, 'query': 'Query'}
    page = requests.post('https://asnip.net/ip2asn.php', data=form)
    document = BeautifulSoup(page.text, features='lxml')
    result_row = document.find_all('table')[1].find_all('tr')[1]
    cells = [cell.text.strip() for cell in result_row.find_all('td')]

    record = {'asn': cells[0], 'ip': cells[1], 'name': cells[2], 'country': ""}
    if "," in record['name']:
        pieces = record['name'].split(",")
        record['country'] = pieces[-1].strip()
        record['name'] = " ".join(pieces[:-1])
    return record
@functools.lru_cache(maxsize=1000)
def asn(ip, type="asn"):
    """Return a short ASN summary for one or more IP addresses.

    Args:
        ip: An IP address, or several separated by commas; each address is
            then summarised separately and the results are joined with ",".
        type: Unused for a single address; it is only passed along when
            recursing over a comma-separated list.

    Returns:
        ``"(<asn>:<name>[<country>]"`` per address, or "" for an address
        whose lookup fails for any reason.
    """
    addresses = ip.split(",")
    if len(addresses) > 1:
        # Recurse into asn(), not _asn(): _asn takes a single argument and
        # returns a dict, so the old call could never produce a joinable str.
        return ",".join([asn(address, type) for address in addresses])
    try:
        record = _asn(ip)
        # NOTE(review): the opening "(" has no closing counterpart; kept
        # as-is because existing sheets may depend on the exact text.
        return f'({record["asn"]}:{record["name"]}[{record["country"]}]'
    except Exception:
        # Best effort by design, but no longer swallows KeyboardInterrupt /
        # SystemExit the way a bare ``except:`` did.
        return ""
@disk_cache_decorator()
def _ipinfo(ip):
    """Fetch the ipinfo.io JSON record for *ip*.

    Returns:
        The decoded JSON body, or ``None`` when ``requests`` is not
        installed or the body is not valid JSON. Network errors raised by
        ``requests.get`` propagate to the caller. Results are stored in the
        disk cache.
    """
    # Import outside the request's try block: previously a function-local
    # ``import json`` was still unbound when ``import requests`` failed, so
    # evaluating ``except json.JSONDecodeError`` raised UnboundLocalError.
    try:
        import requests
    except ModuleNotFoundError:
        return None

    response = requests.get(url='http://ipinfo.io/{}/json'.format(ip))
    try:
        return response.json()
    except ValueError:
        # json.JSONDecodeError and the decode errors raised by requests are
        # ValueError subclasses.
        return None
@functools.lru_cache(maxsize=1000)
def ipinfo(ip, type="country"):
    """Return one field of the ipinfo record for one or more IP addresses.

    Args:
        ip: An IP address, or several separated by commas; each address is
            then looked up separately and the results are joined with ",".
        type: Key to extract from the record (``"country"`` by default). A
            falsy value returns the whole record for a single address.

    Returns:
        The requested field (or whole record), or "" when the lookup fails
        or the field is missing.
    """
    addresses = ip.split(",")
    if len(addresses) > 1:
        return ",".join([ipinfo(address, type) for address in addresses])
    try:
        record = _ipinfo(ip)
        return record[type] if type else record
    except Exception:
        # Best effort by design, but no longer swallows KeyboardInterrupt /
        # SystemExit the way a bare ``except:`` did.
        return ""
def split_number2ip(number):
    """Try to split a run of digits into a dotted IPv4 address.

    The stringified input is matched against four consecutive octet groups
    (each 0-255) that must cover the whole string. On a match the groups are
    joined with "."; otherwise the stringified input is returned unchanged.
    Which split is chosen for ambiguous input is decided by the regex
    engine's backtracking order.
    """
    import re

    digits = str(number)
    octet = "([1-9]?[0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
    found = re.match("^" + octet * 4 + "$", digits)
    return ".".join(found.groups()) if found else digits
@functools.lru_cache(maxsize=1000)
def mx_lookup(domain):
    """Return the mail exchangers of *domain* as a comma-separated string.

    A leading "www." label is removed before the MX lookup. Each MX answer is
    expected as ``"<preference> <host>"``; only the host part is kept, and
    answers that do not split into exactly two fields are skipped.

    Returns:
        The hosts joined by ",", or the text of the exception if the lookup
        fails.
    """
    # str.lstrip("www.") strips any leading 'w' and '.' characters (turning
    # "web.de" into "eb.de"); only an exact "www." prefix must be removed.
    if domain.startswith("www."):
        domain = domain[len("www."):]
    try:
        answers = dns_lookup(domain, 'MX').split(",")
        fields = [answer.split(" ") for answer in answers]
        hosts = [parts[1] for parts in fields if len(parts) == 2]
        return ",".join(hosts)
    except Exception as e:
        return str(e)
@disk_cache_decorator(max_age=60*60*24)
def _grab_banner(ip, port=25):
    """Connect to *ip*:*port* over TCP and return the first bytes it sends.

    Args:
        ip: Host to connect to.
        port: TCP port, 25 by default.

    Returns:
        Up to 1024 received bytes, stripped and decoded, or a string starting
        with "!Error" describing the failure. A 2-second timeout applies to
        connect and receive. Results, error strings included, go through the
        disk cache with a maximum age of one day.
    """
    try:
        import socket

        # The context manager closes the socket on every path; it used to be
        # left open until garbage collection.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:  # TCP
            sock.settimeout(2)
            sock.connect((ip, port))
            banner = sock.recv(1024)
        return str(banner.strip().decode())
    except Exception as e:
        return f"!Error {ip} {e}"
def grab_banner(ip, port=25):
    """Grab the banner of every host in the comma-separated string *ip*.

    Each entry is stripped and passed to ``_grab_banner`` with *port*; the
    results are joined with ",".
    """
    banners = []
    for address in ip.split(","):
        banners.append(_grab_banner(address.strip(), port))
    return ",".join(banners)
# Event id (as a string) -> description, built once instead of on every call.
_SYM_EVENT_IDS = {
    "2": "Scan Stopped",
    "3": "Scan Started",
    "4": "Definition File Sent To Server",
    "5": "Virus Found",
    "6": "Scan Omission",
    "7": "Definition File Loaded",
    "10": "Checksum",
    "11": "Auto-Protect",
    "12": "Configuration Changed",
    "13": "Symantec AntiVirus Shutdown",
    "14": "Symantec AntiVirus Startup",
    "16": "Definition File Download",
    "17": "Scan Action Auto-Changed",
    "18": "Sent To Quarantine Server",
    "19": "Delivered To Symantec Security Response",
    "20": "Backup Restore Error",
    "21": "Scan Aborted",
    "22": "Load Error",
    "23": "Symantec AntiVirus Auto-Protect Loaded",
    "24": "Symantec AntiVirus Auto-Protect Unloaded",
    "26": "Scan Delayed",
    "27": "Scan Re-started",
    "34": "Log Forwarding Error",
    "39": "Definitions Rollback",
    "40": "Definitions Unprotected",
    "41": "Auto-Protect Error",
    "42": "Configuration Error",
    "45": "SymProtect Action",
    "46": "Detection Start",
    "47": "Detection Action",
    "48": "Pending Remediation Action",
    "49": "Failed Remediation Action",
    "50": "Successful Remediation Action",
    "51": "Detection Finish",
    "65": "Scan Stopped",
    "66": "Scan Started",
    "71": "Threat Now Whitelisted",
    "72": "Interesting Process Found Start",
    "73": "SONAR engine load error",
    "74": "SONAR definitions load error",
    "75": "Interesting Process Found Finish",
    "76": "SONAR operating system not supported",
    "77": "SONAR Detected Threat Now Known",
    "78": "SONAR engine is disabled",
    "79": "SONAR engine is enabled",
    "80": "Definition load failed",
    "81": "Cache server error",
    "82": "Reputation check timed out",
}


def sym_id(val):
    """Translate an event id into its description.

    Args:
        val: The event id as a string or an integer; surrounding whitespace
            is ignored.

    Returns:
        The description text for the id.

    Raises:
        KeyError: If the id is not in the table.
    """
    # Normalise to the table's string keys so that ints (e.g. from a typed
    # column) resolve as well.
    return _SYM_EVENT_IDS[str(val).strip()]
# render a 4-byte integer as a dotted IP string
def int2ip(zahl):
    """Return the dotted-decimal form of the integer *zahl*.

    The number is serialised as four big-endian bytes, so it must fit into
    an unsigned 32-bit range (``int.to_bytes`` raises OverflowError otherwise).
    """
    octets = zahl.to_bytes(4, 'big')
    return ".".join(map(str, octets))
# convert IP-String to Integer
def ip2int(ip):
    """Return the integer value of the dotted-decimal IP string *ip*.

    Each dot-separated part becomes one byte, and the bytes are read as a
    big-endian unsigned integer.

    Raises:
        ValueError: If a part is not an integer in the range 0-255.
    """
    # The original body split an undefined name ``b`` instead of ``ip`` and
    # therefore always raised NameError.
    octets = bytes(int(part) for part in ip.split('.'))
    return int.from_bytes(octets, 'big')
# parse KeyValue
def dirty_kv(data):
    """Parse a loose ``key=value,key=value`` string into a dict.

    Surrounding whitespace and any leading/trailing ``"``, ``{`` and ``}``
    characters are removed, then the text is split on commas. Each fragment
    is split at its first "=" only, so values may themselves contain "=".
    Empty fragments (empty input, trailing comma) are skipped, and a fragment
    without "=" is stored as a key with an empty-string value. Later
    duplicates of a key overwrite earlier ones.
    """
    pairs = {}
    for fragment in data.strip().strip('"{}').split(','):
        fragment = fragment.strip()
        if not fragment:
            continue
        key, _, value = fragment.partition("=")
        pairs[key] = value
    return pairs
# parse JSON-like text whose attribute names lack quotes
import yaml
def dirty_json(data):
    """Parse *data* with PyYAML's ``SafeLoader`` and return the result.

    The YAML parser is used as a lenient stand-in for a JSON parser, so that
    input with unquoted attribute names can still be read.
    """
    parsed = yaml.load(data, Loader=yaml.SafeLoader)
    return parsed