How nushell plugin system work
Recently there is a big improvement on nushell plugin system:
- Bidirectional communication and streams for plugins
- Keep plugins persistently running in the background
Here I will write how the new system work.
Recall how to run a command from a plugin:
- register the plugin (e.g:
register ./target/debug/plugin_bin
) - run the command directly.
Register the plugin
For example: nu_plugin_example defines many commands, like: example one
, example two
, example three
. Nushell needs to have a way to get these commands’ signature.
The process can be describe as the following:
nushell plugin
| 1. start plugin |
| ----------------> |
| binary |
| |
| |
| 2. get signature |
| -----------------> |
| |
| |
| 3. response |
| <---------------- |
| |
| |
When we want to register a plugin, nushell will run the plugin binary and get all command signatures from a plugin. All the interesting things lays inside get_signature function.
How get_signautre
I drawed a diagram to show hot it works at high level:
To get signatures from plugin, nushell will make_plugin_interface
to enable communication between engine
and plugin
in background, in make_plugin_interface
, it will create a PluginInterfaceManager
object, this is the core datastructure of new plugin system, it manages communication between nushell and plugin. Here is code snippet:
let mut manager = PluginInterfaceManager::new(source.clone(), (Mutex::new(stdin), encoder));
let interface = manager.get_interface();
// Spawn the reader on a new thread. We need to be able to read messages at the same time that
// we write, because we are expected to be able to handle multiple messages coming in from the
// plugin at any time, including stream messages like `Drop`.
"plugin interface reader ({})",
.spawn(move || {
if let Err(err) = manager.consume_all((reader, encoder)) {
log::warn!("Error in PluginInterfaceManager: {err}");
// If the loop has ended, drop the manager so everyone disconnects and then wait for the
// child to exit
let _ = child.wait();
.map_err(|err| ShellError::PluginFailedToLoad {
msg: format!("Failed to spawn thread for plugin: {err}"),
Then nushell will send PluginCall::Signature
message to plugin. But it’s worth to look at the definition of PlugininterfaceManager
pub struct PluginInterfaceManager {
// ...
/// Receiver for plugin call subscriptions
plugin_call_subscription_receiver: mpsc::Receiver<(PluginCallId, PluginCallState)>,
// ...
impl PluginInterfaceManager {
pub fn new(
source: Arc<PluginSource>,
writer: impl PluginWrite<PluginInput> + 'static,
) -> PluginInterfaceManager {
let (subscription_tx, subscription_rx) = mpsc::channel();
PluginInterfaceManager {
state: Arc::new(PluginInterfaceState {
plugin_call_id_sequence: Sequence::default(),
stream_id_sequence: Sequence::default(),
plugin_call_subscription_sender: subscription_tx,
error: OnceLock::new(),
writer: Box::new(writer),
plugin_call_subscription_receiver: subscription_rx,
As we can see, when we create a PluginInterfaceManager
, it will give us a channel of plugin_call_subscription. plugin_call_subscription_receiver
will be used in manager.consume_call
method. Which is called in marke_plugin_interface, in background thread.
And plugin_call_subscription_sender
will be used when we want to call plugin, nushell will use it inside wirte_plugin_call
fn write_plugin_call(
call: PluginCall<PipelineData>,
ctrlc: Option<Arc<AtomicBool>>,
context_rx: mpsc::Receiver<Context>,
) -> Result<
> {
let id =;
let (tx, rx) = mpsc::channel();
// Convert the call into one with a header and handle the stream, if necessary
let (call, writer) = match call {
PluginCall::Signature => (PluginCall::Signature, Default::default()),
// ...
// Register the subscription to the response, and the context
PluginCallState {
sender: Some(tx),
context_rx: Some(context_rx),
remaining_streams_to_read: 0,
// Write request
self.write(PluginInput::Call(id, call))?;
Ok((writer, rx))
At that level, when we write_plugin_call
- subscribe the call to Manager, so when manager receive response from plugin, it knows how to send response to caller.
- send the
request to plugin.
As we can see, write_plugin_call
returns a pair of writer and channel receiver, which means we need to get response from that receiver. That is how receive_plugin_call_response
fn receive_plugin_call_response(
rx: mpsc::Receiver<ReceivedPluginCallMessage>,
mut context: Option<&mut (dyn PluginExecutionContext + '_)>,
state: CurrentCallState,
) -> Result<PluginCallResponse<PipelineData>, ShellError> {
// Handle message from receiver
for msg in rx {
match msg {
ReceivedPluginCallMessage::Response(resp) => {
// ...
return Ok(resp);
ReceivedPluginCallMessage::Error(err) => {
return Err(err);
ReceivedPluginCallMessage::EngineCall(engine_call_id, engine_call) => {
Given all of this, let’s summarize how get_signature
- Create a
, which includesplugin_call_subscription_sender/receiver
pair. And receive messages in background. - Write
plugin call throughwrite_plugin_call
method, and wait on messages fromrx
, and result sender is send toPluginInterfaceManager
. PluginInterfaceManager
will send response when it receives output from plugin.
How to run command
Actually it’s very similar to get_signature
from plugin, except it calls write_plugin_call
with PluginCall::Run
fn write_plugin_call(
mut call: PluginCall<PipelineData>,
context: Option<&dyn PluginExecutionContext>,
) -> Result<WritePluginCallResult, ShellError> {
// Prepare the call with the state.
state.prepare_plugin_call(&mut call, &self.state.source)?;
// Convert the call into one with a header and handle the stream, if necessary
let (call, writer) = match call {
PluginCall::Signature => (PluginCall::Signature, Default::default()),
PluginCall::CustomValueOp(value, op) => {
(PluginCall::CustomValueOp(value, op), Default::default())
PluginCall::Run(CallInfo {
mut call,
}) => {
state.prepare_call_args(&mut call, &self.state.source)?;
let (header, writer) = self.init_write_pipeline_data(input, &state)?;
PluginCall::Run(CallInfo {
input: header,
// Register the subscription to the response, and the context
// subscribe stream...
// ...
// Write request
self.write(PluginInput::Call(id, call))?;
Ok(WritePluginCallResult {
receiver: rx,
Then it finish writing the plugin call in background, this enables engine send command name and input arguments in background. Then nushell engine can do other jobs. Note that, it’s PluginInterfaceManager
’s job to get response from plugin, if response is PipelineData::ListStream
or PipelineData::ExternalStream
, the Manager
handles stream reading logic, and then send back to engine through plugin call result channel.
The core logic lays inside PluginInterfaceManager::consume
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
log::trace!("from plugin: {:?}", input);
match input {
// ....
PluginOutput::Stream(message) => self.consume_stream_message(message),
PluginOutput::CallResponse(id, response) => {
// Handle reading the pipeline data, if any
let response = response
.map_data(|data| {
let ctrlc = self.get_ctrlc(id)?;
// Register the streams in the response
for stream_id in data.stream_ids() {
self.recv_stream_started(id, stream_id);
self.read_pipeline_data(data, ctrlc.as_ref())
.unwrap_or_else(|err| {
// If there's an error with initializing this stream, change it to a plugin
// error response, but send it anyway
let result = self.send_plugin_call_response(id, response);
if result.is_ok() {
// When a call ends, it releases a lock on the GC
if let Some(ref gc) = self.gc {
// ...
uses StreamManager
to manage streams.
Stream Management
I drawed another diagram to show how to manage streams.
Receive PluginOutput::CallResponse from plugin
When plugin returns a response, it contains information to indicate the response contains a stream, so PluginInterfaceManager
can add the stream to reading streams.
PluginOutput::CallResponse(id, response) => {
// Handle reading the pipeline data, if any
let response = response
.map_data(|data| {
// ...
self.read_pipeline_data(data, ctrlc.as_ref())
// ...
// read pipeline data
fn read_pipeline_data(
header: PipelineDataHeader,
ctrlc: Option<&Arc<AtomicBool>>,
) -> Result<PipelineData, ShellError> {
self.prepare_pipeline_data(match header {
PipelineDataHeader::ListStream(info) => {
let handle = self.stream_manager().get_handle();
let reader = handle.read_stream(, self.get_interface())?;
PipelineData::ListStream(ListStream::from_stream(reader, ctrlc.cloned()), None)
// ...
// Similar to when header is PipelineDataHerder::ExternamStream
Here, handle.read_stream
will insert a new entrypoint to reading_streams
, which is a map from stream id to pipeline data sender.
pub(crate) fn read_stream<T, W>(
id: StreamId,
writer: W,
) -> Result<StreamReader<T, W>, ShellError>
T: TryFrom<StreamData, Error = ShellError>,
W: WriteStreamMessage,
let (tx, rx) = mpsc::channel();
self.with_lock(|mut state| {
// Must be exclusive
if let btree_map::Entry::Vacant(e) = state.reading_streams.entry(id) {
} else {
// ...
Ok(StreamReader::new(id, rx, writer))
Finally, after insert id to reading_streams
, read_pipeline_data
will return PipelineData
, and it will be our response to a plugic_call. Then nushell engine can process next command.
More data comes from plugin
When more data comes from plugin, PluginInterfaceManager
receives PluginOutput::Stream
, in the case, the manager consume_stream_message
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
log::trace!("from plugin: {:?}", input);
match input {
// ...
PluginOutput::Stream(message) => self.consume_stream_message(message),
// ...
For consuming, it will get stream id from message
, and try to find it from reading_streams
, then send data through channel.
StreamMessage::Data(id, data) => {
if let Some(sender) = state.reading_streams.get(&id) {
let _ = sender.send(Ok(Some(data)));
So in summary, this is how PluginInterfaceManager
manage streams:
- When receive response from plugin, if it contains stream data, create an entry in
, the key is stream id, the value is result data sender channel. - result receiver of channel will be wrapped in
, and returns to nushell engine. - as more data returns from plugin,
will get data sender channel fromreading_streams
, and send these data out.
Reference source code:
All these code in previous sessions can be found here:
- make_plugin_interface: create a new
and runconsume_all
in background. - consume_all: run in background to consume all messages from
. - consume: a main handler to consume a message, it’s invoked by
. - write_plugin_call: Write a plugin call message. Returns the writer for the stream.
- receive_plugin_call_resposne: Read the channel for plugin call messages and handle them until the response is received.
- read_pipeline_data: Read PipelineData from
’s output. - read_stream: Register a new stream for reading.