Skip to content

Commit

Permalink
handle $/cancelRequest promptly
Browse files Browse the repository at this point in the history
Summary:
I'm working to support IDE cancellation. The plan is that
1. while ClientIdeDaemon is busy in CPU-bound decling or typechecking work
2. while ClientLsp is awaiting a response from ClientIdeDaemon
3. ClientLsp must be able to see a VSCode cancellation request arrive over stdin, and respond by setting a shmem bit
4. ClientIdeDaemon must frequently poll that shmem bit and, if set, then terminate

This diff achieves part (3).

The changes in behavior are all under a JK, `lsp_cancellation`. Once rolled out to 50%, this is how I'll measure impact:
* autocomplete - https://fburl.com/scuba/hh_server_events/uk3sk49a
* squiggles - https://fburl.com/scuba/hh_server_events/f4chhv01

Differential Revision: D51521765

fbshipit-source-id: 4e33bc55b99b9de34e76f415625249afddbb3873
  • Loading branch information
Lucian Wischik authored and facebook-github-bot committed Nov 28, 2023
1 parent c3f63c7 commit 512bf66
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 34 deletions.
163 changes: 132 additions & 31 deletions hphp/hack/src/client/clientLsp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ let background_status_refresher (ide_service : ClientIdeService.t ref) : unit =
let _future = loop () in
()

(** A thin wrapper around ClientIdeMessage which turns errors into exceptions *)
(** A thin wrapper around ClientIdeMessage which turns errors into exceptions. *)
let ide_rpc
(ide_service : ClientIdeService.t ref)
~(tracking_id : string)
Expand Down Expand Up @@ -4104,35 +4104,48 @@ let cancel_if_stale (client : Jsonrpc.t) (timestamp : float) (timeout : float) :
else
Lwt.return_unit

(** This is called before we even start processing a message. Its purpose:
if the Jsonrpc queue has already previously read off stdin a cancellation
request for the message we're about to handle, then throw an exception.
There are races, e.g. we might start handling this request because we haven't
yet gotten around to reading a cancellation message off stdin. But
that's inevitable. Think of this only as best-effort. *)
let assert_workers_are_not_stopped () =
if WorkerCancel.is_stop_requested () then begin
HackEventLogger.invariant_violation_bug
"ClientLsp workers are stopped but shouldn't be";
WorkerCancel.resume_workers ()
end;
()

(** This predicate will return true if and only if the second parameter
is a $/cancelRequest notification for LSP request [message]. *)
let is_cancellation_of_request ~message { Jsonrpc.json; _ } : bool =
match message with
| ResponseMessage _
| NotificationMessage _ ->
false
| RequestMessage (id, _request) ->
let module M = struct
exception Is_response
end in
(try
let peek = Lsp_fmt.parse_lsp json (fun _ -> raise M.Is_response) in
match peek with
| NotificationMessage
(CancelRequestNotification { Lsp.CancelRequest.id = peek_id }) ->
Lsp.IdKey.compare id peek_id = 0
| _ -> false
with
| M.Is_response -> false)

(** LEGACY FUNCTION, to be deleted once we roll out lsp_cancellation *)
let cancel_if_has_pending_cancel_request
(client : Jsonrpc.t) (message : lsp_message) : unit =
match message with
| ResponseMessage _ -> ()
| NotificationMessage _ -> ()
| RequestMessage (id, _request) ->
| RequestMessage _ ->
(* Scan the queue for any pending (future) cancellation messages that are requesting
cancellation of the same id as our current request *)
let pending_cancel_request_opt =
Jsonrpc.find_already_queued_message client ~f:(fun { Jsonrpc.json; _ } ->
try
let peek =
Lsp_fmt.parse_lsp json (fun _ ->
failwith "not resolving responses")
in
match peek with
| NotificationMessage
(CancelRequestNotification { Lsp.CancelRequest.id = peek_id })
->
Lsp.IdKey.compare id peek_id = 0
| _ -> false
with
| _ -> false)
Jsonrpc.find_already_queued_message
client
~f:(is_cancellation_of_request ~message)
in
(* If there is a future cancellation request, we won't even embark upon this message *)
if Option.is_some pending_cancel_request_opt then
Expand All @@ -4146,6 +4159,72 @@ let cancel_if_has_pending_cancel_request
else
()

(** This will run [f] but wrap it up in a way that respects cancellation:
1. If there's already a cancellation request for [message] in the queue, then we'll
just raise [RequestCancelled] Lsp exception immediately.
2. Otherwise, we'll await the callback [f], but if a cancellation request arrives
in the queue while awaiting then we'll use [WorkerCancel.stop_workers] to cooperatively
request it to finish.
INVARIANT: only one concurrent caller should invoke this function. (why? because
our cancellation mechanism [WorkerCancel.stop_workers] is a single bit, hence unable
to know which of multiple concurrent calls should be stopped). If this requirement
is violated, (1) we might end up cancelling the wrong thing, (2) we'll log
invariant_violation_bugs in [ide_rpc], here, and other places that call
[assert_workers_are_not_stopped]. It's easy to see how our caller upholds
the invariant -- we have only a single caller, the main ClientLsp message-loop,
which isn't concurrent with itself.
INVARIANT: This function assumes the invariant that workers are not stopped
prior to entering this function, and restores the invariant upon exiting.
(Again, this would go wrong in the presence of multiple concurrent callers). *)
let respect_cancellation
(client : Jsonrpc.t) ~(predicate : Jsonrpc.timestamped_json -> bool) ~f =
assert_workers_are_not_stopped ();
(* Explanation: this statement embodies the invariant that workers are not
stopped prior to entering. This function is the only place in the code which
can stop workers, and it unstops workers in its [~finally] clause. Because
of the invariant that no one will call this function concurrently with itself,
we don't run into problems. *)
let (cancellation_token, cancellation_source) = Lwt.wait () in
(* Following promise will be fulfilled immediately if cancellation is already in the queue;
otherwise will be fulfilled when it eventually arrives, or with [None] if [cancellation_token]
is completed first or if there's a stream error *)
let promise =
Jsonrpc.await_until_found client ~predicate ~cancellation_token
in
(* If there was already a cancel request in the queue, we'll skip all processing right now! *)
if not (Lwt.is_sleeping promise) then begin
raise
(Error.LspException
{
Error.code = Error.RequestCancelled;
message = "request cancelled";
data = None;
})
end;
(* Otherwise, cancellation will cooperatively request clientIdeDaemon to stop *)
let promise =
Lwt.map
(fun cancel_notification_opt ->
match cancel_notification_opt with
| None -> ()
| Some { Jsonrpc.timestamp; _ } ->
Hh_logger.log
"acting upon cancellation at %s"
(Utils.timestring timestamp);
WorkerCancel.stop_workers ())
promise
in
let%lwt result =
Lwt_utils.try_finally ~f ~finally:(fun () ->
ensure_cancelled cancellation_source;
let%lwt () = promise in
WorkerCancel.resume_workers ();
Lwt.return_unit)
in
Lwt.return result

(** This function is called upon didOpen/didChange
to stick a uri into the [uris_that_need_check] queue. *)
let send_file_to_ide_and_defer_check
Expand Down Expand Up @@ -4285,15 +4364,14 @@ let set_verbose_to_file
in
Lwt.return_unit

(* Process and respond to a message from VSCode, and update [state] accordingly. *)
(** Process and respond to a message from VSCode, and update [state] accordingly. *)
let handle_client_message
~(state : state ref)
~(client : Jsonrpc.t)
~(ide_service : ClientIdeService.t ref)
~(metadata : incoming_metadata)
~(message : lsp_message)
~(ref_unblocked_time : float ref) : result_telemetry option Lwt.t =
cancel_if_has_pending_cancel_request client message;
let%lwt result_telemetry_opt =
(* make sure to wrap any exceptions below in the promise *)
let tracking_id = metadata.tracking_id in
Expand Down Expand Up @@ -4995,6 +5073,10 @@ let main
* clientLsp message-loop or hh_server, and can start actually handling.
* Everything that blocks will update this variable. *)
let process_next_event () : unit Lwt.t =
assert_workers_are_not_stopped ();
(* This assertion is satisfied because the only place that stops workers is inside
[respect_cancellation], but its ~finally clause guarantees that it unstops workers
before it completes i.e. before we continue this [process_next_event] loop. *)
try%lwt
let%lwt () =
match !deferred_action with
Expand All @@ -5020,13 +5102,32 @@ let main
let%lwt result_telemetry_opt =
match event with
| Client_message (metadata, message) ->
handle_client_message
~state
~client
~ide_service
~metadata
~message
~ref_unblocked_time
if !env.local_config.ServerLocalConfig.lsp_cancellation then begin
(* Note: the function [respect_cancellation] requires the invariant that it not be called
concurrently with itself. That's satisfied because here in the ClientLsp main loop
is the only place we call it, and the main loop isn't concurrent. *)
respect_cancellation
client
~predicate:(is_cancellation_of_request ~message)
~f:(fun () ->
handle_client_message
~state
~client
~ide_service
~metadata
~message
~ref_unblocked_time)
end else begin
(* TODO(ljw): delete this branch once we roll out lsp_cancellation *)
cancel_if_has_pending_cancel_request client message;
handle_client_message
~state
~client
~ide_service
~metadata
~message
~ref_unblocked_time
end
| Daemon_notification notification ->
handle_daemon_notification
~state
Expand Down
22 changes: 19 additions & 3 deletions hphp/hack/src/client/ide_service/clientIdeDaemon.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1225,12 +1225,28 @@ let handle_one_message_exn
failwith ("Unexpected GotNamingTable in " ^ state_to_log_string state)
| (_, Some (ClientRequest { ClientIdeMessage.tracking_id; message })) ->
let unblocked_time = Unix.gettimeofday () in
(* Our caller has an exception handler which logs the exception.
But we instead must fulfil our contract of responding to the client,
even if we have an exception. Hence we need our own handler here. *)
let (state, response) =
try handle_request message_queue state tracking_id message with
| WorkerCancel.Worker_should_exit as exn ->
let e = Exception.wrap exn in
(* When is this exception raised? several places during Typing_toplevel
inner-loops call [WorkerCancel.raise_if_stop_requested]. So: it will
be raised during [Tast_provider.compute_tast*] shortly after
ClientLsp has called [WorkerCancel.stop_workers], which it does
if it sees $/cancelRequest LSP notification on the incoming queue. *)
let stack = Exception.get_backtrace_string e |> Exception.clean_stack in
let lsp_error =
{
Lsp.Error.code = Lsp.Error.RequestCancelled;
message = Exception.get_ctor_string e;
data = Some (Hh_json.JSON_Object [("stack", Hh_json.string_ stack)]);
}
in
(state, Error lsp_error)
| exn ->
(* Our caller has an exception handler which logs the exception.
But we instead must fulfil our contract of responding to the client,
even if we have an exception. Hence we need our own handler here. *)
let e = Exception.wrap exn in
let reason = ClientIdeUtils.make_rich_error "handle_request" ~e in
(state, Error (ClientIdeUtils.to_lsp_error reason))
Expand Down
29 changes: 29 additions & 0 deletions hphp/hack/src/utils/jsonrpc/jsonrpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,35 @@ let find_already_queued_message ~(f : timestamped_json -> bool) (t : t) :
~init:None
t.messages

let await_until_found
(t : t)
~(predicate : timestamped_json -> bool)
~(cancellation_token : unit Lwt.t) : timestamped_json option Lwt.t =
match find_already_queued_message ~f:predicate t with
| Some message -> Lwt.return_some message
| None ->
let rec loop () : timestamped_json option Lwt.t =
let%lwt () =
Lwt.pick
[
Lwt_unix.wait_read (Lwt_unix.of_unix_file_descr t.daemon_in_fd);
cancellation_token;
]
in
let was_cancelled = not (Lwt.is_sleeping cancellation_token) in
if was_cancelled then
Lwt.return_none
else
match%lwt read_single_message_into_queue_wait t with
| Timestamped_json message when predicate message ->
Lwt.return_some message
| Timestamped_json _ -> loop ()
| Fatal_exception _
| Recoverable_exception _ ->
Lwt.return_none
in
loop ()

let get_message (t : t) =
(* Read one in a blocking manner to ensure that we have one. *)
let%lwt () =
Expand Down
14 changes: 14 additions & 0 deletions hphp/hack/src/utils/jsonrpc/jsonrpc.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ val await_until_message :
val find_already_queued_message :
f:(timestamped_json -> bool) -> t -> timestamped_json option

(** This awaits until a message is found which satisfies the predicate,
and returns it as [Some].
If the message was already in the queue at the time this function was called,
then the returned promise will be already-completed.
If [cancellation_token] is fired before a message is found, then
the returned promise will get resolved with [None].
If there's a fault with the incoming file-descriptor like EOF, the returned promise
also gets resolved to [None]. *)
val await_until_found :
t ->
predicate:(timestamped_json -> bool) ->
cancellation_token:unit Lwt.t ->
timestamped_json option Lwt.t

val get_message :
t ->
[> `Message of timestamped_json
Expand Down

0 comments on commit 512bf66

Please sign in to comment.