This post demonstrates gRPC load balancing for a Java microservice using an nginx proxy. gRPC is a well-established RPC protocol implemented on top of HTTP/2.
There are two primary methods of implementing load balancing for gRPC services: server-side (proxy) and client-side. In this article I focus on the first one, but you can find more details about both methods on the official gRPC documentation website.
In proxy load balancing, the client issues RPC calls to a load balancer (LB) proxy. The LB distributes each RPC call to one of the available backend servers that implement the actual logic for serving the call. You can see a representation of this flow in the picture:
Let's create a new Java project using the Maven build tool. I will name it tasks-server. This Java application will represent the gRPC server.
Now let's add all required dependencies and plugins to the pom.xml file. Note that the ${os.detected.classifier} property used in the plugin configuration is not built into Maven; it comes from the os-maven-plugin build extension, which has to be registered in the build section as well:
<properties>
...
<grpc.java.version>1.54.0</grpc.java.version>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.java.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.java.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.java.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
</dependencies>
<build>
<plugins>
...
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.java.version}:exe:${os.detected.classifier}
</pluginArtifact>
<protoSourceRoot>
${basedir}/src/main/proto/
</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Now let's create a new directory named proto under src/main and add a new file, tasks.proto, to it. This file defines our server interface.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.example.proto";
service MyTasksService {
rpc enqueueTask(stream TaskCommand) returns (EnqueueResult);
}
message TaskCommand {
int32 task_id = 1;
string task_payload = 2;
}
message TaskInstance {
string instance_id = 2;
}
message ErrorDetails {
string message = 1;
}
message TaskEnqueueResult {
int32 task_id = 1;
oneof result {
TaskInstance instance = 2;
ErrorDetails error = 3;
}
}
message EnqueueResult {
repeated TaskEnqueueResult results = 1;
}
The service has a single method, enqueueTask, which accepts a stream of TaskCommand messages and returns the results in a single EnqueueResult response message.
Now let's write the implementation of the service:
public class MyTasksService extends MyTasksServiceGrpc.MyTasksServiceImplBase {
@Override
public StreamObserver<TaskCommand> enqueueTask(StreamObserver<EnqueueResult> responseObserver) {
return new TaskCommandStreamObserver(responseObserver);
}
}
The enqueueTask method simply delegates stream processing to a stream observer implemented by the TaskCommandStreamObserver class.
The stream observer simulates task execution by blocking the thread for 100 milliseconds per task. When the client completes the stream, it returns the accumulated results.
public final class TaskCommandStreamObserver implements StreamObserver<TaskCommand> {
private final StreamObserver<EnqueueResult> enqueueResultObserver;
private final List<TaskEnqueueResult> results = new ArrayList<>();
public TaskCommandStreamObserver(StreamObserver<EnqueueResult> enqueueResultObserver) {
this.enqueueResultObserver = enqueueResultObserver;
}
@Override
public void onNext(TaskCommand taskCommand) {
System.out.println("[" + ProcessHandle.current().pid() + "] [" + Thread.currentThread().getId() + "] Running task " + taskCommand.getTaskId());
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
results.add(
TaskEnqueueResult.newBuilder()
.setTaskId(taskCommand.getTaskId())
.setInstance(
TaskInstance.newBuilder()
.setInstanceId(UUID.randomUUID().toString())
.build()
).build()
);
}
@Override
public void onError(Throwable throwable) {
// Do nothing
}
@Override
public void onCompleted() {
System.out.println("[" + ProcessHandle.current().pid() + "] [" + Thread.currentThread().getId() + "] Stream is completed (" + results.size() + " tasks done)");
enqueueResultObserver.onNext(
EnqueueResult.newBuilder()
.addAllResults(results)
.build()
);
enqueueResultObserver.onCompleted();
}
}
Now we need to write the code that initializes our gRPC server. Let's implement it in a separate class.
public final class GrpcServer {
private final Server server;
public GrpcServer(int port) {
server = ServerBuilder.forPort(port)
.addService(new MyTasksService())
.build();
}
public void startAndWait() throws IOException, InterruptedException {
this.server.start();
System.out.println("[" + ProcessHandle.current().pid() + "] Started server on port " + server.getPort());
this.server.awaitTermination();
}
}
The startAndWait method starts the server on the specified port and blocks until the server terminates.
The final part of the server application is the main method. It takes the port from the first command-line argument and defaults to 6100.
public class Main {
public static void main(String[] args) {
int port = args.length > 0 ? Integer.parseInt(args[0]) : 6100;
var server = new GrpcServer(port);
try {
server.startAndWait();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
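Main passes the first argument straight to Integer.parseInt, so a typo in the port surfaces as a raw NumberFormatException. A slightly more defensive variant might look like the sketch below. The class name PortArgSketch, the helper parsePort and the 1 to 65535 range check are my own assumptions for illustration, not part of the original server.

```java
final class PortArgSketch {
    // Hypothetical helper: returns defaultPort when no argument is given,
    // otherwise parses and validates the first argument as a TCP port.
    static int parsePort(String[] args, int defaultPort) {
        if (args.length == 0) {
            return defaultPort;
        }
        int port;
        try {
            port = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port must be a number: " + args[0], e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        // Same default as the server's Main class.
        System.out.println("Using port " + parsePort(args, 6100));
    }
}
```

With such a helper, the first line of main would become a call like parsePort(args, 6100).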
Now let's create the gRPC client application. I will create a new Maven Java project and name it tasks-client. It needs the same dependencies and plugins as the server application. In addition, let's copy the tasks.proto file from the server into the src/main/proto directory in order to reuse the gRPC service definition.
To simulate parallel gRPC requests, the client application uses multiple threads. The logic for sending tasks to the server is encapsulated in the MyJob class.
public class MyJob implements Runnable {
private Queue<TaskCommand> tasks;
private MyTasksServiceGrpc.MyTasksServiceStub stub;
private CountDownLatch latch;
public MyJob(
Queue<TaskCommand> tasks,
MyTasksServiceGrpc.MyTasksServiceStub stub,
CountDownLatch latch
) {
this.tasks = tasks;
this.stub = stub;
this.latch = latch;
}
@Override
public void run() {
try {
do {
var task = tasks.poll();
if (task == null) {
continue;
}
var streamLatch = new CountDownLatch(1);
var stream = stub
.withDeadlineAfter(10, TimeUnit.SECONDS)
.enqueueTask(new TaskEnqueueStreamObserver(streamLatch));
int counter = 0;
while (task != null) {
stream.onNext(task);
if (++counter >= 10) {
break;
}
task = tasks.poll();
}
stream.onCompleted();
streamLatch.await();
} while (!latch.await(100, TimeUnit.MILLISECONDS));
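} catch (InterruptedException ex) {
// Restore the interrupt flag so callers can still observe the interruption
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}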
}
}
The run() method checks whether new tasks are available in the queue and streams them in batches of up to 10 items.
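The batching step can be looked at in isolation. The following sketch uses only the standard library and drains a queue in chunks of at most 10 elements, which is the same idea as the inner while loop in run(). The class BatchDrainSketch and the helper nextBatch are names I made up for illustration; the real job sends each item to the gRPC stream instead of collecting it into a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class BatchDrainSketch {
    // Remove up to maxItems elements from the queue and return them in order.
    // Returns an empty list when the queue has nothing to offer.
    static <T> List<T> nextBatch(Queue<T> queue, int maxItems) {
        List<T> batch = new ArrayList<>();
        T item;
        // The size check comes first, so poll() is never called once the batch is full.
        while (batch.size() < maxItems && (item = queue.poll()) != null) {
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<Integer> tasks = new ArrayDeque<>();
        for (int i = 0; i < 25; i++) {
            tasks.add(i);
        }
        List<Integer> batch;
        while (!(batch = nextBatch(tasks, 10)).isEmpty()) {
            System.out.println("Batch of " + batch.size() + " starting at " + batch.get(0));
        }
    }
}
```

For 25 queued items this yields batches of 10, 10 and 5, so the last stream of a burst may carry fewer than 10 tasks.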
MyJob receives the server result asynchronously through the stream observer class TaskEnqueueStreamObserver.
public class TaskEnqueueStreamObserver implements StreamObserver<EnqueueResult> {
private CountDownLatch latch;
public TaskEnqueueStreamObserver(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void onNext(EnqueueResult taskEnqueue) {
System.out.println("Enqueued " + taskEnqueue.getResultsCount() + " tasks");
}
@Override
public void onError(Throwable throwable) {
// Print the throwable itself: getCause() may be null, and an NPE here would skip latch.countDown()
System.out.println("Unexpected error: " + throwable);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
}
The job submits one stream of up to 10 items at a time. It uses streamLatch to wait until the server has finished processing the stream.
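The latch pattern works the same way outside of gRPC. Here is a minimal standard-library sketch of a callback that releases a CountDownLatch on completion or on error, with a bounded wait on the caller's side. The Callback interface and the LatchCallback class are hypothetical stand-ins for StreamObserver and TaskEnqueueStreamObserver, and the timeout is an extra safety net that the original job does not use (it relies on the 10 second deadline set on the stub).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchObserverSketch {
    // Hypothetical stand-in for the terminal callbacks of a gRPC StreamObserver.
    interface Callback {
        void onError(Throwable t);
        void onCompleted();
    }

    static final class LatchCallback implements Callback {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile Throwable failure;

        @Override
        public void onError(Throwable t) {
            failure = t;
            done.countDown(); // always release the waiter, also on failure
        }

        @Override
        public void onCompleted() {
            done.countDown();
        }

        // Returns true if a terminal callback arrived within the timeout.
        boolean awaitQuietly(long timeoutMillis) {
            try {
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        Throwable failure() {
            return failure;
        }
    }

    public static void main(String[] args) {
        LatchCallback callback = new LatchCallback();
        // Simulate the server finishing the stream on another thread.
        new Thread(callback::onCompleted).start();
        System.out.println("Completed in time: " + callback.awaitQuietly(1000));
    }
}
```

The important property is that every terminal path counts the latch down. If onError threw before reaching countDown(), the waiting worker would block forever.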
The final piece that brings everything together is the main method. It opens a channel to the nginx proxy on port 8200 and starts submitting requests using 3 workers.
public class Main {
public static void main(String[] args) {
var channel = ManagedChannelBuilder.forAddress("localhost", 8200)
.usePlaintext()
.build();
MyTasksServiceGrpc.MyTasksServiceStub stub = MyTasksServiceGrpc.newStub(channel);
var tasks = new ConcurrentLinkedQueue<TaskCommand>();
var stopWorkersLatch = new CountDownLatch(1);
var worker1 = new Thread(new MyJob(tasks, stub, stopWorkersLatch));
var worker2 = new Thread(new MyJob(tasks, stub, stopWorkersLatch));
var worker3 = new Thread(new MyJob(tasks, stub, stopWorkersLatch));
System.out.println("Created workers");
worker1.start();
worker2.start();
worker3.start();
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 50; j++) {
var task = TaskCommand.newBuilder()
// Multiply by the batch size (50) so that IDs stay unique across batches
.setTaskId(i * 50 + j)
.build();
tasks.add(task);
}
System.out.println("Added batch of tasks. Sleeping...");
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
}
System.out.println("Stopping workers");
stopWorkersLatch.countDown();
try {
worker1.join();
worker2.join();
worker3.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Release the channel's resources once all workers are done
channel.shutdown();
}
}
Now it's time to set up the load balancer. I will use the popular nginx web server, which can easily be configured to act as a gRPC proxy.
The simplest way to launch a new nginx server is with Docker. Let's define a simple nginx container in the file docker-compose.yaml:
version: "3"
services:
  nginx:
    image: nginx:1.15-alpine
    volumes:
      - ./conf:/etc/nginx/conf.d
    ports:
      - 8200:8200
It uses the nginx:1.15-alpine Docker image with a custom web server configuration defined in the conf subdirectory. Basically, inside the conf subdirectory we need to create just one file, default.conf.
upstream taskservers {
server host.docker.internal:5200;
server host.docker.internal:6200;
}
server {
listen 8200 http2;
location / {
grpc_pass grpc://taskservers;
}
}
Let's assume there will be 2 gRPC server applications running on ports 5200 and 6200. Our proxy itself will listen on port 8200, distributing requests among those servers.
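Since the upstream block does not name a balancing method, nginx falls back to its default, round-robin. To make the idea concrete, here is a toy model in plain Java that hands out backends in turn and skips the ones marked as unavailable. It is only a simplified illustration: the class RoundRobinSketch and its methods are my own invention, and nginx's real implementation (weights, failure counters, timeouts) is considerably more involved.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of round-robin selection over a fixed list of backends.
// Not nginx code; it only illustrates the selection idea.
final class RoundRobinSketch {
    private final List<String> backends;
    private final Set<String> down = new HashSet<>();
    private int next = 0;

    RoundRobinSketch(List<String> backends) {
        this.backends = new ArrayList<>(backends);
    }

    // Mark a backend as unavailable so that selection skips it.
    void markDown(String backend) {
        down.add(backend);
    }

    // Return the next available backend, or null if all are marked down.
    String pick() {
        for (int i = 0; i < backends.size(); i++) {
            String candidate = backends.get(next);
            next = (next + 1) % backends.size();
            if (!down.contains(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch(
                List.of("host.docker.internal:5200", "host.docker.internal:6200"));
        System.out.println(lb.pick()); // first backend
        System.out.println(lb.pick()); // second backend
        lb.markDown("host.docker.internal:5200");
        System.out.println(lb.pick()); // only the second backend is left
        System.out.println(lb.pick());
    }
}
```

With both backends up, calls alternate between ports 5200 and 6200. After the first one is marked down, every call lands on 6200.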
Now let's test our solution. First, let's start 2 gRPC server applications, replacing <port> with 5200 and 6200 respectively:
java -jar ~/CODE/tasks-server/target/tasks-server-1.0-SNAPSHOT-jar-with-dependencies.jar <port>
Next let's launch nginx:
docker compose up
And finally we can launch the client application:
java -jar ~/CODE/tasks-client/target/tasks-client-1.0-SNAPSHOT-jar-with-dependencies.jar
Here is a small screencast video demonstrating all applications running. As you may notice, terminating one of the servers leads to no disruptions on the client side. The proxy redirects all traffic to the single server that is still alive.
Thank you for reading this blog post!