"""
Image preprocessing for Sales Bot OCR
Adaptive preprocessing pipelines for different image conditions
"""
import logging
import os
from typing import Tuple, Optional, List
from io import BytesIO

logger = logging.getLogger(__name__)

# Try to import image processing libraries
try:
    import cv2
    import numpy as np
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False
    logger.warning("OpenCV not available. Image preprocessing will be limited.")

try:
    from PIL import Image, ImageEnhance, ImageFilter
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False
    logger.warning("PIL not available. Image preprocessing will be limited.")

try:
    from deskew import determine_skew
    DESKEW_AVAILABLE = True
except ImportError:
    DESKEW_AVAILABLE = False
    logger.warning("deskew library not available. Rotation correction disabled.")

try:
    import imutils
    IMUTILS_AVAILABLE = True
except ImportError:
    IMUTILS_AVAILABLE = False
    logger.warning("imutils not available. Some rotations may not work.")

# Fallback used when OCR_MAX_ROTATION_ANGLE is unset or not a number.
_DEFAULT_MAX_ROTATION = 15.0


class ImagePreprocessor:
    """
    Preprocesses ticket images for better OCR accuracy.
    Supports multiple preprocessing pipelines for different image conditions.

    Configuration is read from environment variables at construction time:

    * ``OCR_ENABLE_DESKEW`` ('true'/'false', default 'true')
    * ``OCR_MAX_ROTATION_ANGLE`` (degrees, default 15)
    * ``OCR_USE_ADAPTIVE_PIPELINE`` ('true'/'false', default 'true')
    """

    def __init__(self):
        self.enable_deskew = os.getenv('OCR_ENABLE_DESKEW', 'true').lower() == 'true'
        self.max_rotation = self._read_max_rotation()
        self.use_adaptive = os.getenv('OCR_USE_ADAPTIVE_PIPELINE', 'true').lower() == 'true'

        # Define preprocessing pipelines: each name maps to an ordered list of
        # step names, and each step name resolves to a ``_step_<name>`` method.
        self.pipelines = {
            'standard': ['grayscale', 'contrast', 'otsu'],
            'low_contrast': ['grayscale', 'clahe', 'adaptive_threshold'],
            'noisy': ['grayscale', 'denoise', 'sharpen', 'otsu'],
            'rotated': ['deskew', 'grayscale', 'contrast', 'otsu'],
            'dark': ['grayscale', 'brighten', 'contrast', 'otsu'],
            'light': ['grayscale', 'darken', 'contrast', 'otsu'],
        }

    @staticmethod
    def _read_max_rotation() -> float:
        """Read OCR_MAX_ROTATION_ANGLE, falling back to the default if it is not a number."""
        raw = os.getenv('OCR_MAX_ROTATION_ANGLE', '15')
        try:
            return float(raw)
        except ValueError:
            # A configuration typo should not make every preprocessing call
            # fail; the rest of this module is best-effort as well.
            logger.warning(
                "Invalid OCR_MAX_ROTATION_ANGLE %r; using default %s",
                raw, _DEFAULT_MAX_ROTATION,
            )
            return _DEFAULT_MAX_ROTATION

    def preprocess(self, image_bytes: bytes) -> bytes:
        """
        Preprocess image bytes for OCR.

        Args:
            image_bytes: Raw image bytes

        Returns:
            Preprocessed image bytes
        """
        if self.use_adaptive and CV2_AVAILABLE:
            return self.preprocess_adaptive(image_bytes)
        return self.preprocess_basic(image_bytes)

    def preprocess_basic(self, image_bytes: bytes) -> bytes:
        """
        Basic preprocessing using PIL only.

        Converts to grayscale, boosts contrast, sharpens and re-encodes as
        PNG. Returns the input unchanged if PIL is unavailable or any step
        fails.
        """
        if not PIL_AVAILABLE:
            return image_bytes

        try:
            # Load image and convert to grayscale; the context manager
            # releases the decoder once the pixel data has been converted.
            with Image.open(BytesIO(image_bytes)) as source:
                img = source.convert('L')

            # Enhance contrast
            img = ImageEnhance.Contrast(img).enhance(1.5)

            # Sharpen
            img = img.filter(ImageFilter.SHARPEN)

            # Save to bytes
            output = BytesIO()
            img.save(output, format='PNG')
            return output.getvalue()
        except Exception as e:
            logger.error(f"Error in basic preprocessing: {e}")
            return image_bytes

    def preprocess_adaptive(self, image_bytes: bytes) -> bytes:
        """
        Adaptive preprocessing that analyzes the image, selects one pipeline
        and applies it.

        Returns the PNG-encoded result. If the image cannot be decoded the
        input is returned unchanged; on any other error the basic PIL
        pipeline is used instead.
        """
        if not CV2_AVAILABLE:
            return self.preprocess_basic(image_bytes)

        try:
            # Decode image
            nparr = np.frombuffer(image_bytes, np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

            if image is None:
                logger.error("Could not decode image")
                return image_bytes

            # Analyze image to determine best pipeline
            pipeline_name = self._determine_best_pipeline(image)
            logger.info(f"Using preprocessing pipeline: {pipeline_name}")

            # Apply pipeline
            processed = self._apply_pipeline(image, pipeline_name)

            # Encode back to bytes; a failed encode is routed to the
            # fallback below instead of returning an unusable buffer.
            ok, buffer = cv2.imencode('.png', processed)
            if not ok:
                raise ValueError("PNG encoding failed")
            return buffer.tobytes()
        except Exception as e:
            logger.error(f"Error in adaptive preprocessing: {e}")
            return self.preprocess_basic(image_bytes)

    def _determine_best_pipeline(self, image: 'np.ndarray') -> str:
        """
        Analyzes image to determine the best preprocessing pipeline.

        Rotation is checked first (when deskew is enabled and available);
        otherwise the choice is based on the mean and standard deviation of
        the grayscale intensities.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Calculate image statistics
        mean_brightness = np.mean(gray)
        std_brightness = np.std(gray)

        # Check for rotation if deskew is enabled
        if self.enable_deskew and DESKEW_AVAILABLE:
            try:
                angle: Optional[float] = determine_skew(gray)
            except Exception as e:
                # Skew detection is advisory only; fall through to the
                # brightness/contrast heuristics.
                logger.debug(f"Skew detection failed: {e}")
                angle = None
            # determine_skew returns None when no dominant angle is found.
            if angle is not None and 1.0 < abs(angle) <= self.max_rotation:
                return 'rotated'

        # Determine based on brightness/contrast
        if mean_brightness < 80:
            return 'dark'
        if mean_brightness > 180:
            return 'light'
        if std_brightness < 40:
            return 'low_contrast'
        if std_brightness > 80:
            return 'noisy'
        return 'standard'

    def _apply_pipeline(self, image: 'np.ndarray', pipeline_name: str) -> 'np.ndarray':
        """
        Applies a preprocessing pipeline to the image.

        Unknown pipeline names fall back to 'standard'. Steps are
        best-effort: an unknown or failing step is logged and skipped, and
        the remaining steps still run on the last good result.
        """
        pipeline = self.pipelines.get(pipeline_name, self.pipelines['standard'])
        result = image.copy()

        for step in pipeline:
            # Resolve the step separately from running it so that an
            # AttributeError raised *inside* a step is not misreported as
            # an unknown step.
            handler = getattr(self, f'_step_{step}', None)
            if handler is None:
                logger.warning(f"Unknown preprocessing step: {step}")
                continue
            try:
                result = handler(result)
            except Exception as e:
                logger.warning(f"Error in step {step}: {e}")

        return result

    # Pipeline steps

    def _step_grayscale(self, image: 'np.ndarray') -> 'np.ndarray':
        """Convert to grayscale."""
        if len(image.shape) == 3:
            return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return image

    def _step_contrast(self, image: 'np.ndarray') -> 'np.ndarray':
        """Enhance contrast using histogram equalization."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        return cv2.equalizeHist(image)

    def _step_otsu(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply Otsu's thresholding."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        _, binary = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary

    def _step_adaptive_threshold(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply adaptive thresholding."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        return cv2.adaptiveThreshold(
            image, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY,
            11, 2
        )

    def _step_clahe(self, image: 'np.ndarray') -> 'np.ndarray':
        """Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)."""
        if len(image.shape) == 3:
            image = self._step_grayscale(image)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        return clahe.apply(image)

    def _step_denoise(self, image: 'np.ndarray') -> 'np.ndarray':
        """Remove noise while preserving edges."""
        if len(image.shape) == 3:
            return cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
        return cv2.fastNlMeansDenoising(image, None, 10, 7, 21)

    def _step_sharpen(self, image: 'np.ndarray') -> 'np.ndarray':
        """Sharpen the image."""
        kernel = np.array([[-1, -1, -1],
                           [-1, 9, -1],
                           [-1, -1, -1]])
        return cv2.filter2D(image, -1, kernel)

    def _step_brighten(self, image: 'np.ndarray') -> 'np.ndarray':
        """Increase image brightness."""
        # With a positive gain and offset the result is never negative, so
        # the absolute value taken by convertScaleAbs has no effect here.
        return cv2.convertScaleAbs(image, alpha=1.2, beta=30)

    def _step_darken(self, image: 'np.ndarray') -> 'np.ndarray':
        """Decrease image brightness, clamping at black."""
        # cv2.convertScaleAbs takes the absolute value of the scaled result,
        # so with a negative offset the darkest pixels were reflected back
        # upward (0 -> 20) instead of staying black. Clip explicitly instead.
        scaled = image.astype(np.float64) * 0.8 - 20.0
        return np.clip(np.rint(scaled), 0, 255).astype(np.uint8)

    def _step_deskew(self, image: 'np.ndarray') -> 'np.ndarray':
        """
        Detect and correct image rotation.

        Returns the image unchanged when the deskew library is unavailable,
        no skew angle can be determined, the angle exceeds ``max_rotation``,
        the angle is below 0.5 degrees, or an error occurs.
        """
        if not DESKEW_AVAILABLE:
            return image

        try:
            if len(image.shape) == 3:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            else:
                gray = image

            angle: Optional[float] = determine_skew(gray)

            # determine_skew returns None when no dominant angle is found.
            if angle is None:
                return image

            if abs(angle) > self.max_rotation:
                logger.info(f"Rotation angle {angle} exceeds max {self.max_rotation}, skipping")
                return image

            if abs(angle) < 0.5:
                return image  # No significant rotation

            logger.info(f"Correcting rotation: {angle} degrees")

            if IMUTILS_AVAILABLE:
                return imutils.rotate_bound(image, -angle)

            # Manual rotation
            (h, w) = image.shape[:2]
            center = (w // 2, h // 2)
            M = cv2.getRotationMatrix2D(center, angle, 1.0)
            return cv2.warpAffine(image, M, (w, h),
                                  flags=cv2.INTER_CUBIC,
                                  borderMode=cv2.BORDER_REPLICATE)
        except Exception as e:
            logger.error(f"Error in deskew: {e}")
            return image


def preprocess_image(image_bytes: bytes) -> bytes:
    """
    Convenience function to preprocess image bytes.

    Args:
        image_bytes: Raw image bytes

    Returns:
        Preprocessed image bytes
    """
    preprocessor = ImagePreprocessor()
    return preprocessor.preprocess(image_bytes)


def preprocess_for_ocr(image_bytes: bytes) -> bytes:
    """
    Alias for preprocess_image.
    """
    return preprocess_image(image_bytes)