Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 7 additions & 62 deletions src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,17 @@
use reactor::{Reactor};
use reactor::Reactor;
use token::Token;

#[allow(unused_variable)]
pub trait Handler<T: Token, T2, M: Send> {
fn readable(&mut self, reactor: &mut Reactor<T, T2, M>, token: T) {
pub trait Handler<T, M: Send> {
fn readable(&mut self, reactor: &mut Reactor<T, M>, token: Token) {
}

fn writable(&mut self, reactor: &mut Reactor<T, T2, M>, token: T) {
fn writable(&mut self, reactor: &mut Reactor<T, M>, token: Token) {
}

fn notify(&mut self, reactor: &mut Reactor<T, T2, M>, msg: M) {
fn notify(&mut self, reactor: &mut Reactor<T, M>, msg: M) {
}

fn timeout(&mut self, reactor: &mut Reactor<T, T2, M>, timeout: T2) {
}
}

pub trait Token : Copy {
fn from_u64(val: u64) -> Self;

fn to_u64(self) -> u64;
}

impl Token for int {
fn from_u64(val: u64) -> int {
val as int
}

fn to_u64(self) -> u64 {
self as u64
}
}

impl Token for uint {
fn from_u64(val: u64) -> uint {
val as uint
}

fn to_u64(self) -> u64 {
self as u64
}
}

impl Token for i64 {
fn from_u64(val: u64) -> i64 {
val as i64
}

fn to_u64(self) -> u64 {
self as u64
}
}

impl Token for u64 {
fn from_u64(val: u64) -> u64 {
val
}

fn to_u64(self) -> u64 {
self
}
}

impl Token for () {
fn from_u64(_: u64) -> () {
()
}

fn to_u64(self) -> u64 {
0
fn timeout(&mut self, reactor: &mut Reactor<T, M>, timeout: T) {
}
}
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ pub use timer::{
Timer,
Timeout,
};
pub use token::{
Token,
TOKEN_0,
TOKEN_1,
};

pub mod buf;
mod error;
Expand All @@ -65,3 +70,4 @@ mod reactor;
mod slab;
mod socket;
mod timer;
mod token;
4 changes: 2 additions & 2 deletions src/os/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ impl Selector {
}

/// Register event interests for the given IO handle with the OS
pub fn register(&mut self, io: &IoDesc, token: u64) -> MioResult<()> {
pub fn register(&mut self, io: &IoDesc, token: uint) -> MioResult<()> {
let interests = EPOLLIN | EPOLLOUT | EPOLLERR;

let info = EpollEvent {
events: interests | EPOLLET,
data: token
data: token as u64
};

epoll_ctl(self.epfd, EpollCtlAdd, io.fd, &info)
Expand Down
10 changes: 4 additions & 6 deletions src/os/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Selector {
Ok(())
}

pub fn register(&mut self, io: &IoDesc, token: u64) -> MioResult<()> {
pub fn register(&mut self, io: &IoDesc, token: uint) -> MioResult<()> {
let flag = EV_ADD | EV_CLEAR;

try!(self.ev_push(io, EVFILT_READ, flag, FilterFlag::empty(), token));
Expand All @@ -45,16 +45,15 @@ impl Selector {
filter: EventFilter,
flags: EventFlag,
fflags: FilterFlag,
token: u64) -> MioResult<()> {
token: uint) -> MioResult<()> {

// If the change buffer is full, flush it
try!(self.maybe_flush_changes());

let idx = self.changes.len;
let ev = &mut self.changes.events[idx];

// TODO: Don't cast to uint
ev_set(ev, io.fd as uint, filter, flags, fflags, token as uint);
ev_set(ev, io.fd as uint, filter, flags, fflags, token);

self.changes.len += 1;

Expand Down Expand Up @@ -121,8 +120,7 @@ impl Events {
}
}

// TODO: Don't cast
IoEvent::new(kind, token as u64)
IoEvent::new(kind, token)
}

#[inline]
Expand Down
13 changes: 7 additions & 6 deletions src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use error::MioResult;
use io::IoHandle;
use os;
use token::Token;

pub struct Poll {
selector: os::Selector,
Expand All @@ -15,11 +16,11 @@ impl Poll {
})
}

pub fn register<H: IoHandle>(&mut self, io: &H, token: u64) -> MioResult<()> {
pub fn register<H: IoHandle>(&mut self, io: &H, token: Token) -> MioResult<()> {
debug!("registering IO with poller");

// Register interests for this socket
try!(self.selector.register(io.desc(), token));
try!(self.selector.register(io.desc(), token.as_uint()));

Ok(())
}
Expand Down Expand Up @@ -47,7 +48,7 @@ bitflags!(
#[deriving(Show)]
pub struct IoEvent {
kind: IoEventKind,
token: u64
token: Token
}

/// IoEvent represents the raw event that the OS-specific selector
Expand All @@ -58,14 +59,14 @@ pub struct IoEvent {
/// Selector when they have events to report.
impl IoEvent {
/// Create a new IoEvent.
pub fn new(kind: IoEventKind, token: u64) -> IoEvent {
pub fn new(kind: IoEventKind, token: uint) -> IoEvent {
IoEvent {
kind: kind,
token: token
token: Token(token)
}
}

pub fn token(&self) -> u64 {
pub fn token(&self) -> Token {
self.token
}

Expand Down
59 changes: 29 additions & 30 deletions src/reactor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::default::Default;
use std::u64;
use std::uint;
use error::{MioResult, MioError};
use handler::{Handler, Token};
use handler::Handler;
use io::{IoAcceptor, IoHandle};
use notify::Notify;
use os;
use poll::{Poll, IoEvent};
use socket::{Socket, SockAddr};
use timer::{Timer, Timeout, TimerResult};
use token::Token;

/// A lightweight IO reactor.
///
Expand Down Expand Up @@ -41,24 +42,24 @@ impl Default for ReactorConfig {
}
}

pub struct Reactor<T, T2, M: Send> {
pub struct Reactor<T, M: Send> {
run: bool,
poll: Poll,
timer: Timer<T2>,
timer: Timer<T>,
notify: Notify<M>,
config: ReactorConfig,
}

// Token used to represent notifications
static NOTIFY: u64 = u64::MAX;
static NOTIFY: Token = Token(uint::MAX);

impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
impl<T, M: Send> Reactor<T, M> {
/// Initializes a new reactor. The reactor will not be running yet.
pub fn new() -> MioResult<Reactor<T, T2, M>> {
pub fn new() -> MioResult<Reactor<T, M>> {
Reactor::configured(Default::default())
}

pub fn configured(config: ReactorConfig) -> MioResult<Reactor<T, T2, M>> {
pub fn configured(config: ReactorConfig) -> MioResult<Reactor<T, M>> {
// Create the IO poller
let mut poll = try!(Poll::new());

Expand Down Expand Up @@ -94,7 +95,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {

/// After the requested time interval, the handler's `timeout` function
/// will be called with the supplied token.
pub fn timeout_ms(&mut self, token: T2, delay: u64) -> TimerResult<Timeout> {
pub fn timeout_ms(&mut self, token: T, delay: u64) -> TimerResult<Timeout> {
self.timer.timeout_ms(token, delay)
}

Expand All @@ -116,8 +117,8 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}

/// Registers an IO handle with the reactor.
pub fn register<H: IoHandle>(&mut self, io: &H, token: T) -> MioResult<()> {
self.poll.register(io, token.to_u64())
pub fn register<H: IoHandle>(&mut self, io: &H, token: Token) -> MioResult<()> {
self.poll.register(io, token)
}

/// Connects the socket to the specified address. When the operation
Expand All @@ -127,9 +128,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
/// notify about the connection, even if the connection happens
/// immediately. Otherwise, every consumer of the reactor would have
/// to worry about possibly-immediate connection.
pub fn connect<S: Socket>(&mut self, io: &S,
addr: &SockAddr, token: T) -> MioResult<()> {

pub fn connect<S: Socket>(&mut self, io: &S, addr: &SockAddr, token: Token) -> MioResult<()> {
debug!("socket connect; addr={}", addr);

// Attempt establishing the context. This may not complete immediately.
Expand All @@ -146,8 +145,8 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
Ok(())
}

pub fn listen<S, A: IoHandle + IoAcceptor<S>>(&mut self, io: &A, backlog: uint,
token: T) -> MioResult<()> {
pub fn listen<S, A: IoHandle + IoAcceptor<S>>(
&mut self, io: &A, backlog: uint, token: Token) -> MioResult<()> {

debug!("socket listen");

Expand All @@ -162,7 +161,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {

/// Keep spinning the reactor indefinitely, and notify the handler whenever
/// any of the registered handles are ready.
pub fn run<H: Handler<T, T2, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
pub fn run<H: Handler<T, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
self.run = true;

while self.run {
Expand All @@ -179,7 +178,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
/// Spin the reactor once, with a timeout of one second, and notify the
/// handler if any of the registered handles become ready during that
/// time.
pub fn run_once<H: Handler<T, T2, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
pub fn run_once<H: Handler<T, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
// Execute a single tick
match self.tick(&mut handler) {
Err(e) => return Err(ReactorError::new(handler, e)),
Expand All @@ -190,7 +189,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}

// Executes a single run of the reactor loop
fn tick<H: Handler<T, T2, M>>(&mut self, handler: &mut H) -> MioResult<()> {
fn tick<H: Handler<T, M>>(&mut self, handler: &mut H) -> MioResult<()> {
let mut messages;
let mut pending;

Expand Down Expand Up @@ -237,7 +236,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}

// Process IO events that have been previously polled
fn io_process<H: Handler<T, T2, M>>(&mut self, handler: &mut H, cnt: uint) {
fn io_process<H: Handler<T, M>>(&mut self, handler: &mut H, cnt: uint) {
let mut i = 0u;

// Iterate over the notifications. Each event provides the token
Expand All @@ -258,8 +257,8 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}
}

fn io_event<H: Handler<T, T2, M>>(&mut self, handler: &mut H, evt: IoEvent) {
let tok = Token::from_u64(evt.token());
fn io_event<H: Handler<T, M>>(&mut self, handler: &mut H, evt: IoEvent) {
let tok = evt.token();

if evt.is_readable() {
handler.readable(self, tok);
Expand All @@ -274,7 +273,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}
}

fn notify<H: Handler<T, T2, M>>(&mut self, handler: &mut H, mut cnt: uint) {
fn notify<H: Handler<T, M>>(&mut self, handler: &mut H, mut cnt: uint) {
while cnt > 0 {
let msg = self.notify.poll()
.expect("[BUG] at this point there should always be a message");
Expand All @@ -284,7 +283,7 @@ impl<T: Token, T2, M: Send> Reactor<T, T2, M> {
}
}

fn timer_process<H: Handler<T, T2, M>>(&mut self, handler: &mut H) {
fn timer_process<H: Handler<T, M>>(&mut self, handler: &mut H) {
let now = self.timer.now();

loop {
Expand Down Expand Up @@ -334,9 +333,9 @@ mod tests {
use std::sync::atomics::{AtomicInt, SeqCst};
use super::Reactor;
use io::{IoWriter, IoReader};
use {io, buf, Buf, Handler};
use {io, buf, Buf, Handler, Token};

type TestReactor = Reactor<u64, uint, ()>;
type TestReactor = Reactor<uint, ()>;

struct Funtimes {
rcount: Arc<AtomicInt>,
Expand All @@ -352,10 +351,10 @@ mod tests {
}
}

impl Handler<u64, uint, ()> for Funtimes {
fn readable(&mut self, _reactor: &mut TestReactor, token: u64) {
impl Handler<uint, ()> for Funtimes {
fn readable(&mut self, _reactor: &mut TestReactor, token: Token) {
(*self.rcount).fetch_add(1, SeqCst);
assert_eq!(token, 10u64);
assert_eq!(token, Token(10));
}
}

Expand All @@ -370,7 +369,7 @@ mod tests {
let handler = Funtimes::new(rcount.clone(), wcount.clone());

writer.write(&mut buf::wrap("hello".as_bytes())).unwrap();
reactor.register(&reader, 10u64).unwrap();
reactor.register(&reader, Token(10)).unwrap();

let _ = reactor.run_once(handler);
let mut b = buf::ByteBuf::new(16);
Expand Down
Loading