1. Behavior and Context
The sink runs in one of two modes, selected by sink_config['async']:
- Async mode (True, default): uses aiokafka.AIOKafkaProducer
- Sync mode (False): uses kafka.KafkaProducer
Messages are formatted as JSON objects and sent to a configured topic.
2. Purpose
- Stream-Native Logging: Integrate directly with event-driven architectures.
- Scalability: Use Kafka partitioning and consumer groups for downstream processing.
- Flexibility: Support async service stacks and sync environments.
3. High-Level API & Examples
Example 1: Async Kafka Sink
```python
import asyncio

from jazzmine.logging.sinks.kafka_sink import KafkaLogger


async def main():
    sink = KafkaLogger(
        sink_config={
            "type": "kafka",
            "bootstrap_servers": ["localhost:9092"],
            "topic": "app.logs",
            "async": True,
        },
        logger_name="api",
        level="INFO",
        use_json=True,
    )
    sink.initialize()
    await sink.ainfo("user authenticated", user_id="u_19")
    sink.shutdown()


asyncio.run(main())
```
Example 2: Sync Kafka Sink
```python
from jazzmine.logging.sinks.kafka_sink import KafkaLogger

sink = KafkaLogger(
    sink_config={
        "type": "kafka",
        "bootstrap_servers": ["localhost:9092"],
        "topic": "app.logs",
        "async": False,
    },
    logger_name="worker",
    level="INFO",
    use_json=True,
)
sink.initialize()
sink.info("job started", job_id="j-101")
sink.shutdown()
```
4. Detailed Class Functionality
initialize()
- Reads bootstrap_servers, topic, and async flag.
- Resolves producer class:
  - async -> AIOKafkaProducer
  - sync -> KafkaProducer
- Raises LoggerDependencyError if required package is missing.
- In sync mode, immediately creates producer with JSON value serializer.
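The resolution step above can be sketched roughly as follows. This is an illustration under assumptions, not the sink's actual code: `resolve_producer_class`, `json_value_serializer`, and the `importer` parameter are hypothetical names, and `LoggerDependencyError` is redefined locally as a stand-in because its real definition is not shown in this document.

```python
import importlib
import json


class LoggerDependencyError(ImportError):
    """Local stand-in; the library's real exception class is not shown here."""


def resolve_producer_class(use_async, importer=importlib.import_module):
    """Import and return the producer class for the selected mode."""
    if use_async:
        module_name, class_name, package = "aiokafka", "AIOKafkaProducer", "aiokafka"
    else:
        module_name, class_name, package = "kafka", "KafkaProducer", "kafka-python"
    try:
        # Imported lazily so that only the selected mode's package is required.
        module = importer(module_name)
    except ImportError as exc:
        raise LoggerDependencyError(
            f"Kafka sink needs the '{package}' package for this mode"
        ) from exc
    return getattr(module, class_name)


def json_value_serializer(value):
    """Encode a payload dict as UTF-8 JSON bytes (a possible sync-mode serializer)."""
    return json.dumps(value, default=str).encode("utf-8")
```

In sync mode, a producer could then be built with something like `resolve_producer_class(False)(bootstrap_servers=..., value_serializer=json_value_serializer)`.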
_format_message(level, msg, **kwargs)
Builds payload with timestamp, level, logger name, message, and extra fields.
log(...)
- Active only in sync mode.
- Sends message via producer.send(topic, value=payload).
alog(...)
- Initializes async producer lazily on first call.
- Starts producer and sends bytes payload with send_and_wait.
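The lazy-start pattern described above might look like the sketch below. `LazyAsyncSender` and `producer_factory` are hypothetical names; the only producer methods used are `start()` and `send_and_wait(topic, value)`, both of which exist on `aiokafka.AIOKafkaProducer`. The lock is an added safeguard against two concurrent first calls and is not confirmed to be part of the sink.

```python
import asyncio
import json


class LazyAsyncSender:
    """Starts the producer on the first send, then reuses it."""

    def __init__(self, producer_factory, topic):
        # producer_factory is a zero-argument callable, for example
        # lambda: AIOKafkaProducer(bootstrap_servers=["localhost:9092"])
        self._factory = producer_factory
        self._topic = topic
        self._producer = None
        self._lock = asyncio.Lock()

    async def _ensure_started(self):
        if self._producer is None:
            async with self._lock:
                # Re-check inside the lock so the producer starts only once.
                if self._producer is None:
                    producer = self._factory()
                    await producer.start()
                    self._producer = producer
        return self._producer

    async def send(self, payload):
        producer = await self._ensure_started()
        data = json.dumps(payload, default=str).encode("utf-8")
        await producer.send_and_wait(self._topic, data)
```

Creating the sender inside the running event loop keeps the lock bound to the right loop on older Python versions.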
shutdown()
- Async mode: schedules producer.stop().
- Sync mode: closes producer.
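The two shutdown paths can be sketched as below. `shutdown_producer` is a hypothetical helper, and returning the scheduled task is an assumption added so callers can await it; the sink itself is only described as scheduling `producer.stop()`.

```python
import asyncio


def shutdown_producer(producer, use_async):
    """Stop or close a producer depending on the mode (illustrative only)."""
    if producer is None:
        # Nothing was ever started, so there is nothing to stop.
        return None
    if use_async:
        # Requires a running event loop; get_running_loop() raises RuntimeError otherwise.
        return asyncio.get_running_loop().create_task(producer.stop())
    producer.close()
    return None
```

Because `asyncio.run()` cancels tasks that are still pending when the main coroutine returns, awaiting the returned task before the loop exits is one way to make sure `stop()` actually completes.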
5. Error Handling
- LoggerDependencyError for missing aiokafka or kafka-python.
- Send errors are caught and suppressed in both the sync and async paths, so a failed send does not propagate to the caller.
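To make the consequence of suppressed send errors concrete, here is a small sketch of a sync send wrapper. `safe_send`, the boolean return value, and the `kafka_sink.fallback` logger name are hypothetical additions for illustration; the sink as described simply drops the error.

```python
import logging

# Hypothetical stdlib logger used only to keep failures visible somewhere.
_fallback = logging.getLogger("kafka_sink.fallback")


def safe_send(producer, topic, payload):
    """Send a payload; report failure via the return value instead of raising."""
    try:
        producer.send(topic, value=payload)
        return True
    except Exception:
        # The exception is not re-raised, mirroring the suppressed-error behavior.
        _fallback.debug("Kafka send failed", exc_info=True)
        return False
```

A wrapper like this keeps logging from crashing the application while still leaving a trace that messages were lost.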
6. Remarks
- Async producer startup occurs on first async send; warm it during service startup if first-log latency matters.
- For strict durability, configure Kafka acks/retries/idempotence at producer construction (not exposed by default in this implementation).
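Since the sink does not expose these settings, stricter durability would have to be configured on a producer you construct yourself. The sketch below only builds keyword arguments for kafka-python's `KafkaProducer`; `durable_producer_kwargs` is a hypothetical helper, the retry count is an arbitrary example value, and idempotence is left out because support for it depends on the client library and version.

```python
import json


def durable_producer_kwargs(bootstrap_servers):
    """Keyword arguments for a more durable kafka-python KafkaProducer."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "acks": "all",   # wait for all in-sync replicas to acknowledge
        "retries": 5,    # example value; tune for your environment
        "value_serializer": lambda v: json.dumps(v, default=str).encode("utf-8"),
    }


# Usage (requires kafka-python and a reachable broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(**durable_producer_kwargs(["localhost:9092"]))
```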