Program Listing for File IndexThreadReduce.h

Return to documentation for file (lib/util/IndexThreadReduce.h)

#pragma once
#include "util/settings.h"
#include "boost/thread.hpp"
#include <stdio.h>
#include <iostream>



namespace lsd_slam
{

class IndexThreadReduce
{

public:
    inline IndexThreadReduce()
    {
        nextIndex = 0;
        maxIndex = 0;
        stepSize = 1;
        callPerIndex = boost::bind(&IndexThreadReduce::callPerIndexDefault, this, _1, _2, _3);

        running = true;
        for(int i=0;i<MAPPING_THREADS;i++)
        {
            isDone[i] = false;
            workerThreads[i] = boost::thread(&IndexThreadReduce::workerLoop, this, i);
        }

        //printf("created ThreadReduce\n");
    }
    inline ~IndexThreadReduce()
    {
        running = false;

        exMutex.lock();
        todo_signal.notify_all();
        exMutex.unlock();

        for(int i=0;i<MAPPING_THREADS;i++)
            workerThreads[i].join();


        //printf("destroyed ThreadReduce\n");

    }

    inline void reduce(boost::function<void(int,int,RunningStats*)> callPerIndex, int first, int end, int stepSize = 0)
    {
        if(!multiThreading)
        {
            callPerIndex(first, end, &runningStats);
            return;
        }



        if(stepSize == 0)
            stepSize = ((end-first)+MAPPING_THREADS-1)/MAPPING_THREADS;


        //printf("reduce called\n");

        boost::unique_lock<boost::mutex> lock(exMutex);

        // save
        this->callPerIndex = callPerIndex;
        nextIndex = first;
        maxIndex = end;
        this->stepSize = stepSize;

        // go worker threads!
        for(int i=0;i<MAPPING_THREADS;i++)
            isDone[i] = false;

        // let them start!
        todo_signal.notify_all();


        //printf("reduce waiting for threads to finish\n");
        // wait for all worker threads to signal they are done.
        while(true)
        {
            // wait for at least one to finish
            done_signal.wait(lock);
            //printf("thread finished!\n");

            // check if actually all are finished.
            bool allDone = true;
            for(int i=0;i<MAPPING_THREADS;i++)
                allDone = allDone && isDone[i];

            // all are finished! exit.
            if(allDone)
                break;
        }

        nextIndex = 0;
        maxIndex = 0;
        this->callPerIndex = boost::bind(&IndexThreadReduce::callPerIndexDefault, this, _1, _2, _3);

        //printf("reduce done (all threads finished)\n");
    }


private:
    boost::thread workerThreads[MAPPING_THREADS];
    bool isDone[MAPPING_THREADS];

    boost::mutex exMutex;
    boost::condition_variable todo_signal;
    boost::condition_variable done_signal;

    int nextIndex;
    int maxIndex;
    int stepSize;

    bool running;

    boost::function<void(int,int,RunningStats*)> callPerIndex;

    void callPerIndexDefault(int i, int j,RunningStats* k)
    {
        printf("ERROR: should never be called....\n");
    }

    void workerLoop(int idx)
    {
        boost::unique_lock<boost::mutex> lock(exMutex);

        while(running)
        {
            // try to get something to do.
            int todo = 0;
            bool gotSomething = false;
            if(nextIndex < maxIndex)
            {
                // got something!
                todo = nextIndex;
                nextIndex+=stepSize;
                gotSomething = true;
            }

            // if got something: do it (unlock in the meantime)
            if(gotSomething)
            {
                lock.unlock();

                assert(callPerIndex != 0);

                RunningStats s;
                callPerIndex(todo, std::min(todo+stepSize, maxIndex), &s);

                lock.lock();
                runningStats.add(&s);
            }

            // otherwise wait on signal, releasing lock in the meantime.
            else
            {
                isDone[idx] = true;
                //printf("worker %d waiting..\n", idx);
                done_signal.notify_all();
                todo_signal.wait(lock);
            }
        }
    }
};
}