diff --git a/process_leak.py b/process_leak.py
index dab3155..b8ef22c 100755
--- a/process_leak.py
+++ b/process_leak.py
@@ -5,10 +5,23 @@
 import mmh3
 import string
 import sys
 from os import walk
-
 from chardet.universaldetector import UniversalDetector
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk
+from multiprocessing import Pool,Lock
+import multiprocessing
+
+lock = Lock()
+
+def log_to_file(text):
+    global log_filename
+    with lock: # thread blocks at this line until it can obtain lock
+        with open(log_filename, 'a+') as file_log:
+            file_log.write("{}\n".format(text))
+
+def log_to_console(text):
+    with lock: # thread blocks at this line until it can obtain lock
+        print(text)
 
 def get_mask(s):
@@ -87,24 +100,19 @@ def get_files(dir):
                     filedata=line.split(";")
                     files_in_log[filedata[0]]=float(filedata[1])
                 except:
-                    print("Can't parse Line")
+                    log_to_console("Can't parse Line")
                     pass
     except:
-        print("Can't open Logfile")
+        log_to_console("Can't open Logfile")
        pass
 
     for (dirpath, dirnames, filenames) in walk(dir):
         for file in filenames:
             full_filename=os.path.join(dirpath, file)
             if full_filename in files_in_log and files_in_log[full_filename] > threshold:
-                print('[~] Skipping file [Already Parsed]: %s' % full_filename)
+                log_to_console('[~] Skipping file [Already Parsed]: %s' % full_filename)
                 continue
-
-            encoding=get_file_enconding(full_filename)
-            if encoding:
-                yield encoding, full_filename
-            else:
-                print('[~] Skipping file [Unknown Encoding]: %s' % full_filename)
+            yield full_filename
 
 
 def get_lines(file,encoding=None):
@@ -115,11 +123,11 @@ def get_lines(file,encoding=None):
             yield (strip_badbytes(line, encoding))
 
 
-def get_parsable_lines(file):
+def get_parsable_lines(file,encoding):
     global log_filename
     success = 0 # initialized with 1 to preven div/0
     failure = 0
-    for line in get_lines(file):
+    for line in get_lines(file,encoding):
         doc = extract_email(line)
         if doc:
             success += 1
@@ -127,13 +135,18 @@
         else:
             failure += 1
     success_rate = (success / (success + failure))
-    print('[+] Done with file: {} ({})'.format(file,success_rate))
-    with open(log_filename, 'a+') as file_log:
-        file_log.write("{};{}\n".format(file, success_rate))
+    log_to_console('[+] Done with file: {} ({})'.format(file,success_rate))
+    log_to_file("{};{}".format(file, success_rate))
 
 
-def create_doc(file):
-    for cred in get_parsable_lines(file):
+def get_hash(text):
+    return hex(mmh3.hash(text, 12, signed=False)).split("x")[1]
+
+def get_user_pw_hash(text):
+    return hex(mmh3.hash128(text, 12,signed=False) % 1000000000000000).split("x")[1]
+
+def create_doc(file,encoding):
+    for cred in get_parsable_lines(file,encoding):
         doc = {
             "user" : cred[0],
             "domain" : cred[1],
@@ -148,40 +161,55 @@
         }
         username_split=cred[0].split(";")
         if len(username_split)==2:
-            doc["username"]=username_split[0]
-            doc["user"]=username_split[1]
-        id_hash=hex(mmh3.hash128(",".join((doc["user"], doc["domain"], doc["password"])), 12,signed=False) % 1000000000000000000000)
-        yield id_hash, doc
+            if len(username_split[0]) > 0 and len(username_split[1]) > 0:
+                doc["username"]=username_split[0]
+                doc["user"]=username_split[1]
+        id_hash=get_user_pw_hash("{}{}".format(doc["user"],doc["password"]))
+        # id_domain=get_domain_hash(cred[1])
+        id_domain=id_hash[:1]
+        yield id_domain, id_hash, doc
 
 
-def process_file(input_file):
+def process_file(input_file,encoding):
     global index, doc_type_name
-    for id_hash, doc in create_doc(input_file):
+    for id_domain, id_hash, doc in create_doc(input_file,encoding):
         yield {
-            "_index": index,
+            "_index": "{}_{}".format(index,id_domain),
             "_type": doc_type_name,
             "_id": id_hash,
             "_source": doc
         }
 
+
+def index_file(input_file):
+    ps=multiprocessing.current_process()
+    encoding=get_file_enconding(input_file)
+    if encoding:
+        es = Elasticsearch()
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # pre=count["count"]
+        log_to_console('[{}:*] Indexing file: {}'.format(ps.pid,input_file))
+        success, _ = bulk(es, process_file(input_file,encoding), request_timeout=60, raise_on_exception=False)
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # post=count["count"]
+        # log_to_console('[{}:=] Added {} Documents with {}'.format(ps.pid,post-pre,input_file))
+    else:
+        log_to_console('[{}:~] Skipping file [Unknown Encoding]: {}'.format(ps.pid,input_file))
+
 
 index = "leak_col1"
 doc_type_name = "credential"
 log_filename = "processed_files"
-threshold = 0 #threshold for reparsing an already parsed file
+threshold = -1 #threshold for reparsing an already parsed file
+p=Pool(20)
 
 
 def main():
-    es = Elasticsearch()
     dir=sys.argv[1]
-    for encoding, data in get_files(dir):
-        count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
-        pre=count["count"]
-        print('[*] Indexing file: %s' % data)
-        success, _ = bulk(es, process_file(data), request_timeout=60, raise_on_exception=False)
-        count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
-        post=count["count"]
-        print('[=] Added {} Documents with {}'.format(post-pre,data))
+    # for filename in get_files(dir):
+    #     index_file(filename)
+    p.map(index_file,get_files(dir))
 
 
-main()
+if __name__ == '__main__':
+    main()