zos-agent-mq — agent message queue.
zos-agent-mq is a brokerless, file-backed message queue for multi-agent AI systems on one machine. Agents (a coordinator spawning workers, peers asking each other questions) usually don't need Kafka; they need a mailbox that survives crashes, can't tear messages, and can be inspected with cat. This is that mailbox: append-only JSONL files, no broker process, no daemon.
Python 3.11+ · stdlib only · BUSL-1.1
Status: working code · 53 tests · CI green · BUSL-1.1 · repository private until public launch.
Key concepts
- Closed topic set: TOPICS == frozenset({"ask", "answer", "broadcast", "spawn-request", "status"}). Unknown topics are rejected (TopicError) on send and on poll filters. Deliberate and load-bearing: a closed vocabulary keeps routing decidable and stops topic-space sprawl between agents that never coordinated on names. Add a topic by changing the code, not by sending it.
- The cursor is the ack: each consumer's read position is flushed to disk before messages are returned, so a crash mid-handoff under-delivers rather than double-delivers.
- Side-files for long bodies: bodies over a threshold (default 3584 bytes) are externalized so every JSONL line stays small.
- Sessions partition queues: messages in one session are invisible to consumers of another (the default session is $ZOS_MQ_SESSION if set, else "default").
- Kill-switch: ZOS_MQ_DISABLED=1 freezes the bus without losing anything.
Queue & send — zos_agent_mq.Queue
Attributes: root: Path, body_threshold: int (from $ZOS_MQ_BODY_THRESHOLD, default DEFAULT_BODY_THRESHOLD = 3584), audit_path: Path (property, <root>/audit.jsonl).
| send() param | Meaning |
|---|---|
| topic | must be in TOPICS, else TopicError |
| body | str; bodies over body_threshold UTF-8 bytes go to a side-file, and the JSONL line carries @file:<msg_id>.txt instead |
| to | recipient agent id; None, "all", or "broadcast" mean every consumer |
| ttl_s | seconds until expiry; None = never; must be >= 0 |
| session | queue partition; default $ZOS_MQ_SESSION or "default" |
| sender | defaults to $ZOS_MQ_AGENT_ID or "anonymous" |
| reply_to | msg_id of an earlier message to correlate with (stored as in_reply_to) |
send() returns the new msg_id. It raises QueueDisabledError when ZOS_MQ_DISABLED=1, and ValueError on invalid session or agent names (names must match [A-Za-z0-9][A-Za-z0-9._-]*).
Message record schema (one JSON object per line):
{"msg_id": "...", "ts": "2026-06-10T00:00:00Z", "from": "...", "to": null,
"topic": "ask", "body": "...", "in_reply_to": null, "ttl_s": 600}
Poll & ack — poll / tail / expire / status
poll returns the messages appended since agent_id's cursor that are addressed to it (to is None, "all", "broadcast", or agent_id), match the optional topics filter (a str or an iterable of str, validated against TOPICS), and are not TTL-expired. Side-file bodies are dereferenced transparently: delivered messages get _body_source: "side-file", or _body_error if the side-file is unreadable.
Ack semantics: the cursor is the ack. It is advanced over every scanned complete line — including messages filtered out by addressing, topic, or expiry — and is flushed to disk before the messages are returned (crash-safe ordering: under-delivery beats double-count). One consumer per agent_id is assumed. A partial (no trailing newline) last line is not consumed. With ZOS_MQ_DISABLED=1, returns [] and does not move the cursor.
- tail: read-only peek at the last n live messages. Never moves cursors, never audited.
- expire: compacts TTL-expired messages out of messages.jsonl (atomic rewrite-and-replace under the session lock), deletes their side-files, and remaps every consumer cursor to the equivalent offset in the compacted file. Returns the number of messages dropped. Unparseable lines are preserved. No-op (returns 0) under the kill-switch.
- status: per-session counts: messages, expired, by-topic, queue bytes, cursors. Without session, covers every session under the root.
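The subtle part of expire is the cursor remap: a byte offset into the old file must land on the same logical position in the compacted one. A minimal pure-function sketch, assuming cursors always sit on line boundaries (poll only advances over complete lines); compact and its expired callback are hypothetical names, and locking, side-file deletion, and the atomic replace are left out.

```python
# Pure-function sketch of expire()'s compaction and cursor remapping.
import json
from collections.abc import Callable


def compact(data: bytes, expired: Callable[[dict], bool]) -> tuple[bytes, Callable[[int], int]]:
    parts = data.split(b"\n")
    lines = [p + b"\n" for p in parts[:-1]]
    if parts[-1]:
        lines.append(parts[-1])  # torn tail: kept verbatim, never judged
    kept: list[bytes] = []
    mapping = {0: 0}  # old line-boundary offset -> new line-boundary offset
    old = new = 0
    for line in lines:
        old += len(line)
        try:
            drop = line.endswith(b"\n") and expired(json.loads(line))
        except ValueError:
            drop = False  # unparseable lines are preserved, as documented
        if not drop:
            kept.append(line)
            new += len(line)
        mapping[old] = new

    def remap(cursor: int) -> int:
        # A cursor just past a dropped line maps to where that line used to start.
        return mapping[cursor]

    return b"".join(kept), remap
```

Because every old boundary maps to a new one, a consumer resumes at the same logical message; a cursor not on a boundary raises KeyError here rather than being guessed.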
from zos_agent_mq import Queue
q = Queue()
msg_id = q.send("ask", "What is the build status?", to="builder", ttl_s=600)
for m in q.poll("builder"): # new messages since builder's cursor
    print(m["from"], m["topic"], m["body"])
    q.send("answer", "Build green.", to=m["from"], reply_to=m["msg_id"])
Audit log
<root>/audit.jsonl, one JSON line per send / poll / expire operation. Metadata only — message bodies are never written to the audit log. Send entries carry msg_id, topic, to, from, body_bytes, externalized, ttl_s; poll entries carry agent_id, topics, matched, cursor_from, cursor_to.
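Because the audit log is plain JSONL, it can be summarized with the stdlib. The name of the key that records the operation type is not documented, so this sketch tells entries apart by their documented fields and assumes matched is a count; summarize_audit is a hypothetical helper.

```python
# Sketch: summarize audit.jsonl by operation, using only documented fields.
import json
from collections import Counter
from pathlib import Path


def summarize_audit(path: Path) -> Counter:
    counts: Counter = Counter()
    with path.open(encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            entry = json.loads(line)
            if "body_bytes" in entry:  # send entries carry body_bytes
                counts["send"] += 1
                counts["externalized"] += bool(entry.get("externalized"))
            elif "cursor_from" in entry:  # poll entries carry cursor_from/cursor_to
                counts["poll"] += 1
                counts["matched"] += entry.get("matched", 0)  # assumption: a count
            else:
                counts["other"] += 1  # e.g. expire entries, whose fields are not listed
    return counts
```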
Layout on disk:
<root>/
  audit.jsonl                    # metadata-only audit of send/poll/expire
  sessions/<session>/
    messages.jsonl               # the queue (one JSON object per line)
    bodies/<msg_id>.txt          # externalized long bodies
    cursors/<agent_id>.cursor    # per-consumer byte offsets
    .lock                        # advisory lock file
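Session and agent names become path components in this layout, which is presumably one reason they must match [A-Za-z0-9][A-Za-z0-9._-]*: the pattern admits no "/" and no leading ".", so ".." and "a/b" cannot escape the tree. A sketch of resolving the layout with that check; check_name, session_paths, and cursor_path are hypothetical helpers.

```python
# Sketch: resolve the documented on-disk layout, validating names first.
import re
from pathlib import Path

NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*")


def check_name(name: str) -> str:
    if not NAME_RE.fullmatch(name):  # fullmatch: the whole name, no partial match
        raise ValueError(f"invalid name: {name!r}")
    return name


def session_paths(root: Path, session: str) -> dict[str, Path]:
    base = root / "sessions" / check_name(session)
    return {
        "messages": base / "messages.jsonl",
        "bodies": base / "bodies",
        "cursors": base / "cursors",
        "lock": base / ".lock",
        "audit": root / "audit.jsonl",  # one audit log shared by all sessions
    }


def cursor_path(root: Path, session: str, agent_id: str) -> Path:
    return session_paths(root, session)["cursors"] / f"{check_name(agent_id)}.cursor"
```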
Concurrency guarantees (honest version)
What the library does guarantee, and how:
- Appends are single write() calls on an O_APPEND descriptor. On POSIX local filesystems that makes seek-to-end + write atomic with respect to other appenders. Long bodies are externalized precisely to keep every JSONL line small (under ~4 KB, the classic PIPE_BUF bound; POSIX defines PIPE_BUF atomicity for pipes and FIFOs only, so for regular files the bound is a conservative rule of thumb rather than a guarantee).
- Belt and braces: an advisory per-session lock (flock on POSIX, msvcrt.locking on Windows) additionally serializes senders, pollers, and expire() calls that go through this library. The concurrent-sender test hammers this with 8 processes and asserts zero torn lines.
- Cursor updates are atomic (write temp file + os.replace) and persisted before messages are returned: at-most-once delivery per consumer, meaning under-delivery, never double-delivery.
- A torn trailing line is never consumed: poll() only advances over complete (newline-terminated) lines.
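The append path described above can be sketched in a few lines for POSIX (fcntl is unavailable on Windows, where the library uses msvcrt.locking instead). This assumes the lock file is opened per call; append_line is a hypothetical helper, not the package's code.

```python
# POSIX-only sketch: one write() on an O_APPEND descriptor, under an advisory flock.
import fcntl
import json
import os
from pathlib import Path


def append_line(queue: Path, lock: Path, record: dict) -> None:
    line = (json.dumps(record) + "\n").encode("utf-8")
    lock_fd = os.open(lock, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX)  # advisory: only cooperating processes honor it
        fd = os.open(queue, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
        try:
            written = os.write(fd, line)  # single call: the kernel positions at EOF
            if written != len(line):
                # A torn line left behind here is never consumed by poll().
                raise OSError(f"short write: {written} of {len(line)} bytes")
        finally:
            os.close(fd)
    finally:
        os.close(lock_fd)  # closing the only descriptor also releases the flock
```

The lock and O_APPEND overlap on purpose: O_APPEND still protects against appenders that skip the lock, and the lock still protects where O_APPEND is weak.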
What it does not guarantee:
- NFS and other network filesystems: neither O_APPEND atomicity nor flock is reliable there. Use a local disk.
- Windows: O_APPEND semantics are weaker; correctness rests on the advisory lock, which only covers processes using this library.
- Multiple concurrent consumers sharing one agent_id: two pollers with the same id race for messages. For fan-out, use distinct agent ids.
- Exactly-once delivery: cursors give at-most-once. If your consumer must not lose messages, persist your own processing state, or handle it at the protocol level with ask/answer correlation (reply_to), re-sending any ask that never receives a matching answer.
- Topic-filtered polls consume what they skip: the cursor advances over every scanned line, so a message filtered out by topics=... (or already expired) is never redelivered to that consumer later. Poll with the full topic set you care about.
- expire() rewrites the queue file under the advisory lock; a foreign writer appending between the read and the os.replace would lose that append. Run expire from the environment that owns the queue.
CLI — zos-mq
zos-mq [--root DIR] send --topic T [--body B | --body - | <stdin>]
                         [--to ID] [--ttl S] [--session S]
                         [--sender ID] [--reply-to MSG_ID]
zos-mq [--root DIR] poll --agent ID [--topic T]... [--session S]
zos-mq [--root DIR] tail [-n N] [--session S] [--include-expired]
zos-mq [--root DIR] expire [--session S]
zos-mq [--root DIR] status [--session S]
send prints the msg_id; poll/tail print one JSON object per line; status prints pretty JSON. Exit codes: 0 ok, 1 runtime error (including the kill-switch on send), 2 usage error.
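Exit code 2 for usage errors is also what Python's argparse does by default. Whether zos-mq is built on argparse is not stated; the parser below is a hypothetical mirror of the synopsis, useful for wrappers or tests, and the ttl type and missing tail default are assumptions.

```python
# Hypothetical argparse mirror of the documented synopsis, not the real CLI code.
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="zos-mq")
    parser.add_argument("--root", metavar="DIR")
    sub = parser.add_subparsers(dest="cmd", required=True)

    send = sub.add_parser("send")
    send.add_argument("--topic", required=True)
    send.add_argument("--body", help="'-' or omitted: read the body from stdin")
    send.add_argument("--to", metavar="ID")
    send.add_argument("--ttl", type=float, metavar="S")  # assumption: numeric seconds
    send.add_argument("--session")
    send.add_argument("--sender", metavar="ID")
    send.add_argument("--reply-to", metavar="MSG_ID")

    poll = sub.add_parser("poll")
    poll.add_argument("--agent", required=True, metavar="ID")
    poll.add_argument("--topic", action="append", dest="topics")  # repeatable
    poll.add_argument("--session")

    tail = sub.add_parser("tail")
    tail.add_argument("-n", type=int, metavar="N")  # real default undocumented
    tail.add_argument("--session")
    tail.add_argument("--include-expired", action="store_true")

    for name in ("expire", "status"):
        sub.add_parser(name).add_argument("--session")
    return parser
```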
Exceptions
- QueueError: base class.
- TopicError(QueueError, ValueError): topic outside TOPICS (send or poll filter).
- QueueDisabledError(QueueError, RuntimeError): send attempted while ZOS_MQ_DISABLED=1.
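The dual inheritance lets callers catch by library type or by builtin category. The sketch re-declares the documented hierarchy locally (assuming QueueError derives from Exception) and shows a hypothetical send_or_skip wrapper that treats the kill-switch as a quiet skip while letting bad topics propagate; with the real package the classes would presumably be imported from zos_agent_mq instead.

```python
# Illustrative re-declaration of the documented hierarchy (not the package source).
from collections.abc import Callable
from typing import Any


class QueueError(Exception):
    """Base class (assumption: derives directly from Exception)."""


class TopicError(QueueError, ValueError):
    """Topic outside TOPICS, on send or in a poll filter."""


class QueueDisabledError(QueueError, RuntimeError):
    """send() attempted while ZOS_MQ_DISABLED=1."""


def send_or_skip(send: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Hypothetical wrapper: the kill-switch means 'skip', other errors propagate."""
    try:
        return send(*args, **kwargs)
    except QueueDisabledError:
        return None  # bus frozen on purpose: nothing was written
```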
Environment variables
| Variable | Effect |
|---|---|
| ZOS_MQ_ROOT | queue root directory (highest-priority default) |
| ZOS_HOME | root falls back to $ZOS_HOME/agent-mq |
| ZOS_MQ_SESSION | default session name |
| ZOS_MQ_AGENT_ID | default sender id for send() |
| ZOS_MQ_BODY_THRESHOLD | side-file threshold in bytes (default 3584) |
| ZOS_MQ_DISABLED=1 | kill-switch: send() raises, poll() returns empty without advancing cursors, expire() no-ops |