7
7
8
8
from codegate .db .models import AlertSeverity
9
9
from codegate .pipeline .base import PipelineContext
10
+ from codegate .session .session_store import SessionStore
10
11
11
12
logger = structlog .get_logger ("codegate.pii.analyzer" )
12
13
13
14
14
- class PiiSessionStore :
15
- """
16
- A class to manage PII (Personally Identifiable Information) session storage.
17
-
18
- Attributes:
19
- session_id (str): The unique identifier for the session. If not provided, a new UUID
20
- is generated. mappings (Dict[str, str]): A dictionary to store mappings between UUID
21
- placeholders and PII.
22
-
23
- Methods:
24
- add_mapping(pii: str) -> str:
25
- Adds a PII string to the session store and returns a UUID placeholder for it.
26
-
27
- get_pii(uuid_placeholder: str) -> str:
28
- Retrieves the PII string associated with the given UUID placeholder. If the placeholder
29
- is not found, returns the placeholder itself.
30
- """
31
-
32
- def __init__ (self , session_id : str = None ):
33
- self .session_id = session_id or str (uuid .uuid4 ())
34
- self .mappings : Dict [str , str ] = {}
35
-
36
- def add_mapping (self , pii : str ) -> str :
37
- uuid_placeholder = f"<{ str (uuid .uuid4 ())} >"
38
- self .mappings [uuid_placeholder ] = pii
39
- return uuid_placeholder
40
-
41
- def get_pii (self , uuid_placeholder : str ) -> str :
42
- return self .mappings .get (uuid_placeholder , uuid_placeholder )
43
-
44
-
45
15
class PiiAnalyzer :
46
16
"""
47
17
PiiAnalyzer class for analyzing and anonymizing text containing PII.
@@ -52,12 +22,12 @@ class PiiAnalyzer:
52
22
Get or create the singleton instance of PiiAnalyzer.
53
23
analyze:
54
24
text (str): The text to analyze for PII.
55
- Tuple[str, List[Dict[str, Any]], PiiSessionStore ]: The anonymized text, a list of
25
+ Tuple[str, List[Dict[str, Any]]]: The anonymized text and a list of
56
26
found PII details.
57
27
entities (List[str]): The PII entities to analyze for.
58
28
restore_pii:
59
29
anonymized_text (str): The text with anonymized PII.
60
- session_store (PiiSessionStore ): The PiiSessionStore used for anonymization.
30
+ session_id (str): The ID of the session whose mappings were used for anonymization.
61
31
str: The text with original PII restored.
62
32
"""
63
33
@@ -95,13 +65,13 @@ def __init__(self):
95
65
# Create analyzer with custom NLP engine
96
66
self .analyzer = AnalyzerEngine (nlp_engine = nlp_engine )
97
67
self .anonymizer = AnonymizerEngine ()
98
- self .session_store = PiiSessionStore ()
68
+ self .session_store = SessionStore ()
99
69
100
70
PiiAnalyzer ._instance = self
101
71
102
72
def analyze (
103
- self , text : str , context : Optional [PipelineContext ] = None
104
- ) -> Tuple [str , List [Dict [str , Any ]], PiiSessionStore ]:
73
+ self , text : str , session_id : str , context : Optional [PipelineContext ] = None
74
+ ) -> Tuple [str , List [Dict [str , Any ]]]:
105
75
# Prioritize credit card detection first
106
76
entities = [
107
77
"PHONE_NUMBER" ,
@@ -135,7 +105,7 @@ def analyze(
135
105
anonymized_text = text
136
106
for result in analyzer_results :
137
107
pii_value = text [result .start : result .end ]
138
- uuid_placeholder = self .session_store .add_mapping (pii_value )
108
+ uuid_placeholder = self .session_store .add_mapping (session_id , pii_value )
139
109
pii_info = {
140
110
"type" : result .entity_type ,
141
111
"value" : pii_value ,
@@ -155,7 +125,7 @@ def analyze(
155
125
uuid = uuid_placeholder ,
156
126
# Don't log the actual PII value for security
157
127
value_length = len (pii_value ),
158
- session_id = self . session_store . session_id ,
128
+ session_id = session_id ,
159
129
)
160
130
161
131
# Log summary of all PII found in this analysis
@@ -176,30 +146,37 @@ def analyze(
176
146
"PII analysis complete" ,
177
147
total_pii_found = len (found_pii ),
178
148
pii_types = [p ["type" ] for p in found_pii ],
179
- session_id = self . session_store . session_id ,
149
+ session_id = session_id ,
180
150
)
181
151
182
152
# Return the anonymized text and PII details
183
- return anonymized_text , found_pii , self . session_store
153
+ return anonymized_text , found_pii
184
154
185
155
# If no PII found, return original text and an empty list
186
- return text , [], self . session_store
156
+ return text , []
187
157
188
- def restore_pii (self , anonymized_text : str , session_store : PiiSessionStore ) -> str :
158
+ def restore_pii (self , anonymized_text : str , session_id : str ) -> str :
189
159
"""
190
160
Restore the original PII (Personally Identifiable Information) in the given anonymized text.
191
161
192
162
This method replaces placeholders in the anonymized text with their corresponding original
193
- PII values using the mappings stored in the provided PiiSessionStore .
163
+ PII values using the mappings stored in the session store for the given session ID.
194
164
195
165
Args:
196
166
anonymized_text (str): The text containing placeholders for PII.
197
- session_store (PiiSessionStore ): The session store containing mappings of placeholders
167
+ session_id (str): The ID of the session whose store holds the mappings of placeholders
198
168
to original PII.
199
169
200
170
Returns:
201
171
str: The text with the original PII restored.
202
172
"""
203
- for uuid_placeholder , original_pii in session_store .mappings .items ():
173
+ session_data = self .session_store .get_by_session_id (session_id )
174
+ if not session_data :
175
+ logger .warning (
176
+ "No active PII session found for given session ID. Unable to restore PII."
177
+ )
178
+ return anonymized_text
179
+
180
+ for uuid_placeholder , original_pii in session_data .items ():
204
181
anonymized_text = anonymized_text .replace (uuid_placeholder , original_pii )
205
182
return anonymized_text
0 commit comments