Part 3: Distributed map/reduce

Scoop supplies a futures module that provides parallel mapping, futures and asynchronous functions. To use this module you must import it, e.g. via

from scoop import futures

scoop.futures.map

Create a new script called mapreduce.py and type into it

from scoop import futures
from functools import reduce  # in Python 3, reduce lives in functools

def product(x, y):
    """Return the product of the arguments"""
    return x*y

def sum(x, y):
    """Return the sum of the arguments"""
    return x+y

if __name__ == "__main__":

    a = range(1,101)
    b = range(101, 201)

    results = futures.map(product, a, b)

    total = reduce(sum, results)

    print("Sum of the products equals %d" % total)

Run this script using the command

python -m scoop mapreduce.py

You need to use -m scoop so that Scoop can set up the distributed cluster before running your script. When you run your script you should see something similar to

[2015-11-29 11:03:38,600] launcher  INFO    SCOOP 0.7 1.1 on darwin using Python 2.7.10 (default, Aug 22 2015, 20:33:39) [GCC 4.2.1 Compatible Apple LLVM 7.0.0 (clang-700.0.59.1)], API: 1013
[2015-11-29 11:03:38,600] launcher  INFO    Deploying 4 worker(s) over 1 host(s).
[2015-11-29 11:03:38,601] launcher  INFO    Worker distribution: 
[2015-11-29 11:03:38,601] launcher  INFO       127.0.0.1:   3 + origin
Sum of the products equals 843350
[2015-11-29 11:03:39,375] launcher  (127.0.0.1:50551) INFO    Root process is done.
[2015-11-29 11:03:39,375] launcher  (127.0.0.1:50551) INFO    Finished cleaning spawned subprocesses.

(the exact output will depend on your computer and your version of Scoop)

By default Scoop will run on your local computer, starting one process for every available processor core. In my case, I have four workers.
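If you want a different number of workers, Scoop's launcher should let you set the count explicitly with its -n option, e.g.

python -m scoop -n 2 mapreduce.py

which would run the script using two workers.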

Scoop provides a very similar interface to multiprocessing, and comes with the same caveats, requirements and restrictions. For example, as in the script above, you must protect the main part of your script inside an if __name__ == "__main__" block.

In the above script we used the Scoop futures.map function. This is Scoop's mapping function, which behaves like the standard Python map, except that the mapping is performed in parallel across the distributed cluster. Note also that, like the standard map, Scoop's map supports passing multiple iterables, whose items are supplied as multiple arguments to the mapping function.
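A quick way to convince yourself of this (a sketch, reusing product, a and b from the script above) is to compare the parallel result against the serial built-in map inside the __main__ block:

    # serial and parallel maps should give identical results
    serial = list(map(product, a, b))
    parallel = list(futures.map(product, a, b))
    print(serial == parallel)   # should print True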

scoop.futures.mapReduce

Performance of distributed parallel scripts is strongly related to the speed of the network and the amount of communication between nodes. In the above example, we used scoop.futures.map to map the product function. All of the results were then transmitted back to the master process, which performed the reduction (the sum) on its own. This is inefficient, as it means that a lot of data has to be transmitted back to the master. A better approach is to allow all of the workers in the cluster to perform the reduction as a group, thereby minimising communication.

This can be achieved by using the scoop.futures.mapReduce function, which combines the map and the reduction into a single function call. The call looks like this:

result = scoop.futures.mapReduce( mapping_func, reduction_func, args... )

where mapping_func is the function used for the mapping, reduction_func is the function used for the reduction, args... is the set of iterables whose items are passed to mapping_func, and result is the final reduced value.
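Conceptually (this is a sketch of the semantics, not of how Scoop actually schedules the work), for two iterables a and b the call computes the same value as a serial map followed by a serial reduction:

from functools import reduce

# futures.mapReduce(mapping_func, reduction_func, a, b) returns the
# same value as this serial equivalent, except that the partial
# reductions are performed on the workers rather than on the master
result = reduce(reduction_func, map(mapping_func, a, b))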

Edit your mapreduce.py script so that it uses the scoop.futures.mapReduce function e.g.

from scoop import futures

def product(x, y):
    """Return the product of the arguments"""
    return x*y

def sum(x, y):
    """Return the sum of the arguments"""
    return x+y

if __name__ == "__main__":

    a = range(1,101)
    b = range(101, 201)

    total = futures.mapReduce(product, sum, a, b)

    print("Sum of the products equals %d" % total)

Run the script using python -m scoop mapreduce.py, and you should see the same result as before.


Exercise

Edit the script you wrote in answer to exercise 2 of Parallel Map/Reduce, in which you counted all of the words used in all of the Shakespeare plays (e.g. an example answer is here).

Edit the script so that it uses scoop.futures.mapReduce to perform the work. Note that scoop.futures.mapReduce is not asynchronous, so you cannot (yet) add a status message to your script while it is processing.
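If you want a starting point, the overall shape of the script will look something like the sketch below. Note that count_words, merge_counts and the shakespeare directory are hypothetical placeholders, not the names used in the example answer:

from scoop import futures
import glob

def count_words(filename):
    """Hypothetical mapping function: return a dictionary mapping
       each word in the file to the number of times it appears"""
    counts = {}
    with open(filename) as f:
        for word in f.read().split():
            counts[word] = counts.get(word, 0) + 1
    return counts

def merge_counts(x, y):
    """Hypothetical reduction function: merge two word-count
       dictionaries into one"""
    for word, count in y.items():
        x[word] = x.get(word, 0) + count
    return x

if __name__ == "__main__":
    # assumes the plays are text files in a directory called "shakespeare"
    filenames = glob.glob("shakespeare/*")
    counts = futures.mapReduce(count_words, merge_counts, filenames)
    print("Counted %d unique words" % len(counts))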

If you get stuck or want inspiration, a possible answer is given here.
