Gokhan Atil's Technology Blog

PySpark Examples

This post contains some sample PySpark scripts. During my “Spark with Python” presentation, I said I would share the example code (with detailed explanations). I posted the scripts separately earlier, but decided to put them together in one post.

Grouping Data From CSV File (Using RDDs)

For this sample code, I use the u.user file of the MovieLens 100K Dataset. I renamed it to “users.csv”, but you can keep the original name if you want.

Each line of the file contains a user id, age, gender, occupation, and zip code, separated by pipe characters, for example: 1|24|M|technician|85711

Using this simple data, I will group users based on gender and find the number of men and women in the data. As you can see, the third column indicates the gender of a user, and the columns are separated with a pipe symbol instead of a comma. So I write the script below:
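A minimal sketch of such a script (assuming users.csv is in the working directory and a local Spark installation) could look like this:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# read the file as an RDD of lines
lines = sc.textFile("users.csv")

# split each line on the pipe character and emit a (gender, 1) pair
# (the gender is the third column, index 2)
genders = lines.map(lambda line: line.split("|")) \
               .map(lambda fields: (fields[2], 1))

# sum the pairs per gender and bring the result back to the driver
for gender, count in genders.reduceByKey(lambda x, y: x + y).collect():
    print(gender, count)

sc.stop()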

Here is the step-by-step explanation of the above script:

1. textFile reads users.csv and returns an RDD in which each element is one line of the file.
2. The first map splits each line on the pipe character, producing a list of columns.
3. The second map converts each record into a (gender, 1) pair, using the third column as the key.
4. reduceByKey sums the values for each gender, and collect returns the final counts to the driver so they can be printed.

If you’re not familiar with lambda functions, let me share the same script with regular (named) functions:
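A sketch of the same logic with named functions, under the same assumptions, might look like this:

from pyspark import SparkContext

def split_line(line):
    # split the pipe-delimited line into its columns
    return line.split("|")

def gender_key(fields):
    # the third column is the gender; emit a (gender, 1) pair
    return (fields[2], 1)

def add(x, y):
    return x + y

sc = SparkContext.getOrCreate()

result = sc.textFile("users.csv") \
           .map(split_line) \
           .map(gender_key) \
           .reduceByKey(add) \
           .collect()

print(result)
sc.stop()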

It produces the same result with the same performance. Now let me write another one. This time, I will group the users based on their occupations:
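A sketch of that script could look like this (the occupation is the fourth column of the file, index 3):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# count the users per occupation
occupations = sc.textFile("users.csv") \
                .map(lambda line: line.split("|")) \
                .map(lambda fields: (fields[3], 1)) \
                .reduceByKey(lambda x, y: x + y)

# sort by count in descending order before printing
for occupation, count in occupations.sortBy(lambda pair: pair[1], ascending=False).collect():
    print(occupation, count)

sc.stop()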

Here is the step-by-step explanation of the above script:

1. textFile reads users.csv, and the first map splits each line on the pipe character as before.
2. This time the fourth column (the occupation) is used as the key of each (occupation, 1) pair.
3. reduceByKey sums the values per occupation, sortBy orders the results by count in descending order, and collect returns them to the driver to be printed.

Grouping Data From CSV File (Using DataFrames)

This time, I will use DataFrames instead of RDDs. DataFrames are distributed collections of data organized into named columns, similar to tables in relational databases. They also provide a domain-specific language API to manipulate your distributed data, which makes them easier to use.

The Spark SQL module provides DataFrames, which are also the primary API for Spark’s machine learning library (MLlib) and for Structured Streaming. Spark developers recommend using DataFrames instead of RDDs, because Catalyst (the Spark optimizer) will optimize your execution plan and generate better code to process the data.

I will use the “u.user” file of the MovieLens 100K Dataset again. I will find the total number of men and women in the users data. I recommend comparing this code with the previous scripts (in which I used RDDs) to see the difference.
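A minimal DataFrame-based sketch (the column names below are picked just for readability) could be:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read the pipe-delimited file; the file has no header,
# so assign explicit column names
df = spark.read.csv("users.csv", sep="|") \
    .toDF("user_id", "age", "gender", "occupation", "zip")

# group the rows by gender and count them
df.groupBy("gender").count().show()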

Here is the step-by-step explanation of the above script:

1. SparkSession is the entry point of the DataFrame API, so the script starts by creating (or reusing) one.
2. spark.read.csv reads the pipe-delimited file into a DataFrame, and toDF assigns meaningful column names.
3. groupBy("gender") groups the rows by the gender column, count() counts the rows in each group, and show() prints the result.

What if we want to group the users based on their occupations?
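A sketch of the occupation version might look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("users.csv", sep="|") \
    .toDF("user_id", "age", "gender", "occupation", "zip")

# count the users per occupation and show the most common ones first
df.groupBy("occupation").count() \
  .orderBy("count", ascending=False) \
  .show(25)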

Here is the step-by-step explanation of the above script:

1. The file is read into a DataFrame exactly as in the previous script.
2. groupBy("occupation") and count() count the users per occupation.
3. orderBy sorts the result by the count column in descending order, and show(25) prints up to 25 rows so that all occupations are visible.

Please compare these scripts with the RDD versions. You’ll see that using DataFrames is more straightforward, especially when analyzing data.

Spark SQL Module

The Spark SQL module provides DataFrames (and Datasets, but PySpark doesn’t support Datasets because Python is a dynamically typed language) to work with structured data. First, let’s create a temporary table from a CSV file and run a query on it. I will use the “u.user” file of the MovieLens 100K Dataset again (saved as users.csv).
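A minimal sketch of such a script could be:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read the CSV file and name the columns
df = spark.read.csv("users.csv", sep="|") \
    .toDF("user_id", "age", "gender", "occupation", "zip")

# register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("users")

# run a SQL query against the temporary view
spark.sql("SELECT gender, count(*) AS total FROM users GROUP BY gender").show()

# list the tables and views visible in the current session
spark.sql("SHOW TABLES").show()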

Here is the step-by-step explanation of the above script:

1. The CSV file is read into a DataFrame, and the columns are given names.
2. createOrReplaceTempView registers the DataFrame as a temporary view called “users”.
3. spark.sql runs a standard SQL query (grouping by gender) against the view, and show() prints the result.
4. “SHOW TABLES” lists the tables and views visible in the current session.

When I check the tables with “show tables”, I see that the “users” table is temporary, so when our session (job) is done, the table will be gone. What if we want to make our data persistent? If our Spark environment is configured to connect to Hive, we can use the DataFrameWriter object’s “saveAsTable” method. We can also save the data as Parquet, CSV, or JSON files.
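A sketch of how this could look (the table name and the /tmp output paths below are only examples):

from pyspark.sql import SparkSession

# enableHiveSupport lets saveAsTable create persistent tables in the Hive metastore
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

df = spark.read.csv("users.csv", sep="|") \
    .toDF("user_id", "age", "gender", "occupation", "zip")

# save the DataFrame as a managed (persistent) table
df.write.mode("overwrite").saveAsTable("users_persistent")

# the same data can also be written out as Parquet, CSV or JSON files
df.write.mode("overwrite").parquet("/tmp/users_parquet")
df.write.mode("overwrite").csv("/tmp/users_csv")
df.write.mode("overwrite").json("/tmp/users_json")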

Here is the step-by-step explanation of the above script:

1. enableHiveSupport makes the SparkSession use the Hive metastore, which is what allows saveAsTable to create a persistent table.
2. The CSV file is read into a DataFrame as before.
3. saveAsTable writes the DataFrame as a managed (persistent) table, so it will still be there in future sessions.
4. The write.parquet, write.csv, and write.json calls show how the same data can be saved as Parquet, CSV, or JSON files instead.

The Spark SQL module also enables you to access various data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data from different data sources.

Discretized Streams (DStreams)

Spark supports two different ways of streaming: Discretized Streams (DStreams) and Structured Streaming. A DStream is the basic abstraction in Spark Streaming: a continuous sequence of RDDs representing a stream of data. Structured Streaming is the newer streaming engine, built on the Spark SQL engine.

When you search for example scripts about DStreams, you usually find sample code that reads data from TCP sockets. So I decided to write a different one: my sample code will read files from a directory. The script will check the directory every second and process any new CSV files it finds. Here’s the code:
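A minimal sketch of such a streaming script could be:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()

# create a streaming context with a 1-second batch interval
ssc = StreamingContext(sc, 1)

# monitor the /tmp/stream directory for new files
stream = ssc.textFileStream("/tmp/stream")

# split every line of the new files on the comma and print the result
stream.map(lambda line: line.split(",")).pprint()

ssc.start()
ssc.awaitTermination()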

Here is the step-by-step explanation of the above script:

1. A StreamingContext is created with a batch interval of 1 second.
2. textFileStream monitors the /tmp/stream directory and builds a new RDD from any files that appear there.
3. Each line of the new files is split on the comma character.
4. pprint prints the contents of every micro-batch, and start/awaitTermination keep the streaming job running until it is stopped.

Every second, the script will check the “/tmp/stream” folder; if it finds a new file, it will process it and print the output. For example, if we put a file containing the following data into the folder:

Fatih,5
Cenk,4
Ahmet,3
Arda,1

The script will print:

-------------------------------------------
Time: 2023-02-16 13:31:53
-------------------------------------------
['Fatih', '5']
['Cenk', '4']
['Ahmet', '3']
['Arda', '1']

pprint is a perfect function to debug your code, but you will probably want to store the streaming data in an external target (such as a database or an HDFS location). The DStream object’s foreachRDD method can be used for this. Here’s another script that saves the streaming data to JSON files:
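A sketch of such a script (the /tmp/stream_output path below is just an example target) could be:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
ssc = StreamingContext(sc, 1)

def save_as_json(rdd):
    # skip empty micro-batches
    if not rdd.isEmpty():
        # convert the RDD of (name, points) tuples to a DataFrame
        # and append it to a directory of JSON files
        df = spark.createDataFrame(rdd, ["name", "points"])
        df.write.mode("append").json("/tmp/stream_output")

stream = ssc.textFileStream("/tmp/stream")

# parse each line into a (name, points) tuple and save every micro-batch
stream.map(lambda line: line.split(",")) \
      .map(lambda fields: (fields[0], int(fields[1]))) \
      .foreachRDD(save_as_json)

ssc.start()
ssc.awaitTermination()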

Here is the step-by-step explanation of the above script:

1. The stream is created exactly as in the previous script, and each line is parsed into a (name, points) tuple.
2. foreachRDD calls the save_as_json function for every micro-batch.
3. The function skips empty batches, converts the RDD into a DataFrame, and appends it to a directory of JSON files.

After storing all this data in JSON format, we can run a simple script to query it:
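For example, something along these lines (reading back from the same example output path):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read back every JSON file written by the streaming job
df = spark.read.json("/tmp/stream_output")

# total points per name
df.groupBy("name").sum("points").show()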

Structured Streaming

Structured Streaming is a stream processing engine built on the Spark SQL engine. It supports File and Kafka sources for production, and Socket and Rate sources for testing. Here is a very simple example to demonstrate how Structured Streaming works:
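A minimal sketch of such a script could be:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

# a file source requires an explicit schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("points", IntegerType(), True)
])

# read CSV files as they appear in the /tmp/stream directory
stream = spark.readStream.csv("/tmp/stream", schema=schema)

# keep a running total of points per name
totals = stream.groupBy("name").sum("points")

# the "complete" output mode re-prints the whole aggregated result after each batch
query = totals.writeStream.outputMode("complete").format("console").start()

query.awaitTermination()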

Here is the step-by-step explanation of the above script:

1. Because this is a file source, an explicit schema (name and points) has to be defined.
2. readStream.csv creates a streaming DataFrame from the CSV files that appear in the /tmp/stream directory.
3. groupBy("name").sum("points") keeps a running total of points per name.
4. writeStream with the “complete” output mode writes the whole aggregated result to the console after every batch, and awaitTermination keeps the job running.

For testing, I created two CSV files:

1.csv:

Fatih,5
Cenk,4
Ahmet,3
Arda,1

2.csv:

Fatih,1
Cenk,1
Ahmet,2
Osman,1
David,2

Then I started the script, and on another terminal, I copied the above files one by one into the /tmp/stream/ directory (if the directory doesn’t exist, you should create it first):

cp 1.csv /tmp/stream 
cp 2.csv /tmp/stream

Here is the output of the PySpark script:

-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-----+-----------+
| name|sum(points)|
+-----+-----------+
|Fatih|          5|
| Cenk|          4|
|Ahmet|          3|
| Arda|          1|
+-----+-----------+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+-----+-----------+
| name|sum(points)|
+-----+-----------+
|Fatih|          6|
| Cenk|          5|
|Ahmet|          5|
|David|          2|
| Arda|          1|
|Osman|          1|
+-----+-----------+

Although I also talked about GraphFrames and Spark’s Machine Learning capabilities in my presentation, I will not include examples of them here. I hope this blog post will be helpful.