Buffer-free, Disorder-tolerant Window Aggregate Evaluation

Jin Li1, David Maier1, Vassilis Papadimos1, Peter Tucker2, Kristin Tufte1

1OGI at OHSU 2Whitworth College

Portland, OR, USA Spokane, WA, USA

{jinli, maier, tufte, vpapad} @cse.ogi.edu

Abstract

Window queries are proving essential to data-stream processing. A windowed query operator breaks a stream into possibly overlapping subsets of data and computes results over each. Existing approaches to evaluating windowed aggregates use buffering to handle the complexity of tuples participating in multiple subsets. They also use buffering to sort out-of-order inputs (or they impose ordering requirements on the stream). We present a non-buffering, non-sorting approach for evaluating window aggregates over possibly disordered streams. Our method separates the logical definition of windows from physical data ordering and allows most query operators to be "window agnostic." In the case of overlapping windows, we demonstrate additional performance benefits from dividing windows into disjoint "panes." In addition, our approach inherently handles disordered data, which arises from previous query steps, data prioritization, merging unsynchronized sources and variable network latency. We offer a formal window semantics and provide examples of disordered streams arising in practice. We experimentally study latency versus accuracy tradeoffs of alternative strategies for handling disorder using two types of disorder and different aggregates and window sizes. Finally, we propose an extension for "slide-by-tuple" windows.

1.  Introduction

Permission to copy without fee all or part of this material is granted provided that the copies are not made or distributed for direct commercial advantage, the VLDB copyright notice and the title of the publication and its date appear, and notice is given that copying is by permission of the Very Large Data Base Endowment. To copy otherwise, or to republish, requires a fee and/or special permission from the Endowment

Proceedings of the 30th VLDB Conference,
Toronto, Canada, 2004

Many types of data present themselves in stream format: sensor readings, network flow records, telephone call records and auction bids, for example. Window aggregate queries are an important class of queries over streams. Such a query breaks a stream into subsets and computes results over each. For example, a stream may be broken into five-minute windows, with a new window starting each minute. We use the term window extent to refer to the tuples in one (five-minute, in this case) window. When successive window extents overlap, such as in this example, current window aggregation approaches must buffer tuples. Consider computing an aggregate, say MAX, over a window of length five minutes (as measured by tuple timestamps) that advances every minute. Thus, the ith window extent, call it wi, has tuples with timestamps in the interval [i, i + 5 min]. If the aggregate for each window extent is computed in sequence, then current approaches buffer all the tuples in the overlap between successive extents. For example, when computing the MAX for window extent w27, all tuples with timestamps in the interval [28, 32] must be retained for computing the MAX for extent w28. (An approach that derives the aggregate value by incremental update, such as for COUNT or SUM, must retain tuples in the interval [27, 32], as tuples in the interval [27, 28] must be “backed out” in deriving the aggregate for extent w28.)

However, this buffering is avoidable for a wide range of window definitions. The key mechanism is to maintain aggregate computations for all “open” window extents. Thus, in the previous example, MAX computations for w27, w28, w29, w30 and w31 will be active simultaneously. If a tuple with timestamp 31:16 arrives, it will be used to update the current MAX for each of w27 – w31. Once a tuple arrives with a timestamp past the end of window extent w27, say 32:04, that extent is “closed”, its current MAX value is output, and a MAX computation is opened for extent w32. In this approach, each tuple is processed as it arrives, then discarded. The space savings is almost always substantial – five aggregate values versus five minutes of tuples in the example. We will see in the sequel how this technique can be implemented with one new operator, bucket, and at most minor changes to existing operators.
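As a minimal sketch of the "open extents" idea described above, and not the system's actual implementation, the following Python generator keeps one running MAX per open window extent and discards each tuple after processing it. It assumes timestamps in minutes, input ordered by timestamp, and half-open extents [i, i + 5); the function name `sliding_max` and the flush at end of stream are choices made for this illustration.

```python
import math


def sliding_max(stream, range_min=5, slide_min=1):
    """Yield (window_id, max) pairs without buffering tuples.

    Assumes `stream` yields (timestamp_in_minutes, value) in timestamp order
    and that window i covers [i * slide_min, i * slide_min + range_min).
    """
    open_max = {}  # window_id -> running MAX for that open extent
    for ts, value in stream:
        # Close every extent that ends at or before this timestamp.
        closed = sorted(w for w in open_max if w * slide_min + range_min <= ts)
        for wid in closed:
            yield wid, open_max.pop(wid)
        # Update (or open) every extent that contains this timestamp.
        newest = math.floor(ts / slide_min)
        oldest = math.floor((ts - range_min) / slide_min) + 1
        for wid in range(oldest, newest + 1):
            open_max[wid] = max(open_max.get(wid, value), value)
    # Illustration only: emit whatever is still open when the stream ends.
    for wid in sorted(open_max):
        yield wid, open_max[wid]


bids = [(31.27, 10), (31.5, 7), (32.07, 3)]  # 31:16, 31:30, 32:04
results = list(sliding_max(bids))
```

With this input, the tuple at 32:04 closes w27 and opens w32, mirroring the example in the text; the state held at any time is one value per open extent.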

In this paper, we describe our non-buffering, non-sorting approach to window aggregate evaluation. We introduce a bucket operator that understands window specifications and appends a window-id as an explicit attribute to each tuple. We leverage the database’s existing grouping functionality to group on window-id to compute aggregates over windows. Our approach, in effect, transforms a window aggregate query into a traditional query with a group-by operator. Punctuations [19], which the bucket operator creates and inserts into the stream, are used to communicate the end of each window extent.

We define a formal window specification and give methods for defining window-ids and window extents. In general, window specifications can be on any attribute, whether supplied externally or attached internally. Our approach supports sliding windows, tumbling windows, windows defined on time, windows defined on tuple count, slide-by-tuple windows and value-based windows [13].

Our approach has multiple advantages. It separates window definition from delivery of data; that is, we separate knowing what is in a window from knowing when we have seen everything in the window. It simplifies window query implementation by isolating window semantics in the bucket operator. And it gives us the ability to process data that arrives out of order with minimal modification to a stream query system. A drawback of current stream query systems that support windows is that window definition tends to be tied to implementation; the exact details of what is in a window can be confused with the implementation of the window. With explicit window-ids, the mapping of tuples to windows is explicit, easily understood and easily verified to be correct.

We present a method for dividing windows into disjoint panes. Aggregate values are computed over the panes and then “rolled up” to compute aggregate values for windows. Using panes has significant performance benefits as we will show.
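The pane idea can be sketched as follows. This is an illustration under stated assumptions, not the method detailed in Section 4: it assumes RANGE is a multiple of SLIDE, takes the pane length to be SLIDE, uses MAX as the aggregate, and works over a finite list rather than a live stream. The names `pane_max` and `roll_up` are hypothetical.

```python
def pane_max(stream, pane_len):
    """Compute MAX per disjoint pane; each tuple updates exactly one pane."""
    panes = {}  # pane_id -> MAX over the tuples falling in that pane
    for ts, value in stream:
        pid = ts // pane_len
        panes[pid] = max(panes.get(pid, value), value)
    return panes


def roll_up(panes, panes_per_window):
    """Roll pane values up into window values, keyed by the first pane id."""
    if not panes:
        return {}
    windows = {}
    last_start = max(panes) - panes_per_window + 1
    for first in range(min(panes), last_start + 1):
        parts = [panes[p] for p in range(first, first + panes_per_window)
                 if p in panes]
        if parts:
            windows[first] = max(parts)
    return windows


# Timestamps in seconds, 60-second panes, windows spanning two panes.
panes = pane_max([(0, 3), (70, 9), (130, 4), (200, 1)], 60)
windows = roll_up(panes, 2)
```

Under these assumptions each tuple touches one partial aggregate instead of RANGE/SLIDE of them, and the per-window work is a roll-up over a handful of pane values.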

Disorder naturally arises in streams from factors such as network latency, prior aggregation or other query processing, merging streams and prioritizing data. A robust stream processing system must be able to cope with disorder without undue compromise on efficiency and latency. We show that the explicit window-id approach handles disorder smoothly, without any modifications to the algorithm.

We provide a performance study using our stream query system, which is an extension of version 1.0 of the Niagara Query Engine [8]. Niagara 1.0 was initially developed at the University of Wisconsin-Madison to be a system for querying XML data on the Internet and is written in Java. We have kept the XML processing capabilities of Niagara and its push-based (pipelined) query-processing model. To support explicit window-ids, we have made two major extensions to Niagara: support for punctuation and a new bucket operator to map tuples to window ids. During the course of these additions, many other small improvements have been made to the original Niagara system. Thus our version differs markedly from the original version (and generally executes more efficiently).

We analyze the nature of disorder through a study of anonymized packet traces and network flow records [1][9]. Based on this analysis, we generate disordered data streams and use these streams to investigate the effects of disorder and various approaches to handling disorder. We show the latency-accuracy tradeoffs of evaluating queries using explicit window-ids and sort-based slack [3] when varying the disorder of the input stream, the amount of slack and the query’s window size. We compare the latency and accuracy of evaluating window queries over streams with explicit window-ids using externally generated punctuation to explicit window-ids using slack. We also examine various implementations/definitions of slack and their effects on query accuracy and latency.

This paper is organized as follows: Section 2 describes window aggregate evaluation without buffering; Section 3 provides a formal window semantics and formal basis for our work; Section 4 shows the advantages of dividing windows into disjoint panes; Section 5 presents an analysis of the disorder in network flow data and describes mechanisms for handling disorder; Section 6 presents performance results; Section 7 describes our “slide-by-tuple” extension and Section 8 concludes.

2.  Window Aggregates without Buffering

As described in the Introduction, current methods for evaluating window aggregates buffer tuples from overlapping window extents. We present a method for window aggregate evaluation, using explicit window-ids, which does not buffer tuples; tuples are processed once as they arrive and then are discarded. In this section, we focus on time-based sliding windows over ordered data; Sections 3 and 7 give methods for handling other types of windows; the order assumption is relaxed in Section 5.

2.1 Motivation

The Explicit Window-Id approach is motivated by the observation that a window query can be viewed as a group-by query: one that groups, for example, on a range over a timestamp attribute. Consider an on-line auction monitoring system running queries over a stream of bids. We may wish to calculate the number of bids generated in the past five minutes and update the result every minute. This request defines a series of “sliding” window extents (12:10-12:15, 12:11-12:16 and 12:12-12:17, for example) over which results must be computed. In our approach, we assign each window extent a window-id, and for each tuple we calculate the ids of the window extents to which that tuple belongs. The appropriate window-ids are appended to tuples as explicit attributes. (For now, imagine that tuples belonging to multiple window extents are replicated, once for each window extent.) Finally, we use a count operator that groups on window-id to compute the number of bids in each window extent.

There is a catch. To produce results, the aggregate operator (count) must know when each window extent (group) is complete, since normally an aggregate operator waits until it has received all of its input to produce output. One solution to this problem is to require the input stream to appear in order on the windowing attribute and to modify the aggregate operator to determine the end of each window extent. This approach has the disadvantages of requiring ordered input and complicating the aggregate operator. We use an alternative approach, based on an existing technique called punctuation [19], to evaluate windowed aggregates with minimal modification to the aggregate operator and without the ordering requirement.

2.2 Preliminaries

Before describing our method for window aggregate evaluation, we give preliminaries on punctuation, our working scenario and our window query language.

Briefly, a punctuation is a message embedded in a data stream indicating that a certain subset of data is complete; a punctuation indicates that no more tuples having certain attribute values will be seen in the stream. Stream query processing can use punctuation as a mechanism to adapt blocking and stateful operators to data streams [19]. Punctuations have the same schema as the tuples they provide information about and are commingled with tuples in inter-operator streams. Tucker et al. have defined punctuation behaviour for each query operator [19], which we follow. Some operators, such as select, simply pass punctuations through to the next operator in the query plan. Group by operators use punctuations to recognize when groups are complete so they can output results for and purge state associated with those groups. Our system uses punctuations, generated by query operators, to signal the end of window extents.
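To make the role of punctuation in a group-by operator concrete, here is a small sketch under simplifying assumptions. Real punctuations share the schema of the tuples they describe; in this illustration a punctuation is reduced to a `("punct", window_id)` marker meaning "no more tuples with this window-id will arrive", and tuples are `("tuple", window_id, auction_site)`. The encoding and the name `punctuated_count` are hypothetical.

```python
from collections import defaultdict


def punctuated_count(stream):
    """Count tuples per (window_id, auction_site) group.

    Results for a window-id are emitted, and its state purged, only when a
    punctuation for that window-id arrives. Tuple arrival order is irrelevant.
    """
    counts = defaultdict(int)  # (window_id, site) -> running count
    for item in stream:
        if item[0] == "tuple":
            _, wid, site = item
            counts[(wid, site)] += 1
        else:  # simplified punctuation: this window-id is complete
            _, wid = item
            for key in sorted(k for k in counts if k[0] == wid):
                yield key[0], key[1], counts.pop(key)


stream = [
    ("tuple", 6, "A"), ("tuple", 6, "B"), ("tuple", 7, "A"),
    ("tuple", 6, "A"), ("punct", 6),
    ("tuple", 7, "A"), ("punct", 7),
]
output = list(punctuated_count(stream))
```

The operator itself knows nothing about window ranges or slides; it only reacts to the punctuation, which is the sense in which most operators can stay "window agnostic".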

Throughout this paper, our working scenario is an on-line auction system with a centralized processing system and distributed auction sites. Each auction site accepts bids, timestamps the bids and forwards them to a central system. The central system merges the bid streams from the various auction sites into a combined stream, over which queries are evaluated. The schema of the bid stream is: <item-id, bid-price, auction-site, timestamp>.

Consider the following query over a stream of auction bids: “Find the number of bids for each auction site in the past five minutes (300 seconds) and update that result every minute (60 seconds).” We express this query as:

Q1: SELECT auction-site, count(*)
    FROM Bids [RANGE 300 seconds
               SLIDE 60 seconds
               WINATTR timestamp]
    GROUP BY auction-site

The window in this query is specified with a window specification composed of three parameters: RANGE, which defines the window length; SLIDE, which defines how the window moves; and WINATTR, which names the attribute over which the window is defined (timestamp in Q1). Window specifications are discussed in detail in Section XXX.

2.3 Defining and Computing Window-Ids

A key element of our approach is computing the window-ids for each tuple. In this section, we describe how window-ids are assigned for Q1.

To simplify the following illustration, we use integers to represent timestamp values in the calculation. Integer timestamps represent seconds after 12:00 PM (noon). 12:00 PM corresponds to timestamp 0. We assume the query starts at 12:10 PM, which corresponds to timestamp 600. Windows 6-10 for Q1 and their associated window-ids are as follows.

window extent                      window-id
seconds 720-1020 (12:12-12:17)     6
seconds 780-1080 (12:13-12:18)     7
seconds 840-1140 (12:14-12:19)     8
seconds 900-1200 (12:15-12:20)     9
seconds 960-1260 (12:16-12:21)     10

Now, given a tuple T1 with timestamp 1000 (corresponding to 12:16:40, in hours:minutes:seconds), we compute the window-ids for this tuple as follows. Let T.ts be the tuple timestamp and let Q.ts be the timestamp of the start of the query. Then the window-id of the first window extent a tuple belongs to can be calculated with the following formula: (T.ts - Q.ts) div SLIDE, where div denotes integer division. Therefore, the id of the first window extent T1 belongs to is (1000 - 600) div 60 = 6. The number of extents a tuple belongs to is given by RANGE div SLIDE. In Q1, all tuples belong to 300 div 60 = 5 window extents, so tuple T1 belongs to windows 6, 7, 8, 9 and 10.
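The worked example can be checked with a short sketch that reads both operations as integer division, which is what yields the values 6 and 5. It assumes RANGE is a multiple of SLIDE. The helper `window_extent` is inferred from the table of windows 6-10 rather than stated in the text, and treats extents as half-open intervals; both function names are hypothetical.

```python
def window_ids(tuple_ts: int, query_start: int, range_: int, slide: int) -> list[int]:
    """Ids of all window extents containing a tuple (RANGE a multiple of SLIDE)."""
    if range_ % slide != 0:
        raise ValueError("this sketch assumes RANGE is a multiple of SLIDE")
    first = (tuple_ts - query_start) // slide  # first extent the tuple belongs to
    count = range_ // slide                    # number of extents per tuple
    return list(range(first, first + count))


def window_extent(window_id: int, query_start: int, range_: int, slide: int) -> tuple[int, int]:
    """Assumed mapping from id to [start, end), matching the table for Q1."""
    end = query_start + (window_id + 1) * slide
    return (end - range_, end)


# Q1: query starts at timestamp 600, RANGE 300 seconds, SLIDE 60 seconds.
ids_for_t1 = window_ids(1000, 600, 300, 60)
extents = [window_extent(w, 600, 300, 60) for w in ids_for_t1]
```

Every extent returned for T1 contains timestamp 1000, and the first and last extents agree with the rows for window-ids 6 and 10 in the table.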

For many kinds of window queries, the mapping from a tuple to its associated window-ids can be computed by a simple function, similar to the one above, of the tuple timestamp, the query's range and slide, and the query start time. In Section 3, we provide a formalization of the functions mapping window-ids to window extents and mapping tuples to window-ids.

2.4 Query Evaluation with Explicit Window-Ids