[TMatrixDSparse,dichotomy] Search for non-zeros nrows

Hello,

I am using a TMatrixDSparse about 10M cols and 10M lines. (I admit… this is quite large… anyway) It might seems silly but I couldn’t find a way to operate on the matrix without a huge overhead.

Therefore I implemented a binary search in order to get the index of the non-zeros rows.
I commented out the debug lines, you can easily check the following code.

std::vector<int> GetNonZerosRowIndexByDichotomy(TMatrixDSparse*, std::pair<int,int> = std::make_pair(-1,-1), int = 0, std::pair<int,int> = std::make_pair(-1,-1), int = 0);
vector<int> GetNonZerosRowIndexByDichotomy(TMatrixDSparse *matrix, pair<int,int> iRange, int iElementRef, pair<int,int> iElementLimit, int iDebugDepth)
{
    vector<int> vIndexes;
    if(iRange.first == -1 && iRange.second == -1) {
        iRange.first = 0;
        iRange.second = matrix->GetNrows()-1;
    }
    if(iElementLimit.first == -1 && iElementLimit.second == -1) {
        iElementLimit.first = 0;
        iElementLimit.second = matrix->NonZeros();
    }

    // Main variables..
    int N = iRange.second-iRange.first+1;
    int iRow = iRange.first + N/2;
    int iElement = matrix->GetRowIndexArray()[iRow];
    int iNextElement = matrix->GetRowIndexArray()[iRow+1];

    if(N == 1) { // TERMINAISON..

        // ADD NEW ROW
        if(iElement != iNextElement) {
            vIndexes.push_back(iRow);
            //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "ADD NEW ROW LIMIT CASE = " << (iRow) << endl;
        }

        //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "LIMIT CASE --- STOP HERE.. N = " << N << endl;
        return vIndexes;

    } else { // RECURRENCE

        pair<int,int> iLowerRange = make_pair(iRange.first, iRow-1);
        pair<int,int> iUpperRange = make_pair(iRow+1,       iRange.second);
        if(iRow == iRange.second) iUpperRange.first = iRow;

        pair<int,int> iElementLowerLimit = make_pair(matrix->GetRowIndexArray()[iLowerRange.first], matrix->GetRowIndexArray()[iLowerRange.second]);
        pair<int,int> iElementUpperLimit = make_pair(matrix->GetRowIndexArray()[iUpperRange.first], matrix->GetRowIndexArray()[iUpperRange.second]);

        //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "LIST = [";
        //for(int i = 0; i < N; i++) {
        //    if(i==iRow) cout << "] ";
        //    cout << iRange.first+i << " ";
        //    if(i==iRow) cout << "[ ";
        //}

        //cout << "]" << endl;
        //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "REF  = " << iElementRef << endl;
        //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "A    = ";
        //cout << "[" << iElementLowerLimit.first << ";" << iElementLowerLimit.second << "] ";
        //cout << iElement;
        //cout << " [" << iElementUpperLimit.first << ";" << iElementUpperLimit.second << "] " << endl;

        // Recurrence cases..
        // * For lower range.. if iElement is different.
        //   If not.. this means that there is nothing else to look for below this range.. can skip
        if(iElementRef >= iElementUpperLimit.first && iElementRef <= iElementUpperLimit.second) {

            // Upper range to be run anyway..
            //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
            //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL UPPER BOUNDARY : [" << iUpperRange.first << ";" << iUpperRange.second << "]" << endl;
            //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
            vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iUpperRange, iElementRef, iElementUpperLimit, iDebugDepth+1);
            vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());

        } else {

            if((iElementRef >= iElementLowerLimit.first && iElementRef <= iElementLowerLimit.second)) {

                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL LOW BOUNDARY : [" << iLowerRange.first << ";" << iLowerRange.second << "]" << endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;

                vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iLowerRange, iElementRef, iElementLowerLimit, iDebugDepth+1);
                vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());

            } else {

                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL LOW BOUNDARY : [" << iLowerRange.first << ";" << iLowerRange.second << "]" << endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;

                vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iLowerRange, iElementLowerLimit.first, iElementLowerLimit, iDebugDepth+1);
                vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());
            }

            if(iElement >= iElementUpperLimit.first && iElement <= iElementUpperLimit.second) {

                // Upper range to be run anyway..
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL UPPER BOUNDARY : [" << iUpperRange.first << ";" << iUpperRange.second << "]" << endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
                vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iUpperRange, iElementRef, iElementUpperLimit, iDebugDepth+1);
                vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());

            } else {

                // ADD NEW ROW
                if(iElement != iNextElement) {
                    vIndexes.push_back(iRow);
                    //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "ADD NEW ROW = " << (iRow) << endl;
                }

                // Upper range to be run anyway..
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL UPPER BOUNDARY : [" << iUpperRange.first << ";" << iUpperRange.second << "]" << endl;
                //cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
                vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iUpperRange, iElementUpperLimit.first, iElementUpperLimit, iDebugDepth+1);
                vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());
            }
        }
    }

    return vIndexes;
}

And here is a root code which is just showing the improvement and the validity of the binary search. Perhaps this can be used in general to improve operations on large matrices?
(At least right now I can merge my matrix much faster… reason why I assumed this was not internally done before, but I haven’t looked at all functions of all nested classes)

{
        TMatrixDSparse m = /* LOAD A MATRIX HERE*/;

        // DICHOTOMY..
        auto start1 = std::chrono::high_resolution_clock::now();
        vector<int> v = GetNonZerosRowIndexByDichotomy(m);
        for(int i=0; i < v.size(); i++) cout << i << " <--> " << v[i] << endl;
        cout << endl;
        auto elapsed1 = std::chrono::high_resolution_clock::now() - start1;

        // STD METHOD..
        auto start2 = std::chrono::high_resolution_clock::now();
        int index = 0;
        int prev = m->GetRowIndexArray()[0];
        for(int i=0; i < m->GetNrows(); i++) {

                int current = m->GetRowIndexArray()[i];
                if(prev != current) {
                        cout << i-1 << " <--> " << v[index] << endl;
                        index++;
                        prev = current;
                }
        }
        auto elapsed2 = std::chrono::high_resolution_clock::now() - start2;

        long long microseconds1 = std::chrono::duration_cast<std::chrono::microseconds>(elapsed1).count();
        long long microseconds2 = std::chrono::duration_cast<std::chrono::microseconds>(elapsed2).count();

        cout << "vsize: " << v.size() << " / " << index << endl;
        cout << "t1 = " << microseconds1/1000 << "ms" << endl;
        cout << "t2 = " << microseconds2/1000 << "ms" << endl;
}

Hi,

Thank you for your post. If you think your code is an improvement with respect to the actual existing ROOT code, it would be great if you could submit a Pull request in github,
https://root.cern.ch/creating-pull-request

which should include a test showing your improvements

Best Regards

Lorenzo

Hi, I don’t know where to implement it. That’s why I posted here.
The nested structures of TMatrixSparse classes are quite cryptic to me.

So this might be just useful for users. Unless someone knows where to implement it and would be willing to.

Basically the improvement is not linear, but for a low density matrix I reached a factor x5-x10 improvement by looping that way as I am avoiding the major part of the holes in my matrix.

Hi,
I understand. Let’s keep then for the time being your improvements here and we will investigate in more detail this issue and eventually how we could integrate them in the sparse matrix code.
Thank you for sharing your improved code

Best Regards

Lorenzo

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.