
Commit 0431837

Implemented async Item movement between tiers
1 parent 341a94b commit 0431837

File tree: 7 files changed, +224 -20 lines changed

cachelib/allocator/CacheAllocator-inl.h

Lines changed: 79 additions & 19 deletions
@@ -1101,12 +1101,42 @@ CacheAllocator<CacheTrait>::insertOrReplace(const ItemHandle& handle) {
   return replaced;
 }
 
+/* The next two methods are used to asynchronously move an Item between memory tiers.
+ *
+ * The thread that moves the Item allocates a new Item in the tier we are
+ * moving to and calls moveRegularItemOnEviction(). That method does the following:
+ * 1. Creates a MoveCtx and puts it into the movesMap.
+ * 2. Updates the access container with the new item from the tier we are
+ *    moving to. This Item has the kNotReady flag set.
+ * 3. Copies data from the old Item to the new one.
+ * 4. Unsets the kNotReady flag and notifies the MoveCtx.
+ *
+ * Concurrent threads that are getting a handle to the same key:
+ * 1. When a handle is created, it checks whether the kNotReady flag is set.
+ * 2. If so, the Handle implementation creates a waitContext and adds it to the
+ *    MoveCtx by calling addWaitContextForMovingItem().
+ * 3. Waits until the moving thread has completed its job.
+ */
+template <typename CacheTrait>
+bool CacheAllocator<CacheTrait>::addWaitContextForMovingItem(
+    folly::StringPiece key, std::shared_ptr<WaitContext<ItemHandle>> waiter) {
+  auto shard = getShardForKey(key);
+  auto& movesMap = getMoveMapForShard(shard);
+  auto lock = getMoveLockForShard(shard);
+  auto it = movesMap.find(key);
+  if (it == movesMap.end()) {
+    return false;
+  }
+  auto ctx = it->second.get();
+  ctx->addWaiter(std::move(waiter));
+  return true;
+}
+
 template <typename CacheTrait>
 bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
-    Item& oldItem,
-    ItemHandle& newItemHdl) {
-  // TODO: should we introduce new latency tracker. E.g. evictRegularLatency_ ???
-  //       util::LatencyTracker tracker{stats_.evictRegularLatency_};
+    Item& oldItem, ItemHandle& newItemHdl) {
+  // TODO: should we introduce new latency tracker. E.g. evictRegularLatency_
+  // ??? util::LatencyTracker tracker{stats_.evictRegularLatency_};
 
   if (!oldItem.isAccessible() || oldItem.isExpired()) {
     return false;
@@ -1122,22 +1152,36 @@ bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
     newItemHdl->markNvmClean();
   }
 
-  if(config_.moveCb) {
-    // Execute the move callback. We cannot make any guarantees about the
-    // consistency of the old item beyond this point, because the callback can
-    // do more than a simple memcpy() e.g. update external references. If there
-    // are any remaining handles to the old item, it is the caller's
-    // responsibility to invalidate them. The move can only fail after this
-    // statement if the old item has been removed or replaced, in which case it
-    // should be fine for it to be left in an inconsistent state.
-    config_.moveCb(oldItem, *newItemHdl, nullptr);
-  } else {
-    std::memcpy(newItemHdl->getWritableMemory(), oldItem.getMemory(), oldItem.getSize());
+  folly::StringPiece key(oldItem.getKey());
+  auto shard = getShardForKey(key);
+  auto& movesMap = getMoveMapForShard(shard);
+  MoveCtx* ctx(nullptr);
+  {
+    auto lock = getMoveLockForShard(shard);
+    auto res = movesMap.try_emplace(key, std::make_unique<MoveCtx>());
+    if (!res.second) {
+      return false;
+    }
+    ctx = res.first->second.get();
   }
 
-  // TODO: Possible data race. We copied Item's memory to the newItemHdl
-  // but have not updated accessContainer yet. Concurrent threads might get handle
-  // to the old Item.
+  auto resHdl = ItemHandle{};
+  auto guard = folly::makeGuard([key, this, ctx, shard, &resHdl]() {
+    auto& movesMap = getMoveMapForShard(shard);
+    resHdl->unmarkNotReady();
+    auto lock = getMoveLockForShard(shard);
+    ctx->setItemHandle(std::move(resHdl));
+    movesMap.erase(key);
+  });
+
+  // TODO: Possibly we could use markMoving() instead. But today the
+  // moveOnSlabRelease logic assumes that we mark the old Item as moving,
+  // then do the copy, and then replace the old Item with the new one in the
+  // access container. Furthermore, an Item can be marked as Moving only
+  // if it is linked to an MM container. In our case we mark the new Item
+  // and update the access container before the new Item is ready (content is
+  // copied).
+  newItemHdl->markNotReady();
 
   // Inside the access container's lock, this checks if the old item is
   // accessible and its refcount is zero. If the item is not accessible,
@@ -1147,10 +1191,25 @@ bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
   // this item through an API such as remove(ItemHandle). In this case,
   // it is unsafe to replace the old item with a new one, so we should
   // also abort.
-  if (!accessContainer_->replaceIf(oldItem, *newItemHdl, itemEvictionPredicate)) {
+  if (!accessContainer_->replaceIf(oldItem, *newItemHdl,
+                                   itemEvictionPredicate)) {
     return false;
   }
 
+  if (config_.moveCb) {
+    // Execute the move callback. We cannot make any guarantees about the
+    // consistency of the old item beyond this point, because the callback can
+    // do more than a simple memcpy() e.g. update external references. If there
+    // are any remaining handles to the old item, it is the caller's
+    // responsibility to invalidate them. The move can only fail after this
+    // statement if the old item has been removed or replaced, in which case it
+    // should be fine for it to be left in an inconsistent state.
+    config_.moveCb(oldItem, *newItemHdl, nullptr);
+  } else {
+    std::memcpy(newItemHdl->getWritableMemory(), oldItem.getMemory(),
+                oldItem.getSize());
+  }
+
   // Inside the MM container's lock, this checks if the old item exists to
   // make sure that no other thread removed it, and only then replaces it.
   if (!replaceInMMContainer(oldItem, *newItemHdl)) {
@@ -1185,6 +1244,7 @@ bool CacheAllocator<CacheTrait>::moveRegularItemOnEviction(
     XDCHECK(newItemHdl->hasChainedItem());
   }
   newItemHdl.unmarkNascent();
+  resHdl = newItemHdl.clone(); // the guard will assign it to ctx under the lock
   return true;
 }
 
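The block comment at the top of this hunk describes the handshake: the mover publishes a MoveCtx keyed by the item's key, copies the payload while the new item is marked kNotReady, then removes the context and wakes every parked reader. Below is a minimal, self-contained C++ sketch of that handshake; it uses std::promise/std::future in place of CacheLib's WaitContext/ItemHandle, and every name in it (SimpleMoveCtx, moves, addWaiter, moveItem) is illustrative rather than part of the CacheLib API.

#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Context for one in-flight move: readers park a promise here and the mover
// fulfils all of them once the copy has finished.
struct SimpleMoveCtx {
  std::vector<std::promise<std::string>> waiters;
  void addWaiter(std::promise<std::string> p) { waiters.push_back(std::move(p)); }
  void complete(const std::string& value) {
    for (auto& w : waiters) {
      w.set_value(value);  // analogous to MoveCtx::wakeUpWaiters()
    }
  }
};

std::mutex moveLock;  // one global lock here; per-shard in the real code
std::map<std::string, std::shared_ptr<SimpleMoveCtx>> moves;  // movesMap analogue

// Reader side, analogous to addWaitContextForMovingItem(): returns false if no
// move is in flight for this key, otherwise hands back a future to block on.
bool addWaiter(const std::string& key, std::future<std::string>& out) {
  std::lock_guard<std::mutex> g(moveLock);
  auto it = moves.find(key);
  if (it == moves.end()) {
    return false;
  }
  std::promise<std::string> p;
  out = p.get_future();
  it->second->addWaiter(std::move(p));
  return true;
}

// Mover side, analogous to moveRegularItemOnEviction(): publish the context,
// copy the data while readers wait, then unpublish and wake everyone.
void moveItem(const std::string& key, const std::string& payload) {
  auto ctx = std::make_shared<SimpleMoveCtx>();
  {
    std::lock_guard<std::mutex> g(moveLock);
    moves.emplace(key, ctx);
  }
  std::string copied = payload;  // stands in for the memcpy()/moveCb step
  {
    std::lock_guard<std::mutex> g(moveLock);
    moves.erase(key);
  }
  ctx->complete(copied);
}

The real code differs in two ways that matter: the map and lock are sharded by key hash (see CacheAllocator.h below), and the waiters receive a cloned ItemHandle assigned by the scope guard rather than a copied value.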
cachelib/allocator/CacheAllocator.h

Lines changed: 92 additions & 0 deletions
@@ -21,6 +21,8 @@
 #include <folly/ScopeGuard.h>
 #include <folly/logging/xlog.h>
 #include <folly/synchronization/SanitizeThread.h>
+#include <folly/hash/Hash.h>
+#include <folly/container/F14Map.h>
 
 #include <functional>
 #include <memory>
@@ -1754,6 +1756,84 @@ class CacheAllocator : public CacheBase {
     return 0; // TODO
   }
 
+  bool addWaitContextForMovingItem(
+      folly::StringPiece key, std::shared_ptr<WaitContext<ItemHandle>> waiter);
+
+  class MoveCtx {
+   public:
+    MoveCtx() {}
+
+    ~MoveCtx() {
+      // prevent any further enqueue to waiters
+      // Note: we don't need to hold locks since no one can enqueue
+      // after this point.
+      wakeUpWaiters();
+    }
+
+    // Record the item handle. Upon destruction we will wake up the waiters
+    // and pass a clone of the handle to the callback. By default we pass
+    // a null handle.
+    void setItemHandle(ItemHandle _it) { it = std::move(_it); }
+
+    // Enqueue a waiter into the waiter list.
+    // @param waiter   WaitContext
+    void addWaiter(std::shared_ptr<WaitContext<ItemHandle>> waiter) {
+      XDCHECK(waiter);
+      waiters.push_back(std::move(waiter));
+    }
+
+   private:
+    // Notify all pending waiters that are waiting for the fetch.
+    void wakeUpWaiters() {
+      bool refcountOverflowed = false;
+      for (auto& w : waiters) {
+        // If the refcount overflowed earlier, then we return a miss to
+        // all subsequent waiters.
+        if (refcountOverflowed) {
+          w->set(ItemHandle{});
+          continue;
+        }
+
+        try {
+          w->set(it.clone());
+        } catch (const exception::RefcountOverflow&) {
+          // We'll return a miss to the user's pending read,
+          // so we should enqueue a delete via NvmCache.
+          // TODO: cache.remove(it);
+          refcountOverflowed = true;
+        }
+      }
+    }
+
+    ItemHandle it; // will be set when the context is being filled
+    std::vector<std::shared_ptr<WaitContext<ItemHandle>>> waiters; // list of waiters
+  };
+
+  using MoveMap =
+      folly::F14ValueMap<folly::StringPiece,
+                         std::unique_ptr<MoveCtx>,
+                         folly::HeterogeneousAccessHash<folly::StringPiece>>;
+
+  static size_t getShardForKey(folly::StringPiece key) {
+    return folly::Hash()(key) % kShards;
+  }
+
+  MoveMap& getMoveMapForShard(size_t shard) {
+    return movesMap_[shard].movesMap_;
+  }
+
+  MoveMap& getMoveMap(folly::StringPiece key) {
+    return getMoveMapForShard(getShardForKey(key));
+  }
+
+  std::unique_lock<std::mutex> getMoveLockForShard(size_t shard) {
+    return std::unique_lock<std::mutex>(moveLock_[shard].moveLock_);
+  }
+
+  std::unique_lock<std::mutex> getMoveLock(folly::StringPiece key) {
+    return getMoveLockForShard(getShardForKey(key));
+  }
+
   // Whether the memory allocator for this cache allocator was created on shared
   // memory. The hash table, chained item hash table etc is also created on
   // shared memory except for temporary shared memory mode when they're created
@@ -1871,6 +1951,18 @@ class CacheAllocator : public CacheBase {
   // indicates if the shutdown of cache is in progress or not
   std::atomic<bool> shutDownInProgress_{false};
 
+  static constexpr size_t kShards = 8192; // TODO: need to define the right value
+
+  // a map of all pending moves, one slot per shard
+  struct {
+    alignas(folly::hardware_destructive_interference_size) MoveMap movesMap_;
+  } movesMap_[kShards];
+
+  // a lock per shard guarding the corresponding moves map
+  struct {
+    alignas(folly::hardware_destructive_interference_size) std::mutex moveLock_;
+  } moveLock_[kShards];
+
   // END private members
 
   // Make this friend to give access to acquire and release
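The pending-move bookkeeping above is sharded: getShardForKey() hashes the key into one of kShards slots, and each slot carries its own cache-line-aligned map and mutex so that moves of unrelated keys rarely contend. The sketch below shows the same pattern in isolation; ShardedMoves, PendingMove, the use of std::hash, and alignas(64) are illustrative stand-ins, not CacheLib types.

#include <array>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_map>

struct PendingMove {};  // stand-in for MoveCtx

class ShardedMoves {
 public:
  static constexpr size_t kShards = 8192;  // placeholder, like the commit's TODO value

  // Hash the key once and use the same shard index for both the map and its
  // lock, so a lookup and the mutex guarding it always agree.
  static size_t shardFor(std::string_view key) {
    return std::hash<std::string_view>{}(key) % kShards;
  }

  std::unordered_map<std::string, std::unique_ptr<PendingMove>>& mapFor(size_t shard) {
    return shards_[shard].map;
  }

  std::unique_lock<std::mutex> lockFor(size_t shard) {
    return std::unique_lock<std::mutex>(shards_[shard].mutex);
  }

 private:
  // Each shard gets its own cache line to avoid false sharing, mirroring the
  // alignas(folly::hardware_destructive_interference_size) in the commit.
  struct alignas(64) Shard {
    std::unordered_map<std::string, std::unique_ptr<PendingMove>> map;
    std::mutex mutex;
  };
  std::array<Shard, kShards> shards_;
};

With a shard count in the thousands, the expected contention on any single lock stays small even under heavy eviction; picking the final value of kShards is left as a TODO in the commit.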

cachelib/allocator/CacheItem-inl.h

Lines changed: 15 additions & 0 deletions
@@ -264,6 +264,21 @@ bool CacheItem<CacheTrait>::isNvmEvicted() const noexcept {
   return ref_.isNvmEvicted();
 }
 
+template <typename CacheTrait>
+void CacheItem<CacheTrait>::markNotReady() noexcept {
+  ref_.markNotReady();
+}
+
+template <typename CacheTrait>
+void CacheItem<CacheTrait>::unmarkNotReady() noexcept {
+  ref_.unmarkNotReady();
+}
+
+template <typename CacheTrait>
+bool CacheItem<CacheTrait>::isNotReady() const noexcept {
+  return ref_.isNotReady();
+}
+
 template <typename CacheTrait>
 void CacheItem<CacheTrait>::markIsChainedItem() noexcept {
   XDCHECK(!hasChainedItem());

cachelib/allocator/CacheItem.h

Lines changed: 8 additions & 0 deletions
@@ -240,6 +240,14 @@ class CACHELIB_PACKED_ATTR CacheItem {
   void unmarkNvmEvicted() noexcept;
   bool isNvmEvicted() const noexcept;
 
+  /**
+   * Marks that the item is migrating between memory tiers and is not
+   * ready for access yet. The accessing thread should wait.
+   */
+  void markNotReady() noexcept;
+  void unmarkNotReady() noexcept;
+  bool isNotReady() const noexcept;
+
   /**
    * Function to set the timestamp for when to expire an item
    * Employs a best-effort approach to update the expiryTime. Item's expiry

cachelib/allocator/Handle.h

Lines changed: 8 additions & 1 deletion
@@ -464,7 +464,14 @@ struct HandleImpl {
 
   // Handle which has the item already
   FOLLY_ALWAYS_INLINE HandleImpl(Item* it, CacheT& alloc) noexcept
-      : alloc_(&alloc), it_(it) {}
+      : alloc_(&alloc), it_(it) {
+    if (it_ && it_->isNotReady()) {
+      waitContext_ = std::make_shared<ItemWaitContext>(alloc);
+      if (!alloc_->addWaitContextForMovingItem(it->getKey(), waitContext_)) {
+        waitContext_.reset();
+      }
+    }
+  }
 
   // handle that has a wait context allocated. Used for async handles
   // In this case, the it_ will be filled in asynchronously and multiple

cachelib/allocator/Refcount.h

Lines changed: 12 additions & 0 deletions
@@ -116,6 +116,10 @@ class FOLLY_PACK_ATTR RefcountWithFlags {
     // unevictable in the past.
     kUnevictable_NOOP,
 
+    // Item is accessible but its content is not ready yet. Used by eviction
+    // when the Item is moved between memory tiers.
+    kNotReady,
+
     // Unused. This is just to indicate the maximum number of flags
     kFlagMax,
   };
@@ -329,6 +333,14 @@ class FOLLY_PACK_ATTR RefcountWithFlags {
   void unmarkNvmEvicted() noexcept { return unSetFlag<kNvmEvicted>(); }
   bool isNvmEvicted() const noexcept { return isFlagSet<kNvmEvicted>(); }
 
+  /**
+   * Marks that the item is migrating between memory tiers and is not
+   * ready for access yet. The accessing thread should wait.
+   */
+  void markNotReady() noexcept { return setFlag<kNotReady>(); }
+  void unmarkNotReady() noexcept { return unSetFlag<kNotReady>(); }
+  bool isNotReady() const noexcept { return isFlagSet<kNotReady>(); }
+
   // Whether or not an item is completely drained of access
   // Refcount is 0 and the item is not linked, accessible, nor moving
   bool isDrained() const noexcept { return getRefWithAccessAndAdmin() == 0; }
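kNotReady is simply one more flag bit packed into the word that RefcountWithFlags already uses for the reference count and the other kFlag* bits, which is why marking an item not-ready costs only an atomic bit operation. The toy illustration below shows that kind of packing; PackedRef, the 24-bit count, and the chosen bit position are invented for the example and do not reflect CacheLib's actual layout.

#include <atomic>
#include <cstdint>

// Toy packed refcount: the low 24 bits hold the count, the high bits hold flags.
class PackedRef {
 public:
  static constexpr uint32_t kRefMask = 0x00FFFFFF;
  static constexpr uint32_t kNotReadyBit = 1u << 24;  // analogous to kNotReady

  void markNotReady() noexcept { bits_.fetch_or(kNotReadyBit, std::memory_order_acq_rel); }
  void unmarkNotReady() noexcept { bits_.fetch_and(~kNotReadyBit, std::memory_order_acq_rel); }
  bool isNotReady() const noexcept {
    return bits_.load(std::memory_order_acquire) & kNotReadyBit;
  }

  void incRef() noexcept { bits_.fetch_add(1, std::memory_order_acq_rel); }
  uint32_t refCount() const noexcept {
    return bits_.load(std::memory_order_acquire) & kRefMask;
  }

 private:
  std::atomic<uint32_t> bits_{0};
};

A handle constructed over an item whose not-ready bit is set takes the wait path added in Handle.h above instead of touching the payload directly.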

cachelib/allocator/tests/ItemHandleTest.cpp

Lines changed: 10 additions & 0 deletions
@@ -39,6 +39,10 @@ struct TestItem {
   using ChainedItem = int;
 
   void reset() {}
+
+  folly::StringPiece getKey() const { return folly::StringPiece(); }
+
+  bool isNotReady() const { return false; }
 };
 
 struct TestNvmCache;
@@ -79,6 +83,12 @@ struct TestAllocator {
 
   void adjustHandleCountForThread_private(int i) { tlRef_.tlStats() += i; }
 
+  bool addWaitContextForMovingItem(
+      folly::StringPiece key,
+      std::shared_ptr<WaitContext<TestItemHandle>> waiter) {
+    return false;
+  }
+
   util::FastStats<int> tlRef_;
 };
 } // namespace
