Ir al contenido

ADR: Framework de pipelines V2 (BasePipeline + DAG 6-stage)

Aceptada — implementada progresivamente desde 2026-04. Esta ADR formaliza retroactivamente la decisión y deja el contrato escrito.

Hasta principios de 2026 cada pipeline era un script suelto (main_*.py) con su propia mecánica: argparse, conexión IMAP, parsing, escritura a Excel/GSheets, todo mezclado en 300-800 líneas. Esto producía:

  • Duplicación masiva: cada main_*.py reimplementaba la conexión IMAP, el manejo de fechas, el flujo offline, el dry-run, el logging.
  • Observabilidad nula: imposible saber cuánto tardó cada etapa, qué etapa falló, qué métricas dejó cada paso.
  • Imposible orquestar en paralelo: cada script era una caja negra subprocess.
  • Sin contrato claro: agregar un pipeline nuevo requería copiar un main existente y rezar.
  • Reprocesar era manual: no había forma estándar de saltar la ingesta y reprocesar archivos raw locales.

La pregunta: ¿debería existir un framework formal con etapas estandarizadas y contrato uniforme, o seguir con scripts ad-hoc?

Opción A: Mantener scripts sueltos (statu quo)

Sección titulada «Opción A: Mantener scripts sueltos (statu quo)»

Cada pipeline sigue siendo un main_*.py independiente.

Descartada porque:

  • La duplicación crece con cada pipeline nuevo.
  • No hay forma de paralelizar sin levantar N subprocess.
  • La observabilidad por etapa requiere instrumentar cada script a mano.
  • Cualquier cambio transversal (ej. agregar columna de versionado) toca N archivos.

Opción B: BasePipeline con DAG 6-stage (ELEGIDA)

Sección titulada «Opción B: BasePipeline con DAG 6-stage (ELEGIDA)»

Una clase abstracta BasePipeline con seis etapas opcionales:

ingest → store → process → intermediate → final → publish

Cada subclase sobrescribe sólo las etapas que necesita. Las que no, devuelven StageResult(status="skipped").

Elegida porque:

  • Contrato único: todos los pipelines tienen el mismo shape de entrada/salida.
  • Observabilidad gratis: run() mide tiempo por etapa, captura excepciones, registra métricas.
  • Short-circuits estándar: offline saltea ingest, dry_run saltea publish, reprocess lee de raw local, target=dev apunta a dev.sqlite.
  • Paralelización trivial: el orquestador puede correr pipelines V2 en ThreadPoolExecutor porque la API es uniforme.
  • Configuración inmutable: PipelineConfig es un dataclass frozen — un pipeline es deterministicamente reproducible.
  • YAML externo: los filtros IMAP, datasets y publish targets viven en config/pipelines/<name>.yaml, no en código.

Opción C: Framework externo (Prefect, Airflow, Dagster)

Sección titulada «Opción C: Framework externo (Prefect, Airflow, Dagster)»

Adoptar un orquestador estándar de la industria.

Descartada porque:

  • Overhead operativo desproporcionado al tamaño actual (≤10 pipelines).
  • Requiere infraestructura adicional (broker, scheduler, UI).
  • El usuario (Manuel) trabaja desde laptop personal sin servidor permanente.
  • El valor real está en el contrato uniforme, no en el scheduler.
  • Si en el futuro se justifica, BasePipeline es trivialmente envolvible en una Prefect task o un Dagster op.

Opción B — Framework propio centrado en BasePipeline, con DAG de 6 etapas, configuración via PipelineConfig (dataclass frozen) + YAML spec externo, ejecución orquestada por run_pipeline.py con waves paralelas.

El framework define seis etapas opcionales en DAG lineal:

ingest → store → process → intermediate → final → publish

Cada etapa recibe el StageResult de la previa y devuelve uno nuevo. Una etapa no implementada devuelve skipped y run() propaga el resultado al siguiente. Configuración inmutable via PipelineConfig (dataclass frozen). Short-circuits estándar: offline saltea ingest, dry_run saltea publish, reprocess lee raw local, target=dev apunta a dev.sqlite. Spec declarativa en config/pipelines/<name>.yaml.

Detalle técnico completo del contrato — etapas, PipelineConfig, StageResult, PipelineRun, run(), YAML spec, métricas auto-persistidas: Reference-BasePipeline.

El patrón es un Template Method clásico (Gamma et al., 1994): la superclase fija el esqueleto algorítmico (run()), las subclases parametrizan los pasos. Las etapas son opcionales por defecto, no abstractas, evitando que un pipeline simple (ej. solo ingest + final) tenga que stub-ear cuatro métodos.

La separación entre ingest/store/process permite que --offline y --reprocess salten exactamente lo correcto sin que la subclase implemente la lógica del short-circuit.

El StageResult.data pasando entre etapas es un chain of responsibility ligero: cada etapa decide qué pasar a la siguiente (DataFrame, path, dict de métricas, lo que sea).

  • Agregar un pipeline nuevo es mecánico: YAML + subclase + main + registro. Ver Como-Crear-Pipeline-Nuevo.
  • Observabilidad uniforme: pipeline_runs y pipeline_stage_runs en ingeldata.db registran cada corrida.
  • Paralelización trivial via waves en run_pipeline.py.
  • Reproceso desde raw local es flag, no script aparte.
  • Tests por etapa son posibles porque cada método es aislable.
  • Hay deuda: no todos los pipelines viejos están portados. cruce_facturacion, fetch_saldos, fetch_cierres siguen como subprocess legacy.
  • El framework asume IMAP como fuente por defecto. Pipelines con otra fuente (cierres_tecnicos lee local) usan imap: null en YAML — funcional pero menos elegante.
  • data_warehouse.py (consolidación star schema) NO está adaptado al framework: sigue siendo script suelto. Decisión separada — ver ADR pendiente sobre orquestador de epilogue.
  • Los scripts main_*.py siguen existiendo como entry points humanos. El orquestador los ignora y va directo a la clase del pipeline (PIPELINES_V2[name].pipeline_class_path).
PipelineV2Notas
pedidos_SAP
pedidos_HES
facturacion
valorizaciones
gantt
cierres_tecnicossin IMAP (raw: null, imap: null)
pagos_pendientessin IMAP
alertas_waypointconstruido pero no registrado en PIPELINES_V2 del main
costos🟡domains/costos/main_costos.py existe, sin subclase ni YAML
fetch_saldos, fetch_cierres, cruce_facturacionlegacy subprocess
data_warehouse, generate_dim_ot, seed_dim_proceso, pbi_refreshscripts sueltos, fuera del framework
  • core/pipeline.py — el contrato en código
  • core/pipeline_loader.py — parser del YAML spec
  • domains/alertas_waypoint/ — pipeline V2 más reciente como referencia
  • Como-Crear-Pipeline-Nuevo — guía paso a paso
  • Orquestador-Pipelines — registro y waves de ejecución
  • REFACTOR_PIPELINE_PLAN.md en el repo — plan original de migración