Stream processing
In computer science, stream processing (also known as event stream processing, data stream processing, or distributed stream processing) is a programming paradigm which views streams, or sequences of events in time, as the central input and output objects of computation. Stream processing encompasses dataflow programming, reactive programming, and distributed data processing.[1] Stream processing systems aim to expose parallel processing for data streams and rely on streaming algorithms for efficient implementation. The software stack for these systems includes components such as programming models and query languages, for expressing computation; stream management systems, for distribution and scheduling; and hardware components for acceleration including floating-point units, graphics processing units, and field-programmable gate arrays.[2]
The stream processing paradigm simplifies parallel software and hardware by restricting the parallel computation that can be performed. Given a sequence of data (a stream), a series of operations (kernel functions) is applied to each element in the stream. Kernel functions are usually pipelined, and local on-chip memory is reused as much as possible to minimize the bandwidth lost to external memory accesses. Uniform streaming, where one kernel function is applied to all elements in the stream, is typical. Since the kernel and stream abstractions expose data dependencies, compiler tools can fully automate and optimize on-chip management tasks. Stream processing hardware can use scoreboarding, for example, to initiate a direct memory access (DMA) when dependencies become known. Eliminating manual DMA management reduces software complexity, and the associated elimination of hardware-cached I/O reduces the extent of the data area that has to be serviced by specialized computational units such as arithmetic logic units.
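The pipelining and local-memory reuse described above can be sketched in C++ as follows. This is only an illustration under stated assumptions: the kernels `scale_kernel` and `offset_kernel`, the function `run_pipeline`, and the chunk size of 64 are hypothetical, and a small fixed-size buffer stands in for on-chip memory. Each chunk passes through both kernels before the next chunk is loaded, so the intermediate stream never has to be written to external memory.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical kernels: each maps one input element to one output element.
inline float scale_kernel(float x)  { return x * 2.0f; }
inline float offset_kernel(float x) { return x + 1.0f; }

// Apply scale_kernel, then offset_kernel, to every element of `in`.
// The stream is processed in small chunks; the results of the first kernel
// live only in `local`, which stands in for on-chip memory.
std::vector<float> run_pipeline(const std::vector<float>& in)
{
    constexpr std::size_t kChunk = 64;   // assumed local buffer capacity
    std::array<float, kChunk> local{};
    std::vector<float> out(in.size());

    for (std::size_t base = 0; base < in.size(); base += kChunk) {
        const std::size_t n = std::min(kChunk, in.size() - base);
        assert(n <= local.size());
        for (std::size_t i = 0; i < n; ++i)   // kernel 1: external -> local
            local[i] = scale_kernel(in[base + i]);
        for (std::size_t i = 0; i < n; ++i)   // kernel 2: local -> external
            out[base + i] = offset_kernel(local[i]);
    }
    return out;
}
```

A real stream processor would overlap the loading of the next chunk with computation on the current one; the sketch only shows where the intermediate data lives.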
During the 1980s stream processing was explored within dataflow programming. An example is the language SISAL (Streams and Iteration in a Single Assignment Language).
Applications
Stream processing is essentially a compromise, driven by a data-centric model that works very well for traditional DSP or GPU-type applications (such as image, video and digital signal processing) but less so for general purpose processing with more randomized data access (such as databases). By sacrificing some flexibility in the model, it allows easier, faster and more efficient execution. Depending on the context, processor design may be tuned for maximum efficiency or may trade some efficiency for flexibility.
Stream processing is especially suitable for applications that exhibit three application characteristics:
- Compute intensity, the number of arithmetic operations per I/O or global memory reference. In many signal processing applications today it is well over 50:1 and increasing with algorithmic complexity.
- Data parallelism exists in a kernel if the same function is applied to all records of an input stream and a number of records can be processed simultaneously without waiting for results from previous records.
- Data locality is a specific type of temporal locality common in signal and media processing applications where data is produced once, read once or twice later in the application, and never read again. Intermediate streams passed between kernels as well as intermediate data within kernel functions can capture this locality directly using the stream processing programming model.
Examples of records within streams include:
- In graphics, each record might be the vertex, normal, and color information for a triangle;
- In image processing, each record might be a single pixel from an image;
- In a video encoder, each record may be 256 pixels forming a macroblock of data; or
- In wireless signal processing, each record could be a sequence of samples received from an antenna.
For each record we can only read from the input, perform operations on it, and write to the output. It is permissible to have multiple inputs and multiple outputs, but never a piece of memory that is both readable and writable.
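The read-only input and write-only output rule can be illustrated with a short C++ sketch that uses the image-processing case, where each record is one pixel. The names `Pixel`, `luma_kernel` and `apply_luma` are hypothetical, and the integer luminance weights are one common approximation chosen for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// One record of the input stream: a single RGB pixel.
struct Pixel { std::uint8_t r, g, b; };

// Hypothetical kernel: reads one input record and produces one output record.
// Integer approximation of 0.299 R + 0.587 G + 0.114 B (weights sum to 256).
inline std::uint8_t luma_kernel(const Pixel& p)
{
    return static_cast<std::uint8_t>((77 * p.r + 150 * p.g + 29 * p.b) >> 8);
}

// Apply the kernel to the whole stream. The input is const (read-only) and
// the output is a separate buffer that is only written, never read back.
void apply_luma(const std::vector<Pixel>& in, std::vector<std::uint8_t>& out)
{
    assert(out.size() == in.size());
    for (std::size_t i = 0; i < in.size(); ++i)
        out[i] = luma_kernel(in[i]);   // no record depends on another record
}
```

Because no iteration reads anything written by another iteration, the loop body could be distributed across any number of execution units without synchronization.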
Code examples
By way of illustration, the following code fragments demonstrate detection of patterns within event streams. The first is an example of processing a data stream using a continuous SQL query (a query that executes forever processing arriving data based on timestamps and window duration). This code fragment illustrates a JOIN of two data streams, one for stock orders, and one for the resulting stock trades. The query outputs a stream of all Orders matched by a Trade within one second of the Order being placed. The output stream is sorted by timestamp, in this case, the timestamp from the Orders stream.
SELECT DataStream
    Orders.TimeStamp, Orders.orderId, Orders.ticker,
    Orders.amount, Trades.amount
FROM Orders
JOIN Trades OVER (RANGE INTERVAL '1' SECOND FOLLOWING)
ON Orders.orderId = Trades.orderId;
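The matching rule of this query can also be written out in C++. The sketch below is a batch stand-in over finite vectors, not a continuous query engine, and it makes several assumptions: the record layouts `Order`, `Trade` and `Match`, the millisecond timestamps, and the function `join_within` are all hypothetical, and the orders are assumed to arrive sorted by timestamp.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct Order { std::int64_t ts_ms; int order_id; std::string ticker; double amount; };
struct Trade { std::int64_t ts_ms; int order_id; double amount; };
struct Match {
    std::int64_t ts_ms;   // timestamp taken from the order
    int order_id;
    std::string ticker;
    double order_amount;
    double trade_amount;
};

// Emit one Match for every order that has a trade with the same order_id
// occurring no earlier than the order and at most window_ms after it.
std::vector<Match> join_within(const std::vector<Order>& orders,
                               const std::vector<Trade>& trades,
                               std::int64_t window_ms)
{
    assert(window_ms >= 0);
    std::unordered_multimap<int, const Trade*> by_id;
    for (const Trade& t : trades) by_id.emplace(t.order_id, &t);

    std::vector<Match> out;
    for (const Order& o : orders) {   // output follows the order timestamps
        auto range = by_id.equal_range(o.order_id);
        for (auto it = range.first; it != range.second; ++it) {
            const Trade& t = *it->second;
            if (t.ts_ms >= o.ts_ms && t.ts_ms - o.ts_ms <= window_ms)
                out.push_back({o.ts_ms, o.order_id, o.ticker, o.amount, t.amount});
        }
    }
    return out;
}
```

A streaming engine would evaluate the same condition incrementally, keeping only the orders of the last second in memory and discarding them once the window has passed.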
Another sample code fragment detects weddings among a flow of external "events" such as church bells ringing, the appearance of a man in a tuxedo or morning suit, a woman in a flowing white gown and rice flying through the air. A "complex" or "composite" event is what one infers from the individual simple events: a wedding is happening.
WHEN Person.Gender EQUALS "man" AND Person.Clothes EQUALS "tuxedo"
FOLLOWED-BY
Person.Clothes EQUALS "gown" AND
(Church_Bell OR Rice_Flying)
WITHIN 2 hours
ACTION Wedding
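One possible reading of this rule is sketched below in C++. The interpretation is an assumption: a man in a tuxedo must be followed, within the time window, by a woman in a gown and by at least one of church bells or flying rice. The types `Kind` and `Event` and the function `detect_wedding` are hypothetical, and the event stream is assumed to be sorted by timestamp.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

enum class Kind { ManInTuxedo, WomanInGown, ChurchBell, RiceFlying };

// A simple event: what was observed and when (seconds since some epoch).
struct Event { std::int64_t ts_s; Kind kind; };

// Returns true if the stream contains a man in a tuxedo followed, within
// window_s seconds, by a gown and by either church bells or flying rice.
bool detect_wedding(const std::vector<Event>& stream, std::int64_t window_s)
{
    assert(window_s >= 0);
    for (std::size_t i = 0; i < stream.size(); ++i) {
        if (stream[i].kind != Kind::ManInTuxedo) continue;
        bool gown = false;
        bool bell_or_rice = false;
        for (std::size_t j = i + 1; j < stream.size(); ++j) {
            if (stream[j].ts_s - stream[i].ts_s > window_s) break;  // window closed
            if (stream[j].kind == Kind::WomanInGown) gown = true;
            if (stream[j].kind == Kind::ChurchBell ||
                stream[j].kind == Kind::RiceFlying) bell_or_rice = true;
            if (gown && bell_or_rice) return true;   // composite event inferred
        }
    }
    return false;
}
```

With a window of 7200 seconds (two hours), a stream containing a tuxedo at time 0, a gown at 600 and rice at 900 is reported as a wedding, whereas the same stream with the bells only at 8000 is not.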
Comparison to prior parallel paradigms
Basic computers started from a sequential execution paradigm. Traditional CPUs are SISD based, which means they conceptually perform only one operation at a time. As the computing needs of the world evolved, the amount of data to be managed increased very quickly. It was obvious that the sequential programming model could not cope with the increased need for processing power. Considerable effort went into finding alternative ways to perform massive amounts of computation, but the only solution was to exploit some level of parallel execution. The result of those efforts was SIMD, a programming paradigm which allowed applying one instruction to multiple instances of (different) data. Most of the time, SIMD was used in a SWAR (SIMD within a register) environment. By using more complicated structures, one could also have MIMD parallelism.
Although those two paradigms were efficient, real-world implementations were plagued with limitations ranging from memory alignment problems to synchronization issues and limited parallelism. Only a few SIMD processors survived as stand-alone components; most were embedded in standard CPUs.
Consider a simple program adding up two arrays containing 100 4-component vectors (i.e. 400 numbers in total).
Conventional, sequential paradigm
for (int i = 0; i < 400; i++)
    result[i] = source0[i] + source1[i];
This is the sequential paradigm that is most familiar. Variations do exist (such as inner loops, structures and such), but they ultimately boil down to that construct.
Parallel SIMD paradigm, packed registers (SWAR)
for (int el = 0; el < 100; el++) // for each vector
    vector_sum(result[el], source0[el], source1[el]);
This is actually oversimplified. It assumes the instruction vector_sum works. Although this is what happens with instruction intrinsics, much information is not taken into account here, such as the number of vector components and their data format. This is done for clarity.
Note, however, that this method reduces the number of decoded instructions from numElements * componentsPerElement to numElements. The number of jump instructions is also decreased, as the loop is run fewer times. These gains result from the parallel execution of the four mathematical operations.
However, a packed SIMD register holds only a fixed amount of data, so it is not possible to extract more parallelism. The speedup is limited by the assumption of performing four parallel operations (this is common to both AltiVec and SSE).
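To make the assumed vector_sum concrete, here is a portable C++ sketch that does not use any real intrinsics. The type `vec4`, the body of `vector_sum` and the wrapper `add_arrays` are hypothetical: a struct of four floats stands in for a 128-bit packed register, and the fixed four-iteration inner loop stands in for what would be a single instruction on SIMD hardware.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for a packed register holding four 32-bit floats.
struct vec4 { float c[4]; };

// Hypothetical vector_sum: adds four components at once. On SIMD hardware
// this would map to one packed add rather than a loop.
inline void vector_sum(vec4& result, const vec4& a, const vec4& b)
{
    for (int k = 0; k < 4; ++k)
        result.c[k] = a.c[k] + b.c[k];
}

// The outer loop from the text: one iteration per 4-component vector.
void add_arrays(vec4* result, const vec4* source0, const vec4* source1,
                std::size_t count)
{
    assert(count == 0 || (result != nullptr && source0 != nullptr && source1 != nullptr));
    for (std::size_t el = 0; el < count; ++el)
        vector_sum(result[el], source0[el], source1[el]);
}
```

The width of four is fixed by the register format, which is exactly the limit on parallelism that the text describes.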
Parallel stream paradigm (SIMD/MIMD)
// This is a fictional language for demonstration purposes.
elements = array streamElement([number, number])[100]
kernel = instance streamKernel("@arg0[@iter]")
result = kernel.invoke(elements)
In this paradigm, the whole dataset is defined, rather than each component block being defined separately. The description of the data set is assumed to be in the first two rows. After that, the result is inferred from the sources and the kernel. For simplicity, there is a 1:1 mapping between input and output data, but this need not be the case. Applied kernels can also be much more complex.
An implementation of this paradigm can "unroll" a loop internally. This allows throughput to scale with chip complexity, easily utilizing hundreds of ALUs.[3][4] The elimination of complex data patterns makes much of this extra power available.
While stream processing is a branch of SIMD/MIMD processing, they must not be confused. Although SIMD implementations can often work in a "streaming" manner, their performance is not comparable: the model envisions a very different usage pattern which allows far greater performance by itself.
It has been noted that when applied on generic processors such as a standard CPU, only a 1.5x speedup can be reached.[5] By contrast, ad-hoc stream processors easily reach over 10x performance, mainly attributed to more efficient memory access and higher levels of parallel processing.[6]
Although there are various degrees of flexibility allowed by the model, stream processors usually impose some limitations on the kernel or stream size. For example, consumer hardware often lacks the ability to perform high-precision math, lacks complex indirection chains or presents lower limits on the number of instructions which can be executed.
Research
Stanford University stream processing projects included the Stanford Real-Time Programmable Shading Project started in 1999.[7] A prototype called Imagine was developed in 2002.[8] A project called Merrimac ran until about 2004.[9] AT&T also researched stream-enhanced processors as graphics processing units rapidly evolved in both speed and functionality. Since these early days, dozens of stream processing languages have been developed, as well as specialized hardware.
Programming model notes
The most immediate challenge in the realm of parallel processing does not lie as much in the type of hardware architecture used, but in how easy it will be to program the system in question in a real-world environment with acceptable performance. Machines like Imagine use a straightforward single-threaded model with automated dependencies, memory allocation and DMA scheduling. This in itself is a result of the research at MIT and Stanford in finding an optimal layering of tasks between programmer, tools and hardware. Programmers beat tools in mapping algorithms to parallel hardware, and tools beat programmers in figuring out the smartest memory allocation schemes, etc. Of particular concern are MIMD designs such as Cell, for which the programmer needs to deal with application partitioning across multiple cores as well as process synchronization and load balancing.
A drawback of SIMD programming was the issue of array-of-structures (AoS) and structure-of-arrays (SoA). Programmers often create representations of entities in memory, for example, the location of a particle in 3D space, its colour and its size, as below:
// A particle in a three-dimensional space.
struct particle_t {
    float x, y, z;          // not even an array!
    unsigned char color[3]; // 8 bits per channel, say we care about RGB only
    float size;
    // ... and many other attributes may follow...
};
When multiple instances of these structures exist in memory they are placed end to end, creating an array-of-structures (AoS) layout. This means that if some algorithm is applied to the location of each particle in turn, it must skip over memory locations containing the other attributes. If these attributes are not needed, this results in wasteful usage of the CPU cache. Additionally, a SIMD instruction will typically expect the data it operates on to be contiguous in memory, and the elements may also need to be aligned. By moving the data out of the structure, it can be better organised for efficient access in a stream and for SIMD instructions to operate on. A structure of arrays (SoA), as shown below, can allow this.
struct particle_t {
    float *x, *y, *z;
    unsigned char *colorRed, *colorBlue, *colorGreen;
    float *size;
};
Instead of holding the data in the structure, it holds only pointers (memory locations) to the data. One shortcoming is that if multiple attributes of an object are to be operated on, they might now be distant in memory and so result in a cache miss. Alignment and any needed padding lead to increased memory usage. Overall, memory management may be more complicated if, for example, structures are added and removed.
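The conversion between the two layouts can be sketched in C++ as follows. This is an illustration under assumptions, not a prescribed implementation: the names `ParticleAoS`, `ParticlesSoA`, `to_soa` and `translate_x` are hypothetical, and `std::vector` is used to own each attribute array instead of the raw pointers shown in the text.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Array-of-structures element: all attributes of one particle together.
struct ParticleAoS {
    float x, y, z;
    unsigned char color[3];   // red, green, blue
    float size;
};

// Structure of arrays: one contiguous array per attribute.
struct ParticlesSoA {
    std::vector<float> x, y, z;
    std::vector<unsigned char> red, green, blue;
    std::vector<float> size;
};

// Copy every attribute of every particle into its own contiguous array.
ParticlesSoA to_soa(const std::vector<ParticleAoS>& in)
{
    ParticlesSoA out;
    for (const ParticleAoS& p : in) {
        out.x.push_back(p.x);
        out.y.push_back(p.y);
        out.z.push_back(p.z);
        out.red.push_back(p.color[0]);
        out.green.push_back(p.color[1]);
        out.blue.push_back(p.color[2]);
        out.size.push_back(p.size);
    }
    assert(out.x.size() == in.size());
    return out;
}

// A kernel over one attribute touches only the contiguous x array and
// never loads colour or size data into the cache.
void translate_x(ParticlesSoA& p, float dx)
{
    for (float& v : p.x) v += dx;
}
```

The trade-off noted above remains visible here: a kernel that needs x, colour and size for the same particle now reads from three separate arrays.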
For stream processors, the usage of structures is encouraged. From an application point of view, all the attributes can be defined with some flexibility. Taking GPUs as reference, there is a set of attributes (at least 16) available. For each attribute, the application can state the number of components and the format of the components (but only primitive data types are supported for now). The various attributes are then attached to a memory block, possibly defining a stride between 'consecutive' elements of the same attribute, effectively allowing interleaved data. When the GPU begins the stream processing, it gathers all the various attributes into a single set of parameters (usually this looks like a structure or a "magic global variable"), performs the operations and scatters the results to some memory area for later processing (or retrieval).
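The idea of attaching attributes to a memory block with a stride can be sketched in C++. The descriptor `Attribute` and the function `gather` are hypothetical and only loosely modelled on how graphics APIs describe vertex attributes; the sketch assumes that every component is a float and that offsets and strides are counted in floats rather than bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical attribute descriptor for an interleaved block of floats.
struct Attribute {
    std::size_t offset;       // index of the attribute's first component
    std::size_t components;   // number of components per element
    std::size_t stride;       // distance between consecutive elements
};

// Gather the components of one attribute for one element of the stream.
std::vector<float> gather(const std::vector<float>& block,
                          const Attribute& a, std::size_t element)
{
    const std::size_t first = a.offset + element * a.stride;
    assert(first + a.components <= block.size());
    const auto begin = block.begin() + static_cast<std::ptrdiff_t>(first);
    return std::vector<float>(begin, begin + static_cast<std::ptrdiff_t>(a.components));
}
```

For a block laid out as x, y, z, size per element, a position attribute would be described as {0, 3, 4} and a size attribute as {3, 1, 4}, so both can share one interleaved buffer.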
More modern stream processing frameworks provide a FIFO-like interface to structure data as a literal stream. This abstraction provides a means to specify data dependencies implicitly while enabling the runtime/hardware to take full advantage of that knowledge for efficient computation. One such stream processing framework for C++ is RaftLib, which enables linking independent compute kernels together as a data flow graph using C++ stream operators. As an example:
#include <raft>
#include <raftio>
#include <cstdlib>
#include <string>

class hi : public raft::kernel
{
public:
    hi() : raft::kernel()
    {
        output.addPort<std::string>("0");
    }

    virtual raft::kstatus run()
    {
        output["0"].push(std::string("Hello World\n"));
        return raft::stop;
    }
};

int main(int argc, char **argv)
{
    /** instantiate print kernel **/
    raft::print< std::string > p;
    /** instantiate hello world kernel **/
    hi hello;
    /** make a map object **/
    raft::map m;
    /** add kernels to map, both hello and p are executed concurrently **/
    m += hello >> p;
    /** execute the map **/
    m.exe();
    return EXIT_SUCCESS;
}
Models of computation for stream processing
Apart from specifying streaming applications in high-level languages, models of computation (MoCs), such as dataflow models and process-based models, have also been widely used.
Generic processor architecture
Historically, CPUs began implementing various tiers of memory access optimizations because their performance grew much faster than the relatively slow-growing external memory bandwidth. As this gap widened, large amounts of die area were dedicated to hiding memory latencies. Since fetching information and opcodes to those few ALUs is expensive, very little die area is dedicated to actual mathematical machinery (as a rough estimate, consider it to be less than 10%).
A similar architecture exists on stream processors but, thanks to the new programming model, the number of transistors dedicated to management is actually very small.
Beginning from a whole-system point of view, stream processors usually exist in a controlled environment. GPUs exist on an add-in board (this seems to also apply to Imagine). CPUs continue to do the job of managing system resources, running applications, and such.
The stream processor is usually equipped with a fast, efficient, proprietary memory bus (crossbar switches are now common; multi-buses have been employed in the past). The exact number of memory lanes depends on the market range. As this is written, there are still 64-bit wide interconnections around (entry-level). Most mid-range models use a fast 128-bit crossbar switch matrix (4 or 2 segments), while high-end models deploy huge amounts of memory (actually up to 512 MB) with a slightly slower crossbar that is 256 bits wide. By contrast, standard processors from Intel Pentium to some Athlon 64 have only a single 64-bit wide data bus.
Memory access patterns are much more predictable. While arrays do exist, their dimension is fixed at kernel invocation. The thing which most closely matches a multiple pointer indirection is an indirection chain, which is however guaranteed to finally read or write from a specific memory area (inside a stream).
Because of the SIMD nature of the stream processor's execution units (ALU clusters), read/write operations are expected to happen in bulk, so memories are optimized for high bandwidth rather than low latency (this differs from Rambus and DDR SDRAM, for example). This also allows for efficient memory bus negotiations.
Most (90%) of a stream processor's work is done on-chip, requiring only 1% of the global data to be stored to memory. This is where knowing the kernel temporaries and dependencies pays off.
Internally, a stream processor features some clever communication and management circuits, but what is most interesting is the Stream Register File (SRF). This is conceptually a large cache in which stream data is stored to be transferred to external memory in bulk. As a cache-like, software-controlled structure, the SRF is shared between all the ALU clusters. The key concept and innovation of Stanford's Imagine chip is that the compiler is able to automate and allocate memory in an optimal way, fully transparent to the programmer. The dependencies between kernel functions and data are known through the programming model, which enables the compiler to perform flow analysis and optimally pack the SRFs. Commonly, this cache and DMA management can take up the majority of a project's schedule, something the stream processor (or at least Imagine) totally automates. Tests done at Stanford showed that the compiler did as good a job of scheduling memory as laborious hand tuning, or better.
There can be many clusters because inter-cluster communication is assumed to be rare. Internally, however, each cluster can efficiently exploit a much smaller number of ALUs because intra-cluster communication is common and thus needs to be highly efficient.
To keep those ALUs fed with data, each ALU is equipped with local register files (LRFs), which are basically its usable registers.
This three-tiered data access pattern makes it easy to keep temporary data away from slow memories, thus making the silicon implementation highly efficient and power-saving.
Hardware-in-the-loop issues
Although an order of magnitude speedup can be reasonably expected (even from mainstream GPUs when computing in a streaming manner), not all applications benefit from this. Communication latencies are actually the biggest problem. Although PCI Express improved this with full-duplex communications, getting a GPU (and possibly a generic stream processor) to work can take a long time. This means it is usually counter-productive to use them for small datasets. Because changing the kernel is a rather expensive operation, the stream architecture also incurs penalties for small streams, a behaviour referred to as the short stream effect.
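The penalty for small datasets can be illustrated with a deliberately simple cost model, sketched in C++ below. The model is an assumption made for this example and is not taken from any measurement: processing n elements on the stream processor costs a fixed setup time plus n divided by its throughput, while the CPU has no setup cost but a lower throughput. All function and parameter names are hypothetical.

```cpp
#include <cassert>

// Assumed linear cost model: fixed setup (kernel change, transfer latency)
// plus time proportional to the number of elements.
double stream_time(double n, double setup_s, double stream_rate)
{
    return setup_s + n / stream_rate;
}

// Assumed CPU cost: no setup, lower throughput (elements per second).
double cpu_time(double n, double cpu_rate)
{
    return n / cpu_rate;
}

// Stream length at which both costs are equal under this model:
// n / cpu_rate = setup_s + n / stream_rate.
double break_even_elements(double setup_s, double cpu_rate, double stream_rate)
{
    assert(setup_s >= 0.0 && cpu_rate > 0.0 && stream_rate > cpu_rate);
    return setup_s * cpu_rate * stream_rate / (stream_rate - cpu_rate);
}
```

With an assumed setup time of 1 ms, a CPU rate of 10^8 elements per second and a stream rate of 10^9 elements per second, the break-even point is roughly 111,000 elements; shorter streams finish sooner on the CPU under these assumptions.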
Pipelining is a very widespread and heavily used practice on stream processors, with GPUs featuring pipelines exceeding 200 stages. The cost for switching settings is dependent on the setting being modified but it is now considered to always be expensive. To avoid those problems at various levels of the pipeline, many techniques have been deployed such as "über shaders" and "texture atlases". Those techniques are game-oriented because of the nature of GPUs, but the concepts are interesting for generic stream processing as well.
Examples
- The Blitter in the Commodore Amiga is an early (circa 1985) graphics processor capable of combining three source streams of 16 component bit vectors in 256 ways to produce an output stream consisting of 16 component bit vectors. Total input stream bandwidth is up to 42 million bits per second. Output stream bandwidth is up to 28 million bits per second.
- Imagine,[10] headed by Professor William Dally of Stanford University, is a flexible architecture intended to be both fast and energy efficient. The project, originally conceived in 1996, included architecture, software tools, a VLSI implementation and a development board, and was funded by DARPA, Intel and Texas Instruments.
- Another Stanford project, called Merrimac,[11] is aimed at developing a stream-based supercomputer. Merrimac intends to use a stream architecture and advanced interconnection networks to provide more performance per unit cost than cluster-based scientific computers built from the same technology.
- The Storm-1 family from Stream Processors, Inc, a commercial spin-off of Stanford's Imagine project, was announced during a feature presentation at ISSCC 2007. The family contains four members ranging from 30 GOPS to 220 16-bit GOPS (billions of operations per second), all fabricated at TSMC in a 130 nanometer process. The devices target the high end of the DSP market including video conferencing, multifunction printers and digital video surveillance equipment.
- GPUs are widespread, consumer-grade stream processors designed mainly by AMD and Nvidia. Various generations to be noted from a stream processing point of view:
  - Pre-R2xx/NV2x: no explicit support for stream processing. Kernel operations were hidden in the API and provided too little flexibility for general use.
  - R2xx/NV2x: kernel stream operations came explicitly under the programmer's control but only for vertex processing (fragments were still using old paradigms). The lack of branching support severely hampered flexibility, but some types of algorithms could be run (notably, low-precision fluid simulation).
  - R3xx/NV4x: flexible branching support, although some limitations still exist on the number of operations to be executed and strict recursion depth, as well as array manipulation.
  - R8xx: supports append/consume buffers and atomic operations. This generation is the state of the art.
  - AMD FireStream: brand name for the product line targeting HPC
  - Nvidia Tesla: brand name for the product line targeting HPC
- The Cell processor from STI, an alliance of Sony Computer Entertainment, Toshiba Corporation, and IBM, is a hardware architecture that can function like a stream processor with appropriate software support. It consists of a controlling processor, the PPE (Power Processing Element, an IBM PowerPC) and a set of SIMD coprocessors, called SPEs (Synergistic Processing Elements), each with independent program counters and instruction memory, in effect a MIMD machine. In the native programming model all DMA and program scheduling is left up to the programmer. The hardware provides a fast ring bus among the processors for local communication. Because the local memory for instructions and data is limited, the only programs that can exploit this architecture effectively either require a tiny memory footprint or adhere to a stream programming model. With a suitable algorithm the performance of the Cell can rival that of pure stream processors; however, this nearly always requires a complete redesign of algorithms and software.
Stream programming libraries and languages
Most programming languages for stream processors start with Java, C or C++ and add extensions which provide specific instructions to allow application developers to tag kernels and/or streams. This also applies to most shading languages, which can be considered stream programming languages to a certain degree.
Non-commercial examples of stream programming languages include:
- Ateji PX Free Edition, enables a simple expression of stream programming, the actor model, and the MapReduce algorithm on JVM
- Auto-Pipe, from the Stream Based Supercomputing Lab at Washington University in St. Louis, an application development environment for streaming applications that allows authoring of applications for heterogeneous systems (CPU, GPGPU, FPGA). Applications can be developed in any combination of C, C++, and Java for the CPU, and in Verilog or VHDL for FPGAs. CUDA is currently used for Nvidia GPGPUs. Auto-Pipe also handles coordination of TCP connections between multiple machines.
- ACOTES programming model: language from Polytechnic University of Catalonia based on OpenMP
- BeepBeep, a simple and lightweight Java-based event stream processing library from the Formal Computer Science Lab at Université du Québec à Chicoutimi
- Brook language from Stanford
- CAL Actor Language: a high-level programming language for writing (dataflow) actors, which are stateful operators that transform input streams of data objects (tokens) into output streams.
- Cal2Many, a code generation framework from Halmstad University, Sweden. It takes CAL code as input and generates different target-specific languages including sequential C, Chisel, parallel C targeting the Epiphany architecture, and ajava and astruct targeting the Ambric architecture.
- DUP language from Technical University of Munich and University of Denver
- HSTREAM: a directive-based language extension for heterogeneous stream computing[12]
- RaftLib - open source C++ stream processing template library originally from the Stream Based Supercomputing Lab at Washington University in St. Louis
- SPar - C++ domain-specific language for expressing stream parallelism from the Application Modelling Group (GMAP) at Pontifical Catholic University of Rio Grande do Sul
- Sh library from the University of Waterloo
- Shallows, an open source project
- S-Net coordination language from the University of Hertfordshire, which provides separation of coordination and algorithmic programming
- StreamIt from MIT
- Siddhi from WSO2
- WaveScript functional stream processing, also from MIT.
- Functional reactive programming could be considered stream processing in a broad sense.
Commercial implementations are either general purpose or tied to specific hardware by a vendor. Examples of general purpose languages include:
- AccelerEyes' Jacket, a commercialization of a GPU engine for MATLAB
- Ateji PX Java extension that enables a simple expression of stream programming, the Actor model, and the MapReduce algorithm
- Embiot, a lightweight embedded streaming analytics agent from Telchemy
- Floodgate, a stream processor provided with the Gamebryo game engine for PlayStation 3, Xbox360, Wii, and PC
- OpenHMPP, a "directive" vision of Many-Core programming
- PeakStream,[13] a spinout of the Brook project (acquired by Google in June 2007)
- IBM Spade - Stream Processing Application Declarative Engine (B. Gedik, et al. SPADE: the system S declarative stream processing engine. ACM SIGMOD 2008.)
- RapidMind, a commercialization of Sh (acquired by Intel in August 2009)
- TStreams,[14][15] Hewlett-Packard Cambridge Research Lab
Vendor-specific languages include:
- Brook+ (AMD hardware optimized implementation of Brook) from AMD/ATI
- CUDA (Compute Unified Device Architecture) from Nvidia
- Intel Ct - C for Throughput Computing
- StreamC from Stream Processors, Inc, a commercialization of the Imagine work at Stanford
Event-Based Processing
- Apama - a combined complex event and stream processing engine by Software AG
- Wallaroo
- WSO2 stream processor by WSO2
- Apache NiFi
Batch file-based processing (emulates some of actual stream processing, but much lower performance in general)
Continuous operator stream processing
- Apache Flink
- Walmartlabs Mupd8[16]
- Eclipse Streamsheets - spreadsheet for stream processing
Stream processing services:
- Amazon Web Services - Kinesis
- Google Cloud - Dataflow
- Microsoft Azure - Stream analytics
- Datastreams - Data streaming analytics platform
- IBM streams
- IBM streaming analytics
- Eventador SQLStreamBuilder
References
- A SHORT INTRO TO STREAM PROCESSING
- FCUDA: Enabling Efficient Compilation of CUDA Kernels onto FPGAs
- IEEE Journal of Solid-State Circuits:"A Programmable 512 GOPS Stream Processor for Signal, Image, and Video Processing", Stanford University and Stream Processors, Inc.
- Khailany, Dally, Rixner, Kapasi, Owens and Towles: "Exploring VLSI Scalability of Stream Processors", Stanford and Rice University.
- Gummaraju and Rosenblum, "Stream processing in General-Purpose Processors", Stanford University.
- Kapasi, Dally, Rixner, Khailany, Owens, Ahn and Mattson, "Programmable Stream Processors", Universities of Stanford, Rice, California (Davis) and Reservoir Labs.
- Eric Chan. "Stanford Real-Time Programmable Shading Project". Research group web site. Retrieved March 9, 2017.
- "The Imagine - Image and Signal Processor". Group web site. Retrieved March 9, 2017.
- "Merrimac - Stanford Streaming Supercomputer Project". Group web site. Archived from the original on December 18, 2013. Retrieved March 9, 2017.
- Imagine
- Merrimac
- Memeti, Suejb; Pllana, Sabri (October 2018). "HSTREAM: A Directive-Based Language Extension for Heterogeneous Stream Computing". 2018 IEEE International Conference on Computational Science and Engineering (CSE). IEEE. pp. 138–145. arXiv:1809.09387. doi:10.1109/CSE.2018.00026. ISBN 978-1-5386-7649-3.
- PeakStream unveils multicore and CPU/GPU programming solution
- TStreams: A Model of Parallel Computation (Technical report).
- TStreams: How to Write a Parallel Program (Technical report).
- "GitHub - walmartlabs/Mupd8: Muppet". GitHub.