Steven's profile | Lunatic Experiments | Photos | Blog | Lists | Tools | Help

Blog


    January 31

    Update: Library-AsyncRunspace.ps1

    I have found a few bugs in Library-AsyncRunspace version 1.1 and I have added a couple of new functions so it's time for an update. The current version is 1.3 now. Yes, I skipped an update. Version 1.2 added the Stop-AsyncPipe function that can be used whenever a pipeline gets stuck in an infinite loop. Version 1.3 added the Write-AsyncMessage function and each new runspace also has a Read-AsyncMessage function. Some mild bugs were also fixed.

    An object queue has now been associated with each runspace. The Write-AsyncMessage function will enqueue an object into the queue and Read-AsyncMessage will dequeue objects from the object queue in order. The purpose of the queue is to easily allow control messages to be sent into a runspace that is operating on a pipeline that is of indefinite length.

    There is no specific use for this new feature. You may use it how you like. An example of its use would be to have a server run in a background runspace, periodically checking for messages in its queue for information on what to do next.
    All other changes are bug fixes.

    Here is the code:

    # Library-AsyncRunspace
    # Version: 1.3

    # Include with {. Library-AsyncRunspace}

    # Suggested aliases...
    # New-Alias eap End-AsyncPipe
    # New-Alias gar Get-AsyncRunspace
    # New-Alias wam Write-AsyncMessage
    # New-Alias r! Invoke-ExpressionInRunspace
    # New-Alias iar Invoke-ExpressionInRunspace
    # New-Alias nar New-AsyncRunspace
    # New-Alias rap Read-AsyncPipe
    # New-Alias rar Remove-AsyncRunspace
    # New-Alias sap Start-AsyncPipe

    # Don't destroy previous runspace catalog
    if ( -not $RunspaceCatalog ) {

    # Declare the runspace catalog: a single object carrying two note properties.
    #   RunspaceCountPosition - integer counter, starting at 0, used to hand out runspace IDs.
    #   RunspaceList          - hashtable, initially empty, holding the registered runspaces.
    $RunspaceCatalog = New-Object PSObject
    $RunspaceCatalog | Add-Member NoteProperty RunspaceCountPosition 0 -PassThru |
     Add-Member NoteProperty RunspaceList @{}

    # Creates, opens, and registers a new runspace in $RunspaceCatalog.
    #
    # Parameters:
    #   NewName - value stored in the runspace's Name note property.
    # Output:
    #   The integer ID (the new value of RunspaceCountPosition) under which the
    #   runspace was stored in $RunspaceCatalog.RunspaceList.
    function New-AsyncRunspace {
     param ([String]$NewName)
     
     # Increment the ID number counter; the new value becomes this runspace's ID
     $RunspaceCatalog.RunspaceCountPosition++
     
     # Generate a runspace and add useful properties:
     #   Name, CurrentPipe (initially $null), MessageQueue (a synchronized
     #   Collections.Queue), plus script properties that read the state, the
     #   state reason, and the commands of whatever pipeline is in CurrentPipe.
     $NewRunspace = [management.automation.runspaces.runspacefactory]::CreateRunspace() |
      Add-Member NoteProperty Name $NewName -PassThru |
      Add-Member NoteProperty CurrentPipe $null -PassThru |
      Add-Member NoteProperty MessageQueue ([Collections.Queue]::Synchronized($(New-Object Collections.Queue))) -PassThru |
      Add-Member ScriptProperty PipeState { $this.CurrentPipe.PipelineStateInfo.State } -PassThru |
      Add-Member ScriptProperty PipeStateReason { $this.CurrentPipe.PipelineStateInfo.Reason } -PassThru |
      Add-Member ScriptProperty CurrentCommands { "{$($this.CurrentPipe.Commands)}" } -PassThru
     $NewRunspace.Open()
     
     # Add a reference to the message queue and a function to read it to the runspace.
     # The initialization script takes the first object from its input as
     # $MessageQueue and defines Read-AsyncMessage, which dequeues one message,
     # or returns $null when the queue's Count is zero.
     $PipeLine = $NewRunspace.CreatePipeline({
      $MessageQueue = $input | Select-Object -first 1
      function Read-AsyncMessage {
       if ($MessageQueue.Count) { return $MessageQueue.Dequeue() }
       else { return $null }
      }
     })
     # Feed the queue in as the only input item, close the input, then run the script
     [void]$Pipeline.Input.Write($NewRunspace.MessageQueue)
     $Pipeline.Input.Close()
     $PipeLine.Invoke()
     
     # Add new runspace to catalog, keyed by its ID number
     $RunspaceCatalog.RunspaceList[$RunspaceCatalog.RunspaceCountPosition] = $NewRunspace
     
     # Report new runspace id number
     $RunspaceCatalog.RunspaceCountPosition
    }

    # Closes runspaces and removes them from $RunspaceCatalog.RunspaceList.
    #
    # Parameters:
    #   RunspaceID - an [Int32] catalog ID, or a [String] matched against each
    #                runspace's Name with -like (so wildcards may select several
    #                runspaces). Prompted for with read-host when omitted.
    # Throws:
    #   "A runspace id must be supplied by integer or name!" for any other type.
    #   "Runspace not found!" when an integer ID is not present in the catalog.
    #   A name that matches nothing removes nothing and is not an error.
    function Remove-AsyncRunspace {
     param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))

     if ( -not ($RunspaceID -is [Int32] -or $RunspaceID -is [String])) { throw "A runspace id must be supplied by integer or name!" }

     # Find selected runspace by number
     if ($RunspaceID -is [Int32]) {
      $Target = $RunspaceCatalog.RunspaceList[$RunspaceID]

      # An unknown ID yields $null; report it instead of calling Close() on $null
      if ($null -eq $Target) { throw "Runspace not found!" }

      $Target.Close()
      $RunspaceCatalog.RunspaceList.Remove($RunspaceID)
     }
     # Or find selected runspaces by name
     else {
      # Collect the matching keys into an array BEFORE removing anything:
      # Hashtable.Keys is a live view, and removing entries while it is being
      # enumerated invalidates the enumerator.
      $MatchingKeys = @($RunspaceCatalog.RunspaceList.Keys |
       Where-Object { $RunspaceCatalog.RunspaceList[$_].Name -like $RunspaceID })

      foreach ($Key in $MatchingKeys) {
       $RunspaceCatalog.RunspaceList[$Key].Close()
       $RunspaceCatalog.RunspaceList.Remove($Key)
      }
     }
    }

    # Looks up runspaces registered in $RunspaceCatalog.
    #
    # Parameters:
    #   RunspaceID - omitted/$null: every registered runspace is returned.
    #                [Int32]: the runspace stored under that catalog ID.
    #                [String]: every runspace whose Name is -like the string.
    # Throws:
    #   "A runspace id must be supplied by integer or name!" for any other type.
    function Get-AsyncRunspace {
     param ($RunspaceID)

     $Registered = $RunspaceCatalog.RunspaceList

     # No ID given: hand back the whole list
     if ( $RunspaceID -eq $null ) {
      return $Registered.Values
     }

     # Only integers and strings are acceptable IDs
     $ByNumber = $RunspaceID -is [Int32]
     if ( (-not $ByNumber) -and ($RunspaceID -isnot [String]) ) { throw "A runspace id must be supplied by integer or name!" }

     if ( $ByNumber ) {
      # Direct lookup by catalog ID
      $Found = $Registered[$RunspaceID]
     }
     else {
      # Wildcard-capable lookup by name
      $Found = $Registered.Values | Where-Object { $_.Name -like $RunspaceID }
     }

     $Found
    }

    # Enqueues a message object on a runspace's MessageQueue.
    #
    # Parameters:
    #   RunspaceID - integer ID or name, resolved by Get-AsyncRunspace; prompted
    #                for with read-host when omitted.
    #   Message    - the object to enqueue.
    # Throws:
    #   "Runspace not found!" when the lookup yields no runspace.
    function Write-AsyncMessage {
     param ($RunspaceID = (read-host "Enter a valid runspace ID or name."), [Object]$Message)

     # When several runspaces match, only the first one receives the message
     $Target = Get-AsyncRunspace $RunspaceID | Select-Object -first 1

     $FoundCount = ($Target | Measure-Object).Count
     if ( $FoundCount -eq 0 ) { throw "Runspace not found!" }

     $Target.MessageQueue.Enqueue($Message)
    }

    # Runs a command in a catalogued runspace via Pipeline.Invoke() and emits
    # what the pipeline produced.
    #
    # Parameters:
    #   RunspaceID - integer ID or name, resolved by Get-AsyncRunspace; prompted
    #                for with read-host when omitted.
    #   Command    - command text handed to CreatePipeline; prompted for when omitted.
    # Pipeline input given to this function is forwarded to the new pipeline.
    # Throws:
    #   "Runspace not found!" when the lookup yields no runspace.
    #   The pipeline's state reason when it ends in the Failed state.
    function Invoke-ExpressionInRunspace {
     param ($RunspaceID = (read-host "Enter a valid runspace ID or name."), [String]$Command = (read-host "Enter a command to be executed in the runspace."))

     # Only the first runspace matching the ID is used
     $Target = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
     $FoundCount = ($Target | Measure-Object).Count
     if ( $FoundCount -eq 0 ) { throw "Runspace not found!" }

     # Build a pipeline for the command inside the chosen runspace
     $Pipe = $Target.CreatePipeline($Command)

     # Forward our own input, then close the input side before invoking
     foreach ($Item in $input) {
      [void]$Pipe.Input.Write($Item)
     }
     $Pipe.Input.Close()

     # Execute; whatever Invoke() returns is emitted to the caller
     $Pipe.Invoke()

     # Surface a failed pipeline as a thrown error
     $FinalState = $Pipe.PipelineStateInfo.State
     if ( $FinalState -eq [management.automation.runspaces.pipelinestate]::failed ) {
      throw $Pipe.PipelineStateInfo.Reason
     }

     # Replay anything left on the error stream, then emit remaining output
     foreach ($Problem in $Pipe.Error.ReadToEnd()) {
      Write-Error $Problem
     }
     $Pipe.Output.ReadToEnd()
    }

    # Starts a command asynchronously in a catalogued runspace and records the
    # pipeline as that runspace's CurrentPipe.
    #
    # Parameters:
    #   RunspaceID - integer ID or name, resolved by Get-AsyncRunspace; prompted
    #                for with read-host when omitted.
    #   Command    - command text handed to CreatePipeline; prompted for when omitted.
    # Each object piped into this function is written to the pipeline's input;
    # the input is closed once this function's own input ends.
    # Throws:
    #   "Runspace not found!" when the lookup yields no runspace.
    function Start-AsyncPipe {
     param (
      $RunspaceID = (read-host "Enter a valid runspace ID or name."),
      [String]$Command = (read-host "Enter a command to executed in the runspace.")
     )

     begin {
      # Only the first runspace matching the ID is used
      $Target = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
      $FoundCount = ($Target | Measure-Object).Count
      if ( $FoundCount -eq 0 ) { throw "Runspace not found!" }

      # Create the pipeline and start it without waiting for it to finish
      $AsyncPipe = $Target.CreatePipeline($Command)
      $AsyncPipe.InvokeAsync()

      # Remember it so Read-AsyncPipe / Stop-AsyncPipe / End-AsyncPipe can find it
      $Target.CurrentPipe = $AsyncPipe
     }

     process {
      # Forward the current input object to the running pipeline
      [void]$AsyncPipe.Input.Write($_)
     }

     end {
      # No more input will arrive
      $AsyncPipe.Input.Close()
     }
    }

    # Emits whatever output and errors are currently available from a runspace's
    # CurrentPipe, using non-blocking reads. Emits nothing when the runspace has
    # no current pipeline.
    #
    # Parameters:
    #   RunspaceID - integer ID or name, resolved by Get-AsyncRunspace; prompted
    #                for with read-host when omitted.
    # Throws:
    #   "Runspace not found!" when the lookup yields no runspace.
    #   The pipeline's state reason when the pipeline is in the Failed state.
    function Read-AsyncPipe {
     param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))

     # Only the first runspace matching the ID is used
     $Target = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
     $FoundCount = ($Target | Measure-Object).Count
     if ( $FoundCount -eq 0 ) { throw "Runspace not found!" }

     # Nothing to read when no pipeline is current
     if ( -not $Target.CurrentPipe ) { return }

     $Pipe = $Target.CurrentPipe

     # A failed pipeline is reported by throwing its reason
     if ( $Pipe.PipelineStateInfo.State -eq [management.automation.runspaces.pipelinestate]::failed ) {
      throw $Pipe.PipelineStateInfo.Reason
     }

     # Replay pending errors first, then emit pending output
     foreach ($Problem in $Pipe.Error.NonBlockingRead()) {
      Write-Error $Problem
     }
     $Pipe.Output.NonBlockingRead()
    }

    # Requests an asynchronous stop of a runspace's CurrentPipe and clears the
    # CurrentPipe property. Does nothing when the runspace has no current pipeline.
    #
    # Parameters:
    #   RunspaceID - integer ID or name, resolved by Get-AsyncRunspace; prompted
    #                for with read-host when omitted.
    # Throws:
    #   "Runspace not found!" when the lookup yields no runspace.
    function Stop-AsyncPipe {
     param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))

     # Only the first runspace matching the ID is used
     $Target = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
     $FoundCount = ($Target | Measure-Object).Count
     if ( $FoundCount -eq 0 ) { throw "Runspace not found!" }

     # Nothing to stop when no pipeline is current
     if ( -not $Target.CurrentPipe ) { return }

     $Target.CurrentPipe.StopAsync()

     # The stopped pipeline is no longer the runspace's current one
     $Target.CurrentPipe = $null
    }

    # Detaches a runspace's CurrentPipe and drains it: polls every 100 ms,
    # emitting output and replaying errors, until both the output and error
    # readers report EndOfPipeline. Does nothing when the runspace has no
    # current pipeline.
    #
    # Parameters:
    #   RunspaceID - integer ID or name, resolved by Get-AsyncRunspace; prompted
    #                for with read-host when omitted.
    # Throws:
    #   "Runspace not found!" when the lookup yields no runspace.
    #   The pipeline's state reason when the pipeline is in the Failed state,
    #   either already on entry or once draining has finished.
    function End-AsyncPipe {
     param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))

     $SelectedRunspace = Get-AsyncRunspace $RunspaceID | Select-Object -first 1

     # Throw error if no runspace was found with the given id
     if ( ($SelectedRunspace | Measure-Object).Count -eq 0 ) { throw "Runspace not found!" }

     # Check current pipeline
     if ($SelectedRunspace.CurrentPipe) {
      # Pipeline is no longer current
      $Pipeline = $SelectedRunspace.CurrentPipe
      $SelectedRunspace.CurrentPipe = $null

      $FailedState = [management.automation.runspaces.pipelinestate]::failed

      # A pipeline that has already failed is reported immediately
      if ($Pipeline.PipelineStateInfo.State -eq $FailedState) {
       throw $Pipeline.PipelineStateInfo.Reason
      }

      # Drain both streams until the pipeline has closed them
      while ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
       $Pipeline.Error.NonBlockingRead() | Foreach-Object { Write-Error $_ }
       $Pipeline.Output.NonBlockingRead()
       sleep -m 100
      }

      # The pipeline may have failed while it was being drained; without this
      # second check the loop simply ends and the failure reason is lost.
      if ($Pipeline.PipelineStateInfo.State -eq $FailedState) {
       throw $Pipeline.PipelineStateInfo.Reason
      }
     }
    }

    } # End if ( -not $RunspaceCatalog )