Data Science Pipeline:-
What is Data Modeling:-
Data modeling is the process of creating a visual representation of the data an application or organization uses, together with the relationships between those data objects; the resulting data model serves as a blueprint for designing the database.
Why use Data Model:-
- Ensures that all data objects required by the database are accurately represented. Omitting data leads to faulty reports and incorrect results.
- A data model helps design the database at the conceptual, physical and logical levels.
- Data Model structure helps to define the relational tables, primary and foreign keys and stored procedures.
- It provides a clear picture of the base data and can be used by database developers to create a physical database.
- It is also helpful to identify missing and redundant data.
- Though the initial creation of a data model is labor- and time-intensive, in the long run it makes upgrading and maintaining your IT infrastructure cheaper and faster.
- Higher quality software development.
- Reduced costs.
- Faster time to market.
- Clear understanding of scope, vocabulary, and other development elements.
- Better application and database performance.
- High quality documentation.
- Fewer errors in software.
- Fewer data errors across organizational systems.
- Better risk management.
Confusion matrix:-
The confusion matrix is a kind of error matrix. It visualizes the predictions of a classification task. Let's consider a binary classification task.
Positive (P):- Observation is positive (is a man).
Negative (N):- Observation is not positive (is not a man).
True Positive (TP):- Outcome where the model correctly predicts the positive class. Predicted that a person is a man, and he really is.
True Negative (TN):- Outcome where the model correctly predicts the negative class. Predicted that a person is not a man, and they really are not.
False Positive (FP):- Also called a type 1 error, an outcome where the model incorrectly predicts the positive class when it is actually negative. Predicted that a person is a man, but they actually are not.
False Negative (FN):- Also called a type 2 error, an outcome where the model incorrectly predicts the negative class when it is actually positive. Predicted that a person is not a man, but he really is.
Accuracy:- This is simply equal to the proportion of predictions that the model classified correctly.
Precision:-Precision is also known as positive predictive value and is the proportion of relevant instances among the retrieved instances. In other words, it answers the question “What proportion of positive identifications was actually correct?”
Recall:-Recall, also known as the sensitivity, hit rate, or the true positive rate (TPR), is the proportion of the total amount of relevant instances that were actually retrieved. It answers the question “What proportion of actual positives was identified correctly?”
Specificity:- Specificity, also known as the true negative rate (TNR), measures the proportion of actual negatives that are correctly identified as such. It can be thought of as the recall of the negative class.
F1 Score: - The F1 score is a measure of a test’s accuracy — it is the harmonic mean of precision and recall. It can have a maximum score of 1 (perfect precision and recall) and a minimum of 0. Overall, it is a measure of the preciseness and robustness of your model.
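To make these definitions concrete, here is a minimal Python sketch (the counts are made up) that computes the metrics directly from the four confusion-matrix cells:
# Hypothetical confusion-matrix counts
TP, FP, FN, TN = 40, 10, 5, 45
accuracy = (TP + TN) / (TP + TN + FP + FN)          # proportion of all predictions that are correct
precision = TP / (TP + FP)                          # of predicted positives, how many are truly positive
recall = TP / (TP + FN)                             # of actual positives, how many were found (sensitivity/TPR)
specificity = TN / (TN + FP)                        # of actual negatives, how many were correctly identified (TNR)
f1 = 2 * precision * recall / (precision + recall)  # harmonic mean of precision and recall
print(accuracy, precision, recall, specificity, f1)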
Python For Data Science - PySpark SQL
Initializing SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
    .appName("pyarungupta.blogspot.com") \
    .getOrCreate()
master() – If you are running it on a cluster, you need to use your master name as an argument to master(). Usually it would be either yarn or mesos, depending on your cluster setup.
Use local[x] when running in Standalone mode. x should be an integer value and should be greater than 0; this represents how many partitions it should create when using RDD, DataFrame, and Dataset. Ideally, x value should be the number of CPU cores you have.
appName() – Used to set your application name.
getOrCreate() – This returns an existing SparkSession object if one already exists, and creates a new one if it does not.
PySpark Repartition() vs Coalesce()
repartition() is used to increase or decrease the number of RDD/DataFrame partitions, whereas the PySpark coalesce() is used only to decrease the number of partitions, in an efficient way.
PySpark Broadcast Variables
Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster so that tasks can access or use them.
Use case
Example of when to use broadcast variables: assume you are getting a two-letter state code in a file and you want to transform it into the full state name (for example, CA to California, NY to New York, etc.) by doing a lookup against a reference mapping. In some instances, this data could be large and you may have many such lookups (like zip code, etc.).
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
data = [("James","Smith","USA","CA"),
("Michael","Rose","USA","NY"),
("Robert","Williams","USA","CA"),
("Maria","Jones","USA","FL")
]
rdd = spark.sparkContext.parallelize(data)
def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)
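If run as written, the lookup replaces each two-letter state code with the full state name, so the printed result looks like:
# [('James', 'Smith', 'USA', 'California'), ('Michael', 'Rose', 'USA', 'New York'),
#  ('Robert', 'Williams', 'USA', 'California'), ('Maria', 'Jones', 'USA', 'Florida')]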
PySpark Accumulator
Accumulator is a shared variable that is used with RDD and DataFrame to perform sum and counter operations, similar to MapReduce counters. Some points to note:
sparkContext.accumulator() is used to define accumulator variables.
add() function is used to add/update a value in the accumulator.
value property on the accumulator variable is used to retrieve the value from the accumulator.
Creating Accumulator Variable
accum=sc.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value) #Accessed by driver

import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("accumulator").getOrCreate()
accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value)
accuSum=spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum += x
rdd.foreach(countFun)
print(accuSum.value)
accumCount=spark.sparkContext.accumulator(0)
rdd2=spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(lambda x:accumCount.add(1))
print(accumCount.value)
The toDF() function of the RDD is used to convert an RDD to a DataFrame.
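A minimal sketch of toDF(), assuming the existing SparkSession named spark and an RDD of tuples (the data is made up):
rdd = spark.sparkContext.parallelize([("James", 3000), ("Anna", 4100)])
df_from_rdd = rdd.toDF(["name", "salary"])  # column names for the resulting DataFrame
df_from_rdd.show()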
Create Empty RDD in PySpark
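A minimal sketch, assuming the existing SparkSession named spark, of creating an empty RDD and an empty DataFrame from it (the schema below is hypothetical):
from pyspark.sql.types import StructType, StructField, StringType
emptyRDD = spark.sparkContext.emptyRDD()
# an empty DataFrame needs an explicit schema, since there are no rows to infer one from
schema = StructType([StructField("firstname", StringType(), True)])
df = spark.createDataFrame(emptyRDD, schema)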
df.printSchema()
A PySpark DataFrame can be converted to a pandas DataFrame using the toPandas() function.
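A minimal sketch of toPandas(), which should only be used when the DataFrame is small enough to fit in driver memory:
pandas_df = df.toPandas()  # collects all rows to the driver as a pandas DataFrame
print(pandas_df.head())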
PySpark Select Columns From DataFrame
The select() function is used to select a single column, multiple columns, columns by index, all columns from a list, or nested columns from a DataFrame; PySpark select() is a transformation function.
#Selects first 3 columns and top 3 rows
df.select(df.columns[:3]).show(3)
#Selects columns 2 to 4 and top 3 rows
df.select(df.columns[2:4]).show(3)
collect() is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node. We should use collect() on smaller datasets, usually after filter(), group(), etc. Retrieving larger datasets results in an OutOfMemory error.
deptDF.collect() returns an Array of Row type.
deptDF.collect()[0] returns the first element in the array (1st row).
deptDF.collect()[0][0] returns the value of the first row & first column.
PySpark withColumn()
PySpark Aggregate Functions
explode()
Split()
array()
array_contains()
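A minimal sketch of how the functions listed above are commonly used (the DataFrame and column names are hypothetical):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("functionExamples").getOrCreate()
df = spark.createDataFrame([("John Doe", ["reading", "golf"], 3000)],
                           ["fullName", "hobbies", "salary"])

df.withColumn("bonus", df.salary * 0.1).show()                       # withColumn(): add a derived column
df.groupBy("fullName").agg(F.sum("salary"), F.avg("salary")).show()  # aggregate functions
df.select(F.explode("hobbies").alias("hobby")).show()                # explode(): one row per array element
df.select(F.split("fullName", " ").alias("name_parts")).show()       # split(): string -> array of tokens
df.withColumn("tags", F.array(F.lit("a"), F.lit("b"))) \
  .filter(F.array_contains("tags", "a")).show()                      # array() + array_contains()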
WINDOW FUNCTIONS USAGE & SYNTAX | PYSPARK WINDOW FUNCTIONS DESCRIPTION |
---|---|
row_number(): Column | Returns a sequential number starting from 1 within a window partition |
rank(): Column | Returns the rank of rows within a window partition, with gaps. |
percent_rank(): Column | Returns the percentile rank of rows within a window partition. |
dense_rank(): Column | Returns the rank of rows within a window partition without any gaps, whereas rank() returns rank with gaps. |
ntile(n: Int): Column | Returns the ntile id in a window partition |
cume_dist(): Column | Returns the cumulative distribution of values within a window partition |
lag(e: Column, offset: Int): Column lag(columnName: String, offset: Int): Column lag(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is `offset` rows before the current row, and `null` if there are fewer than `offset` rows before the current row. |
lead(e: Column, offset: Int): Column lead(columnName: String, offset: Int): Column lead(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value that is `offset` rows after the current row, and `null` if there are fewer than `offset` rows after the current row. |
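These functions all share the same usage pattern: define a WindowSpec with partitionBy/orderBy and apply the function over it. A minimal sketch with made-up data:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

spark = SparkSession.builder.appName("windowFunctions").getOrCreate()
data = [("James", "Sales", 3000), ("Michael", "Sales", 4600),
        ("Robert", "IT", 4100), ("Maria", "IT", 4100)]
df = spark.createDataFrame(data, ["employee_name", "department", "salary"])

w = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number", row_number().over(w)) \
  .withColumn("rank", rank().over(w)) \
  .withColumn("dense_rank", dense_rank().over(w)) \
  .show()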
PYSPARK DATE FUNCTION | DATE FUNCTION DESCRIPTION |
---|---|
current_date() | Returns the current date as a date column. |
date_format(dateExpr,format) | Converts a date/timestamp/string to a value of string in the format specified by the date format given by the second argument. |
to_date() | Converts the column into `DateType` by casting rules to `DateType`. |
to_date(column, fmt) | Converts the column into a `DateType` with a specified format |
add_months(startDate, numMonths) | Returns the date that is `numMonths` months after `startDate`. |
date_add(start, days) / date_sub(start, days) | Returns the date that is `days` days after / before `start`. |
datediff(end, start) | Returns the number of days from `start` to `end`. |
months_between(end, start) | Returns number of months between dates `start` and `end`. A whole number is returned if both inputs have the same day of month or both are the last day of their respective months. Otherwise, the difference is calculated assuming 31 days per month. |
months_between(end, start, roundOff) | Returns number of months between dates `end` and `start`. If `roundOff` is set to true, the result is rounded off to 8 digits; it is not rounded otherwise. |
next_day(column, dayOfWeek) | Returns the first date which is later than the value of the `date` column that is on the specified day of the week. For example, `next_day('2015-07-27', "Sunday")` returns 2015-08-02 because that is the first Sunday after 2015-07-27. |
trunc(column, format) | Returns date truncated to the unit specified by the format. For example, `trunc("2018-11-19 12:01:19", "year")` returns 2018-01-01 format: 'year', 'yyyy', 'yy' to truncate by year, 'month', 'mon', 'mm' to truncate by month |
date_trunc(format, timestamp) | Returns timestamp truncated to the unit specified by the format. For example, `date_trunc("year", "2018-11-19 12:01:19")` returns 2018-01-01 00:00:00 format: 'year', 'yyyy', 'yy' to truncate by year, 'month', 'mon', 'mm' to truncate by month, 'day', 'dd' to truncate by day, Other options are: 'second', 'minute', 'hour', 'week', 'month', 'quarter' |
year(column) | Extracts the year as an integer from a given date/timestamp/string |
quarter(column) | Extracts the quarter as an integer from a given date/timestamp/string. |
month(column) | Extracts the month as an integer from a given date/timestamp/string |
dayofweek(column) | Extracts the day of the week as an integer from a given date/timestamp/string. Ranges from 1 for a Sunday through to 7 for a Saturday |
dayofmonth(column) | Extracts the day of the month as an integer from a given date/timestamp/string. |
dayofyear(column) | Extracts the day of the year as an integer from a given date/timestamp/string. |
weekofyear(column) | Extracts the week number as an integer from a given date/timestamp/string. A week is considered to start on a Monday and week 1 is the first week with more than 3 days, as defined by ISO 8601 |
last_day(column) | Returns the last day of the month which the given date belongs to. For example, input "2015-07-27" returns "2015-07-31" since July 31 is the last day of the month in July 2015. |
from_unixtime(column) | Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the yyyy-MM-dd HH:mm:ss format. |
from_unixtime(column, f) | Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format. |
unix_timestamp() | Returns the current Unix timestamp (in seconds) as a long |
unix_timestamp(column) | Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale. |
unix_timestamp(column, p) | Converts time string with given pattern to Unix timestamp (in seconds). |
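A minimal sketch showing a few of the date functions above applied to a made-up input date:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_format, to_date, datediff, add_months

spark = SparkSession.builder.appName("dateFunctions").getOrCreate()
df = spark.createDataFrame([("2021-02-01",)], ["input"])

df.select(current_date().alias("current_date"),
          to_date("input", "yyyy-MM-dd").alias("to_date"),
          date_format(to_date("input"), "MM-dd-yyyy").alias("date_format"),
          add_months(to_date("input"), 3).alias("add_months"),
          datediff(current_date(), to_date("input")).alias("datediff")).show()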
DataFrameWriter Save Modes:
append – Adds the data to the existing file.
ignore – Ignores the write operation when the file already exists.
error – The default option; when the file already exists, it returns an error.
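A minimal sketch of these save modes on the DataFrameWriter (the DataFrame and output path are hypothetical):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("saveModes").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

df.write.mode("append").parquet("/tmp/out.parquet")   # add data to any existing output
df.write.mode("ignore").parquet("/tmp/out.parquet")   # silently skip the write, since the path now exists
# df.write.mode("error").parquet("/tmp/out.parquet")  # the default; would raise an error here because the path exists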
PySpark Join Types | Join Two DataFrames
Join String | Equivalent SQL Join |
---|---|
inner | INNER JOIN |
outer, full, fullouter, full_outer | FULL OUTER JOIN |
left, leftouter, left_outer | LEFT JOIN |
right, rightouter, right_outer | RIGHT JOIN |
cross | |
anti, leftanti, left_anti | |
semi, leftsemi, left_semi | |
Left Semi Join – returns only the columns of the left DataFrame for rows that have a match in the right DataFrame.
Left Anti Join – returns only the rows of the left DataFrame that have no match in the right DataFrame.
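A minimal sketch (with made-up DataFrames) of an inner join plus the left semi and left anti joins described above:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("joinTypes").getOrCreate()
emp = spark.createDataFrame([(1, "Smith", 10), (2, "Rose", 20), (3, "Williams", 30)],
                            ["emp_id", "name", "dept_id"])
dept = spark.createDataFrame([(10, "Finance"), (20, "Marketing")], ["dept_id", "dept_name"])

emp.join(dept, on="dept_id", how="inner").show()      # only rows with a matching dept_id on both sides
emp.join(dept, on="dept_id", how="left_semi").show()  # emp rows that have a match; only emp columns
emp.join(dept, on="dept_id", how="left_anti").show()  # emp rows with no match in dept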
from pyspark.sql.types import *
from pyspark.sql import Row
Infer Schema
sc = spark.sparkContext
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0],age=int(p[1])))
peopledf = spark.createDataFrame(people)
Specify Schema
people = parts.map(lambda p: Row(name=p[0],
age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people, schema).show()
JSON
df = spark.read.json("pyarungupta.json")
df.show()
df2 = spark.read.load("pyarungupta.json", format="json")
Parquet files
df3 = spark.read.load("pyarungupta.parquet")
TXT files
df4 = spark.read.text("pyarungupta.txt")
df.dtypes Return df column names and data types
df.show() Display the content of df
df.head(n) Return the first n rows
df.first() Return the first row
df.take(2) Return the first 2 rows
df.schema Return the schema of df
df.describe().show() Compute summary statistics
df.columns Return the columns of df
df.count() Count the number of rows in df
df.distinct().count() Count the number of distinct rows in df
df.printSchema() Print the schema of df
df.explain() Print the (logical and physical) plans
Duplicate Values
df = df.dropDuplicates()
Queries: Select, When, Like, Startswith - Endswith, Substring and Between
from pyspark.sql import functions as F
from pyspark.sql.functions import explode
Select
df.select("firstName").show() Show all entries in firstName column
df.select("firstName","lastName") \
.show()
df.select("firstName", Show all entries in firstName, age
"age", and type
explode("phoneNumber") \
.alias("contactInfo")) \
.select("contactInfo.type",
"firstName",
"age") \
.show()
df.select(df["firstName"],df["age"]+ 1) Show all entries in firstName and age,
.show() add 1 to the entries of age
df.select(df['age'] > 24).show() Show all entries where age >24
When
df.select("firstName", Show firstName and 0 or 1 depending
F.when(df.age > 30, 1) \ on age >30
.otherwise(0)) \
.show()
df[df.firstName.isin("Jane","Boris")] Show firstName if in the given options
.collect()
Like
df.select("firstName", Show firstName, and lastName is
df.lastName.like("Smith")) \ TRUE if lastName is like Smith
.show()
Startswith - Endswith
df.select("firstName", Show firstName, and TRUE if
df.lastName \ lastName starts with Sm
.startswith("Sm")) \
.show()
df.select(df.lastName.endswith("th")) \ Show last names ending in th
.show()
Substring
.alias("name")) \
.collect()
Between
#Show age: values are TRUE if between 22 and 24
df.select(df.age.between(22, 24)) \
  .show()
Add, Update & Remove Columns
df = df.withColumn('city',df.address.city) \
.withColumn('postalCode',df.address.postalCode) \
.withColumn('state',df.address.state) \
.withColumn('streetAddress',df.address.streetAddress) \
.withColumn('telePhoneNumber',
explode(df.phoneNumber.number)) \
.withColumn('telePhoneType',
explode(df.phoneNumber.type))
Updating Columns: withColumnRenamed
df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')
df = df.drop(df.address).drop(df.phoneNumber)
df.groupBy("age")\ Group by age, count the members
.count() \ in the groups
records of which the values are >24
df.sort("age", ascending=False).collect()
df.orderBy(["age","city"],ascending=[0,1])\
.collect()df.na.fill(50).show() Replace null values
df.na.drop().show() Return new df omitting rows with null values
#Return new df replacing one value with another
df.na \
  .replace(10, 20) \
  .show()
#df with 10 partitions
df.repartition(10) \
  .rdd \
  .getNumPartitions()
#df with 1 partition
df.coalesce(1).rdd.getNumPartitions()
Running SQL Queries Programmatically
Registering DataFrames as Views
peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")
peopledf2 = spark.sql("SELECT * FROM global_temp.people")
peopledf2.show()
df.toJSON().first() Convert df into an RDD of strings
df.toPandas() Return the contents of df as a pandas DataFrame
df.select("firstName", "city") \
  .write \
  .save("nameAndCity.parquet")
df.select("firstName", "age") \
.write \
.save("namesAndAges.json",format="json")
pyspark.RDD.join
rdd1 = sc.parallelize([("A", 1), ("B", 2), ("C", 3)])
rdd2 = sc.parallelize([("A", 4), ("B", 5)])  # a second pair RDD (hypothetical values) to join with
rdd1.join(rdd2).collect()
Note: register DataFrames as temporary views to be able to query them with spark.sql(); in DataFrame joins, inner is the default join type, so it can be omitted.
from pyspark.sql.functions import broadcast
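Building on the broadcast import above, a minimal sketch (with hypothetical DataFrames) of hinting a broadcast hash join, so the smaller DataFrame is copied to every executor instead of being shuffled:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("broadcastJoin").getOrCreate()
emp = spark.createDataFrame([("James", "CA"), ("Maria", "FL")], ["name", "state"])
states = spark.createDataFrame([("CA", "California"), ("FL", "Florida")], ["code", "state_name"])

# broadcast() marks the smaller DataFrame for a broadcast hash join
emp.join(broadcast(states), emp.state == states.code, "inner").show()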