Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions pgxn/neon/communicator.c
Original file line number Diff line number Diff line change
Expand Up @@ -2398,11 +2398,16 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
return db_size;
}

int
communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *request_lsns,
void *buffer)
/*
* Download the given SLRU segment from the pageserver.
*
* See comments on read_slru_segment_result for the result.
*/
read_slru_segment_result
communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns)
{
int n_blocks;
read_slru_segment_result result = { NULL, 0, NULL };
shardno_t shard_no = 0; /* All SLRUs are at shard 0 */
NeonResponse *resp = NULL;
NeonGetSlruSegmentRequest request;
Expand Down Expand Up @@ -2456,8 +2461,9 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno);
}
}
n_blocks = slru_resp->n_blocks;
memcpy(buffer, slru_resp->data, n_blocks*BLCKSZ);
result.slru_data = slru_resp->data;
result.n_blocks = slru_resp->n_blocks;
result.buf = slru_resp;
break;
}
case T_NeonErrorResponse:
Expand Down Expand Up @@ -2486,10 +2492,10 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
"Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x",
T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag);
}
pfree(resp);
/* do not pfree(resp) here. The caller is responsible for it */

communicator_reconfigure_timeout_if_needed();
return n_blocks;
return result;
}

void
Expand Down
21 changes: 18 additions & 3 deletions pgxn/neon/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,24 @@ extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_l
BlockNumber nblocks, const bits8 *mask);
extern bool communicator_prefetch_receive(BufferTag tag);

extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns,
void *buffer);
/* Return type for communicator_read_slru_segment */
typedef struct read_slru_segment_result
{
/* the SLRU segment content is returned here */
void *slru_data;

/* Size of 'slru_data', in BLCKSZ blocks. 0 if the segment was not found */
int n_blocks;

/*
* 'slru_data' points to a larger palloc'd structure that's opaque
* to the caller. Once you're done with the result, pfree this.
*/
void *buf;
} read_slru_segment_result;

extern read_slru_segment_result communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns);

extern void communicator_reconfigure_timeout_if_needed(void);
extern void communicator_prefetch_pump_state(void);
Expand Down
65 changes: 59 additions & 6 deletions pgxn/neon/pagestore_smgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_class.h"
#include "common/file_utils.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/interrupt.h"
#include "port/pg_iovec.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
#include "storage/fd.h"
#include "storage/fsm_internals.h"
#include "storage/md.h"
#include "storage/smgr.h"
Expand Down Expand Up @@ -2127,13 +2129,20 @@ neon_end_unlogged_build(SMgrRelation reln)

#define STRPREFIX(str, prefix) (strncmp(str, prefix, strlen(prefix)) == 0)

static int
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
/*
* Attempt to download the given SLRU segment file from the pageserver.
*
* Returns:
* true if the file was successfully downloaded
* false if the file was not found in pageserver
* ereports if some other error happened
*/
static bool
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno)
{
XLogRecPtr request_lsn,
not_modified_since;
SlruKind kind;
int n_blocks;
neon_request_lsns request_lsns;

/*
Expand Down Expand Up @@ -2164,22 +2173,66 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
*/
not_modified_since = nm_adjust_lsn(GetRedoStartLsn());

/* Only these SLRUs are stored in the pageserver */
if (STRPREFIX(path, "pg_xact"))
kind = SLRU_CLOG;
else if (STRPREFIX(path, "pg_multixact/members"))
kind = SLRU_MULTIXACT_MEMBERS;
else if (STRPREFIX(path, "pg_multixact/offsets"))
kind = SLRU_MULTIXACT_OFFSETS;
else
return -1;
return false;

request_lsns.request_lsn = request_lsn;
request_lsns.not_modified_since = not_modified_since;
request_lsns.effective_request_lsn = request_lsn;

n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer);
{
read_slru_segment_result result;
int fd;
struct iovec iov[1];

return n_blocks;
/* Call the pageserver */
result = communicator_read_slru_segment(kind, segno, &request_lsns);
if (result.n_blocks == 0)
{
/* "File not found" from pageserver */
if (result.buf)
pfree(result.buf);
return false;
}

/* Success! Write the contents to the file */
fd = OpenTransientFile(path, O_WRONLY | O_EXCL | O_CREAT | PG_BINARY);
if (fd < 0)
{
ereport(ERROR,
errcode_for_file_access(),
errmsg("could not create SLRU file \"%s\" to write downloaded contents: %m",
path));
}

errno = 0;
iov[0].iov_base = result.slru_data;
iov[0].iov_len = result.n_blocks * BLCKSZ;
pgstat_report_wait_start(WAIT_EVENT_SLRU_WRITE);
if (pg_pwritev_with_retry(fd, iov, 1, 0) != result.n_blocks * BLCKSZ)
{
pgstat_report_wait_end();
/* if write didn't set errno, assume problem is no disk space */
if (errno == 0)
errno = ENOSPC;
ereport(ERROR,
errcode_for_file_access(),
errmsg("could not write downloaded contents to SLRU file \"%s\": %m",
path));
}
pgstat_report_wait_end();
pfree(result.buf);
CloseTransientFile(fd);

return true;
}
}

static void
Expand Down
8 changes: 4 additions & 4 deletions vendor/revisions.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"1e01fcea2a6b38180021aa83e0051d95286d9096"
"4bb475f24ebbe860fd5432235bd2ab09413b9816"
],
"v16": [
"16.9",
"a42351fcd41ea01edede1daed65f651e838988fc"
"e871de05e4bef58a580e28be393a43e12852d5fc"
],
"v15": [
"15.13",
"2aaab3bb4a13557aae05bb2ae0ef0a132d0c4f85"
"6d0324add02106b6843a3d3701f4923e847bcbb0"
],
"v14": [
"14.18",
"2155cb165d05f617eb2c8ad7e43367189b627703"
"a9419a4ec4cd9fb80eaf45dffe4a2b82a0f25ec1"
]
}
Loading