test min impl #15656

Draft · wants to merge 20 commits into base: main
8 changes: 6 additions & 2 deletions contrib/python/ydb/py3/ydb/_utilities.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import importlib.util
import sys
import threading
import codecs
from concurrent import futures
@@ -191,8 +192,11 @@ def get_first_message_with_timeout(status_stream: SyncResponseIterator, timeout:
waiter = future()

def get_first_response(waiter):
first_response = next(status_stream)
waiter.set_result(first_response)
try:
first_response = next(status_stream)
waiter.set_result(first_response)
except:
print("Caught an unknown exception", file=sys.stderr)

thread = threading.Thread(
target=get_first_response,
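The handler added here swallows any error raised by `next(status_stream)` and only prints to stderr, so `waiter` is never resolved and the caller presumably just runs into its timeout. A minimal alternative sketch, assuming `waiter` is a `concurrent.futures.Future` (the module already imports `concurrent.futures`); this is an illustration, not part of the PR:

```python
import sys


def get_first_response(waiter, status_stream):
    # Hand the failure back through the future so the waiting caller sees the
    # real stream error instead of an opaque timeout.
    try:
        waiter.set_result(next(status_stream))
    except Exception as e:
        print(f"Caught an exception while reading the stream: {e}", file=sys.stderr)
        waiter.set_exception(e)
```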
29 changes: 29 additions & 0 deletions ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -1411,4 +1411,33 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
}
}

Y_UNIT_TEST_SUITE(PlanStep) {
    Y_UNIT_TEST(CreateTable) {
        ui64 tableId = 1;

        TTestBasicRuntime runtime;
        TTester::Setup(runtime);
        auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();

        using namespace NTxUT;
        CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);

        TDispatchOptions options;
        options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
        runtime.DispatchEvents(options);

        TActorId sender = runtime.AllocateEdgeActor();

        auto schema = TTestSchema::YdbSchema(NArrow::NTest::TTestColumn("k0", TTypeInfo(NTypeIds::Timestamp)));
        auto pk = NArrow::NTest::TTestColumn::CropSchema(schema, 4);

        ui64 planStep = 1000;
        ui64 txId = 100;
        ui64 generation = 0;

        auto txBody = TTestSchema::CreateTableTxBody(tableId++, schema, pk, {}, ++generation);
        SetupSchema(runtime, sender, txBody, NOlap::TSnapshot(planStep++, txId++));
    }
}

}
7 changes: 7 additions & 0 deletions ydb/tests/olap/helpers/ya.make
@@ -0,0 +1,7 @@
PY3_LIBRARY()

PY_SRCS(
    ydb_client.py
)

END()
23 changes: 23 additions & 0 deletions ydb/tests/olap/helpers/ydb_client.py
@@ -0,0 +1,23 @@
import ydb

# from contrib.python.ydb.py3.ydb.query.pool.py import (
# RetrySettings
# )

class YdbClient:
    def __init__(self, endpoint: str, database: str):
        self.driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
        self.database = database
        self.endpoint = endpoint
        self.session_pool = ydb.QuerySessionPool(self.driver)

    def stop(self):
        self.session_pool.stop()
        self.driver.stop()

    def wait_connection(self, timeout=5):
        self.driver.wait(timeout, fail_fast=True)

    def query(self, statement):
        with ydb.QuerySessionPool(self.driver) as pool:
            return pool.execute_with_retries(statement, retry_settings=ydb.RetrySettings(max_retries=0, max_session_acquire_timeout=5))
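For context, a minimal usage sketch of this helper, roughly as the restart test below consumes it; the endpoint and database values are placeholders, not taken from the PR:

```python
from ydb.tests.olap.helpers.ydb_client import YdbClient

# Placeholder connection parameters for illustration only.
client = YdbClient(endpoint="grpc://localhost:2136", database="/Root")
client.wait_connection(timeout=5)
try:
    # query() opens a fresh QuerySessionPool per call and runs the statement
    # with retries disabled (max_retries=0), so any transport or scheme error
    # is raised straight to the caller.
    client.query("SELECT 1;")
finally:
    client.stop()
```

Note that `query()` does not reuse `self.session_pool`; with `max_retries=0` every failure surfaces immediately, which is what the restart scenario relies on to tell failed ALTERs apart from successful ones.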
273 changes: 273 additions & 0 deletions ydb/tests/olap/restarts/test_nodes_restart.py
@@ -0,0 +1,273 @@
import datetime
import logging
import os
import random
import shutil
import sys
import time

import yatest.common

from ydb.tests.library.common.types import Erasure
from ydb.tests.library.harness.util import LogLevels
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.harness.kikimr_runner import KiKiMR
from ydb.tests.olap.common.thread_helper import TestThread
from ydb.tests.olap.helpers.ydb_client import YdbClient

from enum import Enum
from pathlib import Path
from multiprocessing import Value

# logging.basicConfig(
# stream=sys.stderr,
# format='%(asctime)s - %(levelname)s - %(message)s',
# datefmt='%Y-%m-%d %H:%M:%S',
# level=logging.INFO
# )

LOG_DIR = '/home/emgariko/project/logs/LostPlanStep/test_output_'

def copy_test_results(run_id):
    source_dir = '/home/emgariko/project/ydb_fork/ydb/ydb/tests/olap/restarts/test-results'
    dest_dir = f'/home/emgariko/project/logs/LostPlanStep/test_results_/test_results_{run_id}'

    try:
        Path(dest_dir).parent.mkdir(parents=True, exist_ok=True)

        if os.path.exists(source_dir):
            shutil.copytree(source_dir, dest_dir, dirs_exist_ok=True)
        return True
    except Exception as e:
        print(f"Failed to copy test results: {e}")
        return False

def get_next_log_id(log_dir=LOG_DIR):
    Path(log_dir).mkdir(exist_ok=True)

    existing_logs = [f for f in os.listdir(log_dir) if f.startswith("out_")]

    if not existing_logs:
        return 1

    ids = [int(f.split('_')[1].split('.')[0]) for f in existing_logs]
    return max(ids) + 1

def setup_logger(name=__name__, log_dir=LOG_DIR):
    logger = logging.getLogger(name)
    # handler = logging.StreamHandler(sys.stderr)
    # formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # handler.setFormatter(formatter)
    # logger.addHandler(handler)

    # logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    next_id = get_next_log_id(log_dir)
    log_file = f"{log_dir}/out_{next_id}.log"

    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)

    logger.handlers.clear()
    logger.addHandler(file_handler)

    if copy_test_results(next_id):
        logger.info(f"Test results copied to test_results_{next_id}")
    else:
        logger.warning("Failed to copy test results")

    return logger

logger = setup_logger()
counter = Value('i', 0)

class TestRestartNodes(object):

    @classmethod
    def setup_class(cls):
        # nodes_count = 8 if cls.erasure == Erasure.BLOCK_4_2 else 9
        nodes_count = 8
        configurator = KikimrConfigGenerator(None,
                                             nodes=nodes_count,
                                             use_in_memory_pdisks=False,
                                             additional_log_configs={'CMS': LogLevels.DEBUG},
                                             extra_feature_flags=[
                                                 'enable_column_store'
                                             ])
        cls.cluster = KiKiMR(configurator=configurator)
        cls.cluster.start()
        node = cls.cluster.nodes[1]
        cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{configurator.domain_name}")
        cls.ydb_client.wait_connection()
        time.sleep(10)
        # TODO: increase sleep value

    @classmethod
    def teardown_class(cls):
        cls.ydb_client.stop()
        cls.cluster.stop()

    def create_table(self, thread_id: int):
        logger.info(f"Init: starting to create tables for thread#{thread_id}")
        deadline: datetime.datetime = datetime.datetime.now() + datetime.timedelta(seconds=30)
        while datetime.datetime.now() < deadline:
            logger.info("Init: sending create table query")
            try:
                with counter.get_lock():
                    table_id = counter.value
                    logger.info(f"Init: sending create table#{table_id} query")
                    self.ydb_client.query(f"""
                        CREATE TABLE `TableStore/my_table_{table_id}` (
                            timestamp Timestamp NOT NULL,
                            Key Uint64,
                            Value String,
                            PRIMARY KEY (timestamp)
                        ) WITH (
                            STORE = COLUMN,
                            AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2
                        );
                    """)
                    counter.value += 1
            except Exception as x:
                logger.error(f"Init: Caught an exception during query execution: {x}")
            except:
                logger.error("Init: Caught an unknown exception while creating the table")
            logger.info("In progress: executed create query")

        logger.info(f"Init: finished creating tables for thread#{thread_id}")

    def alter_table(self, thread_id: int):
        logger.info(f"In progress: starting to alter the table for thread#{thread_id}")
        deadline: datetime.datetime = datetime.datetime.now() + datetime.timedelta(seconds=30)
        while datetime.datetime.now() < deadline:
            try:
                with counter.get_lock():
                    logger.info("In progress: sending alter query")
                    ttl = random.randint(0, 1000)
                    table_id = random.randint(0, counter.value)
                    self.ydb_client.query(f"""
                        ALTER TABLE `TableStore/my_table_{table_id}` SET(TTL = Interval("PT{ttl}H") ON timestamp);
                    """)
            except Exception as x:
                logger.error(f"In progress: Caught an exception during query execution: {x}")
            except:
                logger.error("In progress: Caught an unknown exception while altering the table")
            logger.info("In progress: executed alter query")

        logger.info(f"In progress: finished altering for thread#{thread_id}")

    def kill_nodes(self):
        logger.info("In progress: starting to kill nodes")
        deadline: datetime.datetime = datetime.datetime.now() + datetime.timedelta(seconds=90)
        while datetime.datetime.now() < deadline:
            nodes = list(self.cluster.nodes.items())
            random.shuffle(nodes)
            nodes = nodes[0: len(nodes) // 2]
            for key, node in nodes:
                try:
                    logger.info(f"In progress: killing {key}-node")
                    node.kill()
                    time.sleep(random.randint(1, 10))
                    logger.info(f"In progress: killed {key}-node, starting it")
                    node.start()
                    time.sleep(random.randint(1, 10))
                    logger.info(f"In progress: started {key}-node")
                except Exception as x:
                    logger.error(f"In progress: Caught an exception during node killing: {x}")
                except:
                    logger.error("In progress: Caught an unknown exception during node killing")
                    break
        logger.info("In progress: finished killing nodes")


    def test(self):
        # The error should look like the one in the paste.
        # Think of this as the user's ALTER no longer working.
        # User scenario: ALTERs are flying, nodes are restarting, and then nothing works anymore.
        # 1) Create one table.
        # 2) Run ALTERs against one table from several threads while randomly killing arbitrary nodes for 2 minutes.
        # 3) Bring all nodes back up.
        # 4) Start running a single ALTER.
        # 5) Within 2 minutes the ALTER should start succeeding; keep it running in a loop.

        logger.info("Init: creating a columnstore")
        self.ydb_client.query("""
            --!syntax_v1
            CREATE TABLESTORE `TableStore` (
                timestamp Timestamp NOT NULL,
                Key Uint64,
                Value String,
                PRIMARY KEY (timestamp)
            )
            WITH (
                STORE = COLUMN,
                AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2
            );
        """)
logger.info("Init: sucessfully created the columnstore, creating a table")
threads: list[TestThread] = []
for i in range(5):
threads.append(TestThread(target=self.create_table, args=[i]))
for i in range(5, 10):
threads.append(TestThread(target=self.alter_table, args=[i]))
threads.append(TestThread(target=self.kill_nodes))

logger.info("In progress: starting 11 threads")
for thread in threads:
thread.start()
logger.info("In progress: started 11 threads, now joining last")
# for thread in threads:
# thread.join()
threads[-1].join()
logger.info("In progress: joined killing thread, sleeping for 35s")
time.sleep(35)
logger.info("In progress: woke up, starting nodes again")
# TODO: investigate, maybe here should be no repeating start() calls
# for node in self.cluster.nodes.values():
# node.start()
logger.info("In progress: stared nodes again")

logger.info("In progress: starting altering table")
deadline: datetime = datetime.datetime.now() + datetime.timedelta(seconds=30)

# проверить что ни одного успешного прогона не было.
# Поставить на ночь крутится.
# Дальше надо воспроизвести на store.
# В тесте поискать как создавать таблицу в ColumnStore.

all_failed = True

while datetime.datetime.now() < deadline:
hasException = False
ttl = random.randint(1, 1000)
logger.info("In progress: [last section] sending alter query")
try:
with counter.get_lock():
logger.info("In progress: sending alter query")
ttl = random.randint(0, 1000)
table_id = random.randint(0, counter.value)
self.ydb_client.query(f"""
ALTER TABLE `TableStore/my_table_{table_id}` SET(TTL = Interval("PT{ttl}H") ON timestamp);
""")
except Exception as x:
logger.error(f"In progress: [last section] Caught an exception during query executing: {x}")
hasException = True
except:
logger.error(f"In progress: [last section] Caught an unknown exception during node killing executing")
hasException = True
finally:
if not hasException:
all_failed = False
logger.info("In progress: [last section] executed alter query")

if all_failed:
logger.error("Finished: All alter queries in last section failed")

logger.info("Finished: finished altering table")
27 changes: 27 additions & 0 deletions ydb/tests/olap/restarts/ya.make
@@ -0,0 +1,27 @@
PY3TEST()
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")

DEPENDS(
ydb/apps/ydb
ydb/apps/ydbd
)

TEST_SRCS(
test_nodes_restart.py
)

SIZE(MEDIUM)
TIMEOUT(235)

PEERDIR(
ydb/tests/library
ydb/tests/library/test_meta
ydb/public/sdk/python
ydb/public/sdk/python/enable_v3_new_behavior
contrib/python/boto3
library/recipes/common
ydb/tests/olap/common
ydb/tests/olap/helpers
)

END()
1 change: 1 addition & 0 deletions ydb/tests/olap/ya.make
@@ -35,4 +35,5 @@ RECURSE(
s3_import
scenario
ttl_tiering
restarts
)