How the nushell plugin system works
The nushell plugin system recently received two big improvements:
- Bidirectional communication and streams for plugins
- Keep plugins persistently running in the background
Here I describe how the new system works.
Recall how to run a command from a plugin:
- register the plugin (e.g. register ./target/debug/plugin_bin)
- run the command directly.
Register the plugin
For example, nu_plugin_example defines many commands, such as example one, example two, and example three. Nushell needs a way to get the signatures of these commands.
The process can be described as follows:
nushell                      plugin
   |                            |
   |   1. start plugin binary   |
   | -------------------------> |
   |                            |
   |   2. get signature         |
   | -------------------------> |
   |                            |
   |   3. response              |
   | <------------------------- |
   |                            |
When we register a plugin, nushell runs the plugin binary and gets all command signatures from it. All the interesting work happens inside the get_signature function.
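Here is a minimal sketch of the three steps above. It is not nushell's code: the plugin is simulated by a thread instead of a child process, and Request, Response, run_fake_plugin and register_fake_plugin are names made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical, simplified message types; the real protocol types are richer.
#[derive(Debug, PartialEq)]
enum Request {
    Signature,
}

#[derive(Debug, PartialEq)]
enum Response {
    Signature(Vec<String>),
}

/// Stand-in for the plugin binary: answers each request it receives.
fn run_fake_plugin(requests: mpsc::Receiver<Request>, responses: mpsc::Sender<Response>) {
    for request in requests {
        match request {
            Request::Signature => {
                let names = vec![
                    "example one".to_string(),
                    "example two".to_string(),
                    "example three".to_string(),
                ];
                // Stop if the engine side has gone away.
                if responses.send(Response::Signature(names)).is_err() {
                    break;
                }
            }
        }
    }
}

/// Engine side: 1. start the plugin, 2. ask for signatures, 3. read the response.
fn register_fake_plugin() -> Vec<String> {
    let (req_tx, req_rx) = mpsc::channel();
    let (resp_tx, resp_rx) = mpsc::channel();
    let plugin = thread::spawn(move || run_fake_plugin(req_rx, resp_tx));

    req_tx.send(Request::Signature).expect("plugin is running");
    let Response::Signature(names) = resp_rx.recv().expect("plugin answered");

    // Dropping the request sender ends the plugin loop.
    drop(req_tx);
    plugin.join().expect("plugin thread panicked");
    names
}
```

Calling register_fake_plugin() returns the three command names, which plays the role of the signature list nushell stores after registration.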
How get_signature works
I drew a diagram to show how it works at a high level:

make_plugin_interface
To get signatures from a plugin, nushell calls make_plugin_interface to enable communication between the engine and the plugin in the background. make_plugin_interface creates a PluginInterfaceManager object, the core data structure of the new plugin system, which manages communication between nushell and the plugin. Here is the code snippet:
let mut manager = PluginInterfaceManager::new(source.clone(), (Mutex::new(stdin), encoder));
manager.set_garbage_collector(gc);
let interface = manager.get_interface();
interface.hello()?;
// Spawn the reader on a new thread. We need to be able to read messages at the same time that
// we write, because we are expected to be able to handle multiple messages coming in from the
// plugin at any time, including stream messages like `Drop`.
std::thread::Builder::new()
    .name(format!(
        "plugin interface reader ({})",
        source.identity.name()
    ))
    .spawn(move || {
        if let Err(err) = manager.consume_all((reader, encoder)) {
            log::warn!("Error in PluginInterfaceManager: {err}");
        }
        // If the loop has ended, drop the manager so everyone disconnects and then wait for the
        // child to exit
        drop(manager);
        let _ = child.wait();
    })
    .map_err(|err| ShellError::PluginFailedToLoad {
        msg: format!("Failed to spawn thread for plugin: {err}"),
    })?;
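The background reader idea can be tried out in isolation. The sketch below is my own simplification, not nushell's implementation: spawn_reader and collect_messages are hypothetical names, and messages are assumed to be plain text lines rather than encoded plugin messages.

```rust
use std::io::{self, BufRead};
use std::sync::mpsc;
use std::thread::{self, JoinHandle};

/// Spawn a named thread that forwards every line from `reader` to `tx`.
/// `spawn_reader` and the line-based framing are hypothetical.
fn spawn_reader<R>(name: &str, reader: R, tx: mpsc::Sender<String>) -> io::Result<JoinHandle<()>>
where
    R: BufRead + Send + 'static,
{
    thread::Builder::new()
        .name(format!("plugin interface reader ({name})"))
        .spawn(move || {
            for line in reader.lines() {
                match line {
                    Ok(line) => {
                        // Stop when the receiving side has been dropped.
                        if tx.send(line).is_err() {
                            break;
                        }
                    }
                    Err(err) => {
                        eprintln!("error reading from plugin: {err}");
                        break;
                    }
                }
            }
            // `tx` is dropped here, so receivers see the channel close.
        })
}

/// Read all messages from an in-memory "plugin output" through the reader thread.
fn collect_messages(input: &'static str) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let handle = spawn_reader("demo", io::Cursor::new(input), tx).expect("spawn failed");
    let messages: Vec<String> = rx.iter().collect();
    handle.join().expect("reader thread panicked");
    messages
}
```

Because reading happens on its own thread, the caller stays free to write requests while messages keep arriving, which is the reason the comment in the snippet gives for spawning the reader.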
Then nushell sends a PluginCall::Signature message to the plugin. But it’s worth looking at the definition of PluginInterfaceManager first.
#[derive(Debug)]
#[doc(hidden)]
pub struct PluginInterfaceManager {
    // ...
    /// Receiver for plugin call subscriptions
    plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
    // ...
}
impl PluginInterfaceManager {
    pub fn new(
        source: Arc<PluginSource>,
        writer: impl PluginWrite<PluginInput> + 'static,
    ) -> PluginInterfaceManager {
        let (subscription_tx, subscription_rx) = mpsc::channel();
        PluginInterfaceManager {
            state: Arc::new(PluginInterfaceState {
                source,
                plugin_call_id_sequence: Sequence::default(),
                stream_id_sequence: Sequence::default(),
                plugin_call_subscription_sender: subscription_tx,
                error: OnceLock::new(),
                writer: Box::new(writer),
            }),
            plugin_call_subscription_receiver: subscription_rx,
        }
    }
}
As we can see, creating a PluginInterfaceManager also creates a plugin_call_subscription channel. plugin_call_subscription_receiver is used by the manager.consume_all method, which make_plugin_interface calls in a background thread.
plugin_call_subscription_sender is used when we want to call the plugin: nushell uses it inside the write_plugin_call method.
fn write_plugin_call(
    &self,
    call: PluginCall<PipelineData>,
    ctrlc: Option<Arc<AtomicBool>>,
    context_rx: mpsc::Receiver<Context>,
) -> Result<
    (
        PipelineDataWriter<Self>,
        mpsc::Receiver<ReceivedPluginCallMessage>,
    ),
    ShellError,
> {
    let id = self.state.plugin_call_id_sequence.next()?;
    let (tx, rx) = mpsc::channel();
    // Convert the call into one with a header and handle the stream, if necessary
    let (call, writer) = match call {
        PluginCall::Signature => (PluginCall::Signature, Default::default()),
        // ...
    };
    // Register the subscription to the response, and the context
    self.state
        .plugin_call_subscription_sender
        .send((
            id,
            PluginCallState {
                sender: Some(tx),
                ctrlc,
                context_rx: Some(context_rx),
                remaining_streams_to_read: 0,
            },
        ))?;
    // Write request
    self.write(PluginInput::Call(id, call))?;
    self.flush()?;
    Ok((writer, rx))
}
At a high level, write_plugin_call does two things:
- subscribes the call to the manager, so that when the manager receives a response from the plugin, it knows how to send that response to the caller.
- sends the PluginCall request to the plugin.
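The subscribe-then-route idea can be shown with a much smaller model. This is a sketch under my own assumptions: Manager, subscribe and handle_response are hypothetical, responses are plain strings, and draining pending subscriptions with try_recv is one possible design rather than a description of nushell's exact code.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;

type CallId = usize;

/// Hypothetical, stripped-down manager: it only routes responses to callers.
struct Manager {
    subscription_rx: mpsc::Receiver<(CallId, mpsc::Sender<String>)>,
    subscriptions: BTreeMap<CallId, mpsc::Sender<String>>,
}

impl Manager {
    fn new() -> (Manager, mpsc::Sender<(CallId, mpsc::Sender<String>)>) {
        let (subscription_tx, subscription_rx) = mpsc::channel();
        let manager = Manager {
            subscription_rx,
            subscriptions: BTreeMap::new(),
        };
        (manager, subscription_tx)
    }

    /// Called when a response for `id` arrives from the plugin.
    fn handle_response(&mut self, id: CallId, response: String) -> Result<(), String> {
        // Pick up subscriptions registered since the last message.
        while let Ok((call_id, sender)) = self.subscription_rx.try_recv() {
            self.subscriptions.insert(call_id, sender);
        }
        let sender = self
            .subscriptions
            .remove(&id)
            .ok_or_else(|| format!("no subscription for call {id}"))?;
        sender
            .send(response)
            .map_err(|_| format!("caller for call {id} went away"))
    }
}

/// Caller side: subscribe first, then "write" the request.
fn subscribe(
    subscription_tx: &mpsc::Sender<(CallId, mpsc::Sender<String>)>,
    id: CallId,
) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    subscription_tx.send((id, tx)).expect("manager is alive");
    rx
}
```

Subscribing before the request is written matters: the subscription is already queued by the time any response for that id can arrive, so the manager always finds a sender for it.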
receive_plugin_call_response
As we can see, write_plugin_call returns a writer and a channel receiver, so the caller has to read the response from that receiver. That is the job of receive_plugin_call_response.
fn receive_plugin_call_response(
    &self,
    rx: mpsc::Receiver<ReceivedPluginCallMessage>,
    mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
    state: CurrentCallState,
) -> Result<PluginCallResponse<PipelineData>, ShellError> {
    // Handle message from receiver
    for msg in rx {
        match msg {
            ReceivedPluginCallMessage::Response(resp) => {
                // ...
                return Ok(resp);
            }
            ReceivedPluginCallMessage::Error(err) => {
                return Err(err);
            }
            ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
                self.handle_engine_call(
                    engine_call_id,
                    engine_call,
                    &state,
                    context.as_deref_mut(),
                )?;
            }
        }
    }
}
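The shape of this loop is easy to reproduce in a standalone form. The following is an illustrative sketch only: Message and wait_for_response are hypothetical stand-ins, payloads are strings, and "handling" an engine call just records it. It also covers the case where the channel closes before a response arrives, which the excerpt above leaves out.

```rust
use std::sync::mpsc;

/// Hypothetical stand-ins for `ReceivedPluginCallMessage` and its payloads.
enum Message {
    Response(String),
    Error(String),
    EngineCall(String),
}

/// Block on the receiver until the final response (or an error) arrives.
/// Engine calls seen along the way are recorded in `engine_calls`.
fn wait_for_response(
    rx: mpsc::Receiver<Message>,
    engine_calls: &mut Vec<String>,
) -> Result<String, String> {
    for msg in rx {
        match msg {
            Message::Response(resp) => return Ok(resp),
            Message::Error(err) => return Err(err),
            // The plugin asked the engine for something; serve it and keep waiting.
            Message::EngineCall(call) => engine_calls.push(call),
        }
    }
    // The sender was dropped without ever sending a response.
    Err("channel closed before a response arrived".to_string())
}
```

The point of the design is that one receiver carries both the final response and any engine calls the plugin makes while running, so the caller can serve those calls on its own thread while it waits.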
Summary
Given all of this, let’s summarize how get_signature works:
- Create a PluginInterfaceManager, which includes a plugin_call_subscription_sender/receiver pair, and receive messages in the background.
- Write the PluginCall::Signature plugin call through the write_plugin_call method and wait on messages from rx. The result sender is sent to PluginInterfaceManager through plugin_call_subscription_sender.
- PluginInterfaceManager sends the response when it receives output from the plugin.
How to run a command
Running a command is very similar to getting signatures from a plugin, except that it calls write_plugin_call with PluginCall::Run.
fn write_plugin_call(
    &self,
    mut call: PluginCall<PipelineData>,
    context: Option<&dyn PluginExecutionContext>,
) -> Result<WritePluginCallResult, ShellError> {
    // Prepare the call with the state.
    state.prepare_plugin_call(&mut call, &self.state.source)?;
    // Convert the call into one with a header and handle the stream, if necessary
    let (call, writer) = match call {
        PluginCall::Signature => (PluginCall::Signature, Default::default()),
        PluginCall::CustomValueOp(value, op) => {
            (PluginCall::CustomValueOp(value, op), Default::default())
        }
        PluginCall::Run(CallInfo {
            name,
            mut call,
            input,
        }) => {
            state.prepare_call_args(&mut call, &self.state.source)?;
            let (header, writer) = self.init_write_pipeline_data(input, &state)?;
            (
                PluginCall::Run(CallInfo {
                    name,
                    call,
                    input: header,
                }),
                writer,
            )
        }
    };
    // Register the subscription to the response, and the context
    // subscribe stream...
    // ...
    // Write request
    self.write(PluginInput::Call(id, call))?;
    self.flush()?;
    Ok(WritePluginCallResult {
        receiver: rx,
        writer,
        state,
    })
}
Writing of the plugin call then finishes in the background, which lets the engine send the command name and input arguments without blocking, so the nushell engine can do other work. Note that it is PluginInterfaceManager’s job to read the response from the plugin. If the response is PipelineData::ListStream or PipelineData::ExternalStream, the manager handles the stream reading logic and sends the data back to the engine through the plugin call result channel.
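To make "writing in the background" concrete, here is a small sketch under my own assumptions. Wire and write_call are hypothetical, the stream carries plain integers, and the plugin side is just the receiving end of a channel. The call header goes out immediately, and the stream data follows from a separate thread.

```rust
use std::sync::mpsc;
use std::thread::{self, JoinHandle};

/// Hypothetical wire messages: a call header followed by stream data.
#[derive(Debug, PartialEq)]
enum Wire {
    Call { name: String, stream_id: usize },
    Data(usize, i64),
    End(usize),
}

/// Send the call header right away, then stream `input` on a background thread.
fn write_call(
    wire: mpsc::Sender<Wire>,
    name: &str,
    stream_id: usize,
    input: Vec<i64>,
) -> JoinHandle<()> {
    wire.send(Wire::Call { name: name.to_string(), stream_id })
        .expect("plugin is listening");
    thread::spawn(move || {
        for value in input {
            if wire.send(Wire::Data(stream_id, value)).is_err() {
                return; // the plugin side went away
            }
        }
        // Tell the other side that this stream is complete.
        let _ = wire.send(Wire::End(stream_id));
    })
}
```

The caller gets control back as soon as the header is sent; joining the returned handle is only needed when it wants to know that all input has been written.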
The core logic lies inside PluginInterfaceManager::consume:
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
    log::trace!("from plugin: {:?}", input);
    match input {
        // ....
        PluginOutput::Stream(message) => self.consume_stream_message(message),
        PluginOutput::CallResponse(id, response) => {
            // Handle reading the pipeline data, if any
            let response = response
                .map_data(|data| {
                    let ctrlc = self.get_ctrlc(id)?;
                    // Register the streams in the response
                    for stream_id in data.stream_ids() {
                        self.recv_stream_started(id, stream_id);
                    }
                    self.read_pipeline_data(data, ctrlc.as_ref())
                })
                .unwrap_or_else(|err| {
                    // If there's an error with initializing this stream, change it to a plugin
                    // error response, but send it anyway
                    PluginCallResponse::Error(err.into())
                });
            let result = self.send_plugin_call_response(id, response);
            if result.is_ok() {
                // When a call ends, it releases a lock on the GC
                if let Some(ref gc) = self.gc {
                    gc.decrement_locks(1);
                }
            }
            result
        }
        // ...
    }
}
PluginInterfaceManager uses StreamManager to manage streams.
Stream Management
I drew another diagram to show how streams are managed.

Receive PluginOutput::CallResponse from plugin
When the plugin returns a response, the response indicates whether it contains a stream, so PluginInterfaceManager can add that stream to its reading streams.
PluginOutput::CallResponse(id, response) => {
    // Handle reading the pipeline data, if any
    let response = response
        .map_data(|data| {
            // ...
            self.read_pipeline_data(data, ctrlc.as_ref())
        });
    // ...
}
// read pipeline data
fn read_pipeline_data(
    &self,
    header: PipelineDataHeader,
    ctrlc: Option<&Arc<AtomicBool>>,
) -> Result<PipelineData, ShellError> {
    self.prepare_pipeline_data(match header {
        PipelineDataHeader::ListStream(info) => {
            let handle = self.stream_manager().get_handle();
            let reader = handle.read_stream(info.id, self.get_interface())?;
            PipelineData::ListStream(ListStream::from_stream(reader, ctrlc.cloned()), None)
        }
        // ...
        // Similar when the header is PipelineDataHeader::ExternalStream
    })
}
Here, handle.read_stream inserts a new entry into reading_streams, which is a map from stream id to pipeline data sender.
pub(crate) fn read_stream<T, W>(
    &self,
    id: StreamId,
    writer: W,
) -> Result<StreamReader<T, W>, ShellError>
where
    T: TryFrom<StreamData, Error = ShellError>,
    W: WriteStreamMessage,
{
    let (tx, rx) = mpsc::channel();
    self.with_lock(|mut state| {
        // Must be exclusive
        if let btree_map::Entry::Vacant(e) = state.reading_streams.entry(id) {
            e.insert(tx);
            Ok(())
        } else {
            // ...
        }
    })?;
    Ok(StreamReader::new(id, rx, writer))
}
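The "must be exclusive" check relies on the standard BTreeMap entry API. Here is a small standalone sketch of the same idea; StreamTable is a hypothetical type, items are plain integers, and the error for a duplicate id is my own choice, since the original elides that branch.

```rust
use std::collections::{btree_map, BTreeMap};
use std::sync::mpsc;

type StreamId = usize;

/// Hypothetical, simplified stream table: stream id -> sender of stream items.
#[derive(Default)]
struct StreamTable {
    reading_streams: BTreeMap<StreamId, mpsc::Sender<i64>>,
}

impl StreamTable {
    /// Register `id` for reading; fails if the id is already registered.
    fn read_stream(&mut self, id: StreamId) -> Result<mpsc::Receiver<i64>, String> {
        let (tx, rx) = mpsc::channel();
        match self.reading_streams.entry(id) {
            btree_map::Entry::Vacant(entry) => {
                // Keep the sender in the table and hand the receiver to the reader.
                entry.insert(tx);
                Ok(rx)
            }
            btree_map::Entry::Occupied(_) => Err(format!("stream {id} is already being read")),
        }
    }
}
```

Using entry means the lookup and the insert happen in one step, so a second registration for the same id is rejected instead of silently replacing the first sender.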
Finally, after inserting the id into reading_streams, read_pipeline_data returns PipelineData, which becomes the response to the plugin call. The nushell engine can then process the next command.
More data comes from plugin
When more data comes from the plugin, PluginInterfaceManager receives PluginOutput::Stream. In that case, the manager calls consume_stream_message.
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
    log::trace!("from plugin: {:?}", input);
    match input {
        // ...
        PluginOutput::Stream(message) => self.consume_stream_message(message),
        // ...
    }
}
To consume the message, it takes the stream id from the message, looks it up in reading_streams, and sends the data through the channel.
StreamMessage::Data(id, data) => {
    if let Some(sender) = state.reading_streams.get(&id) {
        let _ = sender.send(Ok(Some(data)));
        Ok(())
    } else {
        // ...
    }
}
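The routing step can be sketched on its own as follows. This is an illustration under assumptions, not the real implementation: StreamMessage here has only two hypothetical variants, items are integers, None marks the end of a stream, and the error for an unknown id is my own choice.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;

type StreamId = usize;

/// Hypothetical subset of the stream messages a plugin can send.
enum StreamMessage {
    Data(StreamId, i64),
    End(StreamId),
}

/// Route one message to the reader registered for its stream id.
fn route(
    reading_streams: &mut BTreeMap<StreamId, mpsc::Sender<Option<i64>>>,
    message: StreamMessage,
) -> Result<(), String> {
    match message {
        StreamMessage::Data(id, data) => match reading_streams.get(&id) {
            Some(sender) => {
                // A send error only means the reader was dropped; ignore it.
                let _ = sender.send(Some(data));
                Ok(())
            }
            None => Err(format!("received data for unknown stream {id}")),
        },
        StreamMessage::End(id) => match reading_streams.remove(&id) {
            Some(sender) => {
                // `None` tells the reader that the stream is finished.
                let _ = sender.send(None);
                Ok(())
            }
            None => Err(format!("received end for unknown stream {id}")),
        },
    }
}
```

Removing the entry on End keeps the map limited to streams that are still open, so data arriving later for that id is reported as an error.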
Summary
In summary, this is how PluginInterfaceManager manages streams:
- When it receives a response from the plugin that contains stream data, it creates an entry in reading_streams. The key is the stream id, and the value is the sender side of the result data channel.
- The receiver side of the channel is wrapped in PipelineData::ListStream or PipelineData::ExternalStream and returned to the nushell engine.
- As more data arrives from the plugin, PluginInterfaceManager gets the data sender from reading_streams and sends the data out.
Reference source code:
All the code in the previous sections can be found here:
- make_plugin_interface: creates a new PluginInterfaceManager and runs consume_all in the background.
- consume_all: runs in the background to consume all messages from the plugin.
- consume: the main handler for a single message; it is invoked by consume_all.
- write_plugin_call: writes a plugin call message and returns the writer for the stream.
- receive_plugin_call_response: reads the channel for plugin call messages and handles them until the response is received.
- read_pipeline_data: reads PipelineData from the plugin’s output.
- read_stream: registers a new stream for reading.