Local service discovery in akka using UDP multicast

Imagine a scenario where you have a main application and multiple services. Services can be added and removed dynamically. The application needs to know about all services available in the network, and the services should be able to interact with the application.

We can solve this problem easily if the IP address of the application is fixed and all services know it. Each service can then send a discovery request to that address and get a reply back. Things get difficult if redeploying the application changes its IP address: in that case every service has to be updated with the new address.

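Auto discovery comes into the picture when we try to solve this type of problem. Here we are going to implement simple auto discovery using Akka and UDP multicast. The main drawback of this approach is that it usually works only within the local network, because routers typically do not forward multicast traffic unless they are configured to.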

UDP can be used in unicast, broadcast, and multicast mode. Unicast is not a good fit for our problem because the sender has to know the address of the other party, and in our case it does not. In broadcast, a service announces its presence to the whole network, so every host receives the announcement. The service only wants to tell the application about its presence, not everybody, so broadcast is not a good fit either. In multicast, a service announces its presence to a specific group, and only hosts that have joined the group receive the announcement. In our case the only interested party is the application, so UDP multicast is a good fit for our problem.
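To make the distinction between the address types concrete, here is a minimal sketch that uses only the JDK to check whether an address belongs to the IPv4 multicast range (224.0.0.0 to 239.255.255.255). The helper name `isMulticast` is made up for illustration; `239.255.100.100` is the group address configured later in this post.

```kotlin
import java.net.InetAddress

// Hypothetical helper (not part of the project): true if the literal IP
// address lies in a multicast range. No DNS lookup happens for IP literals.
fun isMulticast(address: String): Boolean =
    InetAddress.getByName(address).isMulticastAddress

fun main() {
    println(isMulticast("239.255.100.100")) // group address used in this post -> true
    println(isMulticast("192.168.1.10"))    // ordinary unicast address -> false
    println(isMulticast("255.255.255.255")) // limited broadcast address -> false
}
```

A check like this can be useful as a sanity test on the configured address before binding, since joining a group with a non-multicast address fails at runtime.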

Let’s start with the server (application) implementation. We will do the following:

1) The application will bind to the UDP multicast port and join the multicast group.
2) When the application receives a message, it will learn the IP address and port of the sender and respond to that sender.

Here is the code for the application. Note: we are using Akka with Kotlin, not Akka with Scala.

package np.com.madanpokharel.discovery.server

import akka.actor.AbstractLoggingActor
import akka.actor.ActorRef
import akka.actor.Props
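import akka.io.Inet
import akka.io.Udp
import akka.io.UdpMessage
import akka.japi.pf.ReceiveBuilder
import akka.util.ByteString
import java.net.DatagramSocket
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.NetworkInterface
import java.net.StandardProtocolFamily
import java.nio.channels.DatagramChannel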

class ApplicationActor(private val localInet: InetSocketAddress,
                       private val udpMulticastAddress: InetAddress) : AbstractLoggingActor() {

    companion object {
        fun props(localInet: InetSocketAddress, udpMulticastAddress: InetAddress): Props = Props.create(ApplicationActor::class.java) {
            ApplicationActor(localInet, udpMulticastAddress)
        }
    }

    override fun preStart() {
        super.preStart()
        val manager = Udp.get(context.system).manager
        
        //Need to pass options for Multicasting
        manager.tell(UdpMessage.bind(self, localInet, listOf(InetProtocolFamily(), MulticastGroup(udpMulticastAddress))), self)
    }

    override fun createReceive(): Receive = ReceiveBuilder()
            .match(Udp.Bound::class.java) {
                context.become(ready(sender))
            }
            .build()


    private fun ready(udpConnection: ActorRef): Receive = ReceiveBuilder()
            .match(Udp.Received::class.java) {
                log().info("Received message {} from client: {}", it.data().utf8String(), it.sender())
                udpConnection.tell(UdpMessage.send(ByteString.fromString("ACK_MESSAGE_FROM_SERVER"), it.sender()), self)
            }
            .matchEquals(UdpMessage.unbind()) {
                udpConnection.tell(it, self)
            }
            .match(Udp.Unbound::class.java) {
                context.stop(self)
            }
            .build()
}

class InetProtocolFamily : Inet.DatagramChannelCreator() {
    override fun create(): DatagramChannel {
        return DatagramChannel.open(StandardProtocolFamily.INET) // Using IPV4
    }
}

class MulticastGroup(private val group: InetAddress) : Inet.AbstractSocketOptionV2() {

    override fun afterBind(s: DatagramSocket) {
        try {
            val networkInterface = NetworkInterface.getByInetAddress(InetAddress.getLocalHost())
            s.channel.join(group, networkInterface) //Joining multicast group
        } catch (ex: Exception) {
            ex.printStackTrace()
        }
    }
}

This is the code to run the application's main program:

import akka.actor.ActorSystem
import java.net.InetAddress
import java.net.InetSocketAddress

fun main() {
    val actorSystem = ActorSystem.create("AkkaUDPDiscoveryServer")
    val config = actorSystem.settings().config().getConfig("service-discovery-application")

    // Bind to the multicast port on all local interfaces
    val localInet = InetSocketAddress(config.getInt("udp-multicast-port"))
    val multicastAddress = InetAddress.getByName(config.getString("udp-multicast-address"))

    actorSystem.actorOf(ApplicationActor.props(localInet, multicastAddress), ApplicationActor::class.java.simpleName)
}

This is the content of the application's application.conf file:

service-discovery-application {
  akka {
    loglevel = "DEBUG"
  }

  udp-multicast-address = "239.255.100.100"
  udp-multicast-port = 9443
}

Let’s move on to the service implementation. The service will do the following:

1) Once started, the service will start listening on a random UDP port.
2) The service will send a message to the multicast address and port every second.
3) Once it gets a reply from the application, it will stop sending messages to the multicast address.
4) The service now knows the address of the application and can receive any additional messages the application sends.

Here is the code for the service actor.

package np.com.madanpokharel.discovery.client

import akka.actor.AbstractActorWithTimers
import akka.actor.ActorRef
import akka.actor.Props
import akka.io.Udp
import akka.io.UdpMessage
import akka.japi.pf.ReceiveBuilder
import akka.util.ByteString
import java.net.InetSocketAddress
import java.time.Duration

class ServiceActor(private val localInet: InetSocketAddress,
                   private val remoteInet: InetSocketAddress) : AbstractActorWithTimers() {
    private val log = context.system.log()

    companion object {
        private const val SEND_MESSAGE_UDP = "SEND_MESSAGE_UDP"
        fun props(local: InetSocketAddress, remote: InetSocketAddress): Props = Props.create(ServiceActor::class.java) {
            ServiceActor(local, remote)
        }
    }

    override fun preStart() {
        super.preStart()
        val manager = Udp.get(context.system).manager
        manager.tell(UdpMessage.bind(self, localInet), self)
    }

    override fun createReceive(): Receive = ReceiveBuilder()
            .match(Udp.Bound::class.java) {
                context.become(ready(sender))
                timers().startPeriodicTimer(SEND_MESSAGE_UDP, SendMsg, Duration.ofSeconds(1)) // Start a timer that sends a discovery message every second
            }
            .build()


    private fun ready(udpConnection: ActorRef): Receive = ReceiveBuilder()
            .match(Udp.Received::class.java) {
                log.info("Received message {}, from server: {}", it.data().utf8String(), it.sender())
                //We got reply from server, cancel timer
                timers().cancel(SEND_MESSAGE_UDP)

            }
            .match(SendMsg::class.java) {
                log.info("sending discovery message to server")
                udpConnection.tell(UdpMessage.send(ByteString.fromString("CLIENT_DISCOVERY"), remoteInet), self)
            }
            .matchEquals(UdpMessage.unbind()) {
                udpConnection.tell(it, self)
            }
            .match(Udp.Unbound::class.java) {
                context.stop(self)
            }
            .build()

    object SendMsg
}

This is the code for the service's main program:

import akka.actor.ActorSystem
import java.net.InetAddress
import java.net.InetSocketAddress

fun main() {
    val actorSystem = ActorSystem.create("AkkaUDPDiscoveryClient")
    // The key must match the top-level block in the service's application.conf
    val config = actorSystem.settings().config().getConfig("service-discovery-service")
    val remoteInet = InetSocketAddress(
            InetAddress.getByName(config.getString("udp-multicast-address")),
            config.getInt("udp-multicast-port")
    )
    // Port 0 lets the OS pick a random free port
    val localInet = InetSocketAddress(0)

    actorSystem.actorOf(ServiceActor.props(localInet, remoteInet), ServiceActor::class.java.simpleName)
}

This is the content of the service's application.conf file:

service-discovery-service {
  akka {
    loglevel = "DEBUG"
  }

  udp-multicast-address = "239.255.100.100"
  udp-multicast-port = 9443
}
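To see the request and reply exchange in isolation from Akka, here is a rough sketch that uses plain JDK sockets. It is not the project's code: the helpers `answerOnce` and `discover` are made-up names, and the demo sends to the loopback address (unicast) instead of a multicast group so that it runs on any machine. The retry-until-reply logic is the same idea the service actor implements with its timer.

```kotlin
import java.net.DatagramPacket
import java.net.DatagramSocket
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.SocketTimeoutException
import kotlin.concurrent.thread

// Hypothetical helper: answers exactly one discovery request, as the application does.
fun answerOnce(socket: DatagramSocket) {
    val buffer = ByteArray(256)
    val request = DatagramPacket(buffer, buffer.size)
    socket.receive(request) // blocks until a datagram arrives
    val reply = "ACK_MESSAGE_FROM_SERVER".toByteArray()
    // The received packet carries the sender's address and port,
    // so the responder needs no prior knowledge of the requester.
    socket.send(DatagramPacket(reply, reply.size, request.socketAddress))
}

// Hypothetical helper: resends the discovery message until a reply arrives
// or the attempts run out. Returns the reply text, or null if nobody answered.
fun discover(target: InetSocketAddress, attempts: Int, timeoutMillis: Int): String? {
    DatagramSocket().use { socket -> // no port given: the OS picks a free one
        socket.soTimeout = timeoutMillis
        val message = "CLIENT_DISCOVERY".toByteArray()
        repeat(attempts) {
            socket.send(DatagramPacket(message, message.size, target))
            try {
                val buffer = ByteArray(256)
                val reply = DatagramPacket(buffer, buffer.size)
                socket.receive(reply)
                return String(reply.data, reply.offset, reply.length)
            } catch (e: SocketTimeoutException) {
                // No reply within the timeout; fall through and send again.
            }
        }
    }
    return null
}

fun main() {
    // Assumption for the demo: loopback unicast stands in for the multicast group.
    val loopback = InetAddress.getLoopbackAddress()
    DatagramSocket(0, loopback).use { server ->
        val responder = thread { answerOnce(server) }
        val target = InetSocketAddress(loopback, server.localPort)
        println(discover(target, attempts = 5, timeoutMillis = 1000))
        responder.join()
    }
}
```

In the Akka version the blocking `receive` calls are replaced by `Udp.Received` messages and the retry loop by the periodic timer, but the message flow is the same.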

The full source can be found at https://github.com/madansp/akka-service-discovery-udp
