
L2Bud Blog

CQRS Simplified

July, 2019

tl;dr CQRS is a more complex pattern to implement than a traditional single-instance solution (e.g., a single RDBMS), but the concepts are not as daunting as many of the articles on the topic might lead us to believe. It is not an architecture; it is a pattern that can be implemented within various architectural approaches, leveraging various technologies to accomplish its objectives. This post attempts to simplify the concept and discuss technologies, within one architectural approach, that can lead to a cost-effective, secure, and scalable solution.

CQRS stands for Command Query Responsibility Segregation (CQS, or Command Query Separation, is closely related, and I'll use the two terms interchangeably here). Simply put, it is a pattern that separates reads of data objects from mutations of those data objects. By segregating these responsibilities, you can manage each independently, which gives you the ability to "right-size" hardware and computational resources for each, as well as to manage security independently. The quote below does a good job of explaining both what CQS is and, more specifically, what it is not:

“CQRS is a very simple pattern that enables many opportunities for architecture that may otherwise not exist. CQRS is not eventual consistency, it is not eventing, it is not messaging, it is not having separated models for reading and writing, nor is it using event sourcing.”

Greg Young

The linked article further provides a good practical example of the simplicity of the CQS/CQRS pattern, in which a simple customer service is shown:

CustomerService

void MakeCustomerPreferred(CustomerId)
Customer GetCustomer(CustomerId)
CustomerSet GetCustomersWithName(Name)
CustomerSet GetPreferredCustomers()
void ChangeCustomerLocale(CustomerId, NewLocale)
void CreateCustomer(Customer)
void EditCustomerDetails(CustomerDetails)

Applying CQRS on this would result in two services:

CustomerWriteService

void MakeCustomerPreferred(CustomerId)
void ChangeCustomerLocale(CustomerId, NewLocale)
void CreateCustomer(Customer)
void EditCustomerDetails(CustomerDetails)

CustomerReadService

Customer GetCustomer(CustomerId)
CustomerSet GetCustomersWithName(Name)
CustomerSet GetPreferredCustomers()

If we look at this through the lens of a traditional RDBMS-based CRUD approach, all that has occurred here is separating the R from the C, U, and D. One service is responsible for reading the data, whereas another is responsible for creating, updating, and deleting (which I'll refer to collectively as mutating). The model of the data object (in this case, Customer) has not changed; instead, the actions applied to the object have been segregated.

Normally, the biggest concern we hear with an approach like this is data staleness, and how to synchronize the data between multiple services. This is a legitimate concern, and part of why this pattern introduces some complexity, which we will break down in more detail within this post.

First, let's debunk an often misunderstood reality regarding data staleness: as soon as data is exposed to a user, it is potentially stale, because a separate process could be updating that data the moment after it has been read. A good example of this is provided here:

Let’s say we have a customer service representative who is on the phone with a customer. This user is looking at the customer’s details on the screen and wants to make them a ‘preferred’ customer, as well as modifying their address, changing their title from Ms to Mrs, changing their last name, and indicating that they’re now married. What the user doesn’t know is that after opening the screen, an event arrived from the billing department indicating that this same customer doesn’t pay their bills – they’re delinquent. At this point, our user submits their changes.

Should we accept their changes?

Well, we should accept some of them, but not the change to ‘preferred’, since the customer is delinquent. But writing those kinds of checks is a pain – we need to do a diff on the data, infer what the changes mean, which ones are related to each other (name change, title change) and which are separate, identify which data to check against – not just compared to the data the user retrieved, but compared to the current state in the database, and then reject or accept.

Udi Dahan

Data staleness is a reality whether we're using a traditional CRUD approach or CQS (or any other approach; it's simply a characteristic of data and time). If we can accept that reality, we can more easily accept the decoupled service approach of CQS. How often the data is synched (in the above scenario, how quickly a refresh of the customer support screen would reflect the accepted changes) is a separate challenge, and the frequency and timing of that synching can be accomplished in various ways, using various technologies (a couple of which we'll explore below). But again, data synchronization between disparate data objects is a technology and architectural decision that should be considered separately from the CQS pattern itself.

The most common RDBMS-based approach to implementing CQS is to separate reads from data mutations using database views: reads go against the views, while mutations happen against the underlying tables themselves.

This seems straightforward enough; to be sure, it's a pattern we've witnessed in the RDBMS space since practically the beginning of RDBMS. The problem with this approach is that you cannot easily scale the two independently. Out of the box, accessing a view consumes from the same pool of computational resources as data mutations. Security of the separate objects must also be applied at the database resource level, which is certainly possible, but unnecessarily complex to keep updated as additional tables and views are inevitably added over time.

So the complexities around the CQS pattern are not all that daunting; we're just separating responsibilities. In the above example, we didn't alter the model in any way, we didn't introduce eventing or event sourcing, and we don't have to worry about eventual consistency, since any data mutation would immediately be reflected in the next read of the data.

However, the complexities are introduced, and we start to see things like varying models, eventing, and eventual consistency (amongst others), in the practical applications of the pattern. As we walk through one (of many) practical deployments here, we will examine how all of those things can come into play.

Perhaps we want to accomplish data access (the R in CRUD) using a document datastore like DynamoDB, because it gives us the ability to scale (for all practical purposes) infinitely, and to manage security independently. This is a common technology for this type of usage, in no small part because of its performance capabilities, but also because the access pattern for reads is often well understood. Obviously, data exploration for analytic purposes is an exception here; that type of exploration does not have a well-defined access pattern, and a traditional RDBMS is better suited for it. But for the customer support example above, or to publish a hello, your name here upon successful sign-on to a website (for instance), the access pattern is well understood. The image below shows this access pattern:

documentDataStore

While it could be, and often is, a separate actor that performs data mutations, for the sake of this post we'll imagine them to be the same. To be clear, in our example, the user (depicted as a laptop in the above image) can update information about themselves, such as their physical and/or email address. Here, we'll introduce some event processing using a messaging approach. The user signs in, at which point certain information is retrieved (e.g. hello, your name here), and then they update their name, which passes a message to a queue (in this case an SNS topic, via a Lambda call), which in turn initiates a separate Lambda call (an event based on the arrival of a message) that updates an RDBMS table, depicted here as an Aurora RDS instance:

Aurora

The responsibilities of query (reading data) and data mutation have now been separated, using a set of technologies and applications (DynamoDB and Aurora) and an event-driven architectural approach (Lambda), via messaging (SNS). This further allows some classic ETL-type functionality, such as data cleansing, validation, conforming, and transformation, to occur between the read from SNS and the write to the RDBMS. Additionally, we can now scale the various components independently: DynamoDB will scale based solely on the access requirements of the reads, and the RDBMS can be scaled based on the mutation requirements of the writes. It is worth noting that this approach has in fact introduced separate models for the data; DynamoDB is a document datastore, so its model more closely resembles a JSON object composed of (potentially nested) key/value pairs, as compared to the more rigid, flattened table structures within an RDBMS. Again, this is not a characteristic of the CQS pattern itself, but of this implementation of the pattern, and specifically of the technologies and applications chosen for it; nonetheless, we do in fact see separate models in this instance.
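To make the write path a little more concrete, here is a minimal sketch of the two Lambda functions described above. The names are hypothetical (a PROFILE_UPDATES_TOPIC environment variable, a customer table in Aurora MySQL, connection settings in environment variables, and the pymysql library packaged with the function); it is an illustration under those assumptions, not a definitive implementation:

import json
import os

import boto3
import pymysql  # assumed to be packaged with the subscriber Lambda

sns = boto3.client("sns")
TOPIC_ARN = os.environ["PROFILE_UPDATES_TOPIC"]  # hypothetical topic ARN


def publish_mutation(event, context):
    """Front-end Lambda: accept the user's change and publish it as a message."""
    change = {
        "customerKey": event["customerKey"],
        "firstName": event["firstName"],
        "lastName": event["lastName"],
    }
    sns.publish(TopicArn=TOPIC_ARN, Message=json.dumps(change))
    return {"status": "accepted"}


def apply_mutation(event, context):
    """Subscriber Lambda: fired by SNS, performs light ETL, then writes to Aurora."""
    change = json.loads(event["Records"][0]["Sns"]["Message"])

    # Classic ETL-style cleansing/validation before the write.
    first = change["firstName"].strip().title()
    last = change["lastName"].strip().title()

    conn = pymysql.connect(
        host=os.environ["AURORA_HOST"],      # hypothetical connection settings
        user=os.environ["AURORA_USER"],
        password=os.environ["AURORA_PASSWORD"],
        database="customers",
    )
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE customer SET first_name = %s, last_name = %s WHERE customer_key = %s",
                (first, last, change["customerKey"]),
            )
        conn.commit()
    finally:
        conn.close()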

With regard to security, this approach is also simpler than the table/view approach discussed previously. We can set read access on DynamoDB independently of the security applied to the Lambda that performs the ETL and the subsequent writes to the RDBMS.

However, the problem of eventual consistency has not been addressed, because no synching is shown to keep the data between Aurora and DynamoDB consistent.

Because Aurora enables us to trigger events (in this case, Lambda invocations) based on database changes, we can accomplish this remaining requirement by extending our event-driven approach, continuing to leverage Lambda and SNS accordingly:

Data Synching

One of the strengths of the approach illustrated here is the timing of events. Because it is event-based, we've minimized the latency between a user initiating a mutation (e.g. an update of their email address) and the ability to expose that information back to the user. Depending on the extent of the ETL, an event such as that can complete in under a second.
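The synching side can be sketched the same way. The snippet below is a minimal, assumption-laden example of the Lambda function that subscribes to the "customer changed" SNS topic and upserts the corresponding item into a hypothetical CUSTOMER DynamoDB table, so the read model catches up with the write model:

import json

import boto3

dynamodb = boto3.resource("dynamodb")
customer_table = dynamodb.Table("CUSTOMER")  # hypothetical read-side table


def sync_customer(event, context):
    """Fired by SNS when Aurora signals a customer mutation; refreshes DynamoDB."""
    for record in event["Records"]:
        change = json.loads(record["Sns"]["Message"])

        # Upsert the read-model document keyed on customerKey.
        customer_table.put_item(
            Item={
                "customerKey": change["customerKey"],
                "customerName": {
                    "firstName": change["firstName"],
                    "lastName": change["lastName"],
                },
            }
        )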

An alternative solution here might be to use something like SQS as the queueing service between Aurora and DynamoDB, which would allow you to minimize write connections to DynamoDB; you could, for instance, choose to perform updates in mini-batch (perhaps every 5 minutes), applying any mutations that have accumulated since the last execution. I personally prefer the event-driven SNS approach because it minimizes latency, but it does so at the expense of cost (it allows for concurrent DynamoDB writes based on how frequently Aurora mutations occur, and thus could impact the required provisioned write capacity units). The SQS approach is shown below, with the mutations managed via a CloudWatch timed event:

Data Synching with SQS
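A minimal sketch of that mini-batch variant might look like the following, assuming a hypothetical queue URL (in an environment variable) and the same hypothetical CUSTOMER table; a CloudWatch timed event would invoke it on a fixed schedule, say every five minutes:

import json
import os

import boto3

sqs = boto3.client("sqs")
dynamodb = boto3.resource("dynamodb")
customer_table = dynamodb.Table("CUSTOMER")           # hypothetical read-side table
QUEUE_URL = os.environ["CUSTOMER_CHANGES_QUEUE_URL"]  # hypothetical queue URL


def drain_queue(event, context):
    """Scheduled Lambda: apply all mutations accumulated since the last run."""
    while True:
        response = sqs.receive_message(
            QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=1
        )
        messages = response.get("Messages", [])
        if not messages:
            break

        with customer_table.batch_writer() as batch:
            for message in messages:
                change = json.loads(message["Body"])
                batch.put_item(
                    Item={
                        "customerKey": change["customerKey"],
                        "customerName": {
                            "firstName": change["firstName"],
                            "lastName": change["lastName"],
                        },
                    }
                )

        # Only delete messages once they have been applied.
        for message in messages:
            sqs.delete_message(
                QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"]
            )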

Finally, we'll extend the diagram with some additional access points for data visualization and data exploration, both of which are better served from the RDBMS, in this case leveraging the database's native SQL interface:

CQRS with RDBMS access

In this post, we've (hopefully) simplified the CQRS pattern and reduced confusion around what the pattern is and isn't, as well as shown a viable technology and architectural approach in which this pattern can be applied. To be sure, this approach introduces a number of complexities over a traditional single-server approach, but many real-world problems have requirements that are not best met with traditional approaches, and CQS gives us the ability to address them while continuing to exploit the cloud's pay-for-what-you-use pricing model and managing security at an individual service level.

Tags

L2Bud CQRS CQS Event Driven Messaging DynamoDB AWS Data Science Lambda

Author

Joel Lieser



FaaS is not a Replacement for Native Distributed Compute Technologies

June, 2019

tl;dr FaaS architectures do not replace everything else, and while they allow us an inexpensive and fully scalable approach to do amazing things, we always, always, always want to position each and every one of our technologies correctly in the overall ecosystem. This post attempts to differentiate between where we might use a FaaS approach and where a traditional batch processing approach, using natively distributed languages and technologies, is the better fit.

I've written somewhat extensively about my passion for serverless, microservices, and Function as a Service (FaaS) architectures, and some of the feedback I've received reminds me that I'm omitting a large group of folks that I consider myself to be a part of: big data analytics. So I wanted to take the time to differentiate between positioning FaaS (et al.) and big data analytics, as there are absolutely places (and room) for each.

While this post is not focused on FaaS architectures, it is important to recognize, even at a (very) high-level, what I'm talking about with regard to this topic. FaaS architectures take advantage of very small units of compute, and they can be applied to doing relatively fast stateless computations, with high degrees of concurrency. Further, there is no need to provision or manage the hardware for these stateless computations.

We use FaaS functions to do everything from rendering the data behind visualizations in a website, to signing users in, to provisioning ephemeral hardware, to sending email and text notifications. We subscribe to an architectural approach in which, as one common example, functions publish messages to a queue (or topic, in the case of AWS SNS), and other functions fire upon receipt of those messages.

Big data engineers and architects might quickly dismiss this architectural approach because their data is simply too large to be processed this way, and that is often correct. The table below, taken in part from the AWS Lambda limits page, shows some of the restrictions on this type of compute:

  • Function memory allocation: 128 MB to 3,008 MB, in 64 MB increments
  • Function timeout: 900 seconds (15 minutes)
  • Invocation payload (request and response): 6 MB (synchronous), 256 KB (asynchronous)
  • /tmp directory storage: 512 MB

To be sure, Lambda functions can execute in parallel, but because they are independent and completely stateless, one function knows nothing about the others that may be executing. They are more like individual programs that happen to run at the same time than they are parallel processing in which something manages the overall choreography of all executions, as is the case with a technology like Apache Spark. While it is true that we could accomplish Map/Reduce functionality using Lambda, that feels like a more academic exercise than a practical one, especially with so many tools and technologies out there (many of them open source) that do it for us. Thus, I consider Lambda parallelism to be concurrent compute execution, and differentiate it from the type of parallel processing often used for things like distributed batch processing.

Imagine, if you will, that we're processing clickstream data, which is always "big", especially if your site or application is popular. If we're required to, say, generate a report that provides the median session time by day, we have a problem that cannot be solved additively. In order to determine median times, we have to process all the data for a given day. Sure, there are algorithmic ways to determine an approximate median (beyond the scope of this post, but well worth the research, as it is often accurate enough to meet your needs at a fraction of the cost), but for the purposes of discussion here, we can imagine that we need the true median. We can't do this with a single Lambda function, because the data is too large and because the time it would take to process all of that data exceeds the timeout that a single function call will allow.

As a sidenote, additive metrics are often good candidates for FaaS approaches. We simply consume the new data (perhaps data that arrived in the last 30 seconds, or in real time), sum it, retrieve the existing value of the metric, and add the two values together, producing a new current value.
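As a minimal illustration of that additive pattern, the sketch below uses a DynamoDB atomic counter to maintain a running total; the table name, key, and event shape are all hypothetical:

from decimal import Decimal

import boto3

dynamodb = boto3.resource("dynamodb")
metrics_table = dynamodb.Table("METRICS")  # hypothetical metrics table


def accumulate_sales(event, context):
    """Fired for each small batch of new records; adds their sum to the running total."""
    batch_total = sum(Decimal(str(record["amount"])) for record in event["records"])

    # ADD is atomic, so concurrent executions do not clobber one another.
    metrics_table.update_item(
        Key={"metricName": "total_sales"},
        UpdateExpression="ADD metricValue :increment",
        ExpressionAttributeValues={":increment": batch_total},
    )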

In our clickstream example, the data volumes are too unwieldy for a FaaS approach, but non-additive metrics are surely not the only use case for traditional batch processing (or stream processing, a topic to be discussed in greater detail in a future post). Perhaps we have nightly ETL processes that consume large volumes of data, then cleanse, curate, integrate, validate, and persist it to, say, a file system or an MPP/distributed RDBMS. This is a very common approach, which I'll discuss in some detail here.

I believe in a data lifecycle in which the raw, unadulterated data is stored in perpetuity, in its original state. This enables an organization to replay history, which is an incredibly powerful capability. If anything ever goes wrong (and it will), such as incorrect code, accidental data deletion, or simply wanting to adjust logic and apply it to history, the data is there. While storing large quantities of data does carry an additional cost, that cost is surprisingly small, because the cost of disk is quickly approaching zero. You could, for instance, store this raw data in something like AWS S3 Glacier at a cost of $0.004 per GB/month, which is easily justifiable for the benefits and safety net it provides. You can usually do this accumulation with a FaaS approach, because at this point you are simply landing the data, not transforming it in any way.
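As a quick, hedged sketch of that accumulation step, a function like the one below simply lands each incoming payload under a dated S3 prefix (the bucket and prefix names are made up here); an S3 lifecycle rule can then transition those objects to Glacier for long-term, low-cost retention:

import json
import uuid
from datetime import datetime, timezone

import boto3

s3 = boto3.client("s3")
RAW_BUCKET = "example-raw-events"  # hypothetical bucket name


def land_raw_event(event, context):
    """Persist the incoming payload untouched, partitioned by arrival date."""
    today = datetime.now(timezone.utc).strftime("%Y/%m/%d")
    key = f"clickstream/{today}/{uuid.uuid4()}.json"
    s3.put_object(Bucket=RAW_BUCKET, Key=key, Body=json.dumps(event))
    return {"landed": key}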

That raw data persisted to disk potentially includes Personally Identifiable Information (PII), so you must be careful with security, which can be handled relatively simply using any number of approaches, such as AWS IAM. It should go without saying that all data at rest and in transit should be encrypted. Resources (including service users) with the correct permissions can then obfuscate, cleanse, curate, and model the data accordingly, potentially in batch, and set the organized data down again, as illustrated below:

initial data flow

Now we have our data in its original state, and with it, total and complete recoverability and replay-ability. We've also done some light cleansing, such as necessary data obfuscation, perhaps some basic modeling, validation, and light curation. But that data isn't necessarily integrated with the rest of our data, which our Data Scientists and Analysts will certainly want and need.

At this point, we're accumulating a lot of data, and we may not necessarily be able to (or need to) process it in real time, or with a FaaS architecture. Perhaps we have an overnight ETL process, for instance, that integrates our day's data in bulk. This is common; oftentimes the data would prove to be misleading or confusing if it were aggregated and summarized in mini-batch (for instance, day-over-day sales would be alarming if someone looked at the information at noon), or perhaps Analysts would struggle to explain data that changes out from under them as it is updated intra-day.

Processing this type of data (large volumes, in batch) is a perfect use case for technologies such as Hadoop, whereas it is not a great candidate for something like a FaaS architecture. Further, this batch processing can leverage technologies built for this type of processing, such as Map/Reduce or more recent offerings, such as Apache Spark. To be clear, Spark offers streaming (micro-batch) capabilities, but as I mentioned previously, streaming architectures are a topic for another day.

Spark is a great application for bulk processing, and is far superior to traditional Map/Reduce in a number of ways, including its usage of memory and its use of Directed Acyclic Graphs, which are both big reasons why Spark can claim processing that is 100 times or more faster than traditional Map/Reduce. It also offers a bring-your-own programming language approach, so users can leverage the Spark framework while writing their code in popular languages like Python, SQL, Scala, Java, or R. Leveraging a language like Python, for instance, allows for superior code reuse compared to a declarative language like SQL, which can further add to the robustness of your data integration system, while SQL users (those writing reports or performing data exploration) can continue to access data in a language they are comfortable with, all while taking advantage of the superior Spark execution engine.
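Tying this back to the earlier median example, a PySpark job along these lines could compute the true median session time per day across the full day's data; the S3 paths and column names (session_date, session_seconds) are assumptions for illustration:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("daily-median-session-time").getOrCreate()

# One row per session, with the day it occurred and its duration in seconds.
sessions = spark.read.parquet("s3://example-curated-bucket/clickstream/sessions/")

daily_median = sessions.groupBy("session_date").agg(
    # Exact median: requires seeing all of the day's data.
    F.expr("percentile(session_seconds, 0.5)").alias("median_session_seconds"),
    # Cheaper approximation, often accurate enough, as mentioned above.
    F.expr("percentile_approx(session_seconds, 0.5)").alias("approx_median_session_seconds"),
)

daily_median.write.mode("overwrite").parquet(
    "s3://example-curated-bucket/reporting/daily_median_session_time/"
)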

I've updated the previous image to include Spark as the data integration application, both for the initial obfuscation step and for the larger (in our example, nightly) ETL step, in which the data is properly modeled and integrated with other data in the system. Additionally, I've added a commercial distributed SQL MPP to the mix, writing this data to Snowflake, although any number of other offerings could be leveraged in its place, such as AWS Redshift, Apache Presto, or GCP BigQuery:

partially complete data flow

We can now hook up our favorite data visualization application, such as Tableau or Looker, while taking advantage of the compute in (in this case) Snowflake. I've updated the running image with Tableau, and also added multiple access points for our Data Scientists and Analysts, with various applications, such as Python, R (via the dplyr package), HiveQL, and SQL:

data flow complete

Now we have access to data in various locations (and in various states), between S3 and Snowflake, and with the technologies best positioned to do the work. We aren't necessarily asking Data Scientists to move from Python to R (or vice-versa); we can leverage distributed compute technologies (like HiveQL), classic analyst languages (like SQL), Spark, or seemingly anything else.

We can further continue to leverage FaaS approaches to do many interesting things, such as provisioning and terminating ephemeral compute, sending emails and alerts, integrating with third-party applications (like Slack, for instance), and a million other things. The point here is that while FaaS architectures may not fulfill all of our compute needs (in this case, because of data volumes and batch processing that exceeds the time restrictions of Lambda functions), they do fulfill many requirements, at scale and at low cost, and they can play a complementary role to many other approaches, like the traditional batch processing discussed here.

Once again, we've optimized our environment by always positioning technologies correctly. We've introduced some data lifecycle concepts (such as raw, unadulterated data persistence and the ability to replay history) while leveraging distributed technologies (like Apache Spark) over FaaS calls, we've added an MPP SQL engine (and mentioned a number of alternatives), and we've provided access for our Analysts and Data Scientists at multiple points in the data ecosystem, all while enabling them to use the technologies and tools they are most comfortable with. Finally, we discussed how the FaaS architecture continues to be leveraged; to be sure, these are not mutually exclusive approaches, but rather complementary ones, which together can produce an incredibly robust, resilient overall data ecosystem.

Tags

L2Bud Batch processing FaaS AWS Apache Spark Snowflake Data Science Lambda

Author

Joel Lieser



How we leverage our document datastore (DynamoDB)

May, 2019

tl;dr Document datastores, like DynamoDB, offer unparalleled performance and bi-directional scalability, which enables us to exploit the cloud pricing model while providing (largely) transient storage within our serverless architecture.

In previous posts, I mention how we leverage document datastores, and specifically AWS DynamoDB. I'm a huge fan of key/value document datastores, and specifically DynamoDB because it offers "peaks of more than 20 million requests per second" and "single-digit millisecond response times at any scale". That said, like all technologies, it needs to be positioned correctly within the overall architecture.

Within our recommendation engine application, we lean heavily on document datastores as the back-end data semi-persistence layer to the front-end application, and we leverage them (mostly) transiently, which I will discuss in greater detail throughout this post.

Let's begin by talking about what a document datastore is not: it is not our persistent data layer, and it is not the foundation for our analytics offerings, except as it pertains to real-time analytics of extremely recent activity. Document datastores are great, if you know the access pattern. To illustrate, let's take a simple JSON (key/value) object that represents a fictitious customer and their purchases:

{"customerKey" : 123,
   "customerName" : {"firstName": "Ann",
     "lastName" : "Example"},
   "customerAddress" : {"addressLine1": "123 Main Street",
     "addressLine2" : "Apartment B",
     "city" : "Wherever",
     "stateProvince": "OR",
     "country" : "USA",
     "latitude": "45.5155",
     "longitude": "122.6793"},
   "purchases" : [{"purchaseDate" : "20180927",
     "productId" : 123,
     "quantity" : 2,
     "purchasePrice" : 100},
     {"purchaseDate" : "20180927",
     "productId" : 456,
     "quantity": 1,
   "purchasePrice" : 45},
     {"purchaseDate" : "20190102",
     "productId" : 123,
     "quantity": 3,
     "purchasePrice" : 140},
     {"purchaseDate" : "20190103",
     "productId" : 862,
     "quantity": 1,
     "purchasePrice" : 15}]
}

The biggest piece of advice I could give regarding document datastores is that you have to understand the access pattern. Imagine we have a document like this for each customer, which adheres (generally) to the above document structure. If our system knows which customer has signed in, it is incredibly fast to retrieve information for that customer. You simply pass it the customerKey, and all of the information within that document is available for use.

Alternatively, for an analytics system, we may want to answer questions like: how many purchases were made on 20180927? A document datastore would be horrible for this, based on the structure of this document. In order to answer that question, we'd have to pull the documents for all customers, extract the purchase key/value pairs from each, filter to purchases on that specific date, and then summarize that information to get to our answer. A far superior approach would be to obtain that information from a relational database, which is built for aggregations, summarizations, and data exploration. A fair counter-argument might be to maintain a different document that contains purchases keyed by date, so instead of accessing a document by customerKey, we could extract only the document pertaining to that specific date. However, summarizing information from key/value pairs is still sub-optimal when compared to a traditional relational database.
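To make the contrast concrete, here is a rough boto3 sketch (the table name follows the example above but is otherwise hypothetical) of the well-understood access pattern versus the analytics-style question:

import boto3

dynamodb = boto3.resource("dynamodb")
customers = dynamodb.Table("CUSTOMER")  # hypothetical table keyed on customerKey

# The well-understood access pattern: single-digit-millisecond lookup by key.
ann = customers.get_item(Key={"customerKey": 123}).get("Item")

# The analytics question: how many purchases were made on 20180927?
# This forces a full table scan plus client-side filtering and summarization.
purchase_count = 0
scan_kwargs = {}
while True:
    page = customers.scan(**scan_kwargs)
    for customer in page["Items"]:
        purchase_count += sum(
            1 for p in customer.get("purchases", []) if p["purchaseDate"] == "20180927"
        )
    if "LastEvaluatedKey" not in page:
        break
    scan_kwargs["ExclusiveStartKey"] = page["LastEvaluatedKey"]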

Additionally, managing the size of this secondary document (with date at the top of the hierarchy) will be a challenging endeavor, to say the least, when compared against technologies built for that type of work. Documents with lots of purchases could become too large for DynamoDB, for instance, which has a maximum item size of 400 KB. A quick look at my Amazon purchases this month might put that limit to the test...

With regard to our architecture, we also leverage these datastores transiently, which is to say that we lean heavily on TTL (Time to Live), so that data in any table is, for the most part, only present for a set number of hours or days. There are exceptions for small data that requires persistence and single-digit-millisecond latency, like customers. When a customer signs in, we have to retrieve certain information about them (like their name), so we persist just that information (not historic purchases, as in the previous example); we retrieve what the application needs from the document datastore, but lean on relational data, from a separate source, for analytics.
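For illustration, here is a minimal sketch of writing a transient item with a TTL attribute; the table name, attribute names, and 24-hour window are assumptions, and TTL would also need to be enabled on the table and pointed at that attribute:

import time

import boto3

dynamodb = boto3.resource("dynamodb")
search_terms = dynamodb.Table("SEARCH_TERMS")  # hypothetical transient table


def record_search(customer_key, term):
    """Store a search term that DynamoDB will expire roughly 24 hours from now."""
    now = int(time.time())
    search_terms.put_item(
        Item={
            "customerKey": customer_key,
            "searchedAt": now,
            "term": term,
            "expiresAt": now + 24 * 60 * 60,  # TTL attribute, in epoch seconds
        }
    )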

I said previously that we do retrieve certain real-time analytic information from our document datastores, and I want to expand on that. There are aspects of this that seem to mirror the Lambda architecture. (This is not to be confused with AWS Lambda functions, which we use for nearly all of our computations; the Lambda architecture is a way to combine real-time and batch processing to produce a full picture of state.) To be clear, I'm not a proponent of the Lambda architecture, for a number of reasons that are beyond the scope of this document but can be found in this article that questions the Lambda architecture. Rather, I'm simply stating that we consume certain data from these transient datastores when the data is so fresh that it is not yet available in our relational databases. We don't maintain the same code in multiple places (a consistent complaint about the Lambda architecture), but we do produce real-time analytics, and for extremely fresh data that isn't available elsewhere, we can obtain it from DynamoDB. We could forgo this extremely fresh data and provide our consumers with "near-real-time" analytics, where the data is only fifteen minutes old, but that isn't real-time. While I would argue that building data visualizations on data that is at least fifteen minutes old suffices for probably 99% of all use-cases, we wanted to be able to do the other 1%, too, and this approach allows us to do just that.

If we look at this from a data perspective, we can see that data flows into our document datastore in real time, and certain information is transiently stored there (leveraging a TTL of a few hours) before flowing to our relational database, which lives in S3. You may note that I am referring to a relational database and not an RDBMS; my April, 2019 post speaks in greater detail to that distinction. For the sake of this conversation, we can consider them to be synonymous.

documentDataStore flow

In this image, I'm depicting a static site hosted on S3. The user interacts with the website (the same applies for a mobile app), which triggers RESTful API calls (AWS Lambda functions) that write to and read from the CUSTOMER document datastore table. It also calls an AWS Lambda function that writes data to a transient document datastore table called SEARCH_TERMS. As that data is written to SEARCH_TERMS, a separate Lambda function fires that writes it to a different S3 bucket, in its raw (key/value, JSON) form. Then, periodically (say, every 15 minutes), a CloudWatch Event fires that initiates yet another Lambda function (our mini-batch ETL processing) that consumes all JSON since the last execution, cleanses it, models it relationally, and writes it to a new key on S3 for use in analytics and data visualizations.
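The "separate Lambda function" in that flow would typically be driven by DynamoDB Streams. A hedged sketch, with a made-up bucket name, might look like this:

import json

import boto3
from boto3.dynamodb.types import TypeDeserializer

s3 = boto3.client("s3")
deserializer = TypeDeserializer()
RAW_BUCKET = "example-search-terms-raw"  # hypothetical landing bucket


def stream_to_s3(event, context):
    """Fired by the SEARCH_TERMS DynamoDB stream; lands each new item as raw JSON."""
    for record in event["Records"]:
        if record["eventName"] != "INSERT":
            continue

        image = record["dynamodb"]["NewImage"]
        item = {k: deserializer.deserialize(v) for k, v in image.items()}

        key = f"search_terms/{record['eventID']}.json"
        # default=str handles the Decimal values the deserializer produces.
        s3.put_object(Bucket=RAW_BUCKET, Key=key, Body=json.dumps(item, default=str))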

Now we have built out a Function-as-a-Service (FaaS) compute architecture in which our ETL runs in what we like to call "right time" increments. If we need the data for visualization or analytics purposes in (true) real time, we can consume it from a combination of our relational source and the document datastore (the Lambda-architecture-esque aspect). At the same time, we minimize DynamoDB storage costs by placing a TTL of, say, 24 hours on every document in the SEARCH_TERMS table, and we persist the CUSTOMER data (which is small, and with a very well understood access pattern), which again contains no analytic data; it only contains data needed by the application itself.

The other major benefit that document datastores offer is in their flexibility. So long as all documents within a table contain the key (and sort key, if applicable), the remainder of the document can differ from one to another. For instance, if the previous JSON were keyed on customerKey, with no sort key, we could add another document like this:

{"customerKey" : 123,
  "eyeColor" : "brown"}

This is completely legal, although it makes the use of the data a bit more challenging, since attributes are inconsistent. That said, the ability to have dynamic data structures is arguably the biggest strength of JSON, and taking advantage of that strength is highly advised and comes into play frequently. Whereas relational databases are far more rigid (for good reasons), document datastores are far more dynamic (also for good reasons), and it further underscores my point about positioning technologies appropriately, and playing to the strengths of each one.

In this post, I quickly covered the strengths of document datastores (seemingly infinite bi-directional scalability, unparalleled performance, dynamic data structures), optimizing for costs (TTLs and transient storage), how a relational database can be leveraged in conjunction with a document datastore to produce truly real-time analytics and data access, as well as one way in which a document datastore should be positioned in an overall FaaS architecture.

Tags

L2Bud Document Datastores DynamoDB AWS Data Science Lambda

Author

Joel Lieser



The case for relational databases, but not necessarily RDBMS

April, 2019

tl;dr Relational databases are great, when they are positioned correctly in an overall architecture. RDBMS are (mostly) unnecessary. Always build for bi-directional scale, and always take advantage of the pricing model of the cloud.

In my previous post on Function as a Service (FaaS) architectures, I mentioned how I was struck not necessarily by which technology components comprise the architecture, but by what wasn't there, including an RDBMS (Relational Database Management System), which is traditionally a mainstay of most technology and analytics systems. I want to discuss here the pros and cons of taking that approach.

First, let's be clear about one thing: we have databases. In fact, we have multiple databases, if we use the Oxford dictionary definition of a database:

“A structured set of data held in a computer, especially one that is accessible in various ways.”

What we don't have, however, is an RDBMS, which is basically a system to manage our database, or as Wikipedia defines it:

“Connolly and Begg define Database Management System (DBMS) as a "software system that enables users to define, create, maintain and control access to the database". RDBMS is an extension of that acronym that is sometimes used when the underlying database is relational.”

Traditional RDBMS include the likes of MySQL, Oracle, and DB2, to name just a few. Natively distributed RDBMS include AWS Redshift and Snowflake, for example. I say natively because many traditional RDBMS can now work in a distributed fashion, especially if we consider database sharding, but that's beyond the scope of this post.

Although less so today than in the past, many web applications continue to leverage the LAMP software stack, which is composed of Linux, Apache, MySQL, and PHP. Using that stack, an RDBMS is clearly part of the core componentry. I personally feel that's an outdated approach, for a handful of reasons, including:

  • Scalability
  • Costs
  • Speed
  • Coupling

The image here is a high-level look at a simple LAMP architecture, where the RDBMS is used both for the front end and for analytics, with Tableau as the data visualization application.

LAMP stack

Speaking quickly to each of the items in the previous list: the reason I feel scalability and costs are problematic is, in large part, tight coupling. When our compute and our data storage are tightly coupled (as they are when co-located on the same server), we have to consider each in conjunction with the other. To illustrate: imagine we're storing a terabyte of data in our RDBMS; perhaps it houses all of our customer and address information, including all of their purchases and all history. We need a server that is large enough to contain all of that data, and that server is going to have a plethora of computational resources, too, whether we need them or not. The cost of disk is quickly approaching zero, but the cost of compute is definitely not; compute continues to be costly as chips continue to get more powerful and smaller. In this instance, we're paying for compute when all we really need more of is disk. This doesn't scale well. As our business continues to grow, we continue to need more and more disk, so we have to keep buying bigger and bigger servers to meet those disk needs.

Alternatively, perhaps we address the above by keeping only the current information we need in the RDBMS. Perhaps we only need, say, current customer information and current address. We can easily segregate historic data to a disparate system for use in analytics (which historically is another RDBMS, which then has the same challenges and limitations). Now, we have a server that is "right-sized" for the data being housed in it, limited to the disk we require plus some additional overhead (for materialized views or various indices, perhaps), but now we're limited to the amount of compute that is available on that server. Now we have a new (and worse) problem on our hands: finite compute.

The image here extends the previous image, now with a separate RDBMS used for analytics and data science.

LAMP with a decoupled DB

Imagine this scenario: we have our RDBMS server, which is right-sized for the amount of disk we require, and that server has enough computational resources (e.g. memory) to handle our daily load. But on Black Friday, our compute needs change, because so many people want to buy our widgets. Now our server can handle the customer information it is receiving from a storage standpoint, but it can't handle the processing. What happens in that scenario is that the server falls over, and we're unable to sell our popular widgets, on our busiest day of the year!

This means that we have to have a server large enough to handle our peak traffic, even though 364 days a year we're wasting compute that sits idle.

Additionally, traversing an RDBMS is significantly slower than alternative technologies, such as a document datastore like DynamoDB. DynamoDB is bi-directionally scalable, so additional resources can be added dynamically and seamlessly, and (per the aforementioned link) it can “support peaks of more than 20 million requests per second”. The trick, however, is recognizing the cons of a document datastore; specifically, that you need to understand the query patterns used to retrieve data, and model your document datastore accordingly. That is the topic of a future post, but it highlights the main point I'm (finally) going to get to: relational databases aren't going away, nor should they. However, they need to be positioned properly in an overall architecture, and even then, you can have a relational database without an RDBMS.

The image below extends the previous image, now with a document datastore as a replacement for the front-end RDBMS and a separate RDBMS used for analytics and data science.

DynamoDB with decoupled DB

Before we delve into relational databases without RDBMS, let's talk about correctly positioning the RDBMS, and the benefits of these systems.

To be clear: I'm not anti-relational-database. Far from it; I spent the better part of two decades using relational databases and RDBMS, writing ETL, and building analytics and data visualization systems, and RDBMS are great for that. My points above specifically call out the downsides of the RDBMS within the LAMP architecture for a web application (I prefer a static site, JavaScript/jQuery, and FaaS compute).

First, let's touch quickly on why relational databases are great. I'm a scale guy. My career has largely been about building scalable systems, and the last decade has mostly been about doing so in the (AWS) cloud. In order for scale to be achieved, you have to embrace distributed systems, so it's worthwhile here to discuss some distributed RDBMS, such as Redshift, Teradata, and Snowflake, since many of their benefits and use-cases are the same as those of single-server RDBMS:

  • Data Visualization
  • Data Exploration
  • Analytics

The image here extends the previous image, now with a distributed RDBMS replacing the single-server RDBMS used for analytics and data science.

DynamoDB with Redshift

Most data visualization applications have grown beyond RDBMS-only back-ends, but to be sure, they still prefer RDBMS back-ends, for obvious reasons: it's easy for the visualization application to aggregate and summarize the data it needs to produce a visualization. Imagine we're building a data visualization to report sales by month, or sign-ons by geographic area, to name a couple. We have our data neatly put away in (albeit rigid) structures: our star schema data model. We can create a summarization of sales by month or sign-ons by geographic region by writing simple SQL statements, SQL being the native language of the RDBMS. We could do this by accessing the data in a document datastore, but as discussed previously, that structure is built for different types of data usage (specifically, where we have a well-understood access pattern). It would be incredibly inefficient to pull the data from a document datastore, parse the nested key/value pairs, and perform the aggregation or summarization. It would also be a red flag that we're not correctly positioning the document datastore, which is not optimized or designed for exploratory analysis.

Speaking of data exploration, this is a big reason we love relational databases. Our data model may be more rigid than the (oft-nested) key/value pairs of a document datastore, but that model is designed for exploration, again simply leveraging the extremely approachable SQL language. It is my opinion that every Data Scientist, Analyst, and Data Visualization professional should have SQL as part of their toolkit. Using SQL and our relational database, we can join, group, and filter efficiently. If we're doing repeatable summarizations and aggregations, we can further optimize things by adding secondary indexes, without having to duplicate data as you would in a document datastore.

So, I already talked about scalability, and why traditional RDBMS have issues in that area. Perhaps your data isn't so large as to have introduced storage challenges (and the computational resources on that RDBMS sufficiently meet your requirements without excessive idleness). Perhaps you come from a traditional RDBMS shop (e.g. MySQL) and you simply want to migrate to a distributed RDBMS (like Redshift) to deal with scale. I encounter a lot of shops where the latter is the case. They say things like "we have existing SQL skills that we want to continue to leverage", or "we don't have the appetite to move to anything too different from what we already know", and these are valid reasons for moving to a distributed RDBMS, or MPP system.

That said, I still issue the warning about tightly coupling your compute and storage, which is arguably now worse. Instead of having a single server that we're making storage and compute decisions on, we have a cluster of servers, and the problem can be exacerbated, especially when the inevitable time comes to expand the cluster, because we now also have to re-distribute the data across the new and existing nodes (servers).

The other thing people often fail to recognize is that, while the logical data model may remain largely the same, applying a single-server star schema physical data model unchanged to a distributed RDBMS or MPP is absolutely the wrong approach to take. It will not meet your performance expectations, which we'll explore in some detail here.

In distributed systems, easily the most common performance challenge I see is caused by shuffling data across the network. Imagine an overly simplified traditional star schema physical data model, like the one here:

Star Schema

If the data is distributed by the primary key of each table (the surrogate key), we are going to have a very nice, (mostly) equal distribution of data across the nodes. This is good, at face value, because it means processing will be about the same on each node. We don't have skew (skew means data is not equally distributed, so one node may still be processing long after the other nodes have finished). The way data distribution (almost always) works is that a hash is applied to the key, and a mod is taken based on the number of nodes in the cluster. For the sake of this conversation, we'll simplify even further by imagining that we're distributing by the last digit of the surrogate key, so all the values that end in "1" are stored on one node, all the values that end in "2" are stored on another, and so on. To keep things clean, we can pretend our distributed RDBMS cluster has ten nodes, so we have (roughly) an equal amount of data on each of those nodes.

Now, imagine we need to join data between our TRANSACTION_F fact table, our CUSTOMER_D dimension table, and our PRODUCT_D dimension table. The fact table has a degenerate dimension surrogate key on it, and it's distributed based on that key. All the "1s" are on node 1, and so on. What happens when the fact table row has a surrogate key of "9001" and a customer surrogate key of "1234"? We've retrieved the relevant data from node 1, and then we join it to the data from node 4 to get the corresponding customer information. The product surrogate key corresponding to this transaction is "876", so the data corresponding to the product we care about is on node 6. Now, data needs to be shuffled from node to node in order to make those joins, and that shuffling is a performance killer.
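In code, that hash-and-mod placement (and the simplified last-digit scheme used in this example) is nothing more than the following sketch, using the key values from the join above:

NUM_NODES = 10

def node_for(surrogate_key: int) -> int:
    """General case: hash the distribution key, then mod by the node count.
    Real MPP systems use their own internal hash function; Python's hash() stands in here."""
    return hash(surrogate_key) % NUM_NODES

def node_for_last_digit(surrogate_key: int) -> int:
    """The simplification used in this post: distribute by the last digit of the key."""
    return surrogate_key % NUM_NODES

# The three rows in the join land on different nodes, forcing a shuffle.
print(node_for_last_digit(9001))  # transaction row -> node 1
print(node_for_last_digit(1234))  # customer row    -> node 4
print(node_for_last_digit(876))   # product row     -> node 6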

There are ways to model this slightly differently, and doing so will make a bigger difference to performance than probably anything else you can do. (This is a non-issue when we're leveraging a traditional RDBMS on a single server, since there isn't the same network overhead associated with shuffling.) While those data distribution techniques are beyond the scope of this post, they are worth mentioning here nonetheless. The bigger point is that, assuming we understand the data distribution techniques that allow us to minimize shuffling, we now have more storage and more compute, and our system far outperforms our outdated single-server approach. But to be sure: migrating from a single-server RDBMS to a distributed RDBMS is not a lift-and-shift, and if that is the approach taken, you will be very disappointed with the results.

So where does that leave us with regard to distributed RDBMS and MPP systems? We understand the distribution techniques, and we've adjusted our model accordingly, but we continue to have the glaring issue of tight coupling. Again, once we run out of disk to store our data, we add more nodes, and with them we pay for additional compute. If we don't consistently need all of that compute (and we never do, because compute utilization is never consistently above 90%), we're essentially throwing money away. If we add nodes to address peak computational needs, we end up paying for idle compute for the majority of the year (or day, as the case may be; many organizations have peak compute needs during normal business hours or during overnight ETL cycles, but are paying for that compute 24 hours a day anyway).

What about Hadoop? Doesn't that solve my problems? No, it doesn't. But it's a start. In and of itself, Hadoop can be leveraged as a less expensive alternative to a distributed RDBMS. It doesn't have the RDBMS to manage the data, but the data can be stored relationally. However, my problem with Hadoop is that (often) people use HDFS as persistent storage, which creates the same problem for the exact same reasons discussed above. Leveraging HDFS as persistent storage is tightly coupling compute and storage, just as it was in our single-server RDBMS and just as it was with our distributed RDBMS.

If you want to truly get "infinite" scalability, you first have to accept that you must decouple your compute and storage. This adds a degree of complexity, but it is the key to scalability, and it's the secret to exploiting the pay-for-what-you-use cloud model. Leverage inexpensive disk for data storage, and bi-directionally scalable compute for your computational needs.

I'm going to hand-wave over some of the complexities and keep this somewhat high-level, but in modern AWS architectures in which you have a lot of data that you want to leverage for data science, data visualization and analytics, for instance, one common approach is to use S3 for storage ($13/compressed terabyte/month) and spin up EMR clusters for compute, where you can provision them to be right sized for your computational needs, or even provision multiple clusters at once and segregate your workloads accordingly, and terminate (or downsize) them when your computational requirements decrease.

What does this have to do with RDBMS? Well, for one thing, we talked earlier about how important SQL is in an architecture that traditionally leverages RDBMS (both distributed and single-server). In our Hadoop setting, where we've now decoupled our compute and storage, we can still organize our data relationally, even sans-RDBMS. Further, we can add an application to our cluster(s) to act as a distributed SQL engine, like Presto, which can also be leveraged as the back-end distributed SQL engine for our data visualization application that prefers RDBMS-type functionality and SQL capabilities.

The image below extends the previous image, now with decoupled storage in S3 and leveraging multiple EMR (Hadoop) clusters and Presto as the distributed SQL engine, used for analytics and data science.

DynamoDB with Presto

Now we have the best of all worlds, as it applies to our data storage and associated compute. We have correctly positioned our document datastore as a back-end to our web application, so we can keep selling widgets while dynamically scaling in and out, with single-digit millisecond performance. Perhaps we've even done away with our compute server altogether (we've already forgone the "M" in the LAMP stack; we might as well do away with the "P", too) and opted for a FaaS architecture that leverages fully bi-directionally scalable Lambda functions. We still have relational database needs for our data science and analytics, but we avoided having to leverage an RDBMS there, too, by storing data relationally, sans RDBMS, with the decoupled S3/EMR (Presto) approach. Our data visualization application continues to lean on SQL to extract the data it requires to serve up reports, our analysts continue to leverage SQL (or HiveQL) to perform data exploration and analysis, and our ETL folks can leverage the application or language of their choice (such as Apache Spark), without the limitations of a declarative language (e.g. around pipeline parallelism, code reusability, etc.). With each technology choice, we're optimizing for both scale and cost.

The image here extends the previous image, now with serverless compute (Lambda), a static website (S3), Apache Spark as an ETL/data integration application, and SQL, HiveQL, and Python as additional languages (amongst anything else) for accessing the data for analytics and data science.

FaaS with Presto

In my previous post, I talked about how we don't have an RDBMS in the architecture, which is accurate, but that doesn't mean that we don't leverage relational databases. Far from it. I believe that proper curation, care, and organization of data is more important in the world of "big data" than it ever was before, in part because we have so much more of it. What we do focus on is correctly positioning each technology, always focusing on bi-directional scalability, and always exploiting the pricing model that the cloud affords us.

Tags

L2Bud RDBMS Relational Databases AWS Data Science Lambda

Author

Joel Lieser



Function as a Service Architectures

March, 2019

Beyond the buzzwords and a general understanding of certain concepts, Function as a Service (FaaS), serverless, and microservices architectures are not yet well understood; they are only now beginning to gain momentum in the larger tech ecosystem, and there is still a lack of significant adoption. While there are absolutely nuanced differences between FaaS and microservices, for the sake of this blog I'm going to consider them the same, with apologies to those who might shake their heads at my lazy hand-waving. There are enough overlaps in the discussion herein to justify the generalizations, I hope.

You'll certainly find aspects of these architectural approaches at a number of organizations, and within the tech heavyweights with more appetite for trailblazing (e.g. Netflix, AWS) you'll find significantly bigger investments, but the lack of tried-and-true solutions, coupled with a lack of documentation in the wild, understandably makes it a difficult undertaking for the vast majority of organizations.

To be sure, there are some very distinguished tech professionals out there talking a lot about microservices and FaaS architectures. Adrian Cockcroft and Martin Fowler come to mind, and there have been some books written on the subject, but a real depth of material won't be available until we start to see more production deployments across the industry.

So, it was not without some risk that we took on building the entire L2Bud architecture using a combination of these bleeding edge architectural approaches.

Since it's so new, and because tech loves to bastardize terms, there isn't necessarily total agreement on what these architectural approaches mean. Case in point: I spoke with a potential client a few months back who was very excited about building out a serverless architecture. While plans of Lambda functions danced in my head, managed services danced in his. To me, managed services such as Snowflake offer Platform-as-a-Service (PaaS) solutions. As with FaaS architectures, you may not need to provision any infrastructure, but Snowflake is an MPP solution, so it more closely resembles Redshift, or even AWS EMR, than it does the FaaS architectures I like to build out lately. Sure, EMR (for example) can, and should, be leveraged ephemerally, so you can manage and take advantage of bi-directional scalability (assuming you've decoupled your storage from the compute), but none of these would be considered FaaS technologies.

As someone who has worked extensively with AWS over the last decade, I'm going to talk about specific AWS technology offerings throughout the remainder of this post, but Google Cloud and Azure (etc.) have similar offerings.

When I talk about FaaS and serverless architectures, I'm talking about leveraging bi-directionally scalable technology solutions that don't require any compute/server provisioning or management, and I'm talking about leveraging (AWS) technologies like Lambda, SNS, and DynamoDB, recognizing that an overall architecture like this has many more applications and technologies. I'm intentionally avoiding unnecessary religious battles: this post doesn't attempt to debate RESTful API best practices, or whether an individual function in a FaaS architecture should contain a single function, or a series of functions (there are places for each); rather, I'm speaking generally here about a handful of the applications and technologies that play a central role in an AWS FaaS architecture, and generally how those fit together.

When I look at our overall technical architecture, I'm most taken not by what is there, but by what isn't. As a data and analytics professional, I've spent the better part of a twenty-five year career working with RDBMS technologies, writing SQL queries and ETL jobs, and the like. While a significant part of the L2Bud offering is data visualizations (traditionally the responsibility of a visualization application leveraging the RDBMS), we don't have any RDBMS. Instead, we primarily leverage transient document datastores (specifically DynamoDB), which offer single-digit-millisecond performance and total bi-directional scalability. This significantly helps with costs, since the only persisted document datastores are small (dispensaries, strains, users), whereas everything else is managed with time-to-live restrictions, with long-term storage in an immutable file (object) system, using S3 (at a cost of $13/compressed terabyte/month). We then process data in a distributed fashion via Lambda functions that consume and transform the data stored in our document datastores and on S3.

The other thing that a traditional Big Data/Analytics Engineer might notice when looking at our architecture is a lack of a scheduling/orchestration application.

“[In traditional architectures], all flow, control, and security was managed by the central server application. In the Serverless version there is no central arbiter of these concerns. Instead we see a preference for choreography over orchestration, with each component playing a more architecturally aware role—an idea also common in a microservices approach.”

Mike Roberts

Shifting our focus to compute, we rely solely on Lambda functions, which again give us total bi-directional scalability at minimal cost (~$0.0000002 per request) and a consistent language (Python), and we choreograph practically all computation via an event-driven approach. Lambda functions fire from websites, upon data arrivals, at points in time, after DynamoDB is altered, etc., and write messages to a series of SNS topics, and additional Lambda functions fire based on those messages.

As a practical example, let's quickly explore our data science model training and application:

Whenever a consumer makes a selection (such as insomnia, berry, sativa, or cancer), a Lambda function fires, sending those "tag" selections to our backend. Those tags are then applied against an up-to-the-second deployment of the data science model. Recommended strains are returned to the Lambda function, and rendered to the consumer.

On the backend, whenever a dispensary changes its "store collection" (which you can loosely think of as "inventory"), a Lambda function fires that writes a message to SNS, indicating that the store collection has been altered for dispensary foo. Another Lambda function fires upon receipt of that SNS message, extracts all strains in the updated store collection, plus all the tags associated with that set of strains, re-trains the model, and replaces the coefficients used to apply it. What this means is that we are re-training dispensary-specific data science models hundreds or thousands of times a day across all the dispensaries, with each re-training taking seconds to complete, and at a cost of pennies (in aggregate).
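As a skeletal, heavily hedged illustration of that subscriber side, the retraining Lambda might be shaped like this; every function, table, and message field name here is hypothetical, and the actual data loading and model training are reduced to placeholders:

import json

import boto3

dynamodb = boto3.resource("dynamodb")
model_table = dynamodb.Table("DISPENSARY_MODELS")  # hypothetical coefficients table


def load_store_collection(dispensary_id):
    """Placeholder: in reality this would read the dispensary's strains from DynamoDB."""
    return []


def load_tags_for_strains(strains):
    """Placeholder: in reality this would collect the tags associated with each strain."""
    return []


def train_recommendation_model(strains, tags):
    """Placeholder: in reality this is the dispensary-specific model training step."""
    return {}


def retrain_dispensary_model(event, context):
    """Fired by the 'store collection changed' SNS message; refreshes one model."""
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    dispensary_id = message["dispensaryId"]

    strains = load_store_collection(dispensary_id)
    tags = load_tags_for_strains(strains)
    coefficients = train_recommendation_model(strains, tags)

    # Swap in the new coefficients so the next consumer request uses the fresh model.
    model_table.put_item(
        Item={"dispensaryId": dispensary_id, "coefficients": coefficients}
    )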

This pattern (Lambda→SNS→Lambda) is repeated across the entire architecture. While this event-driven choreography approach does introduce some complexities in the monitoring of the system, what it really requires is a different way of thinking and some additional discipline.

This approach allows us to really exploit the cloud-hosting model. Our disk/storage costs are minimal (transient DynamoDB and S3), as are our compute costs (Lambda). I believe serverless architectures are the way of the future, and I'm thrilled to be a part of bringing these architectural approaches to the cannabis industry.

Tags

L2Bud Microservices FaaS AWS Data Science Lambda

Author

Joel Lieser
