使用并行运行空间执行:

使用并行运行空间执行:

如果我有一个需要在多台计算机上运行的脚本,或者需要使用多个不同的参数,我该如何并行执行它,而不必承担生成新脚本的开销?PSJob 具有Start-Job

举个例子,我想重新同步所有域成员的时间,像这样:

$computers = Get-ADComputer -filter * |Select-Object -ExpandProperty dnsHostName
$creds = Get-Credential domain\user
foreach($computer in $computers)
{
    $session = New-PSSession -ComputerName $computer -Credential $creds
    Invoke-Command -Session $session -ScriptBlock { w32tm /resync /nowait /rediscover }
}

但我不想等待每个 PSSession 连接并调用命令。如何在没有作业的情况下并行完成此操作?

答案1

更新 - 虽然这个答案解释了 PowerShell 运行空间的过程和机制,以及它们如何帮助你多线程非连续工作负载,但 PowerShell 爱好者沃伦‘饼干怪兽’F已经付出了额外的努力,将这些相同的概念融入到一个名为 Invoke-Parallel - 它的功能如下所述,并且他已经用可选开关对其进行了扩展,用于记录日志并准备好会话状态,包括导入的模块,非常酷的东西 - 我强烈建议你一探究竟在构建您自己的闪亮解决方案之前!


使用并行运行空间执行:

减少不可避免的等待时间

在原始特定情况下,调用的可执行文件有一个/nowait选项,可以防止在作业(在本例中为时间重新同步)自行完成时阻塞调用线程。

从发行方的角度来看,这大大减少了总体执行时间,但连接到每台机器仍按顺序进行。由于超时等待的累积,按顺序连接到数千个客户端可能需要很长时间,具体取决于由于某种原因无法访问的机器数量。

为了避免在出现一次或连续几次超时的情况下必须对所有后续连接进行排队,我们可以将连接和调用命令的作业分派到单独的 PowerShell 运行空间,并并行执行。

什么是运行空间?

A运行空间是您的 powershell 代码在其中执行的虚拟容器,并从 PowerShell 语句/命令的角度表示/保存环境。

从广义上讲,1 个运行空间 = 1 个执行线程,因此我们对 PowerShell 脚本进行“多线程”所需的只是一组可以并行执行的运行空间。

与原始问题类似,调用多个运行空间的命令的工作可以分解为:

  1. 创建运行空间池
  2. 将 PowerShell 脚本或等效的可执行代码分配给 RunspacePool
  3. 异步调用代码(即不必等待代码返回)

RunspacePool 模板

PowerShell 有一个类型加速器,称为[RunspaceFactory]这将帮助我们创建运行空间组件 - 让我们开始工作吧

1.创建一个RunspacePool并Open()

$RunspacePool = [runspacefactory]::CreateRunspacePool(1,8)
$RunspacePool.Open()

传递给 的两个参数CreateRunspacePool()18是允许在任何给定时间执行的最小和最大运行空间数量,为我们提供了有效的最大限度并行度共 8 个。

2. 创建 PowerShell 的一个实例,将一些可执行代码附加到它并将其分配给我们的 RunspacePool:

PowerShell 实例与进程(实际上是主机应用程序)不同powershell.exe,而是一个表示要执行的 PowerShell 代码的内部运行时对象。我们可以使用[powershell]类型加速器在 PowerShell 中创建一个新的 PowerShell 实例:

$Code = {
    param($Credentials,$ComputerName)
    $session = New-PSSession -ComputerName $ComputerName -Credential $Credentials
    Invoke-Command -Session $session -ScriptBlock {w32tm /resync /nowait /rediscover}
}
$PSinstance = [powershell]::Create().AddScript($Code).AddArgument($creds).AddArgument("computer1.domain.tld")
$PSinstance.RunspacePool = $RunspacePool

3. 使用 APM 异步调用 PowerShell 实例:

使用 .NET 开发术语中所谓的异步编程模型,我们可以将命令的调用拆分为一个Begin方法,用于给出“绿灯”来执行代码,以及一个End用于收集结果的方法。由于我们在这种情况下并不真正关心任何反馈(我们无论如何都不会等待输出w32tm),我们可以通过简单地调用第一个方法来做到这一点

$PSinstance.BeginInvoke()

将其包装到 RunspacePool 中

使用上述技术,我们可以将创建新连接和调用远程命令的顺序迭代包装在并行执行流中:

$ComputerNames = Get-ADComputer -filter * -Properties dnsHostName |select -Expand dnsHostName

$Code = {
    param($Credentials,$ComputerName)
    $session = New-PSSession -ComputerName $ComputerName -Credential $Credentials
    Invoke-Command -Session $session -ScriptBlock {w32tm /resync /nowait /rediscover}
}

$creds = Get-Credential domain\user

$rsPool = [runspacefactory]::CreateRunspacePool(1,8)
$rsPool.Open()

foreach($ComputerName in $ComputerNames)
{
    $PSinstance = [powershell]::Create().AddScript($Code).AddArgument($creds).AddArgument($ComputerName)
    $PSinstance.RunspacePool = $rsPool
    $PSinstance.BeginInvoke()
}

假设 CPU 有能力同时执行所有 8 个运行空间,我们应该能够看到执行时间大大减少,但由于使用了相当“先进”的方法,脚本的可读性会受到影响。


确定最佳并行度:

我们可以轻松创建一个允许同时执行 100 个运行空间的 RunspacePool:

[runspacefactory]::CreateRunspacePool(1,100)

但归根结底,这一切都取决于我们的本地 CPU 可以处理多少个执行单元。换句话说,只要您的代码正在执行,允许比逻辑处理器分配代码执行更多的运行空间是没有意义的。

得益于 WMI,这个阈值相当容易确定:

$NumberOfLogicalProcessor = (Get-WmiObject Win32_Processor).NumberOfLogicalProcessors
[runspacefactory]::CreateRunspacePool(1,$NumberOfLogicalProcessors)

另一方面,如果你正在执行的代码本身由于网络延迟等外部因素而导致大量等待时间,那么你仍然可以从运行比逻辑处理器更多的同时运行空间中获益,因此你可能需要测试可能的最大运行空间范围以找到收支平衡

foreach($n in ($NumberOfLogicalProcessors..($NumberOfLogicalProcessors*3)))
{
    Write-Host "$n: " -NoNewLine
    (Measure-Command {
        $Computers = Get-ADComputer -filter * -Properties dnsHostName |select -Expand dnsHostName -First 100
        ...
        [runspacefactory]::CreateRunspacePool(1,$n)
        ...
    }).TotalSeconds
}

答案2

除了这个讨论之外,缺少的是一个用于存储从运行空间创建的数据的收集器,以及一个用于检查运行空间状态的变量,即它是否完成。

#Add an collector object that will store the data
$Object = New-Object 'System.Management.Automation.PSDataCollection[psobject]'

#Create a variable to check the status
$Handle = $PSinstance.BeginInvoke($Object,$Object)

#So if you want to check the status simply type:
$Handle

#If you want to see the data collected, type:
$Object

答案3

查看PoshRSJob。它提供与本机 *-Job 函数相同/相似的功能,但使用比标准 Powershell 作业更快、响应更快的运行空间。

答案4

@mathias-r-jessen 有一个很棒的回答但我还想补充一些细节。

最大线程数

理论上,线程应该受系统处理器数量的限制。然而,在测试异步Tcp扫描我通过为 选择一个更大的值获得了更好的性能MaxThreads。这就是该模块有一个-MaxThreads输入参数的原因。请记住,分配太多线程会影响性能。

返回数据

从中获取数据ScriptBlock比较困难。我更新了 OP 代码并将其集成到用于异步Tcp扫描

警告:我无法测试以下代码。我根据使用 Active Directory cmdlet 的经验对 OP 脚本进行了一些更改。

# Script to run in each thread.
[System.Management.Automation.ScriptBlock]$ScriptBlock = {

    $result = New-Object PSObject -Property @{ 'Computer' = $args[0];
                                               'Success'  = $false; }

    try {
            $session = New-PSSession -ComputerName $args[0] -Credential $args[1]
            Invoke-Command -Session $session -ScriptBlock { w32tm /resync /nowait /rediscover }
            Disconnect-PSSession -Session $session
            $result.Success = $true
    } catch {

    }

    return $result

} # End Scriptblock

function Invoke-AsyncJob
{
    [CmdletBinding()]
    param(
        [parameter(Mandatory=$true)]
        [System.Management.Automation.PSCredential]
        # Credential object to login to remote systems
        $Credentials
    )

    Import-Module ActiveDirectory

    $Results = @()

    $AllJobs = New-Object System.Collections.ArrayList

    $AllDomainComputers = Get-ADComputer -Filter * -Properties dnsHostName

    $HostRunspacePool = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspacePool(2,10,$Host)

    $HostRunspacePool.Open()

    foreach($DomainComputer in $AllDomainComputers)
    {
        $asyncJob = [System.Management.Automation.PowerShell]::Create().AddScript($ScriptBlock).AddParameters($($($DomainComputer.dnsName),$Credentials))

        $asyncJob.RunspacePool = $HostRunspacePool

        $asyncJobObj = @{ JobHandle   = $asyncJob;
                          AsyncHandle = $asyncJob.BeginInvoke()    }

        $AllJobs.Add($asyncJobObj) | Out-Null
    }

    $ProcessingJobs = $true

    Do {

        $CompletedJobs = $AllJobs | Where-Object { $_.AsyncHandle.IsCompleted }

        if($null -ne $CompletedJobs)
        {
            foreach($job in $CompletedJobs)
            {
                $result = $job.JobHandle.EndInvoke($job.AsyncHandle)

                if($null -ne $result)
                {
                    $Results += $result
                }

                $job.JobHandle.Dispose()

                $AllJobs.Remove($job)
            } 

        } else {

            if($AllJobs.Count -eq 0)
            {
                $ProcessingJobs = $false

            } else {

                Start-Sleep -Milliseconds 500
            }
        }

    } While ($ProcessingJobs)

    $HostRunspacePool.Close()
    $HostRunspacePool.Dispose()

    return $Results

} # End function Invoke-AsyncJob

相关内容