Files
gists/visidatarc
2021-03-09 19:13:39 +01:00

219 lines
6.3 KiB
Plaintext
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# copy or link this file to ~/.visidatarc
from datetime import datetime
import functools
def tsfromtime(val, format):
import time
from calendar import timegm
utc_time = time.strptime(val, format)
return timegm(utc_time)
def timefromts(val):
try:
return datetime.fromtimestamp(float(val))
except ValueError:
pass
try:
return datetime.fromtimestamp(float(val)/1000)
except ValueError:
pass
try:
return datetime.fromtimestamp(float(val)/1000000)
except ValueError:
pass
# sym-ts = hexNcoded NT-Timestamp = Nanoseconds since 01.01.1601
def sym_time(val):
a = int(val, 16) # decode hex
# convert to seconds and subtract offset to 01.01.1970
b = (a / 10000000) - 11644473600
return datetime.fromtimestamp(b)
@functools.lru_cache()
def vendor(mac):
try:
from mac_vendor_lookup import InvalidMacError, MacLookup as mlu
return mlu().lookup(mac.strip())
except InvalidMacError:
return f"not a MAC {str(mac).strip()} of type {type(mac)}"
except ModuleNotFoundError:
return "module not available"
@functools.lru_cache(maxsize=1000)
def _get_vt():
try:
from virus_total_apis import PublicApi as VirusTotalPublicApi
with open('~/.virustotal_api_key') as af:
API_KEY = af.readline()
vt = VirusTotalPublicApi(API_KEY)
return vt
except:
return None
@functools.lru_cache()
def vt_ip(ip):
vt = _get_vt()
if vt is None:
return "VT-Error"
response = vt.get_ip_report(ip)
return response
@functools.lru_cache()
def vt_file(hash):
vt = _get_vt()
if vt is None:
return "VT-Error"
response = vt.get_file_report(hash)
return response
@functools.lru_cache(maxsize=1000)
def dns_lookup(domain, record='A'):
if len(domain.split(",")) > 1:
return ",".join([dns_lookup(x, record) for x in domain.split(",")])
try:
import dns
import dns.resolver as rs
result = rs.query(domain, record)
return ",".join([x.to_text() for x in result])
except dns.resolver.NoAnswer as e:
return ""
except dns.exception.DNSException as e:
# return e.msg
return ""
except ModuleNotFoundError:
return "module not available"
@functools.lru_cache()
def _asn(ip):
from bs4 import BeautifulSoup
import requests
data = { 'q': ip,'query': 'Query'}
response = requests.post('https://asnip.net/ip2asn.php', data=data)
soup=BeautifulSoup(response.text,features='lxml')
table=soup.find_all('table')[1]
row=table.find_all('tr')[1]
cols = [ele.text.strip() for ele in row.find_all('td') ]
res = { 'asn' : cols[0] }
res['ip'] = cols[1]
res['name'] = cols[2]
res['country'] = ""
if "," in res['name']:
name_split=res['name'].split(",")
res['country']=name_split[-1].strip()
res['name']=" ".join(name_split[:-1])
return res
@functools.lru_cache()
def asn(ip, type="asn"):
if len(ip.split(",")) > 1:
return ",".join([_asn(x, type) for x in ip.split(",")])
try:
return _asn(ip)[type]
except:
return ""
@functools.lru_cache(maxsize=1000)
def _ipinfo(ip):
try:
import requests
import json
r = requests.get(url='http://ipinfo.io/{}/json'.format(ip))
return r.json()
except json.JSONDecodeError as e:
return None
except ModuleNotFoundError:
return None
@functools.lru_cache()
def ipinfo(ip, type="country"):
if len(ip.split(",")) > 1:
return ",".join([ipinfo(x, type) for x in ip.split(",")])
try:
return _ipinfo(ip)[type]
except:
return ""
@functools.lru_cache()
def mx_lookup(domain):
domain = domain.lstrip("www.")
try:
mxs = dns_lookup(domain, 'MX').split(",")
mxt = [x.split(" ")[1] for x in mxs if len(x.split(" ")) == 2]
return ",".join(mxt)
except Exception as e:
return str(e)
@functools.lru_cache(maxsize=1000)
def grab_banner(ip, port=25):
if len(ip.split(",")) > 1:
return ",".join([grab_banner(x, port) for x in ip.split(",")])
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
sock.settimeout(2)
sock.connect((ip, port))
ret = sock.recv(1024)
return str(ret.strip().decode())
except:
return ""
def sym_id(val):
event_ids = {
"2": "Scan Stopped",
"3": "Scan Started",
"4": "Definition File Sent To Server",
"5": "Virus Found",
"6": "Scan Omission",
"7": "Definition File Loaded",
"10": "Checksum",
"11": "Auto-Protect",
"12": "Configuration Changed",
"13": "Symantec AntiVirus Shutdown",
"14": "Symantec AntiVirus Startup",
"16": "Definition File Download",
"17": "Scan Action Auto-Changed",
"18": "Sent To Quarantine Server",
"19": "Delivered To Symantec Security Response",
"20": "Backup Restore Error",
"21": "Scan Aborted",
"22": "Load Error",
"23": "Symantec AntiVirus Auto-Protect Loaded",
"24": "Symantec AntiVirus Auto-Protect Unloaded",
"26": "Scan Delayed",
"27": "Scan Re-started",
"34": "Log Forwarding Error",
"39": "Definitions Rollback",
"40": "Definitions Unprotected",
"41": "Auto-Protect Error",
"42": "Configuration Error",
"45": "SymProtect Action",
"46": "Detection Start",
"47": "Detection Action",
"48": "Pending Remediation Action",
"49": "Failed Remediation Action",
"50": "Successful Remediation Action",
"51": "Detection Finish",
"65": "Scan Stopped",
"66": "Scan Started",
"71": "Threat Now Whitelisted",
"72": "Interesting Process Found Start",
"73": "SONAR engine load error",
"74": "SONAR definitions load error",
"75": "Interesting Process Found Finish",
"76": "SONAR operating system not supported",
"77": "SONAR Detected Threat Now Known",
"78": "SONAR engine is disabled",
"79": "SONAR engine is enabled",
"80": "Definition load failed",
"81": "Cache server error",
"82": "Reputation check timed out"}
return event_ids[val]