ADR: Framework de pipelines V2 (BasePipeline + DAG 6-stage)
Aceptada — implementada progresivamente desde 2026-04. Esta ADR formaliza retroactivamente la decisión y deja el contrato escrito.
Contexto
Sección titulada «Contexto»Hasta principios de 2026 cada pipeline era un script suelto (main_*.py) con su propia mecánica: argparse, conexión IMAP, parsing, escritura a Excel/GSheets, todo mezclado en 300-800 líneas. Esto producía:
- Duplicación masiva: cada
main_*.pyreimplementaba la conexión IMAP, el manejo de fechas, el flujo offline, el dry-run, el logging. - Observabilidad nula: imposible saber cuánto tardó cada etapa, qué etapa falló, qué métricas dejó cada paso.
- Imposible orquestar en paralelo: cada script era una caja negra subprocess.
- Sin contrato claro: agregar un pipeline nuevo requería copiar un main existente y rezar.
- Reprocesar era manual: no había forma estándar de saltar la ingesta y reprocesar archivos raw locales.
La pregunta: ¿debería existir un framework formal con etapas estandarizadas y contrato uniforme, o seguir con scripts ad-hoc?
Opciones evaluadas
Sección titulada «Opciones evaluadas»Opción A: Mantener scripts sueltos (statu quo)
Sección titulada «Opción A: Mantener scripts sueltos (statu quo)»Cada pipeline sigue siendo un main_*.py independiente.
Descartada porque:
- La duplicación crece con cada pipeline nuevo.
- No hay forma de paralelizar sin levantar N subprocess.
- La observabilidad por etapa requiere instrumentar cada script a mano.
- Cualquier cambio transversal (ej. agregar columna de versionado) toca N archivos.
Opción B: BasePipeline con DAG 6-stage (ELEGIDA)
Sección titulada «Opción B: BasePipeline con DAG 6-stage (ELEGIDA)»Una clase abstracta BasePipeline con seis etapas opcionales:
ingest → store → process → intermediate → final → publishCada subclase sobrescribe sólo las etapas que necesita. Las que no, devuelven StageResult(status="skipped").
Elegida porque:
- Contrato único: todos los pipelines tienen el mismo shape de entrada/salida.
- Observabilidad gratis:
run()mide tiempo por etapa, captura excepciones, registra métricas. - Short-circuits estándar:
offlinesalteaingest,dry_runsalteapublish,reprocesslee de raw local,target=devapunta adev.sqlite. - Paralelización trivial: el orquestador puede correr pipelines V2 en
ThreadPoolExecutorporque la API es uniforme. - Configuración inmutable:
PipelineConfiges un dataclass frozen — un pipeline es deterministicamente reproducible. - YAML externo: los filtros IMAP, datasets y publish targets viven en
config/pipelines/<name>.yaml, no en código.
Opción C: Framework externo (Prefect, Airflow, Dagster)
Sección titulada «Opción C: Framework externo (Prefect, Airflow, Dagster)»Adoptar un orquestador estándar de la industria.
Descartada porque:
- Overhead operativo desproporcionado al tamaño actual (≤10 pipelines).
- Requiere infraestructura adicional (broker, scheduler, UI).
- El usuario (Manuel) trabaja desde laptop personal sin servidor permanente.
- El valor real está en el contrato uniforme, no en el scheduler.
- Si en el futuro se justifica, BasePipeline es trivialmente envolvible en una Prefect task o un Dagster op.
Decisión
Sección titulada «Decisión»Opción B — Framework propio centrado en BasePipeline, con DAG de 6 etapas, configuración via PipelineConfig (dataclass frozen) + YAML spec externo, ejecución orquestada por run_pipeline.py con waves paralelas.
El contrato (resumen)
Sección titulada «El contrato (resumen)»El framework define seis etapas opcionales en DAG lineal:
ingest → store → process → intermediate → final → publishCada etapa recibe el StageResult de la previa y devuelve uno nuevo. Una etapa no implementada devuelve skipped y run() propaga el resultado al siguiente. Configuración inmutable via PipelineConfig (dataclass frozen). Short-circuits estándar: offline saltea ingest, dry_run saltea publish, reprocess lee raw local, target=dev apunta a dev.sqlite. Spec declarativa en config/pipelines/<name>.yaml.
Detalle técnico completo del contrato — etapas, PipelineConfig, StageResult, PipelineRun, run(), YAML spec, métricas auto-persistidas: Reference-BasePipeline.
Fundamento
Sección titulada «Fundamento»El patrón es un Template Method clásico (Gamma et al., 1994): la superclase fija el esqueleto algorítmico (run()), las subclases parametrizan los pasos. Las etapas son opcionales por defecto, no abstractas, evitando que un pipeline simple (ej. solo ingest + final) tenga que stub-ear cuatro métodos.
La separación entre ingest/store/process permite que --offline y --reprocess salten exactamente lo correcto sin que la subclase implemente la lógica del short-circuit.
El StageResult.data pasando entre etapas es un chain of responsibility ligero: cada etapa decide qué pasar a la siguiente (DataFrame, path, dict de métricas, lo que sea).
Consecuencias
Sección titulada «Consecuencias»Positivas
Sección titulada «Positivas»- Agregar un pipeline nuevo es mecánico: YAML + subclase + main + registro. Ver Como-Crear-Pipeline-Nuevo.
- Observabilidad uniforme:
pipeline_runsypipeline_stage_runseningeldata.dbregistran cada corrida. - Paralelización trivial via waves en
run_pipeline.py. - Reproceso desde raw local es flag, no script aparte.
- Tests por etapa son posibles porque cada método es aislable.
Negativas
Sección titulada «Negativas»- Hay deuda: no todos los pipelines viejos están portados.
cruce_facturacion,fetch_saldos,fetch_cierressiguen como subprocess legacy. - El framework asume IMAP como fuente por defecto. Pipelines con otra fuente (
cierres_tecnicoslee local) usanimap: nullen YAML — funcional pero menos elegante. data_warehouse.py(consolidación star schema) NO está adaptado al framework: sigue siendo script suelto. Decisión separada — ver ADR pendiente sobre orquestador de epilogue.
Neutras
Sección titulada «Neutras»- Los scripts
main_*.pysiguen existiendo como entry points humanos. El orquestador los ignora y va directo a la clase del pipeline (PIPELINES_V2[name].pipeline_class_path).
Estado de adopción (2026-05)
Sección titulada «Estado de adopción (2026-05)»| Pipeline | V2 | Notas |
|---|---|---|
pedidos_SAP | ✅ | |
pedidos_HES | ✅ | |
facturacion | ✅ | |
valorizaciones | ✅ | |
gantt | ✅ | |
cierres_tecnicos | ✅ | sin IMAP (raw: null, imap: null) |
pagos_pendientes | ✅ | sin IMAP |
alertas_waypoint | ✅ | construido pero no registrado en PIPELINES_V2 del main |
costos | 🟡 | domains/costos/main_costos.py existe, sin subclase ni YAML |
fetch_saldos, fetch_cierres, cruce_facturacion | ❌ | legacy subprocess |
data_warehouse, generate_dim_ot, seed_dim_proceso, pbi_refresh | ❌ | scripts sueltos, fuera del framework |
Ver también
Sección titulada «Ver también»core/pipeline.py— el contrato en códigocore/pipeline_loader.py— parser del YAML specdomains/alertas_waypoint/— pipeline V2 más reciente como referencia- Como-Crear-Pipeline-Nuevo — guía paso a paso
- Orquestador-Pipelines — registro y waves de ejecución
REFACTOR_PIPELINE_PLAN.mden el repo — plan original de migración