Skip to content

Commit e03e80e

Browse files
committed
Cancel queue
1 parent e1e8150 commit e03e80e

File tree

5 files changed

+186
-33
lines changed

5 files changed

+186
-33
lines changed

enclave-runner/src/usercalls/abi.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use futures::future::Future;
1919

2020
type Register = u64;
2121

22-
trait RegisterArgument {
22+
pub(super) trait RegisterArgument {
2323
fn from_register(_: Register) -> Self;
2424
fn into_register(self) -> Register;
2525
}
@@ -29,7 +29,7 @@ type EnclaveAbort = super::EnclaveAbort<bool>;
2929
pub(crate) type UsercallResult<T> = ::std::result::Result<T, EnclaveAbort>;
3030
pub(crate) type DispatchResult = UsercallResult<(Register, Register)>;
3131

32-
trait ReturnValue {
32+
pub(super) trait ReturnValue {
3333
fn into_registers(self) -> DispatchResult;
3434
}
3535

@@ -38,7 +38,7 @@ macro_rules! define_usercalls {
3838
($(fn $f:ident($($n:ident: $t:ty),*) $(-> $r:tt)*; )*) => {
3939
#[repr(C)]
4040
#[allow(non_camel_case_types)]
41-
enum UsercallList {
41+
pub(super) enum UsercallList {
4242
__enclave_usercalls_invalid,
4343
$($f,)*
4444
}

enclave-runner/src/usercalls/interface.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,13 @@ impl<'future, 'ioinput: 'future, 'tcs: 'ioinput> Usercalls<'future> for Handler<
252252
self,
253253
usercall_queue: *mut FifoDescriptor<Usercall>,
254254
return_queue: *mut FifoDescriptor<Return>,
255+
cancel_queue: *mut FifoDescriptor<Cancel>,
255256
) -> std::pin::Pin<Box<dyn Future<Output = (Self, UsercallResult<Result>)> + 'future>> {
256257
async move {
257258
unsafe {
258259
let ret = match (usercall_queue.as_mut(), return_queue.as_mut()) {
259260
(Some(usercall_queue), Some(return_queue)) => {
260-
self.0.async_queues(usercall_queue, return_queue).await.map(Ok)
261+
self.0.async_queues(usercall_queue, return_queue, cancel_queue.as_mut()).await.map(Ok)
261262
},
262263
_ => {
263264
Ok(Err(IoErrorKind::InvalidInput.into()))
@@ -321,13 +322,13 @@ fn result_from_io_error(err: IoError) -> Result {
321322
ret as _
322323
}
323324

324-
trait ToSgxResult {
325+
pub(super) trait ToSgxResult {
325326
type Return;
326327

327328
fn to_sgx_result(self) -> Self::Return;
328329
}
329330

330-
trait SgxReturn {
331+
pub(super) trait SgxReturn {
331332
fn on_error() -> Self;
332333
}
333334

enclave-runner/src/usercalls/mod.rs

Lines changed: 129 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use ipc_queue::{self, QueueEvent};
2424
use sgxs::loader::Tcs as SgxsTcs;
2525
use std::alloc::{GlobalAlloc, Layout, System};
2626
use std::cell::RefCell;
27-
use std::collections::VecDeque;
27+
use std::collections::{HashMap, VecDeque};
2828
use std::io::{self, ErrorKind as IoErrorKind, Read, Result as IoResult};
2929
use std::pin::Pin;
3030
use std::result::Result as StdResult;
@@ -46,16 +46,17 @@ lazy_static! {
4646
pub(crate) mod abi;
4747
mod interface;
4848

49-
use self::abi::dispatch;
49+
use self::abi::{dispatch, UsercallList};
5050
use self::interface::{Handler, OutputBuffer};
5151

52-
const EV_ABORT: u64 = 0b0000_0000_0000_1000;
52+
const EV_ABORT: u64 = 0b0000_0000_0001_0000;
5353

5454
// Experiments show that tha actual size of these queues is less important than
5555
// the ratio between them. It appears that a much larger return queue performs
5656
// much better when multiple enclave threads send usercalls.
5757
const USERCALL_QUEUE_SIZE: usize = 16;
5858
const RETURN_QUEUE_SIZE: usize = 1024;
59+
const CANCEL_QUEUE_SIZE: usize = USERCALL_QUEUE_SIZE * 2;
5960

6061
enum UsercallSendData {
6162
Sync(ThreadResult<ErasedTcs>, RunningTcs, RefCell<[u8; 1024]>),
@@ -64,7 +65,7 @@ enum UsercallSendData {
6465

6566
enum UsercallHandleData {
6667
Sync(tcs::Usercall<ErasedTcs>, RunningTcs, RefCell<[u8; 1024]>),
67-
Async(Usercall),
68+
Async(Usercall, Option<async_mpsc::UnboundedSender<UsercallEvent>>),
6869
}
6970

7071
type EnclaveResult = StdResult<(u64, u64), EnclaveAbort<Option<EnclavePanic>>>;
@@ -513,11 +514,11 @@ struct PendingEvents {
513514
}
514515

515516
impl PendingEvents {
516-
const MAX_EVENT: usize = 8;
517+
const MAX_EVENT: usize = 16;
517518

518519
fn new() -> Self {
519520
// sanity check to ensure we update MAX_EVENT if new events are added in the future
520-
const EV_ALL: u64 = EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK;
521+
const EV_ALL: u64 = EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_CANCELQ_NOT_FULL | EV_UNPARK;
521522
assert!(EV_ALL < Self::MAX_EVENT as u64);
522523
assert!(Self::MAX_EVENT <= 1usize + u8::max_value() as usize);
523524

@@ -587,6 +588,7 @@ impl EnclaveKind {
587588
struct FifoDescriptors {
588589
usercall_queue: FifoDescriptor<Usercall>,
589590
return_queue: FifoDescriptor<Return>,
591+
cancel_queue: FifoDescriptor<Cancel>,
590592
}
591593

592594
pub(crate) struct EnclaveState {
@@ -630,6 +632,31 @@ impl Work {
630632
}
631633
}
632634

635+
enum UsercallEvent {
636+
Started(u64, tokio::sync::oneshot::Sender<()>),
637+
Finished(u64),
638+
Cancelled(u64, Instant),
639+
}
640+
641+
fn ignore_cancel_impl(usercall_nr: u64) -> bool {
642+
usercall_nr != UsercallList::read as u64 &&
643+
usercall_nr != UsercallList::read_alloc as u64 &&
644+
usercall_nr != UsercallList::write as u64 &&
645+
usercall_nr != UsercallList::accept_stream as u64 &&
646+
usercall_nr != UsercallList::connect_stream as u64 &&
647+
usercall_nr != UsercallList::wait as u64
648+
}
649+
650+
trait IgnoreCancel {
651+
fn ignore_cancel(&self) -> bool;
652+
}
653+
impl IgnoreCancel for Usercall {
654+
fn ignore_cancel(&self) -> bool { ignore_cancel_impl(self.args.0) }
655+
}
656+
impl IgnoreCancel for Cancel {
657+
fn ignore_cancel(&self) -> bool { ignore_cancel_impl(self.usercall_nr) }
658+
}
659+
633660
impl EnclaveState {
634661
fn event_queue_add_tcs(
635662
event_queues: &mut FnvHashMap<TcsAddress, futures::channel::mpsc::UnboundedSender<u8>>,
@@ -702,15 +729,41 @@ impl EnclaveState {
702729
tx_return_channel: tokio::sync::mpsc::UnboundedSender<(EnclaveResult, EnclaveEntry)>,
703730
mut handle_data: UsercallHandleData,
704731
) {
732+
let notifier_rx = match handle_data {
733+
UsercallHandleData::Async(ref usercall, Some(ref usercall_event_tx)) => {
734+
let (notifier_tx, notifier_rx) = tokio::sync::oneshot::channel();
735+
usercall_event_tx.send(UsercallEvent::Started(usercall.id, notifier_tx)).ok()
736+
.expect("failed to send usercall event");
737+
Some(notifier_rx)
738+
},
739+
_ => None,
740+
};
705741
let (parameters, mode, tcs) = match handle_data {
706742
UsercallHandleData::Sync(ref usercall, ref mut tcs, _) => (usercall.parameters(), tcs.mode, Some(tcs)),
707-
UsercallHandleData::Async(ref usercall) => (usercall.args, EnclaveEntry::ExecutableNonMain, None),
743+
UsercallHandleData::Async(ref usercall, _) => (usercall.args, EnclaveEntry::ExecutableNonMain, None),
708744
};
709745
let mut input = IOHandlerInput { enclave: enclave.clone(), tcs, work_sender: &work_sender };
710746
let handler = Handler(&mut input);
711-
let (_handler, result) = {
747+
let result = {
748+
use self::interface::ToSgxResult;
749+
use self::abi::ReturnValue;
750+
712751
let (p1, p2, p3, p4, p5) = parameters;
713-
dispatch(handler, p1, p2, p3, p4, p5).await
752+
match notifier_rx {
753+
None => dispatch(handler, p1, p2, p3, p4, p5).await.1,
754+
Some(notifier_rx) => {
755+
let a = dispatch(handler, p1, p2, p3, p4, p5).boxed_local();
756+
let b = notifier_rx;
757+
match futures::future::select(a, b).await {
758+
Either::Left((ret, _)) => ret.1,
759+
Either::Right((Ok(()), _)) => {
760+
let result: IoResult<usize> = Err(IoErrorKind::Interrupted.into());
761+
ReturnValue::into_registers(Ok(result.to_sgx_result()))
762+
},
763+
Either::Right((Err(_), _)) => panic!("notifier channel closed unexpectedly"),
764+
}
765+
},
766+
}
714767
};
715768
let ret = match result {
716769
Ok(ret) => {
@@ -721,7 +774,11 @@ impl EnclaveState {
721774
entry: CoEntry::Resume(usercall, ret),
722775
}).expect("Work sender couldn't send data to receiver");
723776
}
724-
UsercallHandleData::Async(usercall) => {
777+
UsercallHandleData::Async(usercall, usercall_event_tx) => {
778+
if let Some(usercall_event_tx) = usercall_event_tx {
779+
usercall_event_tx.send(UsercallEvent::Finished(usercall.id)).ok()
780+
.expect("failed to send usercall event");
781+
}
725782
let return_queue_tx = enclave.return_queue_tx.lock().await.clone().expect("return_queue_tx not initialized");
726783
let ret = Return {
727784
id: usercall.id,
@@ -740,7 +797,7 @@ impl EnclaveState {
740797
trap_attached_debugger(usercall.tcs_address() as _).await;
741798
EnclavePanic::from(debug_buf.into_inner())
742799
}
743-
UsercallHandleData::Async(_) => {
800+
UsercallHandleData::Async(_, _) => {
744801
// FIXME: find a better panic message
745802
EnclavePanic::DebugStr("async exit with a panic".to_owned())
746803
}
@@ -815,14 +872,16 @@ impl EnclaveState {
815872
};
816873
let enclave_clone = enclave.clone();
817874
let io_future = async move {
818-
let (usercall_queue_synchronizer, return_queue_synchronizer, sync_usercall_tx) = QueueSynchronizer::new(enclave_clone.clone());
875+
let (uqs, rqs, cqs, sync_usercall_tx) = QueueSynchronizer::new(enclave_clone.clone());
819876

820-
let (usercall_queue_tx, usercall_queue_rx) = ipc_queue::bounded_async(USERCALL_QUEUE_SIZE, usercall_queue_synchronizer);
821-
let (return_queue_tx, return_queue_rx) = ipc_queue::bounded_async(RETURN_QUEUE_SIZE, return_queue_synchronizer);
877+
let (usercall_queue_tx, usercall_queue_rx) = ipc_queue::bounded_async(USERCALL_QUEUE_SIZE, uqs);
878+
let (return_queue_tx, return_queue_rx) = ipc_queue::bounded_async(RETURN_QUEUE_SIZE, rqs);
879+
let (cancel_queue_tx, cancel_queue_rx) = ipc_queue::bounded_async(CANCEL_QUEUE_SIZE, cqs);
822880

823881
let fifo_descriptors = FifoDescriptors {
824882
usercall_queue: usercall_queue_tx.into_descriptor(),
825883
return_queue: return_queue_rx.into_descriptor(),
884+
cancel_queue: cancel_queue_tx.into_descriptor(),
826885
};
827886

828887
*enclave_clone.fifo_descriptors.lock().await = Some(fifo_descriptors);
@@ -834,14 +893,51 @@ impl EnclaveState {
834893
}
835894
});
836895

896+
let (usercall_event_tx, mut usercall_event_rx) = async_mpsc::unbounded_channel();
897+
let usercall_event_tx_clone = usercall_event_tx.clone();
898+
tokio::task::spawn_local(async move {
899+
while let Ok(c) = cancel_queue_rx.recv().await {
900+
if !c.ignore_cancel() {
901+
let _ = usercall_event_tx_clone.send(UsercallEvent::Cancelled(c.id, Instant::now()));
902+
}
903+
}
904+
});
905+
906+
tokio::task::spawn_local(async move {
907+
let mut notifiers = HashMap::new();
908+
let mut cancels: HashMap<u64, Instant> = HashMap::new();
909+
// This should be greater than the amount of time it takes for the enclave runner
910+
// to start executing a usercall after the enclave sends it on the usercall_queue.
911+
const CANCEL_EXPIRY: Duration = Duration::from_millis(100);
912+
loop {
913+
match usercall_event_rx.recv().await.expect("usercall_event channel closed unexpectedly") {
914+
UsercallEvent::Started(id, notifier) => match cancels.remove(&id) {
915+
Some(t) if t.elapsed() < CANCEL_EXPIRY => { let _ = notifier.send(()); },
916+
_ => { notifiers.insert(id, notifier); },
917+
},
918+
UsercallEvent::Finished(id) => { notifiers.remove(&id); },
919+
UsercallEvent::Cancelled(id, t) => if t.elapsed() < CANCEL_EXPIRY {
920+
match notifiers.remove(&id) {
921+
Some(notifier) => { let _ = notifier.send(()); },
922+
None => { cancels.insert(id, t); },
923+
}
924+
},
925+
}
926+
// cleanup expired cancels
927+
let now = Instant::now();
928+
cancels.retain(|_id, &mut t| now - t < CANCEL_EXPIRY);
929+
}
930+
});
931+
837932
let mut recv_queue = io_queue_receive.into_future();
838933
while let (Some(work), stream) = recv_queue.await {
839934
recv_queue = stream.into_future();
840935
let enclave_clone = enclave_clone.clone();
841936
let tx_return_channel = tx_return_channel.clone();
842937
match work {
843938
UsercallSendData::Async(usercall) => {
844-
let uchd = UsercallHandleData::Async(usercall);
939+
let usercall_event_tx = if usercall.ignore_cancel() { None } else { Some(usercall_event_tx.clone()) };
940+
let uchd = UsercallHandleData::Async(usercall, usercall_event_tx);
845941
let fut = Self::handle_usercall(enclave_clone, work_sender.clone(), tx_return_channel, uchd);
846942
tokio::task::spawn_local(fut);
847943
}
@@ -1416,7 +1512,7 @@ impl<'tcs> IOHandlerInput<'tcs> {
14161512
}
14171513

14181514
fn check_event_set(set: u64) -> IoResult<u8> {
1419-
const EV_ALL: u64 = EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_UNPARK;
1515+
const EV_ALL: u64 = EV_USERCALLQ_NOT_FULL | EV_RETURNQ_NOT_EMPTY | EV_CANCELQ_NOT_FULL | EV_UNPARK;
14201516
if (set & !EV_ALL) != 0 {
14211517
return Err(IoErrorKind::InvalidInput.into());
14221518
}
@@ -1561,12 +1657,16 @@ impl<'tcs> IOHandlerInput<'tcs> {
15611657
&mut self,
15621658
usercall_queue: &mut FifoDescriptor<Usercall>,
15631659
return_queue: &mut FifoDescriptor<Return>,
1660+
cancel_queue: Option<&mut FifoDescriptor<Cancel>>,
15641661
) -> StdResult<(), EnclaveAbort<bool>> {
15651662
let fifo_descriptors = self.enclave.fifo_descriptors.lock().await.take();
15661663
match fifo_descriptors {
15671664
Some(fifo_descriptors) => {
15681665
*usercall_queue = fifo_descriptors.usercall_queue;
15691666
*return_queue = fifo_descriptors.return_queue;
1667+
if let Some(cancel_queue) = cancel_queue {
1668+
*cancel_queue = fifo_descriptors.cancel_queue;
1669+
}
15701670
Ok(())
15711671
}
15721672
None => Err(self.exit(true)),
@@ -1578,6 +1678,7 @@ impl<'tcs> IOHandlerInput<'tcs> {
15781678
enum Queue {
15791679
Usercall,
15801680
Return,
1681+
Cancel,
15811682
}
15821683

15831684
struct QueueSynchronizer {
@@ -1590,7 +1691,7 @@ struct QueueSynchronizer {
15901691
}
15911692

15921693
impl QueueSynchronizer {
1593-
fn new(enclave: Arc<EnclaveState>) -> (Self, Self, async_pubsub::Sender<()>) {
1694+
fn new(enclave: Arc<EnclaveState>) -> (Self, Self, Self, async_pubsub::Sender<()>) {
15941695
// This broadcast channel is used to notify enclave-runner of any
15951696
// synchronous usercalls made by the enclave for the purpose of
15961697
// synchronizing access to usercall and return queues.
@@ -1605,11 +1706,17 @@ impl QueueSynchronizer {
16051706
};
16061707
let return_queue_synchronizer = QueueSynchronizer {
16071708
queue: Queue::Return,
1709+
enclave: enclave.clone(),
1710+
subscription: Mutex::new(tx.subscribe()),
1711+
subscription_maker: tx.clone(),
1712+
};
1713+
let cancel_queue_synchronizer = QueueSynchronizer {
1714+
queue: Queue::Cancel,
16081715
enclave,
16091716
subscription: Mutex::new(tx.subscribe()),
16101717
subscription_maker: tx.clone(),
16111718
};
1612-
(usercall_queue_synchronizer, return_queue_synchronizer, tx)
1719+
(usercall_queue_synchronizer, return_queue_synchronizer, cancel_queue_synchronizer, tx)
16131720
}
16141721
}
16151722

@@ -1628,6 +1735,7 @@ impl ipc_queue::AsyncSynchronizer for QueueSynchronizer {
16281735
fn wait(&self, event: QueueEvent) -> Pin<Box<dyn Future<Output = StdResult<(), ipc_queue::SynchronizationError>> + '_>> {
16291736
match (self.queue, event) {
16301737
(Queue::Usercall, QueueEvent::NotFull) => panic!("enclave runner should not send on the usercall queue"),
1738+
(Queue::Cancel, QueueEvent::NotFull) => panic!("enclave runner should not send on the cancel queue"),
16311739
(Queue::Return, QueueEvent::NotEmpty) => panic!("enclave runner should not receive on the return queue"),
16321740
_ => {}
16331741
}
@@ -1646,12 +1754,14 @@ impl ipc_queue::AsyncSynchronizer for QueueSynchronizer {
16461754
fn notify(&self, event: QueueEvent) {
16471755
let ev = match (self.queue, event) {
16481756
(Queue::Usercall, QueueEvent::NotEmpty) => panic!("enclave runner should not send on the usercall queue"),
1757+
(Queue::Cancel, QueueEvent::NotEmpty) => panic!("enclave runner should not send on the cancel queue"),
16491758
(Queue::Return, QueueEvent::NotFull) => panic!("enclave runner should not receive on the return queue"),
16501759
(Queue::Usercall, QueueEvent::NotFull) => EV_USERCALLQ_NOT_FULL,
1760+
(Queue::Cancel, QueueEvent::NotFull) => EV_CANCELQ_NOT_FULL,
16511761
(Queue::Return, QueueEvent::NotEmpty) => EV_RETURNQ_NOT_EMPTY,
16521762
};
16531763
// When the enclave needs to wait on a queue, it executes the wait() usercall synchronously,
1654-
// specifying EV_USERCALLQ_NOT_FULL, EV_RETURNQ_NOT_EMPTY, or both in the event_mask.
1764+
// specifying EV_USERCALLQ_NOT_FULL, EV_RETURNQ_NOT_EMPTY or EV_CANCELQ_NOT_FULL in the event_mask.
16551765
// Userspace will wake any or all threads waiting on the appropriate event when it is triggered.
16561766
for queue in self.enclave.event_queues.values() {
16571767
let _ = queue.unbounded_send(ev as _);

0 commit comments

Comments
 (0)