Svix Blog
Omniqueue: A Queue Abstraction Layer for Rust

By Daniel Lyne

One of my favorite parts about working for Svix is our commitment to open source software. I love giving back to the OSS community, and I was lucky enough to get to work on a new Rust library that we just released to crates.io called Omniqueue!

Modern software needs to adapt to whatever you throw at it. This is true for almost everything, including the open-source version of Svix, where users can configure which backend powers the main task queue used for scheduling message dispatch. But here's the problem: we recently had to write nearly identical queue-wrapping code for another of our applications that's in development.

Enter Omniqueue: a high-level queuing crate that lets you forget about which backend is actually being used and instead think about sending and receiving your types. It's simple: a few traits defining queue behavior; implementations of those traits for RabbitMQ, Redis streams, SQS, and an in-memory queue (for now); and some utilities for building queue producers and/or consumers.

How to use Omniqueue

Cargo.toml and feature guards

First, you need to add omniqueue to your project's Cargo.toml. For simplicity, all backends are enabled by default, so omniqueue = "0.1" is all you need to get going.

But if you don't need every available backend, you're free to disable the default features and pick and choose: each backend sits behind its own Cargo feature.
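As a rough sketch, selecting only the backends you need might look like this in Cargo.toml. Note that the exact feature names ("redis" and "sqs" here) are assumptions for illustration; check the crate's documentation for the authoritative list.

```toml
[dependencies]
# default-features = false drops the other backends;
# the feature names below are assumed, not verified.
omniqueue = { version = "0.1", default-features = false, features = ["redis", "sqs"] }
```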

Constructing the queue

Building the actual producer and/or consumer for a queue backend is the only time you have to worry about that backend's specifics: how data is modeled inside the queue, which types are used to interface with it, and all the knobs and settings each backend exposes. As such, each backend has its own configuration type.

Constructing the queue is as simple as:

// 1. Making the configuration for the given queue
let cfg = SqsConfig {
    queue_dsn: "http://localhost:9234/queue/queue_name".to_owned(),
    override_endpoint: true,
};

// 2a. Creating a `QueueBuilder` from the configuration and building the producer, consumer pair
let (p, mut c) = SqsQueueBackend::builder(cfg.clone()).build_pair().await?;

// 2b. Or making just one half of the pair
let p = SqsQueueBackend::builder(cfg.clone()).build_producer().await?;
let mut c = SqsQueueBackend::builder(cfg).build_consumer().await?;

If you won't know which backend you're using until runtime, DynProducers and DynConsumers are perfect for you:

let (p, mut c) = SqsQueueBackend::builder(cfg.clone())
    // Just add one method call to the builder
    .make_dynamic()
    .build_pair()
    .await?;

Sending and receiving

At the most fundamental level, we support byte array payloads. This is our intermediary representation, bridging all of the backends' internal data representations. On top of it, arbitrary types can be sent or received through the queue, as long as they implement Serialize and/or Deserialize, by encoding them as JSON-formatted byte arrays.

This works great for a new project where you just want to start sending and receiving custom types quickly.

p.send_serde_json(&ExampleType::default()).await?;

let delivery = c.receive().await?;
let payload = delivery.payload_serde_json::<ExampleType>().await?;
delivery.ack().await?;

But what if you just want the bytes? You can either get a reference to the cached byte slice with delivery.borrow_payload() or take the whole payload out with delivery.take_payload(). Note that taking the payload means future attempts to extract it in any capacity will return Ok(None).

Finally, we come to the concept of CustomEncoders and CustomDecoders. By passing a simple function pointer or closure to the builder before constructing your producer and/or consumer, you can define the conversion between the "native payload type" of the queue backend and the Rust types you actually want to work with. Using them looks something like this:

#[derive(Debug, PartialEq)]
struct ExampleType {
    field: u8,
}
let (p, mut c) = RabbitMqBackend::builder(cfg)
    // Convert an `ExampleType` into the backend's native byte payload
    .with_encoder(|et: &ExampleType| -> Result<Vec<u8>, QueueError> {
        Ok(vec![et.field])
    })
    // And convert the native payload back into an `ExampleType`
    .with_decoder(|v: &Vec<u8>| -> Result<ExampleType, QueueError> {
        Ok(ExampleType {
            field: *v.first().unwrap_or(&0),
        })
    })
    .build_pair()
    .await?;

The Future

I'm really excited to release Omniqueue to the public! With this first release it becomes a standalone project that other people can use. So please open tickets and/or PRs on the Omniqueue repo if you have any suggestions or if it's missing your favorite queue backend.

I have lots of ideas for improving this crate, so expect more changes soon! This queue implementation is based on the one in the Svix Webhooks server, and we are in the process of migrating the server to use this crate.

If you think it could be useful to your own work, check out Omniqueue.

That's it for this update, but make sure to follow us on Twitter, GitHub, or RSS for the latest updates from the Svix webhook provider, or join the discussion on our community Slack.