Elixir at Talkdesk

Tiago Sousa

Created: 2017-05-24 Wed 23:51

Who am I?

Tiago Sousa.

  • Platform TL
  • Employee #4
  • Polyglot programmer
  • Elixir since 2014

Today's agenda

  1. Motivation
  2. Fast Prototyping
  3. Elixir at Talkdesk
  4. Technology Radar
  5. Elixir's Future

Motivation

Codebits 2014

First contact with Elixir https://codebits.eu/intra/s/session/370

Hackathon culture

Two days, solve a problem

Exploration mode ON

Building a platform

From monolith to micro-services

  • API Gateway pattern
  • Authorization Server
  • Events Gateway pattern
  • Outbound Webhooks

API Gateway

api-gateway-visualization-flow-architecture-nordic-apis.png

AppConnect

Fast Prototyping

Phoenix microservice

  • Very complete web framework
  • Approachable syntax
  • Performance
  • REPL (help on functions, error messages)
  • Tooling (Mix)

Presence

  • Realtime tracking (vs. heartbeat/Redis)
  • Scalable automatic clustering
  • Resilient to node failures

https://lh6.googleusercontent.com/Lud7lKElX27yr8T1duyedttEkPHmhtra6EL0zjZxKrGK7CSbBgASVIgP4Jzj5XuWgL-XM-59YG7bGw4jwInU-8bfbt4hWEOpdy7l-F4iQj9ULHMBcP8OQ6pns6OKK70AzRqACJfzpV8

defmodule Presence.UserSocket do
  @moduledoc """
  Socket entry point for the presence service.

  Clients connect over WebSocket and join the `"rooms:lobby"` channel. The
  connection params must carry a `"user_id"`; `"device"` and `"name"` are
  optional and are stored as `nil` when absent.
  """
  use Phoenix.Socket

  channel "rooms:lobby", Presence.RoomChannel

  transport :websocket, Phoenix.Transports.WebSocket, timeout: 45_000

  @doc """
  Accepts the connection and copies `user_id`, `device` and `name` from the
  params into the socket assigns.

  Returns `{:ok, socket}` when `"user_id"` is present and `:error` otherwise.
  """
  def connect(%{"user_id" => id} = params, socket) do
    socket =
      socket
      |> assign(:user_id, id)
      |> assign(:device, params["device"])
      |> assign(:name, params["name"])

    {:ok, socket}
  end

  # Refuse the connection instead of raising FunctionClauseError when the
  # client did not identify itself.
  def connect(_params, _socket), do: :error

  @doc """
  Socket identifier required by the `Phoenix.Socket` behaviour.

  Returning `nil` keeps the socket anonymous.
  """
  def id(_socket), do: nil
end
defmodule Presence.RoomChannel do
  @moduledoc """
  Lobby channel that tracks each connected user with `Presence.Presence`.

  After joining, the user is tracked with status `"available"` and receives
  the current presence list. Clients change their status by pushing a
  `"set_status"` message.
  """
  use Presence.Web, :channel
  alias Presence.Presence

  # Tracking is deferred to `handle_info/2`: the join is acknowledged first and
  # the process then handles the `:after_join` message it sent to itself.
  def join("rooms:lobby", _payload, socket) do
    send(self(), :after_join)
    {:ok, socket}
  end

  def handle_info(:after_join, socket) do
    Presence.track(socket, socket.assigns.user_id, presence_meta(socket, "available"))
    push(socket, "presence_state", Presence.list(socket))
    {:noreply, socket}
  end

  # The payload must still contain "reason" for this clause to match, but the
  # value is not used, hence the underscore.
  def handle_in("set_status", %{"status" => status, "reason" => _reason}, socket) do
    Presence.update(socket, socket.assigns.user_id, presence_meta(socket, status))
    {:noreply, socket}
  end

  # Builds the presence metadata for the socket's user with the given status.
  # `online_at` is the current system time in seconds, rendered as a string.
  defp presence_meta(socket, status) do
    %{
      device: socket.assigns.device,
      status: status,
      name: socket.assigns.name,
      online_at: inspect(System.system_time(:seconds))
    }
  end
end
import {Presence, Socket} from 'phoenix';

  init (options) {
    this.presences = {};
    this.socket = new Socket(__PHOENIX_WS_URL__, {params: options});
    this.socket.connect();
    this.room = this.socket.channel("rooms:lobby", {});

    this.room.on("presence_state", state => {
      Presence.syncState(this.presences, state, this.onJoin, this.onLeave);
    });
    this.room.on("presence_diff", state => {
      Presence.syncDiff(this.presences, state, this.onJoin, this.onLeave);
    });
    this.room.join();
  }
setStatus (data) {
  this.room.push("set_status", data);
}

onJoin (userId, currentPresence, {metas}) {
  if (userId === this.userId) {
    benchmarkUpdate(metas);
  } else {
    updateAgentStatus(userId, metas);
  }
}

Empurrador

  • Pusher clone
  • WebSockets
  • Fraction of the cost
  • Able to handle current traffic

OmniView

Track your calls across the globe

  • Call geolocation
  • Mapbox API
  • Empurrador

GraphQL

Fast prototyping

  • Facebook
  • GitHub
  • Yelp
# GraphQL type definitions (Absinthe notation) imported by the schema module.
defmodule AppsGraphql.Schema.Types do
  use Absinthe.Schema.Notation
  use Absinthe.Relay.Schema.Notation

  # An app, with its scopes and installations.
  # NOTE(review): :installation, :resource and :resource_type are referenced
  # here but not declared in this module — presumably defined elsewhere.
  object :app do
    field :name, :string
    field :display_name, :string
    field :scopes, list_of(:string)
    field :installations, list_of(:installation)

    # Resolved through FindIcon.by_type/2; takes a `type` argument.
    field :resource, :resource do
      arg :type, :resource_type
      resolve &AppsGraphql.FindIcon.by_type/2
    end
  end
  # A user; only the name is exposed.
  object :user do
    field :name, :string
  end

  # Subscription kinds accepted by the schema: trial or paid.
  enum :subscription_type do
    value :trial
    value :paid
  end
end
# Root GraphQL schema: two query fields and one mutation, built on the types
# imported from AppsGraphql.Schema.Types.
defmodule AppsGraphql.Schema do
  use Absinthe.Schema
  use Absinthe.Relay.Schema
  import_types AppsGraphql.Schema.Types

  query do
    # List of apps, resolved by FindApps.all/2.
    field :apps, list_of(:app) do
      resolve &AppsGraphql.FindApps.all/2
    end

    # Single user lookup; `id` is mandatory. Resolved by FindUser.one/2.
    field :user, type: :user do
      arg :id, non_null(:id)
      resolve &AppsGraphql.FindUser.one/2
    end
  end
  mutation do
    # Resolved by InstallApp.create/2; `users` is mandatory, `subscription`
    # is optional.
    # NOTE(review): :installation and :users_input are not declared in the
    # imported types module shown alongside — presumably defined elsewhere.
    field :installation, type: :installation do
      arg :users, non_null(:users_input)
      arg :subscription, :subscription_type

      resolve &AppsGraphql.InstallApp.create/2
    end
  end
end

In Production

Auth System

Concurrency

"self-healing software without building complex self-healing software"

Goals

  1. Parallel processing across accounts
  2. Save resources based on account activity
  3. Have multiple handlers per type of event
defmodule AuthConsumer.AccountHandler do
  @moduledoc """
  One GenServer per account, started lazily on the first event for that
  account and registered under the account id.

  NOTE(review): the process is registered locally with `name: account_id`,
  which only works when `account_id` is an atom. If ids arrive as strings from
  external events, do not convert them to atoms; register through a registry
  instead.
  """
  use GenServer

  @doc """
  Starts the handler for `account_id` and registers it under that name.

  The process is linked to the caller.
  """
  def start_link(account_id) do
    # Options go in the third argument; the second is the initial state.
    GenServer.start_link(__MODULE__, %{event: nil}, name: account_id)
  end

  @doc """
  Hands `event` to the handler of the event's account, starting the handler
  first when it is not running yet.

  Returns `:ok`, or the error returned by `start_link/1` when the handler
  could not be started.
  """
  def process(event = %{"account_id" => account_id}) do
    with {:ok, pid} <- account_consumer(account_id) do
      GenServer.cast(pid, {:consume, event})
    end
  end

  def init(state), do: {:ok, state}

  def handle_cast({:consume, event}, state) do
    # process event
    {:noreply, %{state | event: event}}
  end

  # Returns {:ok, pid} for the account's handler, starting it when needed.
  defp account_consumer(account_id) do
    case Process.whereis(account_id) do
      pid when is_pid(pid) -> {:ok, pid}
      _ -> start_handler(account_id)
    end
  end

  # Another caller may register the name between the lookup and the start;
  # that is not a failure, so reuse the winner's pid.
  defp start_handler(account_id) do
    case start_link(account_id) do
      {:error, {:already_started, pid}} -> {:ok, pid}
      other -> other
    end
  end
end
defmodule AuthConsumer.AccountHandler do
  @moduledoc """
  Per-account GenServer that stops itself after `timeout` milliseconds
  without receiving an event.
  """
  use GenServer

  @doc """
  Starts the handler registered as `account_id`.

  `timeout` is the inactivity window in milliseconds (default 1000).
  """
  def start_link(account_id, timeout \\ 1000) do
    # A restart strategy is a supervisor child-spec setting, not a
    # GenServer.start_link option, so it is not passed here.
    GenServer.start_link(__MODULE__, %{timeout: timeout}, name: account_id)
  end

  def init(%{timeout: timeout}) do
    # :event must exist up front because handle_cast updates it with the
    # strict %{state | ...} syntax. The third element arms the idle timer.
    {:ok, %{timeout: timeout, event: nil}, timeout}
  end

  def handle_cast({:consume, event}, state = %{timeout: timeout}) do
    # process event
    # Returning the timeout again restarts the inactivity window.
    {:noreply, %{state | event: event}, timeout}
  end

  # Delivered when no message arrived within the timeout: stop cleanly.
  def handle_info(:timeout, state) do
    {:stop, :normal, state}
  end
end
defmodule AuthConsumer.AccountHandler do
  @moduledoc """
  Per-account GenServer that fans every consumed event out to a set of
  GenEvent handlers.
  """
  use GenServer

  # Starts a linked GenEvent manager and attaches every module in `handlers`
  # to it as a monitored handler with an empty initial state.
  def init(%{handlers: handlers, timeout: timeout}) do
    {:ok, manager} = GenEvent.start_link()
    Enum.each(handlers, &GenEvent.add_mon_handler(manager, &1, %{}))

    # :event must exist up front because handle_cast updates it with the
    # strict %{state | ...} syntax, which raises on a missing key.
    {:ok, %{timeout: timeout, manager: manager, event: nil}, timeout}
  end

  # Broadcasts the event to all handlers and remembers it as the last event.
  def handle_cast({:consume, event}, state = %{manager: manager, timeout: timeout}) do
    GenEvent.notify(manager, event)
    {:noreply, %{state | event: event}, timeout}
  end
end
defmodule AuthConsumer.AccountHandler do
  @moduledoc """
  Per-account event consumer.

  Each account gets its own GenServer, started on the first event for that
  account. Events are broadcast to a set of GenEvent handlers, and the server
  stops itself after `timeout` milliseconds without events.
  """
  alias AuthConsumer.TalkdeskHandler
  require Logger
  use GenServer

  @behaviour TalkdeskHandler
  @default_keep_alive 1000

  @doc """
  Starts the handler registered as `name`.

  `handlers` is the list of GenEvent handler modules to attach and `timeout`
  the inactivity window in milliseconds. The calling process is recorded in
  the state, and handed to every handler, as `:supervisor`.
  """
  def start_link(name, handlers \\ default_handlers(), timeout \\ default_timeout()) do
    # A restart strategy is a supervisor child-spec setting, not a
    # GenServer.start_link option, so it is not passed here.
    GenServer.start_link(
      __MODULE__,
      %{handlers: handlers, timeout: timeout, supervisor: self()},
      name: name
    )
  end

  def init(%{handlers: handlers, timeout: timeout, supervisor: supervisor}) do
    {:ok, manager} = GenEvent.start_link()

    Enum.each(handlers, &GenEvent.add_mon_handler(manager, &1, %{supervisor: supervisor}))

    {:ok, %{timeout: timeout, supervisor: supervisor, manager: manager, event: nil}, timeout}
  end

  @doc """
  Hands `event` to the handler of the event's account, starting the handler
  first when it is not running yet. The second argument is ignored.
  """
  def process(event = %{"account_id" => account_id}, _state \\ %{}) do
    with {:ok, pid} <- account_consumer(account_id) do
      GenServer.cast(pid, {:consume, event})
    end
  end

  def handle_cast({:consume, event}, state = %{manager: manager, timeout: timeout}) do
    GenEvent.notify(manager, event)

    # Returning the timeout again restarts the inactivity window.
    {:noreply, %{state | event: event}, timeout}
  end

  # Delivered when no message arrived within the timeout: shut down the event
  # manager and stop cleanly.
  def handle_info(:timeout, state = %{timeout: timeout, manager: manager}) do
    Logger.warn "Stopping #{inspect self()} due to #{timeout}ms of inactivity"
    GenEvent.stop(manager)
    {:stop, :normal, state}
  end

  # Returns {:ok, pid} for the account's handler, starting it when needed.
  # A concurrent start that lost the registration race reuses the winner's pid.
  defp account_consumer(account_id) do
    case Process.whereis(account_id) do
      pid when is_pid(pid) ->
        {:ok, pid}

      _ ->
        case start_link(account_id) do
          {:error, {:already_started, pid}} -> {:ok, pid}
          other -> other
        end
    end
  end

  # NOTE(review): the original excerpt calls default_handlers/0 without
  # defining it. UserEventsHandler is an assumption — confirm the real list.
  defp default_handlers, do: [AuthConsumer.UserEventsHandler]

  # NOTE(review): assumes @default_keep_alive was meant as the default
  # inactivity timeout, since nothing else reads it — confirm.
  defp default_timeout, do: @default_keep_alive
end

Handlers

module Events
  # Processor for agent events.
  class AgentProcessor < Processor

    # Calls the :remove_agent interactor with the account id, agent email and
    # agent id taken from the event, keyed by symbol.
    def on_agent_deleted(event)
      args = %w[account_id agent_email agent_id].each_with_object({}) do |key, acc|
        acc[key.to_sym] = event[key]
      end

      Interactor(:remove_agent).call args
    end
  end
end
# Mixin for objects that dispatch events to a list of processors.
module TalkdeskEvents

  # Hook run when an object extends this module: starts it off with an empty
  # processor list.
  def self.extended(base)
    base.processors = []
  end

  # Sends the event to every processor that responds to `on_<event name>`.
  # The event name is read from the 'event' key, falling back to :event.
  def process_event(event)
    processors.each do |processor|
      handler_name = "on_#{event['event'] || event[:event]}"
      processor.send(handler_name, event) if processor.respond_to?(handler_name)
    end
  end
end
defmodule AuthConsumer.UserEventsHandler do
  use AuthConsumer.TalkdeskHandler

  # Event names that take the agent's access away.
  @deactivating_events ~w(agent_deleted agent_deactivated)

  # For a deleted or deactivated agent: toggle the user to `false` and revoke
  # the agent's tokens. The handler state is returned unchanged.
  def process(%{"agent_id" => agent_id, "account_id" => account_id, "event" => type}, state)
      when type in @deactivating_events do
    toggle_user(account_id, agent_id, false)
    revoke_tokens(agent_id)

    {:ok, state}
  end
end

Macros are like…

defmodule AuthConsumer.TalkdeskHandler do
  @moduledoc """
  Behaviour for event handlers.

  `use AuthConsumer.TalkdeskHandler` declares the behaviour in the calling
  module and injects a default `process/2` that ignores the event. Defining
  `process/2` in the calling module replaces that default entirely.
  """

  @doc """
  Handles one event given the handler state and returns `{:ok, new_state}`.
  """
  # `map()` rather than `%{}`: in a typespec `%{}` only matches the empty map.
  @callback process(event :: map(), state :: any) :: {:ok, any}

  defmacro __using__(_opts) do
    quote do
      @behaviour AuthConsumer.TalkdeskHandler

      # defoverridable only accepts functions that already exist in the
      # module, so a default implementation has to be defined first.
      def process(_event, state), do: {:ok, state}

      defoverridable [process: 2]
    end
  end
end
defmodule AuthConsumer.TalkdeskHandler do
  @moduledoc """
  Behaviour for event handlers that run as GenEvent handlers.

  `use AuthConsumer.TalkdeskHandler` turns the calling module into a GenEvent
  handler whose `handle_event/2` delegates to the module's own `process/2`,
  which the calling module must define.
  """

  @doc """
  Handles one event given the handler state and returns `{:ok, new_state}`.
  """
  # `map()` rather than `%{}`: in a typespec `%{}` only matches the empty map.
  @callback process(event :: map(), state :: any) :: {:ok, any}

  defmacro __using__(_opts) do
    quote do
      use GenEvent
      require Logger

      @behaviour AuthConsumer.TalkdeskHandler

      # Inside `quote`, __MODULE__ is the module calling `use`, so GenEvent's
      # handle_event/2 is routed to that module's process/2.
      defdelegate handle_event(event, state), to: __MODULE__, as: :process
      defoverridable [handle_event: 2]
    end
  end
end
defmodule AuthConsumer.TalkdeskHandler do
  @moduledoc """
  Behaviour for event handlers that run as GenEvent handlers.

  `use AuthConsumer.TalkdeskHandler` turns the calling module into a GenEvent
  handler whose `handle_event/2` delegates to the module's own `process/2`.
  A catch-all `process/2` clause is appended after the module's own clauses,
  so events that no clause matches are logged at debug level and ignored
  instead of raising.
  """

  @doc """
  Handles one event given the handler state and returns `{:ok, new_state}`.
  """
  # `map()` rather than `%{}`: in a typespec `%{}` only matches the empty map.
  @callback process(event :: map(), state :: any) :: {:ok, any}

  defmacro __using__(_opts) do
    quote do
      use GenEvent
      require Logger

      @behaviour AuthConsumer.TalkdeskHandler

      # Inside `quote`, __MODULE__ is the module calling `use`, so GenEvent's
      # handle_event/2 is routed to that module's process/2.
      defdelegate handle_event(event, state), to: __MODULE__, as: :process
      defoverridable [handle_event: 2]

      # No `import` is needed: this module only exposes underscore-prefixed
      # macros, and those are never imported.
      @before_compile AuthConsumer.TalkdeskHandler
    end
  end

  # Runs just before the calling module is compiled, so this clause lands
  # after every process/2 clause the module defined itself.
  defmacro __before_compile__(_env) do
    quote do
      def process(event, state) do
        Logger.debug "Ignoring event #{inspect event}"
        {:ok, state}
      end
    end
  end
end

Debugging

https://lh6.googleusercontent.com/38s_ZcFTuUFit-bfYtQEyoOhrURYrIIBNaCcUuqsnnJQfUAiPz5dilubYO8eYch_oVS7VxeC0K91kz7pvBF9b8NojRUD8-8oNbwWoYxVAjczsbA5se0D4EaMFLv6kxfdkeb7p76ytJM https://lh6.googleusercontent.com/HX_Af_q5QlltTH4FW1kCPrjknthff4d8Qvq7ZpMJRkjQDsBqkwwO5g1gFwRdmFLM0dVpBZb7rNf4Pbn-VYn3qi3VPoBE-_nM1c22-TVw_kOGkEtkzyXHO13DP3UoBUFqvWVmqUYk7vs https://lh6.googleusercontent.com/38s_ZcFTuUFit-bfYtQEyoOhrURYrIIBNaCcUuqsnnJQfUAiPz5dilubYO8eYch_oVS7VxeC0K91kz7pvBF9b8NojRUD8-8oNbwWoYxVAjczsbA5se0D4EaMFLv6kxfdkeb7p76ytJM https://lh6.googleusercontent.com/2w8mEr0A34G6JKRhPw-tPtF6AJ06CwSydFcVzjp8Z1ewFZLZXFarFNq49g5z9P_abzcq3yJ359LPS3o250es-ijs1DcNLPVF9m3LOk98GAxn4YwgEgM5K4NGyQJiTgGJ0H-sHDgePb0 https://lh5.googleusercontent.com/0HgK96YnyafudJQJBgA5b7v5Jaf1Z5P2H3D7EnUpvUgGnfpaNui1pdz8q76sdqgn4m23l5tfXDDqKYOkts70-ASqMaw7hwGAQgeRryeyVB4VrX6oVMEGCra9hxJYkD6XDLW7_KVG1kM https://lh5.googleusercontent.com/W5ko4AQKiZFFtAVwpHE3TClHIt9-Bv-yiwr7mPijnjXncnBq2nPoP_zeeDQatbqdk7EUAtU57jyxFgPzthJ2OwD42MJWjuJuOmCOgufeycziVaw5lrqvgXrd6o3_vQ8ya8eZoqF-TV4 https://lh6.googleusercontent.com/3YVqksG43qJf_9DVut8G6zVrOjQEfjsz780LMLbOMXJGUIVeT7bQseI7oJN8Oy6j-IaRmu8X4GDaVepCu661lxhXWI-U5_AyDB1qxc0fAhXW_l9WPZrIqFKiYltGmhzGqlQmI06mLd0 https://lh6.googleusercontent.com/W_8RCfqf2kPjJyRvIH8GmSSkOJSYJ82BjflUD4rQT7uyIaTGkK7PCwSqqNXNC4kbJBM5AKnX3Exl9xZLStE3eh4B_06yj-_T3aper5QGa5UWGLcfJ4FjXKLkz05BH682FM4b2s09Z_Y

Events API

Monitoring

Retries

GenStage

Technology Radar

Elixir/OTP Platform

Adopt

  • Mix
  • OTP
  • Phoenix
  • Logger

Trial

  • Presence
  • Macros
  • Umbrella Apps
  • Debugging

Assess

  • GenStage
  • Clustered Erlang
  • Deployment
  • Profiling

Hold

  • GenEvent
  • Non-Ecto drivers
  • Monitoring

Hex Libraries

Adopt

  • AMQP
  • Credo
  • ExCoveralls
  • JWT

Trial

  • AppSignal
  • Absinthe
  • Confex
  • Bugsnag

Assess

  • Distillery
  • Dialyzer
  • ESpec

Hold

  • Meck
  • Xandra
  • MongoDB
  • HTTP Clients
  • OAuth Frameworks

Future

Directly from ElixirConf 2016

  • GenHTTP
  • Property testing
  • Data streams
  • Phoenix 1.4 (focus on monitoring)
  • Paparazzi (visualization tool)
  • Wobserver