Meltano has established itself as a robust alternative to proprietary ETL solutions, but many organizations struggle to move beyond basic use cases. As a sequel to our previous article on Meltano, this post explores advanced patterns, optimization techniques, and architectural considerations for deploying Meltano in production environments at scale.
Beyond Standard Connectors: Custom Tap Development
While the Singer ecosystem provides extensive coverage, production environments frequently require custom extractors for proprietary systems or specialized data sources.
Building a Custom Tap
Custom taps follow the Singer protocol, emitting RECORD, STATE, and SCHEMA messages as JSON-formatted streams. Here’s a minimal implementation pattern:
# custom_tap.py
import json
from datetime import datetime

import requests

# JSON Schema for the single "orders" stream, reused by discovery and sync
ORDERS_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "created_at": {"type": "string", "format": "date-time"},
        "metadata": {"type": "object"}
    }
}


class CustomTap:
    def __init__(self, config):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {config["api_token"]}'
        })

    def discover(self):
        """Emit a catalog describing the available streams"""
        catalog = {
            "streams": [{
                "tap_stream_id": "orders",
                "key_properties": ["id"],
                "schema": ORDERS_SCHEMA,
                "replication_key": "created_at",
                "replication_method": "INCREMENTAL"
            }]
        }
        print(json.dumps(catalog))

    def sync(self, state=None):
        """Extract data using incremental replication"""
        state = state or {}
        bookmark = state.setdefault("bookmarks", {}).setdefault("orders", {})
        start_date = bookmark.get("last_sync") or self.config["start_date"]

        # Targets expect a SCHEMA message before the first RECORD of a stream
        print(json.dumps({
            "type": "SCHEMA",
            "stream": "orders",
            "schema": ORDERS_SCHEMA,
            "key_properties": ["id"]
        }))

        page = 1
        while True:
            response = self.session.get(
                f"{self.config['base_url']}/orders",
                params={"start_date": start_date, "page": page}
            )
            response.raise_for_status()
            data = response.json()
            if not data["records"]:
                break
            for record in data["records"]:
                print(json.dumps({
                    "type": "RECORD",
                    "stream": "orders",
                    "record": record,
                    "time_extracted": datetime.utcnow().isoformat()
                }))
                bookmark["last_sync"] = record["created_at"]
            page += 1

        print(json.dumps({
            "type": "STATE",
            "value": state
        }))


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", required=True)
    parser.add_argument("-d", "--discover", action="store_true")
    parser.add_argument("-s", "--state", default="{}")
    args = parser.parse_args()

    with open(args.config) as f:
        config = json.load(f)

    tap = CustomTap(config)
    if args.discover:
        tap.discover()
    else:
        tap.sync(json.loads(args.state))
This pattern enables incremental replication with state management, critical for handling large datasets efficiently.
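Once the tap works on its own, you can exercise it from the shell and then register it with Meltano as a custom extractor. A sketch, with illustrative file names (config.json, state.json, target_config.json) and using target-jsonl purely as a convenient sink for testing:

python custom_tap.py --config config.json --discover > catalog.json
python custom_tap.py --config config.json --state "$(cat state.json)" | target-jsonl --config target_config.json

meltano add --custom extractor tap-custom-api

The meltano add --custom command walks you through namespace, pip_url, and executable interactively, after which the tap participates in meltano run like any Hub-listed connector.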
Advanced Meltano Configuration and Plugin Development
Plugin Metadata and Custom Properties
Meltano’s plugin system extends beyond installing standard connectors: custom plugins can declare their own settings, default config, and named commands directly in meltano.yml, keeping behavior metadata-driven and versioned with the project:
plugins:
  extractors:
    - name: tap-custom-api
      variant: custom
      pip_url: git+https://github.com/org/tap-custom-api.git@main
      executable: tap-custom-api
      config:
        base_url: https://api.example.com
        request_timeout: 30
        max_retries: 3
        backoff_strategy: exponential
      settings:
        - name: api_token
          kind: password
          value: ${CUSTOM_API_TOKEN}
        - name: start_date
          kind: date_iso8601
          value: 2023-01-01T00:00:00Z
        - name: batch_size
          kind: integer
          value: 1000

  transformers:
    - name: dbt-custom
      variant: dbt-postgres
      pip_url: dbt-core~=1.6.0 dbt-postgres~=1.6.0
      commands:
        debug:
          args: debug
        run:
          args: run --select state:modified+
        test:
          args: test --fail-fast
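After meltano install, the declared settings and commands are available straight from the CLI, so the plugin definition doubles as documentation. For example, using the plugin names defined above:

meltano install
meltano config tap-custom-api list
meltano invoke dbt-custom:run

The meltano invoke plugin:command form runs the named command with the args declared in meltano.yml, which keeps ad-hoc debugging consistent with what the pipeline actually executes.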
Custom Mappers for Data Normalization
Mappers transform data stream structure before loading. Create custom mappers for domain-specific transformations:
# custom_mapper.py
import json
import sys
from typing import Any, Dict


class CustomMapper:
    """Flatten nested objects and normalize field names"""

    def __init__(self):
        # Maps a nested source path to a flattened target field name
        self.field_mapping = {
            "user_info.email": "user_email",
            "user_info.created_at": "user_created_at",
            "metadata.tags": "tags"
        }

    def transform_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten and normalize record structure"""
        result = {}

        # Pull mapped nested paths up to flattened top-level fields
        for source_path, target_field in self.field_mapping.items():
            value = self._get_nested(record, source_path)
            if value is not None:
                result[target_field] = value

        # Copy remaining top-level fields that were not consumed by a mapping
        mapped_roots = {path.split('.')[0] for path in self.field_mapping}
        for key, value in record.items():
            if key not in mapped_roots:
                result[key] = value
        return result

    @staticmethod
    def _get_nested(obj: Dict, path: str) -> Any:
        """Access nested dictionary values using dot notation"""
        for key in path.split('.'):
            if isinstance(obj, dict):
                obj = obj.get(key)
            else:
                return None
        return obj


mapper = CustomMapper()
for line in sys.stdin:
    msg = json.loads(line)
    if msg["type"] == "RECORD":
        msg["record"] = mapper.transform_record(msg["record"])
    print(json.dumps(msg))
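To use the mapper inside a pipeline rather than as an ad-hoc stdin filter, package it with an entry point and register it as a mapper plugin; the individual mappings then become names you can place between the tap and target in meltano run. A sketch of the registration, with illustrative plugin and mapping names and a local pip_url:

plugins:
  mappers:
    - name: mapper-custom
      namespace: mapper_custom
      pip_url: -e ./mappers/mapper-custom
      executable: mapper-custom
      mappings:
        - name: flatten-user-info
          config: {}

meltano run tap-custom-api flatten-user-info target-warehouse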
State Management and Incremental Replication at Scale
Proper state management is critical for scalable pipelines. Beyond a single bookmark per stream, advanced strategies such as per-partition bookmarks and batch-level checkpoints keep long-running extractions restartable:
Distributed State Across Multiple Streams
# advanced_state_management.py
from datetime import datetime
from typing import Any, Dict


class DistributedStateManager:
    def __init__(self, state_backend="postgres"):
        self.state_backend = state_backend
        self.batch_states = {}

    def create_partition_state(self, stream_name: str, partition_key: str,
                               value: Any) -> Dict:
        """Create granular state for partitioned data"""
        return {
            "stream": stream_name,
            "partition": partition_key,
            "value": value,
            "updated_at": datetime.utcnow().isoformat(),
            "version": 1
        }

    def batch_process_with_checkpoints(self, extractor, batch_size: int = 10000):
        """Process data in batches with intermediate state commits"""
        batch = []
        batch_number = 0
        for record in extractor.extract():
            batch.append(record)
            if len(batch) >= batch_size:
                yield {
                    "type": "BATCH",
                    "batch_number": batch_number,
                    "records": batch,
                    "checkpoint": {
                        "processed": batch_size * (batch_number + 1),
                        "timestamp": datetime.utcnow().isoformat()
                    }
                }
                batch = []
                batch_number += 1

        # Final, possibly partial batch
        if batch:
            yield {
                "type": "BATCH",
                "batch_number": batch_number,
                "records": batch,
                "checkpoint": {
                    "processed": batch_size * batch_number + len(batch),
                    "timestamp": datetime.utcnow().isoformat()
                }
            }
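Meltano itself persists each pipeline's Singer state in its system database, keyed by a state ID derived from the environment and the tap/target pair. The state can be inspected or repaired from the CLI, which is handy when a backfill requires rewinding a bookmark; the state ID and bookmark value below are illustrative:

meltano state list
meltano state get dev:tap-custom-api-to-target-warehouse
meltano state set dev:tap-custom-api-to-target-warehouse '{"singer_state": {"bookmarks": {"orders": {"last_sync": "2023-06-01T00:00:00Z"}}}}'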
Performance Optimization Techniques
Parallel Tap Execution with Process Pooling
For independent data sources, running extractions concurrently can significantly reduce total runtime. Meltano executes the tasks within a single meltano run invocation sequentially, so parallelism typically comes from launching separate invocations side by side; each tap/target pair keeps its own isolated state, so the runs do not interfere with one another:

meltano run tap-salesforce target-warehouse &
meltano run tap-stripe target-warehouse &
meltano run tap-hubspot target-warehouse &
wait
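For the process pooling the heading describes, the same invocations can be driven from Python, which makes it easier to cap concurrency and collect per-pipeline results. A sketch using the standard library; the pipeline list is illustrative:

# parallel_runner.py
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

PIPELINES = [
    ["meltano", "run", "tap-salesforce", "target-warehouse"],
    ["meltano", "run", "tap-stripe", "target-warehouse"],
    ["meltano", "run", "tap-hubspot", "target-warehouse"],
]


def run_pipeline(cmd):
    """Run one meltano invocation and return its exit code and stderr tail."""
    result = subprocess.run(cmd, capture_output=True, text=True)
    return cmd, result.returncode, result.stderr[-2000:]


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(run_pipeline, cmd) for cmd in PIPELINES]
    for future in as_completed(futures):
        cmd, code, stderr_tail = future.result()
        status = "ok" if code == 0 else f"failed ({code})"
        print(f"{' '.join(cmd)}: {status}")
        if code != 0:
            print(stderr_tail)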
Query Optimization in Taps
Implement efficient data extraction patterns:
# inside a tap class; assumes `from datetime import timedelta` and that
# `query_api` is the tap's own paginated query helper
def extract_with_partitioning(self, start_date, end_date, partition_days=7):
    """Extract large datasets using time-based partitioning"""
    current = start_date
    while current < end_date:
        # Query one bounded time window at a time to keep result sets small
        partition_end = min(current + timedelta(days=partition_days), end_date)
        records = self.query_api(
            f"SELECT * FROM orders WHERE created_at >= '{current}' AND created_at < '{partition_end}'",
            order_by="created_at ASC",
            page_size=5000
        )
        for record in records:
            yield record
        current = partition_end
Advanced dbt Integration and Testing
Semantic Versioning with dbt
Implement versioned models for backward compatibility:
-- models/marts/fact_orders_v1.sql
{{ config(
    version=1,
    materialized='table',
    on_schema_change='fail'
) }}

SELECT
    order_id,
    customer_id,
    order_total,
    created_at
FROM {{ ref('stg_orders') }}

-- models/marts/fact_orders_v2.sql
{{ config(
    version=2,
    materialized='table',
    on_schema_change='fail'
) }}

SELECT
    order_id,
    customer_id,
    order_total,
    tax_amount,
    net_total,
    created_at,
    updated_at
FROM {{ ref('stg_orders') }}
WHERE deleted_at IS NULL
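For dbt (1.5+) to treat these files as versions of one model, the versions also need to be declared in the model's YAML properties; ref('fact_orders') then resolves to the latest version while ref('fact_orders', v=1) keeps older consumers working. A minimal sketch of the declaration:

models:
  - name: fact_orders
    latest_version: 2
    versions:
      - v: 1
      - v: 2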
Contract Testing in dbt
Define and enforce data contracts:
# models/marts/fact_orders.yml
models:
  - name: fact_orders
    config:
      contract:
        enforced: true
    columns:
      - name: order_id
        description: Primary key
        data_type: bigint
        data_tests:
          - not_null
          - unique
      - name: customer_id
        data_type: bigint
        data_tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_total
        data_type: numeric
        data_tests:
          - dbt_utils.expression_is_true:
              expression: "order_total >= 0"
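With the contract enforced, dbt verifies that the built model returns exactly the declared column names and data types, so upstream drift fails fast instead of silently landing in the mart. Running the contracted model and its tests through the dbt-custom transformer defined earlier might look like this (dbt build compiles, runs, and tests in dependency order):

meltano invoke dbt-custom build --select fact_orders+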
Production Deployment Patterns
Environment-Specific Configuration
# meltano.yml with environment overrides
environments:
  - name: dev
    env:
      PG_HOST: localhost
      PG_PORT: "5432"
      PG_DATABASE: meltano_dev
  - name: staging
    env:
      PG_HOST: postgres-staging.internal
      PG_PORT: "5432"
      PG_DATABASE: meltano_staging
      MELTANO_LOG_LEVEL: debug
  - name: prod
    env:
      PG_HOST: postgres-prod.internal
      PG_PORT: "5432"
      PG_DATABASE: meltano_prod
      MELTANO_LOG_LEVEL: info
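An environment is then selected per invocation, either with the top-level --environment option or the MELTANO_ENVIRONMENT variable, so the same project definition serves all three targets:

meltano --environment=staging run tap-custom-api target-warehouse
MELTANO_ENVIRONMENT=prod meltano run tap-custom-api target-warehouse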
Health Checks and Monitoring
# monitoring.py
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class PipelineHealthMetrics:
    pipeline_name: str
    status: str  # success, warning, failure
    records_extracted: int
    records_loaded: int
    duration_seconds: float
    last_run: str
    next_run: str
    error_message: Optional[str] = None


class PipelineMonitor:
    def __init__(self, metrics_backend):
        self.backend = metrics_backend
        self.logger = logging.getLogger(__name__)

    def alert(self, message: str):
        """Hook into your paging/alerting system of choice"""
        self.logger.error(message)

    def record_pipeline_execution(self, metrics: PipelineHealthMetrics):
        """Record pipeline metrics for monitoring"""
        self.backend.insert("pipeline_metrics", asdict(metrics))

        # Alert if extraction failed
        if metrics.status == "failure":
            self.alert(f"Pipeline {metrics.pipeline_name} failed: {metrics.error_message}")

        # Warn if significantly slower than the historical average
        avg_duration = self.backend.get_average_duration(metrics.pipeline_name)
        if metrics.duration_seconds > avg_duration * 1.5:
            self.logger.warning(f"Pipeline {metrics.pipeline_name} running slow")

    def validate_freshness(self, pipeline_name: str, max_age_hours: int):
        """Verify data is within SLA"""
        last_run = self.backend.get_last_run(pipeline_name)
        age = datetime.utcnow() - last_run
        if age > timedelta(hours=max_age_hours):
            self.alert(f"Pipeline {pipeline_name} data is stale")
            return False
        return True
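The metrics backend is whatever store you already run; the sketch below assumes a hypothetical PostgresMetricsBackend helper that implements insert, get_average_duration, and get_last_run, and the metric values are placeholders:

# hypothetical usage after a pipeline run completes
backend = PostgresMetricsBackend(dsn="postgresql://metrics@metrics-db/metrics")  # assumed helper, not part of Meltano
monitor = PipelineMonitor(backend)

monitor.record_pipeline_execution(PipelineHealthMetrics(
    pipeline_name="tap-salesforce-to-warehouse",
    status="success",
    records_extracted=125000,
    records_loaded=125000,
    duration_seconds=842.5,
    last_run="2024-01-15T02:00:00Z",
    next_run="2024-01-16T02:00:00Z",
))
monitor.validate_freshness("tap-salesforce-to-warehouse", max_age_hours=26)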
Orchestration with Airflow
# dags/meltano_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'meltano_multi_source',
    default_args=default_args,
    start_date=datetime(2024, 1, 1),  # adjust to your go-live date
    schedule_interval='0 2 * * *',
    catchup=False,
    max_active_runs=1,
    tags=['meltano', 'production']
)

extract_salesforce = BashOperator(
    task_id='extract_salesforce',
    bash_command='meltano run tap-salesforce target-warehouse',
    env={'MELTANO_ENVIRONMENT': 'prod'},
    append_env=True,  # keep PATH etc. while adding MELTANO_ENVIRONMENT
    dag=dag
)

extract_stripe = BashOperator(
    task_id='extract_stripe',
    bash_command='meltano run tap-stripe target-warehouse',
    env={'MELTANO_ENVIRONMENT': 'prod'},
    append_env=True,
    dag=dag
)

transform = BashOperator(
    task_id='transform_dbt',
    bash_command='meltano invoke dbt:run --models staging.* marts.*',
    env={'MELTANO_ENVIRONMENT': 'prod'},
    append_env=True,
    dag=dag
)

test = BashOperator(
    task_id='test_dbt',
    bash_command='meltano invoke dbt:test --fail-fast',
    env={'MELTANO_ENVIRONMENT': 'prod'},
    append_env=True,
    dag=dag
)

[extract_salesforce, extract_stripe] >> transform >> test
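If you would rather keep orchestration definitions inside the Meltano project itself, schedules plus the Airflow plugin can generate equivalent DAGs for you. The usual commands look roughly like this (the schedule name and interval are illustrative, and depending on your Meltano version Airflow is added as a utility or as an orchestrator plugin):

meltano add utility airflow
meltano schedule add daily-orders --extractor tap-salesforce --loader target-warehouse --interval "@daily"
meltano invoke airflow scheduler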
Advanced Troubleshooting and Debugging
Deep Logging and Trace Analysis
# debug_runner.py
import logging
import sys
import json
from collections import defaultdict


class PipelineDebugger:
    def __init__(self, log_level=logging.DEBUG):
        self.logger = logging.getLogger('meltano')
        self.logger.setLevel(log_level)
        self.stats = defaultdict(int)

    def analyze_tap_output(self, stdin):
        """Analyze Singer messages for anomalies"""
        for line in stdin:
            msg = json.loads(line)
            msg_type = msg.get('type')
            self.stats[msg_type] += 1

            if msg_type == 'RECORD':
                stream = msg.get('stream')
                self.logger.debug(f"Record from {stream}: {len(json.dumps(msg['record']))} bytes")
            elif msg_type == 'STATE':
                self.logger.debug(f"State checkpoint: {msg['value']}")
            elif msg_type == 'SCHEMA':
                properties = msg['schema'].get('properties', {})
                self.logger.debug(f"Schema fields: {list(properties.keys())}")

            yield msg

    def print_summary(self):
        """Report pipeline statistics"""
        print("\n=== Pipeline Summary ===")
        for msg_type, count in sorted(self.stats.items()):
            print(f"{msg_type}: {count}")
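To use the debugger interactively, pipe a tap's raw output through it; the analyzed messages continue to stdout, so they can be discarded during a debugging session or forwarded as needed. A minimal entry point for the module above (the tap name is illustrative):

if __name__ == "__main__":
    # e.g.: meltano invoke tap-custom-api | python debug_runner.py > /dev/null
    logging.basicConfig(level=logging.DEBUG)
    debugger = PipelineDebugger()
    for msg in debugger.analyze_tap_output(sys.stdin):
        print(json.dumps(msg))
    debugger.print_summary()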
Conclusion
Production Meltano deployments require careful attention to custom development, state management, performance optimization, and observability. By combining Meltano’s flexibility with advanced patterns—custom taps, parallel execution, semantic versioning in dbt, and comprehensive monitoring—organizations can build scalable, maintainable data platforms that rival proprietary solutions while maintaining the agility of open-source tools.
The key is starting with solid fundamentals, measuring performance, and iterating on your architecture as requirements evolve.