-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
166 lines (134 loc) · 5.29 KB
/
lambda_function.py
File metadata and controls
166 lines (134 loc) · 5.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import json
import logging
import base64
# basicConfig() only configures the root logger when it has no handlers yet
# (e.g. a plain local run). When a handler is already installed before this
# module is imported -- as the AWS Lambda Python runtime does -- the call is a
# no-op and the root level stays at WARNING, silently dropping every
# logger.info() below. Setting the level explicitly covers both cases.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def decode_jwt_payload(token):
    """Return the decoded claims of a JWT without verifying its signature.

    Intended for development/testing only: the header and signature segments
    are ignored, so the result must not be treated as authenticated data.

    Args:
        token: Compact JWT string of the form ``header.payload.signature``.

    Returns:
        The JSON-decoded payload segment, or ``None`` when the token does not
        have exactly three dot-separated segments or decoding fails.
    """
    try:
        segments = token.split(".")
        if len(segments) != 3:
            logger.error("Invalid JWT token format")
            return None

        # JWTs strip base64 padding; restore it up to a multiple of four
        # characters (adds nothing when the length is already aligned).
        encoded_claims = segments[1]
        encoded_claims += "=" * (-len(encoded_claims) % 4)

        raw_claims = base64.urlsafe_b64decode(encoded_claims)
        return json.loads(raw_claims.decode("utf-8"))
    except Exception as e:
        logger.error(f"Error decoding JWT payload: {e}")
        return None
def get_user_info_from_token(event):
    """Extract user information and groups from a Cognito ID token.

    The token is read from the ``X-Cognito-Id-Token`` request header (matched
    case-insensitively) and decoded with ``decode_jwt_payload``.

    NOTE(review): the token is decoded without signature verification, so the
    returned claims and groups are only trustworthy if something upstream has
    already validated the token -- confirm before using them for access
    control.

    Args:
        event: Lambda proxy event dict; only its ``headers`` entry is read.

    Returns:
        A ``(user_info, groups)`` tuple. ``user_info`` is a dict of selected
        claims and ``groups`` a list of group names taken from the first group
        claim present in the token. ``(None, [])`` is returned when the header
        is missing or empty, the token cannot be decoded into a JSON object,
        or any unexpected error occurs.
    """
    # Claim names that may carry group membership, in priority order.
    possible_group_claims = (
        "cognito:groups",
        "groups",
        "custom:groups",
        "memberOf",
        "roles",
    )

    try:
        # "headers" can be present but explicitly null; treat that the same
        # as an absent key instead of failing on None.items().
        headers = event.get("headers") or {}

        # First header whose name matches, ignoring case.
        id_token = next(
            (
                header_value
                for header_name, header_value in headers.items()
                if header_name.lower() == "x-cognito-id-token"
            ),
            None,
        )
        if not id_token:
            logger.warning("No X-Cognito-Id-Token header found")
            return None, []

        token_payload = decode_jwt_payload(id_token)
        # A usable payload is a non-empty JSON object; a token whose payload
        # decodes to a list/string/number carries no claims to read.
        if not token_payload or not isinstance(token_payload, dict):
            return None, []

        # NOTE(review): this writes every claim (email, sub, ...) to the logs
        # at INFO level -- confirm that is acceptable outside development.
        logger.info(f"Token payload: {token_payload}")

        user_info = {
            "username": token_payload.get("cognito:username"),
            "email": token_payload.get("email"),
            "email_verified": token_payload.get("email_verified"),
            "sub": token_payload.get("sub"),  # User's unique ID
            "aud": token_payload.get("aud"),  # Client ID
            "iss": token_payload.get("iss"),  # User Pool URL
            "token_use": token_payload.get("token_use"),  # Should be 'id'
        }

        groups = []
        for claim_name in possible_group_claims:
            if claim_name not in token_payload:
                continue
            claim_value = token_payload[claim_name]
            if isinstance(claim_value, list):
                groups.extend(claim_value)
            elif isinstance(claim_value, str):
                # Comma-separated string; drop blank entries so inputs such
                # as "" or "Admin," do not produce an empty group name.
                stripped_names = (part.strip() for part in claim_value.split(","))
                groups.extend(name for name in stripped_names if name)
            # Only the first group claim that is present is consulted.
            break

        logger.info(f"User info: {user_info}")
        logger.info(f"User groups: {groups}")
        return user_info, groups
    except Exception as e:
        # Boundary catch-all: a malformed event must not fail the request.
        logger.exception("Error extracting user info from token: %s", e)
        return None, []
def get_user_from_cognito_context(event):
    """Fallback: build user info from ``requestContext.identity`` of the event.

    Args:
        event: Lambda proxy event dict.

    Returns:
        A ``(user_info, groups)`` tuple. ``user_info`` maps
        ``cognito_identity_id``, ``cognito_auth_provider``, ``source_ip`` and
        ``user_agent`` to the matching identity fields (``None`` where a field
        is absent); ``groups`` is always an empty list. ``(None, [])`` is
        returned if reading the event raises.
    """
    # (output key, key inside requestContext.identity)
    identity_fields = (
        ("cognito_identity_id", "cognitoIdentityId"),
        ("cognito_auth_provider", "cognitoAuthenticationProvider"),
        ("source_ip", "sourceIp"),
        ("user_agent", "userAgent"),
    )

    try:
        caller_identity = event.get("requestContext", {}).get("identity", {})
        user_info = {
            output_key: caller_identity.get(identity_key)
            for output_key, identity_key in identity_fields
        }
        logger.info(f"Cognito context user info: {user_info}")
        return user_info, []
    except Exception as e:
        logger.error(f"Error extracting user from Cognito context: {str(e)}")
        return None, []
def handler(event, context):
    """Lambda entry point: echo the caller's identity, groups and request info.

    User details come from the ID token header when available, otherwise from
    the Cognito identity section of the request context.

    Args:
        event: Lambda proxy event dict.
        context: Lambda context object; only ``aws_request_id`` is read. May
            be ``None`` (e.g. a local invocation), in which case the request
            id is reported as ``null``.

    Returns:
        A proxy-integration response dict with status 200 and a JSON body
        containing ``message``, ``user_info``, ``user_groups``, ``is_admin``,
        ``is_viewer``, ``api_info`` and ``timestamp``.
    """
    # NOTE(review): the raw event includes the request headers, so this also
    # writes the ID token to the logs -- confirm that is intended.
    logger.info("Event: %s", json.dumps(event, indent=2))
    logger.info("Context: %s", context)

    # Prefer the ID token; fall back to the Cognito request context.
    user_info, user_groups = get_user_info_from_token(event)
    if not user_info:
        user_info, user_groups = get_user_from_cognito_context(event)

    # NOTE(review): groups originate from a token decoded without signature
    # verification; these flags are informational unless the token has been
    # validated upstream -- confirm before gating anything on them.
    is_admin = "Admin" in user_groups
    is_viewer = "Viewer" in user_groups

    # "requestContext" may be absent or explicitly null; either way fall back
    # to an empty mapping so the lookups below yield None instead of raising.
    request_context = event.get("requestContext") or {}
    api_info = {
        "api_id": request_context.get("apiId"),
        "stage": request_context.get("stage"),
        "request_id": request_context.get("requestId"),
        "http_method": request_context.get("httpMethod"),
        "resource_path": request_context.get("resourcePath"),
    }

    response_body = {
        "message": "Hello from Lambda!",
        "user_info": user_info,
        "user_groups": user_groups,
        "is_admin": is_admin,
        "is_viewer": is_viewer,
        "api_info": api_info,
        # NOTE(review): despite the key name this carries the Lambda request
        # id, not a time value; kept as-is so existing consumers of the
        # response are unaffected.
        "timestamp": getattr(context, "aws_request_id", None),
    }

    return {
        "statusCode": 200,
        "headers": {
            "Content-Type": "application/json",
        },
        "body": json.dumps(response_body, indent=2),
    }