Skip to content

Commit e52d5b5

Browse files
committed
Execute infrastructure
1 parent 3356713 commit e52d5b5

File tree

1 file changed

+96
-1
lines changed

1 file changed

+96
-1
lines changed

postgres-tokio/src/lib.rs

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub use postgres_shared::{params, types};
2424
use error::{ConnectError, Error, DbError};
2525
use params::{ConnectParams, IntoConnectParams};
2626
use stream::PostgresStream;
27-
use types::{Oid, Type};
27+
use types::{Oid, Type, ToSql, SessionInfo, IsNull};
2828

2929
pub mod error;
3030
mod stream;
@@ -402,6 +402,8 @@ impl Connection {
402402
|f, t| Column { name: f.0, type_: t })
403403
.map(|(r, s)| (p, r, s))
404404
})
405+
.and_then(|(p, r, s)| s.ready((p, r)))
406+
.map(|((p, r), s)| (p, r, s))
405407
.boxed()
406408
}
407409

@@ -438,6 +440,99 @@ impl Connection {
438440
unimplemented!()
439441
}
440442

443+
fn raw_execute(self,
444+
stmt: &str,
445+
portal: &str,
446+
param_types: &[Type],
447+
params: &[&ToSql])
448+
-> BoxFuture<Connection, Error> {
449+
assert!(param_types.len() == params.len(),
450+
"expected {} parameters but got {}",
451+
param_types.len(),
452+
params.len());
453+
454+
let mut bind = vec![];
455+
let mut execute = vec![];
456+
let mut sync = vec![];
457+
let r = frontend::bind(stmt,
458+
portal,
459+
Some(1),
460+
params.iter().zip(param_types),
461+
|(param, ty), buf| {
462+
let info = SessionInfo::new(&self.0.parameters);
463+
match param.to_sql_checked(ty, buf, &info) {
464+
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
465+
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
466+
Err(e) => Err(e),
467+
}
468+
},
469+
Some(1),
470+
&mut bind);
471+
let r = match r {
472+
Ok(()) => Ok(self),
473+
Err(frontend::BindError::Conversion(e)) => Err(Error::Conversion(e, self)),
474+
Err(frontend::BindError::Serialization(e)) => Err(Error::Io(e)),
475+
};
476+
477+
r.and_then(|s| {
478+
frontend::execute(portal, 0, &mut execute)
479+
.map(|()| s)
480+
.map_err(Error::Io)
481+
})
482+
.map(|s| {
483+
frontend::sync(&mut sync);
484+
s
485+
})
486+
.into_future()
487+
.and_then(|s| s.0.send(bind).map_err(Error::Io))
488+
.and_then(|s| s.send(execute).map_err(Error::Io))
489+
.and_then(|s| s.send(sync).map_err(Error::Io))
490+
.and_then(|s| s.flush().map_err(Error::Io))
491+
.and_then(|s| s.read().map_err(Error::Io))
492+
.and_then(|(m, s)| {
493+
match m {
494+
backend::Message::BindComplete => Either::A(Ok(Connection(s)).into_future()),
495+
backend::Message::ErrorResponse(body) => {
496+
Either::B(Connection(s).ready_err(body))
497+
}
498+
_ => Either::A(Err(bad_message()).into_future()),
499+
}
500+
})
501+
.boxed()
502+
}
503+
504+
fn finish_execute(self) -> BoxFuture<(u64, Connection), Error> {
505+
self.0.read()
506+
.map_err(Error::Io)
507+
.and_then(|(m, s)| {
508+
match m {
509+
backend::Message::DataRow(_) => Either::B(Connection(s).finish_execute()),
510+
backend::Message::CommandComplete(body) => {
511+
Either::A(body.tag()
512+
.map(|tag| {
513+
let num = tag.split_whitespace()
514+
.last()
515+
.unwrap()
516+
.parse()
517+
.unwrap_or(0);
518+
(num, Connection(s))
519+
})
520+
.map_err(Error::Io)
521+
.into_future())
522+
}
523+
backend::Message::EmptyQueryResponse => {
524+
Either::A(Ok((0, Connection(s))).into_future())
525+
}
526+
backend::Message::ErrorResponse(body) => {
527+
Either::B(Connection(s).ready_err(body))
528+
}
529+
_ => Either::A(Err(bad_message()).into_future()),
530+
}
531+
})
532+
.and_then(|(n, s)| s.ready(n))
533+
.boxed()
534+
}
535+
441536
pub fn cancel_data(&self) -> CancelData {
442537
self.0.cancel_data
443538
}

0 commit comments

Comments
 (0)