Skip to content

Commit bbbffe3

Browse files
authored
Merge b8a01f4 into 11fc6cb
2 parents 11fc6cb + b8a01f4 commit bbbffe3

24 files changed

+425
-50
lines changed

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
156156
{
157157
ToString(NYql::EDatabaseType::MongoDB),
158158
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls", "reading_mode", "unexpected_type_display_mode", "unsupported_type_display_mode"}, hostnamePatternsRegEx)
159+
},
160+
{
161+
ToString(NYql::EDatabaseType::DataStreams),
162+
CreateExternalDataSource(TString{NYql::PqProviderName}, {"NONE", "TOKEN"}, {"database_name", "use_tls"}, hostnamePatternsRegEx)
159163
}
160164
},
161165
allExternalDataSourcesAreAvailable,

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
1414
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
1515
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
16-
16+
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
17+
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
1718

1819
namespace NKikimr {
1920
namespace NMiniKQL {
@@ -87,6 +88,8 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
8788
}
8889

8990
NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory);
91+
NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
92+
NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
9093
}
9194

9295
return factory;

ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <ydb/library/actors/http/http_proxy.h>
44
#include <ydb/library/yql/providers/common/db_id_async_resolver/database_type.h>
5+
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
56

67
#include <ydb/core/base/counters.h>
78
#include <ydb/core/base/feature_flags.h>
@@ -13,7 +14,6 @@
1314
#include <ydb/core/fq/libs/db_id_async_resolver_impl/http_proxy.h>
1415
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
1516
#include <ydb/library/actors/http/http_proxy.h>
16-
1717
#include <yql/essentials/public/issue/yql_issue_utils.h>
1818

1919
#include <yt/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
@@ -71,6 +71,18 @@ namespace NKikimr::NKqp {
7171
return NYql::IHTTPGateway::Make(&httpGatewayConfig, httpGatewayGroup);
7272
}
7373

74+
NYql::IPqGateway::TPtr MakePqGateway(const NYql::TPqGatewayConfig& pqGatewayConfig) {
75+
NYdb::TDriverConfig config;
76+
NYdb::TDriver driver(config);
77+
NYql::TPqGatewayServices pqServices(
78+
driver,
79+
nullptr,
80+
nullptr,
81+
std::make_shared<NYql::TPqGatewayConfig>(pqGatewayConfig),
82+
nullptr);
83+
return CreatePqNativeGateway(pqServices);
84+
}
85+
7486
NYql::THttpGatewayConfig DefaultHttpGatewayConfig() {
7587
NYql::THttpGatewayConfig config;
7688
config.SetMaxInFlightCount(2000);
@@ -119,6 +131,14 @@ namespace NKikimr::NKqp {
119131
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);
120132
DqTaskTransformFactory = NYql::CreateYtDqTaskTransformFactory(true);
121133

134+
PqGatewayConfig = queryServiceConfig.GetPq();
135+
PqGateway = MakePqGateway(PqGatewayConfig);
136+
137+
ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr);
138+
NYdb::TDriverConfig cfg;
139+
cfg.SetLog(std::make_unique<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
140+
Driver = std::make_shared<NYdb::TDriver>(cfg);
141+
122142
// Initialize Token Accessor
123143
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
124144
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
@@ -175,7 +195,11 @@ namespace NKikimr::NKqp {
175195
SolomonGateway,
176196
nullptr,
177197
S3ReadActorFactoryConfig,
178-
DqTaskTransformFactory};
198+
DqTaskTransformFactory,
199+
PqGatewayConfig,
200+
PqGateway,
201+
ActorSystemPtr,
202+
Driver};
179203

180204
// Init DatabaseAsyncResolver only if all requirements are met
181205
if (DatabaseResolverActorId && MdbEndpointGenerator &&

ydb/core/kqp/federated_query/kqp_federated_query_helpers.h

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
77
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
88
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
9+
10+
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
11+
912
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
1013
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
1114
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
@@ -15,6 +18,7 @@
1518
#include <yql/essentials/public/issue/yql_issue_message.h>
1619

1720
#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
21+
#include <ydb/library/logger/actor.h>
1822

1923
namespace NKikimrConfig {
2024
class TQueryServiceConfig;
@@ -28,6 +32,9 @@ namespace NKikimr::NKqp {
2832

2933
NYql::IHTTPGateway::TPtr MakeHttpGateway(const NYql::THttpGatewayConfig& httpGatewayConfig, NMonitoring::TDynamicCounterPtr countersRoot);
3034

35+
NYql::IPqGateway::TPtr MakePqGateway(const NYql::TPqGatewayConfig& pqGatewayConfig);
36+
37+
3138
struct TKqpFederatedQuerySetup {
3239
NYql::IHTTPGateway::TPtr HttpGateway;
3340
NYql::NConnector::IClient::TPtr ConnectorClient;
@@ -42,6 +49,10 @@ namespace NKikimr::NKqp {
4249
NMiniKQL::TComputationNodeFactory ComputationFactory;
4350
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
4451
NYql::TTaskTransformFactory DqTaskTransformFactory;
52+
NYql::TPqGatewayConfig PqGatewayConfig;
53+
NYql::IPqGateway::TPtr PqGateway;
54+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
55+
std::shared_ptr<NYdb::TDriver> Driver;
4556
};
4657

4758
struct IKqpFederatedQuerySetupFactory {
@@ -81,6 +92,10 @@ namespace NKikimr::NKqp {
8192
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
8293
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
8394
NYql::TTaskTransformFactory DqTaskTransformFactory;
95+
NYql::TPqGatewayConfig PqGatewayConfig;
96+
NYql::IPqGateway::TPtr PqGateway;
97+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
98+
std::shared_ptr<NYdb::TDriver> Driver;
8499
};
85100

86101
struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
@@ -99,7 +114,11 @@ namespace NKikimr::NKqp {
99114
const NYql::ISolomonGateway::TPtr& solomonGateway,
100115
NMiniKQL::TComputationNodeFactory computationFactory,
101116
const NYql::NDq::TS3ReadActorFactoryConfig& s3ReadActorFactoryConfig,
102-
NYql::TTaskTransformFactory dqTaskTransformFactory)
117+
NYql::TTaskTransformFactory dqTaskTransformFactory,
118+
const NYql::TPqGatewayConfig& pqGatewayConfig,
119+
NYql::IPqGateway::TPtr pqGateway,
120+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr actorSystemPtr,
121+
std::shared_ptr<NYdb::TDriver> driver)
103122
: HttpGateway(httpGateway)
104123
, ConnectorClient(connectorClient)
105124
, CredentialsFactory(credentialsFactory)
@@ -113,6 +132,10 @@ namespace NKikimr::NKqp {
113132
, ComputationFactory(computationFactory)
114133
, S3ReadActorFactoryConfig(s3ReadActorFactoryConfig)
115134
, DqTaskTransformFactory(dqTaskTransformFactory)
135+
, PqGatewayConfig(pqGatewayConfig)
136+
, PqGateway(pqGateway)
137+
, ActorSystemPtr(actorSystemPtr)
138+
, Driver(driver)
116139
{
117140
}
118141

@@ -122,7 +145,7 @@ namespace NKikimr::NKqp {
122145
DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig,
123146
YtGatewayConfig, YtGateway, SolomonGatewayConfig,
124147
SolomonGateway, ComputationFactory, S3ReadActorFactoryConfig,
125-
DqTaskTransformFactory};
148+
DqTaskTransformFactory, PqGatewayConfig, PqGateway, ActorSystemPtr, Driver};
126149
}
127150

128151
private:
@@ -139,6 +162,10 @@ namespace NKikimr::NKqp {
139162
NMiniKQL::TComputationNodeFactory ComputationFactory;
140163
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
141164
NYql::TTaskTransformFactory DqTaskTransformFactory;
165+
NYql::TPqGatewayConfig PqGatewayConfig;
166+
NYql::IPqGateway::TPtr PqGateway;
167+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
168+
std::shared_ptr<NYdb::TDriver> Driver;
142169
};
143170

144171
IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(

ydb/core/kqp/federated_query/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ PEERDIR(
1313
ydb/library/yql/providers/common/http_gateway
1414
ydb/library/yql/providers/generic/connector/libcpp
1515
ydb/library/yql/providers/s3/actors_factory
16+
ydb/library/yql/providers/pq/gateway/native
1617
ydb/library/yql/providers/solomon/gateway
1718
yql/essentials/core/dq_integration/transform
1819
yql/essentials/public/issue

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#include <yql/essentials/providers/common/codec/yql_codec.h>
1919
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
2020
#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
21-
#include <ydb/library/yql/dq/opt/dq_opt_join_cbo_factory.h>
21+
#include <ydb/library/yql/dq/opt/dq_opt_join_cbo_factory.h>
22+
#include <ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.h>
23+
#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h>
2224
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
2325
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
2426
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
@@ -30,11 +32,9 @@
3032
#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
3133
#include <ydb/library/yql/providers/dq/helper/yql_dq_helper_impl.h>
3234
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
33-
3435
#include <library/cpp/cache/cache.h>
3536
#include <library/cpp/random_provider/random_provider.h>
3637
#include <library/cpp/time_provider/time_provider.h>
37-
3838
namespace NKikimr {
3939
namespace NKqp {
4040

@@ -1912,6 +1912,23 @@ class TKqpHost : public IKqpHost {
19121912
TypesCtx->AddDataSink(NYql::SolomonProviderName, NYql::CreateSolomonDataSink(solomonState));
19131913
}
19141914

1915+
void InitPqProvider() {
1916+
TString sessionId = CreateGuidAsString();
1917+
auto state = MakeIntrusive<TPqState>(sessionId);
1918+
state->SupportRtmrMode = false;
1919+
state->Types = TypesCtx.Get();
1920+
state->DbResolver = FederatedQuerySetup->DatabaseAsyncResolver;
1921+
state->FunctionRegistry = FuncRegistry;
1922+
// state->Disposition = disposition;
1923+
state->Configuration->Init(FederatedQuerySetup->PqGatewayConfig, TypesCtx, state->DbResolver, state->DatabaseIds);
1924+
state->Gateway = FederatedQuerySetup->PqGateway;;
1925+
state->DqIntegration = NYql::CreatePqDqIntegration(state);
1926+
state->Gateway->OpenSession(sessionId, "username");
1927+
1928+
TypesCtx->AddDataSource(NYql::PqProviderName, NYql::CreatePqDataSource(state, state->Gateway));
1929+
TypesCtx->AddDataSink(NYql::PqProviderName, NYql::CreatePqDataSink(state, state->Gateway));
1930+
}
1931+
19151932
void Init(EKikimrQueryType queryType) {
19161933
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
19171934
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry, ActorSystem);
@@ -1956,6 +1973,9 @@ class TKqpHost : public IKqpHost {
19561973
if (FederatedQuerySetup->SolomonGateway) {
19571974
InitSolomonProvider();
19581975
}
1976+
if (FederatedQuerySetup->PqGateway) {
1977+
InitPqProvider();
1978+
}
19591979
}
19601980

19611981
InitPgProvider();

ydb/core/kqp/provider/yql_kikimr_opt_build.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,6 @@ bool ExploreTx(TExprBase root, TExprContext& ctx, const TKiDataSink& dataSink, T
784784
};
785785

786786
VisitExpr(root.Ptr(), preFunc, postFunc);
787-
788787
return !hasErrors;
789788
}
790789

@@ -1161,6 +1160,9 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TStringBuf datab
11611160
TKiExploreTxResults txExplore;
11621161
txExplore.ConcurrentResults = concurrentResults;
11631162
if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData, types) || txExplore.HasErrors) {
1163+
if (txExplore.HasErrors) {
1164+
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "ExploreTx failed"));
1165+
}
11641166
return txExplore.HasErrors ? nullptr : node.Ptr();
11651167
}
11661168

ydb/core/kqp/ut/federated_query/common/common.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,12 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
8686
nullptr,
8787
nullptr,
8888
NYql::NDq::CreateReadActorFactoryConfig(appConfig->GetQueryServiceConfig().GetS3()),
89-
nullptr);
89+
nullptr,
90+
appConfig->GetQueryServiceConfig().GetPq(),
91+
NKqp::MakePqGateway(appConfig->GetQueryServiceConfig().GetPq()),
92+
nullptr,
93+
std::make_shared<NYdb::TDriver>(NYdb::TDriverConfig())
94+
);
9095

9196
settings
9297
.SetFeatureFlags(featureFlags)

0 commit comments

Comments
 (0)