Saturday, November 23, 2013

A Simple Moving Average Implementation in Java

On several occasions I've wanted to compute simple metrics in my Java applications, for example the number of hits per hour, or errors throughout a time period. While computing simple metrics is not terribly difficult, it's just extra work and I'd rather spend that time on the problem domain. I was surprised to not find any widely accepted solutions for metrics in Java. I did find Metrics but it seemed a bit too complicated and not well documented -- All I really wanted was to compute a moving average. I thought about the problem some more and decided it's not a difficult problem. Here's my solution;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

public class MovingAverage implements Runnable {

 private final static Logger log = Logger.getLogger(MovingAverage.class);
 
 private volatile Long[] wheel;
 private volatile int tick;
 private int ticks;
 private GetCount getCount;
 
 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
 // computes count per time period
 public MovingAverage(TimeUnit periodUnit, int period, TimeUnit resolutionUnit, int resolution, GetCount getCount) {
  this.getCount = getCount;
  long converted = resolutionUnit.convert(period, periodUnit);
  // how many ticks in wheel
  
  if (converted % resolution != 0) {
   throw new IllegalArgumentException(
     "Resolution time unit must be divisable by the period time unit. Specifically " + converted + " is not divisable by " + resolution + ", remainder is " + converted % resolution);
  }
  
  ticks = (int) (converted / resolution);
  wheel = new Long[ticks];
  
  log.debug("Creating a wheel with a resolution of " + ticks + " ticks, window of " + period + " " + periodUnit + ", that is updated every " + resolution + " " + resolutionUnit + "");
  
  // schedule call tick with fixed frequency
  scheduledExecutorService.scheduleAtFixedRate(this, resolution, resolution, resolutionUnit); 
 }
 
 public void tick() {
  if (tick == wheel.length - 1) {
   // flip
   tick = 0;
  } else if (wheel[tick] != null) {
   // increment if we are not at zero state
   tick++;
  }
  
  wheel[tick] = getCount.getCount();
  log.debug("tick is " + tick + ", count is " + wheel[tick] + ", average is " + this.getAverage());
 }
 
 public boolean isFull() {
  return wheel[wheel.length -1] != null;
 }
 
 public Long getAverage() {
  if (!this.isFull()) {
   // wheel is not full yet. we could do some math but for now we return 0 until wheel is full
   return null;
  }
  
  // tick can move while we compute average so save tick in local var
  int now = tick;
  
  if (now == wheel.length - 1) {
   // at end of wheel
   return wheel[now] - wheel[0];
  }
  
  return wheel[now] - wheel[now + 1];
 }
 
 interface GetCount {
  long getCount();
 }

 @Override
 public void run() {
  tick();
 }
}

This works by creating an array of window / update frequency size, then a thread sets the count to the next index in the array on the update frequency. The count for the interval is simply array[i] - array[i+1], which is the most recent count minus the oldest count. For a 10 minute interval, the oldest count (i+1) is exactly 10 minutes old.

To add a moving average to our code first we'll need a counter, using AtomicLong.

final AtomicLong counter = new AtomicLong();

This counter should be incremented based on the events you're interested in computing (e.g. POST requests for a REST service).

We need to provide the implementation with access to the counter and that is accomplished through the GetCount interface. Here I'll create a moving average with a 5 minute window that updates every second.
MovingAverage movingAverage = new MovingAverage(TimeUnit.MINUTES, 5, TimeUnit.SECONDS, 1, new GetCount() { 
 @Override
 public long getCount() {
  return counter.get();
 }
});

And to get the current average we simply call the getAverage method:

System.out.println(movingAverage.getAverage());

A key implementation detail is how the array size is determined: by dividing the window by the update frequency. So a large window with a frequent update frequency can consume a significant amount of memory. In this example the array size is reasonable 300. However, if we created a 24 hour moving average with a 1 second interval the size would be 86400! A more reasonable update frequency for a 24 hour period might be every 5 minutes (array size of 288).

Another consideration of choosing the window and update frequency is the window must be divisible by the frequency. For example a 2 minute window with a 6 second update frequency is ok, but a 7 second update frequency is not, since it's not divisible by 120. An IllegalArgumentException is thrown if the window modulus update frequency is not zero.

This implementation requires one thread per moving average, which is not very efficient. A better solution would be to share a thread across many averages. Update: I've updated the code to share a thread here.

Lastly, there's an initial state problem: we don't have data yet for the entire window. For example if you have a 5 minute window and only 15 seconds of data. This implementation returns null until we have 5 minutes of data. Another approach is to estimate the average. Suppose we have a count of 10 in 30 seconds, then we can estimate the average as 40 in 2 minutes. However there is risk of significant error by extrapolating incomplete data. For instance, if we had a burst of 20 hits in 2 seconds, we'd be estimating 1200 per 2 minutes, which in all likelihood is way off.



Thursday, November 7, 2013

Synchronous Executor Service

Recently I found myself in a situation where I wanted my code to be asynchronous, but only in certain situations. The solution I came up with is a synchronous Executor

public class SynchronousExecutor implements ExecutorService {

 @Override
 public void execute(Runnable command) {
  throw new UnsupportedOperationException();
 }

 @Override
 public void shutdown() {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<Runnable> shutdownNow() {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean isShutdown() {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean isTerminated() {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> Future<T> submit(Callable<T> task) {
  T t = null;
  Throwable exception = null;
  
  try {
   t = task.call();
  } catch (Throwable e) {
   exception = e;
  }

  return new SynchronousFuture<T>(t, exception);
 }

 @Override
 public <T> Future<T> submit(Runnable task, T result) {
  throw new UnsupportedOperationException();
 }

 @Override
 public Future<?> submit(Runnable task) {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
  // TODO Auto-generated method stub
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException,
   ExecutionException, TimeoutException {
  throw new UnsupportedOperationException();
 }
}

This implements ExecutorService but I've changed the behavior of submit, by executing the Callable, synchronously. The submit method returns a SynchronousFuture, which contains the return value of the Callable, or a Throwable if the call failed. I return my SynchronousExecutor in getDefaultExecutorService, however In situations where I want asynchronous execution, I override this method and return a standard ExecutorService.

 protected ExecutorService getDefaultExecutorService() { 
  return new SynchronousExecutor();
 }

The SynchronousExecutor is entirely stateless so we really could return the same instance every time instead of creating new objects. SynchronousFuture is pretty basic; many of the Future methods do not apply for synchronous execution, so I indicate that by throwing UnsupportedOperationException.

public class SynchronousFuture<t> implements Future<t> {

 private T t;
 private Throwable e;
 
 public SynchronousFuture(T t, Throwable e) {
  this.t = t;
  this.e = e;
 }
 
 @Override
 public boolean cancel(boolean mayInterruptIfRunning) {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean isCancelled() {
  return false;
 }

 @Override
 public boolean isDone() {
  return true;
 }

 @Override
 public T get() throws InterruptedException, ExecutionException {
  if (e != null) {
   throw new ExecutionException(e);
  }
  
  return t;
 }

 @Override
 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  throw new UnsupportedOperationException();
 }
}

Now we can submit Callables, just as you would to a typical ExecutorService, but it is executed synchronously. The call to the get method of SynchronousFuture never blocks since it's already complete.

Friday, October 25, 2013

Simple Google Oauth2 with Jetty/Servlets

Oauth isn't very complicated but you might get a different impression by looking at Google's Java Oauth client and related libraries. I spent some time working with those libraries, thinking they must be doing something magical, but they're not and they're needlessly complex. I got steered towards Google's client by searching for java google oauth example, which at the time only returned results for the Google Java client. Here I present a simple, and hopefully easy to follow (for Java/Servlets developers) example of authenticating users with Google on a web application. All the necessary information to implement this code here can be found in Google's Oauth2 documentation, here and here.

First we need to obtain credentials for our application. Go to the api console and create a new application. Under "Registered apps", click "Register App", select "web application" and register. Now expand "OAuth 2.0 Client ID" and enter a web origin and redirect page:
WEB ORIGIN
http://localhost:8089
REDIRECT URI
http://localhost:8089/callback

The origin and redirect url must match the address of the web application. I'll use localhost to keep it simple.

Click generate and the client id and secret will update:
CLIENT ID
904170821502.apps.googleusercontent.com
CLIENT SECRET
fFDwuKdd0Eqc5AE6dsTRLyen


You may also want to go to the Console Screen and update the Product Name. This is the name of your app that users see when they are redirected to Google.

The basic Oauth workflow is:
  • Redirect the user to Google, specifying permissions you are requesting and other parameters in the querystring. The user will be asked to sign-in to Google and authorize access to your application.
  • If the user accepts, Google redirects back to your application and provides a code parameter in the querystring which can be exchanged for a access_token via a web service call to Google. The access_token can be used to access Google APIs, on behalf of the user.

    
{
        "access_token": "ya29.AHES6ZQS-BsKiPxdU_iKChTsaGCYZGcuqhm_A5bef8ksNoU",
        "token_type": "Bearer",
        "expires_in": 3600,
        "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6IjA5ZmE5NmFjZWNkOGQyZWRjZmFiMjk0NDRhOTgyN2UwZmFiODlhYTYifQ.eyJpc3MiOiJhY2NvdW50cy5nb29nbGUuY29tIiwiZW1haWxfdmVyaWZpZWQiOiJ0cnVlIiwiZW1haWwiOiJhbmRyZXcucmFwcEBnbWFpbC5jb20iLCJhdWQiOiI1MDgxNzA4MjE1MDIuYXBwcy5nb29nbGV1c2VyY29udGVudC5jb20iLCJhdF9oYXNoIjoieUpVTFp3UjVDX2ZmWmozWkNublJvZyIsInN1YiI6IjExODM4NTYyMDEzNDczMjQzMTYzOSIsImF6cCI6IjUwODE3MDgyMTUwMi5hcHBzLmdvb2dsZXVzZXJjb250ZW50LmNvbSIsImlhdCI6MTM4Mjc0MjAzNSwiZXhwIjoxMzgyNzQ1OTM1fQ.Va3kePMh1FlhT1QBdLGgjuaiI3pM9xv9zWGMA9cbbzdr6Tkdy9E-8kHqrFg7cRiQkKt4OKp3M9H60Acw_H15sV6MiOah4vhJcxt0l4-08-A84inI4rsnFn5hp8b-dJKVyxw1Dj1tocgwnYI03czUV3cVqt9wptG34vTEcV3dsU8",
        "refresh_token": "1/Hc1oTSLuw7NMc3qSQMTNqN6MlmgVafc78IZaGhwYS-o"
}
  • Now you can make web service calls to any authorized API. The access_token is only valid for an hour. After that you the user would need to re-authorize your application; however with offline access, you can request a new token, via a refresh_token. In this example I request offline access so a refresh token is provided.
In eclipse I start a Jetty server on 8089 with two endpoints: /signin and /callback. The /signin endpoint will start the Oauth process by redirecting to Google. If authorization is granted, Google will redirect back to /callback with a code parameter.

The code starts a Jetty server on port 8089. Replace clientId and clientSecret with your app's values. Start the application and go to localhost:8089/signin

If everything goes well, you'll see the the user's email, id and whether they are verified, in JSON format.

Aside from Jetty, this code uses just a few libraries to simplify things a bit: httpclient (get/post requests), json-simple (json parsing) and guava (concise maps).

package com.googleoauth;

package com.googleoauth;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import com.google.common.collect.ImmutableMap;


public class GoogleOauthServer {

 private Server server = new Server(8089);

 private final String clientId = "428385348633.apps.googleusercontent.com";
 private final String clientSecret = "zJpDtrqk7is9OwjDNWi5CzOK";
 
 public static void main(String[] args) throws Exception {
  new GoogleOauthServer().startJetty();
 }
 
 public void startJetty() throws Exception {

        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
        context.setContextPath("/");
        server.setHandler(context);
 
        // map servlets to endpoints
        context.addServlet(new ServletHolder(new SigninServlet()),"/signin");        
        context.addServlet(new ServletHolder(new CallbackServlet()),"/callback");        
        
        server.start();
        server.join();
 }

 class SigninServlet extends HttpServlet {
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException,IOException {
   
   // redirect to google for authorization
   StringBuilder oauthUrl = new StringBuilder().append("https://accounts.google.com/o/oauth2/auth")
   .append("?client_id=").append(clientId) // the client id from the api console registration
   .append("&response_type=code")
   .append("&scope=openid%20email") // scope is the api permissions we are requesting
   .append("&redirect_uri=http://localhost:8089/callback") // the servlet that google redirects to after authorization
   .append("&state=this_can_be_anything_to_help_correlate_the_response%3Dlike_session_id")
   .append("&access_type=offline") // here we are asking to access to user's data while they are not signed in
   .append("&approval_prompt=force"); // this requires them to verify which account to use, if they are already signed in
   
   resp.sendRedirect(oauthUrl.toString());
  } 
 }
 
 class CallbackServlet extends HttpServlet {
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException,IOException {
   // google redirects with
   //http://localhost:8089/callback?state=this_can_be_anything_to_help_correlate_the_response%3Dlike_session_id&code=4/ygE-kCdJ_pgwb1mKZq3uaTEWLUBd.slJWq1jM9mcUEnp6UAPFm0F2NQjrgwI&authuser=0&prompt=consent&session_state=a3d1eb134189705e9acf2f573325e6f30dd30ee4..d62c
   
   // if the user denied access, we get back an error, ex
   // error=access_denied&state=session%3Dpotatoes
   
   if (req.getParameter("error") != null) {
    resp.getWriter().println(req.getParameter("error"));
    return;
   }
   
   // google returns a code that can be exchanged for a access token
   String code = req.getParameter("code");
   
   // get the access token by post to Google
   String body = post("https://accounts.google.com/o/oauth2/token", ImmutableMap.<String,String>builder()
     .put("code", code)
     .put("client_id", clientId)
     .put("client_secret", clientSecret)
     .put("redirect_uri", "http://localhost:8089/callback")
     .put("grant_type", "authorization_code").build());

   // ex. returns
//   {
//       "access_token": "ya29.AHES6ZQS-BsKiPxdU_iKChTsaGCYZGcuqhm_A5bef8ksNoU",
//       "token_type": "Bearer",
//       "expires_in": 3600,
//       "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6IjA5ZmE5NmFjZWNkOGQyZWRjZmFiMjk0NDRhOTgyN2UwZmFiODlhYTYifQ.eyJpc3MiOiJhY2NvdW50cy5nb29nbGUuY29tIiwiZW1haWxfdmVyaWZpZWQiOiJ0cnVlIiwiZW1haWwiOiJhbmRyZXcucmFwcEBnbWFpbC5jb20iLCJhdWQiOiI1MDgxNzA4MjE1MDIuYXBwcy5nb29nbGV1c2VyY29udGVudC5jb20iLCJhdF9oYXNoIjoieUpVTFp3UjVDX2ZmWmozWkNublJvZyIsInN1YiI6IjExODM4NTYyMDEzNDczMjQzMTYzOSIsImF6cCI6IjUwODE3MDgyMTUwMi5hcHBzLmdvb2dsZXVzZXJjb250ZW50LmNvbSIsImlhdCI6MTM4Mjc0MjAzNSwiZXhwIjoxMzgyNzQ1OTM1fQ.Va3kePMh1FlhT1QBdLGgjuaiI3pM9xv9zWGMA9cbbzdr6Tkdy9E-8kHqrFg7cRiQkKt4OKp3M9H60Acw_H15sV6MiOah4vhJcxt0l4-08-A84inI4rsnFn5hp8b-dJKVyxw1Dj1tocgwnYI03czUV3cVqt9wptG34vTEcV3dsU8",
//       "refresh_token": "1/Hc1oTSLuw7NMc3qSQMTNqN6MlmgVafc78IZaGhwYS-o"
//   }
   
   JSONObject jsonObject = null;
   
   // get the access token from json and request info from Google
   try {
    jsonObject = (JSONObject) new JSONParser().parse(body);
   } catch (ParseException e) {
    throw new RuntimeException("Unable to parse json " + body);
   }
   
   // google tokens expire after an hour, but since we requested offline access we can get a new token without user involvement via the refresh token
   String accessToken = (String) jsonObject.get("access_token");
     
   // you may want to store the access token in session
   req.getSession().setAttribute("access_token", accessToken);
   
   // get some info about the user with the access token
   String json = get(new StringBuilder("https://www.googleapis.com/oauth2/v1/userinfo?access_token=").append(accessToken).toString());
   
   // now we could store the email address in session
   
   // return the json of the user's basic info
   resp.getWriter().println(json);
  } 
 }
 
 // makes a GET request to url and returns body as a string
 public String get(String url) throws ClientProtocolException, IOException {
  return execute(new HttpGet(url));
 }
 
 // makes a POST request to url with form parameters and returns body as a string
 public String post(String url, Map<String,String> formParameters) throws ClientProtocolException, IOException { 
  HttpPost request = new HttpPost(url);
   
  List <NameValuePair> nvps = new ArrayList <NameValuePair>();
  
  for (String key : formParameters.keySet()) {
   nvps.add(new BasicNameValuePair(key, formParameters.get(key))); 
  }

  request.setEntity(new UrlEncodedFormEntity(nvps));
  
  return execute(request);
 }
 
 // makes request and checks response code for 200
 private String execute(HttpRequestBase request) throws ClientProtocolException, IOException {
  HttpClient httpClient = new DefaultHttpClient();
  HttpResponse response = httpClient.execute(request);
     
  HttpEntity entity = response.getEntity();
     String body = EntityUtils.toString(entity);

  if (response.getStatusLine().getStatusCode() != 200) {
   throw new RuntimeException("Expected 200 but got " + response.getStatusLine().getStatusCode() + ", with body " + body);
  }

     return body;
 }
}
Here the are maven dependencies
  <dependency>
   <groupId>org.eclipse.jetty</groupId>
   <artifactId>jetty-server</artifactId>
   <version>8.1.8.v20121106</version>
  </dependency>
  <dependency>
   <groupId>org.eclipse.jetty</groupId>
   <artifactId>jetty-servlet</artifactId>
   <version>8.1.8.v20121106</version>
  </dependency>
  <dependency>
   <groupId>org.eclipse.jetty</groupId>
   <artifactId>jetty-util</artifactId>
   <version>8.1.8.v20121106</version>
  </dependency>
  <dependency>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.2.1</version>
  </dependency>
  <dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>14.0.1</version>
  </dependency>
  <dependency>
   <groupId>com.googlecode.json-simple</groupId>
   <artifactId>json-simple</artifactId>
   <version>1.1.1</version>
  </dependency>      
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.16</version>
  </dependency>

All the code here can be found at github

A useful tool for learning/developing with Oauth is the the Google Oauth Playground

Friday, July 5, 2013

Kestrel with spymemcached Java Client

Recently I started to experiment with Kestrel. Kestrel is a fast, lightweight, persistent queue, in use at Twitter. It doesn't support transactions but it has a reliable read feature to allow the client to acknowledge it has completed processing an item off the queue. It took me a bit to get the client working in Java. I couldn't find any substantitive examples and my lack of memcached knowledge didn't help. Kestrel supports a subset of Memcache protocol, so any memcached client should work; I used the Java spymemcached client.

The server configuration and start up is well documented so I won't cover that. Here's my client code adding items to the queue

  queue = Memcached.newKestrelClient(ImmutableList.of("127.0.0.1:22133"));
  
  long start = System.currentTimeMillis();
  
  for (int i = 0; i < 10000; i++) {
   
   if (i % 1000 == 0) {
    log.debug("adding tracking event queue "); 
   }
   
   OperationFuture reply = queue.set("tracking-queue", 0, this.getJson(i));
   
   try {
//    log.debug("Waiting for reply");
    if (!reply.get()) {
     log.warn("set failed");
    }
   } catch (InterruptedException | ExecutionException e1) {
    log.warn("Get failed", e1);
   }
  }
  
  log.debug("Finished in " + (System.currentTimeMillis() - start));
I'm storing JSON in the queue so my getJson(int i) method just produces some arbitrary JSON. The queue.set method is asynchronous and returns OperationFuture. By calling get on the Future you are waiting for Kestrel to reply. This is very importan since otherwise it my not add your item and you won't know. This happened to me at first when I was ignoring the Future and only a fraction of my items where added to the queue. Once I made it a synchronous with the get, I didn't see any failures. On my notebook, running Kestrel on the same machine, I was able to add 10000 items to the queue in about 4 seconds.

I'm using the twitter-commons Memcached helper class but it's not doing much so using spymemcached directly is likely a better option.

The dependencies for this code are as follows:
   <dependency>
         <groupId>spy</groupId>
         <artifactId>spymemcached</artifactId>
         <version>2.8.1</version>
     </dependency> 
     <dependency>
       <groupId>com.twitter.common</groupId>
       <artifactId>memcached</artifactId>
       <version>0.0.7</version>
     </dependency>
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.16</version>
  </dependency>
These are not in Maven Central so you'll need to add repositories:
 <repositories>
  <repository>
   <id>spy</id>
   <name>Spy Repository</name>
   <layout>default</layout>
   <url>http://files.couchbase.com/maven2/</url>
   <snapshots>
    <enabled>false</enabled>
   </snapshots>
  </repository>
  <repository>
   <id>twitter-twttr</id>
   <url>http://maven.twttr.com/</url>
  </repository>
 </repositories>
Here's my code for consuming the queue:

  queue = Memcached.newKestrelClient(ImmutableList.of("127.0.0.1:22133"));
  
  Object o = null;
  int count = 0;
  long start = System.currentTimeMillis();
  
  while ((o = queue.get("tracking-queue/open/t=1000")) != null) {
     
   try {
    TrackingEvent.toObject((String)o);
    // indicate the item was consumed (reliable read)
    try {
     queue.get("tracking-queue/close");
     count++;
    } catch (RuntimeException e) {
     log.warn("Item processed but we failed to mark it as consumed. will get processed again", e);
    }
   } catch (RuntimeException e) {
    log.warn("Unable to process item [" + (String)o + "]", e);
    // since we don't close this item will not be removed from queue
   }
  }   
 
  log.debug("Consumed " + count + " items from queue in " + (System.currentTimeMillis() - start));
I'm using the reliable read feature to indicate to Kestrel that we've processed the item. This helps ensure the item will be processed by another client in the event an exception occurs after we've removed the item from the queue and failed. The queue.get("tracking-queue/close"); tells Kestrel it's been processed. If Kestrel doesn't received this call it will return the item to the queue. Of course if the queue.get("tracking-queue/close"); call fails then we've processed the item twice. It took about 8 seconds to consume 10000 objects from the queue.

Wednesday, May 8, 2013

Prevent Multiple Windows with Sublime Text

By default Sublime Text opens new files in new windows. I find this extremely annoying. You can change this behavior by selecting

Preferences -> Settings - Default

Find the text:

"open_files_in_new_window": true,

And change to:

"open_files_in_new_window": false,

I tried a number of searches, including "merge windows" (google chrome has this where you can consolidate several windows into one), and "sublime prevent multiple windows". Oddly there were no results. I would have though this would be in a FAQ. As it turns out it's an easy fix but not so obvious since they don't have UI for preferences.



Saturday, April 6, 2013

JDBI Mockito Headaches

I've started using JDBI for my background jobs since it alleviates much of the pain of JDBC. So far it has been a good experience except that when I started writing unit tests I found that the author has placed the final modifier on all the bind methods, which makes it impossible to override for mocking. I looked at PowerMock (which performs tricks to mock final methods) but it produced inconsistent results; essentially it worked for the first invocation but failed on subsequent invocations. Instead of giving up on JDBI, I created an UpdateDelegate class with simply provides non-final bind methods and delegates to the JDBI class.


public class UpdateDelegate {
private Update delegate;
public UpdateDelegate(Update update) {
this.delegate = update;
}

public UpdateDelegate bindNull(String alias, int type) {
delegate.bindNull(alias, type);
return this;
}
public UpdateDelegate bind(String alias, Long value) {
delegate.bind(alias, value);
return this;
}
public UpdateDelegate bind(String alias, String value) {
delegate.bind(alias, value);
return this;
}

public UpdateDelegate bind(String alias, Float value) {
delegate.bind(alias, value);
return this;
}

public UpdateDelegate bind(String alias, Integer value) {
delegate.bind(alias, value);
return this;
}
public UpdateDelegate bind(String alias, Timestamp value) {
delegate.bind(alias, value);
return this;
}
public int execute() {
return delegate.execute();
}

In my test class I use Mockito to mock UpdateDelegate and apply the following stubbing, so the fluents work

private void applyWhens(UpdateDelegate updateDelegate) {
Mockito.when(updateDelegate.bind(Mockito.anyString(), Mockito.anyLong())).thenReturn(updateDelegate);
Mockito.when(updateDelegate.bind(Mockito.anyString(), Mockito.anyInt())).thenReturn(updateDelegate);
Mockito.when(updateDelegate.bind(Mockito.anyString(), Mockito.anyString())).thenReturn(updateDelegate);
Mockito.when(updateDelegate.bind(Mockito.anyString(), Mockito.anyFloat())).thenReturn(updateDelegate);
Mockito.when(updateDelegate.bind(Mockito.anyString(), Mockito.any(Timestamp.class))).thenReturn(updateDelegate);
Mockito.when(updateDelegate.bindNull(Mockito.anyString(), Mockito.anyInt())).thenReturn(updateDelegate);
}

Now I can verify, e.g.

Mockito.verify(updateDelegate).bind("total", 1000L);


All of this nonsense because of the final methods on bind, sheesh.

Saturday, March 2, 2013

Google Oauth/Jetty 8 Conflict

I've been in the process of applying Google Oauth2 to some of my web apps. I ran into an issue when adding Google Oauth2 to a Jetty 8 application:


java.lang.SecurityException: class "javax.servlet.FilterRegistration"'s signer information does not match signer information of other classes in the same package

The problem is the with the com.google.oauth-client dependency

    <dependency>
      <groupId>com.google.oauth-client</groupId>
      <artifactId>google-oauth-client-servlet</artifactId>
      <version>1.13.1-beta</version>
    </dependency>

This depends on an older version of servlet-api (javax.servlet:servlet-api:jar:2.5:compile), while Jetty 8 depends on Servlet 3 (javax.servlet:jar:3.0.0.v201112011016). 

You can run mvn dependency:tree to view the dependencies

To fix the issue I excluded the 2.5 in my Maven assembly dependencySet

      <excludes>
        <exclude>javax.servlet:servlet-api</exclude>
      </excludes>




Sunday, February 24, 2013

Adding Non-Maven Projects to Your Maven Project - Google Cloud Messaging


You may have noticed that Google has not always embraced Maven. I've started to play around with Google Cloud Messaging and needed a solution to use the non-Maven gcm-server.jar in my Maven project. Because the gcm-server.jar has no dependencies, it can be easily accomplished with the maven install plugin:

mvn install:install-file -Dfile=android-sdks//extras/google/gcm/gcm-server/dist/gcm-server.jar -Dsources=android-sdks//extras/google/gcm/gcm-server/dist/gcm-server-src.jar -DgroupId=com.google.gcm -DartifactId=gcm-server -Dversion=1.0.2 -Dpackaging=jar -DgeneratePom=true -DcreateChecksum=true

The GCM project uses Ant, so alternatively it could be built with Ant from your Maven project.

Now the dependency can be added to your project with

<dependency>
   <groupId>com.google.gcm</groupId>
   <artifactId>gcm-server</artifactId>
   <version>1.0.2</version>
</dependency>

Of course you'll need to re-install whenever a new GCM version is released