24
24
25
25
import numpy as np
26
26
import simplekml
27
- from multiprocess import Event , Lock , Process , Semaphore , shared_memory
28
- from multiprocess .managers import BaseManager
29
27
30
28
from rocketpy import Function
31
29
from rocketpy ._encoders import RocketPyEncoder
40
38
from rocketpy .tools import (
41
39
generate_monte_carlo_ellipses ,
42
40
generate_monte_carlo_ellipses_coordinates ,
41
+ import_optional_dependency ,
43
42
)
44
43
45
44
# TODO: Create evolution plots to analyze convergence
@@ -355,6 +354,8 @@ def __run_in_parallel(self, n_workers=None):
355
354
# calculate the number of simulations that can be stored in memory
356
355
n_sim_memory = max (n_workers - 2 , 2 ) # at least a double buffer
357
356
357
+ multiprocess , shared_memory , managers = import_multiprocess ()
358
+
358
359
# initialize shared memory buffer
359
360
shared_inputs_buffer = shared_memory .SharedMemory (
360
361
create = True , size = inputs_size * n_sim_memory , name = "shared_inputs"
@@ -364,7 +365,7 @@ def __run_in_parallel(self, n_workers=None):
364
365
)
365
366
366
367
try :
367
- with MonteCarloManager ( ) as manager :
368
+ with create_multiprocess_manager ( multiprocess , managers ) as manager :
368
369
# initialize queue
369
370
errors_lock = manager .Lock ()
370
371
@@ -404,9 +405,10 @@ def __run_in_parallel(self, n_workers=None):
404
405
405
406
# Creates n_workers processes then starts them
406
407
for _ in range (n_workers - 2 ): # leave 2 cores for the writer workers
407
- p = Process (
408
+ p = multiprocess . Process (
408
409
target = self .__run_simulation_worker ,
409
410
args = (
411
+ shared_memory ,
410
412
self .error_file ,
411
413
self .export_list ,
412
414
self .environment ,
@@ -436,9 +438,10 @@ def __run_in_parallel(self, n_workers=None):
436
438
input_writer_stop_event = manager .Event ()
437
439
results_writer_stop_event = manager .Event ()
438
440
439
- input_writer = Process (
441
+ input_writer = multiprocess . Process (
440
442
target = self ._write_data_worker ,
441
443
args = (
444
+ shared_memory ,
442
445
self .input_file ,
443
446
go_write_inputs ,
444
447
go_read_inputs ,
@@ -449,9 +452,10 @@ def __run_in_parallel(self, n_workers=None):
449
452
),
450
453
)
451
454
452
- results_writer = Process (
455
+ results_writer = multiprocess . Process (
453
456
target = self ._write_data_worker ,
454
457
args = (
458
+ shared_memory ,
455
459
self .output_file ,
456
460
go_write_results ,
457
461
go_read_results ,
@@ -500,6 +504,7 @@ def __run_in_parallel(self, n_workers=None):
500
504
501
505
@staticmethod
502
506
def __run_simulation_worker (
507
+ shared_memory ,
503
508
error_file ,
504
509
export_list ,
505
510
sto_env ,
@@ -523,6 +528,8 @@ def __run_simulation_worker(
523
528
524
529
Parameters
525
530
----------
531
+ shared_memory : module
532
+ Shared memory handler of multiprocess module.
526
533
error_file : Path
527
534
Path of the error file.
528
535
export_list : list
@@ -820,6 +827,7 @@ def __loop_though_buffer(
820
827
821
828
@staticmethod
822
829
def _write_data_worker (
830
+ shared_memory ,
823
831
file_path ,
824
832
go_write_semaphores ,
825
833
go_read_semaphores ,
@@ -833,6 +841,8 @@ def _write_data_worker(
833
841
834
842
Parameters
835
843
----------
844
+ shared_memory : module
845
+ Shared memory handler of multiprocess module.
836
846
file_path : str
837
847
Path to the file to write the data.
838
848
go_write_semaphores : list
@@ -1672,16 +1682,50 @@ def prepare_export_data(obj, sample_time=0.1, remove_functions=False):
1672
1682
return result
1673
1683
1674
1684
1675
- class MonteCarloManager (BaseManager ):
1676
- def __init__ (self ):
1677
- super ().__init__ ()
1678
- self .register ('Lock' , Lock )
1679
- self .register ('Event' , Event )
1680
- self .register ('Semaphore' , Semaphore )
1681
- self .register ('SimCounter' , SimCounter )
1682
- self .register ('StochasticEnvironment' , StochasticEnvironment )
1683
- self .register ('StochasticRocket' , StochasticRocket )
1684
- self .register ('StochasticFlight' , StochasticFlight )
1685
+ def import_multiprocess ():
1686
+ """Import the necessary modules for the multiprocess module.
1687
+
1688
+ Returns
1689
+ -------
1690
+ tuple
1691
+ Tuple containing the imported modules.
1692
+ """
1693
+ multiprocess = import_optional_dependency ("multiprocess" )
1694
+ shared_memory = import_optional_dependency ("multiprocess.shared_memory" )
1695
+ managers = import_optional_dependency ("multiprocess.managers" )
1696
+
1697
+ return multiprocess , shared_memory , managers
1698
+
1699
+
1700
+ def create_multiprocess_manager (multiprocess , managers ):
1701
+ """Creates a manager for the multiprocess control of the
1702
+ Monte Carlo simulation.
1703
+
1704
+ Parameters
1705
+ ----------
1706
+ multiprocess : module
1707
+ Multiprocess module.
1708
+ managers : module
1709
+ Managing submodules of the multiprocess module.
1710
+
1711
+ Returns
1712
+ -------
1713
+ MonteCarloManager
1714
+ Subclass of BaseManager with the necessary classes registered.
1715
+ """
1716
+
1717
+ class MonteCarloManager (managers .BaseManager ):
1718
+ def __init__ (self ):
1719
+ super ().__init__ ()
1720
+ self .register ('Lock' , multiprocess .Lock )
1721
+ self .register ('Event' , multiprocess .Event )
1722
+ self .register ('Semaphore' , multiprocess .Semaphore )
1723
+ self .register ('SimCounter' , SimCounter )
1724
+ self .register ('StochasticEnvironment' , StochasticEnvironment )
1725
+ self .register ('StochasticRocket' , StochasticRocket )
1726
+ self .register ('StochasticFlight' , StochasticFlight )
1727
+
1728
+ return MonteCarloManager ()
1685
1729
1686
1730
1687
1731
class SimCounter :
0 commit comments