Oct 30, 2023
6 min read

Elevating DataHub with dbt Core in Apache Airflow, Coordinated by Astronomer Cosmos

Elevate your data management with DataHub, dbt Core in Apache Airflow, and Astronomer Cosmos. Learn how to automate metadata ingestion and streamline data quality checks for your data model in this comprehensive guide.


Over the past month, we decided to implement DataHub at our organization as the central repository for storing and managing metadata about our data model. We installed DataHub via Plural on a Kubernetes Cluster in our cloud account.

The data model we wanted to build a catalog for primarily lives in Amazon Redshift and is orchestrated by Apache Airflow. We use dbt Core within Airflow as the ‘T’ layer of our ETL, running it through the Astronomer Cosmos package. In this transformation layer, we:

  • Write metadata descriptions
  • Complete data quality checks
  • Normalize our data

These are vital steps that should exist in our DataHub catalog. DataHub offers a dbt ingestion integration that makes this possible. In this guide, we’ll specifically discuss how we automate this metadata ingestion.
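For context, these descriptions and tests live in ordinary dbt YAML properties files. Here is a minimal sketch of what that looks like (the model and column names below are made up for illustration):

# models/staging/stg_orders.yml -- hypothetical example
version: 2

models:
  - name: stg_orders
    description: "One row per order, normalized from the raw orders feed."
    columns:
      - name: order_id
        description: "Primary key for the order."
        tests:
          - unique
          - not_null
      - name: order_total_usd
        description: "Order amount in US dollars."

When the DataHub dbt ingestion runs, these descriptions and test definitions are what surface on the corresponding datasets in the catalog.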

The Needed Files

For the DataHub dbt integration to work correctly, we need the following dbt-generated files uploaded to an S3 bucket:

  1. manifest.json
  2. catalog.json
  3. sources.json
  4. run_results.json

To generate these, we run dbt docs generate (which writes manifest.json, catalog.json, and run_results.json to the project’s target/ directory) and dbt source freshness (which writes sources.json). In Airflow, we can use inheritance to extend Cosmos’s DbtLocalBaseOperator and DbtDocsLocalOperator, creating custom operators that run these commands and upload the results to S3:

# dags/utils/dbt/custom_docs.py

from __future__ import annotations

from typing import Any

import logging

from cosmos.operators.local import DbtDocsLocalOperator, DbtLocalBaseOperator

logger = logging.getLogger(__name__)


class DbtFreshnessLocalOperator(DbtLocalBaseOperator):
    """
    Executes the `dbt source freshness` command
    Use the `callback` parameter to specify a callback function to run after the command completes.
    """

    ui_color = "#8194E0"

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.base_cmd = ["source", "freshness"]


class DbtFreshnessS3LocalOperator(DbtFreshnessLocalOperator):
    """
    Executes `dbt source freshness` command and upload to S3 storage. Returns the S3 path to the generated documentation.

    :param aws_conn_id: S3's Airflow connection ID
    :param bucket_name: S3's bucket name
    :param folder_dir: This can be used to specify under which directory the generated DBT documentation should be
        uploaded.
    """

    ui_color = "#FF9900"

    def __init__(
        self,
        aws_conn_id: str,
        bucket_name: str,
        folder_dir: str | None = None,
        **kwargs: Any,
    ) -> None:
        "Initializes the operator."
        self.aws_conn_id = aws_conn_id
        self.bucket_name = bucket_name
        self.folder_dir = folder_dir

        super().__init__(**kwargs)

        # override the callback with our own
        self.callback = self.upload_to_s3

    def upload_to_s3(self, project_dir: str) -> None:
        "Uploads the generated documentation to S3."
        logger.info(
            'Attempting to upload generated docs to S3 using S3Hook("%s")',
            self.aws_conn_id,
        )

        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        target_dir = f"{project_dir}/target"

        hook = S3Hook(
            self.aws_conn_id,
            extra_args={
                # the uploaded artifacts are JSON documents
                "ContentType": "application/json",
            },
        )

        logger.info("Uploading %s to %s", "sources.json", f"s3://{self.bucket_name}/sources.json")

        key = f"{self.folder_dir}/sources.json" if self.folder_dir else "sources.json"

        hook.load_file(
            filename=f"{target_dir}/sources.json",
            bucket_name=self.bucket_name,
            key=key,
            replace=True,
        )


class DbtDocsS3LocalOperator(DbtDocsLocalOperator):
    """
    Executes `dbt docs generate` command and upload to S3 storage. Returns the S3 path to the generated documentation.

    :param aws_conn_id: S3's Airflow connection ID
    :param bucket_name: S3's bucket name
    :param folder_dir: This can be used to specify under which directory the generated DBT documentation should be
        uploaded.
    """

    ui_color = "#FF9900"

    def __init__(
        self,
        aws_conn_id: str,
        bucket_name: str,
        folder_dir: str | None = None,
        **kwargs: Any,
    ) -> None:
        "Initializes the operator."
        self.aws_conn_id = aws_conn_id
        self.bucket_name = bucket_name
        self.folder_dir = folder_dir

        super().__init__(**kwargs)

        # override the callback with our own
        self.callback = self.upload_to_s3

    def upload_to_s3(self, project_dir: str) -> None:
        "Uploads the generated documentation to S3."
        logger.info(
            'Attempting to upload generated docs to S3 using S3Hook("%s")',
            self.aws_conn_id,
        )

        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        target_dir = f"{project_dir}/target"

        hook = S3Hook(
            self.aws_conn_id,
            extra_args={
                # the uploaded artifacts are JSON documents
                "ContentType": "application/json",
            },
        )

        for filename in ["manifest.json", "catalog.json", "run_results.json"]:
            logger.info("Uploading %s to %s", filename, f"s3://{self.bucket_name}/{filename}")

            key = f"{self.folder_dir}/{filename}" if self.folder_dir else filename

            hook.load_file(
                filename=f"{target_dir}/{filename}",
                bucket_name=self.bucket_name,
                key=key,
                replace=True,
            )

Using Airflow to Run the Operators

Now that we’ve created our custom operator, we can use it in our Airflow DAG to generate the files that DataHub needs (and upload them to an S3 bucket):

# dags/dbt_docs.py

from datetime import datetime
from airflow import DAG
from cosmos import ProfileConfig
from cosmos.profiles.redshift.user_pass import RedshiftUserPasswordProfileMapping

from dags.utils.dbt.custom_docs import DbtDocsS3LocalOperator, DbtFreshnessS3LocalOperator

# Other targets can be used instead of Redshift - for more information,
# see Cosmos's official profile docs: https://astronomer.github.io/astronomer-cosmos/profiles/index.html
redshift_profile = RedshiftUserPasswordProfileMapping(
    conn_id="redshift_default",
    profile_args={
        "dbname": "your_db_name", 
        "schema": "your_default_dbt_schema"
    }
)

aws_bucket = "<your-s3-bucket-name>"

with DAG(
    dag_id="dbt_docs",
    start_date=datetime(2023, 10, 30),
    schedule=None,
    catchup=False,
    max_active_runs=1,
    tags=["datahub"],
):

    DbtDocsS3LocalOperator(
        task_id="generate_dbt_docs",
        aws_conn_id="aws_default", # Airflow Conneciton to access bucket
        bucket_name=aws_bucket, # AWS Bucket where you want the files
        profile_config=ProfileConfig(
            profile_name="your_dbt_project_name",
            target_name="cosmos_target",
            profile_mapping=redshift_profile,
        ),
    )

    DbtFreshnessS3LocalOperator(
        task_id="generate_dbt_freshness",
        aws_conn_id="aws_default", # Airflow Connection to access bucket
        bucket_name=aws_bucket, # AWS Bucket where you want the files
        profile_config=ProfileConfig(
            profile_name="your_dbt_project_name",
            target_name="cosmos_target",
            profile_mapping=redshift_profile
        ),
    )
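
As written, the two tasks above are independent and the DAG only runs when triggered manually. If you want both uploads to come from the same scheduled run, one possible variation (a sketch, not necessarily how you should structure it; the cron expression and task ordering are assumptions) is to share a single ProfileConfig, give the DAG a schedule, and chain the tasks:

# dags/dbt_docs_scheduled.py -- hypothetical variant of the DAG above

from datetime import datetime
from airflow import DAG
from cosmos import ProfileConfig
from cosmos.profiles.redshift.user_pass import RedshiftUserPasswordProfileMapping

from dags.utils.dbt.custom_docs import DbtDocsS3LocalOperator, DbtFreshnessS3LocalOperator

redshift_profile = RedshiftUserPasswordProfileMapping(
    conn_id="redshift_default",
    profile_args={"dbname": "your_db_name", "schema": "your_default_dbt_schema"},
)

aws_bucket = "<your-s3-bucket-name>"

# build the ProfileConfig once and reuse it for both tasks
profile_config = ProfileConfig(
    profile_name="your_dbt_project_name",
    target_name="cosmos_target",
    profile_mapping=redshift_profile,
)

with DAG(
    dag_id="dbt_docs",
    start_date=datetime(2023, 10, 30),
    schedule="0 6 * * *",  # example: daily at 06:00 UTC; pick a cadence that suits your pipeline
    catchup=False,
    max_active_runs=1,
    tags=["datahub"],
):
    generate_docs = DbtDocsS3LocalOperator(
        task_id="generate_dbt_docs",
        aws_conn_id="aws_default",
        bucket_name=aws_bucket,
        profile_config=profile_config,
    )

    generate_freshness = DbtFreshnessS3LocalOperator(
        task_id="generate_dbt_freshness",
        aws_conn_id="aws_default",
        bucket_name=aws_bucket,
        profile_config=profile_config,
    )

    # upload the docs artifacts first, then the freshness results
    generate_docs >> generate_freshness

Whichever cadence you choose, keep it in mind when scheduling the DataHub ingestion described below.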

Creating the dbt Integration in DataHub

Now that our files exist in S3, we can create a DataHub ingestion recipe to retrieve them. Before doing so, add your AWS credentials as DataHub secrets so they can be referenced in the recipe:

source:
    type: dbt
    config:
        manifest_path: 's3://your-s3-bucket-name/manifest.json'
        catalog_path: 's3://your-s3-bucket-name/catalog.json'
        sources_path: 's3://your-s3-bucket-name/sources.json'
        test_results_path: 's3://your-s3-bucket-name/run_results.json'
        target_platform: redshift
        aws_connection:
            aws_access_key_id: '${AWS_ACCESS_KEY}'
            aws_secret_access_key: '${AWS_SECRET_ACCESS_KEY}'
            aws_region: '${AWS_DEFAULT_REGION}'

Add a schedule to your DataHub dbt ingestion that runs more frequently than the Airflow DAG generating the files, and voilà: you have successfully automated metadata ingestion from dbt Core (running in Airflow via Cosmos) into DataHub.

The End Result

As you add metadata descriptions and data quality checks in dbt, you will see them appear in DataHub each time the metadata is refreshed.

Metadata

An example of metadata descriptions written into our dbt project:

Model Metadata Descriptions

Lineage

An example of upstream and downstream node relations from our dbt project:

Model Lineage

Need Help?

If you are looking to implement a Data Catalog or are in search of other Data Consultancy services for your business, please reach out to us on Discord or submit a service request here.