multithreading is done
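The change below parallelizes indexing with a multiprocessing pool and serializes all console/log output through a shared lock. As a reading aid, here is a minimal, self-contained sketch of that pattern, assuming a fork-based start method (Linux) so the module-level lock is inherited by the workers; the worker function and file list are placeholders, not the ones from this commit:

from multiprocessing import Pool, Lock

lock = Lock()  # shared via fork; guards stdout so worker output does not interleave

def log_to_console(text):
    with lock:  # block here until the lock can be obtained
        print(text)

def index_file(path):
    # placeholder worker; the real one detects the encoding and bulk-indexes into Elasticsearch
    log_to_console('[*] Indexing file: {}'.format(path))

if __name__ == '__main__':
    files = ['dump_a.txt', 'dump_b.txt']  # stand-in for get_files(dir)
    with Pool(4) as p:  # the commit uses Pool(20)
        p.map(index_file, files)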
@@ -5,10 +5,23 @@ import mmh3
 import string
 import sys
 from os import walk
 
 from chardet.universaldetector import UniversalDetector
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk
+from multiprocessing import Pool,Lock
+import multiprocessing
+
+lock = Lock()
+
+def log_to_file(text):
+    global log_filename
+    with lock: # thread blocks at this line until it can obtain lock
+        with open(log_filename, 'a+') as file_log:
+            file_log.write("{}\n".format(text))
+
+def log_to_console(text):
+    with lock: # thread blocks at this line until it can obtain lock
+        print(text)
 
 
 def get_mask(s):
@@ -87,24 +100,19 @@ def get_files(dir):
                 filedata=line.split(";")
                 files_in_log[filedata[0]]=float(filedata[1])
             except:
-                print("Can't parse Line")
+                log_to_console("Can't parse Line")
                 pass
     except:
-        print("Can't open Logfile")
+        log_to_console("Can't open Logfile")
         pass
 
     for (dirpath, dirnames, filenames) in walk(dir):
         for file in filenames:
             full_filename=os.path.join(dirpath, file)
             if full_filename in files_in_log and files_in_log[full_filename] > threshold:
-                print('[~] Skipping file [Already Parsed]: %s' % full_filename)
+                log_to_console('[~] Skipping file [Already Parsed]: %s' % full_filename)
                 continue
-            encoding=get_file_enconding(full_filename)
-            if encoding:
-                yield encoding, full_filename
-            else:
-                print('[~] Skipping file [Unknown Encoding]: %s' % full_filename)
+            yield full_filename
 
 
 def get_lines(file,encoding=None):
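For context on the skip logic above: get_parsable_lines appends a "path;success_rate" line to the processed_files log for every finished file, and get_files reads that log back to decide what can be skipped. A small sketch of that check, with a made-up path and rate; the parsing and comparison mirror the diff, nothing else is from the repository:

threshold = -1  # set later in this commit: any file with a logged rate is skipped
files_in_log = {}

for line in ["/data/leaks/dump_a.txt;0.97"]:  # stand-in for reading processed_files
    filedata = line.split(";")
    files_in_log[filedata[0]] = float(filedata[1])

full_filename = "/data/leaks/dump_a.txt"
if full_filename in files_in_log and files_in_log[full_filename] > threshold:
    print('[~] Skipping file [Already Parsed]: %s' % full_filename)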
@@ -115,11 +123,11 @@ def get_lines(file,encoding=None):
             yield (strip_badbytes(line, encoding))
 
 
-def get_parsable_lines(file):
+def get_parsable_lines(file,encoding):
     global log_filename
     success = 0 # initialized with 1 to preven div/0
     failure = 0
-    for line in get_lines(file):
+    for line in get_lines(file,encoding):
         doc = extract_email(line)
         if doc:
             success += 1
@@ -127,13 +135,18 @@ def get_parsable_lines(file):
         else:
             failure += 1
     success_rate = (success / (success + failure))
-    print('[+] Done with file: {} ({})'.format(file,success_rate))
-    with open(log_filename, 'a+') as file_log:
-        file_log.write("{};{}\n".format(file, success_rate))
+    log_to_console('[+] Done with file: {} ({})'.format(file,success_rate))
+    log_to_file("{};{}".format(file, success_rate))
 
 
-def create_doc(file):
-    for cred in get_parsable_lines(file):
+def get_hash(text):
+    return hex(mmh3.hash(text, 12, signed=False)).split("x")[1]
+
+def get_user_pw_hash(text):
+    return hex(mmh3.hash128(text, 12,signed=False) % 1000000000000000).split("x")[1]
+
+def create_doc(file,encoding):
+    for cred in get_parsable_lines(file,encoding):
         doc = {
             "user" : cred[0],
             "domain" : cred[1],
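The two helpers introduced above derive short hexadecimal document IDs from MurmurHash3 via the mmh3 package (seed 12, reduced modulo 10^15, "0x" prefix stripped). A rough illustration of what they produce; the sample credential is made up, only the hashing expression comes from the diff:

import mmh3

def get_user_pw_hash(text):
    # 128-bit MurmurHash3, seed 12, reduced mod 10**15, rendered as hex without the "0x" prefix
    return hex(mmh3.hash128(text, 12, signed=False) % 1000000000000000).split("x")[1]

id_hash = get_user_pw_hash("{}{}".format("alice@example.com", "hunter2"))  # made-up credential
print(id_hash)      # a short hex string (at most 13 hex digits)
print(id_hash[:1])  # the first hex digit is later used as the id_domain index suffix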
@@ -148,40 +161,55 @@ def create_doc(file):
         }
         username_split=cred[0].split(";")
         if len(username_split)==2:
+            if len(username_split[0]) > 0 and len(username_split[1]) > 0:
                 doc["username"]=username_split[0]
                 doc["user"]=username_split[1]
-        id_hash=hex(mmh3.hash128(",".join((doc["user"], doc["domain"], doc["password"])), 12,signed=False) % 1000000000000000000000)
-        yield id_hash, doc
+        id_hash=get_user_pw_hash("{}{}".format(doc["user"],doc["password"]))
+        # id_domain=get_domain_hash(cred[1])
+        id_domain=id_hash[:1]
+        yield id_domain, id_hash, doc
 
 
-def process_file(input_file):
+def process_file(input_file,encoding):
     global index, doc_type_name
-    for id_hash, doc in create_doc(input_file):
+    for id_domain, id_hash, doc in create_doc(input_file,encoding):
         yield {
-            "_index": index,
+            "_index": "{}_{}".format(index,id_domain),
             "_type": doc_type_name,
             "_id": id_hash,
             "_source": doc
         }
 
 
+def index_file(input_file):
+    ps=multiprocessing.current_process()
+    encoding=get_file_enconding(input_file)
+    if encoding:
+        es = Elasticsearch()
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # pre=count["count"]
+        log_to_console('[{}:*] Indexing file: {}'.format(ps.pid,input_file))
+        success, _ = bulk(es, process_file(input_file,encoding), request_timeout=60, raise_on_exception=False)
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # post=count["count"]
+        # log_to_console('[{}:=] Added {} Documents with {}'.format(ps.pid,post-pre,input_file))
+    else:
+        log_to_console('[{}:~] Skipping file [Unknown Encoding]: {}'.format(ps.pid,input_file))
+
+
 index = "leak_col1"
 doc_type_name = "credential"
 log_filename = "processed_files"
-threshold = 0 #threshold for reparsing an already parsed file
+threshold = -1 #threshold for reparsing an already parsed file
+p=Pool(20)
 
 def main():
-    es = Elasticsearch()
     dir=sys.argv[1]
-    for encoding, data in get_files(dir):
-        count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
-        pre=count["count"]
-        print('[*] Indexing file: %s' % data)
-        success, _ = bulk(es, process_file(data), request_timeout=60, raise_on_exception=False)
-        count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
-        post=count["count"]
-        print('[=] Added {} Documents with {}'.format(post-pre,data))
+    # for filename in get_files(dir):
+    #     index_file(filename)
+    p.map(index_file,get_files(dir))
 
 
 if __name__ == '__main__':
     main()
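Net effect of this last hunk: main() no longer indexes serially, it hands the filenames from get_files to a 20-process pool (the directory still comes from sys.argv[1]), and each document now lands in an index named after the first hex digit of its ID, so the data fans out over at most 16 indices. A tiny illustration of the resulting index names; leak_col1 is the prefix set in the script, everything else is just enumeration:

index = "leak_col1"

# id_domain = id_hash[:1] is a single hex digit, so "_index" takes one of 16 values
for id_domain in "0123456789abcdef":
    print("{}_{}".format(index, id_domain))  # leak_col1_0 ... leak_col1_f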