@@ -57,7 +57,7 @@ def __init__(self, creator, combiner, mergeCombiner=None):
57
57
class Merger (object ):
58
58
59
59
"""
60
- merge shuffled data together by combinator
60
+ merge shuffled data together by aggregator
61
61
"""
62
62
63
63
def __init__ (self , aggregator ):
@@ -77,8 +77,9 @@ def iteritems(self):
77
77
78
78
79
79
class InMemoryMerger (Merger ):
80
+
80
81
"""
81
- In memory merger based on map
82
+ In memory merger based on in-memory dict.
82
83
"""
83
84
84
85
def __init__ (self , aggregator ):
@@ -107,8 +108,30 @@ def iteritems(self):
107
108
class ExternalMerger (Merger ):
108
109
109
110
"""
110
- External merger will dump the aggregated data into disks when memory usage
111
- is above the limit, then merge them together.
111
+ External merger will dump the aggregated data into disks when
112
+ memory usage goes above the limit, then merge them together.
113
+
114
+ This class works as follows:
115
+
116
+ - It repeatedly combine the items and save them in one dict in
117
+ memory.
118
+
119
+ - When the used memory goes above memory limit, it will split
120
+ the combined data into partitions by hash code, dump them
121
+ into disk, one file per partition.
122
+
123
+ - Then it goes through the rest of the iterator, combine items
124
+ into different dict by hash. Until the used memory goes over
125
+ memory limit, it dump all the dicts into disks, one file per
126
+ dict. Repeat this again until combine all the items.
127
+
128
+ - Before return any items, it will load each partition and
129
+ combine them seperately. Yield them before loading next
130
+ partition.
131
+
132
+ - During loading a partition, if the memory goes over limit,
133
+ it will partition the loaded data and dump them into disks
134
+ and load them partition by partition again.
112
135
113
136
>>> agg = Aggregator(lambda x: x, lambda x, y: x + y)
114
137
>>> merger = ExternalMerger(agg, 10)
0 commit comments