Ir al contenido

Arquitectura de Persistencia

Modelo de 3 capas para el ciclo de vida de los datos en IngelCoding.

IMAP / GSheets / Excel
|
v
┌─────────────┐
│ RAW STORE │ data_raw/{dataset}_{ABV}.parquet
│ (acumulativo)│ Merge+dedup por message_id
│ │ Escritura atomica (tmp + os.replace)
└──────┬───────┘
|
process_*_from_raw.py
|
v
┌─────────────┐
│ PROCESSED │ SALIDAS/processed/{domain}/{name}.parquet
│ STORE │ _manifest.json (timestamp, row_count, columns)
│ (curado) │ Escritura atomica, compresion zstd
└──────┬───────┘
|
┌────┴─────┐
v v
┌────────┐ ┌────────┐
│ OUTPUT │ │ GSHEETS│
│ STORE │ │ (sync) │
│ .xlsx │ │ FACTUR.│
│ .db │ │ FACTURAS│
└────────┘ └────────┘

Almacena emails crudos descargados via IMAP como parquets acumulativos.

  • Naming canonico: {dataset}_{ABV}.parquet (ej. emails_pedidos_HES_raw_ING.parquet)
  • Dedup: por message_id, keep "last" (el mas reciente gana)
  • Escritura atomica: .tmp + os.replace() via core/raw_store.py
  • Descarga incremental: core/incremental.py calcula FECHA_DESDE desde la ultima ejecucion exitosa en pipeline_runs, con overlap configurable (INCREMENTAL_OVERLAP_DAYS)
  • Modo offline: --offline salta la descarga IMAP y usa los parquets existentes

Modulos: core/raw_store.py, core/raw_loaders.py, core/incremental.py

Almacena DataFrames procesados como parquets durables con metadata sidecar.

  • Estructura: SALIDAS/processed/{domain}/{name}.parquet + _manifest.json
  • Manifest: JSON con timestamp, row_count, columns por cada DataFrame
  • Escritura atomica: .tmp + os.replace() via core/processed_store.py
  • Testabilidad: base_dir inyectable para tests con directorio temporal
  • Carga con fallback: load_processed_or_pkl() intenta processed, cae a pkl legacy, regenera si es necesario

Reemplaza el cache pkl volatil (SALIDAS/cache_df/*.pkl) que se sobreescribia en cada run sin metadata.

Modulo: core/processed_store.py

Capa de salida final para consumo humano y sistemas externos.

DestinoFormatoDescripcion
SALIDAS/*.xlsxExcelReportes de negocio por dominio
SALIDAS/db/ingeldata.dbSQLiteBD operacional — trazabilidad de pipelines (pipeline_runs, pipeline_stage_runs, raw_files, publish_log, scope_rebuild_history) + tablas de dominio (facturas, pedidos_hes, pedidos_sap, valorizaciones, gantt_actividades, pagos_pendientes)
SALIDAS/db/data_warehouse.dbSQLiteDW analitico (facts + dims)
Google SheetsAPISync operacional de transicion (FACTURACION, FACTURAS)

Cada ejecucion se registra en ingeldata.db.pipeline_runs con:

CampoProposito
pipeline_nameIdentificador del pipeline
raw_date_from / raw_date_toRango de fechas procesado
usuarioABV del usuario IMAP
input_hashSHA256 del raw parquet de entrada
output_rowsFilas del output
code_versionHash corto del commit git
statusrunning / success / failed

Los parametros operativos viven en config/config.py:

ConstanteValorProposito
INCREMENTAL_OVERLAP_DAYS3Dias de overlap para descarga incremental
RAW_COMPRESSIONzstdCompresion de parquets
KNOWN_ABVS[MC_AT, ING, HC, MC, NC]ABVs para naming canonico