希望
我想重复运行命令以响应通过管道传递给它的行:
firehose | expensive-command
但是,我收到了很多行,并且该命令非常占用资源。我希望过滤命令的输入,以便它最多只运行一次X秒:
firehose | interval 1 second | expensive-command
该interval
命令不应该只是一个过滤器。如果有的话,它应该在冷却期结束时发送最近收到的行,而不是仅仅阻止冷却期间到达的所有内容。
我怎样才能做到这一点?
试图
epoch () { date +%s --date="$*" }
interval () {
INTERVAL="$*"
LAST_RUN_AT=0
WHEN_TO_RUN=0
while read LINE; do
if (( $(epoch now) >= $WHEN_TO_RUN )) then
echo $LINE
WHEN_TO_RUN="$(epoch now + $INTERVAL)"
fi
done
}
alias firehose='(print "1\n2\n3" ; sleep 2 ; print "4\n")'
alias expensive-command='cat'
firehose | interval 1 second | expensive-command
这在很大程度上是有效的,但存在一个问题,即它不能将传递线路延迟到稍后——它只能决定立即传递它们,或者丢弃它们。
会发生什么:
1
4
油门接收1
并将其传递,然后继续冷却。和1
在3
冷却期间到达,因此它们被完全丢弃。冷却时间在4
到达之前就结束了,所以它被传递了。
我希望发生的事情:
1
3
4
收到 后1
,油门应冷却 1 秒。然后它应该会收到2
,并将其归档以供稍后使用,因为它仍处于冷却状态。然后它会收到3
,它会替换2
稍后提交的内容。然后油门就会停止冷却,此时它应该立即发送3
.最后,4
当该回合的冷却时间结束时,它会到达,因此会立即发送。
如果 zsh 有关闭,我会启动一个休眠的子 shell $INTERVAL
,然后echo
es 最后收到的LINE
,但可惜的是,zsh 没有闭包。
答案1
问题是你需要一个超时的读取。如果firehose
没有发送任何内容,则您的循环无限期地阻塞,并且当它这样做时,它无法发送最近收到的行。Bash 有 -t 参数来表示超时读取。如果 zshread
有这个,那就可以使用它了。
该算法是不断读取行,并设置一个超时时间,该超时时间总是重新计算(越来越短),以在一秒(或任何时间)间隔结束时到期。当该间隔到达时,如果已读取一行或多行,则发送最后一行。否则不发送任何内容,现在开始读取下一个间隔的行。
您可以对收到的第一行或在长于间隔时间后收到的第一行实施“即时传递”。就像如果间隔是 1 秒,并且firehose
自上次输出一行以来 1.5 秒内没有任何内容,那么该行可以通过,并且机器可以重置以在该点开始新的一秒间隔。
TXR Lisp 中的这个概念验证实现对我来说很有效,验证了基本算法:
(defvarl %interval% 1000000) ;; us
(defun epoch-usec ()
(tree-bind (sec . usec) (time-usec)
(+ (* 1000000 sec) usec)))
(let ((now (epoch-usec))
(*stdin* (open-fileno (fileno *stdin*) "rl")) ;; line buffered
remaining-time next-time line done)
(while (not done)
(set next-time (+ now %interval%))
(set remaining-time (- next-time now))
(while (poll (list (cons *stdin* poll-in))
(trunc remaining-time 1000))
;; got a line or EOF poll: no timeout
(iflet ((nline (get-line)))
(set line nline) ;; got line
(progn (flip done) (return))) ;; EOF poll
(set now (epoch-usec))
(when (minusp (set remaining-time (- next-time now)))
(return)))
;; timeout, past deadline or exit: flush line, if any:
(when line
(put-line line)
(set line nil))))
设置无缓冲流,因为poll
正在用于超时读取,并且poll
看不到流缓冲区。这个想法是,当流中存在未读的缓冲数据时,我们不想轮询输入。这是一个挑剔。在测试中,我并没有真正看到这与仅使用缓冲的原始*stdin*
流之间的行为有任何质的差异。如果我们在流中有缓冲数据而文件描述符中没有数据时浪费时间进行轮询,那么我们保证等待的时间不会超过间隔时间,如果新数据到达较早,则等待时间会少于间隔时间。
我们假设成功poll
意味着我们可以阅读整行。poll
当然,并不能保证这一点,但行为良好的文本流输入源应该保证,如果输入的一个字节可用于唤醒poll
,则该字节后面有一个完整的行,而不会出现任何不当的延迟。
剩余时间计算使用日历时间,而poll
仅使用可能对时间调整不敏感的相对等待。因此,通常的注意事项适用。如果时钟突然向后跳,哎呀!
这些测试用例的进行没有任何明显的延迟:
$ echo foo | txr throttle.txr
foo
$ (echo foo; echo bar) | txr throttle.tl
bar
$ (echo foo; echo bar; echo xyzzy) | txr throttle.tl
xyzzy
然后:
$ (echo foo; sleep 2; echo bar; sleep 2; echo xyzzy) | txr throttle.tl
foo
bar
xyzzy
我已经测试过find / | txr throttle.tl
等等。
答案2
第一个变体(不起作用,请参阅第二个变体)
看来我们不能使用read
命令来执行此类任务,因为它read
会停止while
循环执行。
看这个例子:(printf "1\n2\n3\n" ; sleep 5; printf "4\n") | while read -r line; do echo hello; done
。
while
内部循环read
将这样执行:
- 1 次迭代 - 读取
1
; - 2次迭代-读取
2
; - 3次迭代——读取
3
; - 4 次迭代 - 等待 5 秒,然后读取
4
.
我们无法在这个循环内进行预定工作,例如“每 1 秒执行一次” - 因为它将定期停止,等待输入。例如,它可能会等待 1 分钟或更长时间,我们的预定工作也会停止。
function interval () {
amount_of_seconds=$1
print_time=0
buffer=''
while read -r line; do
current_time=$(date +%s)
if (( current_time > print_time )); then
echo -e "${buffer}${line}"
buffer=''
print_time=$((current_time + amount_of_seconds))
else
buffer="$line\n"
fi
done
echo -en "$buffer"
}
测试:
$ alias firehose='(printf "1\n2\n3\n" ; sleep 2 ; printf "4\n"; sleep 2 ; printf "5\n6\n7\n" ; sleep 2; printf "8\n")'
$ firehose | interval 1 | cat
1
3
4
5
7
8
$
第二种变体
将输出重定向firehose
到文件:(
firehose >> buffer_file.txt
解释原因>>
而不见>
下文)
expensive-command
每秒都会读取该文件的最后一行并刷新文件:
while true; do
tail -n 1 buffer_file.txt | expensive-command
# clear file
echo -n '' > buffer_file.txt
# and sleep 1 second
sleep 1
done
结果,我们将得到下一个:
- 两个命令同时运行(
firehose
在后台):
firehose >> buffer_file.txt & ./script_with_expensive_command_inside.sh
APPEND 运算符 ->>
之后需要firehose
,而不是 WRITE>
。否则文件将不会被清理并且会不断增长。这里解释一下这种行为。 - 所有不需要的行都会被丢弃,只有最后一行会传递到
expensive command
- 最后一行将被保存,在
expensive command
不读取它并清除文件之前。
答案3
我做到了!
这是我的interval
脚本(也在 github 上):
#!/usr/bin/env zsh
# Lets a line pass only once every $1 seconds. If multiple lines arrive during
# the cooldown interval, only the latest is passed on when the cooldown ends.
INTERVAL="$1"
CHILD_PID=
BUFFER=$(mktemp)
CAN_PRINT_IMMEDIATELY=1
CAN_START_SUBPROCESS=1
# Reset state when child process returns
child-return () {
CAN_START_SUBPROCESS=1
CAN_PRINT_IMMEDIATELY=1
}
trap child-return CHLD
# Clean up when quitting
cleanup () {
kill -TERM "$CHILD_PID" &> /dev/null
rm "$BUFFER"
exit
}
trap cleanup TERM INT QUIT
while read LINE; do
# If we're just starting, just print immediately
if [[ -n $CAN_PRINT_IMMEDIATELY ]]; then
echo $LINE
CAN_PRINT_IMMEDIATELY=
else
# Otherwise, store the line for later
echo "$LINE" > $BUFFER
# And spawn a subprocess to handle it one interval later, unless one is
# already running. With the SIGCHLD trap, the state variables will
# reset when it exits.
if [[ -n $CAN_START_SUBPROCESS ]]; then
CAN_START_SUBPROCESS=
(
sleep $INTERVAL
tail -n1 $BUFFER
) &
CHILD_PID=$!
fi
fi
done
# Once we exhaust stdin, wait for the last child process to finish, if any.
if [[ -n $CHILD_PID ]]; then
wait $CHILD_PID &> /dev/null
cleanup
fi
我观察到循环read
行并不总是负责打印它们,因为程序有时需要异步打印行(当没有收到任何消息时,有时甚至在stdin
结束后很久)。因此就有了子进程。
它的工作原理如下,输入也tee >(sed)
放在一边以观察时间:
这与我之前的图表相符:
答案4
这应该以一种非常简单的方式做你想做的事:)
firehose | awk '{print $1; system("sleep 1")}' | expensive-command
它的缺点是整个事情变得有点难以杀死(killall awk
可以工作但相当优雅),但至少它很简单并且不需要特殊的脚本或任何东西。