Message Store
Core reference

PostgresMessageStore

The PostgresMessageStore is the production-grade, relational implementation of the MessageStore interface. Built on the high-performance asyncpg library, it leverages PostgreSQL's specialized JSONB data type to store structured chat history and complex observability traces. This store is designed for environments requiring strong data consistency, relational integrity, and advanced analytical querying capabilities.

1. Behavior and Context

In a jazzmine deployment, the PostgresMessageStore acts as the "System of Record."

  • Relational Schema: It enforces strict schemas for core metadata while allowing flexibility for nested data (like tool traces) using JSONB.
  • Performance Indexing: It utilizes B-tree indexes for fast lookups by ID/Timestamp and GIN (Generalized Inverted Index) for "containment" searches within JSON columns.
  • Atomic Persistence: Each operation is issued as a single SQL statement, which PostgreSQL applies atomically, so a failed write never leaves a partially stored row.
  • Connection Pooling: It manages a pool of asynchronous connections to handle high-concurrency agent environments without overwhelming the database.
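As a rough illustration of the "containment" searches mentioned above, the sketch below looks up turns that used a given tool. The column names (message_id, tool_traces), the tool_name key, and the helper names are assumptions made for illustration, not the store's confirmed schema or API. conn is expected to be an asyncpg connection or pool.

```python
import json
from typing import Any

# Hypothetical schema: turn_traces.tool_traces is a JSONB array of objects,
# each carrying a "tool_name" key. The @> operator tests JSONB containment
# and can be served by a GIN index on the column.
CONTAINMENT_SQL = """
SELECT message_id
FROM turn_traces
WHERE tool_traces @> $1::jsonb
"""


def containment_filter(tool_name: str) -> str:
    """Build the JSON document that matching rows must contain."""
    return json.dumps([{"tool_name": tool_name}])


async def find_turns_using_tool(conn: Any, tool_name: str) -> list[Any]:
    """Return message IDs of turns whose tool_traces mention tool_name.

    By default asyncpg takes JSONB parameters as JSON-encoded strings,
    which is why the filter is passed through json.dumps first.
    """
    rows = await conn.fetch(CONTAINMENT_SQL, containment_filter(tool_name))
    return [row["message_id"] for row in rows]
```

Passing the filter as a bound parameter rather than interpolating it into the SQL keeps the query safe against injection and lets PostgreSQL reuse the prepared statement.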

2. Purpose

  • Production Scaling: Designed to handle millions of messages across thousands of concurrent users.
  • Deep Auditing: Enables complex SQL queries to analyze agent behavior (e.g., "Find all turns where token usage exceeded X").
  • Referential Integrity: Uses standard SQL Foreign Keys with ON DELETE CASCADE to ensure that deleting a conversation cleanly wipes all associated messages and traces.
  • Hybrid Search Support: Provides the structured data necessary for the MessageEnhancer to perform cross-turn coreference resolution.
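The auditing example above ("turns where token usage exceeded X") could look roughly like the following. This is only a sketch: it assumes llm_calls is a JSONB array whose elements carry an integer total_tokens field, and both that field name and the helper turns_over_token_budget are hypothetical.

```python
from typing import Any

# Hypothetical schema: turn_traces.llm_calls is a JSONB array of call records,
# each with an integer "total_tokens" field.
TOKEN_AUDIT_SQL = """
SELECT t.message_id
FROM turn_traces AS t
WHERE EXISTS (
    SELECT 1
    FROM jsonb_array_elements(t.llm_calls) AS c(call)
    WHERE (call->>'total_tokens')::int > $1
)
"""


async def turns_over_token_budget(conn: Any, max_tokens: int) -> list[Any]:
    """Return message IDs of turns with at least one LLM call above max_tokens.

    conn is assumed to be an asyncpg connection or pool.
    """
    rows = await conn.fetch(TOKEN_AUDIT_SQL, max_tokens)
    return [row["message_id"] for row in rows]
```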

3. High-Level API (Configuration)

The PostgresMessageStore is initialized via the MessageStore.initialize factory.

Example: Initializing the Postgres Store

python
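import asyncio

from jazzmine.core.message_store import MessageStore

config = {
    "storage": "postgres",
    # Placeholder credentials: replace with your own connection string.
    "dsn": "postgresql://postgres:password@localhost:5432/jazzmine_db",
    "pool_min": 5,   # minimum number of pooled connections
    "pool_max": 20,  # maximum number of pooled connections
}


async def main() -> None:
    # The factory handles the internal call to _init()
    store = await MessageStore.initialize(config)


asyncio.run(main())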

4. Detailed Functionality

_init() [Internal]

Functionality: Bootstrap method that establishes the connection pool and executes DDL (Data Definition Language) to create tables and indexes if they do not exist.

How it works:

  • Creates conversations table.
  • Creates messages table (Foreign Key to conversations).
  • Creates turn_traces table (Foreign Key to messages).
  • Creates a suite of indexes, including specialized GIN indexes for searching inside tool_traces, llm_calls, and errors.
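The bootstrap sequence above can be sketched as follows. The column definitions are a minimal assumed schema (the real tables carry more columns, and index creation is omitted here), and init_store is a hypothetical stand-in for the internal _init. The asyncpg calls used (create_pool, acquire, transaction, execute) are part of its public API.

```python
from typing import Any

# Assumed minimal schema, in dependency order: parents before children.
SCHEMA_DDL = [
    """
    CREATE TABLE IF NOT EXISTS conversations (
        id TEXT PRIMARY KEY,
        created_at TIMESTAMPTZ NOT NULL DEFAULT now()
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS messages (
        id TEXT PRIMARY KEY,
        conversation_id TEXT NOT NULL
            REFERENCES conversations (id) ON DELETE CASCADE,
        content TEXT NOT NULL,
        is_flagged BOOLEAN NOT NULL DEFAULT FALSE,
        timestamp TIMESTAMPTZ NOT NULL DEFAULT now()
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS turn_traces (
        message_id TEXT NOT NULL UNIQUE
            REFERENCES messages (id) ON DELETE CASCADE,
        tool_traces JSONB NOT NULL DEFAULT '[]',
        llm_calls JSONB NOT NULL DEFAULT '[]',
        errors JSONB NOT NULL DEFAULT '[]'
    )
    """,
]


async def init_store(config: dict[str, Any]) -> Any:
    """Open a connection pool and create the tables if they are missing."""
    # Imported here so the DDL above can be inspected without the driver.
    import asyncpg

    pool = await asyncpg.create_pool(
        dsn=config["dsn"],
        min_size=config["pool_min"],
        max_size=config["pool_max"],
    )
    async with pool.acquire() as conn:
        # PostgreSQL DDL is transactional, so a failure leaves no half-built schema.
        async with conn.transaction():
            for statement in SCHEMA_DDL:
                await conn.execute(statement)
    return pool
```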

get_immediate_context(conversation_id, n)

Functionality: Fetches the n most recent non-flagged messages to populate the LLM's context window.

How it works: This method uses a Nested "Flip" Query:

  1. Inner Query: Filters by conversation_id, excludes is_flagged messages, and sorts by timestamp DESC with LIMIT n. This selects the n latest messages, newest first.
  2. Outer Query: Takes those n messages and re-sorts them by timestamp ASC.
  3. Result: The LLM receives the most recent history in the correct chronological order.
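The flip query above can be tried out with the snippet below. To stay runnable without a database server it uses the standard-library sqlite3 module as a stand-in. Against PostgreSQL with asyncpg the placeholders would be $1 and $2 and is_flagged would be a boolean. The column set is an assumption for illustration.

```python
import sqlite3

# Inner query: newest n non-flagged messages (newest first).
# Outer query: flip them back into chronological order.
FLIP_QUERY = """
SELECT id, content, timestamp
FROM (
    SELECT id, content, timestamp
    FROM messages
    WHERE conversation_id = ? AND is_flagged = 0
    ORDER BY timestamp DESC
    LIMIT ?
) AS latest
ORDER BY timestamp ASC
"""


def get_immediate_context(conn: sqlite3.Connection, conversation_id: str, n: int):
    """Return the n most recent non-flagged messages, oldest first."""
    return conn.execute(FLIP_QUERY, (conversation_id, n)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages (id TEXT PRIMARY KEY, conversation_id TEXT, "
    "content TEXT, timestamp INTEGER, is_flagged INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO messages VALUES (?, ?, ?, ?, ?)",
    [
        ("m1", "c1", "hello", 1, 0),
        ("m2", "c1", "spam", 2, 1),  # flagged, must be skipped
        ("m3", "c1", "how are you", 3, 0),
        ("m4", "c1", "fine", 4, 0),
        ("m5", "c2", "other chat", 5, 0),  # different conversation
    ],
)

recent = get_immediate_context(conn, "c1", 2)
print([row[0] for row in recent])  # ['m3', 'm4']
```

Sorting only the n selected rows in the outer query keeps the re-ordering cost independent of the conversation's total length.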

store_message(message) / store_messages(list)

Functionality: Persists Message models using "Upsert" logic.

How it works: Uses INSERT ... ON CONFLICT (id) DO UPDATE. This allows the MessageEnhancer or Summarizer to update existing messages (adding intent, sentiment, or episode_id) without creating duplicate rows. The store_messages variant uses executemany for high-efficiency bulk insertion.
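A minimal sketch of the upsert path, assuming a small subset of message columns (intent and sentiment are taken from the description above, while the dict-based message shape and the helper names are hypothetical). conn is expected to be an asyncpg connection, whose executemany takes the statement and an iterable of argument tuples.

```python
from typing import Any, Iterable

# EXCLUDED refers to the row that was proposed for insertion, so a conflict
# on id overwrites the enrichable fields instead of creating a duplicate.
UPSERT_SQL = """
INSERT INTO messages (id, conversation_id, content, intent, sentiment)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE SET
    content = EXCLUDED.content,
    intent = EXCLUDED.intent,
    sentiment = EXCLUDED.sentiment
"""


def to_row(message: dict[str, Any]) -> tuple:
    """Flatten a message dict into positional query arguments."""
    return (
        message["id"],
        message["conversation_id"],
        message["content"],
        message.get("intent"),     # filled in later by an enhancer, may be None
        message.get("sentiment"),  # likewise optional
    )


async def store_messages(conn: Any, messages: Iterable[dict[str, Any]]) -> None:
    """Upsert a batch of messages with a single executemany call."""
    await conn.executemany(UPSERT_SQL, [to_row(m) for m in messages])


async def store_message(conn: Any, message: dict[str, Any]) -> None:
    """Upsert a single message."""
    await conn.execute(UPSERT_SQL, *to_row(message))
```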


store_turn_trace(trace)

Functionality: Persists the exhaustive TurnTrace model.

How it works:

  • Serializes nested Pydantic objects (like ToolTrace or LLMCallRecord) into JSON strings.
  • Maps these strings to JSONB columns in the turn_traces table.
  • The unique constraint on message_id guarantees that each turn has at most one trace.
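The serialization step can be sketched as below. Plain dataclasses stand in for the Pydantic models so the snippet has no third-party dependencies. With Pydantic v2 one would typically call model_dump(mode="json") where asdict is used here. The field names and the insert statement are assumptions, and how the real store reacts to a second trace for the same message_id is not covered by this sketch.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class ToolTrace:  # stand-in for the Pydantic ToolTrace model
    tool_name: str
    duration_ms: float


@dataclass
class TurnTrace:  # stand-in for the Pydantic TurnTrace model
    message_id: str
    tool_traces: list[ToolTrace] = field(default_factory=list)


TRACE_INSERT_SQL = """
INSERT INTO turn_traces (message_id, tool_traces)
VALUES ($1, $2::jsonb)
"""


def trace_params(trace: TurnTrace) -> tuple[str, str]:
    """Serialize nested objects to a JSON string bound for a JSONB column."""
    tool_json = json.dumps([asdict(t) for t in trace.tool_traces])
    return trace.message_id, tool_json


async def store_turn_trace(conn: Any, trace: TurnTrace) -> None:
    """Persist one trace; conn is assumed to be an asyncpg connection."""
    await conn.execute(TRACE_INSERT_SQL, *trace_params(trace))
```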

delete_conversation(conversation_id)

Functionality: Permanently removes a session and all its data.

How it works: Executes a single DELETE on the conversations table. Since messages and turn_traces are defined with ON DELETE CASCADE, PostgreSQL handles the recursive cleanup automatically at the database engine level.
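The cascading cleanup can be observed with the small demonstration below. It uses the standard-library sqlite3 module as a stand-in so that it runs without a server. The ON DELETE CASCADE clauses have the same shape in PostgreSQL, where no PRAGMA is needed. The reduced column set is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite only enforces foreign keys when asked; PostgreSQL always does.
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript(
    """
    CREATE TABLE conversations (id TEXT PRIMARY KEY);
    CREATE TABLE messages (
        id TEXT PRIMARY KEY,
        conversation_id TEXT NOT NULL
            REFERENCES conversations (id) ON DELETE CASCADE
    );
    CREATE TABLE turn_traces (
        message_id TEXT NOT NULL UNIQUE
            REFERENCES messages (id) ON DELETE CASCADE
    );
    INSERT INTO conversations VALUES ('c1');
    INSERT INTO messages VALUES ('m1', 'c1'), ('m2', 'c1');
    INSERT INTO turn_traces VALUES ('m1');
    """
)


def delete_conversation(conn: sqlite3.Connection, conversation_id: str) -> None:
    """A single DELETE; the engine removes dependent rows via the cascade."""
    conn.execute("DELETE FROM conversations WHERE id = ?", (conversation_id,))


delete_conversation(conn, "c1")
remaining_messages = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
remaining_traces = conn.execute("SELECT COUNT(*) FROM turn_traces").fetchone()[0]
print(remaining_messages, remaining_traces)  # 0 0
```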


5. Indexing Strategy Reference

The store defines several critical indexes for performance:

| Index Name        | Type    | Description                                                    |
|-------------------|---------|----------------------------------------------------------------|
| idx_msg_conv      | B-tree  | Optimizes fetching history for a specific chat session.        |
| idx_tt_agent_time | B-tree  | Powers dashboards showing recent activity for an agent.        |
| idx_tt_tool_gin   | GIN     | Allows searching for specific tools used inside the JSON data. |
| idx_msg_flagged   | Partial | High-speed lookup for messages requiring moderation.           |
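For orientation, index definitions of this kind might look like the sketch below. Only the index names come from the reference above. The indexed columns (for example agent_id and created_at) and the create_indexes helper are assumptions, not the store's actual DDL.

```python
from typing import Any

# Index names match the reference table; the column lists are assumed.
INDEX_DDL = {
    "idx_msg_conv": (
        "CREATE INDEX IF NOT EXISTS idx_msg_conv "
        "ON messages (conversation_id, timestamp)"
    ),
    "idx_tt_agent_time": (
        "CREATE INDEX IF NOT EXISTS idx_tt_agent_time "
        "ON turn_traces (agent_id, created_at DESC)"
    ),
    # GIN indexes support JSONB containment (@>) lookups.
    "idx_tt_tool_gin": (
        "CREATE INDEX IF NOT EXISTS idx_tt_tool_gin "
        "ON turn_traces USING GIN (tool_traces)"
    ),
    # A partial index only covers rows matching its WHERE clause,
    # so it stays small when few messages are flagged.
    "idx_msg_flagged": (
        "CREATE INDEX IF NOT EXISTS idx_msg_flagged "
        "ON messages (conversation_id) WHERE is_flagged"
    ),
}


async def create_indexes(conn: Any) -> None:
    """Create each index if it is missing; conn is an asyncpg connection."""
    for statement in INDEX_DDL.values():
        await conn.execute(statement)
```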

6. Error Handling

  • ConnectionError: Raised during _init if the dsn (Data Source Name) is invalid or the database is unreachable.
  • IntegrityError: While most operations use Upserts, attempting to store a message for a non-existent conversation_id will trigger a foreign‑key violation.
  • Serialization Failures: If a Pydantic model contains a type that cannot be serialized to JSON, an error is raised before the database is reached.
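The "fail before the database is reached" behavior for serialization errors can be sketched as follows. The example uses the standard json module, which raises TypeError for unsupported types. A Pydantic-based store would surface its own exception type. Wrapping the failure in ValueError, the SQL statement, and the helper names are all choices made for this illustration.

```python
import json
from typing import Any

TRACE_INSERT_SQL = (
    "INSERT INTO turn_traces (message_id, errors) VALUES ($1, $2::jsonb)"
)


def to_jsonb_text(payload: Any) -> str:
    """Serialize payload to JSON, failing fast on unsupported types."""
    try:
        return json.dumps(payload)
    except TypeError as exc:
        # json.dumps raises TypeError for sets, datetimes, custom objects, etc.
        raise ValueError(f"payload is not JSON-serializable: {exc}") from exc


async def store_errors(conn: Any, message_id: str, errors: list[Any]) -> None:
    """Store an error list; conn is assumed to be an asyncpg connection."""
    # Serialization happens first, so a bad payload never reaches the database.
    errors_json = to_jsonb_text(errors)
    await conn.execute(TRACE_INSERT_SQL, message_id, errors_json)
```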

7. Remarks

  • Performance: asyncpg speaks PostgreSQL's binary wire protocol natively, which generally gives this implementation lower per-query overhead than psycopg2 or high-level ORMs.
  • Data Size: Since turn_traces can become quite large (especially with multiple code‑generation retries), it is recommended to monitor disk usage on the JSONB columns.
  • Compatibility: This class is tested against PostgreSQL 12+. Versions below 12 may not support all the JSONB features used.