Skip to content

Add ExchangeKrakenAPI.OnGetCandlesWebSocketAsync implementation #600

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -456,4 +456,7 @@ launchSettings.json
**/keys.bin
dist/
data/**
!data/.gitkeep
!data/.gitkeep

## Mac specific
.DS_Store
81 changes: 64 additions & 17 deletions src/ExchangeSharp/API/Exchanges/Kraken/ExchangeKrakenAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -843,30 +843,77 @@ protected override async Task OnCancelOrderAsync(string orderId, string marketSy
await MakeJsonRequestAsync<JToken>("/0/private/CancelOrder", null, payload);
}

protected override async Task<IWebSocket> OnGetCandlesWebSocketAsync(Func<MarketCandle, Task> callbackAsync, int periodSeconds, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
marketSymbols = (await GetMarketSymbolsAsync(true)).ToArray();
}
//kraken has multiple OHLC channels named ohlc-1|5|15|30|60|240|1440|10080|21600 with interval specified in minutes
int interval = periodSeconds / 60;

return await ConnectWebSocketAsync(null, messageCallback: async (_socket, msg) =>
{
/*
https://docs.kraken.com/websockets/#message-ohlc
[0]channelID integer Channel ID of subscription -deprecated, use channelName and pair
[1]Array array
-time decimal Begin time of interval, in seconds since epoch
-etime decimal End time of interval, in seconds since epoch
-open decimal Open price of interval
-high decimal High price within interval
-low decimal Low price within interval
-close decimal Close price of interval
-vwap decimal Volume weighted average price within interval
-volume decimal Accumulated volume within interval
-count integer Number of trades within interval
[2]channelName string Channel Name of subscription
[3]pair string Asset pair
*/
if (JToken.Parse(msg.ToStringFromUTF8()) is JArray token && token[2].ToStringInvariant() == ($"ohlc-{interval}"))
{
string marketSymbol = token[3].ToStringInvariant();
//Kraken updates the candle open time to the current time, but we want it as open-time i.e. close-time - interval
token[1][0] = token[1][1].ConvertInvariant<long>() - interval * 60;
var candle = this.ParseCandle(token[1], marketSymbol, interval * 60, 2, 3, 4, 5, 0, TimestampType.UnixSeconds, 7, null, 6);
await callbackAsync(candle);
}
}, connectCallback: async (_socket) =>
{
List<string> marketSymbolList = await GetMarketSymbolList(marketSymbols);
await _socket.SendMessageAsync(new
{
@event = "subscribe",
pair = marketSymbolList,
subscription = new { name = "ohlc", interval = interval }
});
});
}

protected override async Task<IWebSocket> OnGetTickersWebSocketAsync(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> tickers, params string[] marketSymbols)
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
marketSymbols = (await GetMarketSymbolsAsync(true)).ToArray();
}
return await ConnectWebSocketAsync(null, messageCallback: async (_socket, msg) =>
{
if (JToken.Parse(msg.ToStringFromUTF8()) is JArray token)
{
var exchangeTicker = await ConvertToExchangeTickerAsync(token[3].ToString(), token[1]);
var kv = new KeyValuePair<string, ExchangeTicker>(exchangeTicker.MarketSymbol, exchangeTicker);
tickers(new List<KeyValuePair<string, ExchangeTicker>> { kv });
}
}, connectCallback: async (_socket) =>
{
List<string> marketSymbolList = await GetMarketSymbolList(marketSymbols);
await _socket.SendMessageAsync(new
{
@event = "subscribe",
pair = marketSymbolList,
subscription = new { name = "ticker" }
});
});
{
if (JToken.Parse(msg.ToStringFromUTF8()) is JArray token)
{
var exchangeTicker = await ConvertToExchangeTickerAsync(token[3].ToString(), token[1]);
var kv = new KeyValuePair<string, ExchangeTicker>(exchangeTicker.MarketSymbol, exchangeTicker);
tickers(new List<KeyValuePair<string, ExchangeTicker>> { kv });
}
}, connectCallback: async (_socket) =>
{
List<string> marketSymbolList = await GetMarketSymbolList(marketSymbols);
await _socket.SendMessageAsync(new
{
@event = "subscribe",
pair = marketSymbolList,
subscription = new { name = "ticker" }
});
});
}

protected override async Task<IWebSocket> OnGetTradesWebSocketAsync(Func<KeyValuePair<string, ExchangeTrade>, Task> callback, params string[] marketSymbols)
Expand Down
6 changes: 3 additions & 3 deletions src/ExchangeSharp/API/Exchanges/_Base/ExchangeAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ protected virtual Task<ExchangeMarginPositionResult> OnGetOpenPositionAsync(stri
throw new NotImplementedException();
protected virtual Task<ExchangeCloseMarginPositionResult> OnCloseMarginPositionAsync(string marketSymbol) =>
throw new NotImplementedException();
protected virtual Task<IWebSocket> OnGetCandlesWebSocketAsync(Func<IReadOnlyCollection<MarketCandle>, Task> callbackAsync, params string[] marketSymbols) =>
protected virtual Task<IWebSocket> OnGetCandlesWebSocketAsync(Func<MarketCandle, Task> callbackAsync, int periodSeconds, params string[] marketSymbols) =>
throw new NotImplementedException();
protected virtual Task<IWebSocket> OnGetTickersWebSocketAsync(Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> tickers, params string[] marketSymbols) =>
throw new NotImplementedException();
Expand Down Expand Up @@ -1005,10 +1005,10 @@ public virtual async Task<ExchangeCloseMarginPositionResult> CloseMarginPosition
/// <param name="callbackAsync">Callback</param>
/// <param name="marketSymbols">Market Symbols</param>
/// <returns>Web socket, call Dispose to close</returns>
public virtual Task<IWebSocket> GetCandlesWebSocketAsync(Func<IReadOnlyCollection<MarketCandle>, Task> callbackAsync, params string[] marketSymbols)
public virtual Task<IWebSocket> GetCandlesWebSocketAsync(Func<MarketCandle, Task> callbackAsync, int periodSeconds, params string[] marketSymbols)
{
callbackAsync.ThrowIfNull(nameof(callbackAsync), "Callback must not be null");
return OnGetCandlesWebSocketAsync(callbackAsync, marketSymbols);
return OnGetCandlesWebSocketAsync(callbackAsync, periodSeconds, marketSymbols);
}

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion src/ExchangeSharp/API/Exchanges/_Base/IExchangeAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,10 @@ public interface IExchangeAPI : IDisposable, IBaseAPI, IOrderBookProvider
/// Gets Candles (OHLC) websocket
/// </summary>
/// <param name="callbackAsync">Callback</param>
/// <param name="periodSeconds">Candle interval in seconds</param>
/// <param name="marketSymbols">Market Symbols</param>
/// <returns>Web socket, call Dispose to close</returns>
Task<IWebSocket> GetCandlesWebSocketAsync(Func<IReadOnlyCollection<MarketCandle>, Task> callbackAsync, params string[] marketSymbols);
Task<IWebSocket> GetCandlesWebSocketAsync(Func<MarketCandle, Task> callbackAsync, int periodSeconds, params string[] marketSymbols);

/// <summary>
/// Get all tickers via web socket
Expand Down
25 changes: 14 additions & 11 deletions src/ExchangeSharp/Traders/MovingAverageCalculator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public abstract class MovingAverageCalculatorBase<T>
protected T _previousMovingAverage;
protected T _previousExponentialMovingAverage;

public int WindowSize => _windowSize;

public T WeightingMultiplier => _weightingMultiplier;
/// <summary>
/// Current moving average
/// </summary>
Expand Down Expand Up @@ -58,6 +61,17 @@ public override string ToString()
{
return string.Format("{0}:{1}, {2}:{3}", MovingAverage, Slope, ExponentialMovingAverage, ExponentialSlope);
}

/// <summary>
/// Gets a value indicating whether enough values have been provided to fill the
/// specified window size. Values returned from NextValue may still be used prior
/// to IsMature returning true, however such values are not subject to the intended
/// smoothing effect of the moving average's window size.
/// </summary>
public bool IsMature
{
get { return _valuesIn == _windowSize; }
}
}

/// <summary>
Expand Down Expand Up @@ -126,17 +140,6 @@ public void NextValue(double nextValue)
}
}

/// <summary>
/// Gets a value indicating whether enough values have been provided to fill the
/// specified window size. Values returned from NextValue may still be used prior
/// to IsMature returning true, however such values are not subject to the intended
/// smoothing effect of the moving average's window size.
/// </summary>
public bool IsMature
{
get { return _valuesIn == _windowSize; }
}

/// <summary>
/// Clears any accumulated state and resets the calculator to its initial configuration.
/// Calling this method is the equivalent of creating a new instance.
Expand Down
6 changes: 4 additions & 2 deletions src/ExchangeSharpConsole/Options/CandlesOption.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
namespace ExchangeSharpConsole.Options
{
[Verb("candles", HelpText = "Prints all candle data from a 12 days period for the given exchange.")]
public class CandlesOption : BaseOption, IOptionPerExchange, IOptionPerMarketSymbol
public class CandlesOption : BaseOption, IOptionPerExchange, IOptionPerMarketSymbol, IOptionWithPeriod
{
public override async Task RunCommand()
{
using var api = GetExchangeInstance(ExchangeName);

var candles = await api.GetCandlesAsync(
MarketSymbol,
1800,
Period,
//TODO: Add interfaces for start and end date
CryptoUtility.UtcNow.AddDays(-12),
CryptoUtility.UtcNow
Expand All @@ -32,5 +32,7 @@ public override async Task RunCommand()
public string ExchangeName { get; set; }

public string MarketSymbol { get; set; }

public int Period { get; set; }
}
}
11 changes: 11 additions & 0 deletions src/ExchangeSharpConsole/Options/Interfaces/IOptionWithPeriod.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using CommandLine;

namespace ExchangeSharpConsole.Options.Interfaces
{
public interface IOptionWithPeriod
{
[Option('p', "period", Default = 1800,
HelpText = "Period in seconds.")]
int Period { get; set; }
}
}
41 changes: 41 additions & 0 deletions src/ExchangeSharpConsole/Options/WebSocketsCandesOption.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CommandLine;
using ExchangeSharp;
using ExchangeSharpConsole.Options.Interfaces;

namespace ExchangeSharpConsole.Options
{
[Verb("ws-candles", HelpText =
"Connects to the given exchange websocket and keeps printing the candles from that exchange." +
"If market symbol is not set then uses all.")]
public class WebSocketsCandlesOption : BaseOption, IOptionPerExchange, IOptionWithMultipleMarketSymbol, IOptionWithPeriod
{
public override async Task RunCommand()
{
async Task<IWebSocket> GetWebSocket(IExchangeAPI api)
{
var symbols = await ValidateMarketSymbolsAsync(api, MarketSymbols.ToArray(), true);

return await api.GetCandlesWebSocketAsync(candle =>
{
Console.WriteLine($"Market {candle.Name,8}: {candle}");
return Task.CompletedTask;
},
Period,
symbols
);
}

await RunWebSocket(ExchangeName, GetWebSocket);
}

public string ExchangeName { get; set; }

public IEnumerable<string> MarketSymbols { get; set; }

public int Period { get; set; }
}
}
3 changes: 2 additions & 1 deletion src/ExchangeSharpConsole/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public partial class Program
typeof(TradeHistoryOption),
typeof(WebSocketsOrderbookOption),
typeof(WebSocketsTickersOption),
typeof(WebSocketsTradesOption)
typeof(WebSocketsTradesOption),
typeof(WebSocketsCandlesOption)
};

public Program()
Expand Down