Files
2026-06-19 16:44:54 -04:00

315 lines
10 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
ocr_screenshots.py - OCR screenshot images and store the results in a SQLite database.
Usage:
ocr_screenshots.py [--db <path>] [--screenshots-dir <path>]
Arguments:
--db Path to the SQLite database file.
Default: $XDG_DATA_HOME/screenshot-gallery/screenshot_ocr.db
(~/.local/share/screenshot-gallery/screenshot_ocr.db)
--screenshots-dir Directory containing screenshot images to process.
Default: ~/Screenshots
"""
import argparse
import glob
import os
import shutil
import signal
import sqlite3
import subprocess
import sys
import tempfile
from datetime import datetime
# ---------------------------------------------------------------------------
# Globals used by the signal handler to stop processing gracefully
# ---------------------------------------------------------------------------
_current_proc: subprocess.Popen | None = None
_shutdown = False
def _handle_signal(signum, frame):
"""Terminate any running tesseract child, then exit."""
global _shutdown
_shutdown = True
if _current_proc is not None:
try:
_current_proc.terminate()
except OSError:
pass
print("\nCancelled.", flush=True)
sys.exit(1)
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Parse the command-line options from ``sys.argv``.

    Defaults:
        --db: ``<data home>/screenshot-gallery/screenshot_ocr.db``, where the
            data home is ``$XDG_DATA_HOME`` or ``~/.local/share`` when that
            variable is unset or empty.
        --screenshots-dir: ``~/Screenshots``.

    Returns:
        The parsed namespace, exposing ``db`` and ``screenshots_dir``.
    """
    # An empty XDG_DATA_HOME must be treated like an unset one; otherwise the
    # default database path would silently become relative to the current
    # working directory.
    xdg_data = os.environ.get("XDG_DATA_HOME") or os.path.expanduser("~/.local/share")
    default_db = os.path.join(xdg_data, "screenshot-gallery", "screenshot_ocr.db")
    default_dir = os.path.expanduser("~/Screenshots")

    parser = argparse.ArgumentParser(
        description="OCR screenshot images and store results in a SQLite database."
    )
    parser.add_argument(
        "--db",
        default=default_db,
        metavar="PATH",
        help=f"Path to the SQLite database (default: {default_db})",
    )
    parser.add_argument(
        "--screenshots-dir",
        default=default_dir,
        metavar="DIR",
        help=f"Directory containing screenshots (default: {default_dir})",
    )
    return parser.parse_args()
# ---------------------------------------------------------------------------
# Dependency / environment checks
# ---------------------------------------------------------------------------
def check_dependencies(screenshots_dir: str) -> bool:
    """Check that tesseract is on PATH and that *screenshots_dir* is a directory.

    Both checks always run, and each failure is reported on stderr, so one
    invocation surfaces every problem.

    Returns:
        True only when both checks pass.
    """
    problems = []

    if shutil.which("tesseract") is None:
        problems.append(
            "Error: tesseract is not installed. Please install it with:\n"
            " sudo pacman -S tesseract tesseract-data-eng # Arch\n"
            " sudo apt install tesseract-ocr # Debian/Ubuntu"
        )

    if not os.path.isdir(screenshots_dir):
        problems.append(f"Error: screenshots directory not found: {screenshots_dir}")

    for message in problems:
        print(message, file=sys.stderr)
    return not problems
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def init_database(db_path: str) -> None:
    """Create the database file and the ``ocr_results`` table if missing.

    Any missing parent directories of *db_path* are created first. Calling
    this on an already-initialised database is a no-op apart from the status
    message.
    """
    # A bare filename such as "ocr.db" has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError, so only create a parent
    # directory when there is one.
    parent = os.path.dirname(db_path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS ocr_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT UNIQUE,
                full_path TEXT,
                ocr_text TEXT,
                file_size INTEGER,
                created_date TEXT,
                ocr_date TEXT
            )
        """)
        conn.commit()
    finally:
        # Release the connection even when the DDL fails.
        conn.close()
    print(f"Database ready: {db_path}", flush=True)
def get_processed_files(db_path: str) -> set:
    """Return the set of filenames already stored in ``ocr_results``.

    Raises:
        sqlite3.Error: if the query fails (for example, the table is missing).
    """
    conn = sqlite3.connect(db_path)
    try:
        return {name for (name,) in conn.execute("SELECT filename FROM ocr_results")}
    finally:
        # Close the connection even when the query raises.
        conn.close()
def remove_deleted_files(db_path: str, db_filenames: set, fs_filenames: set) -> int:
    """Delete DB rows for files that no longer exist on the filesystem.

    Args:
        db_path: Path to the SQLite database.
        db_filenames: Filenames currently recorded in the database.
        fs_filenames: Filenames currently present on disk.

    Returns:
        The number of stale filenames removed, or 0 when there was nothing to
        remove or the removal failed and was rolled back.
    """
    stale = sorted(db_filenames - fs_filenames)
    if not stale:
        return 0

    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(
            "DELETE FROM ocr_results WHERE filename = ?",
            [(filename,) for filename in stale],
        )
        conn.commit()
    except Exception as exc:
        print(f"Error removing deleted entries: {exc}", file=sys.stderr, flush=True)
        conn.rollback()
        # Nothing was committed, so nothing may be reported as removed.
        return 0
    finally:
        conn.close()

    # Report only after the commit succeeded, so the log never claims a
    # removal that was rolled back.
    for filename in stale:
        print(f"Removed (file deleted): {filename}", flush=True)
    return len(stale)
def get_total_count(db_path: str) -> int:
    """Return the number of rows in ``ocr_results``.

    Raises:
        sqlite3.Error: if the query fails (for example, the table is missing).
    """
    conn = sqlite3.connect(db_path)
    try:
        (count,) = conn.execute("SELECT COUNT(*) FROM ocr_results").fetchone()
        return count
    finally:
        # Close the connection even when the query raises.
        conn.close()
def insert_result(db_path: str, filename: str, full_path: str,
                  ocr_text: str, file_size: int, created_date: str) -> None:
    """Store one OCR result row, stamping it with the current local time.

    Failures never propagate: a duplicate filename is reported as a skip on
    stdout, and any other error is reported on stderr.
    """
    statement = (
        "\n"
        "INSERT INTO ocr_results\n"
        "(filename, full_path, ocr_text, file_size, created_date, ocr_date)\n"
        "VALUES (?, ?, ?, ?, ?, ?)\n"
    )
    conn = sqlite3.connect(db_path)
    try:
        ocr_date = datetime.now().isoformat()
        row = (filename, full_path, ocr_text, file_size, created_date, ocr_date)
        conn.execute(statement, row)
        conn.commit()
        print(f"Added: {filename}", flush=True)
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on filename rejected a duplicate.
        print(f"Skipping (already in database): {filename}", flush=True)
    except Exception as exc:
        print(f"Error adding {filename}: {exc}", file=sys.stderr, flush=True)
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# OCR
# ---------------------------------------------------------------------------
def perform_ocr(image_path: str) -> str:
    """Run tesseract on *image_path* and return the extracted text.

    The running child is published in the module-level ``_current_proc`` while
    it executes and cleared again afterwards.

    Returns:
        The stripped OCR text, or ``""`` when tesseract exits non-zero or any
        error occurs (the error is reported on stderr).
    """
    global _current_proc
    # Initialised up front so the cleanup below is safe even when creating the
    # temporary file itself fails.
    tmp_txt = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
            tmp_txt = tmp.name
        tmp_base = tmp_txt[:-4]  # tesseract appends .txt itself

        proc = subprocess.Popen(
            ["tesseract", image_path, tmp_base],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )
        _current_proc = proc
        _, stderr = proc.communicate()

        if proc.returncode != 0:
            print(
                f"tesseract error on {os.path.basename(image_path)}: "
                f"{stderr.decode(errors='replace').strip()}",
                file=sys.stderr,
                flush=True,
            )
            return ""

        with open(tmp_txt, "r", encoding="utf-8", errors="replace") as fh:
            return fh.read().strip()
    except Exception as exc:
        print(f"Error processing {os.path.basename(image_path)}: {exc}",
              file=sys.stderr, flush=True)
        return ""
    finally:
        _current_proc = None
        # Clean up the temp file if one was created.
        if tmp_txt is not None:
            try:
                os.remove(tmp_txt)
            except OSError:
                pass
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _find_image_files(screenshots_dir: str) -> list[str]:
    """Return the sorted, de-duplicated image paths directly inside *screenshots_dir*."""
    # Escape the directory so glob metacharacters in its name ("[", "*", "?")
    # are matched literally. Otherwise nothing would match and every database
    # row would be purged as stale.
    base = glob.escape(screenshots_dir)
    found: set[str] = set()
    for pattern in ("*.png", "*.jpg", "*.jpeg", "*.PNG", "*.JPG", "*.JPEG"):
        found.update(glob.glob(os.path.join(base, pattern)))
    # The set removes duplicates (case-insensitive FS edge case).
    return sorted(found)


def main() -> int:
    """OCR every new screenshot and keep the database in sync with the directory.

    Steps: parse arguments, verify dependencies, initialise the database,
    drop rows whose files are gone, OCR each image not yet recorded, and print
    a summary.

    Returns:
        1 when the dependency check fails, otherwise 0.
    """
    args = parse_args()
    db_path = args.db
    screenshots_dir = args.screenshots_dir
    print(f"Database: {db_path}", flush=True)
    print(f"Screenshots dir: {screenshots_dir}", flush=True)

    if not check_dependencies(screenshots_dir):
        return 1

    init_database(db_path)
    processed_files = get_processed_files(db_path)
    print(f"Already processed: {len(processed_files)} files", flush=True)

    image_files = _find_image_files(screenshots_dir)
    print(f"Found: {len(image_files)} image files", flush=True)

    # Remove DB entries for screenshots that no longer exist on disk
    fs_filenames = {os.path.basename(p) for p in image_files}
    removed_count = remove_deleted_files(db_path, processed_files, fs_filenames)
    if removed_count:
        print(f"Removed: {removed_count} stale entries", flush=True)
    # Keep processed_files in sync so skip logic below is accurate
    processed_files &= fs_filenames

    processed_count = 0
    skipped_count = 0
    error_count = 0
    for image_path in image_files:
        if _shutdown:
            break
        filename = os.path.basename(image_path)
        if filename in processed_files:
            skipped_count += 1
            continue

        print(f"Processing: {filename}", flush=True)
        try:
            stat = os.stat(image_path)
        except OSError as exc:
            # The file may have vanished or become unreadable since the glob;
            # record the failure and carry on with the remaining images.
            print(f"Error reading {filename}: {exc}", file=sys.stderr, flush=True)
            error_count += 1
            continue
        file_size = stat.st_size
        created_date = datetime.fromtimestamp(stat.st_mtime).isoformat()

        ocr_text = perform_ocr(image_path)
        if _shutdown:
            break
        if ocr_text:
            insert_result(db_path, filename, image_path,
                          ocr_text, file_size, created_date)
            processed_count += 1
        else:
            print(f"No text extracted: {filename}", flush=True)
            error_count += 1

    total = get_total_count(db_path)
    print("", flush=True)
    print("OCR process completed:", flush=True)
    print(f" Processed: {processed_count}", flush=True)
    print(f" Skipped (already in database): {skipped_count}", flush=True)
    print(f" Removed (files deleted): {removed_count}", flush=True)
    print(f" Errors: {error_count}", flush=True)
    print(f" Total records in database: {total}", flush=True)
    return 0
if __name__ == "__main__":
sys.exit(main())