Skip to content

Commit 123c761

Browse files
authored
Merge fc52d2f into 4af7b01
2 parents 4af7b01 + fc52d2f commit 123c761

File tree

7 files changed

+366
-2
lines changed

7 files changed

+366
-2
lines changed

contrib/python/ydb/py3/ydb/_utilities.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
import importlib.util
3+
import sys
34
import threading
45
import codecs
56
from concurrent import futures
@@ -191,8 +192,11 @@ def get_first_message_with_timeout(status_stream: SyncResponseIterator, timeout:
191192
waiter = future()
192193

193194
def get_first_response(waiter):
    """Fetch the first message from ``status_stream`` and complete ``waiter``.

    Runs in a helper thread. On success the message is stored as the
    waiter's result. On failure the error is reported on stderr and stored
    on the waiter via ``set_exception`` so that whoever waits on it sees the
    real cause instead of silently waiting for a result that never arrives.

    :param waiter: future-like object exposing ``set_result`` and
        ``set_exception``.
    """
    try:
        first_response = next(status_stream)
    except Exception as exc:
        # Only ``Exception`` is intercepted: a bare ``except`` would also
        # swallow KeyboardInterrupt / SystemExit.
        print("Caught an unknown exception", file=sys.stderr)
        waiter.set_exception(exc)
    else:
        waiter.set_result(first_response)
196200

197201
thread = threading.Thread(
198202
target=get_first_response,

ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,4 +1411,33 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
14111411
}
14121412
}
14131413

1414+
Y_UNIT_TEST_SUITE(PlanStep) {
    // Boots a single ColumnShard tablet and applies one CREATE TABLE schema
    // transaction to it at a fixed snapshot.
    Y_UNIT_TEST(CreateTable) {
        // Identifiers of the only table / transaction used by this test.
        const ui64 tableId = 1;
        const ui64 schemaGeneration = 1;
        const ui64 planStep = 1000;
        const ui64 txId = 100;

        TTestBasicRuntime runtime;
        TTester::Setup(runtime);
        auto csDefaultControllerGuard =
            NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();

        using namespace NTxUT;
        CreateTestBootstrapper(
            runtime,
            CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard),
            &CreateColumnShard);

        // Dispatch events until the tablet boot event has been seen.
        TDispatchOptions bootOptions;
        bootOptions.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
        runtime.DispatchEvents(bootOptions);

        TActorId edgeActor = runtime.AllocateEdgeActor();

        auto columns = TTestSchema::YdbSchema(NArrow::NTest::TTestColumn("k0", TTypeInfo(NTypeIds::Timestamp)));
        auto keyColumns = NArrow::NTest::TTestColumn::CropSchema(columns, 4);

        auto createTableTx = TTestSchema::CreateTableTxBody(tableId, columns, keyColumns, {}, schemaGeneration);
        SetupSchema(runtime, edgeActor, createTableTx, NOlap::TSnapshot(planStep, txId));
    }
}
1442+
14141443
}

ydb/tests/olap/helpers/ya.make

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
PY3_LIBRARY()
2+
3+
PY_SRCS (
4+
ydb_client.py
5+
)
6+
7+
END()

ydb/tests/olap/helpers/ydb_client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import ydb
2+
3+
# from contrib.python.ydb.py3.ydb.query.pool.py import (
4+
# RetrySettings
5+
# )
6+
7+
class YdbClient:
    """Small test helper that owns a YDB driver and one query session pool.

    The pool is created once in ``__init__``, reused by every ``query`` call
    and released in ``stop``.
    """

    def __init__(self, endpoint: str, database: str):
        """Create the driver and the session pool for ``database`` at ``endpoint``."""
        self.driver = ydb.Driver(endpoint=endpoint, database=database, oauth=None)
        self.database = database
        self.endpoint = endpoint
        self.session_pool = ydb.QuerySessionPool(self.driver)

    def stop(self):
        """Stop the session pool first, then the driver it was built on."""
        self.session_pool.stop()
        self.driver.stop()

    def wait_connection(self, timeout=5):
        """Wait up to ``timeout`` for the driver to connect (``fail_fast=True``)."""
        self.driver.wait(timeout, fail_fast=True)

    def query(self, statement):
        """Execute ``statement`` once (no retries) and return the pool's result.

        The long-lived ``self.session_pool`` is used instead of building and
        tearing down a new pool for every statement.
        """
        retry_settings = ydb.RetrySettings(max_retries=0, max_session_acquire_timeout=5)
        return self.session_pool.execute_with_retries(statement, retry_settings=retry_settings)
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
import datetime
2+
import os
3+
import random
4+
5+
import logging
6+
import shutil
7+
import sys
8+
import time
9+
from ydb.tests.library.common.types import Erasure
10+
import yatest.common
11+
import random
12+
13+
from ydb.tests.library.common.types import Erasure
14+
from ydb.tests.library.harness.util import LogLevels
15+
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
16+
from ydb.tests.library.harness.kikimr_runner import KiKiMR
17+
from ydb.tests.olap.common.thread_helper import TestThread
18+
from ydb.tests.olap.helpers.ydb_client import YdbClient
19+
20+
from enum import Enum
21+
from pathlib import Path
22+
from multiprocessing import Value
23+
24+
# logging.basicConfig(
25+
# stream=sys.stderr,
26+
# format='%(asctime)s - %(levelname)s - %(message)s',
27+
# datefmt='%Y-%m-%d %H:%M:%S',
28+
# level=logging.INFO
29+
# )
30+
31+
# Directory that receives the per-run ``out_<N>.log`` files.
# The historical default is a developer-specific absolute path, so it can be
# overridden through the LOST_PLAN_STEP_LOG_DIR environment variable; the
# default itself is kept unchanged for backward compatibility.
LOG_DIR = os.environ.get(
    'LOST_PLAN_STEP_LOG_DIR',
    '/home/emgariko/project/logs/LostPlanStep/test_output_',
)
32+
33+
def copy_test_results(
    run_id,
    source_dir='/home/emgariko/project/ydb_fork/ydb/ydb/tests/olap/restarts/test-results',
    dest_root='/home/emgariko/project/logs/LostPlanStep/test_results_',
):
    """Copy the test-results tree into a per-run directory.

    The destination is ``<dest_root>/test_results_<run_id>``; ``dest_root``
    is created when missing.

    :param run_id: identifier appended to the destination directory name.
    :param source_dir: directory to copy from (defaults to the original
        hard-coded location).
    :param dest_root: directory under which the per-run copy is placed
        (defaults to the original hard-coded location).
    :return: ``True`` only when the tree was actually copied; ``False`` when
        the source does not exist or the copy failed.
    """
    dest_dir = f'{dest_root}/test_results_{run_id}'

    try:
        Path(dest_dir).parent.mkdir(parents=True, exist_ok=True)

        if not os.path.exists(source_dir):
            # Nothing to copy: report it explicitly instead of claiming success.
            return False

        shutil.copytree(source_dir, dest_dir, dirs_exist_ok=True)
        return True
    except Exception as e:
        # Best effort: copying results must never break the test run itself.
        print(f"Failed to copy test results: {e}")
        return False
46+
47+
def get_next_log_id(log_dir=LOG_DIR):
    """Return the next free numeric id for an ``out_<id>.log`` file.

    ``log_dir`` is created (including missing parents) when it does not
    exist. Entries whose name starts with ``out_`` but whose id part is not a
    plain number are ignored rather than aborting the whole run.

    :param log_dir: directory that holds the ``out_<id>.log`` files.
    :return: ``1`` when no numbered log exists yet, otherwise the highest
        existing id plus one.
    """
    Path(log_dir).mkdir(parents=True, exist_ok=True)

    ids = []
    for entry in os.listdir(log_dir):
        if not entry.startswith("out_"):
            continue
        # "out_12.log" -> "12"
        id_part = entry.split('_')[1].split('.')[0]
        if id_part.isdigit():
            ids.append(int(id_part))

    return max(ids, default=0) + 1
57+
58+
def setup_logger(name=__name__, log_dir=LOG_DIR):
    """Build a file-only INFO logger that writes to a fresh numbered log file.

    The file is ``<log_dir>/out_<id>.log`` where ``<id>`` comes from
    ``get_next_log_id``. Any handlers already attached to the logger are
    dropped. After the handler is installed, the test results are copied for
    the same id and the outcome is written to the new log.

    :param name: name of the logger to configure.
    :param log_dir: directory for the log file.
    :return: the configured ``logging.Logger``.
    """
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)

    run_id = get_next_log_id(log_dir)

    handler = logging.FileHandler(f"{log_dir}/out_{run_id}.log", encoding='utf-8')
    handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

    # Replace whatever was attached before so that records go to this file only.
    log.handlers.clear()
    log.addHandler(handler)

    copied = copy_test_results(run_id)
    if not copied:
        log.warning("Failed to copy test results")
    else:
        log.info(f"Test results copied to test_results_{run_id}")

    return log
84+
85+
logger = setup_logger()
86+
counter = Value('i', 0)
87+
88+
class TestRestartNodes(object):
    """Runs CREATE/ALTER TABLE load against a column store while nodes are killed and restarted.

    The module-level ``counter`` holds the number of tables created so far;
    table ids are therefore ``0 .. counter.value - 1``.
    """

    @classmethod
    def setup_class(cls):
        """Start an 8-node cluster with column store enabled and connect a client to node 1."""
        nodes_count = 8
        configurator = KikimrConfigGenerator(
            None,
            nodes=nodes_count,
            use_in_memory_pdisks=False,
            additional_log_configs={'CMS': LogLevels.DEBUG},
            extra_feature_flags=[
                'enable_column_store'
            ]
        )
        cls.cluster = KiKiMR(configurator=configurator)
        cls.cluster.start()
        node = cls.cluster.nodes[1]
        cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{configurator.domain_name}")
        cls.ydb_client.wait_connection()
        time.sleep(10)
        # TODO: increase sleep value

    @classmethod
    def teardown_class(cls):
        """Stop the client first, then the cluster."""
        cls.ydb_client.stop()
        cls.cluster.stop()

    @staticmethod
    def _pick_table_id():
        """Return a random id among the tables created so far.

        Must be called with the ``counter`` lock held. ``random.randint`` is
        inclusive on both ends, so the upper bound is ``counter.value - 1``;
        while no table exists yet the result is ``0``.
        """
        return random.randint(0, max(counter.value - 1, 0))

    def _alter_random_table(self):
        """Send one ALTER TABLE that sets a random TTL on a random table; errors propagate."""
        with counter.get_lock():
            logger.info("In progress: sending alter query")
            ttl = random.randint(0, 1000)
            table_id = self._pick_table_id()
            self.ydb_client.query(f"""
                ALTER TABLE `TableStore/my_table_{table_id}` SET(TTL = Interval("PT{ttl}H") ON timestamp);
                """)

    def create_table(self, thread_id: int):
        """Keep creating tables ``my_table_<n>`` for 30 seconds; failures are logged and retried."""
        logger.info(f"Init: starting creating the table for thread#{thread_id}")
        deadline: datetime.datetime = datetime.datetime.now() + datetime.timedelta(seconds=30)
        while datetime.datetime.now() < deadline:
            logger.info("Init: sending create table query")
            try:
                # The lock is held across the query so that reading the next id,
                # creating the table and bumping the counter form one step.
                with counter.get_lock():
                    table_id = counter.value
                    logger.info(f"Init: sending create table#{table_id} query")
                    self.ydb_client.query(f"""
                        CREATE TABLE `TableStore/my_table_{table_id}` (
                            timestamp Timestamp NOT NULL,
                            Key Uint64,
                            Value String,
                            PRIMARY KEY (timestamp)
                        ) WITH (
                            STORE = COLUMN,
                            AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2
                        );
                        """)
                    counter.value += 1
            except Exception as x:
                logger.error(f"Init: Caught an exception during query executing: {x}")
            logger.info("In progress: executed create query")

        logger.info(f"Init: finished creating tables for thread#{thread_id}")

    def alter_table(self, thread_id: int):
        """Keep altering random tables for 30 seconds; failures are logged and ignored."""
        logger.info(f"In progress: starting altering the table for thread#{thread_id}")
        deadline: datetime.datetime = datetime.datetime.now() + datetime.timedelta(seconds=30)
        while datetime.datetime.now() < deadline:
            try:
                self._alter_random_table()
            except Exception as x:
                logger.error(f"In progress: Caught an exception during query executing: {x}")
            logger.info("In progress: executed alter query")

        logger.info(f"In progress: finished altering for thread#{thread_id}")

    def kill_nodes(self):
        """For about 90 seconds, repeatedly kill and restart a random half of the nodes."""
        logger.info("In progress: starting killing nodes")
        deadline: datetime.datetime = datetime.datetime.now() + datetime.timedelta(seconds=90)
        while datetime.datetime.now() < deadline:
            nodes = list(self.cluster.nodes.items())
            random.shuffle(nodes)
            nodes = nodes[0: len(nodes) // 2]
            for key, node in nodes:
                try:
                    logger.info(f"In progress: killing {key}-node")
                    node.kill()
                    time.sleep(random.randint(1, 10))
                    logger.info(f"In progress: killed {key}-node, starting it")
                    node.start()
                    time.sleep(random.randint(1, 10))
                    logger.info(f"In progress: started {key}-node")
                except Exception as x:
                    logger.error(f"In progress: Caught an exception during node killing executing: {x}")
                # NOTE(review): the original had a bare `except:` followed by a
                # `break` here. Non-Exception errors (KeyboardInterrupt,
                # SystemExit) now propagate, which also stops the loop. If the
                # `break` was meant to limit the loop unconditionally, restore it.
        logger.info("In progress: finished killing nodes")

    def test(self):
        """Reproduce the scenario where ALTER stops working after node restarts.

        Scenario, from the user's point of view: alters are in flight, nodes
        restart, and afterwards nothing works.
          1) create the tables;
          2) from several threads, alter the tables while random nodes are
             being killed;
          3) let all nodes come back;
          4) keep sending alters in a loop;
          5) within the final window an alter is expected to start succeeding.

        The outcome is only logged; the test does not assert on it.
        """
        logger.info("Init: creating a columnstore")
        self.ydb_client.query("""
            --!syntax_v1
            CREATE TABLESTORE `TableStore` (
                timestamp Timestamp NOT NULL,
                Key Uint64,
                Value String,
                PRIMARY KEY (timestamp)
            )
            WITH (
                STORE = COLUMN,
                AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2
            );
            """)
        logger.info("Init: sucessfully created the columnstore, creating a table")

        # Threads 0-4 create tables, threads 5-9 alter them, the last one kills nodes.
        threads: list[TestThread] = []
        for i in range(5):
            threads.append(TestThread(target=self.create_table, args=[i]))
        for i in range(5, 10):
            threads.append(TestThread(target=self.alter_table, args=[i]))
        threads.append(TestThread(target=self.kill_nodes))

        logger.info("In progress: starting 11 threads")
        for thread in threads:
            thread.start()
        logger.info("In progress: started 11 threads, now joining last")
        # Only the node-killing thread is joined; it has the longest deadline.
        threads[-1].join()
        logger.info("In progress: joined killing thread, sleeping for 35s")
        time.sleep(35)
        logger.info("In progress: woke up, starting nodes again")
        # TODO: investigate, maybe here should be no repeating start() calls
        logger.info("In progress: stared nodes again")

        logger.info("In progress: starting altering table")
        deadline: datetime.datetime = datetime.datetime.now() + datetime.timedelta(seconds=30)

        # Follow-ups: check that not a single run succeeded; leave it running
        # overnight; then reproduce on a store (look in the tests for how to
        # create a table in a ColumnStore).
        all_failed = True

        while datetime.datetime.now() < deadline:
            logger.info("In progress: [last section] sending alter query")
            try:
                self._alter_random_table()
            except Exception as x:
                logger.error(f"In progress: [last section] Caught an exception during query executing: {x}")
            else:
                # At least one alter went through after the restarts.
                all_failed = False
            logger.info("In progress: [last section] executed alter query")

        if all_failed:
            logger.error("Finished: All alter queries in last section failed")

        logger.info("Finished: finished altering table")

ydb/tests/olap/restarts/ya.make

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
PY3TEST()
2+
ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
3+
4+
DEPENDS(
5+
ydb/apps/ydb
6+
ydb/apps/ydbd
7+
)
8+
9+
TEST_SRCS(
10+
test_nodes_restart.py
11+
)
12+
13+
SIZE(MEDIUM)
14+
TIMEOUT(235)
15+
16+
PEERDIR(
17+
ydb/tests/library
18+
ydb/tests/library/test_meta
19+
ydb/public/sdk/python
20+
ydb/public/sdk/python/enable_v3_new_behavior
21+
contrib/python/boto3
22+
library/recipes/common
23+
ydb/tests/olap/common
24+
ydb/tests/olap/helpers
25+
)
26+
27+
END()

ydb/tests/olap/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ RECURSE(
3535
s3_import
3636
scenario
3737
ttl_tiering
38+
restarts
3839
)

0 commit comments

Comments
 (0)