@@ -5,7 +5,7 @@ use std::io::{Cursor, Read};
55use std:: thread;
66use std:: time;
77use tinyjson:: JsonValue ;
8- use tungstenite:: { Error as TError , Message } ;
8+ use tungstenite:: { client :: IntoClientRequest , Error as TError , Message } ;
99use zstd:: dict:: DecoderDictionary ;
1010
1111const JETSTREAM_ZSTD_DICTIONARY : & [ u8 ] = include_bytes ! ( "../../zstd/dictionary" ) ;
@@ -72,7 +72,7 @@ pub fn consume_jetstream(
7272 let mut latest_cursor = cursor;
7373 ' outer: loop {
7474 let url = WS_URLS [ connect_retries % WS_URLS . len ( ) ] ;
75- let stream = format ! (
75+ let stream_url = format ! (
7676 "{url}?compress=true{}" ,
7777 latest_cursor
7878 . map( |c| {
@@ -81,9 +81,13 @@ pub fn consume_jetstream(
8181 } )
8282 . unwrap_or( "" . into( ) )
8383 ) ;
84+ let mut req = ( & stream_url) . into_client_request ( ) ?;
85+ let ua = format ! ( "ucosm/link aggregator v{}" , env!( "CARGO_PKG_VERSION" ) ) ;
86+ req. headers_mut ( ) . insert ( "user-agent" , ua. parse ( ) ?) ;
87+
8488 counter ! ( "jetstream_connect" , "url" => url, "is_retry" => ( connect_retries > 0 ) . to_string( ) ) . increment ( 1 ) ;
85- println ! ( "jetstream connecting, attempt #{connect_retries}: {stream}... " ) ;
86- let mut socket = match tungstenite:: connect ( & stream ) {
89+ println ! ( "jetstream connecting, attempt #{connect_retries}, {stream_url:?} with user-agent: {ua:?} " ) ;
90+ let mut socket = match tungstenite:: connect ( req ) {
8791 Ok ( ( socket, _) ) => {
8892 println ! ( "jetstream connected." ) ;
8993 connect_retries = 0 ;
@@ -92,6 +96,7 @@ pub fn consume_jetstream(
9296 Err ( e) => {
9397 connect_retries += 1 ;
9498 if connect_retries >= 7 {
99+ eprintln ! ( "jetstream: no more connect retries, breaking out." ) ;
95100 break ;
96101 }
97102 let backoff = time:: Duration :: from_secs ( connect_retries. try_into ( ) . unwrap ( ) ) ;
0 commit comments