Skip to content

YQ-4317 PQ provider to ydb #18955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Jun 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
8dcf42a
add pq provider to ydb
kardymonds May 28, 2025
3281f9a
add test
kardymonds May 29, 2025
ecd06b8
next wip
kardymonds May 29, 2025
bcd2c9b
next wip
kardymonds May 30, 2025
f424920
try to fix build
kardymonds May 30, 2025
b8a01f4
next wip
kardymonds Jun 2, 2025
40d2a04
try to fix build
kardymonds Jun 2, 2025
d03f68f
Merge remote-tracking branch 'upstream/main' into YQ-4317-Pq-provider
kardymonds Jun 2, 2025
429190c
fix test
kardymonds Jun 3, 2025
807f118
fix ya.make
kardymonds Jun 3, 2025
b831544
try to fix build
kardymonds Jun 3, 2025
4b8adb0
try to fix build noch einmal
kardymonds Jun 3, 2025
0b9a450
try to fix build
kardymonds Jun 4, 2025
571ded2
add logger to ya.make
kardymonds Jun 4, 2025
ba5c2f6
wip: use ydb provider
kardymonds Jun 5, 2025
282f82d
lambda test
kardymonds Jun 5, 2025
b273e5e
add Datastreams test
kardymonds Jun 5, 2025
84e019b
move to helpers
kardymonds Jun 5, 2025
a7633ed
add available external source test
kardymonds Jun 6, 2025
7b8d37e
fix multiple partiton
kardymonds Jun 6, 2025
478089a
fix new fields test
kardymonds Jun 6, 2025
460ef93
Merge remote-tracking branch 'upstream/main' into YQ-4317-Pq-provider
kardymonds Jun 6, 2025
20a21cd
try to fix build
kardymonds Jun 9, 2025
25e3ff9
add basic auth
kardymonds Jun 9, 2025
c8b963c
fix old tests
kardymonds Jun 9, 2025
8b60cc5
fix tests
kardymonds Jun 9, 2025
adbd6fd
edit ya.make
kardymonds Jun 9, 2025
697bbff
edit ya.make
kardymonds Jun 9, 2025
659fe19
fix ya.make
kardymonds Jun 10, 2025
5b91c3e
fix by pr review
kardymonds Jun 10, 2025
f076b23
Merge remote-tracking branch 'upstream/main' into YQ-4317-Pq-provider
kardymonds Jun 11, 2025
0176b95
fix source
kardymonds Jun 11, 2025
a3ade91
remove changes
kardymonds Jun 11, 2025
e1819f2
Merge remote-tracking branch 'upstream/main' into YQ-4317-Pq-provider
kardymonds Jun 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "validation_functions.h"

#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/database_type.h>

namespace NKikimr::NExternalSource {

Expand Down Expand Up @@ -62,6 +63,9 @@ struct TExternalDataSource : public IExternalSource {
throw TExternalSourceException() << proto.GetSourceType() << " source must provide service_name";
}

if (proto.GetSourceType() == ToString(NExternalSource::YdbTopicsType)) {
throw TExternalSourceException() << "External source with type " << proto.GetSourceType() << " is not allowed, use " << ToString(NYql::EDatabaseType::Ydb) << " source type to read from topics ";
}
ValidateHostname(HostnamePatterns, proto.GetLocation());
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/external_sources/external_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

namespace NKikimr::NExternalSource {

constexpr TStringBuf YdbTopicsType = "YdbTopics";

struct TExternalSourceException: public yexception {
};

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
},
{
ToString(NYql::EDatabaseType::Ydb),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC", "SERVICE_ACCOUNT"}, {"database_name", "use_tls", "database_id"}, hostnamePatternsRegEx)
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"NONE", "BASIC", "SERVICE_ACCOUNT", "TOKEN"}, {"database_name", "use_tls", "database_id"}, hostnamePatternsRegEx)
},
{
ToString(NYql::EDatabaseType::YT),
Expand Down Expand Up @@ -160,6 +160,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
{
ToString(NYql::EDatabaseType::OpenSearch),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls"}, hostnamePatternsRegEx)
},
{
ToString(YdbTopicsType),
CreateExternalDataSource(TString{NYql::PqProviderName}, {"NONE", "BASIC", "TOKEN"}, {"database_name", "use_tls"}, hostnamePatternsRegEx)
}
},
allExternalDataSourcesAreAvailable,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
QueryId.Cluster, TlsActivationContext->ActorSystem(), Config, true, TempTablesState, FederatedQuerySetup);
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Settings.QueryType, QueryId.Database, QueryId.DatabaseId, std::move(loader),
ctx.ActorSystem(), ctx.SelfID.NodeId(), counters, QueryServiceConfig);
Gateway->SetToken(QueryId.Cluster, UserToken);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>

#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>

namespace NKikimr {
namespace NMiniKQL {
Expand Down Expand Up @@ -88,6 +89,8 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
}

NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory);
NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
}

return factory;
Expand Down
59 changes: 57 additions & 2 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/library/actors/http/http_proxy.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/database_type.h>
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>

#include <ydb/core/base/counters.h>
#include <ydb/core/base/feature_flags.h>
Expand All @@ -13,7 +14,6 @@
#include <ydb/core/fq/libs/db_id_async_resolver_impl/http_proxy.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>
#include <ydb/library/actors/http/http_proxy.h>

#include <yql/essentials/public/issue/yql_issue_utils.h>

#include <yt/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
Expand Down Expand Up @@ -71,6 +71,16 @@ namespace NKikimr::NKqp {
return NYql::IHTTPGateway::Make(&httpGatewayConfig, httpGatewayGroup);
}

NYql::IPqGateway::TPtr MakePqGateway(const std::shared_ptr<NYdb::TDriver>& driver, const NYql::TPqGatewayConfig& pqGatewayConfig) {
NYql::TPqGatewayServices pqServices(
*driver,
nullptr,
nullptr,
std::make_shared<NYql::TPqGatewayConfig>(pqGatewayConfig),
nullptr);
return CreatePqNativeGateway(pqServices);
}

NYql::THttpGatewayConfig DefaultHttpGatewayConfig() {
NYql::THttpGatewayConfig config;
config.SetMaxInFlightCount(2000);
Expand Down Expand Up @@ -119,6 +129,14 @@ namespace NKikimr::NKqp {
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);
DqTaskTransformFactory = NYql::CreateYtDqTaskTransformFactory(true);

ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr);
NYdb::TDriverConfig cfg;
cfg.SetLog(std::make_unique<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
Driver = std::make_shared<NYdb::TDriver>(cfg);

PqGatewayConfig = NYql::TPqGatewayConfig{};
PqGateway = MakePqGateway(Driver, NYql::TPqGatewayConfig{});

// Initialize Token Accessor
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
Expand Down Expand Up @@ -175,7 +193,11 @@ namespace NKikimr::NKqp {
SolomonGateway,
nullptr,
S3ReadActorFactoryConfig,
DqTaskTransformFactory};
DqTaskTransformFactory,
PqGatewayConfig,
PqGateway,
ActorSystemPtr,
Driver};

// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && MdbEndpointGenerator &&
Expand Down Expand Up @@ -270,4 +292,37 @@ namespace NKikimr::NKqp {
return issues;
}

NThreading::TFuture<TGetSchemeEntryResult> GetSchemeEntryType(
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TString& endpoint,
const TString& database,
bool useTls,
const TString& structuredTokenJson,
const TString& path) {

if (!federatedQuerySetup || !federatedQuerySetup->Driver) {
return NThreading::MakeFuture<TGetSchemeEntryResult>(Nothing());
}
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = NYql::CreateCredentialsProviderFactoryForStructuredToken(nullptr, structuredTokenJson, false);
auto driver = federatedQuerySetup->Driver;

NYdb::TCommonClientSettings opts;
opts
.DiscoveryEndpoint(endpoint)
.Database(database)
.SslCredentials(NYdb::TSslCredentials(useTls))
.CredentialsProviderFactory(credentialsProviderFactory);
auto schemeClient = NYdb::NScheme::TSchemeClient(*driver, opts);
return schemeClient.DescribePath(path)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem()](const NThreading::TFuture<NYdb::NScheme::TDescribePathResult>& result) {
auto describePathResult = result.GetValue();
if (!describePathResult.IsSuccess()) {
LOG_WARN_S(*actorSystem, NKikimrServices::KQP_GATEWAY, "DescribePath failed, " << describePathResult.GetIssues().ToString());
return NThreading::MakeFuture<TGetSchemeEntryResult>(Nothing());
}
NYdb::NScheme::TSchemeEntry entry = describePathResult.GetEntry();
return NThreading::MakeFuture<TGetSchemeEntryResult>(entry.Type);
});
};

} // namespace NKikimr::NKqp
41 changes: 39 additions & 2 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <ydb/library/yql/providers/common/db_id_async_resolver/mdb_endpoint_generator.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>

#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>

#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
Expand All @@ -15,6 +18,7 @@
#include <yql/essentials/public/issue/yql_issue_message.h>

#include <yt/yql/providers/yt/provider/yql_yt_gateway.h>
#include <ydb/library/logger/actor.h>

namespace NKikimrConfig {
class TQueryServiceConfig;
Expand All @@ -28,6 +32,9 @@ namespace NKikimr::NKqp {

NYql::IHTTPGateway::TPtr MakeHttpGateway(const NYql::THttpGatewayConfig& httpGatewayConfig, NMonitoring::TDynamicCounterPtr countersRoot);

NYql::IPqGateway::TPtr MakePqGateway(const std::shared_ptr<NYdb::TDriver>& driver, const NYql::TPqGatewayConfig& pqGatewayConfig);


struct TKqpFederatedQuerySetup {
NYql::IHTTPGateway::TPtr HttpGateway;
NYql::NConnector::IClient::TPtr ConnectorClient;
Expand All @@ -42,6 +49,10 @@ namespace NKikimr::NKqp {
NMiniKQL::TComputationNodeFactory ComputationFactory;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
NYql::TTaskTransformFactory DqTaskTransformFactory;
NYql::TPqGatewayConfig PqGatewayConfig;
NYql::IPqGateway::TPtr PqGateway;
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
std::shared_ptr<NYdb::TDriver> Driver;
};

struct IKqpFederatedQuerySetupFactory {
Expand Down Expand Up @@ -81,6 +92,10 @@ namespace NKikimr::NKqp {
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
NYql::TTaskTransformFactory DqTaskTransformFactory;
NYql::TPqGatewayConfig PqGatewayConfig;
NYql::IPqGateway::TPtr PqGateway;
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
std::shared_ptr<NYdb::TDriver> Driver;
};

struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
Expand All @@ -99,7 +114,11 @@ namespace NKikimr::NKqp {
const NYql::ISolomonGateway::TPtr& solomonGateway,
NMiniKQL::TComputationNodeFactory computationFactory,
const NYql::NDq::TS3ReadActorFactoryConfig& s3ReadActorFactoryConfig,
NYql::TTaskTransformFactory dqTaskTransformFactory)
NYql::TTaskTransformFactory dqTaskTransformFactory,
const NYql::TPqGatewayConfig& pqGatewayConfig,
NYql::IPqGateway::TPtr pqGateway,
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr actorSystemPtr,
std::shared_ptr<NYdb::TDriver> driver)
: HttpGateway(httpGateway)
, ConnectorClient(connectorClient)
, CredentialsFactory(credentialsFactory)
Expand All @@ -113,6 +132,10 @@ namespace NKikimr::NKqp {
, ComputationFactory(computationFactory)
, S3ReadActorFactoryConfig(s3ReadActorFactoryConfig)
, DqTaskTransformFactory(dqTaskTransformFactory)
, PqGatewayConfig(pqGatewayConfig)
, PqGateway(pqGateway)
, ActorSystemPtr(actorSystemPtr)
, Driver(driver)
{
}

Expand All @@ -122,7 +145,7 @@ namespace NKikimr::NKqp {
DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig,
YtGatewayConfig, YtGateway, SolomonGatewayConfig,
SolomonGateway, ComputationFactory, S3ReadActorFactoryConfig,
DqTaskTransformFactory};
DqTaskTransformFactory, PqGatewayConfig, PqGateway, ActorSystemPtr, Driver};
}

private:
Expand All @@ -139,6 +162,10 @@ namespace NKikimr::NKqp {
NMiniKQL::TComputationNodeFactory ComputationFactory;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
NYql::TTaskTransformFactory DqTaskTransformFactory;
NYql::TPqGatewayConfig PqGatewayConfig;
NYql::IPqGateway::TPtr PqGateway;
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
std::shared_ptr<NYdb::TDriver> Driver;
};

IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
Expand All @@ -162,4 +189,14 @@ namespace NKikimr::NKqp {

NYql::TIssues ValidateResultSetColumns(const google::protobuf::RepeatedPtrField<Ydb::Column>& columns, ui32 maxNestingDepth = 90);

using TGetSchemeEntryResult = TMaybe<NYdb::NScheme::ESchemeEntryType>;

NThreading::TFuture<TGetSchemeEntryResult> GetSchemeEntryType(
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TString& endpoint,
const TString& database,
bool useTls,
const TString& structuredTokenJson,
const TString& path);

} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/federated_query/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ PEERDIR(
ydb/core/fq/libs/grpc
ydb/core/fq/libs/db_id_async_resolver_impl
ydb/library/db_pool/protos
ydb/library/logger
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/generic/connector/libcpp
ydb/library/yql/providers/s3/actors_factory
ydb/library/yql/providers/pq/gateway/native
ydb/library/yql/providers/solomon/gateway
yql/essentials/core/dq_integration/transform
yql/essentials/public/issue
Expand Down
Loading
Loading