diff --git a/mysql/CHANGELOG.md b/mysql/CHANGELOG.md index 0e2384dadea5a..0b2b99f3dbdb7 100644 --- a/mysql/CHANGELOG.md +++ b/mysql/CHANGELOG.md @@ -6,7 +6,7 @@ ***Added***: -* Migrate SQL Server to a new schema collector, which provides improved performance in the Agent and allows the backend to handle larger schema collections ([#21729](https://github.com/DataDog/integrations-core/pull/21729)) +* Migrate MySQL to a new schema collector, which provides improved performance in the Agent and allows the backend to handle larger schema collections ([#21729](https://github.com/DataDog/integrations-core/pull/21729)) * Add DBM Agent health events to MySQL, including basic initialization checks, unhandled errors, and missed collections ([#21867](https://github.com/DataDog/integrations-core/pull/21867)) * Upgrade base version for Postgres, MySQL, and SQLServer ([#21906](https://github.com/DataDog/integrations-core/pull/21906)) diff --git a/mysql/assets/configuration/spec.yaml b/mysql/assets/configuration/spec.yaml index ab59bad040e8d..22deb38e8ffe5 100644 --- a/mysql/assets/configuration/spec.yaml +++ b/mysql/assets/configuration/spec.yaml @@ -429,13 +429,7 @@ files: Capped by `collect_schemas.collection_interval` value: type: number - display_default: 60 - - name: max_tables - description: | - Set the maximum number of tables to collect. Defaults to 300. - value: - type: integer - display_default: 300 + example: 60 - name: schemas_collection deprecation: Agent version: 7.69.0 diff --git a/mysql/changelog.d/22103.fixed b/mysql/changelog.d/22103.fixed new file mode 100644 index 0000000000000..a57e013355cc4 --- /dev/null +++ b/mysql/changelog.d/22103.fixed @@ -0,0 +1 @@ +Revert "Migrate MySQL to new schema collector" to avoid breaking schema collection on older versions of MySQL/MariaDB diff --git a/mysql/datadog_checks/__init__.py b/mysql/datadog_checks/__init__.py deleted file mode 100644 index 2349b91f0642e..0000000000000 --- a/mysql/datadog_checks/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# (C) Datadog, Inc. 2025-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) - -# This fixes some issues with import paths not being correctly detected by IDEs -__path__ = __import__('pkgutil').extend_path(__path__, __name__) \ No newline at end of file diff --git a/mysql/datadog_checks/mysql/config_models/instance.py b/mysql/datadog_checks/mysql/config_models/instance.py index abd21dde47eaa..03afda6337ee7 100644 --- a/mysql/datadog_checks/mysql/config_models/instance.py +++ b/mysql/datadog_checks/mysql/config_models/instance.py @@ -56,7 +56,6 @@ class CollectSchemas(BaseModel): collection_interval: Optional[float] = None enabled: Optional[bool] = None max_execution_time: Optional[float] = None - max_tables: Optional[int] = None class CollectSettings(BaseModel): diff --git a/mysql/datadog_checks/mysql/data/conf.yaml.example b/mysql/datadog_checks/mysql/data/conf.yaml.example index 0a7eccd385557..c4673407f410b 100644 --- a/mysql/datadog_checks/mysql/data/conf.yaml.example +++ b/mysql/datadog_checks/mysql/data/conf.yaml.example @@ -417,12 +417,7 @@ instances: ## Set the maximum time for schema collection (in seconds). Defaults to 60 seconds. ## Capped by `collect_schemas.collection_interval` # - # max_execution_time: - - ## @param max_tables - integer - optional - default: 300 - ## Set the maximum number of tables to collect. Defaults to 300. 
- # - # max_tables: + # max_execution_time: 60 ## DEPRECATED: Use `collect_schemas` instead. ## Configure collection of schemas (databases). diff --git a/mysql/datadog_checks/mysql/databases_data.py b/mysql/datadog_checks/mysql/databases_data.py new file mode 100644 index 0000000000000..1adc026422154 --- /dev/null +++ b/mysql/datadog_checks/mysql/databases_data.py @@ -0,0 +1,448 @@ +# (C) Datadog, Inc. 2024-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent +import json +import time +from collections import defaultdict +from contextlib import closing + +import pymysql + +from datadog_checks.base.utils.db.utils import default_json_event_encoding +from datadog_checks.base.utils.tracking import tracked_method +from datadog_checks.mysql.cursor import CommenterDictCursor +from datadog_checks.mysql.queries import ( + SQL_COLUMNS, + SQL_DATABASES, + SQL_FOREIGN_KEYS, + SQL_PARTITION, + SQL_TABLES, + get_indexes_query, +) + +from .util import get_list_chunks + +DEFAULT_DATABASES_DATA_COLLECTION_INTERVAL = 600 + + +class SubmitData: + def __init__(self, submit_data_function, base_event, logger): + self._submit_to_agent_queue = submit_data_function + self._base_event = base_event + self._log = logger + + self._columns_count = 0 + self._total_columns_sent = 0 + self.db_to_tables = {} # dbname : {"tables" : []} + self.db_info = {} # name to info + self.any_tables_found = False # Flag to track for permission issues + + def set_base_event_data(self, hostname, database_instance, tags, cloud_metadata, dbms_version, flavor): + self._base_event["host"] = hostname + self._base_event["database_instance"] = database_instance + self._base_event["tags"] = tags + self._base_event["cloud_metadata"] = cloud_metadata + self._base_event["dbms_version"] = dbms_version + self._base_event["flavor"] = flavor + + def reset(self): + self._total_columns_sent = 0 + self._columns_count = 0 + self.db_info.clear() + self.db_to_tables.clear() + self.any_tables_found = False + + def store_db_infos(self, db_infos): + for db_info in db_infos: + self.db_info[db_info['name']] = db_info + + def store(self, db_name, tables, columns_count): + self._columns_count += columns_count + known_tables = self.db_to_tables.setdefault(db_name, []) + known_tables.extend(tables) + if tables: + self.any_tables_found = True + + def columns_since_last_submit(self): + return self._columns_count + + def truncate(self, json_event): + max_length = 1000 + if len(json_event) > max_length: + return json_event[:max_length] + " ... 
(truncated)" + else: + return json_event + + def send_truncated_msg(self, db_name, time_spent): + event = { + **self._base_event, + "metadata": [], + "timestamp": time.time() * 1000, + "collection_errors": [{"error_type": "truncated", "message": ""}], + } + db_info = self.db_info[db_name] + event["metadata"] = [{**(db_info)}] + event["collection_errors"][0]["message"] = ( + "Truncated after fetching {} columns, elapsed time is {}s, database is {}".format( + self._total_columns_sent, time_spent, db_name + ) + ) + json_event = json.dumps(event, default=default_json_event_encoding) + self._log.debug("Reporting truncation of schema collection: {}".format(self.truncate(json_event))) + self._submit_to_agent_queue(json_event) + + def submit(self): + if not self.db_to_tables: + return + self._total_columns_sent += self._columns_count + self._columns_count = 0 + event = {**self._base_event, "metadata": [], "timestamp": time.time() * 1000} + for db, tables in self.db_to_tables.items(): + db_info = self.db_info[db] + event["metadata"] = event["metadata"] + [{**(db_info), "tables": tables}] + json_event = json.dumps(event, default=default_json_event_encoding) + self._log.debug("Reporting the following payload for schema collection: {}".format(self.truncate(json_event))) + self._submit_to_agent_queue(json_event) + self.db_to_tables.clear() + + +def agent_check_getter(self): + return self._check + + +class DatabasesData: + TABLES_CHUNK_SIZE = 500 + DEFAULT_MAX_EXECUTION_TIME = 60 + MAX_COLUMNS_PER_EVENT = 100_000 + + def __init__(self, mysql_metadata, check, config): + self._metadata = mysql_metadata + self._check = check + self._log = check.log + self._tags = [] + collection_interval = config.schemas_config.get( + 'collection_interval', DEFAULT_DATABASES_DATA_COLLECTION_INTERVAL + ) + base_event = { + "host": None, + "agent_version": datadog_agent.get_version(), + "dbms": "mysql", + "kind": "mysql_databases", + "collection_interval": collection_interval, + "dbms_version": None, + "tags": [], + "cloud_metadata": self._check._config.cloud_metadata, + } + self._data_submitter = SubmitData(self._check.database_monitoring_metadata, base_event, self._log) + + self._max_execution_time = min( + config.schemas_config.get('max_execution_time', self.DEFAULT_MAX_EXECUTION_TIME), collection_interval + ) + + def shut_down(self): + self._data_submitter.submit() + + def _cursor_run(self, cursor, query, params=None): + """ + Run and log the query. If provided, obfuscated params are logged in place of the regular params. 
+ """ + try: + self._log.debug("Running query [{}] params={}".format(query, params)) + cursor.execute(query, params) + except pymysql.DatabaseError as e: + self._check.count( + "dd.mysql.db.error", + 1, + tags=self._tags + ["error:{}".format(type(e))] + self._check._get_debug_tags(), + hostname=self._check.resolved_hostname, + ) + raise + + @tracked_method(agent_check_getter=agent_check_getter) + def _fetch_database_data(self, cursor, start_time, db_name): + tables = self._get_tables(db_name, cursor) + for tables_chunk in get_list_chunks(tables, self.TABLES_CHUNK_SIZE): + schema_collection_elapsed_time = time.time() - start_time + if schema_collection_elapsed_time > self._max_execution_time: + self._data_submitter.submit() + self._data_submitter.send_truncated_msg(db_name, schema_collection_elapsed_time) + raise StopIteration( + """Schema collection took {}s which is longer than allowed limit of {}s, + stopped while collecting for db - {}""".format( + schema_collection_elapsed_time, self._max_execution_time, db_name + ) + ) + columns_count, tables_info = self._get_tables_data(tables_chunk, db_name, cursor) + self._data_submitter.store(db_name, tables_info, columns_count) + if self._data_submitter.columns_since_last_submit() > self.MAX_COLUMNS_PER_EVENT: + self._data_submitter.submit() + self._data_submitter.submit() + + @tracked_method(agent_check_getter=agent_check_getter) + def collect_databases_data(self, tags): + """ + Collects database information and schemas and submits them to the agent's queue as dictionaries. + + A submitted dictionary: + dict: A dictionary representing the database information. + + - name (str): The name of the database. + - default_character_set_name (str): The default character set name. + - default_collation_name (str): The default collation name. + - tables (list): A list of table dictionaries. + - table (dict): A dictionary representing a table. + - name (str): The name of the table. + - columns (list): A list of column dictionaries. + - column (dict): A dictionary representing a column. + - name (str): The name of the column. + - data_type (str): The data type of the column. + - default (str): The default value of the column. + - nullable (bool): Whether the column is nullable. + - ordinal_position (str): The ordinal position of the column. + - indexes (list): A list of index dictionaries. + - index (dict): A dictionary representing an index. + - name (str): The name of the index. + - cardinality (int): The cardinality of the index. + - index_type (str): The index method used. + - columns (list): A list of column dictionaries + - column (dict): A dictionary representing a column. + - name (str): The name of the column. + - sub_part (int): The number of indexed characters if column is partially indexed. + - collation (str): The collation of the column. + - packed (str): How the index is packed. + - nullable (bool): Whether the column is nullable. + - non_unique (bool): Whether the index can contain duplicates. + - expression (str): If index was built with a functional key part, the expression used. + - foreign_keys (list): A list of foreign key dictionaries. + - foreign_key (dict): A dictionary representing a foreign key. + - constraint_schema (str): The schema of the constraint. + - name (str): The name of the foreign key. + - column_names (str): The column names in the foreign key. + - referenced_table_schema (str): The schema of the referenced table. + - referenced_table_name (str): The name of the referenced table. 
+ - referenced_column_names (str): The column names in the referenced table. + - update_action (str): The update rule for the foreign key. + - delete_action (str): The delete rule for the foreign key. + - partitions (list): A list of partition dictionaries. + - partition (dict): A dictionary representing a partition. + - name (str): The name of the partition. + - subpartitions (list): A list of subpartition dictionaries. + - subpartition (dict): A dictionary representing a subpartition. + - name (str): The name of the subpartition. + - subpartition_ordinal_position (int): The ordinal position of the subpartition. + - subpartition_method (str): The subpartition method. + - subpartition_expression (str): The subpartition expression. + - table_rows (int): The number of rows in the subpartition. + - data_length (int): The data length of the subpartition in bytes. + - partition_ordinal_position (int): The ordinal position of the partition. + - partition_method (str): The partition method. + - partition_expression (str): The partition expression. + - partition_description (str): The description of the partition. + - table_rows (int): The number of rows in the partition. If partition has subpartitions, + this is the sum of all subpartitions table_rows. + - data_length (int): The data length of the partition in bytes. If partition has + subpartitions, this is the sum of all subpartitions data_length. + """ + self._data_submitter.reset() # Ensure we start fresh + self._tags = tags + self._data_submitter.set_base_event_data( + self._check.reported_hostname, + self._check.database_identifier, + self._tags, + self._check._config.cloud_metadata, + self._check.version.version, + self._check.version.flavor, + ) + try: + with closing(self._metadata.get_db_connection().cursor(CommenterDictCursor)) as cursor: + db_infos = self._query_db_information(cursor) + self._data_submitter.store_db_infos(db_infos) + self._fetch_for_databases(db_infos, cursor) + self._data_submitter.submit() + finally: + self._data_submitter.reset() # Ensure we reset in case of errors + self._log.debug("Finished collect_databases_data") + + def _fetch_for_databases(self, db_infos, cursor): + start_time = time.time() + for db_info in db_infos: + try: + self._fetch_database_data(cursor, start_time, db_info['name']) + except StopIteration as e: + self._log.error( + "While executing fetch database data for database {}, the following exception occured {}".format( + db_info['name'], e + ) + ) + return + except Exception as e: + self._log.error( + "While executing fetch database data for database {}, the following exception occured {}".format( + db_info['name'], e + ) + ) + + # Check if we found databases but no tables across all of them. + # This happens when the datadog user has permissions to see databases + # but lacks SELECT privileges on the tables themselves, which prevents + # the agent from collecting table metadata. + if db_infos and not self._data_submitter.any_tables_found: + self._log.warning( + "No tables were found across any of the {} databases. This may indicate insufficient privileges " + "to view table metadata. 
The datadog user needs SELECT privileges on the tables.".format(len(db_infos)) + ) + + @tracked_method(agent_check_getter=agent_check_getter) + def _query_db_information(self, cursor): + self._cursor_run(cursor, query=SQL_DATABASES) + rows = cursor.fetchall() + return rows + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _get_tables(self, db_name, cursor): + """returns a list of tables for schema with their names and empty column array + list of table dicts + "name": str + """ + self._cursor_run(cursor, query=SQL_TABLES, params=db_name) + tables_info = cursor.fetchall() + return tables_info + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _get_tables_data(self, table_list, db_name, cursor): + if len(table_list) == 0: + return 0, [] + + table_name_to_table_index = {} + for i, table in enumerate(table_list): + table_name_to_table_index[table["name"]] = i + table_names = ','.join(f'"{str(table["name"])}"' for table in table_list) + total_columns_number = self._populate_with_columns_data( + table_name_to_table_index, table_list, table_names, db_name, cursor + ) + self._populate_with_partitions_data(table_name_to_table_index, table_list, table_names, db_name, cursor) + self._populate_with_foreign_keys_data(table_name_to_table_index, table_list, table_names, db_name, cursor) + self._populate_with_index_data(table_name_to_table_index, table_list, table_names, db_name, cursor) + return total_columns_number, table_list + + @tracked_method(agent_check_getter=agent_check_getter) + def _populate_with_columns_data(self, table_name_to_table_index, table_list, table_names, db_name, cursor): + self._cursor_run( + cursor, + query=SQL_COLUMNS.format(table_names), + params=db_name, + ) + rows = cursor.fetchall() + for row in rows: + if "nullable" in row: + if row["nullable"].lower() == "yes": + row["nullable"] = True + else: + row["nullable"] = False + if "default" in row: + if row["default"] is not None: + row["default"] = str(row["default"]) + table_name = str(row.pop("table_name")) + table_list[table_name_to_table_index[table_name]].setdefault("columns", []) + table_list[table_name_to_table_index[table_name]]["columns"].append(row) + + return len(rows) + + @tracked_method(agent_check_getter=agent_check_getter) + def _populate_with_index_data(self, table_name_to_table_index, table_list, table_names, db_name, cursor): + query = get_indexes_query(self._check.version, self._check.is_mariadb, table_names) + self._cursor_run(cursor, query=query, params=db_name) + rows = cursor.fetchall() + if not rows: + return + table_index_dict = defaultdict(lambda: defaultdict(lambda: {})) + for row in rows: + table_name = str(row["table_name"]) + table_list[table_name_to_table_index[table_name]].setdefault("indexes", []) + index_name = str(row["name"]) + index_data = table_index_dict[table_name][index_name] + + # Update index-level info + index_data["name"] = index_name + + # in-memory table BTREE indexes have no cardinality apparently, so we default to 0 + # https://bugs.mysql.com/bug.php?id=58520 + index_data["cardinality"] = int(row["cardinality"]) if row["cardinality"] is not None else 0 + index_data["index_type"] = str(row["index_type"]) + index_data["non_unique"] = bool(row["non_unique"]) + if row["expression"]: + index_data["expression"] = str(row["expression"]) + + # Add column info, if exists + if row["column_name"]: + index_data.setdefault("columns", []) + column = {"name": row["column_name"], "nullable": 
bool(row["nullable"].lower() == "yes")} + if row["sub_part"]: + column["sub_part"] = int(row["sub_part"]) + if row["collation"]: + column["collation"] = str(row["collation"]) + if row["packed"]: + column["packed"] = str(row["packed"]) + index_data["columns"].append(column) + + for table_name, index_dict in table_index_dict.items(): + table_list[table_name_to_table_index[table_name]]["indexes"] = list(index_dict.values()) + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _populate_with_foreign_keys_data(self, table_name_to_table_index, table_list, table_names, db_name, cursor): + self._cursor_run(cursor, query=SQL_FOREIGN_KEYS.format(table_names), params=db_name) + rows = cursor.fetchall() + for row in rows: + table_name = row["table_name"] + table_list[table_name_to_table_index[table_name]].setdefault("foreign_keys", []) + table_list[table_name_to_table_index[table_name]]["foreign_keys"].append(row) + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _populate_with_partitions_data(self, table_name_to_table_index, table_list, table_names, db_name, cursor): + self._cursor_run(cursor, query=SQL_PARTITION.format(table_names), params=db_name) + rows = cursor.fetchall() + if not rows: + return + table_partitions_dict = defaultdict( + lambda: defaultdict( + lambda: { + "table_rows": 0, + "data_length": 0, + } + ) + ) + + for row in rows: + table_name = str(row["table_name"]) + table_list[table_name_to_table_index[table_name]].setdefault("partitions", []) + partition_name = str(row["name"]) + partition_data = table_partitions_dict[table_name][partition_name] + + # Update partition-level info + partition_data["name"] = partition_name + partition_data["partition_ordinal_position"] = int(row["partition_ordinal_position"]) + partition_data["partition_method"] = str(row["partition_method"]) + partition_data["partition_expression"] = str(row["partition_expression"]).strip().lower() + partition_data["partition_description"] = str(row["partition_description"]) + partition_data["table_rows"] += int(row["table_rows"]) + partition_data["data_length"] += int(row["data_length"]) + + # Add subpartition info, if exists + if row["subpartition_name"]: + partition_data.setdefault("subpartitions", []) + subpartition = { + "name": row["subpartition_name"], + "subpartition_ordinal_position": int(row["subpartition_ordinal_position"]), + "subpartition_method": str(row["subpartition_method"]), + "subpartition_expression": str(row["subpartition_expression"]).strip().lower(), + "table_rows": int(row["table_rows"]), + "data_length": int(row["data_length"]), + } + partition_data["subpartitions"].append(subpartition) + for table_name, partitions_dict in table_partitions_dict.items(): + table_list[table_name_to_table_index[table_name]]["partitions"] = list(partitions_dict.values()) diff --git a/mysql/datadog_checks/mysql/metadata.py b/mysql/datadog_checks/mysql/metadata.py index 712144d5517f4..547458f71d661 100644 --- a/mysql/datadog_checks/mysql/metadata.py +++ b/mysql/datadog_checks/mysql/metadata.py @@ -5,15 +5,15 @@ from contextlib import closing from operator import attrgetter -import pymysql # type: ignore +import pymysql from datadog_checks.mysql.cursor import CommenterDictCursor -from datadog_checks.mysql.schemas import MySqlSchemaCollector +from datadog_checks.mysql.databases_data import DEFAULT_DATABASES_DATA_COLLECTION_INTERVAL, DatabasesData from .util import ManagedAuthConnectionMixin, connect_with_session_variables try: - import 
datadog_agent # type: ignore + import datadog_agent except ImportError: from datadog_checks.base.stubs import datadog_agent @@ -27,7 +27,7 @@ # default pg_settings collection interval in seconds DEFAULT_SETTINGS_COLLECTION_INTERVAL = 600 -DEFAULT_SCHEMAS_COLLECTION_INTERVAL = 600 + MARIADB_TABLE_NAME = "information_schema.GLOBAL_VARIABLES" MYSQL_TABLE_NAME = "performance_schema.global_variables" @@ -48,23 +48,24 @@ class MySQLMetadata(ManagedAuthConnectionMixin, DBMAsyncJob): """ def __init__(self, check, config, connection_args_provider, uses_managed_auth=False): + self._databases_data_enabled = is_affirmative(config.schemas_config.get("enabled", False)) + self._databases_data_collection_interval = config.schemas_config.get( + "collection_interval", DEFAULT_DATABASES_DATA_COLLECTION_INTERVAL + ) self._settings_enabled = is_affirmative(config.settings_config.get('enabled', True)) - self._schemas_enabled = is_affirmative(config.schemas_config.get('enabled', False)) self._settings_collection_interval = float( config.settings_config.get('collection_interval', DEFAULT_SETTINGS_COLLECTION_INTERVAL) ) - self._schemas_collection_interval = float( - config.schemas_config.get('collection_interval', DEFAULT_SCHEMAS_COLLECTION_INTERVAL) - ) - if self._schemas_enabled and not self._settings_enabled: - self.collection_interval = self._schemas_collection_interval - elif not self._schemas_enabled and self._settings_enabled: + if self._databases_data_enabled and not self._settings_enabled: + self.collection_interval = self._databases_data_collection_interval + elif not self._databases_data_enabled and self._settings_enabled: self.collection_interval = self._settings_collection_interval else: - self.collection_interval = min(self._settings_collection_interval, self._schemas_collection_interval) - self.enabled = self._settings_enabled or self._schemas_enabled + self.collection_interval = min(self._databases_data_collection_interval, self._settings_collection_interval) + + self.enabled = self._databases_data_enabled or self._settings_enabled super(MySQLMetadata, self).__init__( check, @@ -84,9 +85,9 @@ def __init__(self, check, config, connection_args_provider, uses_managed_auth=Fa self._uses_managed_auth = uses_managed_auth self._db_created_at = 0 self._db = None - self._schemas_collector = MySqlSchemaCollector(check) + self._databases_data = DatabasesData(self, check, config) self._last_settings_collection_time = 0 - self._last_schemas_collection_time = 0 + self._last_databases_collection_time = 0 def get_db_connection(self): """ @@ -146,10 +147,19 @@ def run_job(self): These may be unavailable until the error is resolved. The error - {}""".format(e) ) - elapsed_time_schemas = time.time() - self._last_schemas_collection_time - if self._schemas_enabled and elapsed_time_schemas >= self._schemas_collection_interval: - self._last_schemas_collection_time = time.time() - self._schemas_collector.collect_schemas() + elapsed_time_databases = time.time() - self._last_databases_collection_time + if self._databases_data_enabled and elapsed_time_databases >= self._databases_data_collection_interval: + self._last_databases_collection_time = time.time() + try: + self._databases_data.collect_databases_data(self._tags) + except Exception as e: + self._log.error( + """An error occurred while collecting schema data. + These may be unavailable until the error is resolved. 
The error - {}""".format(e) + ) + + def shut_down(self): + self._databases_data.shut_down() @tracked_method(agent_check_getter=attrgetter('_check')) def report_mysql_metadata(self): diff --git a/mysql/datadog_checks/mysql/mysql.py b/mysql/datadog_checks/mysql/mysql.py index f93859ea49043..53e41ff92787a 100644 --- a/mysql/datadog_checks/mysql/mysql.py +++ b/mysql/datadog_checks/mysql/mysql.py @@ -15,8 +15,7 @@ import pymysql from cachetools import TTLCache -from datadog_checks.base import AgentCheck, is_affirmative -from datadog_checks.base.checks.db import DatabaseCheck +from datadog_checks.base import AgentCheck, DatabaseCheck, is_affirmative from datadog_checks.base.utils.db import QueryExecutor, QueryManager from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus from datadog_checks.base.utils.db.utils import ( @@ -87,7 +86,7 @@ ) from .statement_samples import MySQLStatementSamples from .statements import MySQLStatementMetrics -from .util import connect_with_session_variables +from .util import DatabaseConfigurationError, connect_with_session_variables # noqa: F401 from .version_utils import parse_version try: @@ -224,12 +223,6 @@ def resolved_hostname(self): def cloud_metadata(self): return self._cloud_metadata - @property - def dbms_version(self): - if self.version is None: - return None - return self.version.version + '+' + self.version.build - @property def database_identifier(self): # type: () -> str diff --git a/mysql/datadog_checks/mysql/queries.py b/mysql/datadog_checks/mysql/queries.py index f0df13271abbf..bae209bb14fa1 100644 --- a/mysql/datadog_checks/mysql/queries.py +++ b/mysql/datadog_checks/mysql/queries.py @@ -89,26 +89,25 @@ SELECT plugin_status FROM information_schema.plugins WHERE plugin_name='group_replication'""" +# Alisases add to homogenize fields across different database types like SQLServer, PostgreSQL SQL_DATABASES = """ -SELECT schema_name as `schema_name`, +SELECT schema_name as `name`, default_character_set_name as `default_character_set_name`, default_collation_name as `default_collation_name` FROM information_schema.SCHEMATA WHERE schema_name not in ('sys', 'mysql', 'performance_schema', 'information_schema')""" SQL_TABLES = """\ -SELECT table_name as `table_name`, +SELECT table_name as `name`, engine as `engine`, row_format as `row_format`, - create_time as `create_time`, - table_schema as `schema_name` + create_time as `create_time` FROM information_schema.TABLES - WHERE TABLE_TYPE="BASE TABLE" + WHERE TABLE_SCHEMA = %s AND TABLE_TYPE="BASE TABLE" """ SQL_COLUMNS = """\ SELECT table_name as `table_name`, - table_schema as `schema_name`, column_name as `name`, column_type as `column_type`, column_default as `default`, @@ -117,44 +116,43 @@ column_key as `column_key`, extra as `extra` FROM INFORMATION_SCHEMA.COLUMNS +WHERE table_schema = %s AND table_name IN ({}); """ SQL_INDEXES = """\ SELECT table_name as `table_name`, - table_schema as `schema_name`, index_name as `name`, + collation as `collation`, cardinality as `cardinality`, index_type as `index_type`, + seq_in_index as `seq_in_index`, + column_name as `column_name`, + sub_part as `sub_part`, + packed as `packed`, + nullable as `nullable`, non_unique as `non_unique`, - NULL as `expression`, - json_arrayagg(json_object( - 'name', column_name, - 'collation', collation, - 'nullable', nullable, - 'sub_part', sub_part - )) as `columns` + NULL as `expression` FROM INFORMATION_SCHEMA.STATISTICS -GROUP BY index_name, table_name, schema_name, cardinality, index_type, non_unique, 
expression +WHERE table_schema = %s AND table_name IN ({}); """ SQL_INDEXES_8_0_13 = """\ SELECT table_name as `table_name`, - table_schema as `schema_name`, index_name as `name`, + collation as `collation`, cardinality as `cardinality`, index_type as `index_type`, + seq_in_index as `seq_in_index`, + column_name as `column_name`, + sub_part as `sub_part`, + packed as `packed`, + nullable as `nullable`, non_unique as `non_unique`, - expression as `expression`, - json_arrayagg(json_object( - 'name', column_name, - 'collation', collation, - 'nullable', nullable, - 'sub_part', sub_part - )) as `columns` + expression as `expression` FROM INFORMATION_SCHEMA.STATISTICS -GROUP BY index_name, table_name, schema_name, cardinality, index_type, non_unique, expression +WHERE table_schema = %s AND table_name IN ({}); """ SQL_FOREIGN_KEYS = """\ @@ -162,7 +160,6 @@ kcu.constraint_schema as constraint_schema, kcu.constraint_name as name, kcu.table_name as table_name, - kcu.table_schema as schema_name, group_concat(kcu.column_name order by kcu.ordinal_position asc) as column_names, kcu.referenced_table_schema as referenced_table_schema, kcu.referenced_table_name as referenced_table_name, @@ -176,12 +173,12 @@ ON kcu.CONSTRAINT_SCHEMA = rc.CONSTRAINT_SCHEMA AND kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME WHERE - kcu.referenced_table_name is not null + kcu.table_schema = %s AND kcu.table_name in ({}) + AND kcu.referenced_table_name is not null GROUP BY kcu.constraint_schema, kcu.constraint_name, kcu.table_name, - kcu.table_schema, kcu.referenced_table_schema, kcu.referenced_table_name, rc.update_rule, @@ -191,28 +188,22 @@ SQL_PARTITION = """\ SELECT table_name as `table_name`, - table_schema as `schema_name`, partition_name as `name`, + subpartition_name as `subpartition_name`, partition_ordinal_position as `partition_ordinal_position`, + subpartition_ordinal_position as `subpartition_ordinal_position`, partition_method as `partition_method`, + subpartition_method as `subpartition_method`, partition_expression as `partition_expression`, + subpartition_expression as `subpartition_expression`, partition_description as `partition_description`, - json_arrayagg(json_object( - 'name', subpartition_name, - 'subpartition_ordinal_position', subpartition_ordinal_position, - 'subpartition_method', subpartition_method, - 'subpartition_expression', subpartition_expression, - 'table_rows', table_rows, - 'data_length', data_length - )) as `subpartitions` + table_rows as `table_rows`, + data_length as `data_length` FROM INFORMATION_SCHEMA.PARTITIONS WHERE - partition_name IS NOT NULL -GROUP BY table_name, table_schema, partition_name, partition_ordinal_position, - partition_method, partition_expression, partition_description + table_schema = %s AND table_name in ({}) AND partition_name IS NOT NULL """ - QUERY_DEADLOCKS = { 'name': 'information_schema.INNODB_METRICS.lock_deadlocks', 'query': """ diff --git a/mysql/datadog_checks/mysql/schemas.py b/mysql/datadog_checks/mysql/schemas.py deleted file mode 100644 index 4460c6f9e828a..0000000000000 --- a/mysql/datadog_checks/mysql/schemas.py +++ /dev/null @@ -1,255 +0,0 @@ -# (C) Datadog, Inc. 
2025-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) - -from __future__ import annotations - -import contextlib -from contextlib import closing -from typing import TYPE_CHECKING, TypedDict - -import orjson as json - -if TYPE_CHECKING: - from datadog_checks.mysql import MySql - -from datadog_checks.base.utils.db.schemas import SchemaCollector, SchemaCollectorConfig -from datadog_checks.mysql.cursor import CommenterDictCursor -from datadog_checks.mysql.queries import ( - SQL_COLUMNS, - SQL_DATABASES, - SQL_FOREIGN_KEYS, - SQL_INDEXES, - SQL_INDEXES_8_0_13, - SQL_PARTITION, - SQL_TABLES, -) - - -class DatabaseInfo(TypedDict): - description: str - name: str - id: str - encoding: str - owner: str - - -# The schema collector sends lists of DatabaseObjects to the agent -# The format is for backwards compatibility with the current backend -class DatabaseObject(TypedDict): - # Splat of database info - description: str - name: str - id: str - encoding: str - owner: str - - -class MySqlSchemaCollectorConfig(SchemaCollectorConfig): - max_execution_time: int - max_tables: int - - -class MySqlSchemaCollector(SchemaCollector): - _check: MySql - _config: MySqlSchemaCollectorConfig - - def __init__(self, check: MySql): - config = MySqlSchemaCollectorConfig() - config.max_execution_time = check._config.schemas_config.get('max_execution_time', 60) - config.max_tables = check._config.schemas_config.get('max_tables', 300) - super().__init__(check, config) - - @property - def kind(self): - return "mysql_databases" - - @property - def base_event(self): - event = super().base_event - event["flavor"] = self._check.version.flavor - return event - - def _get_databases(self): - # MySQL can query all schemas at once so we return a stub - # and then fetch all databases with their tables in the _get_cursor method - return [{'name': 'mysql'}] - - @contextlib.contextmanager - def _get_cursor(self, database_name): - with closing(self._check._mysql_metadata.get_db_connection().cursor(CommenterDictCursor)) as cursor: - query = self._get_tables_query() - max_execution_time = self._config.max_execution_time - if self._check.is_mariadb: - # MariaDB is in seconds - cursor.execute(f"SET SESSION MAX_STATEMENT_TIME={max_execution_time};") - else: - # MySQL is in milliseconds - cursor.execute(f"SET SESSION MAX_EXECUTION_TIME={max_execution_time * 1000};") - cursor.execute(query) - yield cursor - - def _get_tables_query(self): - schemas_query = SQL_DATABASES - tables_query = SQL_TABLES - columns_query = SQL_COLUMNS - indexes_query = ( - SQL_INDEXES_8_0_13 - if self._check.version.flavor == 'MySQL' and self._check.version.version_compatible((8, 0, 13)) - else SQL_INDEXES - ) - constraints_query = SQL_FOREIGN_KEYS - partition_query = SQL_PARTITION - column_columns = """'name', columns.name, - 'column_type', columns.column_type, - 'default', columns.default, - 'nullable', columns.nullable, - 'ordinal_position', columns.ordinal_position, - 'column_key', columns.column_key""" - index_columns = """'name', indexes.name, - 'cardinality', indexes.cardinality, - 'index_type', indexes.index_type, - 'non_unique', indexes.non_unique, - 'expression', indexes.expression, - 'columns', indexes.columns - """ - constraint_columns = """'name', constraints.name, - 'constraint_schema', constraints.constraint_schema, - 'table_name', constraints.table_name, - 'column_names', constraints.column_names, - 'referenced_table_schema', constraints.referenced_table_schema, - 'referenced_table_name', 
constraints.referenced_table_name, - 'referenced_column_names', constraints.referenced_column_names, - 'update_action', constraints.update_action, - 'delete_action', constraints.delete_action - """ - - partition_columns = """'name', partitions.name, - 'partition_ordinal_position', partitions.partition_ordinal_position, - 'partition_method', partitions.partition_method, - 'partition_expression', partitions.partition_expression, - 'partition_description', partitions.partition_description, - 'subpartitions', partitions.subpartitions - """ - - limit = int(self._config.max_tables or 1_000_000) - - query = f""" - SELECT schema_tables.schema_name, schema_tables.table_name, - schema_tables.engine, schema_tables.row_format, schema_tables.create_time, - json_arrayagg(json_object({column_columns})) columns, - json_arrayagg(json_object({index_columns})) indexes, - json_arrayagg(json_object({constraint_columns})) foreign_keys, - json_arrayagg(json_object({partition_columns})) partitions - FROM ( - SELECT `schemas`.schema_name, `schemas`.default_character_set_name, `schemas`.default_collation_name, - tables.table_name, tables.engine, tables.row_format, tables.create_time - FROM ({schemas_query}) `schemas` - LEFT JOIN ({tables_query}) tables ON `schemas`.schema_name = tables.schema_name - ORDER BY tables.table_name - LIMIT {limit} - ) schema_tables - LEFT JOIN ({columns_query}) columns ON schema_tables.table_name = columns.table_name and - schema_tables.schema_name = columns.schema_name - LEFT JOIN ({indexes_query}) indexes ON schema_tables.table_name = indexes.table_name and - schema_tables.schema_name = indexes.schema_name - LEFT JOIN ({constraints_query}) constraints ON schema_tables.table_name = constraints.table_name and - schema_tables.schema_name = constraints.schema_name - LEFT JOIN ({partition_query}) partitions ON schema_tables.table_name = partitions.table_name - GROUP BY schema_tables.schema_name, schema_tables.table_name, - schema_tables.engine, schema_tables.row_format, schema_tables.create_time - ; - """ - return query - - def _get_next(self, cursor): - return cursor.fetchone() - - def _get_all(self, cursor): - return cursor.fetchall() - - def _map_row(self, database: DatabaseInfo, cursor_row) -> DatabaseObject: - # We intentionally dont call super because MySQL has no logical databases - object = { - "name": cursor_row.get("schema_name"), - "default_character_set_name": cursor_row.get("default_character_set_name"), - "default_collation_name": cursor_row.get("default_collation_name"), - } - # Map the cursor row to the expected schema, and strip out None values - object["tables"] = [ - { - k: v - for k, v in { - "engine": cursor_row.get("engine"), - "row_format": cursor_row.get("row_format"), - "create_time": cursor_row.get("create_time"), - "name": cursor_row.get("table_name"), - # The query can create duplicates of the joined tables - "columns": list( - { - v['name']: { - **{k: v_ for k, v_ in v.items() if k == 'default' or v_ is not None}, - 'nullable': v['nullable'] == 'YES', - } - for v in json.loads(cursor_row.get("columns")) or [] - if v and v.get('name') is not None - }.values() - ), - "indexes": list( - { - v['name']: { - **{ - k: v2 - for k, v2 in { - **v, - 'non_unique': v['non_unique'] == 1, - 'columns': list( - { - c['name']: { - **{k: v_ for k, v_ in c.items() if v_ is not None}, - 'nullable': c['nullable'] == 'YES', - } - for c in v['columns'] or [] - if c and c.get('name') is not None - }.values() - ), - }.items() - if v2 is not None - } - } - for v in 
json.loads(cursor_row.get("indexes")) or [] - if v and v.get('name') is not None - }.values() - ), - "foreign_keys": list( - { - v['name']: v - for v in (json.loads(cursor_row.get("foreign_keys")) or []) - if v and v.get('name') is not None - }.values() - ), - "partitions": list( - { - v['name']: { - **v, - 'subpartitions': list( - { - v2['name']: v2 - for v2 in v['subpartitions'] or [] - if v2 and v2.get('name') is not None - }.values() - ), - "data_length": sum(v2.get('data_length', 0) for v2 in (v['subpartitions'] or [])), - "table_rows": sum(v2.get('table_rows', 0) for v2 in (v['subpartitions'] or [])), - } - for v in (json.loads(cursor_row.get("partitions")) or []) - if v and v.get('name') is not None - }.values() - ), - }.items() - if v is not None - } - if cursor_row.get("table_name") is not None - else None - ] - return object diff --git a/mysql/tests/conftest.py b/mysql/tests/conftest.py index 8b30f850ab57a..74e725e3372ab 100644 --- a/mysql/tests/conftest.py +++ b/mysql/tests/conftest.py @@ -282,8 +282,7 @@ def version_metadata(): 'version.raw': mock.ANY, 'version.build': mock.ANY, 'flavor': flavor, - # This is weirdly different on CI (forced_hostname) vs local (stubbed.hostname) - 'resolved_hostname': mock.ANY, + 'resolved_hostname': 'forced_hostname', } diff --git a/mysql/tests/test_metadata.py b/mysql/tests/test_metadata.py index b42d679a87d8e..fc7014f2418e0 100644 --- a/mysql/tests/test_metadata.py +++ b/mysql/tests/test_metadata.py @@ -2,6 +2,8 @@ # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import re + import pytest from packaging.version import parse as parse_version @@ -21,7 +23,7 @@ def dbm_instance(instance_complex): return instance_complex -def sort_names_split_by_comma(names): +def sort_names_split_by_coma(names): names_arr = names.split(',') sorted_columns = sorted(names_arr) return ','.join(sorted_columns) @@ -37,20 +39,9 @@ def normalize_values(actual_payload): table['columns'].sort(key=lambda x: x['name']) if 'indexes' in table: table['indexes'].sort(key=lambda x: x['name']) - for index in table['indexes']: - index['columns'].sort(key=lambda x: x['name']) if 'foreign_keys' in table: for f_key in table['foreign_keys']: - f_key["referenced_column_names"] = ( - sort_names_split_by_comma(f_key["referenced_column_names"]) - if "referenced_column_names" in f_key and f_key["referenced_column_names"] is not None - else None - ) - if 'partitions' in table: - table['partitions'].sort(key=lambda x: x['name']) - for partition in table['partitions']: - if 'subpartitions' in partition: - partition['subpartitions'].sort(key=lambda x: x['name']) + f_key["referenced_column_names"] = sort_names_split_by_coma(f_key["referenced_column_names"]) if 'columns' in table: for column in table['columns']: if column['column_type'] == 'int': @@ -139,6 +130,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 1, "column_key": "MUL", + "extra": "", }, { "name": "District", @@ -147,6 +139,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 2, "column_key": "", + "extra": "", }, { "name": "Review", @@ -155,6 +148,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 3, "column_key": "", + "extra": "", }, ], "foreign_keys": [ @@ -190,7 +184,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "non_unique": True, } ], - "partitions": [], }, { "name": "Restaurants", @@ -205,6 
+198,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 1, "column_key": "MUL", + "extra": "", }, { "name": "District", @@ -213,6 +207,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 2, "column_key": "", + "extra": "", }, { "name": "Cuisine", @@ -221,6 +216,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 3, "column_key": "", + "extra": "", }, ], "indexes": [ @@ -243,8 +239,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "non_unique": False, } ], - "foreign_keys": [], - "partitions": [], }, { "name": "cities", @@ -259,6 +253,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": False, "ordinal_position": 1, "column_key": "PRI", + "extra": "", }, { "name": "name", @@ -267,6 +262,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 2, "column_key": "", + "extra": "", }, { "name": "population", @@ -275,6 +271,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": False, "ordinal_position": 3, "column_key": "MUL", + "extra": "", }, ], "indexes": [ @@ -339,14 +336,11 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "cardinality": 0, "non_unique": True, "expression": "(`population` + 1)", - "columns": [], } ] if MYSQL_VERSION_PARSED >= parse_version('8.0.13') and not is_maria_db and not is_percona else [] ), - "foreign_keys": [], - "partitions": [], }, { "name": "cities_partitioned", @@ -361,6 +355,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": False, "ordinal_position": 1, "column_key": "PRI", + "extra": "", }, { "name": "name", @@ -369,6 +364,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 2, "column_key": "", + "extra": "", }, { "name": "population", @@ -377,6 +373,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": False, "ordinal_position": 3, "column_key": "", + "extra": "", }, ], "partitions": [ @@ -388,7 +385,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "partition_description": "100", "table_rows": 0, "data_length": 16384, - "subpartitions": [], }, { "name": "p1", @@ -398,7 +394,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "partition_description": "200", "table_rows": 0, "data_length": 16384, - "subpartitions": [], }, { "name": "p2", @@ -408,7 +403,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "partition_description": "300", "table_rows": 0, "data_length": 16384, - "subpartitions": [], }, { "name": "p3", @@ -418,7 +412,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "partition_description": "MAXVALUE", "table_rows": 0, "data_length": 16384, - "subpartitions": [], }, ], "indexes": [ @@ -436,7 +429,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "non_unique": False, } ], - "foreign_keys": [], }, { "name": "landmarks", @@ -451,6 +443,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 1, "column_key": "", + "extra": "", }, { "name": "city_id", @@ -459,6 +452,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 2, "column_key": "MUL", + "extra": "", }, ], "foreign_keys": [ @@ -489,7 +483,6 @@ def 
test_collect_schemas(aggregator, dd_run_check, dbm_instance): "non_unique": True, } ], - "partitions": [], }, ], } @@ -511,6 +504,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 1, "column_key": "", + "extra": "", }, { "name": "name", @@ -519,6 +513,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 2, "column_key": "UNI", + "extra": "", }, ], "indexes": [ @@ -536,8 +531,6 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "non_unique": False, } ], - "foreign_keys": [], - "partitions": [], }, { "name": "ts", @@ -552,6 +545,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 1, "column_key": "", + "extra": "", }, { "name": "purchased", @@ -560,10 +554,9 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): "nullable": True, "ordinal_position": 2, "column_key": "", + "extra": "", }, ], - "foreign_keys": [], - "indexes": [], "partitions": [ { "name": "p0", @@ -669,6 +662,7 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): 'database_instance:stubbed.hostname', 'dbms_flavor:{}'.format(common.MYSQL_FLAVOR.lower()), 'dd.internal.resource:database_instance:stubbed.hostname', + 'port:13306', 'tag1:value1', 'tag2:value2', ) @@ -687,16 +681,15 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): assert schema_event.get("flavor") in ("MariaDB", "MySQL", "Percona") assert sorted(schema_event["tags"]) == sorted(expected_tags) database_metadata = schema_event['metadata'] + assert len(database_metadata) == 1 + db_name = database_metadata[0]['name'] + if db_name not in databases_to_find: + continue - for db_metadata in database_metadata: - db_name = db_metadata['name'] - if db_name not in databases_to_find: - continue - - if db_name in actual_payloads: - actual_payloads[db_name]['tables'] = actual_payloads[db_name]['tables'] + db_metadata['tables'] - else: - actual_payloads[db_name] = db_metadata + if db_name in actual_payloads: + actual_payloads[db_name]['schemas'] = actual_payloads[db_name]['schemas'] + database_metadata[0]['schemas'] + else: + actual_payloads[db_name] = database_metadata[0] assert len(actual_payloads) == len(expected_data_for_db) @@ -707,6 +700,25 @@ def test_collect_schemas(aggregator, dd_run_check, dbm_instance): assert expected_data_for_db[db_name] == actual_payload +@pytest.mark.integration +def test_schemas_collection_truncated(aggregator, dd_run_check, dbm_instance): + dbm_instance['dbm'] = True + dbm_instance['schemas_collection'] = {"enabled": True, "max_execution_time": 0} + expected_pattern = r"^Truncated after fetching \d+ columns, elapsed time is \d+(\.\d+)?s, database is .*" + check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance]) + dd_run_check(check) + + dbm_metadata = aggregator.get_event_platform_events("dbm-metadata") + found = False + for schema_event in (e for e in dbm_metadata if e['kind'] == 'mysql_databases'): + if "collection_errors" in schema_event: + if schema_event["collection_errors"][0]["error_type"] == "truncated" and re.fullmatch( + expected_pattern, schema_event["collection_errors"][0]["message"] + ): + found = True + assert found + + @pytest.mark.unit def test_schemas_collection_config(dbm_instance): dbm_instance['schemas_collection'] = {"enabled": True, "max_execution_time": 0} diff --git a/mysql/tests/test_schemas.py b/mysql/tests/test_schemas.py deleted file mode 100644 index d76a602e2d7a1..0000000000000 
--- a/mysql/tests/test_schemas.py +++ /dev/null @@ -1,94 +0,0 @@ -# (C) Datadog, Inc. 2025-present -# All rights reserved -# Licensed under a 3-clause BSD style license (see LICENSE) - -from typing import Callable, Optional - -import pytest -from packaging.version import parse as parse_version - -from datadog_checks.mysql import MySql -from datadog_checks.mysql.schemas import MySqlSchemaCollector -from datadog_checks.mysql.version_utils import MySQLVersion - -from . import common -from .common import MYSQL_FLAVOR, MYSQL_VERSION_PARSED - -pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')] - - -@pytest.fixture -def dbm_instance(instance_basic): - instance_basic['dbm'] = True - instance_basic['min_collection_interval'] = 0.1 - instance_basic['query_samples'] = {'enabled': False} - instance_basic['query_activity'] = {'enabled': False} - instance_basic['query_metrics'] = {'enabled': False} - instance_basic['collect_resources'] = {'enabled': False, 'run_sync': True} - instance_basic['collect_settings'] = {'enabled': False, 'run_sync': True} - instance_basic['collect_schemas'] = {'enabled': True, 'run_sync': True} - return instance_basic - - -@pytest.fixture(scope="function") -def integration_check() -> Callable[[dict, Optional[dict]], MySql]: - checks = [] - - def _check(instance: dict, init_config: dict = None): - nonlocal checks - c = MySql(common.CHECK_NAME, init_config or {}, [instance]) - c.is_mariadb = MYSQL_FLAVOR.lower() == 'mariadb' - c.version = MySQLVersion(version=str(MYSQL_VERSION_PARSED), flavor=MYSQL_FLAVOR, build='') - checks.append(c) - return c - - yield _check - - for c in checks: - c.cancel() - - -def test_get_cursor(dbm_instance, integration_check): - check = integration_check(dbm_instance) - collector = MySqlSchemaCollector(check) - - with collector._get_cursor('datadog_test') as cursor: - assert cursor is not None - schemas = [] - for row in cursor: - schemas.append(row['schema_name']) - - expected_schemas = {'datadog_test_schemas', 'datadog', 'datadog_test_schemas_second', 'testdb'} - if MYSQL_FLAVOR.lower() == 'mariadb' and MYSQL_VERSION_PARSED <= parse_version('10.6.0'): - expected_schemas.add('test') - assert set(schemas) == expected_schemas - - -def test_tables(dbm_instance, integration_check): - check = integration_check(dbm_instance) - collector = MySqlSchemaCollector(check) - - with collector._get_cursor('datadog_test') as cursor: - assert cursor is not None - tables = [] - for row in cursor: - if row['table_name']: - tables.append(row['table_name']) - - assert set(tables) == { - 'cities', - 'RestaurantReviews', - 'cities_partitioned', - 'users', - 'Restaurants', - 'ϑings', - 'landmarks', - 'ts', - } - - -def test_collect_schemas(dbm_instance, integration_check): - check = integration_check(dbm_instance) - collector = MySqlSchemaCollector(check) - - collector.collect_schemas() diff --git a/mysql/tests/test_unit.py b/mysql/tests/test_unit.py index bf808be071e0c..96ef7a515949b 100644 --- a/mysql/tests/test_unit.py +++ b/mysql/tests/test_unit.py @@ -1,19 +1,22 @@ # (C) Datadog, Inc. 
2021-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) - +import json import subprocess +import time import mock import psutil -import pymysql # type: ignore +import pymysql import pytest from datadog_checks.mysql import MySql from datadog_checks.mysql.activity import MySQLActivity +from datadog_checks.mysql.databases_data import DatabasesData, SubmitData from datadog_checks.mysql.version_utils import parse_version from . import common +from .utils import deep_compare pytestmark = pytest.mark.unit @@ -332,6 +335,114 @@ def test_service_check(disable_generic_tags, expected_tags, hostname): assert set(check._service_check_tags(hostname)) == expected_tags +class DummyLogger: + def debug(*args): + pass + + def error(*args): + pass + + +def set_up_submitter_unit_test(): + submitted_data = [] + base_event = { + "host": "some", + "agent_version": 0, + "dbms": "sqlserver", + "kind": "sqlserver_databases", + "collection_interval": 1200, + "dbms_version": "some", + "tags": "some", + "cloud_metadata": "some", + } + + def submitData(data): + submitted_data.append(data) + + dataSubmitter = SubmitData(submitData, base_event, DummyLogger()) + return dataSubmitter, submitted_data + + +def test_submit_data(): + dataSubmitter, submitted_data = set_up_submitter_unit_test() + + dataSubmitter.store_db_infos( + [ + {"name": "test_db1", "default_character_set_name": "latin1"}, + {"name": "test_db2", "default_character_set_name": "latin1"}, + ] + ) + + dataSubmitter.store("test_db1", [1, 2], 5) + dataSubmitter.store("test_db2", [1, 2], 5) + assert dataSubmitter.columns_since_last_submit() == 10 + dataSubmitter.store("test_db1", [1, 2], 10) + + dataSubmitter.submit() + + assert dataSubmitter.columns_since_last_submit() == 0 + + expected_data = { + "host": "some", + "agent_version": 0, + "dbms": "sqlserver", + "kind": "sqlserver_databases", + "collection_interval": 1200, + "dbms_version": "some", + "tags": "some", + "cloud_metadata": "some", + "metadata": [ + {"name": "test_db1", "default_character_set_name": "latin1", "tables": [1, 2, 1, 2]}, + {"name": "test_db2", "default_character_set_name": "latin1", "tables": [1, 2]}, + ], + } + + data = json.loads(submitted_data[0]) + data.pop("timestamp") + assert deep_compare(data, expected_data) + + +def test_fetch_throws(): + check = MySql(common.CHECK_NAME, {}, instances=[{'server': 'localhost', 'user': 'datadog'}]) + databases_data = DatabasesData({}, check, check._config) + with ( + mock.patch('time.time', side_effect=[0, 9999999]), + mock.patch( + 'datadog_checks.mysql.databases_data.DatabasesData._get_tables', + return_value=[{"name": "mytable1"}, {"name": "mytable2"}], + ), + mock.patch('datadog_checks.mysql.databases_data.DatabasesData._get_tables', return_value=[1, 2]), + ): + with pytest.raises(StopIteration): + databases_data._fetch_database_data("dummy_cursor", time.time(), "my_db") + + +def test_submit_is_called_if_too_many_columns(): + check = MySql(common.CHECK_NAME, {}, instances=[{'server': 'localhost', 'user': 'datadog'}]) + databases_data = DatabasesData({}, check, check._config) + with ( + mock.patch('time.time', side_effect=[0, 0]), + mock.patch('datadog_checks.mysql.databases_data.DatabasesData._get_tables', return_value=[1, 2]), + mock.patch('datadog_checks.mysql.databases_data.SubmitData.submit') as mocked_submit, + mock.patch( + 'datadog_checks.mysql.databases_data.DatabasesData._get_tables_data', + return_value=(1000_000, {"name": "my_table"}), + ), + ): + databases_data._fetch_database_data("dummy_cursor", 
time.time(), "my_db") + assert mocked_submit.call_count == 2 + + +def test_exception_handling_by_do_for_dbs(): + check = MySql(common.CHECK_NAME, {}, instances=[{'server': 'localhost', 'user': 'datadog'}]) + databases_data = DatabasesData({}, check, check._config) + with mock.patch( + 'datadog_checks.mysql.databases_data.DatabasesData._fetch_database_data', + side_effect=Exception("Can't connect to DB"), + ): + databases_data._fetch_for_databases([{"name": "my_db"}], "dummy_cursor") + + def test_update_aurora_replication_role(): mysql_check = MySql(common.CHECK_NAME, {}, instances=[{'server': 'localhost', 'user': 'datadog'}])