# This file steals liberally from https://github.com/supabase/realtime, # which in turn draws on https://github.com/cainophile/cainophile defmodule WalEx.TransactionFilter do @moduledoc """ Filter transactions based on the relation and event type. """ alias WalEx.Changes.{ DeletedRecord, NewRecord, Transaction, UpdatedRecord } alias WalEx.Decoder.Messages.Relation.Column require Logger defmodule(Filter, do: defstruct([:schema, :table, :condition])) @doc """ Predicate to check if the filter matches the transaction. ## Examples iex> txn = %Transaction{changes: [ ...> %WalEx.Changes.NewRecord{ ...> columns: [ ...> %WalEx.Decoder.Messages.Relation.Column{flags: [:key], name: "id", type: "int8", type_modifier: 4294967295}, ...> %WalEx.Decoder.Messages.Relation.Column{flags: [], name: "details", type: "text", type_modifier: 4294967295}, ...> %WalEx.Decoder.Messages.Relation.Column{flags: [], name: "user_id", type: "int8", type_modifier: 4294967295} ...> ], ...> commit_timestamp: nil, ...> record: %{"details" => "The SCSI system is down, program the haptic microchip so we can back up the SAS circuit!", "id" => "14", "user_id" => "1"}, ...> schema: "public", ...> table: "todos", ...> type: "INSERT" ...> } ...> ]} iex> matches?(%{event: "*", relation: "*"}, txn) true iex> matches?(%{event: "INSERT", relation: "*"}, txn) true iex> matches?(%{event: "UPDATE", relation: "*"}, txn) false iex> matches?(%{event: "INSERT", relation: "public"}, txn) true iex> matches?(%{event: "INSERT", relation: "myschema"}, txn) false iex> matches?(%{event: "INSERT", relation: "public:todos"}, txn) true iex> matches?(%{event: "INSERT", relation: "myschema:users"}, txn) false """ def matches?(%{event: event, relation: relation}, %Transaction{changes: changes}) do case parse_relation_filter(relation) do {:ok, filter} -> Enum.any?(changes, fn change -> change_matches(event, filter, change) end) {:error, _msg} -> false end end # malformed filter or txn. Should not match. def matches?(_filter, _txn), do: false defp change_matches(event, _filter, %{type: type}) when event != type and event != "*" do false end defp change_matches(_event, filter, change) do name_matches?(filter.schema, change.schema) and name_matches?(filter.table, change.table) end @doc """ Parse a string representing a relation filter to a `Filter` struct. ## Examples iex> parse_relation_filter("public:users") {:ok, %Filter{schema: "public", table: "users", condition: nil}} iex> parse_relation_filter("public") {:ok, %Filter{schema: "public", table: nil, condition: nil}} iex> parse_relation_filter("") {:ok, %Filter{schema: nil, table: nil, condition: nil}} iex> parse_relation_filter("public:users:bad") {:error, "malformed relation filter"} """ def parse_relation_filter(relation) do # We do a very loose validation here. # When the relation filter format is well defined we can do # proper parsing and validation. case String.split(relation, ":") do [""] -> {:ok, %Filter{schema: nil, table: nil, condition: nil}} ["*"] -> {:ok, %Filter{schema: nil, table: nil, condition: nil}} [schema] -> {:ok, %Filter{schema: schema, table: nil, condition: nil}} [schema, table] -> {:ok, %Filter{schema: schema, table: table, condition: nil}} _ -> {:error, "malformed relation filter"} end end defp name_matches?(nil, _change_name), do: true defp name_matches?(filter_name, change_name), do: filter_name == change_name def insert_event?(relation, txn), do: relation("INSERT", relation, txn) def update_event?(relation, txn), do: relation("UPDATE", relation, txn) def delete_event?(relation, txn), do: relation("DELETE", relation, txn) defp relation(event, relation, txn) when is_atom(relation) do matches?(%{event: event, relation: "public:" <> to_string(relation)}, txn) end defp relation(event, relation, txn) when is_binary(relation) do if String.contains?(relation, ":") do matches?(%{event: event, relation: relation}, txn) else matches?(%{event: event, relation: "public:" <> relation}, txn) end end @doc """ Returns a list of subscribed changes """ def filter_subscribed(%Transaction{changes: changes}, app_name) do Enum.filter(changes, &subscribes?(&1, app_name)) end @doc """ Returns a list of changes for the given table name and type (optional) """ def filter_changes(%Transaction{changes: changes}, table, nil, app_name) do subscribes_and_has_table(changes, table, app_name) end def filter_changes(%Transaction{changes: changes}, table, type, app_name) do changes |> subscribes_and_has_table(table, app_name) |> Enum.filter(&record_type?(&1, type)) end defp subscribes_and_has_table(changes, table, app_name) do Enum.filter(changes, &subscribes_to_table?(&1, table, app_name)) end def subscribes_to_table?(change, table, app_name) do has_table?(change, table) && subscribes?(change, app_name) end def subscribes?(%{table: table}, app_name) do subscriptions = WalEx.Config.get_configs(app_name, :subscriptions) table in subscriptions end def has_table?(%{table: table}, table_name) when is_atom(table), do: table == table_name def has_table?(%{table: table}, table_name) when is_binary(table), do: String.to_atom(table) == table_name def has_table?(_txn, _table_name), do: false def record_type?(%NewRecord{type: "INSERT"}, :insert), do: true def record_type?(%UpdatedRecord{type: "UPDATE"}, :update), do: true def record_type?(%DeletedRecord{type: "DELETE"}, :delete), do: true def record_type?(_txn, _type), do: false def filter_unwatched_fields(events, unwatched_changes) do Enum.filter(events, &unwatched_fields?(&1, unwatched_changes)) end def unwatched_fields?(%{changes: nil}, _unwatched_changes), do: true def unwatched_fields?(%{changes: changes}, unwatched_changes) do changes |> Enum.filter(fn {key, _value} -> key not in unwatched_changes end) |> Kernel.!=([]) end def unwatched_fields?(_event, _unwatched_changes), do: true def filter_unwatched_records(events, unwatched_records) do Enum.filter(events, &watched_record?(&1, unwatched_records)) end def watched_record?(%{new_record: nil, old_record: old_record = %{}}, unwatched_records) do not contains_unwatched_records?(old_record, unwatched_records) end def watched_record?(%{new_record: new_record = %{}}, unwatched_records) do not contains_unwatched_records?(new_record, unwatched_records) end def watched_record?(_event, _unwatched_records), do: false def contains_unwatched_records?(record = %{}, unwatched_records = %{}) do Enum.all?(unwatched_records, fn {key, value} -> Map.has_key?(record, key) and Map.get(record, key) == value end) end def map_changes(old_record, new_record) do fields = Map.keys(old_record) Enum.reduce(fields, %{}, fn field, acc -> old_value = Map.get(old_record, field) new_value = Map.get(new_record, field) if old_value != new_value do Map.put(acc, field, %{old_value: old_value, new_value: new_value}) else acc end end) end def map_columns(columns) do Enum.reduce(columns, %{}, fn %Column{name: name, type: type}, acc -> name = String.to_atom(name) Map.put(acc, name, type) end) end end