Ir al contenido

ADR: Source Connectors — Hexagonal architecture para fuentes de datos

Aceptada — F1, F2 y F2.1 implementados y archivados (2026-05-27). 2 adapters reales en producción: IMAPConnector + helper compartido ingest_with_connector. alertas_waypoint es el primer pipeline usando el patrón Hexagonal directo (constructor injection + helper, sin shim run_imap_ingest). Engram archives: F1 #1238, F2 #1248, F2.1 #1256. Roadmap restante en Plan-Migracion-F2.2-F2.6.

Los pipelines de IngelCoding consumen datos de fuentes heterogéneas:

FuentePipelines que la usanDónde vive el código hoy
IMAPfacturacion, pedidos_HES, pedidos_SAP, valorizaciones, gantt, alertas_waypointParcialmente unificado en core/imap_ingest_shared.py
Excel localfacturacion (productividad), scripts/db/data_warehouse.pydomains/facturacion/productividad_excel_parser.py + scripts sueltos
PDF adjuntofacturacion (facturas + notas de crédito)domains/facturacion/parse_pdf_facturas.py, parse_pdf_NCred.py
Google Sheetsscripts/db/data_warehouse.py, varios publisherscore/build_cloud_dataframes.py, core/publish/
CSV localgantt (gantt_normalizado_powerbi.csv)Hardcodeado en scripts/db/data_warehouse.py
SQLite (otra DB)dim_proceso reconcilercore/dim_proceso/adapters/ ← único módulo con patrón adapter formalizado

Problemas concretos derivados de esta dispersión:

  1. Reutilización imposible: el parser de PDF de facturas está atrapado en domains/facturacion/. Si mañana un pipeline nuevo (ej. alertas con PDF adjunto) necesita extraer PDFs, hay que copiar código o importar desde otro dominio.

  2. Acoplamiento del dominio a la infra: domains/facturacion/pipeline.py sabe de PDFs, de Excel local y de IMAP. La lógica de negocio de facturación queda enredada con detalles de cómo se trae el dato.

  3. Tests dolorosos: para probar la lógica de negocio de facturación hay que mockear IMAP, Excel y PDF al mismo tiempo. No hay FakeSource.

  4. Cada pipeline reinventa: gantt tiene su CSV reader, valorizaciones tiene su PDF reader, productividad tiene su Excel reader. Casi nadie comparte código.

  5. Programación agéntica sufre: un agente que recibe “agregar pipeline desde PDF” no tiene dónde mirar — el código de PDFs está disperso en domains/facturacion/ mezclado con lógica de negocio.

Hay un patrón emergente y parcial en el proyecto: core/imap_ingest_shared.py (compartido entre pipelines IMAP), core/raw_loaders.py (parquet/jsonl/sqlite loaders), core/dim_proceso/adapters/ (Strategy pattern correcto). La pregunta es: ¿formalizamos este patrón como arquitectura del proyecto, o seguimos con módulos sueltos?

Opción A: Statu quo — módulos compartidos cuando duela

Sección titulada «Opción A: Statu quo — módulos compartidos cuando duela»

Seguir extrayendo helpers a core/ cuando dos pipelines necesiten lo mismo.

Descartada porque:

  • Reactivo, no proactivo: el código compartido aparece tarde y mal ubicado.
  • No hay contrato — cada helper tiene su API ad-hoc.
  • Los nuevos pipelines no encuentran el patrón al buscar y reinventan.
  • Imposible para agentes generar pipelines nuevos consistentes.

Opción B: Hexagonal con SourceConnector (ELEGIDA)

Sección titulada «Opción B: Hexagonal con SourceConnector (ELEGIDA)»

Formalizar un puerto (SourceConnector Protocol) con adaptadores concretos por tipo de fuente. Estructura core/sources/{imap, excel, gsheets, pdf, csv, sqlite}/ con contrato uniforme.

Elegida porque:

  • Define un único lugar donde vive el código de cada fuente.
  • El dominio del pipeline queda limpio: solo lógica de negocio.
  • Tests triviales: FakeConnector reemplaza cualquier fuente real.
  • Reutilización por defecto: cualquier pipeline puede declarar “uso PDFConnector”.
  • Agentes pueden razonar sobre el patrón (“nuevo pipeline desde Excel” → adapter Excel ya existe).
  • Compatible con el framework BasePipeline actual: el SourceConnector es lo que ingest() invoca.

Opción C: Framework externo (Singer taps, Airbyte connectors)

Sección titulada «Opción C: Framework externo (Singer taps, Airbyte connectors)»

Adoptar un estándar de conectores ya existente.

Descartada porque:

  • Sobredimensionado para ≤10 pipelines.
  • Cada conector externo asume un runtime (containerizado, JSON Schema, etc.) que no aplica.
  • Los datos privados (IMAP de cuentas internas, PDFs propietarios) no se prestan a un protocolo público.
  • El patrón Hexagonal local es 90% del valor con 5% del esfuerzo.
  • Si en el futuro se justifica migrar a Airbyte, los adaptadores Hexagonal son envolvibles trivialmente.

Opción B — Patrón Hexagonal (Ports & Adapters) con SourceConnector como puerto y un adaptador concreto por tipo de fuente. Estructura core/sources/.

core/sources/base.py
from typing import Protocol, TypeVar, Generic
class FetchParams(Protocol):
"""Parámetros de extracción. Cada source define su shape concreto."""
...
class RawData(Protocol):
"""Resultado crudo de la extracción. Cada source define el shape."""
...
class SourceConnector(Protocol):
"""Puerto: contrato que toda fuente debe cumplir."""
def fetch(self, params: FetchParams) -> RawData:
"""Trae el dato desde la fuente externa."""
...
def store(self, raw: RawData, target: Path, format: str) -> StoredRef:
"""Persiste el raw localmente (parquet/jsonl/sqlite)."""
...
def hash(self, params: FetchParams) -> str:
"""Hash determinístico de los params — alimenta raw_files.query_hash."""
...
def describe(self) -> SourceMetadata:
"""Metadata para lineage: tipo, versión, parámetros normalizados."""
...
core/sources/
├── base.py # Protocol SourceConnector + tipos comunes
├── imap/
│ ├── __init__.py
│ ├── connector.py # IMAPConnector implements SourceConnector
│ ├── query_builder.py # construye queries IMAP
│ └── shared_ingest.py # absorbe el actual imap_ingest_shared.py
├── excel/
│ ├── __init__.py
│ ├── connector.py # ExcelConnector (file local o OneDrive)
│ └── parsers/ # parsers por template (factura, productividad, etc.)
├── gsheets/
│ ├── __init__.py
│ ├── connector.py # GSheetsConnector (gspread + service account)
│ └── readers.py
├── pdf/
│ ├── __init__.py
│ ├── connector.py # PDFConnector
│ └── extractors/ # absorbe parse_pdf_facturas, parse_pdf_NCred
├── csv/
│ ├── __init__.py
│ └── connector.py
└── sqlite/
├── __init__.py
└── connector.py # lee de otra DB (caso dim_proceso reconciler)

Un pipeline declara su(s) source(s) en el constructor:

class FacturacionPipeline(BasePipeline):
def __init__(self, config, *, spec, logger=None):
super().__init__(config, logger=logger)
self.spec = spec
self.imap_source = IMAPConnector.from_spec(spec.imap)
self.pdf_source = PDFConnector(extractor="factura_v2")
# ...
def ingest(self) -> StageResult:
raw = self.imap_source.fetch(self._fetch_params())
stored_ref = self.imap_source.store(raw, target=..., format="parquet")
return StageResult(
stage=Stage.INGEST,
status="ok",
data=stored_ref,
metrics={"emails": len(raw), "query_hash": self.imap_source.hash(...)},
)

El método final() puede invocar otro source para enriquecer:

def final(self, inter) -> StageResult:
df = inter.data
# Enriquecer con productividad (Excel local)
excel_source = ExcelConnector(template="productividad")
productividad = excel_source.fetch(...)
df = enrich_with_productividad(df, productividad)
# ...

Metadata uniforme alimentada automáticamente

Sección titulada «Metadata uniforme alimentada automáticamente»

Cada SourceConnector provee la metadata que las tablas de observabilidad necesitan:

Tabla en ingeldata.dbCampoLo provee SourceConnector
raw_filesquery_hashconnector.hash(params)
raw_filespipeline_nameinyectado por BasePipeline.run()
pipeline_stage_runs.metrics_jsonextraction metricsconnector.fetch() retorna métricas en RawData
data_contracts (futuro)schema_hashconnector.describe().schema_hash

Esto elimina la disciplina manual: hoy un pipeline puede olvidar registrar query_hash; con el conector, es automático.

Estado actual (lo que ya existe en esa línea)

Sección titulada «Estado actual (lo que ya existe en esa línea)»
Componente actualUbicaciónEncaja enAcción
core/imap_ingest_shared.pycore/core/sources/imap/shared_ingest.pyHecho en F2 — movido + renombrado; shim run_imap_ingest preservado para backward-compat
core/ingest_imap.pycore/Reducido a 13 re-exportsHecho en F2 — borrar tras F2.6 cuando todos los pipelines IMAP estén migrados
domains/alertas_waypoint/pipeline.pydomains/Usa IMAPConnector directoHecho en F2.1 — pipeline-referencia del patrón Hexagonal
core/raw_loaders.pycore/Se mantiene — es post-source (lee de raw persistido)Sin cambios
core/dim_proceso/adapters/core/dim_proceso/Caso especial: adapters de fuentes específicas para dim_proceso. Buen ejemplo del patrón.Mantener; opcionalmente refactorizar para usar SQLiteConnector
core/build_cloud_dataframes.pycore/core/sources/gsheets/readers.py (parcialmente)Migrar gradualmente
core/publish/core/NO — los publishers son la cara opuesta (sink, no source). ADR separada.Mantener fuera del scope
domains/facturacion/parse_pdf_*.pydomains/facturacion/core/sources/pdf/extractors/Migrar
domains/facturacion/productividad_excel_parser.pydomains/facturacion/core/sources/excel/parsers/productividad.pyMigrar

La migración no es big-bang. Cada fase deja el sistema funcionando:

FaseAlcanceEsfuerzoBeneficio
F1Crear core/sources/base.py con el Protocol y los tipos. Sin migrar nada todavía.1 sesiónContrato definido, sin riesgo
F2Migrar IMAP: imap_ingest_shared.pycore/sources/imap/. Mantener API actual como wrapper.1 sesiónValidar el patrón en el caso más usado
F3Migrar PDF extractors de facturación a core/sources/pdf/extractors/. Refactorizar FacturacionPipeline para usar PDFConnector.1-2 sesionesLiberar domains/facturacion/ de detalle de PDFs
F4Crear ExcelConnector y migrar productividad_excel_parser.py.1 sesiónPermite que data_warehouse.py use el conector estándar
F5Crear GSheetsConnector y migrar lecturas de build_cloud_dataframes.py.2 sesionesCentraliza credenciales y lógica de retry
F6Crear CSVConnector (caso gantt) y SQLiteConnector (caso dim_proceso).1 sesiónCierra el set de fuentes actuales
F7Actualizar guía Como-Crear-Pipeline-Nuevo para usar conectores. Scaffolder python scaffold_pipeline.py <name> --source imap.1 sesiónCristaliza el patrón para uso futuro

Total estimado: 7-9 sesiones. Cada fase es independiente y commit-able.

FaseEstadoEngram archiveNotas
F1 — core/sources/base.py✅ Completada#1238Protocol + Mixin + 4 dataclasses; 12/12 tests
F2 — IMAPConnector adapter✅ Completada#124815/15 tests + 21 patches en 5 tests legacy migrados; shim conserva backward-compat
F2.1 — alertas_waypoint migrado✅ Completada#1256Primer pipeline en patrón Hexagonal directo; helper ingest_with_connector extraído
F2.2-F2.6 — 5 pipelines IMAP restantes⏸ PendienteVer Plan-Migracion-F2.2-F2.6
F3 — PDFConnector (probable)⏸ Pendiente
F4-F6 — Excel/GSheets/CSV/SQLite⏸ Pendiente
F7 — guía + scaffolder🟡 ParcialGuía Como-Crear-Pipeline-Nuevo ya muestra patrón Hexagonal; scaffolder aún no
  • Dominio limpio: domains/*/pipeline.py describe qué hace el pipeline, no cómo lee. Reduce el archivo a 30-80 líneas típicamente.
  • Reutilización por defecto: agregar un pipeline desde una fuente existente = línea de instanciación, no copiar parsers.
  • Tests reales del dominio: FakeIMAPConnector(emails=[...]) reemplaza un servidor IMAP en tests.
  • Metadata automática: query_hash, schema_hash, extraction_metrics se llenan solos.
  • Agentes pueden generar pipelines correctos: la convención uniforme + el scaffolder bajan el error rate a casi cero.
  • Lineage cross-pipeline posible: si todos los pipelines pasan por conectores nombrados, el grafo de dependencias se puede construir automáticamente.
  • Compatible con ADR-Pipeline-V2-Framework: el conector es lo que ingest() invoca; no rompe el contrato existente.
  • Patrón validado en producción (F2.1, 2026-05-27): alertas_waypoint ya corre con IMAPConnector directo + constructor injection opcional + helper ingest_with_connector. 5 tests de integración verdes, 0 regresiones, backward-compat 100% para los 5 pipelines IMAP restantes que aún usan el shim. Esto baja a “ejecución mecánica” la migración F2.2-F2.6 (ver Plan-Migracion-F2.2-F2.6).
  • Capa de indirección: leer un pipeline ahora requiere abrir el conector también. Trade-off justificado porque el conector concentra el detalle ruidoso.
  • Migración progresiva: durante meses convivirán el código viejo (parsers en domains/) y el nuevo (en core/sources/). Hay que tolerar la duplicación temporal.
  • Falsos positivos de generalización: forzar todo a SourceConnector puede ser awkward para fuentes únicas/irregulares (ej. scrape ad-hoc). La salida es: si el caso es único y no se va a reutilizar, está bien dejarlo en domains/.
  • core/raw_loaders.py se mantiene como capa post-source (lee del raw ya persistido). No es un source; es lo que ven las etapas process/intermediate/final.
  • core/publish/ es la cara opuesta (sinks). Merece ADR propia con el mismo patrón Hexagonal pero en dirección output.

Es Hexagonal Architecture (Alistair Cockburn, 2005) aplicada a un proyecto de scale moderado. Equivalente a Ports & Adapters de DDD:

  • Puerto: SourceConnector Protocol — define qué el dominio necesita de cualquier fuente.
  • Adaptadores: implementaciones concretas (IMAP, PDF, Excel…) — traducen la fuente al puerto.
  • Dominio: BasePipeline subclasses — habla solo con puertos, ignora qué adaptador hay detrás.

El patrón es lo opuesto de “scripts que saben de todo”: invierte la dependencia. El dominio no importa de PDF; el adaptador de PDF se inyecta en el dominio.

A escala de IngelCoding (≤10 pipelines, ~5 tipos de fuente), Hexagonal es el sweet spot: suficiente formalismo para escalar a 20-30 pipelines sin reescribir, sin ser tan pesado como un framework externo.

  • ADR-Pipeline-V2-Framework — base sobre la que esta ADR construye
  • Como-Crear-Pipeline-Nuevo — guía a actualizar en Fase 7
  • core/imap_ingest_shared.py — embrión del patrón para IMAP
  • core/dim_proceso/adapters/ — ejemplo Strategy bien hecho que inspira esta ADR
  • ADRs relacionados pendientes:
    • ADR-Maturity-Levels — niveles L1-L5 de madurez del pipeline
    • ADR-Data-Contracts — schema_hash + freshness SLA
    • ADR-Sink-Connectors — la cara opuesta (publishers) con el mismo patrón
    • ADR-Connector-Statefulness — cuándo se permiten connectors con estado
  • SDD artifacts de la Fase 1: openspec/changes/source-connectors-f1/ (repo) — proposal, spec (4 reqs + 1 NFR + 12 scenarios), design, tasks, apply-progress, verify-report. Engram IDs: explore #1231, proposal #1232, spec #1233, design #1234, tasks #1235, apply #1236, verify #1237, archive #1238.
  • SDD artifacts de la Fase 2: openspec/changes/source-connectors-f2/ (repo) + openspec/specs/imap-connector/spec.md (7 Requirements iniciales). Archive Engram #1248.
  • SDD artifacts de la Fase 2.1: openspec/changes/source-connectors-f2.1/ — spec imap-connector ampliado a 8 Requirements (R8 ADDED + R7 MODIFIED con scenario s4 “shim delega”). Archive Engram #1256.
  • Plan-Migracion-F2.2-F2.6 — roadmap detallado de los 5 pipelines IMAP que aún usan el shim.
  • Pipeline-Alertas-Waypoint — pipeline-referencia del patrón Hexagonal (post-F2.1).
  • sesion-2026-05-27-source-connectors-f2-f2.1 — narrativa completa de los ciclos F2 y F2.1.