Cómo crear un pipeline nuevo
Guía operativa. Para el “por qué” del framework, ver ADR-Pipeline-V2-Framework.
Esta guía describe los pasos exactos para agregar un pipeline al sistema. Usa alertas_waypoint como referencia viva — siempre que dudes, mira ese pipeline.
Antes de empezar
Sección titulada «Antes de empezar»Define en una frase:
- Nombre: snake_case, único. Ejemplo:
alertas_waypoint. - Fuente: IMAP / archivo local / Google Sheets / API. Determina qué etapas implementas.
- Salida: Excel / GSheets / SQLite / Drive. Determina
publish_targets. - Cuenta IMAP (si aplica): debe existir en
config.pycomo usuario válido.
Checklist de archivos a crear
Sección titulada «Checklist de archivos a crear»Para un pipeline llamado <name> (snake_case):
| # | Archivo | Propósito |
|---|---|---|
| 1 | config/pipelines/<name>.yaml | Spec declarativa (IMAP queries, raw storage, publish targets) |
| 2 | domains/<name>/__init__.py | Marca el módulo |
| 3 | domains/<name>/pipeline.py | Subclase de BasePipeline |
| 4 | domains/<name>/process_<name>_from_raw.py | Lógica de transformación raw → salida |
| 5 | main_<name>.py | CLI standalone (entry point humano) |
| 6 | Edición de run_pipeline.py | Registrar el pipeline en el orquestador |
Opcionales según necesidad:
domains/<name>/business_<name>.py— lógica de negocio complejadomains/<name>/excel_writer_<name>.py— escritura de Excel multi-hojadomains/<name>/scope.py— versionado de scope para reprocesos
Paso 1 — YAML spec
Sección titulada «Paso 1 — YAML spec»config/pipelines/<name>.yaml:
name: <name>usuario: <USUARIO_IMAP> # debe existir en config.pyraw: dataset: emails_<name>_raw filename_base: emails_<name>_raw base_dir: ./data_raw storage: parquet truncate_html_kb: nullimap: use_all_mailboxes: false date_mode: internal # o "header" search_queries: - [SUBJECT, "<patrón>"] - [FROM, "<remitente>"] only_from: - <email@dominio>extras: {}Si el pipeline no usa IMAP (lee de un archivo local, por ejemplo), pon imap: null y raw: null. Ver config/pipelines/cierres_tecnicos.yaml como ejemplo.
Paso 2 — Subclase de BasePipeline
Sección titulada «Paso 2 — Subclase de BasePipeline»domains/<name>/pipeline.py. Esqueleto mínimo para un pipeline IMAP:
"""Pipeline <name>: <descripción de 1 línea>."""from __future__ import annotations
from core.pipeline import BasePipeline, Stage, StageResultfrom core.sources.imap import IMAPConnectorfrom core.sources.imap.shared_ingest import ingest_with_connectorfrom domains.<name>.process_<name>_from_raw import build_excel_from_raw_<name>
class <Name>Pipeline(BasePipeline): def __init__(self, config, *, spec, mail=None, logger=None, connector=None): super().__init__(config, logger=logger) self.spec = spec self.mail = mail # Constructor injection opcional para tests con FakeConnector. # En producción `connector=None` → se construye uno real desde el spec. self.imap_source = connector or IMAPConnector.from_spec(spec, mail, logger=self.log)
def ingest(self) -> StageResult: return ingest_with_connector(self.imap_source, self.config, self.log)
def final(self, inter) -> StageResult: out_path = build_excel_from_raw_<name>( storage=self.spec.raw.storage, base_dir=self.spec.raw.base_dir, dataset_name=self.spec.raw.dataset, since=self.config.fecha_desde, before=self.config.fecha_hasta, ) return StageResult( stage=Stage.FINAL, status="ok" if out_path else "skipped", data={"out_path": out_path}, metrics={"out_path": str(out_path) if out_path else None}, )Patrón Hexagonal F2.1 (vigente). Este es el patrón validado en producción con
alertas_waypoint(sesión 2026-05-27, ver Pipeline-Alertas-Waypoint). Los pipelines nuevos DEBEN usarIMAPConnectordirecto +ingest_with_connector, no el shim legacyrun_imap_ingest. El shim sigue existiendo para los 5 pipelines IMAP aún no migrados (ver Plan-Migracion-F2.2-F2.6) pero será eliminado tras F2.6.
Reglas clave:
- Sobrescribe solo las etapas que el pipeline implementa. El resto hereda
skipped. - Cada etapa devuelve un
StageResult. Eldatase pasa a la siguiente. - Delega la transformación a
process_<name>_from_raw.pyobusiness_<name>.py— no la metas dentro del método de la etapa.
Detalles completos del contrato (signature exacta de cada etapa, campos de PipelineConfig y StageResult, comportamiento de run(), short-circuits, schema YAML, métricas auto-persistidas): Reference-BasePipeline.
Paso 3 — Lógica de procesamiento
Sección titulada «Paso 3 — Lógica de procesamiento»domains/<name>/process_<name>_from_raw.py:
"""Lee RAW parquet, parsea, escribe Excel."""from __future__ import annotationsfrom typing import Optionalfrom core.raw_loaders import load_parquet # o load_sqlite / load_jsonl
def build_excel_from_raw_<name>( storage: str = "parquet", base_dir: str = "./data_raw", dataset_name: str = "emails_<name>_raw", since: Optional[str] = None, before: Optional[str] = None, output_dir: Optional[str] = None,) -> Optional[str]: """Carga RAW → parsea → escribe Excel. Retorna out_path o None.""" df = load_parquet(base_dir, dataset_name, since, before) if df.empty: return None
# ... transformación ...
out_path = "SALIDAS/<name>_..." # ... escribir Excel ... return out_pathEsta separación (pipeline.py = orquesta, process_*.py = transforma) es la convención del proyecto. Permite testear la transformación sin levantar IMAP.
Paso 4 — CLI standalone
Sección titulada «Paso 4 — CLI standalone»main_<name>.py:
"""CLI standalone para el pipeline <name>."""from __future__ import annotationsimport argparsefrom datetime import datetime# ... imports estándar de pipeline ...
DEFAULT_FECHA_DESDE = "01-01-2026"
def build_config(args: argparse.Namespace): """Construye PipelineConfig desde args CLI. Llamada por run_pipeline.py.""" # ... parsear fechas, leer spec, devolver PipelineConfig ...
def main(): parser = argparse.ArgumentParser(...) parser.add_argument("--FECHA_DESDE", default=DEFAULT_FECHA_DESDE) parser.add_argument("--FECHA_HASTA", default=None) parser.add_argument("--all", action="store_true") parser.add_argument("--offline", action="store_true") parser.add_argument("--verbose", "-v", action="store_true") # ... args = parser.parse_args() # ... ejecutar pipeline ...
if __name__ == "__main__": main()Copia y adapta main_alertas_waypoint.py como template.
Paso 5 — Registrar en el orquestador
Sección titulada «Paso 5 — Registrar en el orquestador»Editar run_pipeline.py:
5a. Agregar a PIPELINES_V2
Sección titulada «5a. Agregar a PIPELINES_V2»PIPELINES_V2: dict[str, PipelineEntry] = { # ... existentes ... "<name>": PipelineEntry( name="<name>", spec_name="<name>", usuario="<name>", # o nombre real del usuario IMAP pipeline_class_path="domains.<name>.pipeline:<Name>Pipeline", build_config_path="main_<name>:build_config", needs_mail=True, # False si no usa IMAP accepts_dates=True, accepts_offline=True, ),}5b. Agregar a ALL_PIPELINE_ORDER
Sección titulada «5b. Agregar a ALL_PIPELINE_ORDER»Decide la posición según dependencias:
ALL_PIPELINE_ORDER: list[str] = [ "pedidos_HES", "facturacion", # ... <name> aquí, en el orden adecuado ...]5c. Agregar a _PIPELINE_WAVES
Sección titulada «5c. Agregar a _PIPELINE_WAVES»_PIPELINE_WAVES: list[list[str]] = [ ["fetch_saldos", "fetch_cierres"], ["pedidos_HES", "facturacion", ..., "<name>"], # wave 2: paralelo]Pon el pipeline en la wave donde todas sus dependencias ya completaron. Si no depende de nada, va a wave 2 (la wave principal).
Paso 6 — Documentación
Sección titulada «Paso 6 — Documentación»6a. Crear ficha en PIPELINES/
Sección titulada «6a. Crear ficha en PIPELINES/»Pipeline-<Name>.md en el vault. Usa Pipeline-Alertas-Waypoint como template. Debe incluir:
- Frontmatter con
portal:(ejecutiva + técnica para el portal Angular) - Origen de los datos
- Estructura del input (correo, archivo, etc.)
- Procesamiento aplicado
- Salida (rutas, hojas, columnas)
- Comandos de ejecución
- Cuenta IMAP (si aplica)
6b. Actualizar Orquestador-Pipelines.md
Sección titulada «6b. Actualizar Orquestador-Pipelines.md»Agregar fila a la tabla de pipelines registrados.
6c. Crear sesión
Sección titulada «6c. Crear sesión»03-Sesiones/sesion-YYYY-MM-DD-<name>.md con qué se hizo, qué se descubrió, próximos pasos.
Paso 7 — Validar
Sección titulada «Paso 7 — Validar»# Ejecución aislada — verifica que el pipeline correpython main_<name>.py --FECHA_DESDE 01-05-2026 --FECHA_HASTA 18-05-2026 --verbose
# Reproceso offline — verifica que las etapas posteriores a ingest funcionanpython main_<name>.py --offline
# Vía orquestador — verifica el registropython run_pipeline.py <name>
# En el flujo completopython run_pipeline.py --allVerificar que:
- Se generó la salida esperada (Excel/GSheets/etc.)
- Se registró la corrida en
ingeldata.db.pipeline_runs - Las etapas dejaron tiempos y métricas en
pipeline_stage_runs
Checklist final
Sección titulada «Checklist final»Antes de commitear:
- YAML spec creado y válido
- Subclase
BasePipelineimplementa al menosingest(si IMAP) yfinal -
process_<name>_from_raw.pysepara la transformación de la orquestación -
main_<name>.pycon CLI estándar (FECHA_DESDE, FECHA_HASTA, —all, —offline, —verbose) - Registrado en
PIPELINES_V2,ALL_PIPELINE_ORDER,_PIPELINE_WAVES - Ficha de documentación en
PIPELINES/Pipeline-<Name>.md - Tabla del orquestador actualizada
- Primer run exitoso con datos reales
- Métricas y registro en
pipeline_runsverificados
Cosas que NO debes hacer
Sección titulada «Cosas que NO debes hacer»- No mezcles fuentes: si un pipeline lee de IMAP Y de Google Sheets, separa en dos pipelines o usa
extras:para parametrizar — no metas dos flujos en una clase. - No omitas
process_<name>_from_raw.py: la separación pipeline/proceso es la convención. No metas el parsing dentro del métodofinal(). - No hardcodees credenciales ni paths absolutos en el código del pipeline. Todo va en
config.pyo el YAML. - No persistas a
data_warehouse.dbdirectamente. El pipeline escribe a su salida (Excel /ingeldata.dboperacional / GSheets). La consolidación analítica es un paso aparte (ver ADR-Pipeline-V2-Framework §Consecuencias). - No saltes el registro: un pipeline que no está en
PIPELINES_V2es invisible para el orquestador y para--all.alertas_waypointsufrió este olvido al inicio (construido pero sin registrar) — fue corregido en sesión 2026-05-18.
Referencias
Sección titulada «Referencias»- ADR-Pipeline-V2-Framework — diseño y trade-offs
- Orquestador-Pipelines — registro completo y waves
- Pipeline-Alertas-Waypoint — pipeline V2 más reciente (referencia viva)
core/pipeline.py— el contrato en códigocore/pipeline_loader.py— parser YAMLcore/sources/imap/—IMAPConnector+ helperingest_with_connector(patrón Hexagonal F2/F2.1)core/imap_ingest_shared.py(legacy) — shimrun_imap_ingestpara pipelines aún no migrados; ver Plan-Migracion-F2.2-F2.6- ADR-Source-Connectors — el por qué del patrón Hexagonal (status
accepted)