On another note - these kinds of "let's go back and check what we can do with a single node today" approaches are valuable to recalibrate our thinking once in a while. I didn't interpret this post as a case against distributed processing - there are many things going for it - but just to avoid falling into hype.
This is spot on. There are "highly scalable environments" which are flexible and generic, and then there are local focused tools, by their nature optimized in a way generic frameworks can never be.
Think ancient tools like grep versus Hadoop or Spark. Or our word2vec in Gensim vs word2vec in Spark or Tensorflow.
The occasional recalibration between "OMG-big-O-toward-infinity!-need-scale" versus "our-data-is-actually-finite-what's-a-good-operational-compromise" is a worthwhile exercise.
As with most questions like this, the answer is "Yes, of course, sometimes."
I work on IBM Streams, which is a distributed stream processing system [1,2]. When the cost of communicating between hosts is greater than the cost of your computation, distributed processing is not going to help. When your computation is basically a form of word count, then, yes, using multiple hosts may not help. But if your computation is, say, online speech to text for a large call center [3], then the application will eat just about as much compute resources as you can throw at it. That's going to mean using multiple machines, even if each machine is quite large.
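To make that trade-off concrete, here is a toy back-of-envelope model (the numbers are invented for illustration, they are not measurements of any system): when the per-tuple communication cost dwarfs the per-tuple compute cost, adding hosts actually lowers throughput, and the relation flips once the computation is expensive enough.

```python
# Rough model of distributed speedup; hypothetical numbers, only to
# illustrate when per-tuple communication cost dominates computation.

def throughput(num_hosts, compute_us_per_tuple, comm_us_per_tuple):
    """Tuples/sec the pipeline sustains, assuming every tuple is shipped
    over the network once whenever more than one host is involved."""
    cost = compute_us_per_tuple / num_hosts
    if num_hosts > 1:
        cost += comm_us_per_tuple          # serialization + transfer
    return 1_000_000 / cost

# Word-count-like job: ~1 us of work per tuple, ~20 us to ship it.
print(throughput(1, 1, 20), throughput(4, 1, 20))            # 1 host wins

# Speech-to-text-like job: ~10 ms of work per tuple; shipping is noise.
print(throughput(1, 10_000, 20), throughput(4, 10_000, 20))  # 4 hosts win
```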
As a plug for our system: we scale down as well as up. If you only have a single host, we will run the entire application in a single process, using threads as needed to achieve parallelism, with either dedicated threads or a lightweight operator scheduler [4]. Users can configure which without changing application logic. If you have multiple machines, we will automatically partition your application across those machines for you [5]. By default, we do all of this for you, without needing any input. But if you want to explicitly control what executes where, and using how many host machines, that's easy to configure, also without changing application logic.
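For readers unfamiliar with the distinction, here is a generic sketch of the two single-host execution strategies being described, dedicated threads per operator versus running a fused chain on one thread. This is only an illustration of the general idea, not IBM Streams' actual runtime or API, and the Python threads are for structure only (the GIL prevents real CPU parallelism here).

```python
# Two generic ways to execute a chain of stream operators on one machine:
#  1. "fused": run the whole chain on the caller's thread, no queues;
#  2. "threaded": one thread per operator, connected by bounded queues,
#     trading per-tuple latency for pipeline parallelism.
import queue
import threading

def fused(operators, source):
    for item in source:
        for op in operators:
            item = op(item)
        yield item

def threaded(operators, source, qsize=1024):
    qs = [queue.Queue(maxsize=qsize) for _ in range(len(operators) + 1)]

    def stage(op, q_in, q_out):
        while (item := q_in.get()) is not None:
            q_out.put(op(item))
        q_out.put(None)                    # propagate end-of-stream marker

    for i, op in enumerate(operators):
        threading.Thread(target=stage, args=(op, qs[i], qs[i + 1]),
                         daemon=True).start()
    # Toy driver: fills the first queue before draining the last one,
    # which is fine for small demo inputs.
    for item in source:
        qs[0].put(item)
    qs[0].put(None)
    while (item := qs[-1].get()) is not None:
        yield item

ops = [lambda x: x + 1, lambda x: x * 2]
print(list(fused(ops, range(3))), list(threaded(ops, range(3))))  # [2, 4, 6] twice
```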
Single-process "streaming" engines, basically CEP engines, have been around for a very long time and were the norm before distributed stream processing engines (such as Storm) came along. CEP engines have much richer functionality than distributed stream processing engines because they don't need to deal with partitioning or data distribution. I'm not quite sure I see the appeal of making a non-distributed engine that has the same limitations as a distributed engine.
I work on Hazelcast Jet [1], which is a Java-based distributed stream processing engine. The core engine is fast enough that it can be used with very good throughput on a single node (several times faster than Flink or Spark), but usually multiple nodes are needed not just for parallelization but also for tolerating node failures and being able to restart where you left off. As others have pointed out, not every computation can be parallelised efficiently. Jet also offers in-memory storage, so adding more nodes also increases your storage capacity.
Since the core of Jet is small (~400 KB JAR), we have also considered making a non-distributed version that runs strictly in-process, mainly for lightweight usage or embedding, but one that would still offer a path to distributed execution if it was ever needed.
Which are the limitations shared by both a single node and a distributed engine? I am confused...
There are ways to achieve fault tolerance even for a centralised system (e.g., by maintaining an active/passive replica). In cases like this, where there is such a great gap between the performance of current systems, you can always "waste" an extra node or two on fault tolerance and still operate at lower cost, if you want.
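As a minimal sketch of the active/passive idea (all names and mechanics here are hypothetical, not taken from any particular product): the active node processes the stream and periodically ships its state to a passive standby, which takes over from the last checkpoint if the active node dies.

```python
# Active/passive fault tolerance for a single-node engine, heavily simplified.
# Real systems also need leader election, fencing, and a replayable input
# source (e.g. a log such as Kafka); those parts are assumed away here.
import pickle

class ActiveNode:
    def __init__(self, standby, checkpoint_every=1000):
        self.state = {}                      # e.g. running counts per key
        self.standby = standby
        self.checkpoint_every = checkpoint_every
        self.seen = 0

    def on_tuple(self, key):
        self.state[key] = self.state.get(key, 0) + 1
        self.seen += 1
        if self.seen % self.checkpoint_every == 0:
            # Ship a consistent snapshot plus the input offset it covers.
            self.standby.receive_checkpoint(pickle.dumps(self.state), self.seen)

class PassiveNode:
    def receive_checkpoint(self, blob, offset):
        self.state, self.offset = pickle.loads(blob), offset

    def take_over(self, replay_source):
        # Resume from the last checkpoint, re-reading any newer input
        # (replay_source is a hypothetical "read from offset" callable).
        for key in replay_source(self.offset):
            self.state[key] = self.state.get(key, 0) + 1
```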
I agree that some types of computations (e.g., multiple/distributed sources) may not benefit from a centralised approach, and I definitely don't claim that this is a solution for everything. However, the point is to criticise the design choices that we make for a streaming system.
Streaming support, even in popular systems today, is often an extension (sometimes a hack) on top of the core of the system, hidden beneath multiple layers of abstraction. In addition, modern systems try to do many things at the same time (support AI, batch & stream processing, connectors to publish-subscribe systems, multiple wire protocols) and they end up doing most of them poorly.
I meant that products like Esper, StreamBase, and InfoSphere have been around for a long time, have a _very_ rich set of features [1], and are mostly designed around single-process usage. Many of the query types they support are not possible to implement in a performant way in a distributed system. Though Esper nowadays claims horizontal scalability, it was originally designed as a single-threaded system. They also have a passive/active type solution, as you mentioned.
Stream processing frameworks originally evolved to offer "big scale" through data partitioning, compared to the traditional CEP systems. But CEP engines have been able to deal with windowing and similar concepts for many years; the main difference of the stream processing frameworks _is_ the distribution and scalability aspect.
My point was that the systems linked in the original article seem to closely match the limitations of what distributed stream processing frameworks are able to do, while only running on a single node.
I assume that by "InfoSphere" you mean what used to be called IBM InfoSphere Streams, but what is now just IBM Streams. I do research and development on IBM Streams (see a sibling comment), and I can say with certainty that it was designed as a distributed, parallel stream processing system from the start. It is a more general computing platform than CEP engines - in fact, we actually implemented a CEP engine as a Streams operator.
I couldn't find this on your product page, but does Hazelcast Jet support "global windows"? I've found 90% of stream processing systems I've come across only work within a window, but if I need to perform a computation "for all time," I'm SOL.
We are adding something called a "rolling aggregation", where you receive a record, accumulate it and then emit the current accumulated value. I'm not sure if this matches what you want.
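In generic terms (this is a sketch of the concept, not the Hazelcast Jet API), a rolling aggregation over an unbounded stream is simply an accumulator with no window boundary: every record updates it, and the current value is emitted immediately.

```python
# A rolling ("for all time") aggregation in its simplest form: accumulate
# every incoming record and emit the updated aggregate right away.

def rolling_aggregate(stream, accumulate, initial):
    acc = initial
    for record in stream:
        acc = accumulate(acc, record)
        yield acc                      # downstream sees the value-so-far

# Example: a running sum over an unbounded sequence of numbers.
for total in rolling_aggregate(iter([3, 1, 4, 1, 5]), lambda a, x: a + x, 0):
    print(total)                       # 3, 4, 8, 9, 14
```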
More than performance, the primary concern these days is where the data lives and how it moves: Do we need to copy it to a different storage for processing? Change our company processes around how and where we store data? How many moving pieces to juggle?
It's a bit of a chicken and egg problem, since when designing new systems, management asks "How fast can we run this?", to which engineers reply "How much data is there and what speed are we aiming at?".
At this point, a failure to specify the desired volume and latencies leads to a sad cycle where scales are overblown ("just in case") and a complex solution chosen. Distributed solutions necessarily come with a massive overhead, so it is later discovered that even larger systems are needed…
Many such anecdotes circulate in the industry, and I can add our own: in 2012 we implemented SVD, a math algorithm at the core of many machine learning techniques like PCA, LSI, etc. This was a fast streamed "local" SVD implementation in Gensim (and now even faster in ScaleText). Suddenly, use cases that needed a cluster of 12 beefy Hadoop (Mahout) machines could be handled on a single laptop, faster, and after a single `pip install`.
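For the curious, that streamed SVD is exposed through Gensim's LSI model; roughly like the following (a usage sketch, exact parameters and defaults may differ across Gensim versions):

```python
# Streamed (incremental) truncated SVD via Gensim's LsiModel: the corpus is
# consumed as a stream of bag-of-words vectors, so it never has to fit in RAM.
from gensim import corpora, models

texts = [["stream", "processing"], ["single", "node", "processing"]]
dictionary = corpora.Dictionary(texts)
corpus = [dictionary.doc2bow(t) for t in texts]   # any streamed iterable works

lsi = models.LsiModel(corpus, id2word=dictionary, num_topics=2)

# New documents can be folded in later without recomputing from scratch.
lsi.add_documents([dictionary.doc2bow(["more", "processing"])])
```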
The question whether we need distributed stream processing can hardly be answered unambiguously. In general, everything depends on what features we actually need: scalability, fault tolerance, high availability, performance (latency or throughput) etc.
In any case, there are two ways to distribute stream processing tasks:
Normally, distributed stream processing also requires partitioning the data, e.g., by user or session ids, so that these isolated streams can be processed independently at different nodes.
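A generic sketch of how such key-based routing usually works (not any specific framework's partitioner): hash the partition key and send the tuple to the node owning that hash bucket, so all events for one key land on the same node.

```python
# Key-based partitioning: route each tuple by a stable hash of its key
# (e.g. a user or session id) so that one key is always handled by one node.
import zlib

def partition_for(key: str, num_nodes: int) -> int:
    # Stable hash (unlike Python's randomized hash()) so routing stays
    # consistent across processes and restarts.
    return zlib.crc32(key.encode("utf-8")) % num_nodes

events = [{"session_id": "a1", "v": 1}, {"session_id": "b2", "v": 2}]
for e in events:
    node = partition_for(e["session_id"], num_nodes=4)
    print(f"session {e['session_id']} -> node {node}")
```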
The above observations hold when the conditions you match are simple. I have observed the same thing several times: when queries are rather simple, you are better off with a single node. What happens is that the cost of I/O dominates.
There is clear value in making stream processors work closer to the hardware. That does not mean a single node does better under all use cases. When the complexity of queries increases and when queries can be partitioned, there are still use cases where a distributed setup does better.
I work on WSO2 Stream Processor, https://wso2.com/analytics (open source, Apache licensed). We have chosen to support both worlds: SP has an HA mode that runs two servers in a hot-warm configuration, handles 100K events per second, and gives you a two-node deployment. If you want more, SP runs on top of Kafka, scales out, and supports multi-data-center deployments as well. Users who are in doubt can start small and later switch to Kafka without changing any code.
One real reason to add distribution is for geographic redundancy or other high availability use cases. At the core of distributed databases is a distributed stream. Here's a comparison between the performance of logical timestamps vs atomic clocks for consistency management. https://blog.fauna.com/distributed-consistency-at-scale-span...
Of course these streams are more rigid than the kind the article discusses, but the tradeoffs are similar, and anyone architecting streams should know about the state of the art in database distribution.
We are neither trying to denounce distributed processing nor claim that single node engines can solve all the problems. For example, we still haven't discussed how to ingest this amount of data (80 million tuples/sec) or even more than that to a single node. How realistic is this scenario?
I agree that the benchmark used here is not a good example of a real-world streaming application, as it's not even compute-intensive. However, it is still used as the main benchmark for the evaluation of streaming frameworks.
In this blog post, we are trying to start a discussion about how modern streaming systems perform and how they are supposed to perform. In that direction, we should reconsider what we take for granted. For example, in cases like this, if you can use a single node instead of 5 for your computations, why shouldn't you consider it?
I work in the area, and I don't consider the Yahoo benchmark to be the main benchmark for evaluating streaming frameworks. As a field, we don't have an agreed-upon set. I think a better place to start is the DEBS Grand Challenges: http://debs.org/grand-challenges/
I agree that there isn't a widely accepted benchmark for streaming frameworks. However, you can find papers accepted recently at SOSP (Drizzle) and SIGMOD (Structured Streaming), and I think there will be one in VLDB as well, that use this benchmark as their main way of comparing streaming systems with each other.
I can't figure out what their modified version of the Yahoo Benchmark is. Does it still involve Kafka? Does it still involve Redis? Did they only generate tuples in-memory for SABER and the other systems pull from Kafka?
The Yahoo Streaming Benchmark isn't really much of a streaming benchmark; it's a "reading things off of Kafka, deserializing JSON, sticking things in Redis" benchmark. Really not a good benchmark for understanding the system under test.
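For context, the per-event work in the original benchmark is roughly the following (a simplified illustration from the published benchmark description, not the reference code; field names are approximate), which is why it mostly measures deserialization and key-value I/O rather than computation:

```python
# Approximate shape of the Yahoo Streaming Benchmark's per-event work:
# deserialize JSON, filter by event type, join ad -> campaign, and count
# per campaign in 10-second windows (in the original, counts go to Redis).
import json
from collections import defaultdict

ad_to_campaign = {"ad-1": "campaign-A", "ad-2": "campaign-B"}   # lookup table
window_counts = defaultdict(int)        # (campaign_id, window_start) -> count
WINDOW_SECONDS = 10

def on_event(raw: bytes):
    event = json.loads(raw)                        # deserialize JSON
    if event["event_type"] != "view":              # filter
        return
    campaign = ad_to_campaign.get(event["ad_id"])  # join ad -> campaign
    if campaign is None:
        return
    window = int(event["event_time"]) // WINDOW_SECONDS * WINDOW_SECONDS
    window_counts[(campaign, window)] += 1         # windowed count

on_event(b'{"ad_id": "ad-1", "event_type": "view", "event_time": 1528000003}')
print(dict(window_counts))
```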
No, they did not only generate tuples in-memory for SABER. From the linked Flink blog post:
> Databricks made a few modifications to the original benchmark, all of which are explained in their own post:
> Removing Redis from step 5
> Generating data in memory for Flink and Spark; generating data via Spark and writing it to Kafka for Kafka Streams
> Writing data out to Kafka instead of to Redis
I think the authors know that the benchmark is weak. The blog post isn't (to my reading) claiming that it is proof that SABER is great so much as that the distributed stream processors need better evidence that they should exist (vs the Databricks Structured Streaming SIGMOD 2018 paper, in which the YSB benchmark is (I think) the only quantitative evaluation they do).
While the motivation is somewhat similar, scaling up versus scaling out, these systems are really solving different problems.
Keep in mind that GraphChi uses a vertex-centric computational model that requires pre-processing of the data and is not really applicable in streaming scenarios where low latency is required!
A couple of great posts by Frank McSherry along the same lines a few years ago, doing page rank for a 128bil edge graph on a single laptop (in the second post below) -
The first post - "scalability! But at what cost" http://www.frankmcsherry.org/graph/scalability/cost/2015/01/...
The second post - "bigger data, same laptop" http://www.frankmcsherry.org/graph/scalability/cost/2015/02/...