Использование RabbitMQ в разработке мессенджера. Советы программистам

rabbitmg for developing messengers Использование RabbitMQ в разработке мессенджера. Советы программистам
4.8/5 - (6 голоса)

 
В последнее время широко популярными стали системы (приложения) для мобильного мессенджинга. В данный момент уже разработано большое количество приложений-мессенджеров, каждое из которых имеет свои преимущества и недостатки. Примерами таких приложений могут быть Viber, WhatsApp, Telegram и др.

Подобные приложения должны иметь возможность выдерживать большие нагрузки и хорошо масштабироваться, что требует больших усилий при разработке. В связи с этим мы рекомендуем интегрировать в них уже готовые и проверенные платформы для рассылки сообщений между клиентами.

Интеграция с такими платформами позволит вынести решение проблемы увеличения нагрузки и масштабирования на сторону платформы рассылки сообщений и уделить больше внимания разработке нового полезного функционала и преимуществ, которые сделают приложение более привлекательным для клиентов.

Достаточно хорошо для данной задачи подходит брокер сообщений RabbitMQ, который поддерживает различные протоколы пересылки сообщений, такие как AMQP, MQTT, STOMP и т.д.

RabbitMQ имеет большое количество клиентских библиотек, которые позволяют интегрировать его практически с каждой платформой клиентского приложения.

В данной статье мы рассмотрим задачу реализации архитектуры пересылки и обработки сообщений с использованием RabbitMQ.

Для начала хотелось бы прояснить немного принципы работы и термины, специфичные для данного брокера сообщений.
 

Итак, RabbitMQ имеет следующие компоненты:

  1. producer — клиент, который создает сообщения
  2. consumer — клиент, который получает сообщения
  3. queue — неограниченная по размеру очередь, которая хранит сообщения
  4. exchange — компонент, который позволяет переправлять отправляемые в него сообщения на различные очереди.

В общем виде всё взаимодействие разных компонентов в рамках RabbitMQ может иметь примерно следующий вид: producerотправляет некоторое сообщение в exchange, после чего exchange получает сообщение и пересылает его в подписанные на него queue.

При пересылке сообщений в зависимости от типа exchange они могут либо фильтроваться на основании совпадении ключей, с которым queue подписана к exchange, и ключей, содержащихся в сообщений, либо пересылаться во все интересующиеся queue, после чего сonsumer получает сообщения из queue, на которые он подписан.

Exchange может иметь один из нескольких возможных типов, характеризующих его.
 

Типы exchange:

  1. fanout — пересылает отправляемые в него сообщения во все подключённые queue;
  2. direct — пересылает отправляемые в него сообщения в подключенные queue согласно фильтрации с использованием routeId, который может быть одним словом;
  3. topic — пересылает отправляемые в него сообщения в подключенные queue согласно фильтрации с использованием routeId, который может состоять из нескольких слов, что позволяет достичь более гибкой фильтрации.

 
Этих знаний будет достаточно для дальнейшего понимания статьи. Перейдем к освещению архитектуры реализуемой системы.
 
В общем виде разрабатываемая система будет иметь следующую схему.

RabbitMQ messenger
 
Как представлено на схеме, система состоит из  следующих частей:

  1. Брокер сообщений RabbitMQ
  2. Серверное приложение
  3. Клиентские приложения: Android, iOS

Для работы данной архитектуры необходимо наличие уже установленных exchanges и queues, которые создаются при первом старте серверного приложения.
 

Представим список основных exchanges и queues:

  1. “conversation.outgoing” — это  exchange типа fanout, который в нашем случае необходим для приема входящих сообщений от клиентов и пересылки их дальше в очереди системы;
  2. “conversation.incoming” — это exchange типа topic для рассылки уже обработанных сообщений в exchange конкретных пользователей. Тип topic в нашем случае позволяет отправлять пользователям только сообщения, относящиеся к диалогам, в которых они участвуют;
  3. “chat-application-messages” — это queue для обработки входящих сообщений серверным приложением.

 
После инициализации всех используемых exchanges и queues мы соединяем exchangeconversation.outgoing” c queuechat-application-messages” и создаем  серверный обработчик сообщений из queuechat-application-messages”.

Интеграция всех частей системы начинается с функционала регистрации пользователя.

При регистрации пользователя на сервере, используя любой из клиентов (iOS, Android), серверное приложение создаёт в RabbitMQ exchange типа fanout с уникально сгенерированным именем и возвращает имя этого exchange на клиентское приложение, которое совершало регистрацию.

В дальнейшем все авторизирующиеся клиенты пользователя также будут получать имя этого уже существующего exchange.

Клиенты, получив имя exchange данного пользователя, создают в RabbitMQ временную queue, которая существует только во время соединения клиента с брокером RabbitMQ, и связывают её с данным exchange.

Создание одного уникального exchange для каждого пользователя позволяет получать сообщения одновременно на всех клиентах (iOS, Android) пользователя.

После выключения интернета на клиенте или в случае потери соединения с брокером сообщений RabbitMQ, временная queue будет автоматически удалена на сервере самим RabbitMQ, что предовратит лишние отправки сообщений и максимально оптимизирует нашу рассылку. При новом соединении клиента будет создана новая очередь.

После регистрации пользователь готов к отправке и приему сообщений.

Рассылка сообщений происходит в рамках диалогов. Система должна позволять создавать диалоги пользователей как с одним, так и со множеством собеседников.

При создании диалога exchange каждого учатстника соединяется с exchange серверного приложения “conversation.incoming”, при этом используя id диалога в качестве routeId.

Общая последовательность действий при отправке сообщения представлена на изображении.

RabbitMQ delivery scheme

Поясним последовательность действий отправки сообщений. Входящие сообщения поступают в exchange “conversation.outgoing”, который пересылает их в queue “chat-application-messages”. Серверное приложение обрабатывает все сообщения, попадающие в queue “chat-application-messages”.

При проверке сообщения проверяется валидность пользовательского токена, id диалога, возможность данного пользователя отправлять сообщения в данный диалог, и attachment, который должен отправляться вместе с приложением, сообщение соохраняется в базу серверного приложения и затем отправляется в exchange “conversation.incoming” c id диалога в качестве routeId.

Exchange “conversation.incoming” отправляет сообщение во все exchange, которые подписаны с тем же routeId, что и отправляемое сообщение.

Фильтрация рассылаемых сообщений RabbitMQ позволяет нам удобно масштабировать количество собеседников, получающих сообщение, без увеличения времени отправки сообщения.

Exchange конкретного пользователя пересылает сообщение во все очереди, подписанные на данный exchange, то есть во все клиентские приложения.

Таким образом, при разработке приложения-мессенджера задача пересылки сообщений с поддержкой многопользовательских диалогов и пользователей с множеством одновременно включенных клиентов будет решена при использовании практически только одного RabbitMQ, с написанием минимального количества кода в серверном приложении.

 
Клиентское приложение(Android)

Учитывая, что приложения-мессенджеры подразумевают быстрое общение, для клиентского приложения обязательным является возможность получать новые сообщения в любое время и в любом состоянии (включенном или выключенном).

Для бесперерывного получения сообщений в Аndroid приложении мы реализуем фоновый системный сервис, который постоянно проверяет созданную нами очередь на наличие новых сообщений.

Для реализации данного функционала на платформе Аndroid мы используем класс, реализующий стандартный Аndroid IntentService.

При старте сервиса он создает соединение с сервером RabbitMQ, временную очередь, и соединяет её с exchange, полученным с серверного приложения.

 
connection = ChatApplication.getApp().getRabbitMQConnectionFactory().newConnection();

channel = connection.createChannel();

queueName = channel.queueDeclare().getQueue();

channel.queueBind (queueName, ChatApp.getApp().getAppConfig().getExchangeName(), «#»);

 

С текущего момента клиент готов к получению новых сообщений. Код получения и обработки сообщений представлен ниже.

 
QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(queueName, true, consumer);

 
while (true) {

  QueueingConsumer.Delivery delivery = consumer.nextDelivery();

  handleMessage(delivery);

}

 
Метод handleMessage содержит код обработки сообщений, добавления их в локальную базу клиента и отображении их на ui.

 
Для отправки сообщений Аndroid приложение создает новое подключение к броккеру RabbitMQ и отправляет сообщение в формате json в exchange “conversation.outgoing”.

 
Connection connection = СhatApp.getApp().getRabbitMQConnectionFactory().newConnection();

Channel channel = connection.createChannel();

try {

  Map<String, Object> headers = new HashMap<>();

 
  headers.put(ACCESS_TOKEN_HEADER, getPreparedUserToken());

  AMQP.BasicProperties properties = (new AMQP.BasicProperties()).builder().replyTo(MessageService.getTempQueueName()).headers(headers).build();

 
  MessageBean message = new MessageBean();

  message.setTemporaryId(mTemporaryId);

  message.setAttachments(attachments);

  message.setType(type.getType());

 

  Gson gson = new Gson();

  String messageJson = gson.toJson(message);

  channel.basicPublish(OUTGOING_EXCHANGE, routeId, properties, messageJson.getBytes());

 
Таким бразом, брокер сообщений RabbitMQ позволяет разработать эффективный мессенджер, полностью решая вопрос коммуникации с приложением, что позволяет разработчикам уделять больше внимания и времени непосредственно реализации бизнес-логики разрабатываемого решения.

4.8/5 - (6 голоса)
×