
zos-agent-mq — agent message queue.

The brokerless agent mailbox, derived from the source of zos_agent_mq 0.1.0. Working code, June 2026.

zos-agent-mq is a brokerless, file-backed message queue for multi-agent AI systems on one machine. Agents (a coordinator spawning workers, peers asking each other questions) usually don't need Kafka; they need a mailbox that survives crashes, can't tear messages, and can be inspected with cat. This is that mailbox: append-only JSONL files, no broker process, no daemon.

Python 3.11+ · stdlib only · BUSL-1.1

Status: working code · 53 tests · CI green · BUSL-1.1 · repository private until public launch.

Key concepts

Queue & send — zos_agent_mq.Queue

Queue(root=None) # root: explicit arg → $ZOS_MQ_ROOT → $ZOS_HOME/agent-mq → ~/.zos/agent-mq
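The precedence in the comment above can be sketched as a small function. This is an illustration only, not the library's code: resolve_root and its env parameter are hypothetical names, and treating an empty environment variable as unset is an assumption.

```python
import os
from pathlib import Path


def resolve_root(root=None, env=None):
    """Hypothetical helper mirroring the documented precedence for Queue.root."""
    env = os.environ if env is None else env
    if root is not None:                      # 1. explicit argument wins
        return Path(root)
    if env.get("ZOS_MQ_ROOT"):                # 2. $ZOS_MQ_ROOT (assumed: empty == unset)
        return Path(env["ZOS_MQ_ROOT"])
    if env.get("ZOS_HOME"):                   # 3. $ZOS_HOME/agent-mq
        return Path(env["ZOS_HOME"]) / "agent-mq"
    return Path.home() / ".zos" / "agent-mq"  # 4. ~/.zos/agent-mq


print(resolve_root(env={"ZOS_HOME": "/srv/zos"}))
```

Passing env explicitly keeps the sketch easy to exercise without touching the real process environment.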

Attributes: root: Path, body_threshold: int (from $ZOS_MQ_BODY_THRESHOLD, default DEFAULT_BODY_THRESHOLD = 3584), audit_path: Path (property, <root>/audit.jsonl).

send(topic, body, to=None, ttl_s=None, session=None, *, sender=None, reply_to=None) -> str # the msg_id
Param      Meaning
topic      must be in TOPICS, else TopicError
body       str; bodies over body_threshold UTF-8 bytes go to a side-file, and the JSONL line carries @file:<msg_id>.txt
to         recipient agent id; None, "all", or "broadcast" mean every consumer
ttl_s      seconds until expiry; None = never; must be >= 0
session    queue partition; default $ZOS_MQ_SESSION or "default"
sender     defaults to $ZOS_MQ_AGENT_ID or "anonymous"
reply_to   correlate with an earlier msg_id (stored as in_reply_to)
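The body rule measures UTF-8 bytes, not characters, which matters for non-ASCII text. A minimal sketch of that decision, assuming "over" means strictly greater than the threshold; body_field is a hypothetical name and the sketch does not write the side-file itself.

```python
DEFAULT_BODY_THRESHOLD = 3584  # default documented above


def body_field(msg_id, body, threshold=DEFAULT_BODY_THRESHOLD):
    """Return (value for the JSONL "body" field, externalized flag)."""
    size = len(body.encode("utf-8"))   # bytes, not characters
    if size > threshold:               # assumed: strictly over the threshold
        return f"@file:{msg_id}.txt", True
    return body, False


print(body_field("m1", "short question"))
print(body_field("m2", "é" * 2000))    # 2000 characters but 4000 UTF-8 bytes
```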

Raises QueueDisabledError when ZOS_MQ_DISABLED=1, and ValueError on invalid session or agent names (names must match [A-Za-z0-9][A-Za-z0-9._-]*).

Message record schema (one JSON object per line):

{"msg_id": "...", "ts": "2026-06-10T00:00:00Z", "from": "...", "to": null,
 "topic": "ask", "body": "...", "in_reply_to": null, "ttl_s": 600}

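The name rule for sessions and agent ids quoted in the send() notes can be checked with a full-string regex match. The sketch below is an assumption about how such a check might look; check_name is a hypothetical helper, not a function exported by zos_agent_mq.

```python
import re

# Pattern quoted in the send() notes: first character alphanumeric,
# then any mix of alphanumerics, dot, underscore, and hyphen.
NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def check_name(name):
    """Hypothetical validator: raise ValueError unless the whole name matches."""
    if not isinstance(name, str) or NAME_RE.fullmatch(name) is None:
        raise ValueError(f"invalid session/agent name: {name!r}")
    return name


for candidate in ("builder", "worker-1.a_b", ".hidden", "a/b", ""):
    try:
        print("ok     ", check_name(candidate))
    except ValueError as exc:
        print("reject ", exc)
```

Because the pattern excludes path separators and a leading dot, a valid name is also safe to use as a directory or file name under sessions/ and cursors/.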
Poll & ack — poll / tail / expire / status

poll(agent_id, topics=None, session=None) -> list[dict]

Returns messages appended since agent_id's cursor that are addressed to it (to is None/"all"/"broadcast"/agent_id), match the optional topic filter (str or iterable, validated against TOPICS), and are not TTL-expired. Side-file bodies are dereferenced transparently (delivered messages get _body_source: "side-file", or _body_error if the side-file is unreadable).
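The three delivery filters (addressing, topic, TTL) can be sketched as one predicate. Two points are assumptions rather than documented behavior: that a message expires ttl_s seconds after its ts, and that ts always uses the format shown in the record schema. The function names are hypothetical, and validation of topics against TOPICS is left out.

```python
from datetime import datetime, timedelta, timezone

TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed from the schema example


def is_addressed_to(msg, agent_id):
    """True for broadcast messages and for messages naming agent_id."""
    return msg.get("to") in (None, "all", "broadcast", agent_id)


def is_expired(msg, now):
    """Assumption: a message expires ttl_s seconds after its ts."""
    ttl_s = msg.get("ttl_s")
    if ttl_s is None:                        # None means never expires
        return False
    sent = datetime.strptime(msg["ts"], TS_FORMAT).replace(tzinfo=timezone.utc)
    return now > sent + timedelta(seconds=ttl_s)


def deliverable(msg, agent_id, topics=None, now=None):
    """Combine the topic, addressing, and expiry checks for one message."""
    now = now or datetime.now(timezone.utc)
    if isinstance(topics, str):              # a single topic may be given as str
        topics = [topics]
    if topics is not None and msg["topic"] not in set(topics):
        return False
    return is_addressed_to(msg, agent_id) and not is_expired(msg, now)
```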

Ack semantics: the cursor is the ack. It is advanced over every scanned complete line — including messages filtered out by addressing, topic, or expiry — and is flushed to disk before the messages are returned (crash-safe ordering: under-delivery beats double-count). One consumer per agent_id is assumed. A partial (no trailing newline) last line is not consumed. With ZOS_MQ_DISABLED=1, returns [] and does not move the cursor.
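The cursor-as-ack ordering can be illustrated with a stdlib-only sketch: read complete lines after the stored byte offset, persist the new offset, and only then hand messages back. This is not the library's implementation. poll_once and keep are hypothetical names, the temp-file-plus-rename write is one assumed way to flush the cursor, and locking is omitted.

```python
import json
import os
import tempfile
from pathlib import Path


def poll_once(messages: Path, cursor: Path, keep):
    """Return messages accepted by keep(); the cursor moves over every
    complete line scanned, including the ones keep() rejects."""
    start = int(cursor.read_text()) if cursor.exists() else 0
    with messages.open("rb") as fh:
        fh.seek(start)
        data = fh.read()
    end = data.rfind(b"\n") + 1          # 0 if no newline: partial line stays unread
    scanned = [json.loads(line) for line in data[:end].split(b"\n") if line.strip()]
    # Flush the cursor before returning: a crash after this point loses
    # messages rather than delivering them twice.
    tmp = cursor.with_name(cursor.name + ".tmp")
    with tmp.open("w") as fh:
        fh.write(str(start + end))
        fh.flush()
        os.fsync(fh.fileno())
    os.replace(tmp, cursor)
    return [m for m in scanned if keep(m)]


with tempfile.TemporaryDirectory() as d:
    log, cur = Path(d, "messages.jsonl"), Path(d, "builder.cursor")
    log.write_bytes(b'{"to": "builder", "body": "one"}\n'
                    b'{"to": "other", "body": "two"}\n'
                    b'{"to": "bui')                      # torn last line
    mine = lambda m: m["to"] == "builder"
    print(poll_once(log, cur, mine))   # first poll sees the complete lines
    print(poll_once(log, cur, mine))   # second poll: only the partial line remains
```

Note that the message addressed to "other" is skipped for good by this consumer, which matches the documented behavior of advancing over filtered-out lines.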

tail(n=10, session=None, include_expired=False) -> list[dict]
expire(session=None) -> int
status(session=None) -> dict

from zos_agent_mq import Queue

q = Queue()
msg_id = q.send("ask", "What is the build status?", to="builder", ttl_s=600)
for m in q.poll("builder"):          # new messages since builder's cursor
    print(m["from"], m["topic"], m["body"])
    # Reply inside the loop so m is always bound, even when poll() returns []
    q.send("answer", "Build green.", to=m["from"], reply_to=m["msg_id"])

Audit log

<root>/audit.jsonl, one JSON line per send / poll / expire operation. Metadata only — message bodies are never written to the audit log. Send entries carry msg_id, topic, to, from, body_bytes, externalized, ttl_s; poll entries carry agent_id, topics, matched, cursor_from, cursor_to.
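To make "metadata only" concrete, here is a sketch that builds a send entry from the fields listed above. send_audit_entry is a hypothetical helper, body_bytes is assumed to be the UTF-8 byte length of the body, and the real log may carry further fields (such as a timestamp) that this page does not list.

```python
import json


def send_audit_entry(record, body, externalized):
    """Build a metadata-only audit entry for one send (hypothetical helper)."""
    return {
        "msg_id": record["msg_id"],
        "topic": record["topic"],
        "to": record["to"],
        "from": record["from"],
        "body_bytes": len(body.encode("utf-8")),  # size only, never the text
        "externalized": externalized,
        "ttl_s": record["ttl_s"],
    }


record = {"msg_id": "example-id", "topic": "ask", "to": "builder",
          "from": "coordinator", "ttl_s": 600}
print(json.dumps(send_audit_entry(record, "What is the build status?", False)))
```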

Layout on disk:

<root>/
  audit.jsonl                        # metadata-only audit of send/poll/expire
  sessions/<session>/
    messages.jsonl                   # the queue (one JSON object per line)
    bodies/<msg_id>.txt              # externalized long bodies
    cursors/<agent_id>.cursor        # per-consumer byte offsets
    .lock                            # advisory lock file
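Because the queue is plain files, the layout above is enough to locate anything with pathlib. The helper below is a hypothetical convenience for inspection scripts, not part of the library; it only computes paths and creates nothing.

```python
from pathlib import Path


def session_paths(root, session, agent_id, msg_id):
    """Map the documented on-disk layout to concrete paths."""
    base = Path(root) / "sessions" / session
    return {
        "audit": Path(root) / "audit.jsonl",
        "messages": base / "messages.jsonl",
        "body": base / "bodies" / f"{msg_id}.txt",
        "cursor": base / "cursors" / f"{agent_id}.cursor",
        "lock": base / ".lock",
    }


for name, path in session_paths("/tmp/agent-mq", "default", "builder", "example-id").items():
    print(f"{name:9}{path}")
```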

Concurrency guarantees (honest version)

What the library does guarantee, and how:

What it does not guarantee:

CLI — zos-mq

zos-mq [--root DIR] send   --topic T [--body B | --body - | <stdin>]
                           [--to ID] [--ttl S] [--session S]
                           [--sender ID] [--reply-to MSG_ID]
zos-mq [--root DIR] poll   --agent ID [--topic T]... [--session S]
zos-mq [--root DIR] tail   [-n N] [--session S] [--include-expired]
zos-mq [--root DIR] expire [--session S]
zos-mq [--root DIR] status [--session S]

send prints the msg_id; poll/tail print one JSON object per line; status prints pretty JSON. Exit codes: 0 ok, 1 runtime error (including kill-switch on send), 2 usage.

Exceptions

Environment variables

Variable                Effect
ZOS_MQ_ROOT             queue root directory (highest-priority default)
ZOS_HOME                root falls back to $ZOS_HOME/agent-mq
ZOS_MQ_SESSION          default session name
ZOS_MQ_AGENT_ID         default sender id for send()
ZOS_MQ_BODY_THRESHOLD   side-file threshold in bytes (default 3584)
ZOS_MQ_DISABLED=1       kill-switch: send() raises, poll() returns empty without advancing cursors, expire() no-ops

This page mirrors docs/API.md in the zos-agent-mq repository, derived from the source at 0.1.0.