What Are Spark Shell Commands?
The Spark shell is an interactive interface used to write ad hoc queries and explore the behavior of Apache Spark. Spark is an open-source cluster-computing engine that performs in-memory processing of huge data sets for workloads such as analytics, ETL, and machine learning. In this topic, we are going to learn about Spark shell commands.
There are different Spark shells for different programming languages, for example:
- spark-shell for Scala
- pyspark for Python and
- sparkR for the R language
One can also develop a standalone application with the help of Spark. It is widely used because of its super-fast computational speed, which comes from processing data in memory with a DAG execution engine rather than the disk-based MapReduce model.
To run spark-shell commands, Java and Scala must already be installed on the system.
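A quick way to confirm these prerequisites, assuming `java` and `scala` are on the `PATH` (the exact version strings vary by installation; Spark 2.x expects Java 8 and Scala 2.11):

```
$ java -version
$ scala -version
```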
Types of Spark Shell Commands
The various kinds of Spark-shell commands are as follows:
1. To check whether Spark is installed and to know its version, the below command is used (all commands hereafter are indicated starting with the symbol "$"):

$ spark-shell
The following output is displayed if Spark is installed:
SPARK_MAJOR_VERSION is set to 2, using Spark2
Setting the default log level to “WARN”.
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://10.113.59.34:4040
Spark context available as ‘sc’ (master = local[*], app id = local-1568732886588).
Spark session available as ‘spark’.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.2.6.3.0-235
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
2. The basic data structure of Spark is called an RDD (Resilient Distributed Dataset), an immutable collection of objects for the distributed computing of records. All datasets of an RDD are logically partitioned across multiple nodes of a cluster.
An RDD can be created by reading data from an external source (such as a local file system or HDFS), by parallelizing an existing collection, or by transforming an existing RDD.
a) To create a new RDD we use the following command:
scala> val examplefile = sc.textFile("file.txt")
Here sc is the built-in SparkContext object.
examplefile: org.apache.spark.rdd.RDD[String] = file.txt MapPartitionsRDD at textFile at <console>:24
b) An RDD can be created through Parallelized Collection as follows:
scala> val oddnum = Array(1, 3, 5, 7, 9)
oddnum: Array[Int] = Array(1, 3, 5, 7, 9)
scala> val value = sc.parallelize(oddnum)
value: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD at parallelize at <console>:26
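parallelize also accepts an optional number of partitions as a second argument. A minimal sketch (assuming the same spark-shell session as above) that splits the collection into two partitions and inspects the layout with glom():

```scala
// Distribute the collection across 2 partitions explicitly
val twoPart = sc.parallelize(Array(1, 3, 5, 7, 9), 2)

// glom() turns each partition into an array so the partitioning can be seen
twoPart.glom().collect()   // e.g. Array(Array(1, 3), Array(5, 7, 9))
```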
c) To create a new RDD by transforming an existing RDD:

scala> val newRDD = value.map(x => (x * 2))
newRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD at map at <console>:26

Note that the transformation is applied to the RDD "value" created earlier, not to the plain Scala array "oddnum"; mapping over the array would produce another array rather than an RDD.
3. There are two types of Spark RDD operations which can be performed on the created datasets:
Actions: An action triggers computation on an RDD and returns a result to the driver. Following are a few of the commands which can be used to perform the below actions on the created datasets:
a) count() function counts the number of elements in the RDD:

scala> value.count()
res3: Long = 5
b) collect() function displays all the elements of the dataset as an array:

scala> value.collect()
res5: Array[Int] = Array(1, 3, 5, 7, 9)
c) first() function displays the first element of the dataset:

scala> value.first()
res4: Int = 1
d) take(n) function displays the first n elements of the dataset:

scala> value.take(3)
res6: Array[Int] = Array(1, 3, 5)
e) takeSample(withReplacement, num, [seed]) function returns a random sample of "num" elements, where the optional seed initializes the random number generator:
scala> value.takeSample(false, 3, System.nanoTime.toInt)
res8: Array[Int] = Array(3, 1, 7)
f) saveAsTextFile(path) function saves the dataset as text files at the specified path, for example an HDFS location.
g) partitions.length function can be used to find the number of partitions in the RDD:

scala> value.partitions.length
res1: Int = 8
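The saveAsTextFile action above produces no console output; it writes one part file per partition. A minimal sketch, assuming a hypothetical writable output directory "/tmp/oddnum_out" that does not already exist:

```scala
// Write each partition of the RDD as a separate part-NNNNN text file
value.saveAsTextFile("/tmp/oddnum_out")

// Reading the directory back yields the same records, now as strings
sc.textFile("/tmp/oddnum_out").collect()
```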
Transformations are used to form a new RDD from existing ones. Since RDDs are immutable, a transformation does not modify its input; instead, it produces one or more new RDDs as output.
There are two types of transformations:
- Narrow transformations
- Wide transformations
Narrow Transformations – Each partition of the parent RDD is used by at most one partition of the child RDD, so no data needs to be shuffled between nodes.
Example: map() and filter() are two basic narrow transformations. Like all transformations, they are evaluated lazily and executed only when an action is called.
- map(func) function operates on each element of the dataset "value" to produce the output RDD.
Example: In this example, we add the value 10 to each element of the dataset "value" and display the transformed output with the help of the collect function.
scala> val mapfunc = value.map(x => x+10)
mapfunc: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD at map at <console>:28

scala> mapfunc.collect()
res2: Array[Int] = Array(11, 13, 15, 17, 19)
- filter(func) function keeps only the elements that satisfy the condition specified by the function.
Example: In this example, we first parallelize a dataset of even numbers, then retrieve all the elements except the number 2 and fetch the output via the collect function.

scala> val evennum = sc.parallelize(Array(2, 4, 6, 8, 10))
evennum: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD at parallelize at <console>:24

scala> val fill = evennum.filter(x => x!=2)
fill: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD at filter at <console>:26

scala> fill.collect()
res8: Array[Int] = Array(4, 6, 8, 10)
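Narrow transformations can be chained without triggering a shuffle. A sketch (same session, using the RDD "value" of odd numbers from earlier) that pipelines map and filter and prints the lineage with toDebugString:

```scala
// Chain two narrow transformations; nothing executes until an action runs
val chained = value.map(x => x + 10).filter(x => x > 12)

// The lineage shows only MapPartitionsRDDs in one stage, i.e. no shuffle
println(chained.toDebugString)

chained.collect()   // Array(13, 15, 17, 19)
```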
Wide Transformations – Data from a single parent RDD partition may be required by multiple child RDD partitions, so the data must be shuffled across the cluster.
Example: groupByKey and reduceByKey are examples of wide transformations.
- groupByKey function groups all the values that share the same key into a single key-value pair. This process involves shuffling, since the data associated with a particular key must be collected from across partitions and stored together.
Example: In this example, we assign the integers 5 and 6 to the string key "key" and the integer 8 to the key "val"; in the output, the values are grouped under their keys in the same key-value pair format.
scala> val data = spark.sparkContext.parallelize(Array(("key",5),("val",8),("key",6)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD at parallelize at <console>:23
scala> val group = data.groupByKey().collect()
group: Array[(String, Iterable[Int])] = Array((key,CompactBuffer(5, 6)), (val,CompactBuffer(8)))
- reduceByKey function combines the values that share a key into a single value by applying the given function. Unlike groupByKey, it merges values within each partition before shuffling, so less data moves across the network.
Example: In this example, the array "letters" is parallelized, and each letter is mapped to the count 10. reduceByKey then adds up the values having the same key and saves the result in the variable value2. The output is then displayed using the collect function.
scala> val letters = Array("A","B","C","D","B","C","E","D")
letters: Array[String] = Array(A, B, C, D, B, C, E, D)
scala> val value2 = spark.sparkContext.parallelize(letters).map(w => (w,10)).reduceByKey(_+_)
value2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD at reduceByKey at <console>:25

scala> value2.collect()
res9: Array[(String, Int)] = Array((A,10), (B,20), (C,20), (D,20), (E,10))
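The same map-then-reduceByKey pattern is the classic way to count word occurrences in a file. A sketch, assuming a hypothetical text file "input.txt" in the working directory:

```scala
// Classic word count: split lines into words, pair each with 1, sum per word
val counts = sc.textFile("input.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Bring the (word, count) pairs back to the driver and print them
counts.collect().foreach(println)
```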
Along with the above-mentioned operations, such as partitioning RDDs and performing actions and transformations on them, Spark also supports caching, which is helpful when the same data is accessed repeatedly.
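Caching is a one-line call on the RDD. A minimal sketch (same session) showing cache() before repeated actions:

```scala
// Mark the RDD to be kept in memory after it is first computed
value.cache()

// The first action materializes the RDD and populates the cache ...
value.count()

// ... so subsequent actions reuse the in-memory copy instead of recomputing
value.collect()
```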
With the help of all these properties, Apache Spark can process huge volumes of data, performing both batch processing and stream processing. Spark's in-memory computation is what makes application processing extremely fast. Hence, Spark is a go-to tool because of its versatility of programming in different languages, its ease of use, and its integration capabilities.
This is a guide to Spark Shell Commands. Here we discussed the various types of Spark shell commands for different programming languages. You may also look at the following articles to learn more –