- Add scripts/import_atlas_data.py to load Atlas data from Excel files - Import 6,206 inventory items, 251 customers and 4,582 historical sales - Create historical_sales table in tenant DB - Add /pos/historical-sales page and /pos/api/historical-sales endpoint - Link in reports sidebar for easy access
300 lines
8.6 KiB
Python
300 lines
8.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Import Atlas data into Nexus POS tenant_refaccionaria_atlas.

Sources (expected in /home/):
- Articulos Atlas.xlsx -> inventory catalog (stock = 0)
- Clientes.xlsx -> customers
- Historico V.xlsx -> historical_sales (read-only reference)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
|
|
import psycopg2
|
|
import pandas as pd
|
|
|
|
# Directory that holds the Atlas Excel exports. Defaults to /home; set the
# ATLAS_DATA_DIR environment variable to read the workbooks from elsewhere.
BASE_DIR = os.environ.get("ATLAS_DATA_DIR", "/home")

# Tenant database name, used to build the default connection DSN.
TENANT_DB = "tenant_refaccionaria_atlas"
|
|
|
|
|
|
def get_tenant_conn():
    """Open a psycopg2 connection to the tenant database.

    The DSN comes from the ``TENANT_DB_URL`` environment variable. When it is
    unset, a local connection as ``postgres`` to ``TENANT_DB`` is used.
    """
    fallback_dsn = f"postgresql://postgres@localhost/{TENANT_DB}"
    return psycopg2.connect(os.environ.get("TENANT_DB_URL", fallback_dsn))
|
|
|
|
|
|
def normalize_text(val, max_len=None):
    """Return *val* as a stripped string, or ``None`` when it is blank.

    Values that ``pd.isna`` reports as missing, and the placeholder strings
    ``""``, ``"nan"`` and ``"None"``, all map to ``None``. A truthy *max_len*
    truncates the stripped text to that many characters.
    """
    if pd.isna(val):
        return None

    text = str(val).strip()
    if text in {"", "nan", "None"}:
        return None

    return text[:max_len] if max_len else text
|
|
|
|
|
|
def normalize_price(val):
    """Convert a spreadsheet cell into a two-decimal ``Decimal`` amount.

    Returns ``Decimal("0")`` when the cell is missing, is not numeric, is not
    finite (NaN or infinity), or is too large to be quantized to cents.
    """
    zero = Decimal("0")
    if pd.isna(val):
        return zero

    try:
        amount = Decimal(str(float(val)))
        # Strings such as "nan" or "inf" pass float() and are not caught by
        # pd.isna, so non-finite amounts are rejected explicitly here.
        if not amount.is_finite():
            return zero
        return amount.quantize(Decimal("0.01"))
    except (ValueError, TypeError, ArithmeticError):
        # ArithmeticError covers decimal.InvalidOperation (raised by quantize
        # when the result exceeds the context precision) and OverflowError.
        return zero
|
|
|
|
|
|
def clean_part_number(val):
    """Return a sanitized part number (at most 100 characters) or ``None``.

    The value is first normalized with ``normalize_text``. Every character
    that is not a word character, ``-``, ``.`` or ``/`` is then removed.
    ``None`` is returned when nothing is left.
    """
    raw = normalize_text(val, max_len=100)
    if not raw:
        return None

    # Keep word characters plus "-", "." and "/"; drop everything else.
    cleaned = re.sub(r"[^\w\-./]", "", raw)[:100]
    return cleaned or None
|
|
|
|
|
|
def import_inventory(conn):
    """Load ``Articulos Atlas.xlsx`` into the ``inventory`` table.

    Rows without a usable "Clave" are ignored. A part number repeated within
    the workbook is counted as a duplicate and skipped. Part numbers that
    already exist in the table are left untouched by ``ON CONFLICT``. All
    inserts are committed together at the end.

    Args:
        conn: Open psycopg2 connection to the tenant database.
    """
    path = os.path.join(BASE_DIR, "Articulos Atlas.xlsx")
    df = pd.read_excel(path)
    print(f"[inventory] Read {len(df)} rows from {path}")

    insert_sql = """
        INSERT INTO inventory
            (part_number, name, description, cost, price_1, tax_rate,
             unit, barcode, brand, is_active)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (part_number) DO NOTHING
        RETURNING id
    """

    inserted = 0
    skipped_dup = 0
    seen = set()

    # The context manager closes the cursor even if an insert raises.
    with conn.cursor() as cur:
        for _, row in df.iterrows():
            part_number = clean_part_number(row.get("Clave"))
            if not part_number:
                continue
            if part_number in seen:
                skipped_dup += 1
                continue
            seen.add(part_number)

            name = normalize_text(row.get("Producto"), max_len=300) or part_number
            description = normalize_text(row.get("Nombre Alterno"), max_len=500)
            category = normalize_text(row.get("Categoría 1"), max_len=100)
            subcategory = normalize_text(row.get("Categoría 2"), max_len=100)
            unit = normalize_text(row.get("Unidad"), max_len=20)
            barcode = clean_part_number(row.get("Código de barras"))
            price = normalize_price(row.get("Precio lista"))

            # Append category / subcategory / unit to the description so the
            # information survives even though only some of it has a column.
            extra_info = " | ".join(filter(None, [category, subcategory, unit]))
            if extra_info:
                if description:
                    description = f"{description} [{extra_info}]"
                else:
                    description = f"[{extra_info}]"

            cur.execute(
                insert_sql,
                (
                    part_number,
                    name,
                    description,
                    Decimal("0"),      # cost: not provided by the workbook
                    price,
                    Decimal("0.16"),   # tax_rate
                    unit,
                    barcode,
                    # NOTE(review): "Categoría 1" is stored in the brand
                    # column; confirm this mapping is intentional.
                    category,
                    True,
                ),
            )
            # RETURNING yields no row when ON CONFLICT skipped the insert.
            if cur.fetchone() is not None:
                inserted += 1

    conn.commit()
    print(f"[inventory] Inserted: {inserted}, Duplicates skipped: {skipped_dup}")
|
|
|
|
|
|
def import_customers(conn):
    """Load ``Clientes.xlsx`` into the ``customers`` table.

    A row is skipped when "Empresa" is blank, when it names the generic
    walk-in customer ("Publico" and variants), or when a customer with the
    same name already exists. All inserts are committed together at the end.

    Args:
        conn: Open psycopg2 connection to the tenant database.
    """
    path = os.path.join(BASE_DIR, "Clientes.xlsx")
    df = pd.read_excel(path)
    print(f"[customers] Read {len(df)} rows from {path}")

    generic_names = {"publico", "publico en general", "público", "público en general"}
    insert_sql = """
        INSERT INTO customers
            (name, razon_social, credit_limit, price_tier, is_active)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
    """

    inserted = 0
    skipped = 0

    # The context manager closes the cursor even if a statement raises.
    with conn.cursor() as cur:
        for _, row in df.iterrows():
            name = normalize_text(row.get("Empresa"), max_len=200)
            if not name or name.lower() in generic_names:
                skipped += 1
                continue

            credit_limit = normalize_price(row.get("Límite de crédito"))
            # NOTE(review): "Tipo de cliente", "Segmento" and "Condiciones de
            # Pago" were previously read into an unused `notes` string and
            # never stored. The dead reads are removed; persist them here if
            # the customers table has a suitable column.

            # Avoid duplicates by name. Rows inserted earlier in this run are
            # visible to this query because they share the transaction.
            cur.execute("SELECT 1 FROM customers WHERE name = %s LIMIT 1", (name,))
            if cur.fetchone():
                skipped += 1
                continue

            # The name doubles as razon_social; price_tier is fixed at 1.
            cur.execute(insert_sql, (name, name, credit_limit, 1, True))
            if cur.fetchone():
                inserted += 1

    conn.commit()
    print(f"[customers] Inserted: {inserted}, Skipped: {skipped}")
|
|
|
|
|
|
def create_historical_sales_table(conn):
    """Create the ``historical_sales`` table and its indexes if missing.

    Every statement uses ``IF NOT EXISTS``, so repeated calls are harmless.
    The DDL is committed before returning.

    Args:
        conn: Open psycopg2 connection to the tenant database.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS historical_sales (
            id SERIAL PRIMARY KEY,
            external_document_id VARCHAR(50),
            document_no VARCHAR(50),
            sale_date DATE,
            customer_name VARCHAR(200),
            total NUMERIC(12,2),
            subtotal NUMERIC(12,2),
            amount_paid NUMERIC(12,2),
            payment_method VARCHAR(50),
            discount NUMERIC(12,2) DEFAULT 0,
            balance NUMERIC(12,2) DEFAULT 0,
            raw_payment_code VARCHAR(20),
            created_at TIMESTAMPTZ DEFAULT NOW()
        );

        CREATE INDEX IF NOT EXISTS idx_historical_sales_date
            ON historical_sales(sale_date);
        CREATE INDEX IF NOT EXISTS idx_historical_sales_customer
            ON historical_sales(customer_name);
        CREATE INDEX IF NOT EXISTS idx_historical_sales_document
            ON historical_sales(document_no);
    """
    # The context manager closes the cursor even if the DDL fails.
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()
|
|
|
|
|
|
def payment_label(code):
    """Translate a payment-method code into its display label.

    Integral numeric codes are normalized before the lookup, so ``1``,
    ``1.0``, ``"1.0"`` and ``" 01 "`` all resolve to the entry for ``"1"``.
    This matters because pandas reads a numeric column containing blanks as
    floats. Unknown codes yield ``"Código <code>"``.

    Args:
        code: Payment code as read from the spreadsheet (str or number).

    Returns:
        The label for the code, or a ``"Código ..."`` placeholder.
    """
    # NOTE(review): codes 1 / 28 / 99 resemble the SAT c_FormaPago catalog,
    # where 03, 04, 06 and 28 carry different meanings than the labels below.
    # Confirm the labels against the source system before relying on them.
    mapping = {
        "1": "Efectivo",
        "3": "Tarjeta",
        "4": "Transferencia",
        "6": "Cheque",
        "28": "Crédito",
        "99": "Por definir",
    }

    key = str(code).strip()
    try:
        number = float(key)
    except ValueError:
        number = None
    # Collapse "1.0" / "01" style spellings to the canonical integer form;
    # is_integer() is False for NaN and infinity, so those are left as-is.
    if number is not None and number.is_integer():
        key = str(int(number))

    return mapping.get(key, f"Código {key}")
|
|
|
|
|
|
def import_historical_sales(conn):
    """Load ``Historico V.xlsx`` into the ``historical_sales`` table.

    The table is created on demand. Re-running the import does not duplicate
    documents: rows whose "ID Documento" was already in the table before this
    run are skipped. The table has no unique constraint besides its serial
    primary key, so ``ON CONFLICT DO NOTHING`` alone never suppresses a row.

    Rows without an "ID Documento" cannot be matched and are always inserted.
    IDs repeated within the workbook are all inserted, as before.

    Args:
        conn: Open psycopg2 connection to the tenant database.
    """
    path = os.path.join(BASE_DIR, "Historico V.xlsx")
    df = pd.read_excel(path)
    print(f"[historical_sales] Read {len(df)} rows from {path}")

    create_historical_sales_table(conn)

    insert_sql = """
        INSERT INTO historical_sales
            (external_document_id, document_no, sale_date, customer_name,
             total, subtotal, amount_paid, payment_method, discount, balance,
             raw_payment_code)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT DO NOTHING
    """

    inserted = 0
    already_imported = 0

    # The context manager closes the cursor even if a statement raises.
    with conn.cursor() as cur:
        # Snapshot of document IDs stored by previous runs. It is deliberately
        # not extended during this run, so a first import behaves as before.
        cur.execute(
            "SELECT external_document_id FROM historical_sales "
            "WHERE external_document_id IS NOT NULL"
        )
        existing_ids = {record[0] for record in cur.fetchall()}

        for _, row in df.iterrows():
            doc_id = normalize_text(row.get("ID Documento"), max_len=50)
            if doc_id is not None and doc_id in existing_ids:
                already_imported += 1
                continue

            doc_no = normalize_text(row.get("Documento No."), max_len=50)

            # Best effort: an unparseable date is stored as NULL instead of
            # aborting the whole import.
            fecha = row.get("Fecha")
            sale_date = None
            if not pd.isna(fecha):
                try:
                    sale_date = pd.to_datetime(fecha).date()
                except (ValueError, TypeError, OverflowError):
                    sale_date = None

            customer = normalize_text(row.get("Cliente"), max_len=200)
            total = normalize_price(row.get("Total"))
            subtotal = normalize_price(row.get("SubTotal"))
            paid = normalize_price(row.get("Total Pagado"))
            discount = normalize_price(row.get("Descuento"))
            balance = normalize_price(row.get("Saldo"))
            raw_payment = normalize_text(row.get("Forma de Pago"), max_len=20)
            payment_label_str = payment_label(raw_payment) if raw_payment else None

            cur.execute(
                insert_sql,
                (
                    doc_id,
                    doc_no,
                    sale_date,
                    customer,
                    total,
                    subtotal,
                    paid,
                    payment_label_str,
                    discount,
                    balance,
                    raw_payment,
                ),
            )
            inserted += cur.rowcount

    conn.commit()
    print(
        f"[historical_sales] Inserted: {inserted}, "
        f"Already imported (skipped): {already_imported}"
    )
|
|
|
|
|
|
def main():
    """Run the inventory, customer and historical-sales imports in order.

    All three steps share one tenant connection. On failure the error is
    printed, the open transaction is rolled back and the exception is
    re-raised. The connection is closed in every case.
    """
    print(f"Connecting to tenant {TENANT_DB}...")
    connection = get_tenant_conn()

    try:
        for step in (import_inventory, import_customers, import_historical_sales):
            step(connection)
        print("\nImport completed successfully.")
    except Exception as exc:
        print(f"ERROR: {exc}")
        connection.rollback()
        raise
    finally:
        connection.close()
|
|
|
|
|
|
# Run the import only when executed as a script, not when imported.
if __name__ == "__main__":
    sys.exit(main())
|