@@ -3,6 +3,7 @@ use crate::asset::content::Content;
33use crate :: asset:: content_encoder:: ContentEncoder ;
44use crate :: batch_upload:: semaphores:: Semaphores ;
55use crate :: canister_api:: methods:: chunk:: create_chunk;
6+ use crate :: canister_api:: methods:: chunk:: create_chunks;
67use crate :: canister_api:: types:: asset:: AssetDetails ;
78use crate :: error:: CreateChunkError ;
89use crate :: error:: CreateEncodingError ;
@@ -14,10 +15,12 @@ use futures::TryFutureExt;
1415use ic_utils:: Canister ;
1516use mime:: Mime ;
1617use slog:: { debug, info, Logger } ;
18+ use std:: collections:: BTreeMap ;
1719use std:: collections:: HashMap ;
1820use std:: path:: PathBuf ;
1921use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2022use std:: sync:: Arc ;
23+ use tokio:: sync:: Mutex ;
2124
2225const CONTENT_ENCODING_IDENTITY : & str = "identity" ;
2326
@@ -35,7 +38,7 @@ pub(crate) struct AssetDescriptor {
3538}
3639
/// One content encoding of an asset that is being (or has been) uploaded
/// to the asset canister.
pub(crate) struct ProjectAssetEncoding {
    /// Uploader-local chunk ids; translate to canister chunk ids with
    /// `ChunkUploader::uploader_ids_to_canister_chunk_ids` after
    /// `finalize_upload` has completed.
    pub(crate) uploader_chunk_ids: Vec<usize>,
    /// SHA-256 hash of the encoded content.
    pub(crate) sha256: Vec<u8>,
    /// True when the canister already holds this encoding (no upload performed).
    pub(crate) already_in_place: bool,
}
@@ -46,30 +49,68 @@ pub(crate) struct ProjectAsset {
4649 pub ( crate ) encodings : HashMap < String , ProjectAssetEncoding > ,
4750}
4851
/// Maps an uploader-local chunk id to the chunk id assigned by the asset canister.
type IdMapping = BTreeMap<usize, Nat>;
/// Chunks waiting to be uploaded in batches: `(uploader_chunk_id, chunk contents)`.
type UploadQueue = Vec<(usize, Vec<u8>)>;
/// Uploads chunks for one batch to the asset canister. With canister API
/// version >= 2, small chunks are buffered and packed together into
/// `create_chunks` calls instead of one call per chunk.
pub(crate) struct ChunkUploader<'agent> {
    canister: Canister<'agent>,
    batch_id: Nat,
    /// Asset canister API version; batching is only used when >= 2.
    api_version: u16,
    /// Count of chunks created so far; doubles as the uploader_chunk_id allocator.
    chunks: Arc<AtomicUsize>,
    /// Total content bytes submitted so far.
    bytes: Arc<AtomicUsize>,
    // maps uploader_chunk_id to canister_chunk_id
    id_mapping: Arc<Mutex<IdMapping>>,
    /// Small chunks buffered here until `upload_chunks` packs them into batches.
    upload_queue: Arc<Mutex<UploadQueue>>,
}
64+
5565impl < ' agent > ChunkUploader < ' agent > {
56- pub ( crate ) fn new ( canister : Canister < ' agent > , batch_id : Nat ) -> Self {
    /// Creates an uploader for the given batch on the given asset canister.
    /// `api_version` determines whether chunks may be packed into batched
    /// `create_chunks` calls (see `create_chunk`).
    pub(crate) fn new(canister: Canister<'agent>, api_version: u16, batch_id: Nat) -> Self {
        Self {
            canister,
            batch_id,
            api_version,
            chunks: Arc::new(AtomicUsize::new(0)),
            bytes: Arc::new(AtomicUsize::new(0)),
            id_mapping: Arc::new(Mutex::new(BTreeMap::new())),
            upload_queue: Arc::new(Mutex::new(vec![])),
        }
    }
6477
78+ /// Returns an uploader_chunk_id, which is different from the chunk id on the asset canister.
79+ /// uploader_chunk_id can be mapped to canister_chunk_id using `uploader_ids_to_canister_chunk_ids`
80+ /// once `finalize_upload` has completed.
6581 pub ( crate ) async fn create_chunk (
6682 & self ,
6783 contents : & [ u8 ] ,
6884 semaphores : & Semaphores ,
69- ) -> Result < Nat , CreateChunkError > {
70- self . chunks . fetch_add ( 1 , Ordering :: SeqCst ) ;
85+ ) -> Result < usize , CreateChunkError > {
86+ let uploader_chunk_id = self . chunks . fetch_add ( 1 , Ordering :: SeqCst ) ;
7187 self . bytes . fetch_add ( contents. len ( ) , Ordering :: SeqCst ) ;
72- create_chunk ( & self . canister , & self . batch_id , contents, semaphores) . await
88+ if contents. len ( ) == MAX_CHUNK_SIZE || self . api_version < 2 {
89+ let canister_chunk_id =
90+ create_chunk ( & self . canister , & self . batch_id , contents, semaphores) . await ?;
91+ let mut map = self . id_mapping . lock ( ) . await ;
92+ map. insert ( uploader_chunk_id, canister_chunk_id) ;
93+ Ok ( uploader_chunk_id)
94+ } else {
95+ self . add_to_upload_queue ( uploader_chunk_id, contents) . await ;
96+ // Larger `max_retained_bytes` leads to batches that are filled closer to the max size.
97+ // `4 * MAX_CHUNK_SIZE` leads to a pretty small memory footprint but still offers solid fill rates.
98+ // Mini experiment:
99+ // - Tested with: `for i in $(seq 1 50); do dd if=/dev/urandom of="src/hello_frontend/assets/file_$i.bin" bs=$(shuf -i 1-2000000 -n 1) count=1; done && dfx deploy hello_frontend`
100+ // - Result: Roughly 15% of batches under 90% full.
101+ // With other byte ranges (e.g. `shuf -i 1-3000000 -n 1`) stats improve significantly
102+ self . upload_chunks ( 4 * MAX_CHUNK_SIZE , usize:: MAX , semaphores)
103+ . await ?;
104+ Ok ( uploader_chunk_id)
105+ }
106+ }
107+
108+ pub ( crate ) async fn finalize_upload (
109+ & self ,
110+ semaphores : & Semaphores ,
111+ ) -> Result < ( ) , CreateChunkError > {
112+ self . upload_chunks ( 0 , 0 , semaphores) . await ?;
113+ Ok ( ( ) )
73114 }
74115
75116 pub ( crate ) fn bytes ( & self ) -> usize {
@@ -78,6 +119,80 @@ impl<'agent> ChunkUploader<'agent> {
    /// Number of chunks created through this uploader so far.
    pub(crate) fn chunks(&self) -> usize {
        self.chunks.load(Ordering::SeqCst)
    }
122+
123+ /// Call only after `finalize_upload` has completed
124+ pub ( crate ) async fn uploader_ids_to_canister_chunk_ids (
125+ & self ,
126+ uploader_ids : & [ usize ] ,
127+ ) -> Vec < Nat > {
128+ let mapping = self . id_mapping . lock ( ) . await ;
129+ uploader_ids
130+ . iter ( )
131+ . map ( |id| {
132+ mapping
133+ . get ( id)
134+ . expect ( "Chunk uploader did not upload all chunks. This is a bug." )
135+ . clone ( )
136+ } )
137+ . collect ( )
138+ }
139+
140+ async fn add_to_upload_queue ( & self , uploader_chunk_id : usize , contents : & [ u8 ] ) {
141+ let mut queue = self . upload_queue . lock ( ) . await ;
142+ queue. push ( ( uploader_chunk_id, contents. into ( ) ) ) ;
143+ }
144+
    /// Calls `upload_chunks` with batches of chunks from `self.upload_queue` until at most `max_retained_bytes`
    /// bytes and at most `max_retained_chunks` chunks remain in the upload queue. Larger values
    /// will lead to better batch fill rates but also leave a larger memory footprint.
    async fn upload_chunks(
        &self,
        max_retained_bytes: usize,
        max_retained_chunks: usize,
        semaphores: &Semaphores,
    ) -> Result<(), CreateChunkError> {
        // Hold the queue lock across batch formation so no chunk is added or
        // lost while the queue is being repartitioned.
        let mut queue = self.upload_queue.lock().await;

        let mut batches = vec![];
        // Keep forming batches while the queue still exceeds either retention limit.
        // NOTE(review): assumes no queued chunk exceeds MAX_CHUNK_SIZE — such a
        // chunk could never fit into a batch, so this loop would not terminate.
        // TODO confirm `create_chunk` guarantees this for all callers.
        while queue
            .iter()
            .map(|(_, content)| content.len())
            .sum::<usize>()
            > max_retained_bytes
            || queue.len() > max_retained_chunks
        {
            // Greedily fills batch with the largest chunk that fits
            queue.sort_unstable_by_key(|(_, content)| content.len());
            let mut batch = vec![];
            let mut batch_size = 0;
            // Take the whole queue, walk it largest-first (sorted ascending, then
            // reversed); chunks that don't fit go back into the queue for a later batch.
            for (uploader_chunk_id, content) in std::mem::take(&mut *queue).into_iter().rev() {
                if content.len() <= MAX_CHUNK_SIZE - batch_size {
                    batch_size += content.len();
                    batch.push((uploader_chunk_id, content));
                } else {
                    queue.push((uploader_chunk_id, content));
                }
            }
            batches.push(batch);
        }

        // Upload all batches concurrently; record each canister-assigned chunk id
        // against its uploader-local id so callers can translate later.
        try_join_all(batches.into_iter().map(|chunks| async move {
            let (uploader_chunk_ids, chunks): (Vec<_>, Vec<_>) = chunks.into_iter().unzip();
            let canister_chunk_ids =
                create_chunks(&self.canister, &self.batch_id, chunks, semaphores).await?;
            let mut map = self.id_mapping.lock().await;
            for (uploader_id, canister_id) in uploader_chunk_ids
                .into_iter()
                .zip(canister_chunk_ids.into_iter())
            {
                map.insert(uploader_id, canister_id);
            }
            Ok(())
        }))
        .await?;

        Ok(())
    }
81196}
82197
83198#[ allow( clippy:: too_many_arguments) ]
@@ -110,7 +225,7 @@ async fn make_project_asset_encoding(
110225 false
111226 } ;
112227
113- let chunk_ids = if already_in_place {
228+ let uploader_chunk_ids = if already_in_place {
114229 info ! (
115230 logger,
116231 " {}{} ({} bytes) sha {} is already installed" ,
@@ -144,7 +259,7 @@ async fn make_project_asset_encoding(
144259 } ;
145260
146261 Ok ( ProjectAssetEncoding {
147- chunk_ids ,
262+ uploader_chunk_ids ,
148263 sha256,
149264 already_in_place,
150265 } )
@@ -305,6 +420,13 @@ pub(crate) async fn make_project_assets(
305420 } )
306421 . collect ( ) ;
307422 let project_assets = try_join_all ( project_asset_futures) . await ?;
423+ if let Some ( uploader) = chunk_upload_target {
424+ uploader. finalize_upload ( & semaphores) . await . map_err ( |err| {
425+ CreateProjectAssetError :: CreateEncodingError ( CreateEncodingError :: CreateChunkFailed (
426+ err,
427+ ) )
428+ } ) ?;
429+ }
308430
309431 let mut hm = HashMap :: new ( ) ;
310432 for project_asset in project_assets {
@@ -321,7 +443,7 @@ async fn upload_content_chunks(
321443 content_encoding : & str ,
322444 semaphores : & Semaphores ,
323445 logger : & Logger ,
324- ) -> Result < Vec < Nat > , CreateChunkError > {
446+ ) -> Result < Vec < usize > , CreateChunkError > {
325447 if content. data . is_empty ( ) {
326448 let empty = vec ! [ ] ;
327449 let chunk_id = chunk_uploader. create_chunk ( & empty, semaphores) . await ?;
0 commit comments