|
| 1 | + |
| 2 | +""" |
| 3 | +simple unit tests for this package |
| 4 | +""" |
| 5 | + |
| 6 | +from io import StringIO |
| 7 | +import os |
| 8 | +import sys |
| 9 | +import unittest |
| 10 | + |
| 11 | +_path = os.path.dirname(__file__) |
| 12 | +_path = os.path.join(_path, '..') |
| 13 | +if _path not in sys.path: |
| 14 | + sys.path.insert(0, _path) |
| 15 | + |
| 16 | +from apstools import plans as APS_plans |
| 17 | +# from apstools import utils as APS_utils |
| 18 | +from bluesky.simulators import summarize_plan |
| 19 | +import ophyd.sim |
| 20 | + |
| 21 | +class Capture_stdout(list): |
| 22 | + ''' |
| 23 | + capture all printed output (to stdout) into list |
| 24 | + |
| 25 | + # http://stackoverflow.com/questions/16571150/how-to-capture-stdout-output-from-a-python-function-call |
| 26 | + ''' |
| 27 | + def __enter__(self): |
| 28 | + sys.stdout.flush() |
| 29 | + self._stdout = sys.stdout |
| 30 | + sys.stdout = self._stringio = StringIO() |
| 31 | + return self |
| 32 | + |
| 33 | + def __exit__(self, *args): |
| 34 | + self.extend(self._stringio.getvalue().splitlines()) |
| 35 | + del self._stringio # free up some memory |
| 36 | + sys.stdout = self._stdout |
| 37 | + |
| 38 | + |
| 39 | +class Test_Plans(unittest.TestCase): |
| 40 | + |
| 41 | + def setUp(self): |
| 42 | + pass |
| 43 | + |
| 44 | + def tearDown(self): |
| 45 | + pass |
| 46 | + |
| 47 | + def test_addDeviceDataAsStream(self): |
| 48 | + with Capture_stdout() as printed_lines: |
| 49 | + summarize_plan( |
| 50 | + APS_plans.addDeviceDataAsStream( |
| 51 | + ophyd.sim.motor1, |
| 52 | + "test-device")) |
| 53 | + |
| 54 | + received = "\n".join([v[:21] for v in str(printed_lines).strip().splitlines()]) |
| 55 | + expected = str([" Read ['motor1']"]) |
| 56 | + self.assertEqual(received, expected) |
| 57 | + |
| 58 | + with Capture_stdout() as lines2: |
| 59 | + summarize_plan( |
| 60 | + APS_plans.addDeviceDataAsStream( |
| 61 | + [ophyd.sim.motor2, ophyd.sim.motor3], |
| 62 | + "test-device-list")) |
| 63 | + |
| 64 | + print(f"|{lines2}|") |
| 65 | + received = "\n".join([v for v in str(lines2).strip().splitlines()]) |
| 66 | + expected = str([ |
| 67 | + " Read ['motor2']", # TODO: <-- Why? |
| 68 | + " Read ['motor2', 'motor3']", |
| 69 | + ]) |
| 70 | + self.assertEqual(received, expected) |
| 71 | + |
| 72 | + |
| 73 | +def suite(*args, **kw): |
| 74 | + test_list = [ |
| 75 | + Test_Plans, |
| 76 | + ] |
| 77 | + test_suite = unittest.TestSuite() |
| 78 | + for test_case in test_list: |
| 79 | + test_suite.addTest(unittest.makeSuite(test_case)) |
| 80 | + return test_suite |
| 81 | + |
| 82 | + |
| 83 | +if __name__ == "__main__": |
| 84 | + runner=unittest.TextTestRunner() |
| 85 | + runner.run(suite()) |
0 commit comments