Ir al contenido

Orquestador Pipelines - IngelCoding

Origen: GitHub Copilot (claude-sonnet-4.6) — actualizado 2026-05-18 Última actualización: 2026-05-18 (refleja registry V2 + waves paralelas + alertas_waypoint)

Ver también: ADR-Pipeline-V2-Framework (decisión del framework), Reference-BasePipeline (contrato técnico), Como-Crear-Pipeline-Nuevo (guía operativa), ADR-Zonal-Gestora-Derivada, ADR-Source-Connectors (decisión pendiente meta-orquestador).


El orquestador run_pipeline.py gestiona la ejecución de todos los pipelines de datos de IngelCoding desde un punto de entrada centralizado.


El orquestador mantiene tres registries internos según el modo de ejecución:

PIPELINES_V2 — pipelines in-process (BasePipeline)

Sección titulada «PIPELINES_V2 — pipelines in-process (BasePipeline)»
PipelineDomainneeds_mailaccepts_datesNotas
pedidos_SAPdomains.pedidos_sap
pedidos_HESdomains.pedidos_hespublish_step: cruce_facturacion
facturaciondomains.facturacionpublish_step: cruce_facturacion
valorizacionesdomains.valorizaciones
ganttdomains.gantt
cierres_tecnicosdomains.cierres_tecnicossin IMAP (lee local)
pagos_pendientesdomains.pagos_pendientessin IMAP
alertas_waypointdomains.alertas_waypointnuevo 2026-05
PipelineScriptNotas
cruce_facturacionscripts/reports/CRUCE_FACTURACION_VS_FACTURAS.pyUsado como publish_step de facturacion y pedidos_HES
fetch_saldosscripts/updates/fetch_missing_reports.py --mode saldosPre-step de pagos_full
fetch_cierresscripts/updates/fetch_missing_reports.py --mode control

PIPELINES_COMPOUND — pre-step legacy + main V2

Sección titulada «PIPELINES_COMPOUND — pre-step legacy + main V2»
PipelineComposición
pagos_fullfetch_saldos (legacy) + pagos_pendientes (V2)
ItemTipoEstado
costosPipeline parcialdomains/costos/main_costos.py existe sin clase V2 ni YAML
data_warehouseConsolidador analíticoscripts/db/data_warehouse.py standalone — debería ser epilogue
generate_dim_otGenerador dimensiónscripts/generate_dim_ot.py standalone
seed_dim_procesoSeeder dimensiónscripts/seed_dim_proceso.py standalone
pbi_refreshRefresh Power BI Servicescripts/pbi_refresh.py standalone

Ver: discusión 2026-05-18 sobre meta-orquestador (ADR pendiente).


FlagAliasDescripción
--all-aEjecutar todos los pipelines en secuencia
--verbose-vLogging detallado
--skip-revisionPara cierres_tecnicos: omitir paso 1 (generar revisión)
--dry-runPara cierres_tecnicos: solo generar, no subir a GSheets
  • --fecha-desde: Fecha inicio de extracción
  • --fecha-hasta: Fecha fin de extracción

Scripts marked con no_date_args=True no requieren fechas (ej. actualizaciones incrementales).


Pipeline de dos pasos:

  1. generate_revision: Genera revisión desde CONTROL OTs.py → parquet temporal
  2. write_cierres_to_sheets: Escribe a Google Sheets via ACTUALIZAR_CIERRES_TECNICOS.py
domains/cierres_tecnicos/
├── __init__.py
├── generate_revision.py # Wrapper de CONTROL OTs.py
└── write_cierres_to_sheets.py # Wrapper de ACTUALIZAR_CIERRES_TECNICOS.py

Flags de main:

  • --zonal: Procesar por zona específica
  • --skip-revision: Omitir paso 1 (generar revisión)
  • --dry-run: Simulación

import importlib
# Cargar script legacy como módulo
legacy_module = importlib.import_module("scripts.CONTROL_OTs")
# Llamar función del script
result = legacy_module.main(...)

Scripts de Capa 2 usan parser.add_argument('--param', help=argparse.SUPPRESS) para no aparecer en help del orquestador.


  • Status: ✅ Corregido 2026-04-13
  • Issue: Usaba datetime.now() en B1 (fecha genérica)
  • Fix: Ahora inyecta la fecha del mail (del nombre del archivo) en B1
  • Aplica a: pagos_pendientes (solo lectura) y pagos_full (fetch + escribir)

Orden de Ejecución (—all) — modelo de waves paralelas

Sección titulada «Orden de Ejecución (—all) — modelo de waves paralelas»

Desde 2026-04 el orquestador ejecuta los pipelines en waves: cada wave espera a que la anterior complete; dentro de la wave los pipelines corren en paralelo (ThreadPoolExecutor).

_PIPELINE_WAVES: list[list[str]] = [
# Wave 1: fetchers (pre-requisitos)
["fetch_saldos", "fetch_cierres"],
# Wave 2: pipelines principales (independientes entre sí)
["pedidos_HES", "facturacion", "valorizaciones", "pedidos_SAP", "gantt",
"alertas_waypoint", "cierres_tecnicos", "pagos_pendientes", "pagos_full"],
]

Post-wave: los publish_steps declarados por pipelines V2 (ej. cruce_facturacion de facturacion + pedidos_HES) se deduplican y ejecutan una sola vez al final cuando --publish está activo.

Total: 11 pipelines registrados (3 legacy + 8 V2 — pagos_full es compound).

Lo que falta en el orquestador (gap conocido)

Sección titulada «Lo que falta en el orquestador (gap conocido)»

El orquestador termina después del post-wave de publish_steps. No incluye la consolidación analítica que alimenta Power BI:

Wave 1 → Wave 2 → publish_steps diferidos → [TERMINA]
FALTA: epilogue de consolidación
- seed_dim_proceso
- generate_dim_ot
- data_warehouse --populate
- pbi_refresh

Estos pasos hoy se corren manualmente con python scripts/db/data_warehouse.py --populate. Decisión arquitectónica pendiente — ver discusión 2026-05-18 sobre meta-orquestador.


Ventana de terminal
# Ejecutar todos los pipelines
python run_pipeline.py --all
# Ejecutar todos con logging detallado
python run_pipeline.py --all --verbose
# Ejecutar un pipeline específico
python run_pipeline.py facturacion --fecha-desde 2026-01-01 --fecha-hasta 2026-04-30
# Ejecutar cierres_tecnicos sin generar revisión (solo escribir)
python run_pipeline.py cierres_tecnicos --skip-revision
# Dry-run para cierres_tecnicos
python run_pipeline.py cierres_tecnicos --dry-run
# Ejecutar pagos_full (fetch + escribir)
python run_pipeline.py pagos_full

FechaCambio
2026-05-18Agregado alertas_waypoint (V2); modelo waves documentado; sección deuda/epilogue agregada
2026-04-13c10 pipelines (actualizar_facturas_drive y actualizar_facturacion_hes ahora son post_steps)
2026-04-13b12 pipelines (+ fetch_saldos, fetch_cierres, pagos_full, flags —all/—skip-revision, fix fecha pagos)
2026-04-139 pipelines (agregados cierres_tecnicos + 2× Capa 2)
2026-04-126 pipelines iniciales


PipelineNota de Referencia
pedidos_HESPipeline-Pedidos-HES (incluye post-step actualizar_facturacion_hes)
facturacionPipeline-Facturacion (incluye post-step actualizar_facturas_drive)
valorizacionesPipeline-Valorizaciones (documentación pendiente)
pedidos_SAPPipeline-Pedidos-SAP (documentación pendiente)
ganttPipeline-Gantt
fetch_saldosPipeline-Fetch-Saldos (legacy, sin doc dedicada)
fetch_cierresPipeline-Cierres-Técnicos (legacy, sin doc dedicada)
pagos_pendientesPipeline-Pagos-Pendientes (sin doc dedicada)
pagos_fullPipeline-Pagos-Full (compound, sin doc dedicada)
cierres_tecnicosPipeline-Cierres-Técnicos (sin doc dedicada)
alertas_waypointPipeline-Alertas-Waypoint

Documentación actualizada 2026-05-18