PubsubGrpc (PubsubGrpc v0.4.2)
View SourceGoogle Cloud Pub/Sub gRPC client with connection pooling.
Provides a high-level API for Pub/Sub operations including topic and subscription management, message publishing, pulling, acknowledgment, and schema management.
Configuration
Production (Google Cloud)
# Option 1: Goth Library (Recommended)
config :pubsub_grpc, :goth, MyApp.Goth
# Option 2: gcloud CLI (auto-detected)
# Option 3: GOOGLE_APPLICATION_CREDENTIALS env var
# Option 4: GCE/GKE metadata (automatic)Development/Test (Local Emulator)
config :pubsub_grpc, :emulator,
project_id: "my-project-id",
host: "localhost",
port: 8085Timeouts
# Global default (30s by default)
config :pubsub_grpc, :default_timeout, 30_000Error Handling
All functions return {:ok, result} or {:error, %PubsubGrpc.Error{}}.
Pattern match on the error code for specific handling:
case PubsubGrpc.create_topic("my-project", "my-topic") do
{:ok, topic} -> topic
{:error, %PubsubGrpc.Error{code: :already_exists}} -> "already exists"
{:error, %PubsubGrpc.Error{code: :unauthenticated}} -> "auth failed"
{:error, %PubsubGrpc.Error{} = err} -> "Error: #{err}"
endExamples
# Create a topic
{:ok, topic} = PubsubGrpc.create_topic("my-project", "my-topic")
# Publish messages
messages = [%{data: "Hello", attributes: %{"source" => "app"}}]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)
# Create subscription and pull
{:ok, sub} = PubsubGrpc.create_subscription("my-project", "my-topic", "my-sub")
{:ok, messages} = PubsubGrpc.pull("my-project", "my-sub", 10)
# Acknowledge
ack_ids = Enum.map(messages, & &1.ack_id)
:ok = PubsubGrpc.acknowledge("my-project", "my-sub", ack_ids)
Summary
Functions
Acknowledges received messages.
Creates a new schema.
Creates a subscription to a topic.
Creates a new Pub/Sub topic.
Deletes a schema.
Deletes a subscription.
Deletes a Pub/Sub topic.
Executes a custom gRPC operation using a connection from the pool.
Gets details of a specific schema.
Gets details of a subscription.
Gets details of a topic.
Lists revisions of a schema.
Lists schemas in a project.
Lists subscriptions in a project.
Lists topics in a project.
Modifies the ack deadline for received messages.
Negatively acknowledges messages, causing immediate redelivery.
Publishes messages to a topic.
Convenience function to publish a single message.
Pulls messages from a subscription.
Validates a message against an existing schema.
Validates a message against an inline schema definition.
Validates a schema definition.
Executes multiple operations using the same connection.
Functions
@spec acknowledge(String.t(), String.t(), [String.t()], keyword()) :: :ok | {:error, PubsubGrpc.Error.t()}
Acknowledges received messages.
Parameters
ack_ids- Non-empty list of acknowledgment IDs from received messages
Returns
:okon success
Creates a new schema.
Parameters
type-:protocol_bufferor:avrodefinition- The schema definition string
@spec create_subscription(String.t(), String.t(), String.t(), keyword()) :: {:ok, Google.Pubsub.V1.Subscription.t()} | {:error, PubsubGrpc.Error.t()}
Creates a subscription to a topic.
Options
:ack_deadline_seconds- Ack deadline in seconds (10-600, default: 60)
@spec create_topic(String.t(), String.t()) :: {:ok, Google.Pubsub.V1.Topic.t()} | {:error, PubsubGrpc.Error.t()}
Creates a new Pub/Sub topic.
Parameters
project_id- Google Cloud project IDtopic_id- Topic identifier (without full path)
Returns
{:ok, %Google.Pubsub.V1.Topic{}}on success{:error, %PubsubGrpc.Error{code: :already_exists}}if topic exists{:error, %PubsubGrpc.Error{}}on other errors
Deletes a schema.
@spec delete_subscription(String.t(), String.t()) :: :ok | {:error, PubsubGrpc.Error.t()}
Deletes a subscription.
Returns
:okon success
@spec delete_topic(String.t(), String.t()) :: :ok | {:error, PubsubGrpc.Error.t()}
Deletes a Pub/Sub topic.
Returns
:okon success{:error, %PubsubGrpc.Error{}}on error
@spec execute( (GRPC.Channel.t() -> term()), keyword() ) :: {:ok, term()} | {:error, PubsubGrpc.Error.t()}
Executes a custom gRPC operation using a connection from the pool.
The function receives a gRPC channel and should return the result of a gRPC stub call.
Examples
operation = fn channel ->
request = %Google.Pubsub.V1.GetTopicRequest{topic: "projects/my-project/topics/my-topic"}
opts = PubsubGrpc.Auth.request_opts()
Google.Pubsub.V1.Publisher.Stub.get_topic(channel, request, opts)
end
{:ok, topic} = PubsubGrpc.execute(operation)
Gets details of a specific schema.
Options
:view-:basicor:full(default::full)
@spec get_subscription(String.t(), String.t()) :: {:ok, Google.Pubsub.V1.Subscription.t()} | {:error, PubsubGrpc.Error.t()}
Gets details of a subscription.
Returns
{:ok, %Google.Pubsub.V1.Subscription{}}on success{:error, %PubsubGrpc.Error{code: :not_found}}if subscription doesn't exist
@spec get_topic(String.t(), String.t()) :: {:ok, Google.Pubsub.V1.Topic.t()} | {:error, PubsubGrpc.Error.t()}
Gets details of a topic.
Returns
{:ok, %Google.Pubsub.V1.Topic{}}on success{:error, %PubsubGrpc.Error{code: :not_found}}if topic doesn't exist
Lists revisions of a schema.
Lists schemas in a project.
Options
:view-:basicor:full(default::basic):page_size- Maximum number of schemas to return:page_token- Token for pagination
@spec list_subscriptions( String.t(), keyword() ) :: {:ok, %{subscriptions: list(), next_page_token: String.t()}} | {:error, PubsubGrpc.Error.t()}
Lists subscriptions in a project.
Options
:page_size- Maximum number of subscriptions to return:page_token- Token for pagination
Returns
{:ok, %{subscriptions: [subscription], next_page_token: token}}
@spec list_topics( String.t(), keyword() ) :: {:ok, %{topics: list(), next_page_token: String.t()}} | {:error, PubsubGrpc.Error.t()}
Lists topics in a project.
Options
:page_size- Maximum number of topics to return:page_token- Token for pagination
Returns
{:ok, %{topics: [topic], next_page_token: token}}
@spec modify_ack_deadline( String.t(), String.t(), [String.t()], non_neg_integer(), keyword() ) :: :ok | {:error, PubsubGrpc.Error.t()}
Modifies the ack deadline for received messages.
Setting ack_deadline_seconds to 0 causes immediate redelivery (nack).
Use nack/3 as a convenience for this.
Parameters
ack_ids- Non-empty list of acknowledgment IDsack_deadline_seconds- New deadline in seconds (0-600)
Returns
:okon success
@spec nack(String.t(), String.t(), [String.t()], keyword()) :: :ok | {:error, PubsubGrpc.Error.t()}
Negatively acknowledges messages, causing immediate redelivery.
This is a convenience wrapper around modify_ack_deadline/4 with a deadline of 0.
Parameters
ack_ids- Non-empty list of acknowledgment IDs
Returns
:okon success
@spec publish(String.t(), String.t(), [map()], keyword()) :: {:ok, %{message_ids: [String.t()]}} | {:error, PubsubGrpc.Error.t()}
Publishes messages to a topic.
Parameters
messages- Non-empty list of message maps, each with:data(binary) and/or:attributes(map)
Returns
{:ok, %{message_ids: [String.t()]}}
Examples
messages = [
%{data: "Hello World", attributes: %{"source" => "app"}},
%{data: "Another message"}
]
{:ok, response} = PubsubGrpc.publish("my-project", "my-topic", messages)
@spec publish_message(String.t(), String.t(), binary(), map()) :: {:ok, %{message_ids: [String.t()]}} | {:error, PubsubGrpc.Error.t()}
Convenience function to publish a single message.
@spec pull(String.t(), String.t(), pos_integer(), keyword()) :: {:ok, list()} | {:error, PubsubGrpc.Error.t()}
Pulls messages from a subscription.
Parameters
max_messages- Maximum number of messages to pull (default: 10, must be positive)
Returns
{:ok, [%Google.Pubsub.V1.ReceivedMessage{}]}
Validates a message against an existing schema.
Parameters
schema_name- Full schema resource name or just the schema IDmessage- Message bytes to validateencoding-:jsonor:binary
Validates a message against an inline schema definition.
Parameters
type-:protocol_bufferor:avrodefinition- Schema definition stringmessage- Message bytes to validateencoding-:jsonor:binary
Validates a schema definition.
@spec with_connection( (GRPC.Channel.t() -> term()), keyword() ) :: {:ok, term()} | {:error, PubsubGrpc.Error.t()}
Executes multiple operations using the same connection.
More efficient when performing several operations in sequence.
Examples
result = PubsubGrpc.with_connection(fn channel ->
auth_opts = PubsubGrpc.Auth.request_opts()
# ... multiple operations on channel
end)