Saturday, November 23, 2013

A Simple Moving Average Implementation in Java

On several occasions I've wanted to compute simple metrics in my Java applications, for example the number of hits per hour or the number of errors over a time period. Computing simple metrics is not terribly difficult, but it's extra work and I'd rather spend that time on the problem domain. I was surprised not to find any widely accepted solution for metrics in Java. I did find Metrics, but it seemed a bit too complicated and not well documented, and all I really wanted was to compute a moving average. After thinking about it some more I decided it's not a difficult problem. Here's my solution:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

public class MovingAverage implements Runnable {

 private final static Logger log = Logger.getLogger(MovingAverage.class);
 
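 // volatile on an array reference would not make its elements volatile, so the wheel is simply final
 private final Long[] wheel;
 // index of the most recently written slot
 private volatile int tick;
 private final int ticks;
 private final GetCount getCount;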
 
 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
 // computes count per time period
 public MovingAverage(TimeUnit periodUnit, int period, TimeUnit resolutionUnit, int resolution, GetCount getCount) {
  this.getCount = getCount;
  long converted = resolutionUnit.convert(period, periodUnit);
  // how many ticks in wheel
  
  if (converted % resolution != 0) {
   throw new IllegalArgumentException(
     "The period must be divisible by the resolution. Specifically " + converted + " is not divisible by " + resolution + ", remainder is " + converted % resolution);
  }
  
  ticks = (int) (converted / resolution);
  wheel = new Long[ticks];
  
  log.debug("Creating a wheel with a resolution of " + ticks + " ticks, window of " + period + " " + periodUnit + ", that is updated every " + resolution + " " + resolutionUnit + "");
  
  // schedule call tick with fixed frequency
  scheduledExecutorService.scheduleAtFixedRate(this, resolution, resolution, resolutionUnit); 
 }
 
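 public void tick() {
  int next;
  if (tick == wheel.length - 1) {
   // wrap around to the start of the wheel
   next = 0;
  } else if (wheel[tick] != null) {
   // advance unless this is the very first sample (slot 0 is still empty)
   next = tick + 1;
  } else {
   next = tick;
  }
  
  // store the sample before publishing the new index: the volatile write to
  // tick makes the slot visible to any reader that observes the new index
  wheel[next] = getCount.getCount();
  tick = next;
  log.debug("tick is " + next + ", count is " + wheel[next] + ", average is " + this.getAverage());
 }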
 
 public boolean isFull() {
  return wheel[wheel.length -1] != null;
 }
 
 public Long getAverage() {
  if (!this.isFull()) {
   // wheel is not full yet. we could do some math but for now we return null until the wheel is full
   return null;
  }
  
  // tick can move while we compute average so save tick in local var
  int now = tick;
  
  if (now == wheel.length - 1) {
   // at end of wheel
   return wheel[now] - wheel[0];
  }
  
  return wheel[now] - wheel[now + 1];
 }
 
 interface GetCount {
  long getCount();
 }

 @Override
 public void run() {
  tick();
 }
}

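This works by creating an array with window / update frequency slots. A scheduled thread then writes the current count into the next slot at each update interval, wrapping around at the end of the array. The count for the window is simply array[i] - array[i+1], which is the most recent count minus the oldest count still in the array. Note that with window / update frequency slots the oldest count (i+1) is one update interval short of the full window: for a 10 minute window updated every second, it was recorded 9 minutes 59 seconds before the most recent one.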
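The slot arithmetic is easier to follow without threads or timers. Here is a minimal sketch, assuming a hypothetical WheelSketch class that is fed cumulative counts by hand. It is not the class above, just the same wrap-around idea in a single thread.

```java
/** Hypothetical single-threaded sketch of the wheel; not the MovingAverage class itself. */
final class WheelSketch {
    private final Long[] wheel;
    private int newest = -1; // index of the most recent sample, -1 before the first one

    WheelSketch(int slots) {
        wheel = new Long[slots];
    }

    /** Stores a cumulative count in the next slot, wrapping around at the end. */
    void record(long cumulativeCount) {
        newest = (newest + 1) % wheel.length;
        wheel[newest] = cumulativeCount;
    }

    /** Newest sample minus oldest sample, or null until every slot has been filled. */
    Long delta() {
        if (newest < 0) {
            return null;
        }
        Long oldest = wheel[(newest + 1) % wheel.length];
        return oldest == null ? null : wheel[newest] - oldest;
    }

    public static void main(String[] args) {
        WheelSketch sketch = new WheelSketch(3);
        sketch.record(10);
        sketch.record(25);
        System.out.println(sketch.delta()); // null, the wheel is not full yet
        sketch.record(45);
        System.out.println(sketch.delta()); // 35, i.e. 45 - 10
        sketch.record(50);
        System.out.println(sketch.delta()); // 25, i.e. 50 - 25
    }
}
```

With 3 slots the difference spans two update intervals, which is why a wheel of n slots covers n - 1 intervals.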

To add a moving average to our code, we first need a counter. I'll use an AtomicLong.

final AtomicLong counter = new AtomicLong();

This counter should be incremented for each event you want to measure (e.g. POST requests for a REST service).

We need to give the implementation access to the counter, which is done through the GetCount interface. Here I'll create a moving average with a 5 minute window that updates every second.

MovingAverage movingAverage = new MovingAverage(TimeUnit.MINUTES, 5, TimeUnit.SECONDS, 1, new MovingAverage.GetCount() {
 @Override
 public long getCount() {
  return counter.get();
 }
});

And to get the current average we simply call the getAverage method:

System.out.println(movingAverage.getAverage());
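One thing to watch: getAverage() returns a Long that stays null until the wheel is full, so assigning the result straight to a primitive long would throw a NullPointerException during warm-up. A minimal sketch of a null-safe read, assuming a hypothetical AverageReport helper that is not part of the class above:

```java
/** Hypothetical helper for reporting a value that may still be null. */
final class AverageReport {
    /** Formats a possibly-null average without unboxing it. */
    static String describe(Long average) {
        return average == null ? "warming up" : average + " events in the window";
    }

    public static void main(String[] args) {
        System.out.println(describe(null)); // warming up
        System.out.println(describe(42L));  // 42 events in the window
    }
}
```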

A key implementation detail is how the array size is determined: by dividing the window by the update frequency. So a large window with a frequent update can consume a significant amount of memory. In this example the array size is a reasonable 300. However, if we created a 24 hour moving average with a 1 second interval, the size would be 86400! A more reasonable update frequency for a 24 hour period might be every 5 minutes (array size of 288).
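To check a configuration before creating the average, the same arithmetic the constructor uses can be run on its own. A small sketch, assuming a hypothetical WheelSize helper:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that mirrors the constructor's array-size arithmetic. */
final class WheelSize {
    /** Number of slots needed for a given window and update interval. */
    static long slots(TimeUnit periodUnit, int period, TimeUnit resolutionUnit, int resolution) {
        long converted = resolutionUnit.convert(period, periodUnit);
        return converted / resolution;
    }

    public static void main(String[] args) {
        System.out.println(slots(TimeUnit.MINUTES, 5, TimeUnit.SECONDS, 1)); // 300
        System.out.println(slots(TimeUnit.HOURS, 24, TimeUnit.SECONDS, 1));  // 86400
        System.out.println(slots(TimeUnit.HOURS, 24, TimeUnit.MINUTES, 5));  // 288
    }
}
```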

Another consideration when choosing the window and update frequency is that the window must be divisible by the frequency. For example, a 2 minute window with a 6 second update frequency is ok, but a 7 second update frequency is not, since 120 is not divisible by 7. An IllegalArgumentException is thrown if the window modulo the update frequency is not zero.
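The check itself is a one-liner, sketched here with a hypothetical WindowCheck helper. The last line shows a related caveat: TimeUnit.convert truncates when converting to a coarser unit, so a resolution unit larger than the period unit can pass the check while silently shrinking the window.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that mirrors the constructor's divisibility check. */
final class WindowCheck {
    /** The window expressed in resolution units, as the constructor computes it. */
    static long converted(TimeUnit periodUnit, int period, TimeUnit resolutionUnit) {
        return resolutionUnit.convert(period, periodUnit);
    }

    /** True when the window is a whole number of update intervals. */
    static boolean isValid(TimeUnit periodUnit, int period, TimeUnit resolutionUnit, int resolution) {
        return converted(periodUnit, period, resolutionUnit) % resolution == 0;
    }

    public static void main(String[] args) {
        System.out.println(isValid(TimeUnit.MINUTES, 2, TimeUnit.SECONDS, 6)); // true, 120 % 6 == 0
        System.out.println(isValid(TimeUnit.MINUTES, 2, TimeUnit.SECONDS, 7)); // false, 120 % 7 == 1
        // 90 minutes converted to hours is truncated to 1
        System.out.println(converted(TimeUnit.MINUTES, 90, TimeUnit.HOURS));   // 1
    }
}
```

Keeping the resolution unit no coarser than the period unit avoids the truncation.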

This implementation requires one thread per moving average, which is not very efficient. A better solution would be to share a thread across many averages. Update: I've updated the code to share a thread here.
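For illustration only, here is one way the sharing could look. This is a sketch with a hypothetical SharedTicker class, not the updated code mentioned above, and it assumes Java 8 or later for the lambda. Each moving average would register its tick() with the shared scheduler instead of creating its own executor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one scheduler thread drives any number of tick tasks. */
final class SharedTicker {
    // a single daemon thread, so the scheduler does not keep the JVM alive
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "moving-average-ticker");
                t.setDaemon(true);
                return t;
            });

    /** Runs the task every resolution units; cancel the returned future to stop it. */
    ScheduledFuture<?> register(Runnable task, long resolution, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(task, resolution, resolution, unit);
    }

    /** Stops the shared thread and all registered tasks. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The trade-off is that all tasks run on the same thread, so a slow getCount() delays every other average, and a task that throws has its later executions suppressed by scheduleAtFixedRate.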

Lastly, there's an initial state problem: at startup we don't yet have data for the entire window, for example a 5 minute window with only 15 seconds of data. This implementation returns null until the wheel is full (roughly 5 minutes of data in that example). Another approach is to estimate the average. Suppose we have a count of 10 after 30 seconds; for a 2 minute window we can then estimate 40. However, extrapolating from incomplete data risks significant error. For instance, a burst of 20 hits in the first 2 seconds would give an estimate of 1200 per 2 minutes, which in all likelihood is way off.
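The estimate described here is a plain linear scaling. A minimal sketch, assuming a hypothetical WindowEstimate helper that the implementation above does not include:

```java
/** Hypothetical helper for the linear extrapolation described above. */
final class WindowEstimate {
    /** Scales a count observed over elapsedSeconds up to a window of windowSeconds. */
    static long extrapolate(long count, long elapsedSeconds, long windowSeconds) {
        if (elapsedSeconds <= 0) {
            throw new IllegalArgumentException("elapsedSeconds must be positive");
        }
        return count * windowSeconds / elapsedSeconds;
    }

    public static void main(String[] args) {
        System.out.println(extrapolate(10, 30, 120)); // 40
        System.out.println(extrapolate(20, 2, 120));  // 1200, the misleading burst case
    }
}
```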


