ADR: Source Connectors — Hexagonal architecture para fuentes de datos
Aceptada — F1, F2 y F2.1 implementados y archivados (2026-05-27). 2 adapters reales en producción: IMAPConnector + helper compartido ingest_with_connector. alertas_waypoint es el primer pipeline usando el patrón Hexagonal directo (constructor injection + helper, sin shim run_imap_ingest). Engram archives: F1 #1238, F2 #1248, F2.1 #1256. Roadmap restante en Plan-Migracion-F2.2-F2.6.
Contexto
Sección titulada «Contexto»Los pipelines de IngelCoding consumen datos de fuentes heterogéneas:
| Fuente | Pipelines que la usan | Dónde vive el código hoy |
|---|---|---|
| IMAP | facturacion, pedidos_HES, pedidos_SAP, valorizaciones, gantt, alertas_waypoint | Parcialmente unificado en core/imap_ingest_shared.py |
| Excel local | facturacion (productividad), scripts/db/data_warehouse.py | domains/facturacion/productividad_excel_parser.py + scripts sueltos |
| PDF adjunto | facturacion (facturas + notas de crédito) | domains/facturacion/parse_pdf_facturas.py, parse_pdf_NCred.py |
| Google Sheets | scripts/db/data_warehouse.py, varios publishers | core/build_cloud_dataframes.py, core/publish/ |
| CSV local | gantt (gantt_normalizado_powerbi.csv) | Hardcodeado en scripts/db/data_warehouse.py |
| SQLite (otra DB) | dim_proceso reconciler | core/dim_proceso/adapters/ ← único módulo con patrón adapter formalizado |
Problemas concretos derivados de esta dispersión:
-
Reutilización imposible: el parser de PDF de facturas está atrapado en
domains/facturacion/. Si mañana un pipeline nuevo (ej. alertas con PDF adjunto) necesita extraer PDFs, hay que copiar código o importar desde otro dominio. -
Acoplamiento del dominio a la infra:
domains/facturacion/pipeline.pysabe de PDFs, de Excel local y de IMAP. La lógica de negocio de facturación queda enredada con detalles de cómo se trae el dato. -
Tests dolorosos: para probar la lógica de negocio de facturación hay que mockear IMAP, Excel y PDF al mismo tiempo. No hay
FakeSource. -
Cada pipeline reinventa: gantt tiene su CSV reader, valorizaciones tiene su PDF reader, productividad tiene su Excel reader. Casi nadie comparte código.
-
Programación agéntica sufre: un agente que recibe “agregar pipeline desde PDF” no tiene dónde mirar — el código de PDFs está disperso en
domains/facturacion/mezclado con lógica de negocio.
Hay un patrón emergente y parcial en el proyecto: core/imap_ingest_shared.py (compartido entre pipelines IMAP), core/raw_loaders.py (parquet/jsonl/sqlite loaders), core/dim_proceso/adapters/ (Strategy pattern correcto). La pregunta es: ¿formalizamos este patrón como arquitectura del proyecto, o seguimos con módulos sueltos?
Opciones evaluadas
Sección titulada «Opciones evaluadas»Opción A: Statu quo — módulos compartidos cuando duela
Sección titulada «Opción A: Statu quo — módulos compartidos cuando duela»Seguir extrayendo helpers a core/ cuando dos pipelines necesiten lo mismo.
Descartada porque:
- Reactivo, no proactivo: el código compartido aparece tarde y mal ubicado.
- No hay contrato — cada helper tiene su API ad-hoc.
- Los nuevos pipelines no encuentran el patrón al buscar y reinventan.
- Imposible para agentes generar pipelines nuevos consistentes.
Opción B: Hexagonal con SourceConnector (ELEGIDA)
Sección titulada «Opción B: Hexagonal con SourceConnector (ELEGIDA)»Formalizar un puerto (SourceConnector Protocol) con adaptadores concretos por tipo de fuente. Estructura core/sources/{imap, excel, gsheets, pdf, csv, sqlite}/ con contrato uniforme.
Elegida porque:
- Define un único lugar donde vive el código de cada fuente.
- El dominio del pipeline queda limpio: solo lógica de negocio.
- Tests triviales:
FakeConnectorreemplaza cualquier fuente real. - Reutilización por defecto: cualquier pipeline puede declarar “uso
PDFConnector”. - Agentes pueden razonar sobre el patrón (“nuevo pipeline desde Excel” → adapter Excel ya existe).
- Compatible con el framework
BasePipelineactual: elSourceConnectores lo queingest()invoca.
Opción C: Framework externo (Singer taps, Airbyte connectors)
Sección titulada «Opción C: Framework externo (Singer taps, Airbyte connectors)»Adoptar un estándar de conectores ya existente.
Descartada porque:
- Sobredimensionado para ≤10 pipelines.
- Cada conector externo asume un runtime (containerizado, JSON Schema, etc.) que no aplica.
- Los datos privados (IMAP de cuentas internas, PDFs propietarios) no se prestan a un protocolo público.
- El patrón Hexagonal local es 90% del valor con 5% del esfuerzo.
- Si en el futuro se justifica migrar a Airbyte, los adaptadores Hexagonal son envolvibles trivialmente.
Decisión
Sección titulada «Decisión»Opción B — Patrón Hexagonal (Ports & Adapters) con SourceConnector como puerto y un adaptador concreto por tipo de fuente. Estructura core/sources/.
El contrato
Sección titulada «El contrato»Puerto base
Sección titulada «Puerto base»from typing import Protocol, TypeVar, Generic
class FetchParams(Protocol): """Parámetros de extracción. Cada source define su shape concreto.""" ...
class RawData(Protocol): """Resultado crudo de la extracción. Cada source define el shape.""" ...
class SourceConnector(Protocol): """Puerto: contrato que toda fuente debe cumplir."""
def fetch(self, params: FetchParams) -> RawData: """Trae el dato desde la fuente externa.""" ...
def store(self, raw: RawData, target: Path, format: str) -> StoredRef: """Persiste el raw localmente (parquet/jsonl/sqlite).""" ...
def hash(self, params: FetchParams) -> str: """Hash determinístico de los params — alimenta raw_files.query_hash.""" ...
def describe(self) -> SourceMetadata: """Metadata para lineage: tipo, versión, parámetros normalizados.""" ...Adaptadores concretos
Sección titulada «Adaptadores concretos»core/sources/├── base.py # Protocol SourceConnector + tipos comunes├── imap/│ ├── __init__.py│ ├── connector.py # IMAPConnector implements SourceConnector│ ├── query_builder.py # construye queries IMAP│ └── shared_ingest.py # absorbe el actual imap_ingest_shared.py├── excel/│ ├── __init__.py│ ├── connector.py # ExcelConnector (file local o OneDrive)│ └── parsers/ # parsers por template (factura, productividad, etc.)├── gsheets/│ ├── __init__.py│ ├── connector.py # GSheetsConnector (gspread + service account)│ └── readers.py├── pdf/│ ├── __init__.py│ ├── connector.py # PDFConnector│ └── extractors/ # absorbe parse_pdf_facturas, parse_pdf_NCred├── csv/│ ├── __init__.py│ └── connector.py└── sqlite/ ├── __init__.py └── connector.py # lee de otra DB (caso dim_proceso reconciler)Integración con BasePipeline
Sección titulada «Integración con BasePipeline»Un pipeline declara su(s) source(s) en el constructor:
class FacturacionPipeline(BasePipeline): def __init__(self, config, *, spec, logger=None): super().__init__(config, logger=logger) self.spec = spec self.imap_source = IMAPConnector.from_spec(spec.imap) self.pdf_source = PDFConnector(extractor="factura_v2") # ...
def ingest(self) -> StageResult: raw = self.imap_source.fetch(self._fetch_params()) stored_ref = self.imap_source.store(raw, target=..., format="parquet") return StageResult( stage=Stage.INGEST, status="ok", data=stored_ref, metrics={"emails": len(raw), "query_hash": self.imap_source.hash(...)}, )El método final() puede invocar otro source para enriquecer:
def final(self, inter) -> StageResult: df = inter.data # Enriquecer con productividad (Excel local) excel_source = ExcelConnector(template="productividad") productividad = excel_source.fetch(...) df = enrich_with_productividad(df, productividad) # ...Metadata uniforme alimentada automáticamente
Sección titulada «Metadata uniforme alimentada automáticamente»Cada SourceConnector provee la metadata que las tablas de observabilidad necesitan:
Tabla en ingeldata.db | Campo | Lo provee SourceConnector |
|---|---|---|
raw_files | query_hash | connector.hash(params) |
raw_files | pipeline_name | inyectado por BasePipeline.run() |
pipeline_stage_runs.metrics_json | extraction metrics | connector.fetch() retorna métricas en RawData |
data_contracts (futuro) | schema_hash | connector.describe().schema_hash |
Esto elimina la disciplina manual: hoy un pipeline puede olvidar registrar query_hash; con el conector, es automático.
Estado actual (lo que ya existe en esa línea)
Sección titulada «Estado actual (lo que ya existe en esa línea)»| Componente actual | Ubicación | Encaja en | Acción |
|---|---|---|---|
core/imap_ingest_shared.py | core/ | core/sources/imap/shared_ingest.py | Hecho en F2 — movido + renombrado; shim run_imap_ingest preservado para backward-compat |
core/ingest_imap.py | core/ | Reducido a 13 re-exports | Hecho en F2 — borrar tras F2.6 cuando todos los pipelines IMAP estén migrados |
domains/alertas_waypoint/pipeline.py | domains/ | Usa IMAPConnector directo | Hecho en F2.1 — pipeline-referencia del patrón Hexagonal |
core/raw_loaders.py | core/ | Se mantiene — es post-source (lee de raw persistido) | Sin cambios |
core/dim_proceso/adapters/ | core/dim_proceso/ | Caso especial: adapters de fuentes específicas para dim_proceso. Buen ejemplo del patrón. | Mantener; opcionalmente refactorizar para usar SQLiteConnector |
core/build_cloud_dataframes.py | core/ | core/sources/gsheets/readers.py (parcialmente) | Migrar gradualmente |
core/publish/ | core/ | NO — los publishers son la cara opuesta (sink, no source). ADR separada. | Mantener fuera del scope |
domains/facturacion/parse_pdf_*.py | domains/facturacion/ | core/sources/pdf/extractors/ | Migrar |
domains/facturacion/productividad_excel_parser.py | domains/facturacion/ | core/sources/excel/parsers/productividad.py | Migrar |
Plan de migración por fases
Sección titulada «Plan de migración por fases»La migración no es big-bang. Cada fase deja el sistema funcionando:
| Fase | Alcance | Esfuerzo | Beneficio |
|---|---|---|---|
| F1 | Crear core/sources/base.py con el Protocol y los tipos. Sin migrar nada todavía. | 1 sesión | Contrato definido, sin riesgo |
| F2 | Migrar IMAP: imap_ingest_shared.py → core/sources/imap/. Mantener API actual como wrapper. | 1 sesión | Validar el patrón en el caso más usado |
| F3 | Migrar PDF extractors de facturación a core/sources/pdf/extractors/. Refactorizar FacturacionPipeline para usar PDFConnector. | 1-2 sesiones | Liberar domains/facturacion/ de detalle de PDFs |
| F4 | Crear ExcelConnector y migrar productividad_excel_parser.py. | 1 sesión | Permite que data_warehouse.py use el conector estándar |
| F5 | Crear GSheetsConnector y migrar lecturas de build_cloud_dataframes.py. | 2 sesiones | Centraliza credenciales y lógica de retry |
| F6 | Crear CSVConnector (caso gantt) y SQLiteConnector (caso dim_proceso). | 1 sesión | Cierra el set de fuentes actuales |
| F7 | Actualizar guía Como-Crear-Pipeline-Nuevo para usar conectores. Scaffolder python scaffold_pipeline.py <name> --source imap. | 1 sesión | Cristaliza el patrón para uso futuro |
Total estimado: 7-9 sesiones. Cada fase es independiente y commit-able.
Estado de adopción (al 2026-05-27)
Sección titulada «Estado de adopción (al 2026-05-27)»| Fase | Estado | Engram archive | Notas |
|---|---|---|---|
F1 — core/sources/base.py | ✅ Completada | #1238 | Protocol + Mixin + 4 dataclasses; 12/12 tests |
F2 — IMAPConnector adapter | ✅ Completada | #1248 | 15/15 tests + 21 patches en 5 tests legacy migrados; shim conserva backward-compat |
F2.1 — alertas_waypoint migrado | ✅ Completada | #1256 | Primer pipeline en patrón Hexagonal directo; helper ingest_with_connector extraído |
| F2.2-F2.6 — 5 pipelines IMAP restantes | ⏸ Pendiente | — | Ver Plan-Migracion-F2.2-F2.6 |
F3 — PDFConnector (probable) | ⏸ Pendiente | — | — |
| F4-F6 — Excel/GSheets/CSV/SQLite | ⏸ Pendiente | — | — |
| F7 — guía + scaffolder | 🟡 Parcial | — | Guía Como-Crear-Pipeline-Nuevo ya muestra patrón Hexagonal; scaffolder aún no |
Consecuencias
Sección titulada «Consecuencias»Positivas
Sección titulada «Positivas»- Dominio limpio:
domains/*/pipeline.pydescribe qué hace el pipeline, no cómo lee. Reduce el archivo a 30-80 líneas típicamente. - Reutilización por defecto: agregar un pipeline desde una fuente existente = línea de instanciación, no copiar parsers.
- Tests reales del dominio:
FakeIMAPConnector(emails=[...])reemplaza un servidor IMAP en tests. - Metadata automática:
query_hash,schema_hash,extraction_metricsse llenan solos. - Agentes pueden generar pipelines correctos: la convención uniforme + el scaffolder bajan el error rate a casi cero.
- Lineage cross-pipeline posible: si todos los pipelines pasan por conectores nombrados, el grafo de dependencias se puede construir automáticamente.
- Compatible con ADR-Pipeline-V2-Framework: el conector es lo que
ingest()invoca; no rompe el contrato existente. - Patrón validado en producción (F2.1, 2026-05-27):
alertas_waypointya corre conIMAPConnectordirecto + constructor injection opcional + helperingest_with_connector. 5 tests de integración verdes, 0 regresiones, backward-compat 100% para los 5 pipelines IMAP restantes que aún usan el shim. Esto baja a “ejecución mecánica” la migración F2.2-F2.6 (ver Plan-Migracion-F2.2-F2.6).
Negativas
Sección titulada «Negativas»- Capa de indirección: leer un pipeline ahora requiere abrir el conector también. Trade-off justificado porque el conector concentra el detalle ruidoso.
- Migración progresiva: durante meses convivirán el código viejo (parsers en
domains/) y el nuevo (encore/sources/). Hay que tolerar la duplicación temporal. - Falsos positivos de generalización: forzar todo a
SourceConnectorpuede ser awkward para fuentes únicas/irregulares (ej. scrape ad-hoc). La salida es: si el caso es único y no se va a reutilizar, está bien dejarlo endomains/.
Neutras
Sección titulada «Neutras»core/raw_loaders.pyse mantiene como capa post-source (lee del raw ya persistido). No es un source; es lo que ven las etapasprocess/intermediate/final.core/publish/es la cara opuesta (sinks). Merece ADR propia con el mismo patrón Hexagonal pero en dirección output.
Fundamento teórico
Sección titulada «Fundamento teórico»Es Hexagonal Architecture (Alistair Cockburn, 2005) aplicada a un proyecto de scale moderado. Equivalente a Ports & Adapters de DDD:
- Puerto:
SourceConnectorProtocol — define qué el dominio necesita de cualquier fuente. - Adaptadores: implementaciones concretas (IMAP, PDF, Excel…) — traducen la fuente al puerto.
- Dominio:
BasePipelinesubclasses — habla solo con puertos, ignora qué adaptador hay detrás.
El patrón es lo opuesto de “scripts que saben de todo”: invierte la dependencia. El dominio no importa de PDF; el adaptador de PDF se inyecta en el dominio.
A escala de IngelCoding (≤10 pipelines, ~5 tipos de fuente), Hexagonal es el sweet spot: suficiente formalismo para escalar a 20-30 pipelines sin reescribir, sin ser tan pesado como un framework externo.
Ver también
Sección titulada «Ver también»- ADR-Pipeline-V2-Framework — base sobre la que esta ADR construye
- Como-Crear-Pipeline-Nuevo — guía a actualizar en Fase 7
core/imap_ingest_shared.py— embrión del patrón para IMAPcore/dim_proceso/adapters/— ejemplo Strategy bien hecho que inspira esta ADR- ADRs relacionados pendientes:
- ADR-Maturity-Levels — niveles L1-L5 de madurez del pipeline
- ADR-Data-Contracts — schema_hash + freshness SLA
- ADR-Sink-Connectors — la cara opuesta (publishers) con el mismo patrón
- ADR-Connector-Statefulness — cuándo se permiten connectors con estado
- SDD artifacts de la Fase 1:
openspec/changes/source-connectors-f1/(repo) — proposal, spec (4 reqs + 1 NFR + 12 scenarios), design, tasks, apply-progress, verify-report. Engram IDs: explore #1231, proposal #1232, spec #1233, design #1234, tasks #1235, apply #1236, verify #1237, archive #1238. - SDD artifacts de la Fase 2:
openspec/changes/source-connectors-f2/(repo) +openspec/specs/imap-connector/spec.md(7 Requirements iniciales). Archive Engram #1248. - SDD artifacts de la Fase 2.1:
openspec/changes/source-connectors-f2.1/— specimap-connectorampliado a 8 Requirements (R8 ADDED + R7 MODIFIED con scenario s4 “shim delega”). Archive Engram #1256. - Plan-Migracion-F2.2-F2.6 — roadmap detallado de los 5 pipelines IMAP que aún usan el shim.
- Pipeline-Alertas-Waypoint — pipeline-referencia del patrón Hexagonal (post-F2.1).
- sesion-2026-05-27-source-connectors-f2-f2.1 — narrativa completa de los ciclos F2 y F2.1.