Elixirで簡単なProxyサーバを自作した

どうもこんにちは。ここ最近Erlangを書いてたり、AWS、OpenStackで遊んでいましたが久々にElixirを書いています。 今回はElixirでProxyサーバを自作しているので、どのような実装になっているか備忘録的に書いていきます。

動作環境

全体的な流れ

Proxyサーバを実装するにあたって、全体的な流れは以下のようにしています。

  1. Supervisorにリクエストを待ち受けるworkerを複数件登録
  2. workerでリクエストを待ち受ける
  3. バックエンド側(HTTPサーバ)へのコネクションを確立させる
  4. フロントとバックエンド、それぞれのソケットで受け取ったパケットの送受信をする
  5. フロントかバックエンド側どちらかのソケットがクローズしたら、もう片方もクローズする

workerはSupervisorに登録していき、それぞれのworkerでパケットの処理をしていきます。 また、コネクションごとにworkerは一つとなり、コネクション数ごとにworkerが増えていく想定となっています。

Supervisorにリクエストを待ち受けるworkerを複数件登録

では、これから実装内容を見ていきます。リクエストを待ち受けるworkerを複数件事前に作成し、Supervisorに登録していきます。

  def init(listen) do
    # backend
    hosts = [
      %Exdeath.ProxyNode{
        host: {192, 168, 33, 61},
        port: 8010,
      }
    ]
    spawn(__MODULE__, :start_workers, [listen, hosts])
    {:ok, { {:one_for_one, 6, 60}, []} }
  end
  
  def start_workers(listen, hosts) do
    for _ <- 1..100, do: start_worker(listen, hosts)
  end

  def start_worker(listen, hosts) do
    pid = make_ref()
    child_spec = %{
      id: pid,
      start: {Exdeath.Proxy, :start_link, [listen, hosts, pid]},
      restart: :temporary,
      shutdown: :brutal_kill,
      type: :worker
    }
    Supervisor.start_child(__MODULE__, child_spec)
  end

では簡単に、要点ごとに説明してきたいと思います。

    spawn(__MODULE__, :start_workers, [listen, hosts])

まずは start_workers を別プロセスとして実行します。 hsots はバックエンド側になるサーバをリストとして登録しています。バックエンド側のサーバを複数件登録し、LB知るようにしようと思っていますが、 今回はLBの実装は省略して進めたいと思います。

  def start_workers(listen, hosts) do
    for _ <- 1..100, do: start_worker(listen, hosts)
  end

start_workers でSupervisorに登録する件数分、ループ(リスト内包表記)を回します。今回は100件登録されています。

  def start_worker(listen, hosts) do
    pid = make_ref()
    child_spec = %{
      id: pid,
      start: {Exdeath.Proxy, :start_link, [listen, hosts, pid]},
      restart: :temporary,
      shutdown: :brutal_kill,
      type: :worker
    }
    Supervisor.start_child(__MODULE__, child_spec)
  end

実際のSupervisorへの登録処理です。Exdeath.Proxy をworkerとして起動させています。 pidには make_ref() で取得した値を入れ、それをworkerの名前として登録します(後で出てきます)。 ここで、listenも workerの起動時の引数として渡していますが、これは gen_tcp で listen しているソケットになります。

workerでリクエストを待ち受ける

Supervisorで登録までできたので、ここから個別のworkerでの処理を見ていきましょう。

  use GenServer
  def start_link(listen, hosts, pid) do
    GenServer.start_link(__MODULE__, [listen, hosts], name: {:global, pid})
  end

  def init([listen, hosts]) do
    GenServer.cast(self(), :accept)
    {:ok, %{listen: listen, front: nil, back: nil, hosts: hosts}}
end

  def handle_cast(:accept, %{listen: listen}=state) do
    with {:ok, socket} <- :gen_tcp.accept(listen)
    do
      :gen_tcp.controlling_process(socket, self())
      {:noreply, %{state| front: socket}}
    else
      {:error, :eaddrinuse} ->
        {:stop, {:shutdown, :eaddrinuse}}
      {:error, reason} ->
        {:stop, {:shutdown, reason}}
      error ->
        {:stop, {:shutdown, error}}
    end
  end

workerはGenServerを利用しています。GenServerで実行するとパケットが送られてきたときに handle_info のメソッドで処理できるので、このようにしています。

  def start_link(listen, hosts, pid) do
    GenServer.start_link(__MODULE__, [listen, hosts], name: {:global, pid})
  end

ここでGenServerとして実行し、リンクしています。ここで先ほどSupervisorで作ったpidをnameにセットします。

  def init([listen, hosts]) do
    GenServer.cast(self(), :accept)
    {:ok, %{listen: listen, front: nil, back: nil, hosts: hosts}}
end

次に初期化処理ですが、ここで GenServer.cast(self(), :accept) を実行し、リクエストの待ち受けを非同期で行えるようにします。

  def handle_cast(:accept, %{listen: listen}=state) do
    with {:ok, socket} <- :gen_tcp.accept(listen)
    do
      :gen_tcp.controlling_process(socket, self())
      {:noreply, %{state| front: socket}}
    else
      {:error, :eaddrinuse} ->
        {:stop, {:shutdown, :eaddrinuse}}
      {:error, reason} ->
        {:stop, {:shutdown, reason}}
      error ->
        {:stop, {:shutdown, error}}
    end
  end

メインとなるリクエストを待ち受けてる状態となります。このworkerがリクエストを取得するとフロント側のソケットが開き、そのソケットからのメッセージをこのプロセス(self)が受け付けられるよう :gen_tcp.controlling_process(socket, self()) を実行します。

バックエンド側(HTTPサーバ)へのコネクションを確立させる

フロント側のコネクションを確立させたので、次はバックエンド側となります。

  def handle_info({:tcp, socket, packet}, %{front: socket, back: nil, hosts: [host| _]}=state) do
    {:ok, backend_socket} = set_connect(host)
    end(backend_socket, packet)
    {:noreply, %{state| back: proxy_host}}
  end
  
    def set_connect(proxy) do
    with {:ok, conn} <- :gen_tcp.connect(proxy.host, proxy.port, [:binary, packet: 0])
    do
      {:ok, %{proxy| conn: conn}}
    else
      error ->
        error
    end
  end
  
  def send(socket, packet) do
    :gen_tcp.send(socket, packet)
  end

バックエンドのコネクションの確立にはフロント側からパケットを受け取った時に行うようにしています。

def set_connect(proxy) do
    with {:ok, conn} <- :gen_tcp.connect(proxy.host, proxy.port, [:binary, packet: 0])
    do
      {:ok, %{proxy| conn: conn}}
    else
      error ->
        error
    end
  end

ここでTCPのコネクション確立のためバックエンドのサーバへリクエストを送ります。

  def send(proxy, packet) do
    :gen_tcp.send(proxy.conn, packet)
  end

先ほどのバックエンドとのコネクションで確立したソケットに対して、パケットを送信します。 これで、フロントから受け取ったパケットをバックエンドのサーバへ送信することができるようになりました。

フロントとバックエンド、それぞれのソケットで受け取ったパケットの送受信をする

最後にそれぞれのソケット(フロントとバックエンド)で受け取ったパケットを送信するようにします。

  def handle_info({:tcp, socket, packet}, %{front: socket, back: back}=state) do
   send(back, packet)
    {:noreply, state}
  end

  def handle_info({:tcp, socket, packet}, %{front: front, back: %{conn: socket}}=state) do
    send(front, packet)
    {:noreply, state}
  end

ここではパターンマッチを使ってフロントから来たパケットなのか、バックエンドから来たパケットなのかを判断し、それぞれ別方向へパケットを送信します。

フロントかバックエンド側、どちらかのソケットがクローズしたらもう片方もクローズする

最後にソケットのクローズ処理です。

  def handle_info({:tcp_closed, socket}, %{front: nil, back: socket}=state) do
    {:stop, :shutdown, state}
  end
  def handle_info({:tcp_closed, socket}, %{front: front, back: %{conn: socket}}=state) do
    :gen_tcp.close(front)
    {:stop, :shutdown, state}
  end

これも、パケットの送信同様、パターンマッチでどっちのソケットがクローズしたかを判断し、もう片方のソケットをクローズするようにします。

まとめ

Proxy サーバはErlangでも実装したことがあり、その実装を参考にElixirでも実装してみました。

Erlangで簡易的なTCP Proxyを作った - ネットワークプログラマを目指して

Proxyを使えばいろいろと応用的な実装もできそうなので次はそのことについてまとめていきたいと思います。