Skip to content

Commit c687e23

Browse files
committed
Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band' queries to servers that don't interfere with pools at all. In order to reuse startup code for making these simple queries, we need to set the stats (`Reporter`) optional, so using these simple queries wont interfere with stats.
1 parent 8a0da10 commit c687e23

File tree

1 file changed

+111
-5
lines changed

1 file changed

+111
-5
lines changed

src/server.rs

Lines changed: 111 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub struct Server {
6161
connected_at: chrono::naive::NaiveDateTime,
6262

6363
/// Reports various metrics, e.g. data sent & received.
64-
stats: Reporter,
64+
stats: Option<Reporter>,
6565

6666
/// Application name using the server at the moment.
6767
application_name: String,
@@ -79,7 +79,7 @@ impl Server {
7979
user: &User,
8080
database: &str,
8181
client_server_map: ClientServerMap,
82-
stats: Reporter,
82+
stats: Option<Reporter>,
8383
) -> Result<Server, Error> {
8484
let mut stream =
8585
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
@@ -384,7 +384,9 @@ impl Server {
384384

385385
/// Send messages to the server from the client.
386386
pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> {
387-
self.stats.data_sent(messages.len(), self.server_id);
387+
if let Some(stats) = &self.stats {
388+
stats.data_sent(messages.len(), self.server_id);
389+
}
388390

389391
match write_all_half(&mut self.write, messages).await {
390392
Ok(_) => {
@@ -533,7 +535,9 @@ impl Server {
533535
let bytes = self.buffer.clone();
534536

535537
// Keep track of how much data we got from the server for stats.
536-
self.stats.data_received(bytes.len(), self.server_id);
538+
if let Some(stats) = &self.stats {
539+
stats.data_received(bytes.len(), self.server_id);
540+
}
537541

538542
// Clear the buffer for next query.
539543
self.buffer.clear();
@@ -674,14 +678,116 @@ impl Server {
674678
pub fn mark_dirty(&mut self) {
675679
self.needs_cleanup = true;
676680
}
681+
682+
// This is so we can execute out of band queries to the server.
683+
// The connection will be opened, the query executed and closed.
684+
pub async fn exec_simple_query(
685+
address: &Address,
686+
user: &User,
687+
query: &str,
688+
) -> Result<Vec<String>, Error> {
689+
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
690+
691+
debug!("Connecting to server to obtain auth hashes.");
692+
let mut server = Server::startup(
693+
0,
694+
address,
695+
user,
696+
&address.database,
697+
client_server_map,
698+
None,
699+
Arc::new(RwLock::new(None)),
700+
)
701+
.await?;
702+
debug!("Connected!, sending query.");
703+
server.send(&simple_query(query)).await?;
704+
let mut message = server.recv().await?;
705+
706+
Ok(parse_query_message(&mut message).await?)
707+
}
708+
}
709+
710+
async fn parse_query_message(message: &mut BytesMut) -> Result<Vec<String>, Error> {
711+
let mut pair = Vec::<String>::new();
712+
match message::backend::Message::parse(message) {
713+
Ok(Some(message::backend::Message::RowDescription(_description))) => {}
714+
Ok(Some(message::backend::Message::ErrorResponse(err))) => {
715+
return Err(Error::ProtocolSyncError(format!(
716+
"Protocol error parsing response. Err: {:?}",
717+
err.fields()
718+
.iterator()
719+
.fold(String::default(), |acc, element| acc
720+
+ element.unwrap().value())
721+
)))
722+
}
723+
Ok(_) => {
724+
return Err(Error::ProtocolSyncError(
725+
"Protocol error, expected Row Description.".to_string(),
726+
))
727+
}
728+
Err(err) => {
729+
return Err(Error::ProtocolSyncError(format!(
730+
"Protocol error parsing response. Err: {:?}",
731+
err
732+
)))
733+
}
734+
}
735+
736+
while !message.is_empty() {
737+
match message::backend::Message::parse(message) {
738+
Ok(postgres_message) => {
739+
match postgres_message {
740+
Some(message::backend::Message::DataRow(data)) => {
741+
let buf = data.buffer();
742+
trace!("Data: {:?}", buf);
743+
744+
for item in data.ranges().iterator() {
745+
match item.as_ref() {
746+
Ok(range) => match range {
747+
Some(range) => {
748+
pair.push(String::from_utf8_lossy(&buf[range.clone()]).to_string());
749+
}
750+
None => return Err(Error::ProtocolSyncError(String::from(
751+
"Data expected while receiving query auth data, found nothing.",
752+
))),
753+
},
754+
Err(err) => {
755+
return Err(Error::ProtocolSyncError(format!(
756+
"Data error, err: {:?}",
757+
err
758+
)))
759+
}
760+
}
761+
}
762+
}
763+
Some(message::backend::Message::CommandComplete(_)) => {}
764+
Some(message::backend::Message::ReadyForQuery(_)) => {}
765+
_ => {
766+
return Err(Error::ProtocolSyncError(
767+
"Unexpected message while receiving auth query data.".to_string(),
768+
))
769+
}
770+
}
771+
}
772+
Err(err) => {
773+
return Err(Error::ProtocolSyncError(format!(
774+
"Parse error, err: {:?}",
775+
err
776+
)))
777+
}
778+
};
779+
}
780+
Ok(pair)
677781
}
678782

679783
impl Drop for Server {
680784
/// Try to do a clean shut down. Best effort because
681785
/// the socket is in non-blocking mode, so it may not be ready
682786
/// for a write.
683787
fn drop(&mut self) {
684-
self.stats.server_disconnecting(self.server_id);
788+
if let Some(stats) = &self.stats {
789+
stats.server_disconnecting(self.server_id);
790+
}
685791

686792
let mut bytes = BytesMut::with_capacity(4);
687793
bytes.put_u8(b'X');

0 commit comments

Comments
 (0)