Skip to content

Merge WebSocketAsync into master #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Sep 13, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,5 @@ __pycache__/
# Trader binary and log files
*.bin
*.log*
ExchangeSharpConsole/Properties/launchSettings.json
launchSettings.json
**/PublishProfiles/*
4 changes: 2 additions & 2 deletions ExchangeSharp/API/Common/BaseAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ public async Task<T> MakeJsonRequestAsync<T>(string url, string baseUrl = null,
/// <param name="messageCallback">Callback for messages</param>
/// <param name="connectCallback">Connect callback</param>
/// <returns>Web socket - dispose of the wrapper to shutdown the socket</returns>
public IWebSocket ConnectWebSocket
public Task<IWebSocket> ConnectWebSocket
(
string url,
Func<IWebSocket, byte[], Task> messageCallback,
Expand Down Expand Up @@ -524,7 +524,7 @@ public IWebSocket ConnectWebSocket
wrapper.Disconnected += disconnectCallback;
}
wrapper.Start();
return wrapper;
return Task.FromResult<IWebSocket>(wrapper);
}

/// <summary>
Expand Down
8 changes: 4 additions & 4 deletions ExchangeSharp/API/Exchanges/Binance/ExchangeBinanceAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ protected override async Task<IEnumerable<KeyValuePair<string, ExchangeTicker>>>
return tickers;
}

protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] symbols)
protected override Task<IWebSocket> OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] symbols)
{
return ConnectWebSocket("/stream?streams=!ticker@arr", (_socket, msg) =>
{
Expand All @@ -261,7 +261,7 @@ protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<K
});
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
/*
{
Expand Down Expand Up @@ -297,7 +297,7 @@ protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, Exc
});
}

protected override IWebSocket OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
Expand Down Expand Up @@ -1060,7 +1060,7 @@ protected override async Task<IEnumerable<ExchangeTransaction>> OnGetDepositHist
return transactions;
}

protected override IWebSocket OnUserDataWebSocket(Action<object> callback, string listenKey)
protected override Task<IWebSocket> OnUserDataWebSocket(Action<object> callback, string listenKey)
{
return ConnectWebSocket($"/ws/{listenKey}", (_socket, msg) =>
{
Expand Down
4 changes: 2 additions & 2 deletions ExchangeSharp/API/Exchanges/BitMEX/ExchangeBitMEXAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ protected override async Task<IEnumerable<ExchangeMarket>> OnGetMarketSymbolsMet
return markets;
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
/*
{"table":"trade","action":"partial","keys":[],
Expand Down Expand Up @@ -270,7 +270,7 @@ protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, Exc
});
}

protected override IWebSocket OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
{
/*
{"info":"Welcome to the BitMEX Realtime API.","version":"2018-06-29T18:05:14.000Z","timestamp":"2018-07-05T14:22:26.267Z","docs":"https://www.bitmex.com/app/wsAPI","limit":{"remaining":39}}
Expand Down
6 changes: 3 additions & 3 deletions ExchangeSharp/API/Exchanges/Bitfinex/ExchangeBitfinexAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ protected override async Task<IEnumerable<KeyValuePair<string, ExchangeTicker>>>
return tickers;
}

protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] marketSymbols)
{
Dictionary<int, string> channelIdToSymbol = new Dictionary<int, string>();
return ConnectWebSocket(string.Empty, (_socket, msg) =>
Expand Down Expand Up @@ -251,7 +251,7 @@ protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<K
});
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
Dictionary<int, string> channelIdToSymbol = new Dictionary<int, string>();
if (marketSymbols == null || marketSymbols.Length == 0)
Expand Down Expand Up @@ -534,7 +534,7 @@ protected override async Task<IEnumerable<ExchangeOrderResult>> OnGetCompletedOr
return await GetOrderDetailsInternalV1(new string[] { marketSymbol }, afterDate);
}

protected override IWebSocket OnGetCompletedOrderDetailsWebSocket(Action<ExchangeOrderResult> callback)
protected override Task<IWebSocket> OnGetCompletedOrderDetailsWebSocket(Action<ExchangeOrderResult> callback)
{
return ConnectWebSocket(string.Empty, (_socket, msg) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ protected override async Task<ExchangeWithdrawalResponse> OnWithdrawAsync(Exchan
};
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
Expand Down
36 changes: 20 additions & 16 deletions ExchangeSharp/API/Exchanges/Bittrex/ExchangeBittrexAPI_WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,12 @@ public BittrexWebSocketManager() : base("https://socket.bittrex.com/signalr", "c
/// Subscribe to all market summaries
/// </summary>
/// <param name="callback">Callback</param>
/// <param name="marketSymbols">Symbols</param>
/// <returns>IDisposable to close the socket</returns>
public IWebSocket SubscribeToSummaryDeltas(Action<string> callback)
public async Task<IWebSocket> SubscribeToSummaryDeltas(Func<string, Task> callback, params string[] marketSymbols)
{
SignalrManager.SignalrSocketConnection conn = new SignalrManager.SignalrSocketConnection(this);
Task.Run(async () => await conn.OpenAsync("uS", (s) =>
{
callback(s);
return Task.CompletedTask;
}));
await conn.OpenAsync("uS", callback);
return conn;
}

Expand All @@ -68,27 +65,29 @@ public IWebSocket SubscribeToSummaryDeltas(Action<string> callback)
/// <param name="callback">Callback</param>
/// <param name="marketSymbols">The market symbols to subscribe to</param>
/// <returns>IDisposable to close the socket</returns>
public IWebSocket SubscribeToExchangeDeltas(Func<string, Task> callback, params string[] marketSymbols)
public async Task<IWebSocket> SubscribeToExchangeDeltas(Func<string, Task> callback, params string[] marketSymbols)
{
SignalrManager.SignalrSocketConnection conn = new SignalrManager.SignalrSocketConnection(this);
List<object[]> paramList = new List<object[]>();
foreach (string marketSymbol in marketSymbols)
{
paramList.Add(new object[] { marketSymbol });
}
Task.Run(async () => await conn.OpenAsync("uE", async (s) =>
{
await callback(s);
}, 0, paramList.ToArray()));
await conn.OpenAsync("uE", callback, 0, paramList.ToArray());
return conn;
}
}

private BittrexWebSocketManager webSocket;

protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] symbols)
protected override Task<IWebSocket> OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] marketSymbols)
{
void innerCallback(string json)
HashSet<string> filter = new HashSet<string>();
foreach (string marketSymbol in marketSymbols)
{
filter.Add(marketSymbol);
}
Task innerCallback(string json)
{
#region sample json
/*
Expand Down Expand Up @@ -122,6 +121,10 @@ void innerCallback(string json)
foreach (JToken ticker in token)
{
string marketName = ticker["M"].ToStringInvariant();
if (filter.Count != 0 && !filter.Contains(marketName))
{
continue;
}
var (baseCurrency, quoteCurrency) = ExchangeMarketSymbolToCurrencies(marketName);
decimal last = ticker["l"].ConvertInvariant<decimal>();
decimal ask = ticker["A"].ConvertInvariant<decimal>();
Expand All @@ -147,11 +150,12 @@ void innerCallback(string json)
freshTickers[marketName] = t;
}
callback(freshTickers);
return Task.CompletedTask;
}
return new BittrexWebSocketManager().SubscribeToSummaryDeltas(innerCallback);
return new BittrexWebSocketManager().SubscribeToSummaryDeltas(innerCallback, marketSymbols);
}

protected override IWebSocket OnGetDeltaOrderBookWebSocket
protected override Task<IWebSocket> OnGetDeltaOrderBookWebSocket
(
Action<ExchangeOrderBook> callback,
int maxCount = 20,
Expand Down Expand Up @@ -222,7 +226,7 @@ Task innerCallback(string json)
return new BittrexWebSocketManager().SubscribeToExchangeDeltas(innerCallback, marketSymbols);
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
Expand Down
6 changes: 3 additions & 3 deletions ExchangeSharp/API/Exchanges/Coinbase/ExchangeCoinbaseAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ protected override async Task<IEnumerable<KeyValuePair<string, ExchangeTicker>>>
}
}

protected override IWebSocket OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
{
return ConnectWebSocket(string.Empty, (_socket, msg) =>
{
Expand Down Expand Up @@ -362,7 +362,7 @@ protected override IWebSocket OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderB
});
}

protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] marketSymbols)
{
return ConnectWebSocket("/", (_socket, msg) =>
{
Expand Down Expand Up @@ -393,7 +393,7 @@ protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<K
});
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
Expand Down
4 changes: 2 additions & 2 deletions ExchangeSharp/API/Exchanges/Digifinex/ExchangeDigifinexAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ protected override async Task OnCancelOrderAsync(string orderId, string marketSy

#region WebSocket APIs

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
if (callback == null)
return null;
Expand Down Expand Up @@ -454,7 +454,7 @@ await callback.Invoke(new KeyValuePair<string, ExchangeTrade>(
});
}

protected override IWebSocket OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
{
if (callback == null)
{
Expand Down
2 changes: 1 addition & 1 deletion ExchangeSharp/API/Exchanges/Gemini/ExchangeGeminiAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ protected override async Task OnCancelOrderAsync(string orderId, string marketSy
await MakeJsonRequestAsync<JToken>("/order/cancel", null, new Dictionary<string, object>{ { "nonce", nonce }, { "order_id", orderId } });
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
//{
// "type": "l2_updates",
Expand Down
2 changes: 1 addition & 1 deletion ExchangeSharp/API/Exchanges/Hitbtc/ExchangeHitbtcAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ protected override async Task<ExchangeWithdrawalResponse> OnWithdrawAsync(Exchan

// working on it. Hitbtc has extensive support for sockets, including trading

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
Expand Down
4 changes: 2 additions & 2 deletions ExchangeSharp/API/Exchanges/Huobi/ExchangeHuobiAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ protected async override Task<IEnumerable<KeyValuePair<string, ExchangeTicker>>>
return tickers;
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
return ConnectWebSocket(string.Empty, async (_socket, msg) =>
{
Expand Down Expand Up @@ -287,7 +287,7 @@ protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, Exc
});
}

protected override IWebSocket OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
{
return ConnectWebSocket(string.Empty, async (_socket, msg) =>
{
Expand Down
2 changes: 1 addition & 1 deletion ExchangeSharp/API/Exchanges/Kraken/ExchangeKrakenAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ protected override async Task OnCancelOrderAsync(string orderId, string marketSy
await MakeJsonRequestAsync<JToken>("/0/private/CancelOrder", null, payload);
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
Expand Down
4 changes: 2 additions & 2 deletions ExchangeSharp/API/Exchanges/KuCoin/ExchangeKuCoinAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ protected override async Task<ExchangeWithdrawalResponse> OnWithdrawAsync(Exchan

#region Websockets

protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTickersWebSocket(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback, params string[] marketSymbols)
{
var websocketUrlToken = GetWebsocketBulletToken();
return ConnectWebSocket(
Expand Down Expand Up @@ -463,7 +463,7 @@ protected override IWebSocket OnGetTickersWebSocket(Action<IReadOnlyCollection<K
);
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
//{
// "id":"5c24c5da03aa673885cd67aa",
Expand Down
8 changes: 4 additions & 4 deletions ExchangeSharp/API/Exchanges/OKGroup/OKGroupCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ void parseData(Tuple<JToken, string> data)
return tickers;
}

protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetTradesWebSocket(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
{
/*
spot request:
Expand Down Expand Up @@ -228,7 +228,7 @@ protected override IWebSocket OnGetTradesWebSocket(Func<KeyValuePair<string, Exc
});
}

protected override IWebSocket OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
protected override Task<IWebSocket> OnGetDeltaOrderBookWebSocket(Action<ExchangeOrderBook> callback, int maxCount = 20, params string[] marketSymbols)
{
/*
request:
Expand Down Expand Up @@ -633,7 +633,7 @@ private ExchangeOrderResult ParseOrder(JToken token)
return result;
}

private IWebSocket ConnectWebSocketOkex(Func<IWebSocket, Task> connected, Func<IWebSocket, string, string[], JToken, Task> callback, int symbolArrayIndex = 3)
private Task<IWebSocket> ConnectWebSocketOkex(Func<IWebSocket, Task> connected, Func<IWebSocket, string, string[], JToken, Task> callback, int symbolArrayIndex = 3)
{
Timer pingTimer = null;
return ConnectWebSocket(url: string.Empty, messageCallback: async (_socket, msg) =>
Expand Down Expand Up @@ -683,7 +683,7 @@ private IWebSocket ConnectWebSocketOkex(Func<IWebSocket, Task> connected, Func<I
});
}

private IWebSocket ConnectPrivateWebSocketOkex(Func<IWebSocket, Task> connected, Func<IWebSocket, string, string[], JToken, Task> callback, int symbolArrayIndex = 3)
private Task<IWebSocket> ConnectPrivateWebSocketOkex(Func<IWebSocket, Task> connected, Func<IWebSocket, string, string[], JToken, Task> callback, int symbolArrayIndex = 3)
{
return ConnectWebSocketOkex(async (_socket) =>
{
Expand Down
Loading