Ir al contenido

Cómo crear un pipeline nuevo

Guía operativa. Para el “por qué” del framework, ver ADR-Pipeline-V2-Framework.

Esta guía describe los pasos exactos para agregar un pipeline al sistema. Usa alertas_waypoint como referencia viva — siempre que dudes, mira ese pipeline.

Define en una frase:

  • Nombre: snake_case, único. Ejemplo: alertas_waypoint.
  • Fuente: IMAP / archivo local / Google Sheets / API. Determina qué etapas implementas.
  • Salida: Excel / GSheets / SQLite / Drive. Determina publish_targets.
  • Cuenta IMAP (si aplica): debe existir en config.py como usuario válido.

Para un pipeline llamado <name> (snake_case):

#ArchivoPropósito
1config/pipelines/<name>.yamlSpec declarativa (IMAP queries, raw storage, publish targets)
2domains/<name>/__init__.pyMarca el módulo
3domains/<name>/pipeline.pySubclase de BasePipeline
4domains/<name>/process_<name>_from_raw.pyLógica de transformación raw → salida
5main_<name>.pyCLI standalone (entry point humano)
6Edición de run_pipeline.pyRegistrar el pipeline en el orquestador

Opcionales según necesidad:

  • domains/<name>/business_<name>.py — lógica de negocio compleja
  • domains/<name>/excel_writer_<name>.py — escritura de Excel multi-hoja
  • domains/<name>/scope.py — versionado de scope para reprocesos

config/pipelines/<name>.yaml:

name: <name>
usuario: <USUARIO_IMAP> # debe existir en config.py
raw:
dataset: emails_<name>_raw
filename_base: emails_<name>_raw
base_dir: ./data_raw
storage: parquet
truncate_html_kb: null
imap:
use_all_mailboxes: false
date_mode: internal # o "header"
search_queries:
- [SUBJECT, "<patrón>"]
- [FROM, "<remitente>"]
only_from:
- <email@dominio>
extras: {}

Si el pipeline no usa IMAP (lee de un archivo local, por ejemplo), pon imap: null y raw: null. Ver config/pipelines/cierres_tecnicos.yaml como ejemplo.

domains/<name>/pipeline.py. Esqueleto mínimo para un pipeline IMAP:

"""Pipeline <name>: <descripción de 1 línea>."""
from __future__ import annotations
from core.pipeline import BasePipeline, Stage, StageResult
from core.sources.imap import IMAPConnector
from core.sources.imap.shared_ingest import ingest_with_connector
from domains.<name>.process_<name>_from_raw import build_excel_from_raw_<name>
class <Name>Pipeline(BasePipeline):
def __init__(self, config, *, spec, mail=None, logger=None, connector=None):
super().__init__(config, logger=logger)
self.spec = spec
self.mail = mail
# Constructor injection opcional para tests con FakeConnector.
# En producción `connector=None` → se construye uno real desde el spec.
self.imap_source = connector or IMAPConnector.from_spec(spec, mail, logger=self.log)
def ingest(self) -> StageResult:
return ingest_with_connector(self.imap_source, self.config, self.log)
def final(self, inter) -> StageResult:
out_path = build_excel_from_raw_<name>(
storage=self.spec.raw.storage,
base_dir=self.spec.raw.base_dir,
dataset_name=self.spec.raw.dataset,
since=self.config.fecha_desde,
before=self.config.fecha_hasta,
)
return StageResult(
stage=Stage.FINAL,
status="ok" if out_path else "skipped",
data={"out_path": out_path},
metrics={"out_path": str(out_path) if out_path else None},
)

Patrón Hexagonal F2.1 (vigente). Este es el patrón validado en producción con alertas_waypoint (sesión 2026-05-27, ver Pipeline-Alertas-Waypoint). Los pipelines nuevos DEBEN usar IMAPConnector directo + ingest_with_connector, no el shim legacy run_imap_ingest. El shim sigue existiendo para los 5 pipelines IMAP aún no migrados (ver Plan-Migracion-F2.2-F2.6) pero será eliminado tras F2.6.

Reglas clave:

  • Sobrescribe solo las etapas que el pipeline implementa. El resto hereda skipped.
  • Cada etapa devuelve un StageResult. El data se pasa a la siguiente.
  • Delega la transformación a process_<name>_from_raw.py o business_<name>.py — no la metas dentro del método de la etapa.

Detalles completos del contrato (signature exacta de cada etapa, campos de PipelineConfig y StageResult, comportamiento de run(), short-circuits, schema YAML, métricas auto-persistidas): Reference-BasePipeline.

domains/<name>/process_<name>_from_raw.py:

"""Lee RAW parquet, parsea, escribe Excel."""
from __future__ import annotations
from typing import Optional
from core.raw_loaders import load_parquet # o load_sqlite / load_jsonl
def build_excel_from_raw_<name>(
storage: str = "parquet",
base_dir: str = "./data_raw",
dataset_name: str = "emails_<name>_raw",
since: Optional[str] = None,
before: Optional[str] = None,
output_dir: Optional[str] = None,
) -> Optional[str]:
"""Carga RAW → parsea → escribe Excel. Retorna out_path o None."""
df = load_parquet(base_dir, dataset_name, since, before)
if df.empty:
return None
# ... transformación ...
out_path = "SALIDAS/<name>_..."
# ... escribir Excel ...
return out_path

Esta separación (pipeline.py = orquesta, process_*.py = transforma) es la convención del proyecto. Permite testear la transformación sin levantar IMAP.

main_<name>.py:

"""CLI standalone para el pipeline <name>."""
from __future__ import annotations
import argparse
from datetime import datetime
# ... imports estándar de pipeline ...
DEFAULT_FECHA_DESDE = "01-01-2026"
def build_config(args: argparse.Namespace):
"""Construye PipelineConfig desde args CLI. Llamada por run_pipeline.py."""
# ... parsear fechas, leer spec, devolver PipelineConfig ...
def main():
parser = argparse.ArgumentParser(...)
parser.add_argument("--FECHA_DESDE", default=DEFAULT_FECHA_DESDE)
parser.add_argument("--FECHA_HASTA", default=None)
parser.add_argument("--all", action="store_true")
parser.add_argument("--offline", action="store_true")
parser.add_argument("--verbose", "-v", action="store_true")
# ...
args = parser.parse_args()
# ... ejecutar pipeline ...
if __name__ == "__main__":
main()

Copia y adapta main_alertas_waypoint.py como template.

Editar run_pipeline.py:

PIPELINES_V2: dict[str, PipelineEntry] = {
# ... existentes ...
"<name>": PipelineEntry(
name="<name>",
spec_name="<name>",
usuario="<name>", # o nombre real del usuario IMAP
pipeline_class_path="domains.<name>.pipeline:<Name>Pipeline",
build_config_path="main_<name>:build_config",
needs_mail=True, # False si no usa IMAP
accepts_dates=True,
accepts_offline=True,
),
}

Decide la posición según dependencias:

ALL_PIPELINE_ORDER: list[str] = [
"pedidos_HES",
"facturacion",
# ... <name> aquí, en el orden adecuado ...
]
_PIPELINE_WAVES: list[list[str]] = [
["fetch_saldos", "fetch_cierres"],
["pedidos_HES", "facturacion", ..., "<name>"], # wave 2: paralelo
]

Pon el pipeline en la wave donde todas sus dependencias ya completaron. Si no depende de nada, va a wave 2 (la wave principal).

Pipeline-<Name>.md en el vault. Usa Pipeline-Alertas-Waypoint como template. Debe incluir:

  • Frontmatter con portal: (ejecutiva + técnica para el portal Angular)
  • Origen de los datos
  • Estructura del input (correo, archivo, etc.)
  • Procesamiento aplicado
  • Salida (rutas, hojas, columnas)
  • Comandos de ejecución
  • Cuenta IMAP (si aplica)

Agregar fila a la tabla de pipelines registrados.

03-Sesiones/sesion-YYYY-MM-DD-<name>.md con qué se hizo, qué se descubrió, próximos pasos.

Ventana de terminal
# Ejecución aislada — verifica que el pipeline corre
python main_<name>.py --FECHA_DESDE 01-05-2026 --FECHA_HASTA 18-05-2026 --verbose
# Reproceso offline — verifica que las etapas posteriores a ingest funcionan
python main_<name>.py --offline
# Vía orquestador — verifica el registro
python run_pipeline.py <name>
# En el flujo completo
python run_pipeline.py --all

Verificar que:

  • Se generó la salida esperada (Excel/GSheets/etc.)
  • Se registró la corrida en ingeldata.db.pipeline_runs
  • Las etapas dejaron tiempos y métricas en pipeline_stage_runs

Antes de commitear:

  • YAML spec creado y válido
  • Subclase BasePipeline implementa al menos ingest (si IMAP) y final
  • process_<name>_from_raw.py separa la transformación de la orquestación
  • main_<name>.py con CLI estándar (FECHA_DESDE, FECHA_HASTA, —all, —offline, —verbose)
  • Registrado en PIPELINES_V2, ALL_PIPELINE_ORDER, _PIPELINE_WAVES
  • Ficha de documentación en PIPELINES/Pipeline-<Name>.md
  • Tabla del orquestador actualizada
  • Primer run exitoso con datos reales
  • Métricas y registro en pipeline_runs verificados
  • No mezcles fuentes: si un pipeline lee de IMAP Y de Google Sheets, separa en dos pipelines o usa extras: para parametrizar — no metas dos flujos en una clase.
  • No omitas process_<name>_from_raw.py: la separación pipeline/proceso es la convención. No metas el parsing dentro del método final().
  • No hardcodees credenciales ni paths absolutos en el código del pipeline. Todo va en config.py o el YAML.
  • No persistas a data_warehouse.db directamente. El pipeline escribe a su salida (Excel / ingeldata.db operacional / GSheets). La consolidación analítica es un paso aparte (ver ADR-Pipeline-V2-Framework §Consecuencias).
  • No saltes el registro: un pipeline que no está en PIPELINES_V2 es invisible para el orquestador y para --all. alertas_waypoint sufrió este olvido al inicio (construido pero sin registrar) — fue corregido en sesión 2026-05-18.
  • ADR-Pipeline-V2-Framework — diseño y trade-offs
  • Orquestador-Pipelines — registro completo y waves
  • Pipeline-Alertas-Waypoint — pipeline V2 más reciente (referencia viva)
  • core/pipeline.py — el contrato en código
  • core/pipeline_loader.py — parser YAML
  • core/sources/imap/IMAPConnector + helper ingest_with_connector (patrón Hexagonal F2/F2.1)
  • core/imap_ingest_shared.py (legacy) — shim run_imap_ingest para pipelines aún no migrados; ver Plan-Migracion-F2.2-F2.6
  • ADR-Source-Connectors — el por qué del patrón Hexagonal (status accepted)