An Efficient Framework for Performing Aggregate Queries on Continuous XML Data Streams

Abhishek Saxena

Computer Sciences Department

University of Wisconsin-Madison

Abstract

We designed, implemented, and evaluated a framework for performing certain aggregate operators on XML data streams. Streams are sampled, and their “synopses” are maintained in main memory under policies that keep the synopses up to date and free from redundancy. An implementation of our analytical model of a weighting scheme, combined with three traditional buffer replacement policies, is shown to produce results close to the actual sampled data values.

1 Introduction

Infinite data streams have become very common with the popularity and widespread deployment of the Internet and the increased demand for online streaming data in a variety of applications.

In particular, with the acceptance and standardization of XML [8] as an efficient language for data interchange over the Internet, efficient handling of streaming XML data becomes important. The primary concerns in the streaming data model are twofold: fast and representative sampling, and efficient main-memory management, due to time and memory constraints respectively.

In this project we explore the following issues in building an efficient framework for the streaming data model: first, how to run aggregate operators on certain XML data streams using sampling, and second, how to manage main memory efficiently.

We explore the following three kinds of XML data streams and address the problems associated with each. First, stock streams: in this scenario, several XML stock streams containing the current stock price, stock symbol, number of shares traded, and current time are pouring in, and the goal is to sample these streams and maintain an updated “synopsis” of each stream in main memory under the specified memory and time constraints. We propose a new weighting scheme combined with traditional LRU-based local and global buffer replacement policies to manage main memory.

Second, stock streams with sparsely populated elements of interest: in this scenario, the stock streams carry a Nasdaq index value that appears only sparsely. Here the goal is to sample the sparse XML elements efficiently without incurring too much sampler overhead and without compromising correctness too much. We use heuristics based on the data streaming rate to adopt a policy for sampling sparse elements.

Third, an election survey stream: in this scenario, we explore clustering of certain XML elements scattered throughout a stream. A stream containing election survey results from different states is pouring in (say, from the Election Commissioner’s Office), and there are “n” candidates contesting the election. Our system computes the results for each candidate across all states (i.e., across all XML elements in the stream) and generates the aggregate results.

We implemented our system in Java 1.2 and used the SAX 2.0 parser [6, 7] to parse the XML data streams on the fly without having to build a tree structure in main memory. The platform used was Linux 2.4.2. We wrote our own synthetic XML data generator to run our tests. We evaluated our system by recording sampled data and buffered synopsis data in files and comparing their values using line graphs. For a given buffer pool set and a given data streaming rate, our approach gave results close to the sampled data values. Currently, all the aggregate operators are hard-coded into the system, and the results are written either to standard output or to a file.

The organization of the rest of the paper is as follows:

  • In Section 2, we provide an overview of work relevant to data stream processing and continuous queries. Although there have been only a handful of papers addressing this topic directly, a number of papers in related areas provide useful techniques and results.
  • In Section 3, we present the design of our system and the analytical model that we use to develop a weighting scheme for managing main memory.
  • In Section 4, we give a short description of our implementation.
  • Section 5 presents the results that we obtained.
  • Section 6 discusses the interpretation of our results.
  • Sections 7 and 8 describe our conclusions and our vision of future work respectively.

2 Related Work

There has recently been a lot of interest in performing aggregate queries over data streams. [1] discusses a general and flexible architecture for processing continuous queries in the presence of data streams. There has been interesting work on synopsis data structures [5] that provide a summary of a data set within acceptable levels of accuracy while being much smaller in size, and a framework for extracting synopses (signatures) from data streams is proposed in [2]. [4] addresses the problem of approximately answering aggregate SQL queries over continuous data streams with limited memory by partitioning the domain of attributes intelligently and using randomizing techniques that compute small “sketch” summaries of the streams. The YFilter scheme [3] filters XML documents according to XQuery or XPath queries that involve both path expressions and predicates. Our approach differs from these in that it uses a new weighting scheme combined with traditional buffer replacement policies to maintain buffered synopses of data streams in main memory. Currently, we do not provide a query interface over our synopses; instead, we evaluate the contents of the buffered synopses by comparing them with the actual sampled data values. Our focus is not on parsing XML documents but on dynamically maintaining a close summary of data streams in main memory that could eventually support a standard query interface.

3 Design of our system

Fig. 1 shows the design of our system. A set of XML data streams is received at different ports, with a SAX parser thread running on each port, sampling and parsing the incoming data. A buffer manager thread runs in parallel with these threads and uses our policies to decide which data to keep in main memory and which to flush to disk. The data from a stream chosen to be kept in main memory is called the “synopsis” of that stream and can be used to support future queries.

Figure 1: Design of our system

We first discuss this design in the context of our model of stock streams. In this scenario, several stock streams are pouring in, and a sampler running on each input stream samples XML elements containing the current stock price, stock symbol, number of shares traded, and current time, storing the sampled data, treated as a synopsis of the stream, into a pre-allocated buffer space in main memory. A thread that we call the buffer manager runs in parallel with the samplers and dynamically manages the buffers used to store the synopses of the streams. When a stock stream starts pouring in, the buffer manager provides it with a set of buffers based on the initial data streaming rate; we call this set of buffers the “pool set” provided to the sampler of that stream. Periodically, the buffer manager thread checks the data rate of each stream and grows or shrinks its buffer pool set as the data streaming rate increases or decreases.
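To make the rate-based pool-set sizing concrete, the sketch below is a minimal illustration, not our actual code; the class name BufferManager, the reportRate method, and the proportional-share heuristic are assumptions. It shows a manager thread that periodically rescales each stream's pool set in proportion to its observed streaming rate.

    // Hypothetical sketch: a buffer manager that periodically resizes each
    // stream's pool set according to its current data streaming rate.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class BufferManager extends Thread {
        // elements observed per second for each stream, updated by the samplers
        private final Map<String, Double> streamingRate = new ConcurrentHashMap<>();
        // number of buffers currently allocated to each stream's pool set
        private final Map<String, Integer> poolSetSize = new ConcurrentHashMap<>();
        private final int totalBuffers;          // total buffers available in main memory
        private final long checkIntervalMillis;  // how often to re-examine the rates

        BufferManager(int totalBuffers, long checkIntervalMillis) {
            this.totalBuffers = totalBuffers;
            this.checkIntervalMillis = checkIntervalMillis;
        }

        void reportRate(String stream, double elementsPerSecond) {
            streamingRate.put(stream, elementsPerSecond);
        }

        public void run() {
            while (!isInterrupted()) {
                double totalRate = streamingRate.values().stream()
                                                .mapToDouble(Double::doubleValue).sum();
                if (totalRate > 0) {
                    // grow or shrink each pool set in proportion to its share of the total rate
                    for (Map.Entry<String, Double> e : streamingRate.entrySet()) {
                        int share = (int) Math.max(1,
                                Math.round(totalBuffers * e.getValue() / totalRate));
                        poolSetSize.put(e.getKey(), share);
                    }
                }
                try {
                    Thread.sleep(checkIntervalMillis);
                } catch (InterruptedException ie) {
                    return;
                }
            }
        }
    }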

We maintain a sliding window of stock data that tracks the average price, minimum price, maximum price, and standard deviation. Certain “interesting” data, such as the global minimum and maximum prices, are also kept in main memory.
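A minimal sketch of such a sliding window of statistics, assuming a fixed window over the most recent prices (the class and method names are illustrative, not our implementation), is:

    // Hypothetical sketch: sliding-window statistics plus global min/max for one stock stream.
    import java.util.ArrayDeque;
    import java.util.Deque;

    class WindowStats {
        private final int windowSize;
        private final Deque<Double> window = new ArrayDeque<>();
        private double globalMin = Double.POSITIVE_INFINITY;
        private double globalMax = Double.NEGATIVE_INFINITY;

        WindowStats(int windowSize) { this.windowSize = windowSize; }

        void add(double price) {
            if (window.size() == windowSize) window.removeFirst();  // slide the window
            window.addLast(price);
            globalMin = Math.min(globalMin, price);                 // "interesting" global values
            globalMax = Math.max(globalMax, price);
        }

        double average() {
            double sum = 0;
            for (double p : window) sum += p;
            return window.isEmpty() ? 0 : sum / window.size();
        }

        double stdDev() {
            double avg = average(), sq = 0;
            for (double p : window) sq += (p - avg) * (p - avg);
            return window.isEmpty() ? 0 : Math.sqrt(sq / window.size());
        }

        double windowMin() { return window.stream().mapToDouble(Double::doubleValue).min().orElse(Double.NaN); }
        double windowMax() { return window.stream().mapToDouble(Double::doubleValue).max().orElse(Double.NaN); }
        double getGlobalMin() { return globalMin; }
        double getGlobalMax() { return globalMax; }
    }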

Initially, we decided that the buffer manager thread running in parallel with the samplers would use the following policies to dynamically manage the buffers used to store the synopses of streams (a sketch of the policy selection follows the list):

  • Flush to disk all data values that lie farther than the current standard deviation from the current average stock price (used when the buffer is only partially filled).
  • A local buffer replacement policy based on LRU (used when the buffer fills up and the data streaming rates of the streams have a standard deviation below a certain value; this is the case of low variance among the streams).
  • A global buffer replacement policy based on LRU (used when the buffer fills up and the streaming rates have a standard deviation above a certain value; in this case, a buffer from the slowest input stream is chosen for replacement. This policy uses knowledge of the streaming rates and adjusts dynamically for this purpose).
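The sketch below illustrates how the choice between the local and global LRU-based policies could be driven by the standard deviation of the streaming rates; the threshold parameter and class names are illustrative assumptions.

    // Hypothetical sketch: choosing between the local and global LRU-based
    // replacement policies from the standard deviation of the streaming rates.
    class PolicyChooser {
        enum Policy { LOCAL_LRU, GLOBAL_LRU }

        private final double rateDeviationThreshold;  // the "certain value" mentioned above

        PolicyChooser(double rateDeviationThreshold) {
            this.rateDeviationThreshold = rateDeviationThreshold;
        }

        Policy choose(double[] streamingRates) {
            if (streamingRates.length == 0) return Policy.LOCAL_LRU;
            double avg = 0;
            for (double r : streamingRates) avg += r;
            avg /= streamingRates.length;
            double var = 0;
            for (double r : streamingRates) var += (r - avg) * (r - avg);
            double sd = Math.sqrt(var / streamingRates.length);
            // low variance among streams: replace within the stream's own pool set (local LRU);
            // high variance: pick the victim from the slowest stream's pool set (global LRU)
            return sd < rateDeviationThreshold ? Policy.LOCAL_LRU : Policy.GLOBAL_LRU;
        }
    }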

We later found that this solution leads to a lot of data redundancy in the pool set, so we decided to partition the range of data values into the following five classes:

  1. Data values at least two standard deviations below the current average value.
  2. Data values between one and two standard deviations below the current average value.
  3. Data values within one standard deviation of the current average value.
  4. Data values between one and two standard deviations above the current average value.
  5. Data values more than two standard deviations above the current average value.

Formally, let A = the current average, SD = the standard deviation, and D = a data value. Then:

Class 1 = (-infinity, A - 2SD],

Class 2 = (A - 2SD, A - SD],

Class 3 = (A - SD, A + SD],

Class 4 = (A + SD, A + 2SD],

and Class 5 = (A + 2SD, +infinity),

where the interval (a, b] denotes all values x with a < x <= b.
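The following small sketch (with illustrative names, assuming the half-open boundaries above) shows how a data value can be mapped to its class:

    // Hypothetical sketch: mapping a data value D to one of the five classes
    // defined by the current average A and standard deviation SD.
    class ClassPartitioner {
        /** Returns the class number (1..5) for data value d, given average a and standard deviation sd. */
        static int classify(double d, double a, double sd) {
            if (d <= a - 2 * sd) return 1;   // (-infinity, A - 2SD]
            if (d <= a - sd)     return 2;   // (A - 2SD, A - SD]
            if (d <= a + sd)     return 3;   // (A - SD, A + SD]
            if (d <= a + 2 * sd) return 4;   // (A + SD, A + 2SD]
            return 5;                        // (A + 2SD, +infinity)
        }
    }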

Each of these classes was given a class pool. Together, these class pools constitute what we call the pool set for a stream. A weight field was associated with each class pool. The allocation of buffers at any time is thus as shown in Fig. 2.

Figure 2: Buffer allocation to a sampler

When a new data value arrives, the new average value for its class is calculated as follows:

A(i)_new = (A(i)_old * wt + D) / (wt + 1), where “wt” is the weight field and “A(i)” is the current average value of the class “i” to which D belongs.

If |A(i)_new - A(i)_old| > T (where T is a threshold that is actually a function, in our case the inverse of the square, of the current standard deviation, and is changed dynamically by observing the data values arriving in the stream: T is increased when close values are observed and decreased when far-away values are observed), then we do the following, in this order:

  1. Check to see if there is a free buffer. If yes, allocate it.
  2. Else, if the local buffer pool is full and the local buffer replacement policy is in use, free the earliest allocated buffer in the class with the smallest weight by flushing it to disk, and allocate this freed buffer to store the new data value.
  3. Else, if the local buffer pool is full and the global buffer replacement policy is in use, free the earliest allocated buffer in the smallest-weight class pool among all the data streams’ pool sets, and allocate this freed buffer to store the new data value.
  4. Set A(i)_old = A(i)_new and wt = wt + 1.

Else flush this data value to disk.

The overall average is then computable as:

Sum(A(i) * wt(i)) / Sum(wt(i)),

where the sums run over all classes i.
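A minimal sketch of this per-class update and of the overall weighted average is shown below. The names (ClassPool, Buffer.tryStore) are illustrative, the buffer allocation of steps 1–3 is abstracted behind a single call, and the direction of the threshold comparison follows our reading of the text above.

    // Hypothetical sketch: per-class weighted-average update with a threshold test.
    class ClassPool {
        double avg;      // A(i): current average of this class
        int weight;      // wt: number of values absorbed into the average

        /**
         * Returns true if the value was "interesting" enough to be buffered,
         * false if it should be flushed to disk.
         */
        boolean update(double d, double threshold, Buffer buffer) {
            double newAvg = (avg * weight + d) / (weight + 1);   // A(i)_new = (A(i)_old * wt + D) / (wt + 1)
            if (Math.abs(newAvg - avg) > threshold) {            // assumed reading of the threshold test
                if (buffer.tryStore(d)) {                        // steps 1-3: find or free a buffer
                    avg = newAvg;                                // step 4: A(i)_old = A(i)_new
                    weight = weight + 1;                         //         wt = wt + 1
                    return true;
                }
            }
            return false;                                        // flush the data value to disk
        }

        /** Overall stream average across the class pools: Sum(A(i) * wt(i)) / Sum(wt(i)). */
        static double overallAverage(ClassPool[] pools) {
            double num = 0, den = 0;
            for (ClassPool p : pools) { num += p.avg * p.weight; den += p.weight; }
            return den == 0 ? 0 : num / den;
        }
    }

    interface Buffer { boolean tryStore(double value); }  // placeholder for the pool-set buffer logic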

Periodically, the standard deviation demarcations are “shifted” based on the current standard deviation value, and the buffered values are moved accordingly. One way to do this would be to keep the partitions sorted at all times (by in-place sorted insertion of data values into the buffer set), so that only the class boundaries need to be shifted. This may, however, incur a lot of overhead, so we have not implemented this mechanism yet. Given more time, it would be interesting to implement it and observe the overheads in real time.

The data pattern can be reproduced by reading the buffers sequentially. Another advantage of partitioning the data into classes is that we get a finer-granularity synopsis, which can be used to provide more precise confidence results for future queries supported on these synopses.

We also explored another method, in which there is no partitioning into classes based on the standard deviation (as discussed earlier); when a new data value arrives, its weight is computed as follows:

wt = n/(n+1) + D/(A(old) * (n+1)), where n is the sample value count and A(old) is the last computed average.

(This equation is derived from A(new) = A(old) * wt = (A(old) * n + D) / (n + 1).)

If wt > T, where T is a threshold (again a function of the standard deviation, as before), then we select a buffer (according to the local or global buffer replacement policy), store the data value in it, and perform the following update:

A(new) = A(old) * wt;

Else we flush the data value to disk.
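A corresponding sketch of this second method is below; the names are again illustrative, the Buffer placeholder from the earlier sketch is reused, and we assume the average is seeded with the first sampled value and the count advances with each stored value.

    // Hypothetical sketch: the alternative weighting scheme without class partitioning.
    class SimpleWeightedSynopsis {
        double avg;   // A(old): last computed average
        long n;       // sample value count

        SimpleWeightedSynopsis(double firstValue) {
            avg = firstValue;   // assumed: the average is seeded with the first sampled value
            n = 1;
        }

        /** Returns true if the value is buffered, false if it is flushed to disk. */
        boolean offer(double d, double threshold, Buffer buffer) {
            // wt = n/(n+1) + D/(A(old) * (n+1)), so that A(new) = A(old) * wt = (A(old)*n + D)/(n+1)
            double wt = (double) n / (n + 1) + d / (avg * (n + 1));
            if (wt > threshold && buffer.tryStore(d)) {
                avg = avg * wt;   // A(new) = A(old) * wt
                n = n + 1;        // assumed: the sample count advances with each stored value
                return true;
            }
            return false;         // else flush the data value to disk
        }
    }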

Now we discuss the design of our system in the context of sampling sparsely populated XML elements in the data stream. In this scenario, the stock streams also contain the Nasdaq index value, which appears sparsely in the XML stream (i.e., is reported only occasionally) and is now the focus of our attention. We keep a separate buffer for this value and explore how sampling of the Nasdaq index can be done. Initially, we considered the following possible methods:

  1. Start with the same sampling frequency as the other samplers, observe the rate at which the Nasdaq buffer fills up compared to the other buffers, and change (usually decrease) its sampling rate accordingly; or,
  2. Do not sample Nasdaq index values separately, but use the other samplers to store the value whenever it is encountered while parsing/sampling.

Later, we modeled our system in a more rigorous way as follows:

The sparseness of the Nasdaq index values in the data stream is observed, and a sparseness value is assigned as Sparseness = 1/t, where the Nasdaq element is encountered once in every t XML elements of the data stream. Let the data streaming rate be d, and let the sampling overhead in time be o (in milliseconds) for a sampling skip of x XML elements. If 1/t > o/x (which follows from our policy that a maximum overhead of 1 millisecond per sparse sample is allowed, i.e., o*t/x <= 1 millisecond), then we use sparse sampling; otherwise we make use of the stock value samplers only.
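This decision rule can be written as a small check; the method below is a sketch with hypothetical parameter names matching the symbols above.

    // Hypothetical sketch: deciding whether to run a dedicated sparse sampler for the Nasdaq index.
    class SparseSamplingDecision {
        /**
         * @param t  the Nasdaq element appears once every t XML elements (sparseness = 1/t)
         * @param o  sampler overhead in milliseconds for a sampling skip of x elements
         * @param x  sampling skip, in XML elements
         * @return true if a dedicated sparse sampler should be used
         */
        static boolean useSparseSampler(double t, double o, double x) {
            // allow at most 1 ms of overhead per sparse sample: o * t / x <= 1, i.e. 1/t > o/x
            return (1.0 / t) > (o / x);
        }
    }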

Lastly, we discuss the design of our system for clustering XML elements scattered throughout an incoming data stream. In this case, a stream containing election survey results from different states is pouring in (say, from the Election Commissioner’s Office), and there are “n” candidates contesting the election. Our system computes the results for each candidate across all states (i.e., across all XML elements in the stream) and generates the aggregate results.
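A minimal sketch of such clustering is a SAX handler that accumulates per-candidate totals as the survey stream is parsed; the element and attribute names (result, candidate, votes) are assumptions about our synthetic data rather than a fixed schema.

    // Hypothetical sketch: accumulating election survey results per candidate across all states.
    import org.xml.sax.Attributes;
    import org.xml.sax.helpers.DefaultHandler;
    import java.util.HashMap;
    import java.util.Map;

    class SurveyHandler extends DefaultHandler {
        private final Map<String, Long> votesPerCandidate = new HashMap<>();
        private String currentCandidate;                  // set while inside a <result> element
        private final StringBuilder text = new StringBuilder();

        public void startElement(String uri, String localName, String qName, Attributes atts) {
            if ("result".equals(qName)) {
                currentCandidate = atts.getValue("candidate");   // assumed attribute name
            }
            text.setLength(0);
        }

        public void characters(char[] ch, int start, int length) {
            text.append(ch, start, length);
        }

        public void endElement(String uri, String localName, String qName) {
            if ("votes".equals(qName) && currentCandidate != null) {
                long v = Long.parseLong(text.toString().trim());
                votesPerCandidate.merge(currentCandidate, v, Long::sum);  // cluster across states
            }
        }

        Map<String, Long> results() { return votesPerCandidate; }
    }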

4 Implementation

We used the SAX 2.0 parser to parse the incoming streams and Java 1.2 as the implementation language. The platform we worked on was Linux 2.4.1. We generated our own synthetic XML data for random, normal, and self-similar data distributions and poured it into the sockets on which the SAX parser/sampler threads were listening. Our code was partitioned into the following modules:

For Stock Streams:

  • Module that creates SAX parser and sampler threads to parse/sample the XML data streams.
  • Module that stores sampled data in pre-allocated buffers in main memory.
  • Module that prints out results (maintained in a sliding window) to the standard console or to a file.
  • Module that runs a thread to manage buffers.
  • Module that generates synthetic XML data based on random, normal, and self-similar distributions (a sketch of the element generation appears after this list); the module to generate a Zipfian distribution could not be completed.
  • Module to compute and display confidence intervals; unfortunately, this could not be completed.
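As referenced in the list above, a minimal sketch of how one synthetic stock element might be emitted (with illustrative element names and a uniform random price; the normal and self-similar generators would differ only in how the values are drawn) is:

    // Hypothetical sketch: emitting one synthetic stock element; the element names are illustrative.
    import java.util.Random;

    class StockElementGenerator {
        private final Random rng = new Random();

        String nextElement(String symbol) {
            double price = 20 + 80 * rng.nextDouble();   // uniform random price
            int shares = 1 + rng.nextInt(10000);         // number of shares traded
            long time = System.currentTimeMillis();      // current time
            return "<stock><symbol>" + symbol + "</symbol>"
                 + "<price>" + price + "</price>"
                 + "<shares>" + shares + "</shares>"
                 + "<time>" + time + "</time></stock>";
        }
    }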

For Election Survey Result stream:

  • Module that parses the election survey stream and prints the aggregate results on the screen.
  • Module that generates synthetic XML data based on random distribution.

5 Results

To test our buffer management for stock streams exhaustively, we ran the following eight tests and plotted line graphs for each of them:

  1. Random Data Distribution
  2. Normal Data Distribution
  3. Self-similar Data Distribution
  4. Larger buffer allocation for pool sets (50)
  5. Very small window size (5)
  6. Medium window size (50)
  7. Very short duration data streams (<= 10 seconds)
  8. Long duration data streams (>= 20 minutes)

There were 3 changeable parameters for these tests:

  1. Number of buffers allocated to the pool set of each sampler/parser
  2. Window size, i.e., the size of the sliding window (which slides with time) in which the data is maintained
  3. Number of iterations for which our XML data generator runs. This is used to change the duration of data streams.

To collect results, the data for each stock stream (with symbol, say, SYM) was logged in six files, as follows: