Redis Sink

RedisLogger publishes structured log records to a Redis Stream with XADD and supports both synchronous and asynchronous clients.

1. Behavior and Context

The sink creates its Redis clients lazily, on the first write through each path:

  • sync path uses redis.Redis
  • async path uses redis.asyncio.from_url

Messages are normalized into string-key/string-value maps suitable for Redis Stream entries.

2. Purpose

  • Low-Latency Ingestion: Use Redis as a lightweight centralized log stream.
  • Pipeline Compatibility: Stream records can be consumed by workers and ETL jobs.
  • Dual Runtime Support: Works in sync and async execution models.

3. High-Level API & Examples

Example 1: Sync Logging to Redis Stream

```python
from jazzmine.logging.sinks.redis_sink import RedisLogger

# Connection settings and the target stream for this sink.
sink = RedisLogger(
    sink_config={
        "type": "redis",
        "host": "localhost",
        "port": 6379,
        "db": 0,
        "stream_key": "logs:api",
    },
    logger_name="api",
    level="INFO",
    use_json=True,
)

sink.initialize()                       # import redis and read the config
sink.info("cache miss", key="user:14")  # sync write via XADD
sink.shutdown()                         # close the client
```

Example 2: Async Logging

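```python
import asyncio

async def write_async(sink):
    # Publish one record through the async client path.
    await sink.ainfo("response sent", status=200, latency_ms=18)

# `sink` is an initialized RedisLogger, configured as in Example 1.
asyncio.run(write_async(sink))
```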

4. Detailed Class Functionality

initialize()

  • Imports redis and redis.asyncio lazily.
  • Raises LoggerDependencyError if the redis package is not installed.
  • Reads the host, port, db, password, and stream key settings from the sink config.
  • Marks the sink as initialized.
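
The lazy-import step can be sketched as follows. This is a minimal illustration, not the library's code: the LoggerDependencyError class here is a local stand-in (its real base class is not documented above), and lazy_import and load_redis_modules are hypothetical helper names.

```python
import importlib


class LoggerDependencyError(ImportError):
    """Local stand-in for the library's exception (base class is an assumption)."""


def lazy_import(module_name: str, install_hint: str):
    """Import a module on first use and turn a missing package into a clear error."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise LoggerDependencyError(
            f"{module_name!r} is required for this sink; try: {install_hint}"
        ) from exc


def load_redis_modules():
    """Resolve both client modules only when the sink is initialized."""
    redis = lazy_import("redis", "pip install redis")
    redis_asyncio = lazy_import("redis.asyncio", "pip install redis")
    return redis, redis_asyncio
```

Deferring the import this way keeps applications that never configure a Redis sink free of the redis dependency.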

_format_message(level, msg, **kwargs)

Builds stream field dict with:

  • timestamp
  • level
  • logger
  • message
  • additional kwargs

Non-string kwargs are JSON-serialized to preserve structure in string field values.
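
A rough sketch of this normalization is shown below. The function name, the ISO 8601 UTC timestamp format, and the use of default=str for values that JSON cannot encode are assumptions made for illustration; the sink's actual choices may differ.

```python
import json
from datetime import datetime, timezone


def format_message(logger_name: str, level: str, msg: str, **kwargs) -> dict:
    """Build a flat str -> str mapping that Redis can store as stream fields."""
    fields = {
        "timestamp": datetime.now(timezone.utc).isoformat(),  # format is an assumption
        "level": level,
        "logger": logger_name,
        "message": str(msg),
    }
    for key, value in kwargs.items():
        if isinstance(value, str):
            # Strings pass through unchanged.
            fields[str(key)] = value
        else:
            # Everything else is JSON-encoded so its structure survives as text.
            # default=str (an assumption) keeps unsupported types from raising.
            fields[str(key)] = json.dumps(value, default=str)
    return fields
```

With this scheme, status=200 is stored as the string "200" and a list such as ["a", "b"] is stored as its JSON text, so a consumer has to know which fields to decode.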

log(...)

  • Creates the sync client on first use if none exists yet.
  • Writes the event with xadd(stream_key, payload).
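
The sync path might look roughly like the sketch below. SyncStreamWriter and its client_factory parameter are hypothetical names introduced so the example can run without a Redis server; the failure handling mirrors the behavior described under Error Handling, and decode_responses=True is an assumption.

```python
class SyncStreamWriter:
    """Hypothetical stand-in for the sync half of the sink."""

    def __init__(self, stream_key, client_factory=None, **conn):
        self.stream_key = stream_key
        self._conn = conn              # e.g. host, port, db, password
        self._client = None            # created on the first log() call
        self._client_factory = client_factory or self._default_factory

    def _default_factory(self):
        import redis  # deferred so importing this module never requires redis

        return redis.Redis(decode_responses=True, **self._conn)

    def log(self, payload: dict) -> None:
        try:
            if self._client is None:
                self._client = self._client_factory()
            # XADD appends one entry; Redis assigns the entry ID.
            self._client.xadd(self.stream_key, payload)
        except Exception:
            # Suppressed so a Redis outage does not interrupt the caller.
            pass
```

Injecting the factory is only a testing convenience here; in normal use, SyncStreamWriter("logs:api", host="localhost", port=6379, db=0) would fall back to redis.Redis.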

alog(...)

  • Creates the async client on first use via redis.asyncio.from_url.
  • Writes the event by awaiting xadd on the async client.
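
A comparable sketch for the async path follows. How the sink derives its connection URL from host, port, db, and password is not documented above, so build_redis_url is an assumption, as are the AsyncStreamWriter name and its client_factory parameter.

```python
import asyncio
from urllib.parse import quote


def build_redis_url(host="localhost", port=6379, db=0, password=None) -> str:
    """Assemble a redis:// URL (hypothetical helper)."""
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


class AsyncStreamWriter:
    """Hypothetical stand-in for the async half of the sink."""

    def __init__(self, stream_key, url, client_factory=None):
        self.stream_key = stream_key
        self.url = url
        self._client = None  # created on the first alog() call
        self._client_factory = client_factory or self._default_factory

    def _default_factory(self):
        import redis.asyncio as aioredis  # deferred import

        # from_url is a regular function call, not a coroutine.
        return aioredis.from_url(self.url, decode_responses=True)

    async def alog(self, payload: dict) -> None:
        if self._client is None:
            self._client = self._client_factory()
        await self._client.xadd(self.stream_key, payload)
```

A caller would run it inside an event loop, for example with asyncio.run(writer.alog({"message": "response sent"})).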

shutdown()

  • Closes the sync client immediately.
  • Schedules the async client close via asyncio.create_task, which requires a running event loop.
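
The shutdown sequence could be sketched as below. shutdown_clients is a hypothetical helper, and skipping the async close when no loop is running is an assumption about how such a case might be handled, not a description of the sink.

```python
import asyncio


def shutdown_clients(sync_client=None, async_client=None):
    """Close the sync client now; schedule the async close if a loop is running."""
    if sync_client is not None:
        sync_client.close()
    if async_client is None:
        return None
    # Newer redis-py releases name the coroutine aclose(); older ones use close().
    closer = getattr(async_client, "aclose", None) or async_client.close
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # create_task needs a running loop; without one nothing is scheduled here.
        return None
    # Return the task so the caller can keep a reference or await it.
    return asyncio.create_task(closer())
```

Because the close is only scheduled, code that needs the connection released before exiting may want to await the returned task.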

5. Error Handling

  • LoggerDependencyError is raised if the redis package is not installed.
  • Connection and write failures are suppressed rather than raised, so a Redis problem does not interrupt the application.

6. Remarks

  • Consider an external stream-trimming strategy (XTRIM) to control memory growth; the sink does not trim the stream itself.
  • If payload fidelity is important, align the sink's serialization rules with what downstream consumers expect.
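
Both remarks can be illustrated with a short sketch. trim_stream and decode_fields are hypothetical helpers, the 100,000-entry cap is an arbitrary example value, and the decoder assumes the client returns str values (for instance with decode_responses=True).

```python
import json

# Fields written as plain strings; these are left untouched when decoding.
CORE_FIELDS = {"timestamp", "level", "logger", "message"}


def trim_stream(client, stream_key: str, max_entries: int = 100_000):
    """Cap the stream length; approximate=True lets Redis trim more cheaply."""
    return client.xtrim(stream_key, maxlen=max_entries, approximate=True)


def decode_fields(fields: dict) -> dict:
    """Best-effort reversal of JSON-serialized extra fields on the consumer side."""
    decoded = {}
    for key, value in fields.items():
        if key in CORE_FIELDS:
            decoded[key] = value
            continue
        try:
            decoded[key] = json.loads(value)
        except (TypeError, ValueError):
            # Not valid JSON, so it was presumably a plain string.
            decoded[key] = value
    return decoded
```

Note the ambiguity this exposes: a field originally logged as the string "200" decodes to the integer 200, which is one reason to agree on serialization rules with consumers in advance.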