- Add another cell to the notebook ➢ click the + Code button below the first cell ➢ enter the code located in the Normalize.txt file in the Chapter05/Ch05Ex13 directory on GitHub (the code snippet is similar to the one from Exercise 5.10; see the sketch after these steps) ➢ click the Commit button ➢ add a Notebook activity to your pipeline from the Synapse group ➢ enter a name (I used NORMALIZE) ➢ select the Settings tab ➢ select the notebook you just created (in my example, Ch05Ex13N) ➢ select your Spark pool (SparkPool) ➢ and then link the AGGREGATE Script activity to the NORMALIZE notebook.
- Click the Commit button to save the pipeline to your source code repository ➢ click the Publish button ➢ review the changes ➢ click the OK button ➢ click the Add Trigger button ➢ select Trigger Now ➢ and then click OK.
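The exact code for the new cell is in the Normalize.txt file on GitHub. The following is only a minimal PySpark sketch of that kind of normalization, assuming the aggregated data has already been written to a temporary Hive table (here named scenario_frequency); the column names and ADLS paths are hypothetical placeholders, not the book's values.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.functions import vector_to_array

spark = SparkSession.builder.getOrCreate()

# Assumption: a previous cell registered the aggregated readings as a
# temporary Hive table named "scenario_frequency".
df = spark.sql("SELECT * FROM scenario_frequency")

# Hypothetical names for the aggregated value columns to normalize.
numeric_cols = ["AVERAGE", "STANDARDDEVIATION", "VARIANCE"]

# Assemble the numeric columns into a single feature vector, then rescale
# each feature to the [0, 1] range (min-max normalization).
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="scaled")
assembled = assembler.transform(df)
normalized = scaler.fit(assembled).transform(assembled)

# Flatten the scaled vector back into ordinary columns so the result can be
# written as CSV as well as Parquet.
flat = normalized.withColumn("scaled_arr", vector_to_array("scaled")).select(
    "SCENARIO", "FREQUENCY",  # hypothetical key columns
    *[F.col("scaled_arr")[i].alias(c + "_NORMALIZED")
      for i, c in enumerate(numeric_cols)]
)

# Placeholder ADLS paths; replace with your container and storage account.
base = "abfss://<container>@<account>.dfs.core.windows.net/normalized"
flat.write.mode("overwrite").option("header", "true").csv(base + "/csv")
flat.write.mode("overwrite").parquet(base + "/parquet")
```

MinMaxScaler is used here only to illustrate the idea of bringing the aggregated values onto a common [0, 1] scale; the actual snippet in Normalize.txt may use a different MLlib scaler.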
You might have noticed that the steps in Exercise 5.13 brought together pieces of several previous exercises. This is how you learn and ultimately improve your data analytics solution: by iterating through numerous versions of it. First, you needed to get the data into a friendly format and structure; then you performed some transformations using aggregate functions; and then you performed some normalization and EDA. In Exercise 5.13 you pulled it all together into a single pipeline that retrieves the data from a source and ends with ready‐to‐use Parquet files. The following content discusses each step of the pipeline and its outcome, beginning with the DROP Tables Script activity.
The data from the Azure SQL database was loaded into the [brainwaves].[TmpREADING] table in Exercise 4.13, so it is expected that the data already exists in that table.
The design of the TransformEnrichment pipeline included scripts that create the FactREADING and SCENARIO_FREQUENCY tables. Therefore, the first activity, DROP Tables, made sure those tables did not already exist and dropped them if they did, to avoid an exception when trying to create a table that already exists. The next activity, SETUP Staging, either deleted the data in the [staging].[TmpREADING] table or created the table. This staging table temporarily holds the filtered data from the FILTER Outliers activity. Previous EDA determined that some brainjammer brain wave reading values were a great distance from the mean. The FILTER Outliers activity removed those values and placed the remaining data into the staging table, which left 4,437,221 rows of data. The CREATE FactREADING Script activity pulled the data from the staging table and used the dimension reference tables to build the dataset for the [brainwaves].[FactREADING] table, which was created using the CTAS approach. A SELECT COUNT(*) statement was then run against both the staging table and the [brainwaves].[FactREADING] table to confirm that the row counts matched.
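The Script activities run T-SQL directly against the dedicated SQL pool. Purely as an illustration of the shape of that T-SQL (not the book's actual scripts), the following Python sketch submits a hypothetical CTAS and the count validation through pyodbc; the dimension table names, join columns, and distribution choice are assumptions.

```python
import pyodbc

# Hypothetical connection to the dedicated SQL pool; autocommit avoids wrapping
# the CTAS in a user-defined transaction.
conn = pyodbc.connect(
    "Driver={ODBC Driver 18 for SQL Server};"
    "Server=<workspace>.sql.azuresynapse.net;Database=<sqlpool>;"
    "Uid=<user>;Pwd=<password>;Encrypt=yes;",
    autocommit=True,
)
cursor = conn.cursor()

# Drop the target table if it already exists (the DROP Tables activity's job).
cursor.execute(
    "IF OBJECT_ID('[brainwaves].[FactREADING]') IS NOT NULL "
    "DROP TABLE [brainwaves].[FactREADING];"
)

# CTAS sketch: build the fact table from the staging table joined to the
# dimension reference tables. Table and column names here are assumptions.
cursor.execute("""
CREATE TABLE [brainwaves].[FactREADING]
WITH (DISTRIBUTION = ROUND_ROBIN, CLUSTERED COLUMNSTORE INDEX)
AS
SELECT sc.SCENARIO, e.ELECTRODE, f.FREQUENCY, s.READING_DATETIME, s.[VALUE]
FROM   [staging].[TmpREADING] s
JOIN   [brainwaves].[DimSCENARIO]  sc ON s.SCENARIO_ID  = sc.SCENARIO_ID
JOIN   [brainwaves].[DimELECTRODE] e  ON s.ELECTRODE_ID = e.ELECTRODE_ID
JOIN   [brainwaves].[DimFREQUENCY] f  ON s.FREQUENCY_ID = f.FREQUENCY_ID;
""")

# Validate that the staging and fact table row counts match.
staging_rows = cursor.execute("SELECT COUNT(*) FROM [staging].[TmpREADING]").fetchval()
fact_rows = cursor.execute("SELECT COUNT(*) FROM [brainwaves].[FactREADING]").fetchval()
print(staging_rows, fact_rows, staging_rows == fact_rows)
```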
The CONVERT Parquet notebook used Scala code to retrieve the data stored in the dedicated SQL pool table and load it into a DataFrame. The data in the DataFrame was then written to a Parquet file and placed into the data lake, which in this case was an ADLS container. The AGGREGATE Script activity performed a CTAS that executed the AVG, STDEV, VAR, PERCENTILE_CONT, SQRT, and SQUARE functions on the VALUE column in the just populated [brainwaves].[FactREADING] table. The aggregated and transformed data was then stored in the [brainwaves].[SCENARIO_FREQUENCY] table. Finally, the NORMALIZE notebook contained two cells. The first cell contained Scala code that loaded data retrieved from a dedicated SQL pool table into a DataFrame and then wrote that DataFrame to a temporary Apache Hive table. The second cell used PySpark MLlib features to select the data from the Hive table and normalize the aggregated brain wave reading values. The results were written to the data lake in both CSV and Parquet formats. CSV is a valid format for running EDA and visualizing the output in Power BI, whereas Parquet is useful for Azure Databricks and Apache Spark.
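The book's CONVERT Parquet notebook performs that first step in Scala with the built-in Synapse connector. The following is only a rough PySpark equivalent under assumed connection details, reading the fact table over standard JDBC and writing it to the ADLS container as Parquet; the server, credentials, and path are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical JDBC connection details for the dedicated SQL pool.
jdbc_url = ("jdbc:sqlserver://<workspace>.sql.azuresynapse.net:1433;"
            "database=<sqlpool>;encrypt=true;loginTimeout=30;")

# Read the fact table into a DataFrame. The book's notebook does this in Scala
# with the Synapse connector rather than plain JDBC.
df = (spark.read
      .format("jdbc")
      .option("url", jdbc_url)
      .option("dbtable", "[brainwaves].[FactREADING]")
      .option("user", "<user>")
      .option("password", "<password>")
      .load())

# Write the DataFrame to the data lake as Parquet (placeholder path).
df.write.mode("overwrite").parquet(
    "abfss://<container>@<account>.dfs.core.windows.net/<path>/FactREADING"
)
```

Once the data sits in the data lake as Parquet, downstream consumers such as Apache Spark pools or Azure Databricks can read it directly without touching the dedicated SQL pool.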