Skip to content

input: add pause/resume callbacks #10616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions plugins/in_http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ static int in_http_collect(struct flb_input_instance *ins,
return -1;
}

if (ctx->is_paused) {
flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i",
connection->fd);
flb_downstream_conn_release(connection);
return -1;
}

flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i",
connection->fd);

Expand Down Expand Up @@ -79,6 +86,7 @@ static int in_http_init(struct flb_input_instance *ins,
}

ctx->collector_id = -1;
ctx->is_paused = FLB_FALSE;

/* Populate context with config map defaults and incoming properties */
ret = flb_input_config_map_set(ins, (void *) ctx);
Expand Down Expand Up @@ -199,6 +207,27 @@ static int in_http_exit(void *data, struct flb_config *config)
return 0;
}

static void in_http_pause(void *data, struct flb_config *config)
{
struct flb_http *ctx = data;

if (config->is_running == FLB_TRUE) {
flb_input_collector_pause(ctx->collector_id, ctx->ins);
http_conn_release_all(ctx);
ctx->is_paused = FLB_TRUE;
}
}

static void in_http_resume(void *data, struct flb_config *config)
{
struct flb_http *ctx = data;

if (config->is_running == FLB_TRUE) {
flb_input_collector_resume(ctx->collector_id, ctx->ins);
ctx->is_paused = FLB_FALSE;
}
}

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
Expand Down Expand Up @@ -249,8 +278,8 @@ struct flb_input_plugin in_http_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_http_collect,
.cb_flush_buf = NULL,
.cb_pause = NULL,
.cb_resume = NULL,
.cb_pause = in_http_pause,
.cb_resume = in_http_resume,
.cb_exit = in_http_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
Expand Down
1 change: 1 addition & 0 deletions plugins/in_http/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct flb_http {
struct mk_server *server;

int collector_id;
int is_paused; /* Plugin is paused */
};


Expand Down
33 changes: 31 additions & 2 deletions plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ static int in_opentelemetry_collect(struct flb_input_instance *ins,
return -1;
}

if (ctx->is_paused) {
flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i",
connection->fd);
flb_downstream_conn_release(connection);
return -1;
}

flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd);

conn = opentelemetry_conn_add(connection, ctx);
Expand All @@ -76,6 +83,7 @@ static int in_opentelemetry_init(struct flb_input_instance *ins,
return -1;
}
ctx->collector_id = -1;
ctx->is_paused = FLB_FALSE;

/* Populate context with config map defaults and incoming properties */
ret = flb_input_config_map_set(ins, (void *) ctx);
Expand Down Expand Up @@ -195,6 +203,27 @@ static int in_opentelemetry_exit(void *data, struct flb_config *config)
return 0;
}

static void in_opentelemetry_pause(void *data, struct flb_config *config)
{
struct flb_opentelemetry *ctx = data;

if (config->is_running == FLB_TRUE) {
flb_input_collector_pause(ctx->collector_id, ctx->ins);
opentelemetry_conn_release_all(ctx);
ctx->is_paused = FLB_TRUE;
}
}

static void in_opentelemetry_resume(void *data, struct flb_config *config)
{
struct flb_opentelemetry *ctx = data;

if (config->is_running == FLB_TRUE) {
flb_input_collector_resume(ctx->collector_id, ctx->ins);
ctx->is_paused = FLB_FALSE;
}
}

/* Configuration properties map */
static struct flb_config_map config_map[] = {
{
Expand Down Expand Up @@ -272,8 +301,8 @@ struct flb_input_plugin in_opentelemetry_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_opentelemetry_collect,
.cb_flush_buf = NULL,
.cb_pause = NULL,
.cb_resume = NULL,
.cb_pause = in_opentelemetry_pause,
.cb_resume = in_opentelemetry_resume,
.cb_exit = in_opentelemetry_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
Expand Down
1 change: 1 addition & 0 deletions plugins/in_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct flb_opentelemetry {
struct mk_list connections; /* linked list of connections */

struct mk_server *server;
int is_paused; /* Plugin is paused */
};


Expand Down
31 changes: 31 additions & 0 deletions plugins/in_tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ static int in_tcp_collect(struct flb_input_instance *in,
return -1;
}

if (ctx->is_paused) {
flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i",
connection->fd);
flb_downstream_conn_release(connection);
return -1;
}

flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd);

conn = tcp_conn_add(connection, ctx);
Expand Down Expand Up @@ -80,6 +87,7 @@ static int in_tcp_init(struct flb_input_instance *in,
ctx->collector_id = -1;
ctx->ins = in;
mk_list_init(&ctx->connections);
ctx->is_paused = FLB_FALSE;

/* Set the context */
flb_input_set_context(in, ctx);
Expand Down Expand Up @@ -145,6 +153,27 @@ static int in_tcp_exit(void *data, struct flb_config *config)
return 0;
}

static void in_tcp_pause(void *data, struct flb_config *config)
{
struct flb_in_tcp_config *ctx = data;

if (config->is_running == FLB_TRUE) {
flb_input_collector_pause(ctx->collector_id, ctx->ins);
tcp_conn_release_all(ctx);
ctx->is_paused = FLB_TRUE;
}
}

static void in_tcp_resume(void *data, struct flb_config *config)
{
struct flb_in_tcp_config *ctx = data;

if (config->is_running == FLB_TRUE) {
flb_input_collector_resume(ctx->collector_id, ctx->ins);
ctx->is_paused = FLB_FALSE;
}
}

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "format", (char *)NULL,
Expand Down Expand Up @@ -183,6 +212,8 @@ struct flb_input_plugin in_tcp_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_tcp_collect,
.cb_flush_buf = NULL,
.cb_pause = in_tcp_pause,
.cb_resume = in_tcp_resume,
.cb_exit = in_tcp_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
Expand Down
1 change: 1 addition & 0 deletions plugins/in_tcp/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct flb_in_tcp_config {
struct mk_list connections; /* List of active connections */
struct flb_input_instance *ins; /* Input plugin instace */
struct flb_log_event_encoder *log_encoder;
int is_paused; /* Plugin is paused */
};

#endif
12 changes: 12 additions & 0 deletions plugins/in_tcp/tcp_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,15 @@ int tcp_conn_del(struct tcp_conn *conn)

return 0;
}

void tcp_conn_release_all(struct flb_in_tcp_config *ctx)
{
struct mk_list *tmp;
struct mk_list *head;
struct tcp_conn *conn;

mk_list_foreach_safe(head, tmp, &ctx->connections) {
conn = mk_list_entry(head, struct tcp_conn, _head);
tcp_conn_del(conn);
}
}
1 change: 1 addition & 0 deletions plugins/in_tcp/tcp_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ struct tcp_conn {

struct tcp_conn *tcp_conn_add(struct flb_connection *connection, struct flb_in_tcp_config *ctx);
int tcp_conn_del(struct tcp_conn *conn);
void tcp_conn_release_all(struct flb_in_tcp_config *ctx);

#endif
Loading