This document describes the current stable version of Kombu (5.0). For development docs, go here.

Source code for kombu.asynchronous.aws.sqs.message

"""Amazon SQS message implementation."""

import base64

from kombu.message import Message
from kombu.utils.encoding import str_to_bytes


class BaseAsyncMessage(Message):
    """Base class for messages received on async client.

    Defines no attributes or methods of its own; it only gives the
    async message classes a common parent derived from
    :class:`kombu.message.Message`.
    """
class AsyncRawMessage(BaseAsyncMessage):
    """Raw Message.

    Adds no behavior on top of :class:`BaseAsyncMessage`.
    """
class AsyncMessage(BaseAsyncMessage):
    """Serialized message.

    Adds Base64 encoding of values and Boto3-style subscript access
    (``message['Body']``) on top of :class:`BaseAsyncMessage`.
    """

    def encode(self, value):
        """Return ``value`` encoded with Base64, as text.

        Despite the historical "encode/decode" wording, this method only
        encodes; it never reverses the transformation.

        Arguments:
            value: The payload to encode. It is passed through
                ``str_to_bytes`` before being Base64-encoded.

        Returns:
            str: The Base64 representation of ``value``.
        """
        raw = str_to_bytes(value)
        # b64encode yields bytes; callers get text back.
        return base64.b64encode(raw).decode()

    def __getitem__(self, item):
        """Support Boto3-style access on a message.

        Arguments:
            item: One of ``'ReceiptHandle'``, ``'Body'`` or ``'queue'``.

        Returns:
            ``self.receipt_handle`` for ``'ReceiptHandle'``, the result of
            ``self.get_body()`` for ``'Body'``, or ``self.queue`` for
            ``'queue'``.

        Raises:
            KeyError: If ``item`` is not one of the supported keys.
        """
        # NOTE(review): ``receipt_handle``, ``get_body`` and ``queue`` are
        # not defined in this class; presumably they come from the base
        # message class or are set by the client -- confirm.
        # Each attribute is only touched when its key is requested.
        if item == 'ReceiptHandle':
            return self.receipt_handle
        if item == 'Body':
            return self.get_body()
        if item == 'queue':
            return self.queue
        raise KeyError(item)