Connecting an Android App to RabbitMQ

Written by Alisson

August 22, 2023

To let our app communicate with the RabbitMQ server, the first step is to add the required dependencies to the project.


/** AMQP **/
implementation 'com.rabbitmq:amqp-client:5.7.2'

/** CORE **/
implementation "androidx.core:core-ktx:1.1.0"
implementation "androidx.fragment:fragment-ktx:1.1.0"
implementation "androidx.lifecycle:lifecycle-extensions:2.1.0"
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0"

/** KOIN DI **/
implementation 'org.koin:koin-androidx-viewmodel:2.0.1'

/** TEST **/
testImplementation 'junit:junit:4.12'
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0'

With all the dependencies added, the project must be compiled with Java 8. To do that, add the block below to the app module's build.gradle.


android {
    ...
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

To finish the initial setup needed for communication, add the internet permission to the manifest.


<uses-permission android:name="android.permission.INTERNET"/>

Now we are ready to implement the communication itself.

First we create our Application class, which starts Koin (the dependency injector), along with the module that will be injected.


val amqpConfigModule = module {
    single {
        loadAmqpConfig()
    }
}

private fun loadAmqpConfig(
): AmqpConfig = AmqpFactory.create(
    listOf(
        AmqpExchanges.EX_1.exchange,
        AmqpExchanges.EX_2.exchange,
        AmqpExchanges.EX_3.exchange,
        AmqpExchanges.EX_4.exchange
    )
)

class MyApplication:Application() {

    override fun onCreate() {
        super.onCreate()
        configureDI()
    }

    private fun configureDI() {
        startKoin {
            androidLogger(if (BuildConfig.DEBUG) Level.DEBUG else Level.INFO)
            androidContext(this@MyApplication)
            modules(
                listOf(
                    amqpConfigModule
                )
            )
        }
    }
}

Now that Koin is configured, we create four base classes: one that holds all the data about our server, a factory for that configuration object, an enum listing all the exchanges, and finally an object holding the constants with the server data.

object AmqpFactory {

    fun create(
        listExchange: List<String>
    ) = AmqpConfig(
        Parameters.AMQP_HOST,
        Parameters.AMQP_PORT,
        Parameters.AMQP_USER,
        Parameters.AMQP_PASSWORD,
        Parameters.AMQP_VIRTUALHOST,
        listExchange
    )
}
class AmqpConfig internal constructor(
    val host: String,
    val port: Int,
    val username: String,
    val password: String,
    val virtualHost: String,
    val listExchangeToReceive: List<String>) {

    fun queueName() = "QUEUE_NAME"
}
enum class AmqpExchanges(val exchange: String) {
    EX_1("EX_1"),
    EX_2("EX_2"),
    EX_3("EX_3"),
    EX_4("EX_4");
}
object Parameters {
    const val AMQP_HOST = "HOST"
    const val AMQP_PORT = 5672
    const val AMQP_USER = "USER"
    const val AMQP_PASSWORD = "PASS"
    const val AMQP_VIRTUALHOST = "V-HOST"
}
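
The connection fields above (host, port, user, password, virtual host) correspond to the parts of a standard AMQP URI, which can be handy for logging or for clients that accept a single connection string. The sketch below is only an illustration: `ServerConfig` and `toAmqpUri` are hypothetical names, not part of the project, and the placeholder values are the ones from `Parameters`.

```kotlin
import java.net.URLEncoder

// Hypothetical stand-in for AmqpConfig, reduced to the connection fields.
data class ServerConfig(
    val host: String,
    val port: Int,
    val username: String,
    val password: String,
    val virtualHost: String
)

// Percent-encode a URI component. URLEncoder targets HTML forms and
// writes spaces as "+", so those are mapped to "%20".
private fun encode(part: String): String =
    URLEncoder.encode(part, "UTF-8").replace("+", "%20")

// Build amqp://user:password@host:port/vhost from the config fields.
fun ServerConfig.toAmqpUri(): String =
    "amqp://${encode(username)}:${encode(password)}@$host:$port/${encode(virtualHost)}"

fun main() {
    val config = ServerConfig("HOST", 5672, "USER", "PASS", "V-HOST")
    println(config.toAmqpUri()) // amqp://USER:PASS@HOST:5672/V-HOST
}
```

A virtual host named `/` has to be written as `%2F` in the URI, which is why each component is encoded separately.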

Now that we have everything we need, we can create the classes that communicate with RabbitMQ.

The first class is the one that opens the connection to the server.

abstract class ConnectToRabbitMQ(val amqpConfig: AmqpConfig) {

    private var mChannel: Channel? = null
    private var mConnection: Connection? = null

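    open fun dispose() {
        // Abort the channel first, then close the connection that owns it.
        try {
            mChannel?.abort()
        } catch (ex: Exception) {
            // Ignored: the channel may already be closed.
        }
        try {
            mConnection?.close()
        } catch (ex: Exception) {
            // Ignored: the connection may already be closed.
        }
        mChannel = null
        mConnection = null
    }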

    @Throws(Exception::class)
    open fun connectToRabbitMQ() {
        if (isAlreadyConnected())
            return

        val connectionFactory = ConnectionFactory()
        connectionFactory.host = amqpConfig.host
        connectionFactory.port = amqpConfig.port
        connectionFactory.virtualHost = amqpConfig.virtualHost
        connectionFactory.isAutomaticRecoveryEnabled = true
        connectionFactory.username = amqpConfig.username
        connectionFactory.password = amqpConfig.password

        mConnection = connectionFactory.newConnection("Listen")

        mChannel = mConnection!!.createChannel()
        configure(mChannel!!)
    }

    fun ackMessage(deliveryTag: Long) {
        mChannel?.basicAck(deliveryTag, false)
    }

    fun ackMultiMessage(deliveryTag: Long) {
        mChannel?.basicAck(deliveryTag, true)
    }

    abstract fun configure(channel: Channel)

    private fun isAlreadyConnected() = mChannel != null && mChannel!!.isOpen
}
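
The two acknowledgement helpers differ only in the `multiple` flag passed to `basicAck`: with `false` only the given delivery is acknowledged, with `true` every outstanding delivery up to and including that tag is acknowledged. The sketch below models that behavior in memory; `UnackedDeliveries` is a hypothetical class written for illustration and does not use the RabbitMQ client.

```kotlin
import java.util.TreeSet

// In-memory model of a channel's unacknowledged deliveries (not the RabbitMQ client).
class UnackedDeliveries {
    private val tags = TreeSet<Long>()

    fun delivered(tag: Long) {
        tags.add(tag)
    }

    // Mirrors the meaning of basicAck(deliveryTag, multiple).
    fun ack(deliveryTag: Long, multiple: Boolean) {
        if (multiple) {
            // Acknowledge every outstanding delivery up to and including deliveryTag.
            tags.headSet(deliveryTag, true).clear()
        } else {
            // Acknowledge only this delivery.
            tags.remove(deliveryTag)
        }
    }

    fun pending(): List<Long> = tags.toList()
}

fun main() {
    val channel = UnackedDeliveries()
    (1L..5L).forEach(channel::delivered)

    channel.ack(2, multiple = false)
    println(channel.pending()) // [1, 3, 4, 5]

    channel.ack(4, multiple = true)
    println(channel.pending()) // [5]
}
```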

The second class is responsible for publishing messages and for registering as a consumer on RabbitMQ.

class ConsumerAndProducer(
    config: AmqpConfig,
    private val mOnReceiveMessageHandler: OnReceiveMessageHandler
) : ConnectToRabbitMQ(config) {

    companion object {
        internal const val FANOUT = "fanout"
    }

    private var mQueue: String? = null
    private var mDelivery = DeliverCallback { _, delivery ->
        val processMessage =
            mOnReceiveMessageHandler.processMessage(delivery.envelope.exchange, delivery.body)

        if (processMessage)
            ackMessage(delivery.envelope.deliveryTag)
    }

    private var mCancel = CancelCallback {}

    override fun configure(channel: Channel) {
        mQueue = amqpConfig.queueName()
        channel.queueDeclare(mQueue, true, false, false, null)
        channel.basicConsume(mQueue, false, mDelivery, mCancel)
    }

    private fun setupConnectionFactory(): ConnectionFactory {
        val factory = ConnectionFactory()
        factory.isAutomaticRecoveryEnabled = true
        factory.networkRecoveryInterval = 5000
        factory.host = amqpConfig.host
        factory.port = amqpConfig.port
        factory.virtualHost = amqpConfig.virtualHost
        factory.username = amqpConfig.username
        factory.password = amqpConfig.password
        return factory
    }

    @Throws(Exception::class)
    fun publish(exchange: String, message: String) {
        val factory = setupConnectionFactory()
        val connection = factory.newConnection("Send")
        val channel = connection.createChannel()

        channel.exchangeDeclare(exchange, FANOUT, true, false, null)
        channel.basicPublish(exchange, "", null, message.toByteArray())
        channel.close()
        connection.close()
    }

    @Throws(Exception::class)
    fun binding(exchange: String, queue: String) {
        val factory = setupConnectionFactory()
        val connection =
            factory.newConnection("Bind")
        val channel = connection.createChannel()

        channel.exchangeDeclare(
            exchange,
            FANOUT, true, false, null
        )
        channel.queueBind(queue, exchange, "")
        channel.close()
        connection.close()
    }
}
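
Both `publish` and `binding` declare the exchange as `fanout` and pass an empty routing key. A fanout exchange ignores the routing key and copies each message to every queue bound to it. The sketch below models that routing rule in memory; `FanoutExchange` and `OTHER_QUEUE` are hypothetical names used only for illustration, and nothing here talks to a real broker.

```kotlin
// In-memory model of a fanout exchange (not the RabbitMQ client).
class FanoutExchange(val name: String) {
    private val boundQueues = LinkedHashMap<String, MutableList<String>>()

    // Similar in spirit to queueBind(queue, exchange, ""): register the queue once.
    fun bind(queue: String) {
        boundQueues.getOrPut(queue) { mutableListOf() }
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    fun publish(routingKey: String, message: String) {
        boundQueues.values.forEach { it.add(message) }
    }

    fun messagesIn(queue: String): List<String> = boundQueues[queue].orEmpty()
}

fun main() {
    val exchange = FanoutExchange("EX_1")
    exchange.bind("QUEUE_NAME")
    exchange.bind("OTHER_QUEUE")

    exchange.publish(routingKey = "", message = "message 1")
    exchange.publish(routingKey = "ignored", message = "message 2")

    println(exchange.messagesIn("QUEUE_NAME"))  // [message 1, message 2]
    println(exchange.messagesIn("OTHER_QUEUE")) // [message 1, message 2]
}
```

This is why the app can bind a single queue to all four exchanges and still receive everything published to any of them.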

Now we need an interface that will be used to pass received messages on to our app's Service.

interface OnReceiveMessageHandler {
    fun processMessage(exchange: String, body: ByteArray): Boolean
}
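
The Boolean returned by `processMessage` decides whether the consumer acknowledges the delivery: `true` leads to an ack, `false` leaves the message unacknowledged. The sketch below shows that contract with a hypothetical `LoggingHandler` and a `deliver` function standing in for the consumer's delivery callback; both are assumptions made for illustration, and the interface is repeated only so the example is self-contained.

```kotlin
interface OnReceiveMessageHandler {
    fun processMessage(exchange: String, body: ByteArray): Boolean
}

// Hypothetical handler: accepts only non-blank UTF-8 payloads.
class LoggingHandler : OnReceiveMessageHandler {
    val received = mutableListOf<String>()

    override fun processMessage(exchange: String, body: ByteArray): Boolean {
        val text = body.toString(Charsets.UTF_8)
        if (text.isBlank()) return false // rejected, so it will not be acknowledged
        received += "$exchange -> $text"
        return true                      // processed, so it can be acknowledged
    }
}

// Stand-in for the delivery callback: ack only when processing succeeded.
fun deliver(
    handler: OnReceiveMessageHandler,
    exchange: String,
    body: ByteArray,
    ack: () -> Unit
) {
    if (handler.processMessage(exchange, body)) ack()
}

fun main() {
    val handler = LoggingHandler()
    var acked = 0

    deliver(handler, "EX_1", "message 1".toByteArray()) { acked++ }
    deliver(handler, "EX_2", "   ".toByteArray()) { acked++ }

    println(handler.received) // [EX_1 -> message 1]
    println(acked)            // 1
}
```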

Now we create the Service that is responsible for keeping our communication alive.

class RabbitMqService : Service() {

    companion object {
        private const val TAG = "RabbitMqService"
    }

    private val serviceJob by lazy { Job() }
    private val serviceScope by lazy { CoroutineScope(Dispatchers.IO + serviceJob) }
    // Bind the handler to the main looper explicitly; no thread hop is needed to create it.
    private val handler by lazy { Handler(Looper.getMainLooper()) }

    private var consumerAndProducer: ConsumerAndProducer? = null
    private val lostMessages = LinkedHashMap<String, String>()
    private val amqpConfig: AmqpConfig by inject()
    lateinit var updateCallback: UpdateCallback

    override fun onBind(intent: Intent?) = RabbitMqService.RabbitMqBinder(this)

    override fun onCreate() {
    }

    override fun onDestroy() {
        super.onDestroy()
        serviceJob.cancel()
        consumerAndProducer?.dispose()
        consumerAndProducer = null
    }

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        if (consumerAndProducer == null) {
            init()
        }

        return START_STICKY
    }

    private fun init() {
        Log.d(TAG, "INIT")
        serviceScope.launch {
            try {
                connectAmqp()
                sendLostMessages()
                bindToExchange()
            } catch (ex: Exception) {
                Log.e(TAG, "INIT EXCEPTION", ex)
                consumerAndProducer?.dispose()
                handler.postDelayed(Runnable { init() }, 5_000)
            }
        }
    }

    private fun connectAmqp() {
        consumerAndProducer = ConsumerAndProducer(amqpConfig, createOnReceiveMessageHandler())
        consumerAndProducer!!.connectToRabbitMQ()
    }

    private fun createOnReceiveMessageHandler(): OnReceiveMessageHandler {
        return object : OnReceiveMessageHandler {
            override fun processMessage(exchange: String, body: ByteArray): Boolean {
                try {
                    updateCallback.updateView("$exchange -> ${String(body)}")
                    return true
                } catch (e: Exception) {
                    e.printStackTrace()
                }
                return false
            }
        }
    }

    @Throws(Exception::class)
    private fun bindToExchange() {
        for (exchange in amqpConfig.listExchangeToReceive) {
            consumerAndProducer?.binding(exchange, amqpConfig.queueName())
        }
    }

    private fun sendToAmqp(amqpExchanges: AmqpExchanges, json: String) {
        sendLostMessages()
        sendToAmqp(amqpExchanges.exchange, json)
    }

    private fun sendToAmqp(exchange: String, json: String) = runBlocking(Dispatchers.IO) {
        try {
            consumerAndProducer?.publish(exchange, json)
        } catch (e: Exception) {
            Log.e(TAG, "Fail to publish message -> $json", e)
            lostMessages[exchange] = json
        }
    }

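    private fun sendLostMessages() {
        if (lostMessages.isEmpty()) return
        // Work on a snapshot and clear the map so delivered messages are not resent;
        // anything that fails again is put back by sendToAmqp.
        val pending = lostMessages.toMap()
        lostMessages.clear()
        pending.forEach { (exchange, json) -> sendToAmqp(exchange, json) }
    }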

    class RabbitMqBinder(private val service: RabbitMqService): Binder() {
        fun setCallback(callback: UpdateCallback){
            service.updateCallback = callback
        }
        fun sendMessage(amqpExchanges: AmqpExchanges, json: String){
            service.sendToAmqp(amqpExchanges, json)
        }
    }
}
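
The service keeps failed messages in a map keyed by exchange, so a second failure on the same exchange replaces the first message. If every failed message should survive until the next retry, one possible alternative is an ordered buffer that is drained on reconnect. The sketch below is an assumption about how that could look, not the project's implementation; `PendingMessages` and the simulated publisher are hypothetical.

```kotlin
// Hypothetical buffer for messages that could not be published.
class PendingMessages {
    private val queue = ArrayDeque<Pair<String, String>>()

    @Synchronized
    fun add(exchange: String, json: String) {
        queue.addLast(exchange to json)
    }

    // Try to resend each buffered message once, in order; keep whatever fails again.
    @Synchronized
    fun flush(publish: (exchange: String, json: String) -> Unit) {
        repeat(queue.size) {
            val (exchange, json) = queue.removeFirst()
            try {
                publish(exchange, json)
            } catch (e: Exception) {
                queue.addLast(exchange to json)
            }
        }
    }

    @Synchronized
    fun size(): Int = queue.size
}

fun main() {
    val pending = PendingMessages()
    pending.add("EX_1", "message 1")
    pending.add("EX_1", "message 2")
    pending.add("EX_2", "message 3")

    // Simulated publisher: EX_2 is still unreachable.
    val sent = mutableListOf<String>()
    pending.flush { exchange, json ->
        if (exchange == "EX_2") throw IllegalStateException("broker unreachable")
        sent += json
    }

    println(sent)           // [message 1, message 2]
    println(pending.size()) // 1
}
```

In the service, `flush` would take the place of `sendLostMessages`, with the lambda calling `publish` on the consumer/producer.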

With these classes we are already communicating with our RabbitMQ server, but to see it working we will create an interface that forwards the messages to the activity.

interface UpdateCallback {
    fun updateView(message: String)
}

Below is the code of the Activity that connects to the service, followed by the activity's layout.

class MainActivity : AppCompatActivity(), UpdateCallback, ServiceConnection {

    private lateinit var rabbitServiceBinder: RabbitMqService.RabbitMqBinder

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // The service never calls startForeground(), so it is started as a regular
        // service while the activity is visible instead of via startForegroundService().
        startService(Intent(this, RabbitMqService::class.java))
    }

    override fun onResume() {
        super.onResume()
        bindService(Intent(this, RabbitMqService::class.java), this, Context.BIND_AUTO_CREATE)
    }

    override fun onPause() {
        super.onPause()
        unbindService(this)
    }

    private fun sendMessagesToRabbit() {
        rabbitServiceBinder.sendMessage(AmqpExchanges.EX_1, "message 1")
        rabbitServiceBinder.sendMessage(AmqpExchanges.EX_3, "message 2")
        rabbitServiceBinder.sendMessage(AmqpExchanges.EX_2, "message 3")
        rabbitServiceBinder.sendMessage(AmqpExchanges.EX_4, "message 4")
    }

    override fun updateView(message: String) {
        runOnUiThread {
            var text = messages.text.toString()
            text += "$message \n"
            messages.text = text
        }
    }

    override fun onServiceDisconnected(name: ComponentName?) {}

    override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
        if (service is RabbitMqService.RabbitMqBinder) {
            rabbitServiceBinder = service
            rabbitServiceBinder.setCallback(this)

            sendMessagesToRabbit()
        }
    }
}

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">

    <TextView
        android:id="@+id/messages"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintTop_toTopOf="parent" />

</androidx.constraintlayout.widget.ConstraintLayout>

If you want to see the whole project, it is on my GitHub at this link
