Skip to content

Commit 85ac3ef

Browse files
authored
Buffer client CopyData messages (#284)
Buffers CopyData messages and removes buffer clone for the sync message
1 parent 7894bba commit 85ac3ef

File tree

4 files changed

+44
-34
lines changed

4 files changed

+44
-34
lines changed

src/admin.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ where
171171
res.put_i32(5);
172172
res.put_u8(b'I');
173173

174-
write_all_half(stream, res).await
174+
write_all_half(stream, &res).await
175175
}
176176

177177
/// Show PgCat version.
@@ -189,7 +189,7 @@ where
189189
res.put_i32(5);
190190
res.put_u8(b'I');
191191

192-
write_all_half(stream, res).await
192+
write_all_half(stream, &res).await
193193
}
194194

195195
/// Show utilization of connection pools for each shard and replicas.
@@ -250,7 +250,7 @@ where
250250
res.put_i32(5);
251251
res.put_u8(b'I');
252252

253-
write_all_half(stream, res).await
253+
write_all_half(stream, &res).await
254254
}
255255

256256
/// Show shards and replicas.
@@ -317,7 +317,7 @@ where
317317
res.put_i32(5);
318318
res.put_u8(b'I');
319319

320-
write_all_half(stream, res).await
320+
write_all_half(stream, &res).await
321321
}
322322

323323
/// Ignore any SET commands the client sends.
@@ -349,7 +349,7 @@ where
349349
res.put_i32(5);
350350
res.put_u8(b'I');
351351

352-
write_all_half(stream, res).await
352+
write_all_half(stream, &res).await
353353
}
354354

355355
/// Shows current configuration.
@@ -395,7 +395,7 @@ where
395395
res.put_i32(5);
396396
res.put_u8(b'I');
397397

398-
write_all_half(stream, res).await
398+
write_all_half(stream, &res).await
399399
}
400400

401401
/// Show shard and replicas statistics.
@@ -455,7 +455,7 @@ where
455455
res.put_i32(5);
456456
res.put_u8(b'I');
457457

458-
write_all_half(stream, res).await
458+
write_all_half(stream, &res).await
459459
}
460460

461461
/// Show currently connected clients
@@ -505,7 +505,7 @@ where
505505
res.put_i32(5);
506506
res.put_u8(b'I');
507507

508-
write_all_half(stream, res).await
508+
write_all_half(stream, &res).await
509509
}
510510

511511
/// Show currently connected servers
@@ -559,5 +559,5 @@ where
559559
res.put_i32(5);
560560
res.put_u8(b'I');
561561

562-
write_all_half(stream, res).await
562+
write_all_half(stream, &res).await
563563
}

src/client.rs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ where
861861
'Q' => {
862862
debug!("Sending query to server");
863863

864-
self.send_and_receive_loop(code, message, server, &address, &pool)
864+
self.send_and_receive_loop(code, Some(&message), server, &address, &pool)
865865
.await?;
866866

867867
if !server.in_transaction() {
@@ -931,14 +931,8 @@ where
931931
}
932932
}
933933

934-
self.send_and_receive_loop(
935-
code,
936-
self.buffer.clone(),
937-
server,
938-
&address,
939-
&pool,
940-
)
941-
.await?;
934+
self.send_and_receive_loop(code, None, server, &address, &pool)
935+
.await?;
942936

943937
self.buffer.clear();
944938

@@ -955,21 +949,32 @@ where
955949

956950
// CopyData
957951
'd' => {
958-
// Forward the data to the server,
959-
// don't buffer it since it can be rather large.
960-
self.send_server_message(server, message, &address, &pool)
961-
.await?;
952+
self.buffer.put(&message[..]);
953+
954+
// Want to limit buffer size
955+
if self.buffer.len() > 8196 {
956+
// Forward the data to the server,
957+
self.send_server_message(server, &self.buffer, &address, &pool)
958+
.await?;
959+
self.buffer.clear();
960+
}
962961
}
963962

964963
// CopyDone or CopyFail
965964
// Copy is done, successfully or not.
966965
'c' | 'f' => {
967-
self.send_server_message(server, message, &address, &pool)
966+
// We may already have some copy data in the buffer, add this message to buffer
967+
self.buffer.put(&message[..]);
968+
969+
self.send_server_message(server, &self.buffer, &address, &pool)
968970
.await?;
969971

972+
// Clear the buffer
973+
self.buffer.clear();
974+
970975
let response = self.receive_server_message(server, &address, &pool).await?;
971976

972-
match write_all_half(&mut self.write, response).await {
977+
match write_all_half(&mut self.write, &response).await {
973978
Ok(_) => (),
974979
Err(err) => {
975980
server.mark_bad();
@@ -1016,13 +1021,18 @@ where
10161021
async fn send_and_receive_loop(
10171022
&mut self,
10181023
code: char,
1019-
message: BytesMut,
1024+
message: Option<&BytesMut>,
10201025
server: &mut Server,
10211026
address: &Address,
10221027
pool: &ConnectionPool,
10231028
) -> Result<(), Error> {
10241029
debug!("Sending {} to server", code);
10251030

1031+
let message = match message {
1032+
Some(message) => message,
1033+
None => &self.buffer,
1034+
};
1035+
10261036
self.send_server_message(server, message, address, pool)
10271037
.await?;
10281038

@@ -1032,7 +1042,7 @@ where
10321042
loop {
10331043
let response = self.receive_server_message(server, address, pool).await?;
10341044

1035-
match write_all_half(&mut self.write, response).await {
1045+
match write_all_half(&mut self.write, &response).await {
10361046
Ok(_) => (),
10371047
Err(err) => {
10381048
server.mark_bad();
@@ -1058,7 +1068,7 @@ where
10581068
async fn send_server_message(
10591069
&self,
10601070
server: &mut Server,
1061-
message: BytesMut,
1071+
message: &BytesMut,
10621072
address: &Address,
10631073
pool: &ConnectionPool,
10641074
) -> Result<(), Error> {

src/messages.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ where
258258
res.put_i32(len);
259259
res.put_slice(&set_complete[..]);
260260

261-
write_all_half(stream, res).await?;
261+
write_all_half(stream, &res).await?;
262262
ready_for_query(stream).await
263263
}
264264

@@ -308,7 +308,7 @@ where
308308
res.put_i32(error.len() as i32 + 4);
309309
res.put(error);
310310

311-
write_all_half(stream, res).await
311+
write_all_half(stream, &res).await
312312
}
313313

314314
pub async fn wrong_password<S>(stream: &mut S, user: &str) -> Result<(), Error>
@@ -370,7 +370,7 @@ where
370370
// CommandComplete
371371
res.put(command_complete("SELECT 1"));
372372

373-
write_all_half(stream, res).await?;
373+
write_all_half(stream, &res).await?;
374374
ready_for_query(stream).await
375375
}
376376

@@ -459,11 +459,11 @@ where
459459
}
460460

461461
/// Write all the data in the buffer to the TcpStream, write owned half (see mpsc).
462-
pub async fn write_all_half<S>(stream: &mut S, buf: BytesMut) -> Result<(), Error>
462+
pub async fn write_all_half<S>(stream: &mut S, buf: &BytesMut) -> Result<(), Error>
463463
where
464464
S: tokio::io::AsyncWrite + std::marker::Unpin,
465465
{
466-
match stream.write_all(&buf).await {
466+
match stream.write_all(buf).await {
467467
Ok(_) => Ok(()),
468468
Err(_) => return Err(Error::SocketError(format!("Error writing to socket"))),
469469
}

src/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ impl Server {
381381
}
382382

383383
/// Send messages to the server from the client.
384-
pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> {
384+
pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> {
385385
self.stats.data_sent(messages.len(), self.server_id);
386386

387387
match write_all_half(&mut self.write, messages).await {
@@ -593,7 +593,7 @@ impl Server {
593593
pub async fn query(&mut self, query: &str) -> Result<(), Error> {
594594
let query = simple_query(query);
595595

596-
self.send(query).await?;
596+
self.send(&query).await?;
597597

598598
loop {
599599
let _ = self.recv().await?;

0 commit comments

Comments
 (0)