1. Behavior and Context
The sink creates its Redis clients lazily, on first use:
- The sync path uses redis.Redis.
- The async path uses redis.asyncio.from_url.
Messages are normalized into string-key/string-value maps suitable for Redis Stream entries.
2. Purpose
- Low-Latency Ingestion: Use Redis as a lightweight centralized log stream.
- Pipeline Compatibility: Stream records can be consumed by workers and ETL jobs.
- Dual Runtime Support: Works in sync and async execution models.
3. High-Level API & Examples
Example 1: Sync Logging to Redis Stream
```python
from jazzmine.logging.sinks.redis_sink import RedisLogger

sink = RedisLogger(
    sink_config={
        "type": "redis",
        "host": "localhost",
        "port": 6379,
        "db": 0,
        "stream_key": "logs:api",
    },
    logger_name="api",
    level="INFO",
    use_json=True,
)
sink.initialize()
sink.info("cache miss", key="user:14")
sink.shutdown()
```
Example 2: Async Logging
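```python
import asyncio


async def write_async(sink):
    # ainfo is the awaitable counterpart of info; extra kwargs become stream fields.
    await sink.ainfo("response sent", status=200, latency_ms=18)


# From synchronous code, with an initialized sink:
# asyncio.run(write_async(sink))
```
4. Detailed Class Functionality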
initialize()
- Imports redis and redis.asyncio lazily.
- Raises LoggerDependencyError if the redis package is not installed.
- Reads the host, port, db, password, and stream key settings from the sink config.
- Marks sink initialized.
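The lazy-import step can be sketched roughly as follows. This is an illustration only, not the sink's actual code: `LoggerDependencyError` is redefined locally as a stand-in for the library's exception, and `load_optional_module` is a hypothetical helper name.

```python
import importlib


class LoggerDependencyError(Exception):
    """Local stand-in for the library's exception, so the sketch is self-contained."""


def load_optional_module(name: str):
    """Import a module on first use and turn a missing package into a clear error.

    Hypothetical helper, not part of the documented API.
    """
    try:
        return importlib.import_module(name)
    except ImportError as exc:
        raise LoggerDependencyError(
            f"The '{name}' package is required for this sink"
        ) from exc
```

Deferring the import this way means the redis package is only required by applications that actually configure a Redis sink.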
_format_message(level, msg, **kwargs)
Builds stream field dict with:
- timestamp
- level
- logger
- message
- additional kwargs
Non-string kwargs are JSON-serialized to preserve structure in string field values.
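A minimal sketch of this normalization, assuming an ISO-8601 UTC timestamp and `json.dumps` with `default=str` as the fallback encoder (both are assumptions; the source does not specify them). `format_fields` is a hypothetical free-function version of `_format_message`.

```python
import json
from datetime import datetime, timezone


def format_fields(logger_name: str, level: str, msg: str, **kwargs) -> dict:
    """Build a flat str -> str mapping suitable for a Redis Stream entry."""
    fields = {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed format
        "level": level,
        "logger": logger_name,
        "message": str(msg),
    }
    for key, value in kwargs.items():
        if isinstance(value, str):
            # Strings pass through unchanged.
            fields[str(key)] = value
        else:
            # Everything else is JSON-encoded; default=str is an assumption
            # so that values such as datetimes do not raise.
            fields[str(key)] = json.dumps(value, default=str)
    return fields


entry = format_fields(
    "api", "INFO", "cache miss", key="user:14", latency_ms=18, tags=["a", "b"]
)
```

Under these rules the integer `18` and the string `"18"` both end up as the field value `"18"`, so a consumer cannot tell them apart without knowing the field's expected type.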
log(...)
- Creates sync client lazily if absent.
- Writes event with xadd(stream_key, payload).
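The sync write path might look roughly like the sketch below. `StreamWriter` is a hypothetical, simplified stand-in for the sink, and the constructor parameters are assumptions modeled on the config keys listed under initialize(); only `redis.Redis(...)` and `xadd(name, fields)` are real redis-py calls.

```python
class StreamWriter:
    """Hypothetical, simplified stand-in for the sink's sync write path."""

    def __init__(self, stream_key, host="localhost", port=6379, db=0, password=None):
        self.stream_key = stream_key
        self._conn = dict(host=host, port=port, db=db, password=password)
        self._client = None  # created on first write, not in __init__

    def _get_client(self):
        if self._client is None:
            import redis  # imported lazily so the package is only needed when used

            # Constructing the client does not open a connection yet;
            # that happens when the first command is sent.
            self._client = redis.Redis(**self._conn)
        return self._client

    def log(self, payload: dict):
        # XADD appends the field map as a new entry; Redis assigns the entry ID.
        return self._get_client().xadd(self.stream_key, payload)
```

Against a local Redis server, `StreamWriter("logs:api").log({"message": "cache miss"})` would append one entry to the `logs:api` stream.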
alog(...)
- Creates async client lazily via from_url.
- Writes event with async xadd.
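An async counterpart, again as a sketch under assumptions: `AsyncStreamWriter` is a hypothetical name, and the `redis://host:port/db` URL is an assumed way of passing connection settings to `from_url`. In redis-py, `redis.asyncio.from_url` returns the client directly (it is not awaited), while commands such as `xadd` are coroutines.

```python
class AsyncStreamWriter:
    """Hypothetical, simplified stand-in for the sink's async write path."""

    def __init__(self, stream_key, url="redis://localhost:6379/0"):
        self.stream_key = stream_key
        self.url = url  # assumed URL form: redis://host:port/db
        self._client = None  # created on first write

    def _get_client(self):
        if self._client is None:
            import redis.asyncio as aioredis  # lazy import, as in the sync path

            self._client = aioredis.from_url(self.url)
        return self._client

    async def alog(self, payload: dict):
        # Same XADD call as the sync path, but awaited.
        return await self._get_client().xadd(self.stream_key, payload)
```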
shutdown()
- Closes sync client immediately.
- Schedules the async client's close via asyncio.create_task, which requires a running event loop.
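The two shutdown steps can be sketched as a free function. This is not the sink's actual method: the `shutdown` signature here is hypothetical, and the use of `aclose()` assumes redis-py 5 or later (older versions expose an awaitable `close()` on the async client instead).

```python
import asyncio


def shutdown(sync_client=None, async_client=None):
    """Close the sync client now and schedule the async client's close."""
    if sync_client is not None:
        sync_client.close()  # synchronous: done when this call returns
    if async_client is None:
        return None
    # create_task needs a running event loop and raises RuntimeError otherwise,
    # so this branch only works when shutdown() is called from async code.
    return asyncio.create_task(async_client.aclose())
```

Because the close is only scheduled, it has not run yet when `shutdown()` returns. Callers that need the connection released before exiting can await the returned task; keeping a reference to it also prevents the task from being garbage-collected before it finishes.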
5. Error Handling
- LoggerDependencyError is raised if the redis package is not installed.
- Connection and write failures are caught and suppressed so that a logging outage does not interrupt the application.
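The suppression behavior amounts to a pattern like the one below. `safe_write` is a hypothetical helper, and returning `None` on failure is an assumption; the source does not say what the sink returns when a write fails.

```python
def safe_write(client, stream_key: str, payload: dict):
    """Return the new entry ID, or None if the write failed."""
    try:
        return client.xadd(stream_key, payload)
    except Exception:
        # Swallowed on purpose: a logging outage should not break the caller.
        return None
```

The trade-off is that dropped log entries are silent, so a Redis outage will not be visible from application errors alone.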
6. Remarks
- Consider an external stream-trimming strategy (XTRIM) to control memory growth.
- If payload fidelity is important, align serialization rules with downstream consumer expectations.
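For the trimming remark, one option is a periodic job that caps the stream length with redis-py's `xtrim`. The sketch below is only an illustration: `trim_stream` and the `max_entries` default are hypothetical, and the right cap depends on entry size and available memory.

```python
def trim_stream(client, stream_key: str, max_entries: int = 100_000):
    """Cap the stream at roughly max_entries and return the number of entries removed."""
    # approximate=True corresponds to "MAXLEN ~", which lets Redis trim
    # more cheaply by not enforcing the limit exactly.
    return client.xtrim(stream_key, maxlen=max_entries, approximate=True)


# Example wiring, e.g. from a scheduled worker (requires a reachable Redis server):
# import redis
# client = redis.Redis(host="localhost", port=6379, db=0)
# trim_stream(client, "logs:api")
```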