Para fazermos a comunicação do nosso aplicativo com o servidor do RabbitMQ, a primeira etapa é adicionar as dependencias necessárias para o nosso projeto.
/** AMQP **/
implementation 'com.rabbitmq:amqp-client:5.7.2'
/** CORE **/
implementation "androidx.core:core-ktx:1.1.0"
implementation "androidx.fragment:fragment-ktx:1.1.0"
implementation "androidx.lifecycle:lifecycle-extensions:2.1.0"
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0"
/** KOIN DI **/
implementation 'org.koin:koin-androidx-viewmodel:2.0.1'
/** TEST **/
testImplementation 'junit:junit:4.12'
androidTestImplementation 'androidx.test.ext:junit:1.1.1'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0'
Depois de adicionada todas as dependências, temos que utilizar java 8 para compilar nosso projeto, para isso basta colocar o código abaixo no build.gradle da aplicação.
android {
...
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
Para terminarmos a configuração inicial necessária para que possamos fazer a comunicação, temos que adicionar a permissão de internet no manifest.
<uses-permission android:name="android.permission.INTERNET"/>
Agora estamos prontos para começar a implementar toda a nossa comunicação.
Primeiro vamos criar a nossa classe de application para iniciar o Koin que é o injetor de dependências e o Módulo que será injetado.
val amqpConfigModule = module {
single {
loadAmqpConfig()
}
}
private fun loadAmqpConfig(
): AmqpConfig = AmqpFactory.create(
listOf(
AmqpExchanges.EX_1.exchange,
AmqpExchanges.EX_2.exchange,
AmqpExchanges.EX_3.exchange,
AmqpExchanges.EX_4.exchange
)
)
class MyApplication:Application() {
override fun onCreate() {
super.onCreate()
configureDI()
}
private fun configureDI() {
startKoin {
androidLogger(if (BuildConfig.DEBUG) Level.DEBUG else Level.INFO)
androidContext(this@MyApplication)
modules(
listOf(
amqpConfigModule
)
)
}
}
}
Agora que ja temos o nosso Koin configurado vamos criar quatro classes bases, uma que é onde vamos ter todos os dados do nosso servidor, outra de factory do nosso objeto de configuração, outra que é um Enum que vai conter todos os exchanges e a ultina onde vamos ter as constantes dos dados do servidor.
object AmqpFactory {
fun create(
listExchange: List<String>
) = AmqpConfig(
Parameters.AMQP_HOST,
Parameters.AMQP_PORT,
Parameters.AMQP_USER,
Parameters.AMQP_PASSWORD,
Parameters.AMQP_VIRTUALHOST,
listExchange
)
}
class AmqpConfig internal constructor(
val host: String,
val port: Int,
val username: String,
val password: String,
val virtualHost: String,
val listExchangeToReceive: List<String>) {
fun queueName() = "QUEUE_NAME"
}
enum class AmqpExchanges(val exchange: String) {
EX_1("EX_1"),
EX_2("EX_2"),
EX_3("EX_3"),
EX_4("EX_4");
}
object Parameters {
const val AMQP_HOST = "HOST"
const val AMQP_PORT = 5672
const val AMQP_USER = "USER"
const val AMQP_PASSWORD = "PASS"
const val AMQP_VIRTUALHOST = "V-HOST"
}
Agora que temos tudo necessário vamos criar as classes de comunicação com o RabbitMQ.
A primeira classe que vamos criar é que criará a conexão com o servidor.
abstract class ConnectToRabbitMQ(val amqpConfig: AmqpConfig) {
private var mChannel: Channel? = null
private var mConnection: Connection? = null
open fun dispose() {
try {
if (mConnection != null) mConnection!!.close()
} catch (ex: Exception) {
}
try {
if (mChannel != null) mChannel!!.abort()
} catch (ex: Exception) {
}
}
@Throws(Exception::class)
open fun connectToRabbitMQ() {
if (isAlreadyConnected())
return
val connectionFactory = ConnectionFactory()
connectionFactory.host = amqpConfig.host
connectionFactory.virtualHost = amqpConfig.virtualHost
connectionFactory.isAutomaticRecovercode snippet widgetyEnabled = true
connectionFactory.username = amqpConfig.username
connectionFactory.password = amqpConfig.password
mConnection = connectionFactory.newConnection("Listen")
mChannel = mConnection!!.createChannel()
configure(mChannel!!)
}
fun ackMessage(deliveryTag: Long) {
mChannel?.basicAck(deliveryTag, false)
}
fun ackMultiMessage(deliveryTag: Long) {
mChannel?.basicAck(deliveryTag, true)
}
abstract fun configure(channel: Channel)
private fun isAlreadyConnected() = mChannel != null && mChannel!!.isOpen
}
A segunda classe é a classe responsável por enviar e se registrar no RabbitMq.
class ConsumerAndProducer(
config: AmqpConfig,
private val mOnReceiveMessageHandler: OnReceiveMessageHandler
) : ConnectToRabbitMQ(config) {
companion object {
internal const val FANOUT = "fanout"
}
private var mQueue: String? = null
private var mDelivery = DeliverCallback { _, delivery ->
val processMessage =
mOnReceiveMessageHandler.processMessage(delivery.envelope.exchange, delivery.body)
if (processMessage)
ackMessage(delivery.envelope.deliveryTag)
}
private var mCancel = CancelCallback {}
override fun configure(channel: Channel) {
mQueue = amqpConfig.queueName()
channel.queueDeclare(mQueue, true, false, false, null)
channel.basicConsume(mQueue, false, mDelivery, mCancel)
}
private fun setupConnectionFactory(): ConnectionFactory {
val factory = ConnectionFactory()
factory.isAutomaticRecoveryEnabled = true
factory.networkRecoveryInterval = 5000
factory.host = amqpConfig.host
factory.port = amqpConfig.port
factory.virtualHost = amqpConfig.virtualHost
factory.username = amqpConfig.username
factory.password = amqpConfig.password
return factory
}
@Throws(Exception::class)
fun publish(exchange: String, message: String) {
val factory = setupConnectionFactory()
val connection = factory.newConnection("Send")
val channel = connection.createChannel()
channel.exchangeDeclare(exchange, FANOUT, true, false, null)
channel.basicPublish(exchange, "", null, message.toByteArray())
channel.close()
connection.close()
}
@Throws(Exception::class)
fun binding(exchange: String, queue: String) {
val factory = setupConnectionFactory()
val connection =
factory.newConnection("Bind")
val channel = connection.createChannel()
channel.exchangeDeclare(
exchange,
FANOUT, true, false, null
)
channel!!.queueBind(queue, exchange, "")
channel.close()
connection.close()
}
}
Agora precisamos criar uma interface que será utilizada para enviar as mensagens recebidas para o Service do nosso applicativo.
interface OnReceiveMessageHandler {
fun processMessage(exchange: String, body: ByteArray): Boolean
}
Agora vamos criar o nosso Service que será responsável por manter a nossa comunicação ativa.
class RabbitMqService : Service() {
companion object {
private const val TAG = "RabbitMqService"
}
private val serviceJob by lazy { Job() }
private val serviceScope by lazy { CoroutineScope(Dispatchers.IO + serviceJob) }
private val handler by lazy {
runBlocking(Dispatchers.Main) { Handler() }
}
private var consumerAndProducer: ConsumerAndProducer? = null
private val lostMessages = LinkedHashMap<String, String>()
private val amqpConfig: AmqpConfig by inject()
lateinit var updateCallback: UpdateCallback
override fun onBind(intent: Intent?) = RabbitMqService.RabbitMqBinder(this)
override fun onCreate() {
}
override fun onDestroy() {
super.onDestroy()
serviceJob.cancel()
consumerAndProducer?.dispose()
consumerAndProducer = null
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
if (consumerAndProducer == null) {
init()
}
return START_STICKY
}
private fun init() {
Log.d(TAG, "INIT")
serviceScope.launch {
try {
connectAmqp()
sendLostMessages()
bindToExchange()
} catch (ex: Exception) {
Log.e(TAG, "INIT EXEPTION", ex)
consumerAndProducer?.dispose()
handler.postDelayed(Runnable { init() }, 5_000)
}
}
}
private fun connectAmqp() {
consumerAndProducer = ConsumerAndProducer(amqpConfig, createOnReceiveMessageHandler())
consumerAndProducer!!.connectToRabbitMQ()
}
private fun createOnReceiveMessageHandler(): OnReceiveMessageHandler {
return object : OnReceiveMessageHandler {
override fun processMessage(exchange: String, body: ByteArray): Boolean {
try {
updateCallback.updateView("$exchange -> ${String(body)}")
return true
} catch (e: Exception) {
e.printStackTrace()
}
return false
}
}
}
@Throws(Exception::class)
private fun bindToExchange() {
for (exchange in amqpConfig.listExchangeToReceive) {
consumerAndProducer?.binding(exchange, amqpConfig.queueName())
}
}
private fun sendToAmqp(amqpExchanges: AmqpExchanges, json: String) {
sendLostMessages()
sendToAmqp(amqpExchanges.exchange, json)
}
private fun sendToAmqp(exchange: String, json: String) = runBlocking(Dispatchers.IO) {
try {
consumerAndProducer?.publish(exchange, json)
} catch (e: Exception) {
Log.e(TAG, "Fail to publish message -> $json", e)
lostMessages[exchange] = json
}
}
private fun sendLostMessages() {
if (lostMessages.isEmpty()) return
lostMessages.forEach {
sendToAmqp(it.key, it.value)
}
}
class RabbitMqBinder(private val service: RabbitMqService): Binder() {
fun setCallback(callback: UpdateCallback){
service.updateCallback = callback
}
fun sendMessage(amqpExchanges: AmqpExchanges, json: String){
service.sendToAmqp(amqpExchanges, json)
}
}
}
Com essas classes ja estamos comunicando com o nosso servidor RabbitMQ, mas para vermos funcionando vamos criar uma interface para enviar as mensagens para a activity
interface UpdateCallback {
fun updateView(message: String)
}
Abaixo está o código da Activity que está se conectando com o service e também o layout da activity.
class MainActivity : AppCompatActivity(), UpdateCallback, ServiceConnection {
private lateinit var rabbitServiceBinder: RabbitMqService.RabbitMqBinder
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
ContextCompat.startForegroundService(
this,
Intent(this, RabbitMqService::class.java)
)
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
this.startService(Intent(this, RabbitMqService::class.java))
}
}
override fun onResume() {
super.onResume()
bindService(Intent(this, RabbitMqService::class.java), this, Context.BIND_AUTO_CREATE)
}
override fun onPause() {
super.onPause()
unbindService(this)
}
private fun sendMessagesToRabbit() {
rabbitServiceBinder.sendMessage(AmqpExchanges.EX_1, "message 1")
rabbitServiceBinder.sendMessage(AmqpExchanges.EX_3, "message 2")
rabbitServiceBinder.sendMessage(AmqpExchanges.EX_2, "message 3")
rabbitServiceBinder.sendMessage(AmqpExchanges.EX_4, "message 4")
}
override fun updateView(message: String) {
runOnUiThread {
var text = messages.text.toString()
text += "$message \n"
messages.text = text
}
}
override fun onServiceDisconnected(name: ComponentName?) {}
override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
if (service is RabbitMqService.RabbitMqBinder) {
rabbitServiceBinder = service
rabbitServiceBinder.setCallback(this)
sendMessagesToRabbit()
}
}
}
<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity">
<TextView
android:id="@+id/messages"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
app:layout_constraintBottom_toBottomOf="parent"
app:layout_constraintLeft_toLeftOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintTop_toTopOf="parent" />
</androidx.constraintlayout.widget.ConstraintLayout>
Se quiser ver o projeto todo, ele está no meu github nesse link
0 comentários