PubsubGrpc (PubsubGrpc v0.4.2)

View Source

Google Cloud Pub/Sub gRPC client with connection pooling.

Provides a high-level API for Pub/Sub operations including topic and subscription management, message publishing, pulling, acknowledgment, and schema management.

Configuration

Production (Google Cloud)

# Option 1: Goth Library (Recommended)
config :pubsub_grpc, :goth, MyApp.Goth

# Option 2: gcloud CLI (auto-detected)
# Option 3: GOOGLE_APPLICATION_CREDENTIALS env var
# Option 4: GCE/GKE metadata (automatic)

Development/Test (Local Emulator)

config :pubsub_grpc, :emulator,
  project_id: "my-project-id",
  host: "localhost",
  port: 8085

Timeouts

# Global default timeout in milliseconds (defaults to 30_000, i.e. 30s)
config :pubsub_grpc, :default_timeout, 30_000

Error Handling

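Functions return {:ok, result} on success (or a bare :ok for operations that produce no result, such as acknowledge/4, nack/4, modify_ack_deadline/5, delete_topic/2 and delete_subscription/2) and {:error, %PubsubGrpc.Error{}} on failure. Pattern match on the error code for specific handling: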

case PubsubGrpc.create_topic("my-project", "my-topic") do
  {:ok, topic} -> topic
  {:error, %PubsubGrpc.Error{code: :already_exists}} -> "already exists"
  {:error, %PubsubGrpc.Error{code: :unauthenticated}} -> "auth failed"
  {:error, %PubsubGrpc.Error{} = err} -> "Error: #{err}"
end

Examples

# Create a topic
{:ok, topic} = PubsubGrpc.create_topic("my-project", "my-topic")

# Publish messages
messages = [%{data: "Hello", attributes: %{"source" => "app"}}]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)

# Create subscription and pull
{:ok, sub} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub")
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub", 10)

# Acknowledge
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-sub", ack_ids)

Summary

Functions

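create_topic(project_id, topic_id) - Creates a new Pub/Sub topic.

delete_topic(project_id, topic_id) - Deletes a Pub/Sub topic.

execute(operation_fn, opts \\ []) - Executes a custom gRPC operation using a connection from the pool.

get_schema(project_id, schema_id, opts \\ []) - Gets details of a specific schema.

get_subscription(project_id, subscription_id) - Gets details of a subscription.

get_topic(project_id, topic_id) - Gets details of a topic.

list_schemas(project_id, opts \\ []) - Lists schemas in a project.

list_subscriptions(project_id, opts \\ []) - Lists subscriptions in a project.

list_topics(project_id, opts \\ []) - Lists topics in a project.

nack(project_id, subscription_id, ack_ids, opts \\ []) - Negatively acknowledges messages, causing immediate redelivery.

publish_message(project_id, topic_id, data, attributes \\ %{}) - Convenience function to publish a single message.

validate_message(project_id, schema_name, message, encoding) - Validates a message against an existing schema.

validate_message_with_schema(project_id, type, definition, message, encoding) - Validates a message against an inline schema definition.

validate_schema(project_id, type, definition) - Validates a schema definition.

with_connection(fun, opts \\ []) - Executes multiple operations using the same connection.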

Functions

acknowledge(project_id, subscription_id, ack_ids, opts \\ [])

@spec acknowledge(String.t(), String.t(), [String.t()], keyword()) ::
  :ok | {:error, PubsubGrpc.Error.t()}

Acknowledges received messages.

Parameters

  • ack_ids - Non-empty list of acknowledgment IDs from received messages

Returns

  • :ok on success

create_schema(project_id, schema_id, type, definition)

Creates a new schema.

Parameters

  • type - :protocol_buffer or :avro
  • definition - The schema definition string

create_subscription(project_id, topic_id, subscription_id, opts \\ [])

@spec create_subscription(String.t(), String.t(), String.t(), keyword()) ::
  {:ok, Google.Pubsub.V1.Subscription.t()} | {:error, PubsubGrpc.Error.t()}

Creates a subscription to a topic.

Options

  • :ack_deadline_seconds - Ack deadline in seconds (10-600, default: 60)

create_topic(project_id, topic_id)

@spec create_topic(String.t(), String.t()) ::
  {:ok, Google.Pubsub.V1.Topic.t()} | {:error, PubsubGrpc.Error.t()}

Creates a new Pub/Sub topic.

Parameters

  • project_id - Google Cloud project ID
  • topic_id - Topic identifier (without full path)

Returns

  • {:ok, %Google.Pubsub.V1.Topic{}} on success
  • {:error, %PubsubGrpc.Error{code: :already_exists}} if topic exists
  • {:error, %PubsubGrpc.Error{}} on other errors

delete_schema(project_id, schema_id)

Deletes a schema.

delete_subscription(project_id, subscription_id)

@spec delete_subscription(String.t(), String.t()) ::
  :ok | {:error, PubsubGrpc.Error.t()}

Deletes a subscription.

Returns

  • :ok on success

delete_topic(project_id, topic_id)

@spec delete_topic(String.t(), String.t()) :: :ok | {:error, PubsubGrpc.Error.t()}

Deletes a Pub/Sub topic.

Returns

  • :ok on success
  • {:error, %PubsubGrpc.Error{}} on error

execute(operation_fn, opts \\ [])

@spec execute(
  (GRPC.Channel.t() -> term()),
  keyword()
) :: {:ok, term()} | {:error, PubsubGrpc.Error.t()}

Executes a custom gRPC operation using a connection from the pool.

The function receives a gRPC channel and should return the result of a gRPC stub call.

Examples

operation = fn channel ->
  request = %Google.Pubsub.V1.GetTopicRequest{topic: "projects/my-project/topics/my-topic"}
  opts = PubsubGrpc.Auth.request_opts()
  Google.Pubsub.V1.Publisher.Stub.get_topic(channel, request, opts)
end

{:ok, topic} = PubsubGrpc.execute(operation)

get_schema(project_id, schema_id, opts \\ [])

Gets details of a specific schema.

Options

  • :view - :basic or :full (default: :full)

get_subscription(project_id, subscription_id)

@spec get_subscription(String.t(), String.t()) ::
  {:ok, Google.Pubsub.V1.Subscription.t()} | {:error, PubsubGrpc.Error.t()}

Gets details of a subscription.

Returns

  • {:ok, %Google.Pubsub.V1.Subscription{}} on success
  • {:error, %PubsubGrpc.Error{code: :not_found}} if subscription doesn't exist

get_topic(project_id, topic_id)

@spec get_topic(String.t(), String.t()) ::
  {:ok, Google.Pubsub.V1.Topic.t()} | {:error, PubsubGrpc.Error.t()}

Gets details of a topic.

Returns

  • {:ok, %Google.Pubsub.V1.Topic{}} on success
  • {:error, %PubsubGrpc.Error{code: :not_found}} if topic doesn't exist

list_schema_revisions(project_id, schema_id, opts \\ [])

Lists revisions of a schema.

list_schemas(project_id, opts \\ [])

Lists schemas in a project.

Options

  • :view - :basic or :full (default: :basic)
  • :page_size - Maximum number of schemas to return
  • :page_token - Token for pagination

list_subscriptions(project_id, opts \\ [])

@spec list_subscriptions(
  String.t(),
  keyword()
) ::
  {:ok, %{subscriptions: list(), next_page_token: String.t()}}
  | {:error, PubsubGrpc.Error.t()}

Lists subscriptions in a project.

Options

  • :page_size - Maximum number of subscriptions to return
  • :page_token - Token for pagination

Returns

  • {:ok, %{subscriptions: [subscription], next_page_token: token}}

list_topics(project_id, opts \\ [])

@spec list_topics(
  String.t(),
  keyword()
) ::
  {:ok, %{topics: list(), next_page_token: String.t()}}
  | {:error, PubsubGrpc.Error.t()}

Lists topics in a project.

Options

  • :page_size - Maximum number of topics to return
  • :page_token - Token for pagination

Returns

  • {:ok, %{topics: [topic], next_page_token: token}}

modify_ack_deadline(project_id, subscription_id, ack_ids, ack_deadline_seconds, opts \\ [])

@spec modify_ack_deadline(
  String.t(),
  String.t(),
  [String.t()],
  non_neg_integer(),
  keyword()
) :: :ok | {:error, PubsubGrpc.Error.t()}

Modifies the ack deadline for received messages.

Setting ack_deadline_seconds to 0 causes immediate redelivery (nack). Use nack/3 as a convenience for this.

Parameters

  • ack_ids - Non-empty list of acknowledgment IDs
  • ack_deadline_seconds - New deadline in seconds (0-600)

Returns

  • :ok on success

nack(project_id, subscription_id, ack_ids, opts \\ [])

@spec nack(String.t(), String.t(), [String.t()], keyword()) ::
  :ok | {:error, PubsubGrpc.Error.t()}

Negatively acknowledges messages, causing immediate redelivery.

This is a convenience wrapper around modify_ack_deadline/4 with a deadline of 0.

Parameters

  • ack_ids - Non-empty list of acknowledgment IDs

Returns

  • :ok on success

publish(project_id, topic_id, messages, opts \\ [])

@spec publish(String.t(), String.t(), [map()], keyword()) ::
  {:ok, %{message_ids: [String.t()]}} | {:error, PubsubGrpc.Error.t()}

Publishes messages to a topic.

Parameters

  • messages - Non-empty list of message maps, each with :data (binary) and/or :attributes (map)

Returns

  • {:ok, %{message_ids: [String.t()]}}

Examples

messages = [
  %{data: "Hello World", attributes: %{"source" => "app"}},
  %{data: "Another message"}
]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)

publish_message(project_id, topic_id, data, attributes \\ %{})

@spec publish_message(String.t(), String.t(), binary(), map()) ::
  {:ok, %{message_ids: [String.t()]}} | {:error, PubsubGrpc.Error.t()}

Convenience function to publish a single message.

pull(project_id, subscription_id, max_messages \\ 10, opts \\ [])

@spec pull(String.t(), String.t(), pos_integer(), keyword()) ::
  {:ok, list()} | {:error, PubsubGrpc.Error.t()}

Pulls messages from a subscription.

Parameters

  • max_messages - Maximum number of messages to pull (default: 10, must be positive)

Returns

  • {:ok, [%Google.Pubsub.V1.ReceivedMessage{}]}

validate_message(project_id, schema_name, message, encoding)

Validates a message against an existing schema.

Parameters

  • schema_name - Full schema resource name or just the schema ID
  • message - Message bytes to validate
  • encoding - :json or :binary

validate_message_with_schema(project_id, type, definition, message, encoding)

Validates a message against an inline schema definition.

Parameters

  • type - :protocol_buffer or :avro
  • definition - Schema definition string
  • message - Message bytes to validate
  • encoding - :json or :binary

validate_schema(project_id, type, definition)

Validates a schema definition.

with_connection(fun, opts \\ [])

@spec with_connection(
  (GRPC.Channel.t() -> term()),
  keyword()
) :: {:ok, term()} | {:error, PubsubGrpc.Error.t()}

Executes multiple operations using the same connection.

This is more efficient than issuing separate calls when performing several operations in sequence, because they all reuse the same connection.

Examples

result = PubsubGrpc.with_connection(fn channel ->
  auth_opts = PubsubGrpc.Auth.request_opts()
  # ... multiple operations on channel
end)