Orchestrating and Observing Data Pipelines: A Guide to Apache Airflow, PostgreSQL, and Polar
Building reliable data pipelines is a foundational task in modern software engineering, but ensuring they run efficiently and identifying performance bottlenecks can be a significant challenge. Orchestration tools solve part of the problem, but true operational excellence requires deep visibility into resource consumption.
This tutorial walks you through building a complete, observable data pipeline ecosystem using a powerful, open-source stack:
- Apache Airflow: For robust workflow orchestration.
- PostgreSQL: As our reliable data store.
- Polar: For continuous, low-overhead performance profiling.
- Docker: To containerize and link all our services seamlessly.
By the end, you will have a running pipeline and the tools to monitor its performance characteristics in real time.
Prerequisites
Before we begin, ensure you have the following installed on your system:
- Docker
- Docker Compose
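If you want to sanity-check the tooling before moving on, a small optional Python script like the one below will do it (the script name and output format are ours, not part of the tutorial stack):

```python
# check_prereqs.py -- optional helper to confirm Docker and Docker Compose are installed
import shutil
import subprocess

for tool in ("docker", "docker-compose"):
    path = shutil.which(tool)
    if path is None:
        print(f"{tool}: not found -- install it before continuing")
    else:
        # Ask each tool for its version string and print it
        result = subprocess.run([tool, "--version"], capture_output=True, text=True)
        print(f"{tool}: {result.stdout.strip()}")
```

Note that newer Docker installations ship Compose as the `docker compose` plugin rather than a separate `docker-compose` binary; adjust the check accordingly.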
A Scalable Project Structure
A clean project structure is crucial for maintainability. We’ll use a standard, scalable layout for our Airflow project.
```
.
├── dags/
│   └── simple_etl_dag.py
├── docker-compose.yml
└── .env
```
- `dags/`: This directory will hold all our Airflow DAG (Directed Acyclic Graph) files. Airflow automatically discovers and loads any DAGs placed here.
- `docker-compose.yml`: This file is the heart of our infrastructure, defining and connecting all our services.
- `.env`: This file will store environment variables, helping us keep configuration separate from our code.
Orchestrating Services with Docker Compose
Our `docker-compose.yml` file will define and configure all the necessary services: a PostgreSQL database, the Airflow webserver (the scheduler and worker components can be added in the same way), and the Polar agent for profiling.
First, create a `.env` file to specify the UID that Airflow runs as, which helps avoid permission issues on the mounted `dags/` directory.
```
# .env
AIRFLOW_UID=50000
```
Now, let’s create the `docker-compose.yml` file. We are using the official Airflow Docker Compose file as a base and extending it with Polar.
```yaml
# docker-compose.yml
version: '3'

services:
  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  airflow-webserver:
    image: apache/airflow:2.8.1
    depends_on:
      - postgres
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__LOAD_EXAMPLES=false
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "curl --fail http://localhost:8080/health"]
      interval: 30s
      timeout: 30s
      retries: 3

  # Add other Airflow services like scheduler and worker as needed.
  # For simplicity, we'll focus on the webserver here.

  polar-agent:
    image: polar-agent:latest # Replace with the actual Polar agent image
    command:
      - "agent"
      - "--config-file=/etc/polar/agent.yaml"
    volumes:
      - ./polar-agent-config.yaml:/etc/polar/agent.yaml
    depends_on:
      - airflow-webserver
    # You would typically attach the Polar profiler to your application containers.
    # This is a simplified representation. Refer to Polar documentation for specifics.
```
Note: The Polar integration shown is conceptual. Please refer to the official Polar documentation for the correct way to attach profilers to your running services, which often involves a sidecar container or an agent running on the host.
Creating Your First Airflow DAG
Now for the fun part. Let’s create a simple ETL pipeline. This DAG will have two tasks: one to create a table in our PostgreSQL database and another to insert some data into it.
Create the file `dags/simple_etl_dag.py`:
```python
# dags/simple_etl_dag.py
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pendulum import datetime


@dag(
    dag_id="simple_postgres_etl",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
    tags=["etl", "postgres"],
)
def simple_postgres_etl():
    @task
    def create_customers_table():
        """Creates a customers table if it doesn't exist."""
        pg_hook = PostgresHook(postgres_conn_id="postgres_default")
        sql = """
            CREATE TABLE IF NOT EXISTS customers (
                customer_id SERIAL PRIMARY KEY,
                name VARCHAR NOT NULL,
                signup_date DATE
            );
        """
        pg_hook.run(sql)

    @task
    def insert_new_customer():
        """Inserts a new customer record into the customers table."""
        pg_hook = PostgresHook(postgres_conn_id="postgres_default")
        sql = """
            INSERT INTO customers (name, signup_date)
            VALUES ('John Doe', '2025-09-26');
        """
        pg_hook.run(sql)

    create_customers_table() >> insert_new_customer()


simple_postgres_etl()
```
This DAG uses the TaskFlow API for a clean, functional approach. Both tasks use Airflow’s `postgres_default` connection; make sure it points at the `postgres` service defined in our `docker-compose.yml` (host `postgres`, user `airflow`, password `airflow`, database `airflow`). You can set this in the Airflow UI under Admin → Connections or with the `AIRFLOW_CONN_POSTGRES_DEFAULT` environment variable.
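If you want to confirm that the insert actually landed in PostgreSQL, one option is to append a third task to the same DAG. The sketch below is optional and uses the same `PostgresHook`; the `fetch_customers` task name is ours, not part of the original pipeline:

```python
# Optional addition to dags/simple_etl_dag.py -- place inside simple_postgres_etl()
# (the @task decorator and PostgresHook are already imported at the top of the file)

@task
def fetch_customers():
    """Reads the customers table back and logs each row."""
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    rows = pg_hook.get_records("SELECT customer_id, name, signup_date FROM customers;")
    for row in rows:
        print(row)  # printed output shows up in the task's log in the Airflow UI

# Then extend the dependency chain at the bottom of the DAG function:
# create_customers_table() >> insert_new_customer() >> fetch_customers()
```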
Observing with Polar
Once your services are running (`docker-compose up`) and you’ve triggered the DAG from the Airflow UI (available at `http://localhost:8080`), the Polar agent will begin profiling the resource consumption of the Airflow components.
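If you prefer triggering the DAG from a script instead of clicking through the UI, the sketch below uses Airflow’s stable REST API. It assumes the basic-auth API backend is enabled and an `airflow`/`airflow` user exists (both are the case if you start from the official Airflow Docker Compose file); adjust the URL and credentials to your setup.

```python
# trigger_dag.py -- optional: start a run of the DAG via Airflow's REST API
import requests

AIRFLOW_API = "http://localhost:8080/api/v1"
DAG_ID = "simple_postgres_etl"

response = requests.post(
    f"{AIRFLOW_API}/dags/{DAG_ID}/dagRuns",
    json={"conf": {}},            # this DAG takes no runtime configuration
    auth=("airflow", "airflow"),  # assumed credentials -- change for your environment
)
response.raise_for_status()
print(response.json())            # contains the generated dag_run_id and its state

# Note: a paused DAG will queue the run but not execute it -- unpause it in the UI first.
```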
Navigate to your Polar UI. You will be able to:
- Filter by Service: Isolate the `airflow-webserver` or `airflow-scheduler` to see their specific performance profiles.
- Analyze CPU and Memory Usage: See exactly how much CPU and memory each process used during the DAG run.
- Identify Bottlenecks: If a particular task is resource-intensive, it will be immediately visible in the profiling data, allowing you to optimize your code or provision resources more effectively.
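Profiling data is easiest to act on when you can line it up with task boundaries. One lightweight, optional complement (plain standard-library Python, unrelated to Polar itself) is to log wall-clock timings from inside your tasks, so spikes in the Polar UI can be matched to specific steps:

```python
# timing.py -- optional helper you could import from your DAG file
import logging
import time
from contextlib import contextmanager

log = logging.getLogger(__name__)


@contextmanager
def timed(step_name: str):
    """Logs how long the wrapped block took, for correlating with profiler data."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        log.info("step=%s duration_seconds=%.3f", step_name, elapsed)


# Example usage inside a task:
# with timed("insert_new_customer"):
#     pg_hook.run(sql)
```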
Conclusion
You have successfully built a modern, observable data pipeline. This setup provides a robust foundation for developing complex workflows in Airflow while maintaining clear visibility into their performance with Polar. By integrating orchestration with continuous profiling, you move from reactive problem-solving to proactive optimization, ensuring your data pipelines are not just reliable but also efficient.