Develop a Batch Processing Solution Using an Azure Synapse Analytics Apache Spark Job Definition – Create and Manage Batch Processing and Pipelines, Part 2

The next code snippet in the ToAvro.py file uses the arguments passed to the job to dynamically construct the endpoint path. The endpoint identifies the ADLS container and storage account name, whereas the path defines the directory in which the files are located. This information is loaded into a variable named readPath, which, as the name implies, is the location the files are read from.
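The book's exact listing is not reproduced here, but a minimal sketch of that logic, assuming the arguments arrive through sys.argv and the ADLS Gen2 account is addressed with an abfss URI, looks roughly like this:

# Sketch only, not the book's exact code. Assumes the job arguments are read
# from sys.argv and the storage account is addressed with an abfss URI.
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ToAvro").getOrCreate()

scenario = sys.argv[1]   # e.g., "ClassicalMusic"
path = sys.argv[2]       # e.g., "EMEA/brainjammer/in/2022/06/15/08"
endpoint = sys.argv[3]   # "<container>@<storage-account>"

# Endpoint (container + account) plus the path = the location the files are read from
readPath = "abfss://" + endpoint + ".dfs.core.windows.net/" + path
print("readPath: " + readPath)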

To be clear, three arguments are required for the job to run properly. The first argument is the scenario for which the job is running. The readPath location contains files for all scenarios, so the correct files are retrieved using a wildcard search. If ClassicalMusic were passed as the first argument, then that file, and any file with ClassicalMusic in its name, would be returned, as the following wildcard pattern illustrates.
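As an illustration only (the actual file-naming convention produced in Exercise 6.2 is not shown here), the wildcard could be built like this:

# Illustrative wildcard: any file whose name contains the scenario is matched.
wildcardPath = readPath + "/*" + scenario + "*"
# With "ClassicalMusic" as the first argument, the pattern ends in "/*ClassicalMusic*",
# so every file with ClassicalMusic in its name is returned.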

The second argument is the path to the JSON files. In this case the value is EMEA/brainjammer/in/2022/06/15/08, which is where the Azure Batch job from Exercise 6.2 placed the JSON files with the calculated frequency median per scenario. Finally, the third argument is the ADLS container and storage account name, separated by the @ sign.
Then the next line of code reads (loads) the JSON files matching the wildcard pattern into a DataFrame.
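A minimal sketch of that load, reusing the wildcard path assumed above:

# Load every JSON file matching the wildcard pattern into a DataFrame
df = spark.read.json(wildcardPath)
print("rows loaded: " + str(df.count()))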

The following code snippet constructs the writePath, which is the location where the AVRO files are to be written. The path uses the year, month, day, and hour from the datetime.now() method to produce the value for the timeWritePath variable. Finally, the third argument, which contains the ADLS container and storage account name, is concatenated with the timeWritePath variable to produce the location where the AVRO files will be stored. Notice that part of the path is hard-coded; you need to change this if you want to store the files elsewhere. If the directory does not exist, it will be created, which is much better than throwing a DirectoryNotFound exception.
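A sketch of that construction follows; the EMEA/brainjammer/out directory segment is an assumption standing in for the book's partially hard-coded path:

from datetime import datetime

# Year, month, day, and hour of the job run become part of the output path
now = datetime.now()
timeWritePath = now.strftime("%Y/%m/%d/%H")

# The "EMEA/brainjammer/out" segment is hard-coded here (an assumption);
# change it if you want to store the files elsewhere.
writePath = "abfss://" + endpoint + ".dfs.core.windows.net/EMEA/brainjammer/out/" + timeWritePath
print("writePath: " + writePath)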

The final three lines of code are provided here. The first line dynamically generates the file name using the first argument, which is the brainjammer brain wave scenario, followed by some date information. The date values are used to help make the file name unique. The next line uses the write() method to save the AVRO file, then the read() method of the SparkSession class loads the AVRO file. The show() method results in the output being written to the Apache Spark job log file.
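A hedged reconstruction of those three lines follows; the exact file-name format is an assumption, and the avro format requires the spark-avro module to be available on the Spark pool:

# File name built from the scenario plus date values to help keep it unique (format assumed)
fileName = scenario + "-" + now.strftime("%Y%m%d%H%M%S") + ".avro"

# write() saves the AVRO output, read() loads it back, and show() writes the
# rows to the Apache Spark job log file
df.write.format("avro").mode("overwrite").save(writePath + "/" + fileName)
spark.read.format("avro").load(writePath + "/" + fileName).show()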

You might have noticed that numerous print() methods are called throughout the code. Each print() call includes a message and sometimes a variable, which results in the value of that variable being written to the log file. Figure 6.14 illustrates the Apache Spark Applications blade, located on the Monitor hub. The Monitor hub is discussed in detail in Chapter 9, “Monitoring Azure Data Storage and Processing.” After you add the Apache Spark job definition to the TransformSessionFrequencyToMedian pipeline and run it, you will see the output in the monitor logs. The logs are also gathered when you click the Submit button from the Apache Spark job definition.

FIGURE 6.14 Azure Synapse Analytics—Apache Spark job diagnostics and Monitor hub


Notice the stdout option in the Logs tab. This is where the output of the print() method is stored. This is a very useful tool for debugging and troubleshooting your job. The stderr log is also useful for finding exceptions happening on the node. Figure 6.15 illustrates how the TransformSessionFrequencyToMedian pipeline is now configured.


Numerous other products for batch processing are available on the Azure platform. Read on to learn about a few more.
