Background
Our team heavily relies on Slack for communication, finding it to be the most effective way to collaborate as a fully distributed workforce. Slack has consistently enhanced its platform with features like Canvases, Lists, and Workflow Builder. These additions have made Slack an increasingly integral part of our daily workflows.
One feature we particularly value is Slack Huddles. While huddles aren’t new, Slack recently introduced an exciting capability: AI-powered huddle notes. When you join a huddle, you can now enable Slack AI to automatically capture notes. Once the huddle ends, the AI generates a comprehensive summary in the huddle thread as a Slack Canvas. We’ve been impressed by both the accuracy of the AI notetaking and how well the Canvas format preserves key points and participant information.
Recognizing the value of these AI-generated notes, we decided to make them accessible to our internal AI agents by building an ETL pipeline. This pipeline extracts the huddle notes from Slack and loads them into Snowflake, where the normalized data can serve as a RAG (Retrieval-Augmented Generation) source for our AI agents. While we won’t cover the AI agent integration in this post, we’ll walk you through the complete ETL pipeline implementation.
Prerequisites and Setup
Before diving into the implementation, ensure you have the following components configured:
Apache Airflow Environment
This tutorial uses Apache Airflow hosted on Astronomer. Install these essential PyPI packages in your environment: apache-airflow-providers-slack, apache-airflow-providers-amazon, and apache-airflow-providers-snowflake.
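In an Astro project, one way to do that (a minimal sketch; pin versions to match your deployment) is to add the providers to the project's requirements.txt:

apache-airflow-providers-slack
apache-airflow-providers-amazon
apache-airflow-providers-snowflake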
Required Airflow Connections
| Connection | ID | Setup Guide | Authentication/Permissions | Notes |
|---|---|---|---|---|
| Slack | slack_api_default | Slack Airflow connection guide | Add a Slack bot token (xoxb-...) to the password field. Required scopes: channels:read, files:read, users:read | |
| AWS | aws_default | AWS Airflow connection guide | Write access to the S3 bucket | Store your S3 bucket name as an Airflow variable named S3_BUCKET |
| Snowflake | snowflake_default | Snowflake Airflow connection guide | Read and write access to the Snowflake database | |
These configurations establish the foundation for building a robust ETL pipeline that seamlessly integrates Slack, AWS S3, and Snowflake.
Extracting Huddle Notes from Slack
While Slack hasn’t released an official API endpoint for huddle notes extraction, we can leverage a clever workaround. Slack treats AI-generated huddle notes (stored as Canvases) like any other shared file in a channel or thread. This allows us to programmatically access these notes using the Slack SDK Web Client.
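To illustrate the idea outside of Airflow, here is a minimal sketch using the raw Slack SDK (the token environment variable and channel ID are placeholders; the Airflow tasks below handle authentication via the Slack connection instead):

import os

from slack_sdk import WebClient

# Huddle-note Canvases come back from files.list like any other shared file,
# so we can filter on the "quip" file type to find them.
client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])  # placeholder env var
response = client.files_list(channel="C0123456789", types="quip")  # placeholder channel ID
for canvas in response["files"]:
    print(canvas["id"], canvas["name"])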
Implementation Details
Using SlackHook from apache-airflow-providers-slack, our extraction task has four key components:
- Canvas Identification: Filter Slack files with type quip to find huddle notes
- Incremental Processing: Use prev_end_date_success to only process new/modified notes
- Full Refresh: Optional canvases_full_refresh parameter for complete reprocessing
- Output: Returns Canvas metadata (IDs/names) for downstream tasks
The code implementation:
import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.slack.hooks.slack import SlackHook

with DAG(
    dag_id="slack",
    template_searchpath=f"{os.getenv('AIRFLOW_HOME', '/usr/local/airflow')}/dags/utils/sql/",
    params={"canvases_full_refresh": False},
    # ...
):

    @task
    def list_canvases(channel: str, **kwargs):
        # instantiate clients
        slack = SlackHook()
        client = slack.get_conn()
        # short circuit if full refresh is true
        full_load = kwargs["params"]["canvases_full_refresh"]
        # prev_end_date_success as a unix timestamp
        prev_end_date_success = kwargs.get("prev_end_date_success")  # just use now if missing
        if prev_end_date_success is None:
            prev_end_date_success = datetime.now()
        prev_end_date_success_timestamp = int(datetime.timestamp(prev_end_date_success))
        # retrieve canvases from slack channel
        canvases = client.files_list(channel=channel, types=["quip"])
        if full_load:
            return [{"canvas_id": canvas["id"], "canvas_name": canvas["name"]} for canvas in canvases["files"]]
        else:
            target_canvases = []
            for file in canvases["files"]:
                file_id, file_name = file["id"], file["name"]
                created = file["created"]  # unix timestamp
                edited = file.get("edit_timestamp", file["created"])  # fallback to created timestamp if not edited
                # if edited or created is after the prev_end_date_success, then we need to extract the canvas.
                if edited > prev_end_date_success_timestamp or created > prev_end_date_success_timestamp:
                    target_canvases.append({"canvas_id": file_id, "canvas_name": file_name})
            return target_canvases
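For reference, the returned metadata is a plain list of dicts (the values here are illustrative), which is exactly the shape extract_canvas will be dynamically mapped over later:

[
    {"canvas_id": "F07UNK5PN5N", "canvas_name": "Huddle notes example"},  # illustrative entry
    # ... one entry per new or modified Canvas
]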
After obtaining our list of Canvases, we’ll extract their content using these key components:
- Canvas Extraction
  - Uses Slack SDK Web Client (no official Canvas API available)
  - Authenticates via bot token from the Airflow connection
- Text Processing
  - BeautifulSoup cleans up content
  - Converts user IDs to names (e.g., @U1234567 → John Smith)
- AI Enhancement (via Snowflake Cortex + Llama 3.1)
  - Generates concise summaries
  - Creates meaningful titles
- Storage
  - Uploads structured data to S3
  - Includes metadata (dates, participants, etc.)
This task transforms raw huddle notes into structured, AI-enhanced data ready for analysis.
from datetime import datetime

from airflow.models.variable import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# ... DAG and previous tasks

@task(map_index_template="{{ canvas_name }}")
def extract_canvas(canvas_id: str, canvas_name: str):
    import json
    import re

    import requests
    from airflow.hooks.base import BaseHook
    from airflow.operators.python import get_current_context
    from bs4 import BeautifulSoup

    # set the canvas_name to the task_instance
    context = get_current_context()
    context["canvas_name"] = canvas_name
    # instantiate clients
    slack = SlackHook()
    snowflake = SnowflakeHook()
    client = slack.get_conn()
    s3 = S3Hook()
    # get token
    token = BaseHook.get_connection(slack.slack_conn_id).password
    # get file info
    response = client.files_info(file=canvas_id)
    file_name = response["file"]["name"]
    file_url = response["file"]["url_private"]
    file_created = response["file"]["created"]
    file_updated = response["file"].get(
        "updated", file_created
    )  # fallback to created timestamp if updated is missing
    file_date = datetime.fromtimestamp(file_created).strftime("%Y-%m-%d")
    # get file content
    r = requests.get(file_url, headers={"Authorization": f"Bearer {token}"})
    r.raise_for_status()
    content = r.content
    # parse content
    html_string = content.decode("utf-8") if isinstance(content, bytes) else content
    html_string = re.sub(r"^b'|'$", "", html_string)
    # Parse HTML
    soup = BeautifulSoup(html_string, "html.parser")
    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()
    # Get text and handle whitespace
    text = soup.get_text()
    # Clean up excessive whitespace while preserving structure
    lines = (line.strip() for line in text.splitlines())
    text = "\n".join(line for line in lines if line)
    # in the text, find all slack user ids
    user_ids = re.findall(r"@U[A-Z0-9]+", text)
    user_ids = [user_id[1:] for user_id in user_ids]
    user_ids = list(set(user_ids))
    # lookup the names of the users
    users = []
    for user_id in user_ids:
        user_info = client.users_info(user=user_id)
        real_name = user_info["user"]["real_name"]
        users.append({"user_id": f"@{user_id}", "name": real_name})
    # replace the user ids with the names in the text
    for user in users:
        text = text.replace(user["user_id"], user["name"])
    summary_prompt = f"""
    You are an assistant that is very effective at summarizing the messy notes from a Slack Canvas.
    Given the raw text between the <canvas> tags, please summarize the text into a short paragraph.
    Be sure to capture general summaries and (potential) action items.
    Key Asks:
    - Do NOT include any explanations of your summary, or thought process. JUST THE SUMMARY!
    - ASSUME that your output is going to be stored in a variable.
    - Do NOT say things like "Here is the summary:" or anything like that. Just give the summary.
    - You do not need to include the attendees in your summary. That is already captured in the attendees field.
    <canvas>
    {text}
    </canvas>
    """
    # escape single quotes so the prompt can be embedded in a SQL string literal
    escaped_summary_prompt = summary_prompt.replace("'", "''")
    # summarize the text
    summary = snowflake.get_first(
        f"select snowflake.cortex.complete('llama3.1-70b', '{escaped_summary_prompt}')"
    )[0]
    # generate a title for the canvas
    title_prompt = f"""
    You are an assistant that is very effective at summarizing the messy notes from a Slack Canvas.
    Given the generated summary between the <summary> tags, please generate a title for the canvas note that is meaningful.
    Key Asks:
    - Do NOT include any explanations of your title, or thought process. JUST THE TITLE!
    - ASSUME that your output is going to be stored in a variable.
    - Do NOT say things like "Here is the title:" or anything like that. Just give the title.
    - The title should be no more than 10 words. Preferably 5-7 words.
    <summary>
    {summary}
    </summary>
    """
    escaped_title_prompt = title_prompt.replace("'", "''")
    title = snowflake.get_first(
        f"select snowflake.cortex.complete('llama3.1-70b', '{escaped_title_prompt}')"
    )[0]
    # structure the data
    structured = {
        "id": canvas_id,
        "title": title,
        "attendees": [user["name"] for user in users],
        "text": summary,
        "raw_text": text,
        "date_key": file_date,
        "created": file_created,
        "updated": file_updated,
    }
    # upload to s3
    s3.load_string(
        string_data=json.dumps(structured),
        key=f"slack/canvases/{canvas_id}.json",
        bucket_name=Variable.get("S3_BUCKET"),
        replace=True,
    )
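The JSON document written to S3 ends up looking roughly like this (field values are illustrative; the keys match the structured dict above), which is what the Snowflake variant column will parse in the next step:

{
  "id": "F07UNK5PN5N",
  "title": "Improving Meeting Notes with AI and Data Engineering",
  "attendees": ["Chris Hronek", "John Wick"],
  "text": "Short AI-generated summary of the huddle...",
  "raw_text": "Cleaned-up Canvas text with user IDs resolved to names...",
  "date_key": "2024-10-29",
  "created": 1730217600,
  "updated": 1730221200
}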
Loading Data into Snowflake
Our pipeline’s final step copies Canvas content from S3 to Snowflake through a two-table approach:
- Load raw JSON into a staging table (canvases_json)
- Transform into a normalized table (canvas) for querying
Here’s the implementation:
-- ./dags/utils/sql/slack/copy_canvases.sql
use schema slack;
create or replace table canvases_json (canvases_json_data variant);
copy into canvases_json
from 's3://{{ var.value.S3_BUCKET }}/slack/canvases/'
storage_integration = aws_s3_integration
file_format = (type = json);
create or replace table canvas as (
select
canvases_json_data:id::varchar as canvas_id,
replace(canvases_json_data:title, '_', ' ')::varchar as canvas_title,
canvases_json_data:attendees::array as attendees,
canvases_json_data:raw_text::string as raw_text,
canvases_json_data:text::varchar as summary,
canvases_json_data:date_key::date as date_key,
convert_timezone('UTC', canvases_json_data:created::timestamp_tz) as created_at,
convert_timezone('UTC', canvases_json_data:updated::timestamp_tz) as updated_at
from canvases_json
);
Note: this requires a configured [AWS S3 integration in Snowflake](https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration), referenced above as aws_s3_integration.
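If you haven't set one up yet, creating that integration looks roughly like this (a hedged sketch: the role ARN and bucket are placeholders, and the integration name must match the storage_integration referenced above):

create or replace storage integration aws_s3_integration
    type = external_stage
    storage_provider = 'S3'
    enabled = true
    storage_aws_role_arn = 'arn:aws:iam::123456789012:role/your-snowflake-s3-role'  -- placeholder
    storage_allowed_locations = ('s3://your-canvas-bucket/slack/canvases/');  -- placeholder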
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# ... DAG code and previous tasks

copy_into_snowflake = SnowflakeSqlApiOperator(
    task_id="copy_canvases",
    sql="slack/copy_canvases.sql",
    statement_count=0,  # 0 allows a variable number of statements in the script
    snowflake_conn_id="snowflake_default",
)
Orchestrating the Pipeline with Dynamic Task Mapping
To bring all our components together, we’ll leverage Airflow’s dynamic task mapping capabilities. This powerful feature enables efficient parallel processing of our Slack Canvas extractions:
# ... DAG code and previous tasks
extract_canvas.expand_kwargs(list_canvases(channel="<insert-your-slack-channel-id-here>")) >> copy_into_snowflake
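One small tuning note: if a channel accumulates many Canvases, you may want to cap how many mapped extractions run at once. One option (a sketch; the limit of 8 is arbitrary) is the task-level max_active_tis_per_dag argument:

@task(map_index_template="{{ canvas_name }}", max_active_tis_per_dag=8)
def extract_canvas(canvas_id: str, canvas_name: str):
    ...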
Visualizing the Pipeline
The Airflow UI provides two helpful views to monitor our pipeline:
- Graph View: Displays the overall task dependencies and workflow structure
- Grid View: Shows the dynamic task mapping in action, with parallel Canvas extractions
Accessing Processed Data
Once the pipeline completes, you can query the transformed data directly in Snowflake. Here’s a sample query to explore your processed Canvas data:
select
canvas_id,
canvas_title,
attendees,
summary,
date_key
from canvas
order by created_at desc;
Example output:
| canvas_id | canvas_title | attendees | summary | date_key |
|---|---|---|---|---|
| F07UNK5PN5N | Improving Meeting Notes with AI and Data Engineering | Chris Hronek, John Wick | Christopher Hronek and John Wick discussed leveraging AI and data engineering tools… | 2024-10-29 |
| … | … | … | … | … |
Conclusion
We’ve built an ETL pipeline that transforms Slack’s AI huddle notes into structured, queryable data using Airflow, Snowflake, and AWS S3. The system:
- Automatically processes new huddle notes
- Generates AI-powered summaries and titles
- Makes meeting content searchable
- Enables AI agent integration
While we focused on Slack huddles, this architecture can be adapted for other communication platforms. The pipeline’s incremental processing and dynamic task mapping ensure efficient scaling as data volumes grow.
This solution helps teams transform their casual conversations into valuable, structured data assets.