eisenmp package
Subpackages
Submodules
eisenmp.eisenmp_procenv module
ProcEnv
- class eisenmp.eisenmp_procenv.ProcEnv
Bases:
object
Create the environment for worker processes on CPUs. All queues are shared among processes. ‘maxsize=1’ can be altered, but any change should be tested and documented.
Queues ONLY.
A custom queue builder stores each queue in a dict so that it has a name.
A second queue builder can add a category name (dict in dict).
- static core_count_get()
- end_proc()
Graceful shutdown join.
- end_thread()
Instance and normal threads.
- kwargs_env_update_custom(**kwargs)
Override defaults such as PROCS_MAX and ‘queue_lst_get’ (used for the worker loader stop message in all queues).
- pipe_default_create(start_sequence_num)
[Not used so far.] Pipe creation is very slow.
Wait for messages sent by workers on the connections in the wait list, then pick one of the corresponding pipes at random to send the list iterator. The ready connections are obtained with: ready_list = multiprocessing.connection.wait(p_wrk_sender_lst, timeout=None)
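The waiting step named above can be sketched with the standard library alone. This is a minimal illustration of `multiprocessing.connection.wait`, not eisenmp's pipe code; the two pipes and the message text are made up for the example, and only the name `p_wrk_sender_lst` is taken from the description.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait

# Two one-way pipes: Pipe(duplex=False) returns (receive_end, send_end).
recv_a, send_a = Pipe(duplex=False)
recv_b, send_b = Pipe(duplex=False)

# Assumption: the wait list holds the readable ends of the worker pipes.
p_wrk_sender_lst = [recv_a, recv_b]

# Simulate one worker announcing itself.
send_b.send("worker ready")

# wait() blocks until at least one connection is readable
# and returns the list of ready connections.
ready_list = wait(p_wrk_sender_lst, timeout=None)
messages = [conn.recv() for conn in ready_list]
```

With `timeout=None` the call blocks indefinitely, so a real caller would normally make sure at least one worker is guaranteed to send.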
- pipe_lst_create()
[Not used so far.]
- queue_cust_dict_category_create(*queue_cat_name_maxsize: tuple)
Example: ('category_1', 'input_q_3', 10)
- queue_cust_dict_std_create(*queue_name_maxsize: tuple)
Create a queue; name and maxsize are passed as an unpacked tuple, e.g. ('blue_q_7', 7).
There are two queue creator functions. Both use tuples to ease unpacking.
‘queue_cust_dict_std_create’ -> ‘queue_cust_dict_std’
‘queue_cust_dict_category_create’ -> ‘queue_cust_dict_cat’
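To make the two dict shapes concrete, here is a minimal sketch of how named queues could be kept in a plain dict and in a dict of dicts. The helper names `std_create` and `category_create` and the module-level dicts are hypothetical stand-ins, not the ProcEnv implementation; only the tuple shapes come from the descriptions above.

```python
import multiprocessing

# Hypothetical registries mirroring the two dict names in the docs.
queue_cust_dict_std = {}  # {queue_name: queue}
queue_cust_dict_cat = {}  # {category: {queue_name: queue}}


def std_create(*queue_name_maxsize):
    """Create one named queue per (name, maxsize) tuple."""
    for name, maxsize in queue_name_maxsize:
        queue_cust_dict_std[name] = multiprocessing.Queue(maxsize=maxsize)


def category_create(*queue_cat_name_maxsize):
    """Create one named queue per (category, name, maxsize) tuple."""
    for category, name, maxsize in queue_cat_name_maxsize:
        by_name = queue_cust_dict_cat.setdefault(category, {})
        by_name[name] = multiprocessing.Queue(maxsize=maxsize)


std_create(("blue_q_7", 7))
category_create(("category_1", "input_q_3", 10))
```

Keeping the name as the dict key is what lets a later lookup find a queue by name instead of by position.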
- queue_lst_get()
List of queues into which …worker_loader.py puts the shutdown message.
- queue_name_avail_get(name)
- run_proc(**kwargs)
Create a Process for each CPU core if num_proc is None or not set. The kwargs dict is updated for the worker ‘toolbox’ and reveals all available variables and dead references (spawn).
- Params:
kwargs: -
start_method: selection of spawn or fork
START_SEQUENCE_NUM: useful if the eisenmp instance is called often; process numbers rise, but start from 0
target=loader: the worker_loader module is loaded and keeps the process alive
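The idea of starting one process per core with a chosen start method can be sketched as follows. This is an illustration under assumptions, not the body of run_proc: `proc_count`, `loader` and `run_proc_sketch` are hypothetical names, and treating START_SEQUENCE_NUM as an offset for process names is only one possible reading of the parameter description.

```python
import multiprocessing


def proc_count(num_proc=None):
    """Fall back to one process per CPU core when num_proc is not set."""
    return num_proc if num_proc else multiprocessing.cpu_count()


def loader(**kwargs):
    """Placeholder target: report the process label through a queue."""
    kwargs["output_q"].put(kwargs["proc_label"])


def run_proc_sketch(num_proc=None, start_method="spawn", start_sequence_num=0):
    """Start worker processes and return them with their shared queue."""
    # Queue and processes come from the same context,
    # so they agree on the start method.
    ctx = multiprocessing.get_context(start_method)
    output_q = ctx.Queue()
    procs = []
    for offset in range(proc_count(num_proc)):
        # Assumption: the sequence number only offsets the process label.
        label = f"Process-{start_sequence_num + offset}"
        proc = ctx.Process(
            target=loader,
            name=label,
            kwargs={"output_q": output_q, "proc_label": label},
        )
        proc.start()
        procs.append(proc)
    return procs, output_q
```

When the spawn start method is used, call `run_proc_sketch()` from inside an `if __name__ == "__main__":` guard, because each child process imports the main module again.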
- stop_proc()
All workers must have confirmed the shutdown message.
- stop_thread()
eisenmp.eisenmp_q_coll module
- class eisenmp.eisenmp_q_coll.FunThread(name, fun_ref, *args, **kwargs)
Bases:
Thread
Thread maker.
- cancel()
- run()
_inst.start()
- class eisenmp.eisenmp_q_coll.QueueCollect
Bases:
ProcEnv
Queue message collector and printer. Logging from prints. Messages in the input and output Q have a header, so one can decide later what to do with them: alter or monitor.
Output can be stored in a box if store_result is set.
- enable_info_q()
Thread for loop.
- enable_info_thread()
Shows % done, time left, if fed with an end value
- enable_output_q()
Start a thread loop so that nothing else is blocked. It collects the workers’ stop confirmation messages and the results; all of them are lists.
- enable_print_q()
Thread for loop.
- enable_q_box_threads()
Collect Q messages and put them in a box for review, if enabled.
- info_q_loop()
Print info or collect statistics from boxed messages. Box is a standard dict with num generator for unique keys.
- output_q_box_view_results(serial_num)
Only lists with a header are accepted. The box is a dictionary, {key: val}, filled by ‘output_q_loop()’ -> output_q_box[num]: payload or stop msg
{1: ['RESULT_HEADER;PRIME_NUM;_TID_1;Process-1', ['10000079']], 24: ['PROC_STOP;Process-5']}
Result header: RESULT_HEADER
Delivery header: PRIME_NUM;_TID_;0 [‘header_msg’ and eisenmp, custom iterator, loop ticket id, split(;)]
Add your custom header_msg. Taken from the double Q example:
mP.run_q_feeder(generator=generator_aud, feeder_input_q=audio_q_b1, header_msg='BATCH_1_A')  # custom head
mP.run_q_feeder(generator=generator_vid, feeder_input_q=video_q_b1, header_msg='BATCH_1_V')
- Params:
serial_num: serial number of output_q_loop
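Because the header fields are joined with semicolons, a box row can be taken apart with a plain `split`. The helper `parse_row` below is a hypothetical name used only for this sketch; the two rows are the ones shown in the example box above.

```python
def parse_row(row):
    """Split a box row into (header_fields, payload).

    A row is a list whose first item is the header string; stop
    messages carry no payload, so payload is None for them.
    """
    header = row[0]
    payload = row[1] if len(row) > 1 else None
    return header.split(";"), payload


fields, payload = parse_row(
    ["RESULT_HEADER;PRIME_NUM;_TID_1;Process-1", ["10000079"]]
)
stop_fields, stop_payload = parse_row(["PROC_STOP;Process-5"])
```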
- output_q_loop()
Grab output from Queue and put it in a box. Use consecutive_number to create unique keys.
Note: Multiple threads can loop over the box (dict).
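The box-filling step can be sketched with a counter that hands out consecutive keys. This is an illustration, not the output_q_loop source: `drain_to_box` is a hypothetical helper, and a thread-safe `queue.Queue` stands in for the shared output queue so the example runs in a single process.

```python
import itertools
import queue


def drain_to_box(output_q, box, counter):
    """Move every item currently in output_q into box under a unique key."""
    while True:
        try:
            item = output_q.get_nowait()
        except queue.Empty:
            break
        box[next(counter)] = item
    return box


output_q = queue.Queue()
output_q.put(["RESULT_HEADER;PRIME_NUM;_TID_1;Process-1", ["10000079"]])
output_q.put(["PROC_STOP;Process-5"])

consecutive_number = itertools.count(1)
output_q_box = drain_to_box(output_q, {}, consecutive_number)
```

A real loop would keep polling instead of returning once the queue is empty; the one-shot drain keeps the sketch deterministic.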
- output_q_search_stop_confirm(serial_num)
Search for the workers’ stop messages and put them in a list. Stop processes and threads once the list is full.
{1: ['RESULT_HEADER;PRIME_NUM;_TID_1;Process-1', ['10000079']], 24: ['PROC_STOP;Process-5']}
- Params:
serial_num: serial number of output_q_loop
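The search for stop messages can be sketched as a scan over the box rows. The function name `search_stop_confirm` and the `worker_count` parameter are assumptions made for this example; the box content is the one shown above.

```python
def search_stop_confirm(box, worker_count):
    """Return (all_stopped, stopped_names) for PROC_STOP rows in the box."""
    stopped = []
    for row in box.values():
        fields = row[0].split(";")
        if fields[0] == "PROC_STOP":
            # The process name sits at the end of the header string.
            stopped.append(fields[-1])
    return len(stopped) >= worker_count, stopped


output_q_box = {
    1: ["RESULT_HEADER;PRIME_NUM;_TID_1;Process-1", ["10000079"]],
    24: ["PROC_STOP;Process-5"],
}
all_stopped, stopped = search_stop_confirm(output_q_box, worker_count=1)
```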
- print_findings()
Condensed result list for this run. A thread can sum the results in ‘Result’ class.
- print_q_loop()
Use a Print Q and a thread for formatted printing.
Use it only sparingly. BLOCKS the whole multiprocessing.
- proc_result_list_findings(p_result_row)
RESULT DICT in utils collects all results. Append to the result list for print-out at finish. Add to a dict to monitor results via an additional thread during runtime. [work in progress]
The STOP msg is appended as well!
- Params:
p_result_row: process loop result list row, can be a list in this row
- static proc_result_store(list_header)
- worker_mods_down_ask(list_header)
All worker MODULES confirm shutdown.
Worker is using the original name of its process in shut down msg. Process IS still running. Worker module entry function returned False.
- Params:
list_header: answer of WORKER MODULE to stop request; string with proc id at end
- Returns:
None; True if done
eisenmp.eisenmp_worker_loader module
Worker and mates module loader. Mate worker modules MUST call a threaded start_function(), else we hang. See watchdog.
- class eisenmp.eisenmp_worker_loader.ToolBox
Bases:
object
Storage box for a single worker process. Switch as many data fields as possible to a constant-like shape. Used in the worker. The user can distinguish between custom and (automatic) built-in variables or constants.
- eisenmp.eisenmp_worker_loader.all_worker_exit_msg(toolbox)
Warning: Signal stop event to [ALL] -> worker MODULES, not to PROCESS.
- Params:
toolbox: tools and Queues for processes
- eisenmp.eisenmp_worker_loader.function_executor(toolbox, mod_fun_lst)
Worker execution in loop, all other functions must start threaded.
- Params:
toolbox: kwargs
mod_fun_lst: list with function references to execute
- eisenmp.eisenmp_worker_loader.module_loader(**kwargs)
Modules loaded and function call stored as a reference in a list.
- eisenmp.eisenmp_worker_loader.module_path_load(file_path)
Imports the module from path and returns it in the env.
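Importing a module from a file path is commonly done with `importlib.util`. The sketch below shows that standard-library pattern; it is not claimed to be the body of module_path_load, and the file name `demo_worker.py` and function `worker_entrance` are hypothetical.

```python
import importlib.util
import os
import tempfile


def load_module_from_path(file_path):
    """Import the Python file at file_path and return the module object."""
    module_name = os.path.splitext(os.path.basename(file_path))[0]
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# Write a tiny throwaway worker module and load it by path.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "demo_worker.py")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write("def worker_entrance(toolbox):\n    return False\n")
    demo = load_module_from_path(path)
```

The loaded module stays usable after the temporary directory is removed, because its code has already been executed into the module object.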
- eisenmp.eisenmp_worker_loader.mp_worker_entry(**kwargs)
Entry. We are ‘disconnected’ from parent process now. Only Queue communication. Threads can exec() ‘string’ commands. All references are dead. Variables ok. We read only, here.
The worker can loop itself and grab a new list from queue; while loop.
- eisenmp.eisenmp_worker_loader.toolbox_enable(**kwargs)
Populate Toolbox class attributes for the worker to use.
Module contents
A Python multiprocess, multi CPU module.
An example function cracks a game quest.
Inheritance - proc: ProcEnv -> QueueCollect -> Mp
Create Queue/Process -> Collect messages in boxes -> Manage, feed queues
Thread names are in ProcEnv:
QueueCollect [print_q, output_q, input_q, tools_q, info_q],
GhettoGang [view_output_q_box, tools_q feeder],
[ProcInfo]
- class eisenmp.Mp
Bases:
QueueCollect
MultiProcessManager.
- q_feeder()
Queue. Chunk list producer of generator input.
A ticket is attached as header to identify the workload (list chunks)
Serial number to rebuild the modified results in the right order
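Chunking a generator and tagging each chunk with a ticket can be sketched as below. The row shape `[header, chunk]`, the header layout `header_msg;_TID_<serial>` and both helper names are assumptions for illustration; the q_feeder source may build its tickets differently.

```python
import itertools


def chunk_with_ticket(generator, chunk_size, header_msg):
    """Yield [header, chunk] rows with a rising serial number as ticket."""
    iterator = iter(generator)
    for serial in itertools.count():
        chunk = list(itertools.islice(iterator, chunk_size))
        if not chunk:
            return
        yield [f"{header_msg};_TID_{serial}", chunk]


def ticket_serial(header):
    """Read the serial number back from the end of a ticket header."""
    return int(header.rsplit("_TID_", 1)[1])


rows = list(chunk_with_ticket(range(5), 2, "PRIME_NUM"))

# Results may come back in any order; the serial restores the original one.
shuffled = [rows[2], rows[0], rows[1]]
restored = sorted(shuffled, key=lambda row: ticket_serial(row[0]))
```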
- q_input_put(feeder_input_q, chunk_lst)
- reset()
- run_q_feeder(**kwargs)
Threaded instance, run multiple q_feeder, called by manager of worker
- start(**kwargs)
enable Processes and eisenmp worker threads.
- eisenmp.create_transport_header(num_gen, q_name)
Fields are joined with semicolons so the header is easy to split.
- eisenmp.q_name_get(q_name_id_lst, feeder_input_q)
A queue name must be assigned. The name becomes the dictionary key in the result dict (dict in dict). Results are stored like so: {Queue_name: {__TID__1: [foo, bar], __TID__2: [baz, boo]}}.
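The dict-in-dict layout described here can be built with `dict.setdefault`. The helper `store_result` and the queue name `audio_q_b1` as a key are assumptions for this sketch; only the nesting of queue name, ticket id and payload comes from the description.

```python
def store_result(result_dict, q_name, ticket_id, payload):
    """Store payload under result_dict[q_name][ticket_id]."""
    result_dict.setdefault(q_name, {})[ticket_id] = payload
    return result_dict


results = {}
store_result(results, "audio_q_b1", "__TID__1", ["foo", "bar"])
store_result(results, "audio_q_b1", "__TID__2", ["baz", "boo"])
```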