Friday, July 5, 2013

Kestrel with spymemcached Java Client

Recently I started to experiment with Kestrel. Kestrel is a fast, lightweight, persistent queue in use at Twitter. It doesn't support transactions, but it has a reliable read feature that lets the client acknowledge it has finished processing an item taken off the queue. It took me a while to get the client working in Java: I couldn't find any substantive examples, and my lack of memcached knowledge didn't help. Kestrel supports a subset of the memcached protocol, so any memcached client should work; I used the Java spymemcached client.
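Since memcached knowledge was my stumbling block, it helps to see what a client actually sends. Here is a minimal sketch of the memcached text-protocol framing for the two commands used in this post. The class and method names are my own, hypothetical ones, and it only builds the command strings; it does not open a socket. A real client such as spymemcached also picks the flags value itself, which I simply assume to be 0 here.

```java
import java.nio.charset.StandardCharsets;

final class KestrelWireSketch {

    /** Frames a memcached text-protocol "set"; Kestrel treats it as an enqueue. */
    static String setCommand(String queueName, String payload) {
        // The protocol wants the payload length in bytes, not characters.
        int length = payload.getBytes(StandardCharsets.UTF_8).length;
        // Layout: set <key> <flags> <exptime> <bytes>\r\n<data>\r\n
        // flags = 0 is an assumption; exptime = 0 means the item does not expire.
        return "set " + queueName + " 0 0 " + length + "\r\n" + payload + "\r\n";
    }

    /** Frames a "get"; Kestrel treats it as a dequeue, with options appended to the key. */
    static String getCommand(String queueName, String options) {
        return "get " + queueName + options + "\r\n";
    }

    public static void main(String[] args) {
        System.out.print(setCommand("tracking-queue", "{\"id\":1}"));
        System.out.print(getCommand("tracking-queue", "/open/t=1000"));
    }
}
```

The queue name takes the place of the memcached key, and options such as /open or /t=1000 ride along as suffixes on that key, which is why an ordinary memcached client can drive the queue.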

The server configuration and startup are well documented, so I won't cover them. Here's my client code for adding items to the queue:

  queue = Memcached.newKestrelClient(ImmutableList.of("127.0.0.1:22133"));
  
  long start = System.currentTimeMillis();
  
  for (int i = 0; i < 10000; i++) {
   
   if (i % 1000 == 0) {
    log.debug("adding tracking event queue "); 
   }
   
   // set(key, expiration, value) is asynchronous; the future resolves to true on success
   OperationFuture<Boolean> reply = queue.set("tracking-queue", 0, this.getJson(i));
   
   try {
//    log.debug("Waiting for reply");
    if (!reply.get()) {
     log.warn("set failed");
    }
   } catch (InterruptedException | ExecutionException e1) {
    log.warn("Get failed", e1);
   }
  }
  
  log.debug("Finished in " + (System.currentTimeMillis() - start));
I'm storing JSON in the queue, so my getJson(int i) method just produces some arbitrary JSON. The queue.set method is asynchronous and returns an OperationFuture. Calling get on the Future waits for Kestrel to reply. This is very important, since otherwise the item may not be added and you won't know. This happened to me at first when I was ignoring the Future: only a fraction of my items were added to the queue. Once I made the call synchronous with get, I didn't see any failures. On my notebook, running Kestrel on the same machine, I was able to add 10000 items to the queue in about 4 seconds.
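Waiting on every reply before sending the next item is the safe option, but it costs one round trip per item. One possible middle ground is to send a small batch of sets, then wait on all of their futures before sending the next batch. The sketch below shows only the waiting half. The class name, method name and timeout are hypothetical, and the futures in main are stand-ins built with CompletableFuture. Because OperationFuture implements java.util.concurrent.Future, a list of real replies could be passed in the same way. I have not measured whether batching avoids the dropped items I saw when ignoring the futures entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BatchedReplies {

    /** Waits for every reply in the batch and returns how many were not confirmed. */
    static int countFailures(List<? extends Future<Boolean>> replies, long timeoutMs) {
        int failures = 0;
        for (int i = 0; i < replies.size(); i++) {
            try {
                // A reply counts as a success only if it resolves to true in time.
                Boolean stored = replies.get(i).get(timeoutMs, TimeUnit.MILLISECONDS);
                if (!Boolean.TRUE.equals(stored)) {
                    failures++;
                }
            } catch (ExecutionException | TimeoutException e) {
                failures++;
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting;
                // this reply and all remaining ones are unconfirmed.
                Thread.currentThread().interrupt();
                return failures + (replies.size() - i);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-in futures: one success, one rejected set, one that failed outright.
        List<Future<Boolean>> batch = new ArrayList<>();
        batch.add(CompletableFuture.completedFuture(true));
        batch.add(CompletableFuture.completedFuture(false));
        CompletableFuture<Boolean> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("simulated failure"));
        batch.add(broken);
        System.out.println("failures: " + countFailures(batch, 1000)); // prints "failures: 2"
    }
}
```

In the producer loop this would mean adding each reply to a list and calling countFailures every N items, where N is something you would have to tune.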

I'm using the twitter-commons Memcached helper class, but it's not doing much, so using spymemcached directly is likely a better option.

The dependencies for this code are as follows:
  <dependency>
   <groupId>spy</groupId>
   <artifactId>spymemcached</artifactId>
   <version>2.8.1</version>
  </dependency>
  <dependency>
   <groupId>com.twitter.common</groupId>
   <artifactId>memcached</artifactId>
   <version>0.0.7</version>
  </dependency>
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.16</version>
  </dependency>
The spymemcached and twitter-commons artifacts are not in Maven Central, so you'll need to add these repositories:
 <repositories>
  <repository>
   <id>spy</id>
   <name>Spy Repository</name>
   <layout>default</layout>
   <url>http://files.couchbase.com/maven2/</url>
   <snapshots>
    <enabled>false</enabled>
   </snapshots>
  </repository>
  <repository>
   <id>twitter-twttr</id>
   <url>http://maven.twttr.com/</url>
  </repository>
 </repositories>
Here's my code for consuming the queue:

  queue = Memcached.newKestrelClient(ImmutableList.of("127.0.0.1:22133"));
  
  Object o = null;
  int count = 0;
  long start = System.currentTimeMillis();
  
  while ((o = queue.get("tracking-queue/open/t=1000")) != null) {
     
   try {
    TrackingEvent.toObject((String)o);
    // indicate the item was consumed (reliable read)
    try {
     queue.get("tracking-queue/close");
     count++;
    } catch (RuntimeException e) {
     log.warn("Item processed but we failed to mark it as consumed. will get processed again", e);
    }
   } catch (RuntimeException e) {
    log.warn("Unable to process item [" + (String)o + "]", e);
    // since we don't close this item will not be removed from queue
   }
  }   
 
  log.debug("Consumed " + count + " items from queue in " + (System.currentTimeMillis() - start));
I'm using the reliable read feature to tell Kestrel that we've processed the item. This helps ensure that if an exception occurs after we've taken an item off the queue, the item is handed to another client rather than lost. The queue.get("tracking-queue/close"); call tells Kestrel the item has been processed. If Kestrel doesn't receive this call, it will return the item to the queue. Of course, if the queue.get("tracking-queue/close"); call itself fails, the item will be processed twice. It took about 8 seconds to consume 10000 objects from the queue.
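The consumer above makes two round trips per item, one to open and one to close. As I understand the Kestrel guide, the memcached interface also accepts a combined /close/open suffix, which confirms the previous item and fetches the next one in a single get. Here is a minimal sketch of that loop. The drain helper, its Function parameter (a stand-in for queue::get) and the in-memory fake in main are all hypothetical, and I haven't benchmarked this against a real server. On a processing failure it sends /abort, which asks Kestrel to put the open item back on the queue, and then stops.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Function;

final class ReliableReadLoop {

    /** Drains a queue, confirming each item with the same get that opens the next one. */
    static int drain(Function<String, Object> get, String queueName,
                     int timeoutMs, Consumer<String> handler) {
        String open = queueName + "/open/t=" + timeoutMs;
        String closeOpen = queueName + "/close/open/t=" + timeoutMs;
        int count = 0;
        Object item = get.apply(open); // first reliable read, nothing to close yet
        while (item != null) {
            try {
                handler.accept((String) item);
            } catch (RuntimeException e) {
                // Processing failed: give the item back and stop draining.
                get.apply(queueName + "/abort");
                return count;
            }
            count++;
            // Confirm the item just handled and open the next in one round trip.
            item = get.apply(closeOpen);
        }
        return count;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the Kestrel client; it only logs the keys it receives.
        Deque<String> fakeQueue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        Function<String, Object> fakeGet = key -> {
            System.out.println("get " + key);
            return key.endsWith("/abort") ? null : fakeQueue.poll();
        };
        int consumed = drain(fakeGet, "tracking-queue", 1000, item -> { });
        System.out.println("consumed " + consumed);
    }
}
```

With the real client the call would look like drain(queue::get, "tracking-queue", 1000, json -> TrackingEvent.toObject(json)). The double-processing caveat still applies: if the combined get fails after the handler has run, the item comes back later.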