Skip to content

Commit e0ed855

Browse files
authored
Merge 429190c into 33b7f67
2 parents 33b7f67 + 429190c commit e0ed855

28 files changed

+391
-59
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
156156
{
157157
ToString(NYql::EDatabaseType::MongoDB),
158158
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls", "reading_mode", "unexpected_type_display_mode", "unsupported_type_display_mode"}, hostnamePatternsRegEx)
159+
},
160+
{
161+
ToString(NYql::EDatabaseType::DataStreams),
162+
CreateExternalDataSource(TString{NYql::PqProviderName}, {"NONE", "TOKEN"}, {"database_name", "use_tls"}, hostnamePatternsRegEx)
159163
}
160164
},
161165
allExternalDataSourcesAreAvailable,

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
1414
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
1515
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
16-
16+
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
17+
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
1718

1819
namespace NKikimr {
1920
namespace NMiniKQL {
@@ -87,6 +88,8 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
8788
}
8889

8990
NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory);
91+
NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
92+
NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
9093
}
9194

9295
return factory;

ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/library/actors/http/http_proxy.h>
44
#include <ydb/library/yql/providers/common/db_id_async_resolver/database_type.h>
5+
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
56

67
#include <ydb/core/base/counters.h>
78
#include <ydb/core/base/feature_flags.h>
@@ -13,7 +14,6 @@
1314
#include <ydb/core/fq/libs/db_id_async_resolver_impl/http_proxy.h>
1415
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
1516
#include <ydb/library/actors/http/http_proxy.h>
16-
1717
#include <yql/essentials/public/issue/yql_issue_utils.h>
1818

1919
#include <yt/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
@@ -71,6 +71,16 @@ namespace NKikimr::NKqp {
7171
return NYql::IHTTPGateway::Make(&httpGatewayConfig, httpGatewayGroup);
7272
}
7373

74+
NYql::IPqGateway::TPtr MakePqGateway(const std::shared_ptr<NYdb::TDriver>& driver, const NYql::TPqGatewayConfig& pqGatewayConfig) {
75+
NYql::TPqGatewayServices pqServices(
76+
*driver,
77+
nullptr,
78+
nullptr,
79+
std::make_shared<NYql::TPqGatewayConfig>(pqGatewayConfig),
80+
nullptr);
81+
return CreatePqNativeGateway(pqServices);
82+
}
83+
7484
NYql::THttpGatewayConfig DefaultHttpGatewayConfig() {
7585
NYql::THttpGatewayConfig config;
7686
config.SetMaxInFlightCount(2000);
@@ -119,6 +129,14 @@ namespace NKikimr::NKqp {
119129
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);
120130
DqTaskTransformFactory = NYql::CreateYtDqTaskTransformFactory(true);
121131

132+
ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr);
133+
NYdb::TDriverConfig cfg;
134+
cfg.SetLog(std::make_unique<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
135+
Driver = std::make_shared<NYdb::TDriver>(cfg);
136+
137+
PqGatewayConfig = queryServiceConfig.GetPq();
138+
PqGateway = MakePqGateway(Driver, PqGatewayConfig);
139+
122140
// Initialize Token Accessor
123141
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
124142
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
@@ -175,7 +193,11 @@ namespace NKikimr::NKqp {
175193
SolomonGateway,
176194
nullptr,
177195
S3ReadActorFactoryConfig,
178-
DqTaskTransformFactory};
196+
DqTaskTransformFactory,
197+
PqGatewayConfig,
198+
PqGateway,
199+
ActorSystemPtr,
200+
Driver};
179201

180202
// Init DatabaseAsyncResolver only if all requirements are met
181203
if (DatabaseResolverActorId && MdbEndpointGenerator &&

ydb/core/kqp/federated_query/kqp_federated_query_helpers.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
77
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
88
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
9+
10+
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
11+
912
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
1013
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
1114
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
@@ -15,6 +18,7 @@
1518
#include <yql/essentials/public/issue/yql_issue_message.h>
1619

1720
#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
21+
#include <ydb/library/logger/actor.h>
1822

1923
namespace NKikimrConfig {
2024
class TQueryServiceConfig;
@@ -28,6 +32,9 @@ namespace NKikimr::NKqp {
2832

2933
NYql::IHTTPGateway::TPtr MakeHttpGateway(const NYql::THttpGatewayConfig& httpGatewayConfig, NMonitoring::TDynamicCounterPtr countersRoot);
3034

35+
NYql::IPqGateway::TPtr MakePqGateway(const std::shared_ptr<NYdb::TDriver>& driver, const NYql::TPqGatewayConfig& pqGatewayConfig);
36+
37+
3138
struct TKqpFederatedQuerySetup {
3239
NYql::IHTTPGateway::TPtr HttpGateway;
3340
NYql::NConnector::IClient::TPtr ConnectorClient;
@@ -42,6 +49,10 @@ namespace NKikimr::NKqp {
4249
NMiniKQL::TComputationNodeFactory ComputationFactory;
4350
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
4451
NYql::TTaskTransformFactory DqTaskTransformFactory;
52+
NYql::TPqGatewayConfig PqGatewayConfig;
53+
NYql::IPqGateway::TPtr PqGateway;
54+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
55+
std::shared_ptr<NYdb::TDriver> Driver;
4556
};
4657

4758
struct IKqpFederatedQuerySetupFactory {
@@ -81,6 +92,10 @@ namespace NKikimr::NKqp {
8192
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
8293
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
8394
NYql::TTaskTransformFactory DqTaskTransformFactory;
95+
NYql::TPqGatewayConfig PqGatewayConfig;
96+
NYql::IPqGateway::TPtr PqGateway;
97+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
98+
std::shared_ptr<NYdb::TDriver> Driver;
8499
};
85100

86101
struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
@@ -99,7 +114,11 @@ namespace NKikimr::NKqp {
99114
const NYql::ISolomonGateway::TPtr& solomonGateway,
100115
NMiniKQL::TComputationNodeFactory computationFactory,
101116
const NYql::NDq::TS3ReadActorFactoryConfig& s3ReadActorFactoryConfig,
102-
NYql::TTaskTransformFactory dqTaskTransformFactory)
117+
NYql::TTaskTransformFactory dqTaskTransformFactory,
118+
const NYql::TPqGatewayConfig& pqGatewayConfig,
119+
NYql::IPqGateway::TPtr pqGateway,
120+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr actorSystemPtr,
121+
std::shared_ptr<NYdb::TDriver> driver)
103122
: HttpGateway(httpGateway)
104123
, ConnectorClient(connectorClient)
105124
, CredentialsFactory(credentialsFactory)
@@ -113,6 +132,10 @@ namespace NKikimr::NKqp {
113132
, ComputationFactory(computationFactory)
114133
, S3ReadActorFactoryConfig(s3ReadActorFactoryConfig)
115134
, DqTaskTransformFactory(dqTaskTransformFactory)
135+
, PqGatewayConfig(pqGatewayConfig)
136+
, PqGateway(pqGateway)
137+
, ActorSystemPtr(actorSystemPtr)
138+
, Driver(driver)
116139
{
117140
}
118141

@@ -122,7 +145,7 @@ namespace NKikimr::NKqp {
122145
DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig,
123146
YtGatewayConfig, YtGateway, SolomonGatewayConfig,
124147
SolomonGateway, ComputationFactory, S3ReadActorFactoryConfig,
125-
DqTaskTransformFactory};
148+
DqTaskTransformFactory, PqGatewayConfig, PqGateway, ActorSystemPtr, Driver};
126149
}
127150

128151
private:
@@ -139,6 +162,10 @@ namespace NKikimr::NKqp {
139162
NMiniKQL::TComputationNodeFactory ComputationFactory;
140163
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
141164
NYql::TTaskTransformFactory DqTaskTransformFactory;
165+
NYql::TPqGatewayConfig PqGatewayConfig;
166+
NYql::IPqGateway::TPtr PqGateway;
167+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
168+
std::shared_ptr<NYdb::TDriver> Driver;
142169
};
143170

144171
IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(

ydb/core/kqp/federated_query/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ PEERDIR(
1313
ydb/library/yql/providers/common/http_gateway
1414
ydb/library/yql/providers/generic/connector/libcpp
1515
ydb/library/yql/providers/s3/actors_factory
16+
ydb/library/yql/providers/pq/gateway/native
1617
ydb/library/yql/providers/solomon/gateway
1718
yql/essentials/core/dq_integration/transform
1819
yql/essentials/public/issue

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#include <yql/essentials/providers/common/codec/yql_codec.h>
1919
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
2020
#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
21-
#include <ydb/library/yql/dq/opt/dq_opt_join_cbo_factory.h>
21+
#include <ydb/library/yql/dq/opt/dq_opt_join_cbo_factory.h>
22+
#include <ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.h>
23+
#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h>
2224
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
2325
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
2426
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
@@ -30,11 +32,9 @@
3032
#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
3133
#include <ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h>
3234
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
33-
3435
#include <library/cpp/cache/cache.h>
3536
#include <library/cpp/random_provider/random_provider.h>
3637
#include <library/cpp/time_provider/time_provider.h>
37-
3838
namespace NKikimr {
3939
namespace NKqp {
4040

@@ -1912,6 +1912,23 @@ class TKqpHost : public IKqpHost {
19121912
TypesCtx->AddDataSink(NYql::SolomonProviderName, NYql::CreateSolomonDataSink(solomonState));
19131913
}
19141914

1915+
void InitPqProvider() {
1916+
TString sessionId = CreateGuidAsString();
1917+
auto state = MakeIntrusive<TPqState>(sessionId);
1918+
state->SupportRtmrMode = false;
1919+
state->Types = TypesCtx.Get();
1920+
state->DbResolver = FederatedQuerySetup->DatabaseAsyncResolver;
1921+
state->FunctionRegistry = FuncRegistry;
1922+
// state->Disposition = disposition;
1923+
state->Configuration->Init(FederatedQuerySetup->PqGatewayConfig, TypesCtx, state->DbResolver, state->DatabaseIds);
1924+
state->Gateway = FederatedQuerySetup->PqGateway;;
1925+
state->DqIntegration = NYql::CreatePqDqIntegration(state);
1926+
state->Gateway->OpenSession(sessionId, "username");
1927+
1928+
TypesCtx->AddDataSource(NYql::PqProviderName, NYql::CreatePqDataSource(state, state->Gateway));
1929+
TypesCtx->AddDataSink(NYql::PqProviderName, NYql::CreatePqDataSink(state, state->Gateway));
1930+
}
1931+
19151932
void Init(EKikimrQueryType queryType) {
19161933
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
19171934
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry, ActorSystem);
@@ -1956,6 +1973,9 @@ class TKqpHost : public IKqpHost {
19561973
if (FederatedQuerySetup->SolomonGateway) {
19571974
InitSolomonProvider();
19581975
}
1976+
if (FederatedQuerySetup->PqGateway) {
1977+
InitPqProvider();
1978+
}
19591979
}
19601980

19611981
InitPgProvider();

ydb/core/kqp/provider/yql_kikimr_opt_build.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,6 +1161,9 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TStringBuf datab
11611161
TKiExploreTxResults txExplore;
11621162
txExplore.ConcurrentResults = concurrentResults;
11631163
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types) || txExplore.HasErrors) {
1164+
if (txExplore.HasErrors) {
1165+
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "ExploreTx failed"));
1166+
}
11641167
return txExplore.HasErrors ? nullptr : node.Ptr();
11651168
}
11661169

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
7272
if (initializeHttpGateway) {
7373
httpGateway = MakeHttpGateway(appConfig->GetQueryServiceConfig().GetHttpGateway(), settings.CountersRoot);
7474
}
75+
auto driver = std::make_shared<NYdb::TDriver>(NYdb::TDriverConfig());
7576

7677
auto federatedQuerySetupFactory = std::make_shared<TKqpFederatedQuerySetupFactoryMock>(
7778
httpGateway,
@@ -86,7 +87,11 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
8687
nullptr,
8788
nullptr,
8889
NYql::NDq::CreateReadActorFactoryConfig(appConfig->GetQueryServiceConfig().GetS3()),
89-
nullptr);
90+
nullptr,
91+
appConfig->GetQueryServiceConfig().GetPq(),
92+
NKqp::MakePqGateway(driver, appConfig->GetQueryServiceConfig().GetPq()),
93+
nullptr,
94+
driver);
9095

9196
settings
9297
.SetFeatureFlags(featureFlags)
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
2+
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
3+
#include <ydb/core/kqp/ut/federated_query/common/common.h>
4+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h>
5+
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
6+
7+
#include <fmt/format.h>
8+
9+
namespace NKikimr::NKqp {
10+
11+
using namespace NYdb;
12+
using namespace NYdb::NQuery;
13+
using namespace NKikimr::NKqp::NFederatedQueryTest;
14+
using namespace fmt::literals;
15+
16+
Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) {
17+
Y_UNIT_TEST(CreateExternalDataSourceDatastreamsAndReadTopic) {
18+
NKikimrConfig::TAppConfig appCfg;
19+
auto kikimr = NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, NYql::NDq::CreateS3ActorsFactory());
20+
21+
auto db = kikimr->GetTableClient();
22+
auto session = db.CreateSession().GetValueSync().GetSession();
23+
TString sourceName = "sourceName";
24+
TString topicName = "topicName";
25+
TString tableName = "tableName";
26+
27+
auto driverConfig = TDriverConfig();
28+
29+
NYdb::NTopic::TTopicClientSettings opts;
30+
opts.DiscoveryEndpoint(GetEnv("YDB_ENDPOINT"))
31+
.Database(GetEnv("YDB_DATABASE"));
32+
33+
TDriver driver(driverConfig);
34+
NYdb::NTopic::TTopicClient topicClient(driver, opts);
35+
36+
auto topicSettings = NYdb::NTopic::TCreateTopicSettings()
37+
.PartitioningSettings(1, 1);
38+
39+
auto status = topicClient
40+
.CreateTopic(topicName, topicSettings)
41+
.GetValueSync();
42+
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToString());
43+
44+
auto query = TStringBuilder() << R"(
45+
CREATE EXTERNAL DATA SOURCE `)" << sourceName << R"(` WITH (
46+
SOURCE_TYPE="DataStreams",
47+
LOCATION=")" << GetEnv("YDB_ENDPOINT") << R"(",
48+
DATABASE_NAME=")" << GetEnv("YDB_DATABASE") << R"(",
49+
AUTH_METHOD="NONE"
50+
);)";
51+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
52+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
53+
auto settings = TExecuteScriptSettings().StatsMode(EStatsMode::Basic);
54+
55+
const TString sql = fmt::format(R"(
56+
SELECT * FROM `{source}`.`{topic}`
57+
WITH (
58+
FORMAT="json_each_row",
59+
SCHEMA=(
60+
key String NOT NULL,
61+
value String NOT NULL
62+
))
63+
LIMIT 1;
64+
)", "source"_a=sourceName, "topic"_a=topicName);
65+
66+
auto queryClient = kikimr->GetQueryClient();
67+
auto scriptExecutionOperation = queryClient.ExecuteScript(sql, settings).ExtractValueSync();
68+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
69+
70+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
71+
72+
auto writeSettings = NYdb::NTopic::TWriteSessionSettings().Path(topicName);
73+
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
74+
topicSession->Write(NYdb::NTopic::TWriteMessage(R"({"key":"key1", "value": "value1"})"));
75+
76+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
77+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
78+
TFetchScriptResultsResult results = queryClient.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
79+
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
80+
81+
TResultSetParser resultSet(results.ExtractResultSet());
82+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);
83+
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
84+
UNIT_ASSERT(resultSet.TryNextRow());
85+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetString(), "key1");
86+
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(1).GetString(), "value1");
87+
}
88+
}
89+
90+
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)