From 48e0884bf75211728cdeab4d1a949ac2cc5937a8 Mon Sep 17 00:00:00 2001 From: Esteban Date: Sun, 1 Feb 2026 20:54:13 -0600 Subject: [PATCH] Notifications --- src/api/notifications.ts | 101 ++++++ src/components/NotificationDropdown.tsx | 257 +++++++++++++++ src/components/layout/TopMenu.tsx | 43 ++- src/hooks/useNotifications.ts | 183 +++++++++++ water-api/package.json | 2 + water-api/sql/add_notifications.sql | 39 +++ .../controllers/notification.controller.ts | 233 ++++++++++++++ water-api/src/index.ts | 4 + water-api/src/jobs/negativeFlowDetection.ts | 170 ++++++++++ water-api/src/routes/index.ts | 12 + water-api/src/routes/notification.routes.ts | 62 ++++ .../src/services/notification.service.ts | 296 ++++++++++++++++++ water-api/src/types/index.ts | 37 +++ .../src/validators/notification.validator.ts | 196 ++++++++++++ 14 files changed, 1628 insertions(+), 7 deletions(-) create mode 100644 src/api/notifications.ts create mode 100644 src/components/NotificationDropdown.tsx create mode 100644 src/hooks/useNotifications.ts create mode 100644 water-api/sql/add_notifications.sql create mode 100644 water-api/src/controllers/notification.controller.ts create mode 100644 water-api/src/jobs/negativeFlowDetection.ts create mode 100644 water-api/src/routes/notification.routes.ts create mode 100644 water-api/src/services/notification.service.ts create mode 100644 water-api/src/validators/notification.validator.ts diff --git a/src/api/notifications.ts b/src/api/notifications.ts new file mode 100644 index 0000000..2379d26 --- /dev/null +++ b/src/api/notifications.ts @@ -0,0 +1,101 @@ +import { apiClient } from './client'; + +export type NotificationType = 'NEGATIVE_FLOW' | 'SYSTEM_ALERT' | 'MAINTENANCE'; + +export interface Notification { + id: string; + user_id: string; + meter_id: string | null; + notification_type: NotificationType; + title: string; + message: string; + meter_serial_number: string | null; + flow_value: number | null; + is_read: boolean; + read_at: string | null; + created_at: string; +} + +export interface PaginatedNotifications { + data: Notification[]; + pagination: { + page: number; + limit: number; + total: number; + totalPages: number; + hasNextPage: boolean; + hasPreviousPage: boolean; + }; +} + +export interface NotificationFilters { + page?: number; + limit?: number; + is_read?: boolean; + notification_type?: NotificationType; + start_date?: string; + end_date?: string; +} + +/** + * Fetch all notifications for the current user with optional filtering + * @param filters - Optional filters for notifications + * @returns Promise resolving to paginated notifications + */ +export async function fetchNotifications(filters?: NotificationFilters): Promise { + const params: Record = {}; + + if (filters?.page !== undefined) params.page = filters.page; + if (filters?.limit !== undefined) params.limit = filters.limit; + if (filters?.is_read !== undefined) params.is_read = filters.is_read; + if (filters?.notification_type) params.notification_type = filters.notification_type; + if (filters?.start_date) params.start_date = filters.start_date; + if (filters?.end_date) params.end_date = filters.end_date; + + return apiClient.get('/api/notifications', { params }); +} + +/** + * Get count of unread notifications + * @returns Promise resolving to unread count + */ +export async function getUnreadCount(): Promise { + const response = await apiClient.get<{ count: number }>('/api/notifications/unread-count'); + return response.count; +} + +/** + * Fetch a single notification by ID + * @param id - The notification ID + * @returns Promise resolving to the notification + */ +export async function fetchNotification(id: string): Promise { + return apiClient.get(`/api/notifications/${id}`); +} + +/** + * Mark a notification as read + * @param id - The notification ID + * @returns Promise resolving to the updated notification + */ +export async function markAsRead(id: string): Promise { + return apiClient.patch(`/api/notifications/${id}/read`); +} + +/** + * Mark all notifications as read + * @returns Promise resolving to count of marked notifications + */ +export async function markAllAsRead(): Promise { + const response = await apiClient.patch<{ count: number }>('/api/notifications/read-all'); + return response.count; +} + +/** + * Delete a notification + * @param id - The notification ID + * @returns Promise resolving when the notification is deleted + */ +export async function deleteNotification(id: string): Promise { + return apiClient.delete(`/api/notifications/${id}`); +} diff --git a/src/components/NotificationDropdown.tsx b/src/components/NotificationDropdown.tsx new file mode 100644 index 0000000..90a2fc1 --- /dev/null +++ b/src/components/NotificationDropdown.tsx @@ -0,0 +1,257 @@ +/** + * NotificationDropdown Component + * Displays a dropdown with user notifications + */ + +import React, { useEffect } from 'react'; +import { X, Check, Trash2, AlertCircle } from 'lucide-react'; +import { useNotifications } from '../hooks/useNotifications'; +import type { Notification } from '../api/notifications'; + +interface NotificationDropdownProps { + isOpen: boolean; + onClose: () => void; +} + +/** + * Format timestamp to relative time (e.g., "2 hours ago") + */ +function formatTimeAgo(timestamp: string): string { + const now = new Date(); + const created = new Date(timestamp); + const diffMs = now.getTime() - created.getTime(); + const diffMins = Math.floor(diffMs / 60000); + const diffHours = Math.floor(diffMs / 3600000); + const diffDays = Math.floor(diffMs / 86400000); + + if (diffMins < 1) return 'Just now'; + if (diffMins < 60) return `${diffMins} minute${diffMins !== 1 ? 's' : ''} ago`; + if (diffHours < 24) return `${diffHours} hour${diffHours !== 1 ? 's' : ''} ago`; + if (diffDays < 7) return `${diffDays} day${diffDays !== 1 ? 's' : ''} ago`; + + return created.toLocaleDateString(); +} + +/** + * Single notification item component + */ +const NotificationItem: React.FC<{ + notification: Notification; + onMarkAsRead: (id: string) => void; + onDelete: (id: string) => void; +}> = ({ notification, onMarkAsRead, onDelete }) => { + const handleClick = () => { + if (!notification.is_read) { + onMarkAsRead(notification.id); + } + }; + + const handleDelete = (e: React.MouseEvent) => { + e.stopPropagation(); + onDelete(notification.id); + }; + + return ( +
+
+ {/* Icon */} +
+ +
+ + {/* Content */} +
+
+

+ {notification.title} +

+ + {!notification.is_read && ( + + )} +
+ +

+ {notification.message} +

+ + {notification.flow_value !== null && ( +

+ Flow value: {notification.flow_value} units +

+ )} + +
+ + {formatTimeAgo(notification.created_at)} + + + +
+
+
+
+ ); +}; + +/** + * Main NotificationDropdown component + */ +const NotificationDropdown: React.FC = ({ isOpen, onClose }) => { + const { + notifications, + loading, + error, + hasMore, + fetchNotifications, + fetchMore, + markAsRead, + markAllAsRead, + deleteNotification, + } = useNotifications(); + + // Fetch notifications when dropdown opens + useEffect(() => { + if (isOpen) { + fetchNotifications(); + } + }, [isOpen, fetchNotifications]); + + // Close dropdown on Escape key + useEffect(() => { + const handleEscape = (e: KeyboardEvent) => { + if (e.key === 'Escape') { + onClose(); + } + }; + + if (isOpen) { + document.addEventListener('keydown', handleEscape); + return () => document.removeEventListener('keydown', handleEscape); + } + }, [isOpen, onClose]); + + if (!isOpen) return null; + + const handleMarkAllAsRead = async () => { + try { + await markAllAsRead(); + } catch (err) { + console.error('Error marking all as read:', err); + } + }; + + const handleMarkAsRead = async (id: string) => { + try { + await markAsRead(id); + } catch (err) { + console.error('Error marking as read:', err); + } + }; + + const handleDelete = async (id: string) => { + try { + await deleteNotification(id); + } catch (err) { + console.error('Error deleting notification:', err); + } + }; + + const unreadCount = notifications.filter(n => !n.is_read).length; + + return ( +
+ {/* Header */} +
+
+

Notifications

+ {unreadCount > 0 && ( +

{unreadCount} unread

+ )} +
+ +
+ {unreadCount > 0 && ( + + )} + + +
+
+ + {/* Content */} +
+ {loading && notifications.length === 0 ? ( +
+ Loading notifications... +
+ ) : error ? ( +
+ {error} +
+ ) : notifications.length === 0 ? ( +
+
+ +
+

No notifications

+

+ You're all caught up! +

+
+ ) : ( + <> + {notifications.map(notification => ( + + ))} + + {hasMore && ( +
+ +
+ )} + + )} +
+
+ ); +}; + +export default NotificationDropdown; diff --git a/src/components/layout/TopMenu.tsx b/src/components/layout/TopMenu.tsx index a0ca694..7977cc5 100644 --- a/src/components/layout/TopMenu.tsx +++ b/src/components/layout/TopMenu.tsx @@ -1,5 +1,7 @@ import React, { useEffect, useMemo, useRef, useState } from "react"; import { Bell, User, LogOut } from "lucide-react"; +import NotificationDropdown from "../NotificationDropdown"; +import { useNotifications } from "../../hooks/useNotifications"; interface TopMenuProps { page: string; @@ -29,7 +31,11 @@ const TopMenu: React.FC = ({ onRequestLogout, }) => { const [openUserMenu, setOpenUserMenu] = useState(false); + const [openNotifications, setOpenNotifications] = useState(false); const menuRef = useRef(null); + const notificationRef = useRef(null); + + const { unreadCount } = useNotifications(); const initials = useMemo(() => { const parts = (userName || "").trim().split(/\s+/).filter(Boolean); @@ -48,6 +54,16 @@ const TopMenu: React.FC = ({ return () => document.removeEventListener("mousedown", handleClickOutside); }, [openUserMenu]); + useEffect(() => { + function handleClickOutside(e: MouseEvent) { + if (!openNotifications) return; + const el = notificationRef.current; + if (el && !el.contains(e.target as Node)) setOpenNotifications(false); + } + document.addEventListener("mousedown", handleClickOutside); + return () => document.removeEventListener("mousedown", handleClickOutside); + }, [openNotifications]); + useEffect(() => { function handleEsc(e: KeyboardEvent) { if (e.key === "Escape") setOpenUserMenu(false); @@ -81,13 +97,26 @@ const TopMenu: React.FC = ({ {/* DERECHA */}
- +
+ + + setOpenNotifications(false)} + /> +
{/* USER MENU */}
diff --git a/src/hooks/useNotifications.ts b/src/hooks/useNotifications.ts new file mode 100644 index 0000000..64ebfd8 --- /dev/null +++ b/src/hooks/useNotifications.ts @@ -0,0 +1,183 @@ +import { useState, useEffect, useCallback, useRef } from 'react'; +import * as notificationsApi from '../api/notifications'; +import type { Notification, NotificationFilters } from '../api/notifications'; + +interface UseNotificationsReturn { + notifications: Notification[]; + unreadCount: number; + loading: boolean; + error: string | null; + hasMore: boolean; + page: number; + + fetchNotifications: (filters?: NotificationFilters) => Promise; + fetchMore: () => Promise; + refreshUnreadCount: () => Promise; + markAsRead: (id: string) => Promise; + markAllAsRead: () => Promise; + deleteNotification: (id: string) => Promise; + refresh: () => Promise; +} + +/** + * Custom hook for managing notifications + * @param autoRefreshInterval - Interval in milliseconds to auto-refresh unread count (default: 30000ms) + * @returns Object with notifications data and methods + */ +export function useNotifications(autoRefreshInterval: number = 30000): UseNotificationsReturn { + const [notifications, setNotifications] = useState([]); + const [unreadCount, setUnreadCount] = useState(0); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + const [hasMore, setHasMore] = useState(false); + const [page, setPage] = useState(1); + + const refreshIntervalRef = useRef(null); + + const fetchNotifications = useCallback(async (filters?: NotificationFilters) => { + try { + setLoading(true); + setError(null); + + const response = await notificationsApi.fetchNotifications({ + page: 1, + limit: 20, + ...filters, + }); + + setNotifications(response.data); + setHasMore(response.pagination.hasNextPage); + setPage(response.pagination.page); + } catch (err) { + console.error('Error fetching notifications:', err); + setError(err instanceof Error ? err.message : 'Failed to fetch notifications'); + } finally { + setLoading(false); + } + }, []); + + const fetchMore = useCallback(async () => { + if (!hasMore || loading) return; + + try { + setLoading(true); + + const response = await notificationsApi.fetchNotifications({ + page: page + 1, + limit: 20, + }); + + setNotifications(prev => [...prev, ...response.data]); + setHasMore(response.pagination.hasNextPage); + setPage(response.pagination.page); + } catch (err) { + console.error('Error fetching more notifications:', err); + setError(err instanceof Error ? err.message : 'Failed to fetch more notifications'); + } finally { + setLoading(false); + } + }, [hasMore, loading, page]); + + const refreshUnreadCount = useCallback(async () => { + try { + const count = await notificationsApi.getUnreadCount(); + setUnreadCount(count); + } catch (err) { + console.error('Error fetching unread count:', err); + } + }, []); + + const markAsRead = useCallback(async (id: string) => { + try { + await notificationsApi.markAsRead(id); + + setNotifications(prev => + prev.map(notification => + notification.id === id + ? { ...notification, is_read: true, read_at: new Date().toISOString() } + : notification + ) + ); + + await refreshUnreadCount(); + } catch (err) { + console.error('Error marking notification as read:', err); + throw err; + } + }, [refreshUnreadCount]); + + const markAllAsRead = useCallback(async () => { + try { + await notificationsApi.markAllAsRead(); + + setNotifications(prev => + prev.map(notification => ({ + ...notification, + is_read: true, + read_at: new Date().toISOString(), + })) + ); + + setUnreadCount(0); + } catch (err) { + console.error('Error marking all notifications as read:', err); + throw err; + } + }, []); + + const deleteNotification = useCallback(async (id: string) => { + try { + await notificationsApi.deleteNotification(id); + + const deletedNotification = notifications.find(n => n.id === id); + setNotifications(prev => prev.filter(notification => notification.id !== id)); + + if (deletedNotification && !deletedNotification.is_read) { + setUnreadCount(prev => Math.max(0, prev - 1)); + } + } catch (err) { + console.error('Error deleting notification:', err); + throw err; + } + }, [notifications]); + + const refresh = useCallback(async () => { + await Promise.all([ + fetchNotifications(), + refreshUnreadCount(), + ]); + }, [fetchNotifications, refreshUnreadCount]); + + useEffect(() => { + refreshUnreadCount(); + + if (autoRefreshInterval > 0) { + refreshIntervalRef.current = setInterval(() => { + refreshUnreadCount(); + }, autoRefreshInterval); + } + + return () => { + if (refreshIntervalRef.current) { + clearInterval(refreshIntervalRef.current); + } + }; + }, [autoRefreshInterval, refreshUnreadCount]); + + return { + notifications, + unreadCount, + loading, + error, + hasMore, + page, + + fetchNotifications, + fetchMore, + refreshUnreadCount, + markAsRead, + markAllAsRead, + deleteNotification, + refresh, + }; +} diff --git a/water-api/package.json b/water-api/package.json index 068b9a4..968dfd8 100644 --- a/water-api/package.json +++ b/water-api/package.json @@ -26,6 +26,7 @@ "helmet": "^7.1.0", "jsonwebtoken": "^9.0.2", "multer": "^2.0.2", + "node-cron": "^3.0.3", "pg": "^8.11.3", "winston": "^3.11.0", "xlsx": "^0.18.5", @@ -37,6 +38,7 @@ "@types/express": "^4.17.21", "@types/jsonwebtoken": "^9.0.5", "@types/node": "^20.11.5", + "@types/node-cron": "^3.0.11", "@types/pg": "^8.10.9", "nodemon": "^3.0.3", "ts-node-dev": "^2.0.0", diff --git a/water-api/sql/add_notifications.sql b/water-api/sql/add_notifications.sql new file mode 100644 index 0000000..335dc59 --- /dev/null +++ b/water-api/sql/add_notifications.sql @@ -0,0 +1,39 @@ +-- ============================================================================ +-- Add Notifications Table +-- Migration for notification system supporting negative flow alerts +-- ============================================================================ + +-- Create notification type enum +CREATE TYPE notification_type AS ENUM ('NEGATIVE_FLOW', 'SYSTEM_ALERT', 'MAINTENANCE'); + +-- ============================================================================ +-- TABLE: notifications +-- ============================================================================ +CREATE TABLE notifications ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + meter_id UUID REFERENCES meters(id) ON DELETE SET NULL, + notification_type notification_type NOT NULL DEFAULT 'NEGATIVE_FLOW', + title VARCHAR(255) NOT NULL, + message TEXT NOT NULL, + meter_serial_number VARCHAR(255), + flow_value DECIMAL(12, 4), + is_read BOOLEAN NOT NULL DEFAULT FALSE, + read_at TIMESTAMP WITH TIME ZONE, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- Indexes for performance +CREATE INDEX idx_notifications_user_id ON notifications(user_id); +CREATE INDEX idx_notifications_meter_id ON notifications(meter_id); +CREATE INDEX idx_notifications_is_read ON notifications(is_read); +CREATE INDEX idx_notifications_created_at ON notifications(created_at DESC); +CREATE INDEX idx_notifications_user_unread ON notifications(user_id, is_read) WHERE is_read = FALSE; + +COMMENT ON TABLE notifications IS 'User notifications for meter alerts and system events'; +COMMENT ON COLUMN notifications.user_id IS 'User who receives this notification'; +COMMENT ON COLUMN notifications.meter_id IS 'Related meter (nullable if meter is deleted)'; +COMMENT ON COLUMN notifications.notification_type IS 'Type of notification'; +COMMENT ON COLUMN notifications.flow_value IS 'Flow value if negative flow alert'; +COMMENT ON COLUMN notifications.is_read IS 'Whether notification has been read by user'; +COMMENT ON COLUMN notifications.read_at IS 'Timestamp when notification was marked as read'; diff --git a/water-api/src/controllers/notification.controller.ts b/water-api/src/controllers/notification.controller.ts new file mode 100644 index 0000000..09d1ca9 --- /dev/null +++ b/water-api/src/controllers/notification.controller.ts @@ -0,0 +1,233 @@ +import { Response } from 'express'; +import { AuthenticatedRequest } from '../middleware/auth.middleware'; +import * as notificationService from '../services/notification.service'; +import { NotificationFilter } from '../types'; + +/** + * GET /api/notifications + * List all notifications for the authenticated user with pagination + * Query params: page, limit, is_read, notification_type, start_date, end_date + */ +export async function getAll(req: AuthenticatedRequest, res: Response): Promise { + try { + if (!req.user?.userId) { + res.status(401).json({ + success: false, + error: 'Unauthorized', + }); + return; + } + + const page = parseInt(req.query.page as string, 10) || 1; + const limit = Math.min(parseInt(req.query.limit as string, 10) || 20, 100); + + const filters: NotificationFilter = {}; + + if (req.query.is_read !== undefined) { + filters.is_read = req.query.is_read === 'true'; + } + + if (req.query.notification_type) { + filters.notification_type = req.query.notification_type as 'NEGATIVE_FLOW' | 'SYSTEM_ALERT' | 'MAINTENANCE'; + } + + if (req.query.start_date) { + filters.start_date = new Date(req.query.start_date as string); + } + + if (req.query.end_date) { + filters.end_date = new Date(req.query.end_date as string); + } + + const result = await notificationService.getAllForUser( + req.user.userId, + filters, + { page, limit } + ); + + res.status(200).json({ + success: true, + data: result.notifications, + pagination: result.pagination, + }); + } catch (error) { + console.error('Error fetching notifications:', error); + res.status(500).json({ + success: false, + error: 'Failed to fetch notifications', + }); + } +} + +/** + * GET /api/notifications/unread-count + * Get count of unread notifications for the authenticated user + */ +export async function getUnreadCount(req: AuthenticatedRequest, res: Response): Promise { + try { + if (!req.user?.userId) { + res.status(401).json({ + success: false, + error: 'Unauthorized', + }); + return; + } + + const count = await notificationService.getUnreadCount(req.user.userId); + + res.status(200).json({ + success: true, + data: { count }, + }); + } catch (error) { + console.error('Error fetching unread count:', error); + res.status(500).json({ + success: false, + error: 'Failed to fetch unread count', + }); + } +} + +/** + * GET /api/notifications/:id + * Get a single notification by ID + */ +export async function getById(req: AuthenticatedRequest, res: Response): Promise { + try { + if (!req.user?.userId) { + res.status(401).json({ + success: false, + error: 'Unauthorized', + }); + return; + } + + const { id } = req.params; + const notification = await notificationService.getById(id, req.user.userId); + + if (!notification) { + res.status(404).json({ + success: false, + error: 'Notification not found', + }); + return; + } + + res.status(200).json({ + success: true, + data: notification, + }); + } catch (error) { + console.error('Error fetching notification:', error); + res.status(500).json({ + success: false, + error: 'Failed to fetch notification', + }); + } +} + +/** + * PATCH /api/notifications/:id/read + * Mark a notification as read + */ +export async function markAsRead(req: AuthenticatedRequest, res: Response): Promise { + try { + if (!req.user?.userId) { + res.status(401).json({ + success: false, + error: 'Unauthorized', + }); + return; + } + + const { id } = req.params; + const notification = await notificationService.markAsRead(id, req.user.userId); + + if (!notification) { + res.status(404).json({ + success: false, + error: 'Notification not found', + }); + return; + } + + res.status(200).json({ + success: true, + data: notification, + }); + } catch (error) { + console.error('Error marking notification as read:', error); + res.status(500).json({ + success: false, + error: 'Failed to mark notification as read', + }); + } +} + +/** + * PATCH /api/notifications/read-all + * Mark all notifications as read for the authenticated user + */ +export async function markAllAsRead(req: AuthenticatedRequest, res: Response): Promise { + try { + if (!req.user?.userId) { + res.status(401).json({ + success: false, + error: 'Unauthorized', + }); + return; + } + + const count = await notificationService.markAllAsRead(req.user.userId); + + res.status(200).json({ + success: true, + data: { count }, + message: `Marked ${count} notification(s) as read`, + }); + } catch (error) { + console.error('Error marking all notifications as read:', error); + res.status(500).json({ + success: false, + error: 'Failed to mark all notifications as read', + }); + } +} + +/** + * DELETE /api/notifications/:id + * Delete a notification + */ +export async function deleteNotification(req: AuthenticatedRequest, res: Response): Promise { + try { + if (!req.user?.userId) { + res.status(401).json({ + success: false, + error: 'Unauthorized', + }); + return; + } + + const { id } = req.params; + const deleted = await notificationService.deleteNotification(id, req.user.userId); + + if (!deleted) { + res.status(404).json({ + success: false, + error: 'Notification not found', + }); + return; + } + + res.status(200).json({ + success: true, + message: 'Notification deleted successfully', + }); + } catch (error) { + console.error('Error deleting notification:', error); + res.status(500).json({ + success: false, + error: 'Failed to delete notification', + }); + } +} diff --git a/water-api/src/index.ts b/water-api/src/index.ts index ac11f02..8649788 100644 --- a/water-api/src/index.ts +++ b/water-api/src/index.ts @@ -8,6 +8,7 @@ import routes from './routes'; import logger from './utils/logger'; import { testConnection } from './config/database'; import { auditMiddleware } from './middleware/audit.middleware'; +import { scheduleNegativeFlowDetection } from './jobs/negativeFlowDetection'; const app: Application = express(); @@ -85,6 +86,9 @@ const startServer = async () => { await testConnection(); logger.info('Database connection established'); + scheduleNegativeFlowDetection(); + logger.info('Cron jobs initialized'); + app.listen(PORT, () => { logger.info(`Server running on port ${PORT} in ${NODE_ENV} mode`); logger.info(`Health check available at http://localhost:${PORT}/health`); diff --git a/water-api/src/jobs/negativeFlowDetection.ts b/water-api/src/jobs/negativeFlowDetection.ts new file mode 100644 index 0000000..493bcd1 --- /dev/null +++ b/water-api/src/jobs/negativeFlowDetection.ts @@ -0,0 +1,170 @@ +import cron from 'node-cron'; +import * as notificationService from '../services/notification.service'; + +/** + * Cron job that runs daily at 1:00 AM to detect meters with negative flow + * and create notifications for responsible users + */ +export function scheduleNegativeFlowDetection(): void { + // Schedule: Every day at 1:00 AM + // Cron format: minute hour day-of-month month day-of-week + // '0 1 * * *' = At 01:00 (1:00 AM) every day + + cron.schedule('0 1 * * *', async () => { + console.log('🔍 [Cron] Starting negative flow detection job...'); + const startTime = Date.now(); + + try { + // Get all meters with negative flow values + const negativeFlowMeters = await notificationService.getMetersWithNegativeFlow(); + + if (negativeFlowMeters.length === 0) { + console.log('✅ [Cron] No meters with negative flow found'); + return; + } + + console.log(`⚠️ [Cron] Found ${negativeFlowMeters.length} meter(s) with negative flow`); + + let notificationsCreated = 0; + let errors = 0; + + // Group meters by project to avoid duplicate notifications + const metersByProject = new Map(); + + for (const meter of negativeFlowMeters) { + const projectId = meter.project_id; + if (!metersByProject.has(projectId)) { + metersByProject.set(projectId, []); + } + metersByProject.get(projectId)!.push(meter); + } + + // Create notifications for each project's users + for (const [projectId, meters] of metersByProject.entries()) { + try { + // Get users responsible for this project + const userIds = await notificationService.getUsersForProject(projectId); + + if (userIds.length === 0) { + console.log(`⚠️ [Cron] No users found for project ${projectId}`); + continue; + } + + // Create notification for each meter for each user + for (const meter of meters) { + const title = 'Negative Flow Alert'; + const message = `${meter.name} (${meter.serial_number}) has negative flow of ${meter.last_reading_value} units`; + + for (const userId of userIds) { + try { + await notificationService.create({ + user_id: userId, + meter_id: meter.id, + notification_type: 'NEGATIVE_FLOW', + title, + message, + meter_serial_number: meter.serial_number, + flow_value: meter.last_reading_value, + }); + + notificationsCreated++; + } catch (error) { + console.error(`❌ [Cron] Error creating notification for user ${userId}, meter ${meter.id}:`, error); + errors++; + } + } + } + } catch (error) { + console.error(`❌ [Cron] Error processing project ${projectId}:`, error); + errors++; + } + } + + const duration = Date.now() - startTime; + console.log( + `✅ [Cron] Negative flow detection completed in ${duration}ms:`, + `${notificationsCreated} notification(s) created,`, + `${errors} error(s)` + ); + + // Clean up old read notifications (optional maintenance task) + try { + const deletedCount = await notificationService.deleteOldReadNotifications(); + if (deletedCount > 0) { + console.log(`🗑️ [Cron] Cleaned up ${deletedCount} old read notification(s)`); + } + } catch (error) { + console.error('❌ [Cron] Error cleaning up old notifications:', error); + } + + } catch (error) { + console.error('❌ [Cron] Fatal error in negative flow detection job:', error); + } + }); + + console.log('⏰ [Cron] Negative flow detection job scheduled (daily at 1:00 AM)'); +} + +/** + * Manual trigger for testing the negative flow detection + * Can be called directly for testing purposes + */ +export async function triggerNegativeFlowDetection(): Promise { + console.log('🔍 [Manual] Starting negative flow detection...'); + const startTime = Date.now(); + + try { + const negativeFlowMeters = await notificationService.getMetersWithNegativeFlow(); + + if (negativeFlowMeters.length === 0) { + console.log('✅ [Manual] No meters with negative flow found'); + return; + } + + console.log(`⚠️ [Manual] Found ${negativeFlowMeters.length} meter(s) with negative flow`); + + let notificationsCreated = 0; + + // Group meters by project + const metersByProject = new Map(); + + for (const meter of negativeFlowMeters) { + const projectId = meter.project_id; + if (!metersByProject.has(projectId)) { + metersByProject.set(projectId, []); + } + metersByProject.get(projectId)!.push(meter); + } + + // Create notifications + for (const [projectId, meters] of metersByProject.entries()) { + const userIds = await notificationService.getUsersForProject(projectId); + + for (const meter of meters) { + const title = 'Negative Flow Alert'; + const message = `${meter.name} (${meter.serial_number}) has negative flow of ${meter.last_reading_value} units`; + + for (const userId of userIds) { + await notificationService.create({ + user_id: userId, + meter_id: meter.id, + notification_type: 'NEGATIVE_FLOW', + title, + message, + meter_serial_number: meter.serial_number, + flow_value: meter.last_reading_value, + }); + + notificationsCreated++; + } + } + } + + const duration = Date.now() - startTime; + console.log(`✅ [Manual] Created ${notificationsCreated} notification(s) in ${duration}ms`); + + } catch (error) { + console.error('❌ [Manual] Error in negative flow detection:', error); + throw error; + } +} diff --git a/water-api/src/routes/index.ts b/water-api/src/routes/index.ts index eb0f734..cd9fa0a 100644 --- a/water-api/src/routes/index.ts +++ b/water-api/src/routes/index.ts @@ -13,6 +13,7 @@ import ttsRoutes from './tts.routes'; import readingRoutes from './reading.routes'; import bulkUploadRoutes from './bulk-upload.routes'; import auditRoutes from './audit.routes'; +import notificationRoutes from './notification.routes'; // Create main router const router = Router(); @@ -141,4 +142,15 @@ router.use('/bulk-upload', bulkUploadRoutes); */ router.use('/audit-logs', auditRoutes); +/** + * Notification routes: + * - GET /notifications - List user's notifications + * - GET /notifications/unread-count - Get unread count + * - GET /notifications/:id - Get notification by ID + * - PATCH /notifications/:id/read - Mark notification as read + * - PATCH /notifications/read-all - Mark all as read + * - DELETE /notifications/:id - Delete notification + */ +router.use('/notifications', notificationRoutes); + export default router; diff --git a/water-api/src/routes/notification.routes.ts b/water-api/src/routes/notification.routes.ts new file mode 100644 index 0000000..fec0518 --- /dev/null +++ b/water-api/src/routes/notification.routes.ts @@ -0,0 +1,62 @@ +import { Router } from 'express'; +import { authenticateToken } from '../middleware/auth.middleware'; +import { validateListNotifications, validateUuidParam } from '../validators/notification.validator'; +import * as notificationController from '../controllers/notification.controller'; + +const router = Router(); + +/** + * All notification endpoints require authentication + * Users can only access their own notifications + */ + +/** + * GET /api/notifications + * Get all notifications for the authenticated user with pagination + * Headers: Authorization: Bearer + * Query params: page, limit, is_read, notification_type, start_date, end_date + * Response: { success: true, data: Notification[], pagination: {...} } + */ +router.get('/', authenticateToken, validateListNotifications, notificationController.getAll); + +/** + * GET /api/notifications/unread-count + * Get count of unread notifications for the authenticated user + * Headers: Authorization: Bearer + * Response: { success: true, data: { count: number } } + */ +router.get('/unread-count', authenticateToken, notificationController.getUnreadCount); + +/** + * GET /api/notifications/:id + * Get a single notification by ID + * Headers: Authorization: Bearer + * Response: { success: true, data: Notification } + */ +router.get('/:id', authenticateToken, validateUuidParam, notificationController.getById); + +/** + * PATCH /api/notifications/:id/read + * Mark a notification as read + * Headers: Authorization: Bearer + * Response: { success: true, data: Notification } + */ +router.patch('/:id/read', authenticateToken, validateUuidParam, notificationController.markAsRead); + +/** + * PATCH /api/notifications/read-all + * Mark all user's notifications as read + * Headers: Authorization: Bearer + * Response: { success: true, data: { count: number } } + */ +router.patch('/read-all', authenticateToken, notificationController.markAllAsRead); + +/** + * DELETE /api/notifications/:id + * Delete a notification + * Headers: Authorization: Bearer + * Response: { success: true, message: string } + */ +router.delete('/:id', authenticateToken, validateUuidParam, notificationController.deleteNotification); + +export default router; diff --git a/water-api/src/services/notification.service.ts b/water-api/src/services/notification.service.ts new file mode 100644 index 0000000..cd9df82 --- /dev/null +++ b/water-api/src/services/notification.service.ts @@ -0,0 +1,296 @@ +import { query } from '../config/database'; +import { Notification, NotificationFilter, PaginationParams } from '../types'; + +/** + * Paginated notifications result + */ +export interface PaginatedNotifications { + notifications: Notification[]; + pagination: { + page: number; + limit: number; + total: number; + totalPages: number; + hasNextPage: boolean; + hasPreviousPage: boolean; + }; +} + +/** + * Create notification input + */ +export interface CreateNotificationInput { + user_id: string; + meter_id?: string | null; + notification_type: 'NEGATIVE_FLOW' | 'SYSTEM_ALERT' | 'MAINTENANCE'; + title: string; + message: string; + meter_serial_number?: string | null; + flow_value?: number | null; +} + +/** + * Get all notifications for a user with optional filtering and pagination + * @param userId - User ID + * @param filters - Optional filters + * @param pagination - Optional pagination parameters + * @returns Paginated list of notifications + */ +export async function getAllForUser( + userId: string, + filters?: NotificationFilter, + pagination?: PaginationParams +): Promise { + const page = pagination?.page || 1; + const limit = pagination?.limit || 20; + const offset = (page - 1) * limit; + const sortBy = pagination?.sortBy || 'created_at'; + const sortOrder = pagination?.sortOrder || 'desc'; + + // Build WHERE clauses + const conditions: string[] = ['user_id = $1']; + const params: unknown[] = [userId]; + let paramIndex = 2; + + if (filters?.is_read !== undefined) { + conditions.push(`is_read = $${paramIndex}`); + params.push(filters.is_read); + paramIndex++; + } + + if (filters?.notification_type) { + conditions.push(`notification_type = $${paramIndex}`); + params.push(filters.notification_type); + paramIndex++; + } + + if (filters?.start_date) { + conditions.push(`created_at >= $${paramIndex}`); + params.push(filters.start_date); + paramIndex++; + } + + if (filters?.end_date) { + conditions.push(`created_at <= $${paramIndex}`); + params.push(filters.end_date); + paramIndex++; + } + + const whereClause = conditions.join(' AND '); + + // Validate sortBy to prevent SQL injection + const allowedSortColumns = ['created_at', 'is_read', 'notification_type']; + const safeSortBy = allowedSortColumns.includes(sortBy) ? sortBy : 'created_at'; + const safeSortOrder = sortOrder === 'asc' ? 'ASC' : 'DESC'; + + // Get total count + const countQuery = ` + SELECT COUNT(*) as total + FROM notifications + WHERE ${whereClause} + `; + const countResult = await query(countQuery, params); + const total = parseInt(countResult.rows[0].total, 10); + + // Get paginated data - unread first, then by created_at + const dataQuery = ` + SELECT * + FROM notifications + WHERE ${whereClause} + ORDER BY is_read ASC, ${safeSortBy} ${safeSortOrder} + LIMIT $${paramIndex} OFFSET $${paramIndex + 1} + `; + const dataResult = await query(dataQuery, [...params, limit, offset]); + + const totalPages = Math.ceil(total / limit); + + return { + notifications: dataResult.rows, + pagination: { + page, + limit, + total, + totalPages, + hasNextPage: page < totalPages, + hasPreviousPage: page > 1, + }, + }; +} + +/** + * Get count of unread notifications for a user + * @param userId - User ID + * @returns Count of unread notifications + */ +export async function getUnreadCount(userId: string): Promise { + const sql = ` + SELECT COUNT(*) as count + FROM notifications + WHERE user_id = $1 AND is_read = false + `; + const result = await query(sql, [userId]); + return parseInt(result.rows[0].count, 10); +} + +/** + * Get a single notification by ID + * @param id - Notification ID + * @param userId - User ID (for ownership verification) + * @returns Notification or null if not found + */ +export async function getById(id: string, userId: string): Promise { + const sql = ` + SELECT * + FROM notifications + WHERE id = $1 AND user_id = $2 + `; + const result = await query(sql, [id, userId]); + return result.rows[0] || null; +} + +/** + * Create a new notification + * @param input - Notification data + * @returns Created notification + */ +export async function create(input: CreateNotificationInput): Promise { + const sql = ` + INSERT INTO notifications ( + user_id, + meter_id, + notification_type, + title, + message, + meter_serial_number, + flow_value + ) VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING * + `; + + const result = await query(sql, [ + input.user_id, + input.meter_id || null, + input.notification_type, + input.title, + input.message, + input.meter_serial_number || null, + input.flow_value || null, + ]); + + return result.rows[0]; +} + +/** + * Mark notification as read + * @param id - Notification ID + * @param userId - User ID (for ownership verification) + * @returns Updated notification or null if not found + */ +export async function markAsRead(id: string, userId: string): Promise { + const sql = ` + UPDATE notifications + SET is_read = true, read_at = CURRENT_TIMESTAMP + WHERE id = $1 AND user_id = $2 + RETURNING * + `; + const result = await query(sql, [id, userId]); + return result.rows[0] || null; +} + +/** + * Mark all notifications as read for a user + * @param userId - User ID + * @returns Number of notifications marked as read + */ +export async function markAllAsRead(userId: string): Promise { + const sql = ` + UPDATE notifications + SET is_read = true, read_at = CURRENT_TIMESTAMP + WHERE user_id = $1 AND is_read = false + RETURNING id + `; + const result = await query(sql, [userId]); + return result.rows.length; +} + +/** + * Delete a notification + * @param id - Notification ID + * @param userId - User ID (for ownership verification) + * @returns True if deleted, false if not found + */ +export async function deleteNotification(id: string, userId: string): Promise { + const sql = ` + DELETE FROM notifications + WHERE id = $1 AND user_id = $2 + RETURNING id + `; + const result = await query(sql, [id, userId]); + return result.rows.length > 0; +} + +/** + * Delete old read notifications (older than 30 days) + * @returns Number of notifications deleted + */ +export async function deleteOldReadNotifications(): Promise { + const sql = ` + DELETE FROM notifications + WHERE is_read = true + AND read_at < CURRENT_TIMESTAMP - INTERVAL '30 days' + RETURNING id + `; + const result = await query(sql); + return result.rows.length; +} + +/** + * Get meters with negative flow values + * @returns Array of meters with negative flow + */ +export async function getMetersWithNegativeFlow(): Promise> { + const sql = ` + SELECT + m.id, + m.serial_number, + m.name, + m.last_reading_value, + m.concentrator_id, + c.project_id + FROM meters m + INNER JOIN concentrators c ON c.id = m.concentrator_id + WHERE m.last_reading_value < 0 + AND m.status = 'ACTIVE' + `; + const result = await query(sql); + return result.rows; +} + +/** + * Get users responsible for a project + * This is a placeholder - adjust based on your user-project relationship + * @param projectId - Project ID + * @returns Array of user IDs + */ +export async function getUsersForProject(projectId: string): Promise { + // TODO: Implement based on your user-project relationship + // For now, return all active OPERATOR and ADMIN users + // You may want to add a user_projects table for better user-project assignment + + const sql = ` + SELECT u.id + FROM users u + INNER JOIN roles r ON r.id = u.role_id + WHERE u.is_active = true + AND r.name IN ('ADMIN', 'OPERATOR') + `; + const result = await query(sql); + return result.rows.map(row => row.id); +} diff --git a/water-api/src/types/index.ts b/water-api/src/types/index.ts index a8dc089..234ce88 100644 --- a/water-api/src/types/index.ts +++ b/water-api/src/types/index.ts @@ -340,3 +340,40 @@ export interface TtsWebhookPayload { correlation_ids?: string[]; received_at: string; } + +export type NotificationType = 'NEGATIVE_FLOW' | 'SYSTEM_ALERT' | 'MAINTENANCE'; + +export interface Notification { + id: string; + user_id: string; + meter_id: string | null; + notification_type: NotificationType; + title: string; + message: string; + meter_serial_number: string | null; + flow_value: number | null; + is_read: boolean; + read_at: Date | null; + created_at: Date; +} + +export interface NotificationPublic { + id: string; + user_id: string; + meter_id: string | null; + notification_type: NotificationType; + title: string; + message: string; + meter_serial_number: string | null; + flow_value: number | null; + is_read: boolean; + read_at: Date | null; + created_at: Date; +} + +export interface NotificationFilter { + is_read?: boolean; + notification_type?: NotificationType; + start_date?: Date; + end_date?: Date; +} diff --git a/water-api/src/validators/notification.validator.ts b/water-api/src/validators/notification.validator.ts new file mode 100644 index 0000000..69c43cc --- /dev/null +++ b/water-api/src/validators/notification.validator.ts @@ -0,0 +1,196 @@ +import { z } from 'zod'; +import { Request, Response, NextFunction } from 'express'; + +/** + * Notification type enum values + */ +export const NotificationType = { + NEGATIVE_FLOW: 'NEGATIVE_FLOW', + SYSTEM_ALERT: 'SYSTEM_ALERT', + MAINTENANCE: 'MAINTENANCE', +} as const; + +export type NotificationTypeValue = (typeof NotificationType)[keyof typeof NotificationType]; + +/** + * UUID validation regex + */ +const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +/** + * Schema for creating a notification (internal use, not exposed to API) + */ +export const createNotificationSchema = z.object({ + user_id: z + .string({ required_error: 'User ID is required' }) + .regex(uuidRegex, 'User ID must be a valid UUID'), + meter_id: z + .string() + .regex(uuidRegex, 'Meter ID must be a valid UUID') + .optional() + .nullable(), + notification_type: z + .enum([NotificationType.NEGATIVE_FLOW, NotificationType.SYSTEM_ALERT, NotificationType.MAINTENANCE]) + .default(NotificationType.NEGATIVE_FLOW), + title: z + .string({ required_error: 'Title is required' }) + .min(1, 'Title cannot be empty') + .max(255, 'Title must be at most 255 characters'), + message: z + .string({ required_error: 'Message is required' }) + .min(1, 'Message cannot be empty') + .max(1000, 'Message must be at most 1000 characters'), + meter_serial_number: z + .string() + .max(100, 'Meter serial number must be at most 100 characters') + .optional() + .nullable(), + flow_value: z + .number() + .optional() + .nullable(), +}); + +/** + * Schema for query parameters when listing notifications + */ +export const listNotificationsQuerySchema = z.object({ + page: z + .string() + .optional() + .transform(val => (val ? parseInt(val, 10) : 1)), + limit: z + .string() + .optional() + .transform(val => (val ? Math.min(parseInt(val, 10), 100) : 20)), + is_read: z + .string() + .optional() + .transform(val => { + if (val === 'true') return true; + if (val === 'false') return false; + return undefined; + }), + notification_type: z + .enum([NotificationType.NEGATIVE_FLOW, NotificationType.SYSTEM_ALERT, NotificationType.MAINTENANCE]) + .optional(), + start_date: z + .string() + .datetime({ message: 'Start date must be a valid ISO date string' }) + .optional() + .transform(val => (val ? new Date(val) : undefined)), + end_date: z + .string() + .datetime({ message: 'End date must be a valid ISO date string' }) + .optional() + .transform(val => (val ? new Date(val) : undefined)), +}); + +/** + * Schema for UUID parameter validation + */ +export const uuidParamSchema = z.object({ + id: z + .string({ required_error: 'ID is required' }) + .regex(uuidRegex, 'ID must be a valid UUID'), +}); + +/** + * Type definitions derived from schemas + */ +export type CreateNotificationInput = z.infer; +export type ListNotificationsQuery = z.infer; + +/** + * Generic validation middleware factory for request body + * Creates a middleware that validates request body against a Zod schema + * @param schema - Zod schema to validate against + */ +function validateBody(schema: T) { + return (req: Request, res: Response, next: NextFunction): void => { + const result = schema.safeParse(req.body); + + if (!result.success) { + const errors = result.error.errors.map((err) => ({ + field: err.path.join('.'), + message: err.message, + })); + + res.status(400).json({ + success: false, + error: 'Validation failed', + details: errors, + }); + return; + } + + // Replace body with validated and typed data + req.body = result.data; + next(); + }; +} + +/** + * Generic validation middleware factory for query parameters + * Creates a middleware that validates query parameters against a Zod schema + * @param schema - Zod schema to validate against + */ +function validateQuery(schema: T) { + return (req: Request, res: Response, next: NextFunction): void => { + const result = schema.safeParse(req.query); + + if (!result.success) { + const errors = result.error.errors.map((err) => ({ + field: err.path.join('.'), + message: err.message, + })); + + res.status(400).json({ + success: false, + error: 'Validation failed', + details: errors, + }); + return; + } + + // Replace query with validated and typed data + req.query = result.data as any; + next(); + }; +} + +/** + * Generic validation middleware factory for URL parameters + * Creates a middleware that validates URL parameters against a Zod schema + * @param schema - Zod schema to validate against + */ +function validateParams(schema: T) { + return (req: Request, res: Response, next: NextFunction): void => { + const result = schema.safeParse(req.params); + + if (!result.success) { + const errors = result.error.errors.map((err) => ({ + field: err.path.join('.'), + message: err.message, + })); + + res.status(400).json({ + success: false, + error: 'Validation failed', + details: errors, + }); + return; + } + + // Replace params with validated and typed data + req.params = result.data; + next(); + }; +} + +/** + * Pre-configured validation middlewares for notifications + */ +export const validateCreateNotification = validateBody(createNotificationSchema); +export const validateListNotifications = validateQuery(listNotificationsQuerySchema); +export const validateUuidParam = validateParams(uuidParamSchema);