We will dig into a streaming gRPC function in this post. Please check the following post too if you haven’t read it.
- Dart How to setup devcontainer for gRPC
- Dart The first gRPC server and client with timestamp
- (this post) Dart Server Streaming gRPC function example to download a file
- Dart Client Streaming gRPC function example to Upload a file
- Dart gRPC Bidirectional streaming example
You can clone my GitHub repository if you want to try it yourself.
How to define streaming RPC in proto file
Let’s define the functions in proto file first. The target function is Download
. To define a streaming function, we need to add stream
keyword to the return data type.
syntax = "proto3";
import "google/protobuf/timestamp.proto";
option go_package = "api-test/grpc/apitest";
service Middle {
// Unary RPC
rpc Ping(PingRequest) returns (PingResponse) {}
// Unary RPC
rpc SayHello(HelloRequest) returns (HelloResponse) {}
// Server Streaming RPC
rpc Download(DownloadRequest) returns (stream DownloadResponse) {}
// Client Streaming RPC
rpc Upload(stream UploadRequest) returns (UploadResponse) {}
// Bidirectional Streaming RPC
rpc Communicate(stream CommunicateRequest)
returns (stream CommunicateResponse) {}
}
message DownloadRequest {
string filename = 1;
}
message DownloadResponse {
string line = 1;
}
The way to define a message for stream is the same as the normal definition.
For a server streaming function, DownloadResponse
contains line property. A client receives it several times.
Server implementation
Let’s implement the function on the server side.
final resourcePath = "/workspaces/api-test/languages/dart/resouces/server/";
@override
Stream<rpc.DownloadResponse> download(
ServiceCall call, rpc.DownloadRequest request) async* {
try {
if (request.filename == "error") {
call.sendTrailers(
status: StatusCode.invalidArgument,
message: "error file can't be specified",
);
return;
}
await Future.delayed(Duration(milliseconds: 500));
final absPath = p.join(resourcePath, request.filename);
final file = File(absPath);
final lines = file
.openRead() //
.transform(utf8.decoder)
.transform(LineSplitter());
print("lines for loop");
await for (final line in lines) {
yield rpc.DownloadResponse()..line = line;
}
print("download process completed");
} on GrpcError catch (e) {
print("caught an GrpcError in download: $e");
} catch (e) {
print("caught an error in download: $e");
} finally {
if (call.isCanceled) {
print("download was cancelled");
}
}
}
Be aware of the difference between the unary function and the streaming function for the return data type and async
keyword. It’s not Future
but Stream
. It’s not async
but async*
with an asterisk.
Stream<rpc.DownloadResponse> download(
ServiceCall call, rpc.DownloadRequest request) async* {
It makes the function Stream
. yield
keyword can be used in the function with it. It sends data to the gRPC client multiple times at the line with yield
.
The first thing is to open the target file. I didn’t find a good way to know the script file path. So I set an absolute path to resourcePath
. Then, split it with a line separator.
final absPath = p.join(resourcePath, request.filename);
final file = File(absPath);
final lines = file
.openRead() //
.transform(utf8.decoder)
.transform(LineSplitter());
The main part of this function is for loop with await keyword. The await keyword affects lines
variable in the for loop. When it reads data, the response is sent to a client.
await for (final line in lines) {
yield rpc.DownloadResponse()..line = line;
}
How to send the error info to the client
Throwing an error doesn’t send the error info to the client in gRPC. We must call call.sendTrailers()
if an error needs to be sent to a client.
import "package:grpc/grpc.dart"; // for StatusCode
call.sendTrailers(
status: StatusCode.invalidArgument,
message: "error file can't be specified",
);
It is called in an error case in the code above and then return it. This can of course be called in catch block after throwing an error from the try block. Then, we can differentiate the error and set the desired status and the message. No error is notified to the client without this function call.
I didn’t call this in catch block so we can check the result later.
How to know the cancellation
It seems that an error is not thrown when the call is canceled.
If we need to do something for the cancellation, it needs to be written in finally block.
try{
// ... do something
} finally {
if (call.isCanceled) {
// cancellation steps here
}
}
Note that the process reaches to await for loop even though it’s canceled by the client.
But why is it possible to skip the following statements without throwing an error…? I assume that the following statements are added to the onDone
function callback in listen()
method when using await for
loop.
lines.listen((event) {},
onError: {},
onDone: {
// the statements might be added here
},
cancelOnError: {
// probably isCanceled it set to true here
call.isCanceled = true;
},
);
Client implementation
The client-side calls the function with the file name which we want to download. It can specify the timeout too. The timeout error occurs if the download doesn’t finish within this specified time.
Future<void> download(String filename, int timeoutMs) async {
print("--- download ---");
try {
final request = rpc.DownloadRequest()..filename = filename;
final responses = client.download(
request,
options: CallOptions(timeout: Duration(milliseconds: timeoutMs)),
);
var count = 0;
var lines = [];
await for (final res in responses) {
print("received(${++count}): ${res.line}");
lines.add(res.line);
}
print("download completed");
print("content: $lines");
} on GrpcError catch (e) {
print("caught an GrpcError: $e");
} catch (e) {
print("caught an error: $e");
}
}
The server side sends data multiple times, the client side needs to wait for the response at await for loop. It breaks the loop once the server cancels the function or completes the process.
We can write on GrpcError catch(e)
if the grpc-related property needs to be read.
Result
I call it thrice with the following parameters.
await handler.download("test_file.txt", 500);
await handler.download("error", 1000);
await handler.download("unknown_file.txt", 1000);
await handler.download("test_file.txt", 1000);
The server side caught an error for the unknown file.
$ make runServer
Server listening on port 8080...
lines for loop
download was cancelled
lines for loop
caught an error in download: PathNotFoundException: Cannot open file, path = '/workspaces/api-test/languages/dart/resouces/server/unknown_file.txt' (OS Error: No such file or directory, errno = 2)
lines for loop
download process completed
The error state is correctly notified for the cancellation but the error is not notified to the client for the unknown file as I mentioned at the end of the server implementation.
$ make runClient
--- download ---
caught an GrpcError: gRPC Error (code: 4, codeName: DEADLINE_EXCEEDED, message: Deadline exceeded, details: null, rawResponse: null, trailers: {})
--- download ---
caught an GrpcError: gRPC Error (code: 3, codeName: INVALID_ARGUMENT, message: error file can't be specified, details: [], rawResponse: null, trailers: {})
--- download ---
download completed
content: []
--- download ---
received(1): aaaaaa
received(2): bbbbbb
received(3): cccccc
download completed
content: [aaaaaa, bbbbbb, cccccc]
Don’t forget to call call.sendTrailers()
in catch block.
Comments