Tag Archives: starveless

Starveless Priority Queue in Python

The python priority queue describes messages in absolute terms.  The most important element will always be popped.  The least important element may never be popped.

Sometimes when a message based data path is serialized to something (like a serial port based piece of hardware), all messages have some importance.  Unlike the priority queue, we would rather not use such absolute terms.  We would rather say that most of the time, the important messages are sent.  However, we guarantee that some of the time, the less important messages are sent.

The language is more gray and the queue needs to be a bit more…  “cooperative”.  (We could say “round robin”.)

Let’s implement our own.  But first, a quick back story.  I have some knowledge of Motorola’s (at the time) Time Processor Unit (TPU) on some of their embedded processors.  The TPU had an event driven model where certain sections of microcode would execute based on priority but lower priority microcode would still operated periodically.  Let’s base our design on this philosophy.  (See section 3 for more information on the TPU scheduler.)

Python has a pretty good queue framework that you derive your queue from and implement several methods.  If you ever look at the priority queue code in python, it is pretty simplistic as it relies on the heapq.  There is much work done in the Queue class including thread synchronization.

Here is our proposed round robin priority queue.

#! /usr/bin/python

import Queue

class RRPriorityQueue(Queue.Queue):
  '''Variant of Queue that retrieves open entries in priority order (lowest first)
  but allows highest priorities to be skipped.

  '''

  def _init(self, priorityOrder):
    if priorityOrder is None:
      raise TypeError

    self._queue = []

    self._priorityOrder = priorityOrder
    self._orderIdx = len(self._priorityOrder) + 1

  def _qsize(self, len=len):
    return len(self._queue)

  def _put(self, item):
    self._queue.append(item)

  def _get(self):
    if (len(self._queue) == 0):
      raise IndexError

    retVal = None

    # change the order index for the priority order list
    self._orderIdx = self._orderIdx + 1
    if self._orderIdx >= len(self._priorityOrder):
      self._orderIdx = 0

    # the priority we are interested in and the highest value
    # in case we don't find one at our priority
    filterPriority = self._priorityOrder[self._orderIdx]
    highest = self._queue[0]

    # look for one of our priority but saving highest priority (lowest number)
    for item in self._queue:
      priority = item.priority
      if priority == filterPriority:
        # we found our item
        retVal = item
        break
      if highest.priority > item.priority:
        highest = item

    # if we didn't find our item, the return the highest
    if retVal is None:
      retVal = highest

    self._queue.remove(retVal)
    return retVal

The _qsize and _put methods are basic.  The _init method requires a priority order.  This specifies the priority guaranteed to pop from the queue for the corresponding _get call.  The _orderIdx is used to know which step of the priority order the algorithm is on.  It increments or resets on every _get call.

The _get call is where the magic happens.  The first step is to increment or reset the _orderIdx value.  Then we are going to get our filterPriority from the _priorityOrder list based on the _orderIdx.  Then we iterate through the queue.  If we find an entity that matches our priority, return it.

However, what happens if we don’t find a message with our priority?  That is ok, we simply return the highest priority item first found in the queue.  The highest variable is a reference to the first item in the queue with the highest priority and is returned if we don’t find a match.

The only thing we require from the entities in the queue is that they implement a priority member (or method) that returns the priority number.  This deviates from the priority queue in that the queue wasn’t concerned about the priority number, just that an entity in the queue could compare to others.  In our case, we need to know the priority to filter the entities the way we want to.

Like the priority queue, the lower the priority number, the more important it is.

We can create our round robin queue like this:

self.q = RRPriorityQueue([1,2,1,3,1,2,1])

In this case, priority 1 messages are guaranteed to pop 4 out of 7 times.  Priority 2 message are guaranteed 2 out of 7.  Priority 3 – 1 out of 7.

You can make whatever pattern with whatever priorities you want.  Just understand that if a priority of an entity is not in the priority order, it can be starved.  It will be popped if it is the most important message when a message of the current filter order isn’t found.