Author: estatkovskii

Uploading a file using Kotlin and gRPC client streaming

Updated: Jun 12, 2023

There are a lot of articles on the internet explaining how to implement file uploading using gRPC in Java. In this post I am going to show how to implement the same functionality using the Kotlin gRPC implementation.


I am going to use IntelliJ IDEA to code this application.


Preparing server


First, let's create a blank Kotlin application project. I will use the Gradle build system and Java 17, and will name the project kotlin-grpc-server.

Next, let's define the versions of the dependencies we are going to use in the gradle.properties file:


kotlin.code.style=official
coroutines_version=1.7.1
grpc_kotlin_version=1.3.0
grpc_version=1.54.0
protobuf_version=3.21.7

Now we are ready to add the dependencies to the build.gradle.kts file:


val coroutines_version: String by project
val grpc_kotlin_version: String by project
val grpc_version: String by project
val protobuf_version: String by project

plugins {
    kotlin("jvm") version "1.8.21"
    id("com.google.protobuf") version "0.9.3"
    application
}

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version")

    implementation("io.grpc:grpc-kotlin-stub:$grpc_kotlin_version")
    implementation("io.grpc:grpc-protobuf:$grpc_version")
    implementation("com.google.protobuf:protobuf-kotlin:$protobuf_version")
    implementation("com.google.protobuf:protobuf-java-util:$protobuf_version")
    runtimeOnly("io.grpc:grpc-netty:$grpc_version")
    
    testImplementation(kotlin("test"))
}

We also need to configure the Protobuf Gradle Plugin, which drives code generation. Note that the import line must go at the very top of build.gradle.kts, before any other statements:


import com.google.protobuf.gradle.*

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:$protobuf_version"
    }
    plugins {
        id("grpc") {
            artifact = "io.grpc:protoc-gen-grpc-java:$grpc_version"
        }
        id("grpckt") {
            artifact = "io.grpc:protoc-gen-grpc-kotlin:$grpc_kotlin_version:jdk8@jar"
        }
    }
    generateProtoTasks {
        ofSourceSet("main").forEach {
            it.plugins {
                id("grpc")
                id("grpckt")
            }
            it.builtins {
                id("kotlin")
            }
        }
    }
}
    

Now let's create a new directory named proto under src/main and add a proto file for our gRPC service, file-service.proto.


First, let's define Protobuf messages for file transfer:


syntax = "proto3";

option java_multiple_files = true;
option java_package = "ru.statech";

message FileMeta {
  string name = 1;
  string contentType = 2;
}

message FileContent {
  bytes data = 1;
}

message FileUploadRequest {
  oneof payload {
    FileMeta meta = 1;
    FileContent content = 2;
  }
}

enum FileUploadStatus {
  Success = 0;
  Failed = 1;
}

message FileUploadResponse {
  FileUploadStatus status = 1;
}

Our input message FileUploadRequest carries exactly one of two payload types:

  • Metadata, like file name and content type;

  • Part of the content of the file.
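
To make the oneof semantics concrete, here is a minimal plain-Kotlin sketch. It is not the generated protobuf code: it only models the two payload kinds as a sealed interface, so that every message holds exactly one of them. The names UploadPayload, Meta, Content and describe are hypothetical and exist only for this illustration.

```kotlin
// Hypothetical stand-ins for the generated classes; a oneof holds exactly one case at a time.
sealed interface UploadPayload
data class Meta(val name: String, val contentType: String) : UploadPayload
class Content(val data: ByteArray) : UploadPayload

// A receiver branches on which case is set, much like calling hasMeta() on the generated class.
fun describe(payload: UploadPayload): String = when (payload) {
    is Meta -> "meta: ${payload.name} (${payload.contentType})"
    is Content -> "content: ${payload.data.size} bytes"
}

fun main() {
    val messages = listOf(
        Meta("report.pdf", "application/pdf"),
        Content(ByteArray(4096)),
        Content(ByteArray(100)),
    )
    messages.forEach { println(describe(it)) }
}
```

A typical upload is then one metadata message followed by any number of content messages.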

Next, let's define our service:


service FileService {
  rpc upload(stream FileUploadRequest) returns(FileUploadResponse);
}

Note the stream keyword before the FileUploadRequest parameter: it makes upload a client-streaming method, in which the client sends a sequence of messages and the server replies with a single response.

Now it's time to generate the Kotlin classes from this proto file. The Gradle > Build command does this for us.

After the build completes, the generated files can be found under the build > generated > source directory.

Now it's time to implement our service. Let's create a new Kotlin file, FileService.kt, under the main > kotlin directory.

This class will inherit from the generated base class:


internal class FileService: FileServiceGrpcKt.FileServiceCoroutineImplBase() {
    override suspend fun upload(requests: Flow<FileUploadRequest>): FileUploadResponse {
        TODO("Implemented in the next step")
    }
}

Now let's implement this method. We are going to save the incoming file into a temporary directory:


import kotlinx.coroutines.flow.Flow
import ru.statech.*
import java.io.File
import java.io.FileOutputStream
import java.io.OutputStream

internal class FileService: FileServiceGrpcKt.FileServiceCoroutineImplBase() {
    private val storageDirectory = File("/tmp")

    override suspend fun upload(requests: Flow<FileUploadRequest>): FileUploadResponse {
        var writer: OutputStream? = null

        val status = try {
            println("Receiving a file...")
            requests.collect { request ->
                if (request.hasMeta()) {
                    // The metadata message names the target file
                    writer = openFileWriter(request.meta.name)
                } else {
                    // Write the raw chunk bytes, not the serialized FileContent message
                    writer?.write(request.content.data.toByteArray())
                }
            }
            println("File saved!")
            FileUploadStatus.Success
        } catch (err: Exception) {
            println("Error occurred while saving a file: ${err.message}")
            FileUploadStatus.Failed
        } finally {
            // Close the file on both the success and the failure path
            writer?.close()
        }

        return FileUploadResponse.newBuilder()
            .setStatus(status)
            .build()
    }

    // File(fileName).name keeps only the last path segment of the client-supplied name
    private fun openFileWriter(fileName: String) =
        FileOutputStream(File(storageDirectory, File(fileName).name))
}

The code calls requests.collect to iterate over the stream of incoming messages. If a message carries metadata, we use it to open a file writer; otherwise we append the message's content to the file.
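
The same receive loop can be tried without gRPC. The sketch below is a plain-Kotlin approximation under stated assumptions: Message, Meta, Chunk and receive are hypothetical names, a List stands in for the Flow, and a ByteArrayOutputStream stands in for the file. It shows that the metadata message selects the target and every later message appends bytes to it.

```kotlin
import java.io.ByteArrayOutputStream

// Hypothetical stand-ins for the two payload kinds of FileUploadRequest.
sealed interface Message
data class Meta(val name: String) : Message
class Chunk(val data: ByteArray) : Message

// Mirrors the server loop: metadata opens a writer, content is appended to it.
// Chunks that arrive before any metadata are silently dropped, as in the service above.
fun receive(messages: List<Message>): Pair<String?, ByteArray> {
    var name: String? = null
    var writer: ByteArrayOutputStream? = null
    for (message in messages) {
        when (message) {
            is Meta -> { name = message.name; writer = ByteArrayOutputStream() }
            is Chunk -> writer?.write(message.data)
        }
    }
    return name to (writer?.toByteArray() ?: ByteArray(0))
}

fun main() {
    val (name, bytes) = receive(
        listOf(Meta("hello.txt"), Chunk("Hello, ".toByteArray()), Chunk("gRPC!".toByteArray()))
    )
    println("$name -> ${String(bytes)}")  // prints "hello.txt -> Hello, gRPC!"
}
```

Note that this protocol relies on the client sending metadata first; a stricter server could reject a stream that starts with content instead of ignoring it.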

Now, let's bootstrap our new service. For this, let's add a new Kotlin class, AppGrpcServer.

This new class will control starting and stopping of gRPC server.


import io.grpc.Server
import io.grpc.ServerBuilder

class AppGrpcServer(private val port: Int) {
    val server: Server = ServerBuilder
        .forPort(port)
        .addService(FileService())
        .build()

    fun start() {
        server.start()
        println("gRPC server started, listening on $port")
        Runtime.getRuntime().addShutdownHook(
            Thread {
                println("shutting down gRPC server")
                this@AppGrpcServer.stop()
                println("gRPC server shut down")
            }
        )
    }

    fun blockUntilShutdown() {
        server.awaitTermination()
    }

    private fun stop() {
        server.shutdown()
    }
}

To terminate the gRPC server when the JVM shuts down, the code registers a shutdown hook. Inside the hook's lambda we write this@AppGrpcServer, a qualified this with a class label, to refer explicitly to the enclosing AppGrpcServer instance (a plain stop() call would resolve to the same method).
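
As a small illustration of labeled this, the sketch below uses hypothetical names (Outer, fromPlainLambda, fromReceiverLambda) and no gRPC. In a plain lambda, this still refers to the enclosing class instance. In a lambda with a receiver, such as the one passed to with, an unqualified this refers to the receiver, and the this@Outer label reaches the class instance.

```kotlin
class Outer(val name: String) {
    // Plain lambda: `this` is the enclosing Outer instance.
    fun fromPlainLambda(): String {
        val block = { this.name }
        return block()
    }

    // Lambda with receiver: `this` is the String passed to with(),
    // while this@Outer still refers to the enclosing instance.
    fun fromReceiverLambda(): String =
        with("receiver") { "$this / ${this@Outer.name}" }
}

fun main() {
    val outer = Outer("outer")
    println(outer.fromPlainLambda())     // prints "outer"
    println(outer.fromReceiverLambda())  // prints "receiver / outer"
}
```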

Now let's invoke AppGrpcServer class from main function:


fun main() {
    val server = AppGrpcServer(50051)
    server.start()
    server.blockUntilShutdown()
}

Now we can launch the server.
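
If you prefer launching from the command line rather than from the IDE, the application plugin already applied in build.gradle.kts can do it. The fragment below is a sketch under assumptions, not part of the original project: it assumes that main() lives in a file named Main.kt without a package declaration (which compiles to the class MainKt) and that the dependencies are resolved from Maven Central.

```kotlin
// build.gradle.kts (fragment)
repositories {
    mavenCentral()
}

application {
    // Assumption: main() is declared in src/main/kotlin/Main.kt with no package
    mainClass.set("MainKt")
}
```

With this in place, the Gradle run task starts the server, and the same setup in the client project accepts program arguments through the task's --args option.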


Preparing client


Now let's build the client application. To do so, we need to create a new project, kotlin-grpc-client.

Before writing the client code, we need to add the dependencies to the build.gradle.kts file the same way we did for the server. Then create the proto directory, copy the file-service.proto file from the server project, and generate the Kotlin classes with the Gradle > Build command.

Now we are ready to implement the client. Add a new Kotlin class file, FileServiceClient.kt, to the main directory.

Next let's implement our logic:


// Same package as the generated classes; main imports ru.statech.FileServiceClient
package ru.statech

import com.google.protobuf.ByteString
import io.grpc.ManagedChannel
import kotlinx.coroutines.flow.flow
import java.io.Closeable
import java.io.File
import java.util.concurrent.TimeUnit

class FileServiceClient(private val channel: ManagedChannel): Closeable {
    private val stub = FileServiceGrpcKt.FileServiceCoroutineStub(channel)

    suspend fun uploadFile(path: String, contentType: String): FileUploadResponse {
        val file = File(path)
        return stub.upload(flow {
            // First, send file metadata
            val metaRequest = FileUploadRequest.newBuilder()
                .setMeta(FileMeta.newBuilder()
                    .setName(file.name)
                    .setContentType(contentType))
                .build()
            emit(metaRequest)

            // Next, upload file content; use {} closes the stream when the loop ends or fails
            file.inputStream().use { inputStream ->
                val bytes = ByteArray(4096)
                while (true) {
                    val size = inputStream.read(bytes)
                    if (size <= 0) break  // -1 signals end of file
                    val uploadRequest = FileUploadRequest.newBuilder()
                        .setContent(FileContent.newBuilder()
                            .setData(ByteString.copyFrom(bytes, 0, size)))
                        .build()
                    emit(uploadRequest)
                }
            }
        })
    }

    override fun close() {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)
    }
}

The class uses FileServiceCoroutineStub as a proxy connected to the gRPC server. In the uploadFile method we use this proxy to submit a flow of requests: first the metadata, and then the file content split into 4 KB chunks.
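
The chunking step can be checked in isolation. The following is a minimal sketch with a hypothetical helper, readChunks, that uses an in-memory stream instead of a file. It shows how a 10,000-byte input is split when the buffer holds 4096 bytes.

```kotlin
import java.io.ByteArrayInputStream
import java.io.InputStream

// Reads the stream in chunks of at most chunkSize bytes and returns a copy of each chunk.
fun readChunks(input: InputStream, chunkSize: Int = 4096): List<ByteArray> {
    val buffer = ByteArray(chunkSize)
    val chunks = mutableListOf<ByteArray>()
    while (true) {
        val size = input.read(buffer)
        if (size <= 0) break           // -1 signals end of stream
        chunks += buffer.copyOf(size)  // copy, because the buffer is reused
    }
    return chunks
}

fun main() {
    val chunks = readChunks(ByteArrayInputStream(ByteArray(10_000)))
    println(chunks.map { it.size })  // prints [4096, 4096, 1808]
}
```

Only the last chunk is shorter here because the source is an in-memory array. Other stream types may return fewer bytes than the buffer size on any read, which is why the client passes the actual size to ByteString.copyFrom.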

Now let's implement main function:


import io.grpc.ManagedChannelBuilder
import ru.statech.FileServiceClient
import kotlin.io.path.Path
import kotlin.io.path.exists

suspend fun main(args: Array<String>) {
    val filePath = args.firstOrNull()
    if (filePath == null || !Path(filePath).exists()) {
        println("Invalid path")
        return
    }
    println("Uploading $filePath")
    val contentType = args.getOrElse(1) { "application/octet-stream" }
    val channel = ManagedChannelBuilder.forAddress("localhost", 50051)
        .usePlaintext()
        .build()
    FileServiceClient(channel).use {
        val result = it.uploadFile(filePath, contentType)
        println("Upload completed! Result: ${result.status}")
    }
}

The code accepts the file path as the first argument of the application and an optional content type as the second. The program then opens a new managed channel and uses it to create an instance of the FileServiceClient class. Finally, the uploadFile method is invoked to start the upload. Once it is done, the program prints the result on the screen.
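
The argument handling can be sketched on its own. The helper parseArgs below is hypothetical and leaves out the file-existence check; it only shows how the content type falls back to application/octet-stream when the second argument is missing.

```kotlin
// Hypothetical helper: returns (path, contentType), or null when no path is given.
fun parseArgs(args: Array<String>): Pair<String, String>? {
    val path = args.firstOrNull() ?: return null
    val contentType = args.getOrElse(1) { "application/octet-stream" }
    return path to contentType
}

fun main() {
    println(parseArgs(arrayOf("photo.png", "image/png")))  // prints (photo.png, image/png)
    println(parseArgs(arrayOf("data.bin")))                // prints (data.bin, application/octet-stream)
    println(parseArgs(emptyArray()))                       // prints null
}
```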


Running the code

First, let's start our server, if it's not running yet.

Next, let's launch the client app.

The output says the file was uploaded successfully as expected.


You can find the source code from this article in my GitHub repository: https://github.com/sta-tech/kotlin-grpc.
