---
title: 'Omniqueue: A Queue Abstraction Layer for Rust'
authors: ['daniel']
date: 2023-06-08T12:00:00
tags: ['Technical']
summary: 'Announcing Omniqueue, a new Rust queue abstraction library'
---

![Cover image](./cover.png)

One of my favorite parts about working for Svix is our commitment to open source software. I love
giving back to the OSS community, and I was lucky enough to get to work on a new Rust library that
we just released to [crates.io](https://crates.io) called [Omniqueue](https://crates.io/crates/omniqueue)!

Modern software needs to be adaptable to whatever you throw at it. This is true for
almost everything, including the OSS version of Svix, where we allow the user to configure which
backend to use for the main task queue that schedules message dispatch. But here's the problem:
we recently had to write almost exactly the same queue-wrapping code again for another application
of ours that's in development.

Enter Omniqueue: a high-level queuing crate that allows you to forget about what backend is actually
being used, and instead think about sending and receiving your types. It's simple: a few traits
defining crate behavior; implementations for RabbitMQ, Redis streams, SQS, and an in-memory queue
for now; and some utilities for building queue producers and/or consumers.

## How to use Omniqueue

### `Cargo.toml` and feature guards

To use the crate, add `omniqueue` to your project's `Cargo.toml`. For simplicity, all backends are
enabled by default, so adding `omniqueue = "0.1"` to your dependencies should be all you need to
get going.

But if you don't want to compile every available backend, you're free to disable the default
features and pick and choose. Each backend sits behind its own Cargo feature.

### Constructing the queue

Building the actual producer and/or consumer for a queue backend is the only time you have to worry
about that backend's specifics: how data is modeled inside the queue, what types are used to
interface with the queue, and all the knobs and settings for each backend. As such, each backend
has its own configuration type.

Constructing the queue is as simple as:

```rust
// 1. Making the configuration for the given queue
let cfg = SqsConfig {
    queue_dsn: "http://localhost:9234/queue/queue_name".to_owned(),
    override_endpoint: true,
};

// 2a. Creating a `QueueBuilder` from the configuration and building the producer, consumer pair
let (p, mut c) = SqsQueueBackend::builder(cfg.clone()).build_pair().await?;

// 2b. Or making just one half of the pair
let p = SqsQueueBackend::builder(cfg.clone()).build_producer().await?;
let mut c = SqsQueueBackend::builder(cfg).build_consumer().await?;
```

If you won't know which backend you'll be using until runtime, `DynProducer`s and `DynConsumer`s
are perfect for you:

```rust
let (p, mut c) = SqsQueueBackend::builder(cfg.clone())
    // Just add one method call to the builder
    .make_dynamic()
    .build_pair()
    .await?;
```
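To give a feel for what choosing a backend at runtime involves, here is a minimal, std-only sketch of the general idea using trait objects. Everything in it (the `Producer` trait, `MemoryProducer`, `DiscardingProducer`, `producer_from_config`, and the `QUEUE_BACKEND` environment variable) is a hypothetical stand-in invented for illustration. It is not Omniqueue's API, which is async and considerably richer.

```rust
use std::collections::VecDeque;

// Hypothetical, synchronous stand-in for a producer trait (not Omniqueue's).
trait Producer {
    fn send_bytes(&mut self, payload: &[u8]) -> Result<(), String>;
    fn pending(&self) -> usize;
}

// Toy backend that keeps messages in memory.
struct MemoryProducer {
    queue: VecDeque<Vec<u8>>,
}

impl Producer for MemoryProducer {
    fn send_bytes(&mut self, payload: &[u8]) -> Result<(), String> {
        self.queue.push_back(payload.to_vec());
        Ok(())
    }

    fn pending(&self) -> usize {
        self.queue.len()
    }
}

// Toy backend that accepts messages and drops them.
struct DiscardingProducer;

impl Producer for DiscardingProducer {
    fn send_bytes(&mut self, _payload: &[u8]) -> Result<(), String> {
        Ok(())
    }

    fn pending(&self) -> usize {
        0
    }
}

// The concrete type is picked from a runtime value and hidden behind
// `Box<dyn Producer>`, so callers never name the backend.
fn producer_from_config(backend: &str) -> Result<Box<dyn Producer>, String> {
    match backend {
        "memory" => Ok(Box::new(MemoryProducer { queue: VecDeque::new() })),
        "discard" => Ok(Box::new(DiscardingProducer)),
        other => Err(format!("unknown backend: {other}")),
    }
}

fn main() -> Result<(), String> {
    // `QUEUE_BACKEND` is an assumed variable name for this sketch.
    let backend = std::env::var("QUEUE_BACKEND").unwrap_or_else(|_| "memory".to_owned());
    let mut p = producer_from_config(&backend)?;
    p.send_bytes(b"hello")?;
    println!("pending messages: {}", p.pending());
    Ok(())
}
```

The trade-off is the usual one for dynamic dispatch: the calling code stays identical across backends, at the cost of a heap allocation and a virtual call per operation.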

### Sending and receiving

At the most fundamental level, we support byte array payloads. This is our intermediary
representation bridging all the backends' internal data representations. From this, we can allow
arbitrary types to be sent or received through the queue as long as they implement `Serialize`
and/or `Deserialize`, by encoding them as JSON-formatted byte arrays.

This works great for a new project where you just want to start sending and receiving custom types
quickly.

```rust
p.send_serde_json(&ExampleType::default()).await?;

let delivery = c.receive().await?;
let payload = delivery.payload_serde_json::<ExampleType>().await?;
delivery.ack().await?;
```

But what if you just want the bytes? Well, you can either get a reference to the cached byte slice
or take the whole payload out using `delivery.borrow_payload()` and `delivery.take_payload()`
respectively. Note that taking the payload means that future attempts to extract the payload in any
capacity will return `Ok(None)`.
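The difference between borrowing and taking can be sketched with a plain `Option`. The `Delivery` struct below is a hypothetical stand-in, not Omniqueue's type, and for brevity its methods return a bare `Option` rather than the `Result`-wrapped values the real methods return.

```rust
// Hypothetical stand-in for a delivery that caches its payload bytes.
struct Delivery {
    payload: Option<Vec<u8>>,
}

impl Delivery {
    // Borrowing leaves the cached payload in place.
    fn borrow_payload(&self) -> Option<&[u8]> {
        self.payload.as_deref()
    }

    // Taking moves the payload out and leaves `None` behind.
    fn take_payload(&mut self) -> Option<Vec<u8>> {
        self.payload.take()
    }
}

fn main() {
    let mut delivery = Delivery {
        payload: Some(b"hello".to_vec()),
    };

    // Borrowing can be repeated as often as you like.
    assert_eq!(delivery.borrow_payload(), Some(&b"hello"[..]));
    assert_eq!(delivery.borrow_payload(), Some(&b"hello"[..]));

    // Taking hands over ownership without copying the bytes...
    let owned = delivery.take_payload();
    assert_eq!(owned, Some(b"hello".to_vec()));

    // ...after which every later access sees nothing.
    assert_eq!(delivery.borrow_payload(), None);
    assert_eq!(delivery.take_payload(), None);
}
```

Taking is the better fit when you want to own the bytes without a copy and don't need the payload again through the delivery.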

Finally, we come to the concept of `CustomEncoder`s and `CustomDecoder`s. By passing a simple
function pointer or closure to the builder before constructing your producer and/or consumer, you
can define the conversion between the "native payload type" of the queue backend and the Rust
types you actually want to deal with when working with the queue. Using them looks something like
this:

```rust
#[derive(Debug, PartialEq)]
struct ExampleType {
    field: u8,
}

let (p, mut c) = RabbitMqBackend::builder(cfg)
    // Encode the type as a single byte
    .with_encoder(|et: &ExampleType| -> Result<Vec<u8>, QueueError> {
        Ok(vec![et.field])
    })
    // Decode from the first byte, falling back to 0 on an empty payload
    .with_decoder(|v: &Vec<u8>| -> Result<ExampleType, QueueError> {
        Ok(ExampleType {
            field: *v.first().unwrap_or(&0),
        })
    })
    .build_pair()
    .await?;
```
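If you want to check an encoder and decoder against each other before wiring them into a builder, a round trip through plain functions is enough. The sketch below uses only the standard library. `CodecError`, `encode`, `decode`, and `round_trip` are hypothetical names for illustration, and `CodecError` stands in for the crate's own error type. Unlike the decoder above, this one rejects an unexpected payload length instead of defaulting to `0`, which is one possible design choice rather than a requirement.

```rust
#[derive(Debug, PartialEq)]
struct ExampleType {
    field: u8,
}

// Hypothetical error type standing in for the queue library's error.
#[derive(Debug, PartialEq)]
struct CodecError(String);

// Encode the value as a single byte.
fn encode(et: &ExampleType) -> Result<Vec<u8>, CodecError> {
    Ok(vec![et.field])
}

// Decode strictly: anything other than exactly one byte is an error.
fn decode(bytes: &[u8]) -> Result<ExampleType, CodecError> {
    match bytes {
        [field] => Ok(ExampleType { field: *field }),
        other => Err(CodecError(format!(
            "expected exactly 1 byte, got {}",
            other.len()
        ))),
    }
}

// Runs a value through any encoder/decoder pair. Function pointers and
// closures both satisfy the `Fn` bounds.
fn round_trip<T>(
    value: &T,
    encoder: impl Fn(&T) -> Result<Vec<u8>, CodecError>,
    decoder: impl Fn(&[u8]) -> Result<T, CodecError>,
) -> Result<T, CodecError> {
    let bytes = encoder(value)?;
    decoder(bytes.as_slice())
}

fn main() {
    let original = ExampleType { field: 42 };

    // A value should survive the trip unchanged.
    let decoded = round_trip(&original, encode, decode);
    assert_eq!(decoded, Ok(ExampleType { field: 42 }));

    // Malformed payloads surface as errors rather than silent defaults.
    assert!(decode(&[]).is_err());
    assert!(decode(&[1, 2]).is_err());
}
```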

## The Future

I'm really excited to release Omniqueue to the public! With this first release, it's finally a
standalone project that other people can use. So please open tickets and/or PRs on the [Omniqueue repo](https://github.com/svix/omniqueue-rs)
if you have any suggestions or if it's missing your favorite queue backend.

I have lots of ideas for improving this crate, so expect more changes soon! This queue
implementation is based on the one we have in the [Svix Webhooks Server](https://github.com/svix/svix-webhooks),
and we are in the process of migrating the server to use this crate.

If you think it could be useful to your own work, check out [Omniqueue](https://crates.io/crates/omniqueue).

That's it for this update, but make sure to follow us on [Twitter](https://twitter.com/SvixHQ), [Mastodon](https://mastodon.social/@svixhq), [GitHub](https://github.com/svix) or [RSS](https://www.svix.com/blog/rss/) for the latest updates for the [Svix webhook provider](https://www.svix.com), or join the discussion on [our community Slack](https://www.svix.com/slack/).
