source/influxdb3: change to use new InfluxDB3Source #608

Open: wants to merge 8 commits into base: main
55 changes: 46 additions & 9 deletions python/sources/influxdb_3/README.md
@@ -1,6 +1,25 @@
# InfluxDB v3

[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/influxdb_3) demonstrates how to use the InfluxDB v3 query API to periodically query InfluxDB and publish the results to a Kafka topic.
[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/influxdb_3) demonstrates how to use the InfluxDB v3 query API to periodically
query InfluxDB3 and publish the results to a Kafka topic.

To learn more about how it functions, [check out the underlying
Quix Streams `InfluxDB3Source`](https://quix.io/docs/quix-streams/connectors/sources/influxdb3-source.html).



## Using with a Quix Cloud InfluxDB3 Service

This deployment will work seamlessly with a [Quix Cloud InfluxDB3 service](https://github.com/quixio/quix-samples/tree/main/docker/influxdb_3).

Simply provide the following arguments when setting up this connector:

```shell
INFLUXDB_HOST="http://influxdb3:80"
INFLUXDB_ORG="<ANYTHING>" # required, but ignored
INFLUXDB_TOKEN="<ANYTHING>" # required, but ignored
```


## How to run

@@ -13,21 +32,39 @@ Then either:

* or click `Customise connector` to inspect or alter the code before deployment.



## Environment Variables

The connector uses the following environment variables:

- **output**: This is the output topic that will receive the stream (Default: `influxdb`, Required: `True`)
- **task_interval**: Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1mo, 1y (Default: `5m`, Required: `True`)
- **INFLUXDB_HOST**: Host address for the InfluxDB instance. (Default: `eu-central-1-1.aws.cloud2.influxdata.com`, Required: `True`)
- **INFLUXDB_TOKEN**: Authentication token to access InfluxDB. (Default: `<TOKEN>`, Required: `True`)
- **INFLUXDB_ORG**: Organization name in InfluxDB. (Default: `<ORG>`, Required: `False`)
- **INFLUXDB_DATABASE**: Database name in InfluxDB where data is stored. (Default: `<DATABASE>`, Required: `True`)
- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used (Default: `<INSERT MEASUREMENT>`, Required: `False`)
### Required

- `output`: The Kafka topic that will receive the query results.
- `INFLUXDB_HOST`: Host address for the InfluxDB instance.
- `INFLUXDB_TOKEN`: Authentication token to access InfluxDB.
- `INFLUXDB_ORG`: Organization name in InfluxDB.
- `INFLUXDB_DATABASE`: Database name in InfluxDB where data is stored.
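
As a rough illustration of how the required variables above might be checked at startup, here is a minimal sketch. The helper `load_required_settings` and the `REQUIRED_VARS` tuple are hypothetical names introduced for this example; they are not part of the connector or of Quix Streams. Only the variable names and sample values come from this README.

```python
import os

# Variable names taken from the "Required" list above.
REQUIRED_VARS = (
    "output",
    "INFLUXDB_HOST",
    "INFLUXDB_TOKEN",
    "INFLUXDB_ORG",
    "INFLUXDB_DATABASE",
)


def load_required_settings(env=None):
    """Return the required settings as a dict (hypothetical helper).

    Raises KeyError naming every variable that is missing or empty,
    so a misconfigured deployment fails with one clear message.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return {name: env[name] for name in REQUIRED_VARS}


if __name__ == "__main__":
    # Sample values for a Quix Cloud InfluxDB3 service, where the org and
    # token are required but ignored. The database name is a placeholder.
    example_env = {
        "output": "influxdbv3-data-source",
        "INFLUXDB_HOST": "http://influxdb3:80",
        "INFLUXDB_TOKEN": "<ANYTHING>",
        "INFLUXDB_ORG": "<ANYTHING>",
        "INFLUXDB_DATABASE": "<DATABASE>",
    }
    print(load_required_settings(example_env))
```

Passing a plain dict instead of `os.environ` keeps the check easy to exercise outside a deployment.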

### Optional

- `INFLUXDB_QUERY_MEASUREMENTS`: The measurements to query. If left None, all measurements will be processed.
- `INFLUXDB_RECORD_TIMESTAMP_COLUMN`: The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time).
- `INFLUXDB_RECORD_MEASUREMENT_COLUMN`: The column name used for inserting the measurement name, else uses `'_measurement'`.
- `INFLUXDB_QUERY_SQL`: A custom SQL query to use for retrieving data from InfluxDB, else uses default.
- `INFLUXDB_QUERY_START_DATE`: The RFC3339-formatted start time for querying InfluxDB, else uses current runtime.
- `INFLUXDB_QUERY_END_DATE`: The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only.
- `INFLUXDB_QUERY_TIME_DELTA`: Time interval for batching queries, e.g., `'5m'` for 5 minutes.
- `INFLUXDB_QUERY_MAX_RETRIES`: Maximum number of retries for querying or producing (with multiplicative backoff).
- `INFLUXDB_QUERY_DELAY_SECONDS`: Add an optional delay (in seconds) between producing batches.
- `CONSUMER_GROUP_NAME`: The name of the consumer group to use when consuming from Kafka.
- `BUFFER_SIZE`: The number of records the sink holds before flushing data to InfluxDB.
- `BUFFER_TIMEOUT`: The number of seconds the sink holds records before flushing data to InfluxDB.
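
To make the interplay of `INFLUXDB_QUERY_START_DATE`, `INFLUXDB_QUERY_END_DATE` and `INFLUXDB_QUERY_TIME_DELTA` more concrete, the sketch below splits a time range into batches of one time delta each. It is only an illustration of the batching idea, not the `InfluxDB3Source` implementation: the function names are hypothetical, and the set of accepted units (`s`, `m`, `h`, `d`) is an assumption made for this example.

```python
import re
from datetime import datetime, timedelta

# Assumed unit suffixes for this sketch; the real source may accept others.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_time_delta(value):
    """Turn a string such as '5m' into a timedelta (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)([smhd])", value.strip())
    if not match:
        raise ValueError(f"Unsupported time delta: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def parse_rfc3339(value):
    """Parse an RFC3339 timestamp; a trailing 'Z' is treated as UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def iter_windows(start, end, delta):
    """Yield (lower, upper) bounds covering [start, end) in delta-sized steps.

    The final window is shortened so it never extends past `end`.
    """
    if delta <= timedelta(0):
        raise ValueError("delta must be positive")
    cursor = start
    while cursor < end:
        upper = min(cursor + delta, end)
        yield cursor, upper
        cursor = upper


if __name__ == "__main__":
    start = parse_rfc3339("2024-01-01T00:00:00Z")
    end = parse_rfc3339("2024-01-01T00:12:00Z")
    for lower, upper in iter_windows(start, end, parse_time_delta("5m")):
        print(lower.isoformat(), "->", upper.isoformat())
```

Each window could then be used as the time bounds of one query, with the optional delay applied between batches.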

## Requirements / Prerequisites

You will need to have an InfluxDB 3.0 instance available and an API authentication token.
You will need to have an InfluxDB 3.0 instance available and an API authentication token
(unless otherwise noted).

## Contribute

2 changes: 1 addition & 1 deletion python/sources/influxdb_3/dockerfile
@@ -1,5 +1,5 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
108 changes: 93 additions & 15 deletions python/sources/influxdb_3/library.json
@@ -17,30 +17,22 @@
"Name": "output",
"Type": "EnvironmentVariable",
"InputType": "OutputTopic",
"Description": "This is the Kafka topic that will receive the query results",
"DefaultValue": "influxdbv3-data",
"Required": true
},
{
"Name": "task_interval",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1y",
"DefaultValue": "5m",
"Description": "The Kafka topic that will receive the query results",
"DefaultValue": "influxdbv3-data-source",
"Required": true
},
{
"Name": "INFLUXDB_HOST",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Host address for the InfluxDB instance.",
"DefaultValue": "eu-central-1-1.aws.cloud2.influxdata.com",
"DefaultValue": "http://influxdb3:80",
"Required": true
},
{
"Name": "INFLUXDB_TOKEN",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"InputType": "Secret",
"Description": "Authentication token to access InfluxDB.",
"DefaultValue": "<TOKEN>",
"Required": true
@@ -62,11 +54,97 @@
"Required": true
},
{
"Name": "INFLUXDB_MEASUREMENT_NAME",
"Name": "INFLUXDB_QUERY_MEASUREMENTS",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The measurements to query. If left None, all measurements will be processed.",
"Required": false
},
{
"Name": "INFLUXDB_RECORD_KEY_COLUMN",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The InfluxDB record column used for the Kafka message key, else uses the measurement's name.",
"Required": false
},
{
"Name": "INFLUXDB_RECORD_TIMESTAMP_COLUMN",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time).",
"Required": false
},
{
"Name": "INFLUXDB_RECORD_MEASUREMENT_COLUMN",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The column name used for inserting the measurement name, else uses '_measurement'.",
"Required": false
},
{
"Name": "INFLUXDB_QUERY_SQL",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "A custom SQL query to use for retrieving data from InfluxDB, else uses default.",
"Required": false
},
{
"Name": "INFLUXDB_QUERY_START_DATE",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The RFC3339-formatted start time for querying InfluxDB, else uses current runtime.",
"Required": false
},
{
"Name": "INFLUXDB_QUERY_END_DATE",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only.",
"Required": false
},
{
"Name": "INFLUXDB_QUERY_TIME_DELTA",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Time interval for batching queries, e.g., '5m' for 5 minutes.",
"Required": false
},
{
"Name": "INFLUXDB_QUERY_MAX_RETRIES",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Maximum number of retries for querying or producing (with multiplicative backoff).",
"Required": false
},
{
"Name": "INFLUXDB_QUERY_DELAY_SECONDS",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "Add an optional delay (in seconds) between producing batches.",
"Required": false
},
{
"Name": "CONSUMER_GROUP_NAME",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The name of the consumer group to use when consuming from Kafka.",
"DefaultValue": "influxdb-sink",
"Required": true
},
{
"Name": "BUFFER_SIZE",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The number of records the sink holds before flushing data to InfluxDB.",
"DefaultValue": "1000",
"Required": false
},
{
"Name": "BUFFER_TIMEOUT",
"Type": "EnvironmentVariable",
"InputType": "FreeText",
"Description": "The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used",
"DefaultValue": "<INSERT MEASUREMENT>",
"Description": "The number of seconds the sink holds records before flushing data to InfluxDB.",
"DefaultValue": "1",
"Required": false
}
],