#!/usr/bin/env python3 import argparse import base64 import json import logging import re import sqlite3 import sys from pathlib import Path LOGGER = logging.getLogger("sqlite32jsonl") SAFE_NAME_RE = re.compile(r"[\\/\x00-\x1f\x7f]+") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Export each user table in a SQLite database to a JSONL file." ) parser.add_argument("database", type=Path, help="Path to the SQLite database file") parser.add_argument( "-o", "--output-dir", type=Path, default=Path.cwd(), help="Directory for exported .jsonl files (default: current directory)", ) parser.add_argument( "--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Logging verbosity (default: INFO)", ) return parser.parse_args() def configure_logging(level_name: str) -> None: logging.basicConfig( level=getattr(logging, level_name), format="%(levelname)s %(message)s", ) def safe_output_stem(table_name: str) -> str: safe_name = SAFE_NAME_RE.sub("_", table_name).strip() return safe_name or "table" def unique_output_path(output_dir: Path, table_name: str, used_names: set[str]) -> Path: stem = safe_output_stem(table_name) candidate = f"{stem}.jsonl" suffix = 2 while candidate in used_names: candidate = f"{stem}_{suffix}.jsonl" suffix += 1 used_names.add(candidate) return output_dir / candidate def json_value(value): if isinstance(value, bytes): return {"$base64": base64.b64encode(value).decode("ascii")} return value def list_tables(connection: sqlite3.Connection) -> list[str]: cursor = connection.execute( """ SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name """ ) return [row[0] for row in cursor] def export_table( connection: sqlite3.Connection, table_name: str, output_path: Path, ) -> int: cursor = connection.execute(f'SELECT * FROM "{table_name.replace("\"", "\"\"")}"') columns = [description[0] for description in cursor.description] row_count = 0 with output_path.open("w", encoding="utf-8", newline="\n") as handle: for row in cursor: record = { column: json_value(value) for column, value in zip(columns, row, strict=True) } handle.write(json.dumps(record, ensure_ascii=False)) handle.write("\n") row_count += 1 return row_count def validate_paths(database: Path, output_dir: Path) -> None: if not database.is_file(): raise FileNotFoundError(f"Database file not found: {database}") output_dir.mkdir(parents=True, exist_ok=True) if not output_dir.is_dir(): raise NotADirectoryError(f"Output path is not a directory: {output_dir}") def main() -> int: args = parse_args() configure_logging(args.log_level) try: validate_paths(args.database, args.output_dir) except Exception as exc: LOGGER.error("%s", exc) return 1 used_names: set[str] = set() failures = 0 try: with sqlite3.connect(args.database) as connection: tables = list_tables(connection) if not tables: LOGGER.info("No user tables found in %s", args.database) return 0 LOGGER.info("Exporting %d table(s) from %s", len(tables), args.database) for table_name in tables: output_path = unique_output_path(args.output_dir, table_name, used_names) try: row_count = export_table(connection, table_name, output_path) except Exception: failures += 1 output_path.unlink(missing_ok=True) LOGGER.exception("Failed to export table %r", table_name) continue LOGGER.info( "Created %s from table %r with %d row(s)", output_path, table_name, row_count, ) except sqlite3.Error: LOGGER.exception("Failed to read database %s", args.database) return 1 if failures: LOGGER.error("Completed with %d failed table export(s)", failures) return 1 LOGGER.info("Export complete") return 0 if __name__ == "__main__": sys.exit(main())