Akka Web socket in Kotlin

Akka Web socket in Kotlin

Websocket is the communication protocol for bidirectional communication between the client and server. Mostly it is used in realtime web applications. Akka http has great support for websocket. It uses Akka streams to manage connections. So implementing websocket in akka require knowledge about akka stream. Since websocket is bidirectional communication there are two persistence connection exists.

  1. From client to server.
  2. From server to client.

For the server to client communication akka uses Source and for the client to server it uses Sink. To create websocket connection we need Flow which uses both Source and Sink. Then what is Source , Sink and Flow? In akka streams term Source is the data producer that’s why it is called Source. Sink the data consumer and Flow is the link between producer and consumer. Source has one output but no input, Sink has one input but no output but Flow has both input and output. This is the reason that Flow can connect Source and Sink. Akka team has created nice the documentation about streams. To know in detail you can visit this link Basics and working with Flows.

We will create a simple websocket server using Akka with Kotlin. Our server will keep track of all connected client and replies to client messages.

Let’s start with gardle build file. Add akka-actor, akka-http and akka-stream dependencies.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// File name: gradle.build.kt
plugins {
    kotlin("jvm") version "1.3.70"
}

group = "np.com.madanpokharel"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation(kotlin("stdlib-jdk8"))
}

tasks {
    compileKotlin {
        kotlinOptions.jvmTarget = "1.8"
    }
    compileTestKotlin {
        kotlinOptions.jvmTarget = "1.8"
    }
}

dependencies{
    implementation("com.typesafe.akka","akka-actor_2.13","2.6.3")
    implementation("com.typesafe.akka","akka-stream_2.13","2.6.3")
    implementation("com.typesafe.akka","akka-http_2.13","10.1.11")
}

Lets create a http endpoint called /ws, later we will convert it to websocket endpoint.

Simple http server

This http server listens on port 8080 and displays a message “Hello web socket”. Try to compile and run this program.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import akka.actor.ActorSystem
import akka.http.javadsl.ConnectHttp
import akka.http.javadsl.Http
import akka.http.javadsl.server.Directives.*
import akka.stream.Materializer


fun main() {

        val actorSystem = ActorSystem.create("Akka-Web-Socket")
    
        val materializer = Materializer.createMaterializer(actorSystem)
    
        val http = Http.get(actorSystem)
    
        val routeFlow = path("ws") {
                complete("Hello world")
            }.flow(actorSystem, materializer)
    
        http.bindAndHandle(routeFlow, ConnectHttp.toHost("0.0.0.0", 8080), materializer)
}

Visit http://localhost:8080/ws in web browser, you will see message “Hello world”.

Web Socket

We can convert our web server to accept web socket connections. First we have to define a Flow.
Let’s create a simple websocket flow which welcomes user. Here we use handleWebSocketMessages directive to handle only websocket connection.

1
2
3
4
5
6
7
8
val routeFlow = path("ws") {
        val websocketFlow = Flow.create<Message>()
            .collect(object : JavaPartialFunction<Message, Message>() {
                override fun apply(msg: Message, isCheck: Boolean): Message = TextMessage.create("Welcome ${msg.asTextMessage().strictText}")
            })

        handleWebSocketMessages(websocketFlow)
    }.flow(actorSystem, materializer)

Visit https://dwst.github.io/ and send /connect ws://localhost:8080/ws command. After connection established, try to send your name. You will see greeting message from server.

Handling multiple users with actor

Till now created a simple server which don’t have any knowledge about users. We can handle this using an actor which will keep track of connected and disconnected users.
Let’s create an actor for this purpose.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import akka.actor.AbstractActor
import akka.actor.ActorRef
import akka.actor.Status
import akka.http.javadsl.model.ws.TextMessage
import akka.japi.pf.ReceiveBuilder
import akka.stream.CompletionStrategy

class CommandHandler : AbstractActor() {

    private val clients = mutableMapOf<String, ActorRef>()

    override fun createReceive(): Receive = ReceiveBuilder()
        .match(IncomingMessage::class.java) {

            clients[it.clientId]!!.tell(TextMessage.create("Hello ${it.message.strictText}"), ActorRef.noSender())
        }
        .match(UserConnected::class.java) {

            clients[it.clientId] = it.actorRef
            it.actorRef.tell(TextMessage.create("Welcome to akka Websocket, Your client id is : ${it.clientId}"), self)

        }.match(UserDisconnected::class.java) {

            val actorRef = clients.remove(it.clientId)!!
            // To close the stream, we send Status.Success message to actor
            actorRef.tell(Status.Success(CompletionStrategy.immediately()), ActorRef.noSender())

        }
        .build()

    //We use random UUID for clientId
    data class UserDisconnected(val clientId: String)

    data class UserConnected(val clientId: String, val actorRef: ActorRef)

    data class IncomingMessage(val clientId: String, val message: TextMessage)
}

And here is the websocket flow.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
val commandHandlerActor = actorSystem.actorOf(Props.create(CommandHandler::class.java), "CommandHandler")
val routeFlow = path("ws") {
        val clientId = UUID.randomUUID().toString()
        val source = Source.actorRef<Message>(100, OverflowStrategy.dropHead())
            .mapMaterializedValue {
                commandHandlerActor.tell(CommandHandler.UserConnected(clientId, it), ActorRef.noSender())
                NotUsed.getInstance()
            }
        val sink: Sink<Message, NotUsed> = Flow.create<Message>()
            .map {
                commandHandlerActor.tell(
                    CommandHandler.IncomingMessage(clientId, it.asTextMessage()),
                    ActorRef.noSender()
                )
            }
            .to(Sink.onComplete {
                commandHandlerActor.tell(CommandHandler.UserDisconnected(clientId), ActorRef.noSender())
            })

        val websocketFlow = Flow.fromSinkAndSource(sink, source)
        
        handleWebSocketMessages(websocketFlow)

    }.flow(actorSystem, materializer)

This flow supports only Text messages from user. It’s very easy to add support for the binary message.
Please follow official akka documentation for more details.

comments powered by Disqus