1
+ from __future__ import annotations
2
+
1
3
import asyncio
2
4
import logging
3
5
from typing import Any , Dict , Optional
@@ -32,20 +34,21 @@ def __init__(self, config: Dict[str, Any]):
32
34
self ._pending = {}
33
35
self ._pending_join = []
34
36
35
- self .version = ''
37
+ self .version : str = ""
36
38
37
39
async def connect (self ):
38
40
api = await ZiGate .new (self ._config [CONF_DEVICE ], self )
39
41
await api .set_raw_mode ()
40
42
await api .set_time ()
41
43
version , lqi = await api .version ()
42
44
43
- hex_version = f"{ version [1 ]:x} "
44
- self .version = f"{ hex_version [0 ]} .{ hex_version [1 :]} "
45
45
self ._api = api
46
46
47
+ major , minor = version .to_bytes (2 , "big" )
48
+ self .version = f"{ major :x} .{ minor :x} "
49
+
47
50
if self .version < '3.21' :
48
- LOGGER .warning ('Old ZiGate firmware detected, you should upgrade to 3.21 or newer' )
51
+ LOGGER .error ('Old ZiGate firmware detected, you should upgrade to 3.21 or newer' )
49
52
50
53
async def disconnect (self ):
51
54
# TODO: how do you stop the network? Is it possible?
@@ -235,20 +238,6 @@ def _handle_frame_failure(self, message_tag, status):
235
238
async def send_packet (self , packet ):
236
239
LOGGER .debug ("Sending packet %r" , packet )
237
240
238
- if packet .dst .addr_mode == zigpy .types .AddrMode .IEEE :
239
- LOGGER .warning ("IEEE addressing is not supported, falling back to NWK" )
240
-
241
- try :
242
- device = self .get_device_with_address (packet .dst )
243
- except (KeyError , ValueError ):
244
- raise ValueError (f"Cannot find device with IEEE { packet .dst .address } " )
245
-
246
- packet = packet .replace (
247
- dst = zigpy .types .AddrModeAddress (
248
- addr_mode = zigpy .types .AddrMode .NWK , address = device .nwk
249
- )
250
- )
251
-
252
241
ack = (zigpy .types .TransmitOptions .ACK in packet .tx_options )
253
242
254
243
try :
@@ -265,11 +254,16 @@ async def send_packet(self, packet):
265
254
except NoResponseError :
266
255
raise zigpy .exceptions .DeliveryError ("ZiGate did not respond to command" )
267
256
257
+ self ._pending [tsn ] = asyncio .get_running_loop ().create_future ()
258
+
268
259
if status != t .Status .Success :
269
260
self ._pending .pop (tsn )
270
- raise zigpy .exceptions .DeliveryError (f"Failed to deliver packet: { status } " , status = status )
271
261
272
- self ._pending [tsn ] = asyncio .get_running_loop ().create_future ()
262
+ # Firmwares 3.1d and below fail to send packets on every request
263
+ if status == t .Status .InvalidParameter and self .version <= "3.1d" :
264
+ pass
265
+ else :
266
+ raise zigpy .exceptions .DeliveryError (f"Failed to send packet: { status } " , status = status )
273
267
274
268
# disabled because of https://github.com/fairecasoimeme/ZiGate/issues/324
275
269
# try:
@@ -300,7 +294,7 @@ def __init__(self, application, ieee, nwk):
300
294
model = 'PiZiGate'
301
295
elif c .is_zigate_din (port ):
302
296
model = 'ZiGate USB-DIN'
303
- self ._model = '{ } {}' . format ( model , application .version )
297
+ self ._model = f' { model } { application .version } '
304
298
305
299
@property
306
300
def manufacturer (self ):
0 commit comments