1. Behavior and Context
Within the framework, this store acts as the high-throughput persistence layer for conversations, messages, and turn traces.
- Document Nesting: It naturally maps the framework's Pydantic models to BSON documents, allowing complex structures like tool_traces and llm_calls to be stored as nested arrays within a single record.
- Indexing Strategy: It automatically provisions compound and sparse indexes to ensure that operations like session listing and chronological context retrieval remain performant even as the database grows to millions of records.
- Asynchronous I/O: Fully non-blocking communication ensures that database operations do not stall the agent's reasoning loop or sandbox executions.
- Manual Referential Integrity: Since MongoDB lacks native foreign-key cascades, this store explicitly manages the cleanup of associated messages and traces during conversation deletion.
2. Purpose
- Production Scalability: Designed to handle massive datasets across distributed MongoDB clusters.
- Flexible Metadata: Optimized for storing and querying the evolving metadata produced by the MessageEnhancer and ScriptGenerator.
- Advanced Observability: Provides the indexing foundation required for administrative dashboards to perform range queries on token usage, latency, and intent patterns.
- Efficient Context Windows: Supports high-speed retrieval of chronological message slices for LLM prompt construction.
3. High-Level API (Configuration)
The MongoDB Message Store is initialized through the MessageStore.initialize factory by specifying the mongodb storage type.
Example: Initializing the MongoDB Store

```python
from jazzmine.core.message_store import MessageStore

config = {
    "storage": "mongodb",
    "uri": "mongodb://admin:password@localhost:27017",
    "db": "agent_data_prod",
}

# The factory initializes the client and triggers collection setup
store = await MessageStore.initialize(config)
```

4. Detailed Functionality
_init() [Internal]
Functionality: Bootstraps the database connection and ensures all required collections and indexes are provisioned.
How it works:
- Connects to the MongoDB instance using the provided uri.
- Creates three collections: conversations, messages, and turn_traces.
- Provisions unique indexes on id fields.
- Creates compound indexes for performance (e.g., user_id + last_updated_at for conversation lists).
- Provisions sparse indexes for the is_flagged attribute.
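The provisioning steps above can be sketched as a declarative table of index specifications. This is a sketch, not the store's actual code: field names other than those named in this document (`id`, `user_id`, `last_updated_at`, `conversation_id`, `timestamp`, `turn_number`, `is_flagged`) and the exact index set are assumptions.

```python
# Index specifications mirroring the provisioning steps above.
# Each entry is (key spec, options); the exact set is an assumption.
INDEX_SPECS = {
    "conversations": [
        ([("id", 1)], {"unique": True}),
        ([("user_id", 1), ("last_updated_at", -1)], {}),  # conversation lists
    ],
    "messages": [
        ([("id", 1)], {"unique": True}),
        ([("conversation_id", 1), ("timestamp", 1)], {}),  # chronological reads
        ([("is_flagged", 1)], {"sparse": True}),  # only indexes flagged docs
    ],
    "turn_traces": [
        ([("id", 1)], {"unique": True}),
        ([("conversation_id", 1), ("turn_number", 1)], {}),  # audit trail
    ],
}

async def provision_indexes(db):
    """Apply every spec via the driver's create_index (motor/pymongo style)."""
    for collection, specs in INDEX_SPECS.items():
        for keys, options in specs:
            await db[collection].create_index(keys, **options)
```

Keeping the specs as data makes provisioning idempotent and easy to audit against the live database.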
get_immediate_context(conversation_id, n)
Functionality: Retrieves a chronological slice of the most recent non-flagged messages for an agent's reasoning turn.
Parameters:
- conversation_id (str): The unique identifier for the session.
- n (int): The maximum number of messages to retrieve.
How it works:
- Queries the messages collection for the specific conversation_id.
- Filters out any messages where is_flagged is True.
- Sorts the results by timestamp in descending order and applies the limit n.
- Reverses the resulting list in memory to return the messages in ascending (chronological) order.
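The retrieval steps above can be illustrated with an in-memory stand-in for the messages collection (a minimal sketch of the query logic, not the store's actual implementation):

```python
def immediate_context(messages, conversation_id, n):
    """Mimic get_immediate_context: the newest n non-flagged messages,
    returned in ascending (chronological) timestamp order."""
    # Filter to the conversation and drop flagged messages
    candidates = [
        m for m in messages
        if m["conversation_id"] == conversation_id and not m.get("is_flagged")
    ]
    # Sort newest-first, apply the limit, then reverse back to chronological order
    newest_first = sorted(candidates, key=lambda m: m["timestamp"], reverse=True)[:n]
    return list(reversed(newest_first))

msgs = [
    {"conversation_id": "c1", "timestamp": 1, "text": "hi"},
    {"conversation_id": "c1", "timestamp": 2, "text": "spam", "is_flagged": True},
    {"conversation_id": "c1", "timestamp": 3, "text": "how are you?"},
    {"conversation_id": "c2", "timestamp": 4, "text": "other session"},
]
context = immediate_context(msgs, "c1", 2)
# → the two most recent clean messages, oldest first: timestamps 1 and 3
```

Sorting descending before limiting is what lets the index serve "most recent n"; the final in-memory reverse is cheap because n is small.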
delete_conversation(conversation_id)
Functionality: Permanently removes a chat session and all associated data.
How it works: Since MongoDB does not support relational cascades, this method performs a manual cleanup:
- Finds all id values for messages belonging to the conversation.
- Deletes all turn_traces that are linked to those message IDs.
- Deletes all messages in the conversation.
- Deletes the top-level conversation record.
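The four-step cascade can be sketched against plain dicts standing in for the three collections (an illustration of the ordering, not the store's actual driver calls):

```python
def delete_conversation(db, conversation_id):
    """Manual cascade: turn traces, then messages, then the conversation.
    `db` is a dict of lists standing in for the three collections."""
    # 1. Collect the ids of messages belonging to the conversation
    message_ids = {
        m["id"] for m in db["messages"]
        if m["conversation_id"] == conversation_id
    }
    # 2. Delete turn traces linked to those message ids
    db["turn_traces"] = [
        t for t in db["turn_traces"] if t["message_id"] not in message_ids
    ]
    # 3. Delete the messages themselves
    db["messages"] = [
        m for m in db["messages"] if m["conversation_id"] != conversation_id
    ]
    # 4. Delete the top-level conversation record
    db["conversations"] = [
        c for c in db["conversations"] if c["id"] != conversation_id
    ]

db = {
    "conversations": [{"id": "c1"}, {"id": "c2"}],
    "messages": [
        {"id": "m1", "conversation_id": "c1"},
        {"id": "m2", "conversation_id": "c2"},
    ],
    "turn_traces": [{"message_id": "m1"}, {"message_id": "m2"}],
}
delete_conversation(db, "c1")
```

Deleting children before the parent record means a crash mid-cleanup leaves an orphaned conversation (easy to retry) rather than orphaned traces with no parent.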
store_messages(messages)
Functionality: Performs high-speed bulk insertion of multiple message records.
How it works: Uses the bulk_write API with ReplaceOne operations and upsert=True. This allows the store to handle many updates (e.g., during conversation migration or summarization tagging) in a single round-trip to the database.
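The upsert semantics of ReplaceOne can be sketched with an in-memory collection keyed by id (assumes each message carries a unique `id` field, per the index description above):

```python
def bulk_upsert(collection, messages):
    """Mimic bulk_write with ReplaceOne(..., upsert=True): each incoming
    document fully replaces the existing one with the same id, or is
    inserted if no match exists.

    In the real store this corresponds to one round-trip:
        bulk_write([ReplaceOne({"id": m["id"]}, m, upsert=True) for m in messages])
    """
    by_id = {doc["id"]: doc for doc in collection}
    for msg in messages:
        by_id[msg["id"]] = msg  # replace if present, insert otherwise
    return list(by_id.values())

existing = [{"id": "m1", "text": "draft"}]
result = bulk_upsert(existing, [
    {"id": "m1", "text": "final"},   # replaces the draft
    {"id": "m2", "text": "new"},     # inserted fresh
])
```

Because ReplaceOne swaps the whole document, callers must send complete records, not partial patches.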
Observability Queries
| Query Target | Indexing Applied |
|---|---|
| Audit Trail | Compound index on (conversation_id, turn_number) for sequential tracing. |
| Cost Monitoring | Index on total_tokens for aggregate usage analysis. |
| Latency Analysis | Index on total_latency_ms to identify performance bottlenecks. |
| Intent Trends | Index on detected_intent for high-level usage analytics. |
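As one hypothetical example of how a dashboard might exploit these indexes, a cost-monitoring aggregation could sum `total_tokens` per conversation over a recent window. The pipeline below is an assumption for illustration; only the `total_tokens` field name comes from the table above, and the grouping key and time filter are invented.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical "Cost Monitoring" aggregation: token totals per conversation
# over the last 7 days. Field names other than total_tokens are assumptions.
window_start = datetime.now(timezone.utc) - timedelta(days=7)

cost_pipeline = [
    {"$match": {"timestamp": {"$gte": window_start}}},
    {"$group": {"_id": "$conversation_id",
                "tokens": {"$sum": "$total_tokens"}}},
    {"$sort": {"tokens": -1}},  # most expensive conversations first
]
```

The `$match` stage can use the timestamp index, while the `total_tokens` index primarily serves simpler range queries rather than this grouped aggregation.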
5. Error Handling
- Connection Timeouts: If the provided uri is unreachable, the motor driver will raise a ServerSelectionTimeoutError.
- Serialization Errors: Data is validated against Pydantic models before insertion; invalid data will raise a ValidationError before reaching the database.
- Corrupted Documents: If get_message_by_id encounters a document that does not match the schema, it will raise a validation error, indicating the record should be audited or removed.
6. Remarks
- Resource Management: Always call await store.close() during application shutdown to gracefully terminate the MongoDB connection pool.
- Sparse Indexing: The use of sparse=True on the is_flagged index ensures that the index size remains minimal for large datasets where most messages are clean.
- Atomic Operations: While individual document updates are atomic, the cascading logic in delete_conversation is a multi-step process. In distributed environments, consider this when designing cleanup tasks.