Queue & Agent Sinks
Logger reference

Kafka Sink

KafkaLogger publishes structured log events to Apache Kafka. It supports both synchronous and asynchronous producer modes.

1. Behavior and Context

The sink operates in one of two modes, selected by sink_config['async']:

  • Async mode (True, default): uses aiokafka.AIOKafkaProducer
  • Sync mode (False): uses kafka.KafkaProducer

Messages are formatted as JSON objects and sent to a configured topic.

2. Purpose

  • Stream-Native Logging: Integrate directly with event-driven architectures.
  • Scalability: Use Kafka partitioning and consumer groups for downstream processing.
  • Flexibility: Support async service stacks and sync environments.

3. High-Level API & Examples

Example 1: Async Kafka Sink

```python
import asyncio
from jazzmine.logging.sinks.kafka_sink import KafkaLogger

async def main():
    sink = KafkaLogger(
        sink_config={
            "type": "kafka",
            "bootstrap_servers": ["localhost:9092"],
            "topic": "app.logs",
            "async": True,
        },
        logger_name="api",
        level="INFO",
        use_json=True,
    )
    sink.initialize()
    await sink.ainfo("user authenticated", user_id="u_19")
    sink.shutdown()

asyncio.run(main())
```

Example 2: Sync Kafka Sink

```python
from jazzmine.logging.sinks.kafka_sink import KafkaLogger

sink = KafkaLogger(
    sink_config={
        "type": "kafka",
        "bootstrap_servers": ["localhost:9092"],
        "topic": "app.logs",
        "async": False,
    },
    logger_name="worker",
    level="INFO",
    use_json=True,
)
sink.initialize()
sink.info("job started", job_id="j-101")
sink.shutdown()
```

4. Detailed Class Functionality

initialize()

  • Reads bootstrap_servers, topic, and async flag.
  • Resolves the producer class:
      • async -> aiokafka.AIOKafkaProducer
      • sync -> kafka.KafkaProducer
  • Raises LoggerDependencyError if required package is missing.
  • In sync mode, immediately creates producer with JSON value serializer.

_format_message(level, msg, **kwargs)

Builds payload with timestamp, level, logger name, message, and extra fields.
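A minimal sketch of the payload construction, assuming ISO-8601 UTC timestamps and these field names; the exact keys emitted by KafkaLogger may differ.

```python
from datetime import datetime, timezone

def format_message(level: str, msg: str, logger_name: str, **kwargs) -> dict:
    # Illustrative reconstruction of the payload described above.
    payload = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": level,
        "logger": logger_name,
        "message": msg,
    }
    payload.update(kwargs)  # extra structured fields ride along
    return payload

event = format_message("INFO", "user authenticated", "api", user_id="u_19")
```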

log(...)

  • Active only in sync mode.
  • Sends message via producer.send(topic, value=payload).
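The sync path can be condensed as below. A fake producer stands in for kafka.KafkaProducer so the sketch runs without a broker; the class and method names other than `send()` are assumptions.

```python
import json

class FakeProducer:
    """Stand-in for kafka.KafkaProducer so the sketch runs without Kafka."""
    def __init__(self):
        self.sent = []
    def send(self, topic, value=None):
        self.sent.append((topic, value))

class SyncKafkaSinkSketch:
    # Hypothetical condensation of the sync path described above.
    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic
    def log(self, level, msg, **kwargs):
        payload = {"level": level, "message": msg, **kwargs}
        # The real sink installs a JSON value serializer on the producer at
        # initialize(); here we serialize inline for clarity.
        self.producer.send(self.topic, value=json.dumps(payload).encode())

producer = FakeProducer()
sink = SyncKafkaSinkSketch(producer, "app.logs")
sink.log("INFO", "job started", job_id="j-101")
```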

alog(...)

  • Initializes async producer lazily on first call.
  • Starts producer and sends bytes payload with send_and_wait.
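The lazy-start behavior can be sketched like this, again with a fake producer in place of aiokafka.AIOKafkaProducer; the surrounding class structure is an assumption, but the start-on-first-send and `send_and_wait` flow match the description above.

```python
import asyncio
import json

class FakeAIOProducer:
    """Stand-in for aiokafka.AIOKafkaProducer so the sketch runs without Kafka."""
    def __init__(self):
        self.started = False
        self.sent = []
    async def start(self):
        self.started = True
    async def send_and_wait(self, topic, value):
        self.sent.append((topic, value))

class AsyncKafkaSinkSketch:
    # Hypothetical condensation of the async path: the producer is created
    # and started on the first alog() call, then reused.
    def __init__(self, producer_factory, topic):
        self._factory = producer_factory
        self._producer = None
        self.topic = topic
    async def alog(self, level, msg, **kwargs):
        if self._producer is None:  # lazy initialization on first send
            self._producer = self._factory()
            await self._producer.start()
        payload = json.dumps({"level": level, "message": msg, **kwargs}).encode()
        await self._producer.send_and_wait(self.topic, payload)

async def demo():
    sink = AsyncKafkaSinkSketch(FakeAIOProducer, "app.logs")
    await sink.alog("INFO", "user authenticated", user_id="u_19")
    return sink._producer

producer = asyncio.run(demo())
```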

shutdown()

  • Async mode: schedules producer.stop().
  • Sync mode: closes producer.

5. Error Handling

  • LoggerDependencyError for missing aiokafka or kafka-python.
  • Errors raised during send are caught and suppressed in both the sync and async paths, so logging failures do not propagate to application code.
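The swallowing behavior amounts to the pattern below; `safe_send` and the failing producer are illustrative stand-ins, not part of the sink's API.

```python
class ExplodingProducer:
    """Simulates a broker failure on every send."""
    def send(self, topic, value=None):
        raise RuntimeError("broker unavailable")

def safe_send(producer, topic, payload):
    # Mirrors the behavior described above: a failed send is caught and
    # suppressed so logging never raises into the caller.
    try:
        producer.send(topic, value=payload)
        return True
    except Exception:
        return False

ok = safe_send(ExplodingProducer(), "app.logs", b"{}")
```

The trade-off is silent data loss under broker outages, which is why the remarks below suggest tuning producer durability if your logs matter.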

6. Remarks

  • Async producer startup occurs on first async send; warm it during service startup if first-log latency matters.
  • For strict durability, configure Kafka acks/retries/idempotence at producer construction (not exposed by default in this implementation).
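If you do extend initialize() to pass producer options through, settings along these lines are the usual starting point. This is a hedged configuration sketch: KafkaLogger does not expose these keys today, and `enable_idempotence` applies to the aiokafka producer specifically.

```python
# Hypothetical producer settings for stricter durability; the sink would
# need to be extended to forward these to the producer constructor.
durable_producer_kwargs = {
    "acks": "all",               # wait for all in-sync replicas to ack
    "enable_idempotence": True,  # avoid duplicate records on retry (aiokafka)
}
```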