Part 47: An Airflow Data Pipeline
"Airflow on Kubernetes, with the KubernetesExecutor, is the data engineer's k8s sweet spot."
Why
Data pipelines are the third common HomeLab K8s use case. The freelancer might be hired by a data team to build a pipeline that ingests CSVs from S3, transforms them with dbt, and writes Parquet back. The standard tool is Apache Airflow with the KubernetesExecutor, which spawns one pod per task. dbt runs as a series of transformations against a warehouse (or, in this case, against the same Postgres CloudNativePG cluster that serves the rest of the lab).
The thesis: Airflow runs as a Helm release. Airflow's metadata database is a Postgres cluster managed by CloudNativePG. The data lake is in MinIO. The DAGs live in a git repo that Airflow's git-sync sidecar pulls. dbt runs in pods that are spawned on demand under the KubernetesExecutor. Total memory: ~6 GB on top of the base cluster, easy to fit in k8s-multi.
The shape
/// <summary>
/// Contributes the Apache Airflow Helm release (chart <c>apache-airflow/airflow</c>) to the
/// Kubernetes bundle when the "airflow" plugin is listed in the configuration.
/// </summary>
/// <remarks>
/// NOTE(review): <c>_config</c> is read below, but its declaration and the constructor that
/// injects it are not part of this excerpt.
/// </remarks>
[Injectable(ServiceLifetime.Singleton)]
public sealed class AirflowHelmReleaseContributor : IHelmReleaseContributor
{
    /// <summary>Returns true only when the configured plugin list contains "airflow".</summary>
    public bool ShouldContribute() => _config.K8s?.Plugins?.Contains("airflow") == true;

    /// <summary>
    /// Adds the "airflow" Helm release, with its chart values, to <paramref name="bundle"/>.
    /// </summary>
    /// <param name="bundle">The bundle whose <c>HelmReleases</c> collection receives the release.</param>
    public void Contribute(KubernetesBundle bundle)
    {
        // Public hostname of the web UI; reused for the ingress rule and its certificate.
        var webHost = $"airflow.{_config.Acme.Tld}";

        bundle.HelmReleases.Add(new HelmReleaseSpec
        {
            Name = "airflow",
            Namespace = "airflow",
            Chart = "apache-airflow/airflow",
            Version = "1.15.0",
            RepoUrl = "https://airflow.apache.org",
            CreateNamespace = true,
            Wait = true,
            Timeout = TimeSpan.FromMinutes(15),
            Values = new()
            {
                ["executor"] = "KubernetesExecutor",

                // The chart's bundled Postgres and Redis are switched off: the metadata DB is
                // the external CloudNativePG cluster, and KubernetesExecutor needs no broker.
                ["postgresql"] = new Dictionary<string, object?> { ["enabled"] = false }, // we use CloudNativePG
                ["redis"] = new Dictionary<string, object?> { ["enabled"] = false },

                ["data"] = new Dictionary<string, object?>
                {
                    ["metadataConnection"] = new Dictionary<string, object?>
                    {
                        ["user"] = "airflow",
                        ["pass"] = "{{ secret:AIRFLOW_DB_PASSWORD }}",
                        ["host"] = "airflow-pg-rw.airflow-data.svc.cluster.local",
                        ["port"] = 5432,
                        ["db"] = "airflow"
                    }
                },

                // By default the chart runs DB migrations and user creation as Helm
                // post-install hooks. With a waiting install (presumably what Wait = true
                // maps to — confirm) the scheduler/webserver block on migrations that only
                // start after the wait completes, so the release times out. The chart
                // documentation prescribes these four settings for wait-style installers.
                ["migrateDatabaseJob"] = new Dictionary<string, object?>
                {
                    ["useHelmHooks"] = false,
                    ["applyCustomEnv"] = false
                },
                ["createUserJob"] = new Dictionary<string, object?>
                {
                    ["useHelmHooks"] = false,
                    ["applyCustomEnv"] = false
                },

                ["dags"] = new Dictionary<string, object?>
                {
                    ["gitSync"] = new Dictionary<string, object?>
                    {
                        ["enabled"] = true,
                        ["repo"] = $"https://gitlab.{_config.Acme.Tld}/data/airflow-dags.git",
                        ["branch"] = "main",
                        ["wait"] = 60, // seconds between pulls
                        ["containerName"] = "git-sync"
                    }
                },

                ["webserver"] = new Dictionary<string, object?>
                {
                    ["service"] = new Dictionary<string, object?>
                    {
                        ["type"] = "ClusterIP"
                    }
                },

                // The official chart reads the web UI ingress from the top-level
                // "ingress.web" key (not "webserver.ingress"), with TLS configured per host.
                ["ingress"] = new Dictionary<string, object?>
                {
                    ["web"] = new Dictionary<string, object?>
                    {
                        ["enabled"] = true,
                        ["ingressClassName"] = "nginx",
                        ["annotations"] = new Dictionary<string, object?>
                        {
                            ["cert-manager.io/cluster-issuer"] = "homelab-ca"
                        },
                        ["hosts"] = new[]
                        {
                            new Dictionary<string, object?>
                            {
                                ["name"] = webHost,
                                ["tls"] = new Dictionary<string, object?>
                                {
                                    ["enabled"] = true,
                                    ["secretName"] = "airflow-tls"
                                }
                            }
                        }
                    }
                },

                ["scheduler"] = new Dictionary<string, object?>
                {
                    // Two schedulers on the HA topology, one everywhere else.
                    ["replicas"] = _config.K8s?.Topology == "k8s-ha" ? 2 : 1
                }
            }
        });
    }
}
The chart installs the webserver, the scheduler, and the metadata DB connection. Each task in a DAG is a pod that the KubernetesExecutor spawns dynamically — the pod runs the task, writes the result to MinIO, and exits. No long-lived worker processes.
The CloudNativePG instance for Airflow
/// <summary>
/// Registers the "airflow-data" namespace and a single-instance CloudNativePG
/// <c>Cluster</c> named "airflow-pg" that bootstraps an "airflow" database owned by the
/// "airflow" role.
/// </summary>
/// <param name="bundle">The bundle that receives the namespace and the Cluster manifest.</param>
public void Contribute(KubernetesBundle bundle)
{
    const string dataNamespace = "airflow-data";

    // Only create the namespace entry when nothing has registered it yet.
    // NOTE(review): if Namespaces is a plain Dictionary<,>, the read half of ??= throws
    // KeyNotFoundException for a missing key — confirm the indexer tolerates absent keys.
    bundle.Namespaces[dataNamespace] ??= new NamespaceManifest { Name = dataNamespace };

    bundle.CrdInstances.Add(new RawManifest
    {
        ApiVersion = "postgresql.cnpg.io/v1",
        Kind = "Cluster",
        Metadata = new() { Name = "airflow-pg", Namespace = dataNamespace },
        Spec = new Dictionary<string, object?>
        {
            ["instances"] = 1,
            ["bootstrap"] = new Dictionary<string, object?>
            {
                ["initdb"] = new Dictionary<string, object?>
                {
                    ["database"] = "airflow",
                    ["owner"] = "airflow",
                    // Credentials for the owner role come from this Secret, which is not
                    // created here and must already exist in the namespace.
                    ["secret"] = new Dictionary<string, object?> { ["name"] = "airflow-pg-credentials" }
                }
            },
            ["storage"] = new Dictionary<string, object?>
            {
                ["size"] = "10Gi",
                ["storageClass"] = "longhorn"
            }
        }
    });
}
A separate Cluster instance from the rest of the workloads. Same operator, different namespace, different cluster.
A sample DAG
The data pipeline itself lives in airflow-dags in DevLab GitLab:
# dags/csv_to_parquet.py
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime
with DAG("csv_to_parquet",
schedule="0 1 * * *",
start_date=datetime(2026, 1, 1),
catchup=False) as dag:
fetch = KubernetesPodOperator(
task_id="fetch_csv",
name="fetch-csv",
image="registry.acme.lab/data/fetch-csv:1.0.0",
cmds=["python", "fetch.py"],
env_vars={
"MINIO_BUCKET": "data-raw",
"SOURCE_URL": "https://example.com/daily.csv"
},
get_logs=True
)
transform = KubernetesPodOperator(
task_id="dbt_transform",
name="dbt-transform",
image="ghcr.io/dbt-labs/dbt-postgres:1.9.0",
cmds=["dbt", "run"],
env_vars={
"DBT_PROFILES_DIR": "/profiles",
"DBT_PROFILE": "warehouse"
},
volume_mounts=[{"name": "profiles", "mount_path": "/profiles", "read_only": True}],
volumes=[{"name": "profiles", "config_map": {"name": "dbt-profiles"}}]
)
write_parquet = KubernetesPodOperator(
task_id="write_parquet",
name="write-parquet",
image="registry.acme.lab/data/parquet-writer:1.0.0",
cmds=["python", "write.py"],
env_vars={
"MINIO_BUCKET": "data-warehouse",
"OUTPUT_PATH": "year={{ ds_nodash[:4] }}/month={{ ds_nodash[4:6] }}/data.parquet"
}
)
fetch >> transform >> write_parquet# dags/csv_to_parquet.py
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime
with DAG("csv_to_parquet",
schedule="0 1 * * *",
start_date=datetime(2026, 1, 1),
catchup=False) as dag:
fetch = KubernetesPodOperator(
task_id="fetch_csv",
name="fetch-csv",
image="registry.acme.lab/data/fetch-csv:1.0.0",
cmds=["python", "fetch.py"],
env_vars={
"MINIO_BUCKET": "data-raw",
"SOURCE_URL": "https://example.com/daily.csv"
},
get_logs=True
)
transform = KubernetesPodOperator(
task_id="dbt_transform",
name="dbt-transform",
image="ghcr.io/dbt-labs/dbt-postgres:1.9.0",
cmds=["dbt", "run"],
env_vars={
"DBT_PROFILES_DIR": "/profiles",
"DBT_PROFILE": "warehouse"
},
volume_mounts=[{"name": "profiles", "mount_path": "/profiles", "read_only": True}],
volumes=[{"name": "profiles", "config_map": {"name": "dbt-profiles"}}]
)
write_parquet = KubernetesPodOperator(
task_id="write_parquet",
name="write-parquet",
image="registry.acme.lab/data/parquet-writer:1.0.0",
cmds=["python", "write.py"],
env_vars={
"MINIO_BUCKET": "data-warehouse",
"OUTPUT_PATH": "year={{ ds_nodash[:4] }}/month={{ ds_nodash[4:6] }}/data.parquet"
}
)
fetch >> transform >> write_parquetThree pod-spawning tasks in sequence. Each task is a fresh pod that the KubernetesExecutor schedules, runs to completion, and reaps. The whole DAG runs nightly. The freelancer iterates on it locally by git pushing to the airflow-dags repo, which the git-sync sidecar pulls into the Airflow scheduler within 60 seconds.
What this gives you that "Airflow on a VM" doesn't
Airflow on a single VM is the traditional install. It works for simple DAGs. It does not handle:
- Per-task isolation (every task runs on the same machine and shares its memory; an OOM in one task can take the scheduler down with it)
- Per-task images (every task uses the same Python environment; multiple Python versions collide)
- Horizontal scaling (one worker process for everything)
- Resource limits per task (no way to say "this task needs 4 GB of RAM")
The KubernetesExecutor on HomeLab K8s gives you, for the same surface area:
- One pod per task with isolated CPU/memory limits
- Different images per task (one task uses dbt, another uses pandas, another uses pyspark)
- Native Kubernetes scheduling, so a cluster autoscaler (if you run one) can add nodes when task pods are pending
- Logs in Loki via the cluster's standard log collection, alongside the metrics that kube-prometheus-stack scrapes
- A development environment that matches the production cluster's Airflow setup
The bargain pays back the first time you write a DAG locally and run it in your dev cluster against the same MinIO and the same Postgres operator that production uses.