Chronicle (cratis_chronicle v1.0.2)

Copy Markdown View Source

Idiomatic Elixir client for the Chronicle event-sourcing platform.

Chronicle is an event-sourcing kernel that stores domain events and projects them into read models. This library provides an idiomatic Elixir interface built on top of the Chronicle gRPC API.

Quick Start

Add the dependency to your mix.exs:

{:cratis_chronicle, "~> 1.0"}

Start Chronicle.Client in your application supervision tree:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Chronicle.Client,
        connection_string: "chronicle://localhost:35000?disableTls=true",
        event_store: "my-app",
        event_types: [MyApp.Events.AccountOpened, MyApp.Events.FundsDeposited],
        migrations: [MyApp.Migrations.AccountOpenedV2Migration],
        reactors: [MyApp.Reactors.NotificationReactor],
        reducers: [MyApp.Reducers.AccountReducer],
        seeders: [MyApp.Seeders.InitialDataSeeder],
        event_store_subscriptions: [
          MyApp.EventStoreSubscriptions.DefaultAccountEvents
        ]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

Or rely on artifact auto-discovery by passing :otp_app instead of listing each module explicitly:

{Chronicle.Client,
  connection_string: "chronicle://localhost:35000?disableTls=true",
  event_store: "my-app",
  otp_app: :my_app}

Defining Event Types

defmodule MyApp.Events.AccountOpened do
  use Chronicle.Events.EventType, id: "account-opened-v1"
  defstruct [:account_id, :owner_name, :initial_balance]
end

Appending Events

Chronicle.append("account-42", %MyApp.Events.AccountOpened{
  account_id: "account-42",
  owner_name: "Alice",
  initial_balance: 1000
})

Reading Read Models

{:ok, account} = Chronicle.read_model(MyApp.ReadModels.Account, "account-42")

Defining Reactors

defmodule MyApp.Reactors.NotificationReactor do
  use Chronicle.Reactors.Reactor

  @handles MyApp.Events.AccountOpened

  @impl true
  def handle(%MyApp.Events.AccountOpened{} = event, _context) do
    MyApp.Mailer.welcome(event.owner_name)
    :ok
  end
end

Defining Reducers

defmodule MyApp.Reducers.AccountReducer do
  use Chronicle.Reducers.Reducer, model: MyApp.ReadModels.Account

  @handles MyApp.Events.AccountOpened

  @impl true
  def reduce(%MyApp.Events.AccountOpened{} = event, _model, _context) do
    %MyApp.ReadModels.Account{
      account_id: event.account_id,
      owner_name: event.owner_name,
      balance: event.initial_balance
    }
  end
end

Defining Seeders

defmodule MyApp.Seeders.InitialDataSeeder do
  use Chronicle.Seeding.Seeder

  @impl true
  def seed(builder) do
    builder
    |> Chronicle.Seeding.for(
      MyApp.Events.AccountOpened,
      "seed-account-1",
      [%MyApp.Events.AccountOpened{
        account_id: "seed-account-1",
        owner_name: "Initial User",
        initial_balance: 10_000
      }]
    )
  end
end

Modules

Summary

Functions

Returns all instances of the given read model.

Appends a single event to the event log for the given event source.

Appends multiple events to the event log for the given event source.

Begins a new unit of work for the calling process.

Clears the current process correlation id.

Clears the current process identity.

Gets the current process correlation id.

Gets the current process identity.

Returns the current unit of work for the calling process.

Deletes a Chronicle job.

Creates an event sequence wrapper for the given event sequence id.

Returns all event store names.

Gets a single Chronicle job by identifier.

Gets all steps for a Chronicle job.

Gets all jobs for the configured event store namespace.

Returns all namespaces for an event store.

Gets the tail sequence number for an event sequence.

Gets all registered webhooks.

Checks whether there are events for an event source id in an event sequence.

Fetches a read model instance by its key (typically an event source ID).

Registers all discoverable event store subscriptions.

Registers all discoverable webhooks.

Registers a discoverable event store subscription module.

Registers a discoverable webhook module.

Removes a registered webhook by identifier.

Resumes a Chronicle job.

Sets the current process correlation id.

Sets the current process identity.

Stops a Chronicle job.

Registers an event store subscription imperatively using all available event types.

Removes an event store subscription by identifier.

Runs a function inside a unit of work and commits it if the function succeeds.

Functions

all(model_module, opts \\ [])

@spec all(
  module(),
  keyword()
) :: {:ok, [struct()]} | {:error, term()}

Returns all instances of the given read model.

Delegates to Chronicle.ReadModels.all/2.

append(event_source_id, event, opts \\ [])

@spec append(String.t(), struct(), keyword()) :: :ok | {:error, term()}

Appends a single event to the event log for the given event source.

Delegates to Chronicle.EventSequences.EventLog.append/3.

Options

Accepts append options, including :concurrency_scope.

append_many(event_source_id, events, opts \\ [])

@spec append_many(String.t(), [struct()], keyword()) :: :ok | {:error, term()}

Appends multiple events to the event log for the given event source.

Delegates to Chronicle.EventSequences.EventLog.append_many/3. Accepts the same append options as append/3, including :concurrency_scope.

begin_unit_of_work(opts \\ [])

@spec begin_unit_of_work(keyword()) :: Chronicle.Transactions.UnitOfWork.t()

Begins a new unit of work for the calling process.

clear_correlation_id()

@spec clear_correlation_id() :: Chronicle.Correlation.CorrelationId.t()

Clears the current process correlation id.

clear_identity()

@spec clear_identity() :: :ok

Clears the current process identity.

current_correlation_id()

@spec current_correlation_id() :: Chronicle.Correlation.CorrelationId.t()

Gets the current process correlation id.

current_identity()

@spec current_identity() :: Chronicle.Identity.t()

Gets the current process identity.

current_unit_of_work()

@spec current_unit_of_work() :: Chronicle.Transactions.UnitOfWork.t()

Returns the current unit of work for the calling process.

delete_job(job_id, opts \\ [])

@spec delete_job(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Deletes a Chronicle job.

event_sequence(event_sequence_id, opts \\ [])

@spec event_sequence(
  String.t(),
  keyword()
) :: Chronicle.EventSequences.EventSequence.t()

Creates an event sequence wrapper for the given event sequence id.

get_event_stores(opts \\ [])

@spec get_event_stores(keyword()) :: {:ok, [String.t()]} | {:error, term()}

Returns all event store names.

get_job(job_id, opts \\ [])

@spec get_job(
  String.t(),
  keyword()
) :: {:ok, Chronicle.Jobs.Job.t() | nil} | {:error, term()}

Gets a single Chronicle job by identifier.

get_job_steps(job_id, opts \\ [])

@spec get_job_steps(
  String.t(),
  keyword()
) :: {:ok, [Chronicle.Jobs.JobStep.t()]} | {:error, term()}

Gets all steps for a Chronicle job.

get_jobs(opts \\ [])

@spec get_jobs(keyword()) :: {:ok, [Chronicle.Jobs.Job.t()]} | {:error, term()}

Gets all jobs for the configured event store namespace.

get_namespaces(opts \\ [])

@spec get_namespaces(keyword()) :: {:ok, [String.t()]} | {:error, term()}

Returns all namespaces for an event store.

Uses the configured client event store by default.

get_tail_sequence_number(event_source_id \\ nil, opts \\ [])

@spec get_tail_sequence_number(
  String.t() | nil,
  keyword()
) :: {:ok, non_neg_integer()} | {:error, term()}

Gets the tail sequence number for an event sequence.

get_webhooks(opts \\ [])

@spec get_webhooks(keyword()) ::
  {:ok, [Chronicle.WebHooks.Definition.t()]} | {:error, term()}

Gets all registered webhooks.

has_events_for?(event_source_id, opts \\ [])

@spec has_events_for?(
  String.t(),
  keyword()
) :: {:ok, boolean()} | {:error, term()}

Checks whether there are events for an event source id in an event sequence.

read_model(model_module, key, opts \\ [])

@spec read_model(module(), String.t(), keyword()) ::
  {:ok, struct() | nil} | {:error, term()}

Fetches a read model instance by its key (typically an event source ID).

Delegates to Chronicle.ReadModels.get/3.

Returns {:ok, model_struct} when the read model is found, {:ok, nil} if it is not found, or {:error, reason} if the read fails.

register_discovered_event_store_subscriptions(opts \\ [])

@spec register_discovered_event_store_subscriptions(keyword()) ::
  :ok | {:error, term()}

Registers all discoverable event store subscriptions.

register_discovered_webhooks(opts \\ [])

@spec register_discovered_webhooks(keyword()) :: :ok | {:error, term()}

Registers all discoverable webhooks.

register_event_store_subscription(subscription_module, opts \\ [])

@spec register_event_store_subscription(
  module(),
  keyword()
) :: :ok | {:error, term()}

Registers a discoverable event store subscription module.

register_webhook(webhook_module, opts \\ [])

@spec register_webhook(
  module(),
  keyword()
) :: :ok | {:error, term()}

Registers a discoverable webhook module.

register_webhook(webhook_id, target_url, configure, opts \\ [])

@spec register_webhook(
  String.t(),
  String.t(),
  (Chronicle.WebHooks.DefinitionBuilder.t() ->
     Chronicle.WebHooks.DefinitionBuilder.t()),
  keyword()
) :: :ok | {:error, term()}

Registers a webhook imperatively.

remove_webhook(webhook_id, opts \\ [])

@spec remove_webhook(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Removes a registered webhook by identifier.

resume_job(job_id, opts \\ [])

@spec resume_job(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Resumes a Chronicle job.

set_correlation_id(correlation_id)

Sets the current process correlation id.

set_identity(identity)

@spec set_identity(Chronicle.Identity.t()) :: Chronicle.Identity.t()

Sets the current process identity.

stop_job(job_id, opts \\ [])

@spec stop_job(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Stops a Chronicle job.

subscribe_to_event_store(subscription_id, source_event_store, opts)

@spec subscribe_to_event_store(String.t(), String.t(), keyword()) ::
  :ok | {:error, term()}

Registers an event store subscription imperatively using all available event types.

subscribe_to_event_store(subscription_id, source_event_store, configure, opts \\ [])

Registers an event store subscription imperatively.

unsubscribe_from_event_store(subscription_id, opts \\ [])

@spec unsubscribe_from_event_store(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Removes an event store subscription by identifier.

with_unit_of_work(fun, opts \\ [])

@spec with_unit_of_work(
  (Chronicle.Transactions.UnitOfWork.t() -> any()),
  keyword()
) :: any()

Runs a function inside a unit of work and commits it if the function succeeds.