Skip to content

Moving to OKEx V5 WebSocket API #685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ The following cryptocurrency exchanges are supported:
| Livecoin | x | x | |
| NDAX | x | x | T R |
| OKCoin | x | x | R B |
| OKEx | x | x | R B |
| OKEx | x | x | R B O |
| Poloniex | x | x | T R B |
| YoBit | x | x | |
| ZB.com | wip | | R |
Expand Down
283 changes: 263 additions & 20 deletions src/ExchangeSharp/API/Exchanges/OKGroup/ExchangeOKExAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ The above copyright notice and this permission notice shall be included in all c
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using ExchangeSharp.OKGroup;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
Expand All @@ -28,7 +30,7 @@ public sealed partial class ExchangeOKExAPI : OKGroupCommon
public override string BaseUrl { get; set; } = "https://www.okex.com/api/v1";
public override string BaseUrlV2 { get; set; } = "https://www.okex.com/v2/spot";
public override string BaseUrlV3 { get; set; } = "https://www.okex.com/api";
public override string BaseUrlWebSocket { get; set; } = "wss://real.okex.com:8443/ws/v3";
public override string BaseUrlWebSocket { get; set; } = "wss://ws.okex.com:8443/ws/v5";
public string BaseUrlV5 { get; set; } = "https://www.okex.com/api/v5";
protected override bool IsFuturesAndSwapEnabled { get; } = true;

Expand Down Expand Up @@ -317,6 +319,7 @@ protected override async Task<ExchangeOrderResult> OnPlaceOrderAsync(ExchangeOrd
{
payload["clOrdId"] = order.ClientOrderId;
}

payload["side"] = order.IsBuy ? "buy" : "sell";
payload["posSide"] = "net";
payload["ordType"] = order.OrderType switch
Expand All @@ -330,7 +333,8 @@ protected override async Task<ExchangeOrderResult> OnPlaceOrderAsync(ExchangeOrd
payload["sz"] = order.Amount.ToStringInvariant();
if (order.OrderType != OrderType.Market)
{
if (!order.Price.HasValue) throw new ArgumentNullException(nameof(order.Price), "Okex place order request requires price");
if (!order.Price.HasValue)
throw new ArgumentNullException(nameof(order.Price), "Okex place order request requires price");
payload["px"] = order.Price.ToStringInvariant();
}

Expand Down Expand Up @@ -379,30 +383,269 @@ protected override async Task ProcessRequestAsync(IHttpWebRequest request, Dicti
}
}

protected override async Task<IWebSocket> OnGetTickersWebSocketAsync(
Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback,
params string[] symbols)
{
return await ConnectWebSocketOkexAsync(
async (socket) => { await AddMarketSymbolsToChannel(socket, "tickers", symbols); },
async (socket, symbol, sArray, token) =>
{
var tickers = new List<KeyValuePair<string, ExchangeTicker>>
{
new KeyValuePair<string, ExchangeTicker>(symbol, await ParseTickerV5Async(token, symbol))
};
callback(tickers);
});
}

protected override async Task<IWebSocket> OnGetTradesWebSocketAsync(
Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
return await ConnectWebSocketOkexAsync(
async (_socket) => { await AddMarketSymbolsToChannel(_socket, "trades", marketSymbols); },
async (_socket, symbol, sArray, token) =>
{
var trade = token.ParseTrade("sz", "px", "side", "ts", TimestampType.UnixMilliseconds, "tradeId");
await callback(new KeyValuePair<string, ExchangeTrade>(symbol, trade));
});
}

protected override async Task<IWebSocket> OnGetDeltaOrderBookWebSocketAsync(Action<ExchangeOrderBook> callback,
int maxCount = 20, params string[] marketSymbols)
{
return await ConnectWebSocketOkexAsync(
async (_socket) =>
{
marketSymbols = await AddMarketSymbolsToChannel(_socket, "books-l2-tbt", marketSymbols);
}, (_socket, symbol, sArray, token) =>
{
ExchangeOrderBook book = token.ParseOrderBookFromJTokenArrays(maxCount: maxCount);
book.MarketSymbol = symbol;
callback(book);
return Task.CompletedTask;
});
}

protected override async Task<IWebSocket> OnGetOrderDetailsWebSocketAsync(Action<ExchangeOrderResult> callback)
{
return await ConnectPrivateWebSocketOkexAsync(async (_socket) =>
{
await WebsocketLogin(_socket);
await SubscribeForOrderChannel(_socket, "orders");
}, (_socket, symbol, sArray, token) =>
{
callback(ParseOrder(token));
return Task.CompletedTask;
});
}

protected override Task<IWebSocket> ConnectWebSocketOkexAsync(Func<IWebSocket, Task> connected,
Func<IWebSocket, string, string[], JToken, Task> callback, int symbolArrayIndex = 3)
{
Timer pingTimer = null;
return ConnectPublicWebSocketAsync(url: "/public", messageCallback: async (_socket, msg) =>
{
var msgString = msg.ToStringFromUTF8();
if (msgString == "pong")
{
// received reply to our ping
return;
}

JToken token = JToken.Parse(msgString);
var eventProperty = token["event"]?.ToStringInvariant();
if (eventProperty != null)
{
switch (eventProperty)
{
case "error":
Logger.Info("Websocket unable to connect: " + token["msg"]?.ToStringInvariant());
return;
case "subscribe" when token["arg"]["channel"] != null:
{
// subscription successful
pingTimer ??= new Timer(callback: async s => await _socket.SendMessageAsync("ping"),
null, 0, 15000);
return;
}
default:
return;
}
}

var marketSymbol = string.Empty;
if (token["arg"] != null)
{
marketSymbol = token["arg"]["instId"].ToStringInvariant();
}

if (token["data"] != null)
{
var data = token["data"];
foreach (var t in data)
{
await callback(_socket, marketSymbol, null, t);
}
}
}, async (_socket) => await connected(_socket)
, s =>
{
pingTimer?.Dispose();
pingTimer = null;
return Task.CompletedTask;
});
}

protected override Task<IWebSocket> ConnectPrivateWebSocketOkexAsync(Func<IWebSocket, Task> connected,
Func<IWebSocket, string, string[], JToken, Task> callback, int symbolArrayIndex = 3)
{
Timer pingTimer = null;
return ConnectPublicWebSocketAsync(url: "/private", messageCallback: async (_socket, msg) =>
{
var msgString = msg.ToStringFromUTF8();
Logger.Debug(msgString);
if (msgString == "pong")
{
// received reply to our ping
return;
}

JToken token = JToken.Parse(msgString);
var eventProperty = token["event"]?.ToStringInvariant();
if (eventProperty != null)
{
switch (eventProperty)
{
case "error":
Logger.Info("Websocket unable to connect: " + token["msg"]?.ToStringInvariant());
return;
case "subscribe" when token["arg"]["channel"] != null:
{
// subscription successful
pingTimer ??= new Timer(callback: async s => await _socket.SendMessageAsync("ping"),
null, 0, 15000);
return;
}
default:
return;
}
}

var marketSymbol = string.Empty;
if (token["arg"] != null)
{
marketSymbol = token["arg"]["instId"].ToStringInvariant();
}

if (token["data"] != null)
{
var data = token["data"];
foreach (var t in data)
{
await callback(_socket, marketSymbol, null, t);
}
}
}, async (_socket) => await connected(_socket)
, s =>
{
pingTimer?.Dispose();
pingTimer = null;
return Task.CompletedTask;
});
}

protected override async Task<string[]> AddMarketSymbolsToChannel(IWebSocket socket, string channelFormat,
string[] marketSymbols)
{
if (marketSymbols.Length == 0)
{
marketSymbols = (await GetMarketSymbolsAsync()).ToArray();
}

await SendMessageAsync(marketSymbols);

async Task SendMessageAsync(IEnumerable<string> symbolsToSend)
{
var args = symbolsToSend
.Select(s => new { channel = channelFormat, instId = s })
.ToArray();
await socket.SendMessageAsync(new { op = "subscribe", args });
}

return marketSymbols;
}

private async Task WebsocketLogin(IWebSocket socket)
{
var timestamp = (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalSeconds;
var auth = new
{
apiKey = PublicApiKey?.ToUnsecureString(),
passphrase = Passphrase?.ToUnsecureString(),
timestamp,
sign = CryptoUtility.SHA256SignBase64($"{timestamp}GET/users/self/verify",
PrivateApiKey?.ToUnsecureBytesUTF8())
};
var args = new List<dynamic> { auth };
var request = new { op = "login", args };
await socket.SendMessageAsync(request);
}

private async Task SubscribeForOrderChannel(IWebSocket socket, string channelFormat)
{
var marketSymbols = (await GetMarketSymbolsAsync()).ToArray();
await SendMessageAsync(marketSymbols);

async Task SendMessageAsync(IEnumerable<string> symbolsToSend)
{
var args = symbolsToSend
.Select(s => new
{
channel = channelFormat, instId = s, uly = GetUly(s),
instType = GetInstrumentType(s).ToUpperInvariant()
})
.ToArray();
await socket.SendMessageAsync(new { op = "subscribe", args });
}
}

private static string GetUly(string marketSymbol)
{
var symbolSplit = marketSymbol.Split('-');
return symbolSplit.Length == 3 ? $"{symbolSplit[0]}-{symbolSplit[1]}" : marketSymbol;
}

private async Task<JToken> GetBalance()
{
return await MakeJsonRequestAsync<JToken>("/account/balance", BaseUrlV5, await GetNoncePayloadAsync());
}

private IEnumerable<ExchangeOrderResult> ParseOrders(JToken token)
=> token.Select(x =>
new ExchangeOrderResult()
private static ExchangeOrderResult ParseOrder(JToken token) =>
new ExchangeOrderResult()
{
OrderId = token["ordId"].Value<string>(),
OrderDate = DateTimeOffset.FromUnixTimeMilliseconds(token["cTime"].Value<long>()).DateTime,
Result = token["state"].Value<string>() switch
{
OrderId = x["ordId"].Value<string>(),
OrderDate = DateTimeOffset.FromUnixTimeMilliseconds(x["cTime"].Value<long>()).DateTime,
Result = x["state"].Value<string>() == "live"
? ExchangeAPIOrderResult.Open
: ExchangeAPIOrderResult.FilledPartially,
IsBuy = x["side"].Value<string>() == "buy",
IsAmountFilledReversed = false,
Amount = x["sz"].Value<decimal>(),
AmountFilled = x["accFillSz"].Value<decimal>(),
AveragePrice = x["avgPx"].Value<string>() == string.Empty ? default : x["avgPx"].Value<decimal>(),
Price = x["px"].Value<decimal>(),
ClientOrderId = x["clOrdId"].Value<string>(),
FeesCurrency = x["feeCcy"].Value<string>(),
MarketSymbol = x["instId"].Value<string>()
});
"canceled" => ExchangeAPIOrderResult.Canceled,
"live" => ExchangeAPIOrderResult.Open,
"partially_filled" => ExchangeAPIOrderResult.FilledPartially,
"filled" => ExchangeAPIOrderResult.Filled,
_ => ExchangeAPIOrderResult.Unknown
},
IsBuy = token["side"].Value<string>() == "buy",
IsAmountFilledReversed = false,
Amount = token["sz"].Value<decimal>(),
AmountFilled = token["accFillSz"].Value<decimal>(),
AveragePrice = token["avgPx"].Value<string>() == string.Empty ? default : token["avgPx"].Value<decimal>(),
Price = token["px"].Value<decimal>(),
ClientOrderId = token["clOrdId"].Value<string>(),
FeesCurrency = token["feeCcy"].Value<string>(),
MarketSymbol = token["instId"].Value<string>()
};

private static IEnumerable<ExchangeOrderResult> ParseOrders(JToken token) => token.Select(ParseOrder);

private async Task<ExchangeTicker> ParseTickerV5Async(JToken t, string symbol)
{
Expand Down
Loading