Arun Gupta

Saturday 18 May 2019

Data Science








Data Science Pipeline:-

A data science pipeline is the overall step-by-step process of obtaining, cleaning, modeling, interpreting and visualizing data within a business or group. Put another way, data science pipelines are sequences of processing and analysis steps applied to data for a specific purpose.





Data Science Stages:-





What is Data Modeling:-

Data modeling is the process of creating a data model for the data to be stored in a database. Put another way:
    “The process of discovering, analyzing, representing, and communicating data requirements in a precise form called the data model.” And “data models depict and enable an organization to understand its data assets.”

Why use Data Model:-

  • Ensures that all data objects required by the database are accurately represented. Omission of data will lead to the creation of faulty reports and produce incorrect results.
  • A data model helps design the database at the conceptual, logical and physical levels.
  • The data model structure helps define the relational tables, primary and foreign keys and stored procedures.
  • It provides a clear picture of the base data and can be used by database developers to create a physical database.
  • It is also helpful for identifying missing and redundant data.
  • Though the initial creation of a data model is labor- and time-intensive, in the long run it makes upgrading and maintaining your IT infrastructure cheaper and faster.
Benefits of Data Modeling for Organizations:-

  • Higher quality software development.
  • Reduced costs.
  • Faster time to market.
  • Clear understanding of scope, vocabulary, and other development elements.
  • Better application and database performance.
  • High quality documentation.
  • Fewer errors in software.
  • Fewer data errors across organizational systems.
  • Better risk management.
Types of Data Models:-






Conceptual Data Model: This data model defines WHAT the system contains. It is typically created by business stakeholders and data architects. The purpose is to organize scope and define business concepts and rules.
Logical Data Model: Defines HOW the system should be implemented regardless of the DBMS. It is typically created by data architects and business analysts. The purpose is to develop a technical map of rules and data structures.
Physical Data Model: This data model describes HOW the system will be implemented using a specific DBMS. It is typically created by DBAs and developers. The purpose is the actual implementation of the database.



 

Confusion matrix:-

The confusion matrix is a kind of error matrix. It visualizes the predictions made for a classification task. Let’s consider a binary classification task.


Positive (P):- Observation is positive (is a man).

Negative (N):- Observation is not positive (is not a man).

True Positive (TP):- Outcome where the model correctly predicts the positive class.

  Predicted that the person is a man, and they really are.

True Negative (TN):- Outcome where the model correctly predicts the negative class.

  Predicted that the person is not a man, and they really are not.

False Positive (FP):- Also called a type 1 error, an outcome where the model incorrectly predicts the positive class when it is actually negative.

  Predicted that the person is a man, but they actually are not.

False Negative (FN):- Also called a type 2 error, an outcome where the model incorrectly predicts the negative class when it is actually positive.

  Predicted that the person is not a man, but they really are.

Accuracy:-This is simply equal to the proportion of predictions that the model classified correctly.

                              Accuracy = (TP + TN) / (TP + TN + FP + FN)

Precision:-Precision is also known as positive predictive value and is the proportion of relevant instances among the retrieved instances. In other words, it answers the question “What proportion of positive identifications was actually correct?”

                              Precision = TP / (TP + FP)

Recall:-Recall, also known as sensitivity, hit rate, or the true positive rate (TPR), is the proportion of the total amount of relevant instances that were actually retrieved. It answers the question “What proportion of actual positives was identified correctly?”

                              Recall = TP / (TP + FN)

To really hit it home, the diagram below is a great way to remember the difference between precision and recall (it certainly helped me)!

Specificity: - Specificity, also known as the true negative rate (TNR), measures the proportion of actual negatives that are correctly identified as such. It is the negative-class counterpart of recall.

                              Specificity = TN / (TN + FP)

F1 Score: - The F1 score is a measure of a test’s accuracy — it is the harmonic mean of precision and recall. It can have a maximum score of 1 (perfect precision and recall) and a minimum of 0. Overall, it is a measure of the preciseness and robustness of your model.

                              F1 Score = 2 * (Precision * Recall) / (Precision + Recall)
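
To make these definitions concrete, here is a minimal sketch that computes the same metrics with scikit-learn (assuming scikit-learn is installed; the label vectors are invented for illustration):

from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

#Hypothetical ground truth and predictions (1 = positive class, 0 = negative class)
y_true = [1, 0, 1, 1, 0, 1, 0, 0]
y_pred = [1, 0, 0, 1, 0, 1, 1, 0]

#For binary labels, confusion_matrix returns [[TN, FP], [FN, TP]]
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
print("TP:", tp, "TN:", tn, "FP:", fp, "FN:", fn)

print("Accuracy :", accuracy_score(y_true, y_pred))   #(TP + TN) / total
print("Precision:", precision_score(y_true, y_pred))  #TP / (TP + FP)
print("Recall   :", recall_score(y_true, y_pred))     #TP / (TP + FN)
print("F1 score :", f1_score(y_true, y_pred))         #harmonic mean of precision and recall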


Python For Data Science -PySpark-SQL

Initializing SparkSession
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]") \
    .appName("pyarungupta.blogspot.com/") \
    .getOrCreate()
master() – If you are running it on a cluster, you need to pass your master name as an argument to master(). Usually this would be either yarn or mesos, depending on your cluster setup.
Use local[x] when running in Standalone mode. x should be an integer value and should be greater than 0; this represents how many partitions it should create when using RDD, DataFrame, and Dataset. Ideally, x value should be the number of CPU cores you have.
appName() – Used to set your application name.
getOrCreate() – This returns the existing SparkSession object if one already exists, and creates a new one otherwise.

PySpark Repartition() vs Coalesce()

repartition() is used to increase or decrease the number of RDD/DataFrame partitions, whereas the PySpark coalesce() is used only to decrease the number of partitions, and does so more efficiently because it avoids a full shuffle.
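
A minimal sketch of the difference (assuming an existing SparkSession named spark; the numbers are illustrative):

df = spark.range(0, 20)
print(df.rdd.getNumPartitions())          #default number of partitions

df_repart = df.repartition(6)             #full shuffle, can increase or decrease partitions
print(df_repart.rdd.getNumPartitions())   #6

df_coal = df_repart.coalesce(2)           #no full shuffle, can only decrease partitions
print(df_coal.rdd.getNumPartitions())     #2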

PySpark Broadcast Variables

Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster so that tasks can access or use them.

Use case

As an example of when to use broadcast variables, assume you receive a two-letter state code in a file and you want to transform it into the full state name (for example, NY to New York, CA to California, etc.) by doing a lookup against a reference mapping. In some instances this data could be large, and you may have many such lookups (like zip codes, etc.).
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0], x[1], x[2], state_convert(x[3]))).collect()
print(result)

PySpark Accumulator

Accumulator is a shared variable that is used with RDD and DataFrame to perform sum and counter operations similar to Map-reduce counters.

Some points to note:

  • sparkContext.accumulator() is used to define accumulator variables.
  • The add() function is used to add/update a value in the accumulator.
  • The value property on the accumulator variable is used to retrieve the value from the accumulator.

Creating Accumulator Variable

accum = sc.accumulator(0)
rdd = spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x: accum.add(x))
print(accum.value)  #Accessed by driver

#Full example
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("accumulator").getOrCreate()

accum = spark.sparkContext.accumulator(0)
rdd = spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x: accum.add(x))
print(accum.value)

accuSum = spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum += x
rdd.foreach(countFun)
print(accuSum.value)

accumCount = spark.sparkContext.accumulator(0)
rdd2 = spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(lambda x: accumCount.add(1))
print(accumCount.value)

The toDF() function of the RDD is used to convert an RDD to a DataFrame.

Create Empty RDD in PySpark

An empty RDD is created with emptyRDD() of SparkContext, for example spark.sparkContext.emptyRDD().
#Create empty RDD
emptyRDD = spark.sparkContext.emptyRDD()
#Create empty DataFrame from empty RDD (schema is a StructType defined elsewhere)
df = spark.createDataFrame(emptyRDD, schema)
df.printSchema()

A PySpark DataFrame can be converted to a Python pandas DataFrame using the toPandas() function.
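
For example, a small illustrative sketch (the rows are made up; assumes an existing SparkSession named spark and that pandas is installed):

data = [("James", 30), ("Anna", 25)]
df = spark.createDataFrame(data, ["name", "age"])
pandasDF = df.toPandas()   #brings the whole DataFrame to the driver as a pandas DataFrame
print(pandasDF)

Like collect(), toPandas() should only be used on data small enough to fit on the driver.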

PySpark Select Columns From DataFrame

select() is used to select a single column, multiple columns, columns by index, all columns from a list, or nested columns from a DataFrame. PySpark select() is a transformation function.

#Selects first 3 columns and top 3 rows

df.select(df.columns[:3]).show(3)
#Selects columns 2 to 4 and top 3 rows
df.select(df.columns[2:4]).show(3)

PySpark collect()

collect() is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node. We should use collect() on smaller datasets, usually after filter(), group(), etc. Retrieving larger datasets results in an OutOfMemory error.

deptDF.collect() retrieves all elements in a DataFrame as an Array of Row type to the driver node.
#Returns value of First Row, First Column which is "Finance"
deptDF.collect()[0][0]
deptDF.collect() returns an Array of Row type.
deptDF.collect()[0] returns the first element in the array (1st row).
deptDF.collect()[0][0] returns the value of the first row & first column.

select() is a transformation that returns a new DataFrame holding only the selected columns, whereas collect() is an action that returns the entire data set as an Array to the driver.

PySpark withColumn() 

withColumn() is a transformation function of DataFrame which is used to change the value, convert the datatype of an existing column, create a new column, and many more.
The PySpark lit() function is used to add a constant value to a DataFrame column, and withColumn() calls can be chained to add multiple columns. pyspark.sql.types.ArrayType (ArrayType extends the DataType class) is used to define an array data type column on a DataFrame that holds elements of the same type.
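
A short sketch tying these together (the column names and values are illustrative, not from the original post; assumes an existing SparkSession named spark):

from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

#ArrayType column holding elements of the same type (here, strings)
schema = StructType([
    StructField("name", StringType(), True),
    StructField("languages", ArrayType(StringType()), True)
])
df = spark.createDataFrame([("James", ["Java", "Scala"])], schema)

#withColumn() adds new columns; lit() supplies the constant values; calls can be chained
df2 = df.withColumn("country", lit("USA")) \
        .withColumn("company", lit("Unknown"))
df2.printSchema()
df2.show(truncate=False)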

PySpark Aggregate Functions


explode()

The explode() function creates a new row for each element in the given array column.

from pyspark.sql.functions import explode
df.select(df.name, explode(df.languagesAtSchool)).show()
#The exploded values appear in a new column named "col" next to "name"

Split()

The split() SQL function returns an array type after splitting the string column by a delimiter.
from pyspark.sql.functions import split
df.select(split(df.name,",").alias("nameAsArray")).show()

array()

The array() function creates a new array column by merging the data from multiple columns. All input columns must have the same data type. The example below combines the data from currentState and previousState and creates a new column, States.
from pyspark.sql.functions import array
df.select(df.name,array(df.currentState,df.previousState).alias("States")).show()

array_contains()

The array_contains() SQL function is used to check whether an array column contains a value. It returns null if the array is null, true if the array contains the value, and false otherwise.
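
A brief sketch, assuming a DataFrame like the one in the explode() example above with a languagesAtSchool array column:

from pyspark.sql.functions import array_contains

df.select(df.name,
          array_contains(df.languagesAtSchool, "Java").alias("knows_java")) \
  .show()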


The commonly used PySpark aggregate functions include the following (a short example follows the list):
approx_count_distinct
avg
collect_list
collect_set
countDistinct
count
grouping
first
last
kurtosis
max
min
mean
skewness
stddev
stddev_samp
stddev_pop
sum
sumDistinct
variance, var_samp, var_pop
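
A small hedged example applying a few of the functions above (the department/salary data is invented for illustration; assumes an existing SparkSession named spark):

from pyspark.sql.functions import avg, sum, max, min, countDistinct

data = [("James", "Sales", 3000), ("Anna", "Sales", 4100), ("Robert", "IT", 3900)]
df = spark.createDataFrame(data, ["name", "dept", "salary"])

#Aggregates over the whole DataFrame
df.select(avg("salary"), sum("salary"), max("salary"), min("salary"),
          countDistinct("dept")).show()

#The same functions are commonly used after groupBy()
df.groupBy("dept").agg(avg("salary").alias("avg_salary")).show()
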
WINDOW FUNCTIONS USAGE & SYNTAX | PYSPARK WINDOW FUNCTIONS DESCRIPTION
row_number(): Column | Returns a sequential number starting from 1 within a window partition.
rank(): Column | Returns the rank of rows within a window partition, with gaps.
percent_rank(): Column | Returns the percentile rank of rows within a window partition.
dense_rank(): Column | Returns the rank of rows within a window partition without any gaps, whereas rank() returns the rank with gaps.
ntile(n: Int): Column | Returns the ntile id in a window partition.
cume_dist(): Column | Returns the cumulative distribution of values within a window partition.
lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is `offset` rows before the current row, and `null` if there are fewer than `offset` rows before the current row.
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is `offset` rows after the current row, and `null` if there are fewer than `offset` rows after the current row.
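
A minimal sketch of the window-function syntax for a few of the functions listed above (the data and partition column are invented for illustration; assumes an existing SparkSession named spark):

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag

data = [("James", "Sales", 3000), ("Anna", "Sales", 4100),
        ("Robert", "Sales", 4100), ("Maria", "IT", 3900)]
df = spark.createDataFrame(data, ["name", "dept", "salary"])

#Partition by dept and order by salary within each partition
w = Window.partitionBy("dept").orderBy("salary")

df.withColumn("row_number", row_number().over(w)) \
  .withColumn("rank", rank().over(w)) \
  .withColumn("dense_rank", dense_rank().over(w)) \
  .withColumn("prev_salary", lag("salary", 1).over(w)) \
  .show()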

from_json() – Converts a JSON string into a Struct type or Map type.
to_json() – Converts a MapType or Struct type to a JSON string.
json_tuple() – Extracts the data from a JSON string and creates them as new columns.
get_json_object() – Extracts a JSON element from a JSON string based on the specified JSON path.
schema_of_json() – Creates a schema string from a JSON string.
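
A short sketch of from_json() and to_json() from the list above (the JSON string and schema are invented for illustration; assumes an existing SparkSession named spark):

from pyspark.sql.functions import from_json, to_json, col
from pyspark.sql.types import MapType, StringType

jsonString = '{"city":"Delhi","state":"DL"}'
df = spark.createDataFrame([(1, jsonString)], ["id", "value"])

#from_json(): JSON string column -> MapType column
df2 = df.withColumn("value", from_json(col("value"), MapType(StringType(), StringType())))
df2.printSchema()

#to_json(): MapType column -> JSON string column
df2.withColumn("value", to_json(col("value"))).show(truncate=False)
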
Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model, or programming language.
PySpark SQL provides methods to read a Parquet file into a DataFrame and write a DataFrame to Parquet files; the parquet() functions of DataFrameReader and DataFrameWriter are used to read from and write/create a Parquet file, respectively.
df.write.parquet("pyarungupta.parquet") 
parDF1=spark.read.parquet("pyarungupta.parquet")

Advantages:

While querying columnar storage, it skips the non-relevant data very quickly, making query execution faster. As a result, aggregation queries consume less time compared to row-oriented databases.

overwrite – mode is used to overwrite the existing file.

append – To add the data to the existing file.

ignore – Ignores write operation when the file already exists.

error – This is the default option; when the file already exists, it returns an error.

 df2.write.mode('Overwrite').json("pyarungupta.json")

PySpark Join Types | Join Two DataFrames

Join is used to combine two DataFrames, and by chaining these you can join multiple DataFrames; it supports all basic join type operations available in traditional SQL, like INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS and SELF JOIN. PySpark joins are wider transformations. A short example follows the table below.


Join String | Equivalent SQL Join
inner | INNER JOIN
outer, full, fullouter, full_outer | FULL OUTER JOIN
left, leftouter, left_outer | LEFT JOIN
right, rightouter, right_outer | RIGHT JOIN
cross |
anti, leftanti, left_anti |
semi, leftsemi, left_semi |
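
A hedged sketch of the basic join syntax with two tiny DataFrames (the data is invented for illustration; assumes an existing SparkSession named spark):

emp = spark.createDataFrame([(1, "Smith", 10), (2, "Rose", 20), (3, "Jones", 30)],
                            ["emp_id", "name", "dept_id"])
dept = spark.createDataFrame([(10, "Finance"), (20, "Marketing")],
                             ["dept_id", "dept_name"])

emp.join(dept, emp.dept_id == dept.dept_id, "inner").show()       #only matching rows
emp.join(dept, emp.dept_id == dept.dept_id, "left").show()        #all rows from emp
emp.join(dept, emp.dept_id == dept.dept_id, "fullouter").show()   #all rows from both sides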

Left Semi Join

A leftsemi join is similar to an inner join, the difference being that a leftsemi join returns all columns from the left dataset and ignores all columns from the right dataset. In other words, this join returns columns only from the left dataset for records that match the right dataset on the join expression; records not matched on the join expression are ignored from both the left and right datasets.
df1.join(df2, df1.id == df2.id, "leftsemi") \
    .show(truncate=False)

Left Anti Join

A leftanti join does the exact opposite of leftsemi: it returns only columns from the left dataset for non-matched records.
df1.join(df2, df1.id == df2.id, "leftanti") \
    .show(truncate=False)

Creating DataFrames
From RDDs

from pyspark.sql.types import *
from pyspark.sql import Row

Infer Schema

sc = spark.sparkContext
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
peopledf = spark.createDataFrame(people)

Specify Schema

people = parts.map(lambda p: Row(name=p[0],
                                 age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people, schema).show()

From Spark Data Sources: JSON , Parquet files and TXT files

JSON


df = spark.read.json("pyarungupta.json")
df.show()
df2 = spark.read.load("pyarungupta.json", format="json")

Parquet files

df3 = spark.read.load("pyarungupta.parquet")
TXT files
df4 = spark.read.text("pyarungupta.txt")

Inspect Data 


df.dtypes Return df column names and data types
df.show() Display the content of df
df.head() Return the first row (df.head(n) returns the first n rows)
df.first() Return the first row
df.take(2) Return the first 2 rows
df.schema Return the schema of df

df.describe().show() Compute summary statistics

df.columns Return the columns of df

df.count() Count the number of rows in df

df.distinct().count() Count the number of distinct rows in df

df.printSchema() Print the schema of df

df.explain() Print the (logical and physical) plans

Duplicate Values

df = df.dropDuplicates()

Queries: Select, When, Like, Startswith - Endswith, Substring and Between

from pyspark.sql import functions as F
Select

#Show all entries in the firstName column
df.select("firstName").show()
df.select("firstName", "lastName").show()
#Show all entries in firstName, age and type
df.select("firstName",
          "age",
          explode("phoneNumber").alias("contactInfo")) \
  .select("contactInfo.type", "firstName", "age") \
  .show()
#Show all entries in firstName and age, add 1 to the entries of age
df.select(df["firstName"], df["age"] + 1).show()
#Show all entries where age > 24
df.select(df['age'] > 24).show()

When

#Show firstName and 0 or 1 depending on age > 30
df.select("firstName", F.when(df.age > 30, 1).otherwise(0)).show()
#Show firstName if in the given options
df[df.firstName.isin("Jane", "Boris")].collect()

Like

#Show firstName, and TRUE if lastName is like Smith
df.select("firstName", df.lastName.like("Smith")).show()

Startswith - Endswith

#Show firstName, and TRUE if lastName starts with Sm
df.select("firstName", df.lastName.startswith("Sm")).show()
#Show last names ending in th
df.select(df.lastName.endswith("th")).show()

Substring

#Return substrings of firstName
df.select(df.firstName.substr(1, 3).alias("name")).collect()

Between

#Show age: values are TRUE if between 22 and 24
df.select(df.age.between(22, 24)).show()

Add, Update & Remove Columns

Adding Columns: withColumn

df = df.withColumn('city', df.address.city) \
    .withColumn('postalCode', df.address.postalCode) \
    .withColumn('state', df.address.state) \
    .withColumn('streetAddress', df.address.streetAddress) \
    .withColumn('telePhoneNumber', explode(df.phoneNumber.number)) \
    .withColumn('telePhoneType', explode(df.phoneNumber.type))

Updating Columns: withColumnRenamed

df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')
Removing Columns: drop
df = df.drop("address", "phoneNumber")
df = df.drop(df.address).drop(df.phoneNumber)
GroupBy
df.groupBy("age").count().show()  Group by age, count the members in the groups
Filter
df.filter(df["age"] > 24).show()  Filter entries of age, only keep those records of which the values are >24
Sort
peopledf.sort(peopledf.age.desc()).collect()
df.sort("age", ascending=False).collect()
df.orderBy(["age", "city"], ascending=[0, 1]).collect()
Missing & Replacing Values
df.na.fill(50).show()  Replace null values
df.na.drop().show()  Return new df omitting rows with null values
df.na.replace(10, 20).show()  Return new df replacing one value with another
Repartitioning
df.repartition(10).rdd.getNumPartitions()  df with 10 partitions
df.coalesce(1).rdd.getNumPartitions()  df with 1 partition
Running SQL Queries Programmatically
Registering DataFrames as Views
peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")
Query Views
df5 = spark.sql("SELECT * FROM customer").show()
peopledf2 = spark.sql("SELECT * FROM global_temp.people")\
.show()
Output
Data Structures
rdd1 = df.rdd  Convert df into an RDD
df.toJSON().first()  Convert df into an RDD of strings
df.toPandas()  Return the contents of df as a pandas DataFrame
Write & Save to Files
df.select("firstName", "city")\
.write \
.save("nameAndCity.parquet")
df.select("firstName", "age") \
.write \
.save("namesAndAges.json",format="json")
Stopping SparkSession
spark.stop() Stopping SparkSession


Pyspark.RDD.join

rdd1 = sc.parallelize([("A", 1), ("B", 2), ("C", 3)]) 
rdd2 = sc.parallelize([("A", 4), ("B", 5), ("C", 6)])
rdd1.join(rdd2) 
df1 = spark.createDataFrame(rdd1, ('k', 'v1')) 
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))
# Register temporary tables to be able to use `sparkSession.sql` 
df1.createOrReplaceTempView('df1') 
df2.createOrReplaceTempView('df2')
# inner is a default value so it could be omitted 
df1.join(df2, df1.k == df2.k, how='inner') 
 spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
from pyspark.sql.functions import broadcast 
 df1.join(broadcast(df2), df1.k == df2.k)


Machine Learning / Deep Learning / Data Science End to End Project in Python 














Libraries used: numpy, pandas, matplotlib, seaborn
