gRPC Firestore `Listen`
- 7 minutes read - 1443 wordsA question on Stack overflow Firestore gRPC Listen does not send deletions piqued by interest but created a second problem for me: I’m unable to get its Listen
method to work using gRPCurl.
For what follows, you will need:
- a Google Project with Firestore enabled and a database (default:
(default)
) containing a Collection (e.g.Dogs
) with at least one Document (e.g.freddie
) googleapis
cloned locally in order to access Protobufs
Summary
client | server | R | See |
---|---|---|---|
gRPCurl |
firestore.googleapis.com:443 |
✅ | link |
Go client | firestore.googleapis.com:443 |
✅ | link |
gRPCurl |
Go server | ✅ | link |
Go client | Go server | ✅ | link |
I subsequently tried this using Postman and it also works (✅).
gRPCurl Listen
against Google endpoint
PROJECT="..."
GOOGLEAPIS="/path/to/googleapis"
PACKAGE_PATH="google/firestore/v1"
PACKAGE_NAME="google.firestore.v1"
SERVICE="Firestore"
METHOD="Listen"
DATABASE="projects/${PROJECT}/databases/(default)"
PARENT="${DATABASE}/documents"
COLLECTION_ID="Dogs"
TOKEN=$(gcloud auth print-access-token)
ENDPOINT="firestore.googleapis.com:443"
DATA="{
\"database\":\"${DATABASE}\",
\"add_target\":{
\"query\":{
\"parent\":\"${PARENT}\",
\"structured_query\":{
\"from\":[
{
\"collection_id\":\"${COLLECTION_ID}\"
}
]
}
}
}
}"
# Expects JSON request data (stream) on stdin
# Unable to get this to work with a heredoc
# Using copy-paste instead
grpcurl \
-H "Authorization: Bearer ${TOKEN}" \
-H "google-cloud-resource-prefix: ${DATABASE}" \
--import-path ${GOOGLEAPIS} \
--proto ${GOOGLEAPIS}/${PACKAGE_PATH}/firestore.proto \
-d @ \
${ENDPOINT} \
${PACKAGE_NAME}.${SERVICE}.${METHOD}
And then pasting the value of ${DATA}
into stdin yields:
{
"documentChange": {
"document": {
"name": "projects/{project}/databases/(default)/documents/Dogs/bo",
"fields": {
"Camp": {
"booleanValue": true
},
"Name`": {
"stringValue": "Bo"
}
},
"createTime": "2025-02-10T19:35:51.131456Z",
"updateTime": "2025-02-10T19:41:42.082855Z"
},
"targetIds": [
1
]
}
}
{
"documentChange": {
"document": {
"name": "projects/{project}/databases/(default)/documents/Dogs/freddie",
"fields": {
"Age": {
"integerValue": "6"
},
"Camp": {
"booleanValue": true
},
"Name": {
"stringValue": "Frederik Jack"
}
},
"createTime": "2025-02-08T00:38:50.523731Z",
"updateTime": "2025-02-10T22:08:42.706849Z"
},
"targetIds": [
1
]
}
}
For what follows, ${DATA}
is ({PROJECT}
redacts Project ID):
{
"database": "projects/{project}/databases/(default)",
"add_target": {
"query": {
"parent": "projects/{project}/databases/(default)/documents",
"structured_query": {
"from": [
{
"collection_id": "Dogs"
}
]
}
}
}
}
Go client Listen
against Google endpoint
I wrote a Golang client using Google’s published Firestore subs (firestorepb
v1.18.0).
go.mod
:
module ...
go 1.23.4
require (
cloud.google.com/go/firestore v1.18.0
google.golang.org/api v0.214.0
google.golang.org/grpc v1.67.3
)
main.go
:
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"os"
"sync"
"cloud.google.com/go/firestore/apiv1/firestorepb"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/encoding/protojson"
)
func main() {
endpoint := flag.String("endpoint", "firestore.googleapis.com:443", "gRPC service endpoint (host:port)")
flag.Parse()
project := os.Getenv("PROJECT")
if project == "" {
log.Fatal("Unable to get PROJECT from the environment")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
scopes := []string{
"https://www.googleapis.com/auth/cloud-platform",
}
creds, err := google.FindDefaultCredentials(ctx, scopes...)
if err != nil {
log.Fatalf("error obtaining credentials: %v", err)
}
// Debugging
// tok, err := creds.TokenSource.Token()
// if err != nil {
// panic(err)
// }
// log.Println(tok.AccessToken)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}),
}
conn, err := grpc.NewClient(*endpoint, opts...)
if err != nil {
log.Fatalf("error creating client: %v", err)
}
defer conn.Close()
client := firestorepb.NewFirestoreClient(conn)
database := fmt.Sprintf("projects/%s/databases/(default)", project)
parent := fmt.Sprintf("%s/documents", database)
// Metadata required by the service
// If not present, the command errors requesting this value be set
md := metadata.New(map[string]string{
"google-cloud-resource-prefix": database,
})
ctx = metadata.NewOutgoingContext(ctx, md)
listener, err := client.Listen(ctx)
if err != nil {
log.Fatalf("error creating listener: %v", err)
}
defer listener.CloseSend()
rqst := &firestorepb.ListenRequest{
Database: database,
TargetChange: &firestorepb.ListenRequest_AddTarget{
AddTarget: &firestorepb.Target{
TargetType: &firestorepb.Target_Query{
Query: &firestorepb.Target_QueryTarget{
Parent: parent,
QueryType: &firestorepb.Target_QueryTarget_StructuredQuery{
StructuredQuery: &firestorepb.StructuredQuery{
From: []*firestorepb.StructuredQuery_CollectionSelector{
{
CollectionId: "Dogs",
},
},
},
},
},
},
},
},
}
// Validate that the gRPCurl 'DATA' variable matches
b, err := protojson.MarshalOptions{
UseProtoNames: true,
}.Marshal(rqst)
if err == nil {
log.Println(string(b))
}
// Send the request
if err := listener.Send(rqst); err != nil {
log.Fatalf("error sending request: %v", err)
}
// Listen for responses
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
resp, err := listener.Recv()
if err == io.EOF {
log.Println("Stream closed by server")
break
}
if err != nil {
log.Fatalf("error receiving response: %v", err)
}
log.Println(resp)
}
}()
wg.Wait()
}
Yields:
2025/02/10 14:31:25 {"database":"projects/{project}/databases/(default)","add_target":{"query":{"parent":"projects/{project}/databases/(default)/documents","structured_query":{"from":[{"collection_id":"Dogs"}]}}}}
2025/02/10 14:31:26 target_change:{target_change_type:ADD target_ids:1}
2025/02/10 14:31:26 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/bo" fields:{key:"Camp" value:{boolean_value:true}} fields:{key:"Name`" value:{string_value:"Bo"}} create_time:{seconds:1739216151 nanos:131456000} update_time:{seconds:1739216502 nanos:82855000}} target_ids:1}
2025/02/10 14:31:26 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/freddie" fields:{key:"Age" value:{integer_value:6}} fields:{key:"Camp" value:{boolean_value:true}} fields:{key:"Name" value:{string_value:"Frederik Jack"}} fields:{key:"Nickname" value:{string_value:"Boober"}} create_time:{seconds:1738975130 nanos:523731000} update_time:{seconds:1739225322 nanos:706849000}} target_ids:1}
2025/02/10 14:31:26 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/kaido" fields:{key:"Age" value:{integer_value:8}} fields:{key:"Name" value:{string_value:"Kaido"}} create_time:{seconds:1738984268 nanos:543266000} update_time:{seconds:1738984268 nanos:543266000}} target_ids:1}
2025/02/10 14:31:26 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/louie" fields:{key:"Age" value:{integer_value:6}} fields:{key:"Camp" value:{boolean_value:false}} fields:{key:"Name" value:{string_value:"Louie"}} create_time:{seconds:1738984352 nanos:3340000} update_time:{seconds:1739037801 nanos:467026000}} target_ids:1}
2025/02/10 14:31:26 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/rocky" fields:{key:"Camp" value:{boolean_value:true}} fields:{key:"Name" value:{string_value:"Rocky"}} create_time:{seconds:1739216409 nanos:763704000} update_time:{seconds:1739216409 nanos:763704000}} target_ids:1}
2025/02/10 14:31:26 target_change:{target_change_type:CURRENT target_ids:1 resume_token:"\n\t\x08\xe8ι\xa6\x94\xba\x8b\x03" read_time:{seconds:1739226686 nanos:56296000}}
2025/02/10 14:31:26 target_change:{resume_token:"\n\t\x08\xe8ι\xa6\x94\xba\x8b\x03" read_time:{seconds:1739226686 nanos:56296000}}
2025/02/10 14:31:26 filter:{target_id:1 count:5 unchanged_names:{bits:{bitmap:"\\\xf7\xa57$i\xe33#\xdau\x00\x19" padding:3} hash_count:14}}
2025/02/10 14:31:26 target_change:{resume_token:"\n\t\x08\xe4⽦\x94\xba\x8b\x03" read_time:{seconds:1739226686 nanos:124388000}}
I confirmed that the JSON produced by protojson.Marshal
of rqst
matches that used in the gPRPCurl above.
I generated an access token from the code and used this in the gRPCurl example.
gRPCurl Listen
against Go server
I implemented the Listen
method in a Go server:
main.go
:
package main
import (
"log"
"net"
"cloud.google.com/go/firestore/apiv1/firestorepb"
"google.golang.org/grpc"
)
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("[main] failed to listen: %v", err)
}
s := grpc.NewServer()
firestorepb.RegisterFirestoreServer(s, &Server{})
log.Printf("[main] server listening: %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
server.go
:
package main
import (
"fmt"
"log"
"sync"
"time"
"cloud.google.com/go/firestore/apiv1/firestorepb"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Server only implements `Listen` method in the interface
// var _ firestorepb.FirestoreServer = Server{}
type Server struct {
firestorepb.UnimplementedFirestoreServer
}
// Listen implements firestorepb.FirestoreServer
func (s Server) Listen(stream firestorepb.Firestore_ListenServer) error {
log.Println("[Listen] entered")
md, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
return fmt.Errorf("[Listen] metadata not found")
}
for k, v := range md {
fmt.Printf("[Listen] Key: %s, Values: %v\n", k, v)
}
// Recv method
rqst, err := stream.Recv()
if err != nil {
return fmt.Errorf("[Listen] Recv error: %v", err)
}
// Confirm that this matches that gRPCurl produces
b, err := protojson.MarshalOptions{
UseProtoNames: true,
}.Marshal(rqst)
if err != nil {
panic(err)
}
log.Printf("[Listen] rqst: %s", b)
// Send method
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
targetIDs := []int32{0}
for {
now := time.Now()
ts := timestamppb.New(now)
name := fmt.Sprintf("d-%d", now.Unix())
resp := &firestorepb.ListenResponse{
ResponseType: &firestorepb.ListenResponse_DocumentChange{
DocumentChange: &firestorepb.DocumentChange{
Document: &firestorepb.Document{
Name: name,
CreateTime: ts,
UpdateTime: ts,
},
TargetIds: targetIDs,
},
},
}
if err := stream.Send(resp); err != nil {
log.Printf("[Listen] Send error: %v", err)
}
time.Sleep(5 * time.Second)
}
}()
wg.Wait()
return nil
}
Tailscale is excellent and it provides a mechanism to expose a local TCP endpoint via TLS.
If you don’t use Tailscale, you’ll want to find a solution to this.
Running the above server on localhost:50051
and then:
tailscale funnel --https=443 localhost:50051
I can now gRPCurl after changing ENDPOINT={host}.{tailnet}:443
Not entirely suprisingly, this works:
Resolved method descriptor:
// Listens to changes. This method is only available via gRPC or WebChannel
// (not REST).
rpc Listen ( stream .google.firestore.v1.ListenRequest ) returns ( stream .google.firestore.v1.ListenResponse ) {
option (.google.api.http) = {
post: "/v1/{database=projects/*/databases/*}/documents:listen",
body: "*"
};
}
Request metadata to send:
authorization: Bearer [REDACTED]
google-cloud-resource-prefix: projects/{project}/databases/(default)
Response headers received:
content-type: application/grpc
date: Mon, 10 Feb 2025 22:43:24 GMT
Estimated response size: 49 bytes
Response contents:
{
"documentChange": {
"document": {
"name": "d-1739227404",
"createTime": "2025-02-10T22:43:24.755594883Z",
"updateTime": "2025-02-10T22:43:24.755594883Z"
},
"targetIds": [
0
]
}
}
Estimated response size: 49 bytes
Response contents:
{
"documentChange": {
"document": {
"name": "d-1739227409",
"createTime": "2025-02-10T22:43:29.758840604Z",
"updateTime": "2025-02-10T22:43:29.758840604Z"
},
"targetIds": [
0
]
}
}
Estimated response size: 49 bytes
Response contents:
{
"documentChange": {
"document": {
"name": "d-1739227414",
"createTime": "2025-02-10T22:43:34.758936603Z",
"updateTime": "2025-02-10T22:43:34.758936603Z"
},
"targetIds": [
0
]
}
}
And, I get a bunch of metadata from a known-working client:
[Listen] Key: :authority, Values: [{host}.{tailnet}:443]
[Listen] Key: accept-encoding, Values: [gzip]
[Listen] Key: authorization, Values: [Bearer {REDACTED}]
[Listen] Key: content-type, Values: [application/grpc]
[Listen] Key: google-cloud-resource-prefix, Values: [projects/{project}/databases/(default)]
[Listen] Key: grpc-accept-encoding, Values: [gzip]
[Listen] Key: user-agent, Values: [grpcurl/dev-build (no version set) grpc-go/1.61.0]
[Listen] Key: x-forwarded-for, Values: [100.65.15.44]
[Listen] Key: x-forwarded-host, Values: [{host}.{tailnet}:443]
[Listen] Key: x-forwarded-proto, Values: [https]
NOTE The metadata includes
grpc-access-encoding: gzip
which isn’t present using the Go client.
Leaving a final test
Go client Listen
against Go server
go run ./cmd/client --endpoint="{host}.{tailnet}:443"
Yields:
2025/02/10 14:33:22 {"database":"projects/{project}/databases/(default)","add_target":{"query":{"parent":"projects/{project}/databases/(default)/documents","structured_query":{"from":[{"collection_id":"Dogs"}]}}}}
2025/02/10 14:33:22 target_change:{target_change_type:ADD target_ids:1}
2025/02/10 14:33:22 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/bo" fields:{key:"Camp" value:{boolean_value:true}} fields:{key:"Name`" value:{string_value:"Bo"}} create_time:{seconds:1739216151 nanos:131456000} update_time:{seconds:1739216502 nanos:82855000}} target_ids:1}
2025/02/10 14:33:22 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/freddie" fields:{key:"Age" value:{integer_value:6}} fields:{key:"Camp" value:{boolean_value:true}} fields:{key:"Name" value:{string_value:"Frederik Jack"}} fields:{key:"Nickname" value:{string_value:"Boober"}} create_time:{seconds:1738975130 nanos:523731000} update_time:{seconds:1739225322 nanos:706849000}} target_ids:1}
2025/02/10 14:33:22 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/kaido" fields:{key:"Age" value:{integer_value:8}} fields:{key:"Name" value:{string_value:"Kaido"}} create_time:{seconds:1738984268 nanos:543266000} update_time:{seconds:1738984268 nanos:543266000}} target_ids:1}
2025/02/10 14:33:22 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/louie" fields:{key:"Age" value:{integer_value:6}} fields:{key:"Camp" value:{boolean_value:false}} fields:{key:"Name" value:{string_value:"Louie"}} create_time:{seconds:1738984352 nanos:3340000} update_time:{seconds:1739037801 nanos:467026000}} target_ids:1}
2025/02/10 14:33:22 document_change:{document:{name:"projects/{project}/databases/(default)/documents/Dogs/rocky" fields:{key:"Camp" value:{boolean_value:true}} fields:{key:"Name" value:{string_value:"Rocky"}} create_time:{seconds:1739216409 nanos:763704000} update_time:{seconds:1739216409 nanos:763704000}} target_ids:1}
2025/02/10 14:33:22 target_change:{target_change_type:CURRENT target_ids:1 resume_token:"\n\t\x08\xc3\xe1\xe7ݔ\xba\x8b\x03" read_time:{seconds:1739226802 nanos:155715000}}
2025/02/10 14:33:22 target_change:{resume_token:"\n\t\x08\xc3\xe1\xe7ݔ\xba\x8b\x03" read_time:{seconds:1739226802 nanos:155715000}}
2025/02/10 14:33:22 filter:{target_id:1 count:5 unchanged_names:{bits:{bitmap:"\\\xf7\xa57$i\xe33#\xdau\x00\x19" padding:3} hash_count:14}}
2025/02/10 14:33:22 target_change:{resume_token:"\n\t\x08͢\xeaݔ\xba\x8b\x03" read_time:{seconds:1739226802 nanos:196813000}}
And, the metadata:
[Listen] Key: :authority, Values: [{host}.{tailnet}:443]
[Listen] Key: accept-encoding, Values: [gzip]
[Listen] Key: authorization, Values: [Bearer {REDACTED}]
[Listen] Key: content-type, Values: [application/grpc]
[Listen] Key: google-cloud-resource-prefix, Values: [projects/{project}/databases/(default)]
[Listen] Key: user-agent, Values: [grpc-go/1.67.3]
[Listen] Key: x-forwarded-for, Values: [100.65.15.44]
[Listen] Key: x-forwarded-host, Values: [{host}.{tailnet}:443]
[Listen] Key: x-forwarded-proto, Values: [https]