Skip to content

Commit 17eb99b

Browse files
authored
feat: allow manual table flush through HTTP API (#1184)
1 parent cd8be77 commit 17eb99b

File tree

17 files changed

+239
-33
lines changed

17 files changed

+239
-33
lines changed

src/datanode/src/server/grpc.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,16 @@ impl Instance {
8484
}
8585

8686
pub(crate) async fn handle_flush_table(&self, expr: FlushTableExpr) -> Result<Output> {
87+
let table_name = if expr.table_name.trim().is_empty() {
88+
None
89+
} else {
90+
Some(expr.table_name)
91+
};
92+
8793
let req = FlushTableRequest {
8894
catalog_name: expr.catalog_name,
8995
schema_name: expr.schema_name,
90-
table_name: expr.table_name,
96+
table_name,
9197
region_number: expr.region_id,
9298
};
9399
self.sql_handler()
@@ -148,7 +154,6 @@ mod tests {
148154
}
149155

150156
#[test]
151-
152157
fn test_create_column_schema() {
153158
let column_def = ColumnDef {
154159
name: "a".to_string(),

src/datanode/src/sql/flush_table.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,28 +13,66 @@
1313
// limitations under the License.
1414

1515
use common_query::Output;
16-
use snafu::ResultExt;
16+
use snafu::{OptionExt, ResultExt};
1717
use table::engine::TableReference;
1818
use table::requests::FlushTableRequest;
1919

20-
use crate::error::{self, Result};
20+
use crate::error::{self, CatalogSnafu, DatabaseNotFoundSnafu, Result};
2121
use crate::sql::SqlHandler;
2222

2323
impl SqlHandler {
2424
pub(crate) async fn flush_table(&self, req: FlushTableRequest) -> Result<Output> {
25+
if let Some(table) = &req.table_name {
26+
self.flush_table_inner(
27+
&req.catalog_name,
28+
&req.schema_name,
29+
table,
30+
req.region_number,
31+
)
32+
.await?;
33+
} else {
34+
let schema = self
35+
.catalog_manager
36+
.schema(&req.catalog_name, &req.schema_name)
37+
.context(CatalogSnafu)?
38+
.context(DatabaseNotFoundSnafu {
39+
catalog: &req.catalog_name,
40+
schema: &req.schema_name,
41+
})?;
42+
43+
let all_table_names = schema.table_names().context(CatalogSnafu)?;
44+
futures::future::join_all(all_table_names.iter().map(|table| {
45+
self.flush_table_inner(
46+
&req.catalog_name,
47+
&req.schema_name,
48+
table,
49+
req.region_number,
50+
)
51+
}))
52+
.await
53+
.into_iter()
54+
.collect::<Result<Vec<_>>>()?;
55+
}
56+
Ok(Output::AffectedRows(0))
57+
}
58+
59+
async fn flush_table_inner(
60+
&self,
61+
catalog: &str,
62+
schema: &str,
63+
table: &str,
64+
region: Option<u32>,
65+
) -> Result<()> {
2566
let table_ref = TableReference {
26-
catalog: &req.catalog_name,
27-
schema: &req.schema_name,
28-
table: &req.table_name,
67+
catalog,
68+
schema,
69+
table,
2970
};
3071

3172
let full_table_name = table_ref.to_string();
32-
3373
let table = self.get_table(&table_ref)?;
34-
35-
table.flush(req).await.context(error::FlushTableSnafu {
74+
table.flush(region).await.context(error::FlushTableSnafu {
3675
table_name: full_table_name,
37-
})?;
38-
Ok(Output::AffectedRows(0))
76+
})
3977
}
4078
}

src/frontend/src/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ impl Services {
152152

153153
let mut http_server = HttpServer::new(
154154
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
155+
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
155156
http_options.clone(),
156157
);
157158
if let Some(user_provider) = user_provider.clone() {

src/mito/src/engine/tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -793,10 +793,10 @@ async fn test_flush_table_all_regions() {
793793
assert!(!has_parquet_file(&region_dir));
794794

795795
// Trigger flush all region
796-
table.flush(FlushTableRequest::default()).await.unwrap();
796+
table.flush(None).await.unwrap();
797797

798798
// Trigger again, wait for the previous task finished
799-
table.flush(FlushTableRequest::default()).await.unwrap();
799+
table.flush(None).await.unwrap();
800800

801801
assert!(has_parquet_file(&region_dir));
802802
}
@@ -832,10 +832,10 @@ async fn test_flush_table_with_region_id() {
832832
};
833833

834834
// Trigger flush all region
835-
table.flush(req.clone()).await.unwrap();
835+
table.flush(req.region_number).await.unwrap();
836836

837837
// Trigger again, wait for the previous task finished
838-
table.flush(req).await.unwrap();
838+
table.flush(req.region_number).await.unwrap();
839839

840840
assert!(has_parquet_file(&region_dir));
841841
}

src/mito/src/table.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use table::metadata::{
4444
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
4545
};
4646
use table::requests::{
47-
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
47+
AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest,
4848
};
4949
use table::table::scan::SimpleTableScan;
5050
use table::table::{AlterContext, RegionStat, Table};
@@ -323,9 +323,9 @@ impl<R: Region> Table for MitoTable<R> {
323323
Ok(rows_deleted)
324324
}
325325

326-
async fn flush(&self, request: FlushTableRequest) -> TableResult<()> {
327-
if let Some(region_id) = request.region_number {
328-
if let Some(region) = self.regions.get(&region_id) {
326+
async fn flush(&self, region_number: Option<RegionNumber>) -> TableResult<()> {
327+
if let Some(region_number) = region_number {
328+
if let Some(region) = self.regions.get(&region_number) {
329329
region
330330
.flush()
331331
.await

src/servers/src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,12 @@ pub enum Error {
263263
#[snafu(backtrace)]
264264
source: common_mem_prof::error::Error,
265265
},
266+
266267
#[snafu(display("Invalid prepare statement: {}", err_msg))]
267268
InvalidPrepareStatement { err_msg: String },
269+
270+
#[snafu(display("Invalid flush argument: {}", err_msg))]
271+
InvalidFlushArgument { err_msg: String },
268272
}
269273

270274
pub type Result<T> = std::result::Result<T, Error>;
@@ -327,6 +331,7 @@ impl ErrorExt for Error {
327331
DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,
328332
#[cfg(feature = "mem-prof")]
329333
DumpProfileData { source, .. } => source.status_code(),
334+
InvalidFlushArgument { .. } => StatusCode::InvalidArguments,
330335
}
331336
}
332337

src/servers/src/http.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub mod opentsdb;
1919
pub mod prometheus;
2020
pub mod script;
2121

22+
mod admin;
2223
#[cfg(feature = "mem-prof")]
2324
pub mod mem_prof;
2425

@@ -56,6 +57,8 @@ use self::authorize::HttpAuth;
5657
use self::influxdb::{influxdb_health, influxdb_ping, influxdb_write};
5758
use crate::auth::UserProviderRef;
5859
use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu};
60+
use crate::http::admin::flush;
61+
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
5962
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
6063
use crate::query_handler::{
6164
InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef,
@@ -96,6 +99,7 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"]
9699

97100
pub struct HttpServer {
98101
sql_handler: ServerSqlQueryHandlerRef,
102+
grpc_handler: ServerGrpcQueryHandlerRef,
99103
options: HttpOptions,
100104
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
101105
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
@@ -349,9 +353,14 @@ pub struct ApiState {
349353
}
350354

351355
impl HttpServer {
352-
pub fn new(sql_handler: ServerSqlQueryHandlerRef, options: HttpOptions) -> Self {
356+
pub fn new(
357+
sql_handler: ServerSqlQueryHandlerRef,
358+
grpc_handler: ServerGrpcQueryHandlerRef,
359+
options: HttpOptions,
360+
) -> Self {
353361
Self {
354362
sql_handler,
363+
grpc_handler,
355364
options,
356365
opentsdb_handler: None,
357366
influxdb_handler: None,
@@ -426,6 +435,10 @@ impl HttpServer {
426435
.layer(Extension(api));
427436

428437
let mut router = Router::new().nest(&format!("/{HTTP_API_VERSION}"), sql_router);
438+
router = router.nest(
439+
&format!("/{HTTP_API_VERSION}/admin"),
440+
self.route_admin(self.grpc_handler.clone()),
441+
);
429442

430443
if let Some(opentsdb_handler) = self.opentsdb_handler.clone() {
431444
router = router.nest(
@@ -517,6 +530,12 @@ impl HttpServer {
517530
.route("/api/put", routing::post(opentsdb::put))
518531
.with_state(opentsdb_handler)
519532
}
533+
534+
fn route_admin<S>(&self, grpc_handler: ServerGrpcQueryHandlerRef) -> Router<S> {
535+
Router::new()
536+
.route("/flush", routing::post(flush))
537+
.with_state(grpc_handler)
538+
}
520539
}
521540

522541
pub const HTTP_SERVER: &str = "HTTP_SERVER";
@@ -578,6 +597,7 @@ mod test {
578597
use std::future::pending;
579598
use std::sync::Arc;
580599

600+
use api::v1::greptime_request::Request;
581601
use axum::handler::Handler;
582602
use axum::http::StatusCode;
583603
use axum::routing::get;
@@ -592,12 +612,26 @@ mod test {
592612

593613
use super::*;
594614
use crate::error::Error;
615+
use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdaptor};
595616
use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler};
596617

597618
struct DummyInstance {
598619
_tx: mpsc::Sender<(String, Vec<u8>)>,
599620
}
600621

622+
#[async_trait]
623+
impl GrpcQueryHandler for DummyInstance {
624+
type Error = Error;
625+
626+
async fn do_query(
627+
&self,
628+
_query: Request,
629+
_ctx: QueryContextRef,
630+
) -> std::result::Result<Output, Self::Error> {
631+
unimplemented!()
632+
}
633+
}
634+
601635
#[async_trait]
602636
impl SqlQueryHandler for DummyInstance {
603637
type Error = Error;
@@ -637,8 +671,10 @@ mod test {
637671

638672
fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
639673
let instance = Arc::new(DummyInstance { _tx: tx });
640-
let instance = ServerSqlQueryHandlerAdaptor::arc(instance);
641-
let server = HttpServer::new(instance, HttpOptions::default());
674+
let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone());
675+
let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance);
676+
677+
let server = HttpServer::new(sql_instance, grpc_instance, HttpOptions::default());
642678
server.make_app().route(
643679
"/test/timeout",
644680
get(forever.layer(

src/servers/src/http/admin.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
17+
use api::v1::ddl_request::Expr;
18+
use api::v1::greptime_request::Request;
19+
use api::v1::{DdlRequest, FlushTableExpr};
20+
use axum::extract::{Query, RawBody, State};
21+
use axum::http::StatusCode as HttpStatusCode;
22+
use axum::Json;
23+
use session::context::QueryContext;
24+
use snafu::OptionExt;
25+
26+
use crate::error;
27+
use crate::error::Result;
28+
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
29+
30+
#[axum_macros::debug_handler]
31+
pub async fn flush(
32+
State(grpc_handler): State<ServerGrpcQueryHandlerRef>,
33+
Query(params): Query<HashMap<String, String>>,
34+
RawBody(_): RawBody,
35+
) -> Result<(HttpStatusCode, Json<String>)> {
36+
let catalog_name = params
37+
.get("catalog_name")
38+
.cloned()
39+
.unwrap_or("greptime".to_string());
40+
let schema_name =
41+
params
42+
.get("schema_name")
43+
.cloned()
44+
.context(error::InvalidFlushArgumentSnafu {
45+
err_msg: "schema_name is not present",
46+
})?;
47+
48+
// if table name is not present, flush all tables inside schema
49+
let table_name = params.get("table_name").cloned().unwrap_or_default();
50+
51+
let region_id: Option<u32> = params
52+
.get("region")
53+
.map(|v| v.parse())
54+
.transpose()
55+
.ok()
56+
.flatten();
57+
58+
let request = Request::Ddl(DdlRequest {
59+
expr: Some(Expr::FlushTable(FlushTableExpr {
60+
catalog_name: catalog_name.clone(),
61+
schema_name: schema_name.clone(),
62+
table_name: table_name.clone(),
63+
region_id,
64+
})),
65+
});
66+
67+
grpc_handler.do_query(request, QueryContext::arc()).await?;
68+
Ok((HttpStatusCode::OK, Json::from("hello, world".to_string())))
69+
}

src/servers/src/http/handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub async fn sql(
4343
Form(form_params): Form<SqlQuery>,
4444
) -> Json<JsonResponse> {
4545
let sql_handler = &state.sql_handler;
46+
4647
let start = Instant::now();
4748
let sql = query_params.sql.or(form_params.sql);
4849
let db = query_params.db.or(form_params.db);

src/servers/tests/http/http_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@ use axum_test_helper::TestClient;
1717
use servers::http::{HttpOptions, HttpServer};
1818
use table::test_util::MemTable;
1919

20-
use crate::create_testing_sql_query_handler;
20+
use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler};
2121

2222
fn make_test_app() -> Router {
2323
let server = HttpServer::new(
2424
create_testing_sql_query_handler(MemTable::default_numbers_table()),
25+
create_testing_grpc_query_handler(MemTable::default_numbers_table()),
2526
HttpOptions::default(),
2627
);
2728
server.make_app()

0 commit comments

Comments
 (0)