#!/usr/bin/env python3 """ ocr_screenshots.py – OCR screenshot images and store results in SQLite database. Usage: ocr_screenshots.py [--db ] [--screenshots-dir ] Arguments: --db Path to the SQLite database file. Default: $XDG_DATA_HOME/screenshot-gallery/screenshot_ocr.db (~/.local/share/screenshot-gallery/screenshot_ocr.db) --screenshots-dir Directory containing screenshot images to process. Default: ~/Screenshots """ import argparse import glob import os import shutil import signal import sqlite3 import subprocess import sys import tempfile from datetime import datetime # --------------------------------------------------------------------------- # Globals used by the signal handler to stop processing gracefully # --------------------------------------------------------------------------- _current_proc: subprocess.Popen | None = None _shutdown = False def _handle_signal(signum, frame): """Terminate any running tesseract child, then exit.""" global _shutdown _shutdown = True if _current_proc is not None: try: _current_proc.terminate() except OSError: pass print("\nCancelled.", flush=True) sys.exit(1) signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGINT, _handle_signal) # --------------------------------------------------------------------------- # Argument parsing # --------------------------------------------------------------------------- def parse_args() -> argparse.Namespace: xdg_data = os.environ.get("XDG_DATA_HOME", os.path.expanduser("~/.local/share")) default_db = os.path.join(xdg_data, "screenshot-gallery", "screenshot_ocr.db") default_dir = os.path.expanduser("~/Screenshots") parser = argparse.ArgumentParser( description="OCR screenshot images and store results in a SQLite database." ) parser.add_argument( "--db", default=default_db, metavar="PATH", help=f"Path to the SQLite database (default: {default_db})", ) parser.add_argument( "--screenshots-dir", default=default_dir, metavar="DIR", help=f"Directory containing screenshots (default: {default_dir})", ) return parser.parse_args() # --------------------------------------------------------------------------- # Dependency / environment checks # --------------------------------------------------------------------------- def check_dependencies(screenshots_dir: str) -> bool: ok = True if shutil.which("tesseract") is None: print( "Error: tesseract is not installed. Please install it with:\n" " sudo pacman -S tesseract tesseract-data-eng # Arch\n" " sudo apt install tesseract-ocr # Debian/Ubuntu", file=sys.stderr, ) ok = False if not os.path.isdir(screenshots_dir): print( f"Error: screenshots directory not found: {screenshots_dir}", file=sys.stderr, ) ok = False return ok # --------------------------------------------------------------------------- # Database helpers # --------------------------------------------------------------------------- def init_database(db_path: str) -> None: """Create the database file and table if they do not already exist.""" os.makedirs(os.path.dirname(db_path), exist_ok=True) conn = sqlite3.connect(db_path) conn.execute(""" CREATE TABLE IF NOT EXISTS ocr_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, filename TEXT UNIQUE, full_path TEXT, ocr_text TEXT, file_size INTEGER, created_date TEXT, ocr_date TEXT ) """) conn.commit() conn.close() print(f"Database ready: {db_path}", flush=True) def get_processed_files(db_path: str) -> set: conn = sqlite3.connect(db_path) rows = conn.execute("SELECT filename FROM ocr_results").fetchall() conn.close() return {row[0] for row in rows} def remove_deleted_files(db_path: str, db_filenames: set, fs_filenames: set) -> int: """Delete DB rows for files that no longer exist on the filesystem. Returns the number of rows removed. """ deleted = db_filenames - fs_filenames if not deleted: return 0 conn = sqlite3.connect(db_path) removed = 0 try: for filename in sorted(deleted): conn.execute("DELETE FROM ocr_results WHERE filename = ?", (filename,)) print(f"Removed (file deleted): {filename}", flush=True) removed += 1 conn.commit() except Exception as exc: print(f"Error removing deleted entries: {exc}", file=sys.stderr, flush=True) conn.rollback() finally: conn.close() return removed def get_total_count(db_path: str) -> int: conn = sqlite3.connect(db_path) count = conn.execute("SELECT COUNT(*) FROM ocr_results").fetchone()[0] conn.close() return count def insert_result(db_path: str, filename: str, full_path: str, ocr_text: str, file_size: int, created_date: str) -> None: conn = sqlite3.connect(db_path) try: conn.execute( """ INSERT INTO ocr_results (filename, full_path, ocr_text, file_size, created_date, ocr_date) VALUES (?, ?, ?, ?, ?, ?) """, (filename, full_path, ocr_text, file_size, created_date, datetime.now().isoformat()), ) conn.commit() print(f"Added: {filename}", flush=True) except sqlite3.IntegrityError: print(f"Skipping (already in database): {filename}", flush=True) except Exception as exc: print(f"Error adding {filename}: {exc}", file=sys.stderr, flush=True) finally: conn.close() # --------------------------------------------------------------------------- # OCR # --------------------------------------------------------------------------- def perform_ocr(image_path: str) -> str: """Run tesseract on image_path and return the extracted text (may be empty).""" global _current_proc try: with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp: tmp_txt = tmp.name tmp_base = tmp_txt[:-4] # tesseract appends .txt itself _current_proc = subprocess.Popen( ["tesseract", image_path, tmp_base], stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) _, stderr = _current_proc.communicate() returncode = _current_proc.returncode _current_proc = None if returncode != 0: print( f"tesseract error on {os.path.basename(image_path)}: " f"{stderr.decode(errors='replace').strip()}", file=sys.stderr, flush=True, ) return "" with open(tmp_txt, "r", encoding="utf-8", errors="replace") as fh: return fh.read().strip() except Exception as exc: _current_proc = None print(f"Error processing {os.path.basename(image_path)}: {exc}", file=sys.stderr, flush=True) return "" finally: # Clean up temp file if it exists try: os.remove(tmp_txt) except OSError: pass # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> int: args = parse_args() db_path = args.db screenshots_dir = args.screenshots_dir print(f"Database: {db_path}", flush=True) print(f"Screenshots dir: {screenshots_dir}", flush=True) if not check_dependencies(screenshots_dir): return 1 init_database(db_path) processed_files = get_processed_files(db_path) print(f"Already processed: {len(processed_files)} files", flush=True) image_files: list[str] = [] for ext in ("*.png", "*.jpg", "*.jpeg", "*.PNG", "*.JPG", "*.JPEG"): image_files.extend(glob.glob(os.path.join(screenshots_dir, ext))) image_files = sorted(set(image_files)) # deduplicate (case-insensitive FS edge case) print(f"Found: {len(image_files)} image files", flush=True) # Remove DB entries for screenshots that no longer exist on disk fs_filenames = {os.path.basename(p) for p in image_files} removed_count = remove_deleted_files(db_path, processed_files, fs_filenames) if removed_count: print(f"Removed: {removed_count} stale entries", flush=True) # Keep processed_files in sync so skip logic below is accurate processed_files -= (processed_files - fs_filenames) processed_count = 0 skipped_count = 0 error_count = 0 for image_path in image_files: if _shutdown: break filename = os.path.basename(image_path) if filename in processed_files: skipped_count += 1 continue print(f"Processing: {filename}", flush=True) stat = os.stat(image_path) file_size = stat.st_size created_date = datetime.fromtimestamp(stat.st_mtime).isoformat() ocr_text = perform_ocr(image_path) if _shutdown: break if ocr_text: insert_result(db_path, filename, image_path, ocr_text, file_size, created_date) processed_count += 1 else: print(f"No text extracted: {filename}", flush=True) error_count += 1 total = get_total_count(db_path) print("", flush=True) print("OCR process completed:", flush=True) print(f" Processed: {processed_count}", flush=True) print(f" Skipped (already in database): {skipped_count}", flush=True) print(f" Removed (files deleted): {removed_count}", flush=True) print(f" Errors: {error_count}", flush=True) print(f" Total records in database: {total}", flush=True) return 0 if __name__ == "__main__": sys.exit(main())