Jan 09, 2025
10 min read

Building an ETL Pipeline for Slack AI Huddle Notes with Airflow and Snowflake

Learn how to build an ETL pipeline that extracts AI-generated Slack huddle notes using Airflow, processes them with Snowflake Cortex, and loads them into a data warehouse for future AI agent consumption.


Background

Our team heavily relies on Slack for communication, finding it to be the most effective way to collaborate as a fully distributed workforce. Slack has consistently enhanced their platform with features like Canvases, Lists, and Workflow Builder. These additions have made Slack an increasingly integral part of our daily workflows.

One feature we particularly value is Slack Huddles. While huddles aren’t new, Slack recently introduced an exciting capability: AI-powered huddle notes. When you join a huddle, you can now enable Slack AI to automatically capture notes. Once the huddle ends, the AI generates a comprehensive summary in the huddle thread as a Slack Canvas. We’ve been impressed by both the accuracy of the AI notetaking and how well the Canvas format preserves key points and participant information.

Recognizing the value of these AI-generated notes, we decided to make them accessible to our internal AI agents by building an ETL pipeline. This pipeline extracts the huddle notes from Slack and loads them into Snowflake, where the normalized data can serve as a RAG (Retrieval-Augmented Generation) source for our AI agents. While we won’t cover the AI agent integration in this post, we’ll walk you through the complete ETL pipeline implementation.

Prerequisites and Setup

Before diving into the implementation, ensure you have the following components configured:

Apache Airflow Environment

This tutorial uses Apache Airflow hosted on Astronomer. Install these PyPI packages in your environment: apache-airflow-providers-slack, apache-airflow-providers-amazon, and apache-airflow-providers-snowflake. The extraction task also parses Canvas HTML with BeautifulSoup, so install beautifulsoup4 as well.
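
If you are running on Astronomer, one way to install these is to add them to your project's requirements.txt (pin versions as appropriate for your Airflow version):

    # requirements.txt
    apache-airflow-providers-slack
    apache-airflow-providers-amazon
    apache-airflow-providers-snowflake
    beautifulsoup4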

Required Airflow Connections

| Connection | ID | Setup Guide | Authentication/Permissions | Notes |
| --- | --- | --- | --- | --- |
| Slack | slack_api_default | Slack Airflow connection guide | Add your Slack bot token (xoxb-...) to the password field; required scopes: channels:read, files:read, users:read | |
| AWS | aws_default | AWS Airflow connection guide | Write access to your S3 bucket | Store your S3 bucket name as an Airflow variable named S3_BUCKET |
| Snowflake | snowflake_default | Snowflake Airflow connection guide | Read and write access to your Snowflake database | |

These configurations establish the foundation for building a robust ETL pipeline that seamlessly integrates Slack, AWS S3, and Snowflake.
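
As a quick sanity check, here is a minimal sketch of how these connection IDs and the S3_BUCKET variable are resolved in task code; each hook below simply falls back to the ID listed above when none is passed explicitly:

    from airflow.models.variable import Variable
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.slack.hooks.slack import SlackHook
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    # Each hook resolves its default Airflow connection when no ID is given.
    slack = SlackHook(slack_conn_id="slack_api_default")
    s3 = S3Hook(aws_conn_id="aws_default")
    snowflake = SnowflakeHook(snowflake_conn_id="snowflake_default")

    # The target bucket is read from the Airflow variable configured above.
    bucket = Variable.get("S3_BUCKET")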

Extracting Huddle Notes from Slack

While Slack hasn’t released an official API endpoint for huddle notes extraction, we can leverage a clever workaround. Slack treats AI-generated huddle notes (stored as Canvases) like any other shared file in a channel or thread. This allows us to programmatically access these notes using the Slack SDK Web Client.

Implementation Details

Using SlackHook from apache-airflow-providers-slack, our extraction task has four key components:

  1. Canvas Identification: Filter Slack files with type quip to find huddle notes
  2. Incremental Processing: Use prev_end_date_success to only process new/modified notes
  3. Full Refresh: Optional canvases_full_refresh parameter for complete reprocessing
  4. Output: Returns Canvas metadata (IDs/names) for downstream tasks

The code implementation:

    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.providers.slack.hooks.slack import SlackHook

    with DAG(
        dag_id="slack",
        template_searchpath=f"{os.getenv('AIRFLOW_HOME', '/usr/local/airflow')}/dags/utils/sql/",
        params={"canvases_full_refresh": False},
        # ...
    ):

        @task
        def list_canvases(channel: str, **kwargs):
            # instantiate clients
            slack = SlackHook()
            client = slack.get_conn()

            # flag for a full refresh (short-circuits the incremental filter below)
            full_load = kwargs["params"]["canvases_full_refresh"]

            # prev_end_date_success as a unix timestamp
            prev_end_date_success = kwargs.get("prev_end_date_success")  # just use now if missing
            if prev_end_date_success is None:
                prev_end_date_success = datetime.now()
            prev_end_date_success_timestamp = int(datetime.timestamp(prev_end_date_success))

            # retrieve canvases from slack channel
            canvases = client.files_list(channel=channel, types=["quip"])

            if full_load:
                return [{"canvas_id": canvas["id"], "canvas_name": canvas["name"]} for canvas in canvases["files"]]
            else:
                target_canvases = []
                for file in canvases["files"]:
                    file_id, file_name = file["id"], file["name"]
                    created = file["created"]  # unix timestamp
                    edited = file.get("edit_timestamp", file["created"])  # fallback to created timestamp if not edited

                    # if edited or created is after the prev_end_date_success, then we need to extract the canvas.
                    if edited > prev_end_date_success_timestamp or created > prev_end_date_success_timestamp:
                        target_canvases.append({"canvas_id": file_id, "canvas_name": file_name})

                return target_canvases

After obtaining our list of Canvases, we’ll extract their content using these key components:

  1. Canvas Extraction

    • Uses Slack SDK Web Client (no official Canvas API available)
    • Authenticates via bot token from Airflow connection
  2. Text Processing

    • BeautifulSoup cleans up content
    • Converts user IDs to names (e.g., @U1234567 → John Smith)
  3. AI Enhancement (via Snowflake Cortex + Llama 3.1)

    • Generates concise summaries
    • Creates meaningful titles
  4. Storage

    • Uploads structured data to S3
    • Includes metadata (dates, participants, etc.)

This task transforms raw huddle notes into structured, AI-enhanced data ready for analysis.

    from airflow.models.variable import Variable

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.slack.hooks.slack import SlackHook
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    # ... DAG and previous tasks

        @task(map_index_template="{{ canvas_name }}")
        def extract_canvas(canvas_id: str, canvas_name: str):
            import json
            import re
            from datetime import datetime

            import requests
            from airflow.hooks.base import BaseHook
            from airflow.operators.python import get_current_context
            from bs4 import BeautifulSoup

            # set the canvas_name to the task_instance
            context = get_current_context()
            context["canvas_name"] = canvas_name

            # instantiate clients
            slack = SlackHook()
            snowflake = SnowflakeHook()
            client = slack.get_conn()
            s3 = S3Hook()

            # get token
            token = BaseHook.get_connection(slack.slack_conn_id).password

            # get file info
            response = client.files_info(file=canvas_id)
            file_name = response["file"]["name"]
            file_url = response["file"]["url_private"]
            file_created = response["file"]["created"]
            file_updated = response["file"].get(
                "updated", file_created
            )  # fallback to created timestamp if updated is missing
            file_date = datetime.fromtimestamp(file_created).strftime("%Y-%m-%d")

            # get file content
            r = requests.get(file_url, headers={"Authorization": f"Bearer {token}"})
            r.raise_for_status()
            content = r.content

            # parse content
            html_string = content.decode("utf-8") if isinstance(content, bytes) else content
            html_string = re.sub(r"^b'|'$", "", html_string)

            # Parse HTML
            soup = BeautifulSoup(html_string, "html.parser")

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get text and handle whitespace
            text = soup.get_text()

            # Clean up excessive whitespace while preserving structure
            lines = (line.strip() for line in text.splitlines())
            text = "\n".join(line for line in lines if line)

            # in the text, find all slack user ids
            user_ids = re.findall(r"@U[A-Z0-9]+", text)
            user_ids = [user_id[1:] for user_id in user_ids]
            user_ids = list(set(user_ids))

            # lookup the names of the users
            users = []
            for user_id in user_ids:
                user_info = client.users_info(user=user_id)
                real_name = user_info["user"]["real_name"]
                users.append({"user_id": f"@{user_id}", "name": real_name})

            # replace the user ids with the names in the text
            for user in users:
                text = text.replace(user["user_id"], user["name"])

            summary_prompt = f"""
            You are an assistant that is very effective at summarizing the messy notes from a Slack Canvas.
            Given the raw text between the <canvas> tags, please summarize the text into a short paragraph.
            Be sure to capture general summaries and (potential) action items.

            Key Asks:
            - Do NOT include any explanations of your summary, or thought process. JUST THE SUMMARY!
            - ASSUME that your output is going to be stored in a variable.
            - Do NOT say things like "Here is the summary:" or anything like that. Just give the summary.
            - You do not need to include the attendees in your summary. That is already captured in the attendees field.

            <canvas>
            {text}
            </canvas>
            """

            escaped_summary_prompt = summary_prompt.replace("'", "''")

            # summarize the text
            summary = snowflake.get_first(f"select snowflake.cortex.complete('llama3.1-70b', '{escaped_summary_prompt}')")[
                0
            ]

            # generate a title for the canvas
            title_prompt = f"""
            You are an assistant that is very effective at summarizing the messy notes from a Slack Canvas.
            Given the generated summary between the <summary> tags, please generate a title for the canvas note that is meaningful.

            Key Asks:
            - Do NOT include any explanations of your title, or thought process. JUST THE TITLE!
            - ASSUME that your output is going to be stored in a variable.
            - Do NOT say things like "Here is the title:" or anything like that. Just give the title.
            - The title should be no more than 10 words. Preferably 5-7 words.

            <summary>
            {summary}
            </summary>
            """

            escaped_title_prompt = title_prompt.replace("'", "''")
            title = snowflake.get_first(f"select snowflake.cortex.complete('llama3.1-70b', '{escaped_title_prompt}')")[0]

            # structure the data
            structured = {
                "id": canvas_id,
                "title": title,
                "attendees": [user["name"] for user in users],
                "text": summary,
                "raw_text": text,
                "date_key": file_date,
                "created": file_created,
                "updated": file_updated,
            }

            # upload to s3
            s3.load_string(
                string_data=json.dumps(structured),
                key=f"slack/canvases/{canvas_id}.json",
                bucket_name=Variable.get("S3_BUCKET"),
                replace=True,
            )
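
For reference, each Canvas lands in S3 as a single JSON document mirroring the structured dictionary above; the values below are purely illustrative:

    {
        "id": "F01234ABCDE",
        "title": "Q1 roadmap planning huddle",
        "attendees": ["Jane Doe", "John Smith"],
        "text": "Short AI-generated summary of the huddle...",
        "raw_text": "Full cleaned Canvas text...",
        "date_key": "2025-01-07",
        "created": 1736251200,
        "updated": 1736254800
    }

The Snowflake statements in the next section parse exactly this structure out of the variant column.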

Loading Data into Snowflake

Our pipeline’s final step copies Canvas content from S3 to Snowflake through a two-table approach:

  1. Load raw JSON into a staging table (canvases_json)
  2. Transform into a normalized table (canvas) for querying

Here’s the implementation:

-- ./dags/utils/sql/slack/copy_canvases.sql
use schema slack;

create or replace table canvases_json (canvases_json_data variant);

copy into canvases_json
from 's3://{{ var.value.S3_BUCKET }}/slack/canvases/'
storage_integration = aws_s3_integration
file_format = (type = json);

create or replace table canvas as (
    select
        canvases_json_data:id::varchar as canvas_id,
        replace(canvases_json_data:title, '_', ' ')::varchar as canvas_title,
        canvases_json_data:attendees::array as attendees,
        canvases_json_data:raw_text::string as raw_text,
        canvases_json_data:text::varchar as summary,
        canvases_json_data:date_key::date as date_key,
        convert_timezone('UTC', canvases_json_data:created::timestamp_tz) as created_at,
        convert_timezone('UTC', canvases_json_data:updated::timestamp_tz) as updated_at
    from canvases_json
);

Note: the COPY statement above requires a configured [AWS S3 storage integration in Snowflake](https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration), here named aws_s3_integration.
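
If you do not already have one, a storage integration along these lines should work. This is a minimal sketch: the IAM role ARN and bucket are placeholders for your own values, and the integration name must match the one referenced in the copy statement above.

create storage integration aws_s3_integration
    type = external_stage
    storage_provider = 'S3'
    enabled = true
    storage_aws_role_arn = 'arn:aws:iam::<account-id>:role/<snowflake-s3-role>' -- placeholder
    storage_allowed_locations = ('s3://<your-bucket>/slack/canvases/'); -- placeholder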

In the DAG, the templated SQL file is executed with the SnowflakeSqlApiOperator:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

    # ... DAG code and previous tasks

    copy_into_snowflake = SnowflakeSqlApiOperator(
        task_id="copy_canvases",
        sql="slack/copy_canvases.sql",
        statement_count=0,
        snowflake_conn_id="snowflake_default",
    )

Orchestrating the Pipeline with Dynamic Task Mapping

To bring all our components together, we’ll leverage Airflow’s dynamic task mapping capabilities. This powerful feature enables efficient parallel processing of our Slack Canvas extractions:

    # ... DAG code and previous tasks

    canvases = list_canvases(channel="<insert-your-slack-channel-id-here>")
    extract_canvas.expand_kwargs(canvases) >> copy_into_snowflake
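
If you want to smoke test the pipeline locally with a full refresh, one option is dag.test(). The sketch below assumes Airflow 2.5+, that the with DAG(...) block is written as with DAG(...) as dag:, and a hypothetical import path for the DAG file; run_conf overrides the canvases_full_refresh param under Airflow's default configuration.

    # local smoke test, forcing reprocessing of every canvas
    from dags.slack import dag  # hypothetical import path; adjust to your project layout

    if __name__ == "__main__":
        dag.test(run_conf={"canvases_full_refresh": True})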

Visualizing the Pipeline

The Airflow UI provides two helpful views to monitor our pipeline:

  1. Graph View: Displays the overall task dependencies and workflow structure

  2. Grid View: Shows the dynamic task mapping in action, with parallel Canvas extractions

Accessing Processed Data

Once the pipeline completes, you can query the transformed data directly in Snowflake. Here’s a sample query to explore your processed Canvas data:

select
    canvas_id,
    canvas_title,
    attendees,
    summary,
    date_key
from canvas
order by created_at desc;

Example output:

| canvas_id | canvas_title | attendees | summary | date_key |
| --- | --- | --- | --- | --- |
| F07UNK5PN5N | Improving Meeting Notes with AI and Data Engineering | Chris Hronek, John Wick | Christopher Hronek and John Wick discussed leveraging AI and data engineering tools… | 2024-10-29 |

Conclusion

We’ve built an ETL pipeline that transforms Slack’s AI huddle notes into structured, queryable data using Airflow, Snowflake, and AWS S3. The system:

  • Automatically processes new huddle notes
  • Generates AI-powered summaries and titles
  • Makes meeting content searchable
  • Enables AI agent integration

While we focused on Slack huddles, this architecture can be adapted for other communication platforms. The pipeline’s incremental processing and dynamic task mapping ensure efficient scaling as data volumes grow.

This solution helps teams transform their casual conversations into valuable, structured data assets.