Message Store
Core reference

MongoDB Message Store

The MongoDB Message Store is a robust, horizontally scalable implementation of the MessageStore interface. Built on the asynchronous motor driver, it leverages MongoDB's document-oriented architecture to store chat histories and complex observability data. It is ideal for high-volume production environments that require horizontal scaling, flexible metadata storage, and high-performance indexing for complex analytical queries.

1. Behavior and Context

Within the framework, this store acts as a high-throughput data repository.

  • Document Nesting: It naturally maps the framework's Pydantic models to BSON documents, allowing complex structures like tool_traces and llm_calls to be stored as nested arrays within a single record.
  • Indexing Strategy: It automatically provisions compound and sparse indexes to ensure that operations like session listing and chronological context retrieval remain performant even as the database grows to millions of records.
  • Asynchronous I/O: Fully non-blocking communication ensures that database operations do not stall the agent's reasoning loop or sandbox executions.
  • Manual Referential Integrity: Since MongoDB lacks native foreign-key cascades, this store explicitly manages the cleanup of associated messages and traces during conversation deletion.
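
To make the document-nesting behavior concrete, a stored message might take the following shape. Only `tool_traces` and `llm_calls` are named in the docs above; every other field name and value here is an assumption for the sake of illustration.

```python
# Illustrative shape of a message document with nested observability data.
# Field names other than tool_traces and llm_calls are assumptions.
EXAMPLE_MESSAGE_DOC = {
    "id": "msg_001",
    "conversation_id": "conv_042",
    "role": "assistant",
    "content": "Here is the script you asked for.",
    "timestamp": 1700000000.0,
    "llm_calls": [                       # nested array, kept in one record
        {"model": "example-llm", "total_tokens": 512, "latency_ms": 840},
    ],
    "tool_traces": [                     # nested array, kept in one record
        {"tool": "sandbox_exec", "status": "ok", "latency_ms": 130},
    ],
}
```

Because the nested arrays live inside the same BSON document, a single read returns the message together with its traces, with no join required.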

2. Purpose

  • Production Scalability: Designed to handle massive datasets across distributed MongoDB clusters.
  • Flexible Metadata: Optimized for storing and querying the evolving metadata produced by the MessageEnhancer and ScriptGenerator.
  • Advanced Observability: Provides the indexing foundation required for administrative dashboards to perform range queries on token usage, latency, and intent patterns.
  • Efficient Context Windows: Supports high-speed retrieval of chronological message slices for LLM prompt construction.

3. High-Level API (Configuration)

The MongoDB Message Store is initialized through the MessageStore.initialize factory by specifying the mongodb storage type.

Example: Initializing the MongoDB Store

```python
from jazzmine.core.message_store import MessageStore

config = {
    "storage": "mongodb",
    "uri": "mongodb://admin:password@localhost:27017",
    "db": "agent_data_prod"
}

# The factory initializes the client and triggers collection setup
store = await MessageStore.initialize(config)
```

4. Detailed Functionality

_init() [Internal]

Functionality: Bootstraps the database connection and ensures all required collections and indexes are provisioned.

How it works:

  • Connects to the MongoDB instance using the provided uri.
  • Creates three collections: conversations, messages, and turn_traces.
  • Provisions unique indexes on id fields.
  • Creates compound indexes for performance (e.g., user_id + last_updated_at for conversation lists).
  • Provisions sparse indexes for the is_flagged attribute.
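
The provisioning steps above can be sketched as a table of index specifications plus an idempotent loop. The key directions and the exact field names (beyond those mentioned in the bullets) are assumptions, not the store's actual schema.

```python
# Hypothetical index specifications mirroring the _init() description.
# Each entry is (key list, index options).
INDEX_SPECS = {
    "conversations": [
        ([("id", 1)], {"unique": True}),
        ([("user_id", 1), ("last_updated_at", -1)], {}),  # session listing
    ],
    "messages": [
        ([("id", 1)], {"unique": True}),
        ([("conversation_id", 1), ("timestamp", -1)], {}),  # context slices
        ([("is_flagged", 1)], {"sparse": True}),  # most docs lack the field
    ],
    "turn_traces": [
        ([("id", 1)], {"unique": True}),
        ([("conversation_id", 1), ("turn_number", 1)], {}),  # audit trail
    ],
}

async def provision_indexes(db) -> None:
    """Apply the specs above via Motor's create_index (safe to re-run)."""
    for coll_name, specs in INDEX_SPECS.items():
        for keys, options in specs:
            await db[coll_name].create_index(keys, **options)
```

MongoDB's `create_index` is a no-op when a matching index already exists, which is what makes running this on every startup safe.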

get_immediate_context(conversation_id, n)

Functionality: Retrieves a chronological slice of the most recent non-flagged messages for an agent's reasoning turn.

Parameters:

  • conversation_id (str): The unique identifier for the session.
  • n (int): The maximum number of messages to retrieve.

How it works:

  1. Queries the messages collection for the specific conversation_id.
  2. Filters out any messages where is_flagged is True.
  3. Sorts the results by timestamp in descending order and applies the limit n.
  4. Reverses the resulting list in memory to return the messages in ascending (chronological) order.
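
The four steps above can be replicated in memory. This sketch uses plain dicts in place of BSON documents to show only the query logic, not the driver calls:

```python
def immediate_context_slice(docs, conversation_id, n):
    """In-memory illustration of get_immediate_context's query logic."""
    # Steps 1-2: match the conversation and drop flagged messages.
    visible = [
        d for d in docs
        if d["conversation_id"] == conversation_id and not d.get("is_flagged")
    ]
    # Step 3: newest first, capped at n.
    newest_first = sorted(visible, key=lambda d: d["timestamp"], reverse=True)[:n]
    # Step 4: reverse in memory so the caller sees chronological order.
    return list(reversed(newest_first))

docs = [
    {"conversation_id": "c1", "timestamp": t, "content": f"m{t}"}
    for t in range(1, 6)
]
docs[3]["is_flagged"] = True  # the message at timestamp 4 is excluded

context = immediate_context_slice(docs, "c1", 3)
# → messages with timestamps [2, 3, 5], oldest first
```

Sorting descending before applying the limit is what lets the database serve the query from the index; the cheap in-memory reversal then restores chronological order for prompt construction.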

delete_conversation(conversation_id)

Functionality: Permanently removes a chat session and all associated data.

How it works: Since MongoDB does not support relational cascades, this method performs a manual cleanup:

  1. Finds all id values for messages belonging to the conversation.
  2. Deletes all turn_traces that are linked to those message IDs.
  3. Deletes all messages in the conversation.
  4. Deletes the top-level conversation record.
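
The cascade can be illustrated with in-memory collections. Note that the `message_id` link field on traces is an assumption; the real store issues `delete_many` calls against MongoDB rather than filtering lists.

```python
def delete_conversation_cascade(store, conversation_id):
    """In-memory sketch of the manual cleanup described above.

    `store` maps collection names to lists of dicts, standing in for the
    three MongoDB collections.
    """
    # Step 1: collect the ids of the conversation's messages.
    msg_ids = {
        m["id"] for m in store["messages"]
        if m["conversation_id"] == conversation_id
    }
    # Step 2: drop traces linked to those messages (link field assumed).
    store["turn_traces"] = [
        t for t in store["turn_traces"] if t["message_id"] not in msg_ids
    ]
    # Step 3: drop the messages themselves.
    store["messages"] = [
        m for m in store["messages"] if m["conversation_id"] != conversation_id
    ]
    # Step 4: drop the top-level conversation record.
    store["conversations"] = [
        c for c in store["conversations"] if c["id"] != conversation_id
    ]
```

Deleting child records before the parent means that if the process dies mid-cascade, the leftover state is an orphan-free conversation stub rather than dangling traces.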

store_messages(messages)

Functionality: Performs high-speed bulk insertion of multiple message records.

How it works: Uses the bulk_write API with ReplaceOne operations and upsert=True. This allows the store to handle many updates (e.g., during conversation migration or summarization tagging) in a single round-trip to the database.
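
A sketch of the operation-building step: each dict below corresponds to one pymongo `ReplaceOne(filter, replacement, upsert=True)` handed to `bulk_write`. The `id` filter field is an assumption about the store's schema.

```python
def build_upsert_ops(messages):
    """Build one replace-with-upsert operation per message record."""
    return [
        {
            "filter": {"id": m["id"]},  # match an existing record by id
            "replacement": m,           # full-document replacement
            "upsert": True,             # insert if no match exists
        }
        for m in messages
    ]
```

Because every operation is an upsert, the same call path serves both first-time inserts and later rewrites (e.g. migration or summarization tagging), and the whole batch travels in one round-trip.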


Observability Queries

| Query Target | Indexing Applied |
| --- | --- |
| Audit Trail | Compound index on (conversation_id, turn_number) for sequential tracing. |
| Cost Monitoring | Index on total_tokens for aggregate usage analysis. |
| Latency Analysis | Index on total_latency_ms to identify performance bottlenecks. |
| Intent Trends | Index on detected_intent for high-level usage analytics. |
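
As an example of the analytics these indexes support, a hypothetical aggregation pipeline rolling up token usage and latency per intent might look like the following. Field names come from the table above; the pipeline itself is an illustration, not part of the store's API.

```python
# Hypothetical dashboard query over turn_traces: per-intent cost rollups.
INTENT_COST_PIPELINE = [
    {"$group": {
        "_id": "$detected_intent",               # one bucket per intent
        "turns": {"$sum": 1},
        "total_tokens": {"$sum": "$total_tokens"},
        "avg_latency_ms": {"$avg": "$total_latency_ms"},
    }},
    {"$sort": {"total_tokens": -1}},             # most expensive intents first
]
```

Such a pipeline would be executed with `db.turn_traces.aggregate(INTENT_COST_PIPELINE)`; the indexes above keep the range-query variants (e.g. a `$match` on `total_latency_ms`) from falling back to collection scans.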

5. Error Handling

  • Connection Timeouts: If the provided uri is unreachable, the motor driver will raise a ServerSelectionTimeoutError.
  • Serialization Errors: Data is validated against Pydantic models before insertion; invalid data will raise a ValidationError before reaching the database.
  • Corrupted Documents: If get_message_by_id encounters a document that does not match the schema, it will raise a validation error, indicating the record should be audited or removed.
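
The validate-before-insert behavior can be sketched without Pydantic. The required field set below is an assumption; the real store validates full Pydantic models, but the fail-early idea is the same.

```python
# Assumed minimal field set for a message document (illustrative only).
REQUIRED_FIELDS = {"id", "conversation_id", "role", "content", "timestamp"}

def validate_message_doc(doc):
    """Reject malformed documents before they reach the database.

    Stands in for Pydantic validation: raising here keeps invalid data
    out of MongoDB instead of surfacing it later as a corrupted document.
    """
    missing = REQUIRED_FIELDS - doc.keys()
    if missing:
        raise ValueError(f"invalid message document, missing: {sorted(missing)}")
    return doc
```

Catching bad data at this boundary is what guarantees that a schema mismatch on read (the "corrupted documents" case) points to out-of-band writes rather than a bug in the store itself.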

6. Remarks

  • Resource Management: Always call await store.close() during application shutdown to gracefully terminate the MongoDB connection pool.
  • Sparse Indexing: The use of sparse=True on the is_flagged index ensures that the index size remains minimal for large datasets where most messages are clean.
  • Atomic Operations: While individual document updates are atomic, the cascading logic in delete_conversation is a multi-step process. In distributed environments, consider this when designing cleanup tasks.