|
1 | 1 | import pickle |
2 | 2 | import time |
3 | | -from abc import ABC, abstractmethod |
4 | | -from typing import Tuple, List, Sequence, Dict |
| 3 | +from typing import Tuple, List |
5 | 4 |
|
6 | 5 | import zmq |
7 | 6 | from loguru import logger |
8 | | -from numpy._typing import ArrayLike |
9 | 7 |
|
10 | 8 | from . import Engine |
11 | 9 |
|
@@ -76,7 +74,6 @@ def get_measurements(self) -> List[Tuple]: |
76 | 74 | else: |
77 | 75 | assert 'target_measured' in payload |
78 | 76 | x, (y, v) = payload['target_measured'] |
79 | | - # TODO: Any additional quantities to be interrogated in Tsuchinoko can be included in the trailing dict |
80 | 77 | new_measurements.append((x, y, v, {})) |
81 | 78 | # stash the last position measured as the 'current' position of the instrument |
82 | 79 | self.position = x |
@@ -105,158 +102,3 @@ def recv_payload(self, flags=0) -> dict: |
105 | 102 | self.update_targets(self._last_targets_sent) |
106 | 103 | payload_response = self.recv_payload(flags) |
107 | 104 | return payload_response |
108 | | - |
109 | | - |
110 | | -# ---------------------------------------------------------------------------------------------------------------------- |
111 | | -# This is a prototype Agent to be used with bluesky-adaptive. This should be extracted before merge. |
112 | | - |
113 | | -from bluesky_adaptive.agents.base import Agent |
114 | | - |
115 | | - |
116 | | -class TsuchinokoBase(ABC): |
117 | | - def __init__(self, *args, host: str = '127.0.0.1', port: int = 5557, **kwargs): |
118 | | - """ |
119 | | -
|
120 | | - Parameters |
121 | | - ---------- |
122 | | - args |
123 | | - args passed through to `bluesky_adaptive.agents.base.Agent.__init__()` |
124 | | - host |
125 | | - A host address target for the zmq socket. |
126 | | - port |
127 | | - The port used for the zmq socket. |
128 | | - kwargs |
129 | | - kwargs passed through to `bluesky_adaptive.agents.base.Agent.__init__()` |
130 | | - """ |
131 | | - |
132 | | - super().__init__(*args, **kwargs) |
133 | | - self.host = host |
134 | | - self.port = port |
135 | | - self.outbound_measurements = [] |
136 | | - self.context = None |
137 | | - self.socket = None |
138 | | - self.setup_socket() |
139 | | - self.last_targets_received = time.time() |
140 | | - self.kickstart() |
141 | | - |
142 | | - def kickstart(self): |
143 | | - self.send_payload({'send_targets': True}) # kickstart to recover from shutdowns |
144 | | - self.last_targets_received = time.time() # forgive lack of response until now |
145 | | - |
146 | | - def setup_socket(self): |
147 | | - self.context = zmq.Context() |
148 | | - self.socket = self.context.socket(zmq.PAIR) |
149 | | - |
150 | | - # Attempt to connect, retry every second if fails |
151 | | - while True: |
152 | | - try: |
153 | | - self.socket.connect(f"tcp://{self.host}:{self.port}") |
154 | | - except zmq.ZMQError: |
155 | | - logger.info(f'Unable to connect to tcp://{self.host}:{self.port}. Retrying in 1 second...') |
156 | | - time.sleep(1) |
157 | | - else: |
158 | | - logger.info(f'Connected to tcp://{self.host}:{self.port}.') |
159 | | - break |
160 | | - |
161 | | - def tell(self, x, y, v): |
162 | | - """ |
163 | | - Send measurement to BlueskyAdaptiveEngine |
164 | | - """ |
165 | | - yv = (y, v) |
166 | | - payload = {'target_measured': (x, yv)} |
167 | | - self.send_payload(payload) |
168 | | - |
169 | | - def ask(self, batch_size: int) -> Sequence[ArrayLike]: |
170 | | - """ |
171 | | - Wait until at least one target is received, also exhaust the queue of received targets, overwriting old ones |
172 | | - """ |
173 | | - payload = None |
174 | | - while True: |
175 | | - try: |
176 | | - payload = self.recv_payload(flags=zmq.NOBLOCK) |
177 | | - except zmq.ZMQError: |
178 | | - if payload is not None: |
179 | | - break |
180 | | - else: |
181 | | - time.sleep(SLEEP_FOR_TSUCHINOKO_TIME) |
182 | | - if time.time() > self.last_targets_received + FORCE_KICKSTART_TIME: |
183 | | - self.kickstart() |
184 | | - assert 'targets' in payload |
185 | | - self.last_targets_received = time.time() |
186 | | - return payload['targets'] |
187 | | - |
188 | | - def send_payload(self, payload: dict): |
189 | | - logger.info(f'message: {payload}') |
190 | | - self.socket.send(pickle.dumps(payload)) |
191 | | - |
192 | | - def recv_payload(self, flags=0) -> dict: |
193 | | - payload_response = pickle.loads(self.socket.recv(flags=flags)) |
194 | | - logger.info(f'response: {payload_response}') |
195 | | - return payload_response |
196 | | - |
197 | | - |
198 | | -class TsuchinokoAgent(TsuchinokoBase, Agent): |
199 | | - """ |
200 | | - A Bluesky-Adaptive 'Agent'. This Agent communicates with Tsuchinoko over zmq to request new targets and report back |
201 | | - measurements. This is an abstract class that must be subclassed. |
202 | | -
|
203 | | - A `tsuchinoko.execution.bluesky_adaptive.BlueskyAdaptiveEngine` is required for the Tsuchinoko server to complement |
204 | | - one of these `TsuchinokoAgent`. |
205 | | - """ |
206 | | - |
207 | | - def tell(self, x, y, v) -> Dict[str, ArrayLike]: |
208 | | - super().tell(x, y, v) |
209 | | - return self.get_tell_document(x, y, v) |
210 | | - |
211 | | - def ask(self, batch_size: int) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]: |
212 | | - targets = super().ask(batch_size) |
213 | | - return self.get_ask_documents(targets), targets |
214 | | - |
215 | | - @abstractmethod |
216 | | - def get_tell_document(self, x, y, v) -> Dict[str, ArrayLike]: |
217 | | - """ |
218 | | - Return any single document corresponding to 'tell'-ing Tsuchinoko about the newly measured `x`, `y` data |
219 | | -
|
220 | | - Parameters |
221 | | - ---------- |
222 | | - x : |
223 | | - Independent variable for data observed |
224 | | - y : |
225 | | - Dependent variable for data observed |
226 | | - v : |
227 | | - Variance for measurement of y |
228 | | -
|
229 | | - Returns |
230 | | - ------- |
231 | | - dict |
232 | | - Dictionary to be unpacked or added to a document |
233 | | -
|
234 | | - """ |
235 | | - ... |
236 | | - |
237 | | - @abstractmethod |
238 | | - def get_ask_documents(self, targets: Sequence[ArrayLike]) -> Sequence[Dict[str, ArrayLike]]: |
239 | | - """ |
240 | | - Ask the agent for a new batch of points to measure. |
241 | | -
|
242 | | - Parameters |
243 | | - ---------- |
244 | | - targets : List[Tuple] |
245 | | - The new target positions to be measured received during this `ask`. |
246 | | -
|
247 | | - Returns |
248 | | - ------- |
249 | | - docs : Sequence[dict] |
250 | | - Documents of key metadata from the ask approach for each point in next_points. |
251 | | - Must be length of batch size. |
252 | | -
|
253 | | - """ |
254 | | - ... |
255 | | - |
256 | | - |
257 | | -if __name__ == '__main__': |
258 | | - # NOTE: This usage is a primitive mocking of Bluesky-Adaptive's processes |
259 | | - agent = TsuchinokoBase() |
260 | | - while True: |
261 | | - targets = agent.ask(0) |
262 | | - agent.tell(targets[0], 1) |
0 commit comments