Hello,
I am using a TMatrixDSparse about 10M cols and 10M lines. (I admit… this is quite large… anyway) It might seems silly but I couldn’t find a way to operate on the matrix without a huge overhead.
Therefore I implemented a binary search in order to get the index of the non-zeros rows.
I commented out the debug lines, you can easily check the following code.
std::vector<int> GetNonZerosRowIndexByDichotomy(TMatrixDSparse*, std::pair<int,int> = std::make_pair(-1,-1), int = 0, std::pair<int,int> = std::make_pair(-1,-1), int = 0);
vector<int> GetNonZerosRowIndexByDichotomy(TMatrixDSparse *matrix, pair<int,int> iRange, int iElementRef, pair<int,int> iElementLimit, int iDebugDepth)
{
vector<int> vIndexes;
if(iRange.first == -1 && iRange.second == -1) {
iRange.first = 0;
iRange.second = matrix->GetNrows()-1;
}
if(iElementLimit.first == -1 && iElementLimit.second == -1) {
iElementLimit.first = 0;
iElementLimit.second = matrix->NonZeros();
}
// Main variables..
int N = iRange.second-iRange.first+1;
int iRow = iRange.first + N/2;
int iElement = matrix->GetRowIndexArray()[iRow];
int iNextElement = matrix->GetRowIndexArray()[iRow+1];
if(N == 1) { // TERMINAISON..
// ADD NEW ROW
if(iElement != iNextElement) {
vIndexes.push_back(iRow);
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "ADD NEW ROW LIMIT CASE = " << (iRow) << endl;
}
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "LIMIT CASE --- STOP HERE.. N = " << N << endl;
return vIndexes;
} else { // RECURRENCE
pair<int,int> iLowerRange = make_pair(iRange.first, iRow-1);
pair<int,int> iUpperRange = make_pair(iRow+1, iRange.second);
if(iRow == iRange.second) iUpperRange.first = iRow;
pair<int,int> iElementLowerLimit = make_pair(matrix->GetRowIndexArray()[iLowerRange.first], matrix->GetRowIndexArray()[iLowerRange.second]);
pair<int,int> iElementUpperLimit = make_pair(matrix->GetRowIndexArray()[iUpperRange.first], matrix->GetRowIndexArray()[iUpperRange.second]);
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "LIST = [";
//for(int i = 0; i < N; i++) {
// if(i==iRow) cout << "] ";
// cout << iRange.first+i << " ";
// if(i==iRow) cout << "[ ";
//}
//cout << "]" << endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "REF = " << iElementRef << endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "A = ";
//cout << "[" << iElementLowerLimit.first << ";" << iElementLowerLimit.second << "] ";
//cout << iElement;
//cout << " [" << iElementUpperLimit.first << ";" << iElementUpperLimit.second << "] " << endl;
// Recurrence cases..
// * For lower range.. if iElement is different.
// If not.. this means that there is nothing else to look for below this range.. can skip
if(iElementRef >= iElementUpperLimit.first && iElementRef <= iElementUpperLimit.second) {
// Upper range to be run anyway..
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL UPPER BOUNDARY : [" << iUpperRange.first << ";" << iUpperRange.second << "]" << endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iUpperRange, iElementRef, iElementUpperLimit, iDebugDepth+1);
vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());
} else {
if((iElementRef >= iElementLowerLimit.first && iElementRef <= iElementLowerLimit.second)) {
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL LOW BOUNDARY : [" << iLowerRange.first << ";" << iLowerRange.second << "]" << endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iLowerRange, iElementRef, iElementLowerLimit, iDebugDepth+1);
vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());
} else {
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL LOW BOUNDARY : [" << iLowerRange.first << ";" << iLowerRange.second << "]" << endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iLowerRange, iElementLowerLimit.first, iElementLowerLimit, iDebugDepth+1);
vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());
}
if(iElement >= iElementUpperLimit.first && iElement <= iElementUpperLimit.second) {
// Upper range to be run anyway..
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL UPPER BOUNDARY : [" << iUpperRange.first << ";" << iUpperRange.second << "]" << endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iUpperRange, iElementRef, iElementUpperLimit, iDebugDepth+1);
vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());
} else {
// ADD NEW ROW
if(iElement != iNextElement) {
vIndexes.push_back(iRow);
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "ADD NEW ROW = " << (iRow) << endl;
}
// Upper range to be run anyway..
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "CALL UPPER BOUNDARY : [" << iUpperRange.first << ";" << iUpperRange.second << "]" << endl;
//cout << UIUC::HandboxMsg::Spacer(iDebugDepth,'\t') << "----------------------------------------------- "<< endl;
vector<int> buf = GetNonZerosRowIndexByDichotomy(matrix, iUpperRange, iElementUpperLimit.first, iElementUpperLimit, iDebugDepth+1);
vIndexes.insert(vIndexes.end(), buf.begin(), buf.end());
}
}
}
return vIndexes;
}
And here is a root code which is just showing the improvement and the validity of the binary search. Perhaps this can be used in general to improve operations on large matrices?
(At least right now I can merge my matrix much faster… reason why I assumed this was not internally done before, but I haven’t looked at all functions of all nested classes)
{
TMatrixDSparse m = /* LOAD A MATRIX HERE*/;
// DICHOTOMY..
auto start1 = std::chrono::high_resolution_clock::now();
vector<int> v = GetNonZerosRowIndexByDichotomy(m);
for(int i=0; i < v.size(); i++) cout << i << " <--> " << v[i] << endl;
cout << endl;
auto elapsed1 = std::chrono::high_resolution_clock::now() - start1;
// STD METHOD..
auto start2 = std::chrono::high_resolution_clock::now();
int index = 0;
int prev = m->GetRowIndexArray()[0];
for(int i=0; i < m->GetNrows(); i++) {
int current = m->GetRowIndexArray()[i];
if(prev != current) {
cout << i-1 << " <--> " << v[index] << endl;
index++;
prev = current;
}
}
auto elapsed2 = std::chrono::high_resolution_clock::now() - start2;
long long microseconds1 = std::chrono::duration_cast<std::chrono::microseconds>(elapsed1).count();
long long microseconds2 = std::chrono::duration_cast<std::chrono::microseconds>(elapsed2).count();
cout << "vsize: " << v.size() << " / " << index << endl;
cout << "t1 = " << microseconds1/1000 << "ms" << endl;
cout << "t2 = " << microseconds2/1000 << "ms" << endl;
}