Заметка о Lwt

Что такое Lwt

Lwt это одна из наиболее популярных OCaml библиотек, разрабатываемая сообществом. По сути это просто реализация кооперативной многозадачности (как альтернативы вытесняющей многозадачности) в OCaml на основе Promises.

В дальнейшем я буду использовать слова “поток” и промис как взаимозаменямые.

Виды “многозадачностей”

Этот параграф можно пропустить

Если в двух словах, то вытесняющую и кооперативную многозадачность можно объяснить следующим образом:

  • Вытесняющая многозадачность означает, что у нас есть некий планировщик, принимающий решения о том, когда будет выполняться какой поток, обычно принимая во внимание приоритет потоков и выделяя им некие кванты времени. Этот вид многозадачности используется в большинстве современных ОС.

  • Кооперативная многозадачность – планировщик не принимает решение о переключении потоков. Поток должен явно сигнализировать о том, что он готов прерваться и предоставить процессорное время другим потокам.

Асинхронное программирование с Lwt и промисы

Lwt предоставляет нам тип данных 'a Lwt.t. Можно относиться к нему, как к “потоку”. Это обычный промис, содержащий некоторое значение типа 'a, которое может быть вычислено, например, когда-то в будущем, но только один раз. В терминах OCaml это просто 'a ref, который будет заполнен позже. Изначально значения в нём нет и промис находится в состоянии Sleep (pending). Когда вычисление завершено успешно, Lwt помещает результат типа 'a в наш промис и его состояние становится Return (resolved). В случае же неудачи его состояние изменяется на Fail (rejected) и в него помещается произошедшая ошибка.

API

Вот как выглядит тип, описывающий состояния:

type 'a state =
  | Return of 'a
  | Fail of exn
  | Sleep

Lwt предоставляет несколько полезных ф-ций:

val return : 'a -> 'a Ltw.t

Понятно, что эта ф-ция тривиально упаковывает уже известно значение в промис. Соответственно, этот промис сразу находится в состоянии Return (resolved).

Значения, упакованные в Lwt.t, не могут быть просто извлечены (т.к. вычисление может быть ещё не завершено). Для этого мы должны использовать оператор bind:

val (>>=): 'a Lwt.t -> ('a -> 'b Lwt.t) -> 'b Lwt.t

Такая конструкция позволяет нам делать композицию промисов в “монадическом” стиле.

f >>= g создаст нам промис, который ждёт пока завершится “поток” f : 'a Lwt.t, получает результат его вычислений (некий x : 'a) и передаёт его в ф-цию g : 'a -> 'b Ltw.t, которая запускает поток 'b Ltw.t. Если f находится в состоянии pending, то и f >>= g будет в состоянии pending. Соответственно, если f завершится с ошибкой, то и f >>= g завершится с той же самой ошибкой.

Несколько других полезных ф-ций:

val join : unit Lwt.t list -> unit Lwt.t

join получает список потоков и ожидает пока они все завершатся. Если хотя бы один из них завершится с ошибкой (перейдёт в состояние rejected), то и результирующий поток завершится с той же ошибкой (после того, как все остальные потоки завершат свои вычисления).

val choose : 'a t list -> 'a t

choose ждёт пока выполнится хотя бы один поток. Если таких оказалось несколько, то в качестве результата выбирается один из них cлучайным образом.

На самом деле их огромное множество и все их можно найти тут.

PPX

У нас есть следующий синтаксический сахар:

let%lwt i = f () in
...

Что эквивалентно следующему фрагменту:

Lwt.bind (f ()) (fun i -> ...)

Примеры

Следующие примеры я взял из официального туториала

Чтение из STDIN без Lwt, выполнение блокируется пока не поступит пользовательский ввод:

let () =
  let line : string = Pervasives.read_line () in
  print_endline "Now unblocked!";
  ignore line

С промисами Lwt выполнение продолжается:

let () =
  let line_promise : string Lwt.t = Lwt_io.(read_line stdin) in
  print_endline "Execution just continues...";
  ignore line_promise

В данном случае это не совсем то, что нам нужно, т.к. программа сразу завершает работу так и не дождавшись пользовательского ввода. Чтобы заблокироваться и подождать пока промис выполнится мы можем использовать ф-цию Lwt_main.run:

let () =
  let line_promise : string Lwt.t = Lwt_io.(read_line stdin) in
  print_endline "Execution just continues...";
  let line : string = Lwt_main.run line_promise in
  ignore line

Lwt_main.run используется только 1 раз, чтобы подожать пока завершится промис самого верхнего уровня. Когда этот промис выполнится, программа завершает работу.

Ещё примеры:

Опять же, я не заморачивался и взял примеры из официального туториала, тк я слишком ленивый, чтобы придумывать их самому.

Пример функции, которая печатает “tic” каждую секунду, не блокируя другие “потоки”:

let rec tic () =
  print_endline "tic";
  let%lwt () = Lwt_unix.sleep 1.0 in
  tic ()

Пример запуска нескольких “потоков” и ожидание их результатов.

Допустим у нас есть пара ф-ций f и g:

val f : unit -> unit Lwt.t
val g : unit -> unit Lwt.t

Следующий код запустит их вычисления последовательно:

let%lwt () = f () in
let%lwt () = g () in
...

А вот так мы можем запустить обе ф-ции конкурентно и сразу же подождать их завершения:

let p1 = f () in
let p2 = g () in
let%lwt () = p1 in
let%lwt () = p2 in
...

Пример ф-ции, которая выполняет все вычисления из списка конкрурентно:

let rec map f l =
  match l with
  | [] -> Lwt.return []
  | v :: r ->
      let t = f v in
      let rt = map f r in
      let%lwt v' = t in
      let%lwt l' = rt in
      Lwt.return (v' :: l')

Следующая ф-ция наоборот, ожидает пока выполнится каждое предыдущее вычисление из списка, прежде чем запустить следующее:

let rec map_serial f l =
  match l with
  | [] -> return []
  | v :: r ->
      let%lwt v' = f v in
      let%lwt l' = map_serial f r in
      Lwt.return (v' :: l')

Конечно, это только самые “вершки” Lwt.

Полезные ссылки

  • Promises в книге Functional Programming in OCaml
  • Lwt Gitter
  • Туториал Lwt in 5 minutes
  • Lwt мануал
  • Туториал про то, как сделать простой сервер на сокетах, позволяющий клиентам подключаться, увеличивать счётчик и читать его значение.