Исходники.Ру - Программирование
Исходники
Статьи
Книги и учебники
Скрипты
Новости RSS
Магазин программиста

Главная » Статьи по программированию » .NET - Все статьи »

Обсудить на форуме Обсудить на форуме

Работа с очередями сообщений. Создание распределенных приложений в .NET
В этой статье описывается доступ к очередям MSMQ для чтения и записи через классы System.Messaging из Microsoft .NET Framework.
Полный исходный код для этой статьи можно скачать с сайта MSDN Online Code Center (EN).

Содержание

Введение

Выполнять задачу асинхронно означает выполнять ее, не дожидаясь результата. Асинхронная обработка позволяет запустить процесс, требующий много времени на выполнение, и продолжить работу, не дожидаясь окончания его работы.

Асинхронная обработка идеальна во многих ситуациях — особенно в интернете, где на обработку запроса может уходить много времени и где пользователь должен видеть мгновенную реакцию приложения. Асинхронно обрабатывая запросы пользователей, система сможет реагировать на них независимо от того, сколько времени занимает их обработка.

Реализовать асинхронную обработку в приложениях Microsoft® .NET можно разными способами. Например, воспользоваться функциями управления ходом выполнения, имеющимися в Microsoft Exchange 2000 или Microsoft BizTalk™ Server. Можно также использовать компоненты COM+ с поддержкой очередей (COM+ Queued Components) через механизм COM Interop или работать непосредственно с Message Queuing (MSMQ). Детальное сравнение этих двух вариантов вы найдете в готовящейся к публикации статье Architectural Topics (EN).

MSMQ, используемый BizTalk и компонентами COM+ Queued Components, входит в состав Microsoft Windows® NT 4.0 Option Pack и серверных версий Microsoft Windows 2000. При работе с Windows NT 4.0 Workstation, Windows 2000 Professional и Windows XP Professional MSMQ можно использовать для доступа к локальным закрытым очередям (local private queues), что я и делаю в этой статье. MSMQ позволяет приложению помещать данные в очередь, где они хранятся, пока приложение (это же или другое) не запросит их. Если вместо синхронной обработки задачи вы помещаете в очередь сообщение, представляющее эту задачу (например, заказ, подлежащий обработке), задержка в реакции вашей основной системы на запрос равна времени, которое необходимо на передачу сообщения в очередь. Помещать сообщения в очередь — равно как получать и обрабатывать их — может любое число различных систем; все это позволяет создавать более масштабируемые приложения.

В этой статье будет показано, как реализовать в приложении асинхронное управление ходом выполнения при помощи классов System.Messaging из .NET Framework.

Как уже говорилось, MSMQ имеется в Windows NT 4.0, Windows 2000 и Windows XP Professional, но в приводимых здесь примерах предполагается, что вы используете Windows 2000 Server с установленным MSMQ. Также предполагается, что Windows 2000 Server и ваш .NET-код работают на одном компьютере. Однако в реальной системе ваш код может взаимодействовать с очередями сообщений, расположенными на отдельном сервере.

Работа с очередями

Прежде чем взаимодействовать с очередью сообщений, вы должны указать ее. Для этого нужен какой-то способ, позволяющий уникально и согласованно описывать очередь в приложениях. Платформа .NET предоставляет три способа доступа к конкретной очереди.

  • Очередь определяется по пути к ней
    Вы задаете имя сервера (или «.», если сервер локальный) и полный путь к очереди вроде <имя_сервера>private$<имя_очереди>,liquidsoapprivate$Orders.
  • Очередь определяется по строке Format Name
    Строка Format Name содержит параметры подключения и путь к очереди (DIRECT=OS:liquidsoapprivate$Orders) или глобально уникальный идентификатор (GUID), определяющий очередь сообщений.
  • Очередь определяется по метке
    Метка очереди (например, My Orders) — это значение, которое можно задать программно или через административный интерфейс MSMQ.
    Использование меток или путей приводит к заметным издержкам, поскольку MSMQ приходится преобразовывать их в строку Format Name (внутренний формат описания очередей, используемый MSMQ). При указании строки Format Name преобразование имен не происходит, т. е. это более эффективный метод определения нужной очереди. Кроме того, это единственный способ ссылки на очередь, которая в данный момент находится в автономном режиме (offline).

Автономное использование очередей сообщений в этой статье не рассматривается. Подробнее о работе с очередями см. раздел документации Platform SDK, посвященный MSMQ.

Указывая очередь по имени, вы должны создать представляющий ее экземпляр объекта System.Messaging.MessageQueue. Следующий код создает новый экземпляр объекта MessageQueue, идентифицируя его по пути к очереди (предполагается, что в начало кода добавлена строка Imports System.Messaging):

Private Const QUEUE_NAME As String = ".\private$\Orders"
Dim msgQ As New MessageQueue(QUEUE_NAME)

См. Example 1 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Заметьте: в пути к очереди используется «.», т. е. дается ссылка на локальную машину, а не на имя реального сервера. Если в MSMQ на вашем компьютере определена закрытая очередь Orders, данный код успешно получит объектную ссылку на нее. В более надежном приложении следует проверять, есть ли такая очередь, и, если нет, автоматически создавать ее (несмотря на то что эта проверка требует достаточно много времени).

Программное создание очередей

Обычно для создания, удаления и просмотра очередей сообщений используют интерфейс MSMQ, но с очередями можно работать и через пространство имен System.Messaging. Класс MessageQueue предоставляет несколько статических методов (которые можно вызывать, не создавая реальный экземпляр класса), позволяющих проверять наличие очереди (Exists), а также создавать ее (Create) и удалять (Delete). С их помощью приложение может проверить наличие очереди и, если этой очереди нет, автоматически создать ее. Все эти действия и выполняет функция GetQ:

Private Function GetQ(ByVal queueName As String) As MessageQueue
    Dim msgQ As MessageQueue
    ' Если очереди нет, создаем ее
    If Not MessageQueue.Exists(queueName) Then
        Try
            ' Создаем очередь сообщений и объект MessageQueue
            msgQ = MessageQueue.Create(queueName)
        Catch CreateException As Exception
            ' Если у кода нет соответствующих разрешений,
            ' при создании очереди может произойти ошибка
            Throw New Exception("Error Creating Queue", CreateException)
        End Try
    Else
        Try
            msgQ = New MessageQueue(queueName)
        Catch GetException As Exception
            Throw New Exception("Error Getting Queue", GetException)
        End Try
    End If
    Return msgQ
End Function

См. GetQ в полном исходном коде (EN) BDAdotNetAsync1.vb.

Такой код упрощает разработку, так как исключает необходимость упреждающего создания очередей, но вы должны понимать, что из-за издержек вызова метода Exists для большей производительности очередь следует создавать программой установки, чтобы приложение могло предполагать, что очередь уже есть. Стоит отметить, что принимающий код тоже должен проверять наличие нужной очереди, поскольку очередь может перестать существовать, если в нее не помещено ни одно сообщение.


Создание и удаление очередей требует соответствующих разрешений защиты, и по умолчанию выполнять эти действия может только создатель/владелец очереди или администратор. При отсутствии у вас нужных разрешений система сгенерирует исключение и выдаст сообщение Access Denied.

Удаление очередей

Еще один статический метод, который будет вам полезен, — Delete. Он принимает путь к очереди (как и два его спутника, Create и Exists), а затем ищет соответствующую очередь и удаляет ее с сервера. Конечно, это достаточно радикальная операция, поскольку она уничтожает все сообщения в очереди и может привести к потере важных данных. Чтобы убедиться в успешном удалении очереди, поместите вызов Delete в код обработки ошибок, аналогичный приведенному ниже.

Try
    ' Удаляем указанную очередь
    MessageQueue.Delete(queuePath)
Catch e As Exception
    ' Отбрасываем исключение вызвавшему
    Throw New Exception("Deletion Failed", e)
End Try

См. Example 2 и DeleteQ в полном исходном коде (EN) BDAdotNetAsync1.vb.

Передача сообщений

Пространство имен System.Messaging предоставляет простой и сложный способы передачи сообщений. Разница между ними — не в самих сообщениях, а в возможностях управления форматом сообщений и их доставкой. Оба способа позволяют отправлять сообщения любых типов; в приводимых здесь примерах передаются String, DataSet и даже класс, объявленный в коде.

Простой способ

Передача сообщения в очередь простым способом требует буквально двух этапов. Во-первых, нужно получить ссылку на соответствующую очередь сообщений (по пути к очереди, строке Format Name или метке), а во-вторых, вызвать метод Send объекта MessageQueue, передав ему посылаемый объект и при необходимости метку сообщения. Вот и все. Формат и параметры отправки сообщения определяются значениями по умолчанию — указывать их не требуется. Следующий код использует процедуру GetQ (подробнее о ней — далее в этой статье) и посылает в локальную закрытую очередь простое строковое сообщение:

Private Const QUEUE_NAME As String = ".\private$\Orders"
Dim msqQ As MessageQueue
Dim msgText As String
' Подготавливаем текстовое сообщение
msgText = String.Format("Sample Message Sent At {0}", DateTime.Now())
Try
    ' Получаем объект MessageQueue очереди
    msqQ = GetQ(QUEUE_NAME)
    ' Отправляем сообщение в очередь
    msqQ.Send(msgText)
Catch e As Exception
    ' Обрабатываем ошибку
    …
End Try

См. Example 3 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Указать метку сообщения можно, включив в вызов метода Send второй параметр:

msgQ.Send(sMessage, "Sample Message")

Просмотреть имеющиеся на компьютере очереди и отдельные сообщения в них можно при помощи MSMQ Explorer (в Windows NT) или оснастки Computer Management (в Windows 2000/XP). Если сообщению присвоена метка, она отображается на правой панели; в ином случае правая панель пуста.

Узел Message Queuing в оснастке Computer Management

Рис. 1. Узел Message Queuing в оснастке Computer Management

Чтобы просмотреть какое-либо сообщение, щелкните его правой кнопкой мыши, выберите Properties и перейдите на вкладку Body. Если сообщение было отправлено простым способом и значения параметров по умолчанию не изменялись, любой переданный объект сериализован в XML-код — по аналогии со следующим примером (и на рис. 2):

<?xml version="1.0"?>
<string>Sample Message Sent At 5/25/2001 3:51:48 PM</string>

Тело сообщения, сериализованное в XML

Рис. 2. Тело сообщения, сериализованное в XML


По умолчанию сообщения отсылаются в XML-формате, но можно выбрать и другой формат. Подробнее о параметрах форматирования, используемых при сериализации сообщения, см. раздел «Сложный способ».

Данный способ позволяет отправить в очередь любой объект, просто передав его методу Send. Любой аргумент, получаемый этим методом, сериализуется в XML (см. предыдущее примечание) и помещается в тело сообщения. MSMQ используется для передачи не только базовых типов данных, таких как String, Integer и Double, но и объектов двух других типов: DataSet и специфичных для приложения классов объектов, создаваемых пользователем. Следующий фрагмент кода отправляет в очередь объекты обоих этих типов, демонстрируя, насколько легко передавать любую информацию через классы System.Messaging.

Чтобы отправить DataSet, создайте его экземпляр и передайте методу Send объекта MessageQueue. Как и в случае со String, объект DataSet будет сериализован в XML и помещен в тело сообщения:

Dim msgQ As MessageQueue
Dim msgText As String
Dim dSet As DataSet
' Внешний код, заполняющий объект DataSet
dSet = GetDataSet()
' Получаем объект MessageQueue для очереди
msgQ = GetQ(QUEUE_NAME)
' Отправляем сообщение с объектом DatSet в очередь с меткой "Order"
msgQ.Send(dSet, "Order")

См. Example 4 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Если в вашей системе есть собственные классы для хранения данных, например класс Order, соответствующие объекты можно передавать в очередь через MSMQ, как в примере с объектом DataSet. Классы .NET Messaging сериализуют экземпляр объекта в XML и добавят его в сообщение. Следующий фрагмент кода создает экземпляр класса Order и задает значения его свойств. Затем методом Send объекта MessageQueue этот код помещает объект в сообщение и добавляет его в очередь. Рассмотрев сложный способ передачи сообщений, мы обсудим, как получить этот или любой другой объект из очереди при работе с входящими сообщениями.

Dim msgQ As MessageQueue
Dim myOrder As New Order()
' Заполняем Order-объект
myOrder.CustomerID = "ALKI"
myOrder.ID = 34
myOrder.ShipDate = DateTime.Now()
' Получаем объект MessageQueue для очереди
msgQ = GetQ(QUEUE_NAME)
' Посылаем в очередь сообщение с Order-объектом
msgQ.Send(myOrder, "Order")

См. Example 5 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Сложный способ

Хотя в большинстве случаев метод Send вполне приемлем, иногда требуется более тонкий контроль за процессом передачи сообщения. Альтернатива методу Send (см. предыдущие примеры) — применение экземпляра класса System.Messaging.Message. Создав объект Message, представляющий отдельное сообщение MSMQ, вы можете задавать параметры форматирования и шифрования, интервалы ожидания, а также значения других свойств, недоступных при использовании простого метода Send.

Одно из основных свойств сообщения, значение которого вы, вероятно, захотите изменить, определяет процесс сериализации передаваемого сообщения. По умолчанию используется форматирующий объект (formatter) XMLMessageFormatter; он сериализует объекты в XML и годится в большинстве ситуаций. Однако существует два других форматирующих объекта: BinaryMessageFormatter и ActiveXMessageFormatter; они сериализуют объекты в двоичный формат, который в отличие от XML неудобочитаем для человека. Форматирующий объект Microsoft ActiveX® применяется при передаче данных примитивных типов и COM-объектов в систему, не поддерживающую .NET, например в приложение, написанное на Microsoft Visual Basic® 6.0; использовать этот форматирующий объект без необходимости не следует. Двоичный форматирующий объект в некоторых случаях работает быстрее, чем применяемый по умолчанию форматирующий XML-объект, и его имеет смысл использовать, если вам очень важна производительность MSMQ. Однако вы должны учитывать издержки замены XML-формата на двоичный: получая более высокую производительность, вы теряете возможность прямого чтения и анализа содержимого сообщения.


Сообщение помещается в очередь и извлекается оттуда только с использованием одного и того же форматирующего объекта.

Следующий фрагмент кода демонстрирует, как передать сообщение сложным способом; при этом вы можете настроить ряд параметров, в том числе форматирования и шифрования, а затем передать сконфигурированный объект Message методу Send объекта MessageQueue.

Dim msgQ As MessageQueue
' Создаем новый объект Message
Dim myMsg As New Message()
' Создаем новый экземпляр класса Order,
' определенного в конце данного файла
Dim myOrder As New Order()
' Заполняем Order-объект
myOrder.CustomerID = "ALKI"
myOrder.ID = 34
myOrder.ShipDate = DateTime.Now()

msgQ = GetQ(QUEUE_NAME)

With myMsg
    .Label = "Order"
    .Formatter = New BinaryMessageFormatter()
    .AppSpecific = 34
    .Priority = MessagePriority.VeryHigh
    .Body = myOrder
End With
' Отправляем объект Message, уже содержащий экземпляр класса Order
msgQ.Send(myMsg)

См. Example 6 в полном исходном коде (EN) BDAdotNetAsync1.vb.

В этом примере вместо форматирующего XML-объекта по умолчанию применяется форматирующий объект BinaryMessageFormatter, поэтому передаваемый класс (Order) должен поддерживать интерфейс ISerializable или иметь атрибут <Serializable()>. В полном коде используется второй вариант (с атрибутом).

Получение сообщений

После того как вы поместили сообщения в очередь, кто-то может захотеть получить их, выбирая данные из очереди в том порядке, в каком они туда добавлялись. Получение сообщения из очереди и работа с содержащимися в нем данными осуществляется в несколько этапов.

  • Во-первых, как и при отправке сообщения, следует получить объект MessageQueue, указывающий на соответствующую очередь:
  • Затем с помощью метода Receive очереди вы должны извлечь из очереди первое сообщение:
  • Если используется форматирующий XML-объект (по умолчанию), определите для форматирующего объекта Message список допустимых типов данных, которые могут содержаться в сообщении. В нашем случае ожидается только один тип — String:
  • Наконец, извлеките свои данные через свойство Body, приводя их к соответствующему типу:

     


    См. Example 7 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Настройка интервала ожидания

Для получения сообщения достаточно только что перечисленных действий, но в таком виде код будет блокирован (или «зависнет») на вызове метода msgQ.Receive до тех пор, пока не появится сообщение. Если в очереди нет сообщений, программа будет неопределенно долго ждать выполнения этой строки. Однако метод Receive принимает в качестве параметра объект System.TimeSpan, позволяя задать время ожидания. Используя один из нескольких конструкторов, объект TimeSpan можно создать прямо в вызове метода Receive:

objMessage = objQ.Receive(New TimeSpan(0, 0, 30))

Эта строка кода задает интервал ожидания в 30 секунд, используя версию конструктора TimeSpan с тремя параметрами (часы, минуты, секунды); при необходимости вы можете указать интервал любой продолжительности. Если вы задаете интервал ожидания в вызове метода Receive, поместите этот вызов следует в процедуру обработки ошибок: если по истечении интервала ожидания не будет принято ни одного сообщения, возникнет ошибка.

Dim myMessage As Message
Try
    myMessage = msgQ.Receive(New TimeSpan(0, 0, 30))
Catch eReceive As MessageQueueException
    Console.WriteLine("{0} ({1})", _
            eReceive.Message, _
            eReceive.MessageQueueErrorCode.ToString)
End Try

См. Example 8 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Применение специфического форматирующего объекта

При использовании форматирующего XML-объекта процесс передачи одинаков независимо от типа данных. Если вы применяете другой форматирующий объект, например BinaryMessageFormatter, то перед извлечением тела сообщения укажите нужный форматирующий объект. Следующий фрагмент кода получает экземпляр класса Order с помощью двоичного форматирующего объекта:

Dim msgQ As MessageQueue
Dim myOrder As Order
' Получаем объект MessageQueue для очереди
msgQ = GetQ(QUEUE_NAME)
' Задаем форматирующий объект BinaryMessageFormatter
msgQ.Formatter = New BinaryMessageFormatter()
' Считываем сообщение из очереди и преобразуем его в Order-объект
myOrder = CType(msgQ.Receive().Body, Order)
' Выполняем требуемые действия над Order-объектом
Console.WriteLine(myOrder.CustomerID)

См. Example 9 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Процедура SendMessage — еще один способ передачи сообщений: задав свойство Formatter объекта MessageQueue, вы можете контролировать сериализацию, применяемую при отправке сообщения простым способом.

Эта статья посвящена асинхронной обработке задач, избавляющей от ожидания их выполнения. Однако метод Receive, встречавшийся в нескольких примерах, — полностью синхронный: коду приходится ожидать, пока метод не получит сообщение из очереди. В наших примерах эта задержка незаметна, но в реальной ситуации размер сообщения может составлять до 4 МБ (особенно при обмене объектами DataSet), и, возможно, вы не захотите дожидаться, когда сообщение будет полностью помещено в пространство памяти вашей программы. Для поддержки сообщений большого размера и сетевых соединений с низкой пропускной способностью объект MessageQueue не только предоставляет метод Receive, но и позволяет получать сообщения асинхронно. Для этого вы делаете все то же, что и прежде, но вместо Receive вызываете метод BeginReceive, при необходимости указывая интервал ожидания. Этот метод запускает процесс получения сообщения, а программа переходит к следующей строке кода:

Private Const QUEUE_NAME As String = "liquidsoap\private$\Orders"
Private WithEvents msgQ As MessageQueue
' Получаем объект MessageQueue для очереди
msgQ = GetQ(QUEUE_NAME)
' Начинаем прослушивать сообщения
msgQ.BeginReceive()

Когда сообщение будет успешно извлечено, сработает событие ReceiveCompleted объекта MessageQueue, и будет вызван ваш обработчик этого события. Полученное сообщение передается обработчику как один из его параметров, и этот объект можно использовать для извлечения реальных данных обычным способом:

Public Sub msgQ_ReceiveCompleted(ByVal sender As Object, _
        ByVal e As ReceiveCompletedEventArgs) _
        Handles msgQ.ReceiveCompleted
    Dim msgText As String
    Dim eMsg As Message
    ' Получаем сообщение из аргументов события (e)
    eMsg = e.Message
    ' Указываем форматирующему объекту, что
    ' мы ожидаем единственный тип данных (string)
    eMsg.Formatter = _
        New XmlMessageFormatter(New System.Type() {GetType(String)})
    ' Получаем тело сообщения и приводим его к типу String
    msgText = CStr(eMsg.Body)
    Console.WriteLine(msgText)
    ' Устанавливаем флаг, чтобы выйти из цикла в AsyncReceive
    msgReceived = True
End Sub

См. Example 10 в полном исходном коде (EN) BDAdotNetAsync1.vb.

Резюме

Один из основных способов реализации в приложении асинхронной обработки — интерфейс Message Queuing (MSMQ). Классы System.Messaging позволяют ставить в очередь сообщения и считывать их оттуда, при этом сообщения могут содержать данные типов, допустимые в .NET или COM. О других способах реализации асинхронной обработки см. готовящуюся к публикации статью Architectural Topics (EN).


Может пригодится:


Автор: Дункан Маккензи
Прочитано: 8097
Рейтинг:
Оценить: 1 2 3 4 5

Комментарии: (0)

Добавить комментарий
Ваше имя*:
Ваш email:
URL Вашего сайта:
Ваш комментарий*:
Код безопастности*:

Рассылка новостей
Рейтинги
© 2007, Программирование Исходники.Ру