Events (listener & std/events)

Xi has a built-in typed publish/subscribe event system. A producer publishes any DTO under a topic; a listener subscribed to that topic receives the typed value — never Json. Producers and listeners never reference each other, and the same code works whether events stay in the process or cross the network — only the bound transport changes.

import "std/json.xi"
import "std/events.xi"

Declaring an event

An event is a DTO — an ordinary value type that may be published. (The compiler also derives a JSON codec for it, used only by external transports.)

event OrderPaid { id: String, item: String, total: Number }

Publishing — any DTO under a topic

A producer depends on the injected PublisherService and calls publish(topic, dto):

class Shop {
    deps { events: PublisherService }
    producer checkout(item: String, total: Number) {
        events.publish("order.paid", OrderPaid { id: "o1", item: item, total: total })
    }
}

No Json is built. The DTO is the payload.

Reacting — typed listeners

A listener names its topic with on "…" and receives the typed DTO:

class Receipts {
    deps { mailer: Mailer } // listeners get their own deps wired
    listener onPaid(e: OrderPaid) on "order.paid" {
        mailer.send("buyer@x.dev", "Paid " + e.total + " for " + e.item)
    }
}

A topic may have any number of listeners; all fire (in declaration order). Each delivery resolves a fresh owning instance through DI.

Delivery is queued; run the pump

The default transport puts published events on an in-memory queue. Deliver them by running the pump — the bound ConsumerService:

async entry main(args: String[]) -> Integer {
    let shop = App.resolve(Shop) // the producer class declared above
    shop.checkout("book", 29.0)
    Events.run() // drain the queue -> listeners (no serialization)
    return 0
}
module App {} // MemoryBus / MemoryConsumer are the defaults

Events.run() resolves the ConsumerService and runs it; the default MemoryConsumer drains the in-memory queue and dispatches each event to its typed listeners. Nothing is serialized — the typed value is passed straight through.
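
To make the queue-then-pump behaviour concrete, here is a minimal model of it in Python rather than Xi. It is a sketch under stated assumptions, not Xi's implementation: `MemoryBusModel`, `listen`, and the `Audit` listener are hypothetical names invented for illustration. It shows only the three properties described above: publishing merely enqueues, the pump delivers to every listener of the topic in declaration order, and the payload is handed over as the same typed value with no serialization.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderPaid:
    """Python stand-in for the OrderPaid event."""
    id: str
    item: str
    total: float


class MemoryBusModel:
    """Hypothetical model: publish() only enqueues; run() drains and dispatches."""

    def __init__(self):
        self._queue = deque()    # pending (topic, payload) pairs
        self._listeners = {}     # topic -> listener classes, in declaration order

    def listen(self, topic, listener_cls):
        self._listeners.setdefault(topic, []).append(listener_cls)

    def publish(self, topic, payload):
        self._queue.append((topic, payload))   # a reference, never serialized

    def run(self):
        while self._queue:
            topic, payload = self._queue.popleft()
            for listener_cls in self._listeners.get(topic, []):
                listener_cls()(payload)   # fresh owning instance per delivery


log = []


class Receipts:
    def __call__(self, e):
        log.append(f"receipt: {e.item} {e.total}")


class Audit:  # hypothetical second listener on the same topic
    def __call__(self, e):
        log.append(f"audit: {e.id}")


bus = MemoryBusModel()
bus.listen("order.paid", Receipts)
bus.listen("order.paid", Audit)

bus.publish("order.paid", OrderPaid(id="o1", item="book", total=29.0))
pending_before_run = list(log)   # snapshot taken before the pump runs
bus.run()
```

In this model `pending_before_run` is empty, which mirrors why a program that publishes but never runs the pump sees no listener fire.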

Async delivery — Events.runAsync()

Events.run() drains on the calling thread. Events.runAsync() instead spawns a background worker (a thread) that blocks on the queue and dispatches events as they arrive, decoupling delivery from publishing. It returns a Thread handle; Events.stop() closes the queue so the worker exits, then wait() joins it.

async entry main(args: String[]) -> Integer {
    let pump = Events.runAsync() // deliver on a background thread
    let shop = App.resolve(Shop) // the producer class declared above
    shop.checkout("book", 29.0) // publish from this thread

    Events.stop() // close the queue -> the pump returns
    pump.wait() // join the worker
    return 0
}

Listeners then run on the worker thread, so treat their work as you would any threaded code. See examples/async_events_demo.xi.
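
The worker-thread arrangement can be modelled in Python as follows. This is a hedged sketch, not Xi's runtime: `AsyncPumpModel` and the `_CLOSED` sentinel are hypothetical, and closing the queue is approximated by enqueueing a sentinel. One consequence of that choice is that everything published before `stop()` is still delivered; this page does not say whether Xi gives the same guarantee.

```python
import queue
import threading

_CLOSED = object()  # sentinel standing in for "the queue was closed"


class AsyncPumpModel:
    """Hypothetical model of runAsync(): a worker blocks on a thread-safe queue."""

    def __init__(self):
        self._queue = queue.Queue()   # thread-safe FIFO
        self._listeners = {}          # topic -> callables

    def listen(self, topic, fn):
        self._listeners.setdefault(topic, []).append(fn)

    def publish(self, topic, payload):
        self._queue.put((topic, payload))

    def run_async(self):
        worker = threading.Thread(target=self._drain)
        worker.start()
        return worker                 # the handle the caller later joins

    def stop(self):
        self._queue.put(_CLOSED)      # lets the worker fall out of its loop

    def _drain(self):
        while True:
            item = self._queue.get()  # blocks until an event or the sentinel
            if item is _CLOSED:
                return
            topic, payload = item
            for fn in self._listeners.get(topic, []):
                fn(payload)


delivered = []
main_thread = threading.current_thread().name

bus = AsyncPumpModel()
bus.listen(
    "order.paid",
    lambda e: delivered.append((e, threading.current_thread().name)),
)

pump = bus.run_async()                                      # like Events.runAsync()
bus.publish("order.paid", {"item": "book", "total": 29.0})  # from the main thread
bus.stop()                                                  # like Events.stop()
pump.join()                                                 # like pump.wait()
```

The recorded thread name differs from `main_thread`, which is the point of the warning above: any state a listener touches is shared with the publishing thread.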

Application vs. external: only the transport differs

The producer and the listeners above never change. The only difference between an in-process event and one that crosses the network is which PublisherService / ConsumerService is bound:

                   | Application (default)            | External (your impl)
PublisherService   | MemoryBus: enqueue, no serialize | serialize + send on the wire
ConsumerService    | MemoryConsumer: drain queue      | receive + deserialize + dispatch
Payload in transit | the typed value (a pointer)      | bytes (your format)

An event travels internally as a type-erased envelope (topic + type name + an opaque pointer to the typed value). Producers and listeners only ever see the topic and the typed DTO — the envelope is what the transport carries.

Writing an external transport

The system gives transports five helpers over the envelope:

Helper                                    | Purpose
Events.topic(e) / Events.type(e)          | the event's topic / DTO type name
Events.encode(e) -> Json                  | serialize the payload (by type)
Events.decode(topic, type, json) -> Event | rebuild a typed envelope
Events.dispatch(e)                        | deliver an envelope to its typed listeners

// Outbound: serialize and ship. JSON appears ONLY here.
class WireBus implements PublisherService {
    deps { conn: net.Conn }
    producer publish(e: Event) {
        net.sendText(conn, Events.topic(e) + "\t" + Events.type(e)
            + "\t" + json.stringify(Events.encode(e)) + "\n")
    }
}

// Inbound: receive, rebuild the typed event, dispatch.
class WireConsumer implements ConsumerService {
    deps { conn: net.Conn }
    consumer run() {
        let line = net.recvText(conn, 65536)
        // … split into topic / type / body …
        Events.dispatch(Events.decode(topic, type, json.parse(body)))
    }
}

module App {
    bind PublisherService -> WireBus as singleton
    bind ConsumerService -> WireConsumer as singleton
}

The derived codec supports String, Number, Integer, Bool, Json, nested event fields, and arrays of those (String[], Order[], …) — each encoded element by element. (Arrays of primitive numbers/bools in a payload await the language's general primitive-array-in-struct support; String[] and arrays of event types work today.)
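
The line framing used by WireBus and WireConsumer above (topic, type name, and JSON body separated by tabs, terminated by a newline) can be exercised end to end in a small Python model. This is an illustrative sketch, not Xi code: the `TYPES` registry is a hypothetical stand-in for the compiler-derived codec, and the framing assumes topics and type names never contain a tab or newline.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class OrderPaid:
    """Python stand-in for the OrderPaid event."""
    id: str
    item: str
    total: float


# Hypothetical registry: DTO type name -> class, standing in for the derived codec.
TYPES = {"OrderPaid": OrderPaid}


def encode_line(topic: str, payload) -> str:
    """Frame one event as topic, type name, and JSON body on a single line."""
    # json.dumps escapes tabs and newlines inside strings, so the body
    # cannot introduce extra separators.
    body = json.dumps(asdict(payload))
    return f"{topic}\t{type(payload).__name__}\t{body}\n"


def decode_line(line: str):
    """Split the frame and rebuild the typed value from its type name."""
    topic, type_name, body = line.rstrip("\n").split("\t", 2)
    return topic, TYPES[type_name](**json.loads(body))


original = OrderPaid(id="o1", item="tab\there", total=29.0)
line = encode_line("order.paid", original)
topic, event = decode_line(line)
```

The decoded `event` compares equal to `original` but is a different object, which is the practical difference between the two "Payload in transit" modes: a pointer in process, bytes on the wire.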

Notes & limits

  • Events.run() delivers synchronously on the calling thread; Events.runAsync() delivers on a background worker thread (the queue is thread-safe). Either way, run the pump to drain the queue. Richer policies (batching, retries, dead-letters) layer on as a custom ConsumerService.
  • The codec encodes String[], arrays of event types, and primitive Integer[]/Number[]/Bool[] payload fields.

Advanced / not built-in

These are deliberate boundaries of the current model, with the practical work-around noted; none block normal use:

  • The firing topic inside a listener. A listener (e: T) on "order.paid" is bound to one topic, which it already knows statically, so it receives just the typed DTO. One listener spanning several topics (wildcards) that needs to know which fired isn't built; use a listener per topic, or carry the topic as a field on the DTO.
  • Wire schema / version negotiation. External transports serialize by the event's type name, so producer and consumer must agree on the shape. To evolve an event across versions, version the type itself (a version field, or a new event type) — there's no automatic cross-version negotiation.
  • Multiple external buses at once. A program binds one transport (PublisherService/ConsumerService); routing different topics to different external buses isn't expressible directly — implement a fan-out transport that dispatches per topic.
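
The fan-out work-around in the last item can be sketched as a publisher that owns several underlying buses and picks one per topic. The model below is in Python and is only an assumption about how such a transport might be organised: `FanOutBus`, `RecordingBus`, prefix-based routing, and the topics other than "order.paid" are all hypothetical. In Xi the equivalent would be a single class bound as the PublisherService.

```python
class RecordingBus:
    """Hypothetical stand-in for one external transport; records what it would send."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    def publish(self, topic, payload):
        self.sent.append((topic, payload))


class FanOutBus:
    """Routes each event to the first bus whose topic prefix matches."""

    def __init__(self, routes, default):
        self._routes = routes      # list of (topic prefix, bus), checked in order
        self._default = default    # used when no prefix matches

    def publish(self, topic, payload):
        for prefix, bus in self._routes:
            if topic.startswith(prefix):
                bus.publish(topic, payload)
                return
        self._default.publish(topic, payload)


billing = RecordingBus("billing")
shipping = RecordingBus("shipping")
fallback = RecordingBus("fallback")

bus = FanOutBus([("order.", billing), ("shipment.", shipping)], default=fallback)
bus.publish("order.paid", {"id": "o1"})
bus.publish("shipment.sent", {"id": "s1"})   # hypothetical topic
bus.publish("user.created", {"id": "u1"})    # hypothetical topic, no route
```

Producers still see one publisher; only the routing table knows that more than one external bus exists.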

See examples/typed_event_demo.xi.