import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

public class MovingAverage implements Runnable {
    private final static Logger log = Logger.getLogger(MovingAverage.class);

    // Each slot holds a snapshot of the counter; null means "not sampled yet".
    // volatile would only cover the array reference, not its elements, so each
    // slot is written before the volatile tick index that publishes it.
    private final Long[] wheel;
    private volatile int tick;
    private final GetCount getCount;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    // computes count per time period
    public MovingAverage(TimeUnit periodUnit, int period, TimeUnit resolutionUnit, int resolution, GetCount getCount) {
        this.getCount = getCount;
        // window length expressed in resolution units
        long converted = resolutionUnit.convert(period, periodUnit);
        if (converted % resolution != 0) {
            throw new IllegalArgumentException(
                    "The period must be divisible by the resolution. Specifically " + converted
                    + " is not divisible by " + resolution + ", remainder is " + converted % resolution);
        }
        // how many ticks (update intervals) in the window
        int ticks = (int) (converted / resolution);
        // one extra slot so that newest minus oldest spans exactly `ticks` intervals
        wheel = new Long[ticks + 1];
        log.debug("Creating a wheel with a resolution of " + ticks + " ticks, window of " + period + " "
                + periodUnit + ", that is updated every " + resolution + " " + resolutionUnit);
        // schedule tick() with a fixed frequency
        scheduledExecutorService.scheduleAtFixedRate(this, resolution, resolution, resolutionUnit);
    }

    public void tick() {
        int next;
        if (tick == wheel.length - 1) {
            // flip back to the start of the wheel
            next = 0;
        } else if (wheel[tick] != null) {
            // advance, unless we are still in the zero state (slot 0 not yet written)
            next = tick + 1;
        } else {
            next = tick;
        }
        // write the slot first, then publish it through the volatile index
        wheel[next] = getCount.getCount();
        tick = next;
        if (log.isDebugEnabled()) {
            log.debug("tick is " + next + ", count is " + wheel[next] + ", average is " + getAverage());
        }
    }

    public boolean isFull() {
        return wheel[wheel.length - 1] != null;
    }

    public Long getAverage() {
        // tick can move while we compute the average, so save it in a local var;
        // reading the volatile index first also makes the slots written before it visible
        int now = tick;
        if (!isFull()) {
            // wheel is not full yet. We could do some math, but for now we return null until it is full
            return null;
        }
        // the next slot (wrapping around to 0 at the end) holds the oldest count
        int oldest = (now + 1) % wheel.length;
        return wheel[now] - wheel[oldest];
    }

    public interface GetCount {
        long getCount();
    }

    @Override
    public void run() {
        tick();
    }
}
This works by creating an array sized from the window divided by the update frequency. A background thread then stores the current count in the next slot of the array at every update, wrapping back to the start when it reaches the end. The count for the window is simply array[i] - array[i+1] (with i+1 wrapping around to 0), which is the most recent count minus the oldest count. For a 10 minute window, the oldest count (i+1) is exactly 10 minutes old.
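To make the wheel arithmetic concrete, here is a minimal single-threaded sketch of the same idea with no scheduler: each call to `record` stands in for one scheduled tick, and `windowCount` subtracts the oldest snapshot from the newest. The class `WheelSketch`, its methods, and the choice of one extra slot (so that newest minus oldest spans the whole window) are assumptions of this sketch, not the original code.

```java
// Hypothetical, single-threaded sketch of the counter wheel (no scheduler, no locking).
public class WheelSketch {
    private final long[] wheel; // snapshots of a cumulative counter
    private int newest = -1;    // index of the most recent snapshot
    private int samples = 0;    // snapshots recorded so far, capped at wheel.length

    // windowTicks = window / update interval; one extra slot so that
    // newest - oldest spans exactly windowTicks intervals.
    public WheelSketch(int windowTicks) {
        wheel = new long[windowTicks + 1];
    }

    // Called once per update interval with the current cumulative count.
    public void record(long cumulativeCount) {
        newest = (newest + 1) % wheel.length; // wrap around like the real wheel
        wheel[newest] = cumulativeCount;
        if (samples < wheel.length) {
            samples++;
        }
    }

    // Events counted during the last windowTicks intervals, or null until the wheel is full.
    public Long windowCount() {
        if (samples < wheel.length) {
            return null;
        }
        int oldest = (newest + 1) % wheel.length; // slot after newest is the oldest
        return wheel[newest] - wheel[oldest];
    }

    public static void main(String[] args) {
        WheelSketch w = new WheelSketch(3); // window of 3 update intervals
        long total = 0;
        long[] eventsPerInterval = {5, 1, 2, 7, 4};
        for (long events : eventsPerInterval) {
            total += events;
            w.record(total);
            // prints window=null three times, then window=10 (1+2+7) and window=13 (2+7+4)
            System.out.println("total=" + total + " window=" + w.windowCount());
        }
    }
}
```

With a window of 3 intervals, the fourth snapshot is the first one that has a snapshot three intervals older to subtract from, which is why this sketch allocates `windowTicks + 1` slots.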
To add a moving average to our code, we first need a counter; we'll use an AtomicLong.
final AtomicLong counter = new AtomicLong();
This counter should be incremented whenever one of the events you want to measure occurs (e.g. a POST request to a REST service).
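For example, request-handling code might bump the counter once per matching request. The class `RequestCounter` and its `onRequest` hook are hypothetical; the part that matters is the `incrementAndGet` call, which is safe to make from many request threads at once.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical request hook that counts POST requests only.
public class RequestCounter {
    private final AtomicLong counter = new AtomicLong();

    // Assumed to be called by request-handling code for every request.
    public void onRequest(String httpMethod) {
        if ("POST".equals(httpMethod)) {
            counter.incrementAndGet(); // atomic, so concurrent handlers never lose an update
        }
    }

    // Cumulative number of POSTs seen so far; this is the value the moving average samples.
    public long count() {
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        RequestCounter rc = new RequestCounter();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    rc.onRequest(j % 2 == 0 ? "POST" : "GET");
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println(rc.count()); // 4 threads x 500 POSTs = 2000
    }
}
```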
The implementation needs access to the counter, which we provide through the GetCount interface. Here I'll create a moving average with a 5 minute window that updates every second.
MovingAverage movingAverage = new MovingAverage(TimeUnit.MINUTES, 5, TimeUnit.SECONDS, 1,
        new MovingAverage.GetCount() {
            @Override
            public long getCount() {
                return counter.get();
            }
        });
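To get the current value, we simply call the getAverage method, which returns the number of events counted over the window (here, per 5 minutes), or null until the wheel has filled: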
System.out.println(movingAverage.getAverage());
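Because `getAverage` returns a count over the whole window, or `null` before the wheel has filled, callers usually need to handle the null case and may also want a per-second rate. The helper below is a hypothetical sketch: `WindowRate.perSecond` divides the window count by the window length in seconds, which assumes you want the events spread evenly over the window.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for interpreting a window count such as getAverage()'s result.
public class WindowRate {
    // Events per second over the window, or null when there is no full window yet.
    public static Double perSecond(Long windowCount, long window, TimeUnit windowUnit) {
        if (windowCount == null) {
            return null; // mirrors getAverage() before the wheel is full
        }
        long seconds = windowUnit.toSeconds(window);
        if (seconds <= 0) {
            throw new IllegalArgumentException("window must be at least one second");
        }
        return windowCount / (double) seconds;
    }

    public static void main(String[] args) {
        System.out.println(perSecond(null, 5, TimeUnit.MINUTES)); // null
        System.out.println(perSecond(600L, 5, TimeUnit.MINUTES)); // 2.0
    }
}
```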
A key implementation detail is how the array size is determined: by dividing the window by the update interval. So a long window with a short update interval can consume a significant amount of memory. In this example the array has a reasonable size of about 300. However, a 24 hour moving average with a 1 second interval would need about 86,400 slots! A more reasonable update interval for a 24 hour window might be every 5 minutes (about 288 slots).
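The sizing arithmetic can be checked with the same `TimeUnit.convert` call the constructor uses. In this sketch the class `WheelSizing` and its `intervals` method are hypothetical; it prints the number of update intervals per window for the three configurations mentioned above.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper reproducing the constructor's sizing arithmetic.
public class WheelSizing {
    // Number of update intervals in the window: window converted to update units, divided by update.
    public static long intervals(TimeUnit windowUnit, long window, TimeUnit updateUnit, long update) {
        long windowInUpdateUnits = updateUnit.convert(window, windowUnit);
        return windowInUpdateUnits / update;
    }

    public static void main(String[] args) {
        System.out.println(intervals(TimeUnit.MINUTES, 5, TimeUnit.SECONDS, 1)); // 300
        System.out.println(intervals(TimeUnit.HOURS, 24, TimeUnit.SECONDS, 1));  // 86400
        System.out.println(intervals(TimeUnit.HOURS, 24, TimeUnit.MINUTES, 5));  // 288
    }
}
```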
Another consideration when choosing the window and update interval is that the window must be divisible by the interval. For example, a 2 minute window with a 6 second update interval is fine, but a 7 second interval is not, since 120 is not divisible by 7. An IllegalArgumentException is thrown if the window modulo the update interval is not zero.
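The divisibility rule is a plain remainder check on the window expressed in update units. Here is a sketch of the same check as a standalone helper (`WindowCheck.remainder` is a hypothetical name), using the 2 minute example from the text plus one edge case.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring the constructor's divisibility check.
public class WindowCheck {
    // Remainder left over when the window is split into update intervals;
    // a non-zero result is what triggers the IllegalArgumentException.
    public static long remainder(TimeUnit windowUnit, long window, TimeUnit updateUnit, long update) {
        return updateUnit.convert(window, windowUnit) % update;
    }

    public static void main(String[] args) {
        System.out.println(remainder(TimeUnit.MINUTES, 2, TimeUnit.SECONDS, 6)); // 0: accepted
        System.out.println(remainder(TimeUnit.MINUTES, 2, TimeUnit.SECONDS, 7)); // 1: rejected (120 = 7 * 17 + 1)
        System.out.println(remainder(TimeUnit.SECONDS, 90, TimeUnit.MINUTES, 1)); // 0: 90 s truncates to 1 min
    }
}
```

The last call shows a case the check does not catch: `TimeUnit.convert` truncates when converting to a coarser unit, so a 90 second window with a 1 minute update interval is treated as a 1 minute window and passes. Keeping the update unit at least as fine as the window unit avoids this.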
This implementation requires one thread per moving average, which is not very efficient. A better solution would be to share a thread across many averages. Update: I've updated the code to share a thread here.
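One way to share a thread, sketched here under assumptions (this is not necessarily how the updated code does it), is to create a single `ScheduledExecutorService` and hand it to each average instead of having each one call `Executors.newScheduledThreadPool(1)` itself. The nested `SharedTicker` class is a hypothetical stand-in for a moving average; its `run` only records which thread ran it, where the real class would call `tick()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SharedSchedulerDemo {
    // Hypothetical stand-in for a moving average that is handed a shared scheduler.
    static final class SharedTicker implements Runnable {
        final CountDownLatch ticked = new CountDownLatch(3); // released after three ticks
        volatile String lastThread;

        SharedTicker(ScheduledExecutorService shared, long period, TimeUnit unit) {
            // same scheduling call as the original constructor, but on the shared executor
            shared.scheduleAtFixedRate(this, period, period, unit);
        }

        @Override
        public void run() {
            lastThread = Thread.currentThread().getName(); // the real class would call tick()
            ticked.countDown();
        }
    }

    // Runs two tickers with different intervals on one scheduler thread. Returns true
    // if both ticked at least three times and both ran on the same thread.
    public static boolean runDemo() {
        ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor();
        try {
            SharedTicker fast = new SharedTicker(shared, 10, TimeUnit.MILLISECONDS);
            SharedTicker slow = new SharedTicker(shared, 25, TimeUnit.MILLISECONDS);
            boolean bothTicked = fast.ticked.await(5, TimeUnit.SECONDS)
                    && slow.ticked.await(5, TimeUnit.SECONDS);
            return bothTicked && fast.lastThread.equals(slow.lastThread);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            shared.shutdownNow(); // one shutdown stops every average sharing the thread
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Each average still keeps its own update interval, because every task gets its own `scheduleAtFixedRate` call. The trade-off is that a slow tick delays the others, since they all run on the one thread.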
Lastly, there's an initial state problem: we don't have data for the entire window yet. For example, with a 5 minute window we may only have 15 seconds of data so far. This implementation returns null until it has a full 5 minutes of data. Another approach is to estimate the average: if we have a count of 10 in 30 seconds, we can estimate 40 in 2 minutes. However, extrapolating from incomplete data risks significant error. For instance, a burst of 20 hits in 2 seconds would give an estimate of 1200 per 2 minutes, which in all likelihood is way off.
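The extrapolation described above is a simple proportion: scale the partial count by the window length over the elapsed time. A hypothetical helper, `Extrapolate.estimate`, reproduces both numbers from the text.

```java
// Hypothetical helper for estimating a full-window count from partial data.
public class Extrapolate {
    // Estimated count for a full window, assuming events keep arriving at the rate
    // observed so far. With little data, a single burst dominates the estimate.
    public static double estimate(long countSoFar, double elapsedSeconds, double windowSeconds) {
        if (elapsedSeconds <= 0) {
            throw new IllegalArgumentException("elapsedSeconds must be positive");
        }
        return countSoFar * (windowSeconds / elapsedSeconds);
    }

    public static void main(String[] args) {
        System.out.println(estimate(10, 30, 120)); // 40.0
        System.out.println(estimate(20, 2, 120));  // 1200.0
    }
}
```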