Skip to content

Commit 82d9e60

Browse files
authored
Migration to the new Poloniex API [Public WS endpoints] (#798)
* Update GetTickersWebSocketAsync * Update GetTradesWebSocketAsync * Update GetOrderBookDepthAsync
1 parent 0ecef8e commit 82d9e60

File tree

1 file changed

+134
-202
lines changed

1 file changed

+134
-202
lines changed

src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs

Lines changed: 134 additions & 202 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ The above copyright notice and this permission notice shall be included in all c
1111
*/
1212

1313
using System.Diagnostics;
14+
using System.Threading;
15+
using Newtonsoft.Json;
1416

1517
namespace ExchangeSharp
1618
{
@@ -24,7 +26,7 @@ namespace ExchangeSharp
2426
public sealed partial class ExchangePoloniexAPI : ExchangeAPI
2527
{
2628
public override string BaseUrl { get; set; } = "https://api.poloniex.com";
27-
public override string BaseUrlWebSocket { get; set; } = "wss://api2.poloniex.com";
29+
public override string BaseUrlWebSocket { get; set; } = "wss://ws.poloniex.com/ws";
2830

2931
private ExchangePoloniexAPI()
3032
{
@@ -222,18 +224,25 @@ private static IEnumerable<ExchangeOrderResult> ParseCompletedOrderDetails(JToke
222224

223225
private async Task<ExchangeTicker> ParseTickerWebSocketAsync(string symbol, JToken token)
224226
{
225-
/*
226-
last: args[1],
227-
lowestAsk: args[2],
228-
highestBid: args[3],
229-
percentChange: args[4],
230-
baseVolume: args[5],
231-
quoteVolume: args[6],
232-
isFrozen: args[7],
233-
high24hr: args[8],
234-
low24hr: args[9]
235-
*/
236-
return await this.ParseTickerAsync(token, symbol, 2, 3, 1, 5, 6);
227+
// {
228+
// "symbol": "ETH_USDT",
229+
// "dailyChange": "0.9428",
230+
// "high": "507",
231+
// "amount": "20",
232+
// "quantity": "3",
233+
// "tradeCount": 11,
234+
// "low": "16",
235+
// "closeTime": 1634062351868,
236+
// "startTime": 1633996800000,
237+
// "close": "204",
238+
// "open": "105",
239+
// "ts": 1648052794867,
240+
// "markPrice": "205",
241+
// }
242+
243+
return await this.ParseTickerAsync(token, symbol, askKey: null, bidKey: null, lastKey: "close",
244+
baseVolumeKey: "quantity", quoteVolumeKey: "amount", timestampKey: "ts",
245+
TimestampType.UnixMilliseconds);
237246
}
238247

239248
public override string PeriodSecondsToString(int seconds)
@@ -449,210 +458,47 @@ protected override async Task<IEnumerable<KeyValuePair<string, ExchangeTicker>>>
449458

450459
protected override async Task<IWebSocket> OnGetTickersWebSocketAsync(
451460
Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback,
452-
params string[] symbols)
453-
{
454-
Dictionary<string, string> idsToSymbols = new Dictionary<string, string>();
455-
return await ConnectPublicWebSocketAsync(string.Empty, async (_socket, msg) =>
456-
{
457-
JToken token = JToken.Parse(msg.ToStringFromUTF8());
458-
if (token[0].ConvertInvariant<int>() == 1002)
461+
params string[] symbols) =>
462+
await ConnectWebsocketPublicAsync(
463+
async (socket) => { await SubscribeToChannel(socket, "ticker", symbols); },
464+
async (socket, symbol, sArray, token) =>
459465
{
460-
if (token is JArray outerArray && outerArray.Count > 2 && outerArray[2] is JArray array &&
461-
array.Count > 9 &&
462-
idsToSymbols.TryGetValue(array[0].ToStringInvariant(), out string symbol))
466+
var tickers = new List<KeyValuePair<string, ExchangeTicker>>
463467
{
464-
callback.Invoke(new List<KeyValuePair<string, ExchangeTicker>>
465-
{
466-
new KeyValuePair<string, ExchangeTicker>(symbol,
467-
await ParseTickerWebSocketAsync(symbol, array))
468-
});
469-
}
470-
}
471-
}, async (_socket) =>
472-
{
473-
var tickers = await GetTickersAsync();
474-
foreach (var ticker in tickers)
475-
{
476-
idsToSymbols[ticker.Value.Id] = ticker.Key;
477-
}
478-
479-
// subscribe to ticker channel (1002)
480-
await _socket.SendMessageAsync(new { command = "subscribe", channel = 1002 });
481-
});
482-
}
468+
new KeyValuePair<string, ExchangeTicker>(symbol,
469+
await this.ParseTickerWebSocketAsync(symbol, token))
470+
};
471+
callback(tickers);
472+
});
483473

484474
protected override async Task<IWebSocket> OnGetTradesWebSocketAsync(
485475
Func<KeyValuePair<string, ExchangeTrade>, Task> callback,
486-
params string[] marketSymbols)
487-
{
488-
Dictionary<int, string> messageIdToSymbol = new Dictionary<int, string>();
489-
Dictionary<string, int> symbolToMessageId = new Dictionary<string, int>();
490-
var symMeta = await GetMarketSymbolsMetadataAsync();
491-
foreach (var symbol in symMeta)
492-
{
493-
messageIdToSymbol.Add(int.Parse(symbol.MarketId), symbol.MarketSymbol);
494-
symbolToMessageId.Add(symbol.MarketSymbol, int.Parse(symbol.MarketId));
495-
}
496-
497-
return await ConnectPublicWebSocketAsync(string.Empty, async (_socket, msg) =>
498-
{
499-
JToken token = JToken.Parse(msg.ToStringFromUTF8());
500-
if (token.Type == JTokenType.Object && token["error"] != null)
501-
throw new APIException($"Exchange returned error: {token["error"].ToStringInvariant()}");
502-
int msgId = token[0].ConvertInvariant<int>();
503-
504-
if (msgId == 1010 || token.Count() == 2) // "[7,2]"
505-
{
506-
// this is a heartbeat message
507-
return;
508-
}
509-
510-
var seq = token[1].ConvertInvariant<long>();
511-
var dataArray = token[2];
512-
foreach (var data in dataArray)
513-
{
514-
var dataType = data[0].ToStringInvariant();
515-
if (dataType == "i")
516-
{
517-
// can also populate messageIdToSymbol from here
518-
continue;
519-
}
520-
else if (dataType == "t")
521-
{
522-
if (messageIdToSymbol.TryGetValue(msgId, out string symbol))
523-
{
524-
// 0 1 2 3 4 5 6
525-
// ["t", "<trade id>", <1 for buy 0 for sell>, "<price>", "<size>", <timestamp>, "<epoch_ms>"]
526-
ExchangeTrade trade = data.ParseTrade(amountKey: 4, priceKey: 3, typeKey: 2,
527-
timestampKey: 6,
528-
timestampType: TimestampType.UnixMilliseconds, idKey: 1, typeKeyIsBuyValue: "1");
529-
await callback(new KeyValuePair<string, ExchangeTrade>(symbol, trade));
530-
}
531-
}
532-
else if (dataType == "o")
533-
{
534-
continue;
535-
}
536-
else
537-
{
538-
continue;
539-
}
540-
}
541-
}, async (_socket) =>
542-
{
543-
IEnumerable<int> marketIDs = null;
544-
if (marketSymbols == null || marketSymbols.Length == 0)
545-
{
546-
marketIDs = messageIdToSymbol.Keys;
547-
}
548-
else
476+
params string[] marketSymbols) =>
477+
await ConnectWebsocketPublicAsync(
478+
async (socket) => { await SubscribeToChannel(socket, "trades", marketSymbols); },
479+
async (socket, symbol, sArray, token) =>
549480
{
550-
marketIDs = marketSymbols.Select(s => symbolToMessageId[s]);
551-
}
552-
553-
// subscribe to order book and trades channel for each symbol
554-
foreach (var id in marketIDs)
555-
{
556-
await _socket.SendMessageAsync(new { command = "subscribe", channel = id });
557-
}
558-
});
559-
}
481+
var trade = token.ParseTrade(amountKey: "quantity", priceKey: "price", typeKey: "takerSide",
482+
timestampKey: "ts", TimestampType.UnixMilliseconds, idKey: "id");
483+
await callback(new KeyValuePair<string, ExchangeTrade>(symbol, trade));
484+
});
560485

561486
protected override async Task<IWebSocket> OnGetDeltaOrderBookWebSocketAsync(
562487
Action<ExchangeOrderBook> callback,
563488
int maxCount = 20,
564489
params string[] marketSymbols)
565490
{
566-
Dictionary<int, Tuple<string, long>> messageIdToSymbol = new Dictionary<int, Tuple<string, long>>();
567-
return await ConnectPublicWebSocketAsync(string.Empty, (_socket, msg) =>
568-
{
569-
JToken token = JToken.Parse(msg.ToStringFromUTF8());
570-
int msgId = token[0].ConvertInvariant<int>();
571-
572-
//return if this is a heartbeat message
573-
if (msgId == 1010)
574-
{
575-
return Task.CompletedTask;
576-
}
577-
578-
var seq = token[1].ConvertInvariant<long>();
579-
var dataArray = token[2];
580-
ExchangeOrderBook book = new ExchangeOrderBook();
581-
foreach (var data in dataArray)
491+
return await ConnectWebsocketPublicAsync(
492+
async (socket) =>
582493
{
583-
var dataType = data[0].ToStringInvariant();
584-
if (dataType == "i")
585-
{
586-
var marketInfo = data[1];
587-
var market = marketInfo["currencyPair"].ToStringInvariant();
588-
messageIdToSymbol[msgId] = new Tuple<string, long>(market, 0);
589-
590-
// we are only returning the deltas, this would create a full order book which we don't want, but keeping it
591-
// here for historical reference
592-
/*
593-
foreach (JProperty jprop in marketInfo["orderBook"][0].Cast<JProperty>())
594-
{
595-
var depth = new ExchangeOrderPrice
596-
{
597-
Price = jprop.Name.ConvertInvariant<decimal>(),
598-
Amount = jprop.Value.ConvertInvariant<decimal>()
599-
};
600-
book.Asks[depth.Price] = depth;
601-
}
602-
foreach (JProperty jprop in marketInfo["orderBook"][1].Cast<JProperty>())
603-
{
604-
var depth = new ExchangeOrderPrice
605-
{
606-
Price = jprop.Name.ConvertInvariant<decimal>(),
607-
Amount = jprop.Value.ConvertInvariant<decimal>()
608-
};
609-
book.Bids[depth.Price] = depth;
610-
}
611-
*/
612-
}
613-
else if (dataType == "o")
614-
{
615-
//removes or modifies an existing item on the order books
616-
if (messageIdToSymbol.TryGetValue(msgId, out Tuple<string, long> symbol))
617-
{
618-
int type = data[1].ConvertInvariant<int>();
619-
var depth = new ExchangeOrderPrice
620-
{
621-
Price = data[2].ConvertInvariant<decimal>(),
622-
Amount = data[3].ConvertInvariant<decimal>()
623-
};
624-
var list = (type == 1 ? book.Bids : book.Asks);
625-
list[depth.Price] = depth;
626-
book.MarketSymbol = symbol.Item1;
627-
book.SequenceId = symbol.Item2 + 1;
628-
messageIdToSymbol[msgId] = new Tuple<string, long>(book.MarketSymbol, book.SequenceId);
629-
}
630-
}
631-
else
632-
{
633-
continue;
634-
}
635-
}
636-
637-
if (book != null && (book.Asks.Count != 0 || book.Bids.Count != 0))
494+
await SubscribeToOrderBookDepthChannel(socket, marketSymbols, maxCount);
495+
}, (socket, symbol, sArray, token) =>
638496
{
497+
var book = token.ParseOrderBookFromJTokenArrays();
498+
book.MarketSymbol = symbol;
639499
callback(book);
640-
}
641-
642-
return Task.CompletedTask;
643-
}, async (_socket) =>
644-
{
645-
if (marketSymbols == null || marketSymbols.Length == 0)
646-
{
647-
marketSymbols = (await GetMarketSymbolsAsync()).ToArray();
648-
}
649-
650-
// subscribe to order book and trades channel for each symbol
651-
foreach (var sym in marketSymbols)
652-
{
653-
await _socket.SendMessageAsync(new { command = "subscribe", channel = NormalizeMarketSymbol(sym) });
654-
}
655-
});
500+
return Task.CompletedTask;
501+
});
656502
}
657503

658504
protected override async Task<ExchangeOrderBook> OnGetOrderBookAsync(string marketSymbol, int maxCount = 100)
@@ -1112,6 +958,92 @@ private async Task<ExchangeDepositDetails> CreateDepositAddress(
1112958

1113959
return details;
1114960
}
961+
962+
private Task<IWebSocket> ConnectWebsocketPublicAsync(
963+
Func<IWebSocket, Task> connected,
964+
Func<IWebSocket, string, string[], JToken, Task> callback)
965+
{
966+
Timer pingTimer = null;
967+
return ConnectPublicWebSocketAsync(
968+
url: "/public",
969+
messageCallback: async (socket, msg) =>
970+
{
971+
var token = JToken.Parse(msg.ToStringFromUTF8());
972+
var eventType = token["event"]?.ToStringInvariant();
973+
if (eventType != null)
974+
{
975+
if (eventType != "error") return;
976+
Logger.Info("Websocket unable to connect: " + token["msg"]?.ToStringInvariant());
977+
return;
978+
}
979+
980+
if (token["data"] == null) return;
981+
982+
foreach (var d in token["data"])
983+
{
984+
await callback(socket, d["symbol"]?.ToStringInvariant(), null, d);
985+
}
986+
},
987+
connectCallback: async (socket) =>
988+
{
989+
await connected(socket);
990+
pingTimer ??= new Timer(
991+
callback: async s =>
992+
await socket.SendMessageAsync(
993+
JsonConvert.SerializeObject(new { Event = "ping" }, SerializerSettings)),
994+
null, 0, 15000);
995+
},
996+
disconnectCallback: socket =>
997+
{
998+
pingTimer?.Dispose();
999+
pingTimer = null;
1000+
return Task.CompletedTask;
1001+
});
1002+
}
1003+
1004+
private static async Task SubscribeToChannel(
1005+
IWebSocket socket,
1006+
string channel,
1007+
string[] marketSymbols)
1008+
{
1009+
if (marketSymbols.Length == 0)
1010+
{
1011+
marketSymbols = new[] { "all" };
1012+
}
1013+
1014+
var payload = JsonConvert.SerializeObject(new
1015+
{
1016+
Event = "subscribe",
1017+
Channel = new[] { channel },
1018+
Symbols = marketSymbols
1019+
}, SerializerSettings);
1020+
1021+
await socket.SendMessageAsync(payload);
1022+
}
1023+
1024+
private async Task SubscribeToOrderBookDepthChannel(
1025+
IWebSocket socket,
1026+
string[] marketSymbols,
1027+
int depth = 20)
1028+
{
1029+
var depthIsValid = depth == 5 || depth == 10 || depth == 20;
1030+
if (!depthIsValid)
1031+
throw new ArgumentOutOfRangeException(nameof(depth));
1032+
if (marketSymbols.Length == 0)
1033+
{
1034+
marketSymbols = (await OnGetMarketSymbolsAsync()).ToArray();
1035+
}
1036+
1037+
var payload = JsonConvert.SerializeObject(new
1038+
{
1039+
Event = "subscribe",
1040+
Channel = new[] { "book" },
1041+
Symbols = marketSymbols,
1042+
Depth = depth
1043+
}, SerializerSettings);
1044+
1045+
await socket.SendMessageAsync(payload);
1046+
}
11151047
}
11161048

11171049
public partial class ExchangeName

0 commit comments

Comments
 (0)