fixed filepaths and removed all cmake files

This commit is contained in:
yzinchuk
2026-06-19 16:44:54 -04:00
parent 274f3ad344
commit 125a1354e0
95 changed files with 419 additions and 81249 deletions
+262 -117
View File
@@ -1,169 +1,314 @@
#!/usr/bin/env python3
"""
ocr_screenshots.py - OCR screenshot images and store the results in a SQLite database.
Usage:
ocr_screenshots.py [--db <path>] [--screenshots-dir <path>]
Arguments:
--db Path to the SQLite database file.
Default: $XDG_DATA_HOME/screenshot-gallery/screenshot_ocr.db
(~/.local/share/screenshot-gallery/screenshot_ocr.db)
--screenshots-dir Directory containing screenshot images to process.
Default: ~/Screenshots
"""
import argparse
import glob
import os
import shutil
import signal
import sqlite3
import subprocess
import sys
import tempfile
from datetime import datetime
# Configuration
# Module-level default locations, with "~" expanded to the user's home directory.
# NOTE(review): parse_args() computes its own defaults (an XDG data directory
# for the database), so these constants look superseded -- confirm before use.
SCREENSHOTS_DIR = os.path.expanduser("~/Screenshots")
DATABASE_PATH = os.path.expanduser("~/screenshot_ocr.db")
# ---------------------------------------------------------------------------
# Globals used by the signal handler to stop processing gracefully
# ---------------------------------------------------------------------------
# The tesseract child process currently running, or None when no OCR is in
# flight; perform_ocr() assigns it and _handle_signal() terminates it.
_current_proc: subprocess.Popen | None = None
# Set to True by _handle_signal(); main() checks it to stop its loop.
_shutdown = False
def create_database():
"""Create SQLite database and table if they don't exist."""
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
def _handle_signal(signum, frame):
    """Signal handler: stop the in-flight tesseract child (if any) and exit.

    Marks the module as shutting down, asks the current child process to
    terminate on a best-effort basis, prints a cancellation notice and exits
    with status 1.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True

    child = _current_proc
    if child is not None:
        try:
            child.terminate()
        except OSError:
            # Best effort only: the child may already have exited.
            pass

    print("\nCancelled.", flush=True)
    sys.exit(1)
# Create table for OCR results
cursor.execute("""
CREATE TABLE IF NOT EXISTS ocr_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT UNIQUE,
full_path TEXT,
ocr_text TEXT,
file_size INTEGER,
created_date TEXT,
ocr_date TEXT
# Route both SIGTERM and SIGINT through _handle_signal so that either one
# terminates a running tesseract child before the script exits.
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Parse the command-line options from ``sys.argv``.

    The default database lives under ``$XDG_DATA_HOME/screenshot-gallery``;
    the default screenshots directory is ``~/Screenshots``.

    Returns:
        A namespace with ``db`` (path of the SQLite database) and
        ``screenshots_dir`` (directory containing the screenshots).
    """
    # An unset, empty or relative XDG_DATA_HOME is treated as "not configured"
    # so the default database path can never silently become relative to the
    # current working directory.
    xdg_data = os.environ.get("XDG_DATA_HOME", "")
    if not os.path.isabs(xdg_data):
        xdg_data = os.path.expanduser("~/.local/share")
    default_db = os.path.join(xdg_data, "screenshot-gallery", "screenshot_ocr.db")
    default_dir = os.path.expanduser("~/Screenshots")

    parser = argparse.ArgumentParser(
        description="OCR screenshot images and store results in a SQLite database."
    )
    # %(default)s lets argparse insert the value itself, which stays correct
    # even when the path contains a literal "%" character.
    parser.add_argument(
        "--db",
        default=default_db,
        metavar="PATH",
        help="Path to the SQLite database (default: %(default)s)",
    )
    parser.add_argument(
        "--screenshots-dir",
        default=default_dir,
        metavar="DIR",
        help="Directory containing screenshots (default: %(default)s)",
    )
    return parser.parse_args()
# ---------------------------------------------------------------------------
# Dependency / environment checks
# ---------------------------------------------------------------------------
def check_dependencies(screenshots_dir: str) -> bool:
    """Check that tesseract is installed and the screenshots directory exists.

    Both checks always run, and every failed one is reported on stderr, so
    the user sees all problems in a single invocation.

    Args:
        screenshots_dir: Directory expected to contain the screenshots.

    Returns:
        True when both checks pass, False otherwise.
    """
    tesseract_found = shutil.which("tesseract") is not None
    if not tesseract_found:
        print(
            "Error: tesseract is not installed. Please install it with:\n"
            " sudo pacman -S tesseract tesseract-data-eng # Arch\n"
            " sudo apt install tesseract-ocr # Debian/Ubuntu",
            file=sys.stderr,
        )

    directory_found = os.path.isdir(screenshots_dir)
    if not directory_found:
        print(
            f"Error: screenshots directory not found: {screenshots_dir}",
            file=sys.stderr,
        )

    return tesseract_found and directory_found
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def init_database(db_path: str) -> None:
    """Create the database file and the ``ocr_results`` table if missing.

    Args:
        db_path: Location of the SQLite database. Missing parent directories
            are created; a bare filename refers to the current directory.
    """
    parent_dir = os.path.dirname(db_path)
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS ocr_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT UNIQUE,
                full_path TEXT,
                ocr_text TEXT,
                file_size INTEGER,
                created_date TEXT,
                ocr_date TEXT
            )
        """)
        conn.commit()
    finally:
        # Release the connection even when table creation fails.
        conn.close()
    print(f"Database ready: {db_path}", flush=True)
def get_processed_files():
"""Get a set of filenames that have already been processed."""
conn = sqlite3.connect(DATABASE_PATH)
cursor = conn.cursor()
cursor.execute("SELECT filename FROM ocr_results")
processed_files = {row[0] for row in cursor.fetchall()}
def get_processed_files(db_path: str) -> set:
    """Return the filenames that already have a row in ``ocr_results``.

    Args:
        db_path: Location of the SQLite database.

    Returns:
        A set with the ``filename`` column of every stored row.
    """
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute("SELECT filename FROM ocr_results").fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    return {row[0] for row in rows}
def perform_ocr(image_path):
"""Perform OCR on an image file using tesseract."""
def remove_deleted_files(db_path: str, db_filenames: set, fs_filenames: set) -> int:
    """Delete DB rows for files that no longer exist on the filesystem.

    All stale rows are removed in one transaction, so either every one of
    them is deleted or none is.

    Args:
        db_path: Location of the SQLite database.
        db_filenames: Filenames currently stored in the database.
        fs_filenames: Filenames currently present on disk.

    Returns:
        The number of rows removed: 0 when nothing was stale, and also 0
        when the deletion failed and was rolled back.
    """
    deleted = sorted(db_filenames - fs_filenames)
    if not deleted:
        return 0

    conn = sqlite3.connect(db_path)
    try:
        conn.executemany(
            "DELETE FROM ocr_results WHERE filename = ?",
            [(filename,) for filename in deleted],
        )
        conn.commit()
    except Exception as exc:
        print(f"Error removing deleted entries: {exc}", file=sys.stderr, flush=True)
        conn.rollback()
        # The rollback restored every row, so nothing counts as removed.
        return 0
    finally:
        conn.close()

    # Report only after the commit succeeded, so the log matches the database.
    for filename in deleted:
        print(f"Removed (file deleted): {filename}", flush=True)
    return len(deleted)
def get_total_count(db_path: str) -> int:
    """Return the number of rows currently stored in ``ocr_results``.

    Args:
        db_path: Location of the SQLite database.
    """
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute("SELECT COUNT(*) FROM ocr_results").fetchone()[0]
    finally:
        # Close the connection even when the query fails.
        conn.close()
def insert_result(db_path: str, filename: str, full_path: str,
                  ocr_text: str, file_size: int, created_date: str) -> None:
    """Store one OCR result as a new row in ``ocr_results``.

    The current local time is recorded as ``ocr_date``. Failures are reported
    rather than raised: a filename that is already stored is announced on
    stdout, any other error on stderr.

    Args:
        db_path: Location of the SQLite database.
        filename: Base name of the image; unique within the table.
        full_path: Path of the image on disk.
        ocr_text: Text extracted from the image.
        file_size: Size of the image in bytes.
        created_date: ISO-formatted timestamp of the image file.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            """
            INSERT INTO ocr_results
            (filename, full_path, ocr_text, file_size, created_date, ocr_date)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (filename, full_path, ocr_text, file_size, created_date,
             datetime.now().isoformat()),
        )
        conn.commit()
        print(f"Added: {filename}", flush=True)
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on filename rejected a duplicate.
        print(f"Skipping (already in database): {filename}", flush=True)
    except Exception as exc:
        print(f"Error adding {filename}: {exc}", file=sys.stderr, flush=True)
    finally:
        conn.close()
def main():
"""Main function to process screenshot images."""
print("Starting OCR process for screenshots...")
# ---------------------------------------------------------------------------
# OCR
# ---------------------------------------------------------------------------
def perform_ocr(image_path: str) -> str:
    """Run tesseract on image_path and return the extracted text (may be empty).

    The running child process is published in the module-level
    ``_current_proc`` while tesseract works, so the signal handler can
    terminate it. Errors are reported on stderr and yield an empty string.

    Args:
        image_path: Path of the image to recognise.

    Returns:
        The stripped OCR text, or "" when tesseract fails or any other
        error occurs.
    """
    global _current_proc

    # Bound before the try block so the cleanup below never sees an unset name.
    tmp_txt = None
    try:
        # Reserve a unique output path; the file is removed in ``finally``.
        with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
            tmp_txt = tmp.name
        tmp_base = tmp_txt[:-4]  # tesseract appends .txt itself

        _current_proc = subprocess.Popen(
            ["tesseract", image_path, tmp_base],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )
        _, stderr = _current_proc.communicate()

        if _current_proc.returncode != 0:
            print(
                f"tesseract error on {os.path.basename(image_path)}: "
                f"{stderr.decode(errors='replace').strip()}",
                file=sys.stderr,
                flush=True,
            )
            return ""

        with open(tmp_txt, "r", encoding="utf-8", errors="replace") as fh:
            return fh.read().strip()
    except Exception as exc:
        print(f"Error processing {os.path.basename(image_path)}: {exc}",
              file=sys.stderr, flush=True)
        return ""
    finally:
        # No child is running any more, whichever way the block was left.
        _current_proc = None
        # Clean up temp file if it exists
        if tmp_txt is not None:
            try:
                os.remove(tmp_txt)
            except OSError:
                pass
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
    """Synchronise the OCR database with the screenshots directory.

    Steps: parse the options, verify the environment, create the database if
    needed, drop rows whose image no longer exists, OCR every image that is
    not stored yet, then print a summary.

    Returns:
        Process exit status: 1 when a dependency check fails, 0 otherwise.
    """
    args = parse_args()
    db_path = args.db
    screenshots_dir = args.screenshots_dir

    print(f"Database: {db_path}", flush=True)
    print(f"Screenshots dir: {screenshots_dir}", flush=True)

    if not check_dependencies(screenshots_dir):
        return 1

    init_database(db_path)

    processed_files = get_processed_files(db_path)
    print(f"Already processed: {len(processed_files)} files", flush=True)

    image_files: list[str] = []
    for ext in ("*.png", "*.jpg", "*.jpeg", "*.PNG", "*.JPG", "*.JPEG"):
        image_files.extend(glob.glob(os.path.join(screenshots_dir, ext)))
    image_files = sorted(set(image_files))  # deduplicate (case-insensitive FS edge case)
    print(f"Found: {len(image_files)} image files", flush=True)

    # Remove DB entries for screenshots that no longer exist on disk
    fs_filenames = {os.path.basename(p) for p in image_files}
    removed_count = remove_deleted_files(db_path, processed_files, fs_filenames)
    if removed_count:
        print(f"Removed: {removed_count} stale entries", flush=True)
    # Keep processed_files in sync so skip logic below is accurate
    processed_files &= fs_filenames

    # Process each image file
    processed_count = 0
    skipped_count = 0
    error_count = 0

    for image_path in image_files:
        if _shutdown:
            break

        filename = os.path.basename(image_path)

        # Skip if already processed
        if filename in processed_files:
            print(f"Skipping {filename} (already processed)")
            skipped_count += 1
            continue

        print(f"Processing: {filename}", flush=True)

        try:
            file_stat = os.stat(image_path)
        except OSError as exc:
            # The file can vanish between the directory scan and this point;
            # count it as an error instead of aborting the whole run.
            print(f"Error reading {filename}: {exc}", file=sys.stderr, flush=True)
            error_count += 1
            continue
        file_size = file_stat.st_size
        created_date = datetime.fromtimestamp(file_stat.st_mtime).isoformat()

        # Perform OCR
        ocr_text = perform_ocr(image_path)

        if _shutdown:
            break

        if ocr_text:
            insert_result(db_path, filename, image_path,
                          ocr_text, file_size, created_date)
            processed_count += 1
        else:
            print(f"No text extracted: {filename}", flush=True)
            error_count += 1

    total = get_total_count(db_path)
    print("", flush=True)
    print("OCR process completed:", flush=True)
    print(f" Processed: {processed_count}", flush=True)
    print(f" Skipped (already in database): {skipped_count}", flush=True)
    print(f" Removed (files deleted): {removed_count}", flush=True)
    print(f" Errors: {error_count}", flush=True)
    print(f" Total records in database: {total}", flush=True)
    return 0
if __name__ == "__main__":
    # Run the pipeline exactly once and hand main()'s status to the shell.
    sys.exit(main())