From 73588b21aceae5f67d8feea5af1158353b0b2a7c Mon Sep 17 00:00:00 2001
From: tke
Date: Sun, 8 Mar 2026 12:38:08 +0100
Subject: [PATCH] Add SQLite table to JSONL export tool

---
 README.md                        |   2 +
 tools/forensics/sqlite32jsonl.py | 190 +++++++++++++++++++++++++++++++
 2 files changed, 192 insertions(+)
 create mode 100755 tools/forensics/sqlite32jsonl.py

diff --git a/README.md b/README.md
index 2dd4abd..0e60737 100644
--- a/README.md
+++ b/README.md
@@ -70,6 +70,7 @@ Format: `path | goal | usage`. This section is intentionally compact so `what` c
 - `tools/forensics/chechsqlite.py` | goal: inspect SQLite databases for password or hash style columns | usage: `python3 tools/forensics/chechsqlite.py sample.db`
 - `tools/forensics/extractfolder.py` | goal: bulk-extract or sort files from a folder workflow | usage: `python3 tools/forensics/extractfolder.py input_dir`
+- `tools/forensics/sqlite32jsonl.py` | goal: export each user table from a SQLite database to one JSONL file with robust filename handling | usage: `python3 tools/forensics/sqlite32jsonl.py sample.db -o outdir`
 - `tools/forensics/process_leak.py` | goal: inspect process-leak style artifacts | usage: `python3 tools/forensics/process_leak.py artifact`
 - `tools/forensics/mailunpack` | goal: extract mail attachments inside a constrained container workflow | usage: `tools/forensics/mailunpack message.eml`
 - `tools/forensics/showgm.sh` | goal: open image GPS EXIF coordinates in Google Maps | usage: `tools/forensics/showgm.sh image.jpg`
@@ -218,6 +219,7 @@ Format: `path | goal | usage`. This section is intentionally compact so `what` c
 - `tools/forensics/chechsqlite.py`: inspects SQLite databases for password/hash-like fields and consistency issues.
 - `tools/forensics/extractfolder.py`: folder extraction/helper script for bulk processing.
+- `tools/forensics/sqlite32jsonl.py`: exports each user table in a SQLite database to a separate JSONL file with argparse help and logging.
 - `tools/forensics/mailunpack`: containerized `munpack` wrapper for extracting mail attachments safely.
 - `tools/forensics/process_leak.py`: process-memory or artifact triage helper.
 - `tools/forensics/showgm.sh`, `showosm.sh`: extract GPS EXIF data from images and open the location in Google Maps or OpenStreetMap.
diff --git a/tools/forensics/sqlite32jsonl.py b/tools/forensics/sqlite32jsonl.py
new file mode 100755
index 0000000..2b44fff
--- /dev/null
+++ b/tools/forensics/sqlite32jsonl.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python3
+"""Export each user table in a SQLite database to one JSONL file per table."""
+
+import argparse
+import base64
+import json
+import logging
+import re
+import sqlite3
+import sys
+from pathlib import Path
+
+
+LOGGER = logging.getLogger("sqlite32jsonl")
+# Path separators and ASCII control characters are unsafe in filenames.
+SAFE_NAME_RE = re.compile(r"[\\/\x00-\x1f\x7f]+")
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse and return command-line arguments."""
+    parser = argparse.ArgumentParser(
+        description="Export each user table in a SQLite database to a JSONL file."
+    )
+    parser.add_argument("database", type=Path, help="Path to the SQLite database file")
+    parser.add_argument(
+        "-o",
+        "--output-dir",
+        type=Path,
+        default=Path.cwd(),
+        help="Directory for exported .jsonl files (default: current directory)",
+    )
+    parser.add_argument(
+        "--log-level",
+        default="INFO",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
+        help="Logging verbosity (default: INFO)",
+    )
+    return parser.parse_args()
+
+
+def configure_logging(level_name: str) -> None:
+    """Configure root logging at the given level name."""
+    logging.basicConfig(
+        level=getattr(logging, level_name),
+        format="%(levelname)s %(message)s",
+    )
+
+
+def safe_output_stem(table_name: str) -> str:
+    """Return a filesystem-safe filename stem derived from *table_name*."""
+    safe_name = SAFE_NAME_RE.sub("_", table_name).strip()
+    return safe_name or "table"
+
+
+def unique_output_path(output_dir: Path, table_name: str, used_names: set[str]) -> Path:
+    """Return an output path for *table_name* that avoids name collisions.
+
+    Names are compared case-insensitively so two tables differing only in
+    case still get distinct files on case-insensitive filesystems
+    (e.g. default macOS/Windows volumes), instead of silently truncating
+    an earlier table's export.
+    """
+    stem = safe_output_stem(table_name)
+    candidate = f"{stem}.jsonl"
+    suffix = 2
+
+    while candidate.casefold() in used_names:
+        candidate = f"{stem}_{suffix}.jsonl"
+        suffix += 1
+
+    used_names.add(candidate.casefold())
+    return output_dir / candidate
+
+
+def json_value(value):
+    """Return a JSON-serializable form of *value*; bytes become a base64 marker."""
+    if isinstance(value, bytes):
+        return {"$base64": base64.b64encode(value).decode("ascii")}
+    return value
+
+
+def list_tables(connection: sqlite3.Connection) -> list[str]:
+    """Return the names of all user tables (SQLite internal tables excluded)."""
+    cursor = connection.execute(
+        """
+        SELECT name
+        FROM sqlite_master
+        WHERE type = 'table'
+          AND name NOT LIKE 'sqlite_%'
+        ORDER BY name
+        """
+    )
+    return [row[0] for row in cursor]
+
+
+def export_table(
+    connection: sqlite3.Connection,
+    table_name: str,
+    output_path: Path,
+) -> int:
+    """Write all rows of *table_name* to *output_path* as JSONL; return the row count."""
+    # Escape embedded double quotes so the quoted identifier stays valid.
+    # Done outside the f-string: backslashes inside f-string expressions
+    # are a SyntaxError before Python 3.12.
+    escaped_name = table_name.replace('"', '""')
+    cursor = connection.execute(f'SELECT * FROM "{escaped_name}"')
+    columns = [description[0] for description in cursor.description]
+    row_count = 0
+
+    with output_path.open("w", encoding="utf-8", newline="\n") as handle:
+        for row in cursor:
+            record = {
+                column: json_value(value)
+                for column, value in zip(columns, row, strict=True)
+            }
+            handle.write(json.dumps(record, ensure_ascii=False))
+            handle.write("\n")
+            row_count += 1
+
+    return row_count
+
+
+def validate_paths(database: Path, output_dir: Path) -> None:
+    """Ensure *database* is an existing file and *output_dir* a usable directory."""
+    if not database.is_file():
+        raise FileNotFoundError(f"Database file not found: {database}")
+
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    if not output_dir.is_dir():
+        raise NotADirectoryError(f"Output path is not a directory: {output_dir}")
+
+
+def main() -> int:
+    """Run the export and return a process exit code (0 on success)."""
+    args = parse_args()
+    configure_logging(args.log_level)
+
+    try:
+        validate_paths(args.database, args.output_dir)
+    except Exception as exc:
+        LOGGER.error("%s", exc)
+        return 1
+
+    used_names: set[str] = set()
+    failures = 0
+    connection = None
+
+    try:
+        connection = sqlite3.connect(args.database)
+        tables = list_tables(connection)
+        if not tables:
+            LOGGER.info("No user tables found in %s", args.database)
+            return 0
+
+        LOGGER.info("Exporting %d table(s) from %s", len(tables), args.database)
+
+        for table_name in tables:
+            output_path = unique_output_path(args.output_dir, table_name, used_names)
+            try:
+                row_count = export_table(connection, table_name, output_path)
+            except Exception:
+                failures += 1
+                output_path.unlink(missing_ok=True)
+                LOGGER.exception("Failed to export table %r", table_name)
+                continue
+
+            LOGGER.info(
+                "Created %s from table %r with %d row(s)",
+                output_path,
+                table_name,
+                row_count,
+            )
+    except sqlite3.Error:
+        LOGGER.exception("Failed to read database %s", args.database)
+        return 1
+    finally:
+        # `with sqlite3.connect(...)` manages only the transaction, not the
+        # connection itself, so close explicitly.
+        if connection is not None:
+            connection.close()
+
+    if failures:
+        LOGGER.error("Completed with %d failed table export(s)", failures)
+        return 1
+
+    LOGGER.info("Export complete")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())