1. Behavior and Context
In a jazzmine deployment, the PostgresMessageStore acts as the "System of Record."
- Relational Schema: It enforces strict schemas for core metadata while allowing flexibility for nested data (like tool traces) using JSONB.
- Performance Indexing: It utilizes B-tree indexes for fast lookups by ID/Timestamp and GIN (Generalized Inverted Index) for "containment" searches within JSON columns.
- Atomic Persistence: Every operation is executed as a standard SQL command, ensuring ACID compliance.
- Connection Pooling: It manages a pool of asynchronous connections to handle high-concurrency agent environments without overwhelming the database.
2. Purpose
- Production Scaling: Designed to handle millions of messages across thousands of concurrent users.
- Deep Auditing: Enables complex SQL queries to analyze agent behavior (e.g., "Find all turns where token usage exceeded X").
- Referential Integrity: Uses standard SQL Foreign Keys with ON DELETE CASCADE to ensure that deleting a conversation cleanly wipes all associated messages and traces.
- Hybrid Search Support: Provides the structured data necessary for the MessageEnhancer to perform cross-turn coreference resolution.
3. High-Level API (Configuration)
The PostgresMessageStore is initialized via the MessageStore.initialize factory.
Example: Initializing the Postgres Store
from jazzmine.core.message_store import MessageStore
config = {
"storage": "postgres",
"dsn": "postgresql://postgres:password@localhost:5432/jazzmine_db",
"pool_min": 5,
"pool_max": 20
}
# The factory handles the internal call to _init()
store = await MessageStore.initialize(config)4. Detailed Functionality
_init() [Internal]
Functionality: Bootstrap method that establishes the connection pool and executes DDL (Data Definition Language) to create tables and indexes if they do not exist.
How it works:
- Creates conversations table.
- Creates messages table (Foreign Key to conversations).
- Creates turn_traces table (Foreign Key to messages).
- Creates a suite of indexes, including specialized GIN indexes for searching inside tool_traces, llm_calls, and errors.
get_immediate_context(conversation_id, n)
Functionality: Fetches the n most recent non-flagged messages to populate the LLM's context window.
How it works: This method uses a Nested "Flip" Query:
- Inner Query: Filters by conversation_id, excludes is_flagged messages, and sorts by timestamp DESC with a LIMIT n. This grabs the n latest messages but in reverse order.
- Outer Query: Takes those n messages and re-sorts them by timestamp ASC.
- Result: The LLM receives the most recent history in the correct chronological order.
store_message(message) / store_messages(list)
Functionality: Persists Message models using "Upsert" logic.
How it works: Uses INSERT ... ON CONFLICT (id) DO UPDATE. This allows the MessageEnhancer or Summarizer to update existing messages (adding intent, sentiment, or episode_id) without creating duplicate rows. The store_messages variant uses executemany for high-efficiency bulk insertion.
store_turn_trace(trace)
Functionality: Persists the exhaustive TurnTrace model.
How it works:
- Serializes nested Pydantic objects (like ToolTrace or LLMCallRecord) into JSON strings.
- Maps these strings to JSONB columns in the turn_traces table.
- Because of the unique constraint on message_id, each turn has exactly one trace.
delete_conversation(conversation_id)
Functionality: Permanently removes a session and all its data.
How it works: Executes a single DELETE on the conversations table. Since messages and turn_traces are defined with ON DELETE CASCADE, PostgreSQL handles the recursive cleanup automatically at the database engine level.
5. Indexing Strategy Reference
The store defines several critical indexes for performance:
| Index Name | Type | Description |
|---|---|---|
| idx_msg_conv | B‑tree | Optimizes fetching history for a specific chat session. |
| idx_tt_agent_time | B‑tree | Powers dashboards showing recent activity for an agent. |
| idx_tt_tool_gin | GIN | Allows searching for specific tools used inside the JSON data. |
| idx_msg_flagged | Partial | High‑speed lookup for messages requiring moderation. |
6. Error Handling
- ConnectionError: Raised during _init if the dsn (Data Source Name) is invalid or the database is unreachable.
- IntegrityError: While most operations use Upserts, attempting to store a message for a non-existent conversation_id will trigger a foreign‑key violation.
- Serialization Failures: If a Pydantic model contains a type that cannot be serialized to JSON, an error is raised before the database is reached.
7. Remarks
- Performance: The binary protocol used by asyncpg makes this implementation significantly faster than standard psycopg2 or high‑level ORMs.
- Data Size: Since turn_traces can become quite large (especially with multiple code‑generation retries), it is recommended to monitor disk usage on the JSONB columns.
- Compatibility: This class is tested against PostgreSQL 12+. Versions below 12 may not support all the JSONB features used.