eisenmp package

Subpackages

Submodules

eisenmp.eisenmp_procenv module

ProcEnv

class eisenmp.eisenmp_procenv.ProcEnv

Bases: object

Create the environment for worker processes on the CPUs. All queues are shared among the processes. The default ‘maxsize=1’ can be altered, but any change should be tested and documented.

  • Queues ONLY

  • custom Queue builder with a queue in a dict to show a name

  • another Queue builder can add a category name, dict in dict

static core_count_get()
end_proc()

Graceful shutdown join.

end_thread()

Instance and normal threads.

kwargs_env_update_custom(**kwargs)

Override the default PROCS_MAX and set ‘queue_lst_get’, which the worker loader uses to put the stop message into all queues.

pipe_default_create(start_sequence_num)

[Not used so far.] Pipe creation is extremely slow.

Wait for worker-sent messages in the wait list, then take a random corresponding pipe to send the next list from the iterator. Get the ready list with: ready_list = multiprocessing.connection.wait(p_wrk_sender_lst, timeout=None)
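The waiting step mentioned above can be tried without any worker process. The following is only a sketch of how `multiprocessing.connection.wait` picks the readable pipe ends; the helper `ready_pipes` and the variable names are hypothetical and not part of eisenmp.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait


def ready_pipes(receiver_lst, timeout=None):
    """Return the connections in receiver_lst that have data to read."""
    return wait(receiver_lst, timeout=timeout)


# Two one-way pipes (receiver end, sender end); names are illustrative only.
recv_a, send_a = Pipe(duplex=False)
recv_b, send_b = Pipe(duplex=False)

send_b.send("worker B is ready")  # only pipe B carries a message

ready_lst = ready_pipes([recv_a, recv_b], timeout=1)
messages = [conn.recv() for conn in ready_lst]
```

Here `ready_lst` contains only the receiver end of pipe B, so a caller could answer on the matching sender end.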

pipe_lst_create()

[Not used so far.]

queue_cust_dict_category_create(*queue_cat_name_maxsize: tuple)

Example argument: (‘category_1’, ‘input_q_3’, 10), i.e. category name, queue name and maxsize.

queue_cust_dict_std_create(*queue_name_maxsize: tuple)

Create a queue; name and maxsize are passed as an unpacked tuple, e.g. (‘blue_q_7’, 7). There are two queue creator functions. Both take tuples to ease unpacking.

‘queue_cust_dict_std_create’ -> ‘queue_cust_dict_std’

‘queue_cust_dict_category_create’ -> ‘queue_cust_dict_cat’
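As a rough illustration of the two builders described above, a named-queue dict and a dict-in-dict keyed by category could look like the sketch below. The function names, the `queue_dict` argument and the `queue_factory` parameter are hypothetical and not taken from eisenmp; only `multiprocessing.Queue` is a real library call.

```python
import multiprocessing


def queue_std_create(queue_dict, *name_maxsize, queue_factory=multiprocessing.Queue):
    """Store one queue per (name, maxsize) tuple under its name."""
    for name, maxsize in name_maxsize:
        queue_dict[name] = queue_factory(maxsize=maxsize)
    return queue_dict


def queue_category_create(queue_dict, *cat_name_maxsize, queue_factory=multiprocessing.Queue):
    """Store queues as dict in dict: {category: {name: queue}}."""
    for category, name, maxsize in cat_name_maxsize:
        # setdefault creates the inner dict the first time a category appears
        queue_dict.setdefault(category, {})[name] = queue_factory(maxsize=maxsize)
    return queue_dict
```

Under these assumptions, `queue_std_create({}, ("blue_q_7", 7))` would return a dict with one queue under the key `"blue_q_7"`, and `queue_category_create({}, ("category_1", "input_q_3", 10))` would nest the queue under `"category_1"`.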

queue_lst_get()

List of queues that receive the shutdown message; used by …worker_loader.py

queue_name_avail_get(name)
run_proc(**kwargs)

Create a Process for each CPU core if num_proc is None or not set. The kwargs dict is updated for the worker ‘toolbox’ and exposes all available variables and dead references (spawn).

Params:

kwargs: -

start_method: start method selection, spawn or fork

START_SEQUENCE_NUM: useful if the eisenmp instance is called often; process numbers rise, but start from 0

target=loader: the worker_loader module is loaded and keeps the process alive
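The process creation described above can be approximated with the standard library. This is a sketch under assumptions: `procs_create` and `worker_stub` are hypothetical names, and treating the start sequence number as an offset for process names is one reading of the parameter description, not confirmed behaviour of `run_proc`.

```python
import multiprocessing
import os


def worker_stub(**kwargs):
    """Placeholder target; a real loader would keep the process alive."""
    return kwargs


def procs_create(target, num_proc=None, start_method="spawn",
                 start_sequence_num=0, **kwargs):
    """Build (but do not start) one Process per CPU core unless num_proc is given."""
    num_proc = num_proc or os.cpu_count() or 1
    ctx = multiprocessing.get_context(start_method)  # "spawn" or "fork"
    return [
        ctx.Process(target=target, name=f"Process-{num}", kwargs=kwargs)
        for num in range(start_sequence_num, start_sequence_num + num_proc)
    ]
```

The returned processes would then be started with `start()` and later joined with `join()`. With the spawn start method that code belongs under an `if __name__ == "__main__":` guard.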

stop_proc()

All workers must have confirmed the shutdown message.

stop_thread()

eisenmp.eisenmp_q_coll module

class eisenmp.eisenmp_q_coll.FunThread(name, fun_ref, *args, **kwargs)

Bases: Thread

Thread maker.

cancel()
run()

_inst.start()
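A thread maker with this constructor shape can be sketched as a small `threading.Thread` subclass. The class below is a hypothetical stand-in, not the eisenmp source; in particular, the behaviour of `cancel()` (skip the call if cancelled before the thread runs) is an assumption.

```python
import threading


class FunThreadSketch(threading.Thread):
    """Run fun_ref(*args, **kwargs) in a named daemon thread."""

    def __init__(self, name, fun_ref, *args, **kwargs):
        super().__init__(name=name, daemon=True)
        self.fun_ref = fun_ref
        self.args = args
        self.kwargs = kwargs
        self.cancelled = threading.Event()

    def run(self):
        # Skip the call if cancel() was requested before the thread ran.
        if not self.cancelled.is_set():
            self.fun_ref(*self.args, **self.kwargs)

    def cancel(self):
        self.cancelled.set()
```

A call such as `FunThreadSketch("printer", print, "hello").start()` would run `print("hello")` in a thread named `printer`.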

class eisenmp.eisenmp_q_coll.QueueCollect

Bases: ProcEnv

Queue message collector and printer. Logging is done from prints. Messages in the input and output queues carry a header, so one can decide later what to do with them: alter or monitor.

  • Output can be stored in a box if store_result is set

enable_info_q()

Thread for loop.

enable_info_thread()

Shows % done, time left, if fed with an end value
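The percentage and remaining-time figures can be derived from the processed count, the end value and the elapsed time. The helper below is a hypothetical sketch that assumes a constant processing rate; it is not the calculation eisenmp uses.

```python
def progress_info(done, end_value, elapsed_sec):
    """Return (percent_done, seconds_left) from a linear estimate."""
    if end_value <= 0 or done <= 0:
        return 0.0, None  # no end value or no progress yet: no estimate
    percent = min(100.0, 100.0 * done / end_value)
    rate = done / elapsed_sec if elapsed_sec > 0 else None  # items per second
    seconds_left = max(0.0, (end_value - done) / rate) if rate else None
    return percent, seconds_left
```

For example, 25 of 100 items after 10 seconds gives a rate of 2.5 items per second, so 75 remaining items take about 30 seconds.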

enable_output_q()

Start a thread loop so that the main flow is not blocked. Collects the workers’ stop confirmation messages and the results; all are lists.

enable_print_q()

Thread for loop.

enable_q_box_threads()

Collect queue messages and put them in a box for review, if enabled.

info_q_loop()

Print info or collect statistics from boxed messages. Box is a standard dict with num generator for unique keys.

output_q_box_view_results(serial_num)

Only lists with a header are accepted. The box is a dictionary {key: val}, filled by ‘output_q_loop()’: output_q_box[num] holds a payload or a stop message.

{1: [‘RESULT_HEADER;PRIME_NUM;_TID_1;Process-1’, [‘10000079’]], 24: [‘PROC_STOP;Process-5’]}

Result header: RESULT_HEADER

Delivery header: PRIME_NUM;_TID_;0 [‘header_msg’ and eisenmp, custom iterator, loop ticket id, split(;)]

Add your custom header_msg, taken from the double Q example:
    mP.run_q_feeder(generator=generator_aud, feeder_input_q=audio_q_b1, header_msg='BATCH_1_A')  # custom head
    mP.run_q_feeder(generator=generator_vid, feeder_input_q=video_q_b1, header_msg='BATCH_1_V')

Params:

serial_num: serial number of output_q_loop
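Since the headers are semicolon-separated strings, result rows can be picked out of the box by splitting the first list element. The function below is a hypothetical sketch based on the box layout shown above, not the eisenmp implementation.

```python
def box_results_view(output_q_box):
    """Return {key: (header_fields, payload)} for rows carrying RESULT_HEADER."""
    results = {}
    for key, row in output_q_box.items():
        if not isinstance(row, list) or not row:
            continue  # only lists with a header are accepted
        fields = row[0].split(";")
        if fields[0] == "RESULT_HEADER" and len(row) > 1:
            results[key] = (fields, row[1])
    return results


box = {
    1: ["RESULT_HEADER;PRIME_NUM;_TID_1;Process-1", ["10000079"]],
    24: ["PROC_STOP;Process-5"],
}
found = box_results_view(box)
```

With the example box, only key 1 is returned; the stop message under key 24 is ignored.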

output_q_loop()

Grab output from Queue and put it in a box. Use consecutive_number to create unique keys.

Note: Multiple threads can loop over the box (dict).
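The boxing step can be sketched as draining a queue into a dict whose keys come from a number generator. The names below are hypothetical, and a plain `queue.Queue` stands in for the shared output queue so the sketch runs in a single process.

```python
import itertools
import queue


def output_q_drain(output_q, output_q_box, num_gen):
    """Move every message currently in output_q into the box under a unique key."""
    while True:
        try:
            msg = output_q.get_nowait()
        except queue.Empty:
            return output_q_box  # queue is empty for now
        output_q_box[next(num_gen)] = msg


demo_q = queue.Queue()
demo_q.put(["RESULT_HEADER;PRIME_NUM;_TID_1;Process-1", ["10000079"]])
demo_q.put(["PROC_STOP;Process-5"])

demo_box = output_q_drain(demo_q, {}, itertools.count(1))
```

A real collector thread would keep calling such a function in a loop rather than return after one pass.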

output_q_search_stop_confirm(serial_num)

Search for the workers’ stop messages and put them in a list. Stop processes and threads when the list is full.

{1: [‘RESULT_HEADER;PRIME_NUM;_TID_1;Process-1’, [‘10000079’]], 24: [‘PROC_STOP;Process-5’]}

Params:

serial_num: serial number of output_q_loop
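The stop search can be illustrated on the example box. The helper is hypothetical; it assumes that the process name is the last header field and that the list counts as full once every expected process has confirmed.

```python
def stop_confirm_search(output_q_box, num_procs):
    """Collect process names from PROC_STOP rows; report whether all confirmed."""
    stop_lst = []
    for row in output_q_box.values():
        fields = row[0].split(";")
        if fields[0] == "PROC_STOP":
            stop_lst.append(fields[-1])  # process name is the last field
    return stop_lst, len(set(stop_lst)) >= num_procs


stop_box = {
    1: ["RESULT_HEADER;PRIME_NUM;_TID_1;Process-1", ["10000079"]],
    24: ["PROC_STOP;Process-5"],
}
```

Calling `stop_confirm_search(stop_box, num_procs=2)` would report one confirmation and an incomplete list, so processes and threads keep running.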

print_findings()

Condensed result list for this run. A thread can sum the results in ‘Result’ class.

print_q_loop()

Use a Print Q and a thread for formatted printing.

Use it only sparingly. BLOCKS the whole multiprocessing.

proc_result_list_findings(p_result_row)

The RESULT DICT in utils collects all results. Append to the result list for printing at the end. Add to a dict to monitor results via an additional thread during runtime. [work in progress]

STOP msg was also appended!

Params:

p_result_row: process loop result list row, can be a list in this row

static proc_result_store(list_header)
worker_mods_down_ask(list_header)

All worker MODULES confirm shutdown.

The worker uses the original name of its process in the shutdown message. The process IS still running; the worker module entry function returned False.

Params:

list_header: answer of WORKER MODULE to stop request; string with proc id at end

Returns:

True if done, otherwise None

eisenmp.eisenmp_worker_loader module

Worker and mates module loader. Mate worker modules MUST call a threaded start_function(), else we hang. See watchdog.

class eisenmp.eisenmp_worker_loader.ToolBox

Bases: object

Storage box for a single worker process. Switch as many data fields as possible to a constant-like shape. Used in the worker. The user can distinguish between custom and (automatic) built-in variables or constants.

eisenmp.eisenmp_worker_loader.all_worker_exit_msg(toolbox)

Warning: Signals the stop event to [ALL] worker MODULES, not to the PROCESS.

Params:

toolbox: tools and Queues for processes

eisenmp.eisenmp_worker_loader.function_executor(toolbox, mod_fun_lst)

Worker execution in a loop; all other functions must start threaded.

Params:

toolbox: kwargs

mod_fun_lst: list with function references to execute

eisenmp.eisenmp_worker_loader.module_loader(**kwargs)

Loads the modules and stores each function call as a reference in a list.

eisenmp.eisenmp_worker_loader.module_path_load(file_path)

Imports the module from path and returns it in the env.
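Importing a module from a file path is commonly done with `importlib.util`. The sketch below shows that standard pattern; the function name `module_from_path` is hypothetical, and whether eisenmp loads modules exactly this way is an assumption.

```python
import importlib.util
from pathlib import Path


def module_from_path(file_path):
    """Import the Python file at file_path and return the module object."""
    module_name = Path(file_path).stem
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load module from {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the module's top-level code
    return module
```

A function reference could then be fetched with `getattr(module, "function_name")` and stored in a list for later execution.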

eisenmp.eisenmp_worker_loader.mp_worker_entry(**kwargs)

Entry. We are ‘disconnected’ from the parent process now. Only Queue communication is possible. Threads can exec() ‘string’ commands. All references are dead. Variables are ok. We only read here.

The worker can loop itself and grab a new list from queue; while loop.

eisenmp.eisenmp_worker_loader.toolbox_enable(**kwargs)

Populate Toolbox class attributes for the worker to use.
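Copying keyword arguments onto a storage object can be sketched with `setattr`. Both names below are hypothetical, and the attribute `WORKER_ID` only illustrates an automatic, constant-like field; it is not a documented eisenmp attribute.

```python
class ToolBoxSketch:
    """Plain attribute container for one worker process."""

    WORKER_ID = None  # illustrative automatic field, overwritten per worker


def toolbox_fill(**kwargs):
    """Return a toolbox whose attributes mirror the given keyword arguments."""
    toolbox = ToolBoxSketch()
    for name, value in kwargs.items():
        setattr(toolbox, name, value)
    return toolbox
```

A worker could then read `toolbox.WORKER_ID` or any custom attribute passed in by the caller.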

Module contents

A Python multiprocess, multi CPU module. An example function cracks a game quest.

Inheritance - proc: ProcEnv -> QueueCollect -> Mp
    Create Queue/Process -> Collect messages in boxes -> Manage, feed queues

Thread names are in ProcEnv:
    QueueCollect [print_q, output_q, input_q, tools_q, info_q],
    GhettoGang [view_output_q_box, tools_q feeder],
    [ProcInfo]

class eisenmp.Mp

Bases: QueueCollect

MultiProcessManager.

q_feeder()

Queue. Chunk list producer of generator input.

  • A ticket is attached as header to identify the workload (list chunks)

  • Serial number to rebuild the modified results in the right order
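The two points above can be sketched as a chunk producer that numbers each chunk. The function name is hypothetical, and the ticket layout follows the delivery header shown earlier (`PRIME_NUM;_TID_;0`) as an assumption about the real format.

```python
from itertools import count, islice


def chunk_lists(generator, chunk_size, header_msg):
    """Yield [ticket, chunk] lists; the ticket ends with a serial number."""
    iterator = iter(generator)
    for serial_num in count():
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return  # generator is exhausted
        yield [f"{header_msg};_TID_;{serial_num}", chunk]


tickets = list(chunk_lists(range(5), 2, "PRIME_NUM"))

# Results that arrive out of order can be sorted by the serial number again.
restored = sorted(reversed(tickets), key=lambda row: int(row[0].split(";")[-1]))
```

Each yielded list could then be put into the feeder input queue, one chunk per `put` call.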

q_input_put(feeder_input_q, chunk_lst)
reset()
run_q_feeder(**kwargs)

Threaded instance; runs multiple q_feeder instances. Called by the manager of the workers.

start(**kwargs)

Enable processes and eisenmp worker threads.

eisenmp.create_transport_header(num_gen, q_name)

Uses semicolons as separators so the header is easy to split.

eisenmp.q_name_get(q_name_id_lst, feeder_input_q)

A queue name must be assigned. The name becomes the dictionary key in the result dict (dict in dict). Results are stored like so: {Queue_name: {__TID__1: [foo, bar], __TID__2: [baz, boo]}}.
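The nested layout can be built with `dict.setdefault`. The helper name and the sample values are hypothetical; the sketch only illustrates the dict-in-dict shape described above.

```python
def result_store(result_dict, q_name, ticket_id, payload):
    """Store payload under result_dict[q_name][ticket_id] (dict in dict)."""
    result_dict.setdefault(q_name, {})[ticket_id] = payload
    return result_dict


result_dict = {}
result_store(result_dict, "audio_q_b1", "__TID__1", ["foo", "bar"])
result_store(result_dict, "audio_q_b1", "__TID__2", ["baz", "boo"])
```

Because the queue name is the outer key, results from several feeder queues stay separate in the same dictionary.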