Saturday, November 23, 2013

A Simple Moving Average Implementation in Java

On several occasions I've wanted to compute simple metrics in my Java applications, for example the number of hits per hour or errors over a time period. Computing simple metrics is not terribly difficult, but it's extra work, and I'd rather spend that time on the problem domain. I was surprised not to find any widely accepted solutions for metrics in Java. I did find Metrics, but it seemed a bit too complicated and not well documented; all I really wanted was to compute a moving average. After thinking about the problem some more, I decided it isn't a difficult one to solve. Here's my solution:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

public class MovingAverage implements Runnable {

 private static final Logger log = Logger.getLogger(MovingAverage.class);
 
 // ring buffer of counter snapshots; guarded by "this"
 private final Long[] wheel;
 // index of the most recent snapshot; guarded by "this"
 private int tick;
 private final GetCount getCount;
 
 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
 // computes the count per time period (the window), sampled once per resolution
 public MovingAverage(TimeUnit periodUnit, int period, TimeUnit resolutionUnit, int resolution, GetCount getCount) {
  if (period <= 0 || resolution <= 0) {
   throw new IllegalArgumentException("Period and resolution must be positive");
  }
  this.getCount = getCount;
  
  // compare in nanoseconds so a coarser resolution unit cannot silently truncate the period
  long periodNanos = periodUnit.toNanos(period);
  long resolutionNanos = resolutionUnit.toNanos(resolution);
  
  if (periodNanos % resolutionNanos != 0) {
   throw new IllegalArgumentException(
     "The period must be divisible by the resolution. Specifically " + period + " " + periodUnit + " is not divisible by " + resolution + " " + resolutionUnit);
  }
  
  // how many ticks in the window, e.g. 5 minutes / 1 second = 300
  int ticks = (int) (periodNanos / resolutionNanos);
  // one extra slot so the newest and oldest snapshots are exactly one window apart
  wheel = new Long[ticks + 1];
  
  log.debug("Creating a wheel of " + wheel.length + " slots for a window of " + period + " " + periodUnit + ", updated every " + resolution + " " + resolutionUnit);
  
  // call run() (and therefore tick()) at a fixed rate
  scheduledExecutorService.scheduleAtFixedRate(this, resolution, resolution, resolutionUnit); 
 }
 
 public synchronized void tick() {
  if (tick == wheel.length - 1) {
   // wrap around to the start of the wheel
   tick = 0;
  } else if (wheel[tick] != null) {
   // advance, unless this is the very first snapshot (slot 0 still empty)
   tick++;
  }
  
  wheel[tick] = getCount.getCount();
  if (log.isDebugEnabled()) {
   log.debug("tick is " + tick + ", count is " + wheel[tick] + ", average is " + getAverage());
  }
 }
 
 public synchronized boolean isFull() {
  return wheel[wheel.length - 1] != null;
 }
 
 // returns the count over the last window, or null until a full window of data exists
 public synchronized Long getAverage() {
  if (!isFull()) {
   // wheel is not full yet; we could extrapolate, but for now we return null until it is full
   return null;
  }
  
  // the oldest snapshot is the slot after the newest one, wrapping at the end of the wheel
  int oldest = (tick + 1) % wheel.length;
  return wheel[tick] - wheel[oldest];
 }
 
 // stops the background thread; without this its non-daemon thread keeps the JVM alive
 public void shutdown() {
  scheduledExecutorService.shutdown();
 }
 
 public interface GetCount {
  long getCount();
 }

 @Override
 public void run() {
  try {
   tick();
  } catch (RuntimeException e) {
   // an exception escaping a scheduled task would silently cancel all future ticks
   log.error("Failed to update moving average", e);
  }
 }
}

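This works by creating an array sized by dividing the window by the update frequency. A background thread then stores a snapshot of the counter in the next slot of the array at each update, wrapping around to the start when it reaches the end. The count for the window is simply array[i] - array[i+1] (wrapping to array[0] at the end), which is the most recent count minus the oldest count. For a 10 minute window, the oldest count (i+1) is 10 minutes old. Strictly speaking, the result is the number of events per window (for example, hits in the last 10 minutes), which is the moving average rate expressed per window.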
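To make the array arithmetic concrete, here is a thread-free sketch of the same idea. WheelSketch is a hypothetical name: it takes counter readings as arguments instead of polling a GetCount on a timer, and it keeps one slot more than the number of ticks per window so that the newest and oldest snapshots are exactly one window apart.

```java
// Thread-free sketch of the "wheel": cumulative counter snapshots go into a
// ring buffer, and the count for the window is newest minus oldest.
// WheelSketch and its methods are hypothetical, for illustration only.
public class WheelSketch {
    private final long[] wheel;  // cumulative counter snapshots
    private int newest = -1;     // index of the most recent snapshot
    private int samples = 0;     // number of snapshots taken so far

    // ticksPerWindow = window / update interval; the extra slot makes the
    // oldest retained snapshot exactly one full window older than the newest
    WheelSketch(int ticksPerWindow) {
        wheel = new long[ticksPerWindow + 1];
    }

    // called once per update interval with the current counter value
    void tick(long counterValue) {
        newest = (newest + 1) % wheel.length; // wrap around at the end
        wheel[newest] = counterValue;
        samples++;
    }

    // null until the wheel holds a full window of snapshots
    Long windowCount() {
        if (samples < wheel.length) {
            return null;
        }
        int oldest = (newest + 1) % wheel.length; // the next slot to be overwritten
        return wheel[newest] - wheel[oldest];
    }

    public static void main(String[] args) {
        WheelSketch w = new WheelSketch(3); // a window of 3 update intervals
        long[] readings = {0, 5, 7, 12, 20, 21};
        for (long r : readings) {
            w.tick(r);
            System.out.println("counter=" + r + " windowCount=" + w.windowCount());
        }
    }
}
```

With a 3 tick window and readings 0, 5, 7, 12, 20, 21, windowCount() is null for the first three ticks and then 12, 15 and 14: each value is the number of events in the three most recent intervals.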

To add a moving average to our code, we first need a counter. An AtomicLong works well because it can be incremented safely from many threads without locking.

final AtomicLong counter = new AtomicLong();

Increment this counter whenever one of the events you want to measure occurs (e.g. each POST request to a REST service).
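A minimal sketch of the counting side, assuming a hypothetical handlePost method that stands in for whatever event is being measured. A thread pool simulates concurrent requests to show that AtomicLong increments are not lost; CounterSketch and simulate are also hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CounterSketch {
    // shared by every request-handling thread
    private final AtomicLong counter = new AtomicLong();

    // hypothetical handler: called once per POST request
    void handlePost() {
        // ... handle the request, then record the event ...
        counter.incrementAndGet(); // atomic, so concurrent increments are never lost
    }

    // the value a GetCount implementation would return
    long count() {
        return counter.get();
    }

    // simulates concurrent requests and returns the final count
    static long simulate(int requests, int threads) throws InterruptedException {
        CounterSketch service = new CounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < requests; i++) {
            pool.execute(service::handlePost);
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("requests did not finish in time");
        }
        return service.count();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(simulate(1000, 8));
    }
}
```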

We give the implementation access to the counter through the GetCount interface. Here I'll create a moving average with a 5 minute window that updates every second:

MovingAverage movingAverage = new MovingAverage(TimeUnit.MINUTES, 5, TimeUnit.SECONDS, 1, new MovingAverage.GetCount() { 
 @Override
 public long getCount() {
  return counter.get();
 }
});

And to get the current average we simply call the getAverage method:

System.out.println(movingAverage.getAverage());

A key implementation detail is how the array size is determined: by dividing the window by the update frequency. A long window with a short update interval can therefore consume a significant amount of memory. In this example the array holds a reasonable 300 or so entries. However, a 24 hour moving average updated every second would need about 86,400! A more reasonable update interval for a 24 hour window might be 5 minutes (about 288 entries).
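The sizes above follow from converting the window into the update interval's unit and dividing. A small sketch using java.util.concurrent.TimeUnit; WheelSize and ticks are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

public class WheelSize {
    // number of update intervals per window; assumes the window is a whole
    // multiple of the interval and that intervalUnit is no coarser than
    // windowUnit (TimeUnit.convert truncates toward zero otherwise)
    static long ticks(long window, TimeUnit windowUnit, long interval, TimeUnit intervalUnit) {
        long windowInIntervalUnits = intervalUnit.convert(window, windowUnit);
        return windowInIntervalUnits / interval;
    }

    public static void main(String[] args) {
        System.out.println(ticks(5, TimeUnit.MINUTES, 1, TimeUnit.SECONDS)); // 300
        System.out.println(ticks(24, TimeUnit.HOURS, 1, TimeUnit.SECONDS));  // 86400
        System.out.println(ticks(24, TimeUnit.HOURS, 5, TimeUnit.MINUTES));  // 288
    }
}
```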

Another consideration when choosing the window and update frequency is that the window must be divisible by the update interval. For example, a 2 minute window with a 6 second update interval is fine (120 / 6 = 20), but a 7 second interval is not, since 120 is not divisible by 7. An IllegalArgumentException is thrown if the window modulo the update interval is not zero.
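A sketch of the divisibility check on its own; WindowCheck and divides are hypothetical names. Comparing both durations in nanoseconds avoids TimeUnit.convert truncating when the interval unit is coarser than the window unit (for example, a 90 second window with a 1 minute interval).

```java
import java.util.concurrent.TimeUnit;

public class WindowCheck {
    // true if the window is a positive whole number of update intervals
    static boolean divides(long window, TimeUnit windowUnit, long interval, TimeUnit intervalUnit) {
        long windowNanos = windowUnit.toNanos(window);
        long intervalNanos = intervalUnit.toNanos(interval);
        return windowNanos > 0 && intervalNanos > 0 && windowNanos % intervalNanos == 0;
    }

    public static void main(String[] args) {
        System.out.println(divides(2, TimeUnit.MINUTES, 6, TimeUnit.SECONDS));  // true: 120 % 6 == 0
        System.out.println(divides(2, TimeUnit.MINUTES, 7, TimeUnit.SECONDS));  // false: 120 % 7 == 1
        System.out.println(divides(90, TimeUnit.SECONDS, 1, TimeUnit.MINUTES)); // false: 90 s is 1.5 minutes
    }
}
```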

This implementation requires one thread per moving average, which is not very efficient. A better solution would be to share a thread across many averages. Update: I've updated the code to share a thread here.
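One way to share a thread, sketched under assumptions (this is not the updated code referenced above): create a single ScheduledExecutorService and pass it to each periodic task instead of creating one per instance, keeping each task's ScheduledFuture so it can be cancelled individually. Sampler and SharedSchedulerSketch are hypothetical; each Sampler stands in for a moving average's tick() and only records which thread ran it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SharedSchedulerSketch {
    static final class Sampler implements Runnable {
        private final Set<String> threadNames;
        private final CountDownLatch ticks;
        private ScheduledFuture<?> handle;

        Sampler(Set<String> threadNames, int expectedTicks) {
            this.threadNames = threadNames;
            this.ticks = new CountDownLatch(expectedTicks);
        }

        // the scheduler is passed in rather than created per instance
        void start(ScheduledExecutorService scheduler, long period, TimeUnit unit) {
            handle = scheduler.scheduleAtFixedRate(this, period, period, unit);
        }

        // cancels only this sampler; the shared thread keeps serving the others
        void stop() {
            handle.cancel(false);
        }

        @Override
        public void run() {
            threadNames.add(Thread.currentThread().getName());
            ticks.countDown();
        }

        boolean awaitTicks(long timeout, TimeUnit unit) throws InterruptedException {
            return ticks.await(timeout, unit);
        }
    }

    // starts n samplers on one shared thread; returns the distinct thread names seen
    static Set<String> run(int n) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Set<String> names = ConcurrentHashMap.newKeySet();
        Sampler[] samplers = new Sampler[n];
        try {
            for (int i = 0; i < n; i++) {
                samplers[i] = new Sampler(names, 3);
                samplers[i].start(scheduler, 10, TimeUnit.MILLISECONDS);
            }
            for (Sampler s : samplers) {
                if (!s.awaitTicks(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("sampler did not tick in time");
                }
            }
            for (Sampler s : samplers) {
                s.stop();
            }
        } finally {
            scheduler.shutdownNow();
        }
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5).size()); // 1: every sampler ran on the same thread
    }
}
```

The trade-off is that a slow task delays every other task on the shared thread, so ticks should stay cheap (reading a counter and writing one array slot is).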

Lastly, there's an initial state problem: we don't yet have data for the entire window, for example a 5 minute window with only 15 seconds of data. This implementation returns null until it has a full 5 minutes of data. Another approach is to estimate the value by extrapolation: if we have a count of 10 in 30 seconds, we can estimate 40 per 2 minutes. However, extrapolating from incomplete data risks significant error. For instance, a burst of 20 hits in the first 2 seconds would extrapolate to 1,200 per 2 minutes, which in all likelihood is way off.
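The linear extrapolation described above amounts to scaling the count by window / elapsed time. A minimal sketch reproducing both examples; Extrapolate and estimate are hypothetical names, and the method assumes events arrive at a constant rate, which is exactly what a burst violates.

```java
public class Extrapolate {
    // naive linear extrapolation: scale the count seen so far up to the full window
    static double estimate(long count, double elapsedSeconds, double windowSeconds) {
        if (elapsedSeconds <= 0) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        return count * (windowSeconds / elapsedSeconds);
    }

    public static void main(String[] args) {
        System.out.println(estimate(10, 30, 120)); // 40.0: 10 hits in 30 s
        System.out.println(estimate(20, 2, 120));  // 1200.0: a 2 s burst, badly overestimated
    }
}
```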



Thursday, November 7, 2013

Synchronous Executor Service

Recently I wanted my code to be asynchronous, but only in certain situations. The solution I came up with is a synchronous ExecutorService:

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Runs submitted Callables on the calling thread; the other ExecutorService methods are unsupported.
public class SynchronousExecutor implements ExecutorService {

 @Override
 public void execute(Runnable command) {
  throw new UnsupportedOperationException();
 }

 @Override
 public void shutdown() {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<Runnable> shutdownNow() {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean isShutdown() {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean isTerminated() {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> Future<T> submit(Callable<T> task) {
  T t = null;
  Throwable exception = null;
  
  try {
   // run the task right here, on the caller's thread
   t = task.call();
  } catch (Throwable e) {
   // kept so SynchronousFuture.get() can rethrow it wrapped in an ExecutionException
   exception = e;
  }

  return new SynchronousFuture<T>(t, exception);
 }

 @Override
 public <T> Future<T> submit(Runnable task, T result) {
  throw new UnsupportedOperationException();
 }

 @Override
 public Future<?> submit(Runnable task) {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
  throw new UnsupportedOperationException();
 }

 @Override
 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException,
   ExecutionException, TimeoutException {
  throw new UnsupportedOperationException();
 }
}

This implements ExecutorService, but I've changed the behavior of submit so that it executes the Callable synchronously, on the calling thread. The submit method returns a SynchronousFuture, which contains either the return value of the Callable or the Throwable it threw. I return my SynchronousExecutor from getDefaultExecutorService; in situations where I want asynchronous execution, I override this method and return a standard ExecutorService.

 protected ExecutorService getDefaultExecutorService() { 
  return new SynchronousExecutor();
 }

The SynchronousExecutor is entirely stateless, so getDefaultExecutorService could return a single shared instance instead of creating a new object on every call. SynchronousFuture is pretty basic: some of the Future methods do not apply to synchronous execution, so I indicate that by throwing UnsupportedOperationException.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// An already-completed Future holding either the task's result or the Throwable it threw.
public class SynchronousFuture<T> implements Future<T> {

 private final T t;
 private final Throwable e;
 
 public SynchronousFuture(T t, Throwable e) {
  this.t = t;
  this.e = e;
 }
 
 @Override
 public boolean cancel(boolean mayInterruptIfRunning) {
  throw new UnsupportedOperationException();
 }

 @Override
 public boolean isCancelled() {
  return false;
 }

 @Override
 public boolean isDone() {
  // the task already ran inside submit()
  return true;
 }

 @Override
 public T get() throws InterruptedException, ExecutionException {
  if (e != null) {
   throw new ExecutionException(e);
  }
  
  return t;
 }

 @Override
 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  throw new UnsupportedOperationException();
 }
}

Now we can submit Callables just as we would to a typical ExecutorService, but they are executed synchronously on the calling thread. Calling get on the returned SynchronousFuture never blocks, since the task has already completed.
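For comparison, here is a minimal sketch of another way to get a synchronous executor (Java 8 or later). The JDK's AbstractExecutorService implements submit, invokeAll and invokeAny on top of execute, so running each Runnable directly inside execute makes all of them synchronous. Failures then surface from Future.get as an ExecutionException, just as with SynchronousFuture. DirectExecutorService is a hypothetical name; unlike the SynchronousExecutor above, it also implements the lifecycle methods. Guava's MoreExecutors.newDirectExecutorService() offers a ready-made version of the same idea.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class DirectExecutorService extends AbstractExecutorService {
    private final Object lock = new Object();
    private boolean shutdown; // guarded by lock
    private int running;      // tasks currently executing; guarded by lock

    @Override
    public void execute(Runnable command) {
        synchronized (lock) {
            if (shutdown) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            running++;
        }
        try {
            command.run(); // runs on the caller's thread
        } finally {
            synchronized (lock) {
                running--;
                lock.notifyAll(); // wake any awaitTermination callers
            }
        }
    }

    @Override
    public void shutdown() {
        synchronized (lock) {
            shutdown = true;
            lock.notifyAll();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown(); // nothing is ever queued, so there is nothing to hand back
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        synchronized (lock) { return shutdown; }
    }

    @Override
    public boolean isTerminated() {
        synchronized (lock) { return shutdown && running == 0; }
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (!(shutdown && running == 0)) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remaining);
            }
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = new DirectExecutorService();
        Callable<String> whoRan = () -> Thread.currentThread().getName();
        Future<String> name = executor.submit(whoRan);
        System.out.println(name.isDone()); // true: the task finished inside submit()
        System.out.println(name.get().equals(Thread.currentThread().getName())); // true
        Callable<Object> failing = () -> { throw new IllegalStateException("boom"); };
        try {
            executor.submit(failing).get();
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage()); // boom
        }
        executor.shutdown();
        System.out.println(executor.isTerminated()); // true
    }
}
```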