• en
  • Language: ru
  • Documentation version: 3

Слои канала

Канальные уровни позволяют вам общаться между различными экземплярами приложения. Они являются полезной частью создания распределенного приложения реального времени, если вы не хотите, чтобы все ваши сообщения или события проходили через базу данных.

Кроме того, их можно использовать в сочетании с рабочим процессом для создания базовой очереди задач или для разгрузки задач - подробнее читайте в Рабочие и фоновые задачи.

Примечание

Слои каналов - это совершенно необязательная часть Каналов. Если вы не хотите их использовать, просто оставьте CHANNEL_LAYERS без значения или установите его в пустой диктант {}.

Предупреждение

Канальные слои имеют чисто асинхронный интерфейс (как для отправки, так и для получения); если вы хотите вызывать их из синхронного кода, вам придется обернуть их в конвертер (см. ниже).

Конфигурация

Канальные слои настраиваются с помощью параметра CHANNEL_LAYERS Django.

Вы можете получить слой канала по умолчанию из проекта с помощью channels.layers.get_channel_layer(), но если вы используете потребителей, то копия автоматически предоставляется вам на потребителе в виде self.channel_layer.

Канальный уровень Redis

channels_redis - это единственный официальный поддерживаемый Django канальный слой, поддерживаемый для производственного использования. Этот слой использует Redis в качестве резервного хранилища и поддерживает как односерверную, так и шардированную конфигурацию, а также поддержку групп. Для использования этого слоя вам необходимо установить пакет channels_redis.

В этом примере Redis работает на localhost (127.0.0.1) порт 6379:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

Канальный уровень в памяти

Channels также поставляется с встроенным в память слоем Channels Layer. Этот слой может быть полезен в целях Тестирование или локальной разработки:

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer"
    }
}

Предупреждение

Не использовать в производстве

Канальные уровни in-memory работают с каждым процессом как с отдельным уровнем. Это означает, что межпроцессный обмен сообщениями невозможен. Поскольку основная ценность канальных слоев заключается в обеспечении распределенного обмена сообщениями, использование in-memory приведет к неоптимальной производительности и, в конечном итоге, к потере данных в многоэкземплярной среде.

Синхронные функции

По умолчанию send(), group_send(), group_add() и другие функции являются асинхронными функциями, то есть вы должны await их вызывать. Если вам нужно вызвать их из синхронного кода, вам придется использовать удобную обертку asgiref.sync.async_to_sync:

from asgiref.sync import async_to_sync

async_to_sync(channel_layer.send)("channel_name", {...})

Что отправлять по канальному слою

Канальный уровень предназначен для высокоуровневой связи между приложениями. Когда вы отправляете сообщение, его получают потребители, слушающие группу или канал на другом конце. Это означает, что вы должны отправлять высокоуровневые события через канальный уровень, а затем поручить потребителям обрабатывать эти события и выполнять соответствующее низкоуровневое сетевое взаимодействие со своим подключенным клиентом.

Например, приложение для чата может отправлять события, подобные этому, через канальный уровень:

await self.channel_layer.group_send(
    room.group_name,
    {
        "type": "chat.message",
        "room_id": room_id,
        "username": self.scope["user"].username,
        "message": message,
    }
)

А затем потребители определяют функцию обработки для получения этих событий и превращения их в кадры WebSocket:

async def chat_message(self, event):
    """
    Called when someone has messaged our chat.
    """
    # Send a message down to the client
    await self.send_json(
        {
            "msg_type": settings.MSG_TYPE_MESSAGE,
            "room": event["room_id"],
            "username": event["username"],
            "message": event["message"],
        },
    )

Любой потребитель на основе Channels“ SyncConsumer или AsyncConsumer автоматически предоставит вам атрибуты self.channel_layer и << 3 >>>, которые содержат указатель на экземпляр канального уровня и имя канала, который достигнет потребителя соответственно.

Любое сообщение, отправленное на это имя канала - или на группу, в которую было добавлено имя канала - будет получено потребителем, как событие от подключенного клиента, и отправлено на именованный метод потребителя. Имя метода будет представлять собой type события с периодами, замененными символами подчеркивания - так, например, событие, поступающее через канальный уровень с type в chat.join, будет обработано методом chat_join.

Примечание

Если вы наследуете от дерева классов AsyncConsumer, все ваши обработчики событий, включая обработчики событий на канальном уровне, должны быть асинхронными (async def). Если вы находитесь в дереве классов SyncConsumer, то все они должны быть синхронными (<< 3 >>>).

Одиночные каналы

Каждый экземпляр приложения - например, каждый долго выполняющийся HTTP-запрос или открытый WebSocket - приводит к созданию одного экземпляра Consumer, и если у вас включены канальные уровни, Consumers создадут для себя уникальное имя канала и начнут прослушивать его на предмет событий.

Это означает, что вы можете посылать этим потребителям события извне процесса - от других потребителей, возможно, или от команд управления - и они будут реагировать на них и выполнять код так же, как если бы они посылали события от своего клиентского соединения.

Имя канала доступно потребителю в виде self.channel_name. Вот пример записи имени канала в базу данных при подключении, а затем указание метода-обработчика для событий на нем:

class ChatConsumer(WebsocketConsumer):

    def connect(self):
        # Make a database row with our channel name
        Clients.objects.create(channel_name=self.channel_name)

    def disconnect(self, close_code):
        # Note that in some rare cases (power loss, etc) disconnect may fail
        # to run; this naive example would leave zombie channel names around.
        Clients.objects.filter(channel_name=self.channel_name).delete()

    def chat_message(self, event):
        # Handles the "chat.message" event when it's sent to us.
        self.send(text_data=event["text"])

Обратите внимание, что поскольку вы смешиваете обработку событий от канального уровня и от протокольного соединения, вам нужно убедиться, что ваши имена типов не противоречат друг другу. Рекомендуется использовать префиксные имена типов (как мы сделали здесь chat.), чтобы избежать столкновений.

Чтобы отправить на отдельный канал, просто найдите название канала (для приведенного выше примера мы можем воспользоваться базой данных) и используйте channel_layer.send:

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
await channel_layer.send("channel_name", {
    "type": "chat.message",
    "text": "Hello there!",
})

Группы

Очевидно, что отправка на отдельные каналы не особенно полезна - в большинстве случаев вы захотите отправить сообщение сразу на несколько каналов/потребителей в виде широковещательной рассылки. Не только для таких случаев, как чат, где вы хотите отправлять входящие сообщения всем присутствующим, но даже для отправки отдельному пользователю, у которого может быть подключено несколько вкладок браузера или устройств.

При желании вы можете создать собственное решение для этого, используя существующие хранилища данных, или воспользоваться системой Groups, встроенной в некоторые канальные слои. Группы - это широковещательная система, которая:

  • Позволяет добавлять и удалять имена каналов из именованных групп, а также отправлять в эти именованные группы.

  • Обеспечивает истечение срока действия группы для очистки соединений, обработчик отключения которых не успел запуститься (например, при отключении питания).

Они не позволяют перечислять или составлять список каналов в группе; это чисто вещательная система. Если вам нужен более точный контроль или необходимо знать, кто подключен, вам следует создать собственную систему или использовать подходящую систему стороннего производителя.

Вы используете группы, добавляя к ним канал при подключении и удаляя его при отключении, как показано здесь на примере общего потребителя WebSocket:

# This example uses WebSocket consumer, which is synchronous, and so
# needs the async channel layer functions to be converted.
from asgiref.sync import async_to_sync

class ChatConsumer(WebsocketConsumer):

    def connect(self):
        async_to_sync(self.channel_layer.group_add)("chat", self.channel_name)

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)("chat", self.channel_name)

Затем, для отправки группе, используйте group_send, как в этом небольшом примере, который транслирует сообщения чата на каждый подключенный сокет в сочетании с кодом выше:

class ChatConsumer(WebsocketConsumer):

    ...

    def receive(self, text_data):
        async_to_sync(self.channel_layer.group_send)(
            "chat",
            {
                "type": "chat.message",
                "text": text_data,
            },
        )

    def chat_message(self, event):
        self.send(text_data=event["text"])

Использование за пределами потребителей

Вы часто захотите отправлять на канальный уровень из-за пределов области действия потребителя, и поэтому у вас не будет self.channel_layer. В этом случае для его получения следует использовать функцию get_channel_layer:

from channels.layers import get_channel_layer
channel_layer = get_channel_layer()

Затем, когда он у вас есть, вы можете просто вызывать методы на нем. Помните, что канальные слои поддерживают только асинхронные методы, поэтому вы можете либо вызывать их из собственного асинхронного контекста:

for chat_name in chats:
    await channel_layer.group_send(
        chat_name,
        {"type": "chat.system_message", "text": announcement_text},
    )

Или вам нужно будет использовать async_to_sync:

from asgiref.sync import async_to_sync

async_to_sync(channel_layer.group_send)("chat", {"type": "chat.force_disconnect"})