Skip to content

Commit

Permalink
Adding --index-sizes parameter to allow reading indexes from streams
Browse files Browse the repository at this point in the history
  • Loading branch information
leoisl committed Oct 14, 2022
1 parent 4575ff8 commit 51f21dd
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 9 deletions.
6 changes: 6 additions & 0 deletions cobs/query/classic_index/mmap_search_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ ClassicIndexMMapSearchFile::ClassicIndexMMapSearchFile(const fs::path& path)
data_ = handle_.data + stream_pos_.curr_pos;
}

//! Construct the search file from an already-open stream.
//!
//! The base-class constructor runs first with (ifs, index_file_size); afterwards
//! stream_pos_.size() bytes are loaded from the stream via initialize_stream()
//! and data_ is pointed at the start of the loaded buffer.
//! NOTE(review): stream_pos_.size() is presumably the payload byte count that
//! follows the header -- confirm against StreamPos.
ClassicIndexMMapSearchFile::ClassicIndexMMapSearchFile(std::ifstream &ifs, int64_t index_file_size)
    : ClassicIndexSearchFile(ifs, index_file_size) {
    const auto bytes_to_load = stream_pos_.size();
    handle_ = initialize_stream(ifs, bytes_to_load);
    data_ = handle_.data;
}

//! Release whatever handle_ refers to by delegating to destroy_mmap().
//! NOTE(review): handle_ can also come from initialize_stream() (stream
//! constructor, fd == -1) -- confirm destroy_mmap() releases that kind of
//! handle correctly as well.
ClassicIndexMMapSearchFile::~ClassicIndexMMapSearchFile() {
destroy_mmap(handle_);
}
Expand Down
1 change: 1 addition & 0 deletions cobs/query/classic_index/mmap_search_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ClassicIndexMMapSearchFile : public ClassicIndexSearchFile

public:
explicit ClassicIndexMMapSearchFile(const fs::path& path);
//! Construct from an already-open stream instead of a path; ifs and
//! index_file_size are forwarded to the ClassicIndexSearchFile base and the
//! index data is then loaded from ifs with initialize_stream().
explicit ClassicIndexMMapSearchFile(std::ifstream &ifs, int64_t index_file_size);
~ClassicIndexMMapSearchFile();
};

Expand Down
6 changes: 6 additions & 0 deletions cobs/query/classic_index/search_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ ClassicIndexSearchFile::ClassicIndexSearchFile(const fs::path& path) {
stream_pos_ = get_stream_pos(ifs);
}

//! Construct from an already-open stream.
//!
//! Deserializes a ClassicIndexHeader from ifs, then builds stream_pos_ from the
//! header's header_size_ (first field) and the caller-supplied index_file_size
//! (second field), both converted to uint64_t.
ClassicIndexSearchFile::ClassicIndexSearchFile(std::ifstream &ifs, int64_t index_file_size) {
    header_ = deserialize_header<ClassicIndexHeader>(ifs);

    const auto header_bytes = static_cast<uint64_t>(header_.header_size_);
    const auto total_bytes = static_cast<uint64_t>(index_file_size);
    stream_pos_ = StreamPos { header_bytes, total_bytes };
}


//! Returns eight times the header's row_size().
//! NOTE(review): the factor 8 is presumably bits per byte -- confirm.
uint64_t ClassicIndexSearchFile::counts_size() const {
    const auto row_size = header_.row_size();
    return 8 * row_size;
}
Expand Down
1 change: 1 addition & 0 deletions cobs/query/classic_index/search_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ClassicIndexSearchFile : public IndexSearchFile
{
protected:
explicit ClassicIndexSearchFile(const fs::path& path);
//! Construct from an already-open stream: the ClassicIndexHeader is
//! deserialized from ifs, and stream_pos_ is built from the header size and
//! index_file_size.
explicit ClassicIndexSearchFile(std::ifstream &ifs, int64_t index_file_size);

uint32_t term_size() const final { return header_.term_size_; }
uint8_t canonicalize() const final { return header_.canonicalize_; }
Expand Down
15 changes: 15 additions & 0 deletions cobs/query/search.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ static inline std::vector<std::shared_ptr<cobs::IndexSearchFile> > get_cobs_inde
return indices;
}

static inline std::vector<std::shared_ptr<cobs::IndexSearchFile> > get_cobs_indexes_given_streams (
const std::vector<std::ifstream*> &streams, const std::vector<int64_t> &index_sizes) {

std::vector<std::shared_ptr<cobs::IndexSearchFile> > indices;
for (size_t i=0; i<streams.size(); i++)
{
std::ifstream *stream = streams[i];
int64_t index_file_size = index_sizes[i];
indices.push_back(
std::make_shared<cobs::ClassicIndexMMapSearchFile>(*stream, index_file_size));
}

return indices;
}

static inline void process_query(
cobs::Search &s, double threshold, unsigned num_results,
const std::string &query_line, const std::string &query_file,
Expand Down
9 changes: 9 additions & 0 deletions cobs/util/file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <cobs/util/fs.hpp>

#include <tlx/die.hpp>
#include <tlx/logger.hpp>

namespace cobs {

Expand Down Expand Up @@ -43,6 +44,14 @@ Header deserialize_header(std::ifstream& ifs, const fs::path& p) {
return h;
}

//! Deserialize a header of type Header from an already-open stream.
//!
//! Header must be default-constructible and offer a deserialize() member that
//! accepts the stream. A log line is emitted before reading.
template <class Header>
Header deserialize_header(std::ifstream& ifs) {
    LOG1 << "Deserializing header from stream";

    Header parsed;
    parsed.deserialize(ifs);
    return parsed;
}

template <class Header>
Header deserialize_header(const fs::path& p) {
std::ifstream ifs;
Expand Down
43 changes: 41 additions & 2 deletions cobs/util/query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ int open_file(const fs::path& path, int flags) {
}

//! Close an index file descriptor, reporting a failure via print_errno().
//!
//! Negative descriptors are skipped: handles produced by initialize_stream()
//! carry fd == -1 because there is no descriptor to close.
void close_file(int fd) {
    if (fd < 0) {
        return;
    }
    if (close(fd)) {
        print_errno("could not close index file");
    }
}

Expand Down Expand Up @@ -88,6 +90,43 @@ MMapHandle initialize_mmap(const fs::path& path)
}
}

//! Load index data from an already-open stream into a freshly allocated buffer.
//!
//! Allocates index_file_size bytes aligned to 2 MiB with posix_memalign(),
//! advises huge pages where MADV_HUGEPAGE is available, and then reads from the
//! stream's current position in chunks of at most 1 GiB, logging progress.
//!
//! \param is               stream to read from
//! \param index_file_size  number of bytes to read; must not be negative
//! \return handle with fd == -1 (no descriptor to close), the buffer pointer
//!         and the requested size. For a negative size or a failed allocation
//!         the handle has a null buffer and size 0. If the stream ends early,
//!         the error is reported and the partially filled buffer is returned.
MMapHandle initialize_stream(std::ifstream& is, int64_t index_file_size)
{
    if (index_file_size < 0) {
        // A negative size would wrap to a huge unsigned byte count below.
        LOG1 << "Invalid negative index size: " << index_file_size;
        return MMapHandle { -1, nullptr, uint64_t(0) };
    }

    off_t size = index_file_size;
    LOG1 << "Reading complete index from stream";
    void* ptr = nullptr;
    // posix_memalign() reports failure through its return value.
    if (posix_memalign(&ptr, 2 * 1024 * 1024, size)) {
        print_errno("posix_memalign()");
        // Without a buffer there is nothing to read into; do not fall through
        // and write through a null pointer.
        return MMapHandle { -1, nullptr, uint64_t(0) };
    }
    char* data_ptr = reinterpret_cast<char*>(ptr);
#if defined(MADV_HUGEPAGE)
    if (madvise(data_ptr, size, MADV_HUGEPAGE)) {
        print_errno("madvise failed for MADV_HUGEPAGE");
    }
    LOG1 << "Advising to use huge pages";
#endif
    uint64_t remain = size;
    const uint64_t one_gb = 1024*1024*1024;
    uint64_t pos = 0;
    while (remain != 0) {
        is.read(data_ptr + pos, std::min(one_gb, remain));
        int64_t rb = is.gcount();
        if (rb <= 0) {
            // gcount() is never negative; zero bytes means the stream hit EOF
            // or failed. Stop here, otherwise remain never shrinks and the
            // loop spins forever.
            print_errno("read failed");
            break;
        }
        remain -= rb;
        pos += rb;
        LOG1 << "Read " << tlx::format_iec_units(pos)
             << "B / " << tlx::format_iec_units(size) << "B - "
             << pos * 100 / size << "%";
    }
    LOG1 << "Index loaded into RAM.";
    return MMapHandle {
        -1 /* not a valid fd, won't be closed */, reinterpret_cast<uint8_t*>(data_ptr), uint64_t(size)
    };
}

void destroy_mmap(MMapHandle& handle)
{
if (!gopt_load_complete_index) {
Expand Down
1 change: 1 addition & 0 deletions cobs/util/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct MMapHandle {
};

MMapHandle initialize_mmap(const fs::path& path);
//! Read index_file_size bytes from the stream into a newly allocated,
//! 2 MiB-aligned buffer. The returned handle carries fd == -1 because there is
//! no file descriptor to close.
MMapHandle initialize_stream(std::ifstream& is, int64_t index_file_size);
void destroy_mmap(MMapHandle& handle);

//! Canonicalize a k-mer. Given an input k-mer of length size, checks if should
Expand Down
11 changes: 6 additions & 5 deletions cobs/util/serialization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,21 @@ void stream_put(std::ostream& os, const T& t, const Args& ... args) {

//! read a POD from an istream
//! \return number of bytes actually extracted (is.gcount()); this is smaller
//!         than sizeof(T) when the stream ends early
template <typename T>
std::streamsize stream_get_pod(std::istream& is, T& t) {
    static_assert(std::is_pod<T>::value, "T must be POD");
    is.read(reinterpret_cast<char*>(&t), sizeof(T));
    return is.gcount();
}

//! read a list of PODs from an istream (end of recursion: nothing to read)
//! \return always 0
static inline
std::streamsize stream_get(std::istream& /* is */) { return 0; }

//! read a list of PODs from an istream
//! \return total number of bytes extracted for t and all following arguments
template <typename T, typename... Args>
std::streamsize stream_get(std::istream& is, T& t, Args& ... args) {
    const std::streamsize nb_of_bytes_from_pod = stream_get_pod(is, t);
    return nb_of_bytes_from_pod + stream_get(is, args...);
}

} // namespace cobs
Expand Down
39 changes: 37 additions & 2 deletions src/cobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,17 +501,52 @@ int query(int argc, char** argv) {
'T', "threads", cobs::gopt_threads,
"number of threads to use, default: max cores");

std::vector<std::string> index_sizes_str;
cp.add_stringlist(
"index-sizes", index_sizes_str, "WARNING: HIDDEN OPTION. USE ONLY IF YOU KNOW WHAT YOU ARE DOING. "
"Precomputed file sizes of the index. Useful if --load-complete is given and "
"indexes are streamed into COBS. This is a hidden option to be used with mof. "
"This also implies COBS classic index, skipping double header reading due to "
"streaming.");

if (!cp.sort().process(argc, argv))
return -1;

std::vector<cobs::fs::path> index_paths;
for (const std::string &file : index_files) {
index_paths.push_back(cobs::fs::path(file));
index_paths.push_back(cobs::fs::path(file));
}
std::vector<int64_t> index_sizes;
for (const std::string &index_size_str : index_sizes_str) {
index_sizes.push_back(std::stoll(index_size_str));
}
const bool index_sizes_was_given = index_sizes.size() > 0;
if (index_sizes_was_given) {
die_verbose_unequal(index_paths.size(), index_sizes.size(), "If --index-sizes is used, COBS needs a size for each index.");
}

std::vector<std::ifstream*> streams;
std::vector<std::shared_ptr<cobs::IndexSearchFile> > indices;
if (index_sizes_was_given) {
for (const cobs::fs::path &index_path : index_paths) {
auto* stream = new std::ifstream(index_path);
streams.push_back(stream);
}
indices = cobs::get_cobs_indexes_given_streams(streams, index_sizes);
}
std::vector<std::shared_ptr<cobs::IndexSearchFile> > indices = cobs::get_cobs_indexes_given_files(index_paths);
else {
indices = cobs::get_cobs_indexes_given_files(index_paths);
}

cobs::ClassicSearch s(indices);
cobs::process_query(s, threshold, num_results, query, query_file);

if (index_sizes_was_given) {
for (auto* stream : streams) {
delete stream;
}
}

return 0;
}

Expand Down

0 comments on commit 51f21dd

Please sign in to comment.