Developing a Streaming Pipeline Component for BizTalk Server
Published: February 2010
Author: Yossi Dahan, Sabra Ltd.
Technical Reviewers:
- Ewan Fairweather, Microsoft
- Oleg Gershikov
- Paolo Salvatori, Microsoft
- Roni Schwarts
- Manuel Stern, Microsoft
- Tim Wieman, Microsoft
Applies to: BizTalk Server 2009
Summary
This paper shows how to address issues with high memory consumption and latency by taking a streaming approach to pipeline component development.
Contents
Developing a Streaming Pipeline Component for BizTalk Server
Summary
What do I mean by “streaming”?
How does a non-streaming pipeline component work?
A streaming pipeline component
A story of streams
The custom stream explained
Summary
Appendix A - Testing the stream
Appendix B - The Sample
Appendix C – Comparing the memory consumption of both components
Orchestrations are often the first port of call for most BizTalk developers and are the feature most identified with the product. However, many scenarios can be implemented very well using messaging alone, harnessing the power of port configuration and pipelines.
Any serious messaging implementation is bound to involve one or more custom pipeline components, and these are not difficult to develop. However, too often developers ignore the streaming nature of the pipeline and write components that read the entire message into memory.
Whilst this approach does work, and admittedly is somewhat easier to grasp, it has two significant downsides:
- Higher memory consumption – the entire message has to be loaded into memory, regardless of the message size. In addition, if the .NET XML DOM is used (via the XmlDocument class, for example) the memory footprint of the component can be considerably bigger than the message size.
- Latency – any further processing of the message has to wait until the current component has finished processing the entire message.
Both of these downsides can be addressed by taking a “streaming” approach to pipeline component development, which is what this article attempts to demonstrate.
What do I mean by “streaming”?
To begin any discussion about streaming pipeline components, it is important to understand what “streaming” means in the context of a BizTalk pipeline.
To do that, let’s take a look at what a typical receive location looks like. (Note: Whilst this article will mostly discuss the use of a component in a receive pipeline, the same applies, only as a mirror image, to a send pipeline.)
Take a receive location that uses the File adapter, and a pass-through pipeline for example -
Figure 1 – “Pass-through” receive location
The File adapter opens a file stream and passes it, contained in a BizTalk message but otherwise untouched, to the BizTalk Endpoint Manager (EPM), which hosts the pipeline. Since the pipeline in this case is pass-through, it does not contain any components, and so the stream is read by the Endpoint Manager, piece by piece, as it writes the message’s contents to the MessageBox database.
Once the message has been persisted successfully, the EPM calls back the adapter, letting it know it has completed processing the message and allowing the adapter to perform any required clean up. In this case the adapter would close the stream (if not already closed) and delete the file from the file system.
How would the story change as you introduce a couple of pipeline components to the pipeline?
Figure 2 – Receive location with custom components
In this case the EPM passes the BizTalk message, and with it the file stream, to the first pipeline component during the execution of the pipeline, by calling its Execute() method –
public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
The EPM passes one message in (along with its execution context) and expects to receive a message back (either the same message or a different one.)
If a second pipeline component exists in the pipeline, it is the message returned by the first component, and not the message created by the adapter, that is passed into it. The same is repeated for all the components in the pipeline, with each component receiving as its input the message returned by the previous component, until all the components in the pipeline have been strung together.
How does a non-streaming pipeline component work?
A non-streaming pipeline component’s implementation reads the stream provided in the input message into memory, performs whatever manipulation is required in memory (often by loading it into an XmlDocument object or a string, sometimes deserializing it into a typed object), and then – with the processing complete – assigns a modified stream back to the message, which is then returned to the pipeline. Here’s a sample of how such code might look:
 1  Stream outStream = new MemoryStream();
 2  StreamWriter memoryWriter = new StreamWriter(outStream);
 3
 4  using (StreamReader sr = new
 5      StreamReader(pInMsg.BodyPart.GetOriginalDataStream()))
 6  {
 7      string record = string.Empty;
 8      while ((record = sr.ReadLine()) != null)
 9      {
10          processRecord(memoryWriter, record);
11      }
12      memoryWriter.Flush(); // flush writer to ensure writing's done
13      outStream.Seek(0, SeekOrigin.Begin);
14      pInMsg.BodyPart.Data = outStream;
15      pContext.ResourceTracker.AddResource(memoryWriter);
16
17      return pInMsg;
18  }
Listing 1 – Non-streaming pipeline component implementation
In lines 1 and 2 a MemoryStream object is instantiated to hold the output message’s stream, as well as a StreamWriter object to write to it.
Line 4 instantiates a StreamReader object over the message’s body part stream (retrieved by calling the BodyPart property’s GetOriginalDataStream() method). This allows the component to read the incoming message’s body part.
In line 8 the input stream is read line by line, and in line 10 each line is passed to a method that processes it; the method also receives the StreamWriter object, allowing it to write the resulting output into the memory stream. The exact implementation of this method is not important and will vary depending on the component’s purpose. The key point is that somewhere in there, we’re likely to have a call to write to the memory stream, through the stream writer:
memoryWriter.WriteLine(record);
Once the entire incoming stream has been read (the while loop exhausted), the memory stream is flushed (line 12) to ensure all bytes have been committed, and rewound (line 13) so that it can be read again without consumers having to Seek() back to its beginning.
This is important because subsequent components, just like my sample above, and – in fact – the EPM itself, are likely to assume the stream received is ready to be read and is not (yet) consumed. This is an important assumption because not all input streams are seekable (an HTTP request stream, for example).
Then, most importantly, in line 14, the message’s stream is replaced with the output stream that was just created.
Before returning the message back to the pipeline in line 17, the StreamWriter object that was created is added to the context’s ResourceTracker (line 15). This ensures the .NET Garbage Collector does not dispose of this object and the stream it is using. If it did, the underlying stream would have been closed and disposed of as well (this is the behaviour of the StreamWriter Dispose() method) and would not be available to other components, or indeed to BizTalk Server itself. It also means that BizTalk Server will ensure this object is disposed of once the processing of the message is complete.
Note: Registering both the writer and its underlying stream with the ResourceTracker may lead to an ObjectDisposedException when these go through the managed disposal process performed after the pipeline execution has completed.
The result of this component is a potentially modified message, returned to the pipeline. But, implemented this way, the rest of the pipeline’s execution is held up while this component completes its processing.
If there are no other pipeline components in the pipeline, this has very little impact. If, however, there is another component in the pipeline, its execution will not be started until the first component has finished processing the entire message. As we will see shortly, by changing the component to work in a streaming fashion, we can allow downstream components to work on portions of the message previous components have already processed, even before processing the entire message.
More crucially, though, in the preceding code, the entire message (the output message in this case, but it could equally be the input message, or both, depending on the implementation) has been loaded into memory (using a memory stream, in this case). If the message is large, memory available to the host will be significantly reduced and, as a consequence, throttling (or worse) can happen, seriously affecting the solution’s throughput. This might be okay with a single message, but as is the case with any BizTalk implementation, one has to consider the implications of running at high throughput. What will happen when 100/1,000/10,000 messages are processed in parallel? High throughput, even of smaller messages, will seriously aggravate this problem.
A streaming pipeline component
By changing the component’s design we can achieve two things:
- The component will return the message to the pipeline shortly after it receives it, and before a single byte is read. This allows subsequent components to do the same, and – if all is done well – all the components will be able to process the message almost simultaneously (not quite, though, as each component has to work on the result of its preceding component).
- The component will only ever load a small portion of the message into memory at a time; how small the portion depends on the requirements. The sample code provided only ever holds slightly more than a single line from the input file in memory.
So, how can this be achieved?
The approach is to take the input message and replace the stream it uses, initially the one provided by the adapter, with a custom stream (in my sample it is ‘RemoveDuplicatesStream’). As far as the pipeline component’s code is concerned, this is pretty much all it has to do (the code is explained next):
public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
{
1 Stream bodyPartStream = pInMsg.BodyPart.GetOriginalDataStream();
2 RemoveDuplicatesStream newStream = new
3 RemoveDuplicatesStream(bodyPartStream);
4 pInMsg.BodyPart.Data = newStream;
5 pContext.ResourceTracker.AddResource(newStream);
6 return pInMsg;
}
Listing 2 - Streaming pipeline component implementation
In line 1 the stream from the input message is extracted into a local variable.
In line 2 an instance of a custom stream – in this case one that will remove duplicate records from the message – is instantiated with the input message’s stream passed into its constructor; we will look into the implementation of this stream shortly.
In line 4 the message’s body stream is replaced with the custom stream, which is then also added to the resource tracker in line 5, before the message is returned to the pipeline in line 6.
You will note that the component has not actually done any processing yet; so far not a single byte of the stream has been read, and the message has been returned to the pipeline more or less untouched – only its stream is now “wrapped” by the custom stream object.
A story of streams
The main principle of this approach is to implement the scenario so that the required processing is initiated from the custom stream’s Read() method. If all the components in any pipeline did the same, the EPM would eventually receive a message in which the adapter’s stream has been wrapped several times in custom streams, yet not a single byte has been read from the adapter’s stream.
Then, as the EPM reads the stream it received by calling its Read() method, the call gets bubbled down through the layers, all the way to the adapter’s stream, with each layer applying whatever logic is required to the read bytes.
Consider a scenario where a pipeline contains three components, each with its own custom stream implementation, each wrapping the received stream, and none reading the stream directly. Looking at the streams involved in this scenario, they would look something like this –
Figure 3 - Custom streams in a pipeline
The first component in the pipeline receives the file stream created by the adapter, and wraps it with its own stream, depicted in orange in the preceding diagram.
The message, now containing the orange stream is passed to the second pipeline component, which wraps the stream in its own stream - the green stream.
The message, now containing the green stream, is passed to a third component which, again, wraps the received stream with its own custom stream – the blue stream in the preceding diagram.
It is this last message that is handled by the EPM; as the EPM reads the message’s stream (the blue one, created by the last component), the call to the Read() method is bubbled down through the green stream, to the orange stream and, eventually, down to the file stream – the only stream actually containing any bytes. The Read() method of each stream applies its logic to the bytes read from the stream below it, so that by the time the EPM receives any bytes from its call to the blue stream’s Read() method, they have been processed by every stream along the way.
It is these processed bytes that get written, by the EPM, to the MessageBox database.
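To make the layering concrete, here is a minimal sketch of the chain in Figure 3; the component stream class names are hypothetical stand-ins for the orange, green and blue streams, and the read loop approximates what the EPM does:

```csharp
// Each component wraps the stream it received; not a byte is read yet.
Stream fileStream = adapterMessage.BodyPart.GetOriginalDataStream();
Stream orange = new FirstComponentStream(fileStream);  // component 1 (hypothetical)
Stream green  = new SecondComponentStream(orange);     // component 2 (hypothetical)
Stream blue   = new ThirdComponentStream(green);       // component 3 (hypothetical)

// Only when the EPM reads does the call cascade down the chain:
// blue.Read() -> green.Read() -> orange.Read() -> fileStream.Read()
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = blue.Read(buffer, 0, buffer.Length)) > 0)
{
    // The EPM writes these already-processed bytes to the MessageBox.
}
```

Note that each stream’s processing happens lazily, inside its Read() override, which is what keeps the memory footprint small.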
Of course, not all the components have to be streaming for this approach to work. Any component that reads the entire message will simply “break” this chain, replacing the stream used by any subsequent components and effectively separating the flow into two separate chains, one before the non-streaming component and one after it; the principle still holds, however, within each chain.
Hopefully by now the principles of this approach are quite clear, and the code required to implement a streaming pipeline component is understood; there is really not much more to it than the preceding sample.
The only real variance in the pipeline component’s code from one component to another is usually the description, version and icon properties and, when needed, any other design-time properties, which are then usually passed into the stream. As the majority of the code lies within the custom stream, the pipeline component’s code is very simple and does not change much from one component to another.
The custom stream explained
So, if the key to this approach lies within the custom stream, what does one look like? This is where there is a lot of variance, and the exact approach taken when implementing the stream really depends on the scenario at hand and its requirements.
There are several factors that need to be considered when deciding on the design of the custom stream, for example:
- What stages of the pipeline will this component need to support? (Will it run before or after the disassembler/assembler? Will it need to support both send and receive pipelines?)
- What would be the shape of the data flowing through the component? XML? “Flat file”? Will the data be encoded in any way?
- Is a forward-only implementation sufficient, or will it have to support seek operations? XPath?
- Is a read only implementation sufficient or will the data have to be changed?
- Etc…
The stream in the sample assumes an ASCII flat file in a specific format (two fields, separated by a comma, with the records separated by a line feed) and its purpose is to filter out “duplicate” records. Normally a database is likely to be used to determine whether a record has been processed before or not, but for the purpose of this sample the component will deem a record a duplicate if its record id is even (the record id is the first field in the file, expected to be a number).
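For illustration, here is a hypothetical input file and the output the component would produce under the sample’s convention (records with even ids are treated as duplicates and dropped):

```
Input:        Output:
1,alpha       1,alpha
2,bravo       3,charlie
3,charlie     5,echo
4,delta
5,echo
```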
Note: Several very useful streams ship with BizTalk Server – VirtualStream, XmlTranslatorStream and ReadOnlySeekableStream, to name but a few.
In many cases these can be used instead of creating a custom stream, or as a base class for the custom stream, and should be considered, although some of these classes are not yet fully documented.
When implementing the custom stream a class is created to inherit from the .NET Framework’s abstract Stream class, as below -
public sealed class RemoveDuplicatesStream : Stream, IDisposable
Listing 3 – Declaration of the custom stream
The class implementation has to override all the abstract properties and methods of the Stream class, but the only method that normally has any real implementation is the Read() method; all other methods and properties can generally either pass through to the wrapped stream or throw a NotSupportedException.
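As a sketch of what that typically looks like – assuming a forward-only, read-only stream; which members pass through and which throw will vary per scenario:

```csharp
using System;
using System.IO;

public sealed class RemoveDuplicatesStream : Stream, IDisposable
{
    private readonly Stream innerStream;

    public RemoveDuplicatesStream(Stream innerStream)
    {
        this.innerStream = innerStream;
    }

    // Pass-through members: defer to the wrapped stream.
    public override bool CanRead { get { return innerStream.CanRead; } }
    public override void Flush() { innerStream.Flush(); }

    // A forward-only, read-only stream does not support these.
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length
    {
        get { throw new NotSupportedException("Length is not supported by this forward-only stream."); }
    }
    public override long Position
    {
        get { throw new NotSupportedException("Position is not supported by this forward-only stream."); }
        set { throw new NotSupportedException("Position is not supported by this forward-only stream."); }
    }
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException("Seek is not supported by this forward-only stream.");
    }
    public override void SetLength(long value)
    {
        throw new NotSupportedException("SetLength is not supported by this read-only stream.");
    }
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException("Write is not supported by this read-only stream.");
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // The component's real work happens here; a plain pass-through
        // is shown as a placeholder for the duplicate-removal logic.
        return innerStream.Read(buffer, offset, count);
    }
}
```

Note how every member either delegates to the wrapped stream or fails loudly with a clear message, in line with the guidance in this section.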
A word of caution, though – theoretically it is possible for any subsequent component in the pipeline to do whatever it wishes with the stream, so component developers have to think carefully about the implementation of the custom stream, taking into account any sequence of calls to the stream’s methods. However, I believe it is very reasonable to expect behaviour similar to the BizTalk Endpoint Manager’s when it comes to reading the stream, and am happy to throw exceptions from methods I don’t expect to have to support. (I do try to make sure the messages in these exceptions are as clear as possible.)
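To close the loop, here is a simplified, illustrative sketch of how a Read() override might implement the duplicate-removal logic described earlier. This is a fragment of the stream class, not the sample’s actual code: it assumes ASCII, line-feed-terminated records of the form “id,value”, and the ReadLineFromInnerStream() helper is hypothetical (a real implementation would buffer bytes from the wrapped stream itself):

```csharp
// Holds processed bytes not yet handed to the caller; only slightly
// more than one record is ever held in memory at a time.
private readonly Queue<byte> outputBuffer = new Queue<byte>();

public override int Read(byte[] buffer, int offset, int count)
{
    // Refill the output buffer one record at a time until we have
    // bytes to return, or the wrapped stream is exhausted.
    while (outputBuffer.Count == 0)
    {
        string record = ReadLineFromInnerStream(); // hypothetical helper
        if (record == null)
            return 0; // end of stream

        int recordId = int.Parse(record.Split(',')[0]);
        if (recordId % 2 != 0) // sample's convention: even ids are "duplicates"
        {
            foreach (byte b in Encoding.ASCII.GetBytes(record + "\n"))
                outputBuffer.Enqueue(b);
        }
    }

    // Hand over as many buffered bytes as the caller asked for.
    int bytesCopied = 0;
    while (bytesCopied < count && outputBuffer.Count > 0)
    {
        buffer[offset + bytesCopied++] = outputBuffer.Dequeue();
    }
    return bytesCopied;
}
```

The key property of this design is that each call to Read() pulls only as much data from the inner stream as it needs to satisfy the caller, which is what keeps both memory consumption and latency low.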