2019-04-15

## Data integration

No one-size-fits-all database exists. To design an application, we need to understand the software products available and the circumstances in which each is a good fit. Moreover, a complex application may use its data in several different ways, so several different pieces of software often need to be combined to provide all the necessary functionality.

### Combining specialized tools by derived data

An application may write to a database and update a search index via change data capture (CDC) or an event-sourcing log. Two conflicting writes might be processed by the two storage systems in different orders. If the order of writes matters, this causes inconsistency, and we may need total order broadcast.

1. Client 1 sets X = A in DB.
2. Client 2 sets X = B in DB.
3. Client 2 sets X = B in index.
4. Client 1 sets X = A in index.

Updating a derived data system based on an event log can often be made deterministic and idempotent, making it easy to recover from faults.
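As a minimal sketch of this idea (all names hypothetical), a consumer can checkpoint the last log offset it applied, so that replaying the log after a crash leaves the derived state unchanged:

```python
# Idempotent, deterministic derived update: duplicate deliveries are detected
# by comparing the event's log offset against the last applied checkpoint.

class DerivedIndex:
    def __init__(self):
        self.state = {}           # derived view, e.g. a search index
        self.applied_offset = -1  # checkpoint: last log offset applied

    def apply(self, offset, event):
        if offset <= self.applied_offset:
            return  # duplicate delivery after a crash/retry: safe to skip
        key, value = event
        self.state[key] = value   # deterministic derivation
        self.applied_offset = offset

log = [("x", "A"), ("x", "B")]
idx = DerivedIndex()
for off, ev in enumerate(log):
    idx.apply(off, ev)
# Replaying the whole log (e.g. during recovery) is a no-op:
for off, ev in enumerate(log):
    idx.apply(off, ev)
assert idx.state == {"x": "B"}
```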

#### Derived data versus distributed transactions

• Guarantees
• Ordering
• Distributed transactions: use locks like 2PL.
• Derived data: use log.
• A change takes effect exactly once.
• Transactions: use atomic commit.
• Derived data: use deterministic retry and idempotence.
• Linearizability
• Provided by transactions.
• Derived data systems are often updated asynchronously, so linearizability is not guaranteed.
• Practice
• XA, the standard distributed transaction protocol, has poor fault tolerance and performance. We may need a better one, but getting a new protocol widely adopted is difficult.
• Log-based derived data is currently the most promising approach for integrating different data systems.
• There might be a middle ground between the two options.

#### Ordering events to capture causality

The causality between two events may be important. Some practices:

• Logical timestamps can provide a total ordering without coordination, which may help when total order broadcast is not feasible.
• But they still require recipients to handle events that are delivered out of order, and some additional metadata may need to be passed around.
• An event can be given a unique identifier. The event is logged to record the state of the system that the user saw before making a decision, and any later event can reference that identifier to record the causal dependency.
• Conflict resolution algorithms help with processing events that are delivered in an unexpected order.
• Useful to maintain state.
• Do not help if events have external side effects.
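To illustrate the unique-identifier approach above, here is a hypothetical sketch in which each event carries an ID plus the IDs of the events it causally depends on, and a recipient buffers out-of-order events until their dependencies have been seen:

```python
# Causal delivery via explicit dependency IDs (all field names hypothetical).

def deliver_in_causal_order(events):
    seen, buffered, order = set(), list(events), []
    while buffered:
        progress = False
        for ev in list(buffered):
            if all(dep in seen for dep in ev["deps"]):
                order.append(ev["id"])   # dependencies satisfied: apply now
                seen.add(ev["id"])
                buffered.remove(ev)
                progress = True
        if not progress:
            raise RuntimeError("missing dependency")
    return order

# e2 references e1 to record that the user saw e1 before acting;
# even if e2 arrives first, it is applied after e1:
events = [{"id": "e2", "deps": ["e1"]}, {"id": "e1", "deps": []}]
assert deliver_in_causal_order(events) == ["e1", "e2"]
```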

### Batch and stream processing

Data integration means making sure that data ends up in the right form in all the right places. Batch and stream processors are used for this purpose: consuming inputs; transforming, joining, filtering and aggregating data; training models; evaluating; and writing to the outputs.

• Spark performs stream processing on top of a batch processing engine by breaking the stream into microbatches.
• Microbatching may perform poorly on hopping or sliding windows.
• Apache Flink performs batch processing on top of a stream processing engine.

#### Maintaining derived state

• Functional flavor
• Batch processing encourages deterministic, pure functions whose output depends only on the input, with no side effects other than the explicit outputs.
• Treat inputs as immutable and outputs as append-only.
• Stream processing is similar, but its operators are extended to allow managed, fault-tolerant state (aggregates, etc.).
• Fault tolerance due to idempotence.
• Simplify reasoning about the dataflows in an organization.
• Think of the data pipelines as deriving one thing from another: pushing state changes in one system and applying the effects to derived systems.
• Synchrony vs Asynchrony
• Fault tolerance
• Transactions may amplify failures by spreading them to the rest of the system.
• Asynchrony makes systems based on event logs robust: a fault in one part of the system is contained locally.
• Asynchrony can also make cross-partition communication more reliable and scalable.
• e.g., reading secondary indexes from all partitions.

#### Reprocessing data for application evolution

• Reprocessing existing data provides a good mechanism for maintaining a system, evolving it to support new features and changed requirements.
• Without reprocessing, schema evolution is limited to simple changes.
• Like adding a new optional field.
• Reprocessing can restructure a dataset into a completely different model to support new requirements.
• We can do gradual evolution, having both old and new derived views.
• Gradually shift users from old view to the new one.

#### The lambda architecture

• Lambda architecture is a proposal to combine batch and stream processing.
• Core idea
• Have the incoming data recorded by appending immutable events to an always-growing dataset, like event sourcing.
• Run batch and stream processing in parallel.
• Stream processor produces an approximate update to the view quickly.
• Fast, approximate.
• Batch processor later consumes the same set of events and produces a corrected version of the derived view.
• Slow, exact.
• Reason
• Batch processing is thought to be simpler, more reliable and less prone to bugs.
• Problems
• Hard to maintain the same logic running on both frameworks.
• Some libraries such as Summingbird allow computations to run in both contexts.
• The problem still remains: complexity of debugging, tuning and maintaining two systems.
• Hard to merge two separate outputs if the view is derived using complex operations.
• e.g., joins, sessionization, non-time-series output.
• Batch processor sometimes only processes incremental batches instead of the entire dataset.
• Problems: handling stragglers, windows that cross boundaries between batches.

#### Unifying batch and stream processing

The lambda architecture can avoid its downsides by allowing both batch and stream computations to be implemented in the same system. This requires the following features, which are becoming increasingly available.

• The ability to replay historical events through the same processing engine that handles the stream of recent events.
• e.g.
• Log-based message brokers can replay messages.
• Some stream processors can read input from a distributed FS like HDFS.
• Exactly-once semantics for stream processors.
• The output is the same as no fault occurred, even if faults did occur.
• Tools for windowing by event time rather than processing time, since processing time is meaningless when reprocessing historical events.
• e.g., Apache Beam provides an API for such computations, which can be run using Apache Flink or Google Dataflow.
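As a toy illustration of event-time windowing (the helper below is hypothetical, not any framework's API), events are bucketed by their own timestamps, so replaying historical events out of order still yields the same windows:

```python
# Tumbling windows keyed by event time, not by arrival (processing) time.

def tumbling_windows(events, size):
    """events: iterable of (event_time, value); size: window length."""
    windows = {}
    for ts, value in events:
        window_start = ts - (ts % size)  # bucket by the event's own timestamp
        windows.setdefault(window_start, []).append(value)
    return windows

# Events arrive out of order; the windows depend only on event time:
events = [(12, "a"), (3, "b"), (7, "c")]
assert tumbling_windows(events, 10) == {10: ["a"], 0: ["b", "c"]}
```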

## Unbundling databases

Two philosophies of information storage:

• Unix
• Provides a fairly low-level hardware abstraction of data.
• Uses pipes; files are stored as sequences of bytes.
• Is simpler since it is just a thin wrapper around hardware resources.
• Relational DB
• A high level abstraction, hiding the complexities of data structure on disk, concurrency, recovery, etc.
• Uses SQL and transactions.
• Is simpler because a short declarative query can draw on a lot of powerful infrastructure (query optimization, indexes, join methods, concurrency control, replication, etc.)
• NoSQL is kind of like applying a Unix-esque approach of low-level abstractions to the domain of distributed OLTP data storage.

### Composing data storage technologies

Some features of databases and derived data systems (processed with batch and stream processors) are closely related.

• Secondary indexes
• Materialized views
• Precomputed cache of query results like aggregates.
• Replication logs
• Full-text search indexes

#### Create an index

• When a DB runs CREATE INDEX:
• It scans over a consistent snapshot of a table, picks out the fields to be indexed, sorts them and writes out the index.
• After that, it processes the backlog of writes that happened since the snapshot was taken.
• It then keeps the index up to date whenever a transaction writes to the table.
• This process is essentially reprocessing the existing dataset and deriving the index as a new view onto the existing data.
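The steps above can be sketched as follows (a simplified, hypothetical model that only handles inserts):

```python
# Building a secondary index as a derived view: scan a consistent snapshot,
# then catch up on the backlog of writes made since the snapshot was taken.

def build_index(snapshot, backlog, field):
    index = {}
    for row in snapshot:                 # 1. scan a consistent snapshot
        index.setdefault(row[field], []).append(row["id"])
    for row in backlog:                  # 2. catch up on the write backlog
        index.setdefault(row[field], []).append(row["id"])
    return index                         # 3. thereafter, apply each new write

snapshot = [{"id": 1, "city": "Oslo"}, {"id": 2, "city": "Lima"}]
backlog = [{"id": 3, "city": "Oslo"}]   # writes that arrived after the snapshot
assert build_index(snapshot, backlog, "city") == {"Oslo": [1, 3], "Lima": [2]}
```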

#### The meta-database of everything

We can view the dataflows across an entire organization as one huge database.

• Batch and stream processors are like triggers, stored procedures and materialized view routines.
• Derived data systems are like different index types.

Unified API:

• Provide a unified query interface to a wide variety of underlying storage engines and processing methods.
• Unbundled databases, unifying writes
• To synchronize writes across different systems, ensure that all data changes end up in all the right places, even when faults happen.

#### Making unbundling work

To synchronize writes across heterogeneous storage systems, an asynchronous event log with idempotent writes is much more robust and practical than distributed transactions, and far more feasible to implement across heterogeneous systems.

The main advantage is loose coupling between the various components.

• At system level
• Make the whole system more robust to outages or performance degradation of individual components.
• If a consumer runs slow or fails,
• The event log can buffer messages.
• The producer and other consumers run unaffected.
• The faulty consumer can catch up.
• At a human level
• Allow different software components and services to be developed, improved and maintained independently.
• Event logs provide an interface powerful enough to capture fairly strong consistency properties.
• Due to the durability and ordering of events.
• Event log is also general enough to be applicable to almost any kind of data.
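The fault-containment point above can be sketched with a hypothetical in-memory log: the log buffers events while one consumer is down, the producer and the other consumers are unaffected, and the recovered consumer catches up from its own offset:

```python
# A durable log decouples producer and consumers: each consumer tracks its
# own read offset, so a slow or failed consumer never blocks the others.

class Log:
    def __init__(self):
        self.events = []

    def append(self, event):
        self.events.append(event)

    def read_from(self, offset):
        return self.events[offset:]

log = Log()
consumer_offsets = {"search_index": 0, "cache": 0}

for ev in ["e1", "e2", "e3"]:
    log.append(ev)   # the producer keeps writing even if a consumer is down

# "cache" was offline the whole time; "search_index" kept consuming:
consumer_offsets["search_index"] = len(log.events)

# The faulty consumer later catches up from where it left off:
backlog = log.read_from(consumer_offsets["cache"])
assert backlog == ["e1", "e2", "e3"]
```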

#### Unbundled versus integrated systems

• Even if the event log approach dominates, databases are still needed:
• To maintain state in stream processors.
• To serve queries for the output of batch and stream processors.
• A single integrated product is simpler, and can achieve better, more predictable performance on the kind of workload it is designed for.
• Just use such a product when it satisfies all your needs.

#### What’s missing

It would be better to have a high-level language for composing storage and processing systems in a simple and declarative way.

• Examples
• To create an index: `mysql | elasticsearch`.
• Create and update caches.
• Early-stage research

### Designing applications around dataflow

Some ways of building applications around the ideas of unbundled databases and dataflow.

• Application code as a derivation function
• The code creates a derived dataset from the input.
• Applications:
• Secondary index
• Full-text search index
• Machine learning training
• Display cache of an aggregation of data on UI
• Separation of application code and state
• Tools running application code: Mesos, YARN, Docker, Kubernetes.
• Most web applications are deployed as stateless services; the states go to databases.
• For changes happening in databases, we can poll or subscribe (just beginning to emerge).
• Dataflow: Interplay between state changes and application code
• A data change will trigger the workflow.
• Message processing requires stable message ordering and fault tolerance.
• Modern stream processors can provide both at scale, with application code to be run as stream operators.
• Stream processors and services
• For application development, functionality can be broken down into a set of services that communicate via synchronous network requests.
• The advantage is organizational scalability through loose coupling.
• Dataflow is different: one-directional, asynchronous message streams.
• Can achieve better performance with stream join.
• Subscribe to a stream of changes, rather than querying the current state when needed.
• Be careful about the time-dependence of historical values.

### Observing derived state

A data journey consists of a write path and a read path; for example, a search index is updated when data comes in (write path) and read when a user searches (read path). The derived dataset (e.g., the search index) is the place where the write path and the read path meet. It represents a trade-off between the amount of work that needs to be done at write time and the amount that needs to be done at read time.

• Materialized views and caching
• For the example of full-text search,
• Cache/Materialized views: precompute the search results for only a fixed set of the most common queries.
• Serve uncommon queries from the index.
• Basically, the boundary between the read path and the write path can shift as needed.
• Stateful, offline-capable clients
• We can have stateful clients: using local databases without internet, syncing with server when available.
• We can think of the on-device state as a cache of state on the server.
• Pushing state changes to clients
• Maintain a TCP connection between server and client, push state changes.
• Read/Write path model: extend the write path all the way to the end user.
• If the device is offline, it can catch up easily later.
• End-to-end event streams
• Many client-side stateful tools exist, such as React, Flux and Redux.
• Can allow server to push state-change events into the client-side event pipeline.
• Problem: many databases only support a request/response style, not subscription to changes (a request that returns a stream of responses over time).
• Stream processors maintain some states, which can be read by outside clients.
• Aggregations, tables to perform joins.
• Treat reads as a stream of events too.
• The read events can be routed to the stream processor holding the data.
• Recording a log of read events potentially also has benefits with regard to tracking causal dependencies and data provenance across a system.
• Reconstruct what the user saw before they made a decision.
• Multi-partition data processing
• Distributed execution of complex queries that need to combine data from several partitions.
• With the help of message routing, partitioning and joining provided by stream processors.
• e.g., Storm’s distributed RPC feature used on Twitter.

## Aiming for correctness

• Traditional transactions (atomicity, isolation, durability) are useful for building correct applications.
• But at the cost of limiting the performance, scale and fault-tolerance properties we can achieve.
• Some areas have abandoned transactions entirely.
• But it is difficult to determine whether it is safe to run a particular application at a particular transaction isolation level or replication configuration.
• Often simple solutions are correct at low concurrency, but have subtle bugs in more demanding circumstances.
• Application code may use a correct infrastructure product in a wrong way.

### The end-to-end argument for databases

Even a data system with strong safety properties can experience data loss or corruption; for example, people make mistakes that cause application bugs. Having immutable and append-only data helps recovery from such faults.

End to end argument:

The function in question can be completely and correctly implemented only with the knowledge and help of the application standing at the endpoints of the communication system. Therefore, providing that function as a feature of the communication system itself is not possible. (Sometimes an incomplete version of the function provided by the communication system may be useful as a performance enhancement.)

Example: duplicate suppression:

• Making operations idempotent is good for fault tolerance.
• But some operations require extra effort to be made idempotent.
• e.g., maintaining some additional metadata, ensuring fencing when failing over from one node to another.
• An operation may otherwise be executed more than once:
• TCP duplicate suppression only works within a single connection.
• If a transaction needs to reconnect due to a network issue, a second TCP connection is created, which may cause duplicate execution.
• 2PC cannot ensure the transaction is executed exactly once.
• We can use a unique ID to identify an operation from end to end, preventing it from happening twice.
• Such a uniqueness constraint can be maintained correctly even at weak isolation levels.
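A minimal sketch of this end-to-end technique (with in-memory dicts standing in for what would be database tables): the client generates the request ID once, and the server records processed IDs together with the operation's effects:

```python
# End-to-end duplicate suppression via a client-generated request ID.
# In a real system, `processed` and `balance` would be updated in the same
# database transaction so the dedup record and the effect are atomic.

import uuid

processed = {}   # request_id -> result
balance = {"alice": 100}

def transfer(request_id, account, amount):
    if request_id in processed:          # retry of an already-applied request
        return processed[request_id]
    balance[account] -= amount           # the actual effect
    processed[request_id] = balance[account]
    return processed[request_id]

req = str(uuid.uuid4())                  # generated once, at the endpoint
transfer(req, "alice", 30)
transfer(req, "alice", 30)               # network retry: suppressed
assert balance["alice"] == 70
```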

Other examples:

• Checking the integrity of data: end-to-end checksums.
• End-to-end encryption in network.

To be free from data loss or corruption, the application itself needs to take end-to-end measures. We currently lack a high-level fault-tolerance machinery.

• Transactions are one such mechanism, but too expensive in distributed systems.
• It is worth exploring fault-tolerance abstractions that make it easy to provide application-specific end-to-end correctness properties with good performance.

### Enforcing constraints

Besides deduplication, we may have other constraints, for example uniqueness constraints (e.g., ensuring a username is unique). Other constraints (e.g., an account balance never going negative) are similar.

• Uniqueness constraints require consensus.
• The common implementation is to have a single node make all the decisions, but the problem remains if that node fails.
• Uniqueness checking can be scaled out by partitioning based on the unique values.
• Synchronous coordination is unavoidable if we want to reject any invalid writes immediately.
• Asynchronous multi-master replication has the problem that different masters concurrently accept conflicting writes.
• Uniqueness in log-based messaging
• The log ensures that all consumers see messages in the same order (total order broadcast).
• Easy to enforce a uniqueness constraint: route possibly conflicting requests to the same partition and process them sequentially.
• A stream processor consumes all messages in a log partition on a single thread.
• Every request for a username is a message, hashed to a partition.
• A processor uses a local DB to keep track of taken usernames.
• Send success/reject message to an output stream.
• Clients watch the output stream.
• Multi-partition request processing
• An event involving multiple partitions can use event log to ensure correctness instead of using transactions.
• For example, payer A pays money to payee B. A and B may be in different partitions.
1. The request is given a unique request ID by the client and appended to a log partition based on the request ID.
2. A processor reads the log of requests. For a single request, it emits two messages with the request ID to output streams.
• Debit to payer A.
• Credit to payee B.
3. Further processors consume the streams of debit and credit instructions.
• Deduplicate by request ID.
• Apply changes to accounts.
• Practices
• Appending the request to the log is atomic.
• If processor at step 2 crashes, it resumes processing from its last checkpoint.
• Duplicate messages may be produced, but can be deduplicated.
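The username example above can be sketched like this (all names hypothetical): requests are routed by hash to a partition, and one single-threaded processor per partition decides success or reject in log order:

```python
# Uniqueness via total order within a partition: conflicting claims for the
# same username always land in the same partition and are processed serially.

N_PARTITIONS = 4

def partition_for(username):
    return hash(username) % N_PARTITIONS  # routes conflicts to one partition

class UniquenessProcessor:
    """Consumes one partition's log on a single thread; no races possible."""
    def __init__(self):
        self.taken = set()   # local DB tracking claimed usernames

    def process(self, request_id, username):
        if username in self.taken:
            return (request_id, "reject")   # emitted to an output stream
        self.taken.add(username)
        return (request_id, "success")      # clients watch this stream

proc = UniquenessProcessor()
assert proc.process("r1", "martin") == ("r1", "success")
assert proc.process("r2", "martin") == ("r2", "reject")
```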

### Timeliness and integrity

The term consistency conflates two different requirements that are worth considering separately:

• Timeliness
• Users observe the system in an up-to-date state.
• User may observe it in an inconsistent state if reading from a stale copy, but such inconsistency is temporary.
• Violations of timeliness are eventual consistency.
• Integrity
• Absence of corruption: no data loss, no contradictory or false data.
• In particular, if some derived dataset is maintained as a view of some data, the derivation must be correct (e.g., indexing).
• If integrity is violated, the inconsistency is permanent.
• Waiting and trying again is not going to fix corruption in most cases.
• May need explicit checking and repair.
• Related ACID properties: atomicity and durability.
• Violations of integrity are perpetual inconsistency.

Therefore, integrity is much more important than timeliness. Violations of timeliness can be annoying and confusing, but violations of integrity can be catastrophic.

#### Correctness of dataflow systems

• When processing event streams asynchronously, no guarantee of timeliness, but integrity can be guaranteed.
• Without requiring distributed transactions and an atomic commit protocol.
• Ensure integrity: exactly-once semantics, fault-tolerant message delivery and duplicate suppression.
• Mechanism:
• Representing the content of write operation as a single message.
• Can be easily written atomically.
• Deriving all other state updates from that single message using deterministic derivation functions.
• Passing a client-generated request ID through all these levels of processing, enabling end-to-end duplicate suppression and idempotence.
• Making messages immutable and allowing derived data to be reprocessed from time to time, which makes it easier to recover from bugs.

#### Loosely interpreted constraints

Enforcing a uniqueness constraint requires consensus, typically implemented by funneling all events in a particular partition through a single node. Stream processing cannot avoid this. However, many real applications can get by with a much weaker notion of uniqueness: in many businesses it is usually okay to violate a constraint temporarily and fix it up later by apologizing.
These applications require integrity, but not timeliness, in the enforcement of the constraint.

• Compensating transactions: inform the affected clients later and resolve the conflict after the fact.
• If a product is oversold, tell customers it is temporarily out of stock and bring more into the warehouse.
• An overdraft fee charged by a bank is another example.

#### Coordination-avoiding data systems

Two observations:

• Dataflow systems can maintain integrity guarantees on derived data without atomic commit, linearizability or synchronous cross-partition coordination.
• Many applications are fine with loose constraints that may be temporarily violated and fixed up later.
• Integrity must be preserved.
• Some strict uniqueness constraints are needed too, requiring timeliness and coordination.

The dataflow systems can provide the data management services for many applications without requiring coordination, but still with strong integrity guarantees.

• A system can operate distributed across multiple datacenters in multi-leader configuration, asynchronously replicating between regions.
• In this system, serializable transactions are still useful to maintain derived state, but at a smaller scope - no distributed transactions required.
• Synchronous coordination can be introduced where it is needed, but not for everything.
• Find the balance point in the tradeoff: apologies for inconsistencies vs. apologies for bad performance and availability.

### Trust, but verify

Our system models assume that some things may go wrong, like network interruptions and node crashes, while assuming other things don't:

• Data written to disk is not lost.
• Data in memory is not corrupted.
• The multiplication instruction of our CPU always returns the correct result.

But reality is a question of probabilities: some things are more likely, other things less likely, and some of them really do happen.

• Data on disks is corrupted.
• Data corruption on the network can evade the TCP checksums.
• Certain pathological memory access patterns can flip bits even in memory that has no faults.
• The rowhammer technique exploits this to break security mechanisms in operating systems.

Practices:

• Maintaining integrity in the face of software bugs
• There is always a risk of software bugs, even in famous products like MySQL.
• Application code may have even more bugs.
• Don’t just blindly trust what they promise.
• Both hardware and software may have bugs.
• Auditing: we should have a way of finding out whether data has been corrupted so that it can be fixed, and of figuring out the source of the error.
• Mature systems consider and manage such risks, e.g., HDFS and Amazon S3 don’t fully trust disks.
• A culture of verification
• Systems like HDFS still have to assume that disks work correctly most of the time, though not always.
• But many systems don’t have such self-validating and self-auditing features.
• Designing for auditability
• Compared with transactions, event-based systems can provide better auditability, since the derivation can be made deterministic and repeatable.
• A deterministic and well-defined dataflow makes the provenance of data much clearer, making integrity checking much more feasible.
• Use hashes to check the integrity of the event log's storage.
• Rerun the batch and stream processors to check derived states.
• Or even run a redundant derivation in parallel.
• An explicit dataflow also helps diagnose when something unexpected occurred.
• The end-to-end argument again
• Have continuous end-to-end integrity checks along an entire derived data pipeline, increasing the chances that bugs will be found quickly.
• Tools for auditable data systems
• Log all changes to a separate audit table.
• The audit log itself may have integrity problems.
• Cryptographic tools can prove integrity in a way that is robust to a wide range of hardware faults, software bugs and malicious actions.
• Bitcoin, Ethereum, etc.
• Merkle trees: trees of hashes that can be used to efficiently prove that a record appears in some dataset.
• Certificate transparency is a security technology that relies on Merkle trees to check the validity of TLS/SSL certificates.
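As an illustration of the Merkle tree idea (a deliberately minimal sketch, not a production implementation), membership of a record can be verified from the published root hash plus O(log n) sibling hashes, without access to the whole dataset:

```python
# Merkle tree membership proof: hash leaves, pair-and-hash up to a root,
# then verify one leaf using only the sibling hashes along its path.

import hashlib

def h(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()

def merkle_root(leaves):
    level = [h(x) for x in leaves]
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])      # duplicate the last node if odd
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]

def prove(leaves, index):
    """Collect the sibling hashes on the path from a leaf to the root."""
    level, proof = [h(x) for x in leaves], []
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        proof.append((level[index ^ 1], index % 2 == 0))
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        index //= 2
    return proof

def verify(leaf, proof, root):
    node = h(leaf)
    for sibling, node_is_left in proof:
        node = h(node + sibling) if node_is_left else h(sibling + node)
    return node == root

records = [b"a", b"b", b"c", b"d"]
root = merkle_root(records)
assert verify(b"c", prove(records, 2), root)
assert not verify(b"x", prove(records, 2), root)
```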

## Doing the right thing

Develop software for the good of users. Treat user data with humanity and respect.

### Predictive analytics

Businesses use algorithms to prevent fraudulent behavior. However, algorithms are not perfect, and people may be wrongly put into an “algorithmic prison”.

• Bias and discrimination
• Models are inferred from data. If the input has a bias, the output model will most likely learn and amplify that bias.
• Human judgment is still needed when predicting future behaviors.
• Responsibility and accountability
• Data driven algorithms make it hard to understand how a wrong decision is made.
• Feedback loops
• A wrong algorithmic decision may create a feedback loop that amplifies the error and makes things worse.

### Privacy and tracking

Companies may track users’ behavior to make profits, while users’ interests take second place.

• Surveillance
• Not all data usages are benign.
• Consent and freedom of choice
• No meaningful consent: users have little knowledge of what data they are feeding into our databases, or how it is retained and processed.
• Users who don’t consent still have little choice, since a service may be effectively indispensable.
• Privacy and use of data
• Users should have the freedom to set the privacy aspects of their data.
• Data as assets and power
• User data is the core asset of systems like advertising, and may be abused.
• Remembering the Industrial Revolution
• The risks of the information age should be managed, just as the Industrial Revolution’s hazards eventually were.
• Legislation and self-regulation
• We need updated regulations regarding data collection and usage.
• Self-regulate data collection and processing practices to establish and maintain user trust.