From c47b306ee5a6ff6fa02fd6b4d46ba2f31580a6d4 Mon Sep 17 00:00:00 2001
From: Tobias Kessels
Date: Thu, 7 Feb 2019 17:37:39 +0100
Subject: [PATCH] more fixes on process_leak: added argparsing and fixed pool
 invocation

---
 mapping         | 63 ++++++++++++++++++++++++++++++++
 process_leak.py | 93 +++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 138 insertions(+), 18 deletions(-)
 create mode 100644 mapping

diff --git a/mapping b/mapping
new file mode 100644
index 0000000..7120c1c
--- /dev/null
+++ b/mapping
@@ -0,0 +1,63 @@
+PUT _template/template_1
+{
+  "index_patterns" : ["leak*"],
+  "settings" : {
+    "number_of_shards" : 2,
+    "number_of_replicas" : 0,
+    "refresh_interval": "60s"
+  },
+
+  "mappings": {
+    "credential": {
+      "properties": {
+        "containsDigits": {
+          "type": "boolean"
+        },
+        "containsLowerCase": {
+          "type": "boolean"
+        },
+        "containsSpecial": {
+          "type": "boolean"
+        },
+        "containsUpperCase": {
+          "type": "boolean"
+        },
+        "domain": {
+          "type": "keyword",
+          "ignore_above": 512,
+          "norms" : false
+        },
+        "file": {
+          "type": "keyword",
+          "ignore_above": 1024,
+          "norms" : false
+        },
+        "length": {
+          "type": "short"
+        },
+        "password": {
+          "type": "keyword",
+          "norms" : false,
+          "ignore_above": 512
+        },
+        "passwordMask": {
+          "type": "keyword",
+          "norms" : false,
+          "ignore_above": 512
+        },
+        "user": {
+          "type": "keyword",
+          "norms" : false,
+          "ignore_above": 512
+        },
+        "username": {
+          "type": "keyword",
+          "norms" : false,
+          "ignore_above": 512
+        }
+      }
+    }
+  }
+
+
+}
diff --git a/process_leak.py b/process_leak.py
index b8ef22c..3f98057 100755
--- a/process_leak.py
+++ b/process_leak.py
@@ -10,6 +10,13 @@ from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk
 from multiprocessing import Pool,Lock
 import multiprocessing
+import hashlib
+import json
+
+import argparse
+
+
+
 
 
 lock = Lock()
@@ -119,8 +126,9 @@ def get_lines(file,encoding=None):
     if not encoding:
         encoding = get_file_enconding(file)
     with open(file, 'rb') as f:
-        for line in f:
-            yield (strip_badbytes(line, encoding))
+        return [strip_badbytes(line, encoding) for line in f]
+        # for line in f:
+        #     yield (strip_badbytes(line, encoding))
 
 
 def get_parsable_lines(file,encoding):
@@ -140,10 +148,13 @@
 
 
 def get_hash(text):
-    return hex(mmh3.hash(text, 12, signed=False)).split("x")[1]
+    hash_object = hashlib.md5(text.encode())
+    return hash_object.hexdigest()
+    # return hex(mmh3.hash(text, 12, signed=False)).split("x")[1]
 
 def get_user_pw_hash(text):
-    return hex(mmh3.hash128(text, 12,signed=False) % 1000000000000000).split("x")[1]
+    return get_hash(text)
+    # return hex(mmh3.hash128(text, 12,signed=False) % 1000000000000000).split("x")[1]
 
 def create_doc(file,encoding):
     for cred in get_parsable_lines(file,encoding):
@@ -164,8 +175,7 @@
             if len(username_split[0]) > 0 and len(username_split[1]) > 0:
                 doc["username"]=username_split[0]
                 doc["user"]=username_split[1]
-        id_hash=get_user_pw_hash("{}{}".format(doc["user"],doc["password"]))
-        # id_domain=get_domain_hash(cred[1])
+        id_hash=get_user_pw_hash("{}{}{}".format(doc["user"],doc["domain"],doc["password"]))
         id_domain=id_hash[:1]
         yield id_domain, id_hash, doc
 
@@ -174,40 +184,87 @@
 def process_file(input_file,encoding):
     global index, doc_type_name
     for id_domain, id_hash, doc in create_doc(input_file,encoding):
         yield {
-            "_index": "{}_{}".format(index,id_domain),
-            "_type": doc_type_name,
-            "_id": id_hash,
-            "_source": doc
+                "_index": "{}_{}".format(index,id_domain),
+                "_type": doc_type_name,
+                "_id": id_hash,
+                "_source": doc
         }
 
 
 
+
 def index_file(input_file):
     ps=multiprocessing.current_process()
     encoding=get_file_enconding(input_file)
     if encoding:
-        es = Elasticsearch()
+        es = Elasticsearch(["172.16.1.141"],http_compress=True)
         # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
         # pre=count["count"]
         log_to_console('[{}:*] Indexing file: {}'.format(ps.pid,input_file))
-        success, _ = bulk(es, process_file(input_file,encoding), request_timeout=60, raise_on_exception=False)
+        try:
+            success, _ = bulk(es, process_file(input_file,encoding), chunk_size=10000, request_timeout=60, raise_on_error=True, raise_on_exception=False)
+            # es.bulk()
+        except Exception as e:
+            log_to_console('[{}:!] Indexing failed for: {}\n[{}:!] REASON: {}'.format(ps.pid,input_file,ps.pid,e))
         # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
         # post=count["count"]
        # log_to_console('[{}:=] Added {} Documents with {}'.format(ps.pid,post-pre,input_file))
     else:
         log_to_console('[{}:~] Skipping file [Unknown Encoding]: {}'.format(ps.pid,input_file))
 
+def bench_file(input_file):
+    ps=multiprocessing.current_process()
+    encoding=get_file_enconding(input_file)
+    devnull=open(os.devnull,'w')
+    if encoding:
+        es = Elasticsearch()
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # pre=count["count"]
+        log_to_console('[{}:*] Benching file: {}'.format(ps.pid,input_file))
+        docs=0
+        try:
+            # success, _ = bulk(es, process_file(input_file,encoding), chunk_size=1000, request_timeout=60, raise_on_error=False, raise_on_exception=False)
+            for doc in process_file(input_file,encoding):
+                docs+=1
+                devnull.write(json.dumps(doc))
+
+
+            log_to_console('[{}:*] Benching Done: {} [processed {} docs]'.format(ps.pid,input_file,docs))
+
+
+
+
+        except Exception as e:
+            log_to_console('[{}:!] Benching failed for: {}\n[{}:!] REASON: {}'.format(ps.pid,input_file,ps.pid,e))
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # post=count["count"]
+        # log_to_console('[{}:=] Added {} Documents with {}'.format(ps.pid,post-pre,input_file))
+    else:
+        log_to_console('[{}:~] Skipping file [Unknown Encoding]: {}'.format(ps.pid,input_file))
 
 
 
+
+
+
-index = "leak_col1"
+index=""
 doc_type_name = "credential"
 log_filename = "processed_files"
 threshold = -1 #threshold for reparsing an already parsed file
-p=Pool(20)
 
 def main():
-    dir=sys.argv[1]
-    # for filename in get_files(dir):
-    #     index_file(filename)
-    p.map(index_file,get_files(dir))
+    global index
+    parser = argparse.ArgumentParser(description="Put Leakdata into local Elasticsearch")
+    parser.add_argument("-p",help="how many workers (default:4)",default=4,type=int,nargs='?')
+    parser.add_argument("-i",help="index suffix",default="leak_data")
+    parser.add_argument("-b",help="dont write to es just benchmark",action='store_true')
+    parser.add_argument('folder')
+    args = parser.parse_args()
+    index=args.i
+    workers=args.p
+    dir=args.folder
+    p=Pool(workers)
+    if args.b:
+        p.map(bench_file,get_files(dir))
+    else:
+        p.map(index_file,get_files(dir))
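
Note on loading the template: the new `mapping` file is written in Kibana Dev Tools
console syntax (a `PUT _template/template_1` request line followed by the JSON body),
so it cannot be sent to Elasticsearch as-is. A minimal sketch of registering it from
Python with the same client library the script already uses — the host is the address
hardcoded in index_file(), and `template_1.json` (the JSON body of `mapping` without
the console request line) is a hypothetical file name:

    from elasticsearch import Elasticsearch
    import json

    # host taken from the hardcoded address in index_file(); adjust for your cluster
    es = Elasticsearch(["172.16.1.141"])

    # assumption: template_1.json holds the JSON object from `mapping`,
    # i.e. everything after the `PUT _template/template_1` request line
    with open("template_1.json") as f:
        body = json.load(f)

    # register the legacy index template so new leak_* indices pick up the mapping
    es.indices.put_template(name="template_1", body=body)

The template should be registered before the first bulk run; otherwise the leak_*
indices are created with dynamic mappings instead of the keyword fields above.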
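Note on the new document IDs: the patch replaces the mmh3 hashes with an md5 over
user + domain + password, and derives the target index from the first hex character
of that hash. A short sketch of the scheme, using made-up credential values purely
for illustration:

    import hashlib

    def get_hash(text):
        # md5 hex digest, as in the patched get_hash()
        return hashlib.md5(text.encode()).hexdigest()

    # illustrative values, not from any real dump
    user, domain, password = "alice", "example.com", "hunter2"

    id_hash = get_hash("{}{}{}".format(user, domain, password))  # document _id
    id_domain = id_hash[:1]           # first hex char: one of 16 buckets (0-9, a-f)
    index_name = "{}_{}".format("leak_data", id_domain)  # leak_data_0 .. leak_data_f

Because the domain now participates in the hash, the same user/password pair from two
different domains no longer collides on one _id, while exact duplicates across dump
files still deduplicate onto the same document; the corpus fans out over 16 indices,
all matched by the template's "leak*" pattern.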
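Usage after this patch (the folder path is a placeholder):

    ./process_leak.py -p 8 -i leak_data /data/leaks    # index with 8 workers
    ./process_leak.py -b /data/leaks                   # benchmark: parse only, no ES writes

The Pool is now created inside main() with the requested worker count rather than as
the old module-level Pool(20) that was spawned on import, which appears to be the
pool-invocation fix the subject line refers to.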