Thursday, September 4, 2008

Sending Large Amazon SQS Messages with Typica in Java

Here's a real-world case I ran into building JamLegend (http://jamlegend.com)...

Part of JamLegend lives in the web tier and part in multiple socket-layer servers built on Apache Mina's awesome non-blocking technology. When you have only one of either, communication is easy. When you have several of each, you need a way to communicate effectively no matter how many of each, or which particular ones, happen to be running at any given moment. Fortunately, Amazon SQS comes to the rescue and, combined with Typica (http://code.google.com/p/typica/), makes it brilliantly simple to distribute jobs among any size group of workers.

But SQS has its own limitation. From the documentation:
"Amazon SQS messages can contain up to 8 KB of text data, including XML, JSON and unformatted text."
But that isn't the whole story. More specifically, SQS messages can contain up to 8 KB of UTF-16 encoded text data, which makes a big difference in the maximum character length of your message: UTF-16 spends at least two bytes per character, so 8 KB is roughly 4,096 characters at best. So now we have a constraint to work with, and it's not hard to imagine a case where sending larger messages is quite useful.
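To make the UTF-16 measure concrete, here is a minimal sketch of checking a message against the limit. The class and method names (Utf16Size, utf16Bytes, fitsInOneMessage) are made up for illustration and are not part of Typica or SQS. It encodes with UTF-16BE because Java's plain "UTF-16" encoder prepends a 2-byte byte-order mark to the result.

```java
import java.nio.charset.StandardCharsets;

final class Utf16Size {
    // The 8 KB limit quoted above, in bytes.
    static final int SQS_MAX_MESSAGE_SIZE = 8192;

    // Size of the text once encoded as UTF-16, without a byte-order mark.
    static int utf16Bytes(String text) {
        return text.getBytes(StandardCharsets.UTF_16BE).length;
    }

    // True if the text fits within the limit as a single message.
    static boolean fitsInOneMessage(String text) {
        return utf16Bytes(text) <= SQS_MAX_MESSAGE_SIZE;
    }

    public static void main(String[] args) {
        // 14 characters at 2 bytes each
        System.out.println(utf16Bytes("header;content")); // prints 28
    }
}
```

Characters outside the Basic Multilingual Plane take four bytes each in UTF-16, so the real character budget can be lower than 4,096.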

Ergo, a code tutorial on chunking SQS messages and some brief discussion of other considerations.

I use Typica, as I mentioned above, which is an awesome SQS utility for Java. Assume for the following that your internal protocol for SQS messages is:
header;content
Given that, chunking a large SQS message into multiple smaller messages is pretty straightforward. First, that header is going to need to accompany all of the messages. Then, we'll repeatedly form messages of the maximum size until we've sent all the data. In the following code, message would be the raw text of the SQS message you wish to send.

Some helpers:

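// "UTF-16BE" rather than "UTF-16": Java's "UTF-16" encoder prepends a 2-byte
// byte-order mark to every getBytes() result, and the one at the start of the
// content would be copied into the middle of the first chunk
public static final String UTF_16 = "UTF-16BE";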

public static final int SQS_MAX_MESSAGE_SIZE = 8192;


Now onto the logic:

// Connect to the queue
MessageQueue queue = SQSUtils.connectToQueue(SQS_QUEUE_NAME, AWS_ACCESS_KEY_ID, AWS_SHARED_SECRET);

// if the message is small enough to be sent as one message, do it
if (message.getBytes(UTF_16).length <= SQS_MAX_MESSAGE_SIZE) {
    queue.sendMessage(message);
} else {
    // if it's too big for one message, chunk it and send as multiple

    // split the message according to the protocol, but limit it
    // in case the content portion contains a semicolon
    String[] parts = message.split(";", 2);

    // break off the header, this is needed for each chunk
    byte[] header = (parts[0] + ";").getBytes(UTF_16);

    // get the content as a byte[]
    byte[] content = parts[1].getBytes(UTF_16);

    // figure out how much content can be in each chunk
    int chunkSize = SQS_MAX_MESSAGE_SIZE - header.length;

    // create a byte[] for our max message size
    // we're going to repeatedly fill this and send the message while
    // content remains.
    byte[] bytes = new byte[SQS_MAX_MESSAGE_SIZE];

    // copy the header into the byte[], we'll only do this once
    System.arraycopy(header, 0, bytes, 0, header.length);

    // while there is content left, send a message
    for (int i = 0; i < content.length; i += chunkSize) {

        // copy the smaller of the remaining bytes or the max chunkSize chunk
        // of content into the message array, then form the message String
        // from the appropriate portion of the array
        if (content.length - i < chunkSize) {
            System.arraycopy(content, i, bytes, header.length, content.length - i);
            message = new String(bytes, 0, header.length + content.length - i, UTF_16);
        } else {
            System.arraycopy(content, i, bytes, header.length, chunkSize);
            message = new String(bytes, UTF_16);
        }

        // send the chunk
        queue.sendMessage(message);
    }
}

Since the max size is a measure of UTF-16 bytes, I found it easiest to just deal with everything as a byte[], but you could do it other ways if you felt like it. The bigger thing to note is that this is really only half the solution, since now you face the task of handling chunked SQS messages on the other side. Possible solutions include:
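As one example of doing it another way, here is a rough sketch that counts Java chars instead of bytes. It relies on the fact that each Java char is one UTF-16 code unit, i.e. two bytes, so the 8192-byte limit becomes 4096 chars. The SqsChunker class and its chunk method are hypothetical names for illustration; the method only builds the chunks and leaves the sending (queue.sendMessage in the code above) to the caller. Unlike the byte[] version, it also avoids cutting a surrogate pair in half.

```java
import java.util.ArrayList;
import java.util.List;

final class SqsChunker {
    static final int SQS_MAX_MESSAGE_SIZE = 8192;           // bytes
    static final int MAX_CHARS = SQS_MAX_MESSAGE_SIZE / 2;  // UTF-16 code units

    // Splits a "header;content" message into chunks that each fit the limit
    // and each start with "header;".
    static List<String> chunk(String message) {
        List<String> chunks = new ArrayList<String>();
        if (message.length() <= MAX_CHARS) {
            chunks.add(message);
            return chunks;
        }
        int sep = message.indexOf(';');
        if (sep < 0) {
            throw new IllegalArgumentException("expected header;content");
        }
        String header = message.substring(0, sep + 1);
        String content = message.substring(sep + 1);
        int room = MAX_CHARS - header.length();
        if (room < 2) {
            throw new IllegalArgumentException("header leaves no room for content");
        }
        int i = 0;
        while (i < content.length()) {
            int end = Math.min(i + room, content.length());
            // back up one char rather than split a surrogate pair across chunks
            if (end < content.length() && Character.isHighSurrogate(content.charAt(end - 1))) {
                end--;
            }
            chunks.add(header + content.substring(i, end));
            i = end;
        }
        return chunks;
    }
}
```

Each returned string can then be passed to queue.sendMessage in a loop.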

  • Ensure each message is executable on its own, so that the desired effect is the sum of executing each individual message. This has the benefit of not caring which worker receives each message, and it is the solution I chose to implement.

  • Ensure each message is received by a particular worker, which will then reassemble the messages and execute the whole. For this to work, you really should implement an additional part to the header indicating how many chunks compose a whole and have each chunk identify itself as chunk x of y. That way, the worker will know when it has received the entirety of the message and may execute its task.
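I did not implement the second option, but the receiving side might look something like the following sketch. It assumes a hypothetical extended chunk format of header;id;x;y;content, where x is the 1-based chunk number, y is the total, and id is an extra field I am adding here so that chunks of different messages can be told apart. The ChunkReassembler class is illustrative only and not part of Typica.

```java
import java.util.HashMap;
import java.util.Map;

final class ChunkReassembler {
    // message id -> chunk contents received so far (slot 0 holds chunk 1)
    private final Map<String, String[]> pending = new HashMap<String, String[]>();

    // Returns the whole "header;content" message once every chunk has
    // arrived, or null while chunks are still missing.
    String accept(String chunk) {
        // limit of 5 so the content itself may contain semicolons
        String[] p = chunk.split(";", 5);
        if (p.length != 5) {
            throw new IllegalArgumentException("expected header;id;x;y;content");
        }
        String header = p[0];
        String id = p[1];
        int x = Integer.parseInt(p[2]);
        int y = Integer.parseInt(p[3]);
        if (y < 1 || x < 1 || x > y) {
            throw new IllegalArgumentException("bad chunk position " + x + " of " + y);
        }
        String[] parts = pending.get(id);
        if (parts == null) {
            parts = new String[y];
            pending.put(id, parts);
        }
        if (parts.length != y) {
            throw new IllegalArgumentException("chunk count changed for message " + id);
        }
        // chunks may arrive out of order or more than once; a repeat just overwrites
        parts[x - 1] = p[4];
        for (String part : parts) {
            if (part == null) {
                return null; // still waiting for at least one chunk
            }
        }
        pending.remove(id);
        StringBuilder whole = new StringBuilder(header).append(';');
        for (String part : parts) {
            whole.append(part);
        }
        return whole.toString();
    }
}
```

For example, feeding it "job;42;2;2;wor;ld" returns null, and then feeding it "job;42;1;2;hello " returns "job;hello wor;ld". This only works if all chunks of a message reach the same worker, which is the hard part of this option.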

Comments:

scranthdaddy said...

Great post. I initially saw the message size limitation as a deal breaker but I am glad to see how you have circumvented that. Thanks!

Anonymous said...

I overcame the size limit in a different way. If the message body is less than 8K, I store it in its entirety. If it is larger than 8K, I store the body in S3, and store the key in the message queue body using a typographical convention such as __S3_KEY_ID:blah

Then I check the message body upon retrieval. If it begins with __S3_KEY_ID then I know to look for the data in S3. Otherwise, it is the entire body and I process it accordingly.

hope this helps!

Anonymous said...

scranthdaddy's suggestion seems much more robust to me. It would be ideal if someone provided an extension to Typica that did this automatically.

Anonymous said...

Please tell me how to connect to a queue in .NET. What is MessageQueue? I have not seen it yet.
