Skip to content

Commit 5eeef48

Browse files
committed
feat: 🎸 add watch mode #12
Closes: #12
1 parent 66a850e commit 5eeef48

File tree

7 files changed

+122
-73
lines changed

7 files changed

+122
-73
lines changed

README.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ Table Sources list endpoint is available at `/index.json`
8585
curl localhost:3000/index.json
8686
```
8787

88+
**Note**: if in `watch` mode, this will rescan database for table sources.
89+
8890
### Table Source TileJSON
8991

9092
Table Source [TileJSON](https://github.com/mapbox/tilejson-spec) endpoint is available at `/{schema_name}.{table_name}.json`.
@@ -146,6 +148,8 @@ Function Sources list endpoint is available at `/rpc/index.json`
146148
curl localhost:3000/rpc/index.json
147149
```
148150

151+
**Note**: if in `watch` mode, this will rescan database for function sources.
152+
149153
### Function Source TileJSON
150154

151155
Function Source [TileJSON](https://github.com/mapbox/tilejson-spec) endpoint is available at `/rpc/{schema_name}.{function_name}.json`
@@ -179,11 +183,12 @@ Usage:
179183
Options:
180184
-h --help Show this screen.
181185
-v --version Show version.
182-
--workers=<n> Number of web server workers.
183-
--pool_size=<n> Maximum connections pool size [default: 20].
186+
--config=<path> Path to config file.
184187
--keep_alive=<n> Connection keep alive timeout [default: 75].
185188
--listen_addresses=<n> The socket address to bind [default: 0.0.0.0:3000].
186-
--config=<path> Path to config file.
189+
--pool_size=<n> Maximum connections pool size [default: 20].
190+
--watch Scan for new sources on sources list requests
191+
--workers=<n> Number of web server workers.
187192
```
188193

189194
## Environment Variables
@@ -194,8 +199,9 @@ You can also configure martin using environment variables
194199
| -------------------- | -------------------------------- | ----------------------------- |
195200
| DATABASE_URL | postgres://postgres@localhost/db | postgres database connection |
196201
| DATABASE_POOL_SIZE | 20 | maximum connections pool size |
197-
| WORKER_PROCESSES | 8 | number of web server workers |
198202
| KEEP_ALIVE | 75 | connection keep alive timeout |
203+
| WATCH_MODE | true | scan for new sources |
204+
| WORKER_PROCESSES | 8 | number of web server workers |
199205

200206
## Configuration File
201207

@@ -208,6 +214,9 @@ martin --config config.yaml
208214
You can find an example of a configuration file [here](https://github.com/urbica/martin/blob/master/tests/config.yaml).
209215

210216
```yaml
217+
# Database connection string
218+
connection_string: 'postgres://postgres@localhost/db'
219+
211220
# Maximum connections pool size [default: 20]
212221
pool_size: 20
213222

@@ -220,6 +229,9 @@ worker_processes: 8
220229
# The socket address to bind [default: 0.0.0.0:3000]
221230
listen_addresses: '0.0.0.0:3000'
222231

232+
# Enable watch mode
233+
watch: true
234+
223235
# associative arrays of table sources
224236
table_sources:
225237
public.table_source:

src/app.rs

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use actix::*;
22
use actix_web::*;
3-
use futures::future::Future;
3+
use futures::future::{result, Future};
44
use std::cell::RefCell;
55
use std::collections::HashMap;
66
use std::rc::Rc;
@@ -20,33 +20,49 @@ pub struct State {
2020
coordinator: Addr<CoordinatorActor>,
2121
table_sources: Rc<RefCell<Option<TableSources>>>,
2222
function_sources: Rc<RefCell<Option<FunctionSources>>>,
23+
watch_mode: bool,
2324
}
2425

2526
fn get_table_sources(
2627
req: &HttpRequest<State>,
2728
) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
2829
let state = &req.state();
29-
let coordinator = state.coordinator.clone();
30+
if state.watch_mode {
31+
info!("Scanning database for table sources");
32+
let coordinator = state.coordinator.clone();
33+
let result = req.state().db.send(messages::GetTableSources {});
34+
35+
let response = result
36+
.from_err()
37+
.and_then(move |res| match res {
38+
Ok(table_sources) => {
39+
coordinator.do_send(messages::RefreshTableSources {
40+
table_sources: Some(table_sources.clone()),
41+
});
42+
43+
Ok(HttpResponse::Ok()
44+
.header("Access-Control-Allow-Origin", "*")
45+
.json(table_sources))
46+
}
47+
Err(_) => Ok(HttpResponse::InternalServerError().into()),
48+
})
49+
.responder();
3050

31-
let result = req.state().db.send(messages::GetTableSources {});
51+
return Ok(response);
52+
}
3253

33-
let response = result
34-
.from_err()
35-
.and_then(move |res| match res {
36-
Ok(table_sources) => {
37-
coordinator.do_send(messages::RefreshTableSources {
38-
table_sources: Some(table_sources.clone()),
39-
});
54+
let table_sources = state
55+
.table_sources
56+
.borrow()
57+
.clone()
58+
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
4059

41-
Ok(HttpResponse::Ok()
42-
.header("Access-Control-Allow-Origin", "*")
43-
.json(table_sources))
44-
}
45-
Err(_) => Ok(HttpResponse::InternalServerError().into()),
46-
})
47-
.responder();
60+
let http_response = HttpResponse::Ok()
61+
.header("Access-Control-Allow-Origin", "*")
62+
.json(table_sources);
4863

49-
Ok(response)
64+
let response = result(Ok(http_response)).responder();
65+
return Ok(response);
5066
}
5167

5268
fn get_table_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
@@ -138,26 +154,42 @@ fn get_function_sources(
138154
req: &HttpRequest<State>,
139155
) -> Result<Box<Future<Item = HttpResponse, Error = Error>>> {
140156
let state = &req.state();
141-
let coordinator = state.coordinator.clone();
157+
if state.watch_mode {
158+
info!("Scanning database for function sources");
159+
let coordinator = state.coordinator.clone();
160+
161+
let result = req.state().db.send(messages::GetFunctionSources {});
162+
let response = result
163+
.from_err()
164+
.and_then(move |res| match res {
165+
Ok(function_sources) => {
166+
coordinator.do_send(messages::RefreshFunctionSources {
167+
function_sources: Some(function_sources.clone()),
168+
});
169+
170+
Ok(HttpResponse::Ok()
171+
.header("Access-Control-Allow-Origin", "*")
172+
.json(function_sources))
173+
}
174+
Err(_) => Ok(HttpResponse::InternalServerError().into()),
175+
})
176+
.responder();
142177

143-
let result = req.state().db.send(messages::GetFunctionSources {});
144-
let response = result
145-
.from_err()
146-
.and_then(move |res| match res {
147-
Ok(function_sources) => {
148-
coordinator.do_send(messages::RefreshFunctionSources {
149-
function_sources: Some(function_sources.clone()),
150-
});
178+
return Ok(response);
179+
}
151180

152-
Ok(HttpResponse::Ok()
153-
.header("Access-Control-Allow-Origin", "*")
154-
.json(function_sources))
155-
}
156-
Err(_) => Ok(HttpResponse::InternalServerError().into()),
157-
})
158-
.responder();
181+
let function_sources = state
182+
.function_sources
183+
.borrow()
184+
.clone()
185+
.ok_or_else(|| error::ErrorNotFound("There is no table sources"))?;
159186

160-
Ok(response)
187+
let http_response = HttpResponse::Ok()
188+
.header("Access-Control-Allow-Origin", "*")
189+
.json(function_sources);
190+
191+
let response = result(Ok(http_response)).responder();
192+
return Ok(response);
161193
}
162194

163195
fn get_function_source(req: &HttpRequest<State>) -> Result<HttpResponse> {
@@ -248,6 +280,7 @@ pub fn new(
248280
coordinator: Addr<CoordinatorActor>,
249281
table_sources: Option<TableSources>,
250282
function_sources: Option<FunctionSources>,
283+
watch_mode: bool,
251284
) -> App<State> {
252285
let table_sources_rc = Rc::new(RefCell::new(table_sources));
253286
let function_sources_rc = Rc::new(RefCell::new(function_sources));
@@ -265,6 +298,7 @@ pub fn new(
265298
coordinator,
266299
table_sources: table_sources_rc.clone(),
267300
function_sources: function_sources_rc.clone(),
301+
watch_mode,
268302
};
269303

270304
App::with_state(state)
@@ -328,6 +362,7 @@ mod tests {
328362
State {
329363
db,
330364
coordinator,
365+
watch_mode: true,
331366
table_sources: table_sources_rc.clone(),
332367
function_sources: function_sources_rc.clone(),
333368
}

src/cli.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,23 @@ Usage:
99
Options:
1010
-h --help Show this screen.
1111
-v --version Show version.
12-
--workers=<n> Number of web server workers.
13-
--pool_size=<n> Maximum connections pool size [default: 20].
12+
--config=<path> Path to config file.
1413
--keep_alive=<n> Connection keep alive timeout [default: 75].
1514
--listen_addresses=<n> The socket address to bind [default: 0.0.0.0:3000].
16-
--config=<path> Path to config file.
15+
--pool_size=<n> Maximum connections pool size [default: 20].
16+
--watch Scan for new sources on sources list requests
17+
--workers=<n> Number of web server workers.
1718
";
1819

1920
#[derive(Debug, Deserialize)]
2021
pub struct Args {
22+
pub arg_connection: Option<String>,
23+
pub flag_config: Option<String>,
2124
pub flag_help: bool,
22-
pub flag_version: bool,
23-
pub flag_workers: Option<usize>,
24-
pub flag_pool_size: Option<u32>,
2525
pub flag_keep_alive: Option<usize>,
2626
pub flag_listen_addresses: Option<String>,
27-
pub flag_config: Option<String>,
28-
pub arg_connection: Option<String>,
27+
pub flag_pool_size: Option<u32>,
28+
pub flag_watch: bool,
29+
pub flag_version: bool,
30+
pub flag_workers: Option<usize>,
2931
}

src/config.rs

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use super::table_source::{get_table_sources, TableSources};
1212

1313
#[derive(Clone, Debug, Serialize)]
1414
pub struct Config {
15+
pub watch: bool,
1516
pub pool_size: u32,
1617
pub keep_alive: usize,
1718
pub worker_processes: usize,
@@ -23,6 +24,7 @@ pub struct Config {
2324

2425
#[derive(Deserialize)]
2526
struct ConfigBuilder {
27+
pub watch: Option<bool>,
2628
pub pool_size: Option<u32>,
2729
pub keep_alive: Option<usize>,
2830
pub worker_processes: Option<usize>,
@@ -35,6 +37,7 @@ struct ConfigBuilder {
3537
impl ConfigBuilder {
3638
pub fn finalize(self) -> Config {
3739
Config {
40+
watch: self.watch.unwrap_or(false),
3841
pool_size: self.pool_size.unwrap_or(20),
3942
keep_alive: self.keep_alive.unwrap_or(75),
4043
worker_processes: self.worker_processes.unwrap_or_else(num_cpus::get),
@@ -59,16 +62,6 @@ pub fn read_config(file_name: &str) -> io::Result<Config> {
5962
Ok(config_builder.finalize())
6063
}
6164

62-
// pub fn write_config(file_name: &str, config: Config) -> io::Result<()> {
63-
// let mut file = File::create(file_name)?;
64-
65-
// let config = serde_yaml::to_string(&config)
66-
// .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
67-
68-
// file.write_all(config.as_bytes())?;
69-
// Ok(())
70-
// }
71-
7265
pub fn generate_config(
7366
args: Args,
7467
connection_string: String,
@@ -82,6 +75,7 @@ pub fn generate_config(
8275
let function_sources = get_function_sources(&conn)?;
8376

8477
let config = ConfigBuilder {
78+
watch: Some(args.flag_watch),
8579
keep_alive: args.flag_keep_alive,
8680
listen_addresses: args.flag_listen_addresses,
8781
connection_string: connection_string,
@@ -94,10 +88,3 @@ pub fn generate_config(
9488
let config = config.finalize();
9589
Ok(config)
9690
}
97-
98-
// pub fn to_string(config: Config) -> io::Result<String> {
99-
// let config = serde_yaml::to_string(&config)
100-
// .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description()))?;
101-
102-
// Ok(config)
103-
// }

src/main.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,9 @@ fn setup_from_database(args: Args) -> Result<(Config, PostgresPool), std::io::Er
7575
fn start(args: Args) -> Result<actix::SystemRunner, std::io::Error> {
7676
info!("Starting martin v{}", VERSION);
7777

78-
let (config, pool) = if args.flag_config.is_some() {
79-
let file_name = args.flag_config.unwrap();
78+
let config_file_name = args.flag_config.clone();
79+
let (config, pool) = if config_file_name.is_some() {
80+
let file_name = config_file_name.clone().unwrap();
8081
info!("Using {}", file_name);
8182
setup_from_config(file_name)?
8283
} else {
@@ -91,8 +92,13 @@ fn start(args: Args) -> Result<actix::SystemRunner, std::io::Error> {
9192
std::process::exit(-1);
9293
}
9394

95+
let watch_mode = config.watch || env::var_os("WATCH_MODE").is_some();
96+
if watch_mode {
97+
info!("Watch mode enabled");
98+
}
99+
94100
let listen_addresses = config.listen_addresses.clone();
95-
let server = server::new(config, pool);
101+
let server = server::new(pool, config, watch_mode);
96102
info!("Martin has been started on {}.", listen_addresses);
97103

98104
Ok(server)

src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use super::coordinator_actor::CoordinatorActor;
77
use super::db::PostgresPool;
88
use super::db_executor::DbExecutor;
99

10-
pub fn new(config: Config, pool: PostgresPool) -> SystemRunner {
10+
pub fn new(pool: PostgresPool, config: Config, watch_mode: bool) -> SystemRunner {
1111
let server = System::new("server");
1212

1313
let db = SyncArbiter::start(3, move || DbExecutor(pool.clone()));
@@ -23,6 +23,7 @@ pub fn new(config: Config, pool: PostgresPool) -> SystemRunner {
2323
coordinator.clone(),
2424
config.table_sources.clone(),
2525
config.function_sources.clone(),
26+
watch_mode,
2627
)
2728
})
2829
.bind(listen_addresses.clone())

tests/config.yaml

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
---
2-
# Maximum connections pool size [default: 20]
3-
pool_size: 20
2+
# Database connection string
3+
connection_string: "postgres://postgres@localhost/test"
44

55
# Connection keep alive timeout [default: 75]
66
keep_alive: 75
77

8+
# The socket address to bind [default: 0.0.0.0:3000]
9+
listen_addresses: "0.0.0.0:3000"
10+
11+
# Maximum connections pool size [default: 20]
12+
pool_size: 20
13+
14+
# Enable watch mode
15+
watch: false
16+
817
# Number of web server workers
918
worker_processes: 8
1019

11-
# The socket address to bind [default: 0.0.0.0:3000]
12-
listen_addresses: '0.0.0.0:3000'
13-
1420
# associative arrays of table sources
1521
table_sources:
1622
public.table_source:

0 commit comments

Comments
 (0)