apothik

A la découverte des applications distribuées avec Elixir. Distributed applications with Elixir, a beginners' journey

Accueil Partie 1 Partie 2

Phase 3 : Garantir la conservation des données malgré la perte de machine

Soyons honnêtes, le contenu de cette phase a été choisi au départ de notre aventure, sans connaître le sujet. En réalité, nous avons bien senti la grande différence de difficulté en l’abordant. Jusqu’ici, les choses semblaient relativement faciles, même si nous sommes conscients que nous sommes probablement passés à côté de difficultés nombreuses qui ne manqueraient pas d’apparaître dans un vrai contexte de production. Mais, reconnaissons le, nous n’avons eu jusqu’à présent qu’à rajouter une petite couche de code qui tire parti des possibilités toutes faites fournies par la BEAM.

Nous n’allons pas raconter toutes nos tentatives et nos errements car ce serait long et fastidieux, mais plutôt tenter de présenter un chemin de découverte du problème (pas forcément de la solution, d’ailleurs) qui soit passablement linéaire.

Redondance, vous avez dit redondance ?

Notre approche est un peu naïve : si on stocke plusieurs fois la données (redondance), on se dit intuitivement qu’on doit pouvoir la retrouver en cas de perte d’un noeud.

Essayons déjà de la stocker plusieurs fois et on verra si on peut la retrouver après. Pour se simplifier la vie, revenons dans le cas d’un cluster statique (nombre de machines constant hors moments de pannes) et oublions donc les notions de HashRing de la phase 2. Pour fixer les idées, prenons 5 machines comme initialement.

Comment choisir sur quelle machine stocker la donnée et stocker les copies ?

Au début, influencés par cette façon de poser le problème, nous sommes partis de l’idée d’un noeud “maître” et de “réplicas”. Par exemple, un maître et 2 réplicas. Dans cette hypothèse, la donnée est écrite 3 fois.

Mais est-ce que cette dissymétrie entre rôles de maître et réplicas est finalement une bonne idée ? Que se passe-t-il si le maître tombe ? Il faudrait alors se rappeler quel noeud est le nouveau, et que cette connaissance se répande correctement dans le cluster tout entier à chaque événement (perte et retour). Et finalement, à quoi bon?

Ces tâtonnements ont fait émerger une idée beaucoup plus simple, celle de groupe.

Un groupe est un ensemble de noeuds définis et fixes, qui ont des rôles symétriques au sein du groupe. Voici ce que nous avons choisi. Dans le cas d’un cluster de 5 noeuds numérotés de 0 à 4, nous avons aussi 5 groupes numérotés de 0 à 4. Le groupe 0 aura les noeuds 0,1 & 2. Le groupe 4 aura les noeuds 4,0 & 1. De façon générale, le groupe n a les noeuds n+1 et n+2 modulo 5. Inversement, le noeud 0 appartiendra donc au groupe 0, 4 & 3.

En résumé, tout groupe a 3 noeuds et tout noeud appartient à 3 groupes. Cela se traduit dans le code par:

defp nodes_in_group(group_number) do
n = Cluster.static_nb_nodes()
[group_number, rem(group_number + 1 + n, n), rem(group_number + 2 + n, n)]
end

defp groups_of_a_node(node_number) do
n = Cluster.static_nb_nodes()
[node_number, rem(node_number - 1 + n, n), rem(node_number - 2 + n, n)]
end

Petite parenthèse, nous avons adapté le code de Apothik.Cluster. Voici les éléments principaux:

defmodule Apothik.Cluster do
  alias Apothik.Cache
  use GenServer

  @nb_nodes 5

  @hosts for i <- 0..(@nb_nodes - 1), into: %{}, do: {i, :"apothik_#{i}@127.0.0.1"}

  def static_nb_nodes(), do: @nb_nodes

  def node_name(i), do: @hosts[i]

  def number_from_node_name(node) do
    Enum.find(@hosts, fn {_, v} -> v == node end) |> elem(0)
  end

  (...etc...)
end

Une clé est alors assignée à un groupe et non à un noeud. La fonction key_to_node/1 devient:

defp key_to_group_number(k), do: :erlang.phash2(k, Cluster.static_nb_nodes())

Ecritures multiples

Que va-t-il se passer quand on écrit une valeur dans le cache ? On va l’écrire 3 fois.

Tout d’abord, on peut envoyer l’ordre d’écriture à n’importe quel noeud (rappelons que l’on peut s’adresser au cache de n’importe quel noeud du cluster). Celui-ci va alors déterminer le groupe, puis retenir un noeud au hasard dans le groupe, sauf s’il se trouve que c’est lui-même auquel cas il se privilégie (petite optimisation).

Voici ce que cela donne (attention, dans ce texte, l’ordre des fonctions est guidé par la compréhension et ne suit pas nécessairement l’ordre dans le code lui-même)

  def put(k, v) do
    alive_node = k |> key_to_group_number() |> pick_a_live_node() |> Cluster.node_name()
    GenServer.call({__MODULE__, alive_node}, {:put, k, v})
  end

  defp pick_a_live_node(group_number) do
    case live_nodes_in_a_group(group_number) do
      [] -> nil
      l -> pick_me_if_possible(l)
    end
  end

  defp live_nodes_in_a_group(group_number),
    do: group_number |> nodes_in_group() |> Enum.filter(&alive?/1)

  defp alive?(node_number) when is_integer(node_number),
    do: node_number |> Cluster.node_name() |> alive?()

  defp alive?(node_name), do: node_name == Node.self() or node_name in Node.list()

  def pick_me_if_possible(l) do
    case Enum.find(l, fn i -> Cluster.node_name(i) == Node.self() end) do
      nil -> Enum.random(l)
      i -> i
    end
  end

Notez que l’on n’a pas besoin de garder l’état du cluster par ailleurs. Nous avons une topologie statique et basée sur les noms, donc Node.list/1 est suffisant pour nous renseigner sur l’état de tel ou tel noeud.

A présent, le coeur du sujet, l’écriture multiple.

  def handle_call({:put, k, v}, _from, state) do
    k
    |> key_to_group_number()
    |> live_nodes_in_a_group()
    |> Enum.reject(fn i -> Cluster.node_name(i) == Node.self() end)
    |> Enum.each(fn replica -> put_as_replica(replica, k, v) end)

    {:reply, :ok, Map.put(state, k, v)}
  end

Tout d’abord, on identifie les autres noeuds du groupe qui sont en fonction et on lance put_as_replica/4 sur chacun (je sais pas si vous êtes comme nous, mais on adore les pipelines avec des belles séries de |> qui se lisent comme un roman). Ensuite, on modifie l’état du noeud courant avec un classique ` {:reply, :ok, Map.put(state, k, v)}`.

Quant à put_replica/3, c’est encore plus simple:

  def put_as_replica(replica, k, v) do
    GenServer.cast({__MODULE__, Cluster.node_name(replica)}, {:put_as_replica, k, v})
  end
  def handle_cast({:put_as_replica, k, v}, state) do
    {:noreply, Map.put(state, k, v)}
  end

Dans l’enfer du deadlock

Attention, attention à ce que nous avons subrepticement glissé ici. Ca n’a l’air de rien mais c’est là que nous basculons dans l’enfer des applications distribuées.

Touchons en un mot avant d’y revenir. Nous avons utilisé GenServer.cast. Et pas GenServer.call. Ce qui signifie que nous envoyons une bouteille à la mer pour dire à nos voisins “met à jour ton cache” mais nous n’attendons pas de retour. Rien ne nous garantit que le message a été traité. Nous n’avons pas d’acquittement. Ainsi, les noeuds du groupe n’ont aucune garantie d’avoir le même état.

Pourquoi ne pas mettre un call, alors?

C’est ce que nous avons fait au début, naïfs que nous étions (et nous le sommes encore; il faut visiblement des années de déniaisement dans le monde distribué). Mais nous sommes tombé dans l’enfer du deadlock. Si deux noeuds du groupe sont simultanément sollicités pour mettre à jour le cache, ils peuvent s’attendre indéfiniment. Dans la réalité, il y a un timeout standard, que l’on peut changer dans les options de GensServer.call. Mais il lance une exception quand il est atteint.

Ca y est, nous sommes entrés dans la cour des grands. Plus précisément, nous l’apercevons au travers du grillage.

Récapitulatif et tests

Voilà ce que ça donne à la fin (notez get/1, qui fonctionne comme attendu, et l’absence de delete qui serait à l’image de put, mais parfois la flemme sort gagnante, voilà tout)

defmodule Apothik.Cache do
  use GenServer
  alias Apothik.Cluster

  defp nodes_in_group(group_number) do
    n = Cluster.static_nb_nodes()
    [group_number, rem(group_number + 1 + n, n), rem(group_number + 2 + n, n)]
  end

  defp alive?(node_number) when is_integer(node_number),
    do: node_number |> Cluster.node_name() |> alive?()

  defp alive?(node_name), do: node_name == Node.self() or node_name in Node.list()

  defp live_nodes_in_a_group(group_number),
    do: group_number |> nodes_in_group() |> Enum.filter(&alive?/1)

  def pick_me_if_possible(l) do
    case Enum.find(l, fn i -> Cluster.node_name(i) == Node.self() end) do
      nil -> Enum.random(l)
      i -> i
    end
  end

  defp pick_a_live_node(group_number) do
    case live_nodes_in_a_group(group_number) do
      [] -> nil
      l -> pick_me_if_possible(l)
    end
  end

  defp key_to_group_number(k), do: :erlang.phash2(k, Cluster.static_nb_nodes())

  #### GenServer Interface
  def get(k) do
    alive_node = k |> key_to_group_number() |> pick_a_live_node() |> Cluster.node_name()
    GenServer.call({__MODULE__, alive_node}, {:get, k})
  end

  def put(k, v) do
    alive_node = k |> key_to_group_number() |> pick_a_live_node() |> Cluster.node_name()
    GenServer.call({__MODULE__, alive_node}, {:put, k, v})
  end

  def put_as_replica(replica, k, v) do
    GenServer.cast({__MODULE__, Cluster.node_name(replica)}, {:put_as_replica, k, v})
  end

  def stats(), do: GenServer.call(__MODULE__, :stats)

  def update_nodes(_nodes), do: nil

  def start_link(args), do: GenServer.start_link(__MODULE__, args, name: __MODULE__)

  @impl true
  def init(_args), do: {:ok, %{}}

  @impl true
  def handle_cast({:put_as_replica, k, v}, state), do: {:noreply, Map.put(state, k, v)}

  @impl true
  def handle_call({:get, k}, _from, state), do: {:reply, Map.get(state, k), state}

  def handle_call({:put, k, v}, _from, state) do
    k
    |> key_to_group_number()
    |> live_nodes_in_a_group()
    |> Enum.reject(fn i -> Cluster.node_name(i) == Node.self() end)
    |> Enum.each(fn replica -> put_as_replica(replica, k, v) end)

    {:reply, :ok, Map.put(state, k, v)}
  end

  def handle_call(:stats, _from, state), do: {:reply, map_size(state), state}
end

Vite, vérifions que ça marche. ./scripts/start_cluster.sh dans un terminal, et faison dans un autre:

 % ./scripts/start_master.sh
1> Master.put(1, :a_key, 100)
:ok
2> Master.stat
[{0, 0}, {1, 1}, {2, 1}, {3, 1}, {4, 0}]
3> Master.get(3, :a_key)
100
4> Master.put(2, :another_key, 200)
:ok
5> Master.get(3, :another_key)
200
6> Master.stat
[{0, 1}, {1, 1}, {2, 1}, {3, 2}, {4, 1}]
7> Master.fill(1,5000)
:ok
8> Master.stat
[{0, 2993}, {1, 2968}, {2, 3035}, {3, 3031}, {4, 2979}]
9> Master.sum
15006

On voit bien que :a_key est sur 3 noeuds. On accède bien au cache de n’importe quel noeud et les données sont bien présentes et quand on a chargé les données sur le cluster, on a bien un total de 15006 valeurs stockées, qui est bien 5002 * 3.

Récupération de données

Que se passe-t-il en cas de perte de noeud ? Eh bien, nous perdons les données, pardi !

% ./scripts/start_master.sh
1> Master.fill(1,5000)
:ok
2> Master.kill(2)
:ok
3> Master.stat
[  {0, 2992},  {1, 2967},  {2, {:badrpc, :nodedown}},  {3, 3029},  {4, 2978}]

Dans un autre terminal, ./scripts/start_instance.sh 2 (oui, nous avons écrit un petit script équivalent à elixir --name apothik_$1@127.0.0.1 -S mix run --no-halt) et revenons dans le master:

4> Master.stat
[{0, 2992}, {1, 2967}, {2, 0}, {3, 3029}, {4, 2978}]

apothik_2 est bien vide comme prévu.

Pour récupérer la donnée, essayons une approche toute simple: quand un noeud démarre, il interroge autour de lui pour récupérer l’information manquante. Cela s’appelle “se réhydrater”.

  def init(_args) do
    me = Node.self() |> Cluster.number_from_node_name()

    for g <- groups_of_a_node(me), peer <- nodes_in_group(g), peer != me, alive?(peer) do
      GenServer.cast({__MODULE__, Cluster.node_name(peer)}, {:i_am_thirsty, g, self()})
    end

    {:ok, %{}}
  end

  def handle_cast({:i_am_thirsty, group, from}, state) do
    filtered_on_group = Map.filter(state, fn {k, _} -> key_to_group_number(k) == group end)
    GenServer.cast(from, {:drink, filtered_on_group})
    {:noreply, state}
  end

  def handle_cast({:drink, payload}, state) do
    {:noreply, Map.merge(state, payload)}
  end

A l’initialisation du noeud, celui-ci demande leur contenu à tous les noeuds de tous les groupes auquel il appartient (sauf à lui-même). Attention, pas exactement tout leur contenu, uniquement leur contenu au titre de l’appartenance à un groupe fixé. Par exemple, si le noeud 0, au titre du groupe 0, demande au noeud 2 son contenu, il ne veut pas récupérer le contenu du noeud 2 au titre de son appartenance au groupe 2 (qui est constitué des noeuds 2,3 & 4, donc pas du noeud 0).

Aussi, le {:i_am_thirsty, g, self()}) indique-t-il le groupe au titre duquel se fait la demande, ainsi que l’adresse de retour self(). Le noeud répondant va devoir filtrer les valeurs de sa mémoire, c’est le sens de key_to_group_number(k) == group.

A la réception, c’est à dire à la réhydratation (:drink), on fusionne simplement les map.

Est ce que ça marche? On relance le cluster, puis:

% ./scripts/start_master.sh
1> Master.fill(1,5000)
:ok
2> Master.stat
[{0, 2992}, {1, 2967}, {2, 3034}, {3, 3029}, {4, 2978}]
3> Master.kill(2)
:ok
4> Master.stat
[ {0, 2992},  {1, 2967},  {2, {:badrpc, :nodedown}},  {3, 3029},  {4, 2978}]

et dans un autre terminal % ./scripts/start_instance.sh 2. Retour dans le master

5> Master.stat
[{0, 2992}, {1, 2967}, {2, 3034}, {3, 3029}, {4, 2978}]

Ca marche …mais que c’est moche, mon dieu, que c’est moche

Il y a tellement de choses criticables qu’on ne sait pas par où commencer.

Lançons nous:

Et c’est sans compter les soucis réseau, avec des latences et des déconnections qui peuvent isoler des noeuds pendant quelques millisecondes et désynchroniser l’état du cluster sur un ou plusieurs groupes.

Est-ce que ça marche vraiment?

C’est à ce moment-là que nous sommes partis sur une pente que les informaticiens connaissent bien, et qui a été parfaitement imagée par l’écureuil du film “l’age de glâce”. Nous avons essayé de colmater la première brêche, puis la deuxième, mais la première en a ouvert une troisième, etc.

Car, oui, nous avions toujours notre ambition initiale en tête. Vous vous souvenez “Ajout de redondance de stockage pour garantir la conservation des données malgré la perte de machine”. Peut-être que le “garantir” était fortement présomptueux, après tout.

Et là nous est revenue la phrase de Johanna Larsonn, qui disait à peu près “Attention, les applications distribuées c’est extrêment difficile mais il y a quand même des cas où l’on peut se lancer”.

Le mot “garantir” nous a entrainé trop loin.

Après toutes ces erreurs, nous avons compris que ce qui fait la difficulté d’un système distribué, ce sont les qualités que l’on en attend. Nous avons voulu inconsciemment offrir à notre cache distribué une partie des qualités d’une base de données distribuée, ce qui est clairement au-delà de nos faibles forces de débutants.

En résumé, non, cela ne marche pas. En tout cas si l’on ambitionne d’aller au bout de notre phase 3 avec son titre ronflant et présomptueux.

Peut-on viser plus bas ?

Mais peut-être que nous pourrions nous concentrer sur les qualités attendues d’un cache, et pas plus.

Et c’est le principal morceau de sagesse que nous a procuré notre travail jusqu’ici. Il faut avant tout préciser clairement les qualités attendues de l’application distribuée.. Chaque qualité se payant très cher.

A ce sujet, il nous revient que nous avons lu un jour un fameux théorême qui explique qu’il est mathématiquement impossible de tout avoir à la fois: le théorême CAP. Et ce ne doit pas le seul théorême d’impossibilité.

En application distribuée, “le beurre et l’argent du beurre” est encore plus inatteignable que d’habitude.

Revenons à notre entreprise de coupe claire dans nos ambitions. Ce que nous voulons pour notre cache, c’est

Essayons de nous réhydrater gorgée par gorgée

Revenons sur nos pas: supprimons notre méthode d’hydratation massive au démarrage, ainsi que les handle associés . Vous vous souvenez, ce truc:

for g <- groups_of_a_node(me), peer <- nodes_in_group(g), peer != me, alive?(peer) do
  GenServer.cast({__MODULE__, Cluster.node_name(peer)}, {:i_am_thirsty, g, self()})
end

Notre idée est de commencer de façon extrêmement modeste. Au démarrage du noeud, rien de particulier ne se passe, pas de réhydratation. Le noeud va réagir aux putet put_as_replica comme d’habitude. En revanche, il va gérer les get de façon différente. S’il possède la clé en cache, il renvoie évidemment la valeur associée. En revanche, s’il n’a pas la clé, il va demander à un noeud du groupe la valeur, la stocker et la retourner.

Cette approche assure une réhydratation progressive sur la base de la demande constatée.

Nous allons avoir besoin que l’état de notre GenServer soit plus riche que simplement les données de cache. Nous ajoutons :

defstruct cache: %{}, pending_gets: %{}

cache sont pour les données de cache, pending_gets est expliqué plus bas. Toutes les méthodes doivent être légèrement et trivialement adaptées pour transformer ce qui fut state en state.cache.

Voici à quoi ressemble la méthode get:

  def handle_call({:get, k}, from, state) do
    %{cache: cache, pending_gets: pending_gets} = state

    case Map.get(cache, k) do
      nil ->
        peer =
          k
          |> key_to_group_number()
          |> live_nodes_in_a_group()
          |> Enum.reject(fn i -> Cluster.node_name(i) == Node.self() end)
          |> Enum.random()

        :ok = GenServer.cast({__MODULE__, Cluster.node_name(peer)}, {:hydrate_key, self(), k})

        {:noreply, %{state | pending_gets: [{k, from} | pending_gets]}}

      val ->
        {:reply, val, state}
    end
  end

Nous avons utilisé ici un mécanisme plus sophistiqué (nous laissons aux lecteurs le soin de nous dire si nous n’aurions pas pu utilement l’utiliser auparavant): un :no_reply en réponse d’un GenServer.handle_call/3. Notre objectif est que cette requête (aller chercher la valeur chez peer) ne soit pas bloquante pour le noeud qui se réhydrate. C’est une possibilité de GenServer: on peut renvoyer :noreply, qui laisse le process appelant en attente, mais libère l’exécution. Par la suite, la réponse est réalisée par un GenServer.reply/2. Allez voir l’exemple de la documentation, c’est parlant. Il faut néammoins se rappeler des appels en attentes, en premier lieu les pid des processus appelants.

En résumé (dans le cas d’une clé absente):

De son côté, le replica répond très simplement, toujours en asynchrone:

  def handle_cast({:hydrate_key, from, k}, %{cache: cache} = state) do
    :ok = GenServer.cast(from, {:drink_key, k, cache[k]})
    {:noreply, state}
  end

Et quand on revient:

  def handle_cast({:drink_key, k, v}, state) do
    %{cache: cache, pending_gets: pending_gets} = state

    {to_reply, new_pending_gets} =
      Enum.split_with(pending_gets, fn {hk, _} -> k == hk end)

    Enum.each(to_reply, fn {_, client} -> GenServer.reply(client, v) end)

    new_cache =
      case Map.get(cache, k) do
        nil -> Map.put(cache, k, v)
        _ -> cache
      end

    {:noreply, %{state | cache: new_cache, pending_gets: new_pending_gets}}
  end

Les étapes:

Essayons, avec dans un terminal ./scripts/start_cluster.sh et dans l’autre:

% ./scripts/start_master.sh
1> Master.put(4,"hello","world")
:ok
2> Master.stat
[{0, 1}, {1, 1}, {2, 0}, {3, 0}, {4, 1}]
3> Master.kill(0)
:ok
4> Master.stat
[{0, {:badrpc, :nodedown}}, {1, 1}, {2, 0}, {3, 0}, {4, 1}]

On tue le noeud 0 qui fait partie du groupe 4. Dans un autre terminal, on le relance ./scripts/start_instance.sh 0, puis:

5> Master.stat
[{0, 0}, {1, 1}, {2, 0}, {3, 0}, {4, 1}]
6> Master.get(4,"hello")
"world"
7> Master.stat
[{0, 0}, {1, 1}, {2, 0}, {3, 0}, {4, 1}]
8> Master.get(0,"hello")
"world"
9> Master.stat
[{0, 1}, {1, 1}, {2, 0}, {3, 0}, {4, 1}]
10> :rpc.call(:"apothik_0@127.0.0.1", :sys, :get_state, [Apothik.Cache]) 
%{cache: %{"hello" => "world"}, __struct__: Apothik.Cache, pending_gets: []}

Au retour, le noeud est vide. Si l’on s’adress au noeud 4, il va pouvoir répondre car il a la clé disponible. Cela ne change pas le contenu du noeud 0. En revanche, si on s’adresse au noeud 0, on voit bien qu’il obtient la réponse et qu’il se réhydrate. La liste des pending_gets a bien été vidée.

Entre parenthèse, c’est tellement commode qu’on ajoute à .iex.exs :

  def inside(i) do
    :rpc.call(:"apothik_#{i}@127.0.0.1", :sys, :get_state, [Apothik.Cache])
  end

Où en sommes nous?

Hydratation au démarrage

Maintenant que l’on s’est fait la main, et en complément, regardons si l’on ne peut pas améliorer notre système d’hydratation au démarrage. Vous vous souvenez que l’on demandait à tous les noeuds de tous les groupes d’envoyer en une fois les données. Là, nous allons demander à un noeud aléatoire de chaque groupe d’envoyer un petit paquet de données. A la réception, on demandera un autre paquet, et ainsi de suite jusqu’à épuisement du stock.

Au démarrage du noeud, on lance les demandes:

  def init(_args) do
    peers = pick_a_live_node_in_each_group()
    for {group, peer} <- peers, do: ask_for_hydration(peer, group, 0, @batch_size)
    {:ok, %__MODULE__{}}
  end

ask_for_hyration est la demande de paquet de données. Elle signifie “donne moi un paquet de données de taille @batch_size, à partir de l’indice 0, au titre du groupe group”.

La fonction pick_a_live_node_in_each_group fait ce que son nom indique. Il y a une petite astuce pour ne retenir qu’un noeud par groupe:

  def pick_a_live_node_in_each_group() do
    me = Node.self() |> Cluster.number_from_node_name()
    for g <- groups_of_a_node(me), peer <- nodes_in_group(g), peer != me, alive?(peer), into: %{}, do: {g, peer}
  end

On utilise des cast pour les requêtes et réponses.

  def ask_for_hydration(replica, group, start_index, batch_size) do
    GenServer.cast(
      {__MODULE__, Cluster.node_name(replica)},
      {:i_am_thirsty, group, self(), start_index, batch_size}
    )
  end

A la réception, le noeud (celui qui donne à boire) va ordonner ses clés (uniquement celle du groupe demandé), utiliser cet ordre pour les numéroter et renvoyer un paquet. Il renvoie le prochain index et un drapeau pour indiquer que c’est le dernier paquet, ce qui permettra au noeud appelant de s’arrêter de le solliciter.

  def handle_cast({:i_am_thirsty, group, from, start_index, batch_size}, %{cache: cache} = state) do
    filtered_on_group = Map.filter(cache, fn {k, _} -> key_to_group_number(k) == group end)

    keys = filtered_on_group |> Map.keys() |> Enum.sort() |> Enum.slice(start_index, batch_size)
    filtered = for k <- keys, into: %{}, do: {k, filtered_on_group[k]}

    hydrate(from, group, filtered, start_index + map_size(filtered), length(keys) < batch_size)

    {:noreply, state}
  end

Le hydrate est aussi un cast

  def hydrate(peer_pid, group, cache_slice, next_index, last_batch) do
    GenServer.cast(peer_pid, {:drink, group, cache_slice, next_index, last_batch})
  end

A la réception:

  def handle_cast({:drink, group, payload, next_index, final}, %{cache: cache} = state) do
    if not final do
      peer = pick_a_live_node_in_each_group() |> Map.get(group)
      ask_for_hydration(peer, group, next_index, @batch_size)
    end

    filtered_payload = Map.filter(payload, fn {k, _} -> not Map.has_key?(cache, k) end)
    {:noreply, %{state | cache: Map.merge(cache, filtered_payload)}}
  end

Si l’on n’a pas terminé, on demande le paquet suivant à un replica au hasard. Dans tous les cas on met à jour le cache, en ne retenant que les clés inconnues parmi le paquet fourni.

Testons. Dans un terminal ` ./scripts/start_cluster.sh` et dans l’autre:

% ./scripts/start_master.sh
1> Master.fill(1, 5000)
:ok
2> Master.stat
[{0, 2992}, {1, 2967}, {2, 3034}, {3, 3029}, {4, 2978}]
3> Master.kill(1)
:ok
4> Master.stat
[{0, 2992}, {1, {:badrpc, :nodedown}}, {2, 3034}, {3, 3029}, {4, 2978}]

On relance le noeud 1 dans un autre terminal % ./scripts/start_instance.sh 1 et, dans “master”, les clés sont revenues:

5> Master.stat
[{0, 2992}, {1, 2967}, {2, 3034}, {3, 3029}, {4, 2978}]

Petit bilan:

Le plus important: a-t-on bien des états cohérents entre les noeuds du même groupe ? En effet, le cluster continue de vivre, notamment des put surviennent pendant le processus d’hydratation. Pas facile de l’assurer car les scénarios sont multiples. Par exemple:

Ce qu’il faut retenir de cette analyse partielle (et peut-être inexacte), c’est qu’elle est difficile à mener. Il faut envisager tous les scénarios d’arrivées de message sur chacun des processus, sans surtout préjuger d’un ordre logique ou chronologique, et examiner si la cohérence du cache est endommagée dans chacun des scénarios.

Et si on faisait appel à des experts ?

Nous ne pouvons pas nous départir de l’idée que nous avons probablement trouvé des solutions assez frustres à des questions bien compliquées.

Comme précédemment, c’est le moment de chercher ce que des gens bien plus experts ont trouvé. La bonne nouvelle, c’est que cela existe, c’est le CRDT (Conflict-free replicated data type). Il s’agit de toute une famille de solutions qui permettent de synchroniser des états dans une configuration distribuée. Une recherche rapide sur internet parle même de “cohérence éventuelle”. C’est à dire que, si on ne touche plus au cache, les noeuds vont converger vers le même état au bout d’un certain temps.

Bon, nous ne comprenons pas tout en détail, mais nous savons que l’excellent Derek Kraan a notamment écrit une librairie qui implémente une variante (les delta-CRDT) pour les besoins de Horde, une application de supervision et de registre distribuée.

La librairie est delta_crdt_ex. Le README est carrément alléchant ! Voici l’exemple de la documentation:

{:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
{:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
DeltaCrdt.set_neighbours(crdt1, [crdt2])
DeltaCrdt.set_neighbours(crdt2, [crdt1])
DeltaCrdt.to_map(crdt1)
%{}
DeltaCrdt.put(crdt1, "CRDT", "is magic!")
Process.sleep(300) # need to wait for propagation
DeltaCrdt.to_map(crdt2)
%{"CRDT" => "is magic!"}

Le DeltaCrdt est exactement ce que nous voulons: un dictionnaire ! Il suffit de lancer des process DeltaCrdt. On présente ses voisins à chaque process. Attention, le lien est monodirectionnel, donc il faut présenter 1 à 2 et 2 à 1.

Bref, il semble que nous allons surtout beaucoup supprimer du code. Et en effet, c’est que nous allons faire !

Nommer les processus et les superviser

Reprenons: nous avons divisé notre cache en groupes. Chaque groupe est présent sur 3 noeuds. Nous voudrions avoir 3 processus DeltaCrdt qui se synchronisent par groupe, chacun sur un noeud différent. Nous décidons de nommer les process liés au groupe n: apothik_crdt_#{n}. Par exemple, le groupe 0 sera représenté par 3 process tous nommés apothik_crdt_0 répartis sur 3 noeuds différents, apothik_0, apothik_1 et apothik_2. Il est possible de présenter un voisin sans utiliser de pid mais en utilisant la convention {nom du process, nom du noeud}:

  DeltaCrdt.set_neighbours(crdt, [{"apothik_crdt_0", :"apothik_1"}])

Au démarrage d’un noeud, il faut instancier 3 processus, un pour chaque groupe porté par le noeud. Nous décidons de faire porter cette responsibilité par un superviseur.

defmodule Apothik.CrdtSupervisor do
  use Supervisor

  alias Apothik.Cache

  def start_link(init_arg), do: Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)

  @impl true
  def init(_init_args) do
    self = Cache.number_from_node_name(Node.self())
    groups = Cache.groups_of_a_node(self)

    children =
      for g <- groups do
        Supervisor.child_spec({Apothik.Cache, g}, id: "apothik_cache_#{g}")
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end

Attention, nous ne lançons pas directement un processus DeltaCrdt mais {Apothik.Cache, g}, nommé apothik_cache_#{g}. Nous y revenons ci-dessous.

Et ce superviseur est lancé par l’application (qui lance libcluster aussi):

defmodule Apothik.Application do
use Application

  @impl true
  def start(_type, _args) do
  (... same as before ...)

    children = [
      {Cluster.Supervisor, [topologies, [name: Apothik.ClusterSupervisor]]},
      Apothik.CrdtSupervisor
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Apothik.Supervisor)
  end
end

Présenter les voisins au bon moment

Nous devons présenter ses 2 partenaires de jeux à chaque DeltaCrdt. Pour insérer ce comportement, nous avons décidé de créer un processus intermédiaire (Apothik.Cache) dont la mission est d’instancier le DeltaCrdt et de lui présenter ses voisins. Leurs destins seront liés via l’usage de start_link. Ainsi, si le process DeltaCrdt se termine brusquement, le process Cache aussi et le superviseur le relancera.

Cela donne:

defmodule Apothik.Cache do
  use GenServer

  def crdt_name(i), do: :"apothik_crdt_#{i}"

  @impl true
  def init(g) do
    :net_kernel.monitor_nodes(true)
    {:ok, pid} =  DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, name: crdt_name(g))
    {:ok, set_neighbours(%{group: g, pid: pid})}
  end

  def set_neighbours(state) do
    my_name = crdt_name(state.group)
    self = number_from_node_name(Node.self())

    neighbours = for n <- nodes_in_group(state.group), n != self, do:{my_name, node_name(n)}
    DeltaCrdt.set_neighbours(state.pid, neighbours)

    state
  end
end

D’ailleurs, pour plus de sûreté, nous devons aussi représenter les voisins dès qu’un noeud rejoint le cluster. Cela explique la présence de :net_kernel.monitor_nodes(true) . Nous ajoutons:

  @impl true
  def handle_info({:nodeup, node}, state) do
    Task.start(fn ->
      Process.sleep(1_000)
      set_neighbours(state)
    end)

    {:noreply, state}
  end

  def handle_info({:nodedown, node}, state), do: {:noreply, state}

Nous avons ajouté un petit délai avant de positionner les voisins, pour laisser le noeud se mettre en route.

Changement de l’interface d’accès au cache

Et finalement, les actions fondamentales (get et put) deviennent:

  def get(k) do
    group = k |> key_to_group_number()
    alive_node = group |> pick_a_live_node() |> node_name()
    DeltaCrdt.get({:"apothik_crdt_#{group}", alive_node}, k)
  end

  def put(k, v) do
    group = k |> key_to_group_number()
    alive_node = group |> pick_a_live_node() |> node_name()
    DeltaCrdt.put({:"apothik_crdt_#{group}", alive_node}, k, v)
  end

Comme d’habitude, la clé donne le groupe en charge. Puis on détermine un noeud du groupe en privilégiant le noeud en cours. Puis nous appelons directement le process DeltaCrdt à partir de son nom. Et le tour est joué.

Essayons

Une petite modification d’abord pour récolter les statistiques, dans Apothik.CrdtSupervisor en appelant tous les DeltaCrdt des groupes du noeud

def stats() do
  self = Cache.number_from_node_name(Node.self())
  groups = Cache.groups_of_a_node(self)

  for g <- groups do
    DeltaCrdt.to_map(:"apothik_crdt_#{g}") |> map_size()
  end
  |> Enum.sum()
end

Et dans .iex.exs:

def stat(i) do
  :rpc.call(:"apothik_#{i}@127.0.0.1", Apothik.CrdtSupervisor, :stats, [])
end
def sum_stat() do
  {sum(), stat()}
end

Comme d’habitude, on lance ./scripts/start_cluster.sh dans un terminal et dans un autre:

% ./scripts/start_master.sh
1> Master.sum_stat
{0, [{0, 0}, {1, 0}, {2, 0}, {3, 0}, {4, 0}]}
2> Master.fill(1, 1)
:ok
3> Master.sum_stat
{3, [{0, 0}, {1, 1}, {2, 1}, {3, 1}, {4, 0}]}
4> Master.fill(1, 10)
:ok
5> Master.sum_stat
{30, [{0, 7}, {1, 5}, {2, 5}, {3, 6}, {4, 7}]}
6> Master.fill(1, 100)
:ok
7> Master.sum_stat
{300, [{0, 60}, {1, 62}, {2, 66}, {3, 58}, {4, 54}]}

Magique! On a une propagation automatique sur tous les noeuds. Les totaux sont bien cohérents !

Maintenant, tuons 2 noeuds si de la donnée est perdue:

8> Master.kill(0)
:ok
9> Master.kill(1)
:ok
10> Master.sum_stat
{178, [   {0, {:badrpc, :nodedown}}, {1, {:badrpc, :nodedown}}, {2, 66}, {3, 58}, {4, 54}]}

Et dans deux autres terminaux distinct: % ./scripts/start_instance.sh 0pour l’un et % ./scripts/start_instance.sh 1 pour l’autre. Revenons:

11> Master.sum_stat
{300, [{0, 60}, {1, 62}, {2, 66}, {3, 58}, {4, 54}]}

Et voilà ! Les clés sont revenues !

Ca marche, mais faut pas pousser !

Recommençons à 0 la manipulation (on relance le cluster), et:

% ./scripts/start_master.sh
1> Master.fill(1, 10000)
:ok
2> Master.sum_stat
{18384, [{0, 3564}, {1, 2968}, {2, 3555}, {3, 4148}, {4, 4149}]}

Ouch ! On n’a pas du tout 30_000 clés comme attendu, mais 18_384.

Nous n’allons pas vous expliquer les quelques heures passées à essayer de comprendre ce qui se passe. Il suffit de dire que nous avons consulté internet, les “issues” du repository Github et bien sûr nous nous sommes plongés dans le code. Nous avons compris que

DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, max_sync_size: 30_000, name: crdt_name(g))

permettait d’aller plus loin. Mais que la synchronisation était tronquée à un certain niveau. Bref, que si on n’avait de trop grosses différences (“delta”) dans un temps donné, la propagation ne se faisait pas complètement. Utiliser de la magie, c’est génial tant que tout marche, ça devient de la magie noire quand quelque chose coince.

Conclusion générale

Quand on fait le bilan, nous avons finalement deux solutions:

Nous revenons à nouveau à la conclusion que faire des applications distribuées est très délicat, avec un effet de mur: un petit domaine de choses faisables est entouré de murs très hauts dès lors que l’on vise certaines qualités pour l’application distribuée. Il y a deux façons de franchir ces murs. Soit investir massivement en compréhension des algorithmes déjà inventées par de nombreux chercheurs et implémenter ces algorithmes selon son besoin. Soit s’appuyer sur des librairies ou des produits tout faits, mais alors il est indispensable d’en connaitre précisément les domaines de fonctionnement.

Epilogue

Nous espérons que ces articles vous ont diverti ou inspiré. N’hésitez pas à les commenter, à les corriger ou même à en redemander !

Olivier et Dominique

Accueil Partie 1 Partie 2