2018-12-02

## Batch Processing

Different types of systems:

• Services (online)
• A service waits for a request or instruction from a client to arrive.
• Primary measure: response time.
• Availability is very important.
• Batch processing systems (offline)
• Take a large amount of data, run a job, produce some outputs.
• Primary measure: throughput.
• Stream processing systems (near-real-time/nearline)
• A stream job operates on events shortly after they happen.
• Lower latency than the equivalent batch systems.

### Batch processing with unix tools

#### Simple log analysis

A example using chains of commands:

cat /var/log/nginx/access.log |             # get contents
awk '$7 !~ /\.css$/ {print \$7}' |     # print the 7th item split by whitespace, ignore css files
sort             |                    # sort
uniq -c          |                    # get unique items with their counts
sort -r -n       |                    # sort based on the number, reversely
head -n 5                             # get the first 5

• Check out more related commands
• sort can handle large data volume that cannot fit into memory.
• Spill to disk, parallelize sorting across multiple CPU cores.

#### The unix philosophy

1. Make each program do one thing well. To do a new job, build a fresh rather than complicate old programs by adding new “features”.
2. Expect the output of every program to become the input to another, as yet unknown, program. Don’t clutter output with extraneous information. Avoid stringently columnar or binary input formats. Don’t insist on interactive input.
3. Design and build software, even operating systems, to be tried early, ideally within weeks. Don’t hesitate to throw away the clumsy parts and rebuild them.
4. Use tools in preference to unskilled help to lighten a programming task, even if you have to detour to build the tools and expect to throw some of them out after you’ve finished using them.

Unix features:

• A uniform interface
• A file as the interface in Unix.
• A sequence of bytes
• Can be an actual file, a communication channel, a device driver, a network socket, etc.
• Many Unix programs treat the sequence of bytes as ASCII text separated by \n or 0x1E.
• Separation of logic and wiring
• Separating the input/output wiring from the program logic.
• Standard input (stdin) and standard output (stdout): Pipes can attach the stdout of one process to the stdin of another process.
• Limitations
• Programs that need multiple inputs or outputs are possible but tricky.
• Can’t pipe a program’s output into a network connection.
• I/O is wired up by the program when
• the program directly opens files for reading and writing;
• the program starts another subprocess;
• the program opens a network connection.
• Transparency and experimentation
• Unix makes it easy to see what’s going on.
• Input files are normally treated as immutable, safe to try different options.
• Can end the pipeline at any point and less the output.
• Can write the output of a pipeline stage to a file and use the file as input to the next stage.
• But Unix tools can only run on a single machine.

### MapReduce and distributed file systems

HDFS:

• MapReduce jobs read and write files on a distributed filesystem (HDFS).
• Other filesystems: GFS, GlusterFS, Quantcast File System, Amazon S3, Azure Blob, OpenStack Swift.
• HDFS is based on the shared-nothing principle.
• HDFS consists of a daemon process running on each machine, exposing a network service that allows other nodes to access files stored on that machine.
• A central server called the NameNode keeps track of which file blocks are stored on which machine.
• In order to tolerate machine and disk failures, file blocks are replicated on multiple machines.
• HDFS can handle a very large scale of number of machines and storage, with low cost as well.
• Using commodity hardware and open source software.

#### MapReduce job execution

• map
• Generate any number of k-v pairs from every input record.
• Try to run the map job on the machine that stores that input file.
• Application code is firstly copied to the appropriate machine.
• A node usually processes a file block.
• Output is sorted and written to temp files.
• Need to implement the mapper function.
• shuffle
• Partitioning by reducer, sorting, and copying data partitions from mappers to reducers.
• When a mapper is done, the MapReduce scheduler notifies the reducers that they can start fetching the output files from that mapper.
• reduce
• A reducer take the pairs with the same key and preserve the sort order.
• The framework uses a hash of the key to determine which reduce task should receive a particular k-v pair.
• Need to implement the reducer function.

We can use workflows (chained MapReduce jobs) to solve more complicated problems. Some high-level tools for it exist: Pig, Hive, Cascading, Crunch, FlumeJava.

#### Reduce-side joins and grouping

When a MapReduce job is given a set of files as input, it reads the entire content of all of those files, since it has no concept of indexes. In analytic queries, it’s common to calculate aggregates over a large number of records.

We can take all related database extracted using ETL and put them in the same distributed filesystem, and then use MapReduce to bring together all relevant records in the same place to process.

• Secondary sort
• MapReduce framework can make sure the reducer always processes one type of k-v pair first, then another type. (e.g., activity log first, then user profile)
• Bringing related data together in the same place.
• Mapper sends k-v pairs with the same key to the same place.
• Also support GROUP BY.
• Hot keys
• A key may have a large number of related records. The load is too much for a single reducer.
• Pig’s skewed join
• Run a sample job to check which keys are hot.
• Send hot k-v pairs to a random reducer.
• For other input to join, records related to the hot key need to be replicated to all reducers handling that key.
• Hive’s skewed join
• Require hot keys to be specified explicitly in the table metadata.
• Store records related to those keys in separate files from the rest.
• Use map-side join for the hot keys.
• We can do two-stage MapReduce to group records by a hot key.
• First, send records to a random reducer and grouping.
• Second, run a second job to combine the results.

#### Map-side joins

Reducer-side joins can be expensive due to sorting, copying to reducers and merging of reducer inputs. Map-side join uses a cut-down MapReduce job without reducers and sorting.

• Broadcast hash joins
• A large dataset is joined with a small dataset.
• The small dataset needs to be small enough that it can be loaded entirely into memory as a hash table in each of the mappers.
• Replicated join in Pig, MapJoin in Hive, Cascading, Crunch.
• Partitioned hash joins
• If the inputs to the map-side join are partitioned in the same way, by rows not columns, then the hash join approach can be applied to each partition independently.
• Each mapper can load a smaller amount of data into its hash table.
• This approach only works if both of the join’s inputs have the same number of partitions, with records assigned to partitions based on the same key and the same hash function.
• Bucketed map joins in Hive.
• Map-side merge joins
• Input datasets are not only partitioned in the same way, but also sorted based on the same key.
• Perform the merge with constant memory.

Output of map-side joins is partitioned and sorted in the same way as the large input, while for reducer-side joins, it’s by the join key.

Map-side joins makes assumptions about the size, sorting and partitioning of the input. Such metadata about partitioning is often maintained in HCatalog and the Hive metastore, in the Hadoop ecosystem.

#### The output of batch workflows

Some applications:

• Building search indexes
• Key-value stores as batch process output
• For machine learning systems such as classifiers and recommendation systems.
• The output is often some database.
• e.g., query a user ID to obtain suggested friends.
• Usually build a brand-new database inside the batch job and write it as files.

The MapReduce follows the Unix philosophy: Inputs are immutable and avoiding side effects. The job can be re-run if needed by human, or automatically.

#### Comparing Hadoop to distributed databases

• Storage
• Databases require structured data.
• Distributed filesystems store files that are just byte sequences.
• Make data available quickly.
• Processing models
• Databases
• SQL performance is very good.
• Not all kinds of processing can be expressed as SQL.
• MapReduce
• Can run specific codes over large datasets.
• More different models can be developed on top of Hadoop for different types of processing.
• Frequent faults
• Databases
• abort the query if a node crashed.
• prefer to keep as much data as possible in memory.
• MapReduce
• handles the faults better.
• is very eager to write data to disk.
• for fault tolerance;
• on the assumption that dataset is too big to fit into memory.
• In a mixed datacenter, high probability of termination for a task.
• Lower-priority task can be preempted to free up resources.
• Non-production resources can be overcommitted.
• Hardware failures are not so common.
• At Google, a MapReduce task that runs for an hour has an approximately 5% risk of being terminated.

### Beyond MapReduce

MapReduce is

• easy to understand.
• not easy to implement a quite complex processing job with it.
• Various of higher-level models on top of it: Pig, Hive, Cascading, Crunch.
• not optimal for some kinds of processing.
• robust: process almost arbitrarily large quantities of data on an unreliable multi-tenant system with frequent task terminations.

#### Materialization of intermediate state

• Intermediate state: output of a job to be used as the input of the following job.
• Materialization: the process of writing out this intermediate state to files.
• Unix pipes: stream the output to the input using only a small in-memory buffer.

Downsides of fully materialization:

• A MapReduce job can only start when all tasks in the preceding jobs have completed.
• Unix pipe allows all processes start at the same time.
• Mappers are often redundant.
• They just read back the same file that was just written by a reducer, and prepare it for the next stage of partitioning and sorting.
• The interleaving mapper stages between reducers are redundant.
• Storing intermediate state in a distributed filesystem means those files are replicated across several nodes, which is often overkill for such temporary data.

Dataflow engines:

• e.g., Spark, Tez, Flink
• Tez: a fairly thin library relying on the YARN shuffle service.
• Spark and Flink are big frameworks.
• Handle an entire workflow as one job.
• More flexible operators instead of mapper and reducer.
• Expensive work such as sorting need only be performed when required.
• No unnecessary map tasks.
• The scheduler knows about the process and data and can do locality optimizations.
• Usually sufficient to keep intermediate state in memory or local disk, not HDFS.
• No need to wait for entire preceding stage.
• Existing JVM processes can be reused to run new operators.
• MapReduce launches a new JVM for each task.

Fault-tolerance:

• MapReduce fully materialize intermediate state to HDFS. If a task fails, it can just be restarted on another machine.
• Spark, Flink and Tez: If a machine fails and the intermediate state is lost, it’s recomputed from other data that is still available.
• Track how a piece of data is computed.
• Spark uses resilient distributed dataset abstraction.
• Flink checkpoints operator state.
• Better to make operator deterministic.
• The operators always produce the same output given the same input data.

#### Graphs and iterative processing

• To process a task with data represented as graph, we may need an iterative process.
1. Run a batch process to perform one step of the algorithm.
2. When the process completes, the scheduler check whether it has finished.
3. If not yet finished, the scheduler goes back to step 1 to run another round of batch process.
• Pregel processing model
• A function is called for each vertex, passing it all the messages that were sent to it.
• A vertex remembers its state in memory from one iteration to the next, so the function only needs to process new incoming messages.
• Fault-tolerance
• The framework makes sure the message delivery is reliable. (once and only once)
• By checkpointing the states of all vertices at the end of an iteration.
• Parallel execution
• The framework can partition the graph in arbitrary way. Because the model deals with just one vertex at a time.
• The graph algorithms often have a lot of cross-machine communications (over network) overhead and bigger intermediate states.
• Better to avoid parallel if the data can fit into a single machine. (even disks, not memory)

#### High-level APIs and Languages

• e.g., Hive, Pig, Cascading, Crunch
• Easy to implement the job code.
• Can be interactive interfaces.
• Declarative query languages
• Query optimizer can be applied to improve the performance.
• Different applications
• Machine learning, etc.

## Stream Processing

In general, a “stream” refers to data that is incrementally made available over time. We will look at event streams as a data management mechanism: the unbounded, incrementally processed.

### Transmitting event streams

In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock.

An event:

• is generated by a producer/publisher/sender;
• is processed by multiple consumers/subscribers/recipients.
• can be encoded as a text string or JSON or in some binary form.
• can be appended to a file, inserted into a relational table, or a document database.
• can be sent over the network to another node.

A file or database is sufficient to connect producers and consumers: a producer writes every event that it generates to the datastore, and each consumer periodically polls the datastore to check for events that have appeared since its last run.

However, when moving toward continual processing with low delays, polling becomes expensive if the datastore is not designed for this kind of usage.

#### Messaging systems

A producer sends a message containing the event, which is then pushed to consumers. A messaging systems allows multiple producers and consumers.

Two questions:

• What if the producer sends more messages than what the consumer can process?
• System can drop messages.
• Buffer in a queue.
• Flow control (ask producer to slow down)
• What if nodes crashes or go offline temporarily?
• Write to disk or replication.
• The system may tolerance some lost.

Different communication methods:

• Direct messaging from producers to consumers
• UDP when low latency is important.
• Financial industry such as stock market feeds.
• Application-level protocol can recover lost packets.
• TCP, IP multicast, HTTP, RPC
• The faults the system can tolerate are quite limited.
• Message brokers
• Send messages via a message broker, a kind of database optimized for handling message streams.
• Run as a service, connecting with producers and consumers.
• Standards: JMS, AMQP.
• RabbitMQ, ActiveMQ, Google Cloud Pub/Sub, etc.
• More easily tolerate client come and go.
• can queue messages in memory or disk.
• Asynchronous process. (Producers don’t wait for consumers)
• Can implement 2PC using XA or JTA.
• Broker may fail.

Differences between message brokers and databases:

• Database keeps the data until it is explicitly deleted. Brokers automatically delete consumed messages.
• Brokers assumes the queue is small. If the queue is long, a single message process can take long time.
• Database supports 2nd index and many ways to search for data; broker often only support subscription of some data topics.
• Broker notifies clients when data changes.

When multiple consumers read messages in the same topic, two main patterns of messaging are used:

• Each message is delivered to one of the consumers, so the consumers can share the work of processing the messages in the topic.
• Add consumers to parallelize the processing.
• Fan-out
• Each message is delivered to all of the consumers.
• Combined
• e.g., two separate groups of consumers may each subscribe to a topic, such that each group collectively receives all messages, but within each group only one of the nodes receives each message.

Acknowledgments and redelivery:

• In order to ensure that the message is not lost, message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
• The load balancing may cause message reordering if one consumer crashed and failed sending the ACK. (This is inevitable with load balancing and redelivery.)
• Can solve by using a separate queue per consumer. (i.e. not using load balancing)

#### Partitioned logs

• Database and filesystem allow re-processing a single piece of data anytime.
• Message system processing is usually destructive.
• Log-based message brokers: a hybrid system combining
• durable storage of databases
• low-latency notification facilities of messaging.

Using logs for message storage:

• A producer sends a message by appending it to the end of the log; a consumer receives messages by reading the log sequentially.
• If a consumer reaches the end of the log; it waits for a notification that a new message has been appended.
• To be distributed, the log can be partitioned across different machines.
• Messages within a partition are totally ordered. No ordering guarantee across different partitions.
• e.g., Apache Kafka, Amazon Kinesis Streams, Twitter’s DistributedLog

Logs compared to traditional messaging:

• Log-based approach supports fan-out very well.
• For load-balancing, a partition is assigned to a consumer, which processes all the messages.
• Downsides
• Number of nodes can be at most the number of log partitions.
• If a single message processing is slow, it holds up all subsequent messages.
• Choice:
• JMS/AMQP: if messages are expensive to process and we want to parallelize processing.
• Log-based: high message throughput, message is fast to process and message ordering is important.

Consumer offset:

Each message in a log partition has a sequential offset number. The consumer also maintains a offset in the partition; thus it knows which messages have been processed. Therefore, the broker does not need to track acknowledgments for every single message; it only needs to periodically record the consumer offsets.

The reduced bookkeeping overhead and the opportunities for batching and pipelining in this approach help increase the throughput of log-based systems. If a consumer failed, a new node can join to process the same partition, but a message could be processed twice if the latest consumer offset is not recorded by the broker.

Also, we can reset the consumer offset to a previous value to reprocessing messages.

Disk space usage:

• Disk space can be used as a buffer; very old segments get deleted or moved to archive.
• If the consumer falls behind the producer, the disk can buffer a long time for human to take effect.
• Traditional messaging systems usually buffer in memory and write into disk if the queue is too large.

### Databases and streams

A replication log is a stream of database write events, produced by the leader as it processes transactions. The followers apply that stream of writes to their own copy of the database and thus end up with an accurate copy of the same data. The events in the replication log describe the data changes that occurred.

#### Keeping systems in sync

Most nontrivial applications need to combine several different technologies. For example, using an OLTP database to serve user requests, a cache to speed up common requests, a full-text index to handle search queries, and a data warehouse for analytics. Each of these has its own copy of the data, stored in its own representation that is optimized for its own purposes.

As the same or related data appears in several different places, they need to be kept in sync with one another. For example, data warehouses are synchronized by ETL processes.

Dual write can sync data too. But it has some problems. Concurrent writes may have unlucky timing across different databases. Also, one of the writes may fail. Both may end up with inconsistent states. Solving them requires additional concurrency detection mechanism or atomic commit.

Different databases may have different leaders, thus causes different orderings.

#### Change data capture

More recently, there has been growing interest in change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems. CDC is especially interesting if changes are made available as a stream, immediately as they are written.

Essentially, change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database, since it preserves the ordering of messages.

Replication log parsing is a robust approach implement CDC. e.g., LinkedIn’s Databus, Facebook’s Wormhole, Yahoo’s Sherpa.

Initial snapshot:

We can reconstruct then entire state of the database using a snapshot and a truncated logs, to be efficient.

Log compaction:

A background process can do log compaction and merging for the records with the same key to reduce the size of logs. We can build a derived data system from the log offset 0. This feature is implemented in Apache Kafka.

API support:

• RethinkDB allows queries to subscribe to notifications when the results of a query change.
• Firebase and CouchDB provide data synchronization based on a change feed.
• Meteor uses the MongoDB oplog to subscribe to data changes.
• VoltDB allows transactions to continuously export data from a database in the form of a stream.

#### Event sourcing

Event sourcing involves storing all changes to the application state as a log of change events; but it applies the idea at a different level of abstraction:

• Change data capture
• The application uses the database in a mutable way, updating and deleting records at will.
• The log of changes is extracted from the database at a low level. The application writing to the database does not need to be aware of CDC.
• Event sourcing
• The application logic is explicitly built on the basis of immutable events that are written to an event log. Event store is append-only; updates or deletes are discouraged or prohibited.
• Events are designed to reflect things that happened at the application level, rather than low-level state changes.

Reconstruct the application state: Events represent user actions and don’t override prior events; thus log compaction is impossible. To reconstruct, we need to replay full history of events. One optimization is using a state snapshot.

A user command needs to be validated synchronously before it turns into an event. When then event is generated, it becomes a fact. Cancellation or change needs to be another event. Alternatively, the user request to reserve a seat could be split into two events: first a tentative reservation, and then a separate confirmation event once the reservation has been validated; thus asynchronously.

#### State, streams and immutability

The state of the application is mutable; but the log of events is immutable. We can make the state reproducible.

Immutable events can record all versions of history of the state, even incorrect state. We can recover the system from the problem.

Derive different views from the events:

• Many products allow to extract different views from the event stream.
• Druid with Kafka, Pistachio, Kafka Connect.
• Easier to evolve the application: To introduce a new feature, we can use the event log to build a separate read-optimized view for the new feature, and run it along side the old system. Shutdown the old system later.
• Don’t need to do a complicated schema migration.
• We can separate the data-written form and the data-read form.
• Command query responsibility segregation.
• View to read can be read-optimized.

Concurrency control:

• Consumers are usually asynchronous. A user may not read its write after it is done.
• Solution:
• Make the event log and read view update atomic.
• Or linearizable storage using total order broadcast.
• Events make it easy to group side effects into atomic unit.
• A straightforward single-threaded log consumer needs no concurrency control for writes: by construction, it only processes a single event at a time.

Limitations of immutability:

• Some kind of dataset may need many updates and deletes, making the immutable history very large.
• Fragmentation may be an issue. Performance of compaction and garbage collection becomes crucial for operation robustness.
• Sensitive data history are hard to be deleted forever.
• Called excision or shunning.

### Processing streams

We can process one or more input streams to produce one or more output streams. A stream processor consumes input streams in a read-only fashion and writes its output to a different location in an append-only fashion.

#### Uses of stream processing

• Complex event processing
• Specify rules to search for certain patterns of events in a stream.
• Often use a high-level declarative query language like SQL.
• A processing engine consumes the input streams and internally maintains a state machine to do the matching.
• e.g., Esper, IBM InfoSphere Streams, Apama, TIBCO StreamBase and SQLstream.
• Stream analytics
• Focus on aggregation and statistical metrics over a large number of events.
• A time window can be enforced.
• Stream analytics systems sometimes use probabilistic algorithms, such as bloom filters. Those are approximation algorithms using less memory.
• e.g., Apache Storm, Spark Streaming, Flink, Google Cloud Dataflow, Azure Stream Analytics etc.
• Maintaining materialized views
• Deriving an alternative view onto some dataset so that you can query it efficiently, and updating that view whenever the underlying data changes.
• Search on streams
• Similar to Complex event processing, but with complex criteria such as full-text search queries.
• Multiple queries can be indexed to narrow down the set of queries that may match.
• Stream processing is different from Message passing and RPC.
• Actor frameworks are primarily a mechanism for managing concurrency and distributed execution of communicating modules, whereas stream processing is primarily a data management technique.
• Communication between actors is often ephemeral and one-to-one, whereas event logs are durable and multi-subscriber.
• Actors can communicate in arbitrary ways (including cyclic request/response patterns), but stream processors are usually set up in acyclic pipelines where every stream is the output of one particular job, and derived from a well-defined set of input streams.

#### Reasoning about time

Stream processors often need to deal with time, especially when used for analytics purposes, which frequently use time windows such as “the average over the last five minutes.” Many stream processing frameworks use the local system clock on the processing machine (the processing time) to determine windowing. This approach has the advantage of being simple, and it is reasonable if the delay between event creation and event processing is negligibly short. However, it breaks down if there is any significant processing lag.

Processing delay may come from queueing, network faults, restart of the consumer, etc. Also delay may cause unpredictable ordering. Delay leads to bad data, for example, a steady stream of events may cause a spike in analytics.

If we want to process events in a time window, the problem is we can not be sure that we have received all events in this window. You can time out and declare a window ready after you have not seen any new events for a while, but it could still happen that some events were buffered on another machine somewhere, as straggler events.

• We can ignore these straggler events, as they are probably a small percentage.
• We can publish a correction, an updated value for the window later.

We can not trust a local clock on user’s device. Therefore, we need to adjust the timestamp of the action with 3 timestamps:

• The time at which the event occurred, according to the device clock.
• The time at which the event was sent to the server, according to the device clock.
• The time at which the event was received by the server, according to the server clock.

Types of windows:

• Tumbling window
• A tumbling window has a fixed length, fixed boundaries, and every event belongs to exactly one window.
• For example, if you have a 1-minute tumbling window, all the events with timestamps between 10:03:00 and 10:03:59 are grouped into one window, events between 10:04:00 and 10:04:59 into the next window.
• Hopping window
• A hopping window also has a fixed length, fixed boundaries, but allows windows to overlap in order to provide some smoothing.
• For example, a 5-minute window with a hop size of 1 minute would contain the events between 10:03:00 and 10:07:59, then the next window would cover events between 10:04:00 and 10:08:59, and so on.
• Sliding window
• A sliding window contains all the events that occur within some interval of each other.
• Boundaries are not fixed.
• For example, a 5-minute sliding window would cover events at 10:03:39 and 10:08:12, because they are less than 5 minutes apart.
• Session window
• Unlike the other window types, a session window has no fixed duration.
• It is defined by grouping together all events for the same user that occur closely together in time, and the window ends when the user has been inactive for some time.

#### Stream joins

New events can appear anytime on a stream makes joins on streams more challenging than in batch jobs.

• Stream-stream join (window join)
• e.g., a user search a query and may click some result items. We have search event and click events with the same session ID.
• We can join a click with a search if they occur at most one hour apart.
• Implementation: A stream processor needs to maintain state: e.g., all the events that occurred in the last hour, indexed by session ID.
• Stream-table join (stream enrichment)
• e.g., a stream of user activities and user profile database.
• Load a copy of the database into the stream processor so that it can be queried locally without a network round-trip.
• The stream processor can subscribe to a changelog of the user profile database and update its local copy.
• Table-table join (materialized view maintenance)
• e.g., a user has a tweet cache to view. When someone he/she follows update a tweet, the cache is updated.
• We need a stream of events of tweets and a stream of follow/unfollow.
• The stream process needs to maintain a database containing the set of followers for each user.

Time-dependence of joins:

Stream joins require the stream processor to maintain some state based on one join input and query that state on messages from the other join input.

• The order of the events that maintain the state is important. In a partitioned log, the ordering of events within a single partition is preserved.
• There is typically no ordering guarantee across different streams or partitions.
• Question: if events on different streams happen around a similar time, in which order are they processed?
• If the ordering of events across streams is undetermined, the join becomes nondeterministic.
• Known as slowly changing dimension (SCD) in data warehouses.
• It is often addressed by using a unique identifier for a particular version of the joined record.
• For example, every time the tax rate changes, it is given a new identifier, and the invoice includes the identifier for the tax rate at the time of sale.
• Join will be deterministic.
• But log compaction is not possible since all versions of the records in the table need to be retained.

#### Fault Tolerance

In batch processing, we can restart some failed sub-tasks. It appears as though every input record was processed exactly once; no records are skipped, and none are processed twice. Although restarting tasks means that records may in fact be processed multiple times, the visible effect in the output is as if they had only been processed once. This principle is known as exactly-once semantics or effectively-once. But stream processing cannot do it the same way.

Solution:

• Microbatching
• One solution is to break the stream into small blocks, and treat each block like a miniature batch process.
• Used in Spark Streaming.
• The batch size is typically around one second, as a performance compromise.
• Checkpointing
• Periodically generate rolling checkpoints of state and write them to durable storage.
• If a stream operator crashes, it can restart from its most recent checkpoint and discard any output generated between the last checkpoint and the crash.
• Used in Apache Flink.
• Both methods can provide the same exactly-once semantics as batch processing within the stream framework, but not when output leaves the processor.

Atomic commit:

• In order to give the appearance of exactly-once processing in the presence of faults, we need to ensure that all outputs and side effects of processing an event take effect if and only if the processing is successful.
• Those effects include any messages sent to downstream operators or external messaging systems (including email or push notifications), any database writes, any changes to operator state, and any acknowledgment of input messages (including moving the consumer offset forward in a log- based message broker).
• Such atomic commit facility can be implemented in stream framework, different from XA.
• Used in Google Cloud Dataflow, VoltDB, and to be added in Apache Kafka.

Idempotence: Another way to discard partial output of any failed tasks, other than distributed transaction.

• An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once.
• For example, setting a key in a key-value store to some fixed value is idempotent. Increasing a counter is not.
• Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata.
• For example, every message in Kafka has a unique offset. When writing the message into an external database, we can include the offset of the last write message; thus we can tell whether an update has been applied.
• When failing over from one processing node to another, fencing may be required to prevent interference from a node that is thought to be dead but is actually alive.

Rebuild state after a failure

• Any stream process that requires state, any tables and indexes used for joins must be recovered after a failure.
• Aggregations: counters, averages and histograms.
• Solution
• Keep the state in a remote datastore and replicate it. But querying a remote database for every message can be slow.
• Or keep state local to stream processor and replicate it periodically.
• Implementation
• Flink: periodically captures snapshots of operator state and writes them to durable storage such as HDFS.
• Samza and Kafka Streams: replicate state changes by sending them to a dedicated Kafka topic with log compaction, similar to change data capture.
• VoltDB replicates state by redundantly processing each input message on several nodes.
• In some case, it may not even be necessary to replicate the state, because it can be rebuilt from the input streams.
• For example, if the state consists of aggregations over a fairly short window, it may be fast enough to simply replay the input events.
• If the local replica state is maintained by CDC, it can be rebuilt from the log-compacted change stream.