Reduction

Reduction, which is the process of combining (or reducing) the results of several sub-calculations into a single combined (reduced) result, is very common, and is the second half of the very powerful map-reduce form of parallel programming. In the exercise in the last section you used reduction to form the global sum of the total number of points inside and outside the circle, calculated as the sum of the number of points inside and outside the circle calculated by each process's iterations of the loop. While it may appear easy to write your own reduction code, using MPI_Send and MPI_Recv, it is actually very hard to write efficient reduction code. This is because reduction requires communication between all processes, where perhaps only one process at a time is allowed to update the sum on the "master" process. Reduction can actually be implemented much more efficiently, e.g. perhaps by dividing processes into pairs, and getting each pair to sum their results, and then dividing into pairs of pairs, and summing the pairs of pairs results, etc. (this method is known as binary tree reduction - see here for a more in-depth discussion of reduction algorithms).

So reduction is actually quite complicated to implement if you want it to be efficient and work well. Fortunately, you don't have to implement it, as MPI provides a MPI_Reduce function which has a complete implementation for you! You can use MPI_Reduce by providing the input message, which is sent by each process and contains the set of data to be reduced, the output message, which will be sent to a specified process, and will contain the reduced output, and the operation, which refers to the numerical operation that must be applied to reduce the variables from each process together. The operators include;

A full list can be found here. Note that it is also possible for you to define your own functions that can be used to reduce variables together, via MPI_Op_create, although describing how that works is beyond this introductory course.

To make this clear, the following links provide the code for the fixed loop counting examples from the last page which use MPI_Reduce rather than MPI_Send / MPI_Recv pairs for summing up the number of iterations performed;

Note that MPI_Reduce will send the reduced result back to only the designated process (which should normally be your "master" process). If you want all of the processes to receive the reduced result, then you should call MPI_Allreduce. MPI_Allreduce is equivalent to MPI_Reduce, except that it sends the reduced result to every process in the MPI team.


Exercise
Edit your program to estimate pi so that it uses reduction to form the sum of the number of points inside and outside the circle.

Here's the answers for checking (no peeking before you've finished!)


Next - Map/Reduce


Compare with OpenMP

C

Copy this code into reduced_loops.c.

#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int i, rank, nprocs, count, start, stop, nloops, total_nloops;

    MPI_Init(&argc, &argv);

    // get the number of processes, and the id of this process
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    // we want to perform 1000 iterations in total. Work out the 
    // number of iterations to perform per process...
    count = 1000 / nprocs;

    // we use the rank of this process to work out which
    // iterations to perform.
    start = rank * count;
    stop = start + count;

    // now perform the loop
    nloops = 0;

    for (i=start; i<stop; ++i)
    {
        ++nloops;
    }

    printf("Process %d performed %d iterations of the loop.\n",
           rank, nloops);

    total_nloops = 0;
    MPI_Reduce( &nloops, &total_nloops, 1, MPI_INT, MPI_SUM,
                0, MPI_COMM_WORLD );

    if (rank == 0)
    {
        // ok - are there any more loops to run?
        nloops = 0;

        for (i=total_nloops; i<1000; ++i)
        {
            ++nloops;
        }

        printf("Process 0 performed the remaining %d iterations of the loop\n",
               nloops);
    }

    MPI_Finalize();

    return 0;
}

In this code, MPI_Send and MPI_Recv have been replaced by a call to MPI_Reduce(void *send, void *receive, int size, MPI_INT, MPI_SUM, int process, MPI_COMM_WORLD). This function reduces the data in the message pointed to by "send" using the MPI operation MPI_SUM and places the resulting message into "receive". The messages are of size "size", and you specify the data type as you do for MPI_Recv and MPI_Send, e.g. in this case we use MPI_INT. The resulting message is placed only into "receive" on the process whose rank is given in "process". Note that all processes in the MPI process team must call this function with the same arguments, or else the result is undefined. Note also that other reduction operations are possible, e.g. by replacing MPI_SUM with MPI_PROD.

Compile the code using;

mpicc reduced_loops.c -o reduced_loops

This will give you an executable called reduced_loops.

Return to the previous page.

C++

Copy this code into reduced_loops.cpp.

#include <iostream>
#include <mpi.h>

int main(int argc, char **argv)
{
    MPI::Init(argc, argv);

    // get the number of processes, and the id of this process
    int rank = MPI::COMM_WORLD.Get_rank();
    int nprocs = MPI::COMM_WORLD.Get_size();

    // we want to perform 1000 iterations in total. Work out the 
    // number of iterations to perform per process...
    int count = 1000 / nprocs;

    // we use the rank of this process to work out which
    // iterations to perform.
    int start = rank * count;
    int stop = start + count;

    // now perform the loop
    int nloops = 0;

    for (int i=start; i<stop; ++i)
    {
        ++nloops;
    }

    std::cout << "Process " << rank << " performed " << nloops 
              << " iterations of the loop.\n";

    int total_nloops = 0;
    MPI::COMM_WORLD.Reduce( &nloops, &total_nloops, 1, MPI_INT,
                               MPI_SUM, 0);

    if (rank == 0)
    {
        // ok - are there any more loops to run?
        nloops = 0;

        for (int i=total_nloops; i<1000; ++i)
        {
            ++nloops;
        }

        std::cout << "Process 0 performed the remaining " << nloops
                  << " iterations of the loop\n";
    }

    MPI::Finalize();

    return 0;
}

In this code, MPI::COMM_WORLD.Send and MPI::COMM_WORLD.Recv have been replaced by a call to MPI::COMM_WORLD.Reduce(void *send, void *receive, int size, MPI_INT, MPI_SUM, int process). This function reduces the data in the message pointed to by "send" using the MPI operation MPI_SUM and places the resulting message into "receive". The messages are of size "size", and you specify the data type as you do for MPI::COMM_WORLD.Recv and MPI::COMM_WORLD.Send, e.g. in this case we use MPI_INT. The resulting message is placed only into "receive" on the process whose rank is given in "process". Note that all processes in the MPI process team must call this function with the same arguments, or else the result is undefined. Note also that other reduction operations are possible, e.g. by replacing MPI_SUM with MPI_PROD.

Compile the code using;

mpicxx reduced_loops.cpp -o reduced_loops

This will give you an executable called reduced_loops.

Return to the previous page.

Fortran

Copy this code into reduced_loops.f.

      program main
      implicit none
      include 'mpif.h'

      integer i, rank, nprocs, count, start, stop, nloops
      integer total_nloops,err

      call MPI_Init(err)

c     get the number of processes, and the id of this process
      call MPI_Comm_rank(MPI_COMM_WORLD, rank, err)
      call MPI_Comm_size(MPI_COMM_WORLD, nprocs, err)

c     we want to perform 1000 iterations in total. Work out the 
c     number of iterations to perform per process...
      count = 1000 / nprocs

c     we use the rank of this process to work out which
c     iterations to perform.
      start = rank * count
      stop = start + count - 1

c     now perform the loop
      nloops = 0

      do i=start,stop
        nloops = nloops + 1
      enddo

      print *,"Process ",rank," performed ",nloops,
     .        " iterations of the loop."

      total_nloops = 0
      call MPI_Reduce( nloops, total_nloops, 1, MPI_INTEGER,
     .                 MPI_SUM, 0, MPI_COMM_WORLD, err )

      if (rank .eq. 0) then
c        ok - are there any more loops to run?
         nloops = 0

         if (total_nloops .lt. 1000) then
           do i=total_nloops,1000-1
              nloops = nloops + 1
           enddo
         endif

         print *,"Process 0 performed the remaining ",nloops,
     .           " iterations of the loop"
      endif

      call MPI_Finalize()

      end

In this code, MPI_Send and MPI_Recv have been replaced by a call to MPI_Reduce(void send, void receive, integer size, MPI_INTEGER, MPI_SUM, integer process, MPI_COMM_WORLD, integer err). This function reduces the data in the message in "send" using the MPI operation MPI_SUM and places the resulting message into "receive". The messages are of size "size", and you specify the data type as you do for MPI_Recv and MPI_Send, e.g. in this case we use MPI_INTEGER. The resulting message is placed only into "receive" on the process whose rank is given in "process". Note that all processes in the MPI process team must call this function with the same arguments, or else the result is undefined. Note also that other reduction operations are possible, e.g. by replacing MPI_SUM with MPI_PROD.

Compile the code using;

mpif77 reduced_loops.f -o reduced_loops

This will give you an executable called reduced_loops.

Return to the previous page.

Answer to exercise (C)

#include <math.h>
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>

double rand_one()
{
    return rand() / (RAND_MAX + 1.0);
}

int main(int argc, char **argv)
{
    int n_inside, n_outside;
    int total_n_inside, total_n_outside;
    int rank, nprocs, niters;
    int i;

    double x, y, r, pi;

    total_n_inside = 0;
    total_n_outside = 0;
    n_inside = 0;
    n_outside = 0;

    MPI_Init(&argc, &argv);

    // get the number of processors in the cluster
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    // make sure that each process has a different random
    // number seed
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    srand( rank * 473879 );

    // calculate the number of iterations - we want 1000000 in total
    niters = 1000000 / nprocs;

    // perform this processes batch of iterations
    for (i=0; i<niters; ++i)
    {
        x = (2*rand_one()) - 1;
        y = (2*rand_one()) - 1;

        r = sqrt( x*x + y*y );

        if (r < 1.0)
        {
            ++n_inside;
        }
        else
        {
            ++n_outside;
        }
    }

    printf("Process %d: n_inside = %d, n_outside = %d\n",
           rank, n_inside, n_outside);

    // reduce the results back to the master process
    MPI_Reduce(&n_inside, &total_n_inside, 1, MPI_INT,
               MPI_SUM, 0, MPI_COMM_WORLD);

    MPI_Reduce(&n_outside, &total_n_outside, 1, MPI_INT,
               MPI_SUM, 0, MPI_COMM_WORLD);

    if (rank == 0)
    {
        pi = (4.0 * total_n_inside) / (total_n_inside + total_n_outside);
        printf("The estimated value of pi is %f\n", pi);
    }

    MPI_Finalize();

    return 0;
}

Answer to exercise (C++)

#include <cmath>
#include <cstdlib>
#include <iostream>
#include <mpi.h>

double rand_one()
{
    return std::rand() / (RAND_MAX + 1.0);
}

int main(int argc, char **argv)
{
    int n_inside = 0;
    int n_outside = 0;

    MPI::Init(argc, argv);

    // get the number of processors in the cluster
    int nprocs = MPI::COMM_WORLD.Get_size();

    // make sure that each process has a different random
    // number seed
    int rank = MPI::COMM_WORLD.Get_rank();

    std::srand( rank * 473879 );

    // calculate the number of iterations - we want 1000000 in total
    int niters = 1000000 / nprocs;

    // perform this processes batch of iterations
    for (int i=0; i<niters; ++i)
    {
        double x = (2*rand_one()) - 1;
        double y = (2*rand_one()) - 1;

        double r = std::sqrt( x*x + y*y );

        if (r < 1.0)
        {
            ++n_inside;
        }
        else
        {
            ++n_outside;
        }
    }

    std::cout << "Process " << rank << " n_inside = " << n_inside
              << " n_outside = " << n_outside << "\n";

    int total_n_inside = 0;
    int total_n_outside = 0;

    MPI::COMM_WORLD.Reduce(&n_inside, &total_n_inside, 1, MPI_INT,
                           MPI::SUM, 0);

    MPI::COMM_WORLD.Reduce(&n_outside, &total_n_outside, 1, MPI_INT,
                           MPI::SUM, 0);

    if (rank == 0)
    {
        double pi = (4.0 * total_n_inside) / (total_n_inside + total_n_outside);
        std::cout << "The estimated value of pi is " << pi << "\n";
    }

    MPI::Finalize();

    return 0;
}

Answer to exercise (fortran)


      program main
      implicit none

      include 'mpif.h'

      integer n_inside, n_outside
      integer total_n_inside, total_n_outside
      integer rank, nprocs, niters
      integer i, err

      double precision  x, y, r, pi

      n_inside = 0
      n_outside = 0

      call MPI_Init(err)

c     get the number of processors in the cluster
      call MPI_Comm_size(MPI_COMM_WORLD, nprocs, err)

c     make sure that each process has a different random
c     number seed
      call MPI_Comm_rank(MPI_COMM_WORLD, rank, err)

      call srand( rank * 473879 )

c     calculate the number of iterations - we want 1000000 in total
      niters = 1000000 / nprocs

c     perform this processes batch of iterations
      do i=1,niters
        x = (2.0*rand()) - 1.0;
        y = (2.0*rand()) - 1.0;

        r = sqrt( x*x + y*y );

        if (r .lt. 1.0) then
            n_inside = n_inside + 1
        
        else
            n_outside = n_outside + 1
        
        endif
      enddo

      print *,"Process ",rank," n_inside = ",n_inside,
     .        " n_outside = ",n_outside

      call MPI_Reduce(n_inside, total_n_inside, 1, MPI_INTEGER,
     .                MPI_SUM, 0, MPI_COMM_WORLD, err)

      call MPI_Reduce(n_outside, total_n_outside, 1, MPI_INTEGER,
     .                MPI_SUM, 0, MPI_COMM_WORLD, err)

      if (rank .eq. 0) then
        pi = (4.0 * total_n_inside) / (total_n_inside + total_n_outside)
        print *,"The estimated value of pi is ", pi
      endif

      call MPI_Finalize(err)

      end