Manager
eisenmp needs to know the process count, the workload of each process and the start method.
The ModuleConfiguration class collects information, variables and data for the worker module. You supply:
location of modules to load
number of processes and workload for a process
custom queues and variables for the worker
Named Queue creation methods
queue_cust_dict_std_create method
blue_q_7_max_3 = ('blue_q_7', 3)
emp.queue_cust_dict_std_create(blue_q_7_max_3)
three_q_lst = [('orange_q_2', 2), ('cyan_q_4', 4), ('black_q_5', 5)]
# simple: q_name and q_maxsize as unpacked list
emp.queue_cust_dict_std_create(*three_q_lst)
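The two call shapes above, a single tuple or an unpacked list of tuples, suggest a variadic signature. The following is a minimal sketch of that pattern only. The helper name `create_named_queues` and the returned dictionary are assumptions made for illustration and are not taken from eisenmp.

```python
import multiprocessing


def create_named_queues(*name_maxsize_tuples):
    """Hypothetical helper: build one Queue per (q_name, q_maxsize) tuple."""
    queues = {}
    for q_name, q_maxsize in name_maxsize_tuples:
        queues[q_name] = multiprocessing.Queue(maxsize=q_maxsize)
    return queues


# one queue from a single tuple
single = create_named_queues(('blue_q_7', 3))

# several queues from an unpacked list
three_q_lst = [('orange_q_2', 2), ('cyan_q_4', 4), ('black_q_5', 5)]
several = create_named_queues(*three_q_lst)

print(sorted(several))  # ['black_q_5', 'cyan_q_4', 'orange_q_2']
```

Forgetting the `*` would pass the whole list as one argument, and the tuple unpacking in the loop would then fail.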
queue_cust_dict_category_create method
cat_1_input_q_3_max_10 = ('category_1', 'input_q_3', 10)
emp.queue_cust_dict_category_create(cat_1_input_q_3_max_10)
three_q_lst = [('batch_1', 'orange_q_2', 2), ('batch_1', 'cyan_q_4', 4), ('batch_1', 'black_q_5', 5)]
# category: q_category, q_name and q_maxsize as unpacked list
emp.queue_cust_dict_category_create(*three_q_lst)
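Categories allow the same queue name to be reused in different groups. As a rough sketch of the idea, assuming a nested dictionary keyed by category and then by name: the helper `create_category_queues` is hypothetical, and `queue.Queue` stands in for a multiprocessing queue so the example stays in one process.

```python
import queue
from collections import defaultdict


def create_category_queues(*cat_name_maxsize_tuples):
    """Hypothetical helper: return {q_category: {q_name: Queue(q_maxsize)}}."""
    registry = defaultdict(dict)
    for q_category, q_name, q_maxsize in cat_name_maxsize_tuples:
        registry[q_category][q_name] = queue.Queue(maxsize=q_maxsize)
    return dict(registry)


registry = create_category_queues(
    ('batch_1', 'audio_lg', 5),
    ('batch_7', 'audio_lg', 3),  # same name, different category
)

# two independent queues share the name 'audio_lg'
print(registry['batch_1']['audio_lg'] is registry['batch_7']['audio_lg'])  # False
print(registry['batch_7']['audio_lg'].maxsize)  # 3
```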
Queue list
Helps to debug Queue access.
emp = eisenmp.Mp()
emp.q_name_id_lst
# queue tuple (name, id, q_ref), all custom created queues will be appended
self.q_name_id_lst = [('mp_input_q (default)', id(self.mp_input_q), self.mp_input_q)]
toolbox.Q_NAME_ID_LST list, where toolbox is your default worker argument name.
('mp_input_q (default)', 2863011365072, <multiprocessing.queues.Queue object at 0x0000020E0F9252D0>)
('batch_1|audio_lg', 2863011368192, <multiprocessing.queues.Queue object at 0x0000020E0FD26D10>)
('batch_1|video_in', 2863011368240, <multiprocessing.queues.Queue object at 0x0000020E0FD27040>)
('batch_7|audio_lg', 2863011368576, <multiprocessing.queues.Queue object at 0x0000020E0FD27370>)
('batch_7|video_in', 2863011368912, <multiprocessing.queues.Queue object at 0x0000020E0FD276A0>)
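A list of `(name, id, q_ref)` tuples like the one above can be searched to find out which queue a given object is. The sketch below assumes only that tuple shape. The function `queue_name_of` is a hypothetical debugging aid, and `queue.Queue` replaces the multiprocessing queues to keep the example in one process.

```python
import queue


def queue_name_of(q_name_id_lst, q_obj):
    """Return the registered name of q_obj, or None if it is not listed."""
    for q_name, _q_id, q_ref in q_name_id_lst:
        if q_ref is q_obj:
            return q_name
    return None


default_q = queue.Queue()
audio_q = queue.Queue(maxsize=5)
q_name_id_lst = [
    ('mp_input_q (default)', id(default_q), default_q),
    ('batch_1|audio_lg', id(audio_q), audio_q),
]

print(queue_name_of(q_name_id_lst, audio_q))  # batch_1|audio_lg
```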
Worker Loader
The eisenmp worker module loader list names the modules to load.
Note
Worker modules are loaded independently. The worker does not import the Main() module, which avoids unexpected behaviour.
class ModuleConfiguration:
    first_module = {
        'WORKER_PATH': os.path.join(dir_name, 'worker', 'eisenmp_exa_wrk_csv.py'),
        'WORKER_REF': 'worker_entrance',
    }
    watchdog_module = {
        'WORKER_PATH': os.path.join(dir_name, 'worker', 'eisenmp_exa_wrk_watchdog.py'),
        'WORKER_REF': 'mp_start_show_threads',
    }

    def __init__(self):
        # load order list, first module is called in an endless loop, you can append your own loop inside the worker
        self.worker_modules = [
            self.first_module,  # second module must be started by a thread, else we hang
            self.watchdog_module,
        ]

        # Multiprocess vars - override default
        self.PROCS_MAX = 5  # your process count, default is None: one proc/CPU core
        # max generator / ROWS_MAX = number of tickets; 10_000 / 42 = 238.095 -> 238 lists with ticket numbers
        self.ROWS_MAX = 50_000  # workload spread, list (generator items) to calc in one loop, default is None: 1_000
        self.RESULTS_STORE = True  # keep in dictionary, will crash the system if store GB network chunks in mem
        self.RESULTS_PRINT = True  # result rows of output are collected in a list, display if processes are stopped
        self.RESULTS_DICT_PRINT = True  # shows content of results dict with ticket numbers, check tickets
        self.RESULT_LABEL = 'revised.csv, Average calculation'  # pretty print as result header for RESULTS_PRINT
        # self.START_METHOD = 'fork'  # 'spawn' is default if unused; also use 'forkserver' or 'fork' on Unix only
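Each module dictionary pairs a file path with the name of a callable inside that file. One way such a pair could be resolved is shown below with the standard `importlib` machinery. This is a sketch under that assumption, not eisenmp's loader. The function `load_worker` and the throwaway worker file are made up for the example.

```python
import importlib.util
import os
import tempfile


def load_worker(worker_module):
    """Import the file at WORKER_PATH and return the callable named by WORKER_REF."""
    path = worker_module['WORKER_PATH']
    mod_name = os.path.splitext(os.path.basename(path))[0]
    spec = importlib.util.spec_from_file_location(mod_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, worker_module['WORKER_REF'])


# demo with a temporary worker file; its content is invented for this example
with tempfile.TemporaryDirectory() as tmp_dir:
    worker_file = os.path.join(tmp_dir, 'demo_worker.py')
    with open(worker_file, 'w') as fh:
        fh.write("def worker_entrance(toolbox):\n    return 'got ' + str(toolbox)\n")
    demo_module = {'WORKER_PATH': worker_file, 'WORKER_REF': 'worker_entrance'}
    entrance = load_worker(demo_module)
    result = entrance('toolbox')

print(result)  # got toolbox
```

Loading by path means the worker file does not need to be on `sys.path` or importable from the main module.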
All modules to start are collected in the worker_modules list. Load order is LIFO (last in, first out).
The first worker module is loaded last and is allowed to block the loader loop. If it blocks, you must kill() the processes yourself.
for process in emp.proc_list:
    process.kill()
Every module from the second to the last must use a threaded start.
class ModuleConfiguration:  # name your own class and feed eisenmp with the dict
    template_module = {
        'WORKER_PATH': os.path.join(dir_name, 'worker', 'eisenmp_exa_wrk_double.py'),
        'WORKER_REF': 'worker_entrance',
    }
    watchdog_module = {
        'WORKER_PATH': os.path.join(os.path.dirname(dir_name), 'worker', 'eisenmp_exa_wrk_watchdog.py'),
        'WORKER_REF': 'mp_start_show_threads',
    }

    def __init__(self):
        self.worker_modules = [  # in-bld-res
            self.template_module,  # other modules must start threaded, else we hang
            self.watchdog_module  # second; thread function call mandatory, last module loaded first
        ]
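The reason for the threaded start follows from the load order: the loader calls the last list entry first, and it can only reach the first entry if every earlier call returns. The simulation below illustrates that rule with plain functions in place of worker modules. The loop is an assumption about the general pattern and does not reproduce eisenmp's loader code.

```python
import threading

start_order = []  # order in which the loader called each entry
thread_log = []   # work done by background threads


def first_module():
    """Stands in for the first list entry; it may block, here it returns at once."""
    start_order.append('first_module')


def watchdog_module():
    """Stands in for a later list entry; it hands its work to a thread and returns."""
    start_order.append('watchdog_module')
    thread = threading.Thread(target=thread_log.append, args=('watchdog ran',), daemon=True)
    thread.start()
    return thread


worker_modules = [first_module, watchdog_module]

started_threads = []
for entry in reversed(worker_modules):  # last in, first out
    result = entry()
    if isinstance(result, threading.Thread):
        started_threads.append(result)

for thread in started_threads:
    thread.join()

print(start_order)  # ['watchdog_module', 'first_module']
```

If `watchdog_module` looped forever in the calling thread instead of starting a thread, `first_module` would never be called.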
kwargs
Initialize values once and use them in the worker module.
Note
The examples refer to kwargs as toolbox.
You can choose your own name.
toolbox.kwargs['START_SEQUENCE_NUM']
Note
The spawn process start method copies all variables and data structures in kwargs. If you assign an 8 GB dictionary to kwargs in the parent process, each child process will allocate 8 GB.
Read up on pickling and spawn for details. Instances are copied and recreated at a new start (offset) address. The same seems to apply to all other data in kwargs and to data delivered through a Queue.
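The copy behaviour can be observed without starting a process. Under spawn, the arguments of a process travel to the child as a pickle byte stream, so a pickle round trip is a fair approximation of what each child receives. The dictionary content below is invented for the demonstration.

```python
import pickle

# made-up kwargs content for the demonstration
kwargs = {'START_SEQUENCE_NUM': 0, 'lookup': {'a': [1, 2, 3]}}

payload = pickle.dumps(kwargs)         # what would be sent to one child
child_kwargs = pickle.loads(payload)   # what the child would rebuild

print(child_kwargs == kwargs)                       # True, same content
print(child_kwargs['lookup'] is kwargs['lookup'])   # False, a separate object

child_kwargs['lookup']['a'].append(4)
print(kwargs['lookup']['a'])                        # [1, 2, 3], parent unchanged
print(len(payload), 'bytes serialized per child')
```

Changes made to the copy never reach the parent, which is why results have to travel back through a queue.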
Worker data variables
# Multiprocess vars - override default
self.NUM_PROCS = 2 # your process count, each 'batch' on one CPU core, default is None: one proc/CPU core
self.NUM_ROWS = 3 # your workload spread, list (generator items) to calc in one loop, default None: 1_000
self.RESULTS_STORE = True # keep in dictionary, will crash the system if store GB network chunks in mem
self.RESULTS_PRINT = True # result rows of output are collected in a list, display if processes are stopped
self.RESULT_LABEL = 'fake production of audio and video for WHO studios' # RESULT_LABEL for RESULTS_PRINT
self.RESULTS_DICT_PRINT = True # shows content of results dict with ticket numbers, check tickets
# self.START_METHOD = 'fork' # 'spawn' is default if unused; also use 'forkserver' or 'fork' on Unix only
# work to do
self.sleep_time = 20 # watchdog
self.num_of_lists = 0 # worker lists done counter
modConf instance during configuration phase.
modConf = ModuleConfiguration()  # Accessible in the manager and worker module.
eisenmp Instance update and process start
Instantiate eisenmp.
The modConf instance dictionary is dumped into eisenmp; all attributes become keys in kwargs.
eisenmp updates the kwargs dictionary further with the custom created queues and the process start id.
Processes are started; the worker in each process blocks, sits on its queue and awaits input.
eisenmp Queue feeder threads start; or use your own.
emp = eisenmp.Mp()
emp.start(**modConf.__dict__) # create processes, load worker mods, start threads (output_p coll, info)
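Unpacking `__dict__` passes only instance attributes, that is, whatever `__init__` assigned to `self`. Class-level attributes such as the module dictionaries are not separate keys. They arrive only through the `worker_modules` list that references them. The sketch below shows this with a stand-in class and a hypothetical `start` function that simply returns what it received.

```python
class DemoConfiguration:
    # class attribute: not part of the instance __dict__
    demo_module = {'WORKER_PATH': 'worker/demo.py', 'WORKER_REF': 'worker_entrance'}

    def __init__(self):
        self.worker_modules = [self.demo_module]
        self.NUM_PROCS = 2
        self.NUM_ROWS = 3


def start(**kwargs):
    """Hypothetical stand-in for a start method: hand back the received kwargs."""
    return kwargs


conf = DemoConfiguration()
received = start(**conf.__dict__)

print(sorted(received))            # ['NUM_PROCS', 'NUM_ROWS', 'worker_modules']
print('demo_module' in received)   # False
```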
Example
def manager_entry():
    """
    - Generator - One time execution.

    Divide workload between processes / CPU
    -
    """
    q_cat_name_maxsize = [
        # q_category, q_name, q_maxsize; find your 100 Queues in the debugger, toolbox
        ('batch_1', 'audio_lg', 5),  # queues for batch_1
        ('batch_1', 'video_in', 1),  # dict avail. in worker module: toolbox.batch_1['video_in'].get()
        ('batch_7', 'audio_lg', 3),  # queues for batch_7
        ('batch_7', 'video_in', 1)
    ]
    emp = eisenmp.Mp()

    # create custom queues with category and name
    emp.queue_cust_dict_category_create(*q_cat_name_maxsize)  # create queues, store in {custom} {category} dict

    audio_q_b1 = emp.queue_cust_dict_cat['batch_1']['audio_lg']  # USE Queue:
    video_q_b1 = emp.queue_cust_dict_cat['batch_1']['video_in']  # worker module: toolbox.batch_1['video_in'].get()
    audio_q_b7 = emp.queue_cust_dict_cat['batch_7']['audio_lg']
    video_q_b7 = emp.queue_cust_dict_cat['batch_7']['video_in']  # toolbox.batch_7['video_in'].get()

    emp.start(**modConf.__dict__)  # create processes, load worker mods, start threads (output_p coll, info)
    emp.run_q_feeder(generator=audio_generator_batch_1(), input_q=audio_q_b1)
    emp.run_q_feeder(generator=video_generator_batch_1(), input_q=video_q_b1)
    emp.run_q_feeder(generator=audio_generator_batch_7(), input_q=audio_q_b7)
    emp.run_q_feeder(generator=video_generator_batch_7(), input_q=video_q_b7)
    return emp
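A feeder thread, in general terms, drains a generator in chunks and puts each chunk on an input queue. The sketch below shows one possible shape of such a feeder and is not the implementation behind `run_q_feeder`. The functions `chunked` and `feed_queue` and the `(ticket_number, chunk)` tuple layout are assumptions, and `queue.Queue` stands in for a multiprocessing queue.

```python
import itertools
import queue
import threading


def chunked(generator, rows):
    """Yield lists of up to `rows` items until the generator is exhausted."""
    iterator = iter(generator)
    while True:
        chunk = list(itertools.islice(iterator, rows))
        if not chunk:
            return
        yield chunk


def feed_queue(generator, input_q, rows):
    """Put (ticket_number, chunk) tuples on input_q; put() blocks while the queue is full."""
    for ticket, chunk in enumerate(chunked(generator, rows)):
        input_q.put((ticket, chunk))


audio_q = queue.Queue(maxsize=5)
feeder = threading.Thread(target=feed_queue, args=(range(7), audio_q, 3), daemon=True)
feeder.start()
feeder.join()

tickets = []
while not audio_q.empty():
    tickets.append(audio_q.get())

print(tickets)  # [(0, [0, 1, 2]), (1, [3, 4, 5]), (2, [6])]
```

Because `put()` blocks on a full queue, the queue's maxsize limits how far the feeder can run ahead of the workers.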