|
| 1 | + |
| 2 | +""" |
| 3 | +unit tests for the SPEC filewriter |
| 4 | +""" |
| 5 | + |
| 6 | +import json |
| 7 | +import os |
| 8 | +import shutil |
| 9 | +import sys |
| 10 | +import tempfile |
| 11 | +import unittest |
| 12 | +import zipfile |
| 13 | + |
| 14 | + |
| 15 | +_test_path = os.path.dirname(__file__) |
| 16 | +_path = os.path.join(_test_path, '..') |
| 17 | +if _path not in sys.path: |
| 18 | + sys.path.insert(0, _path) |
| 19 | + |
| 20 | +from apstools.utils import export_json |
| 21 | + |
| 22 | + |
| 23 | +class Test_ExportZippedJson(unittest.TestCase): |
| 24 | + |
| 25 | + def setUp(self): |
| 26 | + self.tempdir = tempfile.mkdtemp() |
| 27 | + |
| 28 | + def tearDown(self): |
| 29 | + if os.path.exists(self.tempdir): |
| 30 | + shutil.rmtree(self.tempdir, ignore_errors=True) |
| 31 | + |
| 32 | + def test_writer_default_name(self): |
| 33 | + from databroker import Broker |
| 34 | + db = Broker.named("mongodb_config") |
| 35 | + headers = db(plan_name="count") |
| 36 | + headers = list(headers)[0:1] |
| 37 | + |
| 38 | + filename = os.path.join(self.tempdir, "export1.txt") |
| 39 | + export_json(headers, filename=filename) |
| 40 | + self.assertTrue(os.path.exists(filename), f"wrote to requested {filename}") |
| 41 | + |
| 42 | + filename = "export2.txt" |
| 43 | + zipfilename = os.path.join(self.tempdir, "export2.zip") |
| 44 | + export_json(headers, filename, zipfilename=zipfilename) |
| 45 | + self.assertFalse(os.path.exists(filename), f"did not write to {filename}") |
| 46 | + self.assertTrue( |
| 47 | + os.path.exists(zipfilename), |
| 48 | + f"wrote to requested ZIP {zipfilename}") |
| 49 | + with zipfile.ZipFile(zipfilename, "r") as fp: |
| 50 | + self.assertIn(filename, fp.namelist(), "found JSON test data") |
| 51 | + buf = fp.read(filename).decode("utf-8") |
| 52 | + testdata = json.loads(buf) |
| 53 | + self.assertEqual(len(testdata), 1, "ZIP file contains one dataset") |
| 54 | + dataset = testdata[0] |
| 55 | + self.assertGreater(len(dataset), 1, "dataset contains more than one document") |
| 56 | + tag, doc = dataset[0] |
| 57 | + self.assertEqual(tag, "start", "found start document") |
| 58 | + self.assertNotEqual(doc.get("plan_name"), None, "found a start document by duck type") |
| 59 | + self.assertNotEqual(doc.get("uid"), None, "found a uid document") |
| 60 | + self.assertEqual( |
| 61 | + doc["uid"], |
| 62 | + headers[0].start["uid"], |
| 63 | + "found matching start document" |
| 64 | + ) |
| 65 | + |
| 66 | + |
| 67 | +def suite(*args, **kw): |
| 68 | + test_list = [ |
| 69 | + Test_ExportZippedJson, |
| 70 | + ] |
| 71 | + |
| 72 | + test_suite = unittest.TestSuite() |
| 73 | + for test_case in test_list: |
| 74 | + test_suite.addTest(unittest.makeSuite(test_case)) |
| 75 | + return test_suite |
| 76 | + |
| 77 | + |
| 78 | +if __name__ == "__main__": |
| 79 | + runner=unittest.TextTestRunner() |
| 80 | + runner.run(suite()) |
0 commit comments