Building a Confluence RAG Knowledge Base: Auto-Sync, Freshness Checks & AI-Powered Q&A

Written on May 12, 2026 by ibsanju.

Last updated May 12, 2026.


Ever asked your team a question about some internal process only to get "yeah it's somewhere in Confluence" as the answer? You know it's documented, but finding the right page, reading through three pages of context, and piecing together the full picture from scattered spaces is a full-time job on its own.

Today we're solving that problem completely. We'll build a RAG (Retrieval-Augmented Generation) system that:

  • ✅ Auto-syncs all your Confluence spaces every 24 hours
  • ✅ Detects changed pages using MD5 checksums (only re-embeds what actually changed)
  • ✅ Stores vector embeddings in PostgreSQL with pgvector
  • ✅ Answers questions using OpenAI + LangChain with source citations
  • ✅ Ships as a complete Docker stack you can run in minutes

I've used this exact architecture to build a knowledge base for a large engineering team's QA documentation — it went from "30 minutes searching Confluence" to "15-second AI answer with sources".

TL;DR: What We're Building

  • Confluence client to fetch pages via the REST API
  • Document processor to strip HTML, chunk text, and compute checksums
  • Embeddings service using OpenAI text-embedding-3-small
  • PostgreSQL + pgvector for similarity search
  • FastAPI RAG API to accept questions and return answers with sources
  • n8n workflow for scheduled 24-hour sync
  • Docker Compose to tie it all together

Let's build! 🚀

Architecture Overview

Before we write code, here's how all the pieces connect:

┌─────────────────────────────────────────────────────────┐
│                      n8n Scheduler                       │
│              (triggers every 24 hours)                   │
└──────────────────────────┬──────────────────────────────┘
                           │ POST /api/sync
                           ▼
┌─────────────────────────────────────────────────────────┐
│                  FastAPI Sync Endpoint                   │
│  1. Fetch pages from Confluence REST API                 │
│  2. Compute MD5 checksum of each page                    │
│  3. Skip pages where checksum hasn't changed             │
│  4. Chunk updated pages into ~400-token segments         │
│  5. Generate OpenAI embeddings for each chunk            │
│  6. Upsert into PostgreSQL (pgvector)                    │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│              PostgreSQL + pgvector                       │
│   pages: id, title, checksum, last_synced_at             │
│   chunks: id, page_id, content, embedding vector(1536)   │
└──────────────────────────┬──────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│                FastAPI Query Endpoint                    │
│  POST /api/query                                         │
│  1. Embed user question with OpenAI                      │
│  2. Cosine similarity search (top-k chunks)              │
│  3. Build prompt with retrieved context                  │
│  4. Call GPT-4o-mini to generate answer                  │
│  5. Return answer + source page references               │
└─────────────────────────────────────────────────────────┘

Clean separation of concerns — sync is decoupled from query, and the vector store is just a PostgreSQL table you already know how to operate.

Tech Stack & Project Structure

confluence-rag/
├── app/
│   ├── __init__.py
│   ├── main.py               # FastAPI app entry point
│   ├── confluence_client.py  # Confluence REST API wrapper
│   ├── document_processor.py # HTML stripping & text chunking
│   ├── embeddings_service.py # OpenAI embedding + pgvector ops
│   └── rag_service.py        # LangChain RAG query logic
├── sql/
│   └── schema.sql            # PostgreSQL + pgvector schema
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── .env.template

requirements.txt

fastapi==0.115.0
uvicorn[standard]==0.30.6
psycopg2-binary==2.9.9
pgvector==0.3.2
openai==1.45.0
langchain==0.3.0
langchain-openai==0.2.0
langchain-community==0.3.0
beautifulsoup4==4.12.3
requests==2.32.3
python-dotenv==1.0.1
pydantic-settings==2.5.2
tiktoken==0.7.0
tenacity==9.0.0

Install everything:

pip install -r requirements.txt

Database Schema

We use PostgreSQL with the pgvector extension for storing and querying vector embeddings. OpenAI's text-embedding-3-small model produces 1536-dimension vectors.

sql/schema.sql

-- Enable pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
 
-- Tracks each synced Confluence page
CREATE TABLE IF NOT EXISTS confluence_pages (
    id              TEXT PRIMARY KEY,          -- Confluence page ID
    space_key       TEXT NOT NULL,
    title           TEXT NOT NULL,
    url             TEXT NOT NULL,
    checksum        TEXT NOT NULL,             -- MD5 of raw content
    last_synced_at  TIMESTAMPTZ DEFAULT NOW(),
    created_at      TIMESTAMPTZ DEFAULT NOW()
);
 
CREATE INDEX IF NOT EXISTS idx_pages_space ON confluence_pages(space_key);
CREATE INDEX IF NOT EXISTS idx_pages_checksum ON confluence_pages(checksum);
 
-- Stores text chunks with their vector embeddings
CREATE TABLE IF NOT EXISTS document_chunks (
    id          BIGSERIAL PRIMARY KEY,
    page_id     TEXT NOT NULL REFERENCES confluence_pages(id) ON DELETE CASCADE,
    chunk_index INTEGER NOT NULL,
    content     TEXT NOT NULL,
    token_count INTEGER NOT NULL,
    embedding   vector(1536),                  -- OpenAI text-embedding-3-small
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (page_id, chunk_index)
);
 
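-- IVFFlat index for fast approximate nearest-neighbour search.
-- pgvector's guidance: lists ≈ rows / 1000 up to 1M rows, sqrt(rows) beyond that.
-- IVFFlat learns its clusters from existing rows, so rebuild it (REINDEX) after the first full sync.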
CREATE INDEX IF NOT EXISTS idx_chunks_embedding
    ON document_chunks
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
 
-- Analytics: track what people ask
CREATE TABLE IF NOT EXISTS query_log (
    id          BIGSERIAL PRIMARY KEY,
    question    TEXT NOT NULL,
    answer      TEXT,
    sources     JSONB,
    latency_ms  INTEGER,
    created_at  TIMESTAMPTZ DEFAULT NOW()
);
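
To get a feel for how large the document_chunks table can grow, here is a rough back-of-the-envelope sketch. It assumes the figure given in the pgvector README, that a vector value takes 4 * dimensions + 8 bytes, and it ignores row overhead, the chunk text, and the index. The helper name estimate_embedding_bytes is hypothetical and not part of the project.

```python
def estimate_embedding_bytes(num_chunks: int, dimensions: int = 1536) -> int:
    """Approximate bytes used by the embedding column alone.

    Assumes pgvector's documented size of 4 * dimensions + 8 bytes per vector.
    """
    bytes_per_vector = 4 * dimensions + 8
    return num_chunks * bytes_per_vector


# Example: 10,000 chunks of 1536-dimension embeddings
total = estimate_embedding_bytes(10_000)
print(f"{total / 1024 / 1024:.1f} MiB for the embedding column")
```

Even a few thousand pages stay comfortably in the tens of megabytes, which is why a plain PostgreSQL instance is enough here.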

Confluence Client

The ConfluenceClient handles all communication with the Confluence REST API, including automatic pagination, retries with backoff, and helpers to build page URLs and compute content checksums.

app/confluence_client.py

import hashlib
import os
import time
 
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
 
 
class ConfluenceClient:
    """Thin wrapper around the Confluence Cloud REST API (v1, /wiki/rest/api)."""
 
    def __init__(self) -> None:
        self.base_url = os.environ["CONFLUENCE_BASE_URL"].rstrip("/")
        self.email = os.environ["CONFLUENCE_EMAIL"]
        self.api_token = os.environ["CONFLUENCE_API_TOKEN"]
        self._session = self._build_session()
 
    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
 
    def _build_session(self) -> requests.Session:
        session = requests.Session()
        session.auth = (self.email, self.api_token)
        session.headers.update({"Accept": "application/json"})
        retry = Retry(
            total=4,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session
 
    def _get(self, path: str, **params) -> dict:
        url = f"{self.base_url}/wiki/rest/api{path}"
        resp = self._session.get(url, params=params, timeout=30)
        resp.raise_for_status()
        return resp.json()
 
    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
 
    def get_space_pages(self, space_key: str) -> list[dict]:
        """Return every page in a Confluence space (handles pagination)."""
        pages: list[dict] = []
        start = 0
        limit = 50
 
        while True:
            data = self._get(
                "/content",
                spaceKey=space_key,
                type="page",
                status="current",
                expand="body.storage,version,space",  # space.key is needed by page_url()
                start=start,
                limit=limit,
            )
            pages.extend(data["results"])
            if len(data["results"]) < limit:
                break
            start += limit
            time.sleep(0.2)  # be kind to the API
 
        return pages
 
    def get_page(self, page_id: str) -> dict:
        """Fetch a single page with full body and metadata."""
        return self._get(
            f"/content/{page_id}",
            expand="body.storage,version,space,ancestors",
        )
 
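    def page_url(self, page: dict) -> str:
        """Build the browser URL for a page dict returned by the API."""
        from urllib.parse import quote_plus  # local import keeps this method self-contained

        # "space" is only present when the request used expand=space
        space = page.get("space", {}).get("key", "")
        title_encoded = quote_plus(page["title"])  # also escapes "/", "?", "&" in titles
        return f"{self.base_url}/wiki/spaces/{space}/pages/{page['id']}/{title_encoded}"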
 
    @staticmethod
    def content_checksum(html_body: str) -> str:
        """MD5 of raw HTML — used to detect whether a page has changed."""
        return hashlib.md5(html_body.encode("utf-8")).hexdigest()
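
The pagination loop in get_space_pages can be tried out without a Confluence account. The sketch below pulls the same offset/limit logic into a generic generator and feeds it from an in-memory list. The names paginate, fake_fetch, and FAKE_PAGES are made up for illustration and are not part of the client above.

```python
from typing import Callable, Iterator


def paginate(
    fetch_page: Callable[[int, int], list[dict]], limit: int = 50
) -> Iterator[dict]:
    """Yield items from an offset-paginated source until a short page arrives."""
    start = 0
    while True:
        results = fetch_page(start, limit)
        yield from results
        # A page shorter than the limit means there is nothing left to fetch
        if len(results) < limit:
            break
        start += limit


# Stand-in for the Confluence API: 120 fake pages served in slices
FAKE_PAGES = [{"id": str(i)} for i in range(120)]


def fake_fetch(start: int, limit: int) -> list[dict]:
    return FAKE_PAGES[start : start + limit]


pages = list(paginate(fake_fetch, limit=50))
print(len(pages))
```

One consequence of this stop condition: when the total is an exact multiple of the limit, the loop makes one extra request that returns an empty page before stopping.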

Document Processor

Raw Confluence content is HTML — we strip tags with BeautifulSoup, then split into overlapping chunks so no context is lost at chunk boundaries.

app/document_processor.py

import re
from dataclasses import dataclass
 
import tiktoken
from bs4 import BeautifulSoup
 
 
CHUNK_SIZE_TOKENS = 400   # target tokens per chunk
CHUNK_OVERLAP_TOKENS = 80  # overlap to preserve sentence context
 
 
@dataclass
class DocumentChunk:
    index: int
    content: str
    token_count: int
 
 
class DocumentProcessor:
    def __init__(self) -> None:
        # cl100k_base is the tokenizer for text-embedding-3-* models
        self._encoder = tiktoken.get_encoding("cl100k_base")
 
    # ------------------------------------------------------------------
    # HTML → plain text
    # ------------------------------------------------------------------
 
    def html_to_text(self, html: str) -> str:
        """Strip all HTML tags and normalise whitespace."""
        soup = BeautifulSoup(html, "html.parser")
 
        # Remove script / style noise
        for tag in soup(["script", "style", "head"]):
            tag.decompose()
 
        text = soup.get_text(separator="\n")
        # Collapse blank lines
        text = re.sub(r"\n{3,}", "\n\n", text)
        return text.strip()
 
    # ------------------------------------------------------------------
    # Chunking
    # ------------------------------------------------------------------
 
    def _token_count(self, text: str) -> int:
        return len(self._encoder.encode(text))
 
    def chunk(self, text: str) -> list[DocumentChunk]:
        """Split text into overlapping fixed-token chunks."""
        tokens = self._encoder.encode(text)
        chunks: list[DocumentChunk] = []
        start = 0
        idx = 0
 
        while start < len(tokens):
            end = min(start + CHUNK_SIZE_TOKENS, len(tokens))
            chunk_tokens = tokens[start:end]
            chunk_text = self._encoder.decode(chunk_tokens)
            chunks.append(
                DocumentChunk(
                    index=idx,
                    content=chunk_text.strip(),
                    token_count=len(chunk_tokens),
                )
            )
            if end == len(tokens):
                break
            start += CHUNK_SIZE_TOKENS - CHUNK_OVERLAP_TOKENS
            idx += 1
 
        return chunks
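
To see what the overlap does in practice, the following sketch reproduces only the window arithmetic of chunk(), working on token offsets instead of real text so it needs no tokenizer. chunk_windows is a hypothetical helper for illustration. It also adds a guard the original loop lacks: if the overlap were ever set equal to or larger than the chunk size, the start offset would never advance.

```python
def chunk_windows(
    num_tokens: int, size: int = 400, overlap: int = 80
) -> list[tuple[int, int]]:
    """Return (start, end) token offsets using the same stride as chunk()."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than size")
    windows: list[tuple[int, int]] = []
    start = 0
    while start < num_tokens:
        end = min(start + size, num_tokens)
        windows.append((start, end))
        if end == num_tokens:
            break
        start += size - overlap  # stride of 320 tokens with the defaults
    return windows


print(chunk_windows(1000))
# [(0, 400), (320, 720), (640, 1000)]
```

Each window starts 320 tokens after the previous one, so the last 80 tokens of one chunk reappear as the first 80 of the next.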

Embeddings Service

The EmbeddingsService wraps all vector database operations — storing chunks, upserting pages, and running similarity searches — behind a clean interface.

app/embeddings_service.py

import os
from typing import Optional
 
import psycopg2
import psycopg2.extras
from openai import OpenAI
from pgvector.psycopg2 import register_vector
 
from .document_processor import DocumentChunk
 
EMBED_MODEL = "text-embedding-3-small"
TOP_K = 6  # chunks to retrieve per query
 
 
class EmbeddingsService:
    def __init__(self) -> None:
        self._client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        self._conn = self._connect()
 
    # ------------------------------------------------------------------
    # Database helpers
    # ------------------------------------------------------------------
 
    def _connect(self) -> psycopg2.extensions.connection:
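        conn = psycopg2.connect(os.environ["DATABASE_URL"])
        # Autocommit: reads don't leave a transaction open, and a failed
        # statement can't poison the shared connection for later calls.
        # The explicit commit() calls below then become harmless no-ops.
        conn.autocommit = True
        register_vector(conn)
        return conn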
 
    def _cursor(self):
        return self._conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
 
    # ------------------------------------------------------------------
    # Embedding generation
    # ------------------------------------------------------------------
 
    def embed(self, text: str) -> list[float]:
        """Return an embedding vector for a piece of text."""
        response = self._client.embeddings.create(
            model=EMBED_MODEL,
            input=text,
        )
        return response.data[0].embedding
 
    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Batch embed up to 2048 texts in a single API call."""
        response = self._client.embeddings.create(
            model=EMBED_MODEL,
            input=texts,
        )
        # Sort by index so each embedding lines up with its input text
        return [item.embedding for item in sorted(response.data, key=lambda x: x.index)]
 
    # ------------------------------------------------------------------
    # Page & chunk persistence
    # ------------------------------------------------------------------
 
    def upsert_page(
        self,
        page_id: str,
        space_key: str,
        title: str,
        url: str,
        checksum: str,
    ) -> None:
        with self._cursor() as cur:
            cur.execute(
                """
                INSERT INTO confluence_pages (id, space_key, title, url, checksum, last_synced_at)
                VALUES (%s, %s, %s, %s, %s, NOW())
                ON CONFLICT (id) DO UPDATE SET
                    title = EXCLUDED.title,
                    url = EXCLUDED.url,
                    checksum = EXCLUDED.checksum,
                    last_synced_at = NOW()
                """,
                (page_id, space_key, title, url, checksum),
            )
        self._conn.commit()
 
    def delete_chunks(self, page_id: str) -> None:
        with self._cursor() as cur:
            cur.execute("DELETE FROM document_chunks WHERE page_id = %s", (page_id,))
        self._conn.commit()
 
    def store_chunks(self, page_id: str, chunks: list[DocumentChunk], embeddings: list[list[float]]) -> None:
        rows = [
            (page_id, chunk.index, chunk.content, chunk.token_count, embedding)
            for chunk, embedding in zip(chunks, embeddings)
        ]
        with self._cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                """
                INSERT INTO document_chunks (page_id, chunk_index, content, token_count, embedding)
                VALUES %s
                ON CONFLICT (page_id, chunk_index) DO UPDATE SET
                    content = EXCLUDED.content,
                    token_count = EXCLUDED.token_count,
                    embedding = EXCLUDED.embedding
                """,
                rows,
                template="(%s, %s, %s, %s, %s::vector)",
            )
        self._conn.commit()
 
    # ------------------------------------------------------------------
    # Similarity search
    # ------------------------------------------------------------------
 
    def get_page_checksum(self, page_id: str) -> Optional[str]:
        with self._cursor() as cur:
            cur.execute("SELECT checksum FROM confluence_pages WHERE id = %s", (page_id,))
            row = cur.fetchone()
        return row["checksum"] if row else None
 
    def similarity_search(self, query_embedding: list[float]) -> list[dict]:
        """Return the top-k most similar chunks with page metadata."""
        with self._cursor() as cur:
            cur.execute(
                """
                SELECT
                    dc.content,
                    dc.chunk_index,
                    cp.title,
                    cp.url,
                    cp.space_key,
                    1 - (dc.embedding <=> %s::vector) AS similarity
                FROM document_chunks dc
                JOIN confluence_pages cp ON cp.id = dc.page_id
                ORDER BY dc.embedding <=> %s::vector
                LIMIT %s
                """,
                (query_embedding, query_embedding, TOP_K),
            )
            return [dict(row) for row in cur.fetchall()]
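
The query above relies on pgvector's <=> operator, which returns cosine distance, so 1 minus that value is cosine similarity. For intuition, here is the same calculation in plain Python. This is only an illustration of the maths; in the real system PostgreSQL does this work, and the function names are my own.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def cosine_distance(a: list[float], b: list[float]) -> float:
    """The quantity pgvector's <=> operator computes."""
    return 1.0 - cosine_similarity(a, b)


print(cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]))  # same direction
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))              # orthogonal
```

Ordering by distance ascending, as the SQL does, is the same as ordering by similarity descending.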

RAG Service

The RagService wires together LangChain, the vector store, and GPT-4o-mini to produce answers with source citations.

app/rag_service.py

import os
 
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
 
from .embeddings_service import EmbeddingsService
 
SYSTEM_PROMPT = """You are a knowledgeable assistant for an engineering team.
You answer questions strictly based on the Confluence documentation provided as context.
 
Rules:
- If the context contains the answer, provide a clear, structured response.
- If the context does not contain enough information, say so honestly.
- Always cite the Confluence page title(s) you referenced.
- Keep answers concise and actionable.
- Use bullet points or numbered lists where appropriate."""
 
 
class RagService:
    def __init__(self) -> None:
        self._embeddings = EmbeddingsService()
        self._llm = ChatOpenAI(
            model="gpt-4o-mini",
            temperature=0.1,
            openai_api_key=os.environ["OPENAI_API_KEY"],
        )
 
    def query(self, question: str) -> dict:
        """
        1. Embed the question.
        2. Retrieve the top-k similar chunks.
        3. Build a context prompt.
        4. Ask GPT-4o-mini and return the answer + sources.
        """
        # Step 1 — embed question
        query_embedding = self._embeddings.embed(question)
 
        # Step 2 — retrieve context
        results = self._embeddings.similarity_search(query_embedding)
 
        if not results:
            return {
                "answer": "I couldn't find any relevant information in the knowledge base.",
                "sources": [],
            }
 
        # Step 3 — build context block
        context_parts = []
        seen_pages: dict[str, str] = {}  # title → url
 
        for r in results:
            context_parts.append(
                f"[Source: {r['title']}]\n{r['content']}"
            )
            seen_pages[r["title"]] = r["url"]
 
        context = "\n\n---\n\n".join(context_parts)
 
        # Step 4 — call LLM
        messages = [
            SystemMessage(content=SYSTEM_PROMPT),
            HumanMessage(
                content=f"Context from Confluence:\n\n{context}\n\n---\n\nQuestion: {question}"
            ),
        ]
        response = self._llm.invoke(messages)
 
        sources = [{"title": title, "url": url} for title, url in seen_pages.items()]
 
        return {
            "answer": response.content,
            "sources": sources,
            "chunks_used": len(results),
        }
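
One detail worth knowing about the context-building step: sources are de-duplicated by page title, so two pages with the same title in different spaces would collapse into one entry. A possible variation, sketched below under the assumption that the page URL is unique per page, keys the sources on the URL instead. build_context is a hypothetical standalone helper, not a method of RagService.

```python
def build_context(results: list[dict]) -> tuple[str, list[dict]]:
    """Join retrieved chunks into one context string and list unique sources."""
    parts: list[str] = []
    sources: dict[str, str] = {}  # url -> title, in first-seen order
    for r in results:
        parts.append(f"[Source: {r['title']}]\n{r['content']}")
        sources.setdefault(r["url"], r["title"])
    context = "\n\n---\n\n".join(parts)
    return context, [{"title": title, "url": url} for url, title in sources.items()]


results = [
    {"title": "Setup", "url": "https://example.invalid/1", "content": "Step one."},
    {"title": "Setup", "url": "https://example.invalid/1", "content": "Step two."},
    {"title": "Setup", "url": "https://example.invalid/2", "content": "Other space."},
]
context, sources = build_context(results)
print(len(sources))
```

Two chunks from the same page yield one source entry, while the identically titled page at a different URL keeps its own.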

FastAPI Application

The main app exposes three endpoints: /api/sync to trigger a knowledge base refresh, /api/query to ask questions, and /api/health for liveness checks.

app/main.py

import os
import time
import logging
from contextlib import asynccontextmanager
 
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
 
from .confluence_client import ConfluenceClient
from .document_processor import DocumentProcessor
from .embeddings_service import EmbeddingsService
from .rag_service import RagService
 
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
 
# ---------------------------------------------------------------------------
# Initialise shared services once at startup
# ---------------------------------------------------------------------------
 
_embeddings: EmbeddingsService
_rag: RagService
 
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    global _embeddings, _rag
    _embeddings = EmbeddingsService()
    _rag = RagService()
    yield
 
 
app = FastAPI(title="Confluence RAG API", version="1.0.0", lifespan=lifespan)
 
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
 
 
class SyncRequest(BaseModel):
    space_keys: list[str]
    force: bool = False  # if True, re-embed even unchanged pages
 
 
class QueryRequest(BaseModel):
    question: str
 
 
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
 
 
@app.get("/api/health")
def health():
    return {"status": "ok"}
 
 
@app.post("/api/sync")
def sync(req: SyncRequest):
    """
    Pull pages from one or more Confluence spaces, check freshness,
    and update the vector store for changed pages only.
    """
    confluence = ConfluenceClient()
    processor = DocumentProcessor()
    stats = {"synced": 0, "skipped": 0, "failed": 0}
 
    for space_key in req.space_keys:
        logger.info("Syncing space: %s", space_key)
        try:
            pages = confluence.get_space_pages(space_key)
        except Exception as exc:
            logger.error("Failed to fetch space %s: %s", space_key, exc)
            stats["failed"] += 1
            continue
 
        for page in pages:
            page_id = page["id"]
            html_body = page.get("body", {}).get("storage", {}).get("value", "")
            checksum = confluence.content_checksum(html_body)
 
            # Skip if the page hasn't changed (unless force=True)
            if not req.force:
                existing_checksum = _embeddings.get_page_checksum(page_id)
                if existing_checksum == checksum:
                    stats["skipped"] += 1
                    continue
 
            try:
                plain_text = processor.html_to_text(html_body)
                if not plain_text.strip():
                    continue
 
                chunks = processor.chunk(plain_text)
                if not chunks:
                    continue
 
                embeddings = _embeddings.embed_batch([c.content for c in chunks])
 
                _embeddings.upsert_page(
                    page_id=page_id,
                    space_key=space_key,
                    title=page["title"],
                    url=confluence.page_url(page),
                    checksum=checksum,
                )
                _embeddings.delete_chunks(page_id)
                _embeddings.store_chunks(page_id, chunks, embeddings)
 
                logger.info("Indexed page: %s (%d chunks)", page["title"], len(chunks))
                stats["synced"] += 1
 
            except Exception as exc:
                logger.error("Failed to index page %s: %s", page_id, exc)
                stats["failed"] += 1
 
    return {"status": "complete", **stats}
 
 
@app.post("/api/query")
def query(req: QueryRequest):
    """Answer a question using the Confluence knowledge base."""
    if not req.question.strip():
        raise HTTPException(status_code=400, detail="Question must not be empty")
 
    start = time.perf_counter()
    result = _rag.query(req.question)
    latency_ms = int((time.perf_counter() - start) * 1000)
 
    return {**result, "latency_ms": latency_ms}

Environment Configuration

.env.template

# Confluence
CONFLUENCE_BASE_URL=https://yourorg.atlassian.net
CONFLUENCE_EMAIL=you@yourcompany.com
CONFLUENCE_API_TOKEN=your_confluence_api_token
 
# OpenAI
OPENAI_API_KEY=sk-...
 
# PostgreSQL (matches docker-compose service)
DATABASE_URL=postgresql://raguser:ragpassword@db:5432/ragdb

Getting a Confluence API token: Log in to id.atlassian.com, click Create API token, give it a label, and copy it. That's your CONFLUENCE_API_TOKEN.
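
Because the services read these variables with os.environ[...], a missing value only surfaces as a KeyError when the code path is first hit. A small startup check can make that failure clearer. This is a minimal sketch; missing_env_vars and REQUIRED_VARS are hypothetical names, and the list simply mirrors the template above.

```python
import os
from collections.abc import Mapping

REQUIRED_VARS = (
    "CONFLUENCE_BASE_URL",
    "CONFLUENCE_EMAIL",
    "CONFLUENCE_API_TOKEN",
    "OPENAI_API_KEY",
    "DATABASE_URL",
)


def missing_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required variables that are unset or blank."""
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


missing = missing_env_vars()
if missing:
    print("Missing configuration:", ", ".join(missing))
```

Calling this at application startup and exiting when the list is non-empty turns a late KeyError into an immediate, readable message.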

Docker Compose Stack

One command to run the full stack: PostgreSQL with pgvector, the RAG API, and n8n for scheduling.

docker-compose.yml

services:
  db:
    image: pgvector/pgvector:pg16
    restart: unless-stopped
    environment:
      POSTGRES_USER: raguser
      POSTGRES_PASSWORD: ragpassword
      POSTGRES_DB: ragdb
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./sql/schema.sql:/docker-entrypoint-initdb.d/01-schema.sql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U raguser -d ragdb"]
      interval: 10s
      timeout: 5s
      retries: 5
 
  api:
    build: .
    restart: unless-stopped
    env_file: .env
    environment:
      DATABASE_URL: postgresql://raguser:ragpassword@db:5432/ragdb
    ports:
      - "8080:8080"
    depends_on:
      db:
        condition: service_healthy
    command: uvicorn app.main:app --host 0.0.0.0 --port 8080
 
  n8n:
    image: n8nio/n8n:latest
    restart: unless-stopped
    environment:
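      # Note: n8n 1.0+ ignores the N8N_BASIC_AUTH_* variables and asks you to create an owner account on first launch
      N8N_BASIC_AUTH_ACTIVE: "true"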
      N8N_BASIC_AUTH_USER: admin
      N8N_BASIC_AUTH_PASSWORD: changeme
      WEBHOOK_URL: http://localhost:5678
    ports:
      - "5678:5678"
    volumes:
      - n8ndata:/home/node/.n8n
    depends_on:
      - api
 
volumes:
  pgdata:
  n8ndata:

Dockerfile

FROM python:3.11-slim
 
WORKDIR /app
 
# System deps for psycopg2
RUN apt-get update && apt-get install -y libpq-dev gcc && rm -rf /var/lib/apt/lists/*
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY . .
 
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

n8n Sync Workflow

Import this JSON into n8n (Workflows → Import from JSON) to get a fully configured daily sync trigger.

{
  "name": "Confluence RAG Sync",
  "nodes": [
    {
      "parameters": {
        "rule": {
          "interval": [
            {
              "field": "hours",
              "hoursInterval": 24
            }
          ]
        }
      },
      "name": "Daily Schedule",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1,
      "position": [250, 300]
    },
    {
      "parameters": {
        "method": "POST",
        "url": "http://api:8080/api/sync",
        "options": {},
        "sendBody": true,
        "bodyParameters": {
          "parameters": [
            {
              "name": "space_keys",
              "value": "={{ [\"ENG\", \"QA\", \"OPS\"] }}"
            },
            {
              "name": "force",
              "value": false
            }
          ]
        }
      },
      "name": "Sync Knowledge Base",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 3,
      "position": [500, 300]
    },
    {
      "parameters": {
        "conditions": {
          "number": [
            {
              "value1": "={{ $json.failed }}",
              "operation": "larger",
              "value2": 0
            }
          ]
        }
      },
      "name": "Any failures?",
      "type": "n8n-nodes-base.if",
      "typeVersion": 1,
      "position": [750, 300]
    },
    {
      "parameters": {
        "channel": "#ops-alerts",
        "text": "=⚠️ Confluence RAG sync finished with {{ $json.failed }} failed page(s). Synced: {{ $json.synced }}, Skipped: {{ $json.skipped }}",
        "authentication": "oAuth2"
      },
      "name": "Slack Alert",
      "type": "n8n-nodes-base.slack",
      "typeVersion": 1,
      "position": [1000, 200]
    }
  ],
  "connections": {
    "Daily Schedule": {
      "main": [[{ "node": "Sync Knowledge Base", "type": "main", "index": 0 }]]
    },
    "Sync Knowledge Base": {
      "main": [[{ "node": "Any failures?", "type": "main", "index": 0 }]]
    },
    "Any failures?": {
      "main": [[{ "node": "Slack Alert", "type": "main", "index": 0 }], []]
    }
  }
}

Replace "ENG", "QA", "OPS" with your actual Confluence space keys, and configure your Slack credentials in n8n.

Quick Start: Running It All

# 1. Clone and configure
cp .env.template .env
# Fill in CONFLUENCE_BASE_URL, CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN, OPENAI_API_KEY
 
# 2. Start the full stack
docker compose up -d
 
# 3. Trigger an initial sync (replace space keys with yours)
curl -s -X POST http://localhost:8080/api/sync \
  -H "Content-Type: application/json" \
  -d '{"space_keys": ["ENG"], "force": true}' | python -m json.tool
 
# Expected output:
# {
#   "status": "complete",
#   "synced": 47,
#   "skipped": 0,
#   "failed": 0
# }
 
# 4. Ask a question!
curl -s -X POST http://localhost:8080/api/query \
  -H "Content-Type: application/json" \
  -d '{"question": "How do I set up a new test environment?"}' | python -m json.tool
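
If you would rather call the API from Python than from curl, a standard-library client is enough. The sketch below assumes the stack is running locally on port 8080 as configured above. build_query_request and ask are hypothetical helper names.

```python
import json
import urllib.request


def build_query_request(
    question: str, base_url: str = "http://localhost:8080"
) -> urllib.request.Request:
    """Build the same POST request the curl example sends to /api/query."""
    body = json.dumps({"question": question}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api/query",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(question: str, base_url: str = "http://localhost:8080") -> dict:
    """Send the question and return the decoded JSON response."""
    request = build_query_request(question, base_url)
    with urllib.request.urlopen(request, timeout=60) as resp:
        return json.load(resp)


# With the stack running:
# print(ask("How do I set up a new test environment?")["answer"])
```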

Testing & Sample Responses

Health Check

curl http://localhost:8080/api/health
# {"status": "ok"}

Sync Response

{
  "status": "complete",
  "synced": 12,
  "skipped": 35,
  "failed": 0
}

skipped counts the pages whose MD5 checksum matched the value already in the database. They are still fetched from Confluence, but they cost no embedding calls or credits. ✅

Query Response

{
  "answer": "To set up a new test environment, follow these steps:\n\n1. **Provision the CIF** — use the MCP automation script at `/tools/provision-cif.sh` with the target environment flag.\n2. **Assign PID** — run the PID assignment workflow documented on the 'PID Management' page.\n3. **Verify MCP connectivity** — check the digital MCP setup checklist before running regression tests.\n\nSources: 'Test Environment Setup Guide', 'PID Management', 'MCP Automation Runbook'",
  "sources": [
    {
      "title": "Test Environment Setup Guide",
      "url": "https://yourorg.atlassian.net/wiki/spaces/QA/pages/123456/Test+Environment+Setup+Guide"
    },
    {
      "title": "PID Management",
      "url": "https://yourorg.atlassian.net/wiki/spaces/ENG/pages/789012/PID+Management"
    },
    {
      "title": "MCP Automation Runbook",
      "url": "https://yourorg.atlassian.net/wiki/spaces/OPS/pages/345678/MCP+Automation+Runbook"
    }
  ],
  "chunks_used": 6,
  "latency_ms": 1842
}

Every answer includes clickable source links — your team can verify the AI's response against the original page in seconds.

Freshness: How the Change Detection Works

The sync endpoint computes an MD5 checksum of each page's raw HTML body and compares it to the stored value in confluence_pages.checksum:

# New checksum from API response
checksum = confluence.content_checksum(html_body)
 
# Stored checksum from our DB
existing_checksum = _embeddings.get_page_checksum(page_id)
 
if existing_checksum == checksum:
    # Page hasn't changed — skip embedding entirely
    stats["skipped"] += 1
    continue

This means:

  • Zero unnecessary embedding API calls on unchanged pages
  • Automatic re-indexing whenever a Confluence author edits a page
  • Cost stays low — a 100-page space with 5 changed pages only bills for 5 pages of embeddings
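
The skip logic can be exercised in isolation with an in-memory dictionary standing in for the confluence_pages table. This is a simplified sketch: plan_sync is a hypothetical helper, and the real endpoint also handles the force flag and errors. Note that the checksum covers the raw HTML, so any byte-level change, including whitespace or markup-only edits, triggers a re-embed.

```python
import hashlib


def content_checksum(html_body: str) -> str:
    """Same MD5-of-raw-HTML checksum the Confluence client computes."""
    return hashlib.md5(html_body.encode("utf-8")).hexdigest()


def plan_sync(
    pages: dict[str, str], stored: dict[str, str]
) -> tuple[list[str], list[str]]:
    """Split page IDs into (to_embed, to_skip) by comparing checksums."""
    to_embed: list[str] = []
    to_skip: list[str] = []
    for page_id, html in pages.items():
        if stored.get(page_id) == content_checksum(html):
            to_skip.append(page_id)
        else:
            to_embed.append(page_id)
    return to_embed, to_skip


pages = {"1": "<p>alpha</p>", "2": "<p>beta</p>"}
stored = {pid: content_checksum(html) for pid, html in pages.items()}

pages["2"] = "<p>beta, edited</p>"  # simulate an author editing page 2
print(plan_sync(pages, stored))
# (['2'], ['1'])
```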

Production Considerations

1. Index Tuning

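The ivfflat index is quick to build and light on memory, but its recall depends on the lists setting and on the rows present when the index was built. For larger knowledge bases, or if recall drops as the table grows, switch to hnsw, which gives a better speed/recall trade-off at the cost of slower builds and more memory: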

CREATE INDEX idx_chunks_embedding_hnsw
    ON document_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);
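
If you stay on ivfflat, the lists value is worth revisiting as the table grows. The sketch below encodes the starting points suggested in the pgvector README (rows / 1000 up to 1M rows, sqrt(rows) beyond that, and probes around sqrt(lists)). Treat the numbers as starting points to benchmark, not as tuned values. suggest_ivfflat_params is a hypothetical helper.

```python
import math


def suggest_ivfflat_params(row_count: int) -> dict[str, int]:
    """Starting values for IVFFlat `lists` and query-time `probes`."""
    if row_count <= 1_000_000:
        lists = max(1, row_count // 1000)
    else:
        lists = max(1, round(math.sqrt(row_count)))
    probes = max(1, round(math.sqrt(lists)))
    return {"lists": lists, "probes": probes}


print(suggest_ivfflat_params(50_000))
# {'lists': 50, 'probes': 7}
```

By this rule of thumb, lists = 100 corresponds to roughly 100,000 chunks, so a small knowledge base would start with a lower value.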

2. Rate Limiting

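OpenAI rate-limits the Embeddings API by requests and tokens per minute, and the exact ceilings depend on your account's usage tier, so check the limits page in your OpenAI dashboard. For large initial syncs, batch pages and add a small sleep between batches: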

import time
 
BATCH_SIZE = 20
for i in range(0, len(chunks), BATCH_SIZE):
    batch = chunks[i : i + BATCH_SIZE]
    embeddings = _embeddings.embed_batch([c.content for c in batch])
    # ... store embeddings ...
    time.sleep(0.5)  # ~500ms between batches
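
A fixed sleep keeps you under the limit most of the time, but an occasional rate-limit error can still slip through. A common complement is retrying with exponential backoff. Below is a minimal standard-library sketch. with_backoff is a hypothetical helper, it catches every Exception for brevity (in real use you would narrow that to the client's rate-limit error), and the tenacity package already listed in requirements.txt offers a more complete version of the same idea.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_backoff(
    call: Callable[[], T],
    retries: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `call`, waiting base_delay * 2**attempt after each failure."""
    for attempt in range(retries + 1):
        try:
            return call()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * 2**attempt)
    raise AssertionError("unreachable")  # the loop always returns or raises


# Usage inside the batching loop (names from the snippet above):
# embeddings = with_backoff(
#     lambda: _embeddings.embed_batch([c.content for c in batch])
# )
```

The sleep parameter is injectable so the waiting logic can be tested without actually pausing.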

3. Query Logging

Log every query to the query_log table for analytics:

import json

# cur is a psycopg2 cursor; commit on its connection afterwards
cur.execute(
    "INSERT INTO query_log (question, answer, sources, latency_ms) VALUES (%s, %s, %s, %s)",
    (question, answer, json.dumps(sources), latency_ms),
)

This tells you what your team asks most — great input for improving documentation coverage!

4. Authentication

For production deployments, add API key auth to FastAPI:

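import os
import secrets

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def verify_api_key(key: str = Security(api_key_header)) -> None:
    # compare_digest runs in constant time, so timing does not leak the key
    expected = os.environ["API_KEY"].encode("utf-8")
    if not secrets.compare_digest(key.encode("utf-8"), expected):
        raise HTTPException(status_code=403, detail="Invalid API key")

# To enforce it on every route: FastAPI(..., dependencies=[Depends(verify_api_key)])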

Key Takeaways

  1. Freshness via checksums — MD5 comparison before embedding is cheap and effective; don't re-index what hasn't changed.
  2. pgvector is production-ready — it's a PostgreSQL extension, not a separate service, so you operate it with the same tools you already know.
  3. Chunk overlap matters — 80-token overlap between 400-token chunks prevents context loss at boundaries without dramatically increasing storage.
  4. Cite your sources — an AI answer without source links is untrustworthy; always return the Confluence URLs alongside the answer.
  5. n8n makes scheduling painless — trigger sync on a 24-hour schedule with Slack alerting on failures, no custom cron infra needed.

Wrapping Up

We now have a complete, production-ready RAG system that turns your Confluence spaces into an AI-queryable knowledge base. The most satisfying part? The freshness detection — the system is smart enough to only do work when pages actually change, keeping costs low even for large spaces.

From "I know it's somewhere in Confluence" to a 15-second AI answer with clickable source links — give it a try with your own team's documentation!

Drop a comment below if you run into anything, or if you extend this with streaming responses or a chat UI. Happy building! 🚀


Full source code is available in the linked GitHub repository. The Docker Compose setup means you're one docker compose up -d away from a running system.
