Combining LINQ with System.Messaging

Patrick Toolis

1. LINQ Intro

Language-Integrated Query (LINQ) provides a way to embed SQL-like query expressions in code that iterates over enumerable data. The data may live in standard in-memory structures such as arrays, or it may come from external sources such as XML files or relational databases. The System.Linq namespace, together with the query syntax added to C# and Visual Basic, allows traditional query syntax to be used directly in source code. This architecture offers safety advantages over embedding queries as strings to be interpreted by some other library (like an ODBC driver), because the compiler can type-check the contents of the query. For example,

string query = "select customer from customers where name = 'fred'";

means that the contents of the query are simply a string to the compiler and are not checked, whereas in

var query = from c in customers where c.name == "fred" select c;

c is an object of type Customer, and the comparison of c.name with "fred" is type-checked at compile time.

The collection of objects to be queried via LINQ must be an instance of a class that implements IEnumerable, the generic IEnumerable<T>, IQueryable, or IQueryable<T>. Classes which implement IEnumerable include Array and several members of the System.Collections namespace, such as ArrayList.
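As a minimal sketch of this requirement (the class and method names below are illustrative and do not come from the programs in this article), the same filter can be run over an array, which implements the generic IEnumerable<T>, and over an ArrayList, which implements only the non-generic IEnumerable. In the second case the range variable is given an explicit type so that the elements are cast before the query operators are applied.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

public static class EnumerableSourcesDemo
{
    // Arrays implement IEnumerable<T>, so query syntax applies directly.
    public static List<string> FilterArray(string[] names)
    {
        var query = from n in names where n.StartsWith("f") select n;
        return query.ToList();
    }

    // ArrayList implements only the non-generic IEnumerable. Typing the
    // range variable ("from string n") makes the compiler call Cast<string>().
    public static List<string> FilterArrayList(ArrayList names)
    {
        var query = from string n in names where n.StartsWith("f") select n;
        return query.ToList();
    }

    public static void Main()
    {
        string[] array = { "fred", "wilma", "frank" };
        ArrayList list = new ArrayList { "fred", "wilma", "frank" };

        Console.WriteLine(string.Join(", ", FilterArray(array)));
        Console.WriteLine(string.Join(", ", FilterArrayList(list)));
    }
}
```

Both calls select the same two names; the only difference is the cast needed for the non-generic collection.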

LINQ provides further flexibility to developers by supporting a functional programming style. This is done by allowing lambda expressions to be passed to functions which take delegates as parameters. "Lambda expressions", which are named after the "Lambda Calculus" developed by Alonzo Church and Stephen Cole Kleene in the 1930s, are anonymous functions that combine declaration and implementation in a single construct, without requiring explicit type annotations. These expressions combine the instantiation of the delegate and the implementation of the function in a single expression, so the syntax is essentially that of an inline function. In addition, the compiler performs type inference on lambda expressions, allowing programmers to use syntax comparable to that employed in historically functional languages like LISP or ML.

The above query could be rewritten as:

var cusQuery = customers.Where(c => c.name == "fred");
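The equivalence of the two forms can be sketched as follows. The Customer class here is a hypothetical stand-in with a single name field, and the method names are invented for the illustration. The third method instantiates the delegate explicitly to show what the lambda is converted to.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the Customer type used in the text.
public class Customer
{
    public string name;
    public Customer(string name) { this.name = name; }
}

public static class LambdaDemo
{
    public static List<Customer> ByQuerySyntax(IEnumerable<Customer> customers)
    {
        return (from c in customers where c.name == "fred" select c).ToList();
    }

    public static List<Customer> ByMethodSyntax(IEnumerable<Customer> customers)
    {
        // The lambda is converted to a Func<Customer, bool> delegate;
        // the type of c is inferred from the element type of the sequence.
        return customers.Where(c => c.name == "fred").ToList();
    }

    public static List<Customer> ByNamedDelegate(IEnumerable<Customer> customers)
    {
        // The same predicate, with the delegate instantiated explicitly.
        Func<Customer, bool> isFred = c => c.name == "fred";
        return customers.Where(isFred).ToList();
    }

    public static void Main()
    {
        var customers = new[] { new Customer("fred"), new Customer("wilma"), new Customer("fred") };
        Console.WriteLine(ByQuerySyntax(customers).Count);
        Console.WriteLine(ByMethodSyntax(customers).Count);
        Console.WriteLine(ByNamedDelegate(customers).Count);
    }
}
```

All three methods return the same two customers, since the compiler translates the query expression into the same Where call.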

2. LINQ Applied to MSMQ

In any scenario involving the filtering or sorting of data which is to be sent to or received from MSMQ, LINQ provides a means to simplify program logic. One example is the code below, which sends a set of messages to a primary queue, "MainQueue", and then sends only those messages with "to_be_filtered" in the body to the secondary queue, "FilterQueue".

using System;
using System.Linq;
using System.Messaging;

public class LINQMSMQTest{

    // Opens the private queue MainQueue, creating it (non-transactional) if it does not exist.
    private MessageQueue createMainQueue(){
        string qFormatName = ".\\private$\\MainQueue";
        MessageQueue q;
        if(!MessageQueue.Exists(qFormatName)){
            MessageQueue.Create(qFormatName, false);
        }
        q = new MessageQueue(qFormatName);
        return q;
    }

    // Opens the private queue FilterQueue, creating it (non-transactional) if it does not exist.
    private MessageQueue createFilterQueue(){
        string filtQFN = ".\\private$\\FilterQueue";
        MessageQueue filtQ;
        if(!MessageQueue.Exists(filtQFN)){
            MessageQueue.Create(filtQFN, false);
        }
        filtQ = new MessageQueue(filtQFN);
        return filtQ;
    }

    // Sends every message, with its matching label, to the main queue.
    private void sendMessagesToMain(MessageQueue q, string[] messages, string[] labels){
        for(int i = 0; i < messages.Length; i++){
            q.Send(messages[i], labels[i]);
        }
    }

    // Sends only the messages containing "to_be_filtered" to the filter queue.
    private void sendMessagesToFilter(MessageQueue fq, string[] messages){
        var messQuery = from mess in messages where mess.Contains("to_be_filtered") select mess;
        int counter = 1;
        foreach (string mess in messQuery){
            fq.Send(mess, "filter query result " + counter);
            counter++;
        }
    }

    public static void Main(string[] args){
        Console.WriteLine("Test of sys messaging start");
        LINQMSMQTest lmt = new LINQMSMQTest();
        MessageQueue q = lmt.createMainQueue();
        string[] messages = {"messages are stylish", "to_be_filtered in autumn is ambient", "fickle indices should not be involved in gensaki transactions"};
        string[] labels = {"label ichi", "label dva", "label tres"};
        lmt.sendMessagesToMain(q, messages, labels);
        MessageQueue filtQ = lmt.createFilterQueue();
        lmt.sendMessagesToFilter(filtQ, messages);
    }
}

The query in the sendMessagesToFilter method replaces a for loop which would iterate through all of the messages in the array and send only those which contain "to_be_filtered" to the filter queue. Note also that the programmer never allocates a separate collection for the matching subset: the query is evaluated lazily as the foreach loop runs, and only the reference messQuery is declared explicitly. The power of this language feature can be seen more clearly if the processing of the query results is more involved. In this case, we would have a separate method which takes the query results as input. The altered portions of the code would be:

using System.Collections.Generic;

private void sendMessagesToFilter(MessageQueue fq, string[] messages){
    IEnumerable<string> messQuery = from mess in messages where mess.Contains("to_be_filtered") select mess;
    sendMess(messQuery, fq);
}

private void sendMess(IEnumerable<string> queryResults, MessageQueue fq){
    int counter = 1;
    foreach (string mess in queryResults){
        fq.Send(mess, "filter query result " + counter);
        counter++;
    }
}
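Passing the query around as an IEnumerable<string> works because a LINQ to Objects query is not executed until it is enumerated. The sketch below illustrates this with an in-memory list instead of a queue; the class and method names are invented for the example, and only standard System.Linq calls are used.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredExecutionDemo
{
    public static IEnumerable<string> BuildQuery(IList<string> messages)
    {
        // No filtering happens here; only a query object is created.
        return from mess in messages where mess.Contains("to_be_filtered") select mess;
    }

    public static void Main()
    {
        var messages = new List<string> { "plain message", "to_be_filtered one" };
        IEnumerable<string> query = BuildQuery(messages);

        // Added after the query was defined, but before it is enumerated.
        messages.Add("to_be_filtered two");

        // The query runs now, so it sees both matching messages.
        Console.WriteLine(query.Count());

        // ToList() takes a snapshot which later changes do not affect.
        List<string> snapshot = query.ToList();
        messages.Add("to_be_filtered three");
        Console.WriteLine(snapshot.Count);
        Console.WriteLine(query.Count());
    }
}
```

The three lines printed are 2, 2, and 3: the snapshot keeps its two elements, while re-enumerating the query picks up the message added later. If the results must be fixed at a particular moment before being handed to another method, calling ToList() or ToArray() first is one way to do that.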

Of course, MSMQ objects like queues and messages can be iterated over just as strings were traversed above. In addition, queries are not limited to simple "select from where" statements. The example below joins the result sets of two different queries on two different queues, using the message label as the field to join on. In summary, the code does the following:

  1. If the queues q1, q2, and q3 do not already exist on the Queue Manager, create them.
  2. Send three messages to each of the queues q1 and q2.
  3. Run a query separately on q1 and q2 to extract all messages which arrived after a certain time into separate IEnumerable result sets.
  4. Join the result sets of the queries above, taking all pairs of messages with the same label and selecting the body of each message in the pair. This allows the creation of one new message for each distinct pair in which the two messages have the same label.
  5. Send these messages to q3. If q1 contains m messages with a label l which arrived after the threshold time and q2 contains n messages with label l which arrived after that time, q3 will contain m*n messages (see Figures 1, 2, and 3, where q1 and q2 each contained 2 messages with the label "l2" which arrived after 2:43 PM). Each message has a body consisting of the body of a q1 message followed by the body of a q2 message (see Figure 4, which displays the body tab of the message properties).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;

public class MessArriveTest{

    private IEnumerable<Message> messQuery1;
    private IEnumerable<Message> messQuery2;
    MessageQueue q1, q2, q3;

    public static void Main(string[] args){
        MessArriveTest mt = new MessArriveTest();
        mt.createQueues();
        mt.sendToSourceQueues();
        mt.queryAndSendToOutput();
    }

    public void createQueues(){
        string q1name = ".\\private$\\q1";
        string q2name = ".\\private$\\q2";
        string q3name = ".\\private$\\q3";
        if(!MessageQueue.Exists(q1name)){
            MessageQueue.Create(q1name);
        }
        if(!MessageQueue.Exists(q2name)){
            MessageQueue.Create(q2name);
        }
        if(!MessageQueue.Exists(q3name)){
            MessageQueue.Create(q3name);
        }
        // SetAll() makes every message property, including ArrivedTime, available when reading.
        // The formatter lines are an added assumption: reading Body back as a string
        // generally requires the XmlMessageFormatter to be told the target type.
        q1 = new MessageQueue(q1name);
        q1.MessageReadPropertyFilter.SetAll();
        q1.Formatter = new XmlMessageFormatter(new Type[]{typeof(string)});
        q2 = new MessageQueue(q2name);
        q2.MessageReadPropertyFilter.SetAll();
        q2.Formatter = new XmlMessageFormatter(new Type[]{typeof(string)});
        q3 = new MessageQueue(q3name);
        q3.MessageReadPropertyFilter.SetAll();
    }

    public void sendToSourceQueues(){
        string[] labels1 = {"l1", "l2", "l3"};
        string[] bodies1 = {"body 1", "body 2", "body 3"};
        string[] bodies2 = {"body ichi", "body ni", "body san"};
        string[] labels2 = {"l4", "l2", "l5"};
        for(int i = 0; i < labels1.Length; i++){
            q1.Send(bodies1[i], labels1[i]);
            q2.Send(bodies2[i], labels2[i]);
        }
    }

    public void queryAndSendToOutput(){
        querySourceQueues();
        joinSourceQueriesAndSendToOutput();
    }

    // Get messages which arrived after 12/5/2007 2:43 PM.
    private void querySourceQueues(){
        Message[] receivedQ1 = q1.GetAllMessages();
        Message[] receivedQ2 = q2.GetAllMessages();
        DateTime threshold1 = new DateTime(2007, 12, 5, 14, 43, 0);
        Console.WriteLine("threshold1 is " + threshold1.ToString());
        DateTime threshold2 = new DateTime(2007, 12, 5, 14, 43, 0);
        messQuery1 = from rec in receivedQ1 where rec.ArrivedTime > threshold1 select rec;
        messQuery2 = from rec in receivedQ2 where rec.ArrivedTime > threshold2 select rec;
    }

    // Pair up messages with equal labels and send one combined message per pair to q3.
    private void joinSourceQueriesAndSendToOutput(){
        var messQuery3 = from rec1 in messQuery1
                         join rec2 in messQuery2 on rec1.Label equals rec2.Label
                         select new {l = rec1.Label, b1 = rec1.Body, b2 = rec2.Body};
        int counter = 1;
        foreach (var m in messQuery3){
            string body = m.b1.ToString() + m.b2.ToString();
            Console.WriteLine("body: " + body);
            string label = "new concat msg for label " + m.l + ": " + counter;
            q3.Send(body, label);
            counter++;
        }
    }
}

Figure 1: q1 contains 2 messages with label “l2” which arrived after 2:43 PM.

Figure 2: q2 contains 2 messages with label “l2” which arrived after 2:43 PM.

Figure 3: q3 contains 4 messages, combining each message from q1 with each message from q2.

Figure 4: Body of q3 message is concatenation of q1 message’s body and q2 message’s body.
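The m*n behavior of the join can be checked without a Queue Manager. The sketch below is only an in-memory illustration: Msg is a hypothetical stand-in for the Label and Body of a System.Messaging.Message, and the arrays mimic queues in which the "l2" messages have been sent twice.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the Label and Body of a queued message.
public class Msg
{
    public string Label;
    public string Body;
    public Msg(string label, string body) { Label = label; Body = body; }
}

public static class JoinDemo
{
    // Returns one concatenated body per (first, second) pair with equal labels.
    public static List<string> JoinBodies(IEnumerable<Msg> first, IEnumerable<Msg> second)
    {
        var joined = from m1 in first
                     join m2 in second on m1.Label equals m2.Label
                     select m1.Body + m2.Body;
        return joined.ToList();
    }

    public static void Main()
    {
        // Each "queue" holds two messages labelled "l2", as in Figures 1 and 2.
        var q1 = new[] { new Msg("l1", "body 1"), new Msg("l2", "body 2"), new Msg("l2", "body 2") };
        var q2 = new[] { new Msg("l4", "body ichi"), new Msg("l2", "body ni"), new Msg("l2", "body ni") };

        List<string> bodies = JoinBodies(q1, q2);
        Console.WriteLine(bodies.Count);   // 2 * 2 matching pairs
        foreach (string body in bodies)
        {
            Console.WriteLine(body);
        }
    }
}
```

With two "l2" messages on each side, the join yields four results, each with the body "body 2body ni", which matches the four messages shown for q3.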

3. System Integration Scenario

Below is a diagram illustrating a many-to-many purchase order routing scheme. Customers from three regions can submit purchase orders via a web user interface, and this order data is routed via two mechanisms:

  1. Order data is submitted to the appropriate accounting system based on currency
  2. Order data is submitted to the appropriate fulfillment system based on geographic region

MSMQ is used to provide the transport between the web application which receives the orders from the customers and the six back-end systems for accounting and fulfillment.


The code below illustrates how, given existing queues for accounting and fulfillment systems based on currency and region, LINQ can be used to place the order data (represented as serialized objects) in the correct queue. The messages in these queues can then be received by the appropriate back-end application. The logic is as follows:

  1. Instantiate three MessageQueue objects for the accounting systems to read.
  2. Instantiate three MessageQueue objects for the fulfillment systems to read.
  3. Create one order object from Asia with JPY currency, one from Europe with EUR currency, and one from North America with USD currency.
  4. Route the order objects to one of: accounting-yen, accounting-euros, accounting-dollars based on currency.
  5. Route the order objects to one of: fulfillment-asia, fulfillment-N.America, fulfillment-europe based on the region of the customer.

using System;

public class Order{

    public int orderID;
    public String customerName;
    public String customerRegion;
    public String product;
    public String currency;
    public int quantity;
    public double price;
    public DateTime orderDate;

    public Order(){
    }

    public Order(int id, String cus, String reg, String prod, int quant, String cur, double pr, DateTime od){
        orderID = id;
        customerName = cus;
        customerRegion = reg;
        product = prod;
        currency = cur;
        quantity = quant;
        price = pr;
        orderDate = od;
    }
}

using System;
using System.Linq;
using System.Messaging;

public class OrderRouting{

    static Order[] sampleOrders;
    MessageQueue accDollarQueue;
    MessageQueue accYenQueue;
    MessageQueue accEuroQueue;
    MessageQueue fulfillNAQueue;
    MessageQueue fulfillAsiaQueue;
    MessageQueue fulfillEurQueue;

    public static void Main(String[] args){
        OrderRouting router = new OrderRouting();
        router.createSampleOrders();
        router.createSampleQueues();
        router.routeToAccounting(sampleOrders);
        router.routeToFulfillment(sampleOrders);
    }

    void createSampleOrders(){
        sampleOrders = new Order[3];
        Order o1 = new Order(100, "A Corporation", "Asia", "aquarium", 967, "JPY", 99.99, new DateTime(2008, 1, 28, 14, 0, 0));
        Order o2 = new Order(101, "B Inc", "North America", "spool of cable", 1345, "USD", 1000.99, new DateTime(2008, 2, 23, 15, 30, 0));
        Order o3 = new Order(102, "C Ltd", "Europe", "silverware set", 600, "EUR", 560, new DateTime(2008, 3, 3, 13, 10, 0));
        sampleOrders[0] = o1;
        sampleOrders[1] = o2;
        sampleOrders[2] = o3;
    }

    // The six queues are assumed to exist already; they are only opened here.
    void createSampleQueues(){
        accYenQueue = new MessageQueue(".\\private$\\accounting-yen");
        accDollarQueue = new MessageQueue(".\\private$\\accounting-dollars");
        accEuroQueue = new MessageQueue(".\\private$\\accounting-euros");
        fulfillAsiaQueue = new MessageQueue(".\\private$\\fulfillment-asia");
        fulfillNAQueue = new MessageQueue(".\\private$\\fulfillment-N.America");
        fulfillEurQueue = new MessageQueue(".\\private$\\fulfillment-europe");
    }

    void routeToAccounting(Order[] orders){
        routeToYenQueue(orders);
        routeToEuroQueue(orders);
        routeToDollarQueue(orders);
    }

    // Each routing method queries the orders passed in rather than the static sample array.
    void routeToYenQueue(Order[] orders){
        var query = from o in orders where o.currency.ToUpper().Equals("JPY") select o;
        foreach(var o in query){
            String label = "Yen settlement order - " + o.orderID;
            accYenQueue.Send(o, label);
        }
    }

    void routeToEuroQueue(Order[] orders){
        var query = from o in orders where o.currency.ToUpper().Equals("EUR") select o;
        foreach(var o in query){
            String label = "Euro settlement order - " + o.orderID;
            accEuroQueue.Send(o, label);
        }
    }

    void routeToDollarQueue(Order[] orders){
        var query = from o in orders where o.currency.ToUpper().Equals("USD") select o;
        foreach(var o in query){
            String label = "Dollar settlement order - " + o.orderID;
            accDollarQueue.Send(o, label);
        }
    }

    void routeToFulfillment(Order[] orders){
        routeToAsiaFulfill(orders);
        routeToEuropeFulfill(orders);
        routeToNAFulfill(orders);
    }

    void routeToAsiaFulfill(Order[] orders){
        var query = from o in orders where o.customerRegion.ToUpper().Equals("ASIA") select o;
        foreach(var o in query){
            String label = "New order in Asian Region - " + o.orderID;
            fulfillAsiaQueue.Send(o, label);
        }
    }

    void routeToEuropeFulfill(Order[] orders){
        var query = from o in orders
                    where o.customerRegion.ToUpper().Equals("EUROPE")
                    select o;
        foreach(var o in query){
            String label = "New order in European Region - " + o.orderID;
            fulfillEurQueue.Send(o, label);
        }
    }

    void routeToNAFulfill(Order[] orders){
        var query = from o in orders where o.customerRegion.ToUpper().Equals("NORTH AMERICA") select o;
        foreach(var o in query){
            String label = "New order in North American Region - " + o.orderID;
            fulfillNAQueue.Send(o, label);
        }
    }
}

The allocation of messages to queues is illustrated in the screenshots below. Note that all members of the Order object are visible in the XML of the message body. Each of the accounting queues and each of the fulfillment queues contains the message appropriate to that currency or region. This dispatch was controlled by selecting the appropriate order from the set of three orders based on currency and region. LINQ was used in this step to make the selection logic more concise and, arguably, more intuitive.

Figure 6: Yen-based order awaiting retrieval in the appropriate accounting queue.

Figure 7: North American order awaiting receipt in the appropriate fulfillment queue.
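The six routing methods differ only in the value tested and the destination queue, so one possible variation is to group the orders by destination in a single query. The sketch below is an in-memory illustration of that idea, not the routing program itself: SampleOrder is a hypothetical, trimmed-down stand-in for Order, the queue names are taken from the listing above, and the lookup table that maps currencies to queue names is an assumption of the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, trimmed-down stand-in for the Order class.
public class SampleOrder
{
    public int OrderID;
    public string Currency;
    public SampleOrder(int id, string currency) { OrderID = id; Currency = currency; }
}

public static class TableRoutingDemo
{
    // Currency code -> accounting queue name (case-insensitive keys).
    static readonly Dictionary<string, string> AccountingQueues =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
        {
            { "JPY", "accounting-yen" },
            { "EUR", "accounting-euros" },
            { "USD", "accounting-dollars" }
        };

    // Returns queue name -> IDs of the orders destined for that queue.
    // Orders with an unknown currency are skipped.
    public static Dictionary<string, List<int>> RouteByCurrency(IEnumerable<SampleOrder> orders)
    {
        var query = from o in orders
                    where AccountingQueues.ContainsKey(o.Currency)
                    group o.OrderID by AccountingQueues[o.Currency];
        return query.ToDictionary(g => g.Key, g => g.ToList());
    }

    public static void Main()
    {
        var orders = new[]
        {
            new SampleOrder(100, "JPY"),
            new SampleOrder(101, "usd"),
            new SampleOrder(102, "EUR")
        };
        foreach (var pair in RouteByCurrency(orders))
        {
            // In an MSMQ version, this is where the order would be sent to the queue.
            Console.WriteLine(pair.Key + ": " + string.Join(", ", pair.Value));
        }
    }
}
```

The trade-off is that adding a currency becomes a one-line change to the table, at the cost of hiding the per-queue labels that the separate methods make explicit.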

4. Conclusion

The LINQ libraries provide considerable flexibility by allowing developers to embed SQL-like queries or functional-style method calls into object-oriented programs. Their coverage of in-memory objects, relational database tables, and XML files also allows for the possibility of writing generic query logic which is indifferent to the format of the data to be traversed. MSMQ exposes a wide range of objects and properties via System.Messaging, and many of these can be incorporated into LINQ queries. As the first example indicates, queries can concisely filter data to be routed to separate queues. As is shown by the second example, properties of messages like arrival time and label can be used in more complex queries to merge datasets based on join criteria. The third example demonstrates that LINQ queries can be employed to facilitate system integration in real-world scenarios. There are many more query elements, such as group-by and quantifiers, which might provide even more sophisticated logic. This initial investigation suggests that many other creative and useful ways to integrate elements of System.Messaging into LINQ queries remain to be explored.
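As a brief illustration of the group-by and quantifier operators mentioned above, the sketch below works on a plain list of label strings, which stand in for the Label property of queued messages. The class and method names are invented for the example; Any, All, and the group clause are standard System.Linq features.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QuantifierDemo
{
    // Quantifier: is there at least one message with the given label?
    public static bool AnyWithLabel(IEnumerable<string> labels, string label)
    {
        return labels.Any(l => l == label);
    }

    // Quantifier: does every label start with the given prefix?
    public static bool AllStartWith(IEnumerable<string> labels, string prefix)
    {
        return labels.All(l => l.StartsWith(prefix, StringComparison.Ordinal));
    }

    // Group-by: how many messages carry each label?
    public static Dictionary<string, int> CountByLabel(IEnumerable<string> labels)
    {
        var query = from l in labels
                    group l by l into g
                    select new { Label = g.Key, Count = g.Count() };
        return query.ToDictionary(x => x.Label, x => x.Count);
    }

    public static void Main()
    {
        string[] labels = { "l1", "l2", "l3", "l2" };
        Console.WriteLine(AnyWithLabel(labels, "l2"));
        Console.WriteLine(AllStartWith(labels, "l"));
        foreach (var pair in CountByLabel(labels))
        {
            Console.WriteLine(pair.Key + ": " + pair.Value);
        }
    }
}
```

Applied to real queues, the same operators could be run over the array returned by GetAllMessages(), for example to check whether any message with a given label is waiting before a join is attempted.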

