|
| 1 | + |
| 2 | +""" |
| 3 | +unit tests for the SPEC filewriter |
| 4 | +""" |
| 5 | + |
| 6 | +import os |
| 7 | +import sys |
| 8 | +import unittest |
| 9 | + |
| 10 | +_test_path = os.path.dirname(__file__) |
| 11 | +_path = os.path.join(_test_path, '..') |
| 12 | +if _path not in sys.path: |
| 13 | + sys.path.insert(0, _path) |
| 14 | + |
| 15 | +import apstools |
| 16 | +from databroker import Broker |
| 17 | +import json |
| 18 | +import shutil |
| 19 | +import tempfile |
| 20 | + |
| 21 | + |
| 22 | +""" |
| 23 | +sqlite fails with this error: |
| 24 | +
|
| 25 | + File "/home/mintadmin/Apps/anaconda/envs/bluesky/lib/python3.6/site-packages/databroker/headersource/sqlite.py", line 257, in _insert_events |
| 26 | + values[desc_uid]) |
| 27 | +sqlite3.InterfaceError: Error binding parameter 107 - probably unsupported type. |
| 28 | +
|
| 29 | +Can we pick a different backend for temporary work? |
| 30 | +""" |
| 31 | + |
| 32 | +DOCS_FILE = os.path.join(_test_path, "docs_tune_mr.json") |
| 33 | + |
| 34 | + |
| 35 | +def get_broker_sqlite_config(path, tz=None): |
| 36 | + """ |
| 37 | + get a temporary sqlite directory configuration |
| 38 | + |
| 39 | + path = "/tmp" |
| 40 | + config_tmp_sqlite = get_broker_sqlite_config(path) |
| 41 | + self.db = Broker.from_config(config_tmp_sqlite) |
| 42 | + """ |
| 43 | + |
| 44 | + tz = tz or "US/Central" |
| 45 | + assets_dir = path # FIXME: |
| 46 | + config = { |
| 47 | + 'description': 'lightweight personal database', |
| 48 | + 'metadatastore': { |
| 49 | + 'module': 'databroker.headersource.sqlite', |
| 50 | + 'class': 'MDS', |
| 51 | + 'config': { |
| 52 | + 'directory': path, |
| 53 | + 'timezone': tz} |
| 54 | + }, |
| 55 | + 'assets': { |
| 56 | + 'module': 'databroker.assets.sqlite', |
| 57 | + 'class': 'Registry', |
| 58 | + 'config': { |
| 59 | + 'dbpath': os.path.join(path, '/assets.sqlite')} |
| 60 | + } |
| 61 | + } |
| 62 | + return config |
| 63 | + |
| 64 | + |
| 65 | +class Test_Something(unittest.TestCase): |
| 66 | + |
| 67 | + def setUp(self): |
| 68 | + self.tempdir = tempfile.mkdtemp() |
| 69 | + config_tmp_sqlite = get_broker_sqlite_config(self.tempdir) |
| 70 | + self.db = Broker.from_config(config_tmp_sqlite) |
| 71 | + |
| 72 | + def tearDown(self): |
| 73 | + if os.path.exists(self.tempdir): |
| 74 | + shutil.rmtree(self.tempdir, ignore_errors=True) |
| 75 | + |
| 76 | + def test_newfile(self): |
| 77 | + self.assertTrue(os.path.exists(DOCS_FILE)) |
| 78 | + |
| 79 | + # load the db with our test document stream |
| 80 | + with open(DOCS_FILE, "r") as fp: |
| 81 | + plans = json.load(fp) |
| 82 | + for document in plans["tune_mr"]: |
| 83 | + tag, doc = document |
| 84 | + print(tag) |
| 85 | + self.db.insert(tag, doc) # FIXME: fails here |
| 86 | + |
| 87 | + # TODO: finish this test |
| 88 | + |
| 89 | + |
| 90 | +def suite(*args, **kw): |
| 91 | + test_list = [ |
| 92 | + Test_Something, |
| 93 | + ] |
| 94 | + test_suite = unittest.TestSuite() |
| 95 | + for test_case in test_list: |
| 96 | + test_suite.addTest(unittest.makeSuite(test_case)) |
| 97 | + return test_suite |
| 98 | + |
| 99 | + |
| 100 | +if __name__ == "__main__": |
| 101 | + runner=unittest.TextTestRunner() |
| 102 | + runner.run(suite()) |
0 commit comments