Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions silkworm/db/kv/api/endpoint/paginated_sequence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class PaginatedSequence {
using Page = std::vector<T>;
struct PageResult {
Page values;
bool has_more{false};
std::string next_page_token;
};
using Paginator = std::function<Task<PageResult>()>;
using Paginator = std::function<Task<PageResult>(const std::string& page_token)>;

class Iterator {
public:
Expand All @@ -60,8 +60,8 @@ class PaginatedSequence {

Task<std::optional<T>> next() {
if (it_ == current_.values.cend()) {
if (current_.has_more) {
current_ = co_await next_page_provider_();
if (!current_.next_page_token.empty()) {
current_ = co_await next_page_provider_(current_.next_page_token);
it_ = current_.values.cbegin();
}
}
Expand All @@ -88,7 +88,7 @@ class PaginatedSequence {
: next_page_provider_(std::move(next_page_provider)) {}

Task<Iterator> begin() {
auto current = co_await next_page_provider_();
auto current = co_await next_page_provider_("");
co_return Iterator{next_page_provider_, std::move(current)};
}

Expand Down Expand Up @@ -119,9 +119,9 @@ class PaginatedSequencePair {
struct PageResult {
KPage keys;
VPage values;
bool has_more{false};
std::string next_page_token;
};
using Paginator = std::function<Task<PageResult>()>;
using Paginator = std::function<Task<PageResult>(const std::string& page_token)>;

class Iterator {
public:
Expand All @@ -137,8 +137,8 @@ class PaginatedSequencePair {
Task<std::optional<value_type>> next() {
if (key_it_ == current_.keys.cend()) {
SILKWORM_ASSERT(value_it_ == current_.values.cend());
if (current_.has_more) {
current_ = co_await next_page_provider_();
if (!current_.next_page_token.empty()) {
current_ = co_await next_page_provider_(current_.next_page_token);
key_it_ = current_.keys.cbegin();
value_it_ = current_.values.cbegin();
}
Expand Down Expand Up @@ -171,7 +171,7 @@ class PaginatedSequencePair {
: next_page_provider_(std::move(next_page_provider)) {}

Task<Iterator> begin() {
auto current = co_await next_page_provider_();
auto current = co_await next_page_provider_("");
ensure(current.keys.size() == current.values.size(), "PaginatedSequencePair::begin keys/values size mismatch");
co_return Iterator{next_page_provider_, std::move(current)};
}
Expand Down
15 changes: 8 additions & 7 deletions silkworm/db/kv/api/endpoint/paginated_sequence_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ template <typename TSequence>
TSequence make_paginated_sequence(const size_t page_size, const size_t n) {
const size_t num_pages = n / page_size + (n % page_size ? 1 : 0);
const size_t last_page_size = n % page_size ? n % page_size : page_size;
typename TSequence::Paginator paginator = [=]() -> Task<typename TSequence::PageResult> {
typename TSequence::Paginator paginator = [=](const std::string&) -> Task<typename TSequence::PageResult> {
static size_t count{0};
const bool has_more = ++count < num_pages;
const std::string token = has_more ? "next" : "";
typename TSequence::Page p(has_more ? page_size : last_page_size, 0);
co_return typename TSequence::PageResult{std::move(p), has_more};
co_return typename TSequence::PageResult{std::move(p), token};
};
return TSequence{std::move(paginator)};
}
Expand Down Expand Up @@ -85,9 +86,9 @@ class PaginatedSequence2 {
using Page = std::vector<T>;
struct PageResult {
Page values;
bool has_more{false};
std::string next_page_token;
};
using Paginator = std::function<Task<PageResult>()>;
using Paginator = std::function<Task<PageResult>(const std::string& page_token)>;

class Iterator {
public:
Expand All @@ -98,7 +99,7 @@ class PaginatedSequence2 {
bool operator++() {
++it_;
if (it_ == current_.values.cend()) {
if (current_.has_more) {
if (!current_.next_page_token.empty()) {
return true;
}
it_ = typename Page::const_iterator(); // empty i.e. sentinel value
Expand All @@ -107,7 +108,7 @@ class PaginatedSequence2 {
}

Task<void> next_page() {
current_ = co_await next_page_provider_();
current_ = co_await next_page_provider_(current_.next_page_token);
it_ = current_.values.cbegin();
}

Expand All @@ -134,7 +135,7 @@ class PaginatedSequence2 {
: next_page_provider_(std::move(next_page_provider)) {}

Task<Iterator> begin() {
auto current = co_await next_page_provider_();
auto current = co_await next_page_provider_("");
co_return Iterator{next_page_provider_, std::move(current)};
}

Expand Down
55 changes: 39 additions & 16 deletions silkworm/db/kv/api/endpoint/paginated_sequence_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ struct TestPaginatorUint64 {
co_return PageResultUint64{}; // has_more=false as default
}
if (count_ < pages_.size()) {
PageResultUint64 page_result{pages_[count_], /*has_more=*/(count_ != pages_.size() - 1)};
const auto next_token = (count_ != pages_.size() - 1) ? "again" : "";
PageResultUint64 page_result{pages_[count_], next_token};
++count_;
co_return page_result;
}
Expand All @@ -66,7 +67,11 @@ struct TestPaginatorUint64 {
TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_uint64_sequence: empty sequence", "[db][kv][api][paginated_sequence]") {
PageUint64List empty;
TestPaginatorUint64 paginator{empty};
PaginatedUint64 paginated{paginator};
auto lambda = [&](const std::string&) mutable -> Task<PageResultUint64> {
co_return co_await paginator();
};

PaginatedUint64 paginated{lambda};
// We're using this lambda instead of built-in paginated_to_vector just to check Iterator::has_next
const auto paginated_it_to_vector = [](auto& ps) -> Task<std::vector<uint64_t>> {
auto it = co_await ps.begin();
Expand All @@ -87,20 +92,24 @@ TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_uint64_sequence: non-empty se
for (const auto& [page_list, expected_sequence] : fixtures) {
SECTION("test vector: " + std::to_string(++i)) {
TestPaginatorUint64 paginator{page_list};
PaginatedUint64 paginated{paginator};
auto lambda = [&](const std::string&) mutable -> Task<PageResultUint64> {
co_return co_await paginator();
};

PaginatedUint64 paginated{lambda};
CHECK(spawn_and_wait(paginated_to_vector(paginated)) == expected_sequence);
}
}
}

TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_uint64_sequence: error", "[db][kv][api][paginated_sequence]") {
PaginatorUint64 paginator = []() -> Task<PageResultUint64> {
PaginatorUint64 paginator = [](const std::string&) -> Task<PageResultUint64> {
static int count{0};
switch (++count) {
case 1:
co_return PageResultUint64{PageUint64{1, 2, 3}, /*has_more=*/true};
co_return PageResultUint64{PageUint64{1, 2, 3}, "next"};
case 2:
co_return PageResultUint64{PageUint64{4, 5, 6}, /*has_more=*/true};
co_return PageResultUint64{PageUint64{4, 5, 6}, "next"};
case 3:
throw std::runtime_error{"error during pagination"};
default:
Expand All @@ -112,7 +121,7 @@ TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_uint64_sequence: error", "[db
}

TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_kv_sequence: empty sequence", "[db][kv][api][paginated_sequence]") {
PaginatorKV paginator = []() -> Task<PageResultKV> {
PaginatorKV paginator = [](const std::string&) -> Task<PageResultKV> {
co_return PageResultKV{}; // has_more=false as default
};
PaginatedKV paginated{paginator};
Expand All @@ -132,15 +141,15 @@ static const Bytes kValue1{*from_hex("FF11")}, kValue2{*from_hex("FF22")}, kValu
static const Bytes kValue4{*from_hex("FF44")}, kValue5{*from_hex("FF55")}, kValue6{*from_hex("FF66")};

TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_kv_sequence: non-empty sequence", "[db][kv][api][paginated_sequence]") {
PaginatorKV paginator = []() -> Task<PageResultKV> {
PaginatorKV paginator = [](const std::string&) -> Task<PageResultKV> {
static int count{0};
switch (++count) {
case 1:
co_return PageResultKV{PageK{kKey1, kKey2}, PageV{kValue1, kValue2}, /*has_more=*/true};
co_return PageResultKV{PageK{kKey1, kKey2}, PageV{kValue1, kValue2}, "next"};
case 2:
co_return PageResultKV{PageK{kKey3, kKey4}, PageV{kValue3, kValue4}, /*has_more=*/true};
co_return PageResultKV{PageK{kKey3, kKey4}, PageV{kValue3, kValue4}, "next"};
case 3:
co_return PageResultKV{PageK{kKey5, kKey6}, PageV{kValue5, kValue6}, /*has_more=*/false};
co_return PageResultKV{PageK{kKey5, kKey6}, PageV{kValue5, kValue6}, ""};
default:
throw std::logic_error{"unexpected call to paginator"};
}
Expand All @@ -151,13 +160,13 @@ TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_kv_sequence: non-empty sequen
}

TEST_CASE_METHOD(PaginatedSequenceTest, "paginated_kv_sequence: error", "[db][kv][api][paginated_sequence]") {
PaginatorKV paginator = []() -> Task<PageResultKV> {
PaginatorKV paginator = [](const std::string&) -> Task<PageResultKV> {
static int count{0};
switch (++count) {
case 1:
co_return PageResultKV{PageK{kKey1, kKey2}, PageV{kValue1, kValue2}, /*has_more=*/true};
co_return PageResultKV{PageK{kKey1, kKey2}, PageV{kValue1, kValue2}, "next"};
case 2:
co_return PageResultKV{PageK{kKey3, kKey4}, PageV{kValue3, kValue4}, /*has_more=*/true};
co_return PageResultKV{PageK{kKey3, kKey4}, PageV{kValue3, kValue4}, "next"};
case 3:
throw std::runtime_error{"error during pagination"};
default:
Expand All @@ -181,7 +190,14 @@ TEST_CASE_METHOD(PaginatedSequenceTest, "set_intersection", "[db][kv][api][pagin
SECTION("test vector: " + std::to_string(++i)) {
const auto& [v1, v2] = v1_v2_pair;
TestPaginatorUint64 paginator1{v1}, paginator2{v2};
PaginatedUint64 paginated1{paginator1}, paginated2{paginator2};
auto lambda1 = [&](const std::string&) mutable -> Task<PageResultUint64> {
co_return co_await paginator1();
};
auto lambda2 = [&](const std::string&) mutable -> Task<PageResultUint64> {
co_return co_await paginator2();
};

PaginatedUint64 paginated1{lambda1}, paginated2{lambda2};
const auto async_intersection = [&](PaginatedUint64& ps1, PaginatedUint64& ps2) -> Task<std::vector<uint64_t>> {
IntersectionIterator it = set_intersection(co_await ps1.begin(), co_await ps2.begin());
CHECK(co_await it.has_next() == !expected_intersection_set.empty());
Expand All @@ -205,7 +221,14 @@ TEST_CASE_METHOD(PaginatedSequenceTest, "set_union", "[db][kv][api][paginated_se
SECTION("test vector: " + std::to_string(++i)) {
const auto& [v1, v2] = v1_v2_pair;
TestPaginatorUint64 paginator1{v1}, paginator2{v2};
PaginatedUint64 paginated1{paginator1}, paginated2{paginator2};
auto lambda1 = [&](const std::string&) mutable -> Task<PageResultUint64> {
co_return co_await paginator1();
};
auto lambda2 = [&](const std::string&) mutable -> Task<PageResultUint64> {
co_return co_await paginator2();
};

PaginatedUint64 paginated1{lambda1}, paginated2{lambda2};
const auto async_union = [&](PaginatedUint64& ps1, PaginatedUint64& ps2) -> Task<std::vector<uint64_t>> {
UnionIterator<PaginatedUint64::Iterator> it = set_union(co_await ps1.begin(), co_await ps2.begin());
CHECK(co_await it.has_next() == !expected_union_set.empty());
Expand Down
6 changes: 3 additions & 3 deletions silkworm/db/kv/api/local_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Task<HistoryPointResult> LocalTransaction::history_seek(HistoryPointQuery&& /*qu
// NOLINTNEXTLINE(*-rvalue-reference-param-not-moved)
Task<PaginatedTimestamps> LocalTransaction::index_range(IndexRangeQuery&& /*query*/) {
// TODO(canepat) implement using E3-like aggregator abstraction [tx_id_ must be changed]
auto paginator = []() mutable -> Task<api::PaginatedTimestamps::PageResult> {
auto paginator = [](const std::string&) mutable -> Task<api::PaginatedTimestamps::PageResult> {
co_return api::PaginatedTimestamps::PageResult{};
};
co_return api::PaginatedTimestamps{std::move(paginator)};
Expand All @@ -98,7 +98,7 @@ Task<PaginatedTimestamps> LocalTransaction::index_range(IndexRangeQuery&& /*quer
// NOLINTNEXTLINE(*-rvalue-reference-param-not-moved)
Task<PaginatedKeysValues> LocalTransaction::history_range(HistoryRangeQuery&& /*query*/) {
// TODO(canepat) implement using E3-like aggregator abstraction [tx_id_ must be changed]
auto paginator = []() mutable -> Task<api::PaginatedKeysValues::PageResult> {
auto paginator = [](const std::string&) mutable -> Task<api::PaginatedKeysValues::PageResult> {
co_return api::PaginatedKeysValues::PageResult{};
};
co_return api::PaginatedKeysValues{std::move(paginator)};
Expand All @@ -107,7 +107,7 @@ Task<PaginatedKeysValues> LocalTransaction::history_range(HistoryRangeQuery&& /*
// NOLINTNEXTLINE(*-rvalue-reference-param-not-moved)
Task<PaginatedKeysValues> LocalTransaction::domain_range(DomainRangeQuery&& /*query*/) {
// TODO(canepat) implement using E3-like aggregator abstraction [tx_id_ must be changed]
auto paginator = []() mutable -> Task<api::PaginatedKeysValues::PageResult> {
auto paginator = [](const std::string&) mutable -> Task<api::PaginatedKeysValues::PageResult> {
co_return api::PaginatedKeysValues::PageResult{};
};
co_return api::PaginatedKeysValues{std::move(paginator)};
Expand Down
21 changes: 9 additions & 12 deletions silkworm/db/kv/grpc/client/remote_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,15 @@ Task<api::HistoryPointResult> RemoteTransaction::history_seek(api::HistoryPointQ
}

Task<api::PaginatedTimestamps> RemoteTransaction::index_range(api::IndexRangeQuery&& query) {
auto paginator = [&, query = std::move(query)]() mutable -> Task<api::PaginatedTimestamps::PageResult> {
static std::string page_token{query.page_token};
auto paginator = [&, query = std::move(query)](const std::string& page_token) mutable -> Task<api::PaginatedTimestamps::PageResult> {
query.tx_id = tx_id_;
query.page_token = page_token;
auto request = index_range_request_from_query(query);
try {
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncIndexRange, stub_, std::move(request), grpc_context_);
auto result = index_range_result_from_response(reply);
page_token = std::move(result.next_page_token);
co_return api::PaginatedTimestamps::PageResult{std::move(result.timestamps), !page_token.empty()};

co_return api::PaginatedTimestamps::PageResult{std::move(result.timestamps), std::move(result.next_page_token)};
} catch (rpc::GrpcStatusError& gse) {
SILK_WARN << "KV::IndexRange RPC failed status=" << gse.status();
throw boost::system::system_error{rpc::to_system_code(gse.status().error_code())};
Expand All @@ -168,16 +167,15 @@ Task<api::PaginatedTimestamps> RemoteTransaction::index_range(api::IndexRangeQue
}

Task<api::PaginatedKeysValues> RemoteTransaction::history_range(api::HistoryRangeQuery&& query) {
auto paginator = [&, query = std::move(query)]() mutable -> Task<api::PaginatedKeysValues::PageResult> {
static std::string page_token{query.page_token};
auto paginator = [&, query = std::move(query)](const std::string& page_token) mutable -> Task<api::PaginatedKeysValues::PageResult> {
query.tx_id = tx_id_;
query.page_token = page_token;
auto request = history_range_request_from_query(query);
try {
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncHistoryRange, stub_, std::move(request), grpc_context_);
auto result = history_range_result_from_response(reply);
page_token = std::move(result.next_page_token);
co_return api::PaginatedKeysValues::PageResult{std::move(result.keys), std::move(result.values), !page_token.empty()};

co_return api::PaginatedKeysValues::PageResult{std::move(result.keys), std::move(result.values), std::move(result.next_page_token)};
} catch (rpc::GrpcStatusError& gse) {
SILK_WARN << "KV::HistoryRange RPC failed status=" << gse.status();
throw boost::system::system_error{rpc::to_system_code(gse.status().error_code())};
Expand All @@ -187,16 +185,15 @@ Task<api::PaginatedKeysValues> RemoteTransaction::history_range(api::HistoryRang
}

Task<api::PaginatedKeysValues> RemoteTransaction::domain_range(api::DomainRangeQuery&& query) {
auto paginator = [&, query = std::move(query)]() mutable -> Task<api::PaginatedKeysValues::PageResult> {
static std::string page_token{query.page_token};
auto paginator = [&, query = std::move(query)](const std::string& page_token) mutable -> Task<api::PaginatedKeysValues::PageResult> {
query.tx_id = tx_id_;
query.page_token = page_token;
auto request = domain_range_request_from_query(query);
try {
const auto reply = co_await rpc::unary_rpc(&Stub::AsyncDomainRange, stub_, std::move(request), grpc_context_);
auto result = history_range_result_from_response(reply);
page_token = std::move(result.next_page_token);
co_return api::PaginatedKeysValues::PageResult{std::move(result.keys), std::move(result.values), !page_token.empty()};

co_return api::PaginatedKeysValues::PageResult{std::move(result.keys), std::move(result.values), std::move(result.next_page_token)};
} catch (rpc::GrpcStatusError& gse) {
SILK_WARN << "KV::DomainRange RPC failed status=" << gse.status();
throw boost::system::system_error{rpc::to_system_code(gse.status().error_code())};
Expand Down
2 changes: 1 addition & 1 deletion silkworm/rpc/test_util/dummy_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace silkworm::rpc::test {

template <typename Paginated>
inline Paginated empty_paginated_sequence() {
auto paginator = []() -> Task<typename Paginated::PageResult> {
auto paginator = [](const std::string&) -> Task<typename Paginated::PageResult> {
co_return typename Paginated::PageResult{};
};
return Paginated{paginator};
Expand Down