Notifications

This commit is contained in:
2026-02-01 20:54:13 -06:00
parent 6c02bd5448
commit 48e0884bf7
14 changed files with 1628 additions and 7 deletions

View File

@@ -26,6 +26,7 @@
"helmet": "^7.1.0",
"jsonwebtoken": "^9.0.2",
"multer": "^2.0.2",
"node-cron": "^3.0.3",
"pg": "^8.11.3",
"winston": "^3.11.0",
"xlsx": "^0.18.5",
@@ -37,6 +38,7 @@
"@types/express": "^4.17.21",
"@types/jsonwebtoken": "^9.0.5",
"@types/node": "^20.11.5",
"@types/node-cron": "^3.0.11",
"@types/pg": "^8.10.9",
"nodemon": "^3.0.3",
"ts-node-dev": "^2.0.0",

View File

@@ -0,0 +1,39 @@
-- ============================================================================
-- Add Notifications Table
-- Migration for notification system supporting negative flow alerts
-- ============================================================================
-- Create notification type enum
CREATE TYPE notification_type AS ENUM ('NEGATIVE_FLOW', 'SYSTEM_ALERT', 'MAINTENANCE');
-- ============================================================================
-- TABLE: notifications
-- ============================================================================
CREATE TABLE notifications (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
meter_id UUID REFERENCES meters(id) ON DELETE SET NULL,
notification_type notification_type NOT NULL DEFAULT 'NEGATIVE_FLOW',
title VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
meter_serial_number VARCHAR(255),
flow_value DECIMAL(12, 4),
is_read BOOLEAN NOT NULL DEFAULT FALSE,
read_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for performance
CREATE INDEX idx_notifications_user_id ON notifications(user_id);
CREATE INDEX idx_notifications_meter_id ON notifications(meter_id);
CREATE INDEX idx_notifications_is_read ON notifications(is_read);
CREATE INDEX idx_notifications_created_at ON notifications(created_at DESC);
CREATE INDEX idx_notifications_user_unread ON notifications(user_id, is_read) WHERE is_read = FALSE;
COMMENT ON TABLE notifications IS 'User notifications for meter alerts and system events';
COMMENT ON COLUMN notifications.user_id IS 'User who receives this notification';
COMMENT ON COLUMN notifications.meter_id IS 'Related meter (nullable if meter is deleted)';
COMMENT ON COLUMN notifications.notification_type IS 'Type of notification';
COMMENT ON COLUMN notifications.flow_value IS 'Flow value if negative flow alert';
COMMENT ON COLUMN notifications.is_read IS 'Whether notification has been read by user';
COMMENT ON COLUMN notifications.read_at IS 'Timestamp when notification was marked as read';

View File

@@ -0,0 +1,233 @@
import { Response } from 'express';
import { AuthenticatedRequest } from '../middleware/auth.middleware';
import * as notificationService from '../services/notification.service';
import { NotificationFilter } from '../types';
/**
* GET /api/notifications
* List all notifications for the authenticated user with pagination
* Query params: page, limit, is_read, notification_type, start_date, end_date
*/
export async function getAll(req: AuthenticatedRequest, res: Response): Promise<void> {
try {
if (!req.user?.userId) {
res.status(401).json({
success: false,
error: 'Unauthorized',
});
return;
}
const page = parseInt(req.query.page as string, 10) || 1;
const limit = Math.min(parseInt(req.query.limit as string, 10) || 20, 100);
const filters: NotificationFilter = {};
if (req.query.is_read !== undefined) {
filters.is_read = req.query.is_read === 'true';
}
if (req.query.notification_type) {
filters.notification_type = req.query.notification_type as 'NEGATIVE_FLOW' | 'SYSTEM_ALERT' | 'MAINTENANCE';
}
if (req.query.start_date) {
filters.start_date = new Date(req.query.start_date as string);
}
if (req.query.end_date) {
filters.end_date = new Date(req.query.end_date as string);
}
const result = await notificationService.getAllForUser(
req.user.userId,
filters,
{ page, limit }
);
res.status(200).json({
success: true,
data: result.notifications,
pagination: result.pagination,
});
} catch (error) {
console.error('Error fetching notifications:', error);
res.status(500).json({
success: false,
error: 'Failed to fetch notifications',
});
}
}
/**
* GET /api/notifications/unread-count
* Get count of unread notifications for the authenticated user
*/
export async function getUnreadCount(req: AuthenticatedRequest, res: Response): Promise<void> {
try {
if (!req.user?.userId) {
res.status(401).json({
success: false,
error: 'Unauthorized',
});
return;
}
const count = await notificationService.getUnreadCount(req.user.userId);
res.status(200).json({
success: true,
data: { count },
});
} catch (error) {
console.error('Error fetching unread count:', error);
res.status(500).json({
success: false,
error: 'Failed to fetch unread count',
});
}
}
/**
* GET /api/notifications/:id
* Get a single notification by ID
*/
export async function getById(req: AuthenticatedRequest, res: Response): Promise<void> {
try {
if (!req.user?.userId) {
res.status(401).json({
success: false,
error: 'Unauthorized',
});
return;
}
const { id } = req.params;
const notification = await notificationService.getById(id, req.user.userId);
if (!notification) {
res.status(404).json({
success: false,
error: 'Notification not found',
});
return;
}
res.status(200).json({
success: true,
data: notification,
});
} catch (error) {
console.error('Error fetching notification:', error);
res.status(500).json({
success: false,
error: 'Failed to fetch notification',
});
}
}
/**
* PATCH /api/notifications/:id/read
* Mark a notification as read
*/
export async function markAsRead(req: AuthenticatedRequest, res: Response): Promise<void> {
try {
if (!req.user?.userId) {
res.status(401).json({
success: false,
error: 'Unauthorized',
});
return;
}
const { id } = req.params;
const notification = await notificationService.markAsRead(id, req.user.userId);
if (!notification) {
res.status(404).json({
success: false,
error: 'Notification not found',
});
return;
}
res.status(200).json({
success: true,
data: notification,
});
} catch (error) {
console.error('Error marking notification as read:', error);
res.status(500).json({
success: false,
error: 'Failed to mark notification as read',
});
}
}
/**
* PATCH /api/notifications/read-all
* Mark all notifications as read for the authenticated user
*/
export async function markAllAsRead(req: AuthenticatedRequest, res: Response): Promise<void> {
try {
if (!req.user?.userId) {
res.status(401).json({
success: false,
error: 'Unauthorized',
});
return;
}
const count = await notificationService.markAllAsRead(req.user.userId);
res.status(200).json({
success: true,
data: { count },
message: `Marked ${count} notification(s) as read`,
});
} catch (error) {
console.error('Error marking all notifications as read:', error);
res.status(500).json({
success: false,
error: 'Failed to mark all notifications as read',
});
}
}
/**
* DELETE /api/notifications/:id
* Delete a notification
*/
export async function deleteNotification(req: AuthenticatedRequest, res: Response): Promise<void> {
try {
if (!req.user?.userId) {
res.status(401).json({
success: false,
error: 'Unauthorized',
});
return;
}
const { id } = req.params;
const deleted = await notificationService.deleteNotification(id, req.user.userId);
if (!deleted) {
res.status(404).json({
success: false,
error: 'Notification not found',
});
return;
}
res.status(200).json({
success: true,
message: 'Notification deleted successfully',
});
} catch (error) {
console.error('Error deleting notification:', error);
res.status(500).json({
success: false,
error: 'Failed to delete notification',
});
}
}

View File

@@ -8,6 +8,7 @@ import routes from './routes';
import logger from './utils/logger';
import { testConnection } from './config/database';
import { auditMiddleware } from './middleware/audit.middleware';
import { scheduleNegativeFlowDetection } from './jobs/negativeFlowDetection';
const app: Application = express();
@@ -85,6 +86,9 @@ const startServer = async () => {
await testConnection();
logger.info('Database connection established');
scheduleNegativeFlowDetection();
logger.info('Cron jobs initialized');
app.listen(PORT, () => {
logger.info(`Server running on port ${PORT} in ${NODE_ENV} mode`);
logger.info(`Health check available at http://localhost:${PORT}/health`);

View File

@@ -0,0 +1,170 @@
import cron from 'node-cron';
import * as notificationService from '../services/notification.service';
/**
* Cron job that runs daily at 1:00 AM to detect meters with negative flow
* and create notifications for responsible users
*/
export function scheduleNegativeFlowDetection(): void {
// Schedule: Every day at 1:00 AM
// Cron format: minute hour day-of-month month day-of-week
// '0 1 * * *' = At 01:00 (1:00 AM) every day
cron.schedule('0 1 * * *', async () => {
console.log('🔍 [Cron] Starting negative flow detection job...');
const startTime = Date.now();
try {
// Get all meters with negative flow values
const negativeFlowMeters = await notificationService.getMetersWithNegativeFlow();
if (negativeFlowMeters.length === 0) {
console.log('✅ [Cron] No meters with negative flow found');
return;
}
console.log(`⚠️ [Cron] Found ${negativeFlowMeters.length} meter(s) with negative flow`);
let notificationsCreated = 0;
let errors = 0;
// Group meters by project to avoid duplicate notifications
const metersByProject = new Map<string, typeof negativeFlowMeters>();
for (const meter of negativeFlowMeters) {
const projectId = meter.project_id;
if (!metersByProject.has(projectId)) {
metersByProject.set(projectId, []);
}
metersByProject.get(projectId)!.push(meter);
}
// Create notifications for each project's users
for (const [projectId, meters] of metersByProject.entries()) {
try {
// Get users responsible for this project
const userIds = await notificationService.getUsersForProject(projectId);
if (userIds.length === 0) {
console.log(`⚠️ [Cron] No users found for project ${projectId}`);
continue;
}
// Create notification for each meter for each user
for (const meter of meters) {
const title = 'Negative Flow Alert';
const message = `${meter.name} (${meter.serial_number}) has negative flow of ${meter.last_reading_value} units`;
for (const userId of userIds) {
try {
await notificationService.create({
user_id: userId,
meter_id: meter.id,
notification_type: 'NEGATIVE_FLOW',
title,
message,
meter_serial_number: meter.serial_number,
flow_value: meter.last_reading_value,
});
notificationsCreated++;
} catch (error) {
console.error(`❌ [Cron] Error creating notification for user ${userId}, meter ${meter.id}:`, error);
errors++;
}
}
}
} catch (error) {
console.error(`❌ [Cron] Error processing project ${projectId}:`, error);
errors++;
}
}
const duration = Date.now() - startTime;
console.log(
`✅ [Cron] Negative flow detection completed in ${duration}ms:`,
`${notificationsCreated} notification(s) created,`,
`${errors} error(s)`
);
// Clean up old read notifications (optional maintenance task)
try {
const deletedCount = await notificationService.deleteOldReadNotifications();
if (deletedCount > 0) {
console.log(`🗑️ [Cron] Cleaned up ${deletedCount} old read notification(s)`);
}
} catch (error) {
console.error('❌ [Cron] Error cleaning up old notifications:', error);
}
} catch (error) {
console.error('❌ [Cron] Fatal error in negative flow detection job:', error);
}
});
console.log('⏰ [Cron] Negative flow detection job scheduled (daily at 1:00 AM)');
}
/**
* Manual trigger for testing the negative flow detection
* Can be called directly for testing purposes
*/
export async function triggerNegativeFlowDetection(): Promise<void> {
console.log('🔍 [Manual] Starting negative flow detection...');
const startTime = Date.now();
try {
const negativeFlowMeters = await notificationService.getMetersWithNegativeFlow();
if (negativeFlowMeters.length === 0) {
console.log('✅ [Manual] No meters with negative flow found');
return;
}
console.log(`⚠️ [Manual] Found ${negativeFlowMeters.length} meter(s) with negative flow`);
let notificationsCreated = 0;
// Group meters by project
const metersByProject = new Map<string, typeof negativeFlowMeters>();
for (const meter of negativeFlowMeters) {
const projectId = meter.project_id;
if (!metersByProject.has(projectId)) {
metersByProject.set(projectId, []);
}
metersByProject.get(projectId)!.push(meter);
}
// Create notifications
for (const [projectId, meters] of metersByProject.entries()) {
const userIds = await notificationService.getUsersForProject(projectId);
for (const meter of meters) {
const title = 'Negative Flow Alert';
const message = `${meter.name} (${meter.serial_number}) has negative flow of ${meter.last_reading_value} units`;
for (const userId of userIds) {
await notificationService.create({
user_id: userId,
meter_id: meter.id,
notification_type: 'NEGATIVE_FLOW',
title,
message,
meter_serial_number: meter.serial_number,
flow_value: meter.last_reading_value,
});
notificationsCreated++;
}
}
}
const duration = Date.now() - startTime;
console.log(`✅ [Manual] Created ${notificationsCreated} notification(s) in ${duration}ms`);
} catch (error) {
console.error('❌ [Manual] Error in negative flow detection:', error);
throw error;
}
}

View File

@@ -13,6 +13,7 @@ import ttsRoutes from './tts.routes';
import readingRoutes from './reading.routes';
import bulkUploadRoutes from './bulk-upload.routes';
import auditRoutes from './audit.routes';
import notificationRoutes from './notification.routes';
// Create main router
const router = Router();
@@ -141,4 +142,15 @@ router.use('/bulk-upload', bulkUploadRoutes);
*/
router.use('/audit-logs', auditRoutes);
/**
* Notification routes:
* - GET /notifications - List user's notifications
* - GET /notifications/unread-count - Get unread count
* - GET /notifications/:id - Get notification by ID
* - PATCH /notifications/:id/read - Mark notification as read
* - PATCH /notifications/read-all - Mark all as read
* - DELETE /notifications/:id - Delete notification
*/
router.use('/notifications', notificationRoutes);
export default router;

View File

@@ -0,0 +1,62 @@
import { Router } from 'express';
import { authenticateToken } from '../middleware/auth.middleware';
import { validateListNotifications, validateUuidParam } from '../validators/notification.validator';
import * as notificationController from '../controllers/notification.controller';
const router = Router();
/**
* All notification endpoints require authentication
* Users can only access their own notifications
*/
/**
* GET /api/notifications
* Get all notifications for the authenticated user with pagination
* Headers: Authorization: Bearer <accessToken>
* Query params: page, limit, is_read, notification_type, start_date, end_date
* Response: { success: true, data: Notification[], pagination: {...} }
*/
router.get('/', authenticateToken, validateListNotifications, notificationController.getAll);
/**
* GET /api/notifications/unread-count
* Get count of unread notifications for the authenticated user
* Headers: Authorization: Bearer <accessToken>
* Response: { success: true, data: { count: number } }
*/
router.get('/unread-count', authenticateToken, notificationController.getUnreadCount);
/**
* GET /api/notifications/:id
* Get a single notification by ID
* Headers: Authorization: Bearer <accessToken>
* Response: { success: true, data: Notification }
*/
router.get('/:id', authenticateToken, validateUuidParam, notificationController.getById);
/**
* PATCH /api/notifications/:id/read
* Mark a notification as read
* Headers: Authorization: Bearer <accessToken>
* Response: { success: true, data: Notification }
*/
router.patch('/:id/read', authenticateToken, validateUuidParam, notificationController.markAsRead);
/**
* PATCH /api/notifications/read-all
* Mark all user's notifications as read
* Headers: Authorization: Bearer <accessToken>
* Response: { success: true, data: { count: number } }
*/
router.patch('/read-all', authenticateToken, notificationController.markAllAsRead);
/**
* DELETE /api/notifications/:id
* Delete a notification
* Headers: Authorization: Bearer <accessToken>
* Response: { success: true, message: string }
*/
router.delete('/:id', authenticateToken, validateUuidParam, notificationController.deleteNotification);
export default router;

View File

@@ -0,0 +1,296 @@
import { query } from '../config/database';
import { Notification, NotificationFilter, PaginationParams } from '../types';
/**
* Paginated notifications result
*/
export interface PaginatedNotifications {
notifications: Notification[];
pagination: {
page: number;
limit: number;
total: number;
totalPages: number;
hasNextPage: boolean;
hasPreviousPage: boolean;
};
}
/**
* Create notification input
*/
export interface CreateNotificationInput {
user_id: string;
meter_id?: string | null;
notification_type: 'NEGATIVE_FLOW' | 'SYSTEM_ALERT' | 'MAINTENANCE';
title: string;
message: string;
meter_serial_number?: string | null;
flow_value?: number | null;
}
/**
* Get all notifications for a user with optional filtering and pagination
* @param userId - User ID
* @param filters - Optional filters
* @param pagination - Optional pagination parameters
* @returns Paginated list of notifications
*/
export async function getAllForUser(
userId: string,
filters?: NotificationFilter,
pagination?: PaginationParams
): Promise<PaginatedNotifications> {
const page = pagination?.page || 1;
const limit = pagination?.limit || 20;
const offset = (page - 1) * limit;
const sortBy = pagination?.sortBy || 'created_at';
const sortOrder = pagination?.sortOrder || 'desc';
// Build WHERE clauses
const conditions: string[] = ['user_id = $1'];
const params: unknown[] = [userId];
let paramIndex = 2;
if (filters?.is_read !== undefined) {
conditions.push(`is_read = $${paramIndex}`);
params.push(filters.is_read);
paramIndex++;
}
if (filters?.notification_type) {
conditions.push(`notification_type = $${paramIndex}`);
params.push(filters.notification_type);
paramIndex++;
}
if (filters?.start_date) {
conditions.push(`created_at >= $${paramIndex}`);
params.push(filters.start_date);
paramIndex++;
}
if (filters?.end_date) {
conditions.push(`created_at <= $${paramIndex}`);
params.push(filters.end_date);
paramIndex++;
}
const whereClause = conditions.join(' AND ');
// Validate sortBy to prevent SQL injection
const allowedSortColumns = ['created_at', 'is_read', 'notification_type'];
const safeSortBy = allowedSortColumns.includes(sortBy) ? sortBy : 'created_at';
const safeSortOrder = sortOrder === 'asc' ? 'ASC' : 'DESC';
// Get total count
const countQuery = `
SELECT COUNT(*) as total
FROM notifications
WHERE ${whereClause}
`;
const countResult = await query(countQuery, params);
const total = parseInt(countResult.rows[0].total, 10);
// Get paginated data - unread first, then by created_at
const dataQuery = `
SELECT *
FROM notifications
WHERE ${whereClause}
ORDER BY is_read ASC, ${safeSortBy} ${safeSortOrder}
LIMIT $${paramIndex} OFFSET $${paramIndex + 1}
`;
const dataResult = await query(dataQuery, [...params, limit, offset]);
const totalPages = Math.ceil(total / limit);
return {
notifications: dataResult.rows,
pagination: {
page,
limit,
total,
totalPages,
hasNextPage: page < totalPages,
hasPreviousPage: page > 1,
},
};
}
/**
* Get count of unread notifications for a user
* @param userId - User ID
* @returns Count of unread notifications
*/
export async function getUnreadCount(userId: string): Promise<number> {
const sql = `
SELECT COUNT(*) as count
FROM notifications
WHERE user_id = $1 AND is_read = false
`;
const result = await query(sql, [userId]);
return parseInt(result.rows[0].count, 10);
}
/**
* Get a single notification by ID
* @param id - Notification ID
* @param userId - User ID (for ownership verification)
* @returns Notification or null if not found
*/
export async function getById(id: string, userId: string): Promise<Notification | null> {
const sql = `
SELECT *
FROM notifications
WHERE id = $1 AND user_id = $2
`;
const result = await query(sql, [id, userId]);
return result.rows[0] || null;
}
/**
* Create a new notification
* @param input - Notification data
* @returns Created notification
*/
export async function create(input: CreateNotificationInput): Promise<Notification> {
const sql = `
INSERT INTO notifications (
user_id,
meter_id,
notification_type,
title,
message,
meter_serial_number,
flow_value
) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
`;
const result = await query(sql, [
input.user_id,
input.meter_id || null,
input.notification_type,
input.title,
input.message,
input.meter_serial_number || null,
input.flow_value || null,
]);
return result.rows[0];
}
/**
* Mark notification as read
* @param id - Notification ID
* @param userId - User ID (for ownership verification)
* @returns Updated notification or null if not found
*/
export async function markAsRead(id: string, userId: string): Promise<Notification | null> {
const sql = `
UPDATE notifications
SET is_read = true, read_at = CURRENT_TIMESTAMP
WHERE id = $1 AND user_id = $2
RETURNING *
`;
const result = await query(sql, [id, userId]);
return result.rows[0] || null;
}
/**
* Mark all notifications as read for a user
* @param userId - User ID
* @returns Number of notifications marked as read
*/
export async function markAllAsRead(userId: string): Promise<number> {
const sql = `
UPDATE notifications
SET is_read = true, read_at = CURRENT_TIMESTAMP
WHERE user_id = $1 AND is_read = false
RETURNING id
`;
const result = await query(sql, [userId]);
return result.rows.length;
}
/**
* Delete a notification
* @param id - Notification ID
* @param userId - User ID (for ownership verification)
* @returns True if deleted, false if not found
*/
export async function deleteNotification(id: string, userId: string): Promise<boolean> {
const sql = `
DELETE FROM notifications
WHERE id = $1 AND user_id = $2
RETURNING id
`;
const result = await query(sql, [id, userId]);
return result.rows.length > 0;
}
/**
* Delete old read notifications (older than 30 days)
* @returns Number of notifications deleted
*/
export async function deleteOldReadNotifications(): Promise<number> {
const sql = `
DELETE FROM notifications
WHERE is_read = true
AND read_at < CURRENT_TIMESTAMP - INTERVAL '30 days'
RETURNING id
`;
const result = await query(sql);
return result.rows.length;
}
/**
* Get meters with negative flow values
* @returns Array of meters with negative flow
*/
export async function getMetersWithNegativeFlow(): Promise<Array<{
id: string;
serial_number: string;
name: string;
last_reading_value: number;
concentrator_id: string;
project_id: string;
}>> {
const sql = `
SELECT
m.id,
m.serial_number,
m.name,
m.last_reading_value,
m.concentrator_id,
c.project_id
FROM meters m
INNER JOIN concentrators c ON c.id = m.concentrator_id
WHERE m.last_reading_value < 0
AND m.status = 'ACTIVE'
`;
const result = await query(sql);
return result.rows;
}
/**
* Get users responsible for a project
* This is a placeholder - adjust based on your user-project relationship
* @param projectId - Project ID
* @returns Array of user IDs
*/
export async function getUsersForProject(projectId: string): Promise<string[]> {
// TODO: Implement based on your user-project relationship
// For now, return all active OPERATOR and ADMIN users
// You may want to add a user_projects table for better user-project assignment
const sql = `
SELECT u.id
FROM users u
INNER JOIN roles r ON r.id = u.role_id
WHERE u.is_active = true
AND r.name IN ('ADMIN', 'OPERATOR')
`;
const result = await query(sql);
return result.rows.map(row => row.id);
}

View File

@@ -340,3 +340,40 @@ export interface TtsWebhookPayload {
correlation_ids?: string[];
received_at: string;
}
export type NotificationType = 'NEGATIVE_FLOW' | 'SYSTEM_ALERT' | 'MAINTENANCE';
export interface Notification {
id: string;
user_id: string;
meter_id: string | null;
notification_type: NotificationType;
title: string;
message: string;
meter_serial_number: string | null;
flow_value: number | null;
is_read: boolean;
read_at: Date | null;
created_at: Date;
}
export interface NotificationPublic {
id: string;
user_id: string;
meter_id: string | null;
notification_type: NotificationType;
title: string;
message: string;
meter_serial_number: string | null;
flow_value: number | null;
is_read: boolean;
read_at: Date | null;
created_at: Date;
}
export interface NotificationFilter {
is_read?: boolean;
notification_type?: NotificationType;
start_date?: Date;
end_date?: Date;
}

View File

@@ -0,0 +1,196 @@
import { z } from 'zod';
import { Request, Response, NextFunction } from 'express';
/**
* Notification type enum values
*/
export const NotificationType = {
NEGATIVE_FLOW: 'NEGATIVE_FLOW',
SYSTEM_ALERT: 'SYSTEM_ALERT',
MAINTENANCE: 'MAINTENANCE',
} as const;
export type NotificationTypeValue = (typeof NotificationType)[keyof typeof NotificationType];
/**
* UUID validation regex
*/
const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
/**
* Schema for creating a notification (internal use, not exposed to API)
*/
export const createNotificationSchema = z.object({
user_id: z
.string({ required_error: 'User ID is required' })
.regex(uuidRegex, 'User ID must be a valid UUID'),
meter_id: z
.string()
.regex(uuidRegex, 'Meter ID must be a valid UUID')
.optional()
.nullable(),
notification_type: z
.enum([NotificationType.NEGATIVE_FLOW, NotificationType.SYSTEM_ALERT, NotificationType.MAINTENANCE])
.default(NotificationType.NEGATIVE_FLOW),
title: z
.string({ required_error: 'Title is required' })
.min(1, 'Title cannot be empty')
.max(255, 'Title must be at most 255 characters'),
message: z
.string({ required_error: 'Message is required' })
.min(1, 'Message cannot be empty')
.max(1000, 'Message must be at most 1000 characters'),
meter_serial_number: z
.string()
.max(100, 'Meter serial number must be at most 100 characters')
.optional()
.nullable(),
flow_value: z
.number()
.optional()
.nullable(),
});
/**
* Schema for query parameters when listing notifications
*/
export const listNotificationsQuerySchema = z.object({
page: z
.string()
.optional()
.transform(val => (val ? parseInt(val, 10) : 1)),
limit: z
.string()
.optional()
.transform(val => (val ? Math.min(parseInt(val, 10), 100) : 20)),
is_read: z
.string()
.optional()
.transform(val => {
if (val === 'true') return true;
if (val === 'false') return false;
return undefined;
}),
notification_type: z
.enum([NotificationType.NEGATIVE_FLOW, NotificationType.SYSTEM_ALERT, NotificationType.MAINTENANCE])
.optional(),
start_date: z
.string()
.datetime({ message: 'Start date must be a valid ISO date string' })
.optional()
.transform(val => (val ? new Date(val) : undefined)),
end_date: z
.string()
.datetime({ message: 'End date must be a valid ISO date string' })
.optional()
.transform(val => (val ? new Date(val) : undefined)),
});
/**
* Schema for UUID parameter validation
*/
export const uuidParamSchema = z.object({
id: z
.string({ required_error: 'ID is required' })
.regex(uuidRegex, 'ID must be a valid UUID'),
});
/**
* Type definitions derived from schemas
*/
export type CreateNotificationInput = z.infer<typeof createNotificationSchema>;
export type ListNotificationsQuery = z.infer<typeof listNotificationsQuerySchema>;
/**
* Generic validation middleware factory for request body
* Creates a middleware that validates request body against a Zod schema
* @param schema - Zod schema to validate against
*/
function validateBody<T extends z.ZodTypeAny>(schema: T) {
return (req: Request, res: Response, next: NextFunction): void => {
const result = schema.safeParse(req.body);
if (!result.success) {
const errors = result.error.errors.map((err) => ({
field: err.path.join('.'),
message: err.message,
}));
res.status(400).json({
success: false,
error: 'Validation failed',
details: errors,
});
return;
}
// Replace body with validated and typed data
req.body = result.data;
next();
};
}
/**
* Generic validation middleware factory for query parameters
* Creates a middleware that validates query parameters against a Zod schema
* @param schema - Zod schema to validate against
*/
function validateQuery<T extends z.ZodTypeAny>(schema: T) {
return (req: Request, res: Response, next: NextFunction): void => {
const result = schema.safeParse(req.query);
if (!result.success) {
const errors = result.error.errors.map((err) => ({
field: err.path.join('.'),
message: err.message,
}));
res.status(400).json({
success: false,
error: 'Validation failed',
details: errors,
});
return;
}
// Replace query with validated and typed data
req.query = result.data as any;
next();
};
}
/**
* Generic validation middleware factory for URL parameters
* Creates a middleware that validates URL parameters against a Zod schema
* @param schema - Zod schema to validate against
*/
function validateParams<T extends z.ZodTypeAny>(schema: T) {
return (req: Request, res: Response, next: NextFunction): void => {
const result = schema.safeParse(req.params);
if (!result.success) {
const errors = result.error.errors.map((err) => ({
field: err.path.join('.'),
message: err.message,
}));
res.status(400).json({
success: false,
error: 'Validation failed',
details: errors,
});
return;
}
// Replace params with validated and typed data
req.params = result.data;
next();
};
}
/**
* Pre-configured validation middlewares for notifications
*/
export const validateCreateNotification = validateBody(createNotificationSchema);
export const validateListNotifications = validateQuery(listNotificationsQuerySchema);
export const validateUuidParam = validateParams(uuidParamSchema);