Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,086 changes: 0 additions & 1,086 deletions yew/src/agent.rs

This file was deleted.

190 changes: 190 additions & 0 deletions yew/src/agent/link.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use super::*;
use crate::callback::Callback;
use crate::scheduler::{scheduler, Runnable, Shared};
use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;

/// Defines communication from Worker to Consumers
pub(crate) trait Responder<AGN: Agent> {
/// Implementation for communication channel from Worker to Consumers
fn respond(&self, id: HandlerId, output: AGN::Output);
}

/// Link to agent's scope for creating callbacks.
pub struct AgentLink<AGN: Agent> {
scope: AgentScope<AGN>,
responder: Rc<dyn Responder<AGN>>,
}

impl<AGN: Agent> AgentLink<AGN> {
/// Create link for a scope.
pub(crate) fn connect<T>(scope: &AgentScope<AGN>, responder: T) -> Self
where
T: Responder<AGN> + 'static,
{
AgentLink {
scope: scope.clone(),
responder: Rc::new(responder),
}
}

/// Send response to an agent.
pub fn respond(&self, id: HandlerId, output: AGN::Output) {
self.responder.respond(id, output);
}

/// Create a callback which will send a message to the agent when invoked.
pub fn callback<F, IN>(&self, function: F) -> Callback<IN>
where
F: Fn(IN) -> AGN::Message + 'static,
{
let scope = self.scope.clone();
let closure = move |input| {
let output = function(input);
scope.send(AgentLifecycleEvent::Message(output));
};
closure.into()
}
}

impl<AGN: Agent> fmt::Debug for AgentLink<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentLink<_>")
}
}

impl<AGN: Agent> Clone for AgentLink<AGN> {
fn clone(&self) -> Self {
AgentLink {
scope: self.scope.clone(),
responder: self.responder.clone(),
}
}
}
/// This struct holds a reference to a component and to a global scheduler.
pub(crate) struct AgentScope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
}

impl<AGN: Agent> fmt::Debug for AgentScope<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentScope<_>")
}
}

impl<AGN: Agent> Clone for AgentScope<AGN> {
fn clone(&self) -> Self {
AgentScope {
shared_agent: self.shared_agent.clone(),
}
}
}

impl<AGN: Agent> AgentScope<AGN> {
/// Create agent scope
pub fn new() -> Self {
let shared_agent = Rc::new(RefCell::new(AgentRunnable::new()));
AgentScope { shared_agent }
}
/// Schedule message for sending to agent
pub fn send(&self, update: AgentLifecycleEvent<AGN>) {
let envelope = AgentEnvelope {
shared_agent: self.shared_agent.clone(),
update,
};
let runnable: Box<dyn Runnable> = Box::new(envelope);
scheduler().push(runnable);
}
}

impl<AGN: Agent> Default for AgentScope<AGN> {
fn default() -> Self {
Self::new()
}
}

struct AgentRunnable<AGN> {
agent: Option<AGN>,
// TODO(#939): Use agent field to control create message this flag
destroyed: bool,
}

impl<AGN> AgentRunnable<AGN> {
fn new() -> Self {
AgentRunnable {
agent: None,
destroyed: false,
}
}
}

/// Local Agent messages
#[derive(Debug)]
pub(crate) enum AgentLifecycleEvent<AGN: Agent> {
/// Request to create link
Create(AgentLink<AGN>),
/// Internal Agent message
Message(AGN::Message),
/// Client connected
Connected(HandlerId),
/// Received mesasge from Client
Input(AGN::Input, HandlerId),
/// Client disconnected
Disconnected(HandlerId),
/// Request to destroy agent
Destroy,
}

struct AgentEnvelope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
update: AgentLifecycleEvent<AGN>,
}

impl<AGN> Runnable for AgentEnvelope<AGN>
where
AGN: Agent,
{
fn run(self: Box<Self>) {
let mut this = self.shared_agent.borrow_mut();
if this.destroyed {
return;
}
match self.update {
AgentLifecycleEvent::Create(link) => {
this.agent = Some(AGN::create(link));
}
AgentLifecycleEvent::Message(msg) => {
this.agent
.as_mut()
.expect("agent was not created to process messages")
.update(msg);
}
AgentLifecycleEvent::Connected(id) => {
this.agent
.as_mut()
.expect("agent was not created to send a connected message")
.connected(id);
}
AgentLifecycleEvent::Input(inp, id) => {
this.agent
.as_mut()
.expect("agent was not created to process inputs")
.handle_input(inp, id);
}
AgentLifecycleEvent::Disconnected(id) => {
this.agent
.as_mut()
.expect("agent was not created to send a disconnected message")
.disconnected(id);
}
AgentLifecycleEvent::Destroy => {
let mut agent = this
.agent
.take()
.expect("trying to destroy not existent agent");
agent.destroy();
}
}
}
}
133 changes: 133 additions & 0 deletions yew/src/agent/local/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use super::*;
use crate::callback::Callback;
use crate::scheduler::Shared;
use anymap::{self, AnyMap};
use slab::Slab;
use std::cell::RefCell;
use std::rc::Rc;

thread_local! {
static LOCAL_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
}

/// Create a single instance in the current thread.
#[allow(missing_debug_implementations)]
pub struct Context;

impl Discoverer for Context {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let mut scope_to_init = None;
let bridge = LOCAL_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
match pool.entry::<LocalAgent<AGN>>() {
anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback),
anymap::Entry::Vacant(entry) => {
let scope = AgentScope::<AGN>::new();
let launched = LocalAgent::new(&scope);
let responder = SlabResponder {
slab: launched.slab(),
};
scope_to_init = Some((scope, responder));
entry.insert(launched).create_bridge(callback)
}
}
});
if let Some((scope, responder)) = scope_to_init {
let agent_link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(agent_link);
scope.send(upd);
}
let upd = AgentLifecycleEvent::Connected(bridge.id);
bridge.scope.send(upd);
Box::new(bridge)
}
}

struct SlabResponder<AGN: Agent> {
slab: Shared<Slab<Option<Callback<AGN::Output>>>>,
}

impl<AGN: Agent> Responder<AGN> for SlabResponder<AGN> {
fn respond(&self, id: HandlerId, output: AGN::Output) {
locate_callback_and_respond::<AGN>(&self.slab, id, output);
}
}

impl Dispatchable for Context {}

struct ContextBridge<AGN: Agent> {
scope: AgentScope<AGN>,
id: HandlerId,
}

impl<AGN: Agent> Bridge<AGN> for ContextBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let upd = AgentLifecycleEvent::Input(msg, self.id);
self.scope.send(upd);
}
}

impl<AGN: Agent> Drop for ContextBridge<AGN> {
fn drop(&mut self) {
let terminate_worker = LOCAL_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
let terminate_worker = {
if let Some(launched) = pool.get_mut::<LocalAgent<AGN>>() {
launched.remove_bridge(self)
} else {
false
}
};

if terminate_worker {
pool.remove::<LocalAgent<AGN>>();
}

terminate_worker
});

let upd = AgentLifecycleEvent::Disconnected(self.id);
self.scope.send(upd);

if terminate_worker {
let upd = AgentLifecycleEvent::Destroy;
self.scope.send(upd);
}
}
}

struct LocalAgent<AGN: Agent> {
scope: AgentScope<AGN>,
slab: SharedOutputSlab<AGN>,
}

impl<AGN: Agent> LocalAgent<AGN> {
pub fn new(scope: &AgentScope<AGN>) -> Self {
let slab = Rc::new(RefCell::new(Slab::new()));
LocalAgent {
scope: scope.clone(),
slab,
}
}

fn slab(&self) -> SharedOutputSlab<AGN> {
self.slab.clone()
}

fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> ContextBridge<AGN> {
let respondable = callback.is_some();
let mut slab = self.slab.borrow_mut();
let id: usize = slab.insert(callback);
let id = HandlerId::new(id, respondable);
ContextBridge {
scope: self.scope.clone(),
id,
}
}

fn remove_bridge(&mut self, bridge: &ContextBridge<AGN>) -> Last {
let mut slab = self.slab.borrow_mut();
let _ = slab.remove(bridge.id.raw_id());
slab.is_empty()
}
}
54 changes: 54 additions & 0 deletions yew/src/agent/local/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use super::*;
use crate::callback::Callback;

const SINGLETON_ID: HandlerId = HandlerId(0, true);

/// Create an instance in the current thread.
#[allow(missing_debug_implementations)]
pub struct Job;

impl Discoverer for Job {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let callback = callback.expect("Callback required for Job");
let scope = AgentScope::<AGN>::new();
let responder = CallbackResponder { callback };
let agent_link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(agent_link);
scope.send(upd);
let upd = AgentLifecycleEvent::Connected(SINGLETON_ID);
scope.send(upd);
let bridge = JobBridge { scope };
Box::new(bridge)
}
}

struct JobBridge<AGN: Agent> {
scope: AgentScope<AGN>,
}

impl<AGN: Agent> Bridge<AGN> for JobBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let upd = AgentLifecycleEvent::Input(msg, SINGLETON_ID);
self.scope.send(upd);
}
}

impl<AGN: Agent> Drop for JobBridge<AGN> {
fn drop(&mut self) {
let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID);
self.scope.send(upd);
let upd = AgentLifecycleEvent::Destroy;
self.scope.send(upd);
}
}

struct CallbackResponder<AGN: Agent> {
callback: Callback<AGN::Output>,
}

impl<AGN: Agent> Responder<AGN> for CallbackResponder<AGN> {
fn respond(&self, id: HandlerId, output: AGN::Output) {
assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
self.callback.emit(output);
}
}
7 changes: 7 additions & 0 deletions yew/src/agent/local/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod context;
mod job;

use super::*;

pub use context::Context;
pub use job::Job;
Loading