1. Behavior and Context
The Embedder is designed for high-concurrency environments. It functions as a thread-safe service (managed via Arc<Embedder>) that abstracts the complexity of two distinct backends:
- Local Backend: Executes transformer models (like BGE or MiniLM) locally via the ort crate (ONNX Runtime). It includes CPU-specific inference optimizations, such as SIMD-accelerated mean pooling and a thread-local scratch buffer that minimizes heap allocations.
- Remote Backend: A universal async adapter that communicates with cloud providers (OpenAI, Gemini, Cohere, and others; see setup_remote for the full list). It uses an asynchronous reqwest client so the system remains non-blocking during network round-trips.
The component features a Dedicated Batcher Runtime: a background OS thread hosting a single-threaded Tokio runtime that aggregates requests, applies priority scheduling, and consumes zero CPU while idle, maximizing throughput while minimizing latency.
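A minimal usage sketch of that concurrency model, assuming the constructor from section 3 and an embed(&str) signature inferred from the examples below:
use std::sync::Arc;
use crate::embed::Embedder;

// Hypothetical ingestion helper: many tasks share one Embedder.
async fn index_documents(embedder: Arc<Embedder>, docs: Vec<String>) {
    let tasks: Vec<_> = docs
        .into_iter()
        .map(|doc| {
            let e = Arc::clone(&embedder);
            // Concurrent calls funnel into the batcher runtime,
            // which groups them into shared inference batches.
            tokio::spawn(async move { e.embed(&doc).await })
        })
        .collect();
    for t in tasks {
        let _vector = t.await.expect("task panicked").expect("embedding failed");
    }
}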
2. Purpose
- Hybrid Vector Generation: Producing both dense f32 embeddings and BM25-style term frequency vectors.
- Intelligent Batching: Automatically grouping concurrent requests into optimal batch sizes for the underlying hardware or API.
- Priority Orchestration: Ensuring user-facing queries (High) skip the queue ahead of background ingestion tasks (Normal).
- Lock-Free Observability: Tracking performance metrics (cache hits, errors, latencies) using atomic counters to prevent thread contention.
3. High-Level API (Rust)
Example: Initializing a Local Backend
use crate::embed::{Embedder, LocalConfig};
// 1. Initialize instance with tokenizer and constraints
let embedder = Embedder::new(
    "path/to/tokenizer.json".to_string(),
    384, // Hidden size (e.g., for MiniLM)
    16,  // Max batch size
).unwrap();
// 2. Configure Local ONNX (Automatic path resolution for int8/fp32)
embedder.setup_local(LocalConfig {
    model_dir: "./models/bge-small".to_string(),
    quantized: true, // Use the INT8-optimized model
}).expect("Failed to load ONNX model");
// 3. Generate an embedding
let vector = embedder.embed("How do I reset my password?").await.unwrap();
4. Detailed Functionality
new(tokenizer_path, hidden_size, max_batch)
Functionality: Initializes the internal state, loads the tokenizer, and spawns the embedder‑batcher OS thread.
- hidden_size: Must be between 1 and 4096.
- max_batch: Must be at least 2 for efficient scheduling.
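A hedged sketch of the constructor validation; the document specifies only the accepted ranges, so the exact error type and messages are assumptions:
use crate::embed::Embedder;

// hidden_size must be in 1..=4096; 8192 is rejected at construction.
assert!(Embedder::new("path/to/tokenizer.json".to_string(), 8192, 16).is_err());

// max_batch must be at least 2; a value of 1 is likewise rejected.
assert!(Embedder::new("path/to/tokenizer.json".to_string(), 384, 1).is_err());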
setup_local(config)
Functionality: Configures the local ONNX session.
- Safety Fix: Specifically disables ORT’s memory‑pattern optimization (with_memory_pattern(false)). This prevents crashes when the session encounters variable batch sizes (e.g., switching from a batch of 1 to a batch of 8).
- Warm‑up: Automatically performs a multi‑item “warm‑up” inference to prime the memory allocator.
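For reference, a sketch of what the safety fix looks like against the ort session builder. Method names follow recent ort releases and the model file name is hypothetical; treat this as illustrative, not the component's exact code (the `?` operator assumes a surrounding function returning an ort Result):
use ort::session::Session;

let session = Session::builder()?
    // Disable memory-pattern optimization so variable batch sizes
    // (e.g., 1 then 8) do not crash the session.
    .with_memory_pattern(false)?
    .commit_from_file("./models/bge-small/model_int8.onnx")?;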
setup_remote(config)
Functionality: Configures the asynchronous HTTP client for cloud providers.
- Supported Providers: OpenAI, AzureOpenAI, Gemini, Cohere, HuggingFace, Mistral, Together, VoyageAI, JinaAI, Nomic, OpenRouter, DeepInfra.
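A hypothetical configuration sketch; the document does not spell out RemoteConfig's shape, so the field names below are illustrative assumptions:
use crate::embed::RemoteConfig;

// Field names are assumptions for illustration only.
embedder.setup_remote(RemoteConfig {
    provider: "OpenAI".to_string(),
    model: "text-embedding-3-small".to_string(),
    api_key: std::env::var("OPENAI_API_KEY").expect("missing API key"),
}).expect("Failed to configure remote backend");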
embed(text) / embed_high_priority(text)
Functionality: Entry points for single‑string vectorization.
- Caching: Uses a 1,000‑entry LRU cache. Hits are returned in O(1) without reaching the batcher.
- Async Handshake: Uses a oneshot channel to receive the result from the background batching thread.
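Usage sketch: both entry points share the cache and the batcher; only the scheduling differs.
// Background ingestion: Normal priority, may queue behind user traffic.
let doc_vec = embedder.embed("chunk of a crawled document").await.unwrap();

// Interactive query: High priority, flushed ahead of Normal batches.
let query_vec = embedder.embed_high_priority("reset my password").await.unwrap();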
embed_batch(...) / embed_batch_high_priority(...)
Functionality: Optimized entry points for multiple strings.
- Efficiency: Acquires the cache lock only once per batch to check for hits, rather than N times, significantly reducing overhead for large ingestions.
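A sketch of batch ingestion (the exact parameter type is an assumption):
let chunks = vec![
    "first document chunk".to_string(),
    "second document chunk".to_string(),
];
// The cache lock is taken once for the whole batch, not per item.
let vectors = embedder.embed_batch(&chunks).await.unwrap();
assert_eq!(vectors.len(), chunks.len());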
compute_sparse_vector(text)
Functionality: Synchronous term‑frequency counter for BM25 search.
- Mechanism: Tokenizes the text and produces a parallel (indices, values) pair sorted by token index. This is a CPU‑bound operation and requires neither the ONNX session nor a network call.
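A usage sketch; the concrete return type is an assumption, since the source specifies only the sorted (indices, values) shape:
// Purely CPU-bound: no ONNX session or network call is involved.
let (indices, values) = embedder.compute_sparse_vector("reset reset password");
// `indices` holds sorted token ids; `values` their term frequencies,
// so the token for "reset" would carry a count of 2.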
get_metrics()
Functionality: Returns a MetricsSnapshot containing total_requests, total_batches, cache_hits, etc.
- Atomic: Uses Ordering::Relaxed loads, ensuring metric collection never slows down the embedding pipeline.
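Reading a snapshot is a handful of relaxed atomic loads; only the fields named above are documented, and the formatting here is illustrative:
let m = embedder.get_metrics();
println!(
    "requests={} batches={} cache_hits={}",
    m.total_requests, m.total_batches, m.cache_hits
);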
5. Optimization Details
The Batcher Loop
The background batcher utilizes a “Block‑Wait” strategy:
- Idle: The thread consumes zero CPU while waiting for the first request via .recv().await.
- Collection: Once an item arrives, it performs a non‑blocking try_recv drain to group any items that arrived concurrently (e.g., from a tokio::join!).
- Dispatch: Flushes High priority batches immediately, followed by Normal priority batches in the same cycle.
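A simplified sketch of that loop, using illustrative channel and request types rather than the component's actual internals:
use tokio::sync::mpsc;

struct Request { priority: u8, text: String } // illustrative shape

async fn batcher_loop(mut rx: mpsc::Receiver<Request>, max_batch: usize) {
    // Idle: recv().await parks the task; zero CPU until work arrives.
    while let Some(first) = rx.recv().await {
        let mut batch = vec![first];
        // Collection: non-blocking drain of concurrently arrived items.
        while batch.len() < max_batch {
            match rx.try_recv() {
                Ok(req) => batch.push(req),
                Err(_) => break,
            }
        }
        // Dispatch: lower value = higher priority, so High flushes first.
        batch.sort_by_key(|r| r.priority);
        // ... run inference over `batch` here ...
    }
}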
Mean Pooling & Normalization
For local ONNX models, the Embedder performs:
- Masked Mean Pooling: Averages hidden states while ignoring padding tokens.
- L2 Normalization: Scales the result to unit length, making it compatible with Cosine distance metrics.
- Scratch Buffer: Uses a thread_local scratch vector to reuse memory across inferences, preventing expensive re‑allocations on the hot path.
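A minimal sketch of the pooling math. The real implementation is SIMD-accelerated and writes into the thread-local scratch buffer, which this version models with the reusable `out` parameter:
// hidden: one row of length `dim` per token; mask: 1 = real token.
fn pool_and_normalize(hidden: &[Vec<f32>], mask: &[u32], out: &mut Vec<f32>) {
    let dim = hidden.first().map_or(0, |row| row.len());
    out.clear();
    out.resize(dim, 0.0);
    let mut count = 0.0f32;
    for (row, &m) in hidden.iter().zip(mask) {
        if m == 1 {
            for (o, v) in out.iter_mut().zip(row) { *o += *v; }
            count += 1.0;
        }
    }
    for o in out.iter_mut() { *o /= count.max(1.0); } // masked mean
    let norm = out.iter().map(|v| v * v).sum::<f32>().sqrt().max(1e-12);
    for o in out.iter_mut() { *o /= norm; }           // L2: unit length
}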
6. Error Handling
- API Error: Remote backends return a detailed string including the HTTP status and raw body for debugging provider‑side issues.
- Timeout: If a request remains in the batcher queue for more than 60 seconds, it returns a TimeoutError.
- Shutdown: If the Embedder is dropped, the batcher thread exits cleanly, and any pending requests receive an “Embedder is shut down” error.
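Callers can treat all three cases uniformly, since each surfaces as an error from the awaited future (the error type's shape is an assumption):
match embedder.embed("query").await {
    Ok(vector) => println!("dim = {}", vector.len()),
    // Covers API errors (status + body), the 60-second queue timeout,
    // and the shutdown message alike.
    Err(e) => eprintln!("embedding failed: {e}"),
}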