From 3462b5fd79f8b5d8266d750501c037990861bd68 Mon Sep 17 00:00:00 2001
From: Johannes Otepka
Date: Fri, 23 Jan 2026 10:47:33 +0100
Subject: [PATCH 1/7] first version implemented

---
 ipyparallel/client/remotefunction.py     |  2 ++
 ipyparallel/client/view.py               | 12 ++++++--
 ipyparallel/controller/dictdb.py         | 16 ++++++++++
 ipyparallel/controller/hub.py            | 37 ++++++++++++++++++++++++
 ipyparallel/controller/task_scheduler.py |  8 +++++
 ipyparallel/engine/kernel.py             |  1 +
 6 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py
index 5b7a9350..5dd0573d 100644
--- a/ipyparallel/client/remotefunction.py
+++ b/ipyparallel/client/remotefunction.py
@@ -195,12 +195,14 @@ def __init__(
         chunksize=None,
         ordered=True,
         return_exceptions=False,
+        task_label=None,
         **flags,
     ):
         super().__init__(view, f, block=block, **flags)
         self.chunksize = chunksize
         self.ordered = ordered
         self.return_exceptions = return_exceptions
+        self.task_label = task_label
 
         mapClass = Map.dists[dist]
         self.mapObject = mapClass()
diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py
index 53712c39..f4b25d9b 100644
--- a/ipyparallel/client/view.py
+++ b/ipyparallel/client/view.py
@@ -97,6 +97,7 @@ class View(HasTraits):
     # flags
     block = Bool(False)
     track = Bool(False)
+    task_label = Any()
     targets = Any()
 
     history = List()
@@ -592,7 +593,7 @@ def _really_apply(
         return ar
 
     @sync_results
-    def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
+    def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None):
         """Parallel version of builtin `map`, using this View's `targets`.
 
         There will be one task per target, so work will be chunked
@@ -1036,7 +1037,7 @@ def _broadcast_map(f, *sequence_names):
             return list(map(f, *sequences))
 
     @_not_coalescing
-    def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
+    def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None):
         """Parallel version of builtin `map`, using this View's `targets`.
 
         There will be one task per engine, so work will be chunked
@@ -1360,6 +1361,8 @@ def _really_apply(
         metadata = dict(
             after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
         )
+        if self.task_label:
+            metadata["task_label"] = self.task_label
 
         future = self.client.send_apply_request(
             self._socket, f, args, kwargs, track=track, metadata=metadata
@@ -1389,6 +1392,7 @@ def map(
         chunksize=1,
         ordered=True,
         return_exceptions=False,
+        task_label=None,
     ):
         """Parallel version of builtin `map`, load-balanced by this View.
 
@@ -1433,9 +1437,10 @@ def map(
         # default
         if block is None:
             block = self.block
-
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
+        self.task_label = task_label  # just for testing
+
         pf = ParallelFunction(
             self,
             f,
@@ -1443,6 +1448,7 @@ def map(
             chunksize=chunksize,
             ordered=ordered,
             return_exceptions=return_exceptions,
+            task_label=task_label,
         )
 
         return pf.map(*sequences)
diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py
index 4091e43a..6f81bbc0 100644
--- a/ipyparallel/controller/dictdb.py
+++ b/ipyparallel/controller/dictdb.py
@@ -61,6 +61,20 @@
     '$exists': lambda a, b: (b and a is not None) or (a is None and not b),
 }
 
+def _debug_output(where, msg, stack=False):
+    import inspect, traceback
+    with open("d:/dictdb_log.txt", "a") as f:
+        f.write(f"{where} [{datetime.now()}]: {msg['msg_id']}\n")
+        f.write(f"\tmsg={msg}\n")
+        #if.write(f"has metadata={'metadata' in msg}\n")
+        #if 'metadata' in msg:
+        #    f.write(f"{msg['metadata']}\n")
+        f.write(f"has result_metadata={'result_metadata' in msg}\n")
+        if 'result_metadata' in msg:
+            f.write(f"{msg['result_metadata']}\n")
+        if stack:
+            for line in traceback.format_stack():
+                f.write(line.strip()+"\n")
 
 def _add_tz(obj):
     if isinstance(obj, datetime):
@@ -240,6 +254,7 @@ def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if msg_id in self._records:
             raise KeyError(f"Already have msg_id {msg_id!r}")
+        #_debug_output("add_record", rec, False)
         self._check_dates(rec)
         self._records[msg_id] = rec
         self._add_bytes(rec)
@@ -257,6 +272,7 @@ def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
         if msg_id in self._culled_ids:
             raise KeyError(f"Record {msg_id!r} has been culled for size")
+        #_debug_output("update_record", rec)
         self._check_dates(rec)
         _rec = self._records[msg_id]
         self._drop_bytes(_rec)
diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py
index b166fbce..23b14d7b 100644
--- a/ipyparallel/controller/hub.py
+++ b/ipyparallel/controller/hub.py
@@ -50,6 +50,20 @@ def _printer(*args, **kwargs):
     print(kwargs)
 
 
+def _debug_output(where, msg):
+    with open("d:/hub_log.txt", "a") as f:
+        f.write(f"{where} [{datetime.now()}]: ")
+        if 'msg_id' in msg:
+            f.write(f"{msg['msg_id']}\n")
+        else:
+            f.write(f"{msg}\n\n\n")
+            return
+        f.write(f"has metadata={'metadata' in msg}\n")
+        if 'metadata' in msg:
+            f.write(f"{msg['metadata']}\n")
+        f.write(f"{msg}\n\n\n")
+
+
 def empty_record():
     """Return an empty dict with all record keys."""
     return {
@@ -75,6 +89,7 @@ def empty_record():
         'error': None,
         'stdout': '',
         'stderr': '',
+        'task_label': None,
     }
 
 
@@ -111,6 +126,7 @@ def init_record(msg):
         'error': None,
         'stdout': '',
         'stderr': '',
+        'task_label': msg['metadata'].get('task_label', None),
     }
 
 
@@ -330,6 +346,9 @@ def dispatch_monitor_traffic(self, msg):
             return
         handler = self.monitor_handlers.get(switch, None)
         if handler is not None:
+            #if switch == b'intask':
+            #    _debug_output(f"dispatch_monitor_traffic ({switch} - {handler})", msg)
+
             handler(idents, msg)
         else:
             self.log.error("Unrecognized monitor topic: %r", switch)
@@ -470,6 +489,8 @@ def save_queue_request(self, idents, msg):
         record['client_uuid'] = msg['header']['session']
         record['queue'] = 'mux'
 
+        #_debug_output('save_queue_request', msg)
+
         try:
             # it's posible iopub arrived first:
             existing = self.db.get_record(msg_id)
@@ -516,6 +537,8 @@ def save_queue_result(self, idents, msg):
             )
             return
 
+        #_debug_output('save_queue_result', msg)
+
         eid = self.by_ident.get(queue_id, None)
         if eid is None:
             self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
@@ -576,6 +599,8 @@ def save_broadcast_request(self, idents, msg):
        msg_id = header['msg_id']
         self.pending.add(msg_id)
 
+        #_debug_output('save_broadcast_request',msg)
+
         try:
             self.db.add_record(msg_id, record)
         except Exception as e:
@@ -602,6 +627,8 @@ def save_broadcast_result(self, idents, msg):
         eid = self.by_ident.get(engine_uuid.encode("utf8"), None)
         status = md.get('status', None)
 
+        #_debug_output('save_broadcast_result', msg)
+
         if msg_id in self.pending:
             self.log.info(f'broadcast:: broadcast {msg_id} finished on {eid}')
             self.pending.remove(msg_id)
@@ -649,6 +676,8 @@ def save_task_request(self, idents, msg):
             return
         record = init_record(msg)
 
+        #_debug_output('save_task_request', msg)
+
         record['client_uuid'] = msg['header']['session']
         record['queue'] = 'task'
         header = msg['header']
@@ -708,6 +737,8 @@ def save_task_result(self, idents, msg):
             )
             return
 
+        #_debug_output("save_task_result", msg)
+
         parent = msg['parent_header']
         if not parent:
             # print msg
@@ -761,6 +792,8 @@ def save_task_destination(self, idents, msg):
         except Exception:
             self.log.error("task::invalid task tracking message", exc_info=True)
             return
+        #_debug_output("save_task_destination", msg)
+
         content = msg['content']
         # print (content)
         msg_id = content['msg_id']
@@ -788,6 +821,7 @@ def monitor_iopub_message(self, topics, msg):
         except Exception:
             self.log.error("iopub::invalid IOPub message", exc_info=True)
             return
+        #_debug_output("monitor_iopub_message", msg)
 
         msg_type = msg['header']['msg_type']
         if msg_type == 'shutdown_reply':
@@ -817,6 +851,8 @@ def save_iopub_message(self, topics, msg):
         msg_id = parent['msg_id']
         msg_type = msg['header']['msg_type']
         content = msg['content']
+        #_debug_output("save_iopub_message", msg)
+        #_debug_output("save_iopub_message[parent]", parent)
 
         # ensure msg_id is in db
         try:
@@ -871,6 +907,7 @@ def connection_request(self, client_id, msg):
         for eid, ec in self.engines.items():
             jsonable[str(eid)] = ec.uuid
         content['engines'] = jsonable
+        #_debug_output("connection_request", msg)
         self.session.send(
             self.query, 'connection_reply', content, parent=msg, ident=client_id
         )
diff --git a/ipyparallel/controller/task_scheduler.py b/ipyparallel/controller/task_scheduler.py
index b354b96f..1389c75d 100644
--- a/ipyparallel/controller/task_scheduler.py
+++ b/ipyparallel/controller/task_scheduler.py
@@ -18,6 +18,12 @@
 # Chooser functions
 # ----------------------------------------------------------------------
 
+def _debug_output(where, msg):
+    with open("d:/task_scheduler_log.txt", "a") as f:
+        f.write(f"{where}: {msg['msg_id']}\n")
+        f.write(f"has metadata={'metadata' in msg}\n")
+        if 'metadata' in msg:
+            f.write(f"{msg['metadata']}\n\n")
 
 def plainrandom(loads):
     """Plain random pick."""
@@ -350,6 +356,8 @@ def dispatch_submission(self, raw_msg):
         # send to monitor
         self.mon_stream.send_multipart([b'intask'] + raw_msg, copy=False)
 
+        #_debug_output("dispatch_submission",msg)
+
         header = msg['header']
         md = msg['metadata']
         msg_id = header['msg_id']
diff --git a/ipyparallel/engine/kernel.py b/ipyparallel/engine/kernel.py
index 8c900167..c27fabb2 100644
--- a/ipyparallel/engine/kernel.py
+++ b/ipyparallel/engine/kernel.py
@@ -76,6 +76,7 @@ def init_metadata(self, parent):
             'is_broadcast': parent_metadata.get('is_broadcast', False),
             'is_coalescing': parent_metadata.get('is_coalescing', False),
             'original_msg_id': parent_metadata.get('original_msg_id', ''),
+            'task_label': parent_metadata.get('task_label', None),
         }
 
     def finish_metadata(self, parent, metadata, reply_content):

From a51f228807a37c50988e71195a4104f9a1aaa6ed Mon Sep 17 00:00:00 2001
From: Johannes Otepka
Date: Fri, 23 Jan 2026 11:08:56 +0100
Subject: [PATCH 2/7] _debug_output function removed

---
 ipyparallel/controller/hub.py            | 37 -------------------------
 ipyparallel/controller/task_scheduler.py |  9 ------
 2 files changed, 46 deletions(-)

diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py
index 23b14d7b..59578c70 100644
--- a/ipyparallel/controller/hub.py
+++ b/ipyparallel/controller/hub.py
@@ -50,20 +50,6 @@ def _printer(*args, **kwargs):
     print(kwargs)
 
 
-def _debug_output(where, msg):
-    with open("d:/hub_log.txt", "a") as f:
-        f.write(f"{where} [{datetime.now()}]: ")
-        if 'msg_id' in msg:
-            f.write(f"{msg['msg_id']}\n")
-        else:
-            f.write(f"{msg}\n\n\n")
-            return
-        f.write(f"has metadata={'metadata' in msg}\n")
-        if 'metadata' in msg:
-            f.write(f"{msg['metadata']}\n")
-        f.write(f"{msg}\n\n\n")
-
-
 def empty_record():
     """Return an empty dict with all record keys."""
     return {
@@ -346,9 +332,6 @@ def dispatch_monitor_traffic(self, msg):
             return
         handler = self.monitor_handlers.get(switch, None)
         if handler is not None:
-            #if switch == b'intask':
-            #    _debug_output(f"dispatch_monitor_traffic ({switch} - {handler})", msg)
-
             handler(idents, msg)
         else:
             self.log.error("Unrecognized monitor topic: %r", switch)
@@ -489,8 +472,6 @@ def save_queue_request(self, idents, msg):
         record['client_uuid'] = msg['header']['session']
         record['queue'] = 'mux'
 
-        #_debug_output('save_queue_request', msg)
-
         try:
             # it's posible iopub arrived first:
             existing = self.db.get_record(msg_id)
@@ -537,8 +518,6 @@ def save_queue_result(self, idents, msg):
             )
             return
 
-        #_debug_output('save_queue_result', msg)
-
         eid = self.by_ident.get(queue_id, None)
         if eid is None:
             self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
@@ -599,8 +578,6 @@ def save_broadcast_request(self, idents, msg):
         msg_id = header['msg_id']
         self.pending.add(msg_id)
 
-        #_debug_output('save_broadcast_request',msg)
-
         try:
             self.db.add_record(msg_id, record)
         except Exception as e:
@@ -627,8 +604,6 @@ def save_broadcast_result(self, idents, msg):
         eid = self.by_ident.get(engine_uuid.encode("utf8"), None)
         status = md.get('status', None)
 
-        #_debug_output('save_broadcast_result', msg)
-
         if msg_id in self.pending:
             self.log.info(f'broadcast:: broadcast {msg_id} finished on {eid}')
             self.pending.remove(msg_id)
@@ -676,8 +651,6 @@ def save_task_request(self, idents, msg):
             return
         record = init_record(msg)
 
-        #_debug_output('save_task_request', msg)
-
         record['client_uuid'] = msg['header']['session']
         record['queue'] = 'task'
         header = msg['header']
@@ -737,8 +710,6 @@ def save_task_result(self, idents, msg):
             )
             return
 
-        #_debug_output("save_task_result", msg)
-
         parent = msg['parent_header']
         if not parent:
             # print msg
@@ -792,8 +763,6 @@ def save_task_destination(self, idents, msg):
         except Exception:
             self.log.error("task::invalid task tracking message", exc_info=True)
             return
-        #_debug_output("save_task_destination", msg)
-
         content = msg['content']
         # print (content)
         msg_id = content['msg_id']
@@ -821,8 +790,6 @@ def monitor_iopub_message(self, topics, msg):
         except Exception:
             self.log.error("iopub::invalid IOPub message", exc_info=True)
             return
-        #_debug_output("monitor_iopub_message", msg)
-
         msg_type = msg['header']['msg_type']
         if msg_type == 'shutdown_reply':
             session = msg['header']['session']
@@ -851,9 +818,6 @@ def save_iopub_message(self, topics, msg):
         msg_id = parent['msg_id']
         msg_type = msg['header']['msg_type']
         content = msg['content']
-        #_debug_output("save_iopub_message", msg)
-        #_debug_output("save_iopub_message[parent]", parent)
-
         # ensure msg_id is in db
         try:
             rec = self.db.get_record(msg_id)
@@ -907,7 +871,6 @@ def connection_request(self, client_id, msg):
         for eid, ec in self.engines.items():
             jsonable[str(eid)] = ec.uuid
         content['engines'] = jsonable
-        #_debug_output("connection_request", msg)
         self.session.send(
             self.query, 'connection_reply', content, parent=msg, ident=client_id
         )
diff --git a/ipyparallel/controller/task_scheduler.py b/ipyparallel/controller/task_scheduler.py
index 1389c75d..7865a3bb 100644
--- a/ipyparallel/controller/task_scheduler.py
+++ b/ipyparallel/controller/task_scheduler.py
@@ -18,13 +18,6 @@
 # Chooser functions
 # ----------------------------------------------------------------------
 
-def _debug_output(where, msg):
-    with open("d:/task_scheduler_log.txt", "a") as f:
-        f.write(f"{where}: {msg['msg_id']}\n")
-        f.write(f"has metadata={'metadata' in msg}\n")
-        if 'metadata' in msg:
-            f.write(f"{msg['metadata']}\n\n")
-
 def plainrandom(loads):
     """Plain random pick."""
     n = len(loads)
@@ -356,8 +349,6 @@ def dispatch_submission(self, raw_msg):
         # send to monitor
         self.mon_stream.send_multipart([b'intask'] + raw_msg, copy=False)
 
-        #_debug_output("dispatch_submission",msg)
-
         header = msg['header']
         md = msg['metadata']
         msg_id = header['msg_id']

From 0a9ee9d3673277bee2fecb8848074c5cc6fbc4a8 Mon Sep 17 00:00:00 2001
From: Johannes Otepka
Date: Fri, 23 Jan 2026 11:13:46 +0100
Subject: [PATCH 3/7] _debug_output function removed

---
 ipyparallel/controller/dictdb.py | 17 -----------------
 1 file changed, 17 deletions(-)

diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py
index 6f81bbc0..a48a754b 100644
--- a/ipyparallel/controller/dictdb.py
+++ b/ipyparallel/controller/dictdb.py
@@ -61,21 +61,6 @@
     '$exists': lambda a, b: (b and a is not None) or (a is None and not b),
 }
 
-def _debug_output(where, msg, stack=False):
-    import inspect, traceback
-    with open("d:/dictdb_log.txt", "a") as f:
-        f.write(f"{where} [{datetime.now()}]: {msg['msg_id']}\n")
-        f.write(f"\tmsg={msg}\n")
-        #if.write(f"has metadata={'metadata' in msg}\n")
-        #if 'metadata' in msg:
-        #    f.write(f"{msg['metadata']}\n")
-        f.write(f"has result_metadata={'result_metadata' in msg}\n")
-        if 'result_metadata' in msg:
-            f.write(f"{msg['result_metadata']}\n")
-        if stack:
-            for line in traceback.format_stack():
-                f.write(line.strip()+"\n")
-
 def _add_tz(obj):
     if isinstance(obj, datetime):
         obj = ensure_timezone(obj)
@@ -254,7 +239,6 @@ def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if msg_id in self._records:
             raise KeyError(f"Already have msg_id {msg_id!r}")
-        #_debug_output("add_record", rec, False)
         self._check_dates(rec)
         self._records[msg_id] = rec
         self._add_bytes(rec)
@@ -272,7 +256,6 @@ def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
         if msg_id in self._culled_ids:
             raise KeyError(f"Record {msg_id!r} has been culled for size")
-        #_debug_output("update_record", rec)
         self._check_dates(rec)
         _rec = self._records[msg_id]
         self._drop_bytes(_rec)

From c0ca7eaed4e6c72d76c5bd9b355a0b6f2d15ec3b Mon Sep 17 00:00:00 2001
From: Johannes Otepka
Date: Fri, 23 Jan 2026 13:00:28 +0100
Subject: [PATCH 4/7] improvements and support for direct view added

---
 ipyparallel/client/remotefunction.py |  2 +-
 ipyparallel/client/view.py           | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py
index 5dd0573d..8d92e928 100644
--- a/ipyparallel/client/remotefunction.py
+++ b/ipyparallel/client/remotefunction.py
@@ -295,7 +295,7 @@ def __call__(self, *sequences, **kwargs):
 
             view = self.view if balanced else client[t]
             with view.temp_flags(block=False, **self.flags):
-                ar = view.apply(f, *args)
+                ar = view.apply(f, *args, task_label=self.task_label) # is this the right place to insert the task_label?
                 ar.owner = False
 
             msg_id = ar.msg_ids[0]
diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py
index f4b25d9b..dbdc707b 100644
--- a/ipyparallel/client/view.py
+++ b/ipyparallel/client/view.py
@@ -97,7 +97,6 @@ class View(HasTraits):
     # flags
     block = Bool(False)
     track = Bool(False)
-    task_label = Any()
     targets = Any()
 
     history = List()
@@ -567,13 +566,16 @@ def _really_apply(
         _idents, _targets = self.client._build_targets(targets)
         futures = []
 
+        task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retrieving task_label?
+        metadata = dict(task_label=task_label)
+
         pf = PrePickled(f)
         pargs = [PrePickled(arg) for arg in args]
         pkwargs = {k: PrePickled(v) for k, v in kwargs.items()}
 
         for ident in _idents:
             future = self.client.send_apply_request(
-                self._socket, pf, pargs, pkwargs, track=track, ident=ident
+                self._socket, pf, pargs, pkwargs, track=track, ident=ident, metadata=metadata
             )
             futures.append(future)
         if track:
@@ -1356,13 +1358,13 @@ def _really_apply(
         # ensure *not* bytes
         idents = [ident.decode() for ident in idents]
 
+        task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retrieving task_label?
+
         after = self._render_dependency(after)
         follow = self._render_dependency(follow)
         metadata = dict(
-            after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
+            after=after, follow=follow, timeout=timeout, targets=idents, retries=retries, task_label=task_label
         )
-        if self.task_label:
-            metadata["task_label"] = self.task_label
 
         future = self.client.send_apply_request(
             self._socket, f, args, kwargs, track=track, metadata=metadata
@@ -1439,8 +1441,6 @@ def map(
         # default
         if block is None:
             block = self.block
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
-        self.task_label = task_label  # just for testing
-
         pf = ParallelFunction(
             self,
             f,

From 4db4cf359e79898b41a0977189b631eb5746eb9c Mon Sep 17 00:00:00 2001
From: Johannes Otepka
Date: Fri, 23 Jan 2026 13:12:58 +0100
Subject: [PATCH 5/7] basic task label example added

---
 docs/source/examples/basic_task_label.py | 36 ++++++++++++++++++++++++
 1 file changed, 36 insertions(+)
 create mode 100644 docs/source/examples/basic_task_label.py

diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py
new file mode 100644
index 00000000..3e7c502f
--- /dev/null
+++ b/docs/source/examples/basic_task_label.py
@@ -0,0 +1,36 @@
+""" Basic task label example"""
+
+import ipyparallel as ipp
+
+# start up ipp cluster with 2 engines
+cluster = ipp.Cluster(n=2)
+cluster.start_cluster_sync()
+
+rc = cluster.connect_client_sync()
+rc.wait_for_engines(n=2)
+
+def wait(t):
+    import time
+    tic = time.time()
+    time.sleep(t)
+    return time.time()-tic
+
+# send tasks to cluster
+balanced_view = True
+if balanced_view:
+    # use load balanced view
+    dview = rc.load_balanced_view()
+    ar_list = [dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10)]
+    dview.wait(ar_list)
+else:
+    # use direct view
+    dview = rc[:]
+    ar_list = [dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10)]
+    dview.wait(ar_list)
+
+# query database
+data = rc.db_query({'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid'])
+for d in data:
+    print(f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}")
+
+cluster.stop_cluster_sync()
\ No newline at end of file

From f6d8c0abd7821c3cf07b88d5da32b912fd40fc6a Mon Sep 17 00:00:00 2001
From: Johannes Otepka
Date: Fri, 23 Jan 2026 13:19:06 +0100
Subject: [PATCH 6/7] minor change: restore original formatting

---
 ipyparallel/client/view.py               | 1 +
 ipyparallel/controller/dictdb.py         | 1 +
 ipyparallel/controller/hub.py            | 2 ++
 ipyparallel/controller/task_scheduler.py | 1 +
 4 files changed, 5 insertions(+)

diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py
index dbdc707b..5b7cfdf4 100644
--- a/ipyparallel/client/view.py
+++ b/ipyparallel/client/view.py
@@ -1439,6 +1439,7 @@ def map(
         # default
         if block is None:
             block = self.block
+
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
         pf = ParallelFunction(
diff --git a/ipyparallel/controller/dictdb.py b/ipyparallel/controller/dictdb.py
index a48a754b..4091e43a 100644
--- a/ipyparallel/controller/dictdb.py
+++ b/ipyparallel/controller/dictdb.py
@@ -61,6 +61,7 @@
     '$exists': lambda a, b: (b and a is not None) or (a is None and not b),
 }
 
+
 def _add_tz(obj):
     if isinstance(obj, datetime):
         obj = ensure_timezone(obj)
diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py
index 59578c70..f83a15d5 100644
--- a/ipyparallel/controller/hub.py
+++ b/ipyparallel/controller/hub.py
@@ -790,6 +790,7 @@ def monitor_iopub_message(self, topics, msg):
         except Exception:
             self.log.error("iopub::invalid IOPub message", exc_info=True)
             return
+
         msg_type = msg['header']['msg_type']
         if msg_type == 'shutdown_reply':
             session = msg['header']['session']
@@ -818,6 +819,7 @@ def save_iopub_message(self, topics, msg):
         msg_id = parent['msg_id']
         msg_type = msg['header']['msg_type']
         content = msg['content']
+
         # ensure msg_id is in db
         try:
             rec = self.db.get_record(msg_id)
diff --git a/ipyparallel/controller/task_scheduler.py b/ipyparallel/controller/task_scheduler.py
index 7865a3bb..b354b96f 100644
--- a/ipyparallel/controller/task_scheduler.py
+++ b/ipyparallel/controller/task_scheduler.py
@@ -18,6 +18,7 @@
 # Chooser functions
 # ----------------------------------------------------------------------
 
+
 def plainrandom(loads):
     """Plain random pick."""
     n = len(loads)

From 161a9878377bfd7d3162f388ab9ad8cc6248d281 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Fri, 23 Jan 2026 12:46:23 +0000
Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 docs/source/examples/basic_task_label.py | 31 +++++++++++-------
 ipyparallel/client/remotefunction.py     |  4 ++-
 ipyparallel/client/view.py               | 43 ++++++++++++++++++++----
 3 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py
index 3e7c502f..77f10023 100644
--- a/docs/source/examples/basic_task_label.py
+++ b/docs/source/examples/basic_task_label.py
@@ -1,4 +1,4 @@
-""" Basic task label example"""
+"""Basic task label example"""
 
 import ipyparallel as ipp
 
@@ -9,28 +9,39 @@
 rc = cluster.connect_client_sync()
 rc.wait_for_engines(n=2)
 
+
 def wait(t):
-    import time
-    tic = time.time()
-    time.sleep(t)
-    return time.time()-tic
+    import time
+
+    tic = time.time()
+    time.sleep(t)
+    return time.time() - tic
+
 
 # send tasks to cluster
 balanced_view = True
 if balanced_view:
     # use load balanced view
     dview = rc.load_balanced_view()
-    ar_list = [dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10)]
+    ar_list = [
+        dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10)
+    ]
     dview.wait(ar_list)
 else:
     # use direct view
     dview = rc[:]
-    ar_list = [dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10)]
+    ar_list = [
+        dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10)
+    ]
     dview.wait(ar_list)
 
 # query database
-data = rc.db_query({'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid'])
+data = rc.db_query(
+    {'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid']
+)
 for d in data:
-    print(f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}")
+    print(
+        f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}"
+    )
 
-cluster.stop_cluster_sync()
\ No newline at end of file
+cluster.stop_cluster_sync()
diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py
index 8d92e928..36021bfa 100644
--- a/ipyparallel/client/remotefunction.py
+++ b/ipyparallel/client/remotefunction.py
@@ -295,7 +295,9 @@ def __call__(self, *sequences, **kwargs):
 
             view = self.view if balanced else client[t]
             with view.temp_flags(block=False, **self.flags):
-                ar = view.apply(f, *args, task_label=self.task_label) # is this the right place to insert the task_label?
+                ar = view.apply(
+                    f, *args, task_label=self.task_label
+                )  # is this the right place to insert the task_label?
                 ar.owner = False
 
             msg_id = ar.msg_ids[0]
diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py
index 5b7cfdf4..855787ee 100644
--- a/ipyparallel/client/view.py
+++ b/ipyparallel/client/view.py
@@ -566,7 +566,9 @@ def _really_apply(
         _idents, _targets = self.client._build_targets(targets)
         futures = []
 
-        task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retrieving task_label?
+        task_label = (
+            kwargs.pop("task_label") if "task_label" in kwargs else None
+        )  # is this the correct/best way of retrieving task_label?
         metadata = dict(task_label=task_label)
 
         pf = PrePickled(f)
@@ -575,7 +577,13 @@ def _really_apply(
 
         for ident in _idents:
             future = self.client.send_apply_request(
-                self._socket, pf, pargs, pkwargs, track=track, ident=ident, metadata=metadata
+                self._socket,
+                pf,
+                pargs,
+                pkwargs,
+                track=track,
+                ident=ident,
+                metadata=metadata,
             )
             futures.append(future)
         if track:
@@ -595,7 +603,15 @@ def _really_apply(
         return ar
 
     @sync_results
-    def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None):
+    def map(
+        self,
+        f,
+        *sequences,
+        block=None,
+        track=False,
+        return_exceptions=False,
+        task_label=None,
+    ):
         """Parallel version of builtin `map`, using this View's `targets`.
 
         There will be one task per target, so work will be chunked
@@ -1039,7 +1055,15 @@ def _broadcast_map(f, *sequence_names):
             return list(map(f, *sequences))
 
     @_not_coalescing
-    def map(self, f, *sequences, block=None, track=False, return_exceptions=False, task_label=None):
+    def map(
+        self,
+        f,
+        *sequences,
+        block=None,
+        track=False,
+        return_exceptions=False,
+        task_label=None,
+    ):
         """Parallel version of builtin `map`, using this View's `targets`.
 
         There will be one task per engine, so work will be chunked
@@ -1358,12 +1382,19 @@ def _really_apply(
         # ensure *not* bytes
         idents = [ident.decode() for ident in idents]
 
-        task_label = kwargs.pop("task_label") if "task_label" in kwargs else None # is this the correct/best way of retrieving task_label?
+        task_label = (
+            kwargs.pop("task_label") if "task_label" in kwargs else None
+        )  # is this the correct/best way of retrieving task_label?
 
         after = self._render_dependency(after)
         follow = self._render_dependency(follow)
         metadata = dict(
-            after=after, follow=follow, timeout=timeout, targets=idents, retries=retries, task_label=task_label
+            after=after,
+            follow=follow,
+            timeout=timeout,
+            targets=idents,
+            retries=retries,
+            task_label=task_label,
+        )
 
         future = self.client.send_apply_request(