48
48
Instance ,
49
49
Integer ,
50
50
List ,
51
- Set ,
52
51
Unicode ,
53
52
default ,
54
53
)
@@ -199,9 +198,6 @@ def _parent_header(self):
199
198
# by record_ports and used by connect_request.
200
199
_recorded_ports = Dict ()
201
200
202
- # set of aborted msg_ids
203
- aborted = Set ()
204
-
205
201
# Track execution count here. For IPython, we override this to use the
206
202
# execution count we store in the shell.
207
203
execution_count = 0
@@ -217,14 +213,12 @@ def _parent_header(self):
217
213
"shutdown_request" ,
218
214
"is_complete_request" ,
219
215
"interrupt_request" ,
220
- # deprecated:
221
- "apply_request" ,
222
216
]
223
- # add deprecated ipyparallel control messages
217
+
218
+ # control channel accepts all shell messages
219
+ # and some of its own
224
220
control_msg_types = [
225
221
* msg_types ,
226
- "clear_request" ,
227
- "abort_request" ,
228
222
"debug_request" ,
229
223
"usage_request" ,
230
224
"create_subshell_request" ,
@@ -307,17 +301,15 @@ async def process_control_message(self, msg=None):
307
301
sys .stderr .flush ()
308
302
self ._publish_status ("idle" , "control" )
309
303
310
- async def should_handle (self , stream , msg , idents ):
304
+ def should_handle (self , stream , msg , idents ):
311
305
"""Check whether a shell-channel message should be handled
312
306
313
307
Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
308
+
309
+ .. versionchanged:: 7
310
+ Subclass should_handle _may_ be async.
311
+ Base class implementation is not async.
314
312
"""
315
- msg_id = msg ["header" ]["msg_id" ]
316
- if msg_id in self .aborted :
317
- # is it safe to assume a msg_id will not be resubmitted?
318
- self .aborted .remove (msg_id )
319
- await self ._send_abort_reply (stream , msg , idents )
320
- return False
321
313
return True
322
314
323
315
async def enter_eventloop (self ):
@@ -479,7 +471,11 @@ async def process_shell_message(self, msg=None, socket=None):
479
471
self .log .debug ("\n *** MESSAGE TYPE:%s***" , msg_type )
480
472
self .log .debug (" Content: %s\n --->\n " , msg ["content" ])
481
473
482
- if not await self .should_handle (socket , msg , idents ):
474
+ should_handle : bool | t .Awaitable [bool ] = self .should_handle (socket , msg , idents )
475
+ if inspect .isawaitable (should_handle ):
476
+ should_handle = await should_handle
477
+ if not should_handle :
478
+ self .log .debug ("Not handling %s:%s" , msg_type , msg ["header" ].get ("msg_id" ))
483
479
return
484
480
485
481
handler = self .shell_handlers .get (msg_type )
@@ -1122,84 +1118,6 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
1122
1118
1123
1119
self .session .send (socket , "list_subshell_reply" , reply , parent , ident )
1124
1120
1125
- # ---------------------------------------------------------------------------
1126
- # Engine methods (DEPRECATED)
1127
- # ---------------------------------------------------------------------------
1128
-
1129
- async def apply_request (self , socket , ident , parent ): # pragma: no cover
1130
- """Handle an apply request."""
1131
- self .log .warning ("apply_request is deprecated in kernel_base, moving to ipyparallel." )
1132
- try :
1133
- content = parent ["content" ]
1134
- bufs = parent ["buffers" ]
1135
- msg_id = parent ["header" ]["msg_id" ]
1136
- except Exception :
1137
- self .log .error ("Got bad msg: %s" , parent , exc_info = True ) # noqa: G201
1138
- return
1139
-
1140
- md = self .init_metadata (parent )
1141
-
1142
- reply_content , result_buf = self .do_apply (content , bufs , msg_id , md )
1143
-
1144
- # flush i/o
1145
- if sys .stdout is not None :
1146
- sys .stdout .flush ()
1147
- if sys .stderr is not None :
1148
- sys .stderr .flush ()
1149
-
1150
- md = self .finish_metadata (parent , md , reply_content )
1151
- if not self .session :
1152
- return
1153
- self .session .send (
1154
- socket ,
1155
- "apply_reply" ,
1156
- reply_content ,
1157
- parent = parent ,
1158
- ident = ident ,
1159
- buffers = result_buf ,
1160
- metadata = md ,
1161
- )
1162
-
1163
- def do_apply (self , content , bufs , msg_id , reply_metadata ):
1164
- """DEPRECATED"""
1165
- raise NotImplementedError
1166
-
1167
- # ---------------------------------------------------------------------------
1168
- # Control messages (DEPRECATED)
1169
- # ---------------------------------------------------------------------------
1170
-
1171
- async def abort_request (self , socket , ident , parent ): # pragma: no cover
1172
- """abort a specific msg by id"""
1173
- self .log .warning (
1174
- "abort_request is deprecated in kernel_base. It is only part of IPython parallel"
1175
- )
1176
- msg_ids = parent ["content" ].get ("msg_ids" , None )
1177
- if isinstance (msg_ids , str ):
1178
- msg_ids = [msg_ids ]
1179
- for mid in msg_ids :
1180
- self .aborted .add (str (mid ))
1181
-
1182
- content = dict (status = "ok" )
1183
- if not self .session :
1184
- return
1185
- reply_msg = self .session .send (
1186
- socket , "abort_reply" , content = content , parent = parent , ident = ident
1187
- )
1188
- self .log .debug ("%s" , reply_msg )
1189
-
1190
- async def clear_request (self , socket , idents , parent ): # pragma: no cover
1191
- """Clear our namespace."""
1192
- self .log .warning (
1193
- "clear_request is deprecated in kernel_base. It is only part of IPython parallel"
1194
- )
1195
- content = self .do_clear ()
1196
- if self .session :
1197
- self .session .send (socket , "clear_reply" , ident = idents , parent = parent , content = content )
1198
-
1199
- def do_clear (self ):
1200
- """DEPRECATED since 4.0.3"""
1201
- raise NotImplementedError
1202
-
1203
1121
# ---------------------------------------------------------------------------
1204
1122
# Protected interface
1205
1123
# ---------------------------------------------------------------------------
0 commit comments