Skip to content

Commit 42bce0b

Browse files
authored
feat: separate out backend block execution (#135)
* feat: new backend-driven block execution runtime * fix: rebase broken code * format * fix tests * revert to ssh buffer reader from main
1 parent e025824 commit 42bce0b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+12187
-31
lines changed

backend/Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/Cargo.toml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ nix = { version = "0.29.0", features = ["signal"] }
3838
lazy_static = "1.5.0"
3939
shellexpand = "3.1.0"
4040
open = "5"
41+
reqwest = { version = "0.12", features = ["json"] }
42+
url = "2.4"
4143
klickhouse = { version = "0.14.0", features = ["tls"] }
4244
sha2 = "0.10"
4345
bigdecimal = "0.4"
@@ -66,6 +68,7 @@ minijinja = { version = "2.12.0", features = [
6668
"urlencode",
6769
"loop_controls",
6870
"key_interning",
71+
"builtins"
6972
] }
7073
walkdir = "2.5.0"
7174
thiserror = "2.0.11"
@@ -81,10 +84,10 @@ russh-config = "0.54.0" # For SSH config parsing with glob support
8184
dirs = "5.0" # For finding home directory
8285
glob = "0.3" # For general glob pattern matching
8386
base64 = "0.22"
87+
chrono = { version = "0.4", features = ["serde", "clock"] }
8488
async-trait = "0.1.89"
8589
notify-debouncer-full = { version = "0.6.0", features = ["serde"] }
8690
toml = "0.9.2"
87-
8891
ts-rs = { version = "11.0.1", features = ["serde-json-impl", "uuid-impl"] }
8992
json-digest = "0.0.16"
9093
trash = "5.2.2"
@@ -97,10 +100,17 @@ features = [
97100
"runtime-tokio",
98101
"tls-native-tls",
99102
"time",
103+
"chrono",
100104
"postgres",
101105
"mysql",
106+
"sqlite",
102107
"uuid",
108+
"chrono",
103109
"postgres",
110+
"mysql",
111+
"sqlite",
112+
"uuid",
113+
"bigdecimal",
104114
"bigdecimal",
105115
]
106116

backend/src/commands/blocks.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use tauri::{ipc::Channel, AppHandle, Manager, State};
2+
use uuid::Uuid;
3+
4+
use crate::commands::events::ChannelEventBus;
5+
use crate::runtime::blocks::handler::BlockOutput;
6+
use crate::runtime::blocks::registry::BlockRegistry;
7+
use crate::runtime::blocks::Block;
8+
use crate::runtime::workflow::context_builder::ContextBuilder;
9+
use crate::state::AtuinState;
10+
11+
/// Convert editor document block to runtime Block enum
12+
fn document_to_block(block_data: &serde_json::Value) -> Result<Block, String> {
13+
Block::from_document(block_data)
14+
}
15+
16+
#[tauri::command]
17+
pub async fn execute_block(
18+
state: State<'_, AtuinState>,
19+
app_handle: AppHandle,
20+
block_id: String,
21+
runbook_id: String,
22+
editor_document: Vec<serde_json::Value>,
23+
output_channel: Channel<BlockOutput>,
24+
) -> Result<String, String> {
25+
// Build execution context
26+
let mut context = ContextBuilder::build_context(&block_id, &editor_document, &runbook_id)
27+
.await
28+
.map_err(|e| e.to_string())?;
29+
30+
// Add SSH pool to context
31+
context.ssh_pool = Some(state.ssh_pool());
32+
33+
// Add output storage to context
34+
context.output_storage = Some(state.runbook_output_variables.clone());
35+
36+
// Add PTY store to context
37+
context.pty_store = Some(state.pty_store());
38+
39+
// Add event bus to context
40+
let gc_sender = state.gc_event_sender();
41+
let event_bus = std::sync::Arc::new(ChannelEventBus::new(gc_sender));
42+
context.event_bus = Some(event_bus);
43+
44+
// Find the block in the document
45+
let block_data = editor_document
46+
.iter()
47+
.find(|b| b.get("id").and_then(|v| v.as_str()) == Some(&block_id))
48+
.ok_or("Block not found")?;
49+
50+
// Convert document block to runtime block
51+
let block = document_to_block(block_data)?;
52+
53+
// Get event sender from state
54+
let event_sender = state.event_sender();
55+
56+
// Create registry and execute
57+
let registry = BlockRegistry::new();
58+
59+
match registry
60+
.execute_block(&block, context, event_sender, Some(output_channel))
61+
.await
62+
{
63+
Ok(handle) => {
64+
let execution_id = handle.id;
65+
// Store the execution handle for cancellation
66+
if let Some(state) = app_handle.try_state::<AtuinState>() {
67+
state
68+
.block_executions
69+
.write()
70+
.await
71+
.insert(execution_id, handle.clone());
72+
}
73+
Ok(execution_id.to_string())
74+
}
75+
Err(e) => Err(format!("Execution failed: {}", e)),
76+
}
77+
}
78+
79+
#[tauri::command]
80+
pub async fn cancel_block_execution(
81+
app_handle: AppHandle,
82+
execution_id: String,
83+
) -> Result<(), String> {
84+
let execution_uuid = Uuid::parse_str(&execution_id).map_err(|e| e.to_string())?;
85+
86+
if let Some(state) = app_handle.try_state::<AtuinState>() {
87+
let mut executions = state.block_executions.write().await;
88+
if let Some(handle) = executions.remove(&execution_uuid) {
89+
// Cancel the execution
90+
handle.cancellation_token.cancel();
91+
Ok(())
92+
} else {
93+
Err("Execution not found".to_string())
94+
}
95+
} else {
96+
Err("State not available".to_string())
97+
}
98+
}

backend/src/commands/events.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use async_trait::async_trait;
2+
use tauri::{ipc::Channel, AppHandle, Manager};
3+
use tokio::sync::mpsc;
4+
5+
use crate::runtime::events::{EventBus, GCEvent};
6+
use crate::state::AtuinState;
7+
8+
/// Channel-based event bus implementation that forwards events to Tauri channels
9+
pub struct ChannelEventBus {
10+
sender: mpsc::UnboundedSender<GCEvent>,
11+
}
12+
13+
impl ChannelEventBus {
14+
pub fn new(sender: mpsc::UnboundedSender<GCEvent>) -> Self {
15+
Self { sender }
16+
}
17+
}
18+
19+
#[async_trait]
20+
impl EventBus for ChannelEventBus {
21+
async fn emit(&self, event: GCEvent) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
22+
self.sender
23+
.send(event)
24+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
25+
Ok(())
26+
}
27+
}
28+
29+
/// Subscribe to the Grand Central event stream
30+
#[tauri::command]
31+
pub async fn subscribe_to_events(
32+
app_handle: AppHandle,
33+
event_channel: Channel<GCEvent>,
34+
) -> Result<(), String> {
35+
let state = app_handle
36+
.try_state::<AtuinState>()
37+
.ok_or("State not available")?;
38+
39+
// Get the event receiver from state
40+
let mut receiver = state
41+
.event_receiver
42+
.lock()
43+
.await
44+
.take()
45+
.ok_or("Event receiver already taken or not available")?;
46+
47+
// Spawn task to forward events to the channel
48+
tokio::spawn(async move {
49+
while let Some(event) = receiver.recv().await {
50+
if let Err(e) = event_channel.send(event) {
51+
eprintln!("Failed to send event to frontend: {}", e);
52+
break;
53+
}
54+
}
55+
});
56+
57+
Ok(())
58+
}

backend/src/commands/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
pub(crate) mod block_state;
2+
pub(crate) mod blocks;
23
pub(crate) mod dependency;
4+
pub(crate) mod events;
35
pub(crate) mod exec_log;
46
pub(crate) mod feedback;
57
pub(crate) mod kubernetes;

backend/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,9 @@ fn main() {
503503
commands::mysql::mysql_query,
504504
commands::mysql::mysql_execute,
505505
commands::kubernetes::kubernetes_get_execute,
506+
commands::blocks::execute_block,
507+
commands::blocks::cancel_block_execution,
508+
commands::events::subscribe_to_events,
506509
commands::updates::check_for_updates,
507510
commands::workspaces::copy_welcome_workspace,
508511
commands::workspaces::reset_workspaces,

backend/src/pty.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ use bytes::Bytes;
99
use eyre::{eyre, Result};
1010
use portable_pty::{CommandBuilder, MasterPty, PtySize};
1111
use serde::{Deserialize, Serialize};
12+
use ts_rs::TS;
1213
use uuid::Uuid;
1314

1415
use crate::runtime::pty_store::PtyLike;
1516

16-
#[derive(Clone, Deserialize, Serialize, Debug)]
17+
#[derive(Clone, Deserialize, Serialize, Debug, TS)]
18+
#[ts(export)]
1719
pub struct PtyMetadata {
1820
pub pid: Uuid,
1921
pub runbook: Uuid,

backend/src/runtime/blocks/clickhouse.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
22
use typed_builder::TypedBuilder;
33
use uuid::Uuid;
44

5+
use super::FromDocument;
6+
57
#[derive(Debug, Serialize, Deserialize, Clone, TypedBuilder)]
68
#[serde(rename_all = "camelCase")]
79
pub struct Clickhouse {
@@ -20,3 +22,52 @@ pub struct Clickhouse {
2022
#[builder(default = 0)]
2123
pub auto_refresh: i32,
2224
}
25+
26+
impl FromDocument for Clickhouse {
27+
fn from_document(block_data: &serde_json::Value) -> Result<Self, String> {
28+
let block_id = block_data
29+
.get("id")
30+
.and_then(|v| v.as_str())
31+
.ok_or("Block has no id")?;
32+
33+
let props = block_data
34+
.get("props")
35+
.and_then(|p| p.as_object())
36+
.ok_or("Block has no props")?;
37+
38+
let id = Uuid::parse_str(block_id).map_err(|e| e.to_string())?;
39+
40+
let clickhouse = Clickhouse::builder()
41+
.id(id)
42+
.name(
43+
props
44+
.get("name")
45+
.and_then(|v| v.as_str())
46+
.unwrap_or("ClickHouse Query")
47+
.to_string(),
48+
)
49+
.query(
50+
props
51+
.get("query")
52+
.and_then(|v| v.as_str())
53+
.unwrap_or("")
54+
.to_string(),
55+
)
56+
.uri(
57+
props
58+
.get("uri")
59+
.and_then(|v| v.as_str())
60+
.unwrap_or("")
61+
.to_string(),
62+
)
63+
.auto_refresh(
64+
props
65+
.get("autoRefresh")
66+
.and_then(|v| v.as_i64())
67+
.unwrap_or(0) as i32,
68+
)
69+
.build();
70+
71+
Ok(clickhouse)
72+
}
73+
}

0 commit comments

Comments
 (0)