Security System: Base Moderator
1. Behavior and Context
In high-concurrency conversational AI environments, many separate users might send messages at the exact same millisecond. Processing these inputs sequentially is slow, and processing them concurrently in separate threads can quickly lead to GPU Out-of-Memory (OOM) crashes.
The BaseModerator solves this by acting as a smart, async-to-sync bridge.
Key behaviors:
- Dynamic Micro-Batching: It intercepts incoming asynchronous requests (classify_async) and holds them in a lightweight queue for a tiny fraction of a second (e.g., 50ms). It groups these individual requests into a single optimized batch to feed the GPU, drastically increasing throughput.
- Thread Delegation: If the dynamic batching worker is not explicitly started, the BaseModerator gracefully falls back to wrapping synchronous classify calls in asyncio.to_thread, preventing the main event loop from blocking.
- Hardware Resilience: It features an internal, battle-tested model execution loop (_run_model_batch) that actively monitors PyTorch memory allocations. If an unexpected GPU memory spike causes an OOM exception, it clears the CUDA cache, pauses using exponential backoff, and safely retries the batch without dropping the user's request.
2. Purpose
- API Standardization: Enforces a unified contract (classify, classify_batch) across all security evaluation models in the Jazzmine ecosystem.
- Resource Protection: Prevents agent concurrency spikes from overwhelming local compute hardware.
- Throughput Maximization: Turns isolated, inefficient single-text inferences into highly optimized, vectorized GPU operations.
- Lifecycle Management: Provides safe startup and teardown routines (draining queues, cancelling tasks) for background model workers.
3. High-Level API & Examples
Since BaseModerator is an abstract class, it must be inherited. Below is an example of how a custom text classifier would inherit from it, followed by an example of managing the async lifecycle.
Example 1: Implementing a Custom Moderator
from typing import Tuple, List
from jazzmine.security.base_moderator import BaseModerator
class MyCustomModerator(BaseModerator):
def __init__(self, logger=None):
super().__init__(logger=logger)
# Load your custom models here
def classify(self, text: str) -> Tuple[str, float]:
# Sync logic for a single text
return ("LABEL_SAFE", 0.99)
def classify_batch(self, texts: List[str], batch_size: int = 32) -> List[Tuple[str, float]]:
# Sync logic for batched texts
return [("LABEL_SAFE", 0.99) for _ in texts]Example 2: Managing the Batch Worker Lifecycle
When using a derived class (like JazzmineInputModerator) in an async web server or agent loop, you manage the BaseModerator's queue system like this:
import asyncio
from jazzmine.security.input_moderator import JazzmineInputModerator
async def agent_lifecycle():
mod = JazzmineInputModerator()
# 1. Start the background queue worker
# Groups requests arriving within 50ms, up to a maximum of 16 at a time.
await mod.start_batch_worker(max_batch_size=16, max_wait_ms=50, queue_maxsize=1000)
try:
# 2. Agent receives highly concurrent requests
# These look independent, but the BaseModerator merges them under the hood
task1 = mod.classify_async("User message 1")
task2 = mod.classify_async("User message 2")
results = await asyncio.gather(task1, task2)
print(results)
finally:
# 3. Graceful shutdown. `drain=True` ensures pending queue items are processed.
await mod.stop_batch_worker(drain=True)
asyncio.run(agent_lifecycle())5. Detailed Class Functionality
BaseModerator [Abstract Main Class]
classify(text: str) -> Tuple[str, float] [Abstract]
Must be implemented by the subclass. Represents the synchronous logic to evaluate a single string.
classify_batch(texts: List[str], batch_size: int = 32) -> List[Tuple[str, float]] [Abstract]
Must be implemented by the subclass. Represents the synchronous logic to evaluate an array of strings optimally.
async classify_async(text: str, timeout: Optional[float] = None) -> Tuple[str, float]
- Parameters:
- text (str): Text to evaluate.
- timeout (float): Maximum seconds to wait for a result before aborting.
- How it works:
- If the batch worker is running, it creates an asyncio.Future, places the text payload onto the internal _batch_queue, and awaits the future.
- If the batch worker is not running, it dynamically falls back to running the synchronous classify method via asyncio.to_thread().
async start_batch_worker(max_batch_size: int = 32, max_wait_ms: int = 50, queue_maxsize: int = 1000) -> None
- Parameters:
- max_batch_size (int): The maximum number of payloads to pop off the queue before executing the ML model.
- max_wait_ms (int): The maximum time to wait for the batch to fill up. If time expires, it executes whatever is currently in the batch.
- queue_maxsize (int): Maximum number of unhandled requests that can wait in memory. Applies backpressure if full.
- How it works: Initializes an asyncio.Queue and spawns a background asyncio.Task (_batch_worker_loop) that constantly consumes from the queue and triggers classify_batch.
async stop_batch_worker(drain: bool = True) -> None
- Parameters:
- drain (bool): If True, the worker finishes processing all items currently in the queue before shutting down. If False, it immediately cancels pending futures, causing them to raise a ModeratorError.
_run_model_batch(...) [Internal Utility]
- Purpose: A highly robust PyTorch inference loop provided for subclasses to use inside their classify_batch implementation.
- How it works:
- Wraps raw text in a custom _TextDataset and a PyTorch DataLoader (ensuring deterministic ordering).
- Measures CUDA memory before execution.
- Executes model(**enc).
- Parses out.logits into id2label strings and softmax probability scores.
- Uses an internal retry loop. If a torch.cuda.OutOfMemoryError is caught, it triggers torch.cuda.empty_cache() and sleeps using exponential backoff before retrying the batch.
6. Error Handling
- Queue Backpressure (ModeratorError): If the internal queue reaches queue_maxsize, classify_async immediately raises a ModeratorError("Batch queue full") to protect memory. The application should handle this by slowing down traffic (backpressure).
- Timeouts (ModeratorError): If the provided timeout threshold is crossed while a request is waiting in the queue or processing, the future is cancelled and an error is raised.
- OOM Exhaustion: The _run_model_batch method will attempt to recover from GPU memory spikes. If it fails consecutively beyond max_retries (default 3), it bubbles up a ModeratorError("OOM during inference after retries").
- Worker Shutdown Drops: If stop_batch_worker(drain=False) is called, any requests stuck in the queue will have their futures rejected with a ModeratorError("Batch worker stopped before processing request").
7. Remarks
Correlated Logging
The BaseModerator makes heavy use of structured logging via the injected logger. It automatically records context variables like moderator_class. Internal methods merge this context dynamically, meaning logs generated during classify_async or _run_model_batch natively include queue depths, payload sizes, and exact memory deltas without manual instrumentation.
Async/Sync Abstraction
Because the framework supports both blocking scripts and asynchronous chat loops, the BaseModerator's design safely supports both paradigms. Subclasses don't need to rewrite async PyTorch inference; they only write standard synchronous PyTorch code, and BaseModerator handles the complex asynchronous orchestration transparently.