Flows
Core reference

Flow Synchronization

Flow Synchronization is the automated maintenance utility that ensures the agent's in-code "Skills" (defined in Python) are perfectly mirrored in its "Procedural Memory" (stored in Qdrant). It performs a differential update between the local flow registry and the remote vector database, ensuring that search indices are always accurate and up-to-date with the latest code changes.

1. Behavior and Context

In the jazzmine lifecycle, synchronization typically occurs once during the Agent's startup sequence. It acts as the bridge between static source code and dynamic retrieval.

Key behaviors:

  • Differential Logic: It does not perform a "blind overwrite." It compares the local state with the remote state and only applies the minimum necessary changes (Create, Update, or Delete).
  • Checksum-Based Detection: It utilizes the SHA-256 checksums generated by the Flow objects to detect if a procedure's logic, examples, or conditions have changed, even if the name remains the same.
  • Weak-Reference Sensitivity: It operates on the BaseFlow.registry. Because that registry uses weak references, the sync utility must be executed while the flow objects are still held in memory by the main application logic.
  • Idempotency: Running the sync multiple times with the same code results in zero database operations after the first successful run.

2. Purpose

  • Consistency: Guaranteeing that the FlowSelector is making decisions based on the actual logic currently deployed in the Python environment.
  • Resource Efficiency: Minimizing expensive LLM embedding calls by skipping flows that haven't changed.
  • Automatic Cleanup: Removing "zombie" flows from the vector database that have been deleted from the source code.
  • Deployment Safety: Automating the migration of the semantic index during CI/CD or version updates without manual database intervention.

3. High-Level API (Usage)

Synchronization is performed using the sync_registry function. It should be called after all flows have been instantiated but before the agent starts accepting user messages.

Example: Standard Startup Sync

python
from jazzmine.core.flows import sync_registry, Flow
from jazzmine.memory import ProceduralMemory

# 1. Instantiate your flows (they auto-register in BaseFlow.registry)
my_skills = [
    Flow(name="process_payment", ...),
    Flow(name="check_balance", ...)
]

# 2. Initialize the procedural memory connection
proc_mem = ProceduralMemory(...)

# 3. Synchronize with the database
# This handles CREATE, UPDATE, and DELETE operations automatically.
report = await sync_registry(
    proc_mem=proc_mem,
    agent_id="finance_bot_v1",
    verbose=True
)

# 4. Inspect the results
print(report)
# Output: 1 change(s), 1 unchanged (CREATE process_payment, SKIP check_balance)

4. Detailed Functionality

SyncReport [Dataclass]

The structured result of a synchronization run.

  • created: A list of flow names that were newly indexed into Qdrant.
  • updated: A list of flow names whose checksums changed, triggering a re-embed.
  • deleted: A list of flow names found in Qdrant but missing from the Python registry.
  • skipped: A list of flow names that were already perfectly in sync.

sync_registry(...)

Functionality: Orchestrates the four-stage synchronization process.

How it works:

  1. Snapshot Remote: Calls proc_mem.list_flows(agent_id) to get a complete map of what is currently stored in the vector database, including the stored checksums.
  2. Snapshot Local: Takes a strong-reference snapshot of the BaseFlow.registry to ensure no flows are garbage collected during the loop.
  3. The Diff Loop:
  • CREATE: If a local flow name is missing from the remote snapshot, it calls memorize().
  • UPDATE: If the name exists but the local checksum differs from the remote one, it calls update_memory(). This replaces the point in Qdrant while preserving the original UUID.
  • SKIP: If the name and checksum match, it records the flow as skipped.
  1. The Cleanup Loop: Identifies any flows in the remote snapshot that no longer exist in the local code and calls delete_flow() for each.

5. Error Handling

  • Connection Failures: If the Qdrant server is unreachable during any stage, the function raises a PyRuntimeError. This prevents the agent from starting with a potentially stale or mismatched procedural memory.
  • Partial Sync: If the process is interrupted, the next run will pick up exactly where it left off because of the checksum-based comparison.
  • GC Race Conditions: If sync_registry is called after the flow objects have gone out of scope, they will have vanished from the registry. The utility will interpret this as the flows being "deleted" and will attempt to purge them from the database. Always ensure your flows are assigned to a variable that lives until the sync is complete.

6. Remarks

  • Identity Mapping: The sync utility uses the flow's Name as the primary key for comparison, while the UUID is used for the actual database point. If you rename a flow in code, the system will treat it as a deletion of the old name and a creation of the new name.
  • Log Verbosity: Setting verbose=True provides detailed info-level logs for every database mutation, which is highly recommended for tracking deployment changes.
  • Performance: Stage 1 (Remote Snapshot) uses a "Scroll" operation capped at 10,000 flows, making this utility suitable for even very large skill repositories.