aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorromkatv <roman.perepelitsa@gmail.com>2020-01-18 13:09:18 +0300
committerromkatv <roman.perepelitsa@gmail.com>2020-01-18 13:09:18 +0300
commitda498aef57e9a02527a0453eb38c0187029431f6 (patch)
tree29097f61dbabfa0a0d5470c38bdb61110171ed4e
parenta652d49bd906e34a91bcc832461a15e75b1519c1 (diff)
support parallelism in worker
-rw-r--r--internal/worker.zsh214
1 files changed, 141 insertions, 73 deletions
diff --git a/internal/worker.zsh b/internal/worker.zsh
index b086401e..3d296512 100644
--- a/internal/worker.zsh
+++ b/internal/worker.zsh
@@ -1,15 +1,59 @@
# invoked in worker: _p9k_worker_main <timeout>
function _p9k_worker_main() {
- local _p9k_worker_buf _p9k_worker_cmd
+ zmodload zsh/system || return
+ zmodload zsh/zselect || return
+ zselect -t0
+ (( $? == 1 )) || return
+
+ local req fd
+ local -A inflight # fd => id$'\x1f'sync
+ local -a ready
+ local _p9k_worker_tmout # empty or non-negative int, in hundredths of a second
while true; do
- if sysread -t $1 '_p9k_worker_buf[$#_p9k_worker_method+1]'; then
- _p9k_worker_cmd=${_p9k_worker_buf%%$'\x1e'*}
- if (( $#_p9k_worker_cmd != $#_p9k_worker_buf )); then
- _p9k_worker_buf[1,$#_p9k_worker_cmd+1]=""
- eval $_p9k_worker_cmd
- fi
+ if zselect -a ready ${_p9k_worker_tmout:+-t$_p9k_worker_tmout} 0 ${(k)inflight}; then
+ [[ $ready[1] == -r ]] || return
+ for fd in ${ready:1}; do
+ if [[ $fd == 0 ]]; then
+ local buf=
+ while true; do
+ sysread -t 0 'buf[$#buf+1]' && continue
+ (( $? == 4 )) || return
+ [[ $buf[-1] == (|$'\x1e') ]] && break
+ sysread 'buf[$#buf+1]' || return
+ done
+ for req in ${(ps:\x1e:)buf}; do
+ local parts=("${(@ps:\x1f:)req}") # id cond async sync
+ if () { eval $parts[2] }; then
+ if [[ -n $parts[3] ]]; then
+ sysopen -r -o cloexec -u fd <(
+ local REPLY=; eval $parts[3]; print -rn -- $REPLY) || return
+ inflight[$fd]=$parts[1]$'\x1f'$parts[4]
+ continue
+ fi
+ local REPLY=
+ () { eval $parts[4] }
+ fi
+ if [[ -n $parts[1] ]]; then
+ print -rn -- d$parts[1]$'\x1e' || return
+ fi
+ done
+ else
+ local REPLY=
+ while true; do
+ sysread -i $fd 'REPLY[$#REPLY+1]' && continue
+ (( $? == 5 )) || return
+ break
+ done
+ local parts=("${(@ps:\x1f:)inflight[$fd]}") # id sync
+ () { eval $parts[2] }
+ if [[ -n $parts[1] ]]; then
+ print -rn -- d$parts[1]$'\x1e' || return
+ fi
+ unset "inflight[$fd]"
+ fi
+ done
else
- (( $? == 4 )) || return
+ (( $? == 1 )) || return
(( $+functions[_p9k_worker_on_timeout] )) && _p9k_worker_on_timeout
fi
done
@@ -21,8 +65,8 @@ typeset -g _p9k__worker_resp_fd
typeset -g _p9k__worker_shell_pid
typeset -g _p9k__worker_file_prefix
typeset -ga _p9k__worker_params
-typeset -gA _p9k__worker_functions
-typeset -gA _p9k__worker_requests
+typeset -gA _p9k__worker_request_map
+typeset -ga _p9k__worker_request_queue
# invoked in master: _p9k_worker_reply <list>...
#
@@ -33,14 +77,15 @@ function _p9k_worker_reply() { eval $1 }
# usage: _p9k_worker_reply <list>
function _p9k_worker_reply_remote() { print -rn -- e$1$'\x1e' }
-# invoked in worker: _p9k_worker_done <request-id>
-function _p9k_worker_done() { print -rn -- d$1$'\x1e' }
+# invoked in worker: _p9k_worker_on_timeout
+function _p9k_worker_on_timeout() {
+ _p9k_worker_tmout=
+ _p9k_worker_reply _p9k_worker_keep_alive
+}
-# invoked in master: _p9k_worker_eval <list>
-function _p9k_worker_eval() {
- print -rnu $_p9k__worker_req_fd -- $1$'\x1e' && return
- _p9k_worker_stop
- return 1
+# invoked in master: _p9k_worker_keep_alive
+function _p9k_worker_keep_alive() {
+ _p9k_worker_invoke "" "_p9k_worker_tmout=100" "" ""
}
# invoked in master: _p9k_worker_send_params [param]...
@@ -48,7 +93,7 @@ function _p9k_worker_send_params() {
[[ -z $_p9k__worker_resp_fd || $# == 0 ]] && return
if [[ -n $_p9k__worker_req_fd ]]; then
{
- typeset -p -- $* && print -rn -- $'\x1e' && return
+ print -rn -- $'\x1f' && typeset -p -- $* && print -rn -- $'\x1f\x1f\x1e' && return
} >&$_p9k__worker_req_fd
_p9k_worker_stop
return 1
@@ -57,30 +102,27 @@ function _p9k_worker_send_params() {
fi
}
-# invoked in master: _p9k_worker_invoke <request-id> <func> [arg]...
+# invoked in master: _p9k_worker_invoke <request-id> <cond> <async> <sync>
function _p9k_worker_invoke() {
if [[ -n $_p9k__worker_resp_fd ]]; then
- if [[ -n $_p9k__worker_req_fd ]]; then
- local req
- if (( ! $+_p9k__worker_functions[$2] )); then
- req+="function $2() { $functions[$2] ; }"$'\n'
- _p9k__worker_functions[$2]=
- fi
- if (( ! $+_p9k__worker_requests[$1] )); then
- req+="${(j: :)${(@q)${@:2}}}"$'\n'
- _p9k__worker_requests[$1]=
- else
- _p9k__worker_requests[$1]="${(j: :)${(@q)${@:2}}}"
- fi
- if [[ -n $req ]]; then
- _p9k_worker_eval $req"_p9k_worker_done ${(q)1}"
- fi
+ local req=$1$'\x1f'$2$'\x1f'$3$'\x1f'$4$'\x1e'
+ if [[ -n $_p9k__worker_req_fd && $+_p9k__worker_request_map[$1] == 0 ]]; then
+ [[ -n $1 ]] && _p9k__worker_request_map[$1]=
+ print -rnu $_p9k__worker_req_fd -- $req
+ return
+ fi
+ if [[ -n $1 ]]; then
+ (( $+_p9k__worker_request_map[$1] )) || _p9k__worker_request_queue+=$1
+ _p9k__worker_request_map[$1]=$req
else
- _p9k__worker_functions[$2]=
- _p9k__worker_requests[$1]="${(j: :)${(@q)${@:2}}}"
+ _p9k__worker_request_queue+=$req
fi
else
- "${@:2}"
+ if () { eval $2 }; then
+ local REPLY=
+ () { eval $3 }
+ () { eval $4 }
+ fi
fi
}
@@ -104,8 +146,8 @@ function _p9k_worker_stop() {
_p9k__worker_resp_fd=
_p9k__worker_shell_pid=
_p9k__worker_params=()
- _p9k__worker_requests=()
- _p9k__worker_functions=()
+ _p9k__worker_request_map=()
+ _p9k__worker_request_queue=()
return 0
}
@@ -128,12 +170,12 @@ function _p9k_worker_receive() {
local arg=$resp[2,-1]
case $resp[1] in
d)
- local req=$_p9k__worker_requests[$arg]
+ local req=$_p9k__worker_request_map[$arg]
if [[ -n $req ]]; then
- _p9k__worker_requests[$arg]=
- _p9k_worker_eval $req$'\n'"_p9k_worker_done ${(q)arg}" || return
+ _p9k__worker_request_map[$arg]=
+ print -rnu $_p9k__worker_req_fd -- $req || return
else
- unset "_p9k__worker_requests[$arg]"
+ unset "_p9k__worker_request_map[$arg]"
fi
;;
e)
@@ -157,29 +199,32 @@ function _p9k_worker_receive() {
setopt no_hist_expand extended_glob no_prompt_bang prompt_percent prompt_subst no_aliases
zmodload zsh/system
zmodload zsh/datetime
+ function _p9k_worker_main() { $functions[_p9k_worker_main] }
function _p9k_worker_reply() { $functions[_p9k_worker_reply_remote] }" || return
- local f
- for f in _p9k_worker_done _p9k_worker_main ${(k)_p9k__worker_functions}; do
- print -r -- "function $f() { $functions[$f] }" || return
- done
+ if (( _POWERLEVEL9K_EXPERIMENTAL_TIME_REALTIME )); then
+ print -r -- "function _p9k_worker_on_timeout() {
+ $functions[_p9k_worker_on_timeout] }" || return
+ fi
+ print -r -- "_p9k_worker_main" || return
+ print -rn -- $'\x1e' || return
if (( $#_p9k__worker_params )); then
+ print -rn -- $'\x1f' || return
typeset -p -- $_p9k__worker_params || return
+ print -rn -- $'\x1f\x1f\x1e' || return
_p9k__worker_params=()
fi
- local id list
- for id list in "${(@kv)_p9k__worker_requests}"; do
- print -rl -- $list "_p9k_worker_done ${(q)id}" || return
- _p9k__worker_requests[$id]=
+ local req=
+ for req in $_p9k__worker_request_queue; do
+ if [[ $req != *$'\x1e' ]]; then
+ local id=$req
+ req=$_p9k__worker_request_map[$id]
+ _p9k__worker_request_map[$id]=
+ fi
+ print -rnu $_p9k__worker_req_fd -- $req || return
done
- if (( _POWERLEVEL9K_EXPERIMENTAL_TIME_REALTIME )); then
- print -r -- "
- function _p9k_worker_on_timeout() { _p9k_worker_reply '' }
- _p9k_worker_main 1" || return
- else
- print -r -- "_p9k_worker_main -1" || return
- fi
- print -rn -- $'\x1e' || return
+ _p9k__worker_request_queue=()
} >&$_p9k__worker_req_fd
+ (( _POWERLEVEL9K_EXPERIMENTAL_TIME_REALTIME )) && _p9k_worker_keep_alive
;;
*)
return 1
@@ -212,7 +257,7 @@ function _p9k_worker_start() {
fi
log_file=/tmp/log # todo: remove
- trace=
+ trace=x
local fifo=$_p9k__worker_file_prefix.fifo
local cmd=(
@@ -242,7 +287,7 @@ function _p9k_reset_prompt() {
emulate -L zsh -o prompt_subst # -o xtrace
POWERLEVEL9K_WORKER_LOG_LEVEL=DEBUG
-_POWERLEVEL9K_EXPERIMENTAL_TIME_REALTIME=1
+_POWERLEVEL9K_EXPERIMENTAL_TIME_REALTIME=0
typeset -F _p9k__last_prompt_update_time
zmodload zsh/datetime
@@ -253,27 +298,50 @@ typeset -F start_time=EPOCHREALTIME
_p9k_worker_start
echo -E - $((1000*(EPOCHREALTIME-start_time)))
-function compute_foo() {
- local f="${(q+)1} ${(q+)bar} $((foo_counter++))"
- _p9k_worker_reply "typeset -g foo=${(q)f}"
+function foo_cond() {
+ typeset -gi foo_counter
+ typeset -g foo="[$bar] cond $1 $((foo_counter++))"
}
-bar='rofl $ {'
+function foo_async() {
+ sleep 1
+ REPLY="$foo / async $1"
+}
-_p9k_worker_send_params bar
+function foo_sync() {
+ REPLY+=" / sync $1"
+ _p9k_worker_reply "typeset -g foo=${(q)REPLY}"
+}
() {
- local -i i
- for i in {1..10}; do
- _p9k_worker_invoke f compute_foo $i
+ typeset -g RPROMPT='$foo %*'
+ typeset -g bar='lol'
+ _p9k_worker_send_params bar
+
+ local f
+ for f in foo_{cond,async,sync}; do
+ _p9k_worker_invoke "" "function $f() { $functions[$f] }" "" ""
done
-}
-RPROMPT='$foo %*'
+ () {
+ local -i i
+ for i in {1..10}; do
+ _p9k_worker_invoke foo$i "foo_cond c$i\$\{" "foo_async a$i\$\{" "foo_sync s$i\$\{"
+ done
+ }
+}
function in_worker() {
_p9k_worker_reply 'echo roundtrip: $((1000*(EPOCHREALTIME-'$1'))) >>/tmp/log'
}
-_p9k_worker_invoke w in_worker $EPOCHREALTIME
-# for i in {1..100}; do _p9k_worker_invoke w$i in_worker $EPOCHREALTIME; done
+_p9k_worker_invoke "" "function in_worker() { $functions[in_worker] }" "" ""
+_p9k_worker_invoke w "in_worker $EPOCHREALTIME" "" ""
+# for i in {1..100}; do _p9k_worker_invoke w$i "in_worker $EPOCHREALTIME"; done
+
+# TODO:
+#
+# - Segment API: _p9k_prompt_foo_worker_{params,cond,async,sync}.
+# - _p9k_worker_request -- cacheable variable that contains full request to worker.
+# - _p9k_set_prompt sends stuff to worker or evals it.
+# - _p9k_on_expand has _REALTIME check at the top and sends keep-alive to worker.