I previously wrote an article on how to start using gRPC in a devcontainer.
In this article, we’ll go further and look at the other types of gRPC functions.
You can clone my GitHub repository and try it yourself.
Imported libraries
These are the libraries used in the following code. The import section is omitted from each example below.
Server
package server
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log"
"math/rand"
"os"
"path/filepath"
"time"
rpc "apitest/internal/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
Bit64 = 64
DEC = 10
resourcePath = "./internal/server/resources/"
)
Client
package client
import (
"bufio"
"context"
"errors"
"io"
"log"
"math/rand"
"os"
"path/filepath"
"strings"
"time"
rpc "apitest/internal/proto"
"apitest/internal/common"
"google.golang.org/grpc"
)
const (
resourcePath = "./internal/client/resources/"
)
Proto file
syntax = "proto3";
import "google/protobuf/timestamp.proto";
option go_package = "api-test/grpc/apitest";
Unary RPC
We’ll add a SayHello function that takes a parameter.
Definition in a proto file
Let’s add SayHello to the Middle service. The request and response messages each have only one property.
service Middle {
// Unary RPC
rpc Ping(PingRequest) returns (PingResponse) {}
// Unary RPC
rpc SayHello(HelloRequest) returns (HelloResponse) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloResponse {
string message = 1;
}
Server implementation
All we do is generate a message with the specified name. We always have to create a new response instance using the generated struct HelloResponse.
type GrpcCallHandler struct {
rpc.UnimplementedMiddleServer
}
func (s *GrpcCallHandler) SayHello(ctx context.Context, req *rpc.HelloRequest) (*rpc.HelloResponse, error) {
response := rpc.HelloResponse{
Message: fmt.Sprintf("Hello %s", req.GetName()),
}
return &response, nil
}
Return a non-nil error instead if the server needs to report a failure.
Client implementation
The client side always has to pass a context to call an RPC function. I recommend setting a reasonable timeout for the call; otherwise, in some environments, the gRPC library might not report a lost connection until 15 minutes later.
type MiddleMan struct {
conn *grpc.ClientConn
}
func NewMiddleMan(conn *grpc.ClientConn) *MiddleMan {
return &MiddleMan{
conn: conn,
}
}
func (m *MiddleMan) Greet(ctx context.Context, name string) {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
client := rpc.NewMiddleClient(m.conn)
res, err := client.SayHello(timeoutCtx, &rpc.HelloRequest{Name: name})
if err != nil {
log.Printf("[ERROR] could not greet: %v\n", err)
return
}
log.Printf("Greeting: %s\n", res.GetMessage())
}
A unary function is a blocking call that can be used in the same way as a normal function call. The only difference is that the return value is always wrapped in a struct. Remember that the request/response parameters are defined with the message keyword in a proto file.
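Because the call blocks until a response arrives or the context ends, the effect of the timeout can be tried without a server. The following is a minimal sketch using only the standard library; `slowCall` and `callWithTimeout` are hypothetical stand-ins for an RPC and its wrapper, not part of gRPC.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall stands in for a blocking unary RPC (hypothetical, not a gRPC API).
// It returns when the work finishes or when ctx is done, whichever comes first.
func slowCall(ctx context.Context, work time.Duration) (string, error) {
	select {
	case <-time.After(work):
		return "done", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// callWithTimeout wraps slowCall with a deadline, in the same way Greet
// wraps SayHello with context.WithTimeout.
func callWithTimeout(work, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return slowCall(ctx, work)
}

func main() {
	// The work finishes well before the deadline.
	res, err := callWithTimeout(10*time.Millisecond, time.Second)
	fmt.Println(res, err)

	// The deadline expires first, so the call gives up early.
	_, err = callWithTimeout(time.Second, 10*time.Millisecond)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```

With a real gRPC call, the timeout is reported as a status error with the code DeadlineExceeded rather than as context.DeadlineExceeded, so the check on the client would differ from this sketch.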
Since the response could be nil, it's better to use the GetXxxx function instead of accessing the Message property directly.
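To see why the getter is safer, here is a small sketch with a hand-written type. `helloResponse` is a hypothetical stand-in that mirrors the shape of a generated getter; it is not the generated code itself.

```go
package main

import "fmt"

// helloResponse is a hand-written stand-in for the generated rpc.HelloResponse.
type helloResponse struct {
	Message string
}

// GetMessage mirrors the shape of a generated getter: it checks the receiver
// for nil before reading the field.
func (r *helloResponse) GetMessage() string {
	if r == nil {
		return ""
	}
	return r.Message
}

func main() {
	var res *helloResponse // e.g. the call failed and the response is nil

	fmt.Printf("%q\n", res.GetMessage()) // safe on a nil pointer: prints ""
	// fmt.Println(res.Message)          // would panic with a nil pointer dereference

	res = &helloResponse{Message: "Hello Yuto"}
	fmt.Println(res.GetMessage()) // prints Hello Yuto
}
```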
The Greet function is called in the main function here.
func main() {
grpcConn, err := grpc.Dial(
serverHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer grpcConn.Close()
middleMan := client.NewMiddleMan(grpcConn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
middleMan.Greet(ctx, "Yuto")
}
Result
Start the server first.
$ make runServer
2023/05/24 05:01:44 start gRPC server
Then start the client in a different terminal.
$ make runClient
2023/05/24 05:02:18 Greeting: Hello Yuto
The expected message is returned.
Server streaming RPC to Download a file
Server streaming RPC can be used if the server needs to send data continuously to a client but the client doesn’t need to send anything back after the initial request. Downloading a file is a good example.
Definition in a proto file
The stream keyword needs to be added to the return value for a server streaming RPC. Everything else is the same as for a unary RPC.
service Middle {
// Unary RPC
rpc Ping(PingRequest) returns (PingResponse) {}
// Unary RPC
rpc SayHello(HelloRequest) returns (HelloResponse) {}
// Server Streaming RPC
rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
}
message DownloadRequest {
string filename = 1;
}
message DownloadResponse {
string line = 1;
}
To restrict the download path, a client sends only a filename. The server sends the file line by line, so each response is a single-line string. The client needs to concatenate the lines to rebuild the complete file content.
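Sending only a filename does not by itself stop a value such as `../../go.mod` from resolving outside the resource folder. The following is a minimal sketch of one possible check, using only the standard library; `safeJoin` is a hypothetical helper and is not part of the repository.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

// safeJoin joins name onto baseDir and rejects results that leave baseDir.
func safeJoin(baseDir, name string) (string, error) {
	if name == "" {
		return "", errors.New("filename must be specified")
	}
	joined := filepath.Join(baseDir, name) // Join also cleans "a/../b" sequences
	rel, err := filepath.Rel(baseDir, joined)
	if err != nil {
		return "", err
	}
	// A relative path that starts with ".." points outside baseDir.
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("%q escapes %q", name, baseDir)
	}
	return joined, nil
}

func main() {
	base := "./internal/server/resources/"
	for _, name := range []string{"test_file.txt", "../../go.mod"} {
		p, err := safeJoin(base, name)
		fmt.Printf("%q -> %q, err=%v\n", name, p, err)
	}
}
```

A server handler could call such a helper before opening the file and return an error when the check fails.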
Server implementation
To download a file from a fixed folder, an absolute path needs to be generated first. Once the file is opened, the server sends the content line by line.
const (
resourcePath = "./internal/server/resources/"
)
func (s *GrpcCallHandler) Download(
req *rpc.DownloadRequest,
stream rpc.Middle_DownloadServer,
) error {
absPath, err := filepath.Abs(filepath.Join(resourcePath, req.Filename))
if err != nil {
return fmt.Errorf("failed to get absolute path: %w", err)
}
file, err := os.Open(absPath)
if err != nil {
log.Println(absPath)
return fmt.Errorf("failed to open the file: %w", err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
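for scanner.Scan() {
// stop streaming if the send fails, e.g. because the client is gone
if err := stream.Send(&rpc.DownloadResponse{Line: scanner.Text()}); err != nil {
return fmt.Errorf("failed to send a line: %w", err)
}
<-time.After(time.Second)
}
// report a read error that ended the scan early
if err := scanner.Err(); err != nil {
return fmt.Errorf("failed to read the file: %w", err)
}
return nil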
}
As you can see above, an error can be returned in the usual way. The client receives it as an rpc error with the code Unknown.
Client implementation
The implementation up to the function call is the same as for the unary RPC function.
// Download(ctx context.Context, in *DownloadRequest, opts ...grpc.CallOption) (Middle_DownloadClient, error)
func (m *MiddleMan) Download(ctx context.Context, filename string) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
client := rpc.NewMiddleClient(m.conn)
// log.Printf("start receiving a file [%s]", name)
stream, err := client.Download(timeoutCtx, &rpc.DownloadRequest{Filename: filename})
if err != nil {
log.Printf("[ERROR] failed to create a stream for Download: %v\n", err)
return
}
lines := []string{}
for {
res, err := stream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
log.Printf("[ERROR] failed to receive data for [%s]: %v\n", filename, err)
break
}
log.Println(res.GetLine())
lines = append(lines, res.GetLine())
}
log.Printf("File content is as follows\n%s\n", strings.Join(lines, "\n"))
}
The return value of the function call is a stream instead of a response. The Recv() function needs to be called to get the next response. When the server side finishes the process, io.EOF is returned as the error.
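The receive loop can be tried without a server by replacing the stream with a small fake. This is only a sketch: `lineStream` and `fakeStream` are hypothetical stand-ins for the generated client stream, reduced to a `Recv()` that returns a line.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// lineStream is a hypothetical stand-in for the generated client stream.
type lineStream interface {
	Recv() (string, error)
}

// fakeStream yields its lines one by one and then io.EOF.
type fakeStream struct {
	lines []string
	next  int
}

func (f *fakeStream) Recv() (string, error) {
	if f.next >= len(f.lines) {
		return "", io.EOF
	}
	line := f.lines[f.next]
	f.next++
	return line, nil
}

// collect drains the stream until io.EOF and joins the received lines.
func collect(s lineStream) (string, error) {
	var lines []string
	for {
		line, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return strings.Join(lines, "\n"), nil // normal end of stream
		}
		if err != nil {
			return "", err // any other error aborts the download
		}
		lines = append(lines, line)
	}
}

func main() {
	content, err := collect(&fakeStream{lines: []string{"aaaaaa", "bbbbbb", "cccccc"}})
	fmt.Println(content, err)
}
```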
Let’s call the function twice. One of the filenames doesn’t exist on the server.
func main() {
grpcConn, err := grpc.Dial(
serverHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer grpcConn.Close()
middleMan := client.NewMiddleMan(grpcConn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go middleMan.Download(ctx, "test_file.txt")
middleMan.Download(ctx, "not_exist.txt")
// exit by ctrl + c
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("exit")
}
Result
Server
$ make runServer
2023/06/03 06:44:00 start gRPC server
2023/06/03 06:44:05 /workspaces/api-test/languages/go/internal/server/resources/not_exist.txt
Client
$ make runClient
2023/06/03 06:44:05 aaaaaa
2023/06/03 06:44:05 [ERROR] failed to receive data for [not_exist.txt]: rpc error: code = Unknown desc = failed to open the file: open /workspaces/api-test/languages/go/internal/server/resources/not_exist.txt: no such file or directory
2023/06/03 06:44:05 File content is as follows
2023/06/03 06:44:06 bbbbbb
2023/06/03 06:44:07 cccccc
2023/06/03 06:44:08 File content is as follows
aaaaaa
bbbbbb
cccccc
The error is returned for the non-existent file. On the other hand, a single-line string is returned every second for the existing file.
Client streaming RPC to Upload a file
Client streaming RPC is the opposite of server streaming RPC, and uploading a file to a server is a good example. The client continuously sends data to the server, which replies only once, after the client has finished sending.
Definition in a proto file
Add the stream keyword to the request parameter.
service Middle {
// Unary RPC
rpc Ping(PingRequest) returns (PingResponse) {}
// Unary RPC
rpc SayHello(HelloRequest) returns (HelloResponse) {}
// Server Streaming RPC
rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
// Client Streaming RPC
rpc Upload(stream UploadRequest) returns (UploadResponse) {}
}
message UploadRequest {
string filename = 1;
bytes chunk = 2;
}
message UploadResponse {
bool result = 1;
int64 writtenSize = 2;
string message = 3;
}
If it’s a text file, the data type can be string, but the file might be binary. Therefore, we define bytes here.
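A bytes field also makes it possible to split a file by size instead of by line, which suits binary data. The following is a minimal sketch using only the standard library; `splitChunks` and `splitBytes` are hypothetical helpers, and the chunk size is an arbitrary value chosen for illustration.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// splitChunks reads r in pieces of at most size bytes.
func splitChunks(r io.Reader, size int) ([][]byte, error) {
	if size <= 0 {
		return nil, errors.New("chunk size must be positive")
	}
	var chunks [][]byte
	buf := make([]byte, size)
	for {
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			// Copy the data because buf is reused on the next read.
			chunk := make([]byte, n)
			copy(chunk, buf[:n])
			chunks = append(chunks, chunk)
		}
		// io.EOF: nothing left. io.ErrUnexpectedEOF: a short final chunk.
		if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
			return chunks, nil
		}
		if err != nil {
			return nil, err
		}
	}
}

// splitBytes is a convenience wrapper for in-memory data.
func splitBytes(data []byte, size int) ([][]byte, error) {
	return splitChunks(bytes.NewReader(data), size)
}

func main() {
	data := []byte{0x00, 0xff, 'a', '\n', 'b'} // not valid text, still fine as bytes
	chunks, err := splitBytes(data, 2)
	fmt.Println(len(chunks), err)
	for _, c := range chunks {
		fmt.Printf("% x\n", c)
	}
}
```

Each element could then be sent as the chunk field of one request message.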
Server implementation
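The implementation is a bit long due to the error handling. Note that the first message is only used to create the file: its chunk is not written, which is why the first line doesn't appear in the server log of the successful case.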
func (s *GrpcCallHandler) Upload(stream rpc.Middle_UploadServer) error {
writtenSize := 0
var file *os.File
log.Println("Upload was triggered")
for {
res, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return stream.SendAndClose(&rpc.UploadResponse{
Result: true,
WrittenSize: int64(writtenSize),
Message: "COMPLETED",
})
}
return status.Errorf(codes.Unknown, "[ERROR] failed to upload: %v", err)
}
if file == nil {
if res.GetFilename() == "" {
return status.Errorf(codes.InvalidArgument, "filename must be specified")
}
absPath, err := filepath.Abs(filepath.Join(resourcePath, "from_client", res.GetFilename()))
if err != nil {
return status.Errorf(codes.Internal, "failed to get absolute path: %v", err)
}
file, err = os.Create(absPath)
if err != nil {
return status.Errorf(codes.PermissionDenied, "failed to create a file: %v", err)
}
defer file.Close()
} else {
if len(res.GetChunk()) > 0 {
log.Printf("received: %s\n", string(res.GetChunk()))
length, err := file.Write(res.GetChunk())
if err != nil {
return status.Errorf(codes.Internal, "failed to write chunk: %v", err)
}
writtenSize += length
}
}
}
}
The important thing here is how the error is returned. A plain error reaches the client with the code Unknown, so status.Errorf() is used to attach a specific code such as InvalidArgument. Alternatively, we can define the error code and the error text in the response message.
Client implementation
It opens a file and sends the data to the server.
func (m *MiddleMan) Upload(ctx context.Context, filename string) {
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
absPath, err := filepath.Abs(filepath.Join(resourcePath, filename))
if err != nil {
log.Printf("[ERROR] failed to get absolute path: %v", err)
return
}
file, err := os.Open(absPath)
if err != nil {
log.Printf("failed to open the file: %v", err)
return
}
defer file.Close()
client := rpc.NewMiddleClient(m.conn)
stream, err := client.Upload(timeoutCtx)
if err != nil {
log.Printf("[ERROR] failed to create a stream for Upload: %v\n", err)
return
}
log.Printf("start to upload file [%s]\n", filename)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
chunk := scanner.Text()
log.Printf("send chunk: %s", chunk)
if err := stream.Send(&rpc.UploadRequest{Filename: filename, Chunk: scanner.Bytes()}); err != nil {
log.Printf("[ERROR] failed to send data: %v", err)
common.ShowErrorMessageInTrailer(stream)
break
}
<-time.After(time.Second)
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Printf("failed to close: %v\n", err)
}
log.Printf("completed to upload file [%s]\nresult: %t\nwrittenSize: %d\nmessage: %s\n",
filename, res.GetResult(), res.GetWrittenSize(), res.GetMessage())
}
Use the Send() method to send data to the server. When all the file contents have been sent, we need to close the stream. CloseAndRecv() closes the sending side and waits for the server's response.
If an error occurs, we need to show it. The server side returns the error with status.Errorf(). The following helper looks for an error key in the trailer metadata; the status error itself is returned by CloseAndRecv(), as the error case below shows.
package common
import (
"log"
"google.golang.org/grpc"
)
func ShowErrorMessageInTrailer(stream grpc.ClientStream) {
trailer := stream.Trailer()
v, exist := trailer["error"]
if exist { // there is an error
log.Println("Error: ", v)
}
}
Call the Upload function.
func main() {
grpcConn, err := grpc.Dial(
serverHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer grpcConn.Close()
middleMan := client.NewMiddleMan(grpcConn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
middleMan.Upload(ctx, "data.txt")
// exit by ctrl + c
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("exit")
}
Result A successful case
Server
$ make runServer
2023/06/03 07:12:07 start gRPC server
2023/06/03 07:22:38 Upload was triggered
2023/06/03 07:22:39 received: second line
2023/06/03 07:22:40 received: third line
2023/06/03 07:22:41 received: fourth line
2023/06/03 07:22:42 received: and so on...
2023/06/03 07:22:44 received: end line
Client
$ make runClient
2023/06/03 07:22:38 start to upload file [data.txt]
2023/06/03 07:22:38 send chunk: first line
2023/06/03 07:22:39 send chunk: second line
2023/06/03 07:22:40 send chunk: third line
2023/06/03 07:22:41 send chunk: fourth line
2023/06/03 07:22:42 send chunk: and so on...
2023/06/03 07:22:43 send chunk:
2023/06/03 07:22:44 send chunk: end line
2023/06/03 07:22:45 completed to upload file [data.txt]
result: true
writtenSize: 52
message: COMPLETED
Result An error case
To simulate an error case, set an empty string to Filename on the client side.
for scanner.Scan() {
chunk := scanner.Text()
log.Printf("send chunk: %s", chunk)
// set an empty string to Filename
if err := stream.Send(&rpc.UploadRequest{Filename: "", Chunk: scanner.Bytes()}); err != nil {
log.Printf("[ERROR] failed to send data: %v", err)
The result becomes as follows.
Server
$ make runServer
2023/06/03 07:28:37 start gRPC server
2023/06/03 07:28:41 Upload was triggered
Client
$ make runClient
2023/06/03 07:28:41 start to upload file [data.txt]
2023/06/03 07:28:41 send chunk: first line
2023/06/03 07:28:42 send chunk: second line
2023/06/03 07:28:42 [ERROR] failed to send data: EOF
2023/06/03 07:28:42 failed to close: rpc error: code = InvalidArgument desc = filename must be specified
2023/06/03 07:28:42 completed to upload file [data.txt]
result: false
writtenSize: 0
message:
The error is correctly reported to the client.
Bidirectional streaming RPC
Bidirectional streaming RPC is used when a client and a server need to exchange messages, each reacting to what the other sends.
Definition in a proto file
Add the stream keyword to both the request and response parameters.
service Middle {
// Unary RPC
rpc Ping(PingRequest) returns (PingResponse) {}
// Unary RPC
rpc SayHello(HelloRequest) returns (HelloResponse) {}
// Server Streaming RPC
rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
// Client Streaming RPC
rpc Upload(stream UploadRequest) returns (UploadResponse) {}
// Bidirectional Streaming RPC
rpc Communicate(stream CommunicateRequest)
returns (stream CommunicateResponse) {}
}
message CommunicateRequest {
int64 max = 1;
int64 value = 2;
}
message CommunicateResponse {
int64 currentCount = 1;
int64 value = 2;
}
We will implement a simple logic. The client sends the first number. The server generates a random number and returns the sum of the two numbers. The client adds a random number to the sum and sends it to the server again. The process repeats max times.
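Before looking at the gRPC code, the exchange can be sketched with two goroutines and channels in place of the stream. This is a simplified simulation, not the article's implementation: `request`, `response`, `serve` and `exchange` are hypothetical names, the seeds are arbitrary, and the server's default round count and its "value too big" error are left out.

```go
package main

import (
	"fmt"
	"math/rand"
)

// request and response mirror CommunicateRequest and CommunicateResponse.
type request struct{ Max, Value int64 }
type response struct{ CurrentCount, Value int64 }

// serve plays the server role: it reads the first request, then for each
// round sends received+random and waits for the client's next value.
func serve(reqs <-chan request, resps chan<- response, rng *rand.Rand) {
	defer close(resps) // closing plays the role of ending the stream
	first, ok := <-reqs
	if !ok {
		return
	}
	received := first.Value
	for i := int64(0); i < first.Max; i++ {
		resps <- response{CurrentCount: i, Value: received + rng.Int63n(100)}
		next, ok := <-reqs
		if !ok {
			return
		}
		received = next.Value
	}
}

// exchange plays the client role and returns every sum it received.
func exchange(rounds, start, seed int64) []int64 {
	reqs := make(chan request)
	resps := make(chan response)
	go serve(reqs, resps, rand.New(rand.NewSource(seed)))
	clientRng := rand.New(rand.NewSource(seed + 1))

	reqs <- request{Max: rounds, Value: start}
	var sums []int64
	for res := range resps {
		sums = append(sums, res.Value)
		reqs <- request{Value: res.Value + clientRng.Int63n(10)}
	}
	close(reqs)
	return sums
}

func main() {
	fmt.Println(exchange(4, 5, 1))
}
```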
Server implementation
The server has 3 steps.
- Generate a random number
- Calculate the sum and send it to a client
- Receive a response from a client
func (s *GrpcCallHandler) Communicate(stream rpc.Middle_CommunicateServer) error {
ctx := stream.Context()
res, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Unknown, "[ERROR] failed to communicate: %v", err)
}
maxCount := res.GetMax()
if maxCount == 0 {
maxCount = 3
}
receivedValue := res.GetValue()
for currentCount := 0; currentCount < int(maxCount); currentCount++ {
// 1. generate random number
randomValue := rand.Intn(100)
if randomValue >= 80 {
return status.Errorf(codes.Internal, "[ERROR] random value is too big. Value was [%d]", randomValue)
}
// 2. calculate the sum and send it to a client
sum := receivedValue + int64(randomValue)
err = stream.Send(&rpc.CommunicateResponse{
CurrentCount: int64(currentCount),
Value: sum,
})
if err != nil {
return status.Errorf(codes.Unknown, "[ERROR] failed to send: %v", err)
}
log.Printf("send value (%d): %d + %d = %d", currentCount+1, receivedValue, randomValue, sum)
// 3. receive a response from a client
select {
case <-ctx.Done():
return status.Error(codes.DeadlineExceeded, "[ERROR] communication ends")
case <-time.After(time.Second):
res, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Unknown, "[ERROR] failed to receive: %v", err)
}
receivedValue = res.GetValue()
}
}
log.Println("Communicatiopn ends")
return nil
}
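In step 3, the server waits one second before reading the client's next value. If the stream's context is cancelled or its deadline passes during that wait, it returns a DeadlineExceeded error. The Recv() call itself still blocks until the client sends something.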
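If the goal is to give up when no message arrives within a time limit, one possible approach is to run the blocking receive in a goroutine and select on its result and a timer. The following is a minimal sketch using only the standard library; `recvWithTimeout`, `delayed` and `errRecvTimeout` are hypothetical names, and the `recv` argument stands in for a call such as `stream.Recv()`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errRecvTimeout = errors.New("no message within the time limit")

// recvWithTimeout runs the blocking recv in a goroutine and waits for
// either its result or the timer, whichever comes first.
func recvWithTimeout(recv func() (int64, error), limit time.Duration) (int64, error) {
	type result struct {
		value int64
		err   error
	}
	// Buffered so the goroutine can still deliver and exit after a timeout.
	done := make(chan result, 1)
	go func() {
		v, err := recv()
		done <- result{v, err}
	}()

	timer := time.NewTimer(limit)
	defer timer.Stop()
	select {
	case r := <-done:
		return r.value, r.err
	case <-timer.C:
		return 0, errRecvTimeout
	}
}

// delayed returns a fake receive function that answers after d.
func delayed(value int64, d time.Duration) func() (int64, error) {
	return func() (int64, error) {
		time.Sleep(d)
		return value, nil
	}
}

func main() {
	v, err := recvWithTimeout(delayed(42, 0), time.Second)
	fmt.Println(v, err)

	_, err = recvWithTimeout(delayed(1, 200*time.Millisecond), 20*time.Millisecond)
	fmt.Println(errors.Is(err, errRecvTimeout))
}
```

Note that the goroutine keeps waiting until the receive returns. In a gRPC handler, returning the error ends the stream, which should unblock a pending receive, but that is worth verifying in your own environment.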
Client implementation
The client has 4 steps.
- Initialization
- Receive the sum from the server
- Send the next value
- Close the connection
func (m *MiddleMan) Communicate(ctx context.Context, maxCount int64) {
timeoutCtx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
client := rpc.NewMiddleClient(m.conn)
stream, err := client.Communicate(timeoutCtx)
if err != nil {
log.Printf("[ERROR] failed to create a stream: %v\n", err)
return
}
// 1. Initialization
err = stream.Send(&rpc.CommunicateRequest{
Max: maxCount,
Value: 5,
})
if err != nil {
log.Printf("[ERROR] failed to send initial values:%v", err)
return
}
for count := 0; ; count++ {
// 2. Receive the sum from server
res, err := stream.Recv()
if err != nil {
common.ShowErrorMessageInTrailer(stream) // not show anything
if errors.Is(err, io.EOF) {
break
}
log.Printf("[ERROR] failed to receive: %v\n", err)
return
}
log.Printf("received value (%d): %d", count+1, res.GetValue())
// 3. Send the next value
randomValue := rand.Intn(10)
err = stream.Send(&rpc.CommunicateRequest{
Value: res.GetValue() + int64(randomValue),
})
if err != nil {
if errors.Is(err, io.EOF) {
_, err := stream.Recv()
log.Printf("[ERROR] failed to receive 2: %v\n", err)
common.ShowErrorMessageInTrailer(stream) // not show anything
break
}
// when the connection is lost...
// [ERROR] failed to receive: rpc error: code = Unavailable desc = error reading from server: EOF
log.Printf("[ERROR] failed to send value:%v", err)
return
}
}
// 4. Close the connection
err = stream.CloseSend()
if err != nil {
log.Printf("[ERROR] failed to close and send: %v", err)
} else {
log.Println("Communication ends")
}
}
The error is not shown by the ShowErrorMessageInTrailer() function, but it can be handled in the usual way.
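The connection might be lost during the communication. In this case, the error isn’t caught by errors.Is(err, io.EOF) even though the message shows EOF. The error is a gRPC status error with the code Unavailable, and EOF only appears in its description text, so it is not the io.EOF value itself. Checking status.Code(err) == codes.Unavailable is one way to detect it.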
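The difference between an error that is io.EOF and one that only mentions EOF in its text can be reproduced with the standard library alone. In this sketch, `isEOF`, `wrappedEOF` and `textOnlyEOF` are hypothetical helpers, and the text-only error is built with `errors.New` as a stand-in for the gRPC status error.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// isEOF reports whether err is io.EOF or wraps it.
func isEOF(err error) bool {
	return errors.Is(err, io.EOF)
}

// wrappedEOF wraps io.EOF with %w, so io.EOF stays in the error chain.
func wrappedEOF() error {
	return fmt.Errorf("stream closed: %w", io.EOF)
}

// textOnlyEOF is a different error whose message merely ends in "EOF".
func textOnlyEOF() error {
	return errors.New("rpc error: code = Unavailable desc = error reading from server: EOF")
}

func main() {
	fmt.Println(isEOF(io.EOF))        // true
	fmt.Println(isEOF(wrappedEOF()))  // true
	fmt.Println(isEOF(textOnlyEOF())) // false
}
```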
Considering a lost connection
We must consider a lost connection. When the connection between the server and the client is lost, it can take up to 15 minutes until the gRPC client gives up communicating with the server, but this seems to depend on the environment, so it’s better to test it first. Even though this application runs in a Docker container, the behavior differs depending on the host system. I tested it on Windows and Linux: on Windows, the connection error was reported quickly, but on Linux it took 15 minutes. If you have the same problem, it might be worth setting WithKeepaliveParams with a reasonable duration.
func main() {
grpcConn, err := grpc.Dial(
serverHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
// Add this parameter
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 10,
}),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer grpcConn.Close()
middleMan := client.NewMiddleMan(grpcConn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
middleMan.Communicate(ctx, 4)
// exit by ctrl + c
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("exit")
}
Result A successful case
Server
$ make runServer
2023/06/03 08:13:24 start gRPC server
2023/06/03 08:13:56 send value (1): 5 + 37 = 42
2023/06/03 08:13:57 send value (2): 44 + 77 = 121
2023/06/03 08:13:58 send value (3): 121 + 70 = 191
2023/06/03 08:13:59 send value (4): 194 + 31 = 225
2023/06/03 08:14:00 Communicatiopn ends
Client
$ make runClient
2023/06/03 08:13:56 received value (1): 42
2023/06/03 08:13:57 received value (2): 121
2023/06/03 08:13:58 received value (3): 191
2023/06/03 08:13:59 received value (4): 225
2023/06/03 08:14:00 Communication ends
Result An error case
Server
$ make runServer
2023/06/03 07:57:35 start gRPC server
2023/06/03 07:57:56 send value (1): 5 + 15 = 20
2023/06/03 07:57:57 send value (2): 26 + 28 = 54
Client
$ make runClient
2023/06/03 07:57:56 received value (1): 20
2023/06/03 07:57:57 received value (2): 54
2023/06/03 07:57:58 [ERROR] failed to receive: rpc error: code = Internal desc = [ERROR] random value is too big. Value was [63]