
Async HTTP Worker

AsyncHttpWorker is an internal, queue-backed HTTP delivery worker used by async HTTP sink configurations. It decouples event production from network I/O and drains queued payloads in a controlled way during shutdown.

1. Behavior and Context

The worker maintains:

  • an asyncio.Queue for payload buffering
  • a long-running background task (_run)
  • a shared aiohttp.ClientSession
  • a stop event used to drain queue contents before termination

This design supports smooth operation in async web servers where direct network calls on the request path should be minimized.

2. Purpose

  • Backpressure Boundary: Queue isolates producers from transport latency.
  • Lifecycle Safety: Explicit start() and stop() hooks for framework lifespan events.
  • Structured Delivery: Sends payloads as JSON with configured method and headers.

3. High-Level API & Examples

Example 1: Manual Worker Lifecycle

```python
import asyncio
from jazzmine.logging.workers import AsyncHttpWorker

async def main():
    worker = AsyncHttpWorker(
        url="https://collector.internal/logs",
        method="POST",
        headers={"Authorization": "Bearer <token>"},
        max_queue=2000,
    )

    await worker.start()
    await worker.submit({"event": "boot", "service": "chat-api"})
    await worker.submit({"event": "ready", "uptime": 1.2})
    await worker.stop(timeout=3.0)

asyncio.run(main())
```

Example 2: Integrated via BaseLogger

```python
from jazzmine.logging import BaseLogger

logger = BaseLogger({
    "name": "api",
    "level": "INFO",
    "json": True,
    "sinks": [
        {"type": "http", "url": "https://collector.internal/logs", "async": True}
    ],
})

# BaseLogger will create and manage AsyncHttpWorker instances.
```

4. Detailed Class Functionality

__init__(url, method='POST', headers=None, max_queue=1000)

  • Stores destination settings.
  • Creates bounded queue with max_queue.
  • Initializes task/session state and stop event.
  • Requires aiohttp to be installed.

start()

  • No-op if worker task already exists.
  • Creates aiohttp.ClientSession.
  • Clears stop event and starts background task.

submit(payload)

  • Pushes payload into queue.
  • Drops the payload silently if enqueueing fails (for example, when the queue is full).
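The non-blocking enqueue with silent drop can be sketched as a standalone function (the exact semantics of the real method may differ):

```python
import asyncio

async def submit_sketch(queue: asyncio.Queue, payload: dict) -> None:
    """Enqueue without blocking; drop silently when the queue is full."""
    try:
        queue.put_nowait(payload)
    except asyncio.QueueFull:
        pass  # drop rather than stall the caller's request path
```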

_run()

Main loop:

  • continues while not stopped or queue still has items
  • pulls queued payloads
  • calls _send
  • marks tasks done
  • prints traceback for send failures
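The loop condition — keep running until stopped and fully drained — can be sketched with an injected send callable (names and the polling timeout are illustrative):

```python
import asyncio
import traceback

async def run_loop_sketch(queue: asyncio.Queue,
                          stopping: asyncio.Event, send) -> None:
    """Consume until asked to stop AND the queue is fully drained."""
    while not (stopping.is_set() and queue.empty()):
        try:
            payload = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue  # re-check the stop condition periodically
        try:
            await send(payload)
        except Exception:
            traceback.print_exc()  # report, but keep the loop alive
        finally:
            queue.task_done()
```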

_send(payload)

Performs aiohttp request with JSON body and headers.

  • On HTTP status >= 400, prints response details.

stop(timeout=2.0)

  • Waits for queue drain (queue.join()).
  • Sets stop flag.
  • Waits for task completion with timeout, cancels if exceeded.
  • Closes session and resets task handle.
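The drain-then-bound-the-wait shutdown can be sketched as follows (a standalone version; the real method also closes the session and resets the task handle, as noted in the comment):

```python
import asyncio

async def stop_sketch(queue: asyncio.Queue, stopping: asyncio.Event,
                      task: asyncio.Task, timeout: float = 2.0) -> None:
    """Drain, signal the loop, then bound the shutdown wait."""
    await queue.join()   # wait until every queued payload is processed
    stopping.set()       # tell the run loop to exit
    done, pending = await asyncio.wait({task}, timeout=timeout)
    if pending:
        task.cancel()    # give up waiting; avoid an indefinite hang
    # the real worker also closes its session and resets the task handle
```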

5. Error Handling

  • LoggerDependencyError if aiohttp is unavailable.
  • Send and runtime exceptions are caught and reported with a printed traceback.
  • Timeout during stop cancels task to avoid indefinite shutdown hang.

6. Remarks

  • For production-grade reliability, consider retries and dead-letter handling on _send failures.
  • Tune max_queue based on throughput and memory profile.
  • Always call stop() on service shutdown to flush remaining logs.
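The retry suggestion in the first remark could be sketched as a wrapper with exponential backoff. The function name, parameters, and the dead-letter hook are assumptions, not part of the worker's API:

```python
import asyncio

async def send_with_retry(send, payload: dict, attempts: int = 3,
                          base_delay: float = 0.5) -> bool:
    """Retry transient failures with exponential backoff.

    Returns False when every attempt fails, so the caller can divert
    the payload to a dead-letter queue instead of losing it.
    """
    for attempt in range(attempts):
        try:
            await send(payload)
            return True
        except Exception:
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    return False
```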