Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion fabric_rti_mcp/authentication/auth_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from starlette.requests import Request
from starlette.responses import JSONResponse

from fabric_rti_mcp.authentication.token_obo_exchanger import TokenOboExchanger
from fabric_rti_mcp.common import global_config as config
from fabric_rti_mcp.common import logger
from fabric_rti_mcp.config.obo_config import obo_config
from fabric_rti_mcp.kusto.kusto_connection import set_auth_token


Expand Down Expand Up @@ -122,7 +125,33 @@ async def check_auth(

token = extract_token_from_header(auth_header)

# Store the original token without modification
try:

if config.use_obo_flow:
logger.info("Started performing OBO token exchange")
# Create token exchanger and perform OBO token exchange
token_exchanger = TokenOboExchanger()
exchanged_token = await token_exchanger.perform_obo_token_exchange(
user_token=token, resource_uri=obo_config.kusto_audience
)
# Update token to use the exchanged token
token = exchanged_token
logger.info("Successfully performed OBO token exchange")
else:
logger.info("OBO flow not enabled; using original token")

except Exception as e:
# Log the error and raise it to fail the request
logger.error(f"Error during OBO token exchange: {e}")
return JSONResponse(
{
"error": "unauthorized",
"message": "Unauthorized to get the required token to access the resource",
},
status_code=401,
)

# Store the token for use by services
set_auth_token(token)

token_payload = decode_jwt_token(token)
Expand Down
107 changes: 107 additions & 0 deletions fabric_rti_mcp/authentication/token_obo_exchanger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from __future__ import annotations

from typing import Any, Dict, Optional

import msal # type: ignore
from azure.identity import ManagedIdentityCredential

from fabric_rti_mcp.common import logger
from fabric_rti_mcp.config.obo_config import FabricRtiMcpOBOFlowEnvVarNames, obo_config


class TokenOboExchanger:

def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize the TokenOboExchanger with optional configuration.

Args:
config: Optional configuration dictionary
"""
self.config = config or {}
self.logger = logger
self.tenant_id = obo_config.azure_tenant_id
self.entra_app_client_id = obo_config.entra_app_client_id
self.umi_client_id = obo_config.umi_client_id
self.logger.info(
f"TokenOboExchanger initialized with tenant_id: {self.tenant_id}, "
f"entra_app_client_id: {self.entra_app_client_id} and umi_client_id: {self.umi_client_id}"
)

async def perform_obo_token_exchange(self, user_token: str, resource_uri: str) -> str:
"""
Perform an On-Behalf-Of token exchange to get a new token for a resource.

Args:
user_token: The original user token
resource_uri: The URI of the target resource to get a token (ex. https://kusto.kusto.windows.net)

Returns:
New access token for the specified resource
"""
self.logger.info(f"TokenOboExchanger: Performing OBO token exchange for target resource: {resource_uri}")

client_id = self.entra_app_client_id

if not client_id:
self.logger.error("TokenOboExchanger: Entra App client ID is not provided for OBO token exchange")
raise ValueError(
f"Entra App client ID is required for OBO token exchange. "
f"Set {FabricRtiMcpOBOFlowEnvVarNames.entra_app_client_id} environment variable."
)

if not self.tenant_id:
self.logger.error("TokenOboExchanger: Tenant ID not available for OBO token exchange")
raise ValueError(
f"{FabricRtiMcpOBOFlowEnvVarNames.azure_tenant_id} environment variable required for OBO token exchange"
)

if not self.umi_client_id:
self.logger.error("TokenOboExchanger: UMI Client ID not available for OBO token exchange")
raise ValueError(
f"{FabricRtiMcpOBOFlowEnvVarNames.umi_client_id} environment variable required for OBO token exchange"
)

try:
authority = f"https://login.microsoftonline.com/{self.tenant_id}"
self.logger.info(
f"TokenOboExchanger: Using Managed Identity for OBO token exchange tenant_id: {self.tenant_id}, "
f"entra_app_client_id: {self.entra_app_client_id} and umi_client_id: {self.umi_client_id}"
)

managed_identity_credential = ManagedIdentityCredential(client_id=self.umi_client_id)
miScopes = "api://AzureADTokenExchange/.default" # this is the default scope to be used
self.logger.info(f"TokenOboExchanger: Start managed identity token acquire for scopes {miScopes}")
access_token_result = managed_identity_credential.get_token(
miScopes
) # get the MI token to be used as client assesrtion for OBO
assertion_token = access_token_result.token

app = msal.ConfidentialClientApplication(
client_id=client_id,
authority=authority,
client_credential={
"client_assertion": assertion_token,
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
},
)

# Set the scopes for the target resource we want to access
target_scopes = [f"{resource_uri}/.default"]
self.logger.info(f"TokenOboExchanger: Requesting access to scopes: {target_scopes}")

# Use the user token to acquire a new token for the target resource
result = app.acquire_token_on_behalf_of(user_assertion=user_token, scopes=target_scopes)

if "access_token" not in result:
error_msg = result.get("error_description") or result.get("error") or "Unknown error"
error_message = f"TokenOboExchanger: Failed to acquire token: {error_msg}"
self.logger.error(error_message)
raise Exception(error_message)

self.logger.info("TokenOboExchanger: Successfully acquired OBO token")
access_token: str = result["access_token"]
return access_token
except Exception as e:
self.logger.error(f"TokenOboExchanger: Error performing OBO token exchange: {e}")
raise Exception(f"OBO token exchange failed: {e}") from e
12 changes: 10 additions & 2 deletions fabric_rti_mcp/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class GlobalFabricRTIEnvVarNames:
functions_deployment_default_port = "FUNCTIONS_CUSTOMHANDLER_PORT" # Azure Functions uses this port name
http_path = "FABRIC_RTI_HTTP_PATH"
stateless_http = "FABRIC_RTI_STATELESS_HTTP"
use_obo_flow = "USE_OBO_FLOW"


DEFAULT_FABRIC_API_BASE = "https://api.fabric.microsoft.com/v1"
Expand All @@ -26,6 +27,7 @@ class GlobalFabricRTIEnvVarNames:
DEFAULT_FABRIC_RTI_HTTP_PATH = "/mcp"
DEFAULT_FABRIC_RTI_HTTP_HOST = "127.0.0.1"
DEFAULT_FABRIC_RTI_STATELESS_HTTP = False
DEFAULT_USE_OBO_FLOW = False


@dataclass(slots=True, frozen=True)
Expand All @@ -36,6 +38,7 @@ class GlobalFabricRTIConfig:
http_port: int
http_path: str
stateless_http: bool
use_obo_flow: bool

@staticmethod
def from_env() -> GlobalFabricRTIConfig:
Expand All @@ -56,6 +59,7 @@ def from_env() -> GlobalFabricRTIConfig:
stateless_http=bool(
os.getenv(GlobalFabricRTIEnvVarNames.stateless_http, DEFAULT_FABRIC_RTI_STATELESS_HTTP)
),
use_obo_flow=bool(os.getenv(GlobalFabricRTIEnvVarNames.use_obo_flow, DEFAULT_USE_OBO_FLOW)),
)

@staticmethod
Expand All @@ -69,6 +73,7 @@ def existing_env_vars() -> List[str]:
GlobalFabricRTIEnvVarNames.http_port,
GlobalFabricRTIEnvVarNames.http_path,
GlobalFabricRTIEnvVarNames.stateless_http,
GlobalFabricRTIEnvVarNames.use_obo_flow,
]
for env_var in env_vars:
if os.getenv(env_var) is not None:
Expand All @@ -85,7 +90,8 @@ def with_args() -> GlobalFabricRTIConfig:
parser.add_argument("--http", action="store_true", help="Use HTTP transport")
parser.add_argument("--host", type=str, help="HTTP host to listen on")
parser.add_argument("--port", type=int, help="HTTP port to listen on")
parser.add_argument("--stateless-http", type=bool, help="Enable or disable stateless HTTP")
parser.add_argument("--stateless-http", action="store_true", help="Enable or disable stateless HTTP")
parser.add_argument("--use-obo-flow", action="store_true", help="Enable or disable OBO flow")
args, _ = parser.parse_known_args()

transport = base_config.transport
Expand All @@ -97,6 +103,7 @@ def with_args() -> GlobalFabricRTIConfig:
stateless_http = args.stateless_http if args.stateless_http is not None else base_config.stateless_http
http_host = args.host if args.host is not None else base_config.http_host
http_port = args.port if args.port is not None else base_config.http_port
use_obo_flow = args.use_obo_flow if args.use_obo_flow is not None else base_config.use_obo_flow

return GlobalFabricRTIConfig(
fabric_api_base=base_config.fabric_api_base,
Expand All @@ -105,8 +112,9 @@ def with_args() -> GlobalFabricRTIConfig:
http_port=http_port,
http_path=base_config.http_path,
stateless_http=stateless_http,
use_obo_flow=use_obo_flow,
)


# Global configuration instance
config = GlobalFabricRTIConfig.with_args()
global_config = GlobalFabricRTIConfig.with_args()
90 changes: 90 additions & 0 deletions fabric_rti_mcp/config/obo_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import argparse
import os
from dataclasses import dataclass
from typing import List


class FabricRtiMcpOBOFlowEnvVarNames:
"""Environment variable names for OBO Flow configuration."""

azure_tenant_id = "FABRIC_RTI_MCP_AZURE_TENANT_ID"
# client id for the AAD App which is used to authenticate the user from gateway (APIM)
entra_app_client_id = "FABRIC_RTI_MCP_ENTRA_APP_CLIENT_ID"
# user assigned managed identity client id used as Federated credentials on the Entra App (entra_app_client_id)
umi_client_id = "FABRIC_RTI_MCP_USER_MANAGED_IDENTITY_CLIENT_ID"
kusto_audience = "FABRIC_RTI_MCP_KUSTO_AUDIENCE" # Kusto audience, ex: https://<clustername>.kusto.windows.net


# Default values for OBO Flow configuration
DEFAULT_FABRIC_RTI_MCP_AZURE_TENANT_ID = "72f988bf-86f1-41af-91ab-2d7cd011db47" # MS tenant id
DEFAULT_FABRIC_RTI_MCP_ENTRA_APP_CLIENT_ID = ""
DEFAULT_FABRIC_RTI_MCP_USER_MANAGED_IDENTITY_CLIENT_ID = ""
DEFAULT_FABRIC_RTI_MCP_KUSTO_AUDIENCE = "https://kusto.kusto.windows.net"


@dataclass(slots=True, frozen=True)
class FabricRtiMcpOBOFlowAuthConfig:
"""Configuration for OBO (On-Behalf-Of) Flow authentication."""

azure_tenant_id: str
entra_app_client_id: str
umi_client_id: str
kusto_audience: str

@staticmethod
def from_env() -> "FabricRtiMcpOBOFlowAuthConfig":
"""Load OBO Flow configuration from environment variables."""
return FabricRtiMcpOBOFlowAuthConfig(
azure_tenant_id=os.getenv(
FabricRtiMcpOBOFlowEnvVarNames.azure_tenant_id, DEFAULT_FABRIC_RTI_MCP_AZURE_TENANT_ID
),
entra_app_client_id=os.getenv(
FabricRtiMcpOBOFlowEnvVarNames.entra_app_client_id, DEFAULT_FABRIC_RTI_MCP_ENTRA_APP_CLIENT_ID
),
umi_client_id=os.getenv(
FabricRtiMcpOBOFlowEnvVarNames.umi_client_id, DEFAULT_FABRIC_RTI_MCP_USER_MANAGED_IDENTITY_CLIENT_ID
),
kusto_audience=os.getenv(
FabricRtiMcpOBOFlowEnvVarNames.kusto_audience, DEFAULT_FABRIC_RTI_MCP_KUSTO_AUDIENCE
),
)

@staticmethod
def existing_env_vars() -> List[str]:
"""Return a list of environment variable names that are currently set."""
result: List[str] = []
env_vars = [
FabricRtiMcpOBOFlowEnvVarNames.azure_tenant_id,
FabricRtiMcpOBOFlowEnvVarNames.entra_app_client_id,
FabricRtiMcpOBOFlowEnvVarNames.umi_client_id,
FabricRtiMcpOBOFlowEnvVarNames.kusto_audience,
]
for env_var in env_vars:
if os.getenv(env_var) is not None:
result.append(env_var)
return result

@staticmethod
def with_args() -> "FabricRtiMcpOBOFlowAuthConfig":
"""Load OBO Flow configuration from environment variables and command line arguments."""
obo_config = FabricRtiMcpOBOFlowAuthConfig.from_env()

parser = argparse.ArgumentParser(description="Fabric RTI MCP Server OBO Flow Configuration")
parser.add_argument("--entra-app-client-id", type=str, help="Azure AAD App Client ID")
parser.add_argument("--umi-client-id", type=str, help="User Managed Identity Client ID")
args, _ = parser.parse_known_args()

entra_app_client_id = (
args.entra_app_client_id if args.entra_app_client_id is not None else obo_config.entra_app_client_id
)
umi_client_id = args.umi_client_id if args.umi_client_id is not None else obo_config.umi_client_id

return FabricRtiMcpOBOFlowAuthConfig(
azure_tenant_id=obo_config.azure_tenant_id,
entra_app_client_id=entra_app_client_id,
umi_client_id=umi_client_id,
kusto_audience=obo_config.kusto_audience,
)


obo_config = FabricRtiMcpOBOFlowAuthConfig.with_args()
8 changes: 7 additions & 1 deletion fabric_rti_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

from fabric_rti_mcp import __version__
from fabric_rti_mcp.authentication.auth_middleware import add_auth_middleware
from fabric_rti_mcp.common import config, logger
from fabric_rti_mcp.common import global_config as config
from fabric_rti_mcp.common import logger
from fabric_rti_mcp.config.obo_config import obo_config
from fabric_rti_mcp.eventstream import eventstream_tools
from fabric_rti_mcp.kusto import kusto_config, kusto_tools

Expand Down Expand Up @@ -80,9 +82,13 @@ def main() -> None:
logger.info(f"Port: {config.http_port}")
logger.info(f"Path: {config.http_path}")
logger.info(f"Stateless HTTP: {config.stateless_http}")
logger.info(f"Use OBO flow: {config.use_obo_flow}")

# TODO: Add telemetry configuration here

if config.use_obo_flow and (not obo_config.entra_app_client_id or not obo_config.umi_client_id):
raise ValueError("OBO flow is enabled but required client IDs are missing")

name = "fabric-rti-mcp-server"
if config.transport == "http":
fastmcp_server = FastMCP(
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ dependencies = [
"fastmcp~=2.5.0",
"azure-kusto-data~=5.0.0",
"azure-identity",
"azure-kusto-ingest~=5.0.0"
"azure-kusto-ingest~=5.0.0",
"msal~=1.28.0"
]

[project.optional-dependencies]
Expand Down
Loading