-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathggshield_skill.py
More file actions
317 lines (260 loc) · 10.2 KB
/
ggshield_skill.py
File metadata and controls
317 lines (260 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
"""
ggshield Secret Scanner Skill for Moltbot
Detects 500+ types of hardcoded secrets (API keys, credentials, certificates)
before they're committed to git.
Author: GitGuardian Team
Version: 1.0.0
"""
import os
import subprocess
class GGShieldSkill:
"""
Moltbot skill that wraps ggshield CLI for secret scanning.
Provides methods to scan repositories, files, staged changes, and Docker images
for hardcoded secrets using GitGuardian's detection engine.
"""
def __init__(self):
"""Initialize the ggshield skill."""
self.name = "ggshield"
self.version = "1.0.0"
self.requires_api_key = True
self.api_key_env = "GITGUARDIAN_API_KEY"
def _get_api_key(self) -> str:
"""
Get GitGuardian API key from environment.
Raises ValueError if not set.
"""
api_key = os.environ.get(self.api_key_env)
if not api_key:
raise ValueError(
f"{self.api_key_env} not set. "
"Get one at https://dashboard.gitguardian.com"
)
return api_key
def _run_ggshield(self, *args: str) -> tuple[int, str, str]:
"""
Run ggshield CLI command with API key.
Args:
*args: Arguments to pass to ggshield (e.g., 'secret', 'scan', 'repo', '.')
Returns:
Tuple of (exit_code, stdout, stderr)
Raises:
FileNotFoundError: If ggshield is not installed
"""
api_key = self._get_api_key()
command = ["ggshield", *args]
env = {**os.environ, self.api_key_env: api_key}
result = subprocess.run(
command,
capture_output=True,
text=True,
env=env,
)
return result.returncode, result.stdout, result.stderr
def _format_success(self, message: str) -> str:
"""Format success message with emoji."""
return f"✅ {message}"
def _format_error(self, message: str) -> str:
"""Format error message with emoji."""
return f"❌ {message}"
def _format_scanning(self, message: str) -> str:
"""Format scanning message with emoji."""
return f"🔍 {message}"
def _is_git_repository(self, path: str = ".") -> bool:
"""Check if the given path is inside a git repository."""
git_dir = os.path.join(path, ".git")
if os.path.exists(git_dir):
return True
# Also check if we're in a subdirectory of a git repo
try:
result = subprocess.run(
["git", "rev-parse", "--git-dir"],
capture_output=True,
text=True,
cwd=path,
)
return result.returncode == 0
except (FileNotFoundError, OSError):
return False
async def scan_repo(self, path: str) -> str:
"""
Scan entire git repository for secrets.
Args:
path: Path to repository root
Returns:
User-facing message describing results
"""
# Validate path exists
if not os.path.exists(path):
return self._format_error(f"Path not found: {path}")
if not os.path.isdir(path):
return self._format_error(f"Not a directory: {path}")
try:
exit_code, stdout, stderr = self._run_ggshield(
"secret", "scan", "repo", path
)
if exit_code == 0:
return self._format_success(f"Repository clean: {path}")
else:
# Secrets were found
output = stdout if stdout else stderr
return self._format_error(f"Secrets found in repository:\n{output}")
except FileNotFoundError:
return self._format_error(
"ggshield not installed. Run: pip install ggshield"
)
except ValueError as e:
return self._format_error(str(e))
except Exception as e:
return self._format_error(f"Unexpected error: {e}")
async def scan_file(self, path: str) -> str:
"""
Scan a single file for secrets.
Args:
path: Path to file to scan
Returns:
User-facing message describing results
"""
# Validate file exists
if not os.path.exists(path):
return self._format_error(f"File not found: {path}")
if not os.path.isfile(path):
return self._format_error(f"Not a file: {path}")
try:
exit_code, stdout, stderr = self._run_ggshield("secret", "scan", "path", path)
if exit_code == 0:
return self._format_success(f"File clean: {path}")
else:
# Secrets were found
output = stdout if stdout else stderr
return self._format_error(f"Secrets found in {path}:\n{output}")
except FileNotFoundError:
return self._format_error(
"ggshield not installed. Run: pip install ggshield"
)
except ValueError as e:
return self._format_error(str(e))
except Exception as e:
return self._format_error(f"Unexpected error: {e}")
async def scan_staged(self) -> str:
"""
Scan only staged git changes (pre-commit mode).
Fast scanning of what's about to be committed. Requires git repository.
Returns:
User-facing message describing results
"""
# Check we're in a git repository
if not self._is_git_repository():
return self._format_error(
"Not in a git repository. Run from repo root."
)
try:
exit_code, stdout, stderr = self._run_ggshield(
"secret", "scan", "pre-commit"
)
if exit_code == 0:
return self._format_success("Staged changes are clean")
else:
# Secrets were found
output = stdout if stdout else stderr
return self._format_error(
f"Secrets detected in staged changes:\n{output}"
)
except FileNotFoundError:
return self._format_error(
"ggshield not installed. Run: pip install ggshield"
)
except ValueError as e:
return self._format_error(str(e))
except Exception as e:
return self._format_error(f"Unexpected error: {e}")
async def install_hooks(self, hook_type: str = "pre-commit") -> str:
"""
Install ggshield as a git pre-commit hook.
After installation, every commit will be scanned automatically.
Args:
hook_type: Type of hook to install: 'pre-commit' or 'pre-push'
Returns:
User-facing message describing installation result
"""
# Validate hook type
valid_hooks = ("pre-commit", "pre-push")
if hook_type not in valid_hooks:
return self._format_error(
f"Invalid hook type: {hook_type}. Must be one of: {', '.join(valid_hooks)}"
)
# Check we're in a git repository
if not self._is_git_repository():
return self._format_error(
"Not in a git repository. Run from repo root."
)
try:
exit_code, stdout, stderr = self._run_ggshield(
"install", "--mode", "local", "--hook-type", hook_type
)
if exit_code == 0:
return self._format_success(
f"Installed {hook_type} hook.\n"
f"From now on, commits with secrets will be blocked."
)
else:
output = stderr if stderr else stdout
return self._format_error(
f"Failed to install hook: {output}"
)
except FileNotFoundError:
return self._format_error(
"ggshield not installed. Run: pip install ggshield"
)
except ValueError as e:
return self._format_error(str(e))
except Exception as e:
return self._format_error(f"Unexpected error: {e}")
async def scan_docker(self, image: str) -> str:
"""
Scan Docker image for secrets in its layers.
Args:
image: Docker image name/tag (e.g., 'myapp:latest')
Returns:
User-facing message describing results
"""
# Validate image string
if not image or not image.strip():
return self._format_error("Docker image name is required")
image = image.strip()
try:
exit_code, stdout, stderr = self._run_ggshield(
"secret", "scan", "docker", image
)
if exit_code == 0:
return self._format_success(f"Docker image {image} is clean")
else:
# Check for Docker not available error
combined_output = f"{stdout}\n{stderr}".lower()
if "docker" in combined_output and (
"not found" in combined_output
or "cannot connect" in combined_output
or "is not running" in combined_output
):
return self._format_error(
"Docker is not available. Make sure Docker is installed and running."
)
# Check for image not found
if "no such image" in combined_output or "not found" in combined_output:
return self._format_error(
f"Docker image not found: {image}. "
"Make sure the image exists locally or pull it first."
)
# Secrets were found
output = stdout if stdout else stderr
return self._format_error(
f"Secrets found in Docker image {image}:\n{output}"
)
except FileNotFoundError:
return self._format_error(
"ggshield not installed. Run: pip install ggshield"
)
except ValueError as e:
return self._format_error(str(e))
except Exception as e:
return self._format_error(f"Unexpected error: {e}")