Skip to content

Commit bbf8d5d

Browse files
authored
Add emulator for gcs (tensorflow#1234)
* Bump com_github_googleapis_google_cloud_cpp to `1.21.0` * Add gcs testbench * Bump `libcurl` to `7.69.1`
1 parent c0da760 commit bbf8d5d

File tree

12 files changed

+2035
-23
lines changed

12 files changed

+2035
-23
lines changed

.github/workflows/api.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ jobs:
5050
python -m pytest -s -v tests/test_http_eager.py
5151
python -m pytest -s -v tests/test_s3_eager.py
5252
python -m pytest -s -v tests/test_azure.py
53+
python -m pytest -s -v tests/test_gcs_eager.py
5354
5455
linux:
5556
name: Linux ${{ matrix.python }} + ${{ matrix.version }}
@@ -85,6 +86,7 @@ jobs:
8586
python -m pytest -s -v tests/test_http_eager.py
8687
python -m pytest -s -v tests/test_s3_eager.py
8788
python -m pytest -s -v tests/test_azure.py
89+
python -m pytest -s -v tests/test_gcs_eager.py
8890
8991
windows:
9092
name: Windows ${{ matrix.python }} + ${{ matrix.version }}

WORKSPACE

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -419,11 +419,11 @@ http_archive(
419419
http_archive(
420420
name = "curl",
421421
build_file = "//third_party:curl.BUILD",
422-
sha256 = "e9c37986337743f37fd14fe8737f246e97aec94b39d1b71e8a5973f72a9fc4f5",
423-
strip_prefix = "curl-7.60.0",
422+
sha256 = "01ae0c123dee45b01bbaef94c0bc00ed2aec89cb2ee0fd598e0d302a6b5e0a98",
423+
strip_prefix = "curl-7.69.1",
424424
urls = [
425-
"https://storage.googleapis.com/mirror.tensorflow.org/curl.haxx.se/download/curl-7.60.0.tar.gz",
426-
"https://curl.haxx.se/download/curl-7.60.0.tar.gz",
425+
"https://storage.googleapis.com/mirror.tensorflow.org/curl.haxx.se/download/curl-7.69.1.tar.gz",
426+
"https://curl.haxx.se/download/curl-7.69.1.tar.gz",
427427
],
428428
)
429429

@@ -584,11 +584,11 @@ http_archive(
584584
"@com_github_curl_curl": "@curl",
585585
"@com_github_nlohmann_json": "@nlohmann_json_lib",
586586
},
587-
sha256 = "ff82045b9491f0d880fc8e5c83fd9542eafb156dcac9ff8c6209ced66ed2a7f0",
588-
strip_prefix = "google-cloud-cpp-1.17.1",
587+
sha256 = "14bf9bf97431b890e0ae5dca8f8904841d4883b8596a7108a42f5700ae58d711",
588+
strip_prefix = "google-cloud-cpp-1.21.0",
589589
urls = [
590-
"https://storage.googleapis.com/mirror.tensorflow.org/github.com/googleapis/google-cloud-cpp/archive/v1.17.1.tar.gz",
591-
"https://github.com/googleapis/google-cloud-cpp/archive/v1.17.1.tar.gz",
590+
"https://storage.googleapis.com/mirror.tensorflow.org/github.com/googleapis/google-cloud-cpp/archive/v1.21.0.tar.gz",
591+
"https://github.com/googleapis/google-cloud-cpp/archive/v1.21.0.tar.gz",
592592
],
593593
)
594594

tests/test_gcloud/test_gcs.sh

100644100755
Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,19 @@ set -o pipefail
44
if [ "$#" -eq 1 ]; then
55
container=$1
66
docker pull python:3.8
7-
docker run -d --rm --net=host --name=$container -v $PWD:/v -w /v python:3.8 bash -x -c 'python3 -m pip install gcloud-storage-emulator==0.3.0 && gcloud-storage-emulator start --port=9099'
7+
docker run -d --rm --net=host --name=$container -v $PWD:/v -w /v python:3.8 bash -x -c 'python3 -m pip install -r /v/tests/test_gcloud/testbench/requirements.txt && gunicorn --bind "0.0.0.0:9099" --worker-class gevent --chdir "/v/tests/test_gcloud/testbench" testbench:application'
88
echo wait 30 secs until gcs emulator is up and running
99
sleep 30
1010
exit 0
1111
fi
1212

1313
export PATH=$(python3 -m site --user-base)/bin:$PATH
1414

15-
python3 -m pip install gcloud-storage-emulator==0.3.0
16-
17-
gcloud-storage-emulator start --port=9099 &
18-
15+
python3 -m pip install -r tests/test_gcloud/testbench/requirements.txt
16+
echo starting gcs-testbench
17+
gunicorn --bind "0.0.0.0:9099" \
18+
--worker-class gevent \
19+
--chdir "tests/test_gcloud/testbench" \
20+
testbench:application &
1921
sleep 30 # Wait for storage emulator to start
20-
echo gcs emulator started successfully
22+
echo gcs-testbench started successfully

tests/test_gcloud/testbench/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# GCS Testbench
2+
3+
This is a minimal testbench for GCS. It only supports data operations and creating/listing/deleting buckets.
4+
5+
## Install Dependencies
6+
7+
```bash
8+
pip install -r requirements.txt
9+
```
10+
11+
## Run Test Bench
12+
13+
```bash
14+
gunicorn --bind "0.0.0.0:9099" --worker-class gevent --chdir "tests/test_gcloud/testbench" testbench:application
15+
```
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright 2018 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""A helper class to send error responses in the storage client test bench."""
15+
16+
import flask
17+
18+
19+
class ErrorResponse(Exception):
    """Simplify generation of error responses.

    Raise this exception from a request handler to abort the request with
    an HTTP error; `as_response()` converts it into a JSON response.

    :param message: str the human readable error description, returned to
        the client under the "message" key.
    :param status_code: int the HTTP status code; when omitted the
        class-level default (400) is used.
    :param payload: optional mapping (or iterable of key/value pairs) with
        extra fields to include in the JSON body.
    """

    # Default HTTP status code for errors that do not specify one.
    status_code = 400

    def __init__(self, message, status_code=None, payload=None):
        # Forward the message to the base class so that str(exc), exc.args,
        # log output and tracebacks carry the error description instead of
        # being empty.
        super().__init__(message)
        self.message = message
        if status_code is not None:
            # Shadow the class-level default on this instance only.
            self.status_code = status_code
        self.payload = payload

    def as_response(self):
        """Build the HTTP response for this error.

        :return: a JSON response containing the payload fields plus the
            "message" key, with the status code of this error.
        """
        # Copy the payload so the caller's mapping is never mutated; a
        # missing payload becomes an empty dict.
        kv = dict(self.payload or ())
        kv["message"] = self.message
        response = flask.jsonify(kv)
        response.status_code = self.status_code
        return response
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
# Copyright 2018 Google Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Implement a class to simulate GCS buckets."""
15+
16+
import base64
17+
import error_response
18+
import flask
19+
import gcs_object
20+
import json
21+
import re
22+
import testbench_utils
23+
import time
24+
25+
26+
class GcsBucket:
    """Represent a GCS Bucket.

    Holds the bucket metadata and the state of the resumable uploads that
    were started against this bucket.
    """

    def __init__(self, gcs_url, name):
        """Create a bucket with default metadata and no resumable uploads.

        :param gcs_url: str the base URL for the service.
        :param name: str the bucket name.
        """
        self.name = name
        self.gcs_url = gcs_url
        now = time.gmtime(time.time())
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", now)
        self.metadata = {
            "timeCreated": timestamp,
            "updated": timestamp,
            "metageneration": "0",
            "name": self.name,
            "location": "US",
            "storageClass": "STANDARD",
            "etag": "XYZ=",
            "labels": {"foo": "bar", "baz": "qux"},
            "owner": {"entity": "project-owners-123456789", "entityId": ""},
        }
        # State of each resumable upload, keyed by upload id.
        self.resumable_uploads = {}

    def versioning_enabled(self):
        """Return True if versioning is enabled for this Bucket."""
        v = self.metadata.get("versioning", None)
        if v is None:
            return False
        return v.get("enabled", False)

    def check_preconditions(self, request):
        """Verify that the preconditions in request are met.

        :param request:flask.Request the contents of the HTTP request.
        :rtype:NoneType
        :raises:ErrorResponse if the request does not pass the preconditions,
            for example, the request has a `ifMetagenerationMatch` restriction
            that is not met.
        """

        # Query parameters and the stored metageneration are both compared
        # as strings.
        metageneration_match = request.args.get("ifMetagenerationMatch")
        metageneration_not_match = request.args.get("ifMetagenerationNotMatch")
        metageneration = self.metadata.get("metageneration")

        if (
            metageneration_not_match is not None
            and metageneration_not_match == metageneration
        ):
            raise error_response.ErrorResponse(
                "Precondition Failed (metageneration = %s)" % metageneration,
                status_code=412,
            )

        if metageneration_match is not None and metageneration_match != metageneration:
            raise error_response.ErrorResponse(
                "Precondition Failed (metageneration = %s)" % metageneration,
                status_code=412,
            )

    def create_resumable_upload(self, upload_url, request):
        """Capture the details for a resumable upload.

        The object name comes either from the JSON metadata in the request
        body or, for empty payloads, from the `name` query parameter.

        :param upload_url: str the base URL for uploads.
        :param request: flask.Request the original http request.
        :return: the HTTP response to send back; its `Location` header holds
            the URL where the upload chunks must be sent.
        :raises:ErrorResponse if the object name is missing, or if it is
            provided as a query parameter together with a non-empty payload.
        """
        x_upload_content_type = request.headers.get(
            "x-upload-content-type", "application/octet-stream"
        )
        x_upload_content_length = request.headers.get("x-upload-content-length")
        expected_bytes = None
        if x_upload_content_length:
            expected_bytes = int(x_upload_content_length)

        if request.args.get("name") is not None and len(request.data):
            raise error_response.ErrorResponse(
                "The name argument is only supported for empty payloads",
                status_code=400,
            )
        if len(request.data):
            metadata = json.loads(request.data)
        else:
            metadata = {"name": request.args.get("name")}

        if metadata.get("name") is None:
            raise error_response.ErrorResponse(
                "Missing object name argument", status_code=400
            )
        metadata.setdefault("contentType", x_upload_content_type)
        upload = {
            "metadata": metadata,
            "instructions": request.headers.get("x-goog-testbench-instructions"),
            "fields": request.args.get("fields"),
            "next_byte": 0,
            "expected_bytes": expected_bytes,
            "object_name": metadata.get("name"),
            "media": b"",
            "transfer": set(),
            "done": False,
        }
        # Capture the preconditions, including those that are None.
        for precondition in [
            "ifGenerationMatch",
            "ifGenerationNotMatch",
            "ifMetagenerationMatch",
            "ifMetagenerationNotMatch",
        ]:
            upload[precondition] = request.args.get(precondition)
        # The upload id is derived from the object name only, so starting a
        # new upload for the same object replaces the previous upload state.
        upload_id = base64.b64encode(bytearray(metadata.get("name"), "utf-8")).decode(
            "utf-8"
        )
        self.resumable_uploads[upload_id] = upload
        location = "{}?uploadType=resumable&upload_id={}".format(upload_url, upload_id)
        response = flask.make_response("")
        response.headers["Location"] = location
        return response

    def receive_upload_chunk(self, gcs_url, request):
        """Receive a new upload chunk.

        Recognized `Content-Range` header forms:

        - `bytes */*`: a status query, answered without consuming any data.
        - `bytes */<total>`: no byte range, only the total size.
        - `bytes <begin>-<end>/<total>` or `bytes <begin>-<end>/*`: a data
          chunk; the total may still be unknown (`*`).

        The upload is finalized once a chunk announces a numeric total and
        at least that many bytes have been received.

        :param gcs_url: str the base URL for the service.
        :param request: flask.Request the original http request.
        :return: the HTTP response; status 200 once the upload is done,
            308 while more data is expected.
        :raises:ErrorResponse if the upload id is missing or unknown, if the
            Content-Range header is malformed or inconsistent with the data
            received, or if the total size does not match the
            X-Upload-Content-Length announced when the upload was created.
        """
        upload_id = request.args.get("upload_id")
        if upload_id is None:
            raise error_response.ErrorResponse(
                "Missing upload_id in resumable_upload_chunk", status_code=400
            )
        upload = self.resumable_uploads.get(upload_id)
        if upload is None:
            raise error_response.ErrorResponse(
                "Cannot find resumable upload %s" % upload_id, status_code=404
            )
        # Be gracious in what you accept, if the Content-Range header is not
        # set we assume it is a good header and it is the end of the file.
        next_byte = upload["next_byte"]
        upload["transfer"].add(request.environ.get("HTTP_TRANSFER_ENCODING", ""))
        end = next_byte + len(request.data)
        total = end
        final_chunk = False
        # presumably returns the request body as bytes — verify against
        # testbench_utils.extract_media.
        payload = testbench_utils.extract_media(request)
        content_range = request.headers.get("content-range")
        if content_range is not None:
            if content_range.startswith("bytes */*"):
                # This is just a query to resume an upload, if it is done, return
                # the completed upload payload and an empty range header.
                response = flask.make_response(upload.get("payload", ""))
                # NOTE(review): `> 1` omits the Range header when exactly one
                # byte has been received; `> 0` may have been intended —
                # confirm before changing, clients may rely on this.
                if next_byte > 1 and not upload["done"]:
                    response.headers["Range"] = "bytes=0-%d" % (next_byte - 1)
                response.status_code = 200 if upload["done"] else 308
                return response
            match = re.match(r"bytes \*/(\*|[0-9]+)", content_range)
            if match:
                if match.group(1) == "*":
                    total = 0
                else:
                    total = int(match.group(1))
                    final_chunk = True
            else:
                match = re.match(r"bytes ([0-9]+)-([0-9]+)\/(\*|[0-9]+)", content_range)
                if not match:
                    raise error_response.ErrorResponse(
                        "Invalid Content-Range in upload %s" % content_range,
                        status_code=400,
                    )
                begin = int(match.group(1))
                end = int(match.group(2))
                if match.group(3) == "*":
                    total = 0
                else:
                    total = int(match.group(3))
                    final_chunk = True

                # The range checks only apply when an explicit byte range
                # was provided; `begin` is not defined otherwise.
                if begin != next_byte:
                    raise error_response.ErrorResponse(
                        "Mismatched data range, expected data at %d, got %d"
                        % (next_byte, begin),
                        status_code=400,
                    )
                if len(payload) != end - begin + 1:
                    raise error_response.ErrorResponse(
                        "Mismatched data range (%d) vs. received data (%d)"
                        % (end - begin + 1, len(payload)),
                        status_code=400,
                    )

        upload["media"] = upload.get("media", b"") + payload
        next_byte = len(upload.get("media", ""))
        upload["next_byte"] = next_byte
        response_payload = ""
        if final_chunk and next_byte >= total:
            expected_bytes = upload["expected_bytes"]
            if expected_bytes is not None and expected_bytes != total:
                # The two literals are concatenated by the parser, so the
                # separating space must be part of one of them.
                raise error_response.ErrorResponse(
                    "X-Upload-Content-Length "
                    "validation failed. Expected=%d, got %d." % (expected_bytes, total)
                )
            upload["done"] = True
            object_name = upload.get("object_name")
            object_path, blob = testbench_utils.get_object(
                self.name, object_name, gcs_object.GcsObject(self.name, object_name)
            )
            # Release a few resources to control memory usage.
            original_metadata = upload.pop("metadata", None)
            media = upload.pop("media", None)
            blob.check_preconditions_by_value(
                upload.get("ifGenerationMatch"),
                upload.get("ifGenerationNotMatch"),
                upload.get("ifMetagenerationMatch"),
                upload.get("ifMetagenerationNotMatch"),
            )
            # Test hook: the upload was created with an instruction asking
            # the testbench to corrupt the stored data.
            if upload.pop("instructions", None) == "inject-upload-data-error":
                media = testbench_utils.corrupt_media(media)
            revision = blob.insert_resumable(gcs_url, request, media, original_metadata)
            revision.metadata.setdefault("metadata", {})
            revision.metadata["metadata"]["x_testbench_transfer_encoding"] = ":".join(
                upload["transfer"]
            )
            response_payload = testbench_utils.filter_fields_from_response(
                upload.get("fields"), revision.metadata
            )
            # Keep the final payload so later status queries can return it.
            upload["payload"] = response_payload
            testbench_utils.insert_object(object_path, blob)

        response = flask.make_response(response_payload)
        # NOTE(review): "bytes=0-0" is also reported when no bytes have been
        # received at all — confirm this is what clients expect.
        if next_byte == 0:
            response.headers["Range"] = "bytes=0-0"
        else:
            response.headers["Range"] = "bytes=0-%d" % (next_byte - 1)
        if upload.get("done", False):
            response.status_code = 200
        else:
            response.status_code = 308
        return response

0 commit comments

Comments
 (0)