more fixes on process_leak

added argparse-based argument parsing and fixed the Pool invocation
Tobias Kessels
2019-02-07 17:37:39 +01:00
parent 30d026cf70
commit c47b306ee5
2 changed files with 138 additions and 18 deletions


@@ -10,6 +10,13 @@ from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk
 from multiprocessing import Pool,Lock
 import multiprocessing
+import hashlib
+import json
+import argparse
 lock = Lock()
@@ -119,8 +126,9 @@ def get_lines(file,encoding=None):
     if not encoding:
         encoding = get_file_enconding(file)
     with open(file, 'rb') as f:
-        for line in f:
-            yield (strip_badbytes(line, encoding))
+        return [strip_badbytes(line, encoding) for line in f]
+        # for line in f:
+        #     yield (strip_badbytes(line, encoding))
 def get_parsable_lines(file,encoding):
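This is the change the commit title's leak fix most plausibly targets: the generator kept the file handle open for as long as a pool worker was still iterating (or after abandoning the generator mid-way), while the list comprehension reads everything eagerly and closes the handle when the function returns. A minimal sketch of the trade-off, with strip_badbytes stubbed in for illustration (the real helper lives elsewhere in this file):

def strip_badbytes(line, encoding):
    # stand-in for the repo's helper, stubbed for illustration
    return line.decode(encoding, errors="replace").rstrip("\r\n")

def get_lines_generator(file, encoding):
    # old behavior: the handle stays open until the consumer exhausts
    # or abandons the generator
    with open(file, "rb") as f:
        for line in f:
            yield strip_badbytes(line, encoding)

def get_lines_list(file, encoding):
    # new behavior: reads everything up front (more memory per worker),
    # but the handle is closed deterministically on return
    with open(file, "rb") as f:
        return [strip_badbytes(line, encoding) for line in f]

The cost is that each worker now holds an entire file's lines in memory at once, which is worth keeping in mind when raising the worker count.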
@@ -140,10 +148,13 @@ def get_parsable_lines(file,encoding):
 def get_hash(text):
-    return hex(mmh3.hash(text, 12, signed=False)).split("x")[1]
+    hash_object = hashlib.md5(text.encode())
+    return hash_object.hexdigest()
+    # return hex(mmh3.hash(text, 12, signed=False)).split("x")[1]
 def get_user_pw_hash(text):
-    return hex(mmh3.hash128(text, 12,signed=False) % 1000000000000000).split("x")[1]
+    return get_hash(text)
+    # return hex(mmh3.hash128(text, 12,signed=False) % 1000000000000000).split("x")[1]
 def create_doc(file,encoding):
     for cred in get_parsable_lines(file,encoding):
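The ID hashing switches from mmh3 to the standard library's hashlib, presumably dropping the mmh3 C-extension dependency. A quick sketch of what the new helper returns (the input string is illustrative):

import hashlib

def get_hash(text):
    # matches the new implementation above
    return hashlib.md5(text.encode()).hexdigest()

digest = get_hash("alice@example.com")  # illustrative input
print(len(digest), digest[:1])          # 32, and one of 16 hex chars '0'-'f'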
@@ -164,8 +175,7 @@ def create_doc(file,encoding):
         if len(username_split[0]) > 0 and len(username_split[1]) > 0:
             doc["username"]=username_split[0]
             doc["user"]=username_split[1]
-        id_hash=get_user_pw_hash("{}{}".format(doc["user"],doc["password"]))
-        # id_domain=get_domain_hash(cred[1])
+        id_hash=get_user_pw_hash("{}{}{}".format(doc["user"],doc["domain"],doc["password"]))
         id_domain=id_hash[:1]
         yield id_domain, id_hash, doc
@@ -174,40 +184,87 @@ def process_file(input_file,encoding):
     global index, doc_type_name
     for id_domain, id_hash, doc in create_doc(input_file,encoding):
         yield {
-                "_index": "{}_{}".format(index,id_domain),
-                "_type": doc_type_name,
-                "_id": id_hash,
-                "_source": doc
+            "_index": "{}_{}".format(index,id_domain),
+            "_type": doc_type_name,
+            "_id": id_hash,
+            "_source": doc
         }
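For reference, each yielded action has the dict shape that elasticsearch.helpers.bulk() consumes, and because id_domain is the first hex character of the MD5 id, documents fan out over 16 suffixed indices. A sketch with placeholder values:

# Illustrative action dict; all values are placeholders, not real data.
action = {
    "_index": "leak_data_a",  # "{index}_{id_domain}": base name plus one of 16 hex chars
    "_type": "credential",
    "_id": "0f1e2d3c4b5a69788796a5b4c3d2e1f0",  # md5 of user + domain + password
    "_source": {"user": "<user>", "domain": "<domain>", "password": "<password>"},
}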
 def index_file(input_file):
     ps=multiprocessing.current_process()
     encoding=get_file_enconding(input_file)
     if encoding:
-        es = Elasticsearch()
+        es = Elasticsearch(["172.16.1.141"],http_compress=True)
         # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
         # pre=count["count"]
         log_to_console('[{}:*] Indexing file: {}'.format(ps.pid,input_file))
-        success, _ = bulk(es, process_file(input_file,encoding), request_timeout=60, raise_on_exception=False)
+        try:
+            success, _ = bulk(es, process_file(input_file,encoding), chunk_size=10000, request_timeout=60, raise_on_error=True, raise_on_exception=False)
+            # es.bulk()
+        except Exception as e:
+            log_to_console('[{}:!] Indexing failed for: {}\n[{}:!] REASON: {}'.format(ps.pid,input_file,ps.pid,e))
         # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
         # post=count["count"]
         # log_to_console('[{}:=] Added {} Documents with {}'.format(ps.pid,post-pre,input_file))
     else:
         log_to_console('[{}:~] Skipping file [Unknown Encoding]: {}'.format(ps.pid,input_file))
-index = "leak_col1"
+def bench_file(input_file):
+    ps=multiprocessing.current_process()
+    encoding=get_file_enconding(input_file)
+    devnull=open(os.devnull,'w')
+    if encoding:
+        es = Elasticsearch()
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # pre=count["count"]
+        log_to_console('[{}:*] Benching file: {}'.format(ps.pid,input_file))
+        docs=0
+        try:
+            # success, _ = bulk(es, process_file(input_file,encoding), chunk_size=1000, request_timeout=60, raise_on_error=False, raise_on_exception=False)
+            for doc in process_file(input_file,encoding):
+                docs+=1
+                devnull.write(json.dumps(doc))
+            log_to_console('[{}:*] Benching Done: {} [processed {} docs]'.format(ps.pid,input_file,docs))
+        except Exception as e:
+            log_to_console('[{}:!] Benching failed for: {}\n[{}:!] REASON: {}'.format(ps.pid,input_file,ps.pid,e))
+        # count = es.count(index=index, doc_type=doc_type_name, body={ "query": {"match_all" : { }}})
+        # post=count["count"]
+        # log_to_console('[{}:=] Added {} Documents with {}'.format(ps.pid,post-pre,input_file))
+    else:
+        log_to_console('[{}:~] Skipping file [Unknown Encoding]: {}'.format(ps.pid,input_file))
index=""
doc_type_name = "credential"
log_filename = "processed_files"
threshold = -1 #threshold for reparsing an already parsed file
p=Pool(20)
def main():
dir=sys.argv[1]
# for filename in get_files(dir):
# index_file(filename)
p.map(index_file,get_files(dir))
+    global index
+    parser = argparse.ArgumentParser(description="Put leak data into a local Elasticsearch")
+    parser.add_argument("-p",help="number of worker processes (default: 4)",default=4,type=int,nargs='?')
+    parser.add_argument("-i",help="index base name",default="leak_data")
+    parser.add_argument("-b",help="don't write to ES, just benchmark the parsing",action='store_true')
+    parser.add_argument('folder')
+    args = parser.parse_args()
+    index=args.i
+    workers=args.p
+    dir=args.folder
+    p=Pool(workers)
+    if args.b:
+        p.map(bench_file,get_files(dir))
+    else:
+        p.map(index_file,get_files(dir))
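With the new argparse interface, invocation looks roughly like this (the script name is inferred from the commit title and is not shown in the diff):

python process_leak.py -p 8 -i leak_data /path/to/leaks   # index with 8 workers
python process_leak.py -b /path/to/leaks                  # benchmark: parse only, write to /dev/null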