|
17 | 17 | import sys
|
18 | 18 | import psutil
|
19 | 19 | from packaging import version
|
| 20 | +from typing import Optional |
20 | 21 | import warnings
|
21 | 22 |
|
22 | 23 | import ray
|
|
34 | 35 | )
|
35 | 36 | from modin.error_message import ErrorMessage
|
36 | 37 |
|
| 38 | +_OBJECT_STORE_TO_SYSTEM_MEMORY_RATIO = 0.6 |
| 39 | + |
37 | 40 | ObjectIDType = ray.ObjectRef
|
38 | 41 | if version.parse(ray.__version__) >= version.parse("1.2.0"):
|
39 | 42 | from ray.util.client.common import ClientObjectRef
|
@@ -98,53 +101,7 @@ def initialize_ray(
|
98 | 101 | ray.init({', '.join([f'{k}={v}' for k,v in extra_init_kw.items()])})
|
99 | 102 | """,
|
100 | 103 | )
|
101 |
| - object_store_memory = Memory.get() |
102 |
| - # In case anything failed above, we can still improve the memory for Modin. |
103 |
| - if object_store_memory is None: |
104 |
| - virtual_memory = psutil.virtual_memory().total |
105 |
| - if sys.platform.startswith("linux"): |
106 |
| - shm_fd = os.open("/dev/shm", os.O_RDONLY) |
107 |
| - try: |
108 |
| - shm_stats = os.fstatvfs(shm_fd) |
109 |
| - system_memory = shm_stats.f_bsize * shm_stats.f_bavail |
110 |
| - if system_memory / (virtual_memory / 2) < 0.99: |
111 |
| - warnings.warn( |
112 |
| - f"The size of /dev/shm is too small ({system_memory} bytes). The required size " |
113 |
| - + f"at least half of RAM ({virtual_memory // 2} bytes). Please, delete files in /dev/shm or " |
114 |
| - + "increase size of /dev/shm with --shm-size in Docker. Also, you can set " |
115 |
| - + "the required memory size for each Ray worker in bytes to MODIN_MEMORY environment variable." |
116 |
| - ) |
117 |
| - finally: |
118 |
| - os.close(shm_fd) |
119 |
| - else: |
120 |
| - system_memory = virtual_memory |
121 |
| - object_store_memory = int(0.6 * system_memory // 1e9 * 1e9) |
122 |
| - # If the memory pool is smaller than 2GB, just use the default in ray. |
123 |
| - if object_store_memory == 0: |
124 |
| - object_store_memory = None |
125 |
| - else: |
126 |
| - object_store_memory = int(object_store_memory) |
127 |
| - |
128 |
| - mac_size_limit = getattr( |
129 |
| - ray.ray_constants, "MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT", None |
130 |
| - ) |
131 |
| - if ( |
132 |
| - sys.platform == "darwin" |
133 |
| - and mac_size_limit is not None |
134 |
| - and object_store_memory > mac_size_limit |
135 |
| - ): |
136 |
| - warnings.warn( |
137 |
| - "On Macs, Ray's performance is known to degrade with " |
138 |
| - + "object store size greater than " |
139 |
| - + f"{mac_size_limit / 2 ** 30:.4} GiB. Ray by default does " |
140 |
| - + "not allow setting an object store size greater than " |
141 |
| - + "that. Modin is overriding that default limit because " |
142 |
| - + "it would rather have a larger, slower object store " |
143 |
| - + "than spill to disk more often. To override Modin's " |
144 |
| - + "behavior, you can initialize Ray yourself." |
145 |
| - ) |
146 |
| - os.environ["RAY_ENABLE_MAC_LARGE_OBJECT_STORE"] = "1" |
147 |
| - |
| 104 | + object_store_memory = _get_object_store_memory() |
148 | 105 | ray_init_kwargs = {
|
149 | 106 | "num_cpus": CpuCount.get(),
|
150 | 107 | "num_gpus": GpuCount.get(),
|
@@ -186,6 +143,65 @@ def initialize_ray(
|
186 | 143 | NPartitions._put(num_cpus)
|
187 | 144 |
|
188 | 145 |
|
def _get_object_store_memory() -> Optional[int]:
    """
    Get the object store memory we should start Ray with, in bytes.

    - If the ``Memory`` config variable is set, return that unchanged.
    - Otherwise, on Linux take the system memory to be the space currently
      available in /dev/shm; on other systems use total virtual memory.
    - Scale that by ``_OBJECT_STORE_TO_SYSTEM_MEMORY_RATIO`` and round down
      to a whole number of gigabytes (1e9 bytes).
    - On Mac, never return more than the Ray-specified upper limit.

    Returns
    -------
    Optional[int]
        The object store memory size in bytes, or None if we should use the Ray
        default (the computed size rounded down to zero gigabytes).

    Warns
    -----
    UserWarning
        On Linux, when the space available in /dev/shm is noticeably less than
        half of the total virtual memory.
    """
    object_store_memory = Memory.get()
    if object_store_memory is not None:
        return object_store_memory
    virtual_memory = psutil.virtual_memory().total
    if sys.platform.startswith("linux"):
        # ``os.statvfs`` on the path reports the same statistics as opening the
        # directory and calling ``os.fstatvfs``, without a descriptor to close.
        shm_stats = os.statvfs("/dev/shm")
        system_memory = shm_stats.f_bsize * shm_stats.f_bavail
        # Warn when /dev/shm offers less than (about) half of RAM; the 0.99
        # factor leaves a 1% tolerance.
        if system_memory / (virtual_memory / 2) < 0.99:
            warnings.warn(
                f"The size of /dev/shm is too small ({system_memory} bytes). The required size "
                + f"is at least half of RAM ({virtual_memory // 2} bytes). Please, delete files in /dev/shm or "
                + "increase size of /dev/shm with --shm-size in Docker. Also, you can override the memory "
                + "size for each Ray worker (in bytes) with the MODIN_MEMORY environment variable."
            )
    else:
        system_memory = virtual_memory
    bytes_per_gb = 10**9
    # Round down to a whole number of gigabytes using integer arithmetic.
    object_store_memory = (
        int(_OBJECT_STORE_TO_SYSTEM_MEMORY_RATIO * system_memory)
        // bytes_per_gb
        * bytes_per_gb
    )
    # Less than one whole gigabyte: let Ray pick its own default.
    if object_store_memory == 0:
        return None
    # Versions of ray with MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT don't allow us
    # to initialize ray with object store size larger than that constant.
    # For background see https://github.com/ray-project/ray/issues/20388
    mac_size_limit = getattr(
        ray.ray_constants, "MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT", None
    )
    if (
        sys.platform == "darwin"
        and mac_size_limit is not None
        and object_store_memory > mac_size_limit
    ):
        object_store_memory = mac_size_limit
    return object_store_memory
| 203 | + |
| 204 | + |
189 | 205 | def deserialize(obj):
|
190 | 206 | """
|
191 | 207 | Deserialize a Ray object.
|
|
0 commit comments