Code Vigorous


Dustin J. Mitchell

Kombu and SQS

23 Dec 2014

Today I’ve been working on a simple, generic message-queueing interface for RelengAPI. In production, this will use Amazon SQS, but for development and debugging purposes I’d like to be able to use other transports. While Boto provides a nice low-level interface to SQS, Kombu has the multi-transport flexibility I’d like.

However, there is absolutely no documentation for Kombu’s SQS transport. SQS has a dead-simple model, with little more than “put” and “get” operations on named queues. Kombu, on the other hand, is built around the AMQP exchange / queue model, which supports message routing, “fanout” (with multiple consumers of each message), and lots of other fancy features.

Schema

Kombu’s solution to this mismatch is to emulate AMQP exchanges and delivery to queues internally, and only use SQS for the queues themselves. In practical terms, that means that the entire AMQP schema (exchange, queue, and binding) must be declared in every application accessing the queues. Unlike with a real AMQP broker, declaring only the exchange in the producer is not sufficient: messages published that way are silently discarded. Furthermore, the name of the exchange is irrelevant – the only name used outside of the process is the queue name.

The “simple” interface takes care of all of this nicely:

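import datetime
import kombu
# Assumption: no credentials in the URL, so they are picked up from the environment
broker_url = 'sqs://'
with kombu.Connection(broker_url, transport_options=dict(region='us-east-1')) as conn:
    simple_queue = conn.SimpleQueue('dustin_test_queue')
    message = 'hello world, sent at %s' % datetime.datetime.today()
    simple_queue.put(message)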
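
To see why the schema has to be declared in every process, here is a toy model of the emulation described above. It is only a sketch under my own assumptions, not Kombu's actual code: `ToyChannel` and all of its methods are hypothetical names, and a plain dict stands in for SQS. Each "process" keeps its own private binding table, and only the queues are shared.

```python
class ToyChannel:
    """Hypothetical stand-in for one process's view of the broker."""

    def __init__(self, shared_queues):
        self.shared_queues = shared_queues  # plays the role of SQS
        self.bindings = {}  # private: (exchange, routing_key) -> queue names

    def declare(self, exchange, queue, routing_key):
        # Record the binding locally and make sure the shared queue exists.
        self.bindings.setdefault((exchange, routing_key), set()).add(queue)
        self.shared_queues.setdefault(queue, [])

    def publish(self, exchange, routing_key, message):
        # Routing happens in-process; with no local binding nothing is sent.
        targets = self.bindings.get((exchange, routing_key), set())
        for queue in targets:
            self.shared_queues[queue].append(message)
        return len(targets)

    def get(self, queue):
        pending = self.shared_queues.get(queue, [])
        return pending.pop(0) if pending else None


sqs = {}  # the only state the two "processes" share
producer = ToyChannel(sqs)
consumer = ToyChannel(sqs)
consumer.declare('consumer_side_name', 'dustin_test_queue', 'dustin_test_queue')

# The producer has not declared the queue and binding: the message vanishes.
dropped = producer.publish('producer_side_name', 'dustin_test_queue', 'lost')

# After declaring the full schema locally, delivery works, even though the
# two sides use different exchange names.
producer.declare('producer_side_name', 'dustin_test_queue', 'dustin_test_queue')
delivered = producer.publish('producer_side_name', 'dustin_test_queue', 'hello')
received = consumer.get('dustin_test_queue')
print(dropped, delivered, received)
```

In this model the first publish reaches no queue at all, which matches the silent discard described above, and the exchange name never leaves the process that declared it.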

Message Format

Kombu also applies some processing to the message contents. Specifically, the message body is dumped as JSON and base64-encoded, then included in a JSON object containing other metadata, and the result is then base64-encoded again. For example,

eyJib2R5IjogImV5Sm9aV3hzYnlJNklDSjNiM0pzWkNKOSIsICJoZWFkZXJzIjoge30sICJjb250
ZW50LXR5cGUiOiAiYXBwbGljYXRpb24vanNvbiIsICJwcm9wZXJ0aWVzIjogeyJib2R5X2VuY29k
aW5nIjogImJhc2U2NCIsICJkZWxpdmVyeV9pbmZvIjogeyJwcmlvcml0eSI6IDAsICJyb3V0aW5n
X2tleSI6ICJkdXN0aW5fdGVzdF9xdWV1ZSIsICJleGNoYW5nZSI6ICJkdXN0aW5fdGVzdF9xdWV1
ZSJ9LCAiZGVsaXZlcnlfbW9kZSI6IDIsICJkZWxpdmVyeV90YWciOiAiMWVmMzM4ODYtZGVjOS00
MGQyLWIyYjUtMjBkZWU0MGY5ZTFjIn0sICJjb250ZW50LWVuY29kaW5nIjogInV0Zi04In0=

decodes to (re-indented):

{
    "body": "eyJoZWxsbyI6ICJ3b3JsZCJ9",
    "content-encoding": "utf-8",
    "content-type": "application/json",
    "headers": {},
    "properties": {
        "body_encoding": "base64",
        "delivery_info": {
            "exchange": "dustin_test_queue",
            "priority": 0,
            "routing_key": "dustin_test_queue"
        },
        "delivery_mode": 2,
        "delivery_tag": "1ef33886-dec9-40d2-b2b5-20dee40f9e1c"
    }
}

and the body attribute decodes to {"hello": "world"}.
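
The two layers of encoding can be reproduced with nothing but the standard library. The following is a sketch of the format as described here, not Kombu's own implementation: `wrap` and `unwrap` are hypothetical helper names, and the envelope fields are simply copied from the decoded example above.

```python
import base64
import json


def wrap(payload, queue_name):
    """Build a message in the doubly-encoded shape shown above."""
    body = base64.b64encode(json.dumps(payload).encode('utf-8')).decode('ascii')
    envelope = {
        'body': body,
        'content-encoding': 'utf-8',
        'content-type': 'application/json',
        'headers': {},
        'properties': {
            'body_encoding': 'base64',
            'delivery_info': {
                'exchange': queue_name,
                'priority': 0,
                'routing_key': queue_name,
            },
            'delivery_mode': 2,
            'delivery_tag': '1ef33886-dec9-40d2-b2b5-20dee40f9e1c',
        },
    }
    return base64.b64encode(json.dumps(envelope).encode('utf-8')).decode('ascii')


def unwrap(raw):
    """Return (envelope, decoded body) for a raw message string."""
    raw += '=' * (-len(raw) % 4)  # tolerate stripped base64 padding
    envelope = json.loads(base64.b64decode(raw))
    body = envelope['body']
    if envelope['properties'].get('body_encoding') == 'base64':
        encoding = envelope.get('content-encoding', 'utf-8')
        body = base64.b64decode(body).decode(encoding)
    if envelope.get('content-type') == 'application/json':
        body = json.loads(body)
    return envelope, body


raw = wrap({'hello': 'world'}, 'dustin_test_queue')
envelope, body = unwrap(raw)
print(envelope['body'])
print(body)
```

A consumer that does not use Kombu would need something like `unwrap` to get at the payload, which is the practical cost of the metadata wrapping.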

While it’s possible to specify encodings other than base64, it’s not possible to disable the metadata wrapping, which makes Kombu’s SQS transport unsuitable for sending arbitrary, unwrapped content.