diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py new file mode 100644 index 00000000..77f10023 --- /dev/null +++ b/docs/source/examples/basic_task_label.py @@ -0,0 +1,47 @@ +"""Basic task label example""" + +import ipyparallel as ipp + +# start up ipp cluster with 2 engines +cluster = ipp.Cluster(n=2) +cluster.start_cluster_sync() + +rc = cluster.connect_client_sync() +rc.wait_for_engines(n=2) + + +def wait(t): + import time + + tic = time.time() + time.sleep(t) + return time.time() - tic + + +# send tasks to cluster +balanced_view = True +if balanced_view: + # use load balanced view + dview = rc.load_balanced_view() + ar_list = [ + dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10) + ] + dview.wait(ar_list) +else: + # use direct view + dview = rc[:] + ar_list = [ + dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10) + ] + dview.wait(ar_list) + +# query database +data = rc.db_query( + {'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid'] +) +for d in data: + print( + f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}" + ) + +cluster.stop_cluster_sync() diff --git a/ipyparallel/client/remotefunction.py b/ipyparallel/client/remotefunction.py index 5b7a9350..36021bfa 100644 --- a/ipyparallel/client/remotefunction.py +++ b/ipyparallel/client/remotefunction.py @@ -195,12 +195,14 @@ def __init__( chunksize=None, ordered=True, return_exceptions=False, + task_label=None, **flags, ): super().__init__(view, f, block=block, **flags) self.chunksize = chunksize self.ordered = ordered self.return_exceptions = return_exceptions + self.task_label = task_label mapClass = Map.dists[dist] self.mapObject = mapClass() @@ -293,7 +295,9 @@ def __call__(self, *sequences, **kwargs): view = self.view if balanced else client[t] with view.temp_flags(block=False, **self.flags): - ar = view.apply(f, *args) + ar = view.apply( + f, *args, 
task_label=self.task_label
 ) # is this the right place to insert the task_label? ar.owner = False msg_id = ar.msg_ids[0] diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py index 53712c39..855787ee 100644 --- a/ipyparallel/client/view.py +++ b/ipyparallel/client/view.py @@ -566,13 +566,24 @@ def _really_apply( _idents, _targets = self.client._build_targets(targets) futures = [] + task_label = ( + kwargs.pop("task_label") if "task_label" in kwargs else None + ) # is this the correct/best way of retrieving task_label? + metadata = dict(task_label=task_label) + pf = PrePickled(f) pargs = [PrePickled(arg) for arg in args] pkwargs = {k: PrePickled(v) for k, v in kwargs.items()} for ident in _idents: future = self.client.send_apply_request( - self._socket, pf, pargs, pkwargs, track=track, ident=ident + self._socket, + pf, + pargs, + pkwargs, + track=track, + ident=ident, + metadata=metadata, ) futures.append(future) if track: @@ -592,7 +603,15 @@ def _really_apply( return ar @sync_results - def map(self, f, *sequences, block=None, track=False, return_exceptions=False): + def map( + self, + f, + *sequences, + block=None, + track=False, + return_exceptions=False, + task_label=None, + ): """Parallel version of builtin `map`, using this View's `targets`. There will be one task per target, so work will be chunked @@ -1036,7 +1055,15 @@ def _broadcast_map(f, *sequence_names): return list(map(f, *sequences)) @_not_coalescing - def map(self, f, *sequences, block=None, track=False, return_exceptions=False): + def map( + self, + f, + *sequences, + block=None, + track=False, + return_exceptions=False, + task_label=None, + ): """Parallel version of builtin `map`, using this View's `targets`. 
There will be one task per engine, so work will be chunked @@ -1355,10 +1382,19 @@ def _really_apply( # ensure *not* bytes idents = [ident.decode() for ident in idents] + task_label = ( + kwargs.pop("task_label") if "task_label" in kwargs else None + ) # is this the correct/best way of retrieving task_label? + after = self._render_dependency(after) follow = self._render_dependency(follow) metadata = dict( - after=after, follow=follow, timeout=timeout, targets=idents, retries=retries + after=after, + follow=follow, + timeout=timeout, + targets=idents, + retries=retries, + task_label=task_label, ) future = self.client.send_apply_request( @@ -1389,6 +1425,7 @@ def map( chunksize=1, ordered=True, return_exceptions=False, + task_label=None, ): """Parallel version of builtin `map`, load-balanced by this View. @@ -1443,6 +1480,7 @@ def map( chunksize=chunksize, ordered=ordered, return_exceptions=return_exceptions, + task_label=task_label, ) return pf.map(*sequences) diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py index b166fbce..f83a15d5 100644 --- a/ipyparallel/controller/hub.py +++ b/ipyparallel/controller/hub.py @@ -75,6 +75,7 @@ def empty_record(): 'error': None, 'stdout': '', 'stderr': '', + 'task_label': None, } @@ -111,6 +112,7 @@ def init_record(msg): 'error': None, 'stdout': '', 'stderr': '', + 'task_label': msg['metadata'].get('task_label', None), } diff --git a/ipyparallel/engine/kernel.py b/ipyparallel/engine/kernel.py index 8c900167..c27fabb2 100644 --- a/ipyparallel/engine/kernel.py +++ b/ipyparallel/engine/kernel.py @@ -76,6 +76,7 @@ def init_metadata(self, parent): 'is_broadcast': parent_metadata.get('is_broadcast', False), 'is_coalescing': parent_metadata.get('is_coalescing', False), 'original_msg_id': parent_metadata.get('original_msg_id', ''), + 'task_label': parent_metadata.get('task_label', None), } def finish_metadata(self, parent, metadata, reply_content):