Dataflows with Examples from Telecommunications

David Belanger, Kenneth Church and Andrew Hume

AT&T InfoLab

AT&T Labs–Research

180 Park Ave.

Florham Park, NJ, USA

{dgb, kwc, andrew}@research.att.com

Many telecommunication applications can be modeled as data flowing through a network of systems. For example, the AT&T billing factory can be thought of as records of telephone calls flowing from one process to the next, e.g., from recording to rating to rendering. We would like to reason about (query) the flow as a whole. Are packets being lost? If so, where? How long does it take for a packet to flow from here to there? Where are the major bottlenecks?

We introduce the term dataflow to refer to an abstract process of data flowing through a network. Questions about throughput, latency, loss, error rates and so forth can be addressed by attaching probes (packet sniffers) at various points in the network. An alerting system is a simple example. There is a single probe that counts events, such as error conditions, and computes a summary statistic such as a running estimate of the average error rate. This figure of merit is then compared to a threshold or a control limit to decide whether or not to sound an alarm. More interesting examples involve a join of the evidence from two or more probes. To study the latencies in billing, Hume and his colleagues (Hume and Maclellan, 1999) built a system called Gecko that taps the flow in the billing systems at roughly a dozen locations and compares these observations to see how long it takes for records to propagate from one tap point to the next.
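
As a concrete illustration of the single-probe case, the following sketch keeps an exponentially weighted running estimate of the error rate and compares it to a control limit. The class name, decay factor and threshold are illustrative choices for this paper, not details of any production alerting system.

```python
# Minimal sketch of a single-probe alerting system: count events, maintain a
# running estimate of the error rate, and sound an alarm when it crosses a
# control limit. The decay factor and limit below are illustrative assumptions.
class ErrorRateAlarm:
    def __init__(self, control_limit=0.01, decay=0.999):
        self.control_limit = control_limit  # threshold on the running error rate
        self.decay = decay                  # forgetting factor for the running average
        self.rate = 0.0                     # current estimate of the error rate

    def observe(self, is_error):
        """Fold one observed event into the estimate; return True if an alarm should sound."""
        self.rate = self.decay * self.rate + (1.0 - self.decay) * (1.0 if is_error else 0.0)
        return self.rate > self.control_limit
```

Feeding the probe's event stream through observe() one event at a time is all an alerting system of this simple kind requires; joins across multiple probes, as in Gecko, are considerably harder.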

What makes dataflows challenging is the combination of scale and timeliness. The networks we want to monitor, such as the billing systems, tend to be demanding along a number of dimensions: throughput, latency, reliability, physical distance and cost. If we are going to tap such a network at multiple places and make inferences across these tap points, the measurement network is likely to require even more capacity, in terms of bandwidth and/or state, than the underlying network being monitored.

We define a dataflow as an abstract database of triples, <p, t, l>, that keeps track of all packets, p, over all times, t, and all locations, l. We would like to query this database to make inferences over time and location. Obviously, we can't materialize the database. The probes provide direct evidence for a few of these <p, t, l> triples (e.g., the packets that have been received thus far), but many of them (especially the packets that will be received in the future) will have to be inferred by indirect statistical methods. We view the evidence from the probes as a training sample, based on which we hope to derive inferences that will generalize to unseen data. The distinction between training data and test data will be especially important in the discussion of compression below.
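
The sketch below makes the abstraction concrete for the small part of the triple database that the probes do let us materialize; the tuple layout and the helper name are our own illustrative choices.

```python
from collections import defaultdict

def latencies(observations, src, dst):
    """Toy latency query over observed <p, t, l> evidence.

    observations: iterable of (packet_id, time, location) triples reported by probes.
    Yields (packet_id, elapsed_time) for packets seen at both locations.
    """
    first_seen = defaultdict(dict)
    for p, t, l in observations:
        # Keep the earliest time each packet was observed at each location.
        if l not in first_seen[p] or t < first_seen[p][l]:
            first_seen[p][l] = t
    for p, seen in first_seen.items():
        if src in seen and dst in seen:
            yield p, seen[dst] - seen[src]
```

Questions about packets that have not yet reached dst (or never will) cannot be answered this way; they are exactly the part of the database that must be inferred statistically.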

Dataflows borrow many ideas from conventional databases, especially data warehouses and transaction databases. But dataflows emphasize the movement of the data in time and/or space. Suppose that we wanted to join (or merge/purge (Hernández and Stolfo, 1995)) the data being emitted by two I/O streams. Would it be better to use an indexed database or Unix pipes? Indexing can be very helpful, especially when the data are relatively static (fewer loads than queries). But indexing can be a liability when the data are flowing quickly. If there are more loads than queries, then the database could easily end up spending the bulk of its computational resources creating indexes that will never be used. In the two applications described below, we ended up with a load-and-join process that is somewhere between a database and a buffered I/O stream.
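
The buffered-I/O-stream end of that spectrum can be sketched as a single-pass merge join of two streams that are already sorted by key, in the spirit of Unix sort | join; the record layout and the assumption of unique keys are simplifications made for the sketch.

```python
def merge_join(stream_a, stream_b, key=lambda rec: rec[0]):
    """Single-pass join of two key-sorted record streams; no index is ever built.

    Yields (key, record_a, record_b) for keys that appear in both streams.
    Assumes keys are unique within each stream to keep the sketch short.
    """
    iter_a, iter_b = iter(stream_a), iter(stream_b)
    a, b = next(iter_a, None), next(iter_b, None)
    while a is not None and b is not None:
        ka, kb = key(a), key(b)
        if ka == kb:
            yield ka, a, b
            a, b = next(iter_a, None), next(iter_b, None)
        elif ka < kb:
            a = next(iter_a, None)   # record in A with no match (yet) in B
        else:
            b = next(iter_b, None)   # record in B with no match (yet) in A
```

The cost is one sequential pass over each stream plus the cost of sorting, with no per-record index maintenance.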

Over the last four years, we have worked on a large number of dataflow applications in areas including fraud, marketing, billing and customer care. The fraud applications (Cortes and Pregibon, 1998, 1999) are the most similar to alerting systems. The fraud systems look for suspicious purchasing behavior by comparing a customer's recent purchases with a summary of the customer's long-term history (a signature). As mentioned above, dataflow applications are challenging because of the scale and timeliness requirements. The fraud systems process a flow of 10 billion calls per month as quickly as possible, hopefully while there is still time to do something about it.
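
A toy version of the signature idea is sketched below: each customer carries an exponentially aged summary of daily usage, and a day that departs sharply from the summary is flagged. The particular statistic and the alert rule are our own illustrative assumptions, not the method of Cortes and Pregibon.

```python
# Toy per-customer signature: an exponentially aged estimate of daily usage.
# The "3x the signature" rule is an illustrative assumption, not the published method.
class Signature:
    def __init__(self, decay=0.95):
        self.decay = decay
        self.daily_usage = None  # long-term summary of this customer's behavior

    def update(self, todays_usage):
        """Compare today against the signature, then fold today into the signature."""
        if self.daily_usage is None:
            self.daily_usage = float(todays_usage)
            return False
        suspicious = todays_usage > 3.0 * max(self.daily_usage, 1.0)
        self.daily_usage = self.decay * self.daily_usage + (1.0 - self.decay) * todays_usage
        return suspicious
```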

We have also built tracking systems for marketing, which compare sales across time and space, among many other variables. Marketing is constantly trialing new ideas: a new promotion here, a new price-point there, and so on. They will do more of what works and less of what doesn't. The challenge is to find ways to distinguish what works from what doesn't as quickly as possible, hopefully while there is still time to respond.

This paper will discuss two dataflow applications in more detail. The first example emphasizes scale (throughput and latency), while the second emphasizes generalizing from training data to unseen test data.

  1. Gecko: a system that monitors the billing factory (Hume and Maclellan, 1999). How long does it take for a record to flow from one system to the next? Do any records get lost along the way? Latencies in the billing factory can be thought of as inventory and lost records can be thought of as shrinkage, a term used in retailing to refer to missing/damaged inventory. As mentioned above, Gecko answers these questions by joining the views of the dataflow from a number of probes distributed throughout the billing factory.
  2. Pzip: a lossless, semantics-free compression tool based on gzip. Gzip has just two phases, a compression phase and a decompression phase. Pzip introduces a third phase, a training phase, which is given a training sample of records seen thus far and is asked to make inferences about records that will be seen in the future. The training phase searches a space of invertible transforms (e.g., permutations of the columns in the record) for the one that optimizes the expected size of the compressed records; gzip is more effective if dependent fields are moved near one another and independent fields are moved far apart. At compression time, the transform is applied just before gzip, and at decompression time, the inverse transform is applied just after gunzip (a sketch follows this list). These inferences are then put to the test at compression time, when we find out whether the transforms do indeed improve the performance of gzip. In several commercially important data warehousing and data transmission applications, these transforms have improved the performance of gzip by at least a factor of two in both space and time.
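
To make the second item concrete, the sketch below implements the simplest of the invertible transforms mentioned there, a permutation of the byte columns of each fixed-length record, wrapped around gzip. It is our own illustrative reconstruction of the idea, not the pzip implementation; the function names and interfaces are ours.

```python
import gzip

def compress_records(data, record_len, perm):
    """Permute the columns of each fixed-length record, then gzip the result.

    data: bytes whose length is a multiple of record_len.
    perm: a permutation of range(record_len), chosen during a training phase.
    """
    out = bytearray()
    for i in range(0, len(data), record_len):
        rec = data[i:i + record_len]
        out += bytes(rec[j] for j in perm)    # move dependent columns next to each other
    return gzip.compress(bytes(out))

def decompress_records(blob, record_len, perm):
    """Gunzip, then apply the inverse permutation to recover the original records."""
    data = gzip.decompress(blob)
    inv = [0] * record_len
    for dst, src in enumerate(perm):
        inv[src] = dst                        # invert the column permutation
    out = bytearray()
    for i in range(0, len(data), record_len):
        rec = data[i:i + record_len]
        out += bytes(rec[inv[j]] for j in range(record_len))
    return bytes(out)
```
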
1. Gecko

The Gecko system monitors the calls flowing through the billing factory for residential customers (figure 1). As each call is completed, a record is generated at one of about 160 4ESS/5ESS switches distributed geographically around the USA. These records are collected by the BILLDATS system in Arizona, and are then sent to RICS, which, after converting from the 200 or so switch formats into the appropriate biller-specific format, routes them to one of 50 or so billers. We denote the 45 or so business billers collectively as BCS; they are ignored hereafter. After RICS, the remaining records are sent to MPS, a large buffer that holds the records until the customer’s next bill cycle, approximately half a month away on average (some customers are billed every second month). Every day, Monday through Saturday, the main residential biller, RAMP, requests all records for about 5% of the roughly 80 million customers. RAMP calculates the bill (rating) and sends the results to IDB to render the bill (either sending the bill electronically to a local telephone company or printing and mailing a paper bill). When bills are paid, RAMP sends the appropriate records to RIPS, an accounting system that books the revenue.

The monitoring task is challenging because of the throughput and latency requirements. Each box in figure 1 has a throughput of 1-10 billion calls/month (more toward the beginning of the pipeline and less downstream of the link from RICS to BCS). Processing of a bill cycle needs to be completed in a day or less. Once a day, Gecko updates a database and generates reports tracking shrinkage and inventory; the storage is then flushed, making room for the next update cycle. On an average day, the update process adds a billion records in 8-9 hours, but on the peak day it added 5 billion records in 13 hours. Gecko has been in production for over a year.

The monitoring task requires more bandwidth and state than the underlying dataflow. Tapping a flow with k probes generally increases the bandwidth by a factor of k. In Gecko, 2/3 of the records pass 3 probes and 1/3 pass about 6, for an overall average of k = 4. Thus, the monitoring system has about 4 times as much throughput as the underlying process. As mentioned above, Gecko's update process handles about a billion records per day, which is about 4 times the average throughput of the billing factory. On a typical day, the switches send BILLDATS about 250 million records (or about 10 billion records/month).

State requirements also increase with the number of probes. Gecko’s database holds about 60 billion records occupying 2.6 terabytes. The underlying billing systems generally have much less state. The switches only have enough state to hold onto the flow for 2-5 days, and BILLDATS and RICS can only hold onto their flows for about 1.5 days. Gecko's 60 billion record database is designed to store all of these flows for 45-75 days (the end-to-end latency) so that they can be reconciled with downstream processes.

Figure 1: The Billing Systems for Residential Customers

The billing factory has very challenging throughput and latency requirements. A bill cycle needs to be processed reliably in less than a day. Some components have even more demanding latency requirements. RAMP not only interfaces with MPS and IDB as mentioned above, but it also serves as the data warehouse for customer care. The care agents require access to a customer’s bill in interactive time (3 seconds or less), a significant challenge because there are thousands of care agents in dozens of call centers distributed across the country.

Gecko attempts to answer the question: is every call billed exactly once? As mentioned above, it does this by tapping the flow at a dozen or so tap points, and joining the flows at each of these tap points. Although this seems an obvious approach, extreme care is required to cope with the scale and the latency requirements.

A prototype implementation used an off-the-shelf database, but unfortunately Hume and his colleagues found that the database could not even load the flow reliably in real time, let alone answer the shrinkage question. In the end, they implemented the joins with simple sorted flat files, relying on raw I/O speed and on the fact that the database could be updated and the reports generated in a single pass over the data. Loading the flow into a commercial database wasn't worth the effort, since the storage had to be released as soon as possible (as soon as the join completed). Although a dataflow is like a database in many ways, it is also like a buffered I/O stream in the sense that we probably can't afford to store the whole flow in memory at one time, and even if we could, we might not want to.
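
A toy version of the flat-file reconciliation is sketched below: one pass over a file of (call_id, tap_point) pairs sorted by call id, reporting calls that did not reach every expected tap point. The record layout is an assumption made for the sketch; the real system has to cope with duplicates, timing windows and records that legitimately leave the flow early.

```python
from itertools import groupby

def shrinkage_report(sorted_records, expected_taps):
    """One pass over (call_id, tap_point) pairs sorted by call_id.

    Yields (call_id, missing_taps) for calls that were not seen at every
    expected tap point -- a toy version of "is every call billed exactly once?".
    """
    for call_id, group in groupby(sorted_records, key=lambda rec: rec[0]):
        seen = {tap for _, tap in group}
        missing = expected_taps - seen
        if missing:
            yield call_id, missing
```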

The Gecko example is interesting because it is just barely possible to bring all of the evidence to a central place. The next example of a dataflow considers a case where even this isn't practical. When compressing a large dataflow, we generally cannot afford algorithms that take time polynomial (or worse) in the size of the flow, but we can afford to apply these expensive algorithms to a small sample of the flow.


2. Pin and Pzip: General Purpose Tools for Compressing Fixed-Length Records

Pzip (Partition Zipper) is a general-purpose, lossless, semantics-free data-compression tool for sequences of fixed-length records. The method is based on gzip, a de facto standard that has been adopted by commercial database vendors such as Sybase (Goldstein et al., 1997). Pzip improves on gzip by introducing a training phase, pin (Partition Inducer), which induces a transform (a partition or schema) from training data that can then be usefully applied to unseen data during compression. Pin finds the optimal schema by calling gzip a great many times on the training sample as it searches the space of transforms. It would not be practical to call gzip that many times on the entire flow at compression time in production, but we can afford to do so on the relatively small training sample, especially since the training can be done offline at a convenient time of our choosing.
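
The sketch below is a toy stand-in for the training phase: it exhaustively tries column permutations on a small training sample and keeps the one that gzips best. Exhaustive search is only feasible for a handful of columns, and the real pin searches a much larger space of transforms far more cleverly; the function name and interface are our own.

```python
import gzip
from itertools import permutations

def train_schema(training_sample, record_len):
    """Pick the column permutation that makes gzip smallest on a training sample.

    training_sample: bytes whose length is a multiple of record_len.
    Returns the winning permutation of range(record_len).
    """
    records = [training_sample[i:i + record_len]
               for i in range(0, len(training_sample), record_len)]
    best_perm, best_size = None, None
    for perm in permutations(range(record_len)):
        transformed = b"".join(bytes(rec[j] for j in perm) for rec in records)
        size = len(gzip.compress(transformed))
        if best_size is None or size < best_size:
            best_perm, best_size = perm, size
    return best_perm
```

At compression time the winning permutation is applied to every record before gzip; whether it helps on unseen records is exactly the training/test question raised earlier.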

Pin and pzip were developed in collaboration with the Greyhound team, a group of developers responsible for a data warehouse used by marketing. Like Gecko, the warehouse uses simple sorted flat files rather than a proper database. (Most queries require a single linear scan over the data, so there is little advantage to indexing.) When we first started working with this team, they were using gzip (or the closely related zlib library) to compress the flat files, with surprisingly good results: the compressed files were 15 times smaller than the clear text. The marketing warehouse has since converted from gzip to pzip, saving at least a factor of two in both space and time over the previous gzip solution. A factor of two is worth tens of millions of dollars in this application alone, just counting the cost of the hardware and ignoring the even larger cost of staffing the computer center. In addition, pzip is being deployed in quite a number of other applications, such as sending call detail (records of telephone calls) in real time over a limited-bandwidth pipe (e.g., a T1). A factor of two makes it possible to do things that could not have been done otherwise.