Skip to content

Commit 33f0744

Browse files
authored
Merge bcd2c9b into cde1dc2
2 parents cde1dc2 + bcd2c9b commit 33f0744

File tree

20 files changed

+331
-28
lines changed

20 files changed

+331
-28
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
156156
{
157157
ToString(NYql::EDatabaseType::MongoDB),
158158
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls", "reading_mode", "unexpected_type_display_mode", "unsupported_type_display_mode"}, hostnamePatternsRegEx)
159+
},
160+
{
161+
ToString(NYql::EDatabaseType::DataStreams),
162+
CreateExternalDataSource(TString{NYql::PqProviderName}, {"NONE", "TOKEN"}, {"database_name", "use_tls"}, hostnamePatternsRegEx)
159163
}
160164
},
161165
allExternalDataSourcesAreAvailable,

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
1414
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
1515
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
16-
16+
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
17+
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
1718

1819
namespace NKikimr {
1920
namespace NMiniKQL {
@@ -87,6 +88,8 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
8788
}
8889

8990
NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory);
91+
NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
92+
NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
9093
}
9194

9295
return factory;

ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/library/actors/http/http_proxy.h>
44
#include <ydb/library/yql/providers/common/db_id_async_resolver/database_type.h>
5+
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
56

67
#include <ydb/core/base/counters.h>
78
#include <ydb/core/base/feature_flags.h>
@@ -13,7 +14,6 @@
1314
#include <ydb/core/fq/libs/db_id_async_resolver_impl/http_proxy.h>
1415
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
1516
#include <ydb/library/actors/http/http_proxy.h>
16-
1717
#include <yql/essentials/public/issue/yql_issue_utils.h>
1818

1919
#include <yt/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
@@ -71,6 +71,18 @@ namespace NKikimr::NKqp {
7171
return NYql::IHTTPGateway::Make(&httpGatewayConfig, httpGatewayGroup);
7272
}
7373

74+
NYql::IPqGateway::TPtr MakePqGateway(const NYql::TPqGatewayConfig& pqGatewayConfig) {
75+
NYdb::TDriverConfig config;
76+
NYdb::TDriver driver(config);
77+
NYql::TPqGatewayServices pqServices(
78+
driver,
79+
nullptr,
80+
nullptr,
81+
std::make_shared<NYql::TPqGatewayConfig>(pqGatewayConfig),
82+
nullptr);
83+
return CreatePqNativeGateway(pqServices);
84+
}
85+
7486
NYql::THttpGatewayConfig DefaultHttpGatewayConfig() {
7587
NYql::THttpGatewayConfig config;
7688
config.SetMaxInFlightCount(2000);
@@ -119,6 +131,14 @@ namespace NKikimr::NKqp {
119131
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);
120132
DqTaskTransformFactory = NYql::CreateYtDqTaskTransformFactory(true);
121133

134+
PqGatewayConfig = queryServiceConfig.GetPq();
135+
PqGateway = MakePqGateway(PqGatewayConfig);
136+
137+
ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr);
138+
NYdb::TDriverConfig cfg;
139+
cfg.SetLog(std::make_unique<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
140+
Driver = std::make_shared<NYdb::TDriver>(cfg);
141+
122142
// Initialize Token Accessor
123143
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
124144
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
@@ -175,7 +195,11 @@ namespace NKikimr::NKqp {
175195
SolomonGateway,
176196
nullptr,
177197
S3ReadActorFactoryConfig,
178-
DqTaskTransformFactory};
198+
DqTaskTransformFactory,
199+
PqGatewayConfig,
200+
PqGateway,
201+
ActorSystemPtr,
202+
Driver};
179203

180204
// Init DatabaseAsyncResolver only if all requirements are met
181205
if (DatabaseResolverActorId && MdbEndpointGenerator &&

ydb/core/kqp/federated_query/kqp_federated_query_helpers.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
77
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
88
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
9+
10+
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
11+
912
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
1013
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
1114
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
@@ -15,6 +18,7 @@
1518
#include <yql/essentials/public/issue/yql_issue_message.h>
1619

1720
#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
21+
#include <ydb/library/logger/actor.h>
1822

1923
namespace NKikimrConfig {
2024
class TQueryServiceConfig;
@@ -28,6 +32,9 @@ namespace NKikimr::NKqp {
2832

2933
NYql::IHTTPGateway::TPtr MakeHttpGateway(const NYql::THttpGatewayConfig& httpGatewayConfig, NMonitoring::TDynamicCounterPtr countersRoot);
3034

35+
NYql::IPqGateway::TPtr MakePqGateway(const NYql::TPqGatewayConfig& pqGatewayConfig);
36+
37+
3138
struct TKqpFederatedQuerySetup {
3239
NYql::IHTTPGateway::TPtr HttpGateway;
3340
NYql::NConnector::IClient::TPtr ConnectorClient;
@@ -42,6 +49,10 @@ namespace NKikimr::NKqp {
4249
NMiniKQL::TComputationNodeFactory ComputationFactory;
4350
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
4451
NYql::TTaskTransformFactory DqTaskTransformFactory;
52+
NYql::TPqGatewayConfig PqGatewayConfig;
53+
NYql::IPqGateway::TPtr PqGateway;
54+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
55+
std::shared_ptr<NYdb::TDriver> Driver;
4556
};
4657

4758
struct IKqpFederatedQuerySetupFactory {
@@ -81,6 +92,10 @@ namespace NKikimr::NKqp {
8192
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
8293
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
8394
NYql::TTaskTransformFactory DqTaskTransformFactory;
95+
NYql::TPqGatewayConfig PqGatewayConfig;
96+
NYql::IPqGateway::TPtr PqGateway;
97+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
98+
std::shared_ptr<NYdb::TDriver> Driver;
8499
};
85100

86101
struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
@@ -99,7 +114,11 @@ namespace NKikimr::NKqp {
99114
const NYql::ISolomonGateway::TPtr& solomonGateway,
100115
NMiniKQL::TComputationNodeFactory computationFactory,
101116
const NYql::NDq::TS3ReadActorFactoryConfig& s3ReadActorFactoryConfig,
102-
NYql::TTaskTransformFactory dqTaskTransformFactory)
117+
NYql::TTaskTransformFactory dqTaskTransformFactory,
118+
const NYql::TPqGatewayConfig& pqGatewayConfig,
119+
NYql::IPqGateway::TPtr pqGateway,
120+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr actorSystemPtr,
121+
std::shared_ptr<NYdb::TDriver> driver)
103122
: HttpGateway(httpGateway)
104123
, ConnectorClient(connectorClient)
105124
, CredentialsFactory(credentialsFactory)
@@ -113,6 +132,10 @@ namespace NKikimr::NKqp {
113132
, ComputationFactory(computationFactory)
114133
, S3ReadActorFactoryConfig(s3ReadActorFactoryConfig)
115134
, DqTaskTransformFactory(dqTaskTransformFactory)
135+
, PqGatewayConfig(pqGatewayConfig)
136+
, PqGateway(pqGateway)
137+
, ActorSystemPtr(actorSystemPtr)
138+
, Driver(driver)
116139
{
117140
}
118141

@@ -122,7 +145,7 @@ namespace NKikimr::NKqp {
122145
DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig,
123146
YtGatewayConfig, YtGateway, SolomonGatewayConfig,
124147
SolomonGateway, ComputationFactory, S3ReadActorFactoryConfig,
125-
DqTaskTransformFactory};
148+
DqTaskTransformFactory, PqGatewayConfig, PqGateway, ActorSystemPtr, Driver};
126149
}
127150

128151
private:
@@ -139,6 +162,10 @@ namespace NKikimr::NKqp {
139162
NMiniKQL::TComputationNodeFactory ComputationFactory;
140163
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
141164
NYql::TTaskTransformFactory DqTaskTransformFactory;
165+
NYql::TPqGatewayConfig PqGatewayConfig;
166+
NYql::IPqGateway::TPtr PqGateway;
167+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
168+
std::shared_ptr<NYdb::TDriver> Driver;
142169
};
143170

144171
IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(

ydb/core/kqp/federated_query/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ PEERDIR(
1313
ydb/library/yql/providers/common/http_gateway
1414
ydb/library/yql/providers/generic/connector/libcpp
1515
ydb/library/yql/providers/s3/actors_factory
16+
ydb/library/yql/providers/pq/gateway/native
1617
ydb/library/yql/providers/solomon/gateway
1718
yql/essentials/core/dq_integration/transform
1819
yql/essentials/public/issue

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#include <yql/essentials/providers/common/codec/yql_codec.h>
1919
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
2020
#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
21-
#include <ydb/library/yql/dq/opt/dq_opt_join_cbo_factory.h>
21+
#include <ydb/library/yql/dq/opt/dq_opt_join_cbo_factory.h>
22+
#include <ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.h>
23+
#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h>
2224
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
2325
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
2426
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
@@ -30,11 +32,9 @@
3032
#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
3133
#include <ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h>
3234
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
33-
3435
#include <library/cpp/cache/cache.h>
3536
#include <library/cpp/random_provider/random_provider.h>
3637
#include <library/cpp/time_provider/time_provider.h>
37-
3838
namespace NKikimr {
3939
namespace NKqp {
4040

@@ -1912,6 +1912,23 @@ class TKqpHost : public IKqpHost {
19121912
TypesCtx->AddDataSink(NYql::SolomonProviderName, NYql::CreateSolomonDataSink(solomonState));
19131913
}
19141914

1915+
void InitPqProvider() {
1916+
TString sessionId = CreateGuidAsString();
1917+
auto state = MakeIntrusive<TPqState>(sessionId);
1918+
state->SupportRtmrMode = false;
1919+
state->Types = TypesCtx.Get();
1920+
state->DbResolver = FederatedQuerySetup->DatabaseAsyncResolver;
1921+
state->FunctionRegistry = FuncRegistry;
1922+
// state->Disposition = disposition;
1923+
state->Configuration->Init(FederatedQuerySetup->PqGatewayConfig, TypesCtx, state->DbResolver, state->DatabaseIds);
1924+
state->Gateway = FederatedQuerySetup->PqGateway;;
1925+
state->DqIntegration = NYql::CreatePqDqIntegration(state);
1926+
state->Gateway->OpenSession(sessionId, "username");
1927+
1928+
TypesCtx->AddDataSource(NYql::PqProviderName, NYql::CreatePqDataSource(state, state->Gateway));
1929+
TypesCtx->AddDataSink(NYql::PqProviderName, NYql::CreatePqDataSink(state, state->Gateway));
1930+
}
1931+
19151932
void Init(EKikimrQueryType queryType) {
19161933
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
19171934
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry, ActorSystem);
@@ -1956,6 +1973,9 @@ class TKqpHost : public IKqpHost {
19561973
if (FederatedQuerySetup->SolomonGateway) {
19571974
InitSolomonProvider();
19581975
}
1976+
if (FederatedQuerySetup->PqGateway) {
1977+
InitPqProvider();
1978+
}
19591979
}
19601980

19611981
InitPgProvider();

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,12 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
8686
nullptr,
8787
nullptr,
8888
NYql::NDq::CreateReadActorFactoryConfig(appConfig->GetQueryServiceConfig().GetS3()),
89-
nullptr);
89+
nullptr,
90+
appConfig->GetQueryServiceConfig().GetPq(),
91+
NKqp::MakePqGateway(appConfig->GetQueryServiceConfig().GetPq()),
92+
nullptr,
93+
std::make_shared<NYdb::TDriver>(NYdb::TDriverConfig())
94+
);
9095

9196
settings
9297
.SetFeatureFlags(featureFlags)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
2+
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
3+
#include <ydb/core/kqp/ut/federated_query/common/common.h>
4+
#include <yql/essentials/utils/log/log.h>
5+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h>
6+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/operation/operation.h>
7+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
8+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/operation/operation.h>
9+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
10+
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
11+
12+
#include <fmt/format.h>
13+
14+
namespace NKikimr::NKqp {
15+
16+
using namespace NYdb;
17+
using namespace NYdb::NQuery;
18+
using namespace NKikimr::NKqp::NFederatedQueryTest;
19+
//using namespace NTestUtils;
20+
using namespace fmt::literals;
21+
22+
Y_UNIT_TEST_SUITE(KqpFederatedQueryDatastreams) {
23+
Y_UNIT_TEST(CreateExternalDataSourceDatastreams) {
24+
NKikimrConfig::TAppConfig appCfg;
25+
auto kikimr = NFederatedQueryTest::MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, NYql::NDq::CreateS3ActorsFactory());
26+
27+
auto db = kikimr->GetTableClient();
28+
// TKikimrRunner kikimr(appCfg);
29+
// kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableExternalDataSources(true);
30+
// auto db = kikimr.GetTableClient();
31+
auto session = db.CreateSession().GetValueSync().GetSession();
32+
TString sourceName = "sourceName";
33+
TString topicName = "topicName";
34+
TString tableName = "tableName";
35+
36+
37+
auto driverConfig = TDriverConfig()
38+
.SetEndpoint(GetEnv("YDB_ENDPOINT"))
39+
.SetDatabase(GetEnv("YDB_DATABASE"))
40+
.SetAuthToken(GetEnv("YDB_TOKEN"));
41+
42+
TDriver driver(driverConfig);
43+
NYdb::NTopic::TTopicClient topicClient(driver);
44+
auto topicSettings = NYdb::NTopic::TCreateTopicSettings()
45+
.PartitioningSettings(1, 1);
46+
47+
auto status = topicClient
48+
.CreateTopic(topicName, topicSettings)
49+
.GetValueSync();
50+
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToString());
51+
{
52+
auto result = topicClient.DescribeTopic(topicName).GetValueSync();
53+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
54+
auto& description = result.GetTopicDescription();
55+
56+
Cerr << "GetPartitions " << description.GetPartitions().size() << Endl;
57+
}
58+
59+
auto query = TStringBuilder() << R"(
60+
CREATE EXTERNAL DATA SOURCE `)" << sourceName << R"(` WITH (
61+
SOURCE_TYPE="DataStreams",
62+
LOCATION=")" << GetEnv("YDB_ENDPOINT") << R"(",
63+
DATABASE_NAME=")" << GetEnv("YDB_DATABASE") << R"(",
64+
AUTH_METHOD="NONE"
65+
);)";
66+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
67+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
68+
69+
// TODO:
70+
// auto query2 = TStringBuilder() << R"(
71+
// CREATE EXTERNAL TABLE `)" << tableName << R"(` (
72+
// key Int64 NOT NULL,
73+
// value String NOT NULL ) WITH (
74+
// DATA_SOURCE=")" << sourceName << R"(",
75+
// LOCATION="folder",
76+
// AUTH_METHOD="NONE");)";
77+
// result = session.ExecuteSchemeQuery(query2).GetValueSync();
78+
// UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
79+
80+
81+
Cerr << "YDB_ENDPOINT " << GetEnv("YDB_ENDPOINT") << Endl;
82+
Cerr << "YDB_DATABASE " << GetEnv("YDB_DATABASE") << Endl;
83+
Cerr << "YDB_TOKEN " << GetEnv("YDB_TOKEN") << Endl;
84+
85+
auto settings = TExecuteScriptSettings().StatsMode(EStatsMode::Basic);
86+
87+
const TString sql = fmt::format(R"(
88+
SELECT * FROM `{source}`.`{topic}`
89+
WITH (
90+
FORMAT="json_each_row",
91+
SCHEMA=(
92+
key String NOT NULL,
93+
value String NOT NULL
94+
))
95+
LIMIT 1;
96+
)", "source"_a=sourceName, "topic"_a=topicName);
97+
98+
auto queryClient = kikimr->GetQueryClient();
99+
auto scriptExecutionOperation = queryClient.ExecuteScript(sql, settings).ExtractValueSync();
100+
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
101+
102+
auto writeSettings = NYdb::NTopic::TWriteSessionSettings().Path(topicName);
103+
auto topicSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
104+
topicSession->Write(NYdb::NTopic::TWriteMessage("message_4.1"));
105+
106+
107+
108+
NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
109+
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
110+
TFetchScriptResultsResult results = queryClient.FetchScriptResults(scriptExecutionOperation.Id(), 0).ExtractValueSync();
111+
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
112+
113+
114+
}
115+
}
116+
117+
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)