File tree Expand file tree Collapse file tree 1 file changed +59
-0
lines changed Expand file tree Collapse file tree 1 file changed +59
-0
lines changed Original file line number Diff line number Diff line change
1
+ from concurrent .futures import ThreadPoolExecutor
2
+ from typing import Callable
3
+
4
+
5
+ def func_wrapper (args ):
6
+ (f , offset , * args ) = args
7
+ items = f (* args )
8
+ return list ((i + offset , item ) for i , item in enumerate (items ))
9
+
10
+
11
+ def get_items (
12
+ func : Callable ,
13
+ * args ,
14
+ parse : Callable = lambda _ : _ ,
15
+ chunk_size : int = 50 ,
16
+ processes : int = 5 ,
17
+ ):
18
+ """
19
+ This function performs pagination on a function that supports
20
+ `limit`/`offset` parameters and it runs API requests in parallel to speed
21
+ things up.
22
+ """
23
+ items = []
24
+ offsets = [- chunk_size ]
25
+ remaining = chunk_size * processes
26
+
27
+ with ThreadPoolExecutor (
28
+ processes , thread_name_prefix = f"mopidy-tidal-{ func .__name__ } -"
29
+ ) as pool :
30
+ while remaining == chunk_size * processes :
31
+ offsets = [offsets [- 1 ] + chunk_size * (i + 1 ) for i in range (processes )]
32
+
33
+ pool_results = pool .map (
34
+ func_wrapper ,
35
+ [
36
+ (
37
+ func ,
38
+ offset ,
39
+ * args ,
40
+ chunk_size , # limit
41
+ offset , # offset
42
+ )
43
+ for offset in offsets
44
+ ],
45
+ )
46
+
47
+ new_items = []
48
+ for results in pool_results :
49
+ new_items .extend (results )
50
+
51
+ remaining = len (new_items )
52
+ items .extend (new_items )
53
+
54
+ items = [_ for _ in items if _ ]
55
+ sorted_items = list (
56
+ map (lambda item : item [1 ], sorted (items , key = lambda item : item [0 ]))
57
+ )
58
+
59
+ return list (map (parse , sorted_items ))
You can’t perform that action at this time.
0 commit comments