Skip to content

Commit 4fc6ea9

Browse files
committed
Code cleanup, added custom HttpMessageHandler, fixed several issues with pubsub.
1 parent dd303dc commit 4fc6ea9

File tree

5 files changed

+130
-68
lines changed

5 files changed

+130
-68
lines changed

src/CoreApi/PubSubApi.cs

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Text;
99
using System.Threading;
1010
using System.Threading.Tasks;
11+
using Multiformats.Base;
1112

1213
namespace Ipfs.Http
1314
{
@@ -21,7 +22,7 @@ internal PubSubApi(IpfsClient ipfs)
2122
this.ipfs = ipfs;
2223
}
2324

24-
public async Task<IEnumerable<string>> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken))
25+
public async Task<IEnumerable<string>> SubscribedTopicsAsync(CancellationToken cancel = default)
2526
{
2627
var json = await ipfs.DoCommandAsync("pubsub/ls", cancel);
2728
var result = JObject.Parse(json);
@@ -30,68 +31,71 @@ internal PubSubApi(IpfsClient ipfs)
3031
return strings.Select(s => (string)s);
3132
}
3233

33-
public async Task<IEnumerable<Peer>> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken))
34+
public async Task<IEnumerable<Peer>> PeersAsync(string topic = null, CancellationToken cancel = default)
3435
{
3536
var json = await ipfs.DoCommandAsync("pubsub/peers", cancel, topic);
3637
var result = JObject.Parse(json);
3738
var strings = result["Strings"] as JArray;
38-
if (strings == null) return new Peer[0];
39+
40+
if (strings == null)
41+
return Array.Empty<Peer>();
42+
3943
return strings.Select(s => new Peer { Id = (string)s });
4044
}
4145

42-
public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken))
46+
public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default)
4347
{
4448
var url = new StringBuilder();
4549
url.Append("/api/v0/pubsub/pub");
46-
url.Append("?arg=");
47-
url.Append(System.Net.WebUtility.UrlEncode(topic));
48-
url.Append("&arg=");
49-
var data = Encoding.ASCII.GetString(System.Net.WebUtility.UrlEncodeToBytes(message, 0, message.Length));
50-
url.Append(data);
51-
return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), cancel);
50+
url.Append("?arg=u");
51+
url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic)));
52+
53+
return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel);
5254
}
5355

54-
public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken))
56+
public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default)
5557
{
56-
using (MemoryStream ms = new MemoryStream())
57-
{
58-
message.CopyTo(ms);
59-
return PublishAsync(topic, ms.ToArray(), cancel);
60-
}
58+
var url = new StringBuilder();
59+
url.Append("/api/v0/pubsub/pub");
60+
url.Append("?arg=u");
61+
url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic)));
62+
63+
return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel);
6164
}
6265

63-
public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken))
66+
public async Task PublishAsync(string topic, string message, CancellationToken cancel = default)
6467
{
65-
var _ = await ipfs.DoCommandAsync("pubsub/pub", cancel, topic, "arg=" + message);
66-
return;
68+
var url = new StringBuilder();
69+
url.Append("/api/v0/pubsub/pub");
70+
url.Append("?arg=u");
71+
url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic)));
72+
73+
await ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel);
6774
}
6875

6976
public async Task SubscribeAsync(string topic, Action<IPublishedMessage> handler, CancellationToken cancellationToken)
7077
{
71-
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, topic);
78+
var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, $"u{Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))}");
7279
var sr = new StreamReader(messageStream);
7380

74-
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
75-
Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken));
76-
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
77-
78-
return;
81+
_ = Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken), cancellationToken);
7982
}
8083

8184
void ProcessMessages(string topic, Action<PublishedMessage> handler, StreamReader sr, CancellationToken ct)
8285
{
83-
log.DebugFormat("Start listening for '{0}' messages", topic);
86+
log.DebugFormat($"Start listening for '{topic}' messages");
8487

8588
// .Net needs a ReadLine(CancellationToken)
8689
// As a work-around, we register a function to close the stream
87-
ct.Register(() => sr.Dispose());
90+
ct.Register(sr.Dispose);
8891
try
8992
{
9093
while (!sr.EndOfStream && !ct.IsCancellationRequested)
9194
{
9295
var json = sr.ReadLine();
9396
if (json == null)
9497
break;
98+
9599
if (log.IsDebugEnabled)
96100
log.DebugFormat("PubSub message {0}", json);
97101

src/IpfsClient.cs

Lines changed: 80 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ namespace Ipfs.Http
2323
/// <seealso href="https://ipfs.io/docs/api/">IPFS API</seealso>
2424
/// <seealso href="https://ipfs.io/docs/commands/">IPFS commands</seealso>
2525
/// <remarks>
26-
/// <b>IpfsClient</b> is thread safe, only one instance is required
27-
/// by the application.
26+
/// <b>IpfsClient</b> is thread safe, only one instance is required by the application.
2827
/// </remarks>
2928
public partial class IpfsClient : ICoreApi
3029
{
@@ -61,6 +60,7 @@ public IpfsClient()
6160

6261
var assembly = typeof(IpfsClient).GetTypeInfo().Assembly;
6362
var version = assembly.GetName().Version;
63+
6464
UserAgent = string.Format("{0}/{1}.{2}.{3}", assembly.GetName().Name, version.Major, version.Minor, version.Revision);
6565
TrustedPeers = new TrustedPeerCollection(this);
6666

@@ -115,7 +115,7 @@ public IpfsClient(string host)
115115
/// The list of peers that are initially trusted by IPFS.
116116
/// </summary>
117117
/// <remarks>
118-
/// This is equilivent to <c>ipfs bootstrap list</c>.
118+
/// This is equivalent to <c>ipfs bootstrap list</c>.
119119
/// </remarks>
120120
public TrustedPeerCollection TrustedPeers { get; private set; }
121121

@@ -174,6 +174,7 @@ Uri BuildCommand(string command, string arg = null, params string[] options)
174174
{
175175
var url = "/api/v0/" + command;
176176
var q = new StringBuilder();
177+
177178
if (arg != null)
178179
{
179180
q.Append("&arg=");
@@ -223,23 +224,29 @@ HttpClient Api()
223224
{
224225
if (api == null)
225226
{
226-
var handler = new HttpClientHandler();
227-
if (handler.SupportsAutomaticDecompression)
227+
if (HttpMessageHandler is HttpClientHandler handler && handler.SupportsAutomaticDecompression)
228228
{
229229
handler.AutomaticDecompression = DecompressionMethods.GZip
230230
| DecompressionMethods.Deflate;
231231
}
232-
api = new HttpClient(handler)
232+
233+
api = new HttpClient(HttpMessageHandler)
233234
{
234-
Timeout = System.Threading.Timeout.InfiniteTimeSpan
235+
Timeout = Timeout.InfiniteTimeSpan
235236
};
237+
236238
api.DefaultRequestHeaders.Add("User-Agent", UserAgent);
237239
}
238240
}
239241
}
240242
return api;
241243
}
242244

245+
/// <summary>
246+
/// The message handler to use for communicating over HTTP.
247+
/// </summary>
248+
public HttpMessageHandler HttpMessageHandler { get; set; } = new HttpClientHandler();
249+
243250
/// <summary>
244251
/// Perform an <see href="https://ipfs.io/docs/api/">IPFS API command</see> returning a string.
245252
/// </summary>
@@ -265,29 +272,49 @@ HttpClient Api()
265272
public async Task<string> DoCommandAsync(string command, CancellationToken cancel, string arg = null, params string[] options)
266273
{
267274
var url = BuildCommand(command, arg, options);
275+
268276
if (log.IsDebugEnabled)
269-
log.Debug("POST " + url.ToString());
277+
log.Debug("POST " + url);
278+
270279
using (var response = await Api().PostAsync(url, null, cancel))
271280
{
272281
await ThrowOnErrorAsync(response);
273282
var body = await response.Content.ReadAsStringAsync();
283+
274284
if (log.IsDebugEnabled)
275285
log.Debug("RSP " + body);
286+
276287
return body;
277288
}
278289
}
279290

280-
internal async Task DoCommandAsync(Uri url, CancellationToken cancel)
291+
internal Task DoCommandAsync(Uri url, byte[] bytes, CancellationToken cancel)
292+
{
293+
return DoCommandAsync(url, new ByteArrayContent(bytes), cancel);
294+
}
295+
296+
internal Task DoCommandAsync(Uri url, Stream stream, CancellationToken cancel)
297+
{
298+
return DoCommandAsync(url, new StreamContent(stream), cancel);
299+
}
300+
301+
internal Task DoCommandAsync(Uri url, string str, CancellationToken cancel)
302+
{
303+
return DoCommandAsync(url, new StringContent(str), cancel);
304+
}
305+
306+
internal async Task DoCommandAsync(Uri url, HttpContent content, CancellationToken cancel)
281307
{
282308
if (log.IsDebugEnabled)
283-
log.Debug("POST " + url.ToString());
284-
using (var response = await Api().PostAsync(url, null, cancel))
309+
log.Debug("POST " + url);
310+
311+
using (var response = await Api().PostAsync(url, new MultipartFormDataContent { { content, "\"file\"" } }, cancel))
285312
{
286313
await ThrowOnErrorAsync(response);
287314
var body = await response.Content.ReadAsStringAsync();
315+
288316
if (log.IsDebugEnabled)
289317
log.Debug("RSP " + body);
290-
return;
291318
}
292319
}
293320

@@ -353,12 +380,15 @@ public async Task<T> DoCommandAsync<T>(string command, CancellationToken cancel,
353380
public async Task<Stream> PostDownloadAsync(string command, CancellationToken cancel, string arg = null, params string[] options)
354381
{
355382
var url = BuildCommand(command, arg, options);
383+
356384
if (log.IsDebugEnabled)
357-
log.Debug("POST " + url.ToString());
358-
var request = new HttpRequestMessage(HttpMethod.Post, url);
385+
log.Debug("POST " + url);
359386

387+
var request = new HttpRequestMessage(HttpMethod.Post, url);
360388
var response = await Api().SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel);
389+
361390
await ThrowOnErrorAsync(response);
391+
362392
return await response.Content.ReadAsStreamAsync();
363393
}
364394

@@ -388,10 +418,13 @@ public async Task<Stream> PostDownloadAsync(string command, CancellationToken ca
388418
public async Task<Stream> DownloadAsync(string command, CancellationToken cancel, string arg = null, params string[] options)
389419
{
390420
var url = BuildCommand(command, arg, options);
421+
391422
if (log.IsDebugEnabled)
392-
log.Debug("GET " + url.ToString());
423+
log.Debug("GET " + url);
424+
393425
var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel);
394426
await ThrowOnErrorAsync(response);
427+
395428
return await response.Content.ReadAsStreamAsync();
396429
}
397430

@@ -421,10 +454,13 @@ public async Task<Stream> DownloadAsync(string command, CancellationToken cancel
421454
public async Task<byte[]> DownloadBytesAsync(string command, CancellationToken cancel, string arg = null, params string[] options)
422455
{
423456
var url = BuildCommand(command, arg, options);
457+
424458
if (log.IsDebugEnabled)
425-
log.Debug("GET " + url.ToString());
459+
log.Debug("GET " + url);
460+
426461
var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel);
427462
await ThrowOnErrorAsync(response);
463+
428464
return await response.Content.ReadAsByteArrayAsync();
429465
}
430466

@@ -460,21 +496,26 @@ public async Task<String> UploadAsync(string command, CancellationToken cancel,
460496
{
461497
var content = new MultipartFormDataContent();
462498
var streamContent = new StreamContent(data);
499+
463500
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
501+
464502
if (string.IsNullOrEmpty(name))
465503
content.Add(streamContent, "file", unknownFilename);
466504
else
467505
content.Add(streamContent, "file", name);
468506

469507
var url = BuildCommand(command, null, options);
470508
if (log.IsDebugEnabled)
471-
log.Debug("POST " + url.ToString());
509+
log.Debug("POST " + url);
510+
472511
using (var response = await Api().PostAsync(url, content, cancel))
473512
{
474513
await ThrowOnErrorAsync(response);
475514
var json = await response.Content.ReadAsStringAsync();
515+
476516
if (log.IsDebugEnabled)
477517
log.Debug("RSP " + json);
518+
478519
return json;
479520
}
480521
}
@@ -510,17 +551,19 @@ public async Task<Stream> Upload2Async(string command, CancellationToken cancel,
510551
{
511552
var content = new MultipartFormDataContent();
512553
var streamContent = new StreamContent(data);
554+
513555
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
514-
if (string.IsNullOrEmpty(name))
515-
content.Add(streamContent, "file", unknownFilename);
516-
else
517-
content.Add(streamContent, "file", name);
556+
557+
content.Add(streamContent, "file", string.IsNullOrEmpty(name) ? unknownFilename : name);
518558

519559
var url = BuildCommand(command, null, options);
560+
520561
if (log.IsDebugEnabled)
521-
log.Debug("POST " + url.ToString());
562+
log.Debug("POST " + url);
563+
522564
var response = await Api().PostAsync(url, content, cancel);
523565
await ThrowOnErrorAsync(response);
566+
524567
return await response.Content.ReadAsStreamAsync();
525568
}
526569

@@ -531,18 +574,24 @@ public async Task<String> UploadAsync(string command, CancellationToken cancel,
531574
{
532575
var content = new MultipartFormDataContent();
533576
var streamContent = new ByteArrayContent(data);
577+
534578
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
535579
content.Add(streamContent, "file", unknownFilename);
536580

537581
var url = BuildCommand(command, null, options);
582+
538583
if (log.IsDebugEnabled)
539-
log.Debug("POST " + url.ToString());
584+
log.Debug("POST " + url);
585+
540586
using (var response = await Api().PostAsync(url, content, cancel))
541587
{
542588
await ThrowOnErrorAsync(response);
589+
543590
var json = await response.Content.ReadAsStringAsync();
591+
544592
if (log.IsDebugEnabled)
545593
log.Debug("RSP " + json);
594+
546595
return json;
547596
}
548597
}
@@ -562,24 +611,31 @@ async Task<bool> ThrowOnErrorAsync(HttpResponseMessage response)
562611
{
563612
if (response.IsSuccessStatusCode)
564613
return true;
614+
565615
if (response.StatusCode == HttpStatusCode.NotFound)
566616
{
567-
var error = "Invalid IPFS command: " + response.RequestMessage.RequestUri.ToString();
617+
var error = "Invalid IPFS command: " + response.RequestMessage.RequestUri;
618+
568619
if (log.IsDebugEnabled)
569620
log.Debug("ERR " + error);
621+
570622
throw new HttpRequestException(error);
571623
}
572624

573625
var body = await response.Content.ReadAsStringAsync();
626+
574627
if (log.IsDebugEnabled)
575628
log.Debug("ERR " + body);
629+
576630
string message = body;
631+
577632
try
578633
{
579634
var res = JsonConvert.DeserializeObject<dynamic>(body);
580635
message = (string)res.Message;
581636
}
582637
catch { }
638+
583639
throw new HttpRequestException(message);
584640
}
585641

0 commit comments

Comments
 (0)