Traditionally, real-time analysis of stock data was a complicated endeavor because it meant maintaining a streaming system while also ensuring transactional consistency across legacy and streaming data. Databricks Delta helps solve many of the pain points of building a streaming system to analyze stock data in real time.
In this blog post we will review:
- The current problems of running such a system
- How Databricks Delta addresses these problems
- How to implement the system in Databricks
Databricks Delta helps solve these problems by combining the scalability, streaming, and access to advanced analytics of Apache Spark with the performance and ACID compliance of a data warehouse.
Traditional pain points prior to Databricks Delta
The pain points of a traditional streaming and data warehousing solution can be broken into two groups: data lake and data warehouse pains.
Data Lake Pain Points
While data lakes allow you to flexibly store an immense amount of data in a file system, there are many pain points including (but not limited to):
- Consolidation of streaming data from many disparate systems is difficult.
- Updating data in a data lake is nearly impossible, yet much of the streaming data needs to be updated as changes are made. This is especially important in scenarios involving financial reconciliation and subsequent adjustments.
- Query speeds for a data lake are typically very slow.
- Optimizing storage and file sizes is very difficult and often requires complicated logic.
Data Warehouse Pain Points
The power of a data warehouse is that you have a persistent, performant store of your data. But the pain points for building modern ...
Databricks Delta (Databricks Delta Guide) is a unified data management system that brings data reliability and performance optimizations to cloud data lakes. More succinctly, Databricks Delta takes the advantages of data lakes and data warehouses together with Apache Spark to allow you to do incredible things!
- Databricks Delta, along with https://databricks.com/product/databricks-delta.
As noted in the preceding diagram, we have two datasets to process: one for fundamentals and one for price data. To create our two Databricks Delta tables, we specify .format("delta") against our DBFS locations.
    # Create Fundamental Data (Databricks Delta table)
    dfBaseFund = spark \
        .read \
        .format('delta') \
        .load('/delta/stocksFundamentals')

    # Create Price Data (Databricks Delta table)
    dfBasePrice = spark \
        .read \
        .format('delta') \
        .load('/delta/stocksDailyPrices')
While we’re updating the stocksDailyPrices table, we will consolidate this data through a series of ETL jobs into a consolidated view (stocksDailyPricesWFund). With the following code snippet, we can determine the start and end date of available data and then combine the price and fundamentals data for that date range into DBFS.
    import datetime
    from pyspark.sql import functions as func

    # Determine start and end date of available data
    # (collect() returns a list of rows, so take the first one)
    row = dfBasePrice.agg(
        func.max(dfBasePrice.price_date).alias("maxDate"),
        func.min(dfBasePrice.price_date).alias("minDate")
    ).collect()[0]
    startDate = row["minDate"]
    endDate = row["maxDate"]

    # Define our date range function (end_date is exclusive)
    def daterange(start_date, end_date):
        for n in range(int((end_date - start_date).days)):
            yield start_date + datetime.timedelta(n)

    # Combine price and fundamentals data for a single date
    def combinePriceAndFund(theDate):
        dfFund = dfBaseFund.where(dfBaseFund.price_date == theDate)
        dfPrice = dfBasePrice.where(
            dfBasePrice.price_date == theDate
        ).drop('price_date')

        # Join on ticker and drop the updated column
        dfPriceWFund = dfPrice.join(dfFund, ['ticker']).drop('updated')

        # Save data to DBFS
        dfPriceWFund \
            .write \
            .format('delta') \
            .mode('append') \
            .save('/delta/stocksDailyPricesWFund')

    # Loop through dates to complete fundamentals + price ETL process
    # (add one day so that endDate itself is included)
    for single_date in daterange(
        startDate, (endDate + datetime.timedelta(days=1))
    ):
        print('Starting ' + single_date.strftime('%Y-%m-%d'))
        start = datetime.datetime.now()
        combinePriceAndFund(single_date)
        end = datetime.datetime.now()
        print(end - start)
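To make the date loop and the per-date join easier to follow, here is a minimal plain-Python sketch of the same logic that runs without a Spark cluster. It is an illustration only, not the code used in the notebook: the helper `combine_for_date` and the sample rows (tickers `AAA` and `BBB`, their dates and values) are hypothetical, and lists of dictionaries stand in for the DataFrames.

```python
import datetime


def daterange(start_date, end_date):
    """Yield each date from start_date up to, but not including, end_date."""
    for n in range((end_date - start_date).days):
        yield start_date + datetime.timedelta(days=n)


def combine_for_date(prices, fundamentals, the_date):
    """Inner-join price and fundamentals rows for one date on ticker.

    Hypothetical stand-in for the DataFrame filter and join in the ETL job.
    """
    fund_by_ticker = {
        row["ticker"]: row
        for row in fundamentals
        if row["price_date"] == the_date
    }
    combined = []
    for price in prices:
        if price["price_date"] != the_date:
            continue
        fund = fund_by_ticker.get(price["ticker"])
        if fund is not None:  # inner join: skip tickers without fundamentals
            combined.append({**fund, "close": price["close"]})
    return combined


# Hypothetical sample rows, made up for illustration only.
prices = [
    {"ticker": "AAA", "price_date": datetime.date(2018, 1, 2), "close": 10.0},
    {"ticker": "BBB", "price_date": datetime.date(2018, 1, 2), "close": 20.0},
    {"ticker": "AAA", "price_date": datetime.date(2018, 1, 3), "close": 11.0},
]
fundamentals = [
    {"ticker": "AAA", "price_date": datetime.date(2018, 1, 2), "eps_basic_net": 2.0},
    {"ticker": "AAA", "price_date": datetime.date(2018, 1, 3), "eps_basic_net": 2.0},
]

start_date = min(row["price_date"] for row in prices)
end_date = max(row["price_date"] for row in prices)

# Adding one day makes the end-exclusive generator cover end_date itself.
all_dates = list(daterange(start_date, end_date + datetime.timedelta(days=1)))
combined_by_date = {
    d: combine_for_date(prices, fundamentals, d) for d in all_dates
}
```

Because the generator excludes its end date, calling it with `end_date` alone would skip the last day of data, which is why the loop passes `end_date` plus one day.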
Now we have a stream of consolidated fundamentals and price data that is being pushed into DBFS in the /delta/stocksDailyPricesWFund location. We can build a Databricks Delta table by specifying .format("delta") against that DBFS location.
    dfPriceWithFundamentals = spark \
        .readStream \
        .format("delta") \
        .load("/delta/stocksDailyPricesWFund")

    # Create temporary view of the data
    dfPriceWithFundamentals.createOrReplaceTempView("priceWithFundamentals")
Now that we have created our initial Databricks Delta table, let’s create a view that will allow us to calculate the price/earnings ratio in real time (because of the underlying streaming data updating our Databricks Delta table).
    %sql
    CREATE OR REPLACE TEMPORARY VIEW viewPE AS
    select ticker,
           price_date,
           first(close) as price,
           (close/eps_basic_net) as pe
      from priceWithFundamentals
     where eps_basic_net > 0
     group by ticker, price_date, pe
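The arithmetic behind the view can be sketched in plain Python. This is only an illustration under stated assumptions: the function `price_earnings` and the sample rows are hypothetical, and the sketch assumes one row per ticker and date, in which case the view's grouping and `first(close)` reduce to a simple filter and division.

```python
def price_earnings(rows):
    """Return (ticker, price_date, price, pe) for rows with positive EPS."""
    result = []
    for row in rows:
        eps = row["eps_basic_net"]
        # Mirrors the WHERE clause; it also avoids dividing by zero.
        if eps > 0:
            result.append(
                (row["ticker"], row["price_date"], row["close"], row["close"] / eps)
            )
    return result


# Hypothetical sample rows, made up for illustration only.
sample_rows = [
    {"ticker": "AAA", "price_date": "2018-01-02", "close": 150.0, "eps_basic_net": 10.0},
    {"ticker": "BBB", "price_date": "2018-01-02", "close": 40.0, "eps_basic_net": -1.0},
    {"ticker": "CCC", "price_date": "2018-01-02", "close": 25.0, "eps_basic_net": 0.0},
]

pe_rows = price_earnings(sample_rows)
```

Only the first row survives the filter, since a price/earnings ratio is not meaningful when earnings per share are zero or negative.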
Analyze streaming stock data in real time
With our view in place, we can quickly analyze our data using Spark SQL:
    %sql
    select *
      from viewPE
     where ticker == "AAPL"
     order by price_date
As the underlying source of this consolidated dataset is a Databricks Delta table, this view isn’t just showing the batch data but also any new streams of data that are coming in as per the following streaming dashboard.
Underneath the covers, Structured Streaming isn’t just writing the data to Databricks Delta tables but also keeping the state of the distinct number of keys (in this case ticker symbols) that need to be tracked.
Because you are using Spark SQL, you can execute aggregate queries at scale and in real time.
    %sql
    SELECT ticker, AVG(close) as Average_Close
      FROM priceWithFundamentals
     GROUP BY ticker
     ORDER BY Average_Close
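To give some intuition for what keeping state per key means for a streaming aggregate like this one, here is a conceptual plain-Python sketch. It is not how Structured Streaming is implemented: the `RunningAverage` class, the batches, and the tickers are all hypothetical, and the sketch only shows the idea of updating a small per-ticker state as each new batch arrives instead of rescanning all of the data.

```python
from collections import defaultdict


class RunningAverage:
    """Keep a running sum and count per ticker across incoming batches."""

    def __init__(self):
        self._sum = defaultdict(float)
        self._count = defaultdict(int)

    def update(self, batch):
        """Fold a batch of (ticker, close) pairs into the per-ticker state."""
        for ticker, close in batch:
            self._sum[ticker] += close
            self._count[ticker] += 1

    def averages(self):
        """Return (ticker, average close) pairs ordered by average close."""
        return sorted(
            ((t, self._sum[t] / self._count[t]) for t in self._sum),
            key=lambda pair: pair[1],
        )


# Hypothetical batches, made up for illustration only.
state = RunningAverage()
state.update([("AAA", 10.0), ("BBB", 30.0)])
after_first_batch = state.averages()

state.update([("AAA", 20.0)])
after_second_batch = state.averages()
```

The state grows with the number of distinct tickers rather than with the number of rows processed, which is what makes this kind of continuously updated aggregate practical.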
In closing, we demonstrated how to simplify streaming stock data analysis using Databricks Delta. By combining Spark Structured Streaming and Databricks Delta, we can use the Databricks integrated workspace to create a performant, scalable solution that has the advantages of both data lakes and data warehouses. The Databricks Unified Analytics Platform removes the data engineering complexities commonly associated with streaming and transactional consistency, enabling data engineering and data science teams to focus on understanding the trends in their stock data.