Problem Statement

To write a function that takes any iterable (like list, string, tuple, generator), and an integer n that returns the last n elements from the given iterable as list, if given integer n is <= 0, return empty list, meanwhile, constrain the memoey usage, the function should be able to accept super long iterable (such as loop over a 20GB log file) and return the last n elements without over run the memory

For example

> >> tail([1, 2, 3, 4, 5], 3)
[3, 4, 5]
>>> tail('hello', 2)
['l', 'o']
>>> tail('hello', 0)
[]
>>> tail('hello', -1)
[]
>>> nums = (n**2 for n in range(1, 10000000))
>>> tail(nums, 3)
[99999940000009, 99999960000004, 99999980000001]

Solution

Well, the first half of this problem can peel down to list comprehension and slicing problem, we know that the purpose of list comprehension is to create a list from another list, or create an iterable from another iterable, such as:pythonnumbers = [1, 2, 3, 4, 5] square = [num**2 for num in numbers]

It's a very elegent way to create another iterable with readability count, we can simply solve the first half of problem by this way:

import unittest

class TailTest(unittest.TestCase):
    """Test for first half of problem"""

    def test_zero(self):
        self.assertEqual(tail([1, 2], 0), [])
    
    def test_one(self):
        self.assertEqual(tail([1, 2], 1), [2])
    
    def test_two(self):
        self.assertEqual(tail([1, 2], 2), [1, 2])
    
    def test_string(self):
        self.assertEqual(tail('hello', 2), ['l', 'o'])
    
    def test_tuple(self):
        self.assertEqual(tail((1, 2, 3), 3), [1, 2, 3])
    
    def test_larger_than_length(self):
        numbers = [1, 2, 3, 4]
        self.assertEqual(tail(numbers, 5), [1, 2, 3, 4])
        self.assertEqual(tail([], 10), [])
    
    def test_negative(self):
        numbers = [1, 2, 3, 4]
        self.assertEqual(tail(numbers, -1), [])
        self.assertEqual(tail(numbers, -10), [])

Let's try the first approach, our objective is to write a function that can pass all the test case.

def tail(iterable, index):
    if index > 0:
        return [i for i in iterable][-index:]
    else:
        return []
        
unittest.main(argv=['first-arg-is-ignored'], verbosity=2, exit=False)
test_larger_than_length (__main__.TailTest) ...ok
test_negative (__main__.TailTest) ...ok
test_one (__main__.TailTest) ...ok
test_string (__main__.TailTest) ...ok
test_tuple (__main__.TailTest) ...ok
test_two (__main__.TailTest) ...ok
test_zero (__main__.TailTest) ...ok

----------------------------------------------------------------------
Ran 7 tests in 0.006s

OK
<unittest.main.TestProgram at 0x7f84125d9f50>

Now the tricky part is how to handle to generator object, we know that generator object in Python sometime we call it "lazy" itertor, as when we define the generator Python will "record" the "recipe" of what we want to do, but actually it doing nothing, only when we call next() method, it will generate the next value from our "recipe", the upsize of this method is good memory utilization, as it will not generate anything before hand, and generator have another property as "one time usage", let's see what it mean:

nums = (n**2 for n in [1, 2, 3, 4])
print(nums)
print(type(nums))
<generator object <genexpr> at 0x7f84122803d0>
<class 'generator'>
next(nums)
1
print(next(nums))
print(next(nums))
print(next(nums))
print(next(nums))
4
9
16
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-7-973643c90d98> in <module>
      2 print(next(nums))
      3 print(next(nums))
----> 4 print(next(nums))

StopIteration: 

Each time we call next() to the generator the generator will excute the "recepie" for the next value, untill it catch the StopIteration excaption, now the generator become empty, if we need to reuse it again, need to redefine it:

nums = (n**2 for n in [1, 2, 3, 4])

But please take note that every generator is an iterator, that mean you can loop over this iterator, or convert it into list:

numbers = [_ for _ in range(100)]
squares = (n**2 for n in numbers)
list(squares)
[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361,
 400,
 441,
 484,
 529,
 576,
 625,
 676,
 729,
 784,
 841,
 900,
 961,
 1024,
 1089,
 1156,
 1225,
 1296,
 1369,
 1444,
 1521,
 1600,
 1681,
 1764,
 1849,
 1936,
 2025,
 2116,
 2209,
 2304,
 2401,
 2500,
 2601,
 2704,
 2809,
 2916,
 3025,
 3136,
 3249,
 3364,
 3481,
 3600,
 3721,
 3844,
 3969,
 4096,
 4225,
 4356,
 4489,
 4624,
 4761,
 4900,
 5041,
 5184,
 5329,
 5476,
 5625,
 5776,
 5929,
 6084,
 6241,
 6400,
 6561,
 6724,
 6889,
 7056,
 7225,
 7396,
 7569,
 7744,
 7921,
 8100,
 8281,
 8464,
 8649,
 8836,
 9025,
 9216,
 9409,
 9604,
 9801]

So it means our original solution did solve the generator problem:

def tail(iterable, index):
    if index > 0:
        return [i for i in iterable][-index:]
    else:
        return []

the return [i for i in iterable] will convert the generator into a list, we can write a test case to prove it.

class TailTestGenerator(unittest.TestCase):
    """Test generator"""

    def test_iterator(self):
        nums = (n**2 for n in [1, 2, 3, 4])
        self.assertEqual(tail(nums, -1), [])
        self.assertEqual(tail(nums, 0), [])
        # consume the generator
        self.assertEqual(tail(nums, 2), [9, 16])
        # generator is one time use, now should be empty
        self.assertEqual(list(nums), [])
        # empty generator
        self.assertEqual(tail(nums, 1), [])

unittest.main(argv=['first-arg-is-ignored'], verbosity=2, exit=False)
test_larger_than_length (__main__.TailTest) ...ok
test_negative (__main__.TailTest) ...ok
test_one (__main__.TailTest) ...ok
test_string (__main__.TailTest) ...ok
test_tuple (__main__.TailTest) ...ok
test_two (__main__.TailTest) ...ok
test_zero (__main__.TailTest) ...ok
test_iterator (__main__.TailTestGenerator) ...ok

----------------------------------------------------------------------
Ran 8 tests in 0.007s

OK
<unittest.main.TestProgram at 0x7f841257ae50>

Hoo-Ray, seems we finished our work today, but wait, remember the last requirement of the Problem Statement ?

the function should be able to accept super long iterable (such as loop over a 20GB log file) and return the last n elements without over run the memory.

If we try to feed the function with a 20GB generator, what will happened?

Recall our function:pythondef tail(iterable, index): if index > 0: return [i for i in iterable][-index:] else: return []

if the index is > 0, it will return [i for i in iterable][-index:], this is an actual list comprehension, it will convert the iterable into full list, means if we pass a 20GB generator, the function will generate a 20GB list, which is over run the memory, how can we furthre improve the function, to reduce the memory usage?

Here introduce deque() Double Ended Queue

deque() -- Double-Ended Queue

A double-ended queue, or deque, supports adding and removing elements from either end of the queue. The more commonly used stacks and queues are degenerate forms of deques, where the inputs and outputs are restricted to a single end.

it's properties:

  • it accept any iterable
  • Populating both end
  • Consuming
  • Rotating
  • Constraining the Queue Size

Some exmaple:


from collections import deque

d = deque('abcdefg')
print('Deque:', d)
print('Length:', len(d))
print('Left end:', d[0])
print('Right end:', d[-1])

d.remove('c')
print('remove(c):', d)
Deque: deque(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
Length: 7
Left end: a
Right end: g
remove(c): deque(['a', 'b', 'd', 'e', 'f', 'g'])

Since deques are a type of sequence container, they support some of the same operations as list, such as examining the contents with __getitem__(), determining length, and removing elements from the middle of the queue by matching identity.

Populating

A deque can be populated from either end, termed “left” and “right” in the Python implementation.

Example:


d = deque()

# populating value
d.extend('abcdefg')
print('extend       :', d)

# add to the right
d.append('h')
print('append       :', d)


# add to the left
d1 = deque()
d1.extendleft(range(6))
print('extend left        :', d1)
d1.appendleft(6)
print('append left        :', d1)
extend       : deque(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
append       : deque(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])
extend left        : deque([5, 4, 3, 2, 1, 0])
append left        : deque([6, 5, 4, 3, 2, 1, 0])

Consuming

Similarly, the elements of the deque() can be consumed from both ends or either end, depending on the algorithm being applied.

Example:


print('From the right:')
d = deque('abcdefg')
while True:
    try:
        print(d.pop(), end='')
    except IndexError:
        break
print()

print('\nFrom the left:')
d = deque(range(6))
while True:
    try:
        print(d.popleft(), end='')
    except IndexError:
        break
print()
From the right:
gfedcba

From the left:
012345

Since deques are thread-safe, the contents can even be consumed from both ends at the same time from separate threads.

import threading
import time

candle = deque(range(5))


def burn(direction, nextSource):
    while True:
        try:
            next = nextSource()
        except IndexError:
            break
        else:
            print('{:>8}: {}'.format(direction, next))
            time.sleep(0.1)
    print('{:>8} done'.format(direction))
    return


left = threading.Thread(target=burn,
                        args=('Left', candle.popleft))
right = threading.Thread(target=burn,
                         args=('Right', candle.pop))

left.start()
right.start()

left.join()
right.join()
Left: 0
   Right: 4
   Right: 3    Left: 1

   Right: 2
    Left done
   Right done

Rotating

Another useful aspect of the deque is the ability to rotate it in either direction, so as to skip over some items.

Example:


d = deque(range(10))
print('Normal        :', d)

d = deque(range(10))
d.rotate(2)
print('Right rotation:', d)

d = deque(range(10))
d.rotate(-2)
print('Left rotation :', d)
Normal        : deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
Right rotation: deque([8, 9, 0, 1, 2, 3, 4, 5, 6, 7])
Left rotation : deque([2, 3, 4, 5, 6, 7, 8, 9, 0, 1])

Constraining the Queue Size

A deque() instance can be configured with a maximum length so that it never grows beyond that size. When the queue reaches the specified length, existing items are discarded as new items are added. This behavior is useful for finding the last n items in a stream of undetermined length.

Example:


import random

# Set the random seed so we see the same output each time
# the script is run.
random.seed(1)

d1 = deque(maxlen=3)
d2 = deque(maxlen=3)

for i in range(5):
    n = random.randint(0, 100)
    print('n =', n)
    d1.append(n)
    d2.appendleft(n)
    print('D1:', d1)
    print('D2:', d2)
n = 17
D1: deque([17], maxlen=3)
D2: deque([17], maxlen=3)
n = 72
D1: deque([17, 72], maxlen=3)
D2: deque([72, 17], maxlen=3)
n = 97
D1: deque([17, 72, 97], maxlen=3)
D2: deque([97, 72, 17], maxlen=3)
n = 8
D1: deque([72, 97, 8], maxlen=3)
D2: deque([8, 97, 72], maxlen=3)
n = 32
D1: deque([97, 8, 32], maxlen=3)
D2: deque([32, 8, 97], maxlen=3)

Now we know about deque(), it do have two properties we can use:

  • it accept any iterable

  • it constrain the queue size

We can use deque() right away:

del tail

from collections import deque

def tail(iterable, index):
    if index > 0:
        return list(deque(iterable, maxlen=index))
    else:
        return []
    

Let's try out the test case

unittest.main(argv=['first-arg-is-ignored'], verbosity=2, exit=False)
test_larger_than_length (__main__.TailTest) ...ok
test_negative (__main__.TailTest) ...ok
test_one (__main__.TailTest) ...ok
test_string (__main__.TailTest) ...ok
test_tuple (__main__.TailTest) ...ok
test_two (__main__.TailTest) ...ok
test_zero (__main__.TailTest) ...ok
test_iterator (__main__.TailTestGenerator) ...ok

----------------------------------------------------------------------
Ran 8 tests in 0.005s

OK
<unittest.main.TestProgram at 0x7f8411ff94d0>

So we're maxing a deque with our iterable and setting the maximum length (we could have used a positional argument but we chose to use a named one instead) and then converting that deque to a list so that our tests (which expect a list) pass.

By utilizing the maxlen in deque() we can constrain the return list size to the index we want, if the generator generate more than the maxlen, deque() object will discard the values that more than the maxlen